OrcSelectivePageSourceFactory.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.orc;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.function.SqlFunctionProperties;
import com.facebook.presto.common.predicate.FilterFunction;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.TupleDomainFilter;
import com.facebook.presto.common.relation.Predicate;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.expressions.DefaultRowExpressionTraversalVisitor;
import com.facebook.presto.expressions.DynamicFilters.DynamicFilterExtractResult;
import com.facebook.presto.hive.BucketAdaptation;
import com.facebook.presto.hive.EncryptionInformation;
import com.facebook.presto.hive.FileFormatDataSourceStats;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveCoercer;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.HiveFileSplit;
import com.facebook.presto.hive.HiveOrcAggregatedMemoryContext;
import com.facebook.presto.hive.HiveSelectivePageSourceFactory;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.SubfieldExtractor;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.orc.DwrfEncryptionProvider;
import com.facebook.presto.orc.OrcAggregatedMemoryContext;
import com.facebook.presto.orc.OrcDataSource;
import com.facebook.presto.orc.OrcEncoding;
import com.facebook.presto.orc.OrcPredicate;
import com.facebook.presto.orc.OrcReader;
import com.facebook.presto.orc.OrcReaderOptions;
import com.facebook.presto.orc.OrcSelectiveRecordReader;
import com.facebook.presto.orc.StripeMetadataSourceFactory;
import com.facebook.presto.orc.TupleDomainOrcPredicate;
import com.facebook.presto.orc.cache.OrcFileTailSource;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.FixedPageSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.relation.CallExpression;
import com.facebook.presto.spi.relation.DeterminismEvaluator;
import com.facebook.presto.spi.relation.InputReferenceExpression;
import com.facebook.presto.spi.relation.PredicateCompiler;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.relation.SpecialFormExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.airlift.units.DataSize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.joda.time.DateTimeZone;
import javax.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;
import static com.facebook.presto.expressions.DynamicFilters.extractDynamicFilters;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.expressions.LogicalRowExpressions.and;
import static com.facebook.presto.expressions.LogicalRowExpressions.binaryExpression;
import static com.facebook.presto.expressions.LogicalRowExpressions.extractConjuncts;
import static com.facebook.presto.expressions.RowExpressionNodeInliner.replaceExpression;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveBucketing.getHiveBucket;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxReadBlockSize;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcTinyStripeThreshold;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isOrcBloomFiltersEnabled;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isOrcZstdJniDecompressionEnabled;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
import static com.facebook.presto.hive.HiveSessionProperties.isAdaptiveFilterReorderingEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isLegacyTimestampBucketing;
import static com.facebook.presto.hive.HiveUtil.getPhysicalHiveColumnHandles;
import static com.facebook.presto.hive.HiveUtil.typedPartitionKey;
import static com.facebook.presto.hive.MetadataUtils.isEntireColumn;
import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.getOrcDataSource;
import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.getOrcReader;
import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.mapToPrestoException;
import static com.facebook.presto.orc.DwrfEncryptionProvider.NO_ENCRYPTION;
import static com.facebook.presto.orc.OrcEncoding.ORC;
import static com.facebook.presto.orc.OrcReader.INITIAL_BATCH_SIZE;
import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableBiMap.toImmutableBiMap;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Maps.uniqueIndex;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
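
/**
* Factory for selective ORC page sources. Maps the selected columns onto the physical columns of
* each file, turns the domain predicate into per-column tuple domain filters, compiles the
* remaining predicate into filter functions, and prunes unused subfields before handing the work
* to the selective ORC record reader.
*/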
public class OrcSelectivePageSourceFactory
implements HiveSelectivePageSourceFactory
{
private final TypeManager typeManager;
private final StandardFunctionResolution functionResolution;
private final RowExpressionService rowExpressionService;
private final boolean useOrcColumnNames;
private final HdfsEnvironment hdfsEnvironment;
private final FileFormatDataSourceStats stats;
private final int domainCompactionThreshold;
private final OrcFileTailSource orcFileTailSource;
private final StripeMetadataSourceFactory stripeMetadataSourceFactory;
private final TupleDomainFilterCache tupleDomainFilterCache;
@Inject
public OrcSelectivePageSourceFactory(
TypeManager typeManager,
StandardFunctionResolution functionResolution,
RowExpressionService rowExpressionService,
HiveClientConfig config,
HdfsEnvironment hdfsEnvironment,
FileFormatDataSourceStats stats,
OrcFileTailSource orcFileTailSource,
StripeMetadataSourceFactory stripeMetadataSourceFactory,
TupleDomainFilterCache tupleDomainFilterCache)
{
this(
typeManager,
functionResolution,
rowExpressionService,
requireNonNull(config, "config is null").isUseOrcColumnNames(),
hdfsEnvironment,
stats,
config.getDomainCompactionThreshold(),
orcFileTailSource,
stripeMetadataSourceFactory,
tupleDomainFilterCache);
}
public OrcSelectivePageSourceFactory(
TypeManager typeManager,
StandardFunctionResolution functionResolution,
RowExpressionService rowExpressionService,
boolean useOrcColumnNames,
HdfsEnvironment hdfsEnvironment,
FileFormatDataSourceStats stats,
int domainCompactionThreshold,
OrcFileTailSource orcFileTailSource,
StripeMetadataSourceFactory stripeMetadataSourceFactory,
TupleDomainFilterCache tupleDomainFilterCache)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
this.useOrcColumnNames = useOrcColumnNames;
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.stats = requireNonNull(stats, "stats is null");
this.domainCompactionThreshold = domainCompactionThreshold;
this.orcFileTailSource = requireNonNull(orcFileTailSource, "orcFileTailSource is null");
this.stripeMetadataSourceFactory = requireNonNull(stripeMetadataSourceFactory, "stripeMetadataSourceFactory is null");
this.tupleDomainFilterCache = requireNonNull(tupleDomainFilterCache, "tupleDomainFilterCache is null");
}
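// Creates a page source only for splits stored with the ORC serde; for any other format
// Optional.empty() is returned so that a different page source factory can handle the split.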
@Override
public Optional<? extends ConnectorPageSource> createPageSource(
Configuration configuration,
ConnectorSession session,
HiveFileSplit fileSplit,
Storage storage,
List<HiveColumnHandle> selectedColumns,
Map<Integer, String> prefilledValues,
Map<Integer, HiveCoercer> coercers,
Optional<BucketAdaptation> bucketAdaptation,
List<Integer> outputColumns,
TupleDomain<Subfield> domainPredicate,
RowExpression remainingPredicate,
DateTimeZone hiveStorageTimeZone,
HiveFileContext hiveFileContext,
Optional<EncryptionInformation> encryptionInformation,
boolean appendRowNumberEnabled,
Optional<byte[]> rowIDPartitionComponent)
{
if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) {
return Optional.empty();
}
// per HIVE-13040 and ORC-162, empty files are allowed
if (fileSplit.getFileSize() == 0) {
return Optional.of(new FixedPageSource(ImmutableList.of()));
}
return Optional.of(createOrcPageSource(
session,
ORC,
hdfsEnvironment,
configuration,
fileSplit,
selectedColumns,
prefilledValues,
coercers,
bucketAdaptation,
outputColumns,
domainPredicate,
remainingPredicate,
useOrcColumnNames,
hiveStorageTimeZone,
typeManager,
functionResolution,
rowExpressionService,
isOrcBloomFiltersEnabled(session),
stats,
domainCompactionThreshold,
orcFileTailSource,
stripeMetadataSourceFactory,
hiveFileContext,
tupleDomainFilterCache,
encryptionInformation,
NO_ENCRYPTION,
appendRowNumberEnabled,
rowIDPartitionComponent));
}
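/**
* Builds the selective page source: maps the selected columns to the physical columns of this file,
* derives per-column tuple domain filters and an OrcPredicate for stripe and row-group pruning,
* compiles the remaining predicate (plus an optional bucket filter) into filter functions, and
* creates a selective record reader over the pruned set of subfields.
*/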
public static ConnectorPageSource createOrcPageSource(
ConnectorSession session,
OrcEncoding orcEncoding,
HdfsEnvironment hdfsEnvironment,
Configuration configuration,
HiveFileSplit fileSplit,
List<HiveColumnHandle> selectedColumns,
Map<Integer, String> prefilledValues,
Map<Integer, HiveCoercer> coercers,
Optional<BucketAdaptation> bucketAdaptation,
List<Integer> outputColumns,
TupleDomain<Subfield> domainPredicate,
RowExpression remainingPredicate,
boolean useOrcColumnNames,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
StandardFunctionResolution functionResolution,
RowExpressionService rowExpressionService,
boolean orcBloomFiltersEnabled,
FileFormatDataSourceStats stats,
int domainCompactionThreshold,
OrcFileTailSource orcFileTailSource,
StripeMetadataSourceFactory stripeMetadataSourceFactory,
HiveFileContext hiveFileContext,
TupleDomainFilterCache tupleDomainFilterCache,
Optional<EncryptionInformation> encryptionInformation,
DwrfEncryptionProvider dwrfEncryptionProvider,
boolean appendRowNumberEnabled,
Optional<byte[]> rowIDPartitionComponent)
{
checkArgument(domainCompactionThreshold >= 1, "domainCompactionThreshold must be at least 1");
OrcDataSource orcDataSource = getOrcDataSource(session, fileSplit, hdfsEnvironment, configuration, hiveFileContext, stats);
Path path = new Path(fileSplit.getPath());
boolean supplyRowIDs = selectedColumns.stream().anyMatch(HiveColumnHandle::isRowIdColumnHandle);
checkArgument(!supplyRowIDs || rowIDPartitionComponent.isPresent(), "rowIDPartitionComponent required when supplying row IDs");
byte[] partitionID = rowIDPartitionComponent.orElseGet(() -> new byte[0]);
String rowGroupId = path.getName();
DataSize maxMergeDistance = getOrcMaxMergeDistance(session);
DataSize tinyStripeThreshold = getOrcTinyStripeThreshold(session);
DataSize maxReadBlockSize = getOrcMaxReadBlockSize(session);
OrcReaderOptions orcReaderOptions = OrcReaderOptions.builder()
.withMaxMergeDistance(maxMergeDistance)
.withTinyStripeThreshold(tinyStripeThreshold)
.withMaxBlockSize(maxReadBlockSize)
.withZstdJniDecompressionEnabled(isOrcZstdJniDecompressionEnabled(session))
.withAppendRowNumber(appendRowNumberEnabled || supplyRowIDs)
.build();
OrcAggregatedMemoryContext systemMemoryUsage = new HiveOrcAggregatedMemoryContext();
try {
checkArgument(!domainPredicate.isNone(), "Unexpected NONE domain");
OrcReader reader = getOrcReader(
orcEncoding,
selectedColumns,
useOrcColumnNames,
orcFileTailSource,
stripeMetadataSourceFactory,
hiveFileContext,
orcReaderOptions,
encryptionInformation,
dwrfEncryptionProvider,
orcDataSource,
path);
List<HiveColumnHandle> physicalColumns = getPhysicalHiveColumnHandles(selectedColumns, useOrcColumnNames, reader.getTypes(), path);
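// Map each selected column's table-schema index to the index of the matching physical column in this file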
Map<Integer, Integer> indexMapping = IntStream.range(0, selectedColumns.size())
.boxed()
.collect(toImmutableMap(i -> selectedColumns.get(i).getHiveColumnIndex(), i -> physicalColumns.get(i).getHiveColumnIndex()));
Map<Integer, String> columnNames = physicalColumns.stream()
.collect(toImmutableMap(HiveColumnHandle::getHiveColumnIndex, HiveColumnHandle::getName));
Map<Integer, HiveCoercer> mappedCoercers = coercers.entrySet().stream().collect(toImmutableMap(entry -> indexMapping.get(entry.getKey()), Map.Entry::getValue));
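// Predicate over whole-column domains, used to prune stripes and row groups from file statistics
// (and bloom filters when enabled)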
OrcPredicate orcPredicate = toOrcPredicate(domainPredicate, physicalColumns, mappedCoercers, typeManager, domainCompactionThreshold, orcBloomFiltersEnabled);
Map<String, Integer> columnIndices = ImmutableBiMap.copyOf(columnNames).inverse();
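// Subfield filters keyed by physical column index, evaluated while reading; filters on coerced
// columns are wrapped in coercing filters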
Map<Integer, Map<Subfield, TupleDomainFilter>> tupleDomainFilters = toTupleDomainFilters(domainPredicate, columnIndices, mappedCoercers, tupleDomainFilterCache);
List<Integer> outputIndices = outputColumns.stream().map(indexMapping::get).collect(toImmutableList());
Map<Integer, List<Subfield>> requiredSubfields = collectRequiredSubfields(physicalColumns, outputIndices, tupleDomainFilters, remainingPredicate, columnIndices, functionResolution, rowExpressionService, session);
Map<Integer, Type> columnTypes = physicalColumns.stream()
.collect(toImmutableMap(HiveColumnHandle::getHiveColumnIndex, column -> typeManager.getType(column.getTypeSignature())));
Map<Integer, Object> typedPrefilledValues = Maps.transformEntries(
prefilledValues.entrySet().stream()
.collect(toImmutableMap(entry -> indexMapping.get(entry.getKey()), Map.Entry::getValue)),
(hiveColumnIndex, value) -> typedPartitionKey(value, columnTypes.get(hiveColumnIndex), columnNames.get(hiveColumnIndex), hiveStorageTimeZone));
BiMap<Integer, Integer> inputs = IntStream.range(0, physicalColumns.size())
.boxed()
.collect(toImmutableBiMap(i -> physicalColumns.get(i).getHiveColumnIndex(), Function.identity()));
// use column types from the current table schema; these types might be different from this partition's schema
Map<VariableReferenceExpression, InputReferenceExpression> variableToInput = columnNames.keySet().stream()
.collect(toImmutableMap(
hiveColumnIndex -> new VariableReferenceExpression(Optional.empty(), columnNames.get(hiveColumnIndex), getColumnTypeFromTableSchema(coercers, columnTypes, hiveColumnIndex)),
hiveColumnIndex -> new InputReferenceExpression(Optional.empty(), inputs.get(hiveColumnIndex), getColumnTypeFromTableSchema(coercers, columnTypes, hiveColumnIndex))));
Optional<BucketAdapter> bucketAdapter = bucketAdaptation.map(adaptation -> new BucketAdapter(
Arrays.stream(adaptation.getBucketColumnIndices())
.map(indexMapping::get)
.map(inputs::get)
.toArray(),
adaptation.getBucketColumnHiveTypes(),
adaptation.getTableBucketCount(),
adaptation.getPartitionBucketCount(),
adaptation.getBucketToKeep(),
isLegacyTimestampBucketing(session)));
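// Rewrite the remaining predicate in terms of input channels and compile it, together with any
// bucket filter, into filter functions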
List<FilterFunction> filterFunctions = toFilterFunctions(replaceExpression(remainingPredicate, variableToInput), bucketAdapter, session, rowExpressionService.getDeterminismEvaluator(), rowExpressionService.getPredicateCompiler());
OrcSelectiveRecordReader recordReader = reader.createSelectiveRecordReader(
columnTypes,
outputIndices,
tupleDomainFilters,
filterFunctions,
inputs.inverse(),
requiredSubfields,
typedPrefilledValues,
Maps.transformValues(mappedCoercers, Function.class::cast),
orcPredicate,
fileSplit.getStart(),
fileSplit.getLength(),
hiveStorageTimeZone,
systemMemoryUsage,
Optional.empty(),
INITIAL_BATCH_SIZE);
return new OrcSelectivePageSource(
recordReader,
reader.getOrcDataSource(),
systemMemoryUsage,
stats,
hiveFileContext.getStats(),
appendRowNumberEnabled,
partitionID,
rowGroupId,
supplyRowIDs);
}
catch (Exception e) {
try {
orcDataSource.close();
}
catch (IOException ignored) {
}
throw mapToPrestoException(e, path, fileSplit);
}
}
private static Type getColumnTypeFromTableSchema(Map<Integer, HiveCoercer> coercers, Map<Integer, Type> columnTypes, int hiveColumnIndex)
{
return coercers.containsKey(hiveColumnIndex) ? coercers.get(hiveColumnIndex).getToType() : columnTypes.get(hiveColumnIndex);
}
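// Computes, for each physical column index, the minimal set of subfields that must be read by
// combining the subfields required by the output columns with those referenced by the filters.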
private static Map<Integer, List<Subfield>> collectRequiredSubfields(List<HiveColumnHandle> physicalColumns, List<Integer> outputColumns, Map<Integer, Map<Subfield, TupleDomainFilter>> tupleDomainFilters, RowExpression remainingPredicate, Map<String, Integer> columnIndices, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, ConnectorSession session)
{
/*
* The logic is:
*
* - columns projected in full are not modified;
* - columns projected partially are updated to include the subfields used in the filters,
*   or to be read in full if the entire column is used in a filter;
* - columns used only for filtering are updated to prune subfields if the filters don't use the full column.
*/
Map<Integer, Set<Subfield>> outputSubfields = new HashMap<>();
physicalColumns.stream()
.filter(column -> outputColumns.contains(column.getHiveColumnIndex()))
.forEach(column -> outputSubfields.put(column.getHiveColumnIndex(), new HashSet<>(column.getRequiredSubfields())));
Map<Integer, Set<Subfield>> predicateSubfields = new HashMap<>();
SubfieldExtractor subfieldExtractor = new SubfieldExtractor(functionResolution, rowExpressionService.getExpressionOptimizer(session), session);
remainingPredicate.accept(
new RequiredSubfieldsExtractor(subfieldExtractor),
subfield -> predicateSubfields.computeIfAbsent(columnIndices.get(subfield.getRootName()), v -> new HashSet<>()).add(subfield));
for (Map.Entry<Integer, Map<Subfield, TupleDomainFilter>> entry : tupleDomainFilters.entrySet()) {
predicateSubfields.computeIfAbsent(entry.getKey(), v -> new HashSet<>()).addAll(entry.getValue().keySet());
}
Map<Integer, List<Subfield>> allSubfields = new HashMap<>();
for (Map.Entry<Integer, Set<Subfield>> entry : outputSubfields.entrySet()) {
int columnIndex = entry.getKey();
if (entry.getValue().isEmpty()) {
// the entire column is projected; no subfield pruning is possible
continue;
}
if (!predicateSubfields.containsKey(columnIndex)) {
// column is not used in filters
allSubfields.put(columnIndex, ImmutableList.copyOf(entry.getValue()));
continue;
}
List<Subfield> prunedSubfields = pruneSubfields(ImmutableSet.<Subfield>builder()
.addAll(entry.getValue())
.addAll(predicateSubfields.get(columnIndex)).build());
if (prunedSubfields.size() == 1 && isEntireColumn(prunedSubfields.get(0))) {
// entire column is used in a filter
continue;
}
allSubfields.put(columnIndex, prunedSubfields);
}
for (Map.Entry<Integer, Set<Subfield>> entry : predicateSubfields.entrySet()) {
int columnIndex = entry.getKey();
if (outputSubfields.containsKey(columnIndex)) {
// this column has already been processed (in the previous loop)
continue;
}
List<Subfield> prunedSubfields = pruneSubfields(entry.getValue());
if (prunedSubfields.size() == 1 && isEntireColumn(prunedSubfields.get(0))) {
// entire column is used in a filter
continue;
}
allSubfields.put(columnIndex, prunedSubfields);
}
return allSubfields;
}
// Prunes subfields: if one subfield is a prefix of another subfield, keeps the shortest one.
// Example: {a.b.c, a.b} -> {a.b}
private static List<Subfield> pruneSubfields(Set<Subfield> subfields)
{
verify(!subfields.isEmpty());
return subfields.stream()
.filter(subfield -> !prefixExists(subfield, subfields))
.collect(toImmutableList());
}
private static boolean prefixExists(Subfield subfield, Collection<Subfield> subfields)
{
return subfields.stream().anyMatch(path -> path.isPrefix(subfield));
}
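// Visits the remaining predicate and reports every subfield it references to the supplied consumer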
private static final class RequiredSubfieldsExtractor
extends DefaultRowExpressionTraversalVisitor<Consumer<Subfield>>
{
private final SubfieldExtractor subfieldExtractor;
public RequiredSubfieldsExtractor(SubfieldExtractor subfieldExtractor)
{
this.subfieldExtractor = requireNonNull(subfieldExtractor, "subfieldExtractor is null");
}
@Override
public Void visitCall(CallExpression call, Consumer<Subfield> context)
{
Optional<Subfield> subfield = subfieldExtractor.extract(call);
if (subfield.isPresent()) {
context.accept(subfield.get());
return null;
}
call.getArguments().forEach(argument -> argument.accept(this, context));
return null;
}
@Override
public Void visitSpecialForm(SpecialFormExpression specialForm, Consumer<Subfield> context)
{
Optional<Subfield> subfield = subfieldExtractor.extract(specialForm);
if (subfield.isPresent()) {
context.accept(subfield.get());
return null;
}
specialForm.getArguments().forEach(argument -> argument.accept(this, context));
return null;
}
@Override
public Void visitVariableReference(VariableReferenceExpression reference, Consumer<Subfield> context)
{
Optional<Subfield> subfield = subfieldExtractor.extract(reference);
if (subfield.isPresent()) {
context.accept(subfield.get());
return null;
}
return null;
}
}
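// Groups the subfield filters of the domain predicate by physical column index, wrapping filters
// on coerced columns in coercing filters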
private static Map<Integer, Map<Subfield, TupleDomainFilter>> toTupleDomainFilters(TupleDomain<Subfield> domainPredicate, Map<String, Integer> columnIndices, Map<Integer, HiveCoercer> coercers, TupleDomainFilterCache tupleDomainFilterCache)
{
Map<Subfield, TupleDomainFilter> filtersBySubfield = Maps.transformValues(domainPredicate.getDomains().get(), tupleDomainFilterCache::getFilter);
Map<Integer, Map<Subfield, TupleDomainFilter>> filtersByColumn = new HashMap<>();
for (Map.Entry<Subfield, TupleDomainFilter> entry : filtersBySubfield.entrySet()) {
Subfield subfield = entry.getKey();
int columnIndex = columnIndices.get(subfield.getRootName());
TupleDomainFilter filter = entry.getValue();
if (coercers.containsKey(columnIndex)) {
filter = coercers.get(columnIndex).toCoercingFilter(filter, subfield);
}
filtersByColumn.computeIfAbsent(columnIndex, k -> new HashMap<>()).put(subfield, filter);
}
return ImmutableMap.copyOf(filtersByColumn);
}
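// Builds the stripe and row-group pruning predicate from whole-column domains; coerced columns are
// excluded to avoid comparing file statistics against values of a different type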
private static OrcPredicate toOrcPredicate(TupleDomain<Subfield> domainPredicate, List<HiveColumnHandle> physicalColumns, Map<Integer, HiveCoercer> coercers, TypeManager typeManager, int domainCompactionThreshold, boolean orcBloomFiltersEnabled)
{
ImmutableList.Builder<TupleDomainOrcPredicate.ColumnReference<HiveColumnHandle>> columnReferences = ImmutableList.builder();
for (HiveColumnHandle column : physicalColumns) {
if (column.getColumnType() == REGULAR) {
Type type = typeManager.getType(column.getTypeSignature());
columnReferences.add(new TupleDomainOrcPredicate.ColumnReference<>(column, column.getHiveColumnIndex(), type));
}
}
Map<String, HiveColumnHandle> columnsByName = uniqueIndex(physicalColumns, HiveColumnHandle::getName);
TupleDomain<HiveColumnHandle> entireColumnDomains = domainPredicate
.transform(subfield -> isEntireColumn(subfield) ? columnsByName.get(subfield.getRootName()) : null)
// filter out columns with coercions to avoid type mismatches between the column stats in the file and the value domains
.transform(column -> coercers.containsKey(column.getHiveColumnIndex()) ? null : column);
return new TupleDomainOrcPredicate<>(entireColumnDomains, columnReferences.build(), orcBloomFiltersEnabled, Optional.of(domainCompactionThreshold));
}
/**
* Splits the filter expression into groups of conjuncts that depend on the same set of inputs,
* then compiles each group into a FilterFunction.
*/
private static List<FilterFunction> toFilterFunctions(RowExpression filter, Optional<BucketAdapter> bucketAdapter, ConnectorSession session, DeterminismEvaluator determinismEvaluator, PredicateCompiler predicateCompiler)
{
ImmutableList.Builder<FilterFunction> filterFunctions = ImmutableList.builder();
bucketAdapter.map(predicate -> new FilterFunction(session.getSqlFunctionProperties(), true, predicate))
.ifPresent(filterFunctions::add);
if (TRUE_CONSTANT.equals(filter)) {
return filterFunctions.build();
}
DynamicFilterExtractResult extractDynamicFilterResult = extractDynamicFilters(filter);
// dynamic filter will be added through subfield pushdown
filter = and(extractDynamicFilterResult.getStaticConjuncts());
if (!isAdaptiveFilterReorderingEnabled(session)) {
filterFunctions.add(new FilterFunction(session.getSqlFunctionProperties(), determinismEvaluator.isDeterministic(filter), predicateCompiler.compilePredicate(session.getSqlFunctionProperties(), session.getSessionFunctions(), filter).get()));
return filterFunctions.build();
}
List<RowExpression> conjuncts = extractConjuncts(filter);
if (conjuncts.size() == 1) {
filterFunctions.add(new FilterFunction(session.getSqlFunctionProperties(), determinismEvaluator.isDeterministic(filter), predicateCompiler.compilePredicate(session.getSqlFunctionProperties(), session.getSessionFunctions(), filter).get()));
return filterFunctions.build();
}
// Use LinkedHashMap to preserve user-specified order of conjuncts. This will be the initial order in which filters are applied.
Map<Set<Integer>, List<RowExpression>> inputsToConjuncts = new LinkedHashMap<>();
for (RowExpression conjunct : conjuncts) {
inputsToConjuncts.computeIfAbsent(extractInputs(conjunct), k -> new ArrayList<>()).add(conjunct);
}
inputsToConjuncts.values().stream()
.map(expressions -> binaryExpression(AND, expressions))
.map(predicate -> new FilterFunction(session.getSqlFunctionProperties(), determinismEvaluator.isDeterministic(predicate), predicateCompiler.compilePredicate(session.getSqlFunctionProperties(), session.getSessionFunctions(), predicate).get()))
.forEach(filterFunctions::add);
return filterFunctions.build();
}
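// Returns the set of input channels referenced by the given expression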
private static Set<Integer> extractInputs(RowExpression expression)
{
ImmutableSet.Builder<Integer> inputs = ImmutableSet.builder();
expression.accept(new InputReferenceBuilderVisitor(), inputs);
return inputs.build();
}
private static class InputReferenceBuilderVisitor
extends DefaultRowExpressionTraversalVisitor<ImmutableSet.Builder<Integer>>
{
@Override
public Void visitInputReference(InputReferenceExpression input, ImmutableSet.Builder<Integer> builder)
{
builder.add(input.getField());
return null;
}
}
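// Filter predicate that keeps only rows hashing to the bucket this split is expected to contain,
// and fails fast when a row's bucket is inconsistent with the table and partition bucket counts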
private static class BucketAdapter
implements Predicate
{
public final int[] bucketColumns;
public final int bucketToKeep;
public final int tableBucketCount;
public final int partitionBucketCount; // for sanity check only
private final List<TypeInfo> typeInfoList;
private final boolean useLegacyTimestampBucketing;
public BucketAdapter(int[] bucketColumnIndices, List<HiveType> bucketColumnHiveTypes, int tableBucketCount, int partitionBucketCount, int bucketToKeep, boolean useLegacyTimestampBucketing)
{
this.bucketColumns = requireNonNull(bucketColumnIndices, "bucketColumnIndices is null");
this.bucketToKeep = bucketToKeep;
this.typeInfoList = requireNonNull(bucketColumnHiveTypes, "bucketColumnHiveTypes is null").stream()
.map(HiveType::getTypeInfo)
.collect(toImmutableList());
this.tableBucketCount = tableBucketCount;
this.partitionBucketCount = partitionBucketCount;
this.useLegacyTimestampBucketing = useLegacyTimestampBucketing;
}
@Override
public int[] getInputChannels()
{
return bucketColumns;
}
@Override
public boolean evaluate(SqlFunctionProperties properties, Page page, int position)
{
int bucket = getHiveBucket(tableBucketCount, typeInfoList, page, position, useLegacyTimestampBucketing);
if ((bucket - bucketToKeep) % partitionBucketCount != 0) {
throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format(
"A row that is supposed to be in bucket %s is encountered. Only rows in bucket %s (modulo %s) are expected",
bucket, bucketToKeep % partitionBucketCount, partitionBucketCount));
}
return bucket == bucketToKeep;
}
}
}