AddExchangesForSingleNodeExecution.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.Partitioning;
import com.facebook.presto.spi.plan.PartitioningScheme;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.relation.ConstantExpression;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.plan.ChildReplacer;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
import com.google.common.collect.ImmutableList;

import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.sql.planner.PlannerUtils.containsSystemTableScan;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.iterative.rule.PickTableLayout.pushPredicateIntoTableScan;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange;
import static java.util.Objects.requireNonNull;

/**
 * Plan optimizer that is active only when single node execution is enabled for the session.
 * <p>
 * When active it:
 * <ul>
 * <li>rewrites every {@link TableScanNode} through {@code pushPredicateIntoTableScan} using a constant
 * {@code TRUE} predicate, and puts a remote streaming gather exchange on top of the result when that
 * result contains a system table scan;</li>
 * <li>makes sure the source of every {@link ExplainAnalyzeNode}, {@link TableFinishNode} and
 * {@link StatisticsWriterNode} is a remote streaming gather exchange.</li>
 * </ul>
 */
public class AddExchangesForSingleNodeExecution
        implements PlanOptimizer
{
    private final Metadata metadata;

    /**
     * @param metadata metadata handle forwarded to the table scan rewrite; must not be null
     */
    public AddExchangesForSingleNodeExecution(Metadata metadata)
    {
        this.metadata = requireNonNull(metadata, "metadata is null");
    }

    /**
     * @return whether single node execution is enabled for the given session
     */
    @Override
    public boolean isEnabled(Session session)
    {
        return isSingleNodeExecutionEnabled(session);
    }

    /**
     * Rewrites {@code plan} when the optimizer is enabled for {@code session}; otherwise returns the plan untouched.
     *
     * @return the (possibly rewritten) plan together with a flag telling whether the rewriter changed anything
     */
    @Override
    public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
    {
        if (!isEnabled(session)) {
            return PlanOptimizerResult.optimizerResult(plan, false);
        }

        Rewriter rewriter = new Rewriter(idAllocator, metadata, session);
        PlanNode rewrittenPlan = SimplePlanRewriter.rewriteWith(rewriter, plan, null);
        return PlanOptimizerResult.optimizerResult(rewrittenPlan, rewriter.isPlanChanged());
    }

    /**
     * Rewriter doing the actual work. It is a static nested class because it carries all the state it needs
     * (id allocator, metadata, session) and never touches the enclosing optimizer instance.
     */
    private static final class Rewriter
            extends SimplePlanRewriter<Void>
    {
        private final PlanNodeIdAllocator idAllocator;
        private final Metadata metadata;
        private final Session session;
        // Becomes true as soon as any visit method returns something other than the node it was given
        private boolean planChanged;

        private Rewriter(PlanNodeIdAllocator idAllocator, Metadata metadata, Session session)
        {
            this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
            this.metadata = requireNonNull(metadata, "metadata is null");
            this.session = requireNonNull(session, "session is null");
        }

        /**
         * @return true when at least one node of the plan was replaced during the rewrite
         */
        public boolean isPlanChanged()
        {
            return planChanged;
        }

        /**
         * Runs the table scan through {@code pushPredicateIntoTableScan} with a constant {@code TRUE} predicate and,
         * when the result contains a system table scan, wraps it in a remote streaming gather exchange.
         */
        @Override
        public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
        {
            PlanNode plan = pushPredicateIntoTableScan(node, new ConstantExpression(true, BOOLEAN), true, session, idAllocator, metadata);
            // Presto Java and Presto Native use different hash functions for partitioning
            // An additional exchange makes sure the data flows through a native worker in case it needs to be partitioned for downstream processing
            if (containsSystemTableScan(plan)) {
                plan = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan);
            }
            // The scan may have been replaced and/or wrapped above; record that so optimize() does not
            // report an unchanged plan when the only rewrites happened at table scans.
            if (plan != node) {
                planChanged = true;
            }
            return plan;
        }

        @Override
        public PlanNode visitExplainAnalyze(ExplainAnalyzeNode node, RewriteContext<Void> context)
        {
            return addGatherExchange(node, context);
        }

        @Override
        public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<Void> context)
        {
            return addGatherExchange(node, context);
        }

        @Override
        public PlanNode visitStatisticsWriterNode(StatisticsWriterNode node, RewriteContext<Void> context)
        {
            return addGatherExchange(node, context);
        }

        /**
         * Rewrites the first source of {@code node} and returns a copy of {@code node} whose only child is a
         * remote streaming gather exchange over that rewritten source.
         * <p>
         * Only the first source is visited and the node is rebuilt with exactly one child, so this helper is
         * meant for single-source nodes.
         */
        private PlanNode addGatherExchange(PlanNode node, RewriteContext<Void> context)
        {
            PlanNode child = node.getSources().get(0).accept(this, context);

            ExchangeNode gather;
            // In case the child is an exchange, don't add another exchange. Instead, convert it to gather exchange.
            if (child instanceof ExchangeNode) {
                ExchangeNode exchangeNode = (ExchangeNode) child;
                // Keep the existing exchange's sources, inputs and output variables; only its id, type, scope
                // and partitioning are replaced so that it gathers into a single distribution.
                gather = new ExchangeNode(
                        exchangeNode.getSourceLocation(),
                        idAllocator.getNextId(),
                        GATHER,
                        REMOTE_STREAMING,
                        new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), exchangeNode.getOutputVariables()),
                        exchangeNode.getSources(),
                        exchangeNode.getInputs(),
                        true,
                        Optional.empty());
            }
            else {
                gather = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, child);
            }
            planChanged = true;
            return ChildReplacer.replaceChildren(node, ImmutableList.of(gather));
        }
    }
}