SpatialJoinNode.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.spi.plan;

import com.facebook.presto.spi.SourceLocation;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.concurrent.Immutable;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.common.Utils.checkArgument;
import static java.util.Collections.unmodifiableList;
import static java.util.Objects.requireNonNull;

@Immutable
public class SpatialJoinNode
        extends PlanNode
{
    public enum Type
    {
        INNER("SpatialInnerJoin"),
        LEFT("SpatialLeftJoin");

        private final String joinLabel;

        Type(String joinLabel)
        {
            this.joinLabel = joinLabel;
        }

        public String getJoinLabel()
        {
            return joinLabel;
        }

        public static Type fromJoinNodeType(JoinType joinNodeType)
        {
            switch (joinNodeType) {
                case INNER:
                    return Type.INNER;
                case LEFT:
                    return Type.LEFT;
                default:
                    throw new IllegalArgumentException("Unsupported spatial join type: " + joinNodeType);
            }
        }
    }

    private final Type type;
    private final PlanNode left;
    private final PlanNode right;
    private final List<VariableReferenceExpression> outputVariables;
    private final RowExpression filter;
    private final Optional<VariableReferenceExpression> leftPartitionVariable;
    private final Optional<VariableReferenceExpression> rightPartitionVariable;
    private final Optional<String> kdbTree;
    private final DistributionType distributionType;

    public enum DistributionType
    {
        PARTITIONED,
        REPLICATED
    }

    @JsonCreator
    public SpatialJoinNode(
            Optional<SourceLocation> sourceLocation,
            @JsonProperty("id") PlanNodeId id,
            @JsonProperty("type") Type type,
            @JsonProperty("left") PlanNode left,
            @JsonProperty("right") PlanNode right,
            @JsonProperty("outputVariables") List<VariableReferenceExpression> outputVariables,
            @JsonProperty("filter") RowExpression filter,
            @JsonProperty("leftPartitionVariable") Optional<VariableReferenceExpression> leftPartitionVariable,
            @JsonProperty("rightPartitionVariable") Optional<VariableReferenceExpression> rightPartitionVariable,
            @JsonProperty("kdbTree") Optional<String> kdbTree)
    {
        this(sourceLocation, id, Optional.empty(), type, left, right, outputVariables, filter, leftPartitionVariable, rightPartitionVariable, kdbTree);
    }

    public SpatialJoinNode(
            Optional<SourceLocation> sourceLocation,
            PlanNodeId id,
            Optional<PlanNode> statsEquivalentPlanNode,
            Type type,
            PlanNode left,
            PlanNode right,
            List<VariableReferenceExpression> outputVariables,
            RowExpression filter,
            Optional<VariableReferenceExpression> leftPartitionVariable,
            Optional<VariableReferenceExpression> rightPartitionVariable,
            Optional<String> kdbTree)
    {
        super(sourceLocation, id, statsEquivalentPlanNode);

        this.type = requireNonNull(type, "type is null");
        this.left = requireNonNull(left, "left is null");
        this.right = requireNonNull(right, "right is null");
        this.outputVariables = unmodifiableList(new ArrayList<>(requireNonNull(outputVariables, "outputVariables is null")));
        this.filter = requireNonNull(filter, "filter is null");
        this.leftPartitionVariable = requireNonNull(leftPartitionVariable, "leftPartitionVariable is null");
        this.rightPartitionVariable = requireNonNull(rightPartitionVariable, "rightPartitionVariable is null");
        this.kdbTree = requireNonNull(kdbTree, "kdbTree is null");

        Set<VariableReferenceExpression> inputSymbols = new LinkedHashSet<>();
        inputSymbols.addAll(left.getOutputVariables());
        inputSymbols.addAll(right.getOutputVariables());
        checkArgument(inputSymbols.containsAll(outputVariables), "Left and right join inputs do not contain all output variables");

        if (kdbTree.isPresent()) {
            checkArgument(leftPartitionVariable.isPresent(), "Left partition variable is missing");
            checkArgument(rightPartitionVariable.isPresent(), "Right partition variable is missing");
            checkArgument(left.getOutputVariables().contains(leftPartitionVariable.get()), "Left join input does not contain left partition variable");
            checkArgument(right.getOutputVariables().contains(rightPartitionVariable.get()), "Right join input does not contain right partition variable");
            this.distributionType = DistributionType.PARTITIONED;
        }
        else {
            checkArgument(!leftPartitionVariable.isPresent(), "KDB tree is missing");
            checkArgument(!rightPartitionVariable.isPresent(), "KDB tree is missing");
            this.distributionType = DistributionType.REPLICATED;
        }
    }

    @JsonProperty
    public Type getType()
    {
        return type;
    }

    @JsonProperty
    public PlanNode getLeft()
    {
        return left;
    }

    @JsonProperty
    public PlanNode getRight()
    {
        return right;
    }

    @JsonProperty
    public RowExpression getFilter()
    {
        return filter;
    }

    @JsonProperty
    public Optional<VariableReferenceExpression> getLeftPartitionVariable()
    {
        return leftPartitionVariable;
    }

    @JsonProperty
    public Optional<VariableReferenceExpression> getRightPartitionVariable()
    {
        return rightPartitionVariable;
    }

    @Override
    public List<PlanNode> getSources()
    {
        List<PlanNode> sources = new ArrayList<>();
        sources.add(left);
        sources.add(right);
        return unmodifiableList(sources);
    }

    @Override
    @JsonProperty
    public List<VariableReferenceExpression> getOutputVariables()
    {
        return outputVariables;
    }

    @JsonProperty
    public DistributionType getDistributionType()
    {
        return distributionType;
    }

    @JsonProperty
    public Optional<String> getKdbTree()
    {
        return kdbTree;
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context)
    {
        return visitor.visitSpatialJoin(this, context);
    }

    @Override
    public PlanNode replaceChildren(List<PlanNode> newChildren)
    {
        checkArgument(newChildren.size() == 2, "expected newChildren to contain 2 nodes");
        return new SpatialJoinNode(getSourceLocation(), getId(), getStatsEquivalentPlanNode(), type, newChildren.get(0), newChildren.get(1), outputVariables, filter, leftPartitionVariable, rightPartitionVariable, kdbTree);
    }

    @Override
    public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode)
    {
        return new SpatialJoinNode(getSourceLocation(), getId(), statsEquivalentPlanNode, type, left, right, outputVariables, filter, leftPartitionVariable, rightPartitionVariable, kdbTree);
    }
}