// TestTaskExecutor.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution.executor;

import com.facebook.airlift.testing.TestingTicker;
import com.facebook.presto.execution.SplitRunner;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.version.EmbedVersion;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.facebook.airlift.testing.Assertions.assertGreaterThan;
import static com.facebook.airlift.testing.Assertions.assertLessThan;
import static com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.QUERY_FAIR;
import static com.facebook.presto.execution.TaskManagerConfig.TaskPriorityTracking.TASK_FAIR;
import static com.facebook.presto.execution.executor.MultilevelSplitQueue.LEVEL_CONTRIBUTION_CAP;
import static com.facebook.presto.execution.executor.MultilevelSplitQueue.LEVEL_THRESHOLD_SECONDS;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestTaskExecutor
{
    /**
     * Drives three {@link TestingJob}s of a single task through their quanta in lock step and
     * checks, phase by phase, how many quanta each job has completed, when the running splits are
     * counted as run-away, and that every split future completes.
     * <p>
     * The test thread registers as one party on both {@code beginPhase} and
     * {@code verificationComplete}. Each job registers itself on both phasers in its constructor,
     * so a quantum can neither begin nor finish until the test thread has arrived as well.
     */
    @Test(invocationCount = 100)
    public void testTasksComplete()
            throws Exception
    {
        TestingTicker ticker = new TestingTicker();
        TaskExecutor taskExecutor = new TaskExecutor(4, 8, 3, 4, TASK_FAIR, ticker);
        taskExecutor.start();
        ticker.increment(20, MILLISECONDS);

        try {
            TaskId taskId = new TaskId("test", 0, 0, 0, 0);
            TaskHandle taskHandle = taskExecutor.addTask(taskId, () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty());

            // the test thread is one party on each of the two shared phasers
            Phaser beginPhase = new Phaser();
            beginPhase.register();
            Phaser verificationComplete = new Phaser();
            verificationComplete.register();

            // add two jobs; each gets a private single-party "global" phaser (new Phaser(1)) on which
            // only the job itself arrives, so only the two shared phasers gate its progress
            TestingJob driver1 = new TestingJob(ticker, new Phaser(1), beginPhase, verificationComplete, 10, 0);
            ListenableFuture<?> future1 = getOnlyElement(taskExecutor.enqueueSplits(taskHandle, true, ImmutableList.of(driver1)));
            TestingJob driver2 = new TestingJob(ticker, new Phaser(1), beginPhase, verificationComplete, 10, 0);
            ListenableFuture<?> future2 = getOnlyElement(taskExecutor.enqueueSplits(taskHandle, true, ImmutableList.of(driver2)));
            assertEquals(driver1.getCompletedPhases(), 0);
            assertEquals(driver2.getCompletedPhases(), 0);

            // verify workers have arrived but haven't processed yet
            beginPhase.arriveAndAwaitAdvance();
            assertEquals(driver1.getCompletedPhases(), 0);
            assertEquals(driver2.getCompletedPhases(), 0);
            // both jobs are now held inside their first quantum: after 60 s of ticker time neither is
            // expected to count as run-away, after a further 600 s both are
            // NOTE(review): the run-away threshold itself is defined by TaskExecutor and is not visible here
            ticker.increment(60, SECONDS);
            assertEquals(taskExecutor.getRunAwaySplitCount(), 0);
            ticker.increment(600, SECONDS);
            assertEquals(taskExecutor.getRunAwaySplitCount(), 2);
            verificationComplete.arriveAndAwaitAdvance();

            // advance one phase and verify
            beginPhase.arriveAndAwaitAdvance();
            assertEquals(driver1.getCompletedPhases(), 1);
            assertEquals(driver2.getCompletedPhases(), 1);

            verificationComplete.arriveAndAwaitAdvance();

            // add one more job; it registers on the shared phasers after two quanta have already begun,
            // which is why its first/last phases asserted at the end are offset by 2
            TestingJob driver3 = new TestingJob(ticker, new Phaser(1), beginPhase, verificationComplete, 10, 0);
            ListenableFuture<?> future3 = getOnlyElement(taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(driver3)));

            // advance one phase and verify
            beginPhase.arriveAndAwaitAdvance();
            assertEquals(driver1.getCompletedPhases(), 2);
            assertEquals(driver2.getCompletedPhases(), 2);
            assertEquals(driver3.getCompletedPhases(), 0);
            verificationComplete.arriveAndAwaitAdvance();

            // advance to the end of the first two tasks and verify
            beginPhase.arriveAndAwaitAdvance();
            for (int i = 0; i < 7; i++) {
                // each iteration finishes the current quantum and lets the next one begin
                verificationComplete.arriveAndAwaitAdvance();
                beginPhase.arriveAndAwaitAdvance();
                // beginPhase must stay exactly one phase ahead of verificationComplete
                assertEquals(beginPhase.getPhase(), verificationComplete.getPhase() + 1);
            }
            assertEquals(driver1.getCompletedPhases(), 10);
            assertEquals(driver2.getCompletedPhases(), 10);
            assertEquals(driver3.getCompletedPhases(), 8);
            future1.get(1, SECONDS);
            future2.get(1, SECONDS);
            verificationComplete.arriveAndAwaitAdvance();

            // advance two more times and verify
            beginPhase.arriveAndAwaitAdvance();
            verificationComplete.arriveAndAwaitAdvance();
            beginPhase.arriveAndAwaitAdvance();
            assertEquals(driver1.getCompletedPhases(), 10);
            assertEquals(driver2.getCompletedPhases(), 10);
            assertEquals(driver3.getCompletedPhases(), 10);
            future3.get(1, SECONDS);
            verificationComplete.arriveAndAwaitAdvance();

            // the first two jobs ran during begin phases 0..10, the late job during 2..12
            assertEquals(driver1.getFirstPhase(), 0);
            assertEquals(driver2.getFirstPhase(), 0);
            assertEquals(driver3.getFirstPhase(), 2);

            assertEquals(driver1.getLastPhase(), 10);
            assertEquals(driver2.getLastPhase(), 10);
            assertEquals(driver3.getLastPhase(), 12);

            // no splits remaining
            ticker.increment(610, SECONDS);
            assertEquals(taskExecutor.getRunAwaySplitCount(), 0);
        }
        finally {
            taskExecutor.stop();
        }
    }

    /**
     * Runs one job in each of two tasks, where the "short" job advances the ticker by 10 ms per
     * quantum and the "long" job by 20 ms, and checks how the completed quanta are split between
     * them after the test thread has arrived 11 times on the shared phaser.
     * <p>
     * {@code globalPhaser} is passed as each job's end-of-quantum phaser (fourth constructor
     * argument), so both jobs register on it in their constructors. The test thread arrives on it
     * without registering itself.
     * NOTE(review): this appears to rely on only one job running at a time (first
     * {@code TaskExecutor} argument is 1), so the test thread supplies the second arrival - confirm.
     */
    @Test(invocationCount = 100)
    public void testQuantaFairness()
    {
        TestingTicker ticker = new TestingTicker();
        TaskExecutor taskExecutor = new TaskExecutor(1, 2, 3, 4, QUERY_FAIR, ticker);
        taskExecutor.start();
        ticker.increment(20, MILLISECONDS);

        try {
            TaskHandle shortQuantaTaskHandle = taskExecutor.addTask(new TaskId("short_quanta", 0, 0, 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty());
            TaskHandle longQuantaTaskHandle = taskExecutor.addTask(new TaskId("long_quanta", 0, 0, 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty());

            Phaser globalPhaser = new Phaser();

            // both jobs require 10 quanta; they differ only in the ticker time charged per quantum
            TestingJob shortQuantaDriver = new TestingJob(ticker, new Phaser(), new Phaser(), globalPhaser, 10, 10);
            TestingJob longQuantaDriver = new TestingJob(ticker, new Phaser(), new Phaser(), globalPhaser, 10, 20);

            taskExecutor.enqueueSplits(shortQuantaTaskHandle, true, ImmutableList.of(shortQuantaDriver));
            taskExecutor.enqueueSplits(longQuantaTaskHandle, true, ImmutableList.of(longQuantaDriver));

            // let 11 quanta finish in total
            for (int i = 0; i < 11; i++) {
                globalPhaser.arriveAndAwaitAdvance();
            }

            // the job with the cheaper quanta is expected to have received roughly twice as many of them
            assertTrue(shortQuantaDriver.getCompletedPhases() >= 7 && shortQuantaDriver.getCompletedPhases() <= 8);
            assertTrue(longQuantaDriver.getCompletedPhases() >= 3 && longQuantaDriver.getCompletedPhases() <= 4);

            globalPhaser.arriveAndDeregister();
        }
        finally {
            taskExecutor.stop();
        }
    }

    /**
     * Runs two jobs of one task in lock step and checks that the task handle's priority level is
     * {@code i + 1} once the number of completed phases, converted to seconds, reaches
     * {@code LEVEL_THRESHOLD_SECONDS[i + 1]}.
     * <p>
     * The shared phaser is pre-registered with three parties and is passed as each job's global
     * phaser; the job constructor does not register on a global phaser that already has parties.
     * NOTE(review): the three parties are presumably the two jobs plus the test thread - confirm.
     */
    @Test(invocationCount = 100)
    public void testLevelMovement()
    {
        TestingTicker ticker = new TestingTicker();
        TaskExecutor taskExecutor = new TaskExecutor(2, 2, 3, 4, TASK_FAIR, ticker);
        taskExecutor.start();
        ticker.increment(20, MILLISECONDS);

        try {
            TaskHandle testTaskHandle = taskExecutor.addTask(new TaskId("test", 0, 0, 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty());

            Phaser globalPhaser = new Phaser();
            globalPhaser.bulkRegister(3);

            // every quantum charges 500 ms of ticker time, i.e. two phases per second;
            // the jobs need enough phases to reach the last level threshold
            int quantaTimeMills = 500;
            int phasesPerSecond = 1000 / quantaTimeMills;
            int totalPhases = LEVEL_THRESHOLD_SECONDS[LEVEL_THRESHOLD_SECONDS.length - 1] * phasesPerSecond;
            TestingJob driver1 = new TestingJob(ticker, globalPhaser, new Phaser(), new Phaser(), totalPhases, quantaTimeMills);
            TestingJob driver2 = new TestingJob(ticker, globalPhaser, new Phaser(), new Phaser(), totalPhases, quantaTimeMills);

            taskExecutor.enqueueSplits(testTaskHandle, true, ImmutableList.of(driver1, driver2));

            // completedPhases carries over between levels: the inner loop only runs the phases
            // still missing to reach the next threshold
            int completedPhases = 0;
            for (int i = 0; i < (LEVEL_THRESHOLD_SECONDS.length - 1); i++) {
                for (; (completedPhases / phasesPerSecond) < LEVEL_THRESHOLD_SECONDS[i + 1]; completedPhases++) {
                    globalPhaser.arriveAndAwaitAdvance();
                }

                assertEquals(testTaskHandle.getPriority().getLevel(), i + 1);
            }

            globalPhaser.arriveAndDeregister();
        }
        finally {
            taskExecutor.stop();
        }
    }

    /**
     * For every pair of adjacent levels, puts one task on the higher level and two tasks on the
     * lower level, then runs two more drivers per task and checks how the completed phases are
     * divided between the lower-level and the higher-level drivers.
     * <p>
     * The executor is created with {@code new MultilevelSplitQueue(2)}.
     * NOTE(review): the constructor argument is presumably the per-level time multiplier that the
     * factor-of-two bounds asserted in the loop correspond to - confirm against MultilevelSplitQueue.
     */
    @Test(invocationCount = 100)
    public void testLevelMultipliers()
            throws Exception
    {
        TestingTicker ticker = new TestingTicker();
        TaskExecutor taskExecutor = new TaskExecutor(1, 3, 3, 4, TASK_FAIR, new MultilevelSplitQueue(2), ticker);
        taskExecutor.start();
        ticker.increment(20, MILLISECONDS);

        try {
            for (int i = 0; i < (LEVEL_THRESHOLD_SECONDS.length - 1); i++) {
                TaskHandle[] taskHandles = {
                        taskExecutor.addTask(new TaskId("test1", 0, 0, 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty()),
                        taskExecutor.addTask(new TaskId("test2", 0, 0, 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty()),
                        taskExecutor.addTask(new TaskId("test3", 0, 0, 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty())
                };

                // move task 0 to next level: a single quantum that charges LEVEL_THRESHOLD_SECONDS[i + 1] seconds of ticker time
                TestingJob task0Job = new TestingJob(ticker, new Phaser(1), new Phaser(), new Phaser(), 1, LEVEL_THRESHOLD_SECONDS[i + 1] * 1000);
                taskExecutor.enqueueSplits(
                        taskHandles[0],
                        true,
                        ImmutableList.of(task0Job));
                // move tasks 1 and 2 to this level: a single quantum that charges LEVEL_THRESHOLD_SECONDS[i] seconds each
                TestingJob task1Job = new TestingJob(ticker, new Phaser(1), new Phaser(), new Phaser(), 1, LEVEL_THRESHOLD_SECONDS[i] * 1000);
                taskExecutor.enqueueSplits(
                        taskHandles[1],
                        true,
                        ImmutableList.of(task1Job));
                TestingJob task2Job = new TestingJob(ticker, new Phaser(1), new Phaser(), new Phaser(), 1, LEVEL_THRESHOLD_SECONDS[i] * 1000);
                taskExecutor.enqueueSplits(
                        taskHandles[2],
                        true,
                        ImmutableList.of(task2Job));

                // wait for the three positioning jobs to finish before starting the measured drivers
                task0Job.getCompletedFuture().get();
                task1Job.getCompletedFuture().get();
                task2Job.getCompletedFuture().get();

                // then, start new drivers for all tasks
                // NOTE(review): the two parties are presumably the test thread plus the one driver
                // that can run at a time (first TaskExecutor argument is 1) - confirm
                Phaser globalPhaser = new Phaser(2);
                int phasesForNextLevel = LEVEL_THRESHOLD_SECONDS[i + 1] - LEVEL_THRESHOLD_SECONDS[i];
                TestingJob[] drivers = new TestingJob[6];
                for (int j = 0; j < 6; j++) {
                    // shouldn't deregister the global phaser upon the completion of process
                    drivers[j] = new TestingJob(ticker, globalPhaser, new Phaser(), new Phaser(), phasesForNextLevel, 1000, false);
                }

                // drivers 0-1 belong to the higher-level task, drivers 2-5 to the two lower-level tasks
                taskExecutor.enqueueSplits(taskHandles[0], true, ImmutableList.of(drivers[0], drivers[1]));
                taskExecutor.enqueueSplits(taskHandles[1], true, ImmutableList.of(drivers[2], drivers[3]));
                taskExecutor.enqueueSplits(taskHandles[2], true, ImmutableList.of(drivers[4], drivers[5]));

                // run the drivers of all three tasks until the first of them finishes
                int lowerLevelStart = drivers[2].getCompletedPhases() + drivers[3].getCompletedPhases() + drivers[4].getCompletedPhases() + drivers[5].getCompletedPhases();
                int higherLevelStart = drivers[0].getCompletedPhases() + drivers[1].getCompletedPhases();
                while (Arrays.stream(drivers).noneMatch(TestingJob::isFinished)) {
                    globalPhaser.arriveAndAwaitAdvance();

                    // "time" here is measured in completed phases since the drivers were enqueued
                    int lowerLevelEnd = drivers[2].getCompletedPhases() + drivers[3].getCompletedPhases() + drivers[4].getCompletedPhases() + drivers[5].getCompletedPhases();
                    int lowerLevelTime = lowerLevelEnd - lowerLevelStart;
                    int higherLevelEnd = drivers[0].getCompletedPhases() + drivers[1].getCompletedPhases();
                    int higherLevelTime = higherLevelEnd - higherLevelStart;

                    // only check the ratio once enough phases have run; a slack of 10 phases is allowed on both bounds
                    if (higherLevelTime > 20) {
                        assertGreaterThan(lowerLevelTime, (higherLevelTime * 2) - 10);
                        assertLessThan(higherLevelTime, (lowerLevelTime * 2) + 10);
                    }
                }

                try {
                    globalPhaser.arriveAndDeregister();
                }
                catch (IllegalStateException e) {
                    // under high concurrency sometimes the deregister call can occur after completion
                    // this is not a real problem
                }
                taskExecutor.removeTask(taskHandles[0]);
                taskExecutor.removeTask(taskHandles[1]);
                taskExecutor.removeTask(taskHandles[2]);
            }
        }
        finally {
            taskExecutor.stop();
        }
    }

    /**
     * Checks the value of {@link TaskHandle#getRunningLeafSplits()} after enqueueing one split with
     * the second {@code enqueueSplits} argument set to {@code true} (expected 0) and then one with
     * it set to {@code false} (expected 1).
     * NOTE(review): the boolean is presumably a "force start" flag and forced splits are presumably
     * not counted as leaf splits - confirm against TaskExecutor.enqueueSplits.
     */
    @Test
    public void testTaskHandle()
    {
        TestingTicker ticker = new TestingTicker();
        TaskExecutor taskExecutor = new TaskExecutor(4, 8, 3, 4, QUERY_FAIR, ticker);
        taskExecutor.start();

        try {
            TaskId taskId = new TaskId("test", 0, 0, 0, 0);
            TaskHandle taskHandle = taskExecutor.addTask(taskId, () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty());

            // the test thread is a party on both shared phasers, so the jobs cannot get past
            // the begin phaser while the assertions below run
            Phaser beginPhase = new Phaser();
            beginPhase.register();
            Phaser verificationComplete = new Phaser();
            verificationComplete.register();

            TestingJob driver1 = new TestingJob(ticker, new Phaser(), beginPhase, verificationComplete, 10, 0);
            TestingJob driver2 = new TestingJob(ticker, new Phaser(), beginPhase, verificationComplete, 10, 0);

            // force enqueue a split
            taskExecutor.enqueueSplits(taskHandle, true, ImmutableList.of(driver1));
            assertEquals(taskHandle.getRunningLeafSplits(), 0);

            // normal enqueue a split
            taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(driver2));
            assertEquals(taskHandle.getRunningLeafSplits(), 1);

            // let the split continue to run
            beginPhase.arriveAndDeregister();
            verificationComplete.arriveAndDeregister();
        }
        finally {
            taskExecutor.stop();
        }
    }

    /**
     * Advances two task handles level by level and verifies, for each level, that both handles
     * reach the next level and that the queue's scheduled time for the level just left equals twice
     * the per-handle contribution, where each contribution is limited by
     * {@code LEVEL_CONTRIBUTION_CAP}. The next level must not have accumulated any time yet.
     */
    @Test
    public void testLevelContributionCap()
    {
        MultilevelSplitQueue queue = new MultilevelSplitQueue(2);
        TaskHandle[] handles = {
                new TaskHandle(new TaskId("test0", 0, 0, 0, 0), new TaskPriorityTracker(queue), () -> 1, 1, new Duration(1, SECONDS), OptionalInt.empty()),
                new TaskHandle(new TaskId("test1", 0, 0, 0, 0), new TaskPriorityTracker(queue), () -> 1, 1, new Duration(1, SECONDS), OptionalInt.empty())
        };

        int lastLevel = LEVEL_THRESHOLD_SECONDS.length - 1;
        for (int level = 0; level < lastLevel; level++) {
            int nextLevel = level + 1;
            // scheduled time needed to get from the start of this level to the start of the next one
            long nanosToNextLevel = SECONDS.toNanos(LEVEL_THRESHOLD_SECONDS[nextLevel] - LEVEL_THRESHOLD_SECONDS[level]);

            for (TaskHandle handle : handles) {
                handle.addScheduledNanos(nanosToNextLevel);
                assertEquals(handle.getPriority().getLevel(), nextLevel);
            }

            assertEquals(queue.getLevelScheduledTime(level), 2 * Math.min(nanosToNextLevel, LEVEL_CONTRIBUTION_CAP));
            assertEquals(queue.getLevelScheduledTime(nextLevel), 0);
        }
    }

    /**
     * Adds a single ten-minute quantum to one task handle and verifies how the scheduled time,
     * limited to {@code LEVEL_CONTRIBUTION_CAP}, is spread across the levels: each level receives
     * at most its own span, and whatever is left over is carried on to the next level.
     */
    @Test
    public void testUpdateLevelWithCap()
    {
        MultilevelSplitQueue queue = new MultilevelSplitQueue(2);
        TaskHandle handle = new TaskHandle(new TaskId("test0", 0, 0, 0, 0), new TaskPriorityTracker(queue), () -> 1, 1, new Duration(1, SECONDS), OptionalInt.empty());

        long scheduledNanos = MINUTES.toNanos(10);
        handle.addScheduledNanos(scheduledNanos);

        // time still to be attributed to the remaining levels
        long remainingNanos = Math.min(scheduledNanos, LEVEL_CONTRIBUTION_CAP);
        int level = 0;
        while (level < (LEVEL_THRESHOLD_SECONDS.length - 1)) {
            long levelSpanNanos = SECONDS.toNanos(LEVEL_THRESHOLD_SECONDS[level + 1] - LEVEL_THRESHOLD_SECONDS[level]);
            long expectedNanos = Math.min(levelSpanNanos, remainingNanos);
            assertEquals(queue.getLevelScheduledTime(level), expectedNanos);
            remainingNanos -= expectedNanos;
            level++;
        }
    }

    /**
     * Enqueues four batches of two splits for a single task on an executor created with a
     * max-drivers-per-task value of 2 and verifies that the batches start one after another: while
     * a batch is held on its gate phaser, no split of a later batch has started.
     */
    @Test(timeOut = 30_000)
    public void testMinMaxDriversPerTask()
    {
        int maxDriversPerTask = 2;
        MultilevelSplitQueue splitQueue = new MultilevelSplitQueue(2);
        TestingTicker ticker = new TestingTicker();
        TaskExecutor taskExecutor = new TaskExecutor(4, 16, 1, maxDriversPerTask, QUERY_FAIR, splitQueue, ticker);
        taskExecutor.start();
        try {
            TaskHandle handle = taskExecutor.addTask(new TaskId("test", 0, 0, 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty());

            // enqueue every batch up front; each batch shares one gate phaser that the test thread is a party of
            int batchCount = 4;
            Phaser[] batchGates = new Phaser[batchCount];
            TestingJob[] splits = new TestingJob[2 * batchCount];
            for (int batch = 0; batch < batchCount; batch++) {
                Phaser gate = new Phaser();
                gate.register();
                batchGates[batch] = gate;

                int first = 2 * batch;
                int second = first + 1;
                splits[first] = new TestingJob(ticker, new Phaser(), new Phaser(), gate, 1, 0);
                splits[second] = new TestingJob(ticker, new Phaser(), new Phaser(), gate, 1, 0);
                taskExecutor.enqueueSplits(handle, false, ImmutableList.of(splits[first], splits[second]));
            }

            // walk through the batches in order
            for (int batch = 0; batch < batchCount; batch++) {
                int first = 2 * batch;
                int second = first + 1;
                // block until both splits of this batch are running
                waitUntilSplitsStart(ImmutableList.of(splits[first], splits[second]));
                // everything up to this batch has started, nothing after it has
                assertSplitStates(second, splits);
                // release the gate so this batch can finish
                batchGates[batch].arriveAndDeregister();
            }
        }
        finally {
            taskExecutor.stop();
        }
    }

    /**
     * Adds a task with {@code OptionalInt.of(1)} as its per-task driver limit to an executor created
     * with min/max drivers per task of 2 and 4, enqueues four single-split batches, and verifies
     * that the splits start strictly one after another.
     */
    @Test(timeOut = 30_000)
    public void testUserSpecifiedMaxDriversPerTask()
    {
        MultilevelSplitQueue splitQueue = new MultilevelSplitQueue(2);
        TestingTicker ticker = new TestingTicker();
        // executor-wide min/max drivers per task are 2 and 4
        TaskExecutor taskExecutor = new TaskExecutor(4, 16, 2, 4, TASK_FAIR, splitQueue, ticker);
        taskExecutor.start();
        try {
            // the task itself overrides the max drivers per task with 1
            TaskHandle handle = taskExecutor.addTask(new TaskId("test", 0, 0, 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.of(1));

            // enqueue every batch up front; each split has its own gate phaser that the test thread is a party of
            int batchCount = 4;
            Phaser[] batchGates = new Phaser[batchCount];
            TestingJob[] splits = new TestingJob[batchCount];
            for (int batch = 0; batch < batchCount; batch++) {
                Phaser gate = new Phaser();
                gate.register();
                batchGates[batch] = gate;

                splits[batch] = new TestingJob(ticker, new Phaser(), new Phaser(), gate, 1, 0);
                taskExecutor.enqueueSplits(handle, false, ImmutableList.of(splits[batch]));
            }

            // walk through the batches in order
            for (int batch = 0; batch < batchCount; batch++) {
                // block until the split of this batch is running
                waitUntilSplitsStart(ImmutableList.of(splits[batch]));
                // everything up to this batch has started, nothing after it has
                assertSplitStates(batch, splits);
                // release the gate so this batch can finish
                batchGates[batch].arriveAndDeregister();
            }
        }
        finally {
            taskExecutor.stop();
        }
    }

    /**
     * Enqueues a {@link MockSplitRunner}, whose {@code processFor} only returns after its thread has
     * been interrupted, on an executor that uses the real system ticker, and waits up to 60 seconds
     * for the runner to report the interrupt.
     * NOTE(review): the two one-second durations and the stack-trace predicate are presumably the
     * executor's run-away split interrupt settings - confirm against the TaskExecutor constructor.
     *
     * @throws Exception if the interrupt is not observed within the timeout
     */
    @Test
    public void testTaskExecutorRunawaySplitInterrupt()
            throws Exception
    {
        TaskExecutor taskExecutor = new TaskExecutor(
                8,
                16,
                3,
                4,
                TASK_FAIR,
                new Duration(1, SECONDS),
                // constant-first equals: StackTraceElement.getFileName() may return null for frames without file information
                elements -> elements.stream()
                        .anyMatch(element -> "TestTaskExecutor.java".equals(element.getFileName())),
                new Duration(1, SECONDS),
                new EmbedVersion(new ServerConfig()),
                new MultilevelSplitQueue(2),
                Ticker.systemTicker());
        taskExecutor.start();

        try {
            TaskId taskId = new TaskId("foo", 0, 0, 0, 0);
            TaskHandle taskHandle = taskExecutor.addTask(
                    taskId, () -> 1.0,
                    1,
                    new Duration(1, SECONDS),
                    OptionalInt.of(1));
            MockSplitRunner mockSplitRunner = new MockSplitRunner();
            taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(mockSplitRunner));
            // fails with a TimeoutException if the split is never interrupted
            mockSplitRunner.interrupted.get(60, SECONDS);
        }
        finally {
            taskExecutor.stop();
        }
    }

    /**
     * Asserts that every split at index {@code 0..endIndex} (inclusive) has started and that every
     * split after {@code endIndex} has not.
     *
     * @param endIndex index of the last split that is expected to have started
     * @param splits all splits of the test, in the order they were enqueued
     */
    private static void assertSplitStates(int endIndex, TestingJob[] splits)
    {
        // assert that splits up to and including endIndex are all started
        for (int i = 0; i <= endIndex; i++) {
            assertTrue(splits[i].isStarted(), "split " + i + " should have started (endIndex=" + endIndex + ")");
        }

        // assert that splits after endIndex haven't started yet
        for (int i = endIndex + 1; i < splits.length; i++) {
            assertFalse(splits[i].isStarted(), "split " + i + " should not have started yet (endIndex=" + endIndex + ")");
        }
    }

    /**
     * Blocks the calling thread, polling every 200 ms, until every given split reports that it has
     * started. Returns immediately when the list is empty.
     *
     * @param splits the splits to wait for
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted;
     *         the thread's interrupt flag is restored first
     */
    private static void waitUntilSplitsStart(List<TestingJob> splits)
    {
        for (;;) {
            boolean allStarted = splits.stream().allMatch(TestingJob::isStarted);
            if (allStarted) {
                return;
            }
            try {
                Thread.sleep(200);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * {@link SplitRunner} used by the tests whose quanta are gated by three phasers.
     * <p>
     * Every {@link #processFor} call marks the job as started, advances the testing ticker by
     * {@code quantaTimeMillis}, then waits in turn on the global, begin-quantum and end-quantum
     * phasers before counting the quantum as completed. Once {@code requiredPhases} quanta have
     * completed, the job deregisters from the begin and end phasers (and from the global phaser if
     * requested) and completes its "completed" future.
     */
    private static class TestingJob
            implements SplitRunner
    {
        private final TestingTicker ticker;
        private final Phaser globalPhaser;
        private final Phaser beginQuantaPhaser;
        private final Phaser endQuantaPhaser;
        // number of quanta after which the job reports itself finished
        private final int requiredPhases;
        // ticker time charged at the start of every quantum
        private final int quantaTimeMillis;
        // whether to deregister from globalPhaser when the job finishes
        private final boolean deregisterGlobalPhaser;
        private final AtomicInteger completedPhases = new AtomicInteger();

        // -1 until the first/most recent quantum has passed the begin-quantum phaser
        private final AtomicInteger firstPhase = new AtomicInteger(-1);
        private final AtomicInteger lastPhase = new AtomicInteger(-1);

        private final AtomicBoolean started = new AtomicBoolean();
        private final SettableFuture<?> completed = SettableFuture.create();

        /**
         * Creates a job that deregisters from the global phaser when it finishes.
         */
        public TestingJob(TestingTicker ticker, Phaser globalPhaser, Phaser beginQuantaPhaser, Phaser endQuantaPhaser, int requiredPhases, int quantaTimeMillis)
        {
            this(ticker, globalPhaser, beginQuantaPhaser, endQuantaPhaser, requiredPhases, quantaTimeMillis, true);
        }

        /**
         * Creates a job and registers it as a party on the begin-quantum and end-quantum phasers.
         * The job registers on the global phaser only if that phaser has no registered parties yet.
         *
         * @param ticker testing ticker advanced by {@code quantaTimeMillis} in every quantum
         * @param globalPhaser first phaser awaited in every quantum
         * @param beginQuantaPhaser second phaser awaited in every quantum
         * @param endQuantaPhaser third phaser awaited in every quantum
         * @param requiredPhases number of quanta after which the job is finished
         * @param quantaTimeMillis ticker time, in milliseconds, charged per quantum
         * @param deregisterGlobalPhaser whether to deregister from {@code globalPhaser} on completion
         */
        public TestingJob(TestingTicker ticker, Phaser globalPhaser, Phaser beginQuantaPhaser, Phaser endQuantaPhaser, int requiredPhases, int quantaTimeMillis, boolean deregisterGlobalPhaser)
        {
            this.ticker = ticker;
            this.globalPhaser = globalPhaser;
            this.beginQuantaPhaser = beginQuantaPhaser;
            this.endQuantaPhaser = endQuantaPhaser;
            this.requiredPhases = requiredPhases;
            this.quantaTimeMillis = quantaTimeMillis;
            this.deregisterGlobalPhaser = deregisterGlobalPhaser;

            beginQuantaPhaser.register();
            endQuantaPhaser.register();

            // a global phaser that already has parties (e.g. new Phaser(1) or a pre-registered
            // shared phaser) is left untouched
            if (globalPhaser.getRegisteredParties() == 0) {
                globalPhaser.register();
            }
        }

        /**
         * Returns the value returned by the begin-quantum phaser in the first quantum, minus one,
         * or -1 if no quantum has passed that phaser yet.
         */
        private int getFirstPhase()
        {
            return firstPhase.get();
        }

        /**
         * Returns the value returned by the begin-quantum phaser in the most recent quantum,
         * or -1 if no quantum has passed that phaser yet.
         */
        private int getLastPhase()
        {
            return lastPhase.get();
        }

        /**
         * Returns the number of quanta that have passed the end-quantum phaser so far.
         */
        private int getCompletedPhases()
        {
            return completedPhases.get();
        }

        /**
         * Runs one quantum. The {@code duration} argument is not used; the quantum ends when the
         * end-quantum phaser advances.
         *
         * @return an already completed future
         */
        @Override
        public ListenableFuture<?> processFor(Duration duration)
        {
            started.set(true);
            ticker.increment(quantaTimeMillis, MILLISECONDS);
            globalPhaser.arriveAndAwaitAdvance();
            int phase = beginQuantaPhaser.arriveAndAwaitAdvance();
            // only the first quantum sets firstPhase; lastPhase is overwritten every time
            firstPhase.compareAndSet(-1, phase - 1);
            lastPhase.set(phase);
            endQuantaPhaser.arriveAndAwaitAdvance();
            if (completedPhases.incrementAndGet() >= requiredPhases) {
                // finished: stop taking part in the phasers so other parties are no longer held up by this job
                endQuantaPhaser.arriveAndDeregister();
                beginQuantaPhaser.arriveAndDeregister();
                if (deregisterGlobalPhaser) {
                    globalPhaser.arriveAndDeregister();
                }
                completed.set(null);
            }

            return Futures.immediateFuture(null);
        }

        @Override
        public String getInfo()
        {
            return "testing-split";
        }

        /**
         * Returns whether the required number of quanta has completed.
         */
        @Override
        public boolean isFinished()
        {
            return completed.isDone();
        }

        /**
         * Returns whether {@link #processFor} has been entered at least once.
         */
        public boolean isStarted()
        {
            return started.get();
        }

        @Override
        public void close()
        {
        }

        /**
         * Returns a future that completes once the required number of quanta has completed.
         */
        public Future<?> getCompletedFuture()
        {
            return completed;
        }
    }

    /**
     * {@link SplitRunner} that never finishes on its own: {@link #processFor} sleeps in a loop until
     * the running thread is interrupted and then completes the {@code interrupted} future.
     */
    private static class MockSplitRunner
            implements SplitRunner
    {
        // completed with true once processFor has observed an interrupt; final because it is
        // assigned once and read by both the test thread and the thread running the split
        private final SettableFuture<Boolean> interrupted = SettableFuture.create();

        /**
         * Returns whether the interrupt has been observed.
         */
        @Override
        public boolean isFinished()
        {
            return interrupted.isDone();
        }

        /**
         * Blocks until the running thread is interrupted. The {@code duration} argument is not used.
         *
         * @return an already completed future
         */
        @Override
        public ListenableFuture<?> processFor(Duration duration)
        {
            while (true) {
                try {
                    Thread.sleep(1);
                }
                catch (InterruptedException e) {
                    // the interrupt is the signal this runner waits for; the flag is left cleared
                    // NOTE(review): presumably so the executor thread that ran the split can go on - confirm
                    break;
                }
            }
            interrupted.set(true);
            return Futures.immediateFuture(null);
        }

        @Override
        public String getInfo()
        {
            return "";
        }

        @Override
        public void close()
        {
        }
    }
}