GroupedExecutionTagger.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner;
import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.TableLayout;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.JoinType;
import com.facebook.presto.spi.plan.MarkDistinctNode;
import com.facebook.presto.spi.plan.MergeJoinNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.plan.WindowNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_PLAN_ERROR;
import static com.facebook.presto.spi.connector.ConnectorCapabilities.SUPPORTS_PAGE_SINK_COMMIT;
import static com.facebook.presto.spi.connector.ConnectorCapabilities.SUPPORTS_REWINDABLE_SPLIT_SOURCE;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
class GroupedExecutionTagger
extends InternalPlanVisitor<GroupedExecutionTagger.GroupedExecutionProperties, Void>
{
private final Session session;
private final Metadata metadata;
private final NodePartitioningManager nodePartitioningManager;
private final boolean groupedExecutionEnabled;
public GroupedExecutionTagger(Session session, Metadata metadata, NodePartitioningManager nodePartitioningManager)
{
this.session = requireNonNull(session, "session is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
this.groupedExecutionEnabled = isGroupedExecutionEnabled(session);
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitPlan(PlanNode node, Void context)
{
if (node.getSources().isEmpty()) {
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
return processChildren(node);
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitJoin(JoinNode node, Void context)
{
GroupedExecutionTagger.GroupedExecutionProperties left = node.getLeft().accept(this, null);
GroupedExecutionTagger.GroupedExecutionProperties right = node.getRight().accept(this, null);
if (!node.getDistributionType().isPresent() || !groupedExecutionEnabled) {
// This is possible when the optimizers is invoked with `noExchange` set to true.
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
if ((node.getType() == JoinType.RIGHT || node.getType() == JoinType.FULL) && !right.currentNodeCapable) {
// For a plan like this, if the fragment participates in grouped execution,
// the LookupOuterOperator corresponding to the RJoin will not work execute properly.
//
// * The operator has to execute as not-grouped because it can only look at the "used" flags in
// join build after all probe has finished.
// * The operator has to execute as grouped the subsequent LJoin expects that incoming
// operators are grouped. Otherwise, the LJoin won't be able to throw out the build side
// for each group as soon as the group completes.
//
// LJoin
// / \
// RJoin Scan
// / \
// Scan Remote
//
// TODO:
// The RJoin can still execute as grouped if there is no subsequent operator that depends
// on the RJoin being executed in a grouped manner. However, this is not currently implemented.
// Support for this scenario is already implemented in the execution side.
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
switch (node.getDistributionType().get()) {
case REPLICATED:
// Broadcast join maintains partitioning for the left side.
// Right side of a broadcast is not capable of grouped execution because it always comes from a remote exchange.
checkState(!right.currentNodeCapable);
return left;
case PARTITIONED:
if (left.currentNodeCapable && right.currentNodeCapable) {
checkState(left.totalLifespans == right.totalLifespans, format("Mismatched number of lifespans on left(%s) and right(%s) side of join", left.totalLifespans, right.totalLifespans));
return new GroupedExecutionTagger.GroupedExecutionProperties(
true,
true,
ImmutableList.<PlanNodeId>builder()
.addAll(left.capableTableScanNodes)
.addAll(right.capableTableScanNodes)
.build(),
left.totalLifespans,
left.recoveryEligible && right.recoveryEligible);
}
// right.subTreeUseful && !left.currentNodeCapable:
// It's not particularly helpful to do grouped execution on the right side
// because the benefit is likely cancelled out due to required buffering for hash build.
// In theory, it could still be helpful (e.g. when the underlying aggregation's intermediate group state maybe larger than aggregation output).
// However, this is not currently implemented. JoinBridgeManager need to support such a lifecycle.
// !right.currentNodeCapable:
// The build/right side needs to buffer fully for this JOIN, but the probe/left side will still stream through.
// As a result, there is no reason to change currentNodeCapable or subTreeUseful to false.
//
return left;
default:
throw new UnsupportedOperationException("Unknown distribution type: " + node.getDistributionType());
}
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitMergeJoin(MergeJoinNode node, Void context)
{
GroupedExecutionTagger.GroupedExecutionProperties left = node.getLeft().accept(this, null);
GroupedExecutionTagger.GroupedExecutionProperties right = node.getRight().accept(this, null);
if (groupedExecutionEnabled && left.currentNodeCapable && right.currentNodeCapable) {
checkState(left.totalLifespans == right.totalLifespans, format("Mismatched number of lifespans on left(%s) and right(%s) side of join", left.totalLifespans, right.totalLifespans));
return new GroupedExecutionTagger.GroupedExecutionProperties(
true,
true,
ImmutableList.<PlanNodeId>builder()
.addAll(left.capableTableScanNodes)
.addAll(right.capableTableScanNodes)
.build(),
left.totalLifespans,
left.recoveryEligible && right.recoveryEligible);
}
throw new PrestoException(
INVALID_PLAN_ERROR,
format("When grouped execution can't be enabled, merge join plan is not valid." +
"%s is currently set to %s; left node grouped execution capable is %s and " +
"right node grouped execution capable is %s.",
GROUPED_EXECUTION,
groupedExecutionEnabled,
left.currentNodeCapable,
right.currentNodeCapable));
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitAggregation(AggregationNode node, Void context)
{
GroupedExecutionTagger.GroupedExecutionProperties properties = node.getSource().accept(this, null);
if (groupedExecutionEnabled && properties.isCurrentNodeCapable()) {
switch (node.getStep()) {
case SINGLE:
case FINAL:
return new GroupedExecutionTagger.GroupedExecutionProperties(true, true, properties.capableTableScanNodes, properties.totalLifespans, properties.recoveryEligible);
case PARTIAL:
case INTERMEDIATE:
return properties;
}
}
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitWindow(WindowNode node, Void context)
{
return processWindowFunction(node);
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitRowNumber(RowNumberNode node, Void context)
{
return processWindowFunction(node);
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitTopNRowNumber(TopNRowNumberNode node, Void context)
{
return processWindowFunction(node);
}
private GroupedExecutionTagger.GroupedExecutionProperties processWindowFunction(PlanNode node)
{
GroupedExecutionTagger.GroupedExecutionProperties properties = getOnlyElement(node.getSources()).accept(this, null);
if (groupedExecutionEnabled && properties.isCurrentNodeCapable()) {
return new GroupedExecutionTagger.GroupedExecutionProperties(true, true, properties.capableTableScanNodes, properties.totalLifespans, properties.recoveryEligible);
}
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitMarkDistinct(MarkDistinctNode node, Void context)
{
GroupedExecutionTagger.GroupedExecutionProperties properties = getOnlyElement(node.getSources()).accept(this, null);
if (groupedExecutionEnabled && properties.isCurrentNodeCapable()) {
return new GroupedExecutionTagger.GroupedExecutionProperties(true, true, properties.capableTableScanNodes, properties.totalLifespans, properties.recoveryEligible);
}
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitTableWriter(TableWriterNode node, Void context)
{
GroupedExecutionTagger.GroupedExecutionProperties properties = node.getSource().accept(this, null);
boolean recoveryEligible = properties.isRecoveryEligible();
TableWriterNode.WriterTarget target = node.getTarget().orElseThrow(() -> new VerifyException("target is absent"));
if (target instanceof TableWriterNode.CreateName || target instanceof TableWriterNode.InsertReference || target instanceof TableWriterNode.RefreshMaterializedViewReference) {
recoveryEligible &= metadata.getConnectorCapabilities(session, target.getConnectorId()).contains(SUPPORTS_PAGE_SINK_COMMIT);
}
else {
recoveryEligible = false;
}
return new GroupedExecutionTagger.GroupedExecutionProperties(
properties.isCurrentNodeCapable(),
properties.isSubTreeUseful(),
properties.getCapableTableScanNodes(),
properties.getTotalLifespans(),
recoveryEligible);
}
@Override
public GroupedExecutionTagger.GroupedExecutionProperties visitTableScan(TableScanNode node, Void context)
{
Optional<TableLayout.TablePartitioning> tablePartitioning = metadata.getLayout(session, node.getTable()).getTablePartitioning();
if (!tablePartitioning.isPresent()) {
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
List<ConnectorPartitionHandle> partitionHandles = nodePartitioningManager.listPartitionHandles(session, tablePartitioning.get().getPartitioningHandle());
if (ImmutableList.of(NOT_PARTITIONED).equals(partitionHandles)) {
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
else {
return new GroupedExecutionTagger.GroupedExecutionProperties(
true,
false,
ImmutableList.of(node.getId()),
partitionHandles.size(),
metadata.getConnectorCapabilities(session, node.getTable().getConnectorId()).contains(SUPPORTS_REWINDABLE_SPLIT_SOURCE));
}
}
private GroupedExecutionTagger.GroupedExecutionProperties processChildren(PlanNode node)
{
// Each fragment has a partitioning handle, which is derived from leaf nodes in the fragment.
// Leaf nodes with different partitioning handle are not allowed to share a single fragment
// (except for special cases as detailed in addSourceDistribution).
// As a result, it is not necessary to check the compatibility between node.getSources because
// they are guaranteed to be compatible.
// * If any child is "not capable", return "not capable"
// * When all children are capable ("capable and useful" or "capable but not useful")
// * Usefulness:
// * if any child is "useful", this node is "useful"
// * if no children is "useful", this node is "not useful"
// * Recovery Eligibility:
// * if all children is "recovery eligible", this node is "recovery eligible"
// * if any child is "not recovery eligible", this node is "not recovery eligible"
boolean anyUseful = false;
OptionalInt totalLifespans = OptionalInt.empty();
boolean allRecoveryEligible = true;
ImmutableList.Builder<PlanNodeId> capableTableScanNodes = ImmutableList.builder();
for (PlanNode source : node.getSources()) {
GroupedExecutionTagger.GroupedExecutionProperties properties = source.accept(this, null);
if (!properties.isCurrentNodeCapable()) {
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}
anyUseful |= properties.isSubTreeUseful();
allRecoveryEligible &= properties.isRecoveryEligible();
if (!totalLifespans.isPresent()) {
totalLifespans = OptionalInt.of(properties.totalLifespans);
}
else {
checkState(totalLifespans.getAsInt() == properties.totalLifespans, format("Mismatched number of lifespans among children nodes. Expected: %s, actual: %s", totalLifespans.getAsInt(), properties.totalLifespans));
}
capableTableScanNodes.addAll(properties.capableTableScanNodes);
}
return new GroupedExecutionTagger.GroupedExecutionProperties(true, anyUseful, capableTableScanNodes.build(), totalLifespans.getAsInt(), allRecoveryEligible);
}
static class GroupedExecutionProperties
{
// currentNodeCapable:
// Whether grouped execution is possible with the current node.
// For example, a table scan is capable iff it supports addressable split discovery.
// subTreeUseful:
// Whether grouped execution is beneficial in the current node, or any node below it.
// For example, a JOIN can benefit from grouped execution because build can be flushed early, reducing peak memory requirement.
//
// In the current implementation, subTreeUseful implies currentNodeCapable.
// In theory, this doesn't have to be the case. Take an example where a GROUP BY feeds into the build side of a JOIN.
// Even if JOIN cannot take advantage of grouped execution, it could still be beneficial to execute the GROUP BY with grouped execution
// (e.g. when the underlying aggregation's intermediate group state may be larger than aggregation output).
private final boolean currentNodeCapable;
private final boolean subTreeUseful;
private final List<PlanNodeId> capableTableScanNodes;
private final int totalLifespans;
private final boolean recoveryEligible;
public GroupedExecutionProperties(boolean currentNodeCapable, boolean subTreeUseful, List<PlanNodeId> capableTableScanNodes, int totalLifespans, boolean recoveryEligible)
{
this.currentNodeCapable = currentNodeCapable;
this.subTreeUseful = subTreeUseful;
this.capableTableScanNodes = ImmutableList.copyOf(requireNonNull(capableTableScanNodes, "capableTableScanNodes is null"));
this.totalLifespans = totalLifespans;
this.recoveryEligible = recoveryEligible;
// Verify that `subTreeUseful` implies `currentNodeCapable`
checkArgument(!subTreeUseful || currentNodeCapable);
// Verify that `recoveryEligible` implies `currentNodeCapable`
checkArgument(!recoveryEligible || currentNodeCapable);
checkArgument(currentNodeCapable == !capableTableScanNodes.isEmpty());
}
public static GroupedExecutionProperties notCapable()
{
return new GroupedExecutionProperties(false, false, ImmutableList.of(), 1, false);
}
public boolean isCurrentNodeCapable()
{
return currentNodeCapable;
}
public boolean isSubTreeUseful()
{
return subTreeUseful;
}
public List<PlanNodeId> getCapableTableScanNodes()
{
return capableTableScanNodes;
}
public int getTotalLifespans()
{
return totalLifespans;
}
public boolean isRecoveryEligible()
{
return recoveryEligible;
}
}
}