StageLinkage.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageExecutionState;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.plan.PlanFragmentId;
import com.google.common.primitives.Ints;

import java.util.List;
import java.util.Set;

import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;

/**
 * Links one stage to its neighbours in the stage graph.
 * <p>
 * When new tasks are scheduled for this stage, the linkage publishes them to the parent
 * stage as exchange locations. It also registers one output buffer per new task with each
 * child stage's {@link OutputBufferManager}.
 */
public class StageLinkage
{
    private final PlanFragmentId currentStageFragmentId;
    private final ExchangeLocationsConsumer parent;
    private final Set<OutputBufferManager> childOutputBufferManagers;

    /**
     * Creates a linkage for the stage identified by {@code fragmentId}.
     *
     * @param fragmentId fragment id of this stage; passed to the parent with every batch of new tasks
     * @param parent consumer that receives exchange locations for this stage's tasks
     * @param children child stages; one output buffer manager is created per child
     * @throws NullPointerException if any argument is null
     * @throws IllegalStateException if a child stage whose partitioning is neither
     * {@code FIXED_BROADCAST_DISTRIBUTION} nor {@code SCALED_WRITER_DISTRIBUTION} has no
     * bucket-to-partition mapping
     */
    public StageLinkage(PlanFragmentId fragmentId, ExchangeLocationsConsumer parent, Set<SqlStageExecution> children)
    {
        this.currentStageFragmentId = requireNonNull(fragmentId, "fragmentId is null");
        this.parent = requireNonNull(parent, "parent is null");
        this.childOutputBufferManagers = requireNonNull(children, "children is null").stream()
                .map(StageLinkage::createOutputBufferManager)
                .collect(toImmutableSet());
    }

    /**
     * Propagates a scheduling update for this stage to its parent and children.
     *
     * @param newState current execution state of this stage; decides whether the parent and
     * children are told that no more tasks will follow
     * @param newTasks tasks newly created for this stage since the previous call
     */
    public void processScheduleResults(StageExecutionState newState, Set<RemoteTask> newTasks)
    {
        boolean noMoreTasks = isNoMoreTasks(newState);

        // Add an exchange location to the parent stage for each new task
        parent.addExchangeLocations(currentStageFragmentId, newTasks, noMoreTasks);

        if (!childOutputBufferManagers.isEmpty()) {
            // Add an output buffer to the child stages for each new task; the buffer id is the task's numeric id
            List<OutputBuffers.OutputBufferId> newOutputBuffers = newTasks.stream()
                    .map(task -> new OutputBuffers.OutputBufferId(task.getTaskId().getId()))
                    .collect(toImmutableList());
            for (OutputBufferManager child : childOutputBufferManagers) {
                child.addOutputBuffers(newOutputBuffers, noMoreTasks);
            }
        }
    }

    /**
     * Maps a stage state to the "no more tasks" signal sent to the parent and children.
     * Every branch returns explicitly, so adding logic to one group of states cannot
     * accidentally apply it to another group through switch fall-through.
     */
    private static boolean isNoMoreTasks(StageExecutionState state)
    {
        switch (state) {
            case PLANNED:
            case SCHEDULING:
                // workers are still being added to the query
                return false;
            case FINISHED_TASK_SCHEDULING:
            case SCHEDULING_SPLITS:
            case SCHEDULED:
            case RUNNING:
            case FINISHED:
            case CANCELED:
                // no more workers will be added to the query
                return true;
            case ABORTED:
            case FAILED:
                // DO NOT complete a FAILED or ABORTED stage.  This will cause the
                // stage above to finish normally, which will result in a query
                // completing successfully when it should fail.
                return false;
        }
        // Any state not listed above keeps the conservative default: do not signal completion.
        return false;
    }

    /**
     * Chooses the output buffer manager for a child stage based on the child's output partitioning.
     * <ul>
     * <li>Broadcast children get a {@link BroadcastOutputBufferManager}.</li>
     * <li>Scaled-writer children get a {@link ScaledOutputBufferManager}.</li>
     * <li>All other children get a {@link PartitionedOutputBufferManager}. Its partition count
     * is one more than the largest entry in the child's bucket-to-partition mapping.</li>
     * </ul>
     *
     * @throws IllegalStateException if a partitioned child has no bucket-to-partition mapping
     */
    private static OutputBufferManager createOutputBufferManager(SqlStageExecution childStage)
    {
        PartitioningHandle partitioningHandle = childStage.getFragment().getPartitioningScheme().getPartitioning().getHandle();
        if (partitioningHandle.equals(FIXED_BROADCAST_DISTRIBUTION)) {
            return new BroadcastOutputBufferManager(childStage::setOutputBuffers);
        }
        if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
            return new ScaledOutputBufferManager(childStage::setOutputBuffers);
        }
        int[] bucketToPartition = childStage.getFragment().getPartitioningScheme().getBucketToPartition()
                .orElseThrow(() -> new IllegalStateException("No bucket-to-partition mapping for child stage with partitioning " + partitioningHandle));
        int partitionCount = Ints.max(bucketToPartition) + 1;
        return new PartitionedOutputBufferManager(partitioningHandle, partitionCount, childStage::setOutputBuffers);
    }
}