PartialResultQueryManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution;

import com.facebook.presto.execution.scheduler.PartialResultQueryTaskTracker;
import com.google.inject.Inject;

import javax.annotation.PreDestroy;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static java.util.Comparator.comparing;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;

public class PartialResultQueryManager
{
    private final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<>();
    private final PriorityBlockingQueue<PartialResultQueryTaskTracker> queue;

    @Inject
    public PartialResultQueryManager()
    {
        this.queue = new PriorityBlockingQueue<>(1, comparing(PartialResultQueryTaskTracker::getMaxEndTime));
    }

    private void startExecutor()
    {
        // Start the executor if not already started
        if (executor.compareAndSet(null, newSingleThreadScheduledExecutor(threadsNamed("partial-result-query-manager-%s")))) {
            executor.get().scheduleWithFixedDelay(this::checkAndCancelTasks, 1, 1, TimeUnit.SECONDS);
        }
    }

    public void addQueryTaskTracker(PartialResultQueryTaskTracker queryTaskTracker)
    {
        startExecutor();
        queue.add(queryTaskTracker);
    }

    public void checkAndCancelTasks()
    {
        long currentTime = System.nanoTime();
        while (!queue.isEmpty() && currentTime >= queue.peek().getMaxEndTime()) {
            PartialResultQueryTaskTracker queryTracker = queue.poll();
            // Reached max task end time. Cancel pending tasks.
            queryTracker.cancelUnfinishedTasks();
        }
    }

    public int getQueueSize()
    {
        return queue.size();
    }

    @PreDestroy
    public void stop()
    {
        if (executor.get() != null) {
            executor.get().shutdownNow();
        }
    }
}