AddLocalExchanges.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ConstantProperty;
import com.facebook.presto.spi.GroupingProperty;
import com.facebook.presto.spi.LocalProperty;
import com.facebook.presto.spi.SortingProperty;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.DeleteNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.InputDistribution;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.MarkDistinctNode;
import com.facebook.presto.spi.plan.OrderingScheme;
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.Partitioning;
import com.facebook.presto.spi.plan.PartitioningScheme;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.SemiJoinNode;
import com.facebook.presto.spi.plan.SortNode;
import com.facebook.presto.spi.plan.SpatialJoinNode;
import com.facebook.presto.spi.plan.StatisticAggregations;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.plan.WindowNode;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties;
import com.facebook.presto.sql.planner.plan.ApplyNode;
import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode;
import com.facebook.presto.sql.planner.plan.IndexJoinNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.LateralJoinNode;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
import com.facebook.presto.sql.planner.plan.TableWriterMergeNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency;
import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount;
import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount;
import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled;
import static com.facebook.presto.SystemSessionProperties.isEnforceFixedDistributionForOutputOperator;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isNativeExecutionScaleWritersThreadsEnabled;
import static com.facebook.presto.SystemSessionProperties.isNativeJoinBuildPartitionEnforced;
import static com.facebook.presto.SystemSessionProperties.isQuickDistinctLimitEnabled;
import static com.facebook.presto.SystemSessionProperties.isSegmentedAggregationEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference;
import static com.facebook.presto.operator.aggregation.AggregationUtils.isDecomposable;
import static com.facebook.presto.sql.TemporaryTableUtil.splitIntoPartialAndIntermediate;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties.any;
import static com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties.defaultParallelism;
import static com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties.exactlyPartitionedOn;
import static com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties.fixedParallelism;
import static com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties.singleStream;
import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.SINGLE;
import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.derivePropertiesRecursively;
import static com.facebook.presto.sql.planner.plan.ChildReplacer.replaceChildren;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.mergingExchange;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.partitionedExchange;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.roundRobinExchange;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.systemPartitionedExchange;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public class AddLocalExchanges
        implements PlanOptimizer
{
    private final Metadata metadata;
    // Handed to each Rewriter; gates native-execution-specific planning such as writer thread scaling.
    private final boolean nativeExecution;

    /**
     * Creates the optimizer.
     *
     * @param metadata metadata whose function and type manager is consulted while planning aggregations; must not be null
     * @param nativeExecution whether the plan is executed by the native engine
     */
    public AddLocalExchanges(Metadata metadata, boolean nativeExecution)
    {
        this.nativeExecution = nativeExecution;
        this.metadata = requireNonNull(metadata, "metadata is null");
    }

    /**
     * Runs the {@link Rewriter} over {@code plan}, starting from no stream preference ({@code any()}).
     * The {@code types} argument is not consulted; the rewriter builds its own type view from {@code variableAllocator}.
     *
     * @return the rewritten plan, reported as triggered when the result contains at least one local exchange
     */
    @Override
    public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
    {
        Rewriter rewriter = new Rewriter(variableAllocator, idAllocator, session, nativeExecution);
        PlanNode rewrittenPlan = rewriter.accept(plan, any()).getNode();

        boolean containsLocalExchange = PlanNodeSearcher.searchFrom(rewrittenPlan)
                .where(candidate -> candidate instanceof ExchangeNode && ((ExchangeNode) candidate).getScope().isLocal())
                .findFirst()
                .isPresent();

        return PlanOptimizerResult.optimizerResult(rewrittenPlan, containsLocalExchange);
    }

    private class Rewriter
            extends InternalPlanVisitor<PlanWithProperties, StreamPreferredProperties>
    {
        private final VariableAllocator variableAllocator;
        private final PlanNodeIdAllocator idAllocator;
        private final Session session;
        // Type view built from the allocator's variables when the rewriter is created.
        private final TypeProvider types;
        private final boolean nativeExecution;

        /**
         * Creates a rewriter for one optimizer invocation.
         *
         * @param variableAllocator allocator whose variables back the rewriter's type view; must not be null
         * @param idAllocator source of ids for plan nodes created during the rewrite; must not be null
         * @param session session whose properties drive exchange planning; must not be null
         * @param nativeExecution whether native-execution-specific planning paths may be used
         */
        public Rewriter(VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, Session session, boolean nativeExecution)
        {
            // Fail fast with a descriptive message, consistent with the enclosing optimizer's constructor.
            this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null");
            this.types = TypeProvider.viewOf(variableAllocator.getVariables());
            this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
            this.session = requireNonNull(session, "session is null");
            this.nativeExecution = nativeExecution;
        }

        /**
         * Default handling for nodes without a dedicated visitor. The children are required to have default
         * parallelism with the parent's preference dropped, and are offered the parent's preferences
         * (with default parallelism) as their preferred properties.
         */
        @Override
        public PlanWithProperties visitPlan(PlanNode node, StreamPreferredProperties parentPreferences)
        {
            StreamPreferredProperties required = parentPreferences.withoutPreference().withDefaultParallelism(session);
            StreamPreferredProperties preferred = parentPreferences.withDefaultParallelism(session);
            return planAndEnforceChildren(node, required, preferred);
        }

        /**
         * Apply nodes are not supported by this rewriter; encountering one fails with {@link IllegalStateException}.
         */
        @Override
        public PlanWithProperties visitApply(ApplyNode node, StreamPreferredProperties parentPreferences)
        {
            String nodeType = node.getClass().getName();
            throw new IllegalStateException("Unexpected node: " + nodeType);
        }

        /**
         * Lateral join nodes are not supported by this rewriter; encountering one fails with {@link IllegalStateException}.
         */
        @Override
        public PlanWithProperties visitLateralJoin(LateralJoinNode node, StreamPreferredProperties parentPreferences)
        {
            String nodeType = node.getClass().getName();
            throw new IllegalStateException("Unexpected node: " + nodeType);
        }

        @Override
        public PlanWithProperties visitOutput(OutputNode node, StreamPreferredProperties parentPreferences)
        {
            // Output accepts any stream layout, but it is order sensitive, so row order must be preserved below it.
            StreamPreferredProperties orderPreserving = any().withOrderSensitivity();
            return planAndEnforceChildren(node, orderPreserving, orderPreserving);
        }

        @Override
        public PlanWithProperties visitExplainAnalyze(ExplainAnalyzeNode node, StreamPreferredProperties parentPreferences)
        {
            // Although explain analyze discards all output, it keeps the behavior of a normal output node:
            // a single stream that is declared order sensitive.
            StreamPreferredProperties orderedSingleStream = singleStream().withOrderSensitivity();
            return planAndEnforceChildren(node, orderedSingleStream, orderedSingleStream);
        }

        //
        // Nodes that always require a single stream
        //

        @Override
        public PlanWithProperties visitSort(SortNode node, StreamPreferredProperties parentPreferences)
        {
            // Sorts with partition keys are planned over streams partitioned on those keys;
            // sorts without them need a globally ordered result.
            return node.getPartitionBy().isEmpty()
                    ? planSortWithoutPartition(node, parentPreferences)
                    : planSortWithPartition(node, parentPreferences);
        }

        /**
         * Plans a sort that has partition keys.
         * <p>
         * The source is planned with the parent's preferences, restricted to the source's outputs, plus default
         * parallelism and partitioning on the partition keys. The sort is then rebuilt on top of the planned source
         * under a newly allocated id.
         */
        private PlanWithProperties planSortWithPartition(SortNode node, StreamPreferredProperties parentPreferences)
        {
            checkArgument(!node.getPartitionBy().isEmpty());

            PlanNode source = node.getSource();
            StreamPreferredProperties sourceRequirements = parentPreferences
                    .constrainTo(source.getOutputVariables())
                    .withDefaultParallelism(session)
                    .withPartitioning(node.getPartitionBy());
            PlanWithProperties plannedSource = planAndEnforce(source, sourceRequirements, sourceRequirements);

            SortNode partitionedSort = new SortNode(
                    node.getSourceLocation(),
                    idAllocator.getNextId(),
                    plannedSource.getNode(),
                    node.getOrderingScheme(),
                    node.isPartial(),
                    node.getPartitionBy());
            return deriveProperties(partitionedSort, plannedSource.getProperties());
        }

        /**
         * Plans a sort without partition keys.
         * <ul>
         * <li>If the source, planned preferring a single stream, comes back as a single ordered stream whose local
         * properties already satisfy the requested ordering, the sort is dropped and the source plan is returned.</li>
         * <li>With distributed sort enabled, the sort is planned with fixed parallelism. If that yields more than one
         * stream, the sort is marked partial and its streams are combined by a local merging exchange.</li>
         * <li>Otherwise all rows are required in a single stream and sorted there.</li>
         * </ul>
         */
        private PlanWithProperties planSortWithoutPartition(SortNode node, StreamPreferredProperties parentPreferences)
        {
            checkArgument(node.getPartitionBy().isEmpty());

            // TODO: extract the redundant-sort removal to its own optimization after AddLocalExchanges once the
            // constraint optimization framework is in a better state to be extended
            OrderingScheme orderingScheme = node.getOrderingScheme();
            PlanWithProperties sourcePlan = planAndEnforce(node.getSource(), any(), singleStream());
            StreamProperties sourceProperties = sourcePlan.getProperties();
            if (sourceProperties.isSingleStream() && sourceProperties.isOrdered()) {
                List<LocalProperty<VariableReferenceExpression>> requestedOrdering = orderingScheme.getOrderByVariables().stream()
                        .map(variable -> new SortingProperty<>(variable, orderingScheme.getOrdering(variable)))
                        .collect(toImmutableList());
                boolean alreadySorted = LocalProperties.match(sourceProperties.getLocalProperties(), requestedOrdering).stream()
                        .noneMatch(Optional::isPresent);
                if (alreadySorted) {
                    return sourcePlan;
                }
            }

            if (!isDistributedSortEnabled(session)) {
                // sort requires that all data be in one stream
                // this node changes the input organization completely, so we do not pass through parent preferences
                return planAndEnforceChildren(node, singleStream(), defaultParallelism(session));
            }

            PlanWithProperties sortPlan = planAndEnforceChildren(node, fixedParallelism(), fixedParallelism());
            if (sortPlan.getProperties().isSingleStream()) {
                return sortPlan;
            }

            // Several streams: turn the sort into a partial sort and merge its sorted streams locally.
            SortNode plannedSort = (SortNode) sortPlan.getNode();
            SortNode partialSort = new SortNode(
                    plannedSort.getSourceLocation(),
                    plannedSort.getId(),
                    getOnlyElement(plannedSort.getSources()),
                    plannedSort.getOrderingScheme(),
                    true,
                    plannedSort.getPartitionBy());
            return deriveProperties(
                    mergingExchange(idAllocator.getNextId(), LOCAL, partialSort, orderingScheme),
                    sortPlan.getProperties());
        }

        @Override
        public PlanWithProperties visitStatisticsWriterNode(StatisticsWriterNode node, StreamPreferredProperties context)
        {
            // analyze finish requires that all data be in one stream;
            // it changes the input organization completely, so parent preferences are not passed through
            StreamPreferredProperties required = singleStream();
            StreamPreferredProperties preferred = defaultParallelism(session);
            return planAndEnforceChildren(node, required, preferred);
        }

        @Override
        public PlanWithProperties visitTableFinish(TableFinishNode node, StreamPreferredProperties parentPreferences)
        {
            // table commit requires that all data be in one stream;
            // it changes the input organization completely, so parent preferences are not passed through
            StreamPreferredProperties required = singleStream();
            StreamPreferredProperties preferred = defaultParallelism(session);
            return planAndEnforceChildren(node, required, preferred);
        }

        /**
         * Plans a TopN. A partial TopN passes the parent's preferences through (with default parallelism); a final
         * TopN needs all rows in one stream and does not pass parent preferences through.
         */
        @Override
        public PlanWithProperties visitTopN(TopNNode node, StreamPreferredProperties parentPreferences)
        {
            StreamPreferredProperties required;
            StreamPreferredProperties preferred;
            if (node.getStep().equals(TopNNode.Step.PARTIAL)) {
                required = parentPreferences.withoutPreference().withDefaultParallelism(session);
                preferred = parentPreferences.withDefaultParallelism(session);
            }
            else {
                // final topN requires that all data be in one stream
                // also, a final changes the input organization completely, so we do not pass through parent preferences
                required = singleStream();
                preferred = defaultParallelism(session);
            }
            return planAndEnforceChildren(node, required, preferred);
        }

        /**
         * Plans a limit.
         * <p>
         * A partial limit keeps the parent's preferences (with default parallelism). A final limit needs all rows in
         * a single stream. Because that collapses the input organization, the parent's preferences are not forwarded
         * to the source. This matches final TopN and final DistinctLimit. Forwarding them would only bias how the
         * source is planned (for example narrowing partitioning below it) without benefiting the limit's consumer.
         */
        @Override
        public PlanWithProperties visitLimit(LimitNode node, StreamPreferredProperties parentPreferences)
        {
            if (node.isPartial()) {
                return planAndEnforceChildren(
                        node,
                        parentPreferences.withoutPreference().withDefaultParallelism(session),
                        parentPreferences.withDefaultParallelism(session));
            }

            // final limit requires that all data be in one stream
            // also, a final changes the input organization completely, so we do not pass through parent preferences
            return planAndEnforceChildren(
                    node,
                    singleStream(),
                    defaultParallelism(session));
        }

        /**
         * Plans a distinct limit.
         * <ul>
         * <li>Final: all rows must be in one stream, and parent preferences are not passed through.</li>
         * <li>Partial, with quick distinct limit enabled: the source is planned with default parallelism, and a local
         * round-robin exchange is placed between it and this node.</li>
         * <li>Partial otherwise: parent preferences are passed through with default parallelism.</li>
         * </ul>
         */
        @Override
        public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, StreamPreferredProperties parentPreferences)
        {
            if (!node.isPartial()) {
                // final limit requires that all data be in one stream
                // a final changes the input organization completely, so we do not pass through parent preferences
                return planAndEnforceChildren(node, singleStream(), defaultParallelism(session));
            }

            if (!isQuickDistinctLimitEnabled(session)) {
                return planAndEnforceChildren(
                        node,
                        parentPreferences.withoutPreference().withDefaultParallelism(session),
                        parentPreferences.withDefaultParallelism(session));
            }

            PlanWithProperties source = accept(node.getSource(), defaultParallelism(session));
            PlanNode roundRobin = roundRobinExchange(idAllocator.getNextId(), LOCAL, source.getNode());
            PlanWithProperties exchange = deriveProperties(roundRobin, source.getProperties());
            return rebaseAndDeriveProperties(node, ImmutableList.of(exchange));
        }

        @Override
        public PlanWithProperties visitEnforceSingleRow(EnforceSingleRowNode node, StreamPreferredProperties parentPreferences)
        {
            // The source must be gathered into one stream; default parallelism is only its preference.
            StreamPreferredProperties singleStreamRequirement = singleStream();
            return planAndEnforceChildren(node, singleStreamRequirement, defaultParallelism(session));
        }

        //
        // Nodes that require parallel streams to be partitioned
        //

        /**
         * Plans a SINGLE-step aggregation.
         * <ul>
         * <li>Aggregations with a single-node execution preference are planned on a single stream.</li>
         * <li>Aggregations with default output get a local exchange partitioned on the grouping keys directly below
         * them.</li>
         * <li>Otherwise the source is partitioned on the grouping keys, and the rebuilt node records which grouping keys
         * the source already delivers pre-grouped.</li>
         * </ul>
         */
        @Override
        public PlanWithProperties visitAggregation(AggregationNode node, StreamPreferredProperties parentPreferences)
        {
            checkState(node.getStep() == AggregationNode.Step.SINGLE, "step of aggregation is expected to be SINGLE, but it is %s", node.getStep());

            if (hasSingleNodeExecutionPreference(node, metadata.getFunctionAndTypeManager())) {
                return planAndEnforceChildren(node, singleStream(), defaultParallelism(session));
            }

            List<VariableReferenceExpression> groupingKeys = node.getGroupingKeys();
            PlanNode source = node.getSource();

            if (node.hasDefaultOutput()) {
                checkState(isDecomposable(node, metadata.getFunctionAndTypeManager()));

                // Put fixed local exchange directly below final aggregation to ensure that final and partial aggregations are separated by exchange (in a local runner mode)
                // This is required so that default outputs from multiple instances of partial aggregations are passed to a single final aggregation.
                PlanWithProperties plannedSource = planAndEnforce(source, any(), defaultParallelism(session));
                PlanNode localRepartition = systemPartitionedExchange(
                        idAllocator.getNextId(),
                        LOCAL,
                        plannedSource.getNode(),
                        groupingKeys,
                        Optional.empty());
                PlanWithProperties exchange = deriveProperties(localRepartition, plannedSource.getProperties());
                return rebaseAndDeriveProperties(node, ImmutableList.of(exchange));
            }

            StreamPreferredProperties sourceRequirements = parentPreferences
                    .constrainTo(source.getOutputVariables())
                    .withDefaultParallelism(session)
                    .withPartitioning(groupingKeys);
            PlanWithProperties partitionedSource = planAndEnforce(source, sourceRequirements, sourceRequirements);

            AggregationNode result = new AggregationNode(
                    node.getSourceLocation(),
                    node.getId(),
                    partitionedSource.getNode(),
                    node.getAggregations(),
                    node.getGroupingSets(),
                    preGroupedAggregationKeys(groupingKeys, partitionedSource.getProperties().getLocalProperties()),
                    node.getStep(),
                    node.getHashVariable(),
                    node.getGroupIdVariable(),
                    node.getAggregationId());

            return deriveProperties(result, partitionedSource.getProperties());
        }

        /**
         * Determines which grouping keys the source already delivers grouped.
         * <p>
         * {@link LocalProperties#match} is given a single grouping requirement, so its result has one entry. That entry
         * is empty when the source's local properties satisfy the grouping completely. Otherwise it is a grouping
         * property over the keys that are not pre-grouped. Matching takes the longest prefix of the source's local
         * properties whose columns are all grouping keys; the remaining grouping keys are reported as unmatched.
         * Examples (source local properties, desired grouping -> match entry):
         * <pre>
         * [A, B] [(B, A)]     ->   Optional.empty()
         * [A, B] [B]          ->   Optional.of(GroupingProperty(B))
         * [A, B] [A]          ->   Optional.empty()
         * [A, B] [(A, C)]     ->   Optional.of(GroupingProperty(C))
         * [A, B] [(D, A, C)]  ->   Optional.of(GroupingProperty(D, C))
         * </pre>
         *
         * @return {@code groupingKeys} when fully satisfied. When only some keys are unmatched and segmented
         * aggregation is enabled, the keys that are pre-grouped. Otherwise an empty list.
         */
        private List<VariableReferenceExpression> preGroupedAggregationKeys(
                List<VariableReferenceExpression> groupingKeys,
                List<LocalProperty<VariableReferenceExpression>> sourceLocalProperties)
        {
            Optional<LocalProperty<VariableReferenceExpression>> unmatched = LocalProperties.match(sourceLocalProperties, LocalProperties.grouped(groupingKeys)).get(0);
            if (!unmatched.isPresent()) {
                return groupingKeys;
            }

            // If every grouping key is unmatched, nothing is pre-grouped and segmented aggregation cannot help.
            Set<VariableReferenceExpression> notPreGrouped = unmatched.get().getColumns();
            if (notPreGrouped.size() < groupingKeys.size() && isSegmentedAggregationEnabled(session)) {
                return groupingKeys.stream()
                        .filter(key -> !notPreGrouped.contains(key))
                        .collect(toImmutableList());
            }
            return ImmutableList.of();
        }

        /**
         * Plans a window node.
         * <p>
         * The source is partitioned on the window's partition keys. The rebuilt node then records which partition keys
         * the source already delivers grouped ({@code prePartitionedInputs}). When all of them are, it also records
         * how many leading ordering keys are already sorted ({@code preSortedOrderPrefix}).
         */
        @Override
        public PlanWithProperties visitWindow(WindowNode node, StreamPreferredProperties parentPreferences)
        {
            StreamPreferredProperties sourceRequirements = parentPreferences
                    .constrainTo(node.getSource().getOutputVariables())
                    .withDefaultParallelism(session)
                    .withPartitioning(node.getPartitionBy());
            PlanWithProperties plannedSource = planAndEnforce(node.getSource(), sourceRequirements, sourceRequirements);

            // Desired local properties: grouping on the partition keys (if any), then one sorting property per ordering key.
            List<LocalProperty<VariableReferenceExpression>> desiredProperties = new ArrayList<>();
            if (!node.getPartitionBy().isEmpty()) {
                desiredProperties.add(new GroupingProperty<>(node.getPartitionBy()));
            }
            node.getOrderingScheme().ifPresent(orderingScheme -> {
                for (VariableReferenceExpression variable : orderingScheme.getOrderByVariables()) {
                    desiredProperties.add(new SortingProperty<>(variable, orderingScheme.getOrdering(variable)));
                }
            });
            List<Optional<LocalProperty<VariableReferenceExpression>>> unmatched = LocalProperties.match(plannedSource.getProperties().getLocalProperties(), desiredProperties);
            int position = 0;

            Set<VariableReferenceExpression> prePartitionedInputs = ImmutableSet.of();
            if (!node.getPartitionBy().isEmpty()) {
                Set<VariableReferenceExpression> unPartitionedInputs = unmatched.get(position++)
                        .map(LocalProperty::getColumns)
                        .orElse(ImmutableSet.of());
                prePartitionedInputs = node.getPartitionBy().stream()
                        .filter(variable -> !unPartitionedInputs.contains(variable))
                        .collect(toImmutableSet());
            }

            // Count the leading satisfied sorting properties, but only when every partition key is pre-partitioned.
            int preSortedOrderPrefix = 0;
            if (prePartitionedInputs.equals(ImmutableSet.copyOf(node.getPartitionBy()))) {
                while (position < unmatched.size() && !unmatched.get(position).isPresent()) {
                    preSortedOrderPrefix++;
                    position++;
                }
            }

            WindowNode result = new WindowNode(
                    node.getSourceLocation(),
                    node.getId(),
                    plannedSource.getNode(),
                    node.getSpecification(),
                    node.getWindowFunctions(),
                    node.getHashVariable(),
                    prePartitionedInputs,
                    preSortedOrderPrefix);

            return deriveProperties(result, plannedSource.getProperties());
        }

        /**
         * Plans a delete.
         * <p>
         * Without an input distribution, the delete is handled like any other node. With one, the source is
         * partitioned on the distribution's partition keys, and the delete is rebuilt on top under a newly
         * allocated id.
         */
        @Override
        public PlanWithProperties visitDelete(DeleteNode node, StreamPreferredProperties parentPreferences)
        {
            if (!node.getInputDistribution().isPresent()) {
                return visitPlan(node, parentPreferences);
            }

            InputDistribution distribution = node.getInputDistribution().get();
            PlanNode source = node.getSource();
            StreamPreferredProperties sourceRequirements = parentPreferences
                    .constrainTo(source.getOutputVariables())
                    .withDefaultParallelism(session)
                    .withPartitioning(distribution.getPartitionBy());
            PlanWithProperties plannedSource = planAndEnforce(source, sourceRequirements, sourceRequirements);

            DeleteNode rewrittenDelete = new DeleteNode(
                    node.getSourceLocation(),
                    idAllocator.getNextId(),
                    node.getStatsEquivalentPlanNode(),
                    plannedSource.getNode(),
                    node.getRowId(),
                    node.getOutputVariables(),
                    node.getInputDistribution());
            return deriveProperties(rewrittenDelete, plannedSource.getProperties());
        }

        /**
         * Plans a mark-distinct node.
         * <p>
         * The source is partitioned on the distinct variables. The rebuilt node keeps only the distinct variables
         * that are not made redundant by the source's local properties.
         */
        @Override
        public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, StreamPreferredProperties parentPreferences)
        {
            PlanNode source = node.getSource();
            StreamPreferredProperties sourceRequirements = parentPreferences
                    .constrainTo(source.getOutputVariables())
                    .withDefaultParallelism(session)
                    .withPartitioning(node.getDistinctVariables());
            PlanWithProperties plannedSource = planAndEnforce(source, sourceRequirements, sourceRequirements);

            List<VariableReferenceExpression> effectiveDistinctVariables = pruneMarkDistinctVariables(node, plannedSource.getProperties().getLocalProperties());
            MarkDistinctNode result = new MarkDistinctNode(
                    node.getSourceLocation(),
                    node.getId(),
                    plannedSource.getNode(),
                    node.getMarkerVariable(),
                    effectiveDistinctVariables,
                    node.getHashVariable());
            return deriveProperties(result, plannedSource.getProperties());
        }

        /**
         * Drops distinct variables that are redundant, to reduce the CPU cost of hashing their values and the memory
         * needed to store all distinct values.
         * <p>
         * Consider the following plan:
         * <pre>
         *  - MarkDistinctNode (unique, c1, c2)
         *      - Join
         *          - AssignUniqueId (unique)
         *              - probe (c1, c2)
         *          - build
         * </pre>
         * Here MarkDistinctNode (unique, c1, c2) is equivalent to MarkDistinctNode (unique): if two rows match on
         * {@code unique}, they must also match on {@code c1} and {@code c2}.
         * <p>
         * More generally, any distinct variable that is functionally dependent on a subset of the other distinct
         * variables can be dropped. In the list of local properties, a constant variable is functionally dependent on
         * the variables that appear before it. The scan therefore stops at the first non-constant property that
         * mentions a non-distinct variable.
         * <p>
         * Ideally this logic would be a separate rule, but currently no rule other than AddLocalExchanges can reason
         * about local properties.
         *
         * @return the distinct variables that are not redundant. If all of them are redundant (all constants), the first
         * distinct variable is kept so the list is never emptied.
         */
        private List<VariableReferenceExpression> pruneMarkDistinctVariables(MarkDistinctNode node, List<LocalProperty<VariableReferenceExpression>> localProperties)
        {
            List<VariableReferenceExpression> distinctVariables = node.getDistinctVariables();
            if (localProperties.isEmpty()) {
                return distinctVariables;
            }

            ImmutableSet.Builder<VariableReferenceExpression> constantsBuilder = ImmutableSet.builder();
            for (LocalProperty<VariableReferenceExpression> property : localProperties) {
                boolean isConstant = property instanceof ConstantProperty;
                if (!isConstant && !distinctVariables.containsAll(property.getColumns())) {
                    // A non-distinct variable appears: later constants may depend on it, so nothing past here can be pruned.
                    break;
                }
                if (isConstant) {
                    constantsBuilder.add(((ConstantProperty<VariableReferenceExpression>) property).getColumn());
                }
            }

            Set<VariableReferenceExpression> redundant = constantsBuilder.build();
            List<VariableReferenceExpression> retained = distinctVariables.stream()
                    .filter(variable -> !redundant.contains(variable))
                    .collect(toImmutableList());
            // Every distinct variable was constant: keep the first one rather than dropping them all.
            return retained.isEmpty() ? ImmutableList.of(distinctVariables.get(0)) : retained;
        }

        @Override
        public PlanWithProperties visitRowNumber(RowNumberNode node, StreamPreferredProperties parentPreferences)
        {
            StreamPreferredProperties withParallelism = parentPreferences.withDefaultParallelism(session);
            // a final (non-partial) row number requires that all data be partitioned on its partition keys
            StreamPreferredProperties requirements = node.isPartial()
                    ? withParallelism
                    : withParallelism.withPartitioning(node.getPartitionBy());
            return planAndEnforceChildren(node, requirements, requirements);
        }

        @Override
        public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, StreamPreferredProperties parentPreferences)
        {
            StreamPreferredProperties withParallelism = parentPreferences.withDefaultParallelism(session);
            // a final (non-partial) topN row number requires that all data be partitioned on its partition keys
            StreamPreferredProperties requirements = node.isPartial()
                    ? withParallelism
                    : withParallelism.withPartitioning(node.getPartitionBy());
            return planAndEnforceChildren(node, requirements, requirements);
        }

        //
        // Table Writer
        //

        @Override
        public PlanWithProperties visitTableWriter(TableWriterNode tableWrite, StreamPreferredProperties parentPreferences)
        {
            // When table is partitioned and single writer per partition is required (for example a bucketed table in Hive connector)
            if (tableWrite.isSingleWriterPerPartitionRequired()) {
                // special case when a single table writer per task is requested
                if (getTaskPartitionedWriterCount(session) == 1) {
                    return planAndEnforceChildren(tableWrite, singleStream(), defaultParallelism(session));
                }
                // Rows are routed to writer threads by the table's own partitioning scheme, so it is mandatory here.
                // Fail with a descriptive message rather than a bare NoSuchElementException from Optional.get().
                Optional<PartitioningScheme> tablePartitioningScheme = tableWrite.getTablePartitioningScheme();
                checkState(tablePartitioningScheme.isPresent(), "Table partitioning scheme must be present when a single writer per partition is required");
                PlanWithProperties source = accept(tableWrite.getSource(), defaultParallelism(session));
                PlanWithProperties exchange = deriveProperties(
                        partitionedExchange(
                                idAllocator.getNextId(),
                                LOCAL,
                                source.getNode(),
                                tablePartitioningScheme.get()),
                        source.getProperties());
                return planTableWriteWithTableWriteMerge(tableWrite, exchange);
            }

            // Read once; used by both the single-writer special case and the concurrency comparison below
            int taskWriterCount = getTaskWriterCount(session);

            // special case when a single table writer per task is requested
            if (taskWriterCount == 1) {
                return planAndEnforceChildren(tableWrite, singleStream(), defaultParallelism(session));
            }

            // Writer thread scaling enabled and the output table allows multiple writers per partition (for example non bucketed table in Hive connector)
            if (nativeExecution && isNativeExecutionScaleWritersThreadsEnabled(session)) {
                PlanWithProperties source = accept(tableWrite.getSource(), defaultParallelism(session));
                PartitioningScheme partitioningScheme;
                if (tableWrite.getTablePartitioningScheme().isPresent()) {
                    // Partitioning scheme is present and more than a single writer per partition is allowed (for example when table is not bucketed but partitioned in Hive connector)
                    partitioningScheme = tableWrite.getTablePartitioningScheme().get();
                    verify(partitioningScheme.isScaleWriters());
                }
                else {
                    // When partitioning scheme is not present (for example when table is not partitioned and not bucketed in Hive connector)
                    partitioningScheme = new PartitioningScheme(
                            Partitioning.create(FIXED_ARBITRARY_DISTRIBUTION, ImmutableList.of()),
                            source.getNode().getOutputVariables(),
                            true);
                }
                PlanWithProperties exchange = deriveProperties(
                        partitionedExchange(
                                idAllocator.getNextId(),
                                LOCAL,
                                source.getNode(),
                                partitioningScheme),
                        source.getProperties());
                return planTableWriteWithTableWriteMerge(tableWrite, exchange);
            }

            // Writer thread scaling is disabled and there is no strict partitioning requirement
            if (taskWriterCount == getTaskConcurrency(session)) {
                // When table write concurrency is equal to task concurrency do not add an extra local exchange for improved efficiency.
                // When source distribution is MULTIPLE (for example a TableScan), enforcing fixed parallelism adds an exchange
                // to achieve a fixed number of writer threads.
                return planTableWriteWithTableWriteMerge(
                        tableWrite,
                        planAndEnforce(tableWrite.getSource(), fixedParallelism(), fixedParallelism()));
            }

            // When concurrency settings are different add a round-robin exchange to achieve a specific level of parallelism for table write
            PlanWithProperties source = accept(tableWrite.getSource(), defaultParallelism(session));
            PlanWithProperties exchange = deriveProperties(
                    roundRobinExchange(idAllocator.getNextId(), LOCAL, source.getNode()),
                    source.getProperties());
            return planTableWriteWithTableWriteMerge(tableWrite, exchange);
        }

        private PlanWithProperties planTableWriteWithTableWriteMerge(TableWriterNode tableWrite, PlanWithProperties source)
        {
            // Split any statistics aggregation: the partial part runs in the rewritten writer node,
            // the intermediate part runs in the merge node that combines the writers' outputs.
            Optional<StatisticAggregations.Parts> statisticAggregations = tableWrite.getStatisticsAggregation()
                    .map(aggregations -> splitIntoPartialAndIntermediate(aggregations, variableAllocator, metadata.getFunctionAndTypeManager()));

            // Fresh output variables for the writer node (allocation order is kept stable)
            VariableReferenceExpression partialRowCount = variableAllocator.newVariable("partialrowcount", BIGINT);
            VariableReferenceExpression partialFragments = variableAllocator.newVariable("partialfragments", VARBINARY);
            VariableReferenceExpression partialContext = variableAllocator.newVariable("partialcontext", VARBINARY);

            TableWriterNode partialWriter = new TableWriterNode(
                    tableWrite.getSourceLocation(),
                    tableWrite.getId(),
                    tableWrite.getStatsEquivalentPlanNode(),
                    source.getNode(),
                    tableWrite.getTarget(),
                    partialRowCount,
                    partialFragments,
                    partialContext,
                    tableWrite.getColumns(),
                    tableWrite.getColumnNames(),
                    tableWrite.getNotNullColumnVariables(),
                    tableWrite.getTablePartitioningScheme(),
                    statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation),
                    tableWrite.getTaskCountIfScaledWriter(),
                    tableWrite.getIsTemporaryTableWriter());
            PlanWithProperties writer = deriveProperties(partialWriter, source.getProperties());

            // Gather every writer stream into one local stream feeding the merge node
            ExchangeNode gather = gatheringExchange(idAllocator.getNextId(), LOCAL, writer.getNode());
            PlanWithProperties gathered = deriveProperties(gather, writer.getProperties());

            // The merge node produces the original writer's output variables
            TableWriterMergeNode merge = new TableWriterMergeNode(
                    tableWrite.getSourceLocation(),
                    idAllocator.getNextId(),
                    gathered.getNode(),
                    tableWrite.getRowCountVariable(),
                    tableWrite.getFragmentVariable(),
                    tableWrite.getTableCommitContextVariable(),
                    statisticAggregations.map(StatisticAggregations.Parts::getIntermediateAggregation));
            return deriveProperties(merge, gathered.getProperties());
        }

        @Override
        public PlanWithProperties visitTableWriteMerge(TableWriterMergeNode node, StreamPreferredProperties context)
        {
            // This rule creates TableWriterMergeNodes itself (see planTableWriteWithTableWriteMerge);
            // presumably an input plan is never expected to contain one already, so reject it.
            String message = "Unexpected TableWriterMergeNode";
            throw new IllegalArgumentException(message);
        }

        //
        // Exchanges
        //

        @Override
        public PlanWithProperties visitExchange(ExchangeNode node, StreamPreferredProperties parentPreferences)
        {
            checkArgument(!node.getScope().isLocal(), "AddLocalExchanges can not process a plan containing a local exchange");

            // The exchange reorganizes its input completely, so parent preferences are deliberately not forwarded.
            if (!node.getOrderingScheme().isPresent()) {
                StreamPreferredProperties required = isEnforceFixedDistributionForOutputOperator(session) ? fixedParallelism() : any();
                return planAndEnforceChildren(node, required, defaultParallelism(session));
            }

            // Ordered (merging) exchange: children only need to preserve ordering
            StreamPreferredProperties orderSensitive = any().withOrderSensitivity();
            return planAndEnforceChildren(node, orderSensitive, orderSensitive);
        }

        @Override
        public PlanWithProperties visitUnion(UnionNode node, StreamPreferredProperties preferredProperties)
        {
            // Union is replaced with an exchange which does not retain streaming properties from the children
            List<PlanWithProperties> sourcesWithProperties = node.getSources().stream()
                    .map(source -> accept(source, defaultParallelism(session)))
                    .collect(toImmutableList());

            List<PlanNode> sources = sourcesWithProperties.stream()
                    .map(PlanWithProperties::getNode)
                    .collect(toImmutableList());

            List<StreamProperties> inputProperties = sourcesWithProperties.stream()
                    .map(PlanWithProperties::getProperties)
                    .collect(toImmutableList());

            List<List<VariableReferenceExpression>> inputLayouts = new ArrayList<>(sources.size());
            for (int i = 0; i < sources.size(); i++) {
                inputLayouts.add(node.sourceOutputLayout(i));
            }

            // Choose the exchange shape from the parent's preference; the exchange itself is built once below
            ExchangeNode.Type exchangeType;
            Partitioning partitioning;
            if (preferredProperties.isSingleStreamPreferred()) {
                exchangeType = GATHER;
                partitioning = Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of());
            }
            else {
                Optional<List<VariableReferenceExpression>> preferredPartitionColumns = preferredProperties.getPartitioningColumns();
                exchangeType = REPARTITION;
                if (preferredPartitionColumns.isPresent()) {
                    partitioning = Partitioning.create(FIXED_HASH_DISTRIBUTION, preferredPartitionColumns.get());
                }
                else {
                    // multiple streams preferred
                    partitioning = Partitioning.create(FIXED_ARBITRARY_DISTRIBUTION, ImmutableList.of());
                }
            }

            ExchangeNode exchangeNode = new ExchangeNode(
                    node.getSourceLocation(),
                    idAllocator.getNextId(),
                    exchangeType,
                    LOCAL,
                    new PartitioningScheme(partitioning, node.getOutputVariables()),
                    sources,
                    inputLayouts,
                    false,
                    Optional.empty());
            return deriveProperties(exchangeNode, inputProperties);
        }

        //
        // Joins
        //

        @Override
        public PlanWithProperties visitJoin(JoinNode node, StreamPreferredProperties parentPreferences)
        {
            // The Java spilling implementation for joins needs a constant, known number of
            // LookupJoinOperators (broadcast joins may otherwise be SOURCE distributed). For Java
            // execution with join spilling enabled, fixed parallelism is therefore required on the
            // probe side, which adds an ARBITRARY-distributed LocalExchange when needed. The native
            // implementation does not have this limitation.
            boolean fixedProbeParallelismRequired = isSpillEnabled(session) && isJoinSpillingEnabled(session) && !nativeExecution;
            StreamPreferredProperties probeParentPreferences = parentPreferences.constrainTo(node.getLeft().getOutputVariables());
            PlanWithProperties probe = fixedProbeParallelismRequired
                    ? planAndEnforce(node.getLeft(), fixedParallelism(), probeParentPreferences.withFixedParallelism())
                    : planAndEnforce(node.getLeft(), defaultParallelism(session), probeParentPreferences.withDefaultParallelism(session));

            // The build side consumes its input completely, so parent preferences are not passed through.
            List<VariableReferenceExpression> buildHashVariables = node.getCriteria().stream()
                    .map(EquiJoinClause::getRight)
                    .collect(toImmutableList());
            StreamPreferredProperties buildPreference;
            if (getTaskConcurrency(session) <= 1) {
                buildPreference = singleStream();
            }
            else if (nativeExecution && !isNativeJoinBuildPartitionEnforced(session)) {
                buildPreference = defaultParallelism(session);
            }
            else {
                buildPreference = exactlyPartitionedOn(buildHashVariables);
            }
            PlanWithProperties build = planAndEnforce(node.getRight(), buildPreference, buildPreference);

            return rebaseAndDeriveProperties(node, ImmutableList.of(probe, build));
        }

        @Override
        public PlanWithProperties visitSemiJoin(SemiJoinNode node, StreamPreferredProperties parentPreferences)
        {
            StreamPreferredProperties sourcePreferences = parentPreferences
                    .constrainTo(node.getSource().getOutputVariables())
                    .withDefaultParallelism(session);
            PlanWithProperties source = planAndEnforce(node.getSource(), defaultParallelism(session), sourcePreferences);

            // The filtering source is consumed completely, so parent preferences do not apply to it.
            // Native execution keeps it at default parallelism; Java execution uses a single stream.
            StreamPreferredProperties filteringPreference;
            if (nativeExecution) {
                filteringPreference = defaultParallelism(session);
            }
            else {
                filteringPreference = singleStream();
            }
            PlanWithProperties filteringSource = planAndEnforce(node.getFilteringSource(), filteringPreference, filteringPreference);

            return rebaseAndDeriveProperties(node, ImmutableList.of(source, filteringSource));
        }

        @Override
        public PlanWithProperties visitSpatialJoin(SpatialJoinNode node, StreamPreferredProperties parentPreferences)
        {
            // Probe side: honor the parent's preferences restricted to the probe's outputs, at default parallelism
            StreamPreferredProperties probePreferences = parentPreferences
                    .constrainTo(node.getLeft().getOutputVariables())
                    .withDefaultParallelism(session);
            PlanWithProperties probe = planAndEnforce(node.getLeft(), defaultParallelism(session), probePreferences);

            // Build side: always collapsed into a single stream
            StreamPreferredProperties buildProperties = singleStream();
            PlanWithProperties build = planAndEnforce(node.getRight(), buildProperties, buildProperties);

            return rebaseAndDeriveProperties(node, ImmutableList.of(probe, build));
        }

        @Override
        public PlanWithProperties visitIndexJoin(IndexJoinNode node, StreamPreferredProperties parentPreferences)
        {
            StreamPreferredProperties probePreferences = parentPreferences
                    .constrainTo(node.getProbeSource().getOutputVariables())
                    .withDefaultParallelism(session);
            PlanWithProperties probe = planAndEnforce(node.getProbeSource(), defaultParallelism(session), probePreferences);

            // The index source is not re-planned here: it does not support local parallelism,
            // so its already-derived properties must describe a single stream.
            PlanNode indexSource = node.getIndexSource();
            StreamProperties indexProperties = derivePropertiesRecursively(indexSource, metadata, session, nativeExecution);
            checkArgument(indexProperties.getDistribution() == SINGLE, "index source must be single stream");

            return rebaseAndDeriveProperties(node, ImmutableList.of(probe, new PlanWithProperties(indexSource, indexProperties)));
        }

        //
        // Helpers
        //

        private PlanWithProperties planAndEnforceChildren(PlanNode node, StreamPreferredProperties requiredProperties, StreamPreferredProperties preferredProperties)
        {
            // Plan and enforce every child separately, first narrowing both property sets to the
            // variables that child outputs. This assumes the children use the same variables as the parent.
            ImmutableList.Builder<PlanWithProperties> plannedChildren = ImmutableList.builder();
            for (PlanNode child : node.getSources()) {
                List<VariableReferenceExpression> childOutputs = child.getOutputVariables();
                plannedChildren.add(planAndEnforce(
                        child,
                        requiredProperties.constrainTo(childOutputs),
                        preferredProperties.constrainTo(childOutputs)));
            }
            return rebaseAndDeriveProperties(node, plannedChildren.build());
        }

        private PlanWithProperties planAndEnforce(PlanNode node, StreamPreferredProperties requiredProperties, StreamPreferredProperties preferredProperties)
        {
            // Both property sets may only partition on variables this node actually outputs
            List<VariableReferenceExpression> outputs = node.getOutputVariables();
            checkArgument(requiredProperties.getPartitioningColumns().map(outputs::containsAll).orElse(true));
            checkArgument(preferredProperties.getPartitioningColumns().map(outputs::containsAll).orElse(true));

            // Plan with the preferred properties, then add a local exchange if the required ones still do not hold
            PlanWithProperties enforced = enforce(accept(node, preferredProperties), requiredProperties);

            checkState(requiredProperties.isSatisfiedBy(enforced.getProperties()), "required properties not enforced");
            return enforced;
        }

        private PlanWithProperties enforce(PlanWithProperties planWithProperties, StreamPreferredProperties requiredProperties)
        {
            // Nothing to do when the planned node already satisfies the requirement
            if (requiredProperties.isSatisfiedBy(planWithProperties.getProperties())) {
                return planWithProperties;
            }

            PlanNode source = planWithProperties.getNode();
            PlanNode exchange;
            if (requiredProperties.isSingleStreamPreferred()) {
                // a single stream is required
                exchange = gatheringExchange(idAllocator.getNextId(), LOCAL, source);
            }
            else {
                Optional<List<VariableReferenceExpression>> requiredPartitionColumns = requiredProperties.getPartitioningColumns();
                if (!requiredPartitionColumns.isPresent()) {
                    // unpartitioned parallel streams required
                    exchange = roundRobinExchange(idAllocator.getNextId(), LOCAL, source);
                }
                else if (requiredProperties.isParallelPreferred()) {
                    // partitioned parallel streams required
                    exchange = systemPartitionedExchange(
                            idAllocator.getNextId(),
                            LOCAL,
                            source,
                            requiredPartitionColumns.get(),
                            Optional.empty());
                }
                else {
                    // no explicit parallel requirement, so gather to a single stream
                    exchange = gatheringExchange(idAllocator.getNextId(), LOCAL, source);
                }
            }
            return deriveProperties(exchange, planWithProperties.getProperties());
        }

        private PlanWithProperties rebaseAndDeriveProperties(PlanNode node, List<PlanWithProperties> children)
        {
            // Split the planned children into their nodes (to rebuild the parent) and their
            // stream properties (to derive the rebuilt parent's properties)
            List<PlanNode> newChildren = new ArrayList<>(children.size());
            ImmutableList.Builder<StreamProperties> childProperties = ImmutableList.builder();
            for (PlanWithProperties child : children) {
                newChildren.add(child.getNode());
                childProperties.add(child.getProperties());
            }

            PlanNode rebased = replaceChildren(node, newChildren);
            return deriveProperties(rebased, childProperties.build());
        }

        private PlanWithProperties deriveProperties(PlanNode result, StreamProperties inputProperties)
        {
            // Derive the stream properties of a node that has a single input
            StreamProperties derived = StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, nativeExecution);
            return new PlanWithProperties(result, derived);
        }

        private PlanWithProperties deriveProperties(PlanNode result, List<StreamProperties> inputProperties)
        {
            // Derive the stream properties of a node from the properties of each of its inputs
            StreamProperties derived = StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, nativeExecution);
            return new PlanWithProperties(result, derived);
        }

        private PlanWithProperties accept(PlanNode node, StreamPreferredProperties context)
        {
            PlanWithProperties visited = node.accept(this, context);
            PlanNode rewritten = visited.getNode();
            // A TableWriterNode rewritten into a TableWriterMergeNode has different outputs, so the
            // writer's stats-equivalent plan node is not carried over in that one case.
            boolean writerReplacedByMerge = node instanceof TableWriterNode && rewritten instanceof TableWriterMergeNode;
            if (!writerReplacedByMerge) {
                rewritten = rewritten.assignStatsEquivalentPlanNode(node.getStatsEquivalentPlanNode());
            }
            return new PlanWithProperties(rewritten, visited.getProperties());
        }
    }

    /**
     * Immutable pairing of a (possibly rewritten) plan node with the {@link StreamProperties}
     * derived for it. Both components are required to be non-null.
     */
    private static final class PlanWithProperties
    {
        private final PlanNode node;
        private final StreamProperties properties;

        public PlanWithProperties(PlanNode node, StreamProperties properties)
        {
            requireNonNull(node, "node is null");
            requireNonNull(properties, "StreamProperties is null");
            this.node = node;
            this.properties = properties;
        }

        /**
         * Returns the plan node.
         */
        public PlanNode getNode()
        {
            return this.node;
        }

        /**
         * Returns the stream properties derived for {@link #getNode()}.
         */
        public StreamProperties getProperties()
        {
            return this.properties;
        }
    }
}