StoragePartitionLoader.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.hive.cache.HiveCachingHdfsConfiguration;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.util.HiveFileIterator;
import com.facebook.presto.hive.util.InternalHiveSplitFactory;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.ListMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import static com.facebook.presto.hive.HiveBucketing.getVirtualBucketNumber;
import static com.facebook.presto.hive.HiveColumnHandle.pathColumnHandle;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_FILE_NAMES;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HiveMetadata.shouldCreateFilesForMissingBuckets;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.isFileSplittable;
import static com.facebook.presto.hive.HiveSessionProperties.isOrderBasedExecutionEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSkipEmptyFilesEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSymlinkOptimizedReaderEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
import static com.facebook.presto.hive.HiveUtil.buildDirectoryContextProperties;
import static com.facebook.presto.hive.HiveUtil.getFooterCount;
import static com.facebook.presto.hive.HiveUtil.getHeaderCount;
import static com.facebook.presto.hive.HiveUtil.getInputFormat;
import static com.facebook.presto.hive.HiveUtil.getTargetPathsHiveFileInfos;
import static com.facebook.presto.hive.HiveUtil.isHudiParquetInputFormat;
import static com.facebook.presto.hive.HiveUtil.readSymlinkPaths;
import static com.facebook.presto.hive.HiveUtil.shouldUseFileSplitsFromInputFormat;
import static com.facebook.presto.hive.HiveWriterFactory.getBucketNumber;
import static com.facebook.presto.hive.NestedDirectoryPolicy.FAIL;
import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
import static com.facebook.presto.hive.NestedDirectoryPolicy.RECURSE;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getPartitionLocation;
import static com.facebook.presto.hive.s3select.S3SelectPushdown.shouldEnablePushdownForTable;
import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Maps.fromProperties;
import static com.google.common.collect.Streams.stream;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.lang.Math.max;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
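/**
 * Loads splits for a single Hive partition (or for an unpartitioned table's storage)
 * by listing its files and converting them into {@link InternalHiveSplit}s. Handles
 * symlink-backed tables, input formats that produce their own splits, bucketed
 * partitions (including bucket-count conversions), and virtually bucketed tables.
 */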
public class StoragePartitionLoader
extends PartitionLoader
{
private static final ListenableFuture<?> COMPLETED_FUTURE = immediateFuture(null);
private final Table table;
private final Map<Integer, Domain> infoColumnConstraints;
private final Optional<BucketSplitInfo> tableBucketInfo;
private final HdfsEnvironment hdfsEnvironment;
private final HdfsContext hdfsContext;
private final NamenodeStats namenodeStats;
private final DirectoryLister directoryLister;
private final boolean recursiveDirWalkerEnabled;
private final ConnectorSession session;
private final Deque<Iterator<InternalHiveSplit>> fileIterators;
private final boolean schedulerUsesHostAddresses;
private final boolean partialAggregationsPushedDown;
private static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
public StoragePartitionLoader(
Table table,
Map<Integer, Domain> infoColumnConstraints,
Optional<BucketSplitInfo> tableBucketInfo,
ConnectorSession session,
HdfsEnvironment hdfsEnvironment,
NamenodeStats namenodeStats,
DirectoryLister directoryLister,
Deque<Iterator<InternalHiveSplit>> fileIterators,
boolean recursiveDirWalkerEnabled,
boolean schedulerUsesHostAddresses,
boolean partialAggregationsPushedDown)
{
this.table = requireNonNull(table, "table is null");
this.infoColumnConstraints = requireNonNull(infoColumnConstraints, "infoColumnConstraints is null");
this.tableBucketInfo = requireNonNull(tableBucketInfo, "tableBucketInfo is null");
this.session = requireNonNull(session, "session is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
this.recursiveDirWalkerEnabled = recursiveDirWalkerEnabled;
this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), false);
this.fileIterators = requireNonNull(fileIterators, "fileIterators is null");
this.schedulerUsesHostAddresses = schedulerUsesHostAddresses;
this.partialAggregationsPushedDown = partialAggregationsPushedDown;
Optional<DirectoryLister> directoryListerOverride = Optional.empty();
if (!isNullOrEmpty(table.getStorage().getLocation())) {
Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, new Path(table.getStorage().getLocation()));
try {
InputFormat<?, ?> inputFormat = getInputFormat(
configuration,
table.getStorage().getStorageFormat().getInputFormat(),
table.getStorage().getStorageFormat().getSerDe(),
false);
if (isHudiParquetInputFormat(inputFormat)) {
directoryListerOverride = Optional.of(new HudiDirectoryLister(configuration, session, table));
}
}
catch (PrestoException ex) {
// Tables and partitions can have different formats. When the table format is not supported,
// skip the Hudi check for that table. Its partitions can still be of a supported format.
if (!HIVE_UNSUPPORTED_FORMAT.toErrorCode().equals(ex.getErrorCode())) {
throw ex;
}
}
}
this.directoryLister = directoryListerOverride.orElseGet(() -> requireNonNull(directoryLister, "directoryLister is null"));
}
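/**
 * Generates splits for a SymlinkTextInputFormat partition by resolving the manifest
 * files to their target paths. When the symlink-optimized reader is enabled, target
 * paths are grouped by parent directory and turned into lazy split iterators;
 * otherwise splits are produced eagerly via getSymlinkSplits.
 */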
private ListenableFuture<?> handleSymlinkTextInputFormat(
ExtendedFileSystem fs,
Path path,
InputFormat<?, ?> inputFormat,
boolean s3SelectPushdownEnabled,
Storage storage,
List<HivePartitionKey> partitionKeys,
String partitionName,
int partitionDataColumnCount,
boolean stopped,
HivePartitionMetadata partition,
HiveSplitSource hiveSplitSource,
Configuration configuration,
boolean splittable)
throws IOException
{
if (tableBucketInfo.isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "Bucketed table in SymlinkTextInputFormat is not yet supported");
}
List<Path> targetPaths = getTargetPathsFromSymlink(fs, path, partition.getPartition());
if (isSymlinkOptimizedReaderEnabled(session)) {
Map<Path, List<Path>> parentToTargets = targetPaths.stream().collect(Collectors.groupingBy(Path::getParent));
InputFormat<?, ?> targetInputFormat = getInputFormat(
configuration,
storage.getStorageFormat().getInputFormat(),
storage.getStorageFormat().getSerDe(),
true);
HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(
IGNORED,
isUseListDirectoryCache(session),
isSkipEmptyFilesEnabled(session),
hdfsContext.getIdentity(),
buildDirectoryContextProperties(session),
session.getRuntimeStats());
for (Map.Entry<Path, List<Path>> entry : parentToTargets.entrySet()) {
Iterator<InternalHiveSplit> symlinkIterator = getSymlinkIterator(
path,
s3SelectPushdownEnabled,
storage,
partitionKeys,
partitionName,
partitionDataColumnCount,
partition,
splittable,
entry.getKey(),
entry.getValue(),
targetInputFormat,
hiveDirectoryContext);
fileIterators.addLast(symlinkIterator);
}
return COMPLETED_FUTURE;
}
return getSymlinkSplits(
path,
inputFormat,
s3SelectPushdownEnabled,
storage,
partitionKeys,
partitionName,
partitionDataColumnCount,
stopped,
partition,
hiveSplitSource,
targetPaths);
}
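/**
 * Builds a lazy split iterator for the symlink target files that share a single
 * parent directory, listing only the referenced files on the target file system.
 */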
@VisibleForTesting
Iterator<InternalHiveSplit> getSymlinkIterator(
Path path,
boolean s3SelectPushdownEnabled,
Storage storage,
List<HivePartitionKey> partitionKeys,
String partitionName,
int partitionDataColumnCount,
HivePartitionMetadata partition,
boolean splittable,
Path targetParent,
List<Path> currentTargetPaths,
InputFormat<?, ?> targetInputFormat,
HiveDirectoryContext hiveDirectoryContext)
throws IOException
{
ExtendedFileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, targetParent);
List<HiveFileInfo> targetPathsHiveFileInfos = getTargetPathsHiveFileInfos(
path,
partition.getPartition(),
targetParent,
currentTargetPaths,
hiveDirectoryContext,
targetFilesystem,
directoryLister,
table,
namenodeStats,
session);
InternalHiveSplitFactory splitFactory = getHiveSplitFactory(
targetFilesystem,
targetInputFormat,
s3SelectPushdownEnabled,
storage,
targetParent.toUri().toString(),
partitionName,
partitionKeys,
partitionDataColumnCount,
partition,
Optional.empty());
return targetPathsHiveFileInfos.stream()
.map(hiveFileInfo -> splitFactory.createInternalHiveSplit(hiveFileInfo, splittable))
.filter(Optional::isPresent)
.map(Optional::get)
.iterator();
}
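/**
 * Eagerly generates splits for the symlink target paths: each target is read with
 * TextInputFormat on its own file system, and the resulting file splits are added
 * to the split source.
 */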
private ListenableFuture<?> getSymlinkSplits(
Path path,
InputFormat<?, ?> inputFormat,
boolean s3SelectPushdownEnabled,
Storage storage,
List<HivePartitionKey> partitionKeys,
String partitionName,
int partitionDataColumnCount,
boolean stopped,
HivePartitionMetadata partition,
HiveSplitSource hiveSplitSource,
List<Path> targetPaths)
throws IOException
{
ListenableFuture<?> lastResult = COMPLETED_FUTURE;
for (Path targetPath : targetPaths) {
// The input should be in TextInputFormat.
TextInputFormat targetInputFormat = new TextInputFormat();
// the splits must be generated using the file system for the target path
// get the configuration for the target path -- it may be a different hdfs instance
ExtendedFileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, targetPath);
Configuration targetConfiguration = targetFilesystem.getConf();
if (targetConfiguration instanceof HiveCachingHdfsConfiguration.CachingJobConf) {
targetConfiguration = ((HiveCachingHdfsConfiguration.CachingJobConf) targetConfiguration).getConfig();
}
if (targetConfiguration instanceof CopyOnFirstWriteConfiguration) {
targetConfiguration = ((CopyOnFirstWriteConfiguration) targetConfiguration).getConfig();
}
JobConf targetJob = toJobConf(targetConfiguration);
targetJob.setInputFormat(TextInputFormat.class);
targetInputFormat.configure(targetJob);
targetJob.set(SPLIT_MINSIZE, Long.toString(getMaxSplitSize(session).toBytes()));
FileInputFormat.setInputPaths(targetJob, targetPath);
InputSplit[] targetSplits = targetInputFormat.getSplits(targetJob, 0);
InternalHiveSplitFactory splitFactory = getHiveSplitFactory(
targetFilesystem,
inputFormat,
s3SelectPushdownEnabled,
storage,
path.toUri().toString(),
partitionName,
partitionKeys,
partitionDataColumnCount,
partition,
Optional.empty());
lastResult = addSplitsToSource(targetSplits, splitFactory, hiveSplitSource, stopped);
if (stopped) {
return COMPLETED_FUTURE;
}
}
return lastResult;
}
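/**
 * Delegates split generation to the input format itself (used for input formats
 * carrying the UseFileSplitsFromInputFormat annotation). Bucketed tables are not
 * supported on this path.
 */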
private ListenableFuture<?> handleGetSplitsFromInputFormat(Configuration configuration,
Path path,
Properties schema,
InputFormat<?, ?> inputFormat,
boolean stopped,
HiveSplitSource hiveSplitSource,
InternalHiveSplitFactory splitFactory)
throws IOException
{
if (tableBucketInfo.isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "Presto cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName());
}
JobConf jobConf = toJobConf(configuration);
FileInputFormat.setInputPaths(jobConf, path);
// Pass SerDe parameters and table parameters to the input format
fromProperties(schema).forEach(jobConf::set);
jobConf.set(SPLIT_MINSIZE, Long.toString(getMaxSplitSize(session).toBytes()));
InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
return addSplitsToSource(splits, splitFactory, hiveSplitSource, stopped);
}
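/**
 * Creates the InternalHiveSplitFactory used to build all splits for this partition,
 * binding the file system, input format, node selection strategy, and partition metadata.
 */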
private InternalHiveSplitFactory getHiveSplitFactory(ExtendedFileSystem fs,
InputFormat<?, ?> inputFormat,
boolean s3SelectPushdownEnabled,
Storage storage,
String path,
String partitionName,
List<HivePartitionKey> partitionKeys,
int partitionDataColumnCount,
HivePartitionMetadata partition,
Optional<HiveSplit.BucketConversion> bucketConversion)
{
return new InternalHiveSplitFactory(
fs,
inputFormat,
infoColumnConstraints,
getNodeSelectionStrategy(session),
getMaxInitialSplitSize(session),
s3SelectPushdownEnabled,
new HiveSplitPartitionInfo(
storage,
path,
partitionKeys,
partitionName,
partitionDataColumnCount,
partition.getTableToPartitionMapping(),
bucketConversion,
partition.getRedundantColumnDomains(),
partition.getRowIdPartitionComponent()),
schedulerUsesHostAddresses,
partition.getEncryptionInformation());
}
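/**
 * Loads splits for one partition: resolves its storage location, schema, and input format,
 * determines whether files may be split, and dispatches to the appropriate path
 * (symlink tables, input formats that supply their own splits, bucketed or virtually
 * bucketed partitions, or a plain directory listing).
 */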
@Override
public ListenableFuture<?> loadPartition(HivePartitionMetadata partition, HiveSplitSource hiveSplitSource, boolean stopped)
throws IOException
{
String partitionName = partition.getHivePartition().getPartitionId().getPartitionName();
Storage storage = partition.getPartition().map(Partition::getStorage).orElse(table.getStorage());
Properties schema = getPartitionSchema(table, partition.getPartition());
String inputFormatName = storage.getStorageFormat().getInputFormat();
String serDe = storage.getStorageFormat().getSerDe();
int partitionDataColumnCount = partition.getPartition()
.map(p -> p.getColumns().size())
.orElseGet(table.getDataColumns()::size);
List<HivePartitionKey> partitionKeys = getPartitionKeys(table, partition.getPartition(), partitionName);
String location = getPartitionLocation(table, partition.getPartition());
if (location.isEmpty()) {
checkState(!shouldCreateFilesForMissingBuckets(table, session), "Empty location is only allowed for empty temporary table when zero-row file is not created");
return COMPLETED_FUTURE;
}
Path path = new Path(location);
Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, path);
// This is required for HUDI MOR realtime tables only.
// Similar changes are implemented in HudiDirectoryLister for HUDI COW and MOR read-optimised tables.
if (directoryLister instanceof HudiDirectoryLister) {
if (configuration instanceof HiveCachingHdfsConfiguration.CachingJobConf) {
configuration = ((HiveCachingHdfsConfiguration.CachingJobConf) configuration).getConfig();
}
if (configuration instanceof CopyOnFirstWriteConfiguration) {
configuration = ((CopyOnFirstWriteConfiguration) configuration).getConfig();
}
}
InputFormat<?, ?> inputFormat = getInputFormat(configuration, inputFormatName, serDe, false);
ExtendedFileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext.getIdentity().getUser(), path, configuration);
boolean s3SelectPushdownEnabled = shouldEnablePushdownForTable(session, table, path.toString(), partition.getPartition());
// Streaming aggregation and partial aggregation pushdown both work at the granularity of individual files,
// therefore we must not split files when either is enabled.
// Files with header or footer lines to skip are not splittable, except for the special case of skip.header.line.count=1
boolean splittable = isFileSplittable(session) &&
!isOrderBasedExecutionEnabled(session) &&
!partialAggregationsPushedDown &&
getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1;
if (inputFormat instanceof SymlinkTextInputFormat) {
return handleSymlinkTextInputFormat(fs, path, inputFormat, s3SelectPushdownEnabled, storage, partitionKeys, partitionName,
partitionDataColumnCount, stopped, partition, hiveSplitSource, configuration, splittable);
}
Optional<HiveSplit.BucketConversion> bucketConversion = Optional.empty();
boolean bucketConversionRequiresWorkerParticipation = false;
if (partition.getPartition().isPresent()) {
Optional<HiveBucketProperty> partitionBucketProperty = partition.getPartition().get().getStorage().getBucketProperty();
if (tableBucketInfo.isPresent() && partitionBucketProperty.isPresent()) {
int tableBucketCount = tableBucketInfo.get().getTableBucketCount();
int partitionBucketCount = partitionBucketProperty.get().getBucketCount();
// Validation was done in HiveSplitManager#getPartitionMetadata.
// Here, we only check whether the partition needs a BucketConversion.
if (tableBucketCount != partitionBucketCount) {
bucketConversion = Optional.of(new HiveSplit.BucketConversion(tableBucketCount, partitionBucketCount, tableBucketInfo.get().getBucketColumns()));
if (tableBucketCount > partitionBucketCount) {
bucketConversionRequiresWorkerParticipation = true;
}
}
}
}
InternalHiveSplitFactory splitFactory = getHiveSplitFactory(
fs,
inputFormat,
s3SelectPushdownEnabled,
storage,
location,
partitionName,
partitionKeys,
partitionDataColumnCount,
partition,
bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty());
if (shouldUseFileSplitsFromInputFormat(inputFormat, directoryLister)) {
return handleGetSplitsFromInputFormat(configuration, path, schema, inputFormat, stopped, hiveSplitSource, splitFactory);
}
// Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping
if (tableBucketInfo.isPresent()) {
if (tableBucketInfo.get().isVirtuallyBucketed()) {
// For virtual bucket, bucket conversion must not be present because there is no physical partition bucket count
checkState(!bucketConversion.isPresent(), "Virtually bucketed table must not have partitions that are physically bucketed");
checkState(
tableBucketInfo.get().getTableBucketCount() == tableBucketInfo.get().getReadBucketCount(),
"Table and read bucket count should be the same for virtual bucket");
return hiveSplitSource.addToQueue(getVirtuallyBucketedSplits(path, fs, splitFactory, tableBucketInfo.get().getReadBucketCount(), partition.getPartition(), splittable));
}
return hiveSplitSource.addToQueue(getBucketedSplits(path, fs, splitFactory, tableBucketInfo.get(), bucketConversion, partitionName, partition.getPartition(), splittable));
}
fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable, partition.getPartition()));
return COMPLETED_FUTURE;
}
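/**
 * Converts the file splits returned by an input format into internal splits and adds
 * them to the split source, returning the future of the last enqueue operation.
 */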
private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, InternalHiveSplitFactory splitFactory, HiveSplitSource hiveSplitSource, boolean stopped)
throws IOException
{
ListenableFuture<?> lastResult = COMPLETED_FUTURE;
for (InputSplit inputSplit : targetSplits) {
Optional<InternalHiveSplit> internalHiveSplit = splitFactory.createInternalHiveSplit((FileSplit) inputSplit);
if (internalHiveSplit.isPresent()) {
lastResult = hiveSplitSource.addToQueue(internalHiveSplit.get());
}
if (stopped) {
return COMPLETED_FUTURE;
}
}
return lastResult;
}
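/**
 * Returns a lazy iterator over the internal splits of a non-bucketed partition,
 * listing the partition directory (recursively if enabled) through the directory lister.
 * The listing cache is used only for sealed partitions.
 */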
private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(
Path path,
ExtendedFileSystem fileSystem,
InternalHiveSplitFactory splitFactory,
boolean splittable,
Optional<Partition> partition)
{
boolean cacheable = isUseListDirectoryCache(session);
if (partition.isPresent()) {
// Use cache only for sealed partitions
cacheable &= partition.get().isSealedPartition();
}
HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(
recursiveDirWalkerEnabled ? RECURSE : IGNORED,
cacheable,
isSkipEmptyFilesEnabled(session),
hdfsContext.getIdentity(),
buildDirectoryContextProperties(session),
session.getRuntimeStats());
return stream(directoryLister.list(fileSystem, table, path, partition, namenodeStats, hiveDirectoryContext))
.map(hiveFileInfo -> splitFactory.createInternalHiveSplit(hiveFileInfo, splittable))
.filter(Optional::isPresent)
.map(Optional::get)
.iterator();
}
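/**
 * Lists all files of a bucketed partition, maps each file to its bucket number, and
 * converts the mapping into internal splits. Nested directories cause a failure,
 * matching Hive's behavior for bucketed tables.
 */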
private List<InternalHiveSplit> getBucketedSplits(
Path path,
ExtendedFileSystem fileSystem,
InternalHiveSplitFactory splitFactory,
BucketSplitInfo bucketSplitInfo,
Optional<HiveSplit.BucketConversion> bucketConversion,
String partitionName,
Optional<Partition> partition,
boolean splittable)
{
int readBucketCount = bucketSplitInfo.getReadBucketCount();
int tableBucketCount = bucketSplitInfo.getTableBucketCount();
int partitionBucketCount = bucketConversion.map(HiveSplit.BucketConversion::getPartitionBucketCount).orElse(tableBucketCount);
checkState(readBucketCount <= tableBucketCount, "readBucketCount(%s) should be less than or equal to tableBucketCount(%s)", readBucketCount, tableBucketCount);
// list all files in the partition
List<HiveFileInfo> fileInfos = new ArrayList<>(partitionBucketCount);
try {
Iterators.addAll(fileInfos, directoryLister.list(fileSystem, table, path, partition, namenodeStats, new HiveDirectoryContext(
FAIL,
isUseListDirectoryCache(session),
isSkipEmptyFilesEnabled(session),
hdfsContext.getIdentity(),
buildDirectoryContextProperties(session),
session.getRuntimeStats())));
}
catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
throw new PrestoException(
HIVE_INVALID_BUCKET_FILES,
format("Hive table '%s' is corrupt. Found sub-directory in bucket directory for partition: %s",
table.getSchemaTableName(),
partitionName));
}
ListMultimap<Integer, HiveFileInfo> bucketToFileInfo = computeBucketToFileInfoMapping(fileInfos, partitionBucketCount, partitionName);
// convert files to internal splits
return convertFilesToInternalSplits(bucketSplitInfo, bucketConversion, bucketToFileInfo, splitFactory, splittable);
}
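/**
 * Maps each listed file to its bucket number. The bucket is normally parsed from the
 * file name; when the engine creates files for missing buckets and a file name does not
 * follow the bucket naming pattern, the files are sorted and their positions in the
 * sorted list are used as bucket numbers instead (requiring exactly one file per bucket).
 */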
private ListMultimap<Integer, HiveFileInfo> computeBucketToFileInfoMapping(List<HiveFileInfo> fileInfos,
int partitionBucketCount,
String partitionName)
{
ListMultimap<Integer, HiveFileInfo> bucketToFileInfo = ArrayListMultimap.create();
if (!shouldCreateFilesForMissingBuckets(table, session)) {
fileInfos.stream()
.forEach(fileInfo -> {
String fileName = fileInfo.getFileName();
OptionalInt bucket = getBucketNumber(fileName);
if (bucket.isPresent()) {
bucketToFileInfo.put(bucket.getAsInt(), fileInfo);
}
else {
throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format("invalid hive bucket file name: %s", fileName));
}
});
}
else {
// build mapping of file name to bucket
for (HiveFileInfo file : fileInfos) {
String fileName = file.getFileName();
OptionalInt bucket = getBucketNumber(fileName);
if (bucket.isPresent()) {
bucketToFileInfo.put(bucket.getAsInt(), file);
continue;
}
// legacy mode requires exactly one file per bucket
if (fileInfos.size() != partitionBucketCount) {
throw new PrestoException(
HIVE_INVALID_BUCKET_FILES,
format("Hive table '%s' is corrupt. File '%s' does not match the standard naming pattern, and the number " +
"of files in the directory (%s) does not match the declared bucket count (%s) for partition: %s",
table.getSchemaTableName(),
fileName,
fileInfos.size(),
partitionBucketCount,
partitionName));
}
if (fileInfos.get(0).getFileName().matches("\\d+")) {
try {
// File names are integers when the files were created with file_renaming_enabled set to true
fileInfos.sort(Comparator.comparingInt(fileInfo -> Integer.parseInt(fileInfo.getFileName())));
}
catch (NumberFormatException e) {
throw new PrestoException(
HIVE_INVALID_FILE_NAMES,
format("Hive table '%s' is corrupt. Some of the filenames in the partition: %s are not integers",
new SchemaTableName(table.getDatabaseName(), table.getTableName()),
partitionName));
}
}
else {
// Sort the file info objects by their natural order (instead of, e.g., by path string). This matches org.apache.hadoop.hive.ql.metadata.Table.getSortedPaths
fileInfos.sort(null);
}
// Use position in sorted list as the bucket number
bucketToFileInfo.clear();
for (int i = 0; i < fileInfos.size(); i++) {
bucketToFileInfo.put(i, fileInfos.get(i));
}
break;
}
}
return bucketToFileInfo;
}
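/**
 * Turns the bucket-to-file mapping into internal splits. For each physical bucket it
 * derives the corresponding read (logical) bucket number and the eligible table bucket
 * numbers, applying the "$bucket" predicate from BucketSplitInfo.
 */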
private List<InternalHiveSplit> convertFilesToInternalSplits(BucketSplitInfo bucketSplitInfo,
Optional<HiveSplit.BucketConversion> bucketConversion,
ListMultimap<Integer, HiveFileInfo> bucketToFileInfo,
InternalHiveSplitFactory splitFactory,
boolean splittable)
{
int readBucketCount = bucketSplitInfo.getReadBucketCount();
int tableBucketCount = bucketSplitInfo.getTableBucketCount();
int partitionBucketCount = bucketConversion.map(HiveSplit.BucketConversion::getPartitionBucketCount).orElse(tableBucketCount);
int bucketCount = max(readBucketCount, partitionBucketCount);
List<InternalHiveSplit> splitList = new ArrayList<>();
for (int bucketNumber = 0; bucketNumber < bucketCount; bucketNumber++) {
// Physical bucket #. This determines the file name. It also determines the order of splits in the result.
int partitionBucketNumber = bucketNumber % partitionBucketCount;
if (!bucketToFileInfo.containsKey(partitionBucketNumber)) {
continue;
}
// Logical bucket #. Each logical bucket corresponds to a "bucket" from the engine's perspective.
int readBucketNumber = bucketNumber % readBucketCount;
boolean containsIneligibleTableBucket = false;
List<Integer> eligibleTableBucketNumbers = new ArrayList<>();
for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += bucketCount) {
// table bucket number: this is used for evaluating "$bucket" filters.
if (bucketSplitInfo.isTableBucketEnabled(tableBucketNumber)) {
eligibleTableBucketNumbers.add(tableBucketNumber);
}
else {
containsIneligibleTableBucket = true;
}
}
if (!eligibleTableBucketNumbers.isEmpty() && containsIneligibleTableBucket) {
throw new PrestoException(
NOT_SUPPORTED,
"The bucket filter cannot be satisfied. There are restrictions on the bucket filter when all the following is true: " +
"1. a table has a different buckets count as at least one of its partitions that is read in this query; " +
"2. the table has a different but compatible bucket number with another table in the query; " +
"3. some buckets of the table is filtered out from the query, most likely using a filter on \"$bucket\". " +
"(table name: " + table.getTableName() + ", table bucket count: " + tableBucketCount + ", " +
"partition bucket count: " + partitionBucketCount + ", effective reading bucket count: " + readBucketCount + ")");
}
if (!eligibleTableBucketNumbers.isEmpty()) {
for (HiveFileInfo fileInfo : bucketToFileInfo.get(partitionBucketNumber)) {
eligibleTableBucketNumbers.stream()
.map(tableBucketNumber -> splitFactory.createInternalHiveSplit(fileInfo, readBucketNumber, tableBucketNumber, splittable))
.forEach(optionalSplit -> optionalSplit.ifPresent(splitList::add));
}
}
}
return splitList;
}
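/**
 * Lists all files of a virtually bucketed partition and assigns each file a virtual
 * bucket number derived from its path.
 */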
private List<InternalHiveSplit> getVirtuallyBucketedSplits(Path path, ExtendedFileSystem fileSystem, InternalHiveSplitFactory splitFactory, int bucketCount, Optional<Partition> partition, boolean splittable)
{
// List all files recursively in the partition and assign virtual bucket number to each of them
HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(
recursiveDirWalkerEnabled ? RECURSE : IGNORED,
isUseListDirectoryCache(session),
isSkipEmptyFilesEnabled(session),
hdfsContext.getIdentity(),
buildDirectoryContextProperties(session),
session.getRuntimeStats());
return stream(directoryLister.list(fileSystem, table, path, partition, namenodeStats, hiveDirectoryContext))
.map(fileInfo -> {
int virtualBucketNumber = getVirtualBucketNumber(bucketCount, fileInfo.getPath());
return splitFactory.createInternalHiveSplit(fileInfo, virtualBucketNumber, virtualBucketNumber, splittable);
})
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());
}
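/**
 * Reads the symlink manifest files in the given directory and returns the target paths
 * they reference.
 */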
private List<Path> getTargetPathsFromSymlink(ExtendedFileSystem fileSystem, Path symlinkDir, Optional<Partition> partition)
{
try {
HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(
IGNORED,
isUseListDirectoryCache(session),
isSkipEmptyFilesEnabled(session),
hdfsContext.getIdentity(),
buildDirectoryContextProperties(session),
session.getRuntimeStats());
Iterator<HiveFileInfo> manifestFileInfos = directoryLister.list(fileSystem, table, symlinkDir, partition, namenodeStats, hiveDirectoryContext);
return readSymlinkPaths(fileSystem, manifestFileInfos);
}
catch (IOException e) {
throw new PrestoException(HIVE_BAD_DATA, "Error parsing symlinks from: " + symlinkDir, e);
}
}
private static Properties getPartitionSchema(Table table, Optional<Partition> partition)
{
return partition.map(value -> getHiveSchema(value, table)).orElseGet(() -> getHiveSchema(table));
}
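/**
 * Describes how a bucketed table is read: the bucket columns, the table and read bucket
 * counts, and a predicate over table bucket numbers derived from any bucket filter.
 */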
public static class BucketSplitInfo
{
private final List<HiveColumnHandle> bucketColumns;
private final int tableBucketCount;
private final int readBucketCount;
private final IntPredicate bucketFilter;
public static Optional<BucketSplitInfo> createBucketSplitInfo(Optional<HiveBucketHandle> bucketHandle, Optional<HiveBucketing.HiveBucketFilter> bucketFilter)
{
requireNonNull(bucketHandle, "bucketHandle is null");
requireNonNull(bucketFilter, "bucketFilter is null");
if (!bucketHandle.isPresent()) {
checkArgument(!bucketFilter.isPresent(), "bucketHandle must be present if bucketFilter is present");
return Optional.empty();
}
int tableBucketCount = bucketHandle.get().getTableBucketCount();
int readBucketCount = bucketHandle.get().getReadBucketCount();
List<HiveColumnHandle> bucketColumns = bucketHandle.get().getColumns();
IntPredicate predicate = bucketFilter
.<IntPredicate>map(filter -> filter.getBucketsToKeep()::contains)
.orElse(bucket -> true);
return Optional.of(new BucketSplitInfo(bucketColumns, tableBucketCount, readBucketCount, predicate));
}
private BucketSplitInfo(List<HiveColumnHandle> bucketColumns, int tableBucketCount, int readBucketCount, IntPredicate bucketFilter)
{
this.bucketColumns = ImmutableList.copyOf(requireNonNull(bucketColumns, "bucketColumns is null"));
this.tableBucketCount = tableBucketCount;
this.readBucketCount = readBucketCount;
this.bucketFilter = requireNonNull(bucketFilter, "bucketFilter is null");
}
public List<HiveColumnHandle> getBucketColumns()
{
return bucketColumns;
}
public int getTableBucketCount()
{
return tableBucketCount;
}
public int getReadBucketCount()
{
return readBucketCount;
}
public boolean isVirtuallyBucketed()
{
return bucketColumns.size() == 1 && bucketColumns.get(0).equals(pathColumnHandle());
}
/**
* Evaluates whether the provided table bucket number passes the bucket predicate.
* A bucket predicate can be present in two cases:
* <ul>
* <li>Filter on "$bucket" column. e.g. {@code "$bucket" between 0 and 100}
* <li>Single-value equality filter on all bucket columns. e.g. for a table with two bucketing columns,
* {@code bucketCol1 = 'a' AND bucketCol2 = 123}
* </ul>
*/
public boolean isTableBucketEnabled(int tableBucketNumber)
{
return bucketFilter.test(tableBucketNumber);
}
}
}