ParquetReader.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.parquet.reader;
import com.facebook.presto.common.block.ArrayBlock;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.block.IntArrayBlock;
import com.facebook.presto.common.block.LongArrayBlock;
import com.facebook.presto.common.block.RowBlock;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.type.DateTimeEncoding;
import com.facebook.presto.common.type.MapType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeSignatureParameter;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.parquet.ColumnReader;
import com.facebook.presto.parquet.ColumnReaderFactory;
import com.facebook.presto.parquet.Field;
import com.facebook.presto.parquet.GroupField;
import com.facebook.presto.parquet.ParquetCorruptionException;
import com.facebook.presto.parquet.ParquetDataSource;
import com.facebook.presto.parquet.ParquetResultVerifierUtils;
import com.facebook.presto.parquet.PrimitiveField;
import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.parquet.predicate.Predicate;
import com.facebook.presto.parquet.predicate.TupleDomainParquetPredicate;
import com.facebook.presto.parquet.reader.ColumnIndexFilterUtils.OffsetRange;
import io.airlift.units.DataSize;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import it.unimi.dsi.fastutil.booleans.BooleanList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.crypto.HiddenColumnChunkMetaData;
import org.apache.parquet.crypto.InternalFileDecryptor;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.PrimitiveColumnIO;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.openjdk.jol.info.ClassLayout;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.StandardTypes.ARRAY;
import static com.facebook.presto.common.type.StandardTypes.MAP;
import static com.facebook.presto.common.type.StandardTypes.ROW;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.parquet.ParquetValidationUtils.validateParquet;
import static com.facebook.presto.parquet.reader.ListColumnReader.calculateCollectionOffsets;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
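/**
* Streaming reader that decodes one Parquet row group at a time into Presto {@link Block}s.
* Batch sizes start small and grow geometrically, but are capped so that the estimated memory of a single batch
* stays below {@code maxReadBlockSize}. When column index filtering is enabled and a predicate is available,
* whole page ranges can be skipped via {@link ColumnIndexFilter}. Encrypted columns are decrypted through the
* optional {@link InternalFileDecryptor}, and an optional verification pass re-reads each primitive column with a
* non-batched reader and cross-checks the results.
*
* <p>Illustrative usage sketch (the engine normally drives this loop; constructor arguments and column
* {@code Field} resolution are elided):
* <pre>{@code
* try (ParquetReader reader = new ParquetReader(...)) {
*     // nextBatch() returns -1 once the file is exhausted
*     while (reader.nextBatch() >= 0) {
*         Block block = reader.readBlock(field);
*         // consume block
*     }
* }
* }</pre>
*/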
public class ParquetReader
implements Closeable
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(ParquetReader.class).instanceSize();
private static final int MAX_VECTOR_LENGTH = 1024;
private static final int INITIAL_BATCH_SIZE = 1;
private static final int BATCH_SIZE_GROWTH_FACTOR = 2;
private final ColumnReader[] verificationColumnReaders;
private final ParquetDataSource dataSource;
private final Optional<InternalFileDecryptor> fileDecryptor;
private final List<BlockMetaData> blocks;
private final Optional<List<Long>> firstRowsOfBlocks;
private final List<PrimitiveColumnIO> columns;
private final AggregatedMemoryContext systemMemoryContext;
private final LocalMemoryContext parquetReaderMemoryContext;
private final LocalMemoryContext pageReaderMemoryContext;
private final LocalMemoryContext verificationPageReaderMemoryContext;
private final boolean batchReadEnabled;
private final boolean enableVerification;
private final FilterPredicate filter;
private final ColumnReader[] columnReaders;
private final long maxReadBlockBytes;
private final List<ColumnIndexStore> blockIndexStores;
private final List<RowRanges> blockRowRanges;
private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
private final boolean columnIndexFilterEnabled;
private BlockMetaData currentBlockMetadata;
/**
* Index in the Parquet file of the first row of the current row group
*/
private Optional<Long> firstRowIndexInGroup = Optional.empty();
private RowRanges currentGroupRowRanges;
private long nextRowInGroup;
private int batchSize;
private int nextBatchSize = INITIAL_BATCH_SIZE;
private final long[] maxBytesPerCell;
private long maxCombinedBytesPerRow;
private int maxBatchSize = MAX_VECTOR_LENGTH;
private int currentBlock;
private long currentPosition;
private long currentGroupRowCount;
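/**
* {@code blocks}, {@code firstRowsOfBlocks} (when present), and {@code blockIndexStores} are parallel lists with
* one entry per row group; {@code blockIndexStores} entries may be null when no column index is available for a
* row group.
*/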
public ParquetReader(
MessageColumnIO messageColumnIO,
List<BlockMetaData> blocks,
Optional<List<Long>> firstRowsOfBlocks,
ParquetDataSource dataSource,
AggregatedMemoryContext systemMemoryContext,
DataSize maxReadBlockSize,
boolean batchReadEnabled,
boolean enableVerification,
Predicate parquetPredicate,
List<ColumnIndexStore> blockIndexStores,
boolean columnIndexFilterEnabled,
Optional<InternalFileDecryptor> fileDecryptor)
{
this.blocks = blocks;
this.firstRowsOfBlocks = requireNonNull(firstRowsOfBlocks, "firstRowsOfBlocks is null");
this.dataSource = requireNonNull(dataSource, "dataSource is null");
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
this.parquetReaderMemoryContext = systemMemoryContext.newLocalMemoryContext("ParquetReader");
this.pageReaderMemoryContext = systemMemoryContext.newLocalMemoryContext("PageReader");
this.verificationPageReaderMemoryContext = systemMemoryContext.newLocalMemoryContext("PageReader");
this.maxReadBlockBytes = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null").toBytes();
this.batchReadEnabled = batchReadEnabled;
columns = messageColumnIO.getLeaves();
columnReaders = new ColumnReader[columns.size()];
this.enableVerification = enableVerification;
verificationColumnReaders = enableVerification ? new ColumnReader[columns.size()] : null;
maxBytesPerCell = new long[columns.size()];
this.blockIndexStores = blockIndexStores;
this.blockRowRanges = listWithNulls(this.blocks.size());
firstRowsOfBlocks.ifPresent(firstRows -> {
checkArgument(blocks.size() == firstRows.size(), "elements of firstRowsOfBlocks must correspond to blocks");
});
for (PrimitiveColumnIO column : columns) {
ColumnDescriptor columnDescriptor = column.getColumnDescriptor();
this.paths.put(ColumnPath.get(columnDescriptor.getPath()), columnDescriptor);
}
if (parquetPredicate != null && columnIndexFilterEnabled && parquetPredicate instanceof TupleDomainParquetPredicate) {
this.filter = ((TupleDomainParquetPredicate) parquetPredicate).getParquetUserDefinedPredicate();
}
else {
this.filter = null;
}
this.currentBlock = -1;
this.columnIndexFilterEnabled = columnIndexFilterEnabled;
requireNonNull(fileDecryptor, "fileDecryptor is null");
this.fileDecryptor = fileDecryptor;
}
@Override
public void close()
throws IOException
{
dataSource.close();
parquetReaderMemoryContext.close();
systemMemoryContext.close();
}
public long getPosition()
{
return currentPosition;
}
/**
* Get the global row index of the first row in the last batch.
*/
public long lastBatchStartRow()
{
long baseIndex = firstRowIndexInGroup.orElseThrow(() -> new IllegalStateException("row index unavailable"));
return baseIndex + nextRowInGroup - batchSize;
}
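/**
* Prepares the next batch of rows, advancing to the next row group when the current one is exhausted. The batch
* size starts at {@code INITIAL_BATCH_SIZE} and doubles up to {@code MAX_VECTOR_LENGTH}, bounded by the adaptive
* {@code maxBatchSize} and by the rows remaining in the current row group.
*
* @return the number of rows in the prepared batch, or -1 when there are no more rows
*/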
public int nextBatch()
{
if (nextRowInGroup >= currentGroupRowCount && !advanceToNextRowGroup()) {
return -1;
}
batchSize = toIntExact(min(nextBatchSize, maxBatchSize));
nextBatchSize = min(batchSize * BATCH_SIZE_GROWTH_FACTOR, MAX_VECTOR_LENGTH);
batchSize = toIntExact(min(batchSize, currentGroupRowCount - nextRowInGroup));
nextRowInGroup += batchSize;
currentPosition += batchSize;
Arrays.stream(columnReaders)
.forEach(reader -> reader.prepareNextRead(batchSize));
if (enableVerification) {
Arrays.stream(verificationColumnReaders)
.forEach(reader -> reader.prepareNextRead(batchSize));
}
return batchSize;
}
public ParquetDataSource getDataSource()
{
return dataSource;
}
public long getSystemMemoryUsage()
{
return systemMemoryContext.getBytes();
}
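/**
* Moves to the next row group. When column index filtering is enabled and an index store exists for the group,
* the matching row ranges are computed up front.
*
* @return false when there are no more row groups, or when the filter matches no rows in the next group
*/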
private boolean advanceToNextRowGroup()
{
currentBlock++;
if (currentBlock == blocks.size()) {
return false;
}
currentBlockMetadata = blocks.get(currentBlock);
firstRowIndexInGroup = firstRowsOfBlocks.map(firstRows -> firstRows.get(currentBlock));
if (filter != null && columnIndexFilterEnabled) {
ColumnIndexStore columnIndexStore = blockIndexStores.get(currentBlock);
if (columnIndexStore != null) {
currentGroupRowRanges = getRowRanges(currentBlock);
long rowCount = currentGroupRowRanges.rowCount();
if (rowCount == 0) {
return false;
}
}
}
nextRowInGroup = 0L;
currentGroupRowCount = currentBlockMetadata.getRowCount();
initializeColumnReaders();
return true;
}
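/**
* Reads an ARRAY column by reading its single element column and reassembling the nesting from the definition
* and repetition levels.
*/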
private ColumnChunk readArray(GroupField field)
throws IOException
{
List<Type> parameters = field.getType().getTypeParameters();
checkArgument(parameters.size() == 1, "Arrays must have a single type parameter, found %s", parameters.size());
Field elementField = field.getChildren().get(0).get();
ColumnChunk columnChunk = readColumnChunk(elementField);
IntList offsets = new IntArrayList();
BooleanList valueIsNull = new BooleanArrayList();
calculateCollectionOffsets(field, offsets, valueIsNull, columnChunk.getDefinitionLevels(), columnChunk.getRepetitionLevels());
Block arrayBlock = ArrayBlock.fromElementBlock(valueIsNull.size(), Optional.of(valueIsNull.toBooleanArray()), offsets.toIntArray(), columnChunk.getBlock());
return new ColumnChunk(arrayBlock, columnChunk.getDefinitionLevels(), columnChunk.getRepetitionLevels());
}
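/**
* Reads a MAP column by reading the key and value columns and rebuilding the map entry offsets from the
* definition and repetition levels of the key column.
*/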
private ColumnChunk readMap(GroupField field)
throws IOException
{
List<Type> parameters = field.getType().getTypeParameters();
checkArgument(parameters.size() == 2, "Maps must have two type parameters, found %s", parameters.size());
Block[] blocks = new Block[parameters.size()];
ColumnChunk columnChunk = readColumnChunk(field.getChildren().get(0).get());
blocks[0] = columnChunk.getBlock();
blocks[1] = readColumnChunk(field.getChildren().get(1).get()).getBlock();
IntList offsets = new IntArrayList();
BooleanList valueIsNull = new BooleanArrayList();
calculateCollectionOffsets(field, offsets, valueIsNull, columnChunk.getDefinitionLevels(), columnChunk.getRepetitionLevels());
Block mapBlock = ((MapType) field.getType()).createBlockFromKeyValue(offsets.size() - 1, Optional.of(valueIsNull.toBooleanArray()), offsets.toIntArray(), blocks[0], blocks[1]);
return new ColumnChunk(mapBlock, columnChunk.getDefinitionLevels(), columnChunk.getRepetitionLevels());
}
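/**
* Reads a ROW column field by field. Fields that are missing from the file are filled with run-length-encoded
* null blocks of the same position count.
*/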
private ColumnChunk readStruct(GroupField field)
throws IOException
{
List<TypeSignatureParameter> fields = field.getType().getTypeSignature().getParameters();
Block[] blocks = new Block[fields.size()];
ColumnChunk columnChunk = null;
List<Optional<Field>> parameters = field.getChildren();
for (int i = 0; i < fields.size(); i++) {
Optional<Field> parameter = parameters.get(i);
if (parameter.isPresent()) {
columnChunk = readColumnChunk(parameter.get());
blocks[i] = columnChunk.getBlock();
}
}
for (int i = 0; i < fields.size(); i++) {
if (blocks[i] == null) {
blocks[i] = RunLengthEncodedBlock.create(field.getType(), null, columnChunk.getBlock().getPositionCount());
}
}
BooleanList structIsNull = StructColumnReader.calculateStructOffsets(field, columnChunk.getDefinitionLevels(), columnChunk.getRepetitionLevels());
boolean[] structIsNullVector = structIsNull.toBooleanArray();
Block rowBlock = RowBlock.fromFieldBlocks(structIsNullVector.length, Optional.of(structIsNullVector), blocks);
return new ColumnChunk(rowBlock, columnChunk.getDefinitionLevels(), columnChunk.getRepetitionLevels());
}
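/**
* Reads the next batch of values for a primitive column. On first use within a row group the column reader is
* initialized with a page reader over either the full column chunk or, when the column index filter applies,
* only the page ranges that can contain matching rows. After decoding, the block may be rewritten by
* {@code typeCoercion}, and the per-column byte estimates that drive the adaptive batch size are updated.
*/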
private ColumnChunk readPrimitive(PrimitiveField field)
throws IOException
{
ColumnDescriptor columnDescriptor = field.getDescriptor();
int fieldId = field.getId();
ColumnReader columnReader = columnReaders[fieldId];
if (!columnReader.isInitialized()) {
validateParquet(currentBlockMetadata.getRowCount() > 0, "Row group has 0 rows");
ColumnChunkMetaData columnChunkMetaData = getColumnChunkMetaData(columnDescriptor);
long startingPosition = columnChunkMetaData.getStartingPos();
int columnChunkSize = toIntExact(columnChunkMetaData.getTotalSize());
if (shouldUseColumnIndex(columnChunkMetaData.getPath())) {
OffsetIndex offsetIndex = blockIndexStores.get(currentBlock).getOffsetIndex(columnChunkMetaData.getPath());
OffsetIndex filteredOffsetIndex = ColumnIndexFilterUtils.filterOffsetIndex(offsetIndex, currentGroupRowRanges, blocks.get(currentBlock).getRowCount());
List<OffsetRange> offsetRanges = ColumnIndexFilterUtils.calculateOffsetRanges(filteredOffsetIndex, columnChunkMetaData, offsetIndex.getOffset(0), startingPosition);
List<OffsetRange> consecutiveRanges = concatRanges(offsetRanges);
int consecutiveRangesSize = consecutiveRanges.stream().mapToInt(range -> (int) range.getLength()).sum();
PageReader pageReader = createPageReader(
dataSourceAsInputStream(startingPosition, consecutiveRanges),
consecutiveRangesSize,
columnChunkMetaData,
columnDescriptor,
Optional.of(filteredOffsetIndex),
pageReaderMemoryContext);
columnReader.init(pageReader, field, currentGroupRowRanges);
if (enableVerification) {
ColumnReader verificationColumnReader = verificationColumnReaders[field.getId()];
PageReader pageReaderVerification = createPageReader(
dataSourceAsInputStream(startingPosition, consecutiveRanges),
consecutiveRangesSize,
columnChunkMetaData,
columnDescriptor,
Optional.of(filteredOffsetIndex),
verificationPageReaderMemoryContext);
verificationColumnReader.init(pageReaderVerification, field, currentGroupRowRanges);
}
}
else {
PageReader pageReader = createPageReader(
dataSourceAsInputStream(startingPosition, columnChunkSize),
columnChunkSize,
columnChunkMetaData,
columnDescriptor,
Optional.empty(),
pageReaderMemoryContext);
columnReader.init(pageReader, field, null);
if (enableVerification) {
ColumnReader verificationColumnReader = verificationColumnReaders[field.getId()];
PageReader pageReaderVerification = createPageReader(
dataSourceAsInputStream(startingPosition, columnChunkSize),
columnChunkSize,
columnChunkMetaData,
columnDescriptor,
Optional.empty(),
verificationPageReaderMemoryContext);
verificationColumnReader.init(pageReaderVerification, field, null);
}
}
}
ColumnChunk columnChunk = columnReader.readNext();
columnChunk = typeCoercion(columnChunk, field.getDescriptor().getPrimitiveType().getPrimitiveTypeName(), field.getType());
if (enableVerification) {
ColumnReader verificationColumnReader = verificationColumnReaders[field.getId()];
ColumnChunk expected = verificationColumnReader.readNext();
ParquetResultVerifierUtils.verifyColumnChunks(columnChunk, expected, columnDescriptor.getPath().length > 1, field, dataSource.getId());
}
// Track the largest observed bytes-per-value for this column and adapt the batch size accordingly
long bytesPerCell = columnChunk.getBlock().getSizeInBytes() / batchSize;
if (maxBytesPerCell[fieldId] < bytesPerCell) {
// Recompute the combined per-row estimate and shrink the batch size so a batch stays within maxReadBlockBytes
maxCombinedBytesPerRow = maxCombinedBytesPerRow - maxBytesPerCell[fieldId] + bytesPerCell;
maxBatchSize = toIntExact(min(maxBatchSize, max(1, maxReadBlockBytes / maxCombinedBytesPerRow)));
maxBytesPerCell[fieldId] = bytesPerCell;
}
return columnChunk;
}
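/**
* Returns a stream over the given offset ranges (relative to {@code startingPosition}), concatenated in order
* via {@link SequenceInputStream}.
*/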
private InputStream dataSourceAsInputStream(long startingPosition, List<OffsetRange> offsetRanges)
{
List<InputStream> inputStreams = new ArrayList<>();
for (OffsetRange r : offsetRanges) {
InputStream inputStream = dataSourceAsInputStream(startingPosition + r.getOffset(), r.getLength());
inputStreams.add(inputStream);
}
return new SequenceInputStream(Collections.enumeration(inputStreams));
}
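/**
* Returns a lazy stream that reads {@code totalSize} bytes from the data source starting at
* {@code startingPosition}, fetching bytes only as they are requested.
*/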
private InputStream dataSourceAsInputStream(long startingPosition, long totalSize)
{
InputStream dataSourceAsStream = new InputStream()
{
private long readBytes;
private long currentPosition = startingPosition;
@Override
public int read()
{
byte[] buffer = new byte[1];
if (read(buffer, 0, 1) <= 0) {
return -1;
}
// Return the byte as an unsigned value, per the InputStream contract
return buffer[0] & 0xFF;
}
@Override
public int read(byte[] buffer, int offset, int len)
{
if (readBytes >= totalSize) {
// End of this stream's byte range; signal end of stream per the InputStream contract
return -1;
}
// Read up to totalSize bytes in total
len = (int) Math.min(len, totalSize - readBytes);
dataSource.readFully(currentPosition, buffer, offset, len);
// Advance the position within the data source and the bytes-read counter
currentPosition += len;
readBytes += len;
return len;
}
@Override
public int available()
{
return (int) (totalSize - readBytes);
}
};
return dataSourceAsStream;
}
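/**
* A column chunk is read through the column index path only when a filter is present, column index filtering is
* enabled, the filter actually narrows the current row group, and the row group's index store has a column index
* for this column.
*/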
private boolean shouldUseColumnIndex(ColumnPath path)
{
return filter != null &&
columnIndexFilterEnabled &&
currentGroupRowRanges != null &&
currentGroupRowRanges.rowCount() < currentGroupRowCount &&
blockIndexStores.get(currentBlock) != null &&
blockIndexStores.get(currentBlock).getColumnIndex(path) != null;
}
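/**
* Wraps the given stream in a buffered column chunk and builds a page reader for it, routing through the file
* decryptor when the column is encrypted.
*/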
private PageReader createPageReader(
InputStream inputStream,
int columnChunkSize,
ColumnChunkMetaData columnChunkMetaData,
ColumnDescriptor columnDescriptor,
Optional<OffsetIndex> offsetIndex,
LocalMemoryContext memoryContext)
throws IOException
{
ColumnChunkDescriptor descriptor = new ColumnChunkDescriptor(columnDescriptor, columnChunkMetaData, columnChunkSize);
// The buffer over the data source is capped at maxReadBlockBytes (or the column chunk size, if smaller)
ParquetColumnChunk columnChunk = new ParquetColumnChunk(
descriptor,
new ParquetColumnChunk.ColumnChunkBufferedInputStream(requireNonNull(inputStream), min(columnChunkSize, (int) maxReadBlockBytes)),
offsetIndex,
memoryContext);
return createPageReaderInternal(columnDescriptor, columnChunk, memoryContext);
}
private PageReader createPageReaderInternal(ColumnDescriptor columnDescriptor, ParquetColumnChunk columnChunk, LocalMemoryContext memoryContext)
throws IOException
{
if (!isEncryptedColumn(fileDecryptor, columnDescriptor)) {
return columnChunk.buildPageReader(Optional.empty(), -1, -1);
}
int columnOrdinal = fileDecryptor.get().getColumnSetup(ColumnPath.get(columnChunk.getDescriptor().getColumnDescriptor().getPath())).getOrdinal();
return columnChunk.buildPageReader(fileDecryptor, currentBlock, columnOrdinal);
}
private boolean isEncryptedColumn(Optional<InternalFileDecryptor> fileDecryptor, ColumnDescriptor columnDescriptor)
{
ColumnPath columnPath = ColumnPath.get(columnDescriptor.getPath());
return fileDecryptor.isPresent() && !fileDecryptor.get().plaintextFile() && fileDecryptor.get().getColumnSetup(columnPath).isEncrypted();
}
private ColumnChunkMetaData getColumnChunkMetaData(ColumnDescriptor columnDescriptor)
throws IOException
{
for (ColumnChunkMetaData metadata : currentBlockMetadata.getColumns()) {
if (!HiddenColumnChunkMetaData.isHiddenColumn(metadata) && metadata.getPath().equals(ColumnPath.get(columnDescriptor.getPath()))) {
return metadata;
}
}
throw new ParquetCorruptionException("Metadata is missing for column: %s", columnDescriptor);
}
private void initializeColumnReaders()
{
for (PrimitiveColumnIO columnIO : columns) {
RichColumnDescriptor column = new RichColumnDescriptor(columnIO.getColumnDescriptor(), columnIO.getType().asPrimitiveType());
columnReaders[columnIO.getId()] = ColumnReaderFactory.createReader(column, batchReadEnabled);
if (enableVerification) {
verificationColumnReaders[columnIO.getId()] = ColumnReaderFactory.createReader(column, false);
}
}
}
public Block readBlock(Field field)
throws IOException
{
return readColumnChunk(field).getBlock();
}
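/**
* Dispatches to the struct, map, array, or primitive reader based on the field type, then updates the reader's
* retained-size accounting.
*/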
private ColumnChunk readColumnChunk(Field field)
throws IOException
{
ColumnChunk columnChunk;
if (ROW.equals(field.getType().getTypeSignature().getBase())) {
columnChunk = readStruct((GroupField) field);
}
else if (MAP.equals(field.getType().getTypeSignature().getBase())) {
columnChunk = readMap((GroupField) field);
}
else if (ARRAY.equals(field.getType().getTypeSignature().getBase())) {
columnChunk = readArray((GroupField) field);
}
else {
columnChunk = readPrimitive((PrimitiveField) field);
}
parquetReaderMemoryContext.setBytes(getRetainedSizeInBytes());
return columnChunk;
}
private long getRetainedSizeInBytes()
{
long sizeInBytes = INSTANCE_SIZE;
for (int i = 0; i < columnReaders.length; i++) {
sizeInBytes += columnReaders[i] == null ? 0 : columnReaders[i].getRetainedSizeInBytes();
if (verificationColumnReaders != null) {
sizeInBytes += verificationColumnReaders[i] == null ? 0 : verificationColumnReaders[i].getRetainedSizeInBytes();
}
}
sizeInBytes += sizeOf(maxBytesPerCell);
return sizeInBytes;
}
/**
* The reader returns a Block type (LongArrayBlock or IntArrayBlock) based on the column's physical type in the Parquet file, but
* operators downstream of the scan expect the Block type implied by the metadata defined in the Hive table. This mismatch causes
* failures when those operators invoke methods of the expected Block type that the returned Block type does not implement.
* To overcome this issue, rewrite the block into the expected type.
*/
private static ColumnChunk typeCoercion(ColumnChunk columnChunk, PrimitiveTypeName physicalDataType, Type outputType)
{
Block newBlock = null;
if (SMALLINT.equals(outputType) || TINYINT.equals(outputType)) {
if (columnChunk.getBlock() instanceof IntArrayBlock) {
newBlock = rewriteIntegerArrayBlock((IntArrayBlock) columnChunk.getBlock(), outputType);
}
else if (columnChunk.getBlock() instanceof LongArrayBlock) {
newBlock = rewriteLongArrayBlock((LongArrayBlock) columnChunk.getBlock(), outputType);
}
}
else if (INTEGER.equals(outputType) && physicalDataType == PrimitiveTypeName.INT64) {
if (columnChunk.getBlock() instanceof LongArrayBlock) {
newBlock = rewriteLongArrayBlock((LongArrayBlock) columnChunk.getBlock(), outputType);
}
}
else if (BIGINT.equals(outputType) && physicalDataType == PrimitiveTypeName.INT32) {
if (columnChunk.getBlock() instanceof IntArrayBlock) {
newBlock = rewriteIntegerArrayBlock((IntArrayBlock) columnChunk.getBlock(), outputType);
}
}
else if (TIMESTAMP_WITH_TIME_ZONE.equals(outputType) && physicalDataType == PrimitiveTypeName.INT96) {
if (columnChunk.getBlock() instanceof LongArrayBlock) {
newBlock = rewriteTimeLongArrayBlock((LongArrayBlock) columnChunk.getBlock(), outputType);
}
}
if (newBlock != null) {
return new ColumnChunk(newBlock, columnChunk.getDefinitionLevels(), columnChunk.getRepetitionLevels());
}
return columnChunk;
}
private static Block rewriteIntegerArrayBlock(IntArrayBlock intArrayBlock, Type targetType)
{
int positionCount = intArrayBlock.getPositionCount();
BlockBuilder newBlockBuilder = targetType.createBlockBuilder(null, positionCount);
for (int position = 0; position < positionCount; position++) {
if (intArrayBlock.isNull(position)) {
newBlockBuilder.appendNull();
}
else {
targetType.writeLong(newBlockBuilder, intArrayBlock.getInt(position));
}
}
return newBlockBuilder.build();
}
private static Block rewriteLongArrayBlock(LongArrayBlock longArrayBlock, Type targetType)
{
int positionCount = longArrayBlock.getPositionCount();
BlockBuilder newBlockBuilder = targetType.createBlockBuilder(null, positionCount);
for (int position = 0; position < positionCount; position++) {
if (longArrayBlock.isNull(position)) {
newBlockBuilder.appendNull();
}
else {
targetType.writeLong(newBlockBuilder, longArrayBlock.getLong(position));
}
}
return newBlockBuilder.build();
}
private static Block rewriteTimeLongArrayBlock(LongArrayBlock longArrayBlock, Type targetType)
{
int positionCount = longArrayBlock.getPositionCount();
BlockBuilder newBlockBuilder = targetType.createBlockBuilder(null, positionCount);
for (int position = 0; position < positionCount; position++) {
if (longArrayBlock.isNull(position)) {
newBlockBuilder.appendNull();
}
else {
// Pack the millisecond timestamp together with the UTC time zone key into a single long
targetType.writeLong(newBlockBuilder, DateTimeEncoding.packDateTimeWithZone(longArrayBlock.getLong(position), UTC_KEY));
}
}
return newBlockBuilder.build();
}
private static <T> List<T> listWithNulls(int size)
{
return Stream.generate(() -> (T) null).limit(size).collect(Collectors.toCollection(ArrayList<T>::new));
}
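/**
* Computes (and caches per row group) the row ranges that can match the filter, using the block's column index
* store.
*/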
private RowRanges getRowRanges(int blockIndex)
{
assert filter != null;
RowRanges rowRanges = blockRowRanges.get(blockIndex);
if (rowRanges == null) {
rowRanges = ColumnIndexFilter.calculateRowRanges(FilterCompat.get(filter), blockIndexStores.get(blockIndex),
paths.keySet(), blocks.get(blockIndex).getRowCount());
blockRowRanges.set(blockIndex, rowRanges);
}
return rowRanges;
}
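/**
* Merges consecutive offset ranges so that adjacent pages are fetched with a single contiguous read.
*/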
private List<OffsetRange> concatRanges(List<OffsetRange> offsetRanges)
{
List<OffsetRange> pageRanges = new ArrayList<>();
OffsetRange currentParts = null;
for (OffsetRange range : offsetRanges) {
long startPosition = range.getOffset();
// first range, or not contiguous with the previous one => start a new merged range
if (currentParts == null || currentParts.endPos() != startPosition) {
currentParts = new OffsetRange(startPosition, 0);
}
pageRanges.add(currentParts);
currentParts.extendLength(range.getLength());
}
return pageRanges;
}
}