ActualProperties.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ConstantProperty;
import com.facebook.presto.spi.LocalProperty;
import com.facebook.presto.spi.plan.Partitioning;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.relation.ConstantExpression;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import javax.annotation.concurrent.Immutable;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import static com.facebook.presto.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.optimizations.PartitioningUtils.areCompatiblePartitionings;
import static com.facebook.presto.sql.planner.optimizations.PartitioningUtils.isEffectivelySinglePartition;
import static com.facebook.presto.sql.planner.optimizations.PartitioningUtils.isPartitionedOn;
import static com.facebook.presto.sql.planner.optimizations.PartitioningUtils.isPartitionedOnExactly;
import static com.facebook.presto.sql.planner.optimizations.PartitioningUtils.isRepartitionEffective;
import static com.facebook.presto.sql.planner.optimizations.PartitioningUtils.translatePartitioningRowExpression;
import static com.facebook.presto.sql.planner.optimizations.PartitioningUtils.translateToCoalesce;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.transform;
import static java.util.Objects.requireNonNull;

/**
 * The properties a plan node's output actually has: global distribution properties (node and stream
 * partitioning plus whether nulls/arbitrary rows are replicated), local properties (e.g. constant or
 * ordering properties) and the set of variables known to hold constant values.
 */
public class ActualProperties
{
    private final Global global;
    private final List<LocalProperty<VariableReferenceExpression>> localProperties;
    private final Map<VariableReferenceExpression, ConstantExpression> constants;

    /**
     * Creates the properties, folding every variable in {@code constants} into the leading
     * {@link ConstantProperty} entries of the local properties and normalizing the result.
     */
    private ActualProperties(
            Global global,
            List<? extends LocalProperty<VariableReferenceExpression>> localProperties,
            Map<VariableReferenceExpression, ConstantExpression> constants)
    {
        requireNonNull(global, "global is null");
        requireNonNull(localProperties, "localProperties is null");
        requireNonNull(constants, "constants is null");

        this.global = global;

        // The constants field implies a ConstantProperty in localProperties (but not vice versa).
        // Merge the constant variables with any leading constant properties so both views agree.
        Set<VariableReferenceExpression> localConstants = LocalProperties.extractLeadingConstants(localProperties);
        List<? extends LocalProperty<VariableReferenceExpression>> nonLeadingConstantProperties = LocalProperties.stripLeadingConstants(localProperties);

        Set<VariableReferenceExpression> updatedLocalConstants = ImmutableSet.<VariableReferenceExpression>builder()
                .addAll(localConstants)
                .addAll(constants.keySet())
                .build();

        List<LocalProperty<VariableReferenceExpression>> updatedLocalProperties = LocalProperties.normalizeAndPrune(ImmutableList.<LocalProperty<VariableReferenceExpression>>builder()
                .addAll(transform(updatedLocalConstants, ConstantProperty::new))
                .addAll(nonLeadingConstantProperties)
                .build());

        this.localProperties = ImmutableList.copyOf(updatedLocalProperties);
        this.constants = ImmutableMap.copyOf(constants);
    }

    /**
     * @return true if the node partitioning is known and its handle reports coordinator-only execution
     */
    public boolean isCoordinatorOnly()
    {
        return global.isCoordinatorOnly();
    }

    /**
     * @return true if the plan will only execute on a single node
     */
    public boolean isSingleNode()
    {
        return global.isSingleNode();
    }

    /**
     * @return whether rows with nulls in the partitioning columns (or arbitrary rows) are replicated to all nodes
     */
    public boolean isNullsAndAnyReplicated()
    {
        return global.isNullsAndAnyReplicated();
    }

    /**
     * Same as {@link #isStreamPartitionedOn(Collection, boolean, boolean)} with {@code nullsAndAnyReplicated = false}.
     */
    public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns, boolean exactly)
    {
        return isStreamPartitionedOn(columns, false, exactly);
    }

    /**
     * Checks the stream partitioning against {@code columns}, treating this node's constant variables as
     * satisfied. The replication flag must equal {@code nullsAndAnyReplicated} for a positive answer.
     *
     * @param exactly if true, use the "partitioned on exactly" check instead of the "partitioned on" check
     */
    public boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated, boolean exactly)
    {
        if (exactly) {
            return global.isStreamPartitionedOnExactly(columns, constants.keySet(), nullsAndAnyReplicated);
        }
        return global.isStreamPartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
    }

    /**
     * Same as {@link #isNodePartitionedOn(Collection, boolean, boolean)} with {@code nullsAndAnyReplicated = false}.
     */
    public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns, boolean exactly)
    {
        return isNodePartitionedOn(columns, false, exactly);
    }

    /**
     * Checks the node partitioning against {@code columns}, treating this node's constant variables as
     * satisfied. The replication flag must equal {@code nullsAndAnyReplicated} for a positive answer.
     *
     * @param exactly if true, use the "partitioned on exactly" check instead of the "partitioned on" check
     */
    public boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns, boolean nullsAndAnyReplicated, boolean exactly)
    {
        if (exactly) {
            return global.isNodePartitionedOnExactly(columns, constants.keySet(), nullsAndAnyReplicated);
        }
        return global.isNodePartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated);
    }

    /**
     * Checks whether the node partitioning is compatible with {@code partitioning} and the replication flag matches.
     * NOTE(review): deprecated without a documented replacement; the isRefinedPartitioningOver overloads look like
     * the successor — confirm before adding new callers.
     */
    @Deprecated
    public boolean isCompatibleTablePartitioningWith(Partitioning partitioning, boolean nullsAndAnyReplicated, Metadata metadata, Session session)
    {
        return global.isCompatibleTablePartitioningWith(partitioning, nullsAndAnyReplicated, metadata, session);
    }

    /**
     * Checks whether this node partitioning is compatible with {@code other}'s, using {@code symbolMappings}
     * and each side's constant values to match columns.
     * NOTE(review): deprecated without a documented replacement; see the isRefinedPartitioningOver overloads.
     */
    @Deprecated
    public boolean isCompatibleTablePartitioningWith(ActualProperties other, Function<VariableReferenceExpression, Set<VariableReferenceExpression>> symbolMappings, Metadata metadata, Session session)
    {
        return global.isCompatibleTablePartitioningWith(
                other.global,
                symbolMappings,
                variable -> Optional.ofNullable(constants.get(variable)),
                variable -> Optional.ofNullable(other.constants.get(variable)),
                metadata,
                session);
    }

    /**
     * Checks whether the node partitioning is a refinement of {@code partitioning} and the replication flag matches.
     */
    public boolean isRefinedPartitioningOver(Partitioning partitioning, boolean nullsAndAnyReplicated, Metadata metadata, Session session)
    {
        return global.isRefinedPartitioningOver(partitioning, nullsAndAnyReplicated, metadata, session);
    }

    /**
     * Checks whether this node partitioning is a refinement of {@code other}'s, using {@code symbolMappings}
     * and each side's constant values to match columns. Both replication flags must be equal.
     */
    public boolean isRefinedPartitioningOver(ActualProperties other, Function<VariableReferenceExpression, Set<VariableReferenceExpression>> symbolMappings, Metadata metadata, Session session)
    {
        return global.isRefinedPartitioningOver(
                other.global,
                symbolMappings,
                variable -> Optional.ofNullable(constants.get(variable)),
                variable -> Optional.ofNullable(other.constants.get(variable)),
                metadata,
                session);
    }

    /**
     * @return true if all the data will effectively land in a single stream
     */
    public boolean isEffectivelySingleStream()
    {
        return global.isEffectivelySingleStream(constants.keySet());
    }

    /**
     * @return true if repartitioning on the keys will yield some difference
     */
    public boolean isStreamRepartitionEffective(Collection<VariableReferenceExpression> keys)
    {
        return global.isStreamRepartitionEffective(keys, constants.keySet());
    }

    /**
     * Rewrites these properties in terms of translated variables.
     *
     * <p>Constants whose variable has no translation are dropped from the result's constants. Global
     * partitioning may still refer to them: an untranslatable variable that is a known constant is
     * replaced by its constant value.
     *
     * @param translator maps a variable to its new name, or empty if it does not survive the translation
     */
    public ActualProperties translateVariable(Function<VariableReferenceExpression, Optional<VariableReferenceExpression>> translator)
    {
        Map<VariableReferenceExpression, ConstantExpression> translatedConstants = new HashMap<>();
        for (Map.Entry<VariableReferenceExpression, ConstantExpression> entry : constants.entrySet()) {
            Optional<VariableReferenceExpression> translatedKey = translator.apply(entry.getKey());
            translatedKey.ifPresent(key -> translatedConstants.put(key, entry.getValue()));
        }
        return builder()
                .global(global.translateVariableToRowExpression(variable -> {
                    Optional<RowExpression> translated = translator.apply(variable).map(RowExpression.class::cast);
                    if (!translated.isPresent()) {
                        // fall back to the constant value so partitioning on a constant column is not lost
                        translated = Optional.ofNullable(constants.get(variable));
                    }
                    return translated;
                }))
                .local(LocalProperties.translate(localProperties, translator))
                .constants(translatedConstants)
                .build();
    }

    /**
     * Rewrites these properties through a projection's {@code assignments} (output variable to expression).
     *
     * <p>Only identity-style assignments (the expression is a plain variable) carry local properties and
     * constants forward. If several outputs reference the same input variable, the last one seen wins.
     * For global partitioning, constant variables that are not projected are mapped to their constant value,
     * and the full assignments are passed along as well.
     */
    public ActualProperties translateRowExpression(Map<VariableReferenceExpression, RowExpression> assignments)
    {
        Map<VariableReferenceExpression, VariableReferenceExpression> inputToOutputVariables = new HashMap<>();
        for (Map.Entry<VariableReferenceExpression, RowExpression> assignment : assignments.entrySet()) {
            RowExpression expression = assignment.getValue();
            if (expression instanceof VariableReferenceExpression) {
                inputToOutputVariables.put((VariableReferenceExpression) expression, assignment.getKey());
            }
        }

        Map<VariableReferenceExpression, ConstantExpression> translatedConstants = new HashMap<>();
        for (Map.Entry<VariableReferenceExpression, ConstantExpression> entry : constants.entrySet()) {
            VariableReferenceExpression output = inputToOutputVariables.get(entry.getKey());
            if (output != null) {
                translatedConstants.put(output, entry.getValue());
            }
        }

        // Projected variables map to their output variable; unprojected constants map to their value.
        ImmutableMap.Builder<VariableReferenceExpression, RowExpression> inputToOutputMappings = ImmutableMap.builder();
        inputToOutputMappings.putAll(inputToOutputVariables);
        constants.entrySet().stream()
                .filter(entry -> !inputToOutputVariables.containsKey(entry.getKey()))
                .forEach(inputToOutputMappings::put);
        return builder()
                .global(global.translateRowExpression(inputToOutputMappings.build(), assignments))
                .local(LocalProperties.translate(localProperties, variable -> Optional.ofNullable(inputToOutputVariables.get(variable))))
                .constants(translatedConstants)
                .build();
    }

    /**
     * @return the node partitioning, or empty if the data is partitioned with some unknown scheme
     */
    public Optional<Partitioning> getNodePartitioning()
    {
        return global.getNodePartitioning();
    }

    /**
     * @return an immutable map from constant variables to their values
     */
    public Map<VariableReferenceExpression, ConstantExpression> getConstants()
    {
        return constants;
    }

    /**
     * @return the normalized, immutable local properties (leading entries are the constant properties)
     */
    public List<LocalProperty<VariableReferenceExpression>> getLocalProperties()
    {
        return localProperties;
    }

    /**
     * @return a copy of these properties with the nulls-and-any-replicated flag set to {@code replicatedNulls}
     */
    public ActualProperties withReplicatedNulls(boolean replicatedNulls)
    {
        return builderFrom(this)
                .global(global.withReplicatedNulls(replicatedNulls))
                .build();
    }

    /**
     * @return a builder starting from arbitrary partitioning, no local properties and no constants
     */
    public static Builder builder()
    {
        return new Builder();
    }

    /**
     * @return a builder pre-populated with the global, local and constant properties of {@code properties}
     */
    public static Builder builderFrom(ActualProperties properties)
    {
        return new Builder(properties.global, properties.localProperties, properties.constants);
    }

    /**
     * Mutable builder for {@link ActualProperties}.
     */
    public static class Builder
    {
        private Global global;
        private List<LocalProperty<VariableReferenceExpression>> localProperties;
        private Map<VariableReferenceExpression, ConstantExpression> constants;
        private boolean unordered;

        public Builder()
        {
            this(Global.arbitraryPartition(), ImmutableList.of(), ImmutableMap.of());
        }

        public Builder(Global global, List<LocalProperty<VariableReferenceExpression>> localProperties, Map<VariableReferenceExpression, ConstantExpression> constants)
        {
            this.global = requireNonNull(global, "global is null");
            this.localProperties = ImmutableList.copyOf(localProperties);
            this.constants = ImmutableMap.copyOf(constants);
        }

        public Builder global(Global global)
        {
            this.global = global;
            return this;
        }

        /**
         * Copies the global properties of {@code other}.
         */
        public Builder global(ActualProperties other)
        {
            this.global = other.global;
            return this;
        }

        public Builder local(List<? extends LocalProperty<VariableReferenceExpression>> localProperties)
        {
            this.localProperties = ImmutableList.copyOf(localProperties);
            return this;
        }

        public Builder constants(Map<VariableReferenceExpression, ConstantExpression> constants)
        {
            this.constants = ImmutableMap.copyOf(constants);
            return this;
        }

        /**
         * When true, {@link #build()} keeps only the leading run of order-insensitive local properties.
         * Everything from the first order-sensitive property onward is dropped.
         */
        public Builder unordered(boolean unordered)
        {
            this.unordered = unordered;
            return this;
        }

        public ActualProperties build()
        {
            List<LocalProperty<VariableReferenceExpression>> localProperties = this.localProperties;
            if (unordered) {
                ImmutableList.Builder<LocalProperty<VariableReferenceExpression>> newLocalProperties = ImmutableList.builder();
                for (LocalProperty<VariableReferenceExpression> property : this.localProperties) {
                    if (property.isOrderSensitive()) {
                        break;
                    }
                    newLocalProperties.add(property);
                }
                localProperties = newLocalProperties.build();
            }
            return new ActualProperties(global, localProperties, constants);
        }
    }

    // NOTE(review): equality and hashing consider only the constant *variables*, not their values.
    // This is kept as-is; confirm it is intentional before relying on equals() to distinguish constant values.
    @Override
    public int hashCode()
    {
        return Objects.hash(global, localProperties, constants.keySet());
    }

    @Override
    public boolean equals(Object obj)
    {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final ActualProperties other = (ActualProperties) obj;
        return Objects.equals(this.global, other.global)
                && Objects.equals(this.localProperties, other.localProperties)
                && Objects.equals(this.constants.keySet(), other.constants.keySet());
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("globalProperties", global)
                .add("localProperties", localProperties)
                .add("constants", constants)
                .toString();
    }

    /**
     * Distribution properties of the data across nodes and streams.
     */
    @Immutable
    public static final class Global
    {
        // Description of the partitioning of the data across nodes
        private final Optional<Partitioning> nodePartitioning; // if missing => partitioned with some unknown scheme
        // Description of the partitioning of the data across streams (splits)
        private final Optional<Partitioning> streamPartitioning; // if missing => partitioned with some unknown scheme

        // NOTE: Partitioning on zero columns (or effectively zero columns if the columns are constant) indicates that all
        // the rows will be partitioned into a single node or stream. However, this can still be a partitioned plan in that the plan
        // will be executed on multiple servers, but only one server will get all the data.

        // Whether rows with nulls in the partitioning columns, or some arbitrary rows, have been replicated to all *nodes*.
        // This matters for IN semantics: NULL IN (empty set) is FALSE, while NULL IN (non-empty set) is NULL.
        // Likewise, a non-NULL value A (say 1) IN a set that lacks A (say (2, 3)) is FALSE, but A IN (2, 3, NULL) is NULL,
        // because IN is equivalent to "a = b OR a = c OR a = d ...".
        private final boolean nullsAndAnyReplicated;

        /**
         * @throws NullPointerException if either partitioning is null
         * @throws IllegalArgumentException if both partitionings are present and neither one's variables contain the other's
         */
        private Global(Optional<Partitioning> nodePartitioning, Optional<Partitioning> streamPartitioning, boolean nullsAndAnyReplicated)
        {
            // null-check first so a null argument fails with a descriptive message rather than inside the argument check
            this.nodePartitioning = requireNonNull(nodePartitioning, "nodePartitioning is null");
            this.streamPartitioning = requireNonNull(streamPartitioning, "streamPartitioning is null");
            checkArgument(!nodePartitioning.isPresent()
                            || !streamPartitioning.isPresent()
                            || nodePartitioning.get().getVariableReferences().containsAll(streamPartitioning.get().getVariableReferences())
                            || streamPartitioning.get().getVariableReferences().containsAll(nodePartitioning.get().getVariableReferences()),
                    "Global stream partitioning columns should match node partitioning columns");
            this.nullsAndAnyReplicated = nullsAndAnyReplicated;
        }

        /**
         * @return coordinator distribution on zero columns with a single (zero-column) stream partitioning
         */
        public static Global coordinatorSingleStreamPartition()
        {
            return partitionedOn(
                    COORDINATOR_DISTRIBUTION,
                    ImmutableList.of(),
                    Optional.of(ImmutableList.of()));
        }

        /**
         * @return single distribution on zero columns with a single (zero-column) stream partitioning
         */
        public static Global singleStreamPartition()
        {
            return partitionedOn(
                    SINGLE_DISTRIBUTION,
                    ImmutableList.of(),
                    Optional.of(ImmutableList.of()));
        }

        /**
         * @return unknown node and stream partitioning, no replication
         */
        public static Global arbitraryPartition()
        {
            return new Global(Optional.empty(), Optional.empty(), false);
        }

        /**
         * Node partitioning with the given handle and columns. The stream partitioning, if given, uses
         * {@code SOURCE_DISTRIBUTION} on its columns.
         */
        public static <T extends RowExpression, U extends RowExpression> Global partitionedOn(
                PartitioningHandle nodePartitioningHandle,
                List<T> nodePartitioning,
                Optional<List<U>> streamPartitioning)
        {
            return new Global(
                    Optional.of(Partitioning.create(nodePartitioningHandle, nodePartitioning)),
                    streamPartitioning.map(columns -> Partitioning.create(SOURCE_DISTRIBUTION, columns)),
                    false);
        }

        public static Global partitionedOn(Partitioning nodePartitioning, Optional<Partitioning> streamPartitioning)
        {
            return new Global(
                    Optional.of(nodePartitioning),
                    streamPartitioning,
                    false);
        }

        /**
         * @return unknown node partitioning with a {@code SOURCE_DISTRIBUTION} stream partitioning on the given variables
         */
        public static Global streamPartitionedOn(List<VariableReferenceExpression> streamPartitioning)
        {
            return new Global(
                    Optional.empty(),
                    Optional.of(Partitioning.create(SOURCE_DISTRIBUTION, streamPartitioning)),
                    false);
        }

        /**
         * @return node partitioning produced by {@code translateToCoalesce(one, other, ...)} with unknown stream partitioning
         */
        public static Global partitionedOnCoalesce(Partitioning one, Partitioning other, Metadata metadata, Session session)
        {
            return new Global(translateToCoalesce(one, other, metadata, session), Optional.empty(), false);
        }

        public Global withReplicatedNulls(boolean replicatedNulls)
        {
            return new Global(nodePartitioning, streamPartitioning, replicatedNulls);
        }

        private boolean isNullsAndAnyReplicated()
        {
            return nullsAndAnyReplicated;
        }

        /**
         * @return true if the plan will only execute on a single node
         */
        private boolean isSingleNode()
        {
            return nodePartitioning.isPresent() && nodePartitioning.get().getHandle().isSingleNode();
        }

        private boolean isCoordinatorOnly()
        {
            return nodePartitioning.isPresent() && nodePartitioning.get().getHandle().isCoordinatorOnly();
        }

        private boolean isNodePartitionedOn(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> constants, boolean nullsAndAnyReplicated)
        {
            return nodePartitioning.isPresent() && isPartitionedOn(nodePartitioning.get(), columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
        }

        private boolean isNodePartitionedOnExactly(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> constants, boolean nullsAndAnyReplicated)
        {
            return nodePartitioning.isPresent() && isPartitionedOnExactly(nodePartitioning.get(), columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
        }

        private boolean isCompatibleTablePartitioningWith(Partitioning partitioning, boolean nullsAndAnyReplicated, Metadata metadata, Session session)
        {
            return nodePartitioning.isPresent() && areCompatiblePartitionings(nodePartitioning.get(), partitioning, metadata, session) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
        }

        private boolean isCompatibleTablePartitioningWith(
                Global other,
                Function<VariableReferenceExpression, Set<VariableReferenceExpression>> symbolMappings,
                Function<VariableReferenceExpression, Optional<ConstantExpression>> leftConstantMapping,
                Function<VariableReferenceExpression, Optional<ConstantExpression>> rightConstantMapping,
                Metadata metadata,
                Session session)
        {
            return nodePartitioning.isPresent() &&
                    other.nodePartitioning.isPresent() &&
                    areCompatiblePartitionings(
                            nodePartitioning.get(),
                            other.nodePartitioning.get(),
                            symbolMappings,
                            leftConstantMapping,
                            rightConstantMapping,
                            metadata,
                            session) &&
                    nullsAndAnyReplicated == other.nullsAndAnyReplicated;
        }

        private boolean isRefinedPartitioningOver(Partitioning partitioning, boolean nullsAndAnyReplicated, Metadata metadata, Session session)
        {
            return nodePartitioning.isPresent() && PartitioningUtils.isRefinedPartitioningOver(nodePartitioning.get(), partitioning, metadata, session) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
        }

        private boolean isRefinedPartitioningOver(
                Global other,
                Function<VariableReferenceExpression, Set<VariableReferenceExpression>> symbolMappings,
                Function<VariableReferenceExpression, Optional<ConstantExpression>> leftConstantMapping,
                Function<VariableReferenceExpression, Optional<ConstantExpression>> rightConstantMapping,
                Metadata metadata,
                Session session)
        {
            return nodePartitioning.isPresent() &&
                    other.nodePartitioning.isPresent() &&
                    PartitioningUtils.isRefinedPartitioningOver(
                            nodePartitioning.get(),
                            other.nodePartitioning.get(),
                            symbolMappings,
                            leftConstantMapping,
                            rightConstantMapping,
                            metadata,
                            session) &&
                    nullsAndAnyReplicated == other.nullsAndAnyReplicated;
        }

        private Optional<Partitioning> getNodePartitioning()
        {
            return nodePartitioning;
        }

        private boolean isStreamPartitionedOn(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> constants, boolean nullsAndAnyReplicated)
        {
            return streamPartitioning.isPresent() && isPartitionedOn(streamPartitioning.get(), columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
        }

        private boolean isStreamPartitionedOnExactly(Collection<VariableReferenceExpression> columns, Set<VariableReferenceExpression> constants, boolean nullsAndAnyReplicated)
        {
            return streamPartitioning.isPresent() && isPartitionedOnExactly(streamPartitioning.get(), columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated;
        }

        /**
         * @return true if all the data will effectively land in a single stream (never true when rows are replicated)
         */
        private boolean isEffectivelySingleStream(Set<VariableReferenceExpression> constants)
        {
            return streamPartitioning.isPresent() && isEffectivelySinglePartition(streamPartitioning.get(), constants) && !nullsAndAnyReplicated;
        }

        /**
         * @return true if repartitioning on the keys will yield some difference (never true when rows are replicated)
         */
        private boolean isStreamRepartitionEffective(Collection<VariableReferenceExpression> keys, Set<VariableReferenceExpression> constants)
        {
            return (!streamPartitioning.isPresent() || isRepartitionEffective(streamPartitioning.get(), keys, constants)) && !nullsAndAnyReplicated;
        }

        /**
         * Translates both partitionings; a partitioning becomes unknown (empty) when the helper cannot translate it.
         */
        private Global translateVariableToRowExpression(
                Function<VariableReferenceExpression, Optional<RowExpression>> translator)
        {
            return new Global(
                    nodePartitioning.flatMap(partitioning -> PartitioningUtils.translateVariableToRowExpression(partitioning, translator)),
                    streamPartitioning.flatMap(partitioning -> PartitioningUtils.translateVariableToRowExpression(partitioning, translator)),
                    nullsAndAnyReplicated);
        }

        /**
         * Translates both partitionings through a projection; a partitioning becomes unknown (empty) when the helper
         * cannot translate it.
         */
        private Global translateRowExpression(Map<VariableReferenceExpression, RowExpression> inputToOutputMappings, Map<VariableReferenceExpression, RowExpression> assignments)
        {
            return new Global(
                    nodePartitioning.flatMap(partitioning -> translatePartitioningRowExpression(partitioning, inputToOutputMappings, assignments)),
                    streamPartitioning.flatMap(partitioning -> translatePartitioningRowExpression(partitioning, inputToOutputMappings, assignments)),
                    nullsAndAnyReplicated);
        }

        @Override
        public int hashCode()
        {
            return Objects.hash(nodePartitioning, streamPartitioning, nullsAndAnyReplicated);
        }

        @Override
        public boolean equals(Object obj)
        {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            final Global other = (Global) obj;
            return Objects.equals(this.nodePartitioning, other.nodePartitioning) &&
                    Objects.equals(this.streamPartitioning, other.streamPartitioning) &&
                    this.nullsAndAnyReplicated == other.nullsAndAnyReplicated;
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("nodePartitioning", nodePartitioning)
                    .add("streamPartitioning", streamPartitioning)
                    .add("nullsAndAnyReplicated", nullsAndAnyReplicated)
                    .toString();
        }
    }
}