HiveUtil.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.airlift.json.Codec;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.type.CharType;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.Decimals;
import com.facebook.presto.common.type.NamedTypeSignature;
import com.facebook.presto.common.type.RowFieldName;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.common.type.TypeSignatureParameter;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.hadoop.TextLineLengthLimitExceededException;
import com.facebook.presto.hive.avro.PrestoAvroSerDe;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.pagefile.PageInputFormat;
import com.facebook.presto.hive.util.FooterAwareRecordReader;
import com.facebook.presto.orc.metadata.OrcType;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.compress.lzo.LzoCodec;
import io.airlift.compress.lzo.LzopCodec;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceUtf8;
import io.airlift.slice.Slices;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
import org.joda.time.format.DateTimeParser;
import org.joda.time.format.DateTimePrinter;
import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.Chars.isCharType;
import static com.facebook.presto.common.type.Chars.trimTrailingSpaces;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DecimalType.createDecimalType;
import static com.facebook.presto.common.type.Decimals.isLongDecimal;
import static com.facebook.presto.common.type.Decimals.isShortDecimal;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.TypeUtils.isDistinctType;
import static com.facebook.presto.common.type.TypeUtils.isEnumType;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveColumnHandle.MAX_PARTITION_KEY_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveColumnHandle.bucketColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.fileModifiedTimeColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.fileSizeColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.isBucketColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.isFileModifiedTimeColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.isFileSizeColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.isPathColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.pathColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.rowIdColumnHandle;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_VIEW_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_SERDE_NOT_FOUND;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_TABLE_BUCKETING_IS_IGNORED;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
import static com.facebook.presto.hive.HiveStorageFormat.TEXTFILE;
import static com.facebook.presto.hive.metastore.MetastoreUtil.HIVE_DEFAULT_DYNAMIC_PARTITION;
import static com.facebook.presto.hive.metastore.MetastoreUtil.checkCondition;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders;
import static com.facebook.presto.hive.util.ConfigurationUtils.copy;
import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
import static com.facebook.presto.hive.util.CustomSplitConversionUtils.recreateSplitWithCustomInfo;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.transform;
import static com.google.common.io.CharStreams.readLines;
import static java.lang.Byte.parseByte;
import static java.lang.Double.parseDouble;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.parseFloat;
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.lang.Short.parseShort;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.math.BigDecimal.ROUND_UNNECESSARY;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static org.apache.hadoop.fs.Path.getPathWithoutSchemeAndAuthority;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT;
import static org.apache.hadoop.hive.serde.serdeConstants.COLLECTION_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.DECIMAL_TYPE_NAME;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_ALL_COLUMNS;
import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR;
import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
public final class HiveUtil
{
public static final String CUSTOM_FILE_SPLIT_CLASS_KEY = "custom_split_class";
public static final String PRESTO_QUERY_ID = "presto_query_id";
public static final String PRESTO_QUERY_SOURCE = "presto_query_source";
public static final String PRESTO_CLIENT_INFO = "presto_client_info";
public static final String PRESTO_USER_NAME = "presto_user_name";
public static final String PRESTO_METASTORE_HEADER = "presto_metastore_header";
public static final String PRESTO_CLIENT_TAGS = "presto_client_tags";
public static final String CLIENT_TAGS_DELIMITER = ",";
private static final Pattern DEFAULT_HIVE_COLUMN_NAME_PATTERN = Pattern.compile("_col\\d+");
private static final String VIEW_PREFIX = "/* Presto View: ";
private static final String VIEW_SUFFIX = " */";
private static final String MATERIALIZED_VIEW_PREFIX = "/* Presto Materialized View: ";
private static final String MATERIALIZED_VIEW_SUFFIX = " */";
private static final DateTimeFormatter HIVE_DATE_PARSER = ISODateTimeFormat.date().withZoneUTC();
private static final DateTimeFormatter HIVE_TIMESTAMP_PARSER;
private static final Field COMPRESSION_CODECS_FIELD;
private static final Pattern SUPPORTED_DECIMAL_TYPE = Pattern.compile(DECIMAL_TYPE_NAME + "\\((\\d+),(\\d+)\\)");
private static final int DECIMAL_PRECISION_GROUP = 1;
private static final int DECIMAL_SCALE_GROUP = 2;
private static final String BIG_DECIMAL_POSTFIX = "BD";
private static final String USE_RECORD_READER_FROM_INPUT_FORMAT_ANNOTATION = "UseRecordReaderFromInputFormat";
private static final String USE_FILE_SPLITS_FROM_INPUT_FORMAT_ANNOTATION = "UseFileSplitsFromInputFormat";
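    /**
     * Verifies that a row-ID partition component is present whenever the hidden
     * row-ID column is among the requested columns.
     */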
public static void checkRowIDPartitionComponent(List<HiveColumnHandle> columns, Optional<byte[]> rowIdPartitionComponent)
{
        boolean supplyRowIDs = columns.stream().anyMatch(HiveColumnHandle::isRowIdColumnHandle);
        if (supplyRowIDs) {
            checkArgument(rowIdPartitionComponent.isPresent(), "rowIdPartitionComponent is required when supplying row IDs");
}
}
static {
DateTimeParser[] timestampWithoutTimeZoneParser = {
DateTimeFormat.forPattern("yyyy-M-d").getParser(),
DateTimeFormat.forPattern("yyyy-M-d H:m").getParser(),
DateTimeFormat.forPattern("yyyy-M-d H:m:s").getParser(),
DateTimeFormat.forPattern("yyyy-M-d H:m:s.SSS").getParser(),
DateTimeFormat.forPattern("yyyy-M-d H:m:s.SSSSSSS").getParser(),
DateTimeFormat.forPattern("yyyy-M-d H:m:s.SSSSSSSSS").getParser(),
};
DateTimePrinter timestampWithoutTimeZonePrinter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getPrinter();
HIVE_TIMESTAMP_PARSER = new DateTimeFormatterBuilder().append(timestampWithoutTimeZonePrinter, timestampWithoutTimeZoneParser).toFormatter().withZoneUTC();
try {
COMPRESSION_CODECS_FIELD = TextInputFormat.class.getDeclaredField("compressionCodecs");
COMPRESSION_CODECS_FIELD.setAccessible(true);
}
catch (ReflectiveOperationException e) {
throw new AssertionError(e);
}
}
private HiveUtil()
{
}
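    /**
     * Creates a Hadoop {@link RecordReader} for the given split. Configures column
     * projection and codec registration from the table schema, skips header rows
     * (only when the split starts at offset 0), and wraps the reader with
     * {@link FooterAwareRecordReader} when {@code skip.footer.line.count} is set.
     */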
public static RecordReader<?, ?> createRecordReader(Configuration configuration, Path path, long start, long length, Properties schema, List<HiveColumnHandle> columns, Map<String, String> customSplitInfo)
{
// determine which hive columns we will read
List<HiveColumnHandle> readColumns = ImmutableList.copyOf(filter(columns, column -> column.getColumnType() == REGULAR));
List<Integer> readHiveColumnIndexes = ImmutableList.copyOf(transform(readColumns, HiveColumnHandle::getHiveColumnIndex));
        // Tell Hive which columns we want to read; this lets Hive optimize reading column-oriented files
setReadColumns(configuration, readHiveColumnIndexes);
// Only propagate serialization schema configs by default
Predicate<String> schemaFilter = schemaProperty -> schemaProperty.startsWith("serialization.");
InputFormat<?, ?> inputFormat = getInputFormat(configuration, getInputFormatName(schema), getDeserializerClassName(schema), true);
JobConf jobConf = toJobConf(configuration);
FileSplit fileSplit = new FileSplit(path, start, length, (String[]) null);
if (!customSplitInfo.isEmpty()) {
fileSplit = recreateSplitWithCustomInfo(fileSplit, customSplitInfo);
// Add additional column information for record reader
List<String> readHiveColumnNames = ImmutableList.copyOf(transform(readColumns, HiveColumnHandle::getName));
jobConf.set(READ_COLUMN_NAMES_CONF_STR, Joiner.on(',').join(readHiveColumnNames));
// Remove filter when using customSplitInfo as the record reader requires complete schema configs
schemaFilter = schemaProperty -> true;
}
schema.stringPropertyNames().stream()
.filter(schemaFilter)
.forEach(name -> jobConf.set(name, schema.getProperty(name)));
        // prepend Airlift LZO and LZOP to the codecs list; CompressionCodecFactory gives later
        // entries precedence, so existing user-configured codecs are not overridden
List<String> codecs = newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(jobConf.get("io.compression.codecs", "")));
if (!codecs.contains(LzoCodec.class.getName())) {
codecs.add(0, LzoCodec.class.getName());
}
if (!codecs.contains(LzopCodec.class.getName())) {
codecs.add(0, LzopCodec.class.getName());
}
jobConf.set("io.compression.codecs", codecs.stream().collect(joining(",")));
try {
RecordReader<WritableComparable, Writable> recordReader = (RecordReader<WritableComparable, Writable>) inputFormat.getRecordReader(fileSplit, jobConf, Reporter.NULL);
int headerCount = getHeaderCount(schema);
// Only skip header rows when the split is at the beginning of the file
if (start == 0 && headerCount > 0) {
Utilities.skipHeader(recordReader, headerCount, recordReader.createKey(), recordReader.createValue());
}
int footerCount = getFooterCount(schema);
if (footerCount > 0) {
recordReader = new FooterAwareRecordReader<>(recordReader, footerCount, jobConf);
}
return recordReader;
}
catch (IOException e) {
if (e instanceof TextLineLengthLimitExceededException) {
throw new PrestoException(HIVE_BAD_DATA, "Line too long in text file: " + path, e);
}
throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, format("Error opening Hive split %s (offset=%s, length=%s) using %s: %s",
path,
start,
length,
getInputFormatName(schema),
firstNonNull(e.getMessage(), e.getClass().getName())),
e);
}
}
public static void setReadColumns(Configuration configuration, List<Integer> readHiveColumnIndexes)
{
configuration.set(READ_COLUMN_IDS_CONF_STR, Joiner.on(',').join(readHiveColumnIndexes));
configuration.setBoolean(READ_ALL_COLUMNS, false);
}
public static Optional<CompressionCodec> getCompressionCodec(TextInputFormat inputFormat, Path file)
{
CompressionCodecFactory compressionCodecFactory;
try {
compressionCodecFactory = (CompressionCodecFactory) COMPRESSION_CODECS_FIELD.get(inputFormat);
}
catch (IllegalAccessException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to find compressionCodec for inputFormat: " + inputFormat.getClass().getName(), e);
}
if (compressionCodecFactory == null) {
return Optional.empty();
}
return Optional.ofNullable(compressionCodecFactory.getCodec(file));
}
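    /**
     * Resolves the {@link InputFormat} for a table or partition. When {@code symlinkTarget}
     * is true and the format is {@link SymlinkTextInputFormat}, the target input format is
     * derived from the SerDe instead: LazySimpleSerDe maps to TextInputFormat, and any other
     * known storage format SerDe maps to that format's input format.
     */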
public static InputFormat<?, ?> getInputFormat(Configuration configuration, String inputFormatName, String serDe, boolean symlinkTarget)
{
try {
JobConf jobConf = toJobConf(configuration);
Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
if (serDe == null) {
throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, "Missing SerDe for SymlinkTextInputFormat");
}
                /*
                 * https://github.com/apache/hive/blob/b240eb3266d4736424678d6c71c3c6f6a6fdbf38/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java#L47-L52
                 * According to Hive's implementation of SymlinkTextInputFormat, the target input data should be in TextInputFormat.
                 *
                 * But Delta Lake integrates with Presto using symlink tables whose target input data is in MapredParquetInputFormat.
                 * https://docs.delta.io/latest/presto-integration.html
                 *
                 * To comply with the Hive implementation, we default to TextInputFormat when the SerDe is LazySimpleSerDe;
                 * otherwise we resolve the input format from the SerDe.
                 */
if (serDe.equals(TEXTFILE.getSerDe())) {
inputFormatClass = TextInputFormat.class;
return ReflectionUtils.newInstance(inputFormatClass, jobConf);
}
for (HiveStorageFormat hiveStorageFormat : HiveStorageFormat.values()) {
if (serDe.equals(hiveStorageFormat.getSerDe())) {
inputFormatClass = getInputFormatClass(jobConf, hiveStorageFormat.getInputFormat());
return ReflectionUtils.newInstance(inputFormatClass, jobConf);
}
}
throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, format("Unsupported SerDe for SymlinkTextInputFormat: %s", serDe));
}
return ReflectionUtils.newInstance(inputFormatClass, jobConf);
}
catch (ClassNotFoundException | RuntimeException e) {
throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, "Unable to create input format " + inputFormatName, e);
}
}
@SuppressWarnings({"unchecked", "RedundantCast"})
private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
throws ClassNotFoundException
{
// CDH uses different names for Parquet
if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName) ||
"parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
return MapredParquetInputFormat.class;
}
if (PageInputFormat.class.getSimpleName().equals(inputFormatName)) {
return PageInputFormat.class;
}
Class<?> clazz = conf.getClassByName(inputFormatName);
return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
}
public static String getInputFormatName(Properties schema)
{
String name = schema.getProperty(FILE_INPUT_FORMAT);
checkCondition(name != null, HIVE_INVALID_METADATA, "Table or partition is missing Hive input format property: %s", FILE_INPUT_FORMAT);
return name;
}
static boolean shouldUseRecordReaderFromInputFormat(Configuration configuration, Storage storage, Map<String, String> customSplitInfo)
{
if (customSplitInfo == null || !customSplitInfo.containsKey(CUSTOM_FILE_SPLIT_CLASS_KEY)) {
return false;
}
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, storage.getStorageFormat().getInputFormat(), storage.getStorageFormat().getSerDe(), false);
return Arrays.stream(inputFormat.getClass().getAnnotations())
.map(Annotation::annotationType)
.map(Class::getSimpleName)
.anyMatch(USE_RECORD_READER_FROM_INPUT_FORMAT_ANNOTATION::equals);
}
static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat, DirectoryLister directoryLister)
{
if (directoryLister instanceof HudiDirectoryLister) {
boolean hasUseSplitsAnnotation = Arrays.stream(inputFormat.getClass().getAnnotations())
.map(Annotation::annotationType)
.map(Class::getSimpleName)
.anyMatch(USE_FILE_SPLITS_FROM_INPUT_FORMAT_ANNOTATION::equals);
return hasUseSplitsAnnotation &&
(!isHudiParquetInputFormat(inputFormat) || shouldUseFileSplitsForHudi(inputFormat, ((HudiDirectoryLister) directoryLister).getMetaClient()));
}
return false;
}
static boolean isHudiParquetInputFormat(InputFormat<?, ?> inputFormat)
{
return inputFormat instanceof HoodieParquetInputFormat;
}
private static boolean shouldUseFileSplitsForHudi(InputFormat<?, ?> inputFormat, HoodieTableMetaClient metaClient)
{
if (inputFormat instanceof HoodieParquetRealtimeInputFormat) {
return true;
}
return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
}
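    /**
     * Parses an ISO-format Hive date literal (interpreted in UTC) into days since the
     * epoch; for example, {@code parseHiveDate("2020-01-01")} returns 18262.
     */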
public static long parseHiveDate(String value)
{
long millis = HIVE_DATE_PARSER.parseMillis(value);
return TimeUnit.MILLISECONDS.toDays(millis);
}
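    /**
     * Parses a Hive timestamp literal in the given time zone into epoch milliseconds.
     * Fractional seconds up to nine digits are accepted, but the result has
     * millisecond resolution.
     */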
public static long parseHiveTimestamp(String value, DateTimeZone timeZone)
{
return HIVE_TIMESTAMP_PARSER.withZone(timeZone).parseMillis(value);
}
public static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem, String path)
{
if (inputFormat instanceof OrcInputFormat || inputFormat instanceof RCFileInputFormat) {
return true;
}
        // use reflection to find the isSplitable method (note Hadoop's one-t spelling) on inputFormat or one of its superclasses
Method method = null;
for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
try {
method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
break;
}
catch (NoSuchMethodException ignored) {
}
}
if (method == null) {
return false;
}
try {
method.setAccessible(true);
return (boolean) method.invoke(inputFormat, fileSystem, new Path(path));
}
catch (InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
public static boolean isSelectSplittable(InputFormat<?, ?> inputFormat, String path, boolean s3SelectPushdownEnabled)
{
        // S3 Select supports splitting only for uncompressed CSV and JSON files.
        // Earlier checks for supported input formats, SerDes, column types, and the S3 path
        // are already reflected in the value of s3SelectPushdownEnabled.
return !s3SelectPushdownEnabled || isUncompressed(inputFormat, path);
}
private static boolean isUncompressed(InputFormat<?, ?> inputFormat, String path)
{
if (inputFormat instanceof TextInputFormat) {
return !getCompressionCodec((TextInputFormat) inputFormat, new Path(path)).isPresent();
}
return false;
}
public static StructObjectInspector getTableObjectInspector(Deserializer deserializer)
{
try {
ObjectInspector inspector = deserializer.getObjectInspector();
checkArgument(inspector.getCategory() == Category.STRUCT, "expected STRUCT: %s", inspector.getCategory());
return (StructObjectInspector) inspector;
}
catch (SerDeException e) {
throw new RuntimeException(e);
}
}
public static boolean isDeserializerClass(Properties schema, Class<?> deserializerClass)
{
return getDeserializerClassName(schema).equals(deserializerClass.getName());
}
public static String getDeserializerClassName(Properties schema)
{
String name = schema.getProperty(SERIALIZATION_LIB);
checkCondition(name != null, HIVE_INVALID_METADATA, "Table or partition is missing Hive deserializer property: %s", SERIALIZATION_LIB);
return name;
}
public static Deserializer getDeserializer(Configuration configuration, Properties schema)
{
String name = getDeserializerClassName(schema);
        // For the collection delimiter, Hive 1.x and 2.x use the misspelled key "colelction.delim",
        // while Hive 3.x uses "collection.delim"; see https://issues.apache.org/jira/browse/HIVE-16922
if (name.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) {
if (schema.containsKey("colelction.delim") && !schema.containsKey(COLLECTION_DELIM)) {
schema.put(COLLECTION_DELIM, schema.getProperty("colelction.delim"));
}
}
Deserializer deserializer = createDeserializer(getDeserializerClass(name));
initializeDeserializer(configuration, deserializer, schema);
return deserializer;
}
private static Class<? extends Deserializer> getDeserializerClass(String name)
{
// CDH uses different names for Parquet
if ("parquet.hive.serde.ParquetHiveSerDe".equals(name)) {
return ParquetHiveSerDe.class;
}
if ("org.apache.hadoop.hive.serde2.avro.AvroSerDe".equals(name)) {
return PrestoAvroSerDe.class;
}
try {
return Class.forName(name, true, JavaUtils.getClassLoader()).asSubclass(Deserializer.class);
}
catch (ClassNotFoundException e) {
throw new PrestoException(HIVE_SERDE_NOT_FOUND, "deserializer does not exist: " + name);
}
catch (ClassCastException e) {
throw new RuntimeException("invalid deserializer class: " + name);
}
}
private static Deserializer createDeserializer(Class<? extends Deserializer> clazz)
{
try {
return clazz.getConstructor().newInstance();
}
catch (ReflectiveOperationException e) {
throw new RuntimeException("error creating deserializer: " + clazz.getName(), e);
}
}
private static void initializeDeserializer(Configuration configuration, Deserializer deserializer, Properties schema)
{
try {
configuration = copy(configuration); // Some SerDes (e.g. Avro) modify passed configuration
deserializer.initialize(configuration, schema);
validate(deserializer);
}
catch (SerDeException | RuntimeException e) {
throw new RuntimeException("error initializing deserializer: " + deserializer.getClass().getName(), e);
}
}
private static void validate(Deserializer deserializer)
{
if (deserializer instanceof AbstractSerDe && !((AbstractSerDe) deserializer).getConfigurationErrors().isEmpty()) {
throw new RuntimeException("There are configuration errors: " + ((AbstractSerDe) deserializer).getConfigurationErrors());
}
}
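    /**
     * Returns true if the bytes are Hive's NULL sequence: a backslash followed by
     * the letter N ({@code \N}).
     */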
public static boolean isHiveNull(byte[] bytes)
{
return bytes.length == 2 && bytes[0] == '\\' && bytes[1] == 'N';
}
public static void verifyPartitionTypeSupported(String partitionName, Type type)
{
if (!isValidPartitionType(type)) {
throw new PrestoException(NOT_SUPPORTED, format("Unsupported type [%s] for partition: %s", type, partitionName));
}
}
private static boolean isValidPartitionType(Type type)
{
return type instanceof DecimalType ||
BOOLEAN.equals(type) ||
TINYINT.equals(type) ||
SMALLINT.equals(type) ||
INTEGER.equals(type) ||
BIGINT.equals(type) ||
REAL.equals(type) ||
DOUBLE.equals(type) ||
DATE.equals(type) ||
TIMESTAMP.equals(type) ||
isVarcharType(type) ||
isCharType(type) ||
isEnumType(type) ||
isDistinctType(type);
}
public static NullableValue parsePartitionValue(HivePartitionKey key, Type type, DateTimeZone timeZone)
{
return parsePartitionValue(key.getName(), key.getValue().orElse(HIVE_DEFAULT_DYNAMIC_PARTITION), type, timeZone);
}
public static NullableValue parsePartitionValue(String partitionName, String value, Type type, ZoneId hiveStorageTimeZoneId)
{
requireNonNull(hiveStorageTimeZoneId, "hiveStorageTimeZoneId is null");
return parsePartitionValue(partitionName, value, type, getDateTimeZone(hiveStorageTimeZoneId));
}
private static DateTimeZone getDateTimeZone(ZoneId hiveStorageTimeZoneId)
{
return DateTimeZone.forID(hiveStorageTimeZoneId.getId());
}
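    /**
     * Parses a partition value string into a {@link NullableValue}. The Hive default
     * partition marker maps to null, and an empty string maps to zero/false for
     * boolean, numeric, and decimal types.
     */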
public static NullableValue parsePartitionValue(String partitionName, String value, Type type, DateTimeZone timeZone)
{
verifyPartitionTypeSupported(partitionName, type);
boolean isNull = HIVE_DEFAULT_DYNAMIC_PARTITION.equals(value);
if (type instanceof DecimalType) {
DecimalType decimalType = (DecimalType) type;
if (isNull) {
return NullableValue.asNull(decimalType);
}
if (decimalType.isShort()) {
if (value.isEmpty()) {
return NullableValue.of(decimalType, 0L);
}
return NullableValue.of(decimalType, shortDecimalPartitionKey(value, decimalType, partitionName));
}
else {
if (value.isEmpty()) {
return NullableValue.of(decimalType, Decimals.encodeUnscaledValue(BigInteger.ZERO));
}
return NullableValue.of(decimalType, longDecimalPartitionKey(value, decimalType, partitionName));
}
}
if (BOOLEAN.equals(type)) {
if (isNull) {
return NullableValue.asNull(BOOLEAN);
}
if (value.isEmpty()) {
return NullableValue.of(BOOLEAN, false);
}
return NullableValue.of(BOOLEAN, booleanPartitionKey(value, partitionName));
}
if (TINYINT.equals(type)) {
if (isNull) {
return NullableValue.asNull(TINYINT);
}
if (value.isEmpty()) {
return NullableValue.of(TINYINT, 0L);
}
return NullableValue.of(TINYINT, tinyintPartitionKey(value, partitionName));
}
if (SMALLINT.equals(type)) {
if (isNull) {
return NullableValue.asNull(SMALLINT);
}
if (value.isEmpty()) {
return NullableValue.of(SMALLINT, 0L);
}
return NullableValue.of(SMALLINT, smallintPartitionKey(value, partitionName));
}
if (INTEGER.equals(type)) {
if (isNull) {
return NullableValue.asNull(INTEGER);
}
if (value.isEmpty()) {
return NullableValue.of(INTEGER, 0L);
}
return NullableValue.of(INTEGER, integerPartitionKey(value, partitionName));
}
if (BIGINT.equals(type)) {
if (isNull) {
return NullableValue.asNull(BIGINT);
}
if (value.isEmpty()) {
return NullableValue.of(BIGINT, 0L);
}
return NullableValue.of(BIGINT, bigintPartitionKey(value, partitionName));
}
if (DATE.equals(type)) {
if (isNull) {
return NullableValue.asNull(DATE);
}
return NullableValue.of(DATE, datePartitionKey(value, partitionName));
}
if (TIMESTAMP.equals(type)) {
if (isNull) {
return NullableValue.asNull(TIMESTAMP);
}
return NullableValue.of(TIMESTAMP, timestampPartitionKey(value, timeZone, partitionName));
}
if (REAL.equals(type)) {
if (isNull) {
return NullableValue.asNull(REAL);
}
if (value.isEmpty()) {
return NullableValue.of(REAL, (long) floatToRawIntBits(0.0f));
}
return NullableValue.of(REAL, floatPartitionKey(value, partitionName));
}
if (DOUBLE.equals(type)) {
if (isNull) {
return NullableValue.asNull(DOUBLE);
}
if (value.isEmpty()) {
return NullableValue.of(DOUBLE, 0.0);
}
return NullableValue.of(DOUBLE, doublePartitionKey(value, partitionName));
}
if (isVarcharType(type)) {
if (isNull) {
return NullableValue.asNull(type);
}
return NullableValue.of(type, varcharPartitionKey(value, partitionName, type));
}
if (isCharType(type)) {
if (isNull) {
return NullableValue.asNull(type);
}
return NullableValue.of(type, charPartitionKey(value, partitionName, type));
}
throw new VerifyException(format("Unhandled type [%s] for partition: %s", type, partitionName));
}
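    /**
     * Encodes view SQL as base64 wrapped in the Presto view comment markers, so that
     * {@code decodeViewData(encodeViewData(sql))} round-trips to the original SQL.
     */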
public static String encodeViewData(String data)
{
return encodeView(data, VIEW_PREFIX, VIEW_SUFFIX);
}
public static String decodeViewData(String data)
{
return decodeView(data, VIEW_PREFIX, VIEW_SUFFIX);
}
public static String encodeMaterializedViewData(String data)
{
return encodeView(data, MATERIALIZED_VIEW_PREFIX, MATERIALIZED_VIEW_SUFFIX);
}
public static String decodeMaterializedViewData(String data)
{
return decodeView(data, MATERIALIZED_VIEW_PREFIX, MATERIALIZED_VIEW_SUFFIX);
}
private static String encodeView(String data, String prefix, String suffix)
{
return prefix + Base64.getEncoder().encodeToString(data.getBytes(UTF_8)) + suffix;
}
private static String decodeView(String data, String prefix, String suffix)
{
checkCondition(data.startsWith(prefix), HIVE_INVALID_VIEW_DATA, "View data missing prefix: %s", data);
checkCondition(data.endsWith(suffix), HIVE_INVALID_VIEW_DATA, "View data missing suffix: %s", data);
data = data.substring(prefix.length());
data = data.substring(0, data.length() - suffix.length());
return new String(Base64.getDecoder().decode(data), UTF_8);
}
public static Optional<DecimalType> getDecimalType(HiveType hiveType)
{
return getDecimalType(hiveType.getHiveTypeName().toString());
}
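    /**
     * Extracts a decimal type from a Hive type name; for example,
     * {@code getDecimalType("decimal(10,2)")} returns a decimal type with precision 10
     * and scale 2, while non-matching names yield {@link Optional#empty()}.
     */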
public static Optional<DecimalType> getDecimalType(String hiveTypeName)
{
Matcher matcher = SUPPORTED_DECIMAL_TYPE.matcher(hiveTypeName);
if (matcher.matches()) {
int precision = parseInt(matcher.group(DECIMAL_PRECISION_GROUP));
int scale = parseInt(matcher.group(DECIMAL_SCALE_GROUP));
return Optional.of(createDecimalType(precision, scale));
}
else {
return Optional.empty();
}
}
public static boolean isStructuralType(Type type)
{
String baseName = type.getTypeSignature().getBase();
return baseName.equals(StandardTypes.MAP) || baseName.equals(StandardTypes.ARRAY) || baseName.equals(StandardTypes.ROW);
}
public static boolean isStructuralType(HiveType hiveType)
{
return hiveType.getCategory() == Category.LIST || hiveType.getCategory() == Category.MAP || hiveType.getCategory() == Category.STRUCT;
}
public static boolean booleanPartitionKey(String value, String name)
{
if (value.equalsIgnoreCase("true")) {
return true;
}
if (value.equalsIgnoreCase("false")) {
return false;
}
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for BOOLEAN partition key: %s", value, name));
}
public static long bigintPartitionKey(String value, String name)
{
try {
return parseLong(value);
}
catch (NumberFormatException e) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for BIGINT partition key: %s", value, name));
}
}
public static long integerPartitionKey(String value, String name)
{
try {
return parseInt(value);
}
catch (NumberFormatException e) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for INTEGER partition key: %s", value, name));
}
}
public static long smallintPartitionKey(String value, String name)
{
try {
return parseShort(value);
}
catch (NumberFormatException e) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for SMALLINT partition key: %s", value, name));
}
}
public static long tinyintPartitionKey(String value, String name)
{
try {
return parseByte(value);
}
catch (NumberFormatException e) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for TINYINT partition key: %s", value, name));
}
}
public static long floatPartitionKey(String value, String name)
{
try {
return floatToRawIntBits(parseFloat(value));
}
catch (NumberFormatException e) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for FLOAT partition key: %s", value, name));
}
}
public static double doublePartitionKey(String value, String name)
{
try {
return parseDouble(value);
}
catch (NumberFormatException e) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for DOUBLE partition key: %s", value, name));
}
}
public static long datePartitionKey(String value, String name)
{
try {
return parseHiveDate(value);
}
catch (IllegalArgumentException e) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for DATE partition key: %s", value, name));
}
}
public static long timestampPartitionKey(String value, DateTimeZone zone, String name)
{
try {
return parseHiveTimestamp(value, zone);
}
catch (IllegalArgumentException e) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for TIMESTAMP partition key: %s", value, name));
}
}
public static long shortDecimalPartitionKey(String value, DecimalType type, String name)
{
return decimalPartitionKey(value, type, name).unscaledValue().longValue();
}
public static Slice longDecimalPartitionKey(String value, DecimalType type, String name)
{
return Decimals.encodeUnscaledValue(decimalPartitionKey(value, type, name).unscaledValue());
}
private static BigDecimal decimalPartitionKey(String value, DecimalType type, String name)
{
try {
if (value.endsWith(BIG_DECIMAL_POSTFIX)) {
value = value.substring(0, value.length() - BIG_DECIMAL_POSTFIX.length());
}
BigDecimal decimal = new BigDecimal(value);
decimal = decimal.setScale(type.getScale(), ROUND_UNNECESSARY);
if (decimal.precision() > type.getPrecision()) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for %s partition key: %s", value, type.toString(), name));
}
return decimal;
}
catch (NumberFormatException e) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for %s partition key: %s", value, type.toString(), name));
}
}
public static Slice varcharPartitionKey(String value, String name, Type columnType)
{
Slice partitionKey = Slices.utf8Slice(value);
VarcharType varcharType = (VarcharType) columnType;
if (SliceUtf8.countCodePoints(partitionKey) > varcharType.getLength()) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for %s partition key: %s", value, columnType.toString(), name));
}
return partitionKey;
}
public static Slice charPartitionKey(String value, String name, Type columnType)
{
Slice partitionKey = trimTrailingSpaces(Slices.utf8Slice(value));
CharType charType = (CharType) columnType;
if (SliceUtf8.countCodePoints(partitionKey) > charType.getLength()) {
throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("Invalid partition value '%s' for %s partition key: %s", value, columnType.toString(), name));
}
return partitionKey;
}
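    /**
     * Builds the complete list of column handles for a table: data columns first,
     * then partition keys (matching Hive's ordering), then the hidden path, bucket
     * (only for bucketed tables), file size, file modified time, and row-ID columns.
     */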
public static List<HiveColumnHandle> hiveColumnHandles(Table table)
{
ImmutableList.Builder<HiveColumnHandle> columns = ImmutableList.builder();
// add the data fields first
columns.addAll(getRegularColumnHandles(table));
// add the partition keys last (like Hive does)
columns.addAll(getPartitionKeyColumnHandles(table));
// add hidden columns
columns.add(pathColumnHandle());
if (table.getStorage().getBucketProperty().isPresent()) {
columns.add(bucketColumnHandle());
}
columns.add(fileSizeColumnHandle());
columns.add(fileModifiedTimeColumnHandle());
columns.add(rowIdColumnHandle());
return columns.build();
}
public static List<HiveColumnHandle> getRegularColumnHandles(Table table)
{
ImmutableList.Builder<HiveColumnHandle> columns = ImmutableList.builder();
int hiveColumnIndex = 0;
for (Column field : table.getDataColumns()) {
// ignore unsupported types rather than failing
HiveType hiveType = field.getType();
if (hiveType.isSupportedType()) {
columns.add(new HiveColumnHandle(field.getName(), hiveType, hiveType.getTypeSignature(), hiveColumnIndex, REGULAR, field.getComment(), Optional.empty()));
}
hiveColumnIndex++;
}
return columns.build();
}
public static List<HiveColumnHandle> getPartitionKeyColumnHandles(Table table)
{
ImmutableList.Builder<HiveColumnHandle> columns = ImmutableList.builder();
List<Column> partitionKeys = table.getPartitionColumns();
int partitionColumnIndex = MAX_PARTITION_KEY_COLUMN_INDEX;
for (Column field : partitionKeys) {
HiveType hiveType = field.getType();
if (!hiveType.isSupportedType()) {
throw new PrestoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", hiveType, table.getDatabaseName(), table.getTableName()));
}
columns.add(new HiveColumnHandle(field.getName(), hiveType, hiveType.getTypeSignature(), partitionColumnIndex--, PARTITION_KEY, field.getComment(), Optional.empty()));
}
return columns.build();
}
@Nullable
public static String columnExtraInfo(boolean partitionKey)
{
return partitionKey ? "partition key" : null;
}
public static Optional<String> getPrefilledColumnValue(HiveColumnHandle columnHandle, HivePartitionKey partitionKey, HiveFileSplit fileSplit, OptionalInt bucketNumber)
{
if (partitionKey != null) {
return partitionKey.getValue();
}
if (isPathColumnHandle(columnHandle)) {
return Optional.of(fileSplit.getPath());
}
if (isBucketColumnHandle(columnHandle)) {
if (!bucketNumber.isPresent()) {
throw new PrestoException(HIVE_TABLE_BUCKETING_IS_IGNORED, "Table bucketing is ignored. The virtual \"$bucket\" column cannot be referenced.");
}
return Optional.of(String.valueOf(bucketNumber.getAsInt()));
}
if (isFileSizeColumnHandle(columnHandle)) {
return Optional.of(String.valueOf(fileSplit.getFileSize()));
}
if (isFileModifiedTimeColumnHandle(columnHandle)) {
return Optional.of(String.valueOf(fileSplit.getFileModifiedTime()));
}
throw new PrestoException(NOT_SUPPORTED, "unsupported hidden column: " + columnHandle);
}
public static void closeWithSuppression(RecordCursor recordCursor, Throwable throwable)
{
requireNonNull(recordCursor, "recordCursor is null");
requireNonNull(throwable, "throwable is null");
try {
recordCursor.close();
}
catch (RuntimeException e) {
// Self-suppression not permitted
if (throwable != e) {
throwable.addSuppressed(e);
}
}
}
public static List<HiveType> extractStructFieldTypes(HiveType hiveType)
{
return ((StructTypeInfo) hiveType.getTypeInfo()).getAllStructFieldTypeInfos().stream()
.map(typeInfo -> HiveType.valueOf(typeInfo.getTypeName()))
.collect(toImmutableList());
}
public static List<String> extractStructFieldNames(HiveType hiveType)
{
return ((StructTypeInfo) hiveType.getTypeInfo()).getAllStructFieldNames();
}
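    /**
     * Reads {@code skip.header.line.count} from the table schema, returning 0 when unset.
     */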
public static int getHeaderCount(Properties schema)
{
return getPositiveIntegerValue(schema, "skip.header.line.count", "0");
}
public static int getFooterCount(Properties schema)
{
return getPositiveIntegerValue(schema, "skip.footer.line.count", "0");
}
private static int getPositiveIntegerValue(Properties schema, String key, String defaultValue)
{
String value = schema.getProperty(key, defaultValue);
try {
int intValue = Integer.parseInt(value);
if (intValue < 0) {
throw new PrestoException(HIVE_INVALID_METADATA, format("Invalid value for %s property: %s", key, value));
}
return intValue;
}
catch (NumberFormatException e) {
throw new PrestoException(HIVE_INVALID_METADATA, format("Invalid value for %s property: %s", key, value));
}
}
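    /**
     * Converts a partition key string into its native Presto stack representation
     * (Boolean, Long, Double, or Slice, or null for Hive's NULL sequence), throwing
     * {@link PrestoException} for unsupported types or malformed values.
     */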
public static Object typedPartitionKey(String value, Type type, String name, DateTimeZone hiveStorageTimeZone)
{
byte[] bytes = value.getBytes(UTF_8);
if (isHiveNull(bytes)) {
return null;
}
else if (type.equals(BOOLEAN)) {
return booleanPartitionKey(value, name);
}
else if (type.equals(BIGINT)) {
return bigintPartitionKey(value, name);
}
else if (type.equals(INTEGER)) {
return integerPartitionKey(value, name);
}
else if (type.equals(SMALLINT)) {
return smallintPartitionKey(value, name);
}
else if (type.equals(TINYINT)) {
return tinyintPartitionKey(value, name);
}
else if (type.equals(REAL)) {
return floatPartitionKey(value, name);
}
else if (type.equals(DOUBLE)) {
return doublePartitionKey(value, name);
}
else if (isVarcharType(type)) {
return varcharPartitionKey(value, name, type);
}
else if (isCharType(type)) {
return charPartitionKey(value, name, type);
}
else if (type.equals(DATE)) {
return datePartitionKey(value, name);
}
else if (type.equals(TIMESTAMP)) {
return timestampPartitionKey(value, hiveStorageTimeZone, name);
}
else if (isShortDecimal(type)) {
return shortDecimalPartitionKey(value, (DecimalType) type, name);
}
else if (isLongDecimal(type)) {
return longDecimalPartitionKey(value, (DecimalType) type, name);
}
else {
throw new PrestoException(NOT_SUPPORTED, format("Unsupported column type %s for partition column: %s", type.getDisplayName(), name));
}
}
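    /**
     * Remaps column handles to the physical ordinals recorded in an ORC file when
     * {@code useOrcColumnNames} is enabled. Columns missing from the file receive
     * ordinals past the end of the file's columns so the reader fills them with nulls,
     * while synthesized columns (negative indexes) keep their indexes unchanged.
     */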
public static List<HiveColumnHandle> getPhysicalHiveColumnHandles(List<HiveColumnHandle> columns, boolean useOrcColumnNames, List<OrcType> types, Path path)
{
if (!useOrcColumnNames) {
return columns;
}
List<String> columnNames = getColumnNames(types);
boolean hasColumnNames = fileHasColumnNames(columnNames);
if (!hasColumnNames) {
return columns;
}
Map<String, Integer> physicalNameOrdinalMap = buildPhysicalNameOrdinalMap(columnNames);
int nextMissingColumnIndex = physicalNameOrdinalMap.size();
ImmutableList.Builder<HiveColumnHandle> physicalColumns = ImmutableList.builder();
for (HiveColumnHandle column : columns) {
Integer physicalOrdinal = physicalNameOrdinalMap.get(column.getName());
if (physicalOrdinal == null) {
// If the column is missing from the file, assign it a column number larger than the number of columns in the
// file so the reader will fill it with nulls. If the index is negative, i.e. this is a synthesized column like
// a partitioning key, $bucket, $row_id, or $path, leave it as is.
if (column.getHiveColumnIndex() < 0) {
physicalOrdinal = column.getHiveColumnIndex();
}
else {
physicalOrdinal = nextMissingColumnIndex;
nextMissingColumnIndex++;
}
}
physicalColumns.add(new HiveColumnHandle(column.getName(), column.getHiveType(), column.getTypeSignature(), physicalOrdinal, column.getColumnType(), column.getComment(), column.getRequiredSubfields(), column.getPartialAggregation()));
}
return physicalColumns.build();
}
private static List<String> getColumnNames(List<OrcType> types)
{
return types.get(0).getFieldNames();
}
private static boolean fileHasColumnNames(List<String> physicalColumnNames)
{
return physicalColumnNames.isEmpty() || !physicalColumnNames.stream().allMatch(physicalColumnName -> DEFAULT_HIVE_COLUMN_NAME_PATTERN.matcher(physicalColumnName).matches());
}
private static Map<String, Integer> buildPhysicalNameOrdinalMap(List<String> columnNames)
{
ImmutableMap.Builder<String, Integer> physicalNameOrdinalMap = ImmutableMap.builder();
int ordinal = 0;
for (String physicalColumnName : columnNames) {
physicalNameOrdinalMap.put(physicalColumnName, ordinal);
ordinal++;
}
return physicalNameOrdinalMap.build();
}
    /**
     * Translates a Presto type that is incompatible with Hive (i.e. cannot be stored in a Hive table)
     * to a compatible type with the same physical layout. This allows a Hive temporary table to store
     * more data types than Hive permanent tables support.
     */
public static List<ColumnMetadata> translateHiveUnsupportedTypesForTemporaryTable(List<ColumnMetadata> columns, TypeManager typeManager)
{
return columns.stream()
.map(column -> ColumnMetadata.builder()
.setName(column.getName())
.setType(translateHiveUnsupportedTypeForTemporaryTable(column.getType(), typeManager))
.setNullable(column.isNullable())
.setComment(column.getComment().orElse(null))
.setExtraInfo(column.getExtraInfo().orElse(null))
.setHidden(column.isHidden())
.setProperties(column.getProperties())
.build())
.collect(toImmutableList());
}
public static Type translateHiveUnsupportedTypeForTemporaryTable(Type type, TypeManager typeManager)
{
return typeManager.getType(translateHiveUnsupportedTypeSignatureForTemporaryTable(type.getTypeSignature()));
}
private static TypeSignature translateHiveUnsupportedTypeSignatureForTemporaryTable(TypeSignature typeSignature)
{
List<TypeSignatureParameter> parameters = typeSignature.getParameters();
if (typeSignature.getBase().equals("unknown")) {
return new TypeSignature(StandardTypes.BOOLEAN);
}
if (typeSignature.getBase().equals(StandardTypes.ROW)) {
ImmutableList.Builder<TypeSignatureParameter> updatedParameters = ImmutableList.builder();
for (int i = 0; i < parameters.size(); i++) {
TypeSignatureParameter typeSignatureParameter = parameters.get(i);
checkArgument(typeSignatureParameter.isNamedTypeSignature(), "unexpected row type signature parameter: %s", typeSignatureParameter);
NamedTypeSignature namedTypeSignature = typeSignatureParameter.getNamedTypeSignature();
int parameterIdx = i;
updatedParameters.add(TypeSignatureParameter.of(new NamedTypeSignature(
Optional.of(namedTypeSignature.getFieldName().orElseGet(() -> new RowFieldName("_field_" + parameterIdx, false))),
translateHiveUnsupportedTypeSignatureForTemporaryTable(namedTypeSignature.getTypeSignature()))));
}
return new TypeSignature(StandardTypes.ROW, updatedParameters.build());
}
if (!parameters.isEmpty()) {
ImmutableList.Builder<TypeSignatureParameter> updatedParameters = ImmutableList.builder();
for (TypeSignatureParameter parameter : parameters) {
switch (parameter.getKind()) {
case LONG:
case VARIABLE:
updatedParameters.add(parameter);
continue;
case TYPE:
updatedParameters.add(TypeSignatureParameter.of(translateHiveUnsupportedTypeSignatureForTemporaryTable(parameter.getTypeSignature())));
break;
case NAMED_TYPE:
NamedTypeSignature namedTypeSignature = parameter.getNamedTypeSignature();
updatedParameters.add(TypeSignatureParameter.of(new NamedTypeSignature(
namedTypeSignature.getFieldName(),
translateHiveUnsupportedTypeSignatureForTemporaryTable(namedTypeSignature.getTypeSignature()))));
break;
default:
throw new IllegalArgumentException("Unexpected parameter type: " + parameter.getKind());
}
}
return new TypeSignature(typeSignature.getBase(), updatedParameters.build());
}
return typeSignature;
}
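    /**
     * Serializes an instance with the given codec and compresses the result with zstd;
     * the inverse of {@link #deserializeZstdCompressed(Codec, byte[])}.
     */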
public static <T> byte[] serializeZstdCompressed(Codec<T> codec, T instance)
{
try (ByteArrayOutputStream output = new ByteArrayOutputStream();
ZstdOutputStreamNoFinalizer zstdOutput = new ZstdOutputStreamNoFinalizer(output)) {
            codec.writeBytes(zstdOutput, instance);
            // close explicitly so the zstd frame is fully flushed before toByteArray();
            // the second close from try-with-resources is then a no-op
            zstdOutput.close();
            output.close();
return output.toByteArray();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public static <T> T deserializeZstdCompressed(Codec<T> codec, byte[] bytes)
{
try (InputStream input = new ByteArrayInputStream(bytes);
ZstdInputStreamNoFinalizer zstdInput = new ZstdInputStreamNoFinalizer(input)) {
return codec.readBytes(zstdInput);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
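    /**
     * Captures query metadata from the session (query ID, source, client info,
     * metastore headers, user name, and client tags) as string properties for the
     * directory listing context.
     */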
public static Map<String, String> buildDirectoryContextProperties(ConnectorSession session)
{
ImmutableMap.Builder<String, String> directoryContextProperties = ImmutableMap.builder();
directoryContextProperties.put(PRESTO_QUERY_ID, session.getQueryId());
session.getSource().ifPresent(source -> directoryContextProperties.put(PRESTO_QUERY_SOURCE, source));
session.getClientInfo().ifPresent(clientInfo -> directoryContextProperties.put(PRESTO_CLIENT_INFO, clientInfo));
getMetastoreHeaders(session).ifPresent(metastoreHeaders -> directoryContextProperties.put(PRESTO_METASTORE_HEADER, metastoreHeaders));
directoryContextProperties.put(PRESTO_USER_NAME, session.getUser());
if (!session.getClientTags().isEmpty()) {
directoryContextProperties.put(PRESTO_CLIENT_TAGS, join(CLIENT_TAGS_DELIMITER, session.getClientTags()));
}
return directoryContextProperties.build();
}
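    /**
     * Reads symlink manifest files, each listing one target path per line, and
     * returns all referenced target paths.
     */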
public static List<Path> readSymlinkPaths(ExtendedFileSystem fileSystem, Iterator<HiveFileInfo> manifestFileInfos)
throws IOException
{
ImmutableList.Builder<Path> targets = ImmutableList.builder();
while (manifestFileInfos.hasNext()) {
HiveFileInfo symlink = manifestFileInfos.next();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(symlink.getPath())), UTF_8))) {
readLines(reader).stream()
.map(Path::new)
.forEach(targets::add);
}
}
return targets.build();
}
public static List<HiveFileInfo> getTargetPathsHiveFileInfos(
Path path,
Optional<Partition> partition,
Path targetParent,
List<Path> currentTargetPaths,
HiveDirectoryContext hiveDirectoryContext,
ExtendedFileSystem targetFilesystem,
DirectoryLister directoryLister,
Table table,
NamenodeStats namenodeStats,
ConnectorSession session)
{
boolean parentPathCached = directoryLister.isPathCached(targetParent);
Map<String, HiveFileInfo> targetParentHiveFileInfos = new HashMap<>(getTargetParentHiveFileInfoMap(
partition,
targetParent,
hiveDirectoryContext,
targetFilesystem,
directoryLister,
table,
namenodeStats));
// If caching is enabled and the parent path was cached, we verify that all target paths exist in the listing.
// If any target path is missing (likely due to stale cache), we invalidate the cache for that directory
// and re-fetch the listing to ensure we don't miss any files.
if (parentPathCached && isUseListDirectoryCache(session)) {
boolean allPathsExist = currentTargetPaths.stream()
.map(Path::getPathWithoutSchemeAndAuthority)
.map(Path::toString)
.allMatch(targetParentHiveFileInfos::containsKey);
if (!allPathsExist) {
((CachingDirectoryLister) directoryLister).invalidateDirectoryListCache(Optional.of(targetParent.toString()));
targetParentHiveFileInfos.clear();
targetParentHiveFileInfos.putAll(getTargetParentHiveFileInfoMap(
partition,
targetParent,
hiveDirectoryContext,
targetFilesystem,
directoryLister,
table,
namenodeStats));
}
}
return currentTargetPaths.stream().map(targetPath -> {
HiveFileInfo hiveFileInfo = targetParentHiveFileInfos.get(getPathWithoutSchemeAndAuthority(targetPath).toString());
if (hiveFileInfo == null) {
throw new PrestoException(HIVE_FILE_NOT_FOUND, String.format("Invalid path in Symlink manifest file %s: %s does not exist", path, targetPath));
}
return hiveFileInfo;
}).collect(toImmutableList());
}
private static Map<String, HiveFileInfo> getTargetParentHiveFileInfoMap(
Optional<Partition> partition,
Path targetParent,
HiveDirectoryContext hiveDirectoryContext,
ExtendedFileSystem targetFilesystem,
DirectoryLister directoryLister,
Table table,
NamenodeStats namenodeStats)
{
Map<String, HiveFileInfo> targetParentHiveFileInfos = new HashMap<>();
Iterator<HiveFileInfo> hiveFileInfoIterator = directoryLister.list(targetFilesystem, table, targetParent, partition, namenodeStats, hiveDirectoryContext);
// We will use the path without the scheme and authority since the manifest file may contain entries both with and without them
hiveFileInfoIterator.forEachRemaining(hiveFileInfo -> targetParentHiveFileInfos.put(
getPathWithoutSchemeAndAuthority(new Path(hiveFileInfo.getPath())).toString(),
hiveFileInfo));
return targetParentHiveFileInfos;
}
}