PropertyDerivations.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.optimizations;
import com.facebook.presto.Session;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.TableLayout;
import com.facebook.presto.metadata.TableLayout.TablePartitioning;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConstantProperty;
import com.facebook.presto.spi.GroupingProperty;
import com.facebook.presto.spi.LocalProperty;
import com.facebook.presto.spi.SortingProperty;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.DeleteNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.IndexSourceNode;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.JoinType;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.MarkDistinctNode;
import com.facebook.presto.spi.plan.MergeJoinNode;
import com.facebook.presto.spi.plan.OrderingScheme;
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SemiJoinNode;
import com.facebook.presto.spi.plan.SortNode;
import com.facebook.presto.spi.plan.SpatialJoinNode;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.UnnestNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.spi.plan.WindowNode;
import com.facebook.presto.spi.relation.ConstantExpression;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.RowExpressionInterpreter;
import com.facebook.presto.sql.planner.optimizations.ActualProperties.Global;
import com.facebook.presto.sql.planner.plan.ApplyNode;
import com.facebook.presto.sql.planner.plan.AssignUniqueId;
import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode;
import com.facebook.presto.sql.planner.plan.GroupIdNode;
import com.facebook.presto.sql.planner.plan.IndexJoinNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.LateralJoinNode;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SampleNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
import com.facebook.presto.sql.planner.plan.TableWriterMergeNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.facebook.presto.sql.planner.plan.UpdateNode;
import com.facebook.presto.sql.relational.RowExpressionDomainTranslator;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.planWithTableNodePartitioning;
import static com.facebook.presto.common.predicate.TupleDomain.toLinkedMap;
import static com.facebook.presto.spi.relation.DomainTranslator.BASIC_COLUMN_EXTRACTOR;
import static com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.ARBITRARY_DISTRIBUTION;
import static com.facebook.presto.sql.planner.optimizations.ActualProperties.Global.arbitraryPartition;
import static com.facebook.presto.sql.planner.optimizations.ActualProperties.Global.coordinatorSingleStreamPartition;
import static com.facebook.presto.sql.planner.optimizations.ActualProperties.Global.partitionedOn;
import static com.facebook.presto.sql.planner.optimizations.ActualProperties.Global.partitionedOnCoalesce;
import static com.facebook.presto.sql.planner.optimizations.ActualProperties.Global.singleStreamPartition;
import static com.facebook.presto.sql.planner.optimizations.ActualProperties.Global.streamPartitionedOn;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.stream.Collectors.toMap;
public class PropertyDerivations
{
private PropertyDerivations()
{
    // Holder for the static derivation entry points below; not meant to be instantiated.
}
/**
 * Derives the actual properties of {@code node} bottom-up: the properties of every source subtree
 * are derived first (in source order), then {@link #deriveProperties} is applied to this node.
 */
public static ActualProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session)
{
    ImmutableList.Builder<ActualProperties> childProperties = ImmutableList.builder();
    for (PlanNode source : node.getSources()) {
        childProperties.add(derivePropertiesRecursively(source, metadata, session));
    }
    return deriveProperties(node, childProperties.build(), metadata, session);
}
/**
 * Derives the actual properties of {@code node} from the already-derived properties of its sources,
 * then verifies that every variable mentioned by the result (node partitioning, constants, local
 * properties) is one of the node's output variables.
 *
 * @throws com.google.common.base.VerifyException if the derived properties reference a variable
 *         that is not produced by {@code node}
 */
public static ActualProperties deriveProperties(PlanNode node, List<ActualProperties> inputProperties, Metadata metadata, Session session)
{
    ActualProperties output = node.accept(new Visitor(metadata, session), inputProperties);
    List<VariableReferenceExpression> nodeOutputs = node.getOutputVariables();

    output.getNodePartitioning().ifPresent(partitioning -> verify(
            nodeOutputs.containsAll(partitioning.getVariableReferences()),
            "Node-level partitioning properties contain columns not present in node's output"));

    verify(nodeOutputs.containsAll(output.getConstants().keySet()), "Node-level constant properties contain columns not present in node's output");

    ImmutableSet.Builder<VariableReferenceExpression> localPropertyColumns = ImmutableSet.builder();
    for (LocalProperty<VariableReferenceExpression> property : output.getLocalProperties()) {
        localPropertyColumns.addAll(property.getColumns());
    }
    verify(nodeOutputs.containsAll(localPropertyColumns.build()), "Node-level local properties contain columns not present in node's output");

    return output;
}
/**
 * Derives the properties of {@code node} from its sources' properties, skipping the output-variable
 * verification that {@link #deriveProperties} performs on the result.
 */
public static ActualProperties streamBackdoorDeriveProperties(PlanNode node, List<ActualProperties> inputProperties, Metadata metadata, Session session)
{
    Visitor visitor = new Visitor(metadata, session);
    return node.accept(visitor, inputProperties);
}
private static class Visitor
extends InternalPlanVisitor<ActualProperties, List<ActualProperties>>
{
// Used by visit methods for table layouts and predicate-to-domain translation.
private final Metadata metadata;
// Used by visit methods for session flags and connector-session conversion.
private final Session session;

public Visitor(Metadata metadata, Session session)
{
    this.session = session;
    this.metadata = metadata;
}
@Override
public ActualProperties visitPlan(PlanNode node, List<ActualProperties> inputProperties)
{
    // Fallback for any node type that has no dedicated visit method in this visitor.
    String nodeClass = node.getClass().getName();
    throw new UnsupportedOperationException("not yet implemented: " + nodeClass);
}
@Override
public ActualProperties visitExplainAnalyze(ExplainAnalyzeNode node, List<ActualProperties> inputProperties)
{
    // Always reported as a single stream pinned to the coordinator; the input's properties are ignored.
    ActualProperties.Builder result = ActualProperties.builder();
    result.global(coordinatorSingleStreamPartition());
    return result.build();
}
@Override
public ActualProperties visitOutput(OutputNode node, List<ActualProperties> inputProperties)
{
    // Keep only the properties whose variables survive into the node's output.
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source.translateVariable(variable -> PropertyDerivations.filterIfMissing(node.getOutputVariables(), variable));
}
@Override
public ActualProperties visitEnforceSingleRow(EnforceSingleRowNode node, List<ActualProperties> inputProperties)
{
    // The single input's properties are passed through unchanged.
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source;
}
@Override
public ActualProperties visitAssignUniqueId(AssignUniqueId node, List<ActualProperties> inputProperties)
{
    ActualProperties properties = Iterables.getOnlyElement(inputProperties);
    VariableReferenceExpression idVariable = node.getIdVariable();

    // Local properties: everything inherited from the input, then a grouping on the new id column,
    // then every source column marked constant. Presumably this relies on ids being unique, so each
    // id group is a single row in which no other column can vary.
    ImmutableList.Builder<LocalProperty<VariableReferenceExpression>> localProperties = ImmutableList.builder();
    localProperties.addAll(properties.getLocalProperties());
    localProperties.add(new GroupingProperty<>(ImmutableList.of(idVariable)));
    for (VariableReferenceExpression column : node.getSource().getOutputVariables()) {
        localProperties.add(new ConstantProperty<>(column));
    }

    ActualProperties.Builder result = ActualProperties.builderFrom(properties);
    if (!properties.getNodePartitioning().isPresent()) {
        // Nothing to preserve: report an arbitrary distribution partitioned on the id column.
        result.global(partitionedOn(ARBITRARY_DISTRIBUTION, ImmutableList.of(idVariable), Optional.empty()));
    }
    // Otherwise the input (possibly preferred) partitioning is preserved as-is.
    result.local(localProperties.build());
    return result.build();
}
@Override
public ActualProperties visitApply(ApplyNode node, List<ActualProperties> inputProperties)
{
    // This visitor does not expect ApplyNode; presumably it is rewritten before properties are derived.
    String nodeClass = node.getClass().getName();
    throw new IllegalArgumentException("Unexpected node: " + nodeClass);
}
@Override
public ActualProperties visitLateralJoin(LateralJoinNode node, List<ActualProperties> inputProperties)
{
    // This visitor does not expect LateralJoinNode; presumably it is rewritten before properties are derived.
    String nodeClass = node.getClass().getName();
    throw new IllegalArgumentException("Unexpected node: " + nodeClass);
}
@Override
public ActualProperties visitMarkDistinct(MarkDistinctNode node, List<ActualProperties> inputProperties)
{
    // The single input's properties are passed through unchanged.
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source;
}
@Override
public ActualProperties visitWindow(WindowNode node, List<ActualProperties> inputProperties)
{
    ActualProperties properties = Iterables.getOnlyElement(inputProperties);
    Optional<OrderingScheme> orderingScheme = node.getOrderingScheme();

    // Fully pre-partitioned and fully pre-sorted input: the original input properties are respected.
    boolean passThrough = ImmutableSet.copyOf(node.getPartitionBy()).equals(node.getPrePartitionedInputs());
    if (passThrough && orderingScheme.isPresent()) {
        passThrough = node.getPreSortedOrderPrefix() == orderingScheme.get().getOrderByVariables().size();
    }
    if (passThrough) {
        return properties;
    }

    ImmutableList.Builder<LocalProperty<VariableReferenceExpression>> localProperties = ImmutableList.builder();

    // Pre-partitioned inputs keep their order through the window, so carry over the leading input
    // local properties that the pre-partitioning grouping simplifies, stopping at the first that it does not.
    // TODO: come up with a more general form of this operation for other streaming operators
    if (!node.getPrePartitionedInputs().isEmpty()) {
        GroupingProperty<VariableReferenceExpression> prePartitioned = new GroupingProperty<>(node.getPrePartitionedInputs());
        for (LocalProperty<VariableReferenceExpression> inherited : properties.getLocalProperties()) {
            boolean implied = prePartitioned.isSimplifiedBy(inherited);
            if (!implied) {
                break;
            }
            localProperties.add(inherited);
        }
    }

    // Then the window's own partitioning (grouping) and ordering (sorting) columns.
    if (!node.getPartitionBy().isEmpty()) {
        localProperties.add(new GroupingProperty<>(node.getPartitionBy()));
    }
    if (orderingScheme.isPresent()) {
        OrderingScheme scheme = orderingScheme.get();
        for (VariableReferenceExpression column : scheme.getOrderByVariables()) {
            localProperties.add(new SortingProperty<>(column, scheme.getOrdering(column)));
        }
    }

    return ActualProperties.builderFrom(properties)
            .local(LocalProperties.normalizeAndPrune(localProperties.build()))
            .build();
}
@Override
public ActualProperties visitGroupId(GroupIdNode node, List<ActualProperties> inputProperties)
{
    // Input variable -> the output variable that carries its value through the GroupId node.
    Map<VariableReferenceExpression, VariableReferenceExpression> inputToOutput = new HashMap<>();

    // Only outputs listed as common grouping columns are mapped back to their input column.
    // TODO: Add support for translating a property on a single column to multiple columns
    // when one input grouping column is copied into several output columns (aliases), the first one
    // in iteration order wins, which is effectively an arbitrary choice.
    node.getGroupingColumns().forEach((output, input) -> {
        if (node.getCommonGroupingColumns().contains(output)) {
            inputToOutput.putIfAbsent(input, output);
        }
    });

    // TODO: Add support for translating a property on a single column to multiple columns
    // this is deliberately placed after the grouping columns, because preserving properties has a bigger perf impact
    for (VariableReferenceExpression argument : node.getAggregationArguments()) {
        inputToOutput.putIfAbsent(argument, argument);
    }

    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source.translateVariable(column -> Optional.ofNullable(inputToOutput.get(column)));
}
@Override
public ActualProperties visitAggregation(AggregationNode node, List<ActualProperties> inputProperties)
{
    ActualProperties properties = Iterables.getOnlyElement(inputProperties);

    // Only grouping keys survive; properties on any other input variable are dropped.
    ActualProperties groupingKeyProperties = properties.translateVariable(variable -> {
        if (node.getGroupingKeys().contains(variable)) {
            return Optional.of(variable);
        }
        return Optional.empty();
    });

    // Output rows are grouped on the grouping keys.
    return ActualProperties.builderFrom(groupingKeyProperties)
            .local(LocalProperties.grouped(node.getGroupingKeys()))
            .build();
}
@Override
public ActualProperties visitRowNumber(RowNumberNode node, List<ActualProperties> inputProperties)
{
    // The single input's properties are passed through unchanged.
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source;
}
@Override
public ActualProperties visitTopNRowNumber(TopNRowNumberNode node, List<ActualProperties> inputProperties)
{
    ActualProperties properties = Iterables.getOnlyElement(inputProperties);
    OrderingScheme orderingScheme = node.getOrderingScheme();

    // Grouped on the partition columns, followed by a sort on each ordering column.
    ImmutableList.Builder<LocalProperty<VariableReferenceExpression>> localProperties = ImmutableList.builder();
    localProperties.add(new GroupingProperty<>(node.getPartitionBy()));
    for (VariableReferenceExpression column : orderingScheme.getOrderByVariables()) {
        localProperties.add(new SortingProperty<>(column, orderingScheme.getOrdering(column)));
    }

    return ActualProperties.builderFrom(properties)
            .local(localProperties.build())
            .build();
}
@Override
public ActualProperties visitTopN(TopNNode node, List<ActualProperties> inputProperties)
{
    ActualProperties properties = Iterables.getOnlyElement(inputProperties);
    OrderingScheme orderingScheme = node.getOrderingScheme();

    // Local properties are set to exactly the sort on the ORDER BY columns, in order.
    ImmutableList.Builder<SortingProperty<VariableReferenceExpression>> sortProperties = ImmutableList.builder();
    for (VariableReferenceExpression column : orderingScheme.getOrderByVariables()) {
        sortProperties.add(new SortingProperty<>(column, orderingScheme.getOrdering(column)));
    }

    return ActualProperties.builderFrom(properties)
            .local(sortProperties.build())
            .build();
}
@Override
public ActualProperties visitSort(SortNode node, List<ActualProperties> inputProperties)
{
    ActualProperties properties = Iterables.getOnlyElement(inputProperties);
    OrderingScheme orderingScheme = node.getOrderingScheme();

    // Local properties are set to exactly the sort on the ORDER BY columns, in order.
    ImmutableList.Builder<SortingProperty<VariableReferenceExpression>> sortProperties = ImmutableList.builder();
    for (VariableReferenceExpression column : orderingScheme.getOrderByVariables()) {
        sortProperties.add(new SortingProperty<>(column, orderingScheme.getOrdering(column)));
    }

    return ActualProperties.builderFrom(properties)
            .local(sortProperties.build())
            .build();
}
@Override
public ActualProperties visitLimit(LimitNode node, List<ActualProperties> inputProperties)
{
    // The single input's properties are passed through unchanged.
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source;
}
@Override
public ActualProperties visitDistinctLimit(DistinctLimitNode node, List<ActualProperties> inputProperties)
{
    // Everything except the local properties is inherited; those become a grouping on the distinct columns.
    ActualProperties.Builder result = ActualProperties.builderFrom(Iterables.getOnlyElement(inputProperties));
    result.local(LocalProperties.grouped(node.getDistinctVariables()));
    return result.build();
}
@Override
public ActualProperties visitStatisticsWriterNode(StatisticsWriterNode node, List<ActualProperties> context)
{
    // Always reported as a single stream pinned to the coordinator; the input's properties are ignored.
    ActualProperties.Builder result = ActualProperties.builder();
    result.global(coordinatorSingleStreamPartition());
    return result.build();
}
@Override
public ActualProperties visitTableFinish(TableFinishNode node, List<ActualProperties> inputProperties)
{
    // Always reported as a single stream pinned to the coordinator; the input's properties are ignored.
    ActualProperties.Builder result = ActualProperties.builder();
    result.global(coordinatorSingleStreamPartition());
    return result.build();
}
@Override
public ActualProperties visitDelete(DeleteNode node, List<ActualProperties> inputProperties)
{
    // Delete passes on none of its input columns, so every variable-based property is dropped.
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source.translateVariable(variable -> Optional.empty());
}
@Override
public ActualProperties visitUpdate(UpdateNode node, List<ActualProperties> inputProperties)
{
    // Every variable-based property of the input is dropped.
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source.translateVariable(variable -> Optional.empty());
}
@Override
public ActualProperties visitJoin(JoinNode node, List<ActualProperties> inputProperties)
{
    ActualProperties probe = inputProperties.get(0);
    ActualProperties build = inputProperties.get(1);
    List<VariableReferenceExpression> outputs = node.getOutputVariables();
    // Evaluated up front for every join type; when spilling is possible the output is flagged unordered.
    boolean unordered = spillPossible(session, node.getType());

    switch (node.getType()) {
        case INNER: {
            ActualProperties probeTranslated = probe.translateVariable(column -> filterOrRewrite(outputs, node.getCriteria(), column));
            ActualProperties buildTranslated = build.translateVariable(column -> filterOrRewrite(outputs, node.getCriteria(), column));

            // Constants from both sides hold on every joined row; build-side values win on key clashes.
            Map<VariableReferenceExpression, ConstantExpression> constants = new HashMap<>(probeTranslated.getConstants());
            constants.putAll(buildTranslated.getConstants());

            if (node.isCrossJoin()) {
                // Cross join preserves only constants from probe and build sides.
                // Cross join doesn't preserve sorting or grouping local properties on either side.
                return ActualProperties.builder()
                        .global(probeTranslated)
                        .local(ImmutableList.of())
                        .constants(constants)
                        .build();
            }
            return ActualProperties.builderFrom(probeTranslated)
                    .constants(constants)
                    .unordered(unordered)
                    .build();
        }
        case LEFT: {
            ActualProperties probeFiltered = probe.translateVariable(column -> filterIfMissing(outputs, column));
            return ActualProperties.builderFrom(probeFiltered)
                    .unordered(unordered)
                    .build();
        }
        case RIGHT: {
            // NOTE(review): the second translation repeats the same output filter and looks redundant.
            ActualProperties buildFiltered = build.translateVariable(column -> filterIfMissing(node.getOutputVariables(), column));
            return ActualProperties.builderFrom(buildFiltered.translateVariable(column -> filterIfMissing(outputs, column)))
                    .local(ImmutableList.of())
                    .unordered(true)
                    .build();
        }
        case FULL: {
            if (probe.isSingleNode()) {
                return ActualProperties.builder()
                        .global(singleStreamPartition())
                        .build();
            }
            boolean coalescible = probe.getNodePartitioning().isPresent()
                    && build.getNodePartitioning().isPresent()
                    && arePartitionHandlesCompatibleForCoalesce(
                            probe.getNodePartitioning().get().getHandle(),
                            build.getNodePartitioning().get().getHandle(),
                            metadata,
                            session);
            if (coalescible) {
                return ActualProperties.builder()
                        .global(partitionedOnCoalesce(probe.getNodePartitioning().get(), build.getNodePartitioning().get(), metadata, session))
                        .build();
            }
            return ActualProperties.builder()
                    .global(arbitraryPartition())
                    .build();
        }
        default:
            throw new UnsupportedOperationException("Unsupported join type: " + node.getType());
    }
}
@Override
public ActualProperties visitSemiJoin(SemiJoinNode node, List<ActualProperties> inputProperties)
{
    // Only the first input's properties are reported; the second input (presumably the filtering source) is ignored.
    ActualProperties sourceProperties = inputProperties.get(0);
    return sourceProperties;
}
@Override
public ActualProperties visitSpatialJoin(SpatialJoinNode node, List<ActualProperties> inputProperties)
{
    ActualProperties probe = inputProperties.get(0);
    ActualProperties build = inputProperties.get(1);
    List<VariableReferenceExpression> outputs = node.getOutputVariables();

    switch (node.getType()) {
        case LEFT:
            // Only probe-side properties (restricted to the join outputs) are kept.
            return ActualProperties.builderFrom(probe.translateVariable(column -> filterIfMissing(outputs, column)))
                    .build();
        case INNER: {
            ActualProperties probeFiltered = probe.translateVariable(column -> filterIfMissing(outputs, column));
            ActualProperties buildFiltered = build.translateVariable(column -> filterIfMissing(outputs, column));

            // Constants from both sides hold on every joined row; build-side values win on key clashes.
            Map<VariableReferenceExpression, ConstantExpression> constants = new HashMap<>(probeFiltered.getConstants());
            constants.putAll(buildFiltered.getConstants());

            return ActualProperties.builderFrom(probeFiltered)
                    .constants(constants)
                    .build();
        }
        default:
            throw new IllegalArgumentException("Unsupported spatial join type: " + node.getType());
    }
}
@Override
public ActualProperties visitIndexJoin(IndexJoinNode node, List<ActualProperties> inputProperties)
{
    // TODO: include all equivalent columns in partitioning properties
    ActualProperties probe = inputProperties.get(0);
    ActualProperties index = inputProperties.get(1);

    switch (node.getType()) {
        case SOURCE_OUTER:
            // Only the probe-side constants are kept.
            return ActualProperties.builderFrom(probe)
                    .constants(probe.getConstants())
                    .build();
        case INNER: {
            // ImmutableMap.Builder#build throws if both sides report a constant for the same variable.
            Map<VariableReferenceExpression, ConstantExpression> constants = ImmutableMap.<VariableReferenceExpression, ConstantExpression>builder()
                    .putAll(probe.getConstants())
                    .putAll(index.getConstants())
                    .build();
            return ActualProperties.builderFrom(probe)
                    .constants(constants)
                    .build();
        }
        default:
            throw new UnsupportedOperationException("Unsupported join type: " + node.getType());
    }
}
@Override
public ActualProperties visitIndexSource(IndexSourceNode node, List<ActualProperties> context)
{
    // An index source is always reported as a single (non-coordinator) stream.
    ActualProperties.Builder result = ActualProperties.builder();
    result.global(singleStreamPartition());
    return result.build();
}
@Override
public ActualProperties visitMergeJoin(MergeJoinNode node, List<ActualProperties> inputProperties)
{
    ActualProperties left = inputProperties.get(0);
    ActualProperties right = inputProperties.get(1);
    List<VariableReferenceExpression> outputs = node.getOutputVariables();

    switch (node.getType()) {
        case INNER: {
            ActualProperties leftTranslated = left.translateVariable(column -> filterOrRewrite(outputs, node.getCriteria(), column));
            ActualProperties rightTranslated = right.translateVariable(column -> filterOrRewrite(outputs, node.getCriteria(), column));

            // Constants from both sides hold on every joined row; right-side values win on key clashes.
            Map<VariableReferenceExpression, ConstantExpression> constants = new HashMap<>(leftTranslated.getConstants());
            constants.putAll(rightTranslated.getConstants());

            return ActualProperties.builderFrom(leftTranslated)
                    .constants(constants)
                    .build();
        }
        case LEFT: {
            ActualProperties leftFiltered = left.translateVariable(column -> filterIfMissing(outputs, column));
            return ActualProperties.builderFrom(leftFiltered)
                    .build();
        }
        case RIGHT: {
            // NOTE(review): the second translation repeats the same output filter and looks redundant.
            ActualProperties rightFiltered = right.translateVariable(column -> filterIfMissing(node.getOutputVariables(), column));
            return ActualProperties.builderFrom(rightFiltered.translateVariable(column -> filterIfMissing(outputs, column)))
                    .local(ImmutableList.of())
                    .unordered(true)
                    .build();
        }
        case FULL: {
            if (left.isSingleNode()) {
                return ActualProperties.builder()
                        .global(singleStreamPartition())
                        .build();
            }
            boolean coalescible = left.getNodePartitioning().isPresent()
                    && right.getNodePartitioning().isPresent()
                    && arePartitionHandlesCompatibleForCoalesce(
                            left.getNodePartitioning().get().getHandle(),
                            right.getNodePartitioning().get().getHandle(),
                            metadata,
                            session);
            if (coalescible) {
                return ActualProperties.builder()
                        .global(partitionedOnCoalesce(left.getNodePartitioning().get(), right.getNodePartitioning().get(), metadata, session))
                        .build();
            }
            return ActualProperties.builder()
                    .global(arbitraryPartition())
                    .build();
        }
        default:
            throw new UnsupportedOperationException("Unsupported join type: " + node.getType());
    }
}
/**
 * Derives the properties produced by an exchange.
 * <p>
 * A constant survives only if every source reports the same (variable, value) pair once the source's
 * input variables are mapped positionally onto the exchange outputs. Sort properties come from the
 * exchange's ordering scheme, if present. Local exchanges report only local properties, constants and
 * a coarse coordinator-only / single-node placement. Remote exchanges report the placement implied
 * by the exchange type.
 */
@Override
public ActualProperties visitExchange(ExchangeNode node, List<ActualProperties> inputProperties)
{
    checkArgument(!node.getScope().isRemote() || inputProperties.stream().noneMatch(ActualProperties::isNullsAndAnyReplicated), "Null-and-any replicated inputs should not be remotely exchanged");

    Set<Map.Entry<VariableReferenceExpression, ConstantExpression>> entries = null;
    for (int sourceIndex = 0; sourceIndex < node.getSources().size(); sourceIndex++) {
        // Map this source's input variables onto the exchange outputs by position.
        List<VariableReferenceExpression> inputVariables = node.getInputs().get(sourceIndex);
        Map<VariableReferenceExpression, VariableReferenceExpression> inputToOutput = new HashMap<>();
        for (int i = 0; i < node.getOutputVariables().size(); i++) {
            inputToOutput.put(inputVariables.get(i), node.getOutputVariables().get(i));
        }
        ActualProperties translated = inputProperties.get(sourceIndex).translateVariable(variable -> Optional.ofNullable(inputToOutput.get(variable)));
        // Keep only the constant entries shared by every source seen so far.
        entries = (entries == null) ? translated.getConstants().entrySet() : Sets.intersection(entries, translated.getConstants().entrySet());
    }
    checkState(entries != null);

    Map<VariableReferenceExpression, ConstantExpression> constants = entries.stream()
            .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));

    ImmutableList.Builder<SortingProperty<VariableReferenceExpression>> localProperties = ImmutableList.builder();
    if (node.getOrderingScheme().isPresent()) {
        node.getOrderingScheme().get().getOrderByVariables().stream()
                .map(column -> new SortingProperty<>(column, node.getOrderingScheme().get().getOrdering(column)))
                .forEach(localProperties::add);
    }

    // Local exchanges are only created in AddLocalExchanges, at the end of optimization, and
    // local exchanges do not produce all global properties as represented by ActualProperties.
    // This is acceptable because AddLocalExchanges does not use global properties and is only
    // interested in the local properties.
    // However, for the purpose of validation, some global properties (single-node vs distributed)
    // are computed for local exchanges.
    // TODO: implement full properties for local exchanges
    if (node.getScope().isLocal()) {
        ActualProperties.Builder builder = ActualProperties.builder();
        builder.local(localProperties.build());
        builder.constants(constants);

        if (inputProperties.stream().anyMatch(ActualProperties::isCoordinatorOnly)) {
            builder.global(coordinatorSingleStreamPartition());
        }
        else if (inputProperties.stream().anyMatch(ActualProperties::isSingleNode)) {
            // Single-node input that is not coordinator-only must not be reported as coordinator-only;
            // this mirrors the coordinator-only vs single-stream distinction made for GATHER below.
            builder.global(singleStreamPartition());
        }
        return builder.build();
    }

    switch (node.getType()) {
        case GATHER:
            boolean coordinatorOnly = node.getPartitioningScheme().getPartitioning().getHandle().isCoordinatorOnly();
            return ActualProperties.builder()
                    .global(coordinatorOnly ? coordinatorSingleStreamPartition() : singleStreamPartition())
                    .local(localProperties.build())
                    .constants(constants)
                    .build();
        case REPARTITION: {
            Global globalPartitioning;
            if (node.getPartitioningScheme().isScaleWriters()) {
                // no strict partitioning guarantees when multiple writers per partitions allowed (scaled writers)
                globalPartitioning = arbitraryPartition();
            }
            else {
                globalPartitioning = partitionedOn(
                        node.getPartitioningScheme().getPartitioning(),
                        Optional.of(node.getPartitioningScheme().getPartitioning()))
                        .withReplicatedNulls(node.getPartitioningScheme().isReplicateNullsAndAny());
            }
            return ActualProperties.builder()
                    .global(globalPartitioning)
                    .constants(constants)
                    .build();
        }
        case REPLICATE:
            // TODO: this should have the same global properties as the stream taking the replicated data
            return ActualProperties.builder()
                    .global(arbitraryPartition())
                    .constants(constants)
                    .build();
    }

    throw new UnsupportedOperationException("not yet implemented");
}
@Override
public ActualProperties visitFilter(FilterNode node, List<ActualProperties> inputProperties)
{
    ActualProperties properties = Iterables.getOnlyElement(inputProperties);

    // Every variable that the predicate fixes to a single value becomes an additional constant.
    TupleDomain<VariableReferenceExpression> predicateDomain = new RowExpressionDomainTranslator(metadata)
            .fromPredicate(session.toConnectorSession(), node.getPredicate(), BASIC_COLUMN_EXTRACTOR)
            .getTupleDomain();

    Map<VariableReferenceExpression, ConstantExpression> constants = new HashMap<>(properties.getConstants());
    extractFixedValuesToConstantExpressions(predicateDomain).ifPresent(constants::putAll);

    return ActualProperties.builderFrom(properties)
            .constants(constants)
            .build();
}
/**
 * Derives the properties after a projection. The input properties are translated through the
 * assignments, and further constants are recorded for every assignment that optimizes to a literal
 * value, or to an input variable that the input already reports as constant.
 */
@Override
public ActualProperties visitProject(ProjectNode node, List<ActualProperties> inputProperties)
{
    ActualProperties properties = Iterables.getOnlyElement(inputProperties);
    ActualProperties translatedProperties = properties.translateRowExpression(node.getAssignments().getMap());
    Map<VariableReferenceExpression, ConstantExpression> inputConstants = properties.getConstants();

    // Extract additional constants
    Map<VariableReferenceExpression, ConstantExpression> constants = new HashMap<>();
    for (Map.Entry<VariableReferenceExpression, RowExpression> assignment : node.getAssignments().entrySet()) {
        RowExpression expression = assignment.getValue();
        VariableReferenceExpression output = assignment.getKey();

        // TODO:
        // We want to use a symbol resolver that looks up in the constants from the input subplan
        // to take advantage of constant-folding for complex expressions
        // However, that currently causes errors when those expressions operate on arrays or row types
        // ("ROW comparison not supported for fields with null elements", etc)
        Object value = new RowExpressionInterpreter(expression, metadata.getFunctionAndTypeManager(), session.toConnectorSession(), OPTIMIZED).optimize();

        if (value instanceof VariableReferenceExpression) {
            // The assignment reduces to a plain input variable. Look it up among the INPUT constants
            // (not the output map being built here) and propagate the constant's value, rather than
            // wrapping the variable reference itself in a ConstantExpression.
            ConstantExpression existingConstantValue = inputConstants.get(value);
            if (existingConstantValue != null) {
                constants.put(output, new ConstantExpression(((VariableReferenceExpression) value).getSourceLocation(), existingConstantValue.getValue(), expression.getType()));
            }
        }
        else if (!(value instanceof RowExpression)) {
            // Fully folded to a literal value.
            constants.put(output, new ConstantExpression(value, expression.getType()));
        }
    }
    // Constants carried through the translated input properties take precedence.
    constants.putAll(translatedProperties.getConstants());

    return ActualProperties.builderFrom(translatedProperties)
            .constants(constants)
            .build();
}
@Override
public ActualProperties visitTableWriter(TableWriterNode node, List<ActualProperties> inputProperties)
{
    ActualProperties source = Iterables.getOnlyElement(inputProperties);

    // Only the coarse placement of the input survives: coordinator-only, single node, or arbitrary.
    Global placement;
    if (source.isCoordinatorOnly()) {
        placement = coordinatorSingleStreamPartition();
    }
    else if (source.isSingleNode()) {
        placement = singleStreamPartition();
    }
    else {
        placement = arbitraryPartition();
    }

    return ActualProperties.builder()
            .global(placement)
            .build();
}
@Override
public ActualProperties visitTableWriteMerge(TableWriterMergeNode node, List<ActualProperties> inputProperties)
{
    // The single input's properties are passed through unchanged.
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source;
}
@Override
public ActualProperties visitSample(SampleNode node, List<ActualProperties> inputProperties)
{
    // The single input's properties are passed through unchanged.
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source;
}
@Override
public ActualProperties visitUnnest(UnnestNode node, List<ActualProperties> inputProperties)
{
    // Only the replicated (pass-through) columns keep their properties; all others are dropped.
    Set<VariableReferenceExpression> replicated = ImmutableSet.copyOf(node.getReplicateVariables());
    ActualProperties source = Iterables.getOnlyElement(inputProperties);
    return source.translateVariable(column -> replicated.contains(column) ? Optional.of(column) : Optional.empty());
}
@Override
public ActualProperties visitValues(ValuesNode node, List<ActualProperties> context)
{
    // Inline VALUES are always reported as a single (non-coordinator) stream.
    ActualProperties.Builder result = ActualProperties.builder();
    result.global(singleStreamPartition());
    return result.build();
}
@Override
public ActualProperties visitSequence(SequenceNode node, List<ActualProperties> context)
{
    // The node reports the properties of its last (rightmost) source.
    return context.get(context.size() - 1);
}
@Override
public ActualProperties visitTableScan(TableScanNode node, List<ActualProperties> inputProperties)
{
    TableLayout layout = metadata.getLayout(session, node.getTable());
    // Column handle -> output variable. ImmutableBiMap.copyOf rejects duplicate values, so this also
    // requires the assignments to be one-to-one.
    Map<ColumnHandle, VariableReferenceExpression> assignments = ImmutableBiMap.copyOf(node.getAssignments()).inverse();

    // Globally constant assignments: columns the current constraint fixes to a single non-null value.
    Map<ColumnHandle, ConstantExpression> globalConstants = new HashMap<>();
    for (Map.Entry<ColumnHandle, ConstantExpression> fixed : extractFixedValuesToConstantExpressions(node.getCurrentConstraint()).orElse(ImmutableMap.of()).entrySet()) {
        if (!fixed.getValue().isNull()) {
            globalConstants.put(fixed.getKey(), fixed.getValue());
        }
    }

    // Re-key the constants by output variable, skipping columns that the scan does not project.
    Map<VariableReferenceExpression, ConstantExpression> variableConstants = new HashMap<>();
    globalConstants.forEach((column, constant) -> {
        VariableReferenceExpression variable = assignments.get(column);
        if (variable != null) {
            variableConstants.put(variable, constant);
        }
    });

    ActualProperties.Builder properties = ActualProperties.builder();
    properties.constants(variableConstants);

    // Partitioning properties
    properties.global(deriveGlobalProperties(layout, assignments, globalConstants));

    // Constant columns go first, ahead of the layout's own local properties, so that they are
    // translated together (maximizing their translation potential).
    ImmutableList.Builder<LocalProperty<ColumnHandle>> columnLocalProperties = ImmutableList.builder();
    for (ColumnHandle constantColumn : globalConstants.keySet()) {
        columnLocalProperties.add(new ConstantProperty<>(constantColumn));
    }
    columnLocalProperties.addAll(layout.getLocalProperties());
    properties.local(LocalProperties.translate(columnLocalProperties.build(), column -> Optional.ofNullable(assignments.get(column))));

    return properties.build();
}
/**
 * Derives properties for a remote source (exchange input).
 *
 * <p>If the source has an ordering scheme, the result is a single-stream partition marked as
 * ordered. If it only has to preserve source ordering, the result is a single-stream partition.
 * Otherwise, default properties are returned.
 */
@Override
public ActualProperties visitRemoteSource(RemoteSourceNode node, List<ActualProperties> inputProperties)
{
    boolean hasOrderingScheme = node.getOrderingScheme().isPresent();
    if (!hasOrderingScheme && !node.isEnsureSourceOrdering()) {
        return ActualProperties.builder().build();
    }
    if (hasOrderingScheme) {
        return ActualProperties.builder()
                .global(singleStreamPartition())
                .unordered(false)
                .build();
    }
    return ActualProperties.builder()
            .global(singleStreamPartition())
            .build();
}
/**
 * Chooses the global partitioning for a table scan.
 *
 * <p>Table partitioning is used when it is enabled for the session, the layout declares it, and
 * every partitioning column is either an output assignment or a known constant. Otherwise, the
 * method falls back to stream partitioning on the non-constant stream-partitioning columns if
 * those can all be translated, and to an arbitrary partition when they cannot.
 */
private Global deriveGlobalProperties(TableLayout layout, Map<ColumnHandle, VariableReferenceExpression> assignments, Map<ColumnHandle, ConstantExpression> constants)
{
    Optional<List<VariableReferenceExpression>> streamPartitioning = layout.getStreamPartitioningColumns()
            .flatMap(columns -> translateToNonConstantSymbols(columns, assignments, constants));

    if (planWithTableNodePartitioning(session) && layout.getTablePartitioning().isPresent()) {
        TablePartitioning partitioning = layout.getTablePartitioning().get();
        boolean everyColumnResolvable = partitioning.getPartitioningColumns().stream()
                .allMatch(column -> assignments.containsKey(column) || constants.containsKey(column));
        if (everyColumnResolvable) {
            // Prefer the output variable; otherwise, the column must be a constant
            ImmutableList.Builder<RowExpression> arguments = ImmutableList.builder();
            for (ColumnHandle column : partitioning.getPartitioningColumns()) {
                VariableReferenceExpression variable = assignments.get(column);
                if (variable != null) {
                    arguments.add(variable);
                }
                else {
                    arguments.add(constants.get(column));
                }
            }
            return partitionedOn(partitioning.getPartitioningHandle(), arguments.build(), streamPartitioning);
        }
    }

    return streamPartitioning.isPresent() ? streamPartitionedOn(streamPartitioning.get()) : arbitraryPartition();
}
/**
 * Translates column handles to their output variables, skipping columns that are global constants.
 *
 * @return the distinct translated variables, in the iteration order of {@code columnHandles}, or
 *         an empty Optional if any non-constant column has no assignment
 */
private static Optional<List<VariableReferenceExpression>> translateToNonConstantSymbols(
        Set<ColumnHandle> columnHandles,
        Map<ColumnHandle, VariableReferenceExpression> assignments,
        Map<ColumnHandle, ConstantExpression> globalConstants)
{
    ImmutableSet.Builder<VariableReferenceExpression> variables = ImmutableSet.builder();
    for (ColumnHandle handle : columnHandles) {
        // Constants are not required for translation, so they are stripped off
        if (globalConstants.containsKey(handle)) {
            continue;
        }
        VariableReferenceExpression variable = assignments.get(handle);
        if (variable == null) {
            return Optional.empty();
        }
        variables.add(variable);
    }
    return Optional.of(ImmutableList.copyOf(variables.build()));
}
}
/**
 * Reports whether a join of the given type may spill under the session's settings.
 *
 * <p>Both general spilling and join spilling must be enabled. Under those settings, only inner
 * and left joins can spill.
 *
 * @throws IllegalStateException if spilling is enabled and the join type is unrecognized
 */
static boolean spillPossible(Session session, JoinType joinType)
{
    boolean spillingConfigured = isSpillEnabled(session) && isJoinSpillingEnabled(session);
    if (!spillingConfigured) {
        return false;
    }
    switch (joinType) {
        case RIGHT:
        case FULL:
            // Currently there is no spill support for outer on the build side.
            return false;
        case INNER:
        case LEFT:
            return true;
        default:
            throw new IllegalStateException("Unknown join type: " + joinType);
    }
}
/**
 * Returns {@code column} if it is among {@code columns}, otherwise an empty Optional.
 */
public static Optional<VariableReferenceExpression> filterIfMissing(Collection<VariableReferenceExpression> columns, VariableReferenceExpression column)
{
    return columns.contains(column) ? Optional.of(column) : Optional.empty();
}
/**
 * Filters out columns that the join node does not expose. If a column is part of an equi-join
 * clause, it is rewritten to the other side of that clause when the other side is exposed.
 *
 * @param columns the variables exposed by the join
 * @param equalities the join's equi-join clauses, checked in order
 * @param column the variable to filter or rewrite
 * @return {@code column} itself if exposed, else the first exposed counterpart from
 *         {@code equalities}, else an empty Optional
 */
public static Optional<VariableReferenceExpression> filterOrRewrite(Collection<VariableReferenceExpression> columns, List<EquiJoinClause> equalities, VariableReferenceExpression column)
{
    // Exposed directly: nothing to rewrite
    if (columns.contains(column)) {
        return Optional.of(column);
    }
    for (EquiJoinClause clause : equalities) {
        VariableReferenceExpression left = clause.getLeft();
        VariableReferenceExpression right = clause.getRight();
        if (left.equals(column) && columns.contains(right)) {
            return Optional.of(right);
        }
        if (right.equals(column) && columns.contains(left)) {
            return Optional.of(left);
        }
    }
    return Optional.empty();
}
/**
 * Two partitioning handles are compatible for coalescing when they are equal, or when the
 * metadata reports either one as a refined partitioning over the other.
 */
public static boolean arePartitionHandlesCompatibleForCoalesce(PartitioningHandle a, PartitioningHandle b, Metadata metadata, Session session)
{
    if (a.equals(b)) {
        return true;
    }
    // Refinement is checked in both directions
    return metadata.isRefinedPartitioningOver(session, a, b)
            || metadata.isRefinedPartitioningOver(session, b, a);
}
/**
 * Extracts every column whose domain admits exactly one value, where that value may be null, and
 * converts each such value into a {@code ConstantExpression} of the domain's type.
 *
 * @param tupleDomain the constraint to inspect
 * @return the fixed values keyed by column, collected with {@code toLinkedMap}, or an empty
 *         Optional if the TupleDomain is none (it has no domains)
 */
public static <T> Optional<Map<T, ConstantExpression>> extractFixedValuesToConstantExpressions(TupleDomain<T> tupleDomain)
{
    if (tupleDomain.getDomains().isPresent()) {
        Map<T, ConstantExpression> fixedValues = tupleDomain.getDomains().get().entrySet().stream()
                .filter(entry -> entry.getValue().isNullableSingleValue())
                .collect(toLinkedMap(
                        Map.Entry::getKey,
                        entry -> new ConstantExpression(entry.getValue().getNullableSingleValue(), entry.getValue().getType())));
        return Optional.of(fixedValues);
    }
    return Optional.empty();
}
}