// HiveMaterializedViewUtils.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.MaterializedViewDefinition;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableNotFoundException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.joda.time.DateTimeZone;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static com.facebook.presto.common.predicate.TupleDomain.extractFixedValues;
import static com.facebook.presto.common.predicate.TupleDomain.toLinkedMap;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static com.facebook.presto.hive.HiveUtil.getPartitionKeyColumnHandles;
import static com.facebook.presto.hive.HiveUtil.parsePartitionValue;
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionNamesAndValues;
import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedDataPredicates;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.lang.String.format;
import static java.util.Collections.emptyMap;
public class HiveMaterializedViewUtils
{
// Shared instance with no predicate disjuncts and no column names; returned by the methods of this class
// whenever there is nothing to report.
private static final MaterializedDataPredicates EMPTY_MATERIALIZED_VIEW_DATA_PREDICATES = new MaterializedDataPredicates(ImmutableList.of(), ImmutableList.of());
// Static utility holder: the private constructor prevents instantiation.
private HiveMaterializedViewUtils()
{
}
/**
 * Validates the partition columns of a materialized view. The view is accepted only if all of the following hold:
 * <ol>
 * <li>at least one view column is directly mapped to a base table column;</li>
 * <li>the view is partitioned;</li>
 * <li>for every base table, at least one view partition column is directly mapped to a partition column of that table;</li>
 * <li>for every base table on the outer join side,
 * {@link #viewToBaseTableOnOuterJoinSideIndirectMappedPartitions} yields at least one mapping.</li>
 * </ol>
 * <p>
 * A column is directly mapped to a base table column if it is derived directly or transitively from the base table column,
 * by only selecting a column or an aliased column without any function or operator applied.
 * For example, with SELECT column_b AS column_a, column_a is directly mapped to column_b.
 * With SELECT column_b + column_c AS column_a, column_a is not directly mapped to any column.
 * <p>
 * Check 3 uses only the direct column mappings returned by {@code viewDefinition.getDirectColumnMappingsAsMap()}.
 *
 * @param metastore metastore used to look up the base tables of the view
 * @param metastoreContext context passed to the metastore lookups
 * @param viewTable metastore table backing the materialized view
 * @param viewDefinition definition of the materialized view
 * @throws PrestoException with {@code NOT_SUPPORTED} when any of the checks above fails
 * @throws TableNotFoundException when a base table of the view is not found in the metastore
 */
public static void validateMaterializedViewPartitionColumns(
        SemiTransactionalHiveMetastore metastore,
        MetastoreContext metastoreContext,
        Table viewTable,
        MaterializedViewDefinition viewDefinition)
{
    SchemaTableName viewName = new SchemaTableName(viewTable.getDatabaseName(), viewTable.getTableName());

    Map<String, Map<SchemaTableName, String>> viewToBaseDirectColumnMap = viewDefinition.getDirectColumnMappingsAsMap();
    if (viewToBaseDirectColumnMap.isEmpty()) {
        throw new PrestoException(
                NOT_SUPPORTED,
                format("Materialized view %s must have at least one column directly defined by a base table column.", viewName));
    }

    List<Column> viewPartitions = viewTable.getPartitionColumns();
    if (viewPartitions.isEmpty()) {
        throw new PrestoException(NOT_SUPPORTED, "Unpartitioned materialized view is not supported.");
    }

    // Resolve every base table up front so that a missing table is reported before any partition check runs.
    List<Table> baseTables = viewDefinition.getBaseTables().stream()
            .map(baseTableName -> metastore.getTable(metastoreContext, baseTableName.getSchemaName(), baseTableName.getTableName())
                    .orElseThrow(() -> new TableNotFoundException(baseTableName)))
            .collect(toImmutableList());

    // Iterate the resolved tables directly; no intermediate map keyed by Table is needed to reach their partition columns.
    for (Table baseTable : baseTables) {
        SchemaTableName schemaBaseTable = new SchemaTableName(baseTable.getDatabaseName(), baseTable.getTableName());
        if (!isCommonPartitionFound(schemaBaseTable, baseTable.getPartitionColumns(), viewPartitions, viewToBaseDirectColumnMap)) {
            throw new PrestoException(
                    NOT_SUPPORTED,
                    format("Materialized view %s must have at least one partition column that exists in %s as well", viewName, baseTable.getTableName()));
        }

        // The helper returns an empty Optional for tables that are not on the outer join side, which never fails this check.
        boolean outerJoinTableWithoutCommonPartition = viewToBaseTableOnOuterJoinSideIndirectMappedPartitions(viewDefinition, baseTable)
                .map(Map::isEmpty)
                .orElse(false);
        if (outerJoinTableWithoutCommonPartition) {
            throw new PrestoException(
                    NOT_SUPPORTED,
                    format("Outer join conditions in Materialized view %s must have at least one common partition equality constraint", viewName));
        }
    }
}
/**
 * Tells whether at least one view partition column is mapped, for the given base table, to one of that table's partition columns.
 *
 * @param baseTable name of the base table whose mappings are inspected
 * @param baseTablePartitions partition columns of the base table
 * @param viewPartitions partition columns of the materialized view
 * @param viewToBaseColumnMap view column name -> (base table -> base column name)
 * @return true as soon as one (view partition, base partition) pair is linked through {@code viewToBaseColumnMap}
 */
private static boolean isCommonPartitionFound(
        SchemaTableName baseTable,
        List<Column> baseTablePartitions,
        List<Column> viewPartitions,
        Map<String, Map<SchemaTableName, String>> viewToBaseColumnMap)
{
    return viewPartitions.stream().anyMatch(viewColumn ->
            baseTablePartitions.stream().anyMatch(baseColumn -> {
                // A view column without a mapping for this base table resolves to the empty string.
                String mappedBaseColumn = viewToBaseColumnMap
                        .getOrDefault(viewColumn.getName(), emptyMap())
                        .getOrDefault(baseTable, "");
                return mappedBaseColumn.equals(baseColumn.getName());
            }));
}
/**
 * Builds the data predicates of a table from its partitions: one fixed-value {@link TupleDomain} per partition name
 * returned by the metastore, together with the names of the table's partition columns.
 *
 * @param metastore metastore used to list the partition names of {@code table}
 * @param metastoreContext context passed to the metastore lookup
 * @param typeManager resolves the type of each partition key column
 * @param table table whose partitions are converted to predicates
 * @param timeZone time zone handed to {@code parsePartitionValue}
 * @return predicates holding one disjunct per partition and the partition column names
 * @throws PrestoException with {@code NOT_SUPPORTED} for an unsupported partition key type,
 * {@code HIVE_INVALID_METADATA} when a partition name does not carry one value per partition column, or
 * {@code HIVE_INVALID_PARTITION_VALUE} when a partition name lacks a value for a partition key
 * @throws TableNotFoundException when the metastore returns no partition names for the table
 */
public static MaterializedDataPredicates getMaterializedDataPredicates(
        SemiTransactionalHiveMetastore metastore,
        MetastoreContext metastoreContext,
        TypeManager typeManager,
        Table table,
        DateTimeZone timeZone)
{
    List<Column> partitionColumns = table.getPartitionColumns();

    // Reject the table on the first partition key whose Hive type is not supported.
    partitionColumns.stream()
            .map(Column::getType)
            .filter(candidate -> !candidate.isSupportedType())
            .findFirst()
            .ifPresent(unsupported -> {
                throw new PrestoException(
                        NOT_SUPPORTED,
                        format("Unsupported Hive type %s found in partition keys of table %s.%s", unsupported, table.getDatabaseName(), table.getTableName()));
            });

    Map<String, Type> partitionTypes = getPartitionKeyColumnHandles(table).stream()
            .collect(toImmutableMap(HiveColumnHandle::getName, handle -> typeManager.getType(handle.getTypeSignature())));

    List<PartitionNameWithVersion> partitionNames = metastore.getPartitionNames(metastoreContext, table.getDatabaseName(), table.getTableName())
            .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(table.getDatabaseName(), table.getTableName())));

    ImmutableList.Builder<TupleDomain<String>> disjuncts = ImmutableList.builder();
    for (PartitionNameWithVersion partitionName : partitionNames) {
        Map<String, String> rawValues = toPartitionNamesAndValues(partitionName.getPartitionName());
        if (partitionColumns.size() != rawValues.size()) {
            throw new PrestoException(
                    HIVE_INVALID_METADATA,
                    format("Expected %d partition key values, but got %d", partitionColumns.size(), rawValues.size()));
        }

        // Parse the raw string of every partition key into a typed value.
        ImmutableMap.Builder<String, NullableValue> typedValues = ImmutableMap.builder();
        for (Map.Entry<String, Type> partitionType : partitionTypes.entrySet()) {
            String name = partitionType.getKey();
            String value = rawValues.get(name);
            if (value == null) {
                throw new PrestoException(HIVE_INVALID_PARTITION_VALUE, format("partition key value cannot be null for field: %s", name));
            }
            typedValues.put(name, parsePartitionValue(name, value, partitionType.getValue(), timeZone));
        }
        disjuncts.add(TupleDomain.fromFixedValues(typedValues.build()));
    }

    List<String> partitionColumnNames = partitionColumns.stream()
            .map(Column::getName)
            .collect(toImmutableList());
    return new MaterializedDataPredicates(disjuncts.build(), partitionColumnNames);
}
/**
 * Every table on the outer join side must have a partition column that takes part in an EQ clause and is present
 * in the materialized view as well.
 * <p>
 * For a base table on the outer join side, this method collects the valid refresh columns of the view that
 * <ul>
 * <li>map to a partition column of that base table according to {@code viewDefinition.getColumnMappingsAsMap()}, and</li>
 * <li>are directly mapped only to base tables that are not on the outer join side.</li>
 * </ul>
 * For example:
 * <pre>
 * Materialized View: SELECT t1.a AS t1_a, t2.a AS t2_a FROM t1 LEFT JOIN t2 ON t1.a = t2.a, partitioned by [t1_a, t2_a]
 * baseTable: t2, partitioned by [a]
 * Output: t1_a -> a
 * </pre>
 * Refresh columns that have no entry in the column mappings, or no entry in the direct column mappings, are skipped.
 *
 * @param viewDefinition definition of the materialized view
 * @param baseTable base table to inspect
 * @return {@code Optional.empty()} when {@code baseTable} is not on the outer join side; otherwise a (possibly empty)
 * map from view column name to the name of the matching partition column of {@code baseTable}
 */
public static Optional<Map<String, String>> viewToBaseTableOnOuterJoinSideIndirectMappedPartitions(
        MaterializedViewDefinition viewDefinition,
        Table baseTable)
{
    SchemaTableName schemaBaseTable = new SchemaTableName(baseTable.getDatabaseName(), baseTable.getTableName());
    if (!viewDefinition.getBaseTablesOnOuterJoinSide().contains(schemaBaseTable)) {
        return Optional.empty();
    }

    // Collected once instead of rescanning the partition columns for every refresh column.
    Set<String> baseTablePartitionNames = baseTable.getPartitionColumns().stream()
            .map(Column::getName)
            .collect(Collectors.toSet());

    Map<String, Map<SchemaTableName, String>> columnMappings = viewDefinition.getColumnMappingsAsMap();
    Map<String, Map<SchemaTableName, String>> directColumnMappings = viewDefinition.getDirectColumnMappingsAsMap();

    Map<String, String> viewToBaseIndirectMappedColumns = new HashMap<>();
    for (String viewPartition : viewDefinition.getValidRefreshColumns().orElse(ImmutableList.of())) {
        Map<SchemaTableName, String> mappedBaseColumns = columnMappings.get(viewPartition);
        if (mappedBaseColumns == null) {
            // The refresh column is not mapped to any base table column.
            continue;
        }

        // Keep only columns that map to a partition column of this base table.
        String baseTablePartition = mappedBaseColumns.get(schemaBaseTable);
        if (baseTablePartition == null || !baseTablePartitionNames.contains(baseTablePartition)) {
            continue;
        }

        Map<SchemaTableName, String> directlyMappedColumns = directColumnMappings.get(viewPartition);
        if (directlyMappedColumns == null) {
            // Without any direct mapping the column cannot come from a table outside the outer join side.
            continue;
        }

        // E.g. for a left outer join we want the view partition that comes from the left table.
        // schemaBaseTable itself is on the outer join side (checked above), so this single test also
        // rejects columns that are directly mapped to schemaBaseTable.
        boolean directlyMappedOutsideOuterJoinOnly = directlyMappedColumns.keySet().stream()
                .noneMatch(table -> viewDefinition.getBaseTablesOnOuterJoinSide().contains(table));
        if (directlyMappedOutsideOuterJoinOnly) {
            viewToBaseIndirectMappedColumns.put(viewPartition, baseTablePartition);
        }
    }
    return Optional.of(viewToBaseIndirectMappedColumns);
}
/**
 * Variant of the four-argument {@code differenceDataPredicates} for callers that have no indirect
 * (outer join) partition mappings: it delegates with an empty indirect map.
 *
 * @param baseTablePredicatesInfo Partitions info for base table
 * @param viewPredicatesInfo Partitions info for view
 * @param viewToBaseTablePredicatesKeyMap Partitions mapping from view to base table
 * @return the result of the four-argument overload called with an empty indirect map
 */
public static MaterializedDataPredicates differenceDataPredicates(
        MaterializedDataPredicates baseTablePredicatesInfo,
        MaterializedDataPredicates viewPredicatesInfo,
        Map<String, String> viewToBaseTablePredicatesKeyMap)
{
    Map<String, String> noIndirectMappings = ImmutableMap.of();
    return differenceDataPredicates(
            baseTablePredicatesInfo,
            viewPredicatesInfo,
            viewToBaseTablePredicatesKeyMap,
            noIndirectMappings);
}
/**
 * From given base table partitions, removes all partitions that are already used to compute
 * related view partitions, only returning partitions that are not reflected in Materialized View.
 * We assume that given materialized view partitions are still fresh,
 * and in sync with base table partitions, i.e. related Materialized View partitions are already invalidated
 * when new base table partitions land.
 * <p>
 * Partitions are compared on the "common keys": the base table columns that a view column maps to through
 * {@code viewToBaseTablePredicatesKeyMap} and that are also columns of {@code baseTablePredicatesInfo}.
 * Every view partition is projected onto those keys; each entry of {@code viewToBaseTableIndirectMap} then adds
 * (or overwrites) its base column with the value of the mapped view column. A base table partition is reported
 * when its projection onto the common keys equals none of the projected view partitions.
 *
 * @param baseTablePredicatesInfo Partitions info for base table
 * @param viewPredicatesInfo Partitions info for view
 * @param viewToBaseTablePredicatesKeyMap Partitions mapping from view to base table. Only includes direct mapping, i.e. excludes mapping from outer joins EQ clauses.
 * @param viewToBaseTableIndirectMap Extra partitions mapping from view to base table, computed from viewToBaseTableOnOuterJoinSideIndirectMappedPartitions()
 * @return Base Table partitions that have not been used to refresh view. Empty predicates are returned when
 * {@code viewToBaseTablePredicatesKeyMap} is empty or no common key exists; {@code baseTablePredicatesInfo} itself is
 * returned when {@code viewPredicatesInfo} is empty.
 * @throws IllegalStateException if a predicate disjunct of either side has no extractable fixed values
 */
public static MaterializedDataPredicates differenceDataPredicates(
        MaterializedDataPredicates baseTablePredicatesInfo,
        MaterializedDataPredicates viewPredicatesInfo,
        Map<String, String> viewToBaseTablePredicatesKeyMap,
        Map<String, String> viewToBaseTableIndirectMap)
{
    if (viewToBaseTablePredicatesKeyMap.isEmpty()) {
        return EMPTY_MATERIALIZED_VIEW_DATA_PREDICATES;
    }
    if (viewPredicatesInfo.isEmpty()) {
        return baseTablePredicatesInfo;
    }

    // Pair up view columns and base table columns that are linked by the direct mapping and exist on both sides.
    Set<String> baseTableMappedCommonKeys = new HashSet<>();
    Set<String> viewMappedCommonKeys = new HashSet<>();
    for (String rightKey : viewPredicatesInfo.getColumnNames()) {
        String leftKey = viewToBaseTablePredicatesKeyMap.get(rightKey);
        if (leftKey != null && baseTablePredicatesInfo.getColumnNames().contains(leftKey)) {
            baseTableMappedCommonKeys.add(leftKey);
            viewMappedCommonKeys.add(rightKey);
        }
    }
    if (baseTableMappedCommonKeys.isEmpty()) {
        return EMPTY_MATERIALIZED_VIEW_DATA_PREDICATES;
    }

    // Intentionally used linkedHashMap so that equal guarantees are kept even if underlying implementation is changed.
    Set<LinkedHashMap<String, NullableValue>> viewPredicatesMappedToBaseTableCommonKeys = new HashSet<>();
    for (TupleDomain<String> rightPredicate : viewPredicatesInfo.getPredicateDisjuncts()) {
        LinkedHashMap<String, NullableValue> viewPredicateKeyValue = getLinkedHashMap(
                extractFixedValues(rightPredicate).orElseThrow(() -> new IllegalStateException("rightPredicateKeyValue is not present!")));

        // Re-key the view partition by the base table column names of the common keys.
        LinkedHashMap<String, NullableValue> viewPredicateMappedToBaseTableCommonKeys = getLinkedHashMap(
                viewPredicateKeyValue.keySet().stream()
                        .filter(viewMappedCommonKeys::contains)
                        .collect(toLinkedMap(
                                viewToBaseTablePredicatesKeyMap::get,
                                viewPredicateKeyValue::get)));

        // Indirect mappings add (or overwrite) the base column with the value of the mapped view column.
        viewToBaseTableIndirectMap.forEach(
                (viewColumn, baseColumn) -> viewPredicateMappedToBaseTableCommonKeys.put(baseColumn, viewPredicateKeyValue.get(viewColumn)));

        viewPredicatesMappedToBaseTableCommonKeys.add(viewPredicateMappedToBaseTableCommonKeys);
    }

    ImmutableList.Builder<TupleDomain<String>> difference = ImmutableList.builder();
    for (TupleDomain<String> leftPredicate : baseTablePredicatesInfo.getPredicateDisjuncts()) {
        LinkedHashMap<String, NullableValue> baseTablePredicateKeyValue = getLinkedHashMap(
                extractFixedValues(leftPredicate).orElseThrow(() -> new IllegalStateException("leftPredicateKeyValue is not present!")));

        // Same collector as on the view side, so both sides build their key maps the same way.
        LinkedHashMap<String, NullableValue> baseTablePredicateMappedToBaseTableCommonKeys = getLinkedHashMap(
                baseTablePredicateKeyValue.keySet().stream()
                        .filter(baseTableMappedCommonKeys::contains)
                        .collect(toLinkedMap(
                                columnName -> columnName,
                                baseTablePredicateKeyValue::get)));

        // NOTE(review): for a base table on the outer join side the view may hold NULL in the directly mapped columns.
        // No separate NULL-valued lookup is performed; coverage relies solely on the projected view keys built above
        // (including the indirect-mapping overrides). TODO confirm that this is sufficient for outer joins.
        if (!viewPredicatesMappedToBaseTableCommonKeys.contains(baseTablePredicateMappedToBaseTableCommonKeys)) {
            difference.add(leftPredicate);
        }
    }

    return new MaterializedDataPredicates(difference.build(), baseTablePredicatesInfo.getColumnNames());
}
/**
 * Returns the shared predicates instance that carries no predicate disjuncts and no column names.
 *
 * @return the same instance on every call
 */
public static MaterializedDataPredicates getEmptyMaterializedViewDataPredicates()
{
return EMPTY_MATERIALIZED_VIEW_DATA_PREDICATES;
}
/**
 * Returns {@code map} as a {@link LinkedHashMap}: the very same instance when it already is one,
 * otherwise a new {@link LinkedHashMap} holding a copy of its entries.
 *
 * @param map map to expose as a {@link LinkedHashMap}
 * @return {@code map} itself or a copy of it
 */
private static <K, V> LinkedHashMap<K, V> getLinkedHashMap(Map<K, V> map)
{
    if (map instanceof LinkedHashMap) {
        // No copy is made here, so changes to the returned map are visible through the argument.
        return (LinkedHashMap<K, V>) map;
    }
    return new LinkedHashMap<>(map);
}
/**
 * For every base table, collects the view partition columns that {@code viewToBaseColumnMap} maps to one of that
 * table's partition columns.
 *
 * @param view table backing the materialized view
 * @param baseTables base tables of the view
 * @param viewToBaseColumnMap view column name -> (base table -> base column name)
 * @return base table -> (view partition column -> base partition column); base tables without any such
 * mapping are left out
 */
public static Map<SchemaTableName, Map<String, String>> getViewToBasePartitionMap(
        Table view,
        List<Table> baseTables,
        Map<String, Map<SchemaTableName, String>> viewToBaseColumnMap)
{
    List<String> viewPartitionNames = view.getPartitionColumns()
            .stream()
            .map(Column::getName)
            .collect(Collectors.toList());

    Map<SchemaTableName, List<String>> partitionNamesByBaseTable = baseTables.stream()
            .collect(Collectors.toMap(
                    table -> new SchemaTableName(table.getDatabaseName(), table.getTableName()),
                    table -> table.getPartitionColumns().stream().map(Column::getName).collect(toImmutableList())));

    ImmutableMap.Builder<SchemaTableName, Map<String, String>> result = ImmutableMap.builder();
    for (Map.Entry<SchemaTableName, List<String>> baseTableEntry : partitionNamesByBaseTable.entrySet()) {
        SchemaTableName baseTable = baseTableEntry.getKey();
        Map<String, String> partitionMap = new HashMap<>();

        for (String viewPartition : viewPartitionNames) {
            for (String basePartition : baseTableEntry.getValue()) {
                if (!viewToBaseColumnMap.containsKey(viewPartition)) {
                    continue;
                }
                Map<SchemaTableName, String> mappedColumns = viewToBaseColumnMap.get(viewPartition);
                if (!mappedColumns.containsKey(baseTable)) {
                    continue;
                }
                // Keep the pair only when the view column maps to this very partition column.
                if (mappedColumns.get(baseTable).equals(basePartition)) {
                    partitionMap.put(viewPartition, basePartition);
                }
            }
        }

        if (partitionMap.isEmpty()) {
            continue;
        }
        result.put(baseTable, ImmutableMap.copyOf(partitionMap));
    }
    return result.build();
}
}