// InternalHiveSplitFactory.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.util;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.hive.BlockLocation;
import com.facebook.presto.hive.EncryptionInformation;
import com.facebook.presto.hive.HiveFileInfo;
import com.facebook.presto.hive.HiveSplitPartitionInfo;
import com.facebook.presto.hive.InternalHiveSplit;
import com.facebook.presto.hive.InternalHiveSplit.InternalHiveBlock;
import com.facebook.presto.hive.s3select.S3SelectPushdown;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import static com.facebook.presto.hive.BlockLocation.fromHiveBlockLocations;
import static com.facebook.presto.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveColumnHandle.FILE_SIZE_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveUtil.isSelectSplittable;
import static com.facebook.presto.hive.HiveUtil.isSplittable;
import static com.facebook.presto.hive.util.CustomSplitConversionUtils.extractCustomSplitInfo;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.utf8Slice;
import static java.util.Objects.requireNonNull;
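
/**
 * Factory for turning discovered data files (or pre-computed Hadoop {@link FileSplit}s)
 * into {@link InternalHiveSplit}s. It decides whether a file may be split further,
 * prunes files that cannot satisfy predicates on the synthetic file-info columns, and
 * attaches block-level host addresses when the scheduler can make use of them.
 */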
public class InternalHiveSplitFactory
{
private final FileSystem fileSystem;
private final InputFormat<?, ?> inputFormat;
private final Map<Integer, Domain> infoColumnConstraints;
private final NodeSelectionStrategy nodeSelectionStrategy;
private final boolean s3SelectPushdownEnabled;
private final HiveSplitPartitionInfo partitionInfo;
private final boolean schedulerUsesHostAddresses;
private final Optional<EncryptionInformation> encryptionInformation;
private final long minimumTargetSplitSizeInBytes;
public InternalHiveSplitFactory(
FileSystem fileSystem,
InputFormat<?, ?> inputFormat,
Map<Integer, Domain> infoColumnConstraints,
NodeSelectionStrategy nodeSelectionStrategy,
DataSize minimumTargetSplitSize,
boolean s3SelectPushdownEnabled,
HiveSplitPartitionInfo partitionInfo,
boolean schedulerUsesHostAddresses,
Optional<EncryptionInformation> encryptionInformation)
{
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
this.inputFormat = requireNonNull(inputFormat, "inputFormat is null");
this.infoColumnConstraints = requireNonNull(infoColumnConstraints, "infoColumnConstraints is null");
this.nodeSelectionStrategy = requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
        this.partitionInfo = requireNonNull(partitionInfo, "partitionInfo is null");
this.schedulerUsesHostAddresses = schedulerUsesHostAddresses;
this.encryptionInformation = requireNonNull(encryptionInformation, "encryptionInformation is null");
        this.minimumTargetSplitSizeInBytes = requireNonNull(minimumTargetSplitSize, "minimumTargetSplitSize is null").toBytes();
checkArgument(minimumTargetSplitSizeInBytes > 0, "minimumTargetSplitSize must be > 0, found: %s", minimumTargetSplitSize);
}
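
    /**
     * Creates a split for a file that is not part of a bucketed table.
     */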
public Optional<InternalHiveSplit> createInternalHiveSplit(HiveFileInfo hiveFileInfo, boolean splittable)
{
return createInternalHiveSplit(hiveFileInfo, OptionalInt.empty(), OptionalInt.empty(), splittable);
}
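
    /**
     * Creates a split for a file in a bucketed table. {@code readBucketNumber} is the bucket
     * as seen by the reader, while {@code tableBucketNumber} is the bucket according to the
     * table layout; they may differ when the data was written with a different bucket count.
     */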
public Optional<InternalHiveSplit> createInternalHiveSplit(HiveFileInfo hiveFileInfo, int readBucketNumber, int tableBucketNumber, boolean splittable)
{
return createInternalHiveSplit(hiveFileInfo, OptionalInt.of(readBucketNumber), OptionalInt.of(tableBucketNumber), splittable);
}
private Optional<InternalHiveSplit> createInternalHiveSplit(HiveFileInfo hiveFileInfo, OptionalInt readBucketNumber, OptionalInt tableBucketNumber, boolean splittable)
{
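        // A file is only eligible for further splitting if it is larger than the minimum
        // target split size and the input format supports splitting (S3 Select pushdown
        // uses its own splittability check).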
splittable = splittable &&
hiveFileInfo.getLength() > minimumTargetSplitSizeInBytes &&
(s3SelectPushdownEnabled ?
isSelectSplittable(inputFormat, hiveFileInfo.getPath(), s3SelectPushdownEnabled) :
isSplittable(inputFormat, fileSystem, hiveFileInfo.getPath()));
return createInternalHiveSplit(
hiveFileInfo.getPath(),
hiveFileInfo.getBlockLocations().toArray(new BlockLocation[0]),
0,
hiveFileInfo.getLength(),
hiveFileInfo.getLength(),
hiveFileInfo.getFileModifiedTime(),
readBucketNumber,
tableBucketNumber,
splittable,
hiveFileInfo.getExtraFileInfo(),
hiveFileInfo.getCustomSplitInfo());
}
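
    /**
     * Creates a split from a pre-computed Hadoop {@link FileSplit}, e.g. one returned by an
     * InputFormat's getSplits. Such splits are never re-split (splittable is false) and carry
     * any custom split info extracted from the FileSplit.
     */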
public Optional<InternalHiveSplit> createInternalHiveSplit(FileSplit split)
throws IOException
{
FileStatus file = fileSystem.getFileStatus(split.getPath());
Map<String, String> customSplitInfo = extractCustomSplitInfo(split);
return createInternalHiveSplit(
split.getPath().toUri().toString(),
fromHiveBlockLocations(fileSystem.getFileBlockLocations(file, split.getStart(), split.getLength())).toArray(new BlockLocation[0]),
split.getStart(),
split.getLength(),
file.getLen(),
file.getModificationTime(),
OptionalInt.empty(),
OptionalInt.empty(),
false,
Optional.empty(),
customSplitInfo);
}
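
    /**
     * Core split creation: filters the file against the info-column predicates, clamps the
     * file's block locations to the [start, start + length) range, and picks the effective
     * node selection strategy based on whether every block has a real (non-localhost) address.
     */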
private Optional<InternalHiveSplit> createInternalHiveSplit(
String path,
BlockLocation[] blockLocations,
long start,
long length,
long fileSize,
long fileModificationTime,
OptionalInt readBucketNumber,
OptionalInt tableBucketNumber,
boolean splittable,
Optional<byte[]> extraFileInfo,
Map<String, String> customSplitInfo)
{
if (!infoColumnsMatchPredicates(infoColumnConstraints, path, fileSize, fileModificationTime)) {
return Optional.empty();
}
boolean forceLocalScheduling = this.nodeSelectionStrategy == HARD_AFFINITY;
        // For empty files, some filesystems (e.g. LocalFileSystem) produce a single empty block,
        // while others (e.g. hdfs.DistributedFileSystem) produce no blocks at all.
        // Synthesize an empty block if one does not already exist.
if (fileSize == 0 && blockLocations.length == 0) {
blockLocations = new BlockLocation[] {new BlockLocation(ImmutableList.of(), 0, 0)};
            // Turn off forced local scheduling because there is no hosts list to schedule against.
forceLocalScheduling = false;
}
ImmutableList.Builder<InternalHiveBlock> blockBuilder = ImmutableList.builder();
for (BlockLocation blockLocation : blockLocations) {
// clamp the block range
long blockStart = Math.max(start, blockLocation.getOffset());
long blockEnd = Math.min(start + length, blockLocation.getOffset() + blockLocation.getLength());
if (blockStart > blockEnd) {
// block is outside split range
continue;
}
if (blockStart == blockEnd && !(blockStart == start && blockEnd == start + length)) {
                // skip zero-width blocks, except in the special case where the split itself is
                // empty and the block covers exactly that empty interval
continue;
}
List<HostAddress> addresses = getHostAddresses(blockLocation);
if (!needsHostAddresses(forceLocalScheduling, addresses)) {
addresses = ImmutableList.of();
}
blockBuilder.add(new InternalHiveBlock(blockEnd, addresses));
}
List<InternalHiveBlock> blocks = blockBuilder.build();
checkBlocks(blocks, start, length);
if (!splittable) {
            // file is not splittable: collapse everything into a single block covering the
            // whole split, reusing the host addresses of the first block
List<HostAddress> addresses = blocks.get(0).getAddresses();
if (!needsHostAddresses(forceLocalScheduling, addresses)) {
addresses = ImmutableList.of();
}
blocks = ImmutableList.of(new InternalHiveBlock(start + length, addresses));
}
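        // Store the path relative to the partition location when possible; the split then only
        // carries the suffix, and the full path can be reconstructed from partitionInfo
        // (presumably to keep per-split memory overhead small).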
String relativePath = path;
boolean isRelative = false;
if (path.startsWith(partitionInfo.getPath())) {
relativePath = path.substring(partitionInfo.getPath().length());
isRelative = true;
}
return Optional.of(new InternalHiveSplit(
relativePath,
isRelative,
start,
start + length,
fileSize,
fileModificationTime,
blocks,
readBucketNumber,
tableBucketNumber,
splittable,
forceLocalScheduling && allBlocksHaveRealAddress(blocks) ? HARD_AFFINITY : nodeSelectionStrategy,
s3SelectPushdownEnabled && S3SelectPushdown.isCompressionCodecSupported(inputFormat, path),
partitionInfo,
extraFileInfo,
encryptionInformation,
customSplitInfo));
}
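
    // Host addresses are only worth carrying if the scheduler consumes them, or if local
    // scheduling is being forced and the addresses are real rather than the "localhost"
    // placeholder.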
private boolean needsHostAddresses(boolean forceLocalScheduling, List<HostAddress> addresses)
{
return schedulerUsesHostAddresses || (forceLocalScheduling && hasRealAddress(addresses));
}
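
    // Sanity check: the split range is non-negative, at least one block exists, and the
    // last block ends exactly at the end of the split.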
private static void checkBlocks(List<InternalHiveBlock> blocks, long start, long length)
{
checkArgument(length >= 0);
checkArgument(!blocks.isEmpty());
checkArgument(start + length == blocks.get(blocks.size() - 1).getEnd());
}
private static boolean allBlocksHaveRealAddress(List<InternalHiveBlock> blocks)
{
return blocks.stream()
.map(InternalHiveBlock::getAddresses)
.allMatch(InternalHiveSplitFactory::hasRealAddress);
}
private static boolean hasRealAddress(List<HostAddress> addresses)
{
        // Hadoop FileSystem returns "localhost" as a placeholder when real locality
        // information is unavailable
return addresses.stream().anyMatch(address -> !address.getHostText().equals("localhost"));
}
private static List<HostAddress> getHostAddresses(BlockLocation blockLocation)
{
return blockLocation.getHosts().stream()
.map(HostAddress::fromString)
.collect(toImmutableList());
}
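
    // Prunes files using predicates on the synthetic file-info columns ($path, $file_size,
    // $file_modified_time). Returning false means no row from this file can match, so no
    // split needs to be created for it.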
private static boolean infoColumnsMatchPredicates(Map<Integer, Domain> constraints,
String path,
long fileSize,
long fileModificationTime)
{
if (constraints.isEmpty()) {
return true;
}
boolean matches = true;
for (Map.Entry<Integer, Domain> constraint : constraints.entrySet()) {
switch (constraint.getKey()) {
case PATH_COLUMN_INDEX:
matches &= constraint.getValue().includesNullableValue(utf8Slice(path));
break;
case FILE_SIZE_COLUMN_INDEX:
matches &= constraint.getValue().includesNullableValue(fileSize);
break;
case FILE_MODIFIED_TIME_COLUMN_INDEX:
matches &= constraint.getValue().includesNullableValue(fileModificationTime);
break;
}
}
return matches;
}
}