// PlanFragmenter.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner;
import com.facebook.presto.Session;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.Partitioning;
import com.facebook.presto.spi.plan.PartitioningScheme;
import com.facebook.presto.spi.plan.PlanFragmentId;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.BasePlanFragmenter.FragmentProperties;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.facebook.presto.sql.planner.sanity.PlanChecker;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.google.common.collect.ImmutableList;
import javax.inject.Inject;
import java.util.Set;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.ROOT_FRAGMENT_ID;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.finalizeSubPlan;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getOutputTableWriterNodeIds;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static java.util.Objects.requireNonNull;
/**
 * Breaks a logical plan up into fragments that can be shipped to, and executed on, distributed nodes.
 */
public class PlanFragmenter
{
    private final Metadata metadata;
    private final NodePartitioningManager nodePartitioningManager;
    private final QueryManagerConfig config;
    // Two checkers are built up front; which one is handed to the fragmenter depends on the noExchange flag.
    private final PlanChecker distributedPlanChecker;
    private final PlanChecker singleNodePlanChecker;

    /**
     * Stores the collaborators and builds one {@link PlanChecker} per mode.
     * All arguments except {@code planCheckerProviderManager} are null-checked here;
     * that one is passed to the {@link PlanChecker} constructor as-is.
     */
    @Inject
    public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager)
    {
        this.metadata = requireNonNull(metadata, "metadata is null");
        this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
        this.config = requireNonNull(queryManagerConfig, "queryManagerConfig is null");

        // Validate once, then share the same config between both checkers.
        FeaturesConfig checkedFeaturesConfig = requireNonNull(featuresConfig, "featuresConfig is null");
        this.distributedPlanChecker = new PlanChecker(checkedFeaturesConfig, false, planCheckerProviderManager);
        this.singleNodePlanChecker = new PlanChecker(checkedFeaturesConfig, true, planCheckerProviderManager);
    }

    /**
     * Convenience overload: creates a {@link VariableAllocator} seeded with all variables
     * of the plan's types and delegates to the six-argument overload.
     */
    public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
    {
        return createSubPlans(
                session,
                plan,
                noExchange,
                idAllocator,
                new VariableAllocator(plan.getTypes().allVariables()),
                warningCollector);
    }

    /**
     * Rewrites the plan root with a fresh {@link Fragmenter}, builds the root fragment from the
     * rewritten tree and returns the result of {@code finalizeSubPlan} for it.
     *
     * @param noExchange when true, the single-node plan checker is used and the root fragment
     *         properties are switched to single-node distribution
     */
    public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, WarningCollector warningCollector)
    {
        PlanChecker planChecker;
        if (noExchange) {
            planChecker = singleNodePlanChecker;
        }
        else {
            planChecker = distributedPlanChecker;
        }

        StatsAndCosts statsAndCosts = plan.getStatsAndCosts();
        PlanNode planRoot = plan.getRoot();
        Set<PlanNodeId> outputTableWriterNodeIds = getOutputTableWriterNodeIds(planRoot);
        Fragmenter fragmenter = new Fragmenter(
                session,
                metadata,
                statsAndCosts,
                planChecker,
                warningCollector,
                idAllocator,
                variableAllocator,
                outputTableWriterNodeIds);

        // The root fragment starts from SINGLE_DISTRIBUTION partitioning over the root's output variables.
        Partitioning rootPartitioning = Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of());
        PartitioningScheme rootScheme = new PartitioningScheme(rootPartitioning, plan.getRoot().getOutputVariables());
        FragmentProperties properties = new FragmentProperties(rootScheme);

        // The session property is only consulted when noExchange is false (short-circuit).
        boolean singleNodeOutput = noExchange || isForceSingleNodeOutput(session);
        if (singleNodeOutput) {
            properties = properties.setSingleNodeDistribution();
        }

        PlanNode rewrittenRoot = SimplePlanRewriter.rewriteWith(fragmenter, plan.getRoot(), properties);
        SubPlan rootSubPlan = fragmenter.buildRootFragment(rewrittenRoot, properties);
        return finalizeSubPlan(
                rootSubPlan,
                config,
                metadata,
                nodePartitioningManager,
                session,
                noExchange,
                warningCollector,
                rootSubPlan.getFragment().getPartitioning());
    }

    /**
     * {@link BasePlanFragmenter} that hands out fragment ids from a simple per-instance counter.
     */
    private static class Fragmenter
            extends BasePlanFragmenter
    {
        // First id handed out is the one right after ROOT_FRAGMENT_ID.
        private int fragmentIdCounter = ROOT_FRAGMENT_ID + 1;

        public Fragmenter(Session session, Metadata metadata, StatsAndCosts statsAndCosts, PlanChecker planChecker, WarningCollector warningCollector, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, Set<PlanNodeId> outputTableWriterNodeIds)
        {
            super(session, metadata, statsAndCosts, planChecker, warningCollector, idAllocator, variableAllocator, outputTableWriterNodeIds);
        }

        /**
         * Returns a {@link PlanFragmentId} for the current counter value and advances the counter by one.
         */
        @Override
        public PlanFragmentId nextFragmentId()
        {
            int assignedId = fragmentIdCounter;
            // Advance before constructing the id, matching post-increment semantics.
            fragmentIdCounter = assignedId + 1;
            return new PlanFragmentId(assignedId);
        }
    }
}