HiveManifestUtils.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.github.luben.zstd.Zstd;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.roaringbitmap.RoaringBitmap;

import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.regex.Pattern;
import java.util.stream.LongStream;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.hive.HiveErrorCode.MALFORMED_HIVE_FILE_STATISTICS;
import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled;
import static com.facebook.presto.hive.PartitionUpdate.FileWriteInfo;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.utf8Slice;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.ISO_8859_1;

/**
 * Static helpers for two related artifacts:
 * <ul>
 * <li>single-row file statistics pages with a {@code fileSize} and a {@code rowCount} BIGINT column, and</li>
 * <li>partition manifests: the numbered file names of a partition and their sizes, stored as
 * string-valued partition metadata entries ({@link #FILE_NAMES}, {@link #FILE_SIZES}, {@link #MANIFEST_VERSION}).</li>
 * </ul>
 * <p>
 * {@link #FILE_NAMES} is either a plain decimal integer {@code N}, meaning the names {@code "0".."N"}, or a
 * serialized {@link RoaringBitmap} carried as an ISO-8859-1 string. {@link #FILE_SIZES} is the comma-joined list of
 * sizes, zstd-compressed and carried as an ISO-8859-1 string.
 */
public class HiveManifestUtils
{
    private static final int FILE_SIZE_CHANNEL = 0;
    private static final int ROW_COUNT_CHANNEL = 1;
    // zstd level used for the FILE_SIZES entry (note: zstd's own default level is 3, not 7)
    private static final int COMPRESSION_LEVEL = 7;
    private static final String COMMA = ",";
    // A FILE_NAMES value made only of digits encodes the continuous sequence "0".."N"
    private static final Pattern CONTINUOUS_SEQUENCE_PATTERN = Pattern.compile("\\d+");

    public static final String FILE_NAMES = "FILE_NAMES";
    public static final String FILE_SIZES = "FILE_SIZES";
    public static final String MANIFEST_VERSION = "MANIFEST_VERSION";
    public static final String VERSION_1 = "V1";

    private HiveManifestUtils()
    {
    }

    /**
     * Builds a one-row page holding the statistics of a single written file.
     *
     * @param fileSize size of the file, written to channel {@code FILE_SIZE_CHANNEL}
     * @param rowCount number of rows in the file, written to channel {@code ROW_COUNT_CHANNEL}
     * @return a page with two BIGINT columns and one position
     */
    public static Page createFileStatisticsPage(long fileSize, long rowCount)
    {
        // FileStatistics page layout:
        //
        // fileSize  rowCount
        //    X         X
        PageBuilder statsPageBuilder = new PageBuilder(ImmutableList.of(BIGINT, BIGINT));
        statsPageBuilder.declarePosition();
        BIGINT.writeLong(statsPageBuilder.getBlockBuilder(FILE_SIZE_CHANNEL), fileSize);
        BIGINT.writeLong(statsPageBuilder.getBlockBuilder(ROW_COUNT_CHANNEL), rowCount);
        return statsPageBuilder.build();
    }

    /**
     * Reads the file size stored at {@code position} of a page produced by {@link #createFileStatisticsPage}.
     *
     * @throws PrestoException with {@code MALFORMED_HIVE_FILE_STATISTICS} if {@code position} is out of range
     */
    public static long getFileSize(Page statisticsPage, int position)
    {
        // FileStatistics page layout:
        //
        // fileSize  rowCount
        //    X         X
        if (position < 0 || position >= statisticsPage.getPositionCount()) {
            throw new PrestoException(MALFORMED_HIVE_FILE_STATISTICS, format("Invalid position: %d specified for FileStatistics page", position));
        }
        return BIGINT.getLong(statisticsPage.getBlock(FILE_SIZE_CHANNEL), position);
    }

    /**
     * Builds a page with one (fileName VARCHAR, fileSize BIGINT) row per file written by {@code partitionUpdate}.
     *
     * @return the manifest page, or {@link Optional#empty()} if any file's size is unknown
     */
    public static Optional<Page> createPartitionManifest(PartitionUpdate partitionUpdate)
    {
        // Manifest Page layout:
        //   fileName    fileSize
        //      X           X
        //      X           X
        //      X           X
        // ....
        PageBuilder manifestBuilder = new PageBuilder(ImmutableList.of(VARCHAR, BIGINT));
        BlockBuilder fileNameBuilder = manifestBuilder.getBlockBuilder(0);
        BlockBuilder fileSizeBuilder = manifestBuilder.getBlockBuilder(1);
        for (FileWriteInfo fileWriteInfo : partitionUpdate.getFileWriteInfos()) {
            if (!fileWriteInfo.getFileSize().isPresent()) {
                return Optional.empty();
            }
            manifestBuilder.declarePosition();
            VARCHAR.writeSlice(fileNameBuilder, utf8Slice(fileWriteInfo.getWriteFileName()));
            BIGINT.writeLong(fileSizeBuilder, fileWriteInfo.getFileSize().get());
        }
        return Optional.of(manifestBuilder.build());
    }

    /**
     * Returns {@code metadata} extended with the compressed manifest entries ({@link #FILE_NAMES},
     * {@link #FILE_SIZES}, {@link #MANIFEST_VERSION}) for the files of {@code partitionUpdate}.
     * <p>
     * {@code metadata} is returned unchanged when the file names are not numbered, or when no file size is known.
     *
     * @throws PrestoException with {@code MALFORMED_HIVE_FILE_STATISTICS} if only some file sizes are known
     */
    public static Map<String, String> updatePartitionMetadataWithFileNamesAndSizes(PartitionUpdate partitionUpdate, Map<String, String> metadata)
    {
        if (!partitionUpdate.containsNumberedFileNames()) {
            // Filenames starting with ".tmp.presto" will be renamed in TableFinishOperator. So it doesn't make sense to store the filenames in manifest
            return metadata;
        }

        List<FileWriteInfo> fileWriteInfos = sortedByFileName(partitionUpdate.getFileWriteInfos());
        List<String> fileNames = fileNamesOf(fileWriteInfos);
        List<Long> fileSizes = knownFileSizesOf(fileWriteInfos);

        if (fileSizes.size() < fileNames.size()) {
            if (fileSizes.isEmpty()) {
                // These files may not have been written by OrcFileWriter. So file sizes not available.
                return metadata;
            }
            throw new PrestoException(
                    MALFORMED_HIVE_FILE_STATISTICS,
                    format(
                            "During manifest creation for partition= %s, filename count= %s is not equal to filesizes count= %s",
                            partitionUpdate.getName(),
                            fileNames.size(),
                            fileSizes.size()));
        }

        ImmutableMap.Builder<String, String> partitionMetadata = ImmutableMap.builder();
        partitionMetadata.put(FILE_NAMES, compressFileNames(fileNames));
        partitionMetadata.put(FILE_SIZES, compressFileSizes(fileSizes));
        partitionMetadata.put(MANIFEST_VERSION, VERSION_1);
        // NOTE(review): ImmutableMap.Builder#build rejects duplicate keys, so this throws if metadata already
        // contains one of the manifest keys — confirm callers never pass them
        partitionMetadata.putAll(metadata);
        return partitionMetadata.build();
    }

    /**
     * Returns the combined length of the {@link #FILE_NAMES} and {@link #FILE_SIZES} entries for the partition.
     * The stored entries are used when {@code parameters} already contains a manifest; otherwise the entries are
     * computed the same way {@link #updatePartitionMetadataWithFileNamesAndSizes} would compute them.
     *
     * @return the size, or empty when file renaming is disabled or the file names are not numbered
     */
    public static OptionalLong getManifestSizeInBytes(ConnectorSession session, PartitionUpdate partitionUpdate, Map<String, String> parameters)
    {
        if (!isFileRenamingEnabled(session) || !partitionUpdate.containsNumberedFileNames()) {
            return OptionalLong.empty();
        }
        if (parameters.containsKey(MANIFEST_VERSION)) {
            return OptionalLong.of(parameters.get(FILE_NAMES).length() + parameters.get(FILE_SIZES).length());
        }
        // Sort like updatePartitionMetadataWithFileNamesAndSizes does, so the estimate matches what would be stored
        List<FileWriteInfo> fileWriteInfos = sortedByFileName(partitionUpdate.getFileWriteInfos());
        return OptionalLong.of(compressFileNames(fileNamesOf(fileWriteInfos)).length()
                + compressFileSizes(knownFileSizesOf(fileWriteInfos)).length());
    }

    /**
     * Encodes numeric file names into a compact string that {@link #decompressFileNames} can decode.
     * A non-empty continuous sequence {@code "0".."N"} is encoded as {@code "N"}; anything else (including a single
     * non-zero name and the empty list) is encoded as a serialized RoaringBitmap.
     */
    static String compressFileNames(List<String> fileNames)
    {
        // A lone name such as "5" must not be emitted verbatim: decompressFileNames reads "5" as the range 0..5
        if (!fileNames.isEmpty() && isContinuousSequenceFromZero(fileNames)) {
            return fileNames.get(fileNames.size() - 1);
        }
        return compressFileNamesUsingRoaringBitmap(fileNames);
    }

    /** Returns true when the names, in order, are exactly "0", "1", "2", ... (parsed as integers). */
    private static boolean isContinuousSequenceFromZero(List<String> fileNames)
    {
        int expected = 0;
        for (String name : fileNames) {
            if (Integer.parseInt(name) != expected) {
                return false;
            }
            expected++;
        }
        return true;
    }

    /**
     * Inverse of {@link #compressFileNames}.
     *
     * @throws PrestoException with {@code MALFORMED_HIVE_FILE_STATISTICS} if the value cannot be decoded
     */
    static List<String> decompressFileNames(String compressedFileNames)
    {
        try {
            // Check if the compressed fileNames string is a number
            if (CONTINUOUS_SEQUENCE_PATTERN.matcher(compressedFileNames).matches()) {
                long end = Long.parseLong(compressedFileNames);
                return LongStream.rangeClosed(0, end).mapToObj(String::valueOf).collect(toImmutableList());
            }
            RoaringBitmap roaringBitmap = new RoaringBitmap();
            roaringBitmap.deserialize(ByteBuffer.wrap(compressedFileNames.getBytes(ISO_8859_1)));
            return Arrays.stream(roaringBitmap.toArray()).mapToObj(Integer::toString).collect(toImmutableList());
        }
        catch (IOException | RuntimeException e) {
            // Malformed input can also surface as unchecked exceptions (number overflow, truncated buffer)
            throw new PrestoException(MALFORMED_HIVE_FILE_STATISTICS, "Failed de-compressing the file names in manifest", e);
        }
    }

    /** Serializes the numeric file names into a RoaringBitmap and carries the bytes as an ISO-8859-1 string. */
    private static String compressFileNamesUsingRoaringBitmap(List<String> fileNames)
    {
        RoaringBitmap roaringBitmap = new RoaringBitmap();
        // Add file names to roaring bitmap
        fileNames.forEach(name -> roaringBitmap.add(Integer.parseInt(name)));

        // Serialize the compressed data into an exactly-sized ByteBuffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(roaringBitmap.serializedSizeInBytes());
        roaringBitmap.serialize(byteBuffer);
        // Cast keeps the call binary-compatible with Java 8's Buffer#flip signature
        ((Buffer) byteBuffer).flip();
        return new String(byteBuffer.array(), ISO_8859_1);
    }

    /**
     * Joins the sizes with commas and zstd-compresses the result, carried as an ISO-8859-1 string.
     *
     * @throws PrestoException with {@code MALFORMED_HIVE_FILE_STATISTICS} if compression fails
     */
    public static String compressFileSizes(List<Long> fileSizes)
    {
        String fileSizesString = Joiner.on(COMMA).join(fileSizes.stream().map(String::valueOf).collect(toImmutableList()));
        try {
            return new String(Zstd.compress(fileSizesString.getBytes(ISO_8859_1), COMPRESSION_LEVEL), ISO_8859_1);
        }
        catch (RuntimeException e) {
            throw new PrestoException(MALFORMED_HIVE_FILE_STATISTICS, "Failed compressing the file sizes for manifest", e);
        }
    }

    /**
     * Inverse of {@link #compressFileSizes}.
     *
     * @throws PrestoException with {@code MALFORMED_HIVE_FILE_STATISTICS} if the value cannot be decoded
     */
    public static List<Long> decompressFileSizes(String compressedFileSizes)
    {
        try {
            byte[] compressedBytes = compressedFileSizes.getBytes(ISO_8859_1);
            // toIntExact fails loudly (and is wrapped below) instead of silently truncating the size
            int decompressedSize = Math.toIntExact(Zstd.decompressedSize(compressedBytes));
            String decompressedFileSizes = new String(Zstd.decompress(compressedBytes, decompressedSize), ISO_8859_1);
            if (decompressedFileSizes.isEmpty()) {
                // compressFileSizes(emptyList) encodes "", and "".split(",") would yield [""], which is not a Long
                return ImmutableList.of();
            }
            return Arrays.stream(decompressedFileSizes.split(COMMA)).map(Long::valueOf).collect(toImmutableList());
        }
        catch (RuntimeException e) {
            throw new PrestoException(MALFORMED_HIVE_FILE_STATISTICS, "Failed de-compressing the file sizes in manifest", e);
        }
    }

    /** Returns a mutable copy of {@code fileWriteInfos} ordered by numeric file name. */
    private static List<FileWriteInfo> sortedByFileName(List<FileWriteInfo> fileWriteInfos)
    {
        List<FileWriteInfo> sorted = new ArrayList<>(fileWriteInfos);
        sorted.sort(Comparator.comparingInt(info -> Integer.parseInt(info.getWriteFileName())));
        return sorted;
    }

    /** Returns the write file names, in the given order. */
    private static List<String> fileNamesOf(List<FileWriteInfo> fileWriteInfos)
    {
        return fileWriteInfos.stream().map(FileWriteInfo::getWriteFileName).collect(toImmutableList());
    }

    /** Returns the file sizes that are present, in the given order; files with unknown size are skipped. */
    private static List<Long> knownFileSizesOf(List<FileWriteInfo> fileWriteInfos)
    {
        return fileWriteInfos.stream()
                .map(FileWriteInfo::getFileSize)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(toImmutableList());
    }
}