LocalDynamicFilter.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.sql.planner;

import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.expressions.DynamicFilters.DynamicFilterExtractResult;
import com.facebook.presto.expressions.DynamicFilters.DynamicFilterPlaceholder;
import com.facebook.presto.spi.plan.AbstractJoinNode;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.expressions.DynamicFilters.extractDynamicFilters;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;

/**
 * Collects the per-partition predicates produced on the build side of a join and,
 * once all expected partitions have reported, publishes a single predicate expressed
 * in terms of the probe-side variables.
 *
 * <p>Partition results are delivered through {@link #getTupleDomainConsumer()} and the
 * combined predicate is exposed through {@link #getResultFuture()}. Access to the
 * collected partitions is guarded by this instance's monitor.
 */
public class LocalDynamicFilter
{
    // Mapping from dynamic filter ID to its probe variables.
    private final Multimap<String, DynamicFilterPlaceholder> probeVariables;

    // Mapping from dynamic filter ID to its build channel indices.
    private final Map<String, Integer> buildChannels;

    private final SettableFuture<TupleDomain<VariableReferenceExpression>> resultFuture;

    // Number of build-side partitions to be collected.
    private final int partitionCount;

    // The resulting predicates from each build-side partition (guarded by "this").
    private final List<TupleDomain<String>> partitions;

    /**
     * @param probeVariables mapping from dynamic filter ID to the probe-side placeholders using it
     * @param buildChannels mapping from dynamic filter ID to the build-side channel index
     * @param partitionCount number of build-side partitions that must report before the result is published
     */
    public LocalDynamicFilter(Multimap<String, DynamicFilterPlaceholder> probeVariables, Map<String, Integer> buildChannels, int partitionCount)
    {
        // Take immutable copies so that later changes to the caller's collections cannot
        // alter this filter, and so that getBuildChannels() does not leak mutable state.
        this.probeVariables = ImmutableMultimap.copyOf(requireNonNull(probeVariables, "probeVariables is null"));
        this.buildChannels = ImmutableMap.copyOf(requireNonNull(buildChannels, "buildChannels is null"));
        verify(this.probeVariables.keySet().equals(this.buildChannels.keySet()), "probeVariables and buildChannels must have same keys");

        this.resultFuture = SettableFuture.create();

        this.partitionCount = partitionCount;
        this.partitions = new ArrayList<>(partitionCount);
    }

    /**
     * Records the predicate collected from one build-side partition. When the last expected
     * partition arrives, the union of all partitions is converted to probe variables and
     * published through the result future.
     */
    private synchronized void addPartition(TupleDomain<String> tupleDomain)
    {
        // Called concurrently by each DynamicFilterSourceOperator instance (when collection is over).
        verify(partitions.size() < partitionCount, "more than the expected %s partitions were provided", partitionCount);
        partitions.add(tupleDomain);
        if (partitions.size() == partitionCount) {
            // No more partitions are left to be processed.
            // NOTE: may result in a bit more relaxed constraint if there are multiple columns and multiple rows.
            // See the comment at TupleDomain::columnWiseUnion() for more details.
            TupleDomain<VariableReferenceExpression> result = convertTupleDomain(TupleDomain.columnWiseUnion(partitions));
            verify(resultFuture.set(result), "dynamic filter result is provided more than once");
        }
    }

    /**
     * Rewrites a predicate keyed by dynamic filter ID into one keyed by probe variable.
     *
     * @return {@code TupleDomain.none()} when {@code result} is none; otherwise a tuple domain
     * holding, for every probe variable, the domain derived from its matching dynamic filter(s)
     */
    private TupleDomain<VariableReferenceExpression> convertTupleDomain(TupleDomain<String> result)
    {
        if (result.isNone()) {
            return TupleDomain.none();
        }
        // Convert the predicate to use probe variables (instead of dynamic filter IDs).
        // Note that in case of a probe-side union, a single dynamic filter may match multiple probe variables.
        Map<String, Domain> domainsById = result.getDomains().get();
        Map<VariableReferenceExpression, Domain> probeDomains = probeVariables.entries().stream()
                // Only dynamic filters that actually produced a domain contribute to the result.
                .filter(entry -> domainsById.containsKey(entry.getKey()))
                .collect(toMap(
                        entry -> (VariableReferenceExpression) entry.getValue().getInput(),
                        entry -> entry.getValue().applyComparison(domainsById.get(entry.getKey())),
                        // The same probe variable may be constrained by several dynamic filters.
                        // The placeholders are conjuncts, so the variable must satisfy all of them:
                        // intersect the domains instead of failing on the duplicate key.
                        Domain::intersect));
        return TupleDomain.withColumnDomains(probeDomains);
    }

    /**
     * Builds a local dynamic filter for the given join by matching the join's dynamic filter IDs
     * against the dynamic filter placeholders found in probe-side filters above table scans.
     *
     * @param planNode the join whose dynamic filters should be collected
     * @param partitionCount number of build-side partitions that will report their predicates
     * @return the filter, or {@code Optional.empty()} when no probe-side placeholder matches
     * any of the join's dynamic filters
     */
    public static Optional<LocalDynamicFilter> create(AbstractJoinNode planNode, int partitionCount)
    {
        Set<String> joinDynamicFilters = planNode.getDynamicFilters().keySet();
        List<FilterNode> filterNodes = PlanNodeSearcher
                .searchFrom(planNode.getProbe())
                .where(PlannerUtils::isFilterAboveTableScan)
                .findAll();

        // Mapping from probe-side dynamic filters' IDs to their matching probe variables.
        ImmutableMultimap.Builder<String, DynamicFilterPlaceholder> probeVariablesBuilder = ImmutableMultimap.builder();
        for (FilterNode filterNode : filterNodes) {
            DynamicFilterExtractResult extractResult = extractDynamicFilters(filterNode.getPredicate());
            for (DynamicFilterPlaceholder placeholder : extractResult.getDynamicConjuncts()) {
                // Add descriptors that match the local dynamic filter (from the current join node)
                // and whose input is a plain variable reference.
                if (placeholder.getInput() instanceof VariableReferenceExpression
                        && joinDynamicFilters.contains(placeholder.getId())) {
                    probeVariablesBuilder.put(placeholder.getId(), placeholder);
                }
            }
        }

        Multimap<String, DynamicFilterPlaceholder> probeVariables = probeVariablesBuilder.build();
        PlanNode buildNode = planNode.getBuild();
        Map<String, Integer> buildChannels = planNode.getDynamicFilters().entrySet().stream()
                // Skip build channels that don't match local probe dynamic filters.
                .filter(entry -> probeVariables.containsKey(entry.getKey()))
                .collect(toMap(
                        // Dynamic filter ID
                        Map.Entry::getKey,
                        // Build-side channel index
                        entry -> {
                            VariableReferenceExpression buildVariable = entry.getValue();
                            int buildChannelIndex = buildNode.getOutputVariables().indexOf(buildVariable);
                            verify(buildChannelIndex >= 0, "build variable %s is not an output of the build node", buildVariable);
                            return buildChannelIndex;
                        }));

        if (buildChannels.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new LocalDynamicFilter(probeVariables, buildChannels, partitionCount));
    }

    /**
     * @return immutable mapping from dynamic filter ID to build-side channel index
     */
    public Map<String, Integer> getBuildChannels()
    {
        return buildChannels;
    }

    /**
     * @return future that is completed with the probe-side predicate once every expected
     * partition has been added
     */
    public ListenableFuture<TupleDomain<VariableReferenceExpression>> getResultFuture()
    {
        return resultFuture;
    }

    /**
     * @return consumer through which each build-side partition hands over its collected predicate
     */
    public Consumer<TupleDomain<String>> getTupleDomainConsumer()
    {
        return this::addPartition;
    }

    @Override
    public String toString()
    {
        // Snapshot the partitions under the same lock used by addPartition(), so that
        // formatting never iterates the list while another thread is appending to it.
        List<TupleDomain<String>> partitionsSnapshot;
        synchronized (this) {
            partitionsSnapshot = new ArrayList<>(partitions);
        }
        return toStringHelper(this)
                .add("probeVariables", probeVariables)
                .add("buildChannels", buildChannels)
                .add("partitionCount", partitionCount)
                .add("partitions", partitionsSnapshot)
                .toString();
    }
}