DeltaMetadata.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.delta;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HiveStorageFormat;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.PrestoTableType;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static com.facebook.presto.delta.DeltaColumnHandle.ColumnType.PARTITION;
import static com.facebook.presto.delta.DeltaColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.delta.DeltaExpressionUtils.splitPredicate;
import static com.facebook.presto.delta.DeltaTableProperties.EXTERNAL_LOCATION_PROPERTY;
import static com.facebook.presto.delta.DeltaTableProperties.getTableStorageFormat;
import static com.facebook.presto.delta.DeltaTableProperties.isExternalTable;
import static com.facebook.presto.hive.HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER;
import static com.facebook.presto.hive.metastore.PrestoTableType.EXTERNAL_TABLE;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.security.PrincipalType.USER;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.emptyList;
import static java.util.Locale.US;
import static java.util.Objects.requireNonNull;
/**
 * {@link ConnectorMetadata} implementation for the Delta connector.
 *
 * <p>Schema and table registrations are read from (and written to) the injected
 * {@link ExtendedHiveMetastore}; the Delta table itself (columns, snapshot) is loaded
 * through the injected {@link DeltaClient} from the resolved storage location.
 * In addition to the metastore schemas, a synthetic schema ({@code $PATH$}) lets a table be
 * addressed directly by its storage location.
 */
public class DeltaMetadata
        implements ConnectorMetadata
{
    private static final Logger log = Logger.get(DeltaMetadata.class);

    /**
     * Special schema used when querying a Delta table by storage location.
     * Ex. SELECT * FROM delta."$PATH$"."s3://bucket/path/to/table". User is not able to list any tables
     * in this schema. It is just used to query a Delta table by storage location.
     */
    private static final String PATH_SCHEMA = "$PATH$";

    private final String connectorId;
    private final DeltaClient deltaClient;
    private final ExtendedHiveMetastore metastore;
    private final TypeManager typeManager;
    private final DeltaConfig config;

    /**
     * Creates the metadata service. All arguments are required; a {@code null} argument
     * fails with a {@link NullPointerException} naming the offending parameter.
     */
    @Inject
    public DeltaMetadata(
            DeltaConnectorId connectorId,
            DeltaClient deltaClient,
            ExtendedHiveMetastore metastore,
            TypeManager typeManager,
            DeltaConfig config)
    {
        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
        this.deltaClient = requireNonNull(deltaClient, "deltaClient is null");
        this.metastore = requireNonNull(metastore, "metastore is null");
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
        this.config = requireNonNull(config, "config is null");
    }

    /**
     * Returns every database known to the metastore plus the lower-cased path schema.
     *
     * @return a new mutable list; the path schema is always the last element
     */
    @Override
    public List<String> listSchemaNames(ConnectorSession session)
    {
        List<String> schemas = new ArrayList<>(metastore.getAllDatabases(metastoreContext(session)));
        schemas.add(PATH_SCHEMA.toLowerCase(US));
        return schemas;
    }

    /**
     * Registers an external Delta table in the metastore, owned by the session user.
     *
     * <p>NOTE(review): {@code ignoreExisting} is not consulted here; whatever the metastore
     * does for an already existing table propagates to the caller.
     *
     * @throws PrestoException with {@code NOT_SUPPORTED} when the table properties do not
     *         describe an external table
     */
    @Override
    public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
    {
        PrestoTableType tableType = isExternalTable(tableMetadata.getProperties()) ? EXTERNAL_TABLE : MANAGED_TABLE;
        Table table = prepareTable(session, tableMetadata, tableType);
        PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner());
        metastore.createTable(
                metastoreContext(session),
                table,
                principalPrivileges,
                emptyList());
    }

    /**
     * Removes the table's registration from the metastore.
     *
     * @throws TableNotFoundException when the metastore has no entry for the handle's table
     */
    @Override
    public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        DeltaTableHandle handle = (DeltaTableHandle) tableHandle;
        MetastoreContext metastoreContext = metastoreContext(session);
        String schemaName = handle.getDeltaTable().getSchemaName();
        String tableName = handle.getDeltaTable().getTableName();

        if (!metastore.getTable(metastoreContext, schemaName, tableName).isPresent()) {
            throw new TableNotFoundException(handle.toSchemaTableName());
        }
        // The final argument is passed as false, as in the original call.
        // Presumably this is the "delete data" flag — TODO confirm against ExtendedHiveMetastore.dropTable.
        metastore.dropTable(metastoreContext, schemaName, tableName, false);
    }

    /**
     * Builds the privileges handed to the metastore for a newly created table: a single
     * SELECT entry keyed by the owner name, with the owner used for both principals of the
     * {@link HivePrivilegeInfo}. The second privilege map is left empty.
     */
    private static PrincipalPrivileges buildInitialPrivilegeSet(String tableOwner)
    {
        PrestoPrincipal owner = new PrestoPrincipal(USER, tableOwner);
        return new PrincipalPrivileges(
                ImmutableMultimap.<String, HivePrivilegeInfo>builder()
                        .put(tableOwner, new HivePrivilegeInfo(HivePrivilegeInfo.HivePrivilege.SELECT, true, owner, owner))
                        .build(),
                ImmutableMultimap.of());
    }

    /**
     * Translates connector table metadata into a metastore {@link Table} whose storage
     * format and location are taken from the table properties.
     *
     * @throws PrestoException with {@code NOT_SUPPORTED} for anything but an external table
     */
    private Table prepareTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, PrestoTableType tableType)
    {
        if (!tableType.equals(EXTERNAL_TABLE)) {
            throw new PrestoException(NOT_SUPPORTED, "Cannot create managed Delta table");
        }

        Table.Builder tableBuilder = Table.builder()
                .setDatabaseName(tableMetadata.getTable().getSchemaName())
                .setTableName(tableMetadata.getTable().getTableName())
                .setOwner(session.getUser())
                .setTableType(tableType);

        Map<String, Object> tableProperties = tableMetadata.getProperties();
        HiveStorageFormat hiveStorageFormat = getTableStorageFormat(tableProperties);
        String tableLocation = tableProperties.get(EXTERNAL_LOCATION_PROPERTY).toString();

        tableBuilder.getStorageBuilder()
                .setStorageFormat(fromHiveStorageFormat(hiveStorageFormat))
                .setLocation(tableLocation);

        return tableBuilder.build();
    }

    /**
     * Resolves a table handle for the given name.
     *
     * <p>For the path schema the table name itself is used as the storage location; for any
     * other schema the location is looked up in the metastore. The Delta table is then loaded
     * from that location, using the snapshot id / timestamp parsed from the table name.
     *
     * @return the handle, or {@code null} when the schema, the metastore entry or the Delta
     *         table cannot be found
     */
    @Override
    public DeltaTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
    {
        String schemaName = schemaTableName.getSchemaName();
        String tableName = schemaTableName.getTableName();
        if (!listSchemaNames(session).contains(schemaName)) {
            return null; // indicates table doesn't exist
        }

        DeltaTableName deltaTableName = DeltaTableName.from(tableName);
        String tableLocation;
        if (PATH_SCHEMA.equalsIgnoreCase(schemaName)) {
            tableLocation = deltaTableName.getTableNameOrPath();
        }
        else {
            Optional<Table> metastoreTable = metastore.getTable(metastoreContext(session), schemaName, deltaTableName.getTableNameOrPath());
            if (!metastoreTable.isPresent()) {
                return null; // indicates table doesn't exist
            }
            tableLocation = resolveTableLocation(metastoreTable.get(), schemaTableName);
        }

        return deltaClient.getTable(
                        config,
                        session,
                        schemaTableName,
                        tableLocation,
                        deltaTableName.getSnapshotId(),
                        deltaTableName.getTimestampMillisUtc())
                .map(deltaTable -> new DeltaTableHandle(connectorId, deltaTable))
                .orElse(null);
    }

    /**
     * Picks the storage location of a metastore-registered table.
     *
     * <p>Delta table written using Spark and Hive have set the table parameter
     * "spark.sql.sources.provider = delta". If this property is found table
     * location is found in SerDe properties with key "path". When that key is missing or
     * empty a warning is logged and the storage 'location' attribute is used instead.
     */
    private static String resolveTableLocation(Table metastoreTable, SchemaTableName schemaTableName)
    {
        Map<String, String> tableParameters = metastoreTable.getParameters();
        Storage storage = metastoreTable.getStorage();
        if (!"delta".equalsIgnoreCase(tableParameters.get("spark.sql.sources.provider"))) {
            return storage.getLocation();
        }

        String pathLocation = storage.getSerdeParameters().get("path");
        if (Strings.isNullOrEmpty(pathLocation)) {
            log.warn("Location key ('path') is missing in SerDe properties for table %s. " +
                    "Using the 'location' attribute as the table location.", schemaTableName);
            // fallback to using the location attribute
            return storage.getLocation();
        }
        return pathLocation;
    }

    /**
     * Produces the single layout for a table scan. The layout handle carries the full
     * constraint summary (and its string form); the second element returned by
     * {@code splitPredicate} is reported back as the unenforced predicate.
     */
    @Override
    public List<ConnectorTableLayoutResult> getTableLayouts(
            ConnectorSession session,
            ConnectorTableHandle table,
            Constraint<ColumnHandle> constraint,
            Optional<Set<ColumnHandle>> desiredColumns)
    {
        DeltaTableHandle tableHandle = (DeltaTableHandle) table;
        TupleDomain<ColumnHandle> summary = constraint.getSummary();

        // Split the predicate into partition column predicate and other column predicates
        // Only the partition column predicate is fully enforced. Other predicate is partially enforced (best effort).
        List<TupleDomain<ColumnHandle>> predicate = splitPredicate(summary);
        TupleDomain<ColumnHandle> unenforcedPredicate = predicate.get(1);

        DeltaTableLayoutHandle newDeltaTableLayoutHandle = new DeltaTableLayoutHandle(
                tableHandle,
                summary.transform(DeltaColumnHandle.class::cast),
                Optional.of(summary.toString(session.getSqlFunctionProperties())));

        ConnectorTableLayout newLayout = new ConnectorTableLayout(
                newDeltaTableLayoutHandle,
                Optional.empty(),
                summary,
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                ImmutableList.of(),
                Optional.empty());

        return ImmutableList.of(new ConnectorTableLayoutResult(newLayout, unenforcedPredicate));
    }

    /**
     * Wraps an existing layout handle in a {@link ConnectorTableLayout} without adding any
     * further layout information.
     */
    @Override
    public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
    {
        return new ConnectorTableLayout(handle);
    }

    /**
     * Returns the metadata of the table behind the handle by re-resolving it by name.
     *
     * @return the table metadata, or {@code null} if the table can no longer be resolved
     * @throws IllegalArgumentException if the handle belongs to a different connector
     */
    @Override
    public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
    {
        DeltaTableHandle deltaTableHandle = (DeltaTableHandle) table;
        checkConnectorId(deltaTableHandle);
        return getTableMetadata(session, deltaTableHandle.toSchemaTableName());
    }

    /**
     * Lists the tables the metastore reports for the given schema, or for every schema
     * returned by {@link #listSchemaNames} when no schema is given. Schemas for which the
     * metastore returns no table list contribute nothing.
     */
    @Override
    public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
    {
        List<String> schemaNames = schemaName.<List<String>>map(ImmutableList::of)
                .orElseGet(() -> listSchemaNames(session));

        // Build the metastore context once rather than once per schema.
        MetastoreContext metastoreContext = metastoreContext(session);
        ImmutableList.Builder<SchemaTableName> tableNames = ImmutableList.builder();
        for (String schema : schemaNames) {
            for (String tableName : metastore.getAllTables(metastoreContext, schema).orElse(emptyList())) {
                tableNames.add(new SchemaTableName(schema, tableName));
            }
        }
        return tableNames.build();
    }

    /**
     * Returns one {@link DeltaColumnHandle} per column of the Delta table, keyed by the
     * column name as stored in the Delta table.
     *
     * @throws IllegalArgumentException if the handle belongs to a different connector
     */
    @Override
    public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
    {
        DeltaTableHandle deltaTableHandle = (DeltaTableHandle) tableHandle;
        checkConnectorId(deltaTableHandle);

        ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
        for (DeltaColumn column : deltaTableHandle.getDeltaTable().getColumns()) {
            columnHandles.put(
                    column.getName(),
                    new DeltaColumnHandle(
                            column.getName(),
                            column.getType(),
                            column.isPartition() ? PARTITION : REGULAR,
                            Optional.empty()));
        }
        return columnHandles.build();
    }

    /**
     * Returns the column metadata of every table matching the prefix. Tables that cannot be
     * resolved any more are silently left out.
     *
     * @throws NullPointerException if {@code prefix} is {@code null}
     */
    @Override
    public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
    {
        requireNonNull(prefix, "prefix is null");
        ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
        for (SchemaTableName tableName : listTables(session, prefix)) {
            ConnectorTableMetadata tableMetadata = getTableMetadata(session, tableName);
            // table can disappear during listing operation
            if (tableMetadata != null) {
                columns.put(tableName, tableMetadata.getColumns());
            }
        }
        return columns.build();
    }

    /**
     * Resolves the table by name and converts its Delta columns to column metadata.
     *
     * @return the table metadata, or {@code null} when no handle can be obtained for the name
     */
    private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName)
    {
        DeltaTableHandle tableHandle = getTableHandle(session, tableName);
        if (tableHandle == null) {
            return null;
        }

        List<ColumnMetadata> columnMetadata = tableHandle.getDeltaTable().getColumns().stream()
                .map(column -> getColumnMetadata(session, column))
                .collect(Collectors.toList());

        return new ConnectorTableMetadata(tableName, columnMetadata);
    }

    /**
     * Returns name and type of the given column handle; the table handle is not used.
     */
    @Override
    public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
    {
        return getColumnMetadata(columnHandle);
    }

    /**
     * Converts a {@link DeltaColumnHandle} to column metadata, resolving its type signature
     * through the type manager. The name is used as stored in the handle.
     */
    private ColumnMetadata getColumnMetadata(ColumnHandle columnHandle)
    {
        DeltaColumnHandle deltaColumnHandle = (DeltaColumnHandle) columnHandle;
        return ColumnMetadata.builder()
                .setName(deltaColumnHandle.getName())
                .setType(typeManager.getType(deltaColumnHandle.getDataType()))
                .build();
    }

    /**
     * Expands a prefix into concrete table names.
     *
     * <p>A prefix that names both schema and table yields exactly that table. A prefix that
     * lacks the schema or the table name is expanded by listing tables: all schemas when the
     * schema is absent, otherwise only the named schema. Checking the table name as well
     * avoids constructing a {@link SchemaTableName} from a {@code null} table name.
     */
    private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
    {
        if (prefix.getSchemaName() == null || prefix.getTableName() == null) {
            // Route explicitly to the Optional-based listing implemented in this class.
            return listTables(session, Optional.ofNullable(prefix.getSchemaName()));
        }
        return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
    }

    /**
     * Converts a Delta column to column metadata. The column name goes through
     * {@code normalizeIdentifier} for the session; the type signature is resolved through the
     * type manager.
     */
    private ColumnMetadata getColumnMetadata(ConnectorSession session, DeltaColumn deltaColumn)
    {
        return ColumnMetadata.builder()
                .setName(normalizeIdentifier(session, deltaColumn.getName()))
                .setType(typeManager.getType(deltaColumn.getType()))
                .build();
    }

    /**
     * Builds the context passed to every metastore call from the session's identity, query
     * id, client info/tags, source, warning collector and runtime stats.
     *
     * <p>NOTE(review): the {@code Optional.empty()} and {@code false} arguments are passed
     * positionally; their meaning is defined by {@link MetastoreContext} — confirm there.
     */
    private MetastoreContext metastoreContext(ConnectorSession session)
    {
        return new MetastoreContext(
                session.getIdentity(),
                session.getQueryId(),
                session.getClientInfo(),
                session.getClientTags(),
                session.getSource(),
                Optional.empty(),
                false,
                DEFAULT_COLUMN_CONVERTER_PROVIDER,
                session.getWarningCollector(),
                session.getRuntimeStats());
    }

    /**
     * Verifies that the handle was created by this connector instance.
     *
     * @throws IllegalArgumentException if the handle's connector id differs from this one
     */
    private void checkConnectorId(DeltaTableHandle tableHandle)
    {
        checkArgument(tableHandle.getConnectorId().equals(connectorId), "table handle is not for this connector");
    }
}