// HivePageSourceProvider.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.Subfield.NestedField;
import com.facebook.presto.common.Subfield.PathElement;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HiveSplit.BucketConversion;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordPageSource;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SplitContext;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTimeZone;
import javax.inject.Inject;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.AGGREGATED;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveBucketing.getHiveBucketFilter;
import static com.facebook.presto.hive.HiveCoercer.createCoercer;
import static com.facebook.presto.hive.HiveColumnHandle.isPushedDownSubfield;
import static com.facebook.presto.hive.HiveColumnHandle.isRowIdColumnHandle;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HivePageSourceProvider.ColumnMapping.toColumnHandles;
import static com.facebook.presto.hive.HiveSessionProperties.isLegacyTimestampBucketing;
import static com.facebook.presto.hive.HiveSessionProperties.isUseRecordPageSourceForCustomSplit;
import static com.facebook.presto.hive.HiveUtil.getPrefilledColumnValue;
import static com.facebook.presto.hive.HiveUtil.parsePartitionValue;
import static com.facebook.presto.hive.HiveUtil.shouldUseRecordReaderFromInputFormat;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
import static com.facebook.presto.hive.metastore.MetastoreUtil.reconstructPartitionSchema;
import static com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Maps.uniqueIndex;
import static java.lang.String.format;
import static java.lang.System.identityHashCode;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
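
/**
 * Creates {@link ConnectorPageSource} instances for Hive splits. Depending on the requested
 * columns and the table layout, it dispatches to the aggregated page source factories (partial
 * aggregation pushdown), the selective page source factories (filter pushdown), the batch page
 * source factories, or the record cursor providers as a fallback.
 */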
public class HivePageSourceProvider
implements ConnectorPageSourceProvider
{
private final DateTimeZone hiveStorageTimeZone;
private final HdfsEnvironment hdfsEnvironment;
private final Set<HiveRecordCursorProvider> cursorProviders;
private final Set<HiveBatchPageSourceFactory> pageSourceFactories;
private final Set<HiveSelectivePageSourceFactory> selectivePageSourceFactories;
private final Set<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactories;
private final TypeManager typeManager;
private final RowExpressionService rowExpressionService;
private final LoadingCache<RowExpressionCacheKey, RowExpression> optimizedRowExpressionCache;
@Inject
public HivePageSourceProvider(
HiveClientConfig hiveClientConfig,
HdfsEnvironment hdfsEnvironment,
Set<HiveRecordCursorProvider> cursorProviders,
Set<HiveBatchPageSourceFactory> pageSourceFactories,
Set<HiveSelectivePageSourceFactory> selectivePageSourceFactories,
Set<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactories,
TypeManager typeManager,
RowExpressionService rowExpressionService)
{
requireNonNull(hiveClientConfig, "hiveClientConfig is null");
this.hiveStorageTimeZone = hiveClientConfig.getDateTimeZone();
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.cursorProviders = ImmutableSet.copyOf(requireNonNull(cursorProviders, "cursorProviders is null"));
this.pageSourceFactories = ImmutableSet.copyOf(requireNonNull(pageSourceFactories, "pageSourceFactories is null"));
this.selectivePageSourceFactories = ImmutableSet.copyOf(requireNonNull(selectivePageSourceFactories, "selectivePageSourceFactories is null"));
this.aggregatedPageSourceFactories = ImmutableSet.copyOf(requireNonNull(aggregatedPageSourceFactories, "aggregatedPageSourceFactories is null"));
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
this.optimizedRowExpressionCache = CacheBuilder.newBuilder()
.recordStats()
.maximumSize(10_000)
.build(CacheLoader.from(cacheKey -> rowExpressionService.getExpressionOptimizer(cacheKey.session).optimize(cacheKey.rowExpression, OPTIMIZED, cacheKey.session)));
}
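
// Dispatch order: splits whose columns are all AGGREGATED are served by the aggregated page
// source factories; layouts with filter pushdown enabled try the selective page source factories
// first; everything else falls through to the regular batch page source / record cursor path.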
@Override
public ConnectorPageSource createPageSource(
ConnectorTransactionHandle transaction,
ConnectorSession session,
ConnectorSplit split,
ConnectorTableLayoutHandle layout,
List<ColumnHandle> columns,
SplitContext splitContext,
RuntimeStats runtimeStats)
{
HiveTableLayoutHandle hiveLayout = (HiveTableLayoutHandle) layout;
List<HiveColumnHandle> selectedColumns = columns.stream()
.map(HiveColumnHandle.class::cast)
.collect(toList());
HiveSplit hiveSplit = (HiveSplit) split;
Path path = new Path(hiveSplit.getFileSplit().getPath());
Configuration configuration = hdfsEnvironment.getConfiguration(
new HdfsContext(
session,
hiveSplit.getDatabase(),
hiveSplit.getTable(),
hiveLayout.getTablePath(),
false),
path);
Optional<EncryptionInformation> encryptionInformation = hiveSplit.getEncryptionInformation();
CacheQuota cacheQuota = generateCacheQuota(hiveSplit);
HiveFileContext fileContext = new HiveFileContext(
splitContext.isCacheable(),
cacheQuota,
hiveSplit.getFileSplit().getExtraFileInfo().map(BinaryExtraHiveFileInfo::new),
OptionalLong.of(hiveSplit.getFileSplit().getFileSize()),
OptionalLong.of(hiveSplit.getFileSplit().getStart()),
OptionalLong.of(hiveSplit.getFileSplit().getLength()),
hiveSplit.getFileSplit().getFileModifiedTime(),
HiveSessionProperties.isVerboseRuntimeStatsEnabled(session),
runtimeStats);
if (columns.stream().anyMatch(columnHandle -> ((HiveColumnHandle) columnHandle).getColumnType().equals(AGGREGATED))) {
checkArgument(columns.stream().allMatch(columnHandle -> ((HiveColumnHandle) columnHandle).getColumnType().equals(AGGREGATED)), "Not all columns are of 'AGGREGATED' type");
if (hiveLayout.isFooterStatsUnreliable()) {
throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, format("Partial aggregation pushdown is not supported when footer stats are unreliable. " +
"Table %s has file %s with unreliable footer stats. " +
"Set session property [catalog-name].pushdown_partial_aggregations_into_scan=false and execute query again.",
hiveLayout.getSchemaTableName(),
hiveSplit.getFileSplit().getPath()));
}
return createAggregatedPageSource(aggregatedPageSourceFactories, configuration, session, hiveSplit, hiveLayout, selectedColumns, fileContext, encryptionInformation);
}
if (hiveLayout.isPushdownFilterEnabled()) {
Optional<ConnectorPageSource> selectivePageSource = createSelectivePageSource(
selectivePageSourceFactories,
configuration,
session,
hiveSplit,
hiveLayout,
selectedColumns,
hiveStorageTimeZone,
typeManager,
optimizedRowExpressionCache,
splitContext,
fileContext,
encryptionInformation);
if (selectivePageSource.isPresent()) {
return selectivePageSource.get();
}
}
TupleDomain<HiveColumnHandle> effectivePredicate = hiveLayout.getDomainPredicate()
.transform(Subfield::getRootName)
.transform(hiveLayout.getPredicateColumns()::get);
if (shouldSkipBucket(hiveLayout, hiveSplit, splitContext, isLegacyTimestampBucketing(session))) {
return new HiveEmptySplitPageSource();
}
if (shouldSkipPartition(typeManager, hiveLayout, hiveStorageTimeZone, hiveSplit, splitContext)) {
return new HiveEmptySplitPageSource();
}
Optional<ConnectorPageSource> pageSource = createHivePageSource(
cursorProviders,
pageSourceFactories,
configuration,
session,
hiveSplit.getFileSplit(),
hiveSplit.getTableBucketNumber(),
hiveSplit.getStorage(),
splitContext.getDynamicFilterPredicate().map(filter -> filter.transform(handle -> (HiveColumnHandle) handle).intersect(effectivePredicate)).orElse(effectivePredicate),
selectedColumns,
hiveLayout.getPredicateColumns(),
hiveSplit.getPartitionKeys(),
hiveStorageTimeZone,
typeManager,
hiveLayout.getSchemaTableName(),
hiveLayout.getPartitionColumns().stream().map(HiveColumnHandle.class::cast).collect(toList()),
hiveLayout.getDataColumns(),
hiveLayout.getTableParameters(),
hiveSplit.getPartitionDataColumnCount(),
hiveSplit.getTableToPartitionMapping(),
hiveSplit.getBucketConversion(),
hiveSplit.isS3SelectPushdownEnabled(),
fileContext,
hiveLayout.getRemainingPredicate(),
hiveLayout.isPushdownFilterEnabled(),
rowExpressionService,
encryptionInformation,
hiveSplit.getRowIdPartitionComponent());
if (pageSource.isPresent()) {
return pageSource.get();
}
throw new IllegalStateException("Could not find a file reader for split " + hiveSplit);
}
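
// Serves AGGREGATED columns from file footer statistics instead of scanning row data; throws
// HIVE_UNSUPPORTED_FORMAT if no factory supports the split's storage format.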
private ConnectorPageSource createAggregatedPageSource(
Set<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactories,
Configuration configuration,
ConnectorSession session,
HiveSplit hiveSplit,
HiveTableLayoutHandle hiveLayout,
List<HiveColumnHandle> selectedColumns,
HiveFileContext fileContext,
Optional<EncryptionInformation> encryptionInformation)
{
List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
hiveSplit.getPartitionKeys(),
selectedColumns,
hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
hiveSplit.getTableToPartitionMapping(),
hiveSplit.getFileSplit(),
hiveSplit.getTableBucketNumber());
List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings);
for (HiveAggregatedPageSourceFactory pageSourceFactory : aggregatedPageSourceFactories) {
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
configuration,
session,
hiveSplit.getFileSplit(),
hiveSplit.getStorage(),
toColumnHandles(regularAndInterimColumnMappings, true),
fileContext,
encryptionInformation);
if (pageSource.isPresent()) {
return pageSource.get();
}
}
throw new PrestoException(
HIVE_UNSUPPORTED_FORMAT,
format("Table %s has file of format %s that does not support partial aggregation pushdown. " +
"Set session property [catalog-name].pushdown_partial_aggregations_into_scan=false and execute query again.",
hiveLayout.getSchemaTableName(),
hiveSplit.getStorage().getStorageFormat().getSerDe()));
}
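
// Derives the cache quota identity from the split's cache quota scope: GLOBAL, SCHEMA, TABLE, or
// PARTITION, each producing a progressively narrower quota key.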
@VisibleForTesting
protected static CacheQuota generateCacheQuota(HiveSplit hiveSplit)
{
Optional<DataSize> quota = hiveSplit.getCacheQuotaRequirement().getQuota();
switch (hiveSplit.getCacheQuotaRequirement().getCacheQuotaScope()) {
case GLOBAL:
return new CacheQuota(".", quota);
case SCHEMA:
return new CacheQuota(hiveSplit.getDatabase(), quota);
case TABLE:
return new CacheQuota(hiveSplit.getDatabase() + "." + hiveSplit.getTable(), quota);
case PARTITION:
return new CacheQuota(hiveSplit.getDatabase() + "." + hiveSplit.getTable() + "." + hiveSplit.getPartitionName(), quota);
default:
throw new PrestoException(HIVE_UNKNOWN_ERROR, format("%s is not supported", quota));
}
}
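
// Filter pushdown path: hands the domain predicate (intersected with any dynamic filter), the
// optimized remaining predicate, prefilled partition values, coercers, and bucket adaptation to
// the selective page source factories so that filtering happens inside the reader. Returns
// Optional.empty() when no factory can handle the split.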
private static Optional<ConnectorPageSource> createSelectivePageSource(
Set<HiveSelectivePageSourceFactory> selectivePageSourceFactories,
Configuration configuration,
ConnectorSession session,
HiveSplit split,
HiveTableLayoutHandle layout,
List<HiveColumnHandle> selectedColumns,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
LoadingCache<RowExpressionCacheKey, RowExpression> rowExpressionCache,
SplitContext splitContext,
HiveFileContext fileContext,
Optional<EncryptionInformation> encryptionInformation)
{
Set<HiveColumnHandle> interimColumns = ImmutableSet.<HiveColumnHandle>builder()
.addAll(layout.getPredicateColumns().values())
.addAll(split.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()))
.build();
Set<String> columnNames = selectedColumns.stream().map(HiveColumnHandle::getName).collect(toImmutableSet());
List<HiveColumnHandle> allColumns = ImmutableList.<HiveColumnHandle>builder()
.addAll(selectedColumns)
.addAll(interimColumns.stream().filter(column -> !columnNames.contains(column.getName())).collect(toImmutableList()))
.build();
List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
split.getPartitionKeys(),
allColumns,
ImmutableList.of(),
split.getTableToPartitionMapping(),
split.getFileSplit(),
split.getTableBucketNumber());
Optional<BucketAdaptation> bucketAdaptation = split.getBucketConversion().map(conversion -> toBucketAdaptation(conversion, columnMappings, split.getTableBucketNumber(), mapping -> mapping.getHiveColumnHandle().getHiveColumnIndex(), isLegacyTimestampBucketing(session)));
Map<Integer, String> prefilledValues = columnMappings.stream()
.filter(mapping -> mapping.getKind() == ColumnMappingKind.PREFILLED)
.collect(toImmutableMap(mapping -> mapping.getHiveColumnHandle().getHiveColumnIndex(), ColumnMapping::getPrefilledValue));
Map<Integer, HiveCoercer> coercers = columnMappings.stream()
.filter(mapping -> mapping.getCoercionFrom().isPresent())
.collect(toImmutableMap(
mapping -> mapping.getHiveColumnHandle().getHiveColumnIndex(),
mapping -> createCoercer(typeManager, mapping.getCoercionFrom().get(), mapping.getHiveColumnHandle().getHiveType())));
List<Integer> outputColumns = selectedColumns.stream()
.map(HiveColumnHandle::getHiveColumnIndex)
.collect(toImmutableList());
RowExpression optimizedRemainingPredicate = rowExpressionCache.getUnchecked(new RowExpressionCacheKey(layout.getRemainingPredicate(), session));
if (shouldSkipBucket(layout, split, splitContext, isLegacyTimestampBucketing(session))) {
return Optional.of(new HiveEmptySplitPageSource());
}
if (shouldSkipPartition(typeManager, layout, hiveStorageTimeZone, split, splitContext)) {
return Optional.of(new HiveEmptySplitPageSource());
}
TupleDomain<Subfield> domainPredicate = splitContext.getDynamicFilterPredicate()
.map(filter -> filter.transform(handle -> new Subfield(((HiveColumnHandle) handle).getName())).intersect(layout.getDomainPredicate()))
.orElse(layout.getDomainPredicate());
List<HiveColumnHandle> columnHandles = toColumnHandles(columnMappings, true);
Optional<byte[]> rowIDPartitionComponent = split.getRowIdPartitionComponent();
HiveUtil.checkRowIDPartitionComponent(columnHandles, rowIDPartitionComponent);
for (HiveSelectivePageSourceFactory pageSourceFactory : selectivePageSourceFactories) {
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
configuration,
session,
split.getFileSplit(),
split.getStorage(),
columnHandles,
prefilledValues,
coercers,
bucketAdaptation,
outputColumns,
domainPredicate,
optimizedRemainingPredicate,
hiveStorageTimeZone,
fileContext,
encryptionInformation,
layout.isAppendRowNumberEnabled(),
rowIDPartitionComponent);
if (pageSource.isPresent()) {
return Optional.of(pageSource.get());
}
}
return Optional.empty();
}
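
// Regular read path: tries each batch page source factory for the split and, if none produces a
// page source, falls back to the record cursor providers. Custom splits that must use the input
// format's record reader go straight to the cursor providers. When filter pushdown is enabled,
// the result is wrapped in a FilteringPageSource that applies the predicates above the reader.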
public static Optional<ConnectorPageSource> createHivePageSource(
Set<HiveRecordCursorProvider> cursorProviders,
Set<HiveBatchPageSourceFactory> pageSourceFactories,
Configuration configuration,
ConnectorSession session,
HiveFileSplit fileSplit,
OptionalInt tableBucketNumber,
Storage storage,
TupleDomain<HiveColumnHandle> effectivePredicate,
List<HiveColumnHandle> hiveColumns,
Map<String, HiveColumnHandle> predicateColumns,
List<HivePartitionKey> partitionKeys,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
SchemaTableName tableName,
List<HiveColumnHandle> partitionKeyColumnHandles,
List<Column> tableDataColumns,
Map<String, String> tableParameters,
int partitionDataColumnCount,
TableToPartitionMapping tableToPartitionMapping,
Optional<BucketConversion> bucketConversion,
boolean s3SelectPushdownEnabled,
HiveFileContext hiveFileContext,
RowExpression remainingPredicate,
boolean isPushdownFilterEnabled,
RowExpressionService rowExpressionService,
Optional<EncryptionInformation> encryptionInformation,
Optional<byte[]> rowIdPartitionComponent)
{
List<HiveColumnHandle> allColumns;
if (isPushdownFilterEnabled) {
Set<String> columnNames = hiveColumns.stream().map(HiveColumnHandle::getName).collect(toImmutableSet());
List<HiveColumnHandle> additionalColumns = predicateColumns.values().stream()
.filter(column -> !columnNames.contains(column.getName()))
.collect(toImmutableList());
allColumns = ImmutableList.<HiveColumnHandle>builder()
.addAll(hiveColumns)
.addAll(additionalColumns)
.build();
}
else {
allColumns = hiveColumns;
}
List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
partitionKeys,
allColumns,
bucketConversion.map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
tableToPartitionMapping,
fileSplit,
tableBucketNumber);
Set<Integer> outputIndices = hiveColumns.stream()
.map(HiveColumnHandle::getHiveColumnIndex)
.collect(toImmutableSet());
// Finds the non-synthetic columns.
List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings);
Optional<BucketAdaptation> bucketAdaptation = bucketConversion.map(conversion -> toBucketAdaptation(
conversion,
regularAndInterimColumnMappings,
tableBucketNumber,
ColumnMapping::getIndex,
isLegacyTimestampBucketing(session)));
if (isUseRecordPageSourceForCustomSplit(session) && shouldUseRecordReaderFromInputFormat(configuration, storage, fileSplit.getCustomSplitInfo())) {
return getPageSourceFromCursorProvider(
cursorProviders,
configuration,
session,
fileSplit,
storage,
effectivePredicate,
hiveStorageTimeZone,
typeManager,
tableName,
partitionKeyColumnHandles,
tableDataColumns,
tableParameters,
partitionDataColumnCount,
tableToPartitionMapping,
s3SelectPushdownEnabled,
remainingPredicate,
isPushdownFilterEnabled,
rowExpressionService,
allColumns,
columnMappings,
outputIndices,
regularAndInterimColumnMappings,
bucketAdaptation);
}
for (HiveBatchPageSourceFactory pageSourceFactory : pageSourceFactories) {
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
configuration,
session,
fileSplit,
storage,
tableName,
tableParameters,
toColumnHandles(regularAndInterimColumnMappings, true),
effectivePredicate,
hiveStorageTimeZone,
hiveFileContext,
encryptionInformation,
rowIdPartitionComponent);
if (pageSource.isPresent()) {
HivePageSource hivePageSource = new HivePageSource(
columnMappings,
bucketAdaptation,
hiveStorageTimeZone,
typeManager,
pageSource.get(),
fileSplit.getPath(),
rowIdPartitionComponent);
if (isPushdownFilterEnabled) {
return Optional.of(new FilteringPageSource(
columnMappings,
effectivePredicate,
remainingPredicate,
typeManager,
rowExpressionService,
session,
outputIndices,
hivePageSource));
}
return Optional.of(hivePageSource);
}
}
return getPageSourceFromCursorProvider(
cursorProviders,
configuration,
session,
fileSplit,
storage,
effectivePredicate,
hiveStorageTimeZone,
typeManager,
tableName,
partitionKeyColumnHandles,
tableDataColumns,
tableParameters,
partitionDataColumnCount,
tableToPartitionMapping,
s3SelectPushdownEnabled,
remainingPredicate,
isPushdownFilterEnabled,
rowExpressionService,
allColumns,
columnMappings,
outputIndices,
regularAndInterimColumnMappings,
bucketAdaptation);
}
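
// Record cursor fallback: reconstructs the partition schema, asks each cursor provider for a
// RecordCursor, then layers bucket adaptation and type coercion wrappers before exposing the
// cursor as a RecordPageSource.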
private static Optional<ConnectorPageSource> getPageSourceFromCursorProvider(
Set<HiveRecordCursorProvider> cursorProviders,
Configuration configuration,
ConnectorSession session,
HiveFileSplit fileSplit,
Storage storage,
TupleDomain<HiveColumnHandle> effectivePredicate,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
SchemaTableName tableName,
List<HiveColumnHandle> partitionKeyColumnHandles,
List<Column> tableDataColumns,
Map<String, String> tableParameters,
int partitionDataColumnCount,
TableToPartitionMapping tableToPartitionMapping,
boolean s3SelectPushdownEnabled,
RowExpression remainingPredicate,
boolean isPushdownFilterEnabled,
RowExpressionService rowExpressionService,
List<HiveColumnHandle> allColumns,
List<ColumnMapping> columnMappings,
Set<Integer> outputIndices,
List<ColumnMapping> regularAndInterimColumnMappings,
Optional<BucketAdaptation> bucketAdaptation)
{
for (HiveRecordCursorProvider provider : cursorProviders) {
// GenericHiveRecordCursor performs type coercion itself, so it does not need the HiveCoercionRecordCursor wrapper
boolean doCoercion = !(provider instanceof GenericHiveRecordCursorProvider);
List<Column> partitionDataColumns = reconstructPartitionSchema(
tableDataColumns,
partitionDataColumnCount,
tableToPartitionMapping.getPartitionSchemaDifference(),
tableToPartitionMapping.getTableToPartitionColumns());
Properties schema = getHiveSchema(
storage,
partitionDataColumns,
tableDataColumns,
tableParameters,
tableName.getSchemaName(),
tableName.getTableName(),
partitionKeyColumnHandles.stream().map(BaseHiveColumnHandle::getName).collect(toImmutableList()),
partitionKeyColumnHandles.stream().map(HiveColumnHandle::getHiveType).collect(toImmutableList()));
Optional<RecordCursor> cursor = provider.createRecordCursor(
configuration,
session,
fileSplit,
schema,
toColumnHandles(regularAndInterimColumnMappings, doCoercion),
effectivePredicate,
hiveStorageTimeZone,
typeManager,
s3SelectPushdownEnabled);
if (cursor.isPresent()) {
RecordCursor delegate = cursor.get();
if (bucketAdaptation.isPresent()) {
delegate = new HiveBucketAdapterRecordCursor(
bucketAdaptation.get().getBucketColumnIndices(),
bucketAdaptation.get().getBucketColumnHiveTypes(),
bucketAdaptation.get().getTableBucketCount(),
bucketAdaptation.get().getPartitionBucketCount(),
bucketAdaptation.get().getBucketToKeep(),
typeManager,
delegate,
isLegacyTimestampBucketing(session));
}
// RcText and RcBinary cursors need to be wrapped so that columns with mismatched partition and table types are coerced
if (doCoercion) {
delegate = new HiveCoercionRecordCursor(regularAndInterimColumnMappings, typeManager, delegate);
}
HiveRecordCursor hiveRecordCursor = new HiveRecordCursor(
columnMappings,
hiveStorageTimeZone,
typeManager,
delegate);
List<Type> columnTypes = allColumns.stream()
.map(input -> typeManager.getType(input.getTypeSignature()))
.collect(toList());
RecordPageSource recordPageSource = new RecordPageSource(columnTypes, hiveRecordCursor);
if (isPushdownFilterEnabled) {
return Optional.of(new FilteringPageSource(
columnMappings,
effectivePredicate,
remainingPredicate,
typeManager,
rowExpressionService,
session,
outputIndices,
recordPageSource));
}
return Optional.of(recordPageSource);
}
}
return Optional.empty();
}
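
// A split can be skipped entirely when the dynamic filter proves that its bucket cannot contain
// matching rows.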
private static boolean shouldSkipBucket(HiveTableLayoutHandle hiveLayout, HiveSplit hiveSplit, SplitContext splitContext, boolean useLegacyTimestampBucketing)
{
if (!splitContext.getDynamicFilterPredicate().isPresent()
|| !hiveSplit.getReadBucketNumber().isPresent()
|| !hiveSplit.getStorage().getBucketProperty().isPresent()) {
return false;
}
TupleDomain<ColumnHandle> dynamicFilter = splitContext.getDynamicFilterPredicate().get();
Optional<HiveBucketing.HiveBucketFilter> hiveBucketFilter = getHiveBucketFilter(hiveSplit.getStorage().getBucketProperty(), hiveLayout.getDataColumns(), dynamicFilter, useLegacyTimestampBucketing);
return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getReadBucketNumber().getAsInt())).orElse(false);
}
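
// A split can also be skipped when the dynamic filter excludes one of its partition key values.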
private static boolean shouldSkipPartition(TypeManager typeManager, HiveTableLayoutHandle hiveLayout, DateTimeZone hiveStorageTimeZone, HiveSplit hiveSplit, SplitContext splitContext)
{
List<HiveColumnHandle> partitionColumns = hiveLayout.getPartitionColumns().stream()
.map(HiveColumnHandle.class::cast)
.collect(toList());
List<Type> partitionTypes = partitionColumns.stream()
.map(column -> typeManager.getType(column.getTypeSignature()))
.collect(toList());
List<HivePartitionKey> partitionKeys = hiveSplit.getPartitionKeys();
if (!splitContext.getDynamicFilterPredicate().isPresent()
|| hiveSplit.getPartitionKeys().isEmpty()
|| partitionColumns.isEmpty()
|| partitionColumns.size() != partitionKeys.size()) {
return false;
}
TupleDomain<ColumnHandle> dynamicFilter = splitContext.getDynamicFilterPredicate().get();
Map<ColumnHandle, Domain> domains = dynamicFilter.getDomains().get();
for (int i = 0; i < partitionKeys.size(); i++) {
Type type = partitionTypes.get(i);
HivePartitionKey hivePartitionKey = partitionKeys.get(i);
HiveColumnHandle hiveColumnHandle = partitionColumns.get(i);
Domain allowedDomain = domains.get(hiveColumnHandle);
NullableValue value = parsePartitionValue(hivePartitionKey, type, hiveStorageTimeZone);
if (allowedDomain != null && !allowedDomain.includesNullableValue(value.getValue())) {
return true;
}
}
return false;
}
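
// Resolves the bucket columns of a bucket conversion to their positions in the page source so
// that rows read from a partition with a different bucket count can be filtered down to the
// requested table bucket.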
private static BucketAdaptation toBucketAdaptation(
BucketConversion conversion,
List<ColumnMapping> columnMappings,
OptionalInt tableBucketNumber,
Function<ColumnMapping, Integer> bucketColumnIndexProducer,
boolean useLegacyTimestamp)
{
Map<Integer, ColumnMapping> hiveIndexToBlockIndex = uniqueIndex(columnMappings, columnMapping -> columnMapping.getHiveColumnHandle().getHiveColumnIndex());
int[] bucketColumnIndices = conversion.getBucketColumnHandles().stream()
.map(HiveColumnHandle::getHiveColumnIndex)
.map(hiveIndexToBlockIndex::get)
.mapToInt(bucketColumnIndexProducer::apply)
.toArray();
List<HiveType> bucketColumnHiveTypes = conversion.getBucketColumnHandles().stream()
.map(HiveColumnHandle::getHiveColumnIndex)
.map(hiveIndexToBlockIndex::get)
.map(ColumnMapping::getHiveColumnHandle)
.map(HiveColumnHandle::getHiveType)
.collect(toImmutableList());
return new BucketAdaptation(
bucketColumnIndices,
bucketColumnHiveTypes,
conversion.getTableBucketCount(),
conversion.getPartitionBucketCount(),
tableBucketNumber.getAsInt(),
useLegacyTimestamp);
}
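
/**
 * Describes how a single column in the output is produced and, when the partition schema differs
 * from the table schema, which Hive type the value must be coerced from.
 */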
public static class ColumnMapping
{
private final ColumnMappingKind kind;
private final HiveColumnHandle hiveColumnHandle;
private final Optional<String> prefilledValue;
/**
* ordinal of this column in the underlying page source or record cursor
*/
private final OptionalInt index;
private final Optional<HiveType> coercionFrom;
public static ColumnMapping regular(HiveColumnHandle hiveColumnHandle, int index, Optional<HiveType> coerceFrom)
{
checkArgument(hiveColumnHandle.getColumnType() == REGULAR);
return new ColumnMapping(ColumnMappingKind.REGULAR, hiveColumnHandle, Optional.empty(), OptionalInt.of(index), coerceFrom);
}
public static ColumnMapping aggregated(HiveColumnHandle hiveColumnHandle, int index)
{
checkArgument(hiveColumnHandle.getColumnType() == AGGREGATED);
// Pretend that it is a regular column so that the split manager can process it as normal
return new ColumnMapping(ColumnMappingKind.REGULAR, hiveColumnHandle, Optional.empty(), OptionalInt.of(index), Optional.empty());
}
public static ColumnMapping prefilled(HiveColumnHandle hiveColumnHandle, Optional<String> prefilledValue, Optional<HiveType> coerceFrom)
{
checkArgument(hiveColumnHandle.getColumnType() == PARTITION_KEY || hiveColumnHandle.getColumnType() == SYNTHESIZED);
return new ColumnMapping(ColumnMappingKind.PREFILLED, hiveColumnHandle, prefilledValue, OptionalInt.empty(), coerceFrom);
}
public static ColumnMapping interim(HiveColumnHandle hiveColumnHandle, int index)
{
checkArgument(hiveColumnHandle.getColumnType() == REGULAR);
return new ColumnMapping(ColumnMappingKind.INTERIM, hiveColumnHandle, Optional.empty(), OptionalInt.of(index), Optional.empty());
}
private ColumnMapping(ColumnMappingKind kind, HiveColumnHandle hiveColumnHandle, Optional<String> prefilledValue, OptionalInt index, Optional<HiveType> coerceFrom)
{
this.kind = requireNonNull(kind, "kind is null");
this.hiveColumnHandle = requireNonNull(hiveColumnHandle, "hiveColumnHandle is null");
this.prefilledValue = requireNonNull(prefilledValue, "prefilledValue is null");
this.index = requireNonNull(index, "index is null");
this.coercionFrom = requireNonNull(coerceFrom, "coerceFrom is null");
}
public ColumnMappingKind getKind()
{
return kind;
}
public String getPrefilledValue()
{
checkState(kind == ColumnMappingKind.PREFILLED);
return prefilledValue.orElse("\\N");
}
public HiveColumnHandle getHiveColumnHandle()
{
return hiveColumnHandle;
}
public int getIndex()
{
checkState(kind == ColumnMappingKind.REGULAR || kind == ColumnMappingKind.INTERIM);
return index.getAsInt();
}
public Optional<HiveType> getCoercionFrom()
{
return coercionFrom;
}
/**
* @param columns columns that need to be returned to engine
* @param requiredInterimColumns columns that are needed for processing, but shouldn't be returned to the engine (may overlap with columns)
* @param tableToPartitionMapping table to partition mapping
* @param bucketNumber empty if table is not bucketed, a number within [0, # bucket in table) otherwise
*/
public static List<ColumnMapping> buildColumnMappings(
List<HivePartitionKey> partitionKeys,
List<HiveColumnHandle> columns,
List<HiveColumnHandle> requiredInterimColumns,
TableToPartitionMapping tableToPartitionMapping,
HiveFileSplit fileSplit,
OptionalInt bucketNumber)
{
Map<String, HivePartitionKey> partitionKeysByName = uniqueIndex(partitionKeys, HivePartitionKey::getName);
int regularIndex = 0;
Set<Integer> regularColumnIndices = new HashSet<>();
ImmutableList.Builder<ColumnMapping> columnMappings = ImmutableList.builder();
for (HiveColumnHandle column : columns) {
// will be present if the partition has a different schema (column type, column name) for the column
Optional<Column> partitionColumn = tableToPartitionMapping.getPartitionColumn(column.getHiveColumnIndex());
Optional<HiveType> coercionFrom = Optional.empty();
// we don't care if only the column name has changed
if (partitionColumn.isPresent() && !partitionColumn.get().getType().equals(column.getHiveType())) {
coercionFrom = Optional.of(partitionColumn.get().getType());
}
if (column.getColumnType() == REGULAR) {
checkArgument(regularColumnIndices.add(column.getHiveColumnIndex()), "duplicate hiveColumnIndex in columns list");
columnMappings.add(regular(column, regularIndex, coercionFrom));
regularIndex++;
}
else if (column.getColumnType() == AGGREGATED) {
columnMappings.add(aggregated(column, regularIndex));
regularIndex++;
}
else if (isPushedDownSubfield(column)) {
Optional<HiveType> coercionFromType = getHiveType(coercionFrom, getOnlyElement(column.getRequiredSubfields()));
HiveType coercionToType = column.getHiveType();
if (coercionFromType.isPresent() && coercionFromType.get().equals(coercionToType)) {
// In nested columns, if the resolved type is the same as the requested type, don't add a coercion mapping
coercionFromType = Optional.empty();
}
ColumnMapping columnMapping = new ColumnMapping(ColumnMappingKind.REGULAR, column, Optional.empty(), OptionalInt.of(regularIndex), coercionFromType);
columnMappings.add(columnMapping);
regularIndex++;
}
else if (isRowIdColumnHandle(column)) { // ROW_ID is synthesized but not prefilled.
ColumnMapping columnMapping = new ColumnMapping(ColumnMappingKind.POSTFILLED, column, Optional.empty(), OptionalInt.empty(), coercionFrom);
columnMappings.add(columnMapping);
}
else {
columnMappings.add(prefilled(
column,
getPrefilledColumnValue(column, partitionKeysByName.get(column.getName()), fileSplit, bucketNumber),
coercionFrom));
}
}
for (HiveColumnHandle column : requiredInterimColumns) {
checkArgument(column.getColumnType() == REGULAR);
if (regularColumnIndices.contains(column.getHiveColumnIndex())) {
continue; // This column exists in columns. Do not add it again.
}
// If coercion does not affect bucket number calculation, coercion doesn't need to be applied here.
// Otherwise, read of this partition should not be allowed.
// (Alternatively, the partition could be read as an unbucketed partition. This is not implemented.)
columnMappings.add(interim(column, regularIndex));
regularIndex++;
}
return columnMappings.build();
}
private static Optional<HiveType> getHiveType(Optional<HiveType> baseType, Subfield subfield)
{
List<PathElement> pathElements = subfield.getPath();
ImmutableList.Builder<String> nestedColumnPathBuilder = ImmutableList.builder();
for (PathElement pathElement : pathElements) {
checkArgument(pathElement instanceof NestedField, "Unsupported subfield. Expected only nested path elements. " + subfield);
nestedColumnPathBuilder.add(((NestedField) pathElement).getName());
}
return baseType.flatMap(type -> type.findChildType(nestedColumnPathBuilder.build()));
}
public static List<ColumnMapping> extractRegularAndInterimColumnMappings(List<ColumnMapping> columnMappings)
{
return columnMappings.stream()
.filter(columnMapping -> columnMapping.getKind() == ColumnMappingKind.REGULAR || columnMapping.getKind() == ColumnMappingKind.INTERIM)
.collect(toImmutableList());
}
public static List<HiveColumnHandle> toColumnHandles(List<ColumnMapping> regularColumnMappings, boolean doCoercion)
{
return regularColumnMappings.stream()
.map(columnMapping -> {
HiveColumnHandle columnHandle = columnMapping.getHiveColumnHandle();
if (!doCoercion || !columnMapping.getCoercionFrom().isPresent()) {
return columnHandle;
}
return new HiveColumnHandle(
columnHandle.getName(),
columnMapping.getCoercionFrom().get(),
columnMapping.getCoercionFrom().get().getTypeSignature(),
columnHandle.getHiveColumnIndex(),
columnHandle.getColumnType(),
Optional.empty(),
columnHandle.getRequiredSubfields(),
columnHandle.getPartialAggregation());
})
.collect(toList());
}
}
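
// REGULAR: value read from the file; PREFILLED: constant derived from partition keys or split
// metadata; INTERIM: read only for internal processing (for example bucket conversion) and not
// returned to the engine; AGGREGATED: pushed-down partial aggregation; POSTFILLED: filled in
// after the read (used for the synthesized row ID column).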
public enum ColumnMappingKind
{
REGULAR,
PREFILLED,
INTERIM,
AGGREGATED,
POSTFILLED
}
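
// Cache key that compares RowExpressions by identity, so the optimized form of a given expression
// instance is reused, while structurally equal but distinct instances are optimized separately.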
private static final class RowExpressionCacheKey
{
private final RowExpression rowExpression;
private final ConnectorSession session;
RowExpressionCacheKey(RowExpression rowExpression, ConnectorSession session)
{
this.rowExpression = rowExpression;
this.session = session;
}
@Override
public int hashCode()
{
return identityHashCode(rowExpression);
}
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
RowExpressionCacheKey other = (RowExpressionCacheKey) obj;
return this.rowExpression == other.rowExpression;
}
}
}