IcebergUtil.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.GenericInternalException;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.Decimals;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.VarbinaryType;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveColumnConverterProvider;
import com.facebook.presto.hive.HivePartition;
import com.facebook.presto.hive.HivePartitionKey;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.PartitionNameWithVersion;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionOperator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.units.DataSize;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.view.View;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.Chars.isCharType;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.Decimals.isLongDecimal;
import static com.facebook.presto.common.type.Decimals.isShortDecimal;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimeType.TIME;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP_MICROSECONDS;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_QUERY_ID_NAME;
import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_VERSION_NAME;
import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_VIEW_COMMENT;
import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_VIEW_FLAG;
import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_FORMAT_VERSION;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static com.facebook.presto.iceberg.IcebergPartitionType.IDENTITY;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled;
import static com.facebook.presto.iceberg.IcebergTableProperties.getWriteDataLocation;
import static com.facebook.presto.iceberg.TypeConverter.toIcebergType;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Maps.immutableEntry;
import static com.google.common.collect.Streams.mapWithIndex;
import static com.google.common.collect.Streams.stream;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.Double.doubleToRawLongBits;
import static java.lang.Double.longBitsToDouble;
import static java.lang.Double.parseDouble;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Float.parseFloat;
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.lang.Math.toIntExact;
import static java.lang.Math.ulp;
import static java.lang.String.format;
import static java.util.Collections.emptyIterator;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_ENABLED;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES;
import static org.apache.iceberg.LocationProviders.locationsFor;
import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.MERGE_MODE;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT;
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS;
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_FOLDER_STORAGE_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;
public final class IcebergUtil
{
private static final Logger log = Logger.get(IcebergUtil.class);
public static final int MIN_FORMAT_VERSION_FOR_DELETE = 2;
public static final long DOUBLE_POSITIVE_ZERO = 0x0000000000000000L;
public static final long DOUBLE_POSITIVE_INFINITE = 0x7ff0000000000000L;
public static final long DOUBLE_NEGATIVE_ZERO = 0x8000000000000000L;
public static final long DOUBLE_NEGATIVE_INFINITE = 0xfff0000000000000L;
public static final int REAL_POSITIVE_ZERO = 0x00000000;
public static final int REAL_POSITIVE_INFINITE = 0x7f800000;
public static final int REAL_NEGATIVE_ZERO = 0x80000000;
public static final int REAL_NEGATIVE_INFINITE = 0xff800000;
protected static final String VIEW_OWNER = "view_owner";
private IcebergUtil() {}
public static boolean isIcebergTable(com.facebook.presto.hive.metastore.Table table)
{
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
}
public static Table getIcebergTable(ConnectorMetadata metadata, ConnectorSession session, SchemaTableName table)
{
checkArgument(metadata instanceof IcebergAbstractMetadata, "metadata must be instance of IcebergAbstractMetadata!");
IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) metadata;
return icebergMetadata.getIcebergTable(session, table);
}
public static Table getShallowWrappedIcebergTable(Schema schema, PartitionSpec spec, Map<String, String> properties, Optional<SortOrder> sortOrder)
{
return new PrestoIcebergTableForMetricsConfig(schema, spec, properties, sortOrder);
}
public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, IcebergHiveTableOperationsConfig config, ManifestFileCache manifestFileCache, ConnectorSession session, IcebergCatalogName catalogName, SchemaTableName table)
{
HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName());
TableOperations operations = new HiveTableOperations(
metastore,
new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats()),
hdfsEnvironment,
hdfsContext,
config,
manifestFileCache,
table.getSchemaName(),
table.getTableName());
return new BaseTable(operations, fullTableName(catalogName.getCatalogName(), TableIdentifier.of(table.getSchemaName(), table.getTableName())));
}
public static Table getNativeIcebergTable(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table)
{
return catalogFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table, catalogFactory.isNestedNamespaceEnabled()));
}
public static View getNativeIcebergView(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table)
{
Catalog catalog = catalogFactory.getCatalog(session);
if (!(catalog instanceof ViewCatalog)) {
throw new PrestoException(NOT_SUPPORTED, "This connector does not support retrieving views");
}
return ((ViewCatalog) catalog).loadView(toIcebergTableIdentifier(table, catalogFactory.isNestedNamespaceEnabled()));
}
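/**
 * Returns handles for the partition columns that can safely be treated as identity partition keys:
 * a source column qualifies only if its transform is identity in the table's current spec and in every
 * partition spec reachable from the handle's snapshot (or in all known specs when there is no snapshot).
 */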
public static List<IcebergColumnHandle> getPartitionKeyColumnHandles(IcebergTableHandle tableHandle, Table table, TypeManager typeManager)
{
Set<PartitionSpec> partitionSpecs = tableHandle.getIcebergTableName().getSnapshotId()
.map(snapshot -> table.snapshot(snapshot).allManifests(table.io()).stream()
.map(ManifestFile::partitionSpecId)
.map(specId -> table.specs().get(specId))
.collect(toImmutableSet()))
.orElseGet(() -> ImmutableSet.copyOf(table.specs().values())); // No snapshot, so no data. This case doesn't matter.
return table.spec().fields().stream()
.filter(field -> field.transform().isIdentity() &&
partitionSpecs.stream()
.allMatch(partitionSpec -> partitionSpec.getFieldsBySourceId(field.sourceId()).stream()
.anyMatch(partitionField -> partitionField.transform().isIdentity())))
.map(field -> IcebergColumnHandle.create(table.schema().findField(field.sourceId()), typeManager, PARTITION_KEY))
.collect(toImmutableList());
}
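/**
 * Resolves the snapshot id to scan for the given table name. An explicitly requested snapshot id is
 * validated against the table; CHANGELOG tables start from the oldest ancestor snapshot; otherwise the
 * current snapshot is used if one exists.
 */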
public static Optional<Long> resolveSnapshotIdByName(Table table, IcebergTableName name)
{
if (name.getSnapshotId().isPresent()) {
if (table.snapshot(name.getSnapshotId().get()) == null) {
throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, format("Invalid snapshot [%s] for table: %s", name.getSnapshotId().get(), table));
}
return name.getSnapshotId();
}
if (name.getTableType() == IcebergTableType.CHANGELOG) {
return Optional.ofNullable(SnapshotUtil.oldestAncestor(table)).map(Snapshot::snapshotId);
}
return tryGetCurrentSnapshot(table).map(Snapshot::snapshotId);
}
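/**
 * Returns the id of the latest snapshot committed at or before {@code millisUtc}. With
 * {@link VersionOperator#EQUAL} the boundary timestamp itself is included; otherwise only strictly
 * earlier snapshots qualify.
 */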
public static long getSnapshotIdTimeOperator(Table table, long millisUtc, VersionOperator operator)
{
return table.history().stream()
.filter(logEntry -> operator == VersionOperator.EQUAL ? logEntry.timestampMillis() <= millisUtc : logEntry.timestampMillis() < millisUtc)
.max(comparing(HistoryEntry::timestampMillis))
.orElseThrow(() -> new PrestoException(ICEBERG_INVALID_TABLE_TIMESTAMP, format("No history found based on timestamp for table %s", table.name())))
.snapshotId();
}
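/**
 * Maps source column names to the partition transforms applied to them: {@code IDENTITY} returns only
 * identity-transformed fields (keyed by partition field name), while {@code ALL} lists every transform
 * under its source column name.
 */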
public static Map<String, List<String>> getPartitionFields(PartitionSpec partitionSpec, IcebergPartitionType partitionType)
{
Map<String, List<String>> partitionFields = new HashMap<>();
switch (partitionType) {
case IDENTITY:
for (int i = 0; i < partitionSpec.fields().size(); i++) {
PartitionField field = partitionSpec.fields().get(i);
if (field.transform().isIdentity()) {
partitionFields.put(field.name(), ImmutableList.of(field.transform().toString()));
}
}
break;
case ALL:
for (int i = 0; i < partitionSpec.fields().size(); i++) {
PartitionField field = partitionSpec.fields().get(i);
String sourceColumnName = partitionSpec.schema().findColumnName(field.sourceId());
partitionFields.computeIfAbsent(sourceColumnName, k -> new ArrayList<>())
.add(field.transform().toString());
}
break;
}
return partitionFields;
}
public static List<IcebergColumnHandle> getColumns(Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
return getColumns(schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
}
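// Columns whose names match an identity partition field are exposed as PARTITION_KEY handles; all others are REGULAR.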
public static List<IcebergColumnHandle> getColumns(Stream<Integer> fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
Set<String> partitionFieldNames = getPartitionFields(partitionSpec, IDENTITY).keySet();
return fields
.map(schema::findField)
.map(column -> partitionFieldNames.contains(column.name()) ?
IcebergColumnHandle.create(column, typeManager, PARTITION_KEY) :
IcebergColumnHandle.create(column, typeManager, REGULAR))
.collect(toImmutableList());
}
public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec partitionSpec)
{
// TODO: expose transform information in Iceberg library
ImmutableMap.Builder<PartitionField, Integer> columns = ImmutableMap.builder();
for (int i = 0; i < partitionSpec.fields().size(); i++) {
PartitionField field = partitionSpec.fields().get(i);
if (field.transform().isIdentity()) {
columns.put(field, i);
}
}
return columns.build();
}
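/**
 * Returns the ids of partition specs referenced by manifests that carry live (added or existing) files
 * in the given snapshot, or all spec ids when no snapshot is provided.
 */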
public static Set<Integer> getPartitionSpecsIncludingValidData(Table icebergTable, Optional<Long> snapshotId)
{
return snapshotId.map(snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
.filter(manifestFile -> manifestFile.hasAddedFiles() || manifestFile.hasExistingFiles())
.map(ManifestFile::partitionSpecId)
.collect(toImmutableSet()))
.orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
}
public static List<Column> toHiveColumns(List<NestedField> columns)
{
return columns.stream()
.map(column -> new Column(
column.name(),
HiveType.toHiveType(HiveSchemaUtil.convert(column.type())),
Optional.empty(),
Optional.empty()))
.collect(toImmutableList());
}
public static FileFormat getFileFormat(Table table)
{
return FileFormat.valueOf(table.properties()
.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
.toUpperCase(Locale.ENGLISH));
}
public static Optional<String> getTableComment(Table table)
{
return Optional.ofNullable(table.properties().get(TABLE_COMMENT));
}
public static Optional<String> getViewComment(View view)
{
return Optional.ofNullable(view.properties().get(TABLE_COMMENT));
}
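/**
 * Builds a table scan for the given predicates. A supplied id is used as a snapshot id if the table
 * contains such a snapshot, and is otherwise interpreted as a timestamp for a time-travel scan.
 */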
public static TableScan getTableScan(TupleDomain<IcebergColumnHandle> predicates, Optional<Long> snapshotId, Table icebergTable)
{
Expression expression = ExpressionConverter.toIcebergExpression(predicates);
TableScan tableScan = icebergTable.newScan().filter(expression);
return snapshotId
.map(id -> isSnapshot(icebergTable, id) ? tableScan.useSnapshot(id) : tableScan.asOfTime(id))
.orElse(tableScan);
}
private static boolean isSnapshot(Table icebergTable, Long id)
{
return stream(icebergTable.snapshots())
.anyMatch(snapshot -> snapshot.snapshotId() == id);
}
public static LocationProvider getLocationProvider(SchemaTableName schemaTableName, String tableLocation, Map<String, String> storageProperties)
{
if (storageProperties.containsKey(WRITE_LOCATION_PROVIDER_IMPL)) {
throw new PrestoException(NOT_SUPPORTED, "Table " + schemaTableName + " specifies " + storageProperties.get(WRITE_LOCATION_PROVIDER_IMPL) +
" as a location provider. Writing to Iceberg tables with custom location provider is not supported.");
}
return locationsFor(tableLocation, storageProperties);
}
public static TableScan buildTableScan(Table icebergTable, MetadataTableType metadataTableType)
{
return createMetadataTableInstance(icebergTable, metadataTableType).newScan();
}
public static Map<String, Integer> columnNameToPositionInSchema(Schema schema)
{
return mapWithIndex(schema.columns().stream(),
(column, position) -> immutableEntry(column.name(), toIntExact(position)))
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
}
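/**
 * Rejects tables whose delete, merge, or update mode is merge-on-read unless the session explicitly
 * enables merge-on-read support.
 */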
public static void validateTableMode(ConnectorSession session, org.apache.iceberg.Table table)
{
if (isMergeOnReadModeEnabled(session)) {
return;
}
String deleteMode = table.properties().get(DELETE_MODE);
String mergeMode = table.properties().get(MERGE_MODE);
String updateMode = table.properties().get(UPDATE_MODE);
if (Stream.of(deleteMode, mergeMode, updateMode).anyMatch(s -> Objects.equals(s, RowLevelOperationMode.MERGE_ON_READ.modeName()))) {
throw new PrestoException(NOT_SUPPORTED, "merge-on-read table mode not supported yet");
}
}
public static Map<String, String> createIcebergViewProperties(ConnectorSession session, String prestoVersion)
{
return ImmutableMap.<String, String>builder()
.put(TABLE_COMMENT, PRESTO_VIEW_COMMENT)
.put(PRESTO_VIEW_FLAG, "true")
.put(PRESTO_VERSION_NAME, prestoVersion)
.put(PRESTO_QUERY_ID_NAME, session.getQueryId())
.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
.put(VIEW_OWNER, session.getUser())
.build();
}
public static Optional<Map<String, String>> tryGetProperties(Table table)
{
try {
return Optional.ofNullable(table.properties());
}
catch (TableNotFoundException e) {
log.warn(String.format("Unable to fetch properties for table %s: %s", table.name(), e.getMessage()));
return Optional.empty();
}
}
public static Optional<Snapshot> tryGetCurrentSnapshot(Table table)
{
try {
return Optional.ofNullable(table.currentSnapshot());
}
catch (TableNotFoundException e) {
log.warn(String.format("Unable to fetch snapshot for table %s: %s", table.name(), e.getMessage()));
return Optional.empty();
}
}
public static Optional<String> tryGetLocation(Table table)
{
try {
return Optional.ofNullable(table.location());
}
catch (TableNotFoundException e) {
log.warn(String.format("Unable to fetch location for table %s: %s", table.name(), e.getMessage()));
return Optional.empty();
}
}
public static List<SortField> getSortFields(Table table)
{
try {
return table.sortOrder().fields().stream()
.filter(field -> field.transform().isIdentity())
.map(SortField::fromIceberg)
.collect(toImmutableList());
}
catch (Exception e) {
log.warn(String.format("Unable to fetch sort fields for table %s: %s", table.name(), e.getMessage()));
return ImmutableList.of();
}
}
private static boolean isValidPartitionType(Type type)
{
return type instanceof DecimalType ||
BOOLEAN.equals(type) ||
TINYINT.equals(type) ||
SMALLINT.equals(type) ||
INTEGER.equals(type) ||
BIGINT.equals(type) ||
REAL.equals(type) ||
DOUBLE.equals(type) ||
DATE.equals(type) ||
type instanceof TimestampType ||
TIME.equals(type) ||
VARBINARY.equals(type) ||
isVarcharType(type) ||
isCharType(type);
}
private static void verifyPartitionTypeSupported(String partitionName, Type type)
{
if (!isValidPartitionType(type)) {
throw new PrestoException(NOT_SUPPORTED, format("Unsupported type [%s] for partition: %s", type, partitionName));
}
}
private static NullableValue parsePartitionValue(
FileFormat fileFormat,
String partitionStringValue,
Type prestoType,
String partitionName)
{
verifyPartitionTypeSupported(partitionName, prestoType);
Object partitionValue = deserializePartitionValue(prestoType, partitionStringValue, partitionName);
return partitionValue == null ? NullableValue.asNull(prestoType) : NullableValue.of(prestoType, partitionValue);
}
// Strip the constraints on metadata columns like "$path" and "$data_sequence_number" from the given constraints.
public static <U> TupleDomain<IcebergColumnHandle> getNonMetadataColumnConstraints(TupleDomain<U> allConstraints)
{
return allConstraints.transform(c -> isMetadataColumnId(((IcebergColumnHandle) c).getId()) ? null : (IcebergColumnHandle) c);
}
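// Keep only the constraints on metadata columns like "$path" and "$data_sequence_number".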
public static <U> TupleDomain<IcebergColumnHandle> getMetadataColumnConstraints(TupleDomain<U> allConstraints)
{
return allConstraints.transform(c -> isMetadataColumnId(((IcebergColumnHandle) c).getId()) ? (IcebergColumnHandle) c : null);
}
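/**
 * Checks whether a file with the given path and data sequence number satisfies the constraints on the
 * "$path" and "$data_sequence_number" metadata columns.
 */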
public static boolean metadataColumnsMatchPredicates(TupleDomain<IcebergColumnHandle> constraints, String path, long dataSequenceNumber)
{
if (constraints.isAll()) {
return true;
}
boolean matches = true;
if (constraints.getDomains().isPresent()) {
for (Map.Entry<IcebergColumnHandle, Domain> constraint : constraints.getDomains().get().entrySet()) {
if (constraint.getKey() == PATH_COLUMN_HANDLE) {
matches &= constraint.getValue().includesNullableValue(utf8Slice(path));
}
else if (constraint.getKey() == DATA_SEQUENCE_NUMBER_COLUMN_HANDLE) {
matches &= constraint.getValue().includesNullableValue(dataSequenceNumber);
}
}
}
return matches;
}
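/**
 * Derives the Hive-style partitions for the scan by planning file scan tasks for the resolved snapshot
 * and decoding each task's identity partition values. If any task carries delete files, a single
 * unpartitioned {@link HivePartition} is returned so partition-based pruning is skipped; partitions that
 * fail the constraint summary or predicate are filtered out.
 */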
public static List<HivePartition> getPartitions(
TypeManager typeManager,
ConnectorTableHandle tableHandle,
Table icebergTable,
Constraint<ColumnHandle> constraint,
List<IcebergColumnHandle> partitionColumns)
{
IcebergTableName name = ((IcebergTableHandle) tableHandle).getIcebergTableName();
FileFormat fileFormat = getFileFormat(icebergTable);
// An empty Iceberg table has no current snapshot, so `snapshotId` may not be present
Optional<Long> snapshotId = resolveSnapshotIdByName(icebergTable, name);
if (!snapshotId.isPresent()) {
return ImmutableList.of();
}
TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(getNonMetadataColumnConstraints(constraint
.getSummary()
.simplify())))
.useSnapshot(snapshotId.get());
Set<HivePartition> partitions = new HashSet<>();
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
for (FileScanTask fileScanTask : fileScanTasks) {
// If any delete files exist, skip the partition-value-based metadata optimization, since the partition values might not be correct
if (!fileScanTask.deletes().isEmpty()) {
return ImmutableList.of(new HivePartition(((IcebergTableHandle) tableHandle).getSchemaTableName()));
}
StructLike partition = fileScanTask.file().partition();
PartitionSpec spec = fileScanTask.spec();
Map<PartitionField, Integer> fieldToIndex = getIdentityPartitions(spec);
ImmutableMap.Builder<ColumnHandle, NullableValue> builder = ImmutableMap.builder();
fieldToIndex.forEach((field, index) -> {
int id = field.sourceId();
org.apache.iceberg.types.Type type = spec.schema().findType(id);
Class<?> javaClass = type.typeId().javaClass();
Object value = partition.get(index, javaClass);
String partitionStringValue;
if (value == null) {
partitionStringValue = null;
}
else {
if (type.typeId() == FIXED || type.typeId() == BINARY) {
partitionStringValue = Base64.getEncoder().encodeToString(((ByteBuffer) value).array());
}
else {
partitionStringValue = value.toString();
}
}
NullableValue partitionValue = parsePartitionValue(fileFormat, partitionStringValue, toPrestoType(type, typeManager), partition.toString());
Optional<IcebergColumnHandle> column = partitionColumns.stream()
.filter(icebergColumnHandle -> Objects.equals(icebergColumnHandle.getId(), field.sourceId()))
.findAny();
if (column.isPresent()) {
builder.put(column.get(), partitionValue);
}
});
Map<ColumnHandle, NullableValue> values = builder.build();
HivePartition newPartition = new HivePartition(
((IcebergTableHandle) tableHandle).getSchemaTableName(),
new PartitionNameWithVersion(partition.toString(), Optional.empty()),
values);
boolean isIncludePartition = true;
Map<ColumnHandle, Domain> domains = constraint.getSummary().getDomains().get();
for (IcebergColumnHandle column : partitionColumns) {
NullableValue value = newPartition.getKeys().get(column);
Domain allowedDomain = domains.get(column);
if (allowedDomain != null && !allowedDomain.includesNullableValue(value.getValue())) {
isIncludePartition = false;
break;
}
}
if (constraint.predicate().isPresent() && !constraint.predicate().get().test(newPartition.getKeys())) {
isIncludePartition = false;
}
if (isIncludePartition) {
partitions.add(newPartition);
}
}
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return new ArrayList<>(partitions);
}
public static Optional<Schema> tryGetSchema(Table table)
{
try {
return Optional.ofNullable(table.schema());
}
catch (TableNotFoundException e) {
log.warn(String.format("Unable to fetch schema for table %s: %s", table.name(), e.getMessage()));
return Optional.empty();
}
}
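/**
 * Builds an Iceberg {@link Schema} from column handles, preserving the original field ids and marking
 * every field optional.
 */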
public static Schema schemaFromHandles(List<IcebergColumnHandle> columns)
{
List<NestedField> icebergColumns = columns.stream()
.map(column -> NestedField.optional(column.getId(), column.getName(), toIcebergType(column.getType())))
.collect(toImmutableList());
return new Schema(Types.StructType.of(icebergColumns).asStructType().fields());
}
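/**
 * Converts a partition value stored as a string into the engine representation of the given Presto type;
 * for example {@code deserializePartitionValue(INTEGER, "42", "c")} yields {@code 42L}. Returns
 * {@code null} for a null input and throws {@code ICEBERG_INVALID_PARTITION_VALUE} when the string
 * cannot be parsed.
 */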
public static Object deserializePartitionValue(Type type, String valueString, String name)
{
if (valueString == null) {
return null;
}
try {
if (type.equals(BOOLEAN)) {
if (valueString.equalsIgnoreCase("true")) {
return true;
}
if (valueString.equalsIgnoreCase("false")) {
return false;
}
throw new IllegalArgumentException();
}
if (type.equals(INTEGER)) {
return parseLong(valueString);
}
if (type.equals(BIGINT)) {
return parseLong(valueString);
}
if (type.equals(REAL)) {
return (long) floatToRawIntBits(parseFloat(valueString));
}
if (type.equals(DOUBLE)) {
return parseDouble(valueString);
}
if (type.equals(TIMESTAMP) || type.equals(TIME)) {
return MICROSECONDS.toMillis(parseLong(valueString));
}
if (type.equals(DATE) || type.equals(TIMESTAMP_MICROSECONDS)) {
return parseLong(valueString);
}
if (type instanceof VarcharType) {
return utf8Slice(valueString);
}
if (type.equals(VarbinaryType.VARBINARY)) {
return wrappedBuffer(Base64.getDecoder().decode(valueString));
}
if (isShortDecimal(type) || isLongDecimal(type)) {
DecimalType decimalType = (DecimalType) type;
BigDecimal decimal = new BigDecimal(valueString);
decimal = decimal.setScale(decimalType.getScale(), BigDecimal.ROUND_UNNECESSARY);
checkArgument(decimal.precision() <= decimalType.getPrecision());
BigInteger unscaledValue = decimal.unscaledValue();
return isShortDecimal(type) ? unscaledValue.longValue() : Decimals.encodeUnscaledValue(unscaledValue);
}
}
catch (IllegalArgumentException e) {
throw new PrestoException(ICEBERG_INVALID_PARTITION_VALUE, format(
"Invalid partition value '%s' for %s partition key: %s",
valueString,
type.getDisplayName(),
name));
}
// Iceberg tables don't partition by non-primitive-type columns.
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Invalid partition type " + type.toString());
}
/**
 * Returns the adjacent value immediately greater than or less than {@code value}, depending on {@code isPrevious}.
 * <p>
 * The Java type of {@code value} must match {@link Type#getJavaType()} for the given {@code type}.
 *
 * @throws IllegalStateException if the type is not orderable (see {@link Type#isOrderable()})
 */
public static Optional<Object> getAdjacentValue(Type type, Object value, boolean isPrevious)
{
if (!type.isOrderable()) {
throw new IllegalStateException("Type is not orderable: " + type);
}
requireNonNull(value, "value is null");
if (type.equals(BIGINT) || type instanceof TimestampType) {
return getBigintAdjacentValue(value, isPrevious);
}
if (type.equals(INTEGER) || type.equals(DATE)) {
return getIntegerAdjacentValue(value, isPrevious);
}
if (type.equals(SMALLINT)) {
return getSmallIntAdjacentValue(value, isPrevious);
}
if (type.equals(TINYINT)) {
return getTinyIntAdjacentValue(value, isPrevious);
}
if (type.equals(DOUBLE)) {
return getDoubleAdjacentValue(value, isPrevious);
}
if (type.equals(REAL)) {
return getRealAdjacentValue(value, isPrevious);
}
return Optional.empty();
}
public static Map<Integer, HivePartitionKey> getPartitionKeys(ContentScanTask<DataFile> scanTask)
{
StructLike partition = scanTask.file().partition();
PartitionSpec spec = scanTask.spec();
return getPartitionKeys(spec, partition);
}
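/**
 * Decodes a partition tuple into {@link HivePartitionKey}s keyed by partition field id. Fixed and binary
 * values are Base64-encoded; non-null values of identity-transformed fields are additionally registered
 * under their source column id so lookups by either id succeed.
 */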
public static Map<Integer, HivePartitionKey> getPartitionKeys(PartitionSpec spec, StructLike partition)
{
Map<Integer, HivePartitionKey> partitionKeys = new HashMap<>();
int index = 0;
for (PartitionField field : spec.fields()) {
int sourceId = field.sourceId();
String colName = field.name();
org.apache.iceberg.types.Type sourceType = spec.schema().findType(sourceId);
org.apache.iceberg.types.Type type = field.transform().getResultType(sourceType);
Class<?> javaClass = type.typeId().javaClass();
Object value = partition.get(index, javaClass);
if (value == null) {
partitionKeys.put(field.fieldId(), new HivePartitionKey(colName, Optional.empty()));
}
else {
HivePartitionKey partitionValue;
if (type.typeId() == FIXED || type.typeId() == BINARY) {
// this is safe because Iceberg PartitionData directly wraps the byte array
partitionValue = new HivePartitionKey(colName, Optional.of(Base64.getEncoder().encodeToString(((ByteBuffer) value).array())));
}
else {
partitionValue = new HivePartitionKey(colName, Optional.of(value.toString()));
}
partitionKeys.put(field.fieldId(), partitionValue);
if (field.transform().isIdentity()) {
partitionKeys.put(sourceId, partitionValue);
}
}
index += 1;
}
return Collections.unmodifiableMap(partitionKeys);
}
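/**
 * Translates the connector's manifest caching configuration into the Iceberg catalog properties that
 * enable and size the manifest file cache.
 */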
public static Map<String, String> loadCachingProperties(IcebergConfig icebergConfig)
{
return ImmutableMap.<String, String>builderWithExpectedSize(4)
.put(IO_MANIFEST_CACHE_ENABLED, "true")
.put(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, String.valueOf(icebergConfig.getMaxManifestCacheSize()))
.put(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(icebergConfig.getManifestCacheMaxContentLength()))
.put(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, String.valueOf(icebergConfig.getManifestCacheExpireDuration()))
.build();
}
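/**
 * Returns the file's data sequence number, falling back to the file sequence number when the data
 * sequence number is not set.
 */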
public static long getDataSequenceNumber(ContentFile<?> file)
{
if (file.dataSequenceNumber() != null) {
return file.dataSequenceNumber();
}
return file.fileSequenceNumber();
}
/**
* Provides the delete files that need to be applied to the given table snapshot.
*
* @param table The table to provide deletes for
* @param snapshot The snapshot id to use
* @param filter Filters to apply during planning
* @param requestedPartitionSpec If provided, only delete files for this partition spec will be provided
* @param requestedSchema If provided, only delete files with this schema will be provided
*/
public static CloseableIterable<DeleteFile> getDeleteFiles(Table table,
long snapshot,
TupleDomain<IcebergColumnHandle> filter,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
{
Expression filterExpression = toIcebergExpression(filter);
CloseableIterable<FileScanTask> fileTasks = table.newScan().useSnapshot(snapshot).filter(filterExpression).planFiles();
return new CloseableIterable<DeleteFile>()
{
@Override
public void close()
throws IOException
{
fileTasks.close();
}
@Override
public CloseableIterator<DeleteFile> iterator()
{
return new DeleteFilesIterator(table.specs(), fileTasks.iterator(), requestedPartitionSpec, requestedSchema);
}
};
}
private static Optional<Object> getBigintAdjacentValue(Object value, boolean isPrevious)
{
long currentValue = (long) value;
if (isPrevious) {
if (currentValue == Long.MIN_VALUE) {
return Optional.empty();
}
return Optional.of(currentValue - 1);
}
else {
if (currentValue == Long.MAX_VALUE) {
return Optional.empty();
}
return Optional.of(currentValue + 1);
}
}
private static Optional<Object> getIntegerAdjacentValue(Object value, boolean isPrevious)
{
long currentValue = toIntExact((long) value);
if (isPrevious) {
if (currentValue == Integer.MIN_VALUE) {
return Optional.empty();
}
return Optional.of(currentValue - 1);
}
else {
if (currentValue == Integer.MAX_VALUE) {
return Optional.empty();
}
return Optional.of(currentValue + 1);
}
}
private static Optional<Object> getSmallIntAdjacentValue(Object value, boolean isPrevious)
{
long currentValue = (long) value;
if (currentValue > Short.MAX_VALUE) {
throw new GenericInternalException(format("Value %d exceeds MAX_SHORT", value));
}
if (currentValue < Short.MIN_VALUE) {
throw new GenericInternalException(format("Value %d is less than MIN_SHORT", value));
}
if (isPrevious) {
if (currentValue == Short.MIN_VALUE) {
return Optional.empty();
}
return Optional.of(currentValue - 1);
}
else {
if (currentValue == Short.MAX_VALUE) {
return Optional.empty();
}
return Optional.of(currentValue + 1);
}
}
private static Optional<Object> getTinyIntAdjacentValue(Object value, boolean isPrevious)
{
long currentValue = (long) value;
if (currentValue > Byte.MAX_VALUE) {
throw new GenericInternalException(format("Value %d exceeds MAX_BYTE", value));
}
if (currentValue < Byte.MIN_VALUE) {
throw new GenericInternalException(format("Value %d is less than MIN_BYTE", value));
}
if (isPrevious) {
if (currentValue == Byte.MIN_VALUE) {
return Optional.empty();
}
return Optional.of(currentValue - 1);
}
else {
if (currentValue == Byte.MAX_VALUE) {
return Optional.empty();
}
return Optional.of(currentValue + 1);
}
}
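// The incoming value is the raw IEEE-754 bit pattern of the double; the neighbor is computed by decoding
// the bits, stepping by one ulp, and re-encoding, with the infinities handled explicitly.
// getRealAdjacentValue below applies the same approach to 32-bit floats.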
private static Optional<Object> getDoubleAdjacentValue(Object value, boolean isPrevious)
{
long longBitForDouble = (long) value;
if (longBitForDouble > DOUBLE_POSITIVE_INFINITE && longBitForDouble < DOUBLE_NEGATIVE_ZERO ||
longBitForDouble > DOUBLE_NEGATIVE_INFINITE && longBitForDouble < DOUBLE_POSITIVE_ZERO) {
throw new GenericInternalException(format("Value %d exceeds the range of double", longBitForDouble));
}
if (isPrevious) {
if (longBitForDouble == DOUBLE_NEGATIVE_INFINITE) {
return Optional.empty();
}
if (longBitForDouble == DOUBLE_POSITIVE_INFINITE) {
return Optional.of(DOUBLE_POSITIVE_INFINITE - 1);
}
double currentValue = longBitsToDouble(longBitForDouble);
return Optional.of(doubleToRawLongBits(currentValue - ulp(currentValue)));
}
else {
if (longBitForDouble == DOUBLE_POSITIVE_INFINITE) {
return Optional.empty();
}
if (longBitForDouble == DOUBLE_NEGATIVE_INFINITE) {
return Optional.of(DOUBLE_NEGATIVE_INFINITE - 1);
}
double currentValue = longBitsToDouble(longBitForDouble);
return Optional.of(doubleToRawLongBits(currentValue + ulp(currentValue)));
}
}
private static Optional<Object> getRealAdjacentValue(Object value, boolean isPrevious)
{
int intBitForFloat = (int) value;
if (intBitForFloat > REAL_POSITIVE_INFINITE && intBitForFloat < REAL_NEGATIVE_ZERO ||
intBitForFloat > REAL_NEGATIVE_INFINITE && intBitForFloat < REAL_POSITIVE_ZERO) {
throw new GenericInternalException(format("Value %d exceeds the range of real", intBitForFloat));
}
if (isPrevious) {
if (intBitForFloat == REAL_NEGATIVE_INFINITE) {
return Optional.empty();
}
if (intBitForFloat == REAL_POSITIVE_INFINITE) {
return Optional.of(REAL_POSITIVE_INFINITE - 1);
}
float currentValue = intBitsToFloat(intBitForFloat);
return Optional.of(floatToRawIntBits(currentValue - ulp(currentValue)));
}
else {
if (intBitForFloat == REAL_POSITIVE_INFINITE) {
return Optional.empty();
}
if (intBitForFloat == REAL_NEGATIVE_INFINITE) {
return Optional.of(REAL_NEGATIVE_INFINITE - 1);
}
float currentValue = intBitsToFloat(intBitForFloat);
return Optional.of(floatToRawIntBits(currentValue + ulp(currentValue)));
}
}
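/**
 * Iterates over the distinct delete files attached to the planned file scan tasks, optionally restricted
 * to a requested partition spec and (for equality deletes) a requested schema.
 */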
private static class DeleteFilesIterator
implements CloseableIterator<DeleteFile>
{
private final Set<String> seenFiles = new HashSet<>();
private final Map<Integer, PartitionSpec> partitionSpecsById;
private CloseableIterator<FileScanTask> fileTasks;
private final Optional<Set<Integer>> requestedPartitionSpec;
private final Optional<Set<Integer>> requestedSchema;
private Iterator<DeleteFile> currentDeletes = emptyIterator();
private DeleteFile currentFile;
private DeleteFilesIterator(Map<Integer, PartitionSpec> partitionSpecsById,
CloseableIterator<FileScanTask> fileTasks,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
{
this.partitionSpecsById = partitionSpecsById;
this.fileTasks = fileTasks;
this.requestedPartitionSpec = requestedPartitionSpec;
this.requestedSchema = requestedSchema;
}
@Override
public boolean hasNext()
{
return currentFile != null || advance();
}
private boolean advance()
{
currentFile = null;
while (currentFile == null && (currentDeletes.hasNext() || fileTasks.hasNext())) {
if (!currentDeletes.hasNext()) {
currentDeletes = fileTasks.next().deletes().iterator();
}
while (currentDeletes.hasNext()) {
DeleteFile deleteFile = currentDeletes.next();
// If there is a requested partition spec or schema, only include files that match it
if (shouldIncludeFile(deleteFile, partitionSpecsById.get(deleteFile.specId()))) {
if (seenFiles.add(deleteFile.path().toString())) {
currentFile = deleteFile;
return true;
}
}
}
}
return false;
}
@Override
public DeleteFile next()
{
DeleteFile result = currentFile;
advance();
return result;
}
private boolean shouldIncludeFile(DeleteFile file, PartitionSpec partitionSpec)
{
boolean matchesPartition = !requestedPartitionSpec.isPresent() ||
requestedPartitionSpec.get().equals(partitionSpec.fields().stream().map(PartitionField::fieldId).collect(Collectors.toSet()));
return matchesPartition &&
(fromIcebergFileContent(file.content()) == POSITION_DELETES ||
equalityFieldIdsFulfillRequestSchema(file, partitionSpec));
}
private boolean equalityFieldIdsFulfillRequestSchema(DeleteFile file, PartitionSpec partitionSpec)
{
Set<Integer> identityPartitionSourceIds = partitionSpec.fields().stream()
.filter(partitionField -> partitionField.transform().isIdentity())
.map(PartitionField::sourceId).collect(Collectors.toSet());
// Column ids in `requestedSchema` do not include identity partition columns, for the sake of `delete-schema-merging` within the same partition spec.
// So we need to filter out the identity partition columns from the delete files' `equalityFieldIds` when determining whether they fulfill the `requestedSchema`.
return !requestedSchema.isPresent() ||
requestedSchema.get().equals(Sets.difference(ImmutableSet.copyOf(file.equalityFieldIds()), identityPartitionSourceIds));
}
@Override
public void close()
throws IOException
{
fileTasks.close();
// TODO: remove this once org.apache.iceberg.io.CloseableIterator's withClose
// correctly releases the resources held by the iterator
// (and make fileTasks final)
fileTasks = CloseableIterator.empty();
}
}
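/**
 * Assembles the Iceberg table properties from the table metadata, connector table properties, and session
 * settings: data location, file format and compression codec, commit retries, format version (forcing
 * copy-on-write delete/update modes below version {@code 2}), metadata retention, metrics defaults, and
 * target split size.
 */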
public static Map<String, String> populateTableProperties(IcebergAbstractMetadata metadata, ConnectorTableMetadata tableMetadata, IcebergTableProperties tableProperties, FileFormat fileFormat, ConnectorSession session)
{
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(5);
String writeDataLocation = getWriteDataLocation(tableMetadata.getProperties());
if (!isNullOrEmpty(writeDataLocation)) {
propertiesBuilder.put(WRITE_DATA_LOCATION, writeDataLocation);
}
else {
Optional<String> dataLocation = metadata.getDataLocationBasedOnWarehouseDataDir(tableMetadata.getTable());
dataLocation.ifPresent(location -> propertiesBuilder.put(WRITE_DATA_LOCATION, location));
}
Integer commitRetries = tableProperties.getCommitRetries(session, tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
switch (fileFormat) {
case PARQUET:
propertiesBuilder.put(PARQUET_COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().get().toString());
break;
case ORC:
propertiesBuilder.put(ORC_COMPRESSION, getCompressionCodec(session).getOrcCompressionKind().name());
break;
}
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}
String formatVersion = tableProperties.getFormatVersion(session, tableMetadata.getProperties());
verify(formatVersion != null, "Format version cannot be null");
propertiesBuilder.put(FORMAT_VERSION, formatVersion);
if (parseFormatVersion(formatVersion) < MIN_FORMAT_VERSION_FOR_DELETE) {
propertiesBuilder.put(DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
propertiesBuilder.put(UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
}
else {
RowLevelOperationMode deleteMode = tableProperties.getDeleteMode(session, tableMetadata.getProperties());
propertiesBuilder.put(DELETE_MODE, deleteMode.modeName());
RowLevelOperationMode updateMode = tableProperties.getUpdateMode(tableMetadata.getProperties());
propertiesBuilder.put(UPDATE_MODE, updateMode.modeName());
}
Integer metadataPreviousVersionsMax = tableProperties.getMetadataPreviousVersionsMax(session, tableMetadata.getProperties());
propertiesBuilder.put(METADATA_PREVIOUS_VERSIONS_MAX, String.valueOf(metadataPreviousVersionsMax));
Boolean metadataDeleteAfterCommit = tableProperties.isMetadataDeleteAfterCommit(session, tableMetadata.getProperties());
propertiesBuilder.put(METADATA_DELETE_AFTER_COMMIT_ENABLED, String.valueOf(metadataDeleteAfterCommit));
Integer metricsMaxInferredColumn = tableProperties.getMetricsMaxInferredColumn(session, tableMetadata.getProperties());
propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn));
propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties())));
return propertiesBuilder.build();
}
public static int parseFormatVersion(String formatVersion)
{
try {
return parseInt(formatVersion);
}
catch (NumberFormatException | IndexOutOfBoundsException e) {
throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION, "Unable to parse user provided format version");
}
}
public static RowLevelOperationMode getDeleteMode(Table table)
{
return RowLevelOperationMode.fromName(table.properties()
.getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT)
.toUpperCase(Locale.ENGLISH));
}
public static RowLevelOperationMode getUpdateMode(Table table)
{
return RowLevelOperationMode.fromName(table.properties()
.getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT)
.toUpperCase(Locale.ENGLISH));
}
public static int getMetadataPreviousVersionsMax(Table table)
{
return Integer.parseInt(table.properties()
.getOrDefault(METADATA_PREVIOUS_VERSIONS_MAX,
String.valueOf(METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
}
public static boolean isMetadataDeleteAfterCommit(Table table)
{
return Boolean.valueOf(table.properties()
.getOrDefault(METADATA_DELETE_AFTER_COMMIT_ENABLED,
String.valueOf(METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
}
public static int getMetricsMaxInferredColumn(Table table)
{
return Integer.parseInt(table.properties()
.getOrDefault(METRICS_MAX_INFERRED_COLUMN_DEFAULTS,
String.valueOf(METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT)));
}
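/**
 * Reconstructs {@link PartitionData} from its JSON representation using the result types of the spec's
 * partition transforms. Returns {@link Optional#empty()} for unpartitioned specs and fails if a
 * partitioned spec has no serialized partition data.
 */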
public static Optional<PartitionData> partitionDataFromJson(PartitionSpec spec, Optional<String> partitionDataAsJson)
{
org.apache.iceberg.types.Type[] partitionColumnTypes = spec.fields().stream()
.map(field -> field.transform().getResultType(
spec.schema().findType(field.sourceId())))
.toArray(org.apache.iceberg.types.Type[]::new);
Optional<PartitionData> partitionData = Optional.empty();
if (spec.isPartitioned()) {
verify(partitionDataAsJson.isPresent(), "partitionDataJson is null");
partitionData = Optional.of(PartitionData.fromJson(partitionDataAsJson.get(), partitionColumnTypes));
}
return partitionData;
}
public static Optional<PartitionData> partitionDataFromStructLike(PartitionSpec spec, StructLike partition)
{
org.apache.iceberg.types.Type[] partitionColumnTypes = spec.fields().stream()
.map(field -> field.transform().getResultType(
spec.schema().findType(field.sourceId())))
.toArray(org.apache.iceberg.types.Type[]::new);
Optional<PartitionData> partitionData = Optional.empty();
if (spec.isPartitioned()) {
partitionData = Optional.of(PartitionData.fromStructLike(partition, partitionColumnTypes));
}
return partitionData;
}
/**
 * Get the metadata location for the target {@link Table},
 * honoring the Iceberg table property {@code WRITE_METADATA_LOCATION} when it is set
 */
public static String metadataLocation(Table icebergTable)
{
String metadataLocation = icebergTable.properties().get(WRITE_METADATA_LOCATION);
if (metadataLocation != null) {
return LocationUtil.stripTrailingSlash(metadataLocation);
}
else {
return String.format("%s/%s", icebergTable.location(), "metadata");
}
}
/**
 * Get the data location for the target {@link Table},
 * honoring the Iceberg table properties {@code WRITE_DATA_LOCATION}, {@code OBJECT_STORE_PATH}
 * and {@code WRITE_FOLDER_STORAGE_LOCATION}, in that order of precedence
 */
public static String dataLocation(Table icebergTable)
{
Map<String, String> properties = icebergTable.properties();
String dataLocation = properties.get(WRITE_DATA_LOCATION);
if (dataLocation == null) {
dataLocation = properties.get(OBJECT_STORE_PATH);
if (dataLocation == null) {
dataLocation = properties.get(WRITE_FOLDER_STORAGE_LOCATION);
if (dataLocation == null) {
dataLocation = String.format("%s/data", icebergTable.location());
}
}
}
return dataLocation;
}
public static Long getSplitSize(Table table)
{
return Long.parseLong(table.properties()
.getOrDefault(SPLIT_SIZE,
String.valueOf(SPLIT_SIZE_DEFAULT)));
}
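/**
 * Chooses the target split size: a non-zero session value wins, otherwise the table's scan target split
 * size is used. For example, {@code getTargetSplitSize(0, 134217728)} returns a 128MB {@link DataSize}.
 */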
public static DataSize getTargetSplitSize(long sessionValueProperty, long icebergScanTargetSplitSize)
{
return sessionValueProperty == 0 ?
succinctBytes(icebergScanTargetSplitSize) :
succinctBytes(sessionValueProperty);
}
public static DataSize getTargetSplitSize(ConnectorSession session, Scan<?, ?, ?> scan)
{
return getTargetSplitSize(IcebergSessionProperties.getTargetSplitSize(session), scan.targetSplitSize());
}
// This code is copied from Iceberg
private static String fullTableName(String catalogName, TableIdentifier identifier)
{
StringBuilder sb = new StringBuilder();
if (catalogName.contains("/") || catalogName.contains(":")) {
// use / for URI-like names: thrift://host:port/db.table
sb.append(catalogName);
if (!catalogName.endsWith("/")) {
sb.append("/");
}
}
else {
// use . for non-URI named catalogs: prod.db.table
sb.append(catalogName).append(".");
}
for (String level : identifier.namespace().levels()) {
sb.append(level).append(".");
}
sb.append(identifier.name());
return sb.toString();
}
}