// LimitPushDown.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.optimizations;
import com.facebook.presto.Session;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.MarkDistinctNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SemiJoinNode;
import com.facebook.presto.spi.plan.SortNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static com.facebook.presto.spi.plan.LimitNode.Step.FINAL;
import static com.facebook.presto.spi.plan.LimitNode.Step.PARTIAL;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
public class LimitPushDown
implements PlanOptimizer
{
/**
 * Applies the limit push-down rewrite to {@code plan}.
 * <p>
 * The plan is traversed by a {@link Rewriter} starting with a {@code null} limit context
 * (no limit is pending at the root). The result carries the rewritten plan together with
 * the rewriter's "plan changed" flag.
 *
 * @param plan root of the plan to rewrite; must not be null
 * @param session must not be null (only null-checked here)
 * @param types must not be null (only null-checked here)
 * @param variableAllocator must not be null (only null-checked here)
 * @param idAllocator source of ids for plan nodes created by the rewrite; must not be null
 * @param warningCollector not used and not null-checked by this method
 * @return the rewritten plan and whether the rewriter reported a change
 * @throws NullPointerException if any of the checked arguments is null
 */
@Override
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
    // Validate arguments up front, in declaration order.
    requireNonNull(plan, "plan is null");
    requireNonNull(session, "session is null");
    requireNonNull(types, "types is null");
    requireNonNull(variableAllocator, "variableAllocator is null");
    requireNonNull(idAllocator, "idAllocator is null");

    Rewriter limitRewriter = new Rewriter(idAllocator);
    // The initial context is null: no enclosing limit exists above the plan root.
    PlanNode optimizedPlan = SimplePlanRewriter.rewriteWith(limitRewriter, plan, null);
    // Read the flag only after the traversal has finished.
    boolean changed = limitRewriter.isPlanChanged();
    return PlanOptimizerResult.optimizerResult(optimizedPlan, changed);
}
/**
 * Immutable description of a limit that is being carried down the plan:
 * the maximum row count and the {@link LimitNode.Step} to use when a limit
 * node is eventually created from this context.
 */
private static class LimitContext
{
    private final long count;
    private final LimitNode.Step step;

    /**
     * @param count row count of the pending limit
     * @param step step to assign to a limit node built from this context
     */
    public LimitContext(long count, LimitNode.Step step)
    {
        this.step = step;
        this.count = count;
    }

    /** Returns the row count of the pending limit. */
    public long getCount()
    {
        return this.count;
    }

    /** Returns the step associated with the pending limit. */
    public LimitNode.Step getStep()
    {
        return this.step;
    }

    /** Renders both fields ("count" then "step") for debugging. */
    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("count", this.count)
                .add("step", this.step)
                .toString();
    }
}
private static class Rewriter
extends SimplePlanRewriter<LimitContext>
{
private final PlanNodeIdAllocator idAllocator;
private boolean planChanged;
/**
 * Creates a rewriter that draws ids for newly created plan nodes from {@code idAllocator}.
 *
 * @param idAllocator id source for new nodes; must not be null
 * @throws NullPointerException if {@code idAllocator} is null
 */
private Rewriter(PlanNodeIdAllocator idAllocator)
{
    requireNonNull(idAllocator, "idAllocator is null");
    this.idAllocator = idAllocator;
}
/**
 * Fallback for node types without a dedicated visitor.
 * <p>
 * The node is rewritten via {@code context.defaultRewrite(node)}; if a limit is pending in
 * the context, it is materialized as a new {@link LimitNode} directly above the rewritten node.
 *
 * @param node the node being visited
 * @param context rewrite context; its payload is the pending limit, or {@code null} if none
 * @return the rewritten node, wrapped in a limit when one was pending
 */
@Override
public PlanNode visitPlan(PlanNode node, RewriteContext<LimitContext> context)
{
    PlanNode rewritten = context.defaultRewrite(node);
    LimitContext pending = context.get();
    if (pending == null) {
        // Nothing to re-insert.
        return rewritten;
    }

    // The limit cannot be pushed any further down, so place it right here.
    PlanNode limited = new LimitNode(rewritten.getSourceLocation(), idAllocator.getNextId(), rewritten, pending.getCount(), pending.getStep());
    planChanged = true;
    return limited;
}
/**
 * Absorbs a {@link LimitNode} into the rewrite context.
 * <p>
 * The effective count is the node's own count, tightened by any limit already pending from
 * above. A count of zero short-circuits to an empty {@link ValuesNode} with the limit's
 * output variables. Otherwise the limit node itself is dropped and its source is rewritten
 * with a {@code FINAL} limit context, leaving it to the visitors below to re-insert a limit
 * where it can no longer be pushed.
 *
 * @param node the limit being visited
 * @param context rewrite context; its payload is the pending outer limit, or {@code null}
 * @return the rewritten source, or an empty values node for a zero limit
 */
@Override
public PlanNode visitLimit(LimitNode node, RewriteContext<LimitContext> context)
{
    long effectiveCount = node.getCount();
    LimitContext outer = context.get();
    if (outer != null) {
        effectiveCount = Math.min(effectiveCount, outer.getCount());
    }

    if (effectiveCount != 0) {
        // The fallback logic in visitPlan (or a dedicated visitor) will place the limit node.
        return context.rewrite(node.getSource(), new LimitContext(effectiveCount, FINAL));
    }

    // LIMIT 0: nothing can be produced, so replace the whole subtree with an empty ValuesNode.
    planChanged = true;
    return new ValuesNode(
            node.getSourceLocation(),
            idAllocator.getNextId(),
            node.getOutputVariables(),
            ImmutableList.of(),
            Optional.empty());
}
/**
 * Handles a pending limit that reaches an aggregation.
 * <p>
 * If the aggregation is a pure DISTINCT, i.e. it has no aggregate functions, at least one
 * grouping key, and its outputs are exactly its grouping keys, the aggregation and the
 * pending limit are fused into a single {@link DistinctLimitNode} over the rewritten source.
 * In every other case the limit is not pushed below the aggregation; when one is pending it
 * is re-created as a {@link LimitNode} directly above the rewritten aggregation.
 *
 * @param node the aggregation being visited
 * @param context rewrite context; its payload is the pending limit, or {@code null} if none
 * @return the replacement for {@code node}
 */
@Override
@Deprecated
public PlanNode visitAggregation(AggregationNode node, RewriteContext<LimitContext> context)
{
    LimitContext limit = context.get();

    // An aggregation without grouping keys is a global aggregation and must not be treated as
    // DISTINCT: with empty key lists the size/containsAll checks below would pass vacuously,
    // and a DistinctLimitNode would not preserve the global aggregation's result.
    if (limit != null &&
            node.getAggregations().isEmpty() &&
            !node.getGroupingKeys().isEmpty() &&
            node.getOutputVariables().size() == node.getGroupingKeys().size() &&
            node.getOutputVariables().containsAll(node.getGroupingKeys())) {
        PlanNode rewrittenSource = context.rewrite(node.getSource());
        planChanged = true;
        // Deduplicate on the aggregation's grouping keys rather than on whatever the source
        // happens to output: the source may expose additional columns or a different column
        // order, while consumers of this node expect the aggregation's key columns.
        return new DistinctLimitNode(node.getSourceLocation(), idAllocator.getNextId(), rewrittenSource, limit.getCount(), false, node.getGroupingKeys(), Optional.empty(), 0);
    }

    PlanNode rewrittenNode = context.defaultRewrite(node);
    if (limit != null) {
        // Drop in a LimitNode because limits cannot be pushed through aggregations
        planChanged = true;
        rewrittenNode = new LimitNode(rewrittenNode.getSourceLocation(), idAllocator.getNextId(), rewrittenNode, limit.getCount(), limit.getStep());
    }
    return rewrittenNode;
}
/**
 * Passes the pending limit (if any) straight through a {@link MarkDistinctNode}.
 *
 * @param node the mark-distinct node being visited
 * @param context rewrite context; its payload is the pending limit, or {@code null} if none
 * @return the node rewritten with the same limit context forwarded to the default rewrite
 */
@Override
public PlanNode visitMarkDistinct(MarkDistinctNode node, RewriteContext<LimitContext> context)
{
    // Without this override the visitPlan fallback would stop the limit here by inserting a
    // LimitNode; forwarding the context lets the limit travel through this trivial node type.
    LimitContext inherited = context.get();
    return context.defaultRewrite(node, inherited);
}
/**
 * Passes the pending limit (if any) straight through a {@link ProjectNode}.
 *
 * @param node the projection being visited
 * @param context rewrite context; its payload is the pending limit, or {@code null} if none
 * @return the node rewritten with the same limit context forwarded to the default rewrite
 */
@Override
public PlanNode visitProject(ProjectNode node, RewriteContext<LimitContext> context)
{
    // Without this override the visitPlan fallback would stop the limit here by inserting a
    // LimitNode; forwarding the context lets the limit travel through this trivial node type.
    LimitContext inherited = context.get();
    return context.defaultRewrite(node, inherited);
}
/**
 * Merges a pending limit into a {@link TopNNode}.
 * <p>
 * The source is rewritten via {@code context.rewrite(source)} without forwarding the pending
 * limit. If no limit is pending and the source came back unchanged, the original node is
 * returned. Otherwise a new TopN with the same id, ordering and step is built, whose count is
 * the smaller of the node's own count and the pending limit's count.
 *
 * @param node the TopN being visited
 * @param context rewrite context; its payload is the pending limit, or {@code null} if none
 * @return the original node, or a rebuilt TopN
 */
@Override
public PlanNode visitTopN(TopNNode node, RewriteContext<LimitContext> context)
{
    LimitContext outer = context.get();
    PlanNode newSource = context.rewrite(node.getSource());

    long effectiveCount;
    if (outer == null) {
        if (newSource == node.getSource()) {
            // Nothing changed below and there is nothing to merge.
            return node;
        }
        effectiveCount = node.getCount();
    }
    else {
        // The pending limit is absorbed into the TopN's count.
        planChanged = true;
        effectiveCount = Math.min(node.getCount(), outer.getCount());
    }

    return new TopNNode(node.getSourceLocation(), node.getId(), newSource, effectiveCount, node.getOrderingScheme(), node.getStep());
}
/**
 * Combines a pending limit with a {@link SortNode}.
 * <p>
 * The source is rewritten via {@code context.rewrite(source)} without forwarding the pending
 * limit. With a pending limit, the sort is replaced by a {@code SINGLE}-step {@link TopNNode}
 * that reuses the sort's id and ordering scheme and takes the limit's count. Without one, the
 * sort is rebuilt only if its source changed; otherwise the original node is returned.
 *
 * @param node the sort being visited
 * @param context rewrite context; its payload is the pending limit, or {@code null} if none
 * @return a TopN, a rebuilt sort, or the original node
 */
@Override
@Deprecated
public PlanNode visitSort(SortNode node, RewriteContext<LimitContext> context)
{
    LimitContext pending = context.get();
    PlanNode newSource = context.rewrite(node.getSource());

    if (pending == null) {
        if (newSource == node.getSource()) {
            return node;
        }
        // Only the child changed: keep the sort as is, on top of the new child.
        planChanged = true;
        return new SortNode(node.getSourceLocation(), node.getId(), newSource, node.getOrderingScheme(), node.isPartial(), node.getPartitionBy());
    }

    // Sort followed by limit becomes a TopN.
    planChanged = true;
    return new TopNNode(node.getSourceLocation(), node.getId(), newSource, pending.getCount(), node.getOrderingScheme(), TopNNode.Step.SINGLE);
}
/**
 * Pushes a pending limit into every branch of a {@link UnionNode}.
 * <p>
 * When a limit is pending, each branch is rewritten with a {@code PARTIAL} limit context of
 * the same count, and a {@link LimitNode} carrying the pending limit's count and step is
 * placed above the rebuilt union. Without a pending limit the branches are rewritten with a
 * {@code null} context and the rebuilt union is returned as is. The union is always rebuilt,
 * reusing the original id, output variables and variable mapping.
 *
 * @param node the union being visited
 * @param context rewrite context; its payload is the pending limit, or {@code null} if none
 * @return the rebuilt union, wrapped in a limit when one was pending
 */
@Override
public PlanNode visitUnion(UnionNode node, RewriteContext<LimitContext> context)
{
    LimitContext limit = context.get();
    LimitContext branchLimit = (limit == null) ? null : new LimitContext(limit.getCount(), PARTIAL);

    List<PlanNode> rewrittenBranches = new ArrayList<>();
    for (PlanNode branch : node.getSources()) {
        rewrittenBranches.add(context.rewrite(branch, branchLimit));
    }

    PlanNode union = new UnionNode(node.getSourceLocation(), node.getId(), rewrittenBranches, node.getOutputVariables(), node.getVariableMapping());
    if (limit == null) {
        return union;
    }

    // Each branch may emit up to 'count' rows, so the overall limit still has to be applied on top.
    planChanged = true;
    return new LimitNode(union.getSourceLocation(), idAllocator.getNextId(), union, limit.getCount(), limit.getStep());
}
/**
 * Forwards the pending limit (if any) into the main source of a {@link SemiJoinNode}.
 * <p>
 * Only {@code node.getSource()} is rewritten; the filtering source is reused untouched. If
 * the rewritten source is the same instance as before, the original node is returned;
 * otherwise the semi join is rebuilt around the new source with all other properties copied.
 *
 * @param node the semi join being visited
 * @param context rewrite context; its payload is the pending limit, or {@code null} if none
 * @return the original node, or a semi join rebuilt over the rewritten source
 */
@Override
public PlanNode visitSemiJoin(SemiJoinNode node, RewriteContext<LimitContext> context)
{
    LimitContext inherited = context.get();
    PlanNode newSource = context.rewrite(node.getSource(), inherited);
    if (newSource == node.getSource()) {
        return node;
    }

    planChanged = true;
    return new SemiJoinNode(
            node.getSourceLocation(),
            node.getId(),
            newSource,
            node.getFilteringSource(),
            node.getSourceJoinVariable(),
            node.getFilteringSourceJoinVariable(),
            node.getSemiJoinOutput(),
            node.getSourceHashVariable(),
            node.getFilteringSourceHashVariable(),
            node.getDistributionType(),
            node.getDynamicFilters());
}
/**
 * Reports whether any visitor of this rewriter has flagged a modification of the plan so far.
 *
 * @return {@code true} once a visitor has set the change flag
 */
public boolean isPlanChanged()
{
    return this.planChanged;
}
}
}