ColumnReaderFactory.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.parquet;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.parquet.batchreader.BinaryFlatBatchReader;
import com.facebook.presto.parquet.batchreader.BinaryNestedBatchReader;
import com.facebook.presto.parquet.batchreader.BooleanFlatBatchReader;
import com.facebook.presto.parquet.batchreader.BooleanNestedBatchReader;
import com.facebook.presto.parquet.batchreader.Int32FlatBatchReader;
import com.facebook.presto.parquet.batchreader.Int32NestedBatchReader;
import com.facebook.presto.parquet.batchreader.Int64FlatBatchReader;
import com.facebook.presto.parquet.batchreader.Int64NestedBatchReader;
import com.facebook.presto.parquet.batchreader.Int64TimeAndTimestampMicrosFlatBatchReader;
import com.facebook.presto.parquet.batchreader.Int64TimeAndTimestampMicrosNestedBatchReader;
import com.facebook.presto.parquet.batchreader.LongDecimalFlatBatchReader;
import com.facebook.presto.parquet.batchreader.ShortDecimalFlatBatchReader;
import com.facebook.presto.parquet.batchreader.TimestampFlatBatchReader;
import com.facebook.presto.parquet.batchreader.TimestampNestedBatchReader;
import com.facebook.presto.parquet.batchreader.UuidFlatBatchReader;
import com.facebook.presto.parquet.reader.AbstractColumnReader;
import com.facebook.presto.parquet.reader.BinaryColumnReader;
import com.facebook.presto.parquet.reader.BooleanColumnReader;
import com.facebook.presto.parquet.reader.DoubleColumnReader;
import com.facebook.presto.parquet.reader.FloatColumnReader;
import com.facebook.presto.parquet.reader.IntColumnReader;
import com.facebook.presto.parquet.reader.LongColumnReader;
import com.facebook.presto.parquet.reader.LongDecimalColumnReader;
import com.facebook.presto.parquet.reader.LongTimeMicrosColumnReader;
import com.facebook.presto.parquet.reader.LongTimestampMicrosColumnReader;
import com.facebook.presto.parquet.reader.ShortDecimalColumnReader;
import com.facebook.presto.parquet.reader.TimestampColumnReader;
import com.facebook.presto.spi.PrestoException;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import java.util.Optional;
import static com.facebook.presto.parquet.ParquetTypeUtils.isDecimalType;
import static com.facebook.presto.parquet.ParquetTypeUtils.isShortDecimalType;
import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeMicrosType;
import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeStampMicrosType;
import static com.facebook.presto.parquet.ParquetTypeUtils.isUuidType;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
/**
 * Static factory that picks a {@link ColumnReader} implementation for a Parquet column.
 *
 * <p>The choice is driven by the column's primitive type name, by the logical-type predicates
 * from {@code ParquetTypeUtils} (decimal, UUID, time/timestamp micros), by whether the column
 * is nested, and by whether batch reading is enabled.
 */
public final class ColumnReaderFactory
{
    private static final Logger log = Logger.get(ColumnReaderFactory.class);

    // Utility class: not instantiable.
    private ColumnReaderFactory()
    {
    }

    /**
     * Creates the reader for the given column.
     *
     * <p>When {@code batchReadEnabled} is set, a batch reader is returned whenever one is
     * available for the column. Nested decimal columns never use the batch path, and a
     * FIXED_LEN_BYTE_ARRAY column only has a batch reader when it is flat and is either a
     * UUID or a decimal. In every other case the non-batch reader is returned.
     *
     * @param descriptor column descriptor supplying the path, primitive type and logical type annotation
     * @param batchReadEnabled whether batch readers may be used
     * @return the reader selected for the column
     * @throws PrestoException with {@code NOT_SUPPORTED} when the non-batch path is reached for a
     *         FIXED_LEN_BYTE_ARRAY column that is neither a UUID nor a decimal, or for a
     *         primitive type name that has no case below
     */
    public static ColumnReader createReader(RichColumnDescriptor descriptor, boolean batchReadEnabled)
    {
        // A column is treated as nested when its path has more than one element.
        final boolean isNested = descriptor.getPath().length > 1;

        // The decimal batch readers used below are all "flat" readers, so nested decimals skip the batch path.
        if (batchReadEnabled && !(isNested && isDecimalType(descriptor))) {
            switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
                case BOOLEAN:
                    return isNested ? new BooleanNestedBatchReader(descriptor) : new BooleanFlatBatchReader(descriptor);
                case INT32:
                    if (!isNested && isShortDecimalType(descriptor)) {
                        return new ShortDecimalFlatBatchReader(descriptor);
                    }
                    // Intentional fall through: plain INT32 shares the reader used for FLOAT.
                case FLOAT:
                    // NOTE(review): FLOAT is served by the Int32 batch readers, presumably because
                    // both are 4-byte values - confirm against the batch reader implementation.
                    return isNested ? new Int32NestedBatchReader(descriptor) : new Int32FlatBatchReader(descriptor);
                case INT64:
                    if (isTimeStampMicrosType(descriptor) || isTimeMicrosType(descriptor)) {
                        return isNested ? new Int64TimeAndTimestampMicrosNestedBatchReader(descriptor) : new Int64TimeAndTimestampMicrosFlatBatchReader(descriptor);
                    }
                    if (!isNested && isShortDecimalType(descriptor)) {
                        int precision = ((DecimalLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getPrecision();
                        if (precision < 10) {
                            // Only a warning: the column is still read with the short decimal reader.
                            log.warn("PrimitiveTypeName is INT64 but precision is less than 10.");
                        }
                        return new ShortDecimalFlatBatchReader(descriptor);
                    }
                    // Intentional fall through: plain INT64 shares the reader used for DOUBLE.
                case DOUBLE:
                    // NOTE(review): DOUBLE is served by the Int64 batch readers, presumably because
                    // both are 8-byte values - confirm against the batch reader implementation.
                    return isNested ? new Int64NestedBatchReader(descriptor) : new Int64FlatBatchReader(descriptor);
                case INT96:
                    return isNested ? new TimestampNestedBatchReader(descriptor) : new TimestampFlatBatchReader(descriptor);
                case BINARY:
                    // A decimal column reaching this point is flat (nested decimals were excluded above).
                    return createDecimalBatchColumnReader(descriptor)
                            .orElseGet(() -> isNested ? new BinaryNestedBatchReader(descriptor) : new BinaryFlatBatchReader(descriptor));
                case FIXED_LEN_BYTE_ARRAY:
                    if (!isNested) {
                        if (isUuidType(descriptor)) {
                            return new UuidFlatBatchReader(descriptor);
                        }
                        Optional<ColumnReader> decimalBatchColumnReader = createDecimalBatchColumnReader(descriptor);
                        if (decimalBatchColumnReader.isPresent()) {
                            return decimalBatchColumnReader.get();
                        }
                    }
                    // No batch reader applies; continue with the non-batch readers below.
                    break;
                default:
                    // No batch reader for this type; continue with the non-batch readers below.
                    break;
            }
        }

        // Non-batch readers.
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
            case BOOLEAN:
                return new BooleanColumnReader(descriptor);
            case INT32:
                return createDecimalColumnReader(descriptor).orElseGet(() -> new IntColumnReader(descriptor));
            case INT64:
                if (isTimeStampMicrosType(descriptor)) {
                    return new LongTimestampMicrosColumnReader(descriptor);
                }
                if (isTimeMicrosType(descriptor)) {
                    return new LongTimeMicrosColumnReader(descriptor);
                }
                return createDecimalColumnReader(descriptor).orElseGet(() -> new LongColumnReader(descriptor));
            case INT96:
                return new TimestampColumnReader(descriptor);
            case FLOAT:
                return new FloatColumnReader(descriptor);
            case DOUBLE:
                return new DoubleColumnReader(descriptor);
            case BINARY:
                return createDecimalColumnReader(descriptor).orElseGet(() -> new BinaryColumnReader(descriptor));
            case FIXED_LEN_BYTE_ARRAY:
                // UUID columns are read with the generic binary reader on this path.
                if (isUuidType(descriptor)) {
                    return new BinaryColumnReader(descriptor);
                }
                return createDecimalColumnReader(descriptor)
                        .orElseThrow(() -> new PrestoException(NOT_SUPPORTED, " type FIXED_LEN_BYTE_ARRAY supported as DECIMAL; got " + descriptor.getPrimitiveType().getOriginalType()));
            default:
                throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
    }

    /**
     * Returns the flat batch reader for a decimal column.
     *
     * @param descriptor column descriptor to inspect
     * @return a {@code ShortDecimalFlatBatchReader} for short decimals, a
     *         {@code LongDecimalFlatBatchReader} for any other decimal, or empty when the
     *         column is not a decimal
     */
    private static Optional<ColumnReader> createDecimalBatchColumnReader(RichColumnDescriptor descriptor)
    {
        if (!isDecimalType(descriptor)) {
            return Optional.empty();
        }
        if (isShortDecimalType(descriptor)) {
            return Optional.of(new ShortDecimalFlatBatchReader(descriptor));
        }
        return Optional.of(new LongDecimalFlatBatchReader(descriptor));
    }

    /**
     * Returns the non-batch reader for a decimal column.
     *
     * @param descriptor column descriptor to inspect
     * @return a {@code ShortDecimalColumnReader} for short decimals, a
     *         {@code LongDecimalColumnReader} for any other decimal, or empty when the
     *         column is not a decimal
     */
    private static Optional<AbstractColumnReader> createDecimalColumnReader(RichColumnDescriptor descriptor)
    {
        if (!isDecimalType(descriptor)) {
            return Optional.empty();
        }
        if (isShortDecimalType(descriptor)) {
            return Optional.of(new ShortDecimalColumnReader(descriptor));
        }
        return Optional.of(new LongDecimalColumnReader(descriptor));
    }
}