TupleDomainParquetPredicate.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.parquet.predicate;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.parquet.DictionaryPage;
import com.facebook.presto.parquet.ParquetDataSourceId;
import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.parquet.dictionary.Dictionary;
import com.facebook.presto.spi.PrestoWarning;
import com.facebook.presto.spi.WarningCollector;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.parquet.ParquetWarningCode.PARQUET_FILE_STATISTICS_CORRUPTION;
import static com.facebook.presto.parquet.predicate.PredicateUtils.isStatisticsOverflow;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.String.format;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
public class TupleDomainParquetPredicate
implements Predicate
{
private final TupleDomain<ColumnDescriptor> effectivePredicate;
private final List<RichColumnDescriptor> columns;
private final ColumnIndexValueConverter converter;
public TupleDomainParquetPredicate(TupleDomain<ColumnDescriptor> effectivePredicate, List<RichColumnDescriptor> columns)
{
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.converter = new ColumnIndexValueConverter(columns);
}
private static boolean effectivePredicateMatches(Domain effectivePredicateDomain, DictionaryDescriptor dictionary)
{
return !effectivePredicateDomain.intersect(getDomain(effectivePredicateDomain.getType(), dictionary)).isNone();
}
@VisibleForTesting
public static Domain getDomain(
ColumnDescriptor column,
Type type,
long rowCount,
Statistics<?> statistics,
ParquetDataSourceId id,
Optional<WarningCollector> warningCollector)
{
if (statistics == null || statistics.isEmpty()) {
return Domain.all(type);
}
if (statistics.isNumNullsSet() && statistics.getNumNulls() == rowCount) {
return Domain.onlyNull(type);
}
boolean hasNullValue = !statistics.isNumNullsSet() || statistics.getNumNulls() != 0L;
if (!statistics.hasNonNullValue() || statistics.genericGetMin() == null || statistics.genericGetMax() == null) {
return Domain.create(ValueSet.all(type), hasNullValue);
}
try {
return getDomain(
column,
type,
ImmutableList.of(statistics.genericGetMin()),
ImmutableList.of(statistics.genericGetMax()),
hasNullValue);
}
catch (Exception exception) {
if (warningCollector.isPresent()) {
warningCollector.get().add(new PrestoWarning(PARQUET_FILE_STATISTICS_CORRUPTION, format("Corrupted statistics for column \"%s\" in Parquet file \"%s\": [%s]", column.toString(), id, statistics)));
}
return Domain.create(ValueSet.all(type), hasNullValue);
}
}
/**
* Get a domain for the ranges defined by each pair of elements from {@code minimums} and {@code maximums}.
* Both arrays must have the same length.
*/
private static Domain getDomain(
ColumnDescriptor column,
Type type,
List<Object> minimums,
List<Object> maximums,
boolean hasNullValue)
{
checkArgument(minimums.size() == maximums.size(), "Expected minimums and maximums to have the same size");
List<Range> ranges = new ArrayList<>();
if (type.equals(BOOLEAN)) {
boolean hasTrueValues = minimums.stream().anyMatch(value -> (boolean) value) || maximums.stream().anyMatch(value -> (boolean) value);
boolean hasFalseValues = minimums.stream().anyMatch(value -> !(boolean) value) || maximums.stream().anyMatch(value -> !(boolean) value);
if (hasTrueValues && hasFalseValues) {
return Domain.all(type);
}
if (hasTrueValues) {
return Domain.create(ValueSet.of(type, true), hasNullValue);
}
if (hasFalseValues) {
return Domain.create(ValueSet.of(type, false), hasNullValue);
}
// All nulls case is handled earlier
throw new VerifyException("Impossible boolean statistics");
}
if ((type.equals(BIGINT) || type.equals(TINYINT) || type.equals(SMALLINT) || type.equals(INTEGER))) {
for (int i = 0; i < minimums.size(); i++) {
long min = asLong(minimums.get(i));
long max = asLong(maximums.get(i));
if (isStatisticsOverflow(type, min, max)) {
return Domain.create(ValueSet.all(type), hasNullValue);
}
ranges.add(Range.range(type, min, true, max, true));
}
checkArgument(!ranges.isEmpty(), "cannot use empty ranges");
return Domain.create(ValueSet.ofRanges(ranges), hasNullValue);
}
if (type.equals(REAL)) {
for (int i = 0; i < minimums.size(); i++) {
Float min = (Float) minimums.get(i);
Float max = (Float) maximums.get(i);
if (min.isNaN() || max.isNaN()) {
return Domain.create(ValueSet.all(type), hasNullValue);
}
ranges.add(Range.range(type, (long) floatToRawIntBits(min), true, (long) floatToRawIntBits(max), true));
}
checkArgument(!ranges.isEmpty(), "cannot use empty ranges");
return Domain.create(ValueSet.ofRanges(ranges), hasNullValue);
}
if (type.equals(DOUBLE)) {
for (int i = 0; i < minimums.size(); i++) {
Double min = (Double) minimums.get(i);
Double max = (Double) maximums.get(i);
if (min.isNaN() || max.isNaN()) {
return Domain.create(ValueSet.all(type), hasNullValue);
}
ranges.add(Range.range(type, min, true, max, true));
}
checkArgument(!ranges.isEmpty(), "cannot use empty ranges");
return Domain.create(ValueSet.ofRanges(ranges), hasNullValue);
}
if (isVarcharType(type)) {
for (int i = 0; i < minimums.size(); i++) {
Slice min = Slices.wrappedBuffer(((Binary) minimums.get(i)).toByteBuffer());
Slice max = Slices.wrappedBuffer(((Binary) maximums.get(i)).toByteBuffer());
ranges.add(Range.range(type, min, true, max, true));
}
checkArgument(!ranges.isEmpty(), "cannot use empty ranges");
return Domain.create(ValueSet.ofRanges(ranges), hasNullValue);
}
if (type.equals(DATE)) {
for (int i = 0; i < minimums.size(); i++) {
long min = asLong(minimums.get(i));
long max = asLong(maximums.get(i));
if (isStatisticsOverflow(type, min, max)) {
return Domain.create(ValueSet.all(type), hasNullValue);
}
ranges.add(Range.range(type, min, true, max, true));
}
checkArgument(!ranges.isEmpty(), "cannot use empty ranges");
return Domain.create(ValueSet.ofRanges(ranges), hasNullValue);
}
return Domain.create(ValueSet.all(type), hasNullValue);
}
@VisibleForTesting
public static Domain getDomain(Type type, DictionaryDescriptor dictionaryDescriptor)
{
if (dictionaryDescriptor == null) {
return Domain.all(type);
}
ColumnDescriptor columnDescriptor = dictionaryDescriptor.getColumnDescriptor();
Optional<DictionaryPage> dictionaryPage = dictionaryDescriptor.getDictionaryPage();
if (!dictionaryPage.isPresent()) {
return Domain.all(type);
}
Dictionary dictionary;
try {
dictionary = dictionaryPage.get().getEncoding().initDictionary(columnDescriptor, dictionaryPage.get());
}
catch (Exception e) {
// In case of exception, just continue reading the data, not using dictionary page at all
// OK to ignore exception when reading dictionaries
return Domain.all(type);
}
int dictionarySize = dictionaryPage.get().getDictionarySize();
DictionaryValueConverter converter = new DictionaryValueConverter(dictionary);
Function<Integer, Object> convertFunction = converter.getConverter(columnDescriptor.getPrimitiveType());
List<Object> values = new ArrayList<>();
for (int i = 0; i < dictionarySize; i++) {
values.add(convertFunction.apply(i));
}
// TODO: when min == max (i.e., singleton ranges, the construction of Domains can be done more efficiently
return getDomain(columnDescriptor, type, values, values, true);
}
public static long asLong(Object value)
{
if (value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long) {
return ((Number) value).longValue();
}
throw new IllegalArgumentException("Can't convert value to long: " + value.getClass().getName());
}
private static <T extends Comparable<T>> Domain createDomain(Type type, ColumnIndex columnIndex, boolean hasNullValue, List<T> mins, List<T> maxs)
{
if (mins.isEmpty() || maxs.isEmpty() || mins.size() != maxs.size()) {
return Domain.create(ValueSet.all(type), hasNullValue);
}
int pageCount = columnIndex.getMinValues().size();
List<Range> ranges = new ArrayList<>();
for (int i = 0; i < pageCount; i++) {
T min = mins.get(i);
T max = maxs.get(i);
if (min.compareTo(max) > 0) {
return Domain.create(ValueSet.all(type), hasNullValue);
}
if (min instanceof Long) {
if (isStatisticsOverflow(type, asLong(min), asLong(max))) {
return Domain.create(ValueSet.all(type), hasNullValue);
}
ranges.add(Range.range(type, min, true, max, true));
}
else if (min instanceof Double) {
if (((Double) min).isNaN() || ((Double) max).isNaN()) {
return Domain.create(ValueSet.all(type), hasNullValue);
}
ranges.add(Range.range(type, min, true, max, true));
}
else if (min instanceof Slice) {
ranges.add(Range.range(type, min, true, max, true));
}
}
checkArgument(!ranges.isEmpty(), "cannot use empty ranges");
return Domain.create(ValueSet.ofRanges(ranges), hasNullValue);
}
@Override
public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id, Optional<WarningCollector> warningCollector)
{
if (numberOfRows == 0) {
return false;
}
if (effectivePredicate.isNone()) {
return false;
}
Map<ColumnDescriptor, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
.orElseThrow(() -> new IllegalStateException("Effective predicate other than none should have domains"));
for (RichColumnDescriptor column : columns) {
Domain effectivePredicateDomain = effectivePredicateDomains.get(column);
if (effectivePredicateDomain == null) {
continue;
}
Statistics<?> columnStatistics = statistics.get(column);
if (columnStatistics != null && !columnStatistics.isEmpty()) {
Domain domain = getDomain(
column,
effectivePredicateDomain.getType(),
numberOfRows,
columnStatistics,
id,
warningCollector);
if (effectivePredicateDomain.intersect(domain).isNone()) {
return false;
}
}
}
return true;
}
@Override
public boolean matches(DictionaryDescriptor dictionary)
{
requireNonNull(dictionary, "dictionary is null");
if (effectivePredicate.isNone()) {
return false;
}
Map<ColumnDescriptor, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
.orElseThrow(() -> new IllegalStateException("Effective predicate other than none should have domains"));
Domain effectivePredicateDomain = effectivePredicateDomains.get(dictionary.getColumnDescriptor());
return effectivePredicateDomain == null || effectivePredicateMatches(effectivePredicateDomain, dictionary);
}
@Override
public boolean matches(long numberOfRows, Optional<ColumnIndexStore> columnIndexStore)
{
if (numberOfRows == 0 || !columnIndexStore.isPresent()) {
return false;
}
if (effectivePredicate.isNone()) {
return false;
}
Map<ColumnDescriptor, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
.orElseThrow(() -> new IllegalStateException("Effective predicate other than none should have domains"));
for (RichColumnDescriptor column : columns) {
Domain effectivePredicateDomain = effectivePredicateDomains.get(column);
if (effectivePredicateDomain == null) {
continue;
}
if (columnIndexStore.isPresent()) {
ColumnIndex columnIndex = columnIndexStore.get().getColumnIndex(ColumnPath.get(column.getPath()));
if (columnIndex == null || columnIndex.getMinValues().isEmpty() || columnIndex.getMaxValues().isEmpty() || columnIndex.getMinValues().size() != columnIndex.getMaxValues().size()) {
continue;
}
else {
Domain domain = getDomain(effectivePredicateDomain.getType(), numberOfRows, columnIndex, column);
if (effectivePredicateDomain.intersect(domain).isNone()) {
return false;
}
}
}
}
return true;
}
@VisibleForTesting
public Domain getDomain(Type type, long rowCount, ColumnIndex columnIndex, RichColumnDescriptor descriptor)
{
if (columnIndex == null) {
return Domain.all(type);
}
String columnName = descriptor.getPrimitiveType().getName();
if (isCorruptedColumnIndex(columnIndex)) {
return Domain.all(type);
}
if (isEmptyColumnIndex(columnIndex)) {
return Domain.all(type);
}
long totalNullCount = columnIndex.getNullCounts().stream().reduce(0L, (a, b) -> a + b);
if (totalNullCount == rowCount) {
return Domain.onlyNull(type);
}
boolean hasNullValue = totalNullCount > 0;
if (descriptor.getType().equals(PrimitiveTypeName.BOOLEAN)) {
// After row-group filtering for boolean, page filtering shouldn't do more
return Domain.all(type);
}
if (descriptor.getType().equals(INT32) || descriptor.getType().equals(INT64) || descriptor.getType().equals(FLOAT)) {
List<Long> mins = converter.getMinValuesAsLong(type, columnIndex, columnName);
List<Long> maxs = converter.getMaxValuesAsLong(type, columnIndex, columnName);
return createDomain(type, columnIndex, hasNullValue, mins, maxs);
}
if (descriptor.getType().equals(PrimitiveTypeName.DOUBLE)) {
List<Double> mins = converter.getMinValuesAsDouble(type, columnIndex, columnName);
List<Double> maxs = converter.getMaxValuesAsDouble(type, columnIndex, columnName);
return createDomain(type, columnIndex, hasNullValue, mins, maxs);
}
if (descriptor.getType().equals(BINARY)) {
List<Slice> mins = converter.getMinValuesAsSlice(type, columnIndex);
List<Slice> maxs = converter.getMaxValuesAsSlice(type, columnIndex);
return createDomain(type, columnIndex, hasNullValue, mins, maxs);
}
//TODO: Add INT96 and FIXED_LEN_BYTE_ARRAY later
return Domain.create(ValueSet.all(type), hasNullValue);
}
private boolean isCorruptedColumnIndex(ColumnIndex columnIndex)
{
if (columnIndex.getMaxValues() == null || columnIndex.getMinValues() == null ||
columnIndex.getNullCounts() == null || columnIndex.getNullPages() == null) {
return true;
}
if (columnIndex.getMaxValues().size() != columnIndex.getMinValues().size() ||
columnIndex.getMaxValues().size() != columnIndex.getNullPages().size() ||
columnIndex.getMaxValues().size() != columnIndex.getNullCounts().size()) {
return true;
}
return false;
}
// Caller should verify isCorruptedColumnIndex is false first
private boolean isEmptyColumnIndex(ColumnIndex columnIndex)
{
return columnIndex.getMaxValues().isEmpty();
}
public FilterPredicate getParquetUserDefinedPredicate()
{
FilterPredicate filter = null;
// It could be a Presto bug that we don't see effectivePredicate.getDomains().get() has more than 1 domain.
// For example, the 'where c1=3 or c1=10002' clause should have two domains but it has none
// we assume the relation cross domains are 'or'
for (RichColumnDescriptor column : columns) {
Domain domain = effectivePredicate.getDomains().get().get(column);
if (domain == null || domain.isNone()) {
continue;
}
if (domain.isAll()) {
continue;
}
FilterPredicate columnFilter = FilterApi.userDefined(FilterApi.intColumn(ColumnPath.get(column.getPath()).toDotString()), new ParquetUserDefinedPredicateTupleDomain(domain));
if (filter == null) {
filter = columnFilter;
}
else {
filter = FilterApi.or(filter, columnFilter);
}
}
return filter;
}
private static class DictionaryValueConverter
{
private final Dictionary dictionary;
private DictionaryValueConverter(Dictionary dictionary)
{
this.dictionary = dictionary;
}
private Function<Integer, Object> getConverter(PrimitiveType primitiveType)
{
switch (primitiveType.getPrimitiveTypeName()) {
case INT32:
return (i) -> dictionary.decodeToInt(i);
case INT64:
return (i) -> dictionary.decodeToLong(i);
case FLOAT:
return (i) -> dictionary.decodeToFloat(i);
case DOUBLE:
return (i) -> dictionary.decodeToDouble(i);
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
case INT96:
return (i) -> dictionary.decodeToBinary(i);
default:
throw new IllegalArgumentException("Unsupported Parquet primitive type: " + primitiveType.getPrimitiveTypeName());
}
}
}
/**
* This class implements methods defined in UserDefinedPredicate based on the page statistic and tuple domain(for a column).
*/
static class ParquetUserDefinedPredicateTupleDomain<T extends Comparable<T>>
extends UserDefinedPredicate<T>
implements Serializable
{
private Domain columnDomain;
ParquetUserDefinedPredicateTupleDomain(Domain domain)
{
this.columnDomain = domain;
}
@Override
public boolean keep(T value)
{
if (value == null && !columnDomain.isNullAllowed()) {
return false;
}
return true;
}
@Override
public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<T> statistic)
{
if (statistic == null) {
return false;
}
List<Range> ranges = new ArrayList<>();
ranges.add(getRange(columnDomain.getType(), statistic.getMin(), statistic.getMax()));
return canDropWithRangeStatistics(ranges);
}
@Override
public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<T> statistics)
{
// !canDrop() cannot be used because it might not be correct. To be safe, we just keep the record by returning false.
// Since we don't use LogicalNotUserDefined, this method is not called.
return false;
}
private boolean canDropWithRangeStatistics(List<Range> ranges)
{
checkArgument(!ranges.isEmpty(), "cannot use empty ranges");
Domain domain = Domain.create(ValueSet.ofRanges(ranges), true);
return columnDomain.intersect(domain).isNone();
}
}
@VisibleForTesting
public static Range getRange(Type type, Object min, Object max)
{
if (type.equals(BIGINT) || type.equals(INTEGER) || type.equals(SMALLINT) || type.equals(TINYINT)) {
long minValue = asLong(min);
long maxValue = asLong(max);
return Range.range(type, minValue, true, maxValue, true);
}
else if (type.equals(REAL)) {
long minValue = floatToRawIntBits((float) min);
long maxValue = floatToRawIntBits((float) max);
return Range.range(type, minValue, true, maxValue, true);
}
else {
return Range.range(type, min, true, max, true);
}
}
class ColumnIndexValueConverter
{
private final Map<String, Function<Object, Object>> conversions;
private ColumnIndexValueConverter(List<RichColumnDescriptor> columns)
{
this.conversions = new HashMap<>();
for (RichColumnDescriptor column : columns) {
conversions.put(column.getPrimitiveType().getName(), getColumnIndexConversions(column.getPrimitiveType()));
}
}
public List<Long> getMinValuesAsLong(Type type, ColumnIndex columnIndex, String column)
{
return getValuesAsLong(type, column, columnIndex.getMinValues().size(), columnIndex.getMinValues());
}
public List<Long> getMaxValuesAsLong(Type type, ColumnIndex columnIndex, String column)
{
return getValuesAsLong(type, column, columnIndex.getMaxValues().size(), columnIndex.getMaxValues());
}
public List<Double> getMinValuesAsDouble(Type type, ColumnIndex columnIndex, String column)
{
return getValuesAsDouble(type, column, columnIndex.getMinValues().size(), columnIndex.getMinValues());
}
public List<Double> getMaxValuesAsDouble(Type type, ColumnIndex columnIndex, String column)
{
return getValuesAsDouble(type, column, columnIndex.getMaxValues().size(), columnIndex.getMaxValues());
}
public List<Slice> getMinValuesAsSlice(Type type, ColumnIndex columnIndex)
{
return getValuesAsSlice(type, columnIndex.getMinValues().size(), columnIndex.getMinValues());
}
public List<Slice> getMaxValuesAsSlice(Type type, ColumnIndex columnIndex)
{
return getValuesAsSlice(type, columnIndex.getMaxValues().size(), columnIndex.getMaxValues());
}
private List<Long> getValuesAsLong(Type type, String column, int pageCount, List<ByteBuffer> values)
{
List<Long> result = new ArrayList<>();
if (TINYINT.equals(type) || SMALLINT.equals(type) || INTEGER.equals(type)) {
for (int i = 0; i < pageCount; i++) {
result.add((long) converter.convert(values.get(i), column));
}
}
else if (BIGINT.equals(type)) {
for (int i = 0; i < pageCount; i++) {
result.add((long) converter.convert(values.get(i), column));
}
}
else if (REAL.equals(type)) {
for (int i = 0; i < pageCount; i++) {
result.add((long) floatToRawIntBits(converter.convert(values.get(i), column)));
}
}
return result;
}
private List<Double> getValuesAsDouble(Type type, String column, int pageCount, List<ByteBuffer> values)
{
List<Double> result = new ArrayList<>();
if (DOUBLE.equals(type)) {
for (int i = 0; i < pageCount; i++) {
result.add(converter.convert(values.get(i), column));
}
}
return result;
}
private List<Slice> getValuesAsSlice(Type type, int pageCount, List<ByteBuffer> values)
{
List<Slice> result = new ArrayList<>();
if (isVarcharType(type)) {
for (int i = 0; i < pageCount; i++) {
result.add(Slices.wrappedBuffer(values.get(i)));
}
}
return result;
}
private <T> T convert(ByteBuffer buffer, String name)
{
return (T) conversions.get(name).apply(buffer);
}
private Function<Object, Object> getColumnIndexConversions(PrimitiveType type)
{
switch (type.getPrimitiveTypeName()) {
case BOOLEAN:
return buffer -> ((ByteBuffer) buffer).get(0) != 0;
case INT32:
return buffer -> (long) ((ByteBuffer) buffer).order(LITTLE_ENDIAN).getInt(0);
case INT64:
return buffer -> ((ByteBuffer) buffer).order(LITTLE_ENDIAN).getLong(0);
case FLOAT:
return buffer -> ((ByteBuffer) buffer).order(LITTLE_ENDIAN).getFloat(0);
case DOUBLE:
return buffer -> ((ByteBuffer) buffer).order(LITTLE_ENDIAN).getDouble(0);
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
case INT96:
return binary -> ByteBuffer.wrap(((Binary) binary).getBytes());
default:
throw new IllegalArgumentException("Unsupported Parquet type: " + type.getPrimitiveTypeName());
}
}
}
}