TestBufferingSplitSource.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.split;

import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.split.SplitSource.SplitBatch;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.testng.annotations.Test;

import java.util.concurrent.Future;

import static com.facebook.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static com.facebook.airlift.testing.Assertions.assertContains;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.facebook.presto.split.MockSplitSource.Action.FAIL;
import static com.facebook.presto.split.MockSplitSource.Action.FINISH;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestBufferingSplitSource
{
    @Test
    public void testSlowSource()
    {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .increaseAvailableSplits(25)
                .atSplitCompletion(FINISH);
        try (SplitSource source = new BufferingSplitSource(mockSource, 10)) {
            requireFutureValue(getNextBatch(source, 20))
                    .assertSize(10)
                    .assertNoMoreSplits(false);
            requireFutureValue(getNextBatch(source, 6))
                    .assertSize(6)
                    .assertNoMoreSplits(false);
            requireFutureValue(getNextBatch(source, 20))
                    .assertSize(9)
                    .assertNoMoreSplits(true);
            assertTrue(source.isFinished());
            assertEquals(mockSource.getNextBatchInvocationCount(), 25);
        }
    }

    @Test
    public void testFastSource()
    {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(11)
                .increaseAvailableSplits(22)
                .atSplitCompletion(FINISH);
        try (SplitSource source = new BufferingSplitSource(mockSource, 10)) {
            requireFutureValue(getNextBatch(source, 200))
                    .assertSize(11)
                    .assertNoMoreSplits(false);
            requireFutureValue(getNextBatch(source, 200))
                    .assertSize(11)
                    .assertNoMoreSplits(true);
            assertTrue(source.isFinished());
            assertEquals(mockSource.getNextBatchInvocationCount(), 2);
        }
    }

    @Test
    public void testEmptySource()
    {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .atSplitCompletion(FINISH);
        try (SplitSource source = new BufferingSplitSource(mockSource, 100)) {
            requireFutureValue(getNextBatch(source, 200))
                    .assertSize(0)
                    .assertNoMoreSplits(true);
            assertTrue(source.isFinished());
            assertEquals(mockSource.getNextBatchInvocationCount(), 1);
        }
    }

    @Test
    public void testBlocked()
    {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1);
        try (SplitSource source = new BufferingSplitSource(mockSource, 10)) {
            // Source has 0 out of 10 needed.
            ListenableFuture<NextBatchResult> nextBatchFuture = getNextBatch(source, 10);
            assertFalse(nextBatchFuture.isDone());
            mockSource.increaseAvailableSplits(9);
            assertFalse(nextBatchFuture.isDone());
            mockSource.increaseAvailableSplits(1);
            requireFutureValue(nextBatchFuture)
                    .assertSize(10)
                    .assertNoMoreSplits(false);

            // Source is completed after getNextBatch invocation.
            nextBatchFuture = getNextBatch(source, 10);
            assertFalse(nextBatchFuture.isDone());
            mockSource.atSplitCompletion(FINISH);
            requireFutureValue(nextBatchFuture)
                    .assertSize(0)
                    .assertNoMoreSplits(true);
            assertTrue(source.isFinished());
        }

        mockSource = new MockSplitSource()
                .setBatchSize(1);
        try (SplitSource source = new BufferingSplitSource(mockSource, 10)) {
            // Source has 1 out of 10 needed.
            mockSource.increaseAvailableSplits(1);
            ListenableFuture<NextBatchResult> nextBatchFuture = getNextBatch(source, 10);
            assertFalse(nextBatchFuture.isDone());
            mockSource.increaseAvailableSplits(9);
            requireFutureValue(nextBatchFuture)
                    .assertSize(10)
                    .assertNoMoreSplits(false);

            // Source is completed with 5 last splits after getNextBatch invocation.
            nextBatchFuture = getNextBatch(source, 10);
            mockSource.increaseAvailableSplits(5);
            assertFalse(nextBatchFuture.isDone());
            mockSource.atSplitCompletion(FINISH);
            requireFutureValue(nextBatchFuture)
                    .assertSize(5)
                    .assertNoMoreSplits(true);
            assertTrue(source.isFinished());
        }

        mockSource = new MockSplitSource()
                .setBatchSize(1);
        try (SplitSource source = new BufferingSplitSource(mockSource, 10)) {
            // Source has 9 out of 10 needed.
            mockSource.increaseAvailableSplits(9);
            ListenableFuture<NextBatchResult> nextBatchFuture = getNextBatch(source, 10);
            assertFalse(nextBatchFuture.isDone());
            mockSource.increaseAvailableSplits(1);
            requireFutureValue(nextBatchFuture)
                    .assertSize(10)
                    .assertNoMoreSplits(false);

            // Source failed after getNextBatch invocation.
            nextBatchFuture = getNextBatch(source, 10);
            mockSource.increaseAvailableSplits(5);
            assertFalse(nextBatchFuture.isDone());
            mockSource.atSplitCompletion(FAIL);
            assertFutureFailsWithMockFailure(nextBatchFuture);
            assertFalse(source.isFinished());
        }

        // Fast source: source produce 8 before, and 8 after invocation. BufferedSource should return all 16 at once.
        mockSource = new MockSplitSource()
                .setBatchSize(8);
        try (SplitSource source = new BufferingSplitSource(mockSource, 10)) {
            mockSource.increaseAvailableSplits(8);
            ListenableFuture<NextBatchResult> nextBatchFuture = getNextBatch(source, 20);
            assertFalse(nextBatchFuture.isDone());
            mockSource.increaseAvailableSplits(8);
            requireFutureValue(nextBatchFuture)
                    .assertSize(16)
                    .assertNoMoreSplits(false);
        }
    }

    @Test
    public void testFinishedSetWithoutIndicationFromSplitBatch()
    {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .increaseAvailableSplits(1);
        try (SplitSource source = new BufferingSplitSource(mockSource, 100)) {
            requireFutureValue(getNextBatch(source, 1))
                    .assertSize(1)
                    .assertNoMoreSplits(false);
            assertFalse(source.isFinished());
            // Most of the time, mockSource.isFinished() returns the same value as
            // the SplitBatch.noMoreSplits field of the preceding mockSource.getNextBatch() call.
            // However, this is NOT always the case.
            // In this case, the preceding getNextBatch() indicates the noMoreSplits is false,
            // but the next isFinished call will return true.
            mockSource.atSplitCompletion(FINISH);
            requireFutureValue(getNextBatch(source, 1))
                    .assertSize(0)
                    .assertNoMoreSplits(true);
            assertTrue(source.isFinished());
            assertEquals(mockSource.getNextBatchInvocationCount(), 2);
        }
    }

    @Test
    public void testFailImmediate()
    {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .atSplitCompletion(FAIL);
        try (SplitSource source = new BufferingSplitSource(mockSource, 100)) {
            assertFutureFailsWithMockFailure(getNextBatch(source, 200));
            assertEquals(mockSource.getNextBatchInvocationCount(), 1);
        }
    }

    @Test
    public void testFail()
    {
        MockSplitSource mockSource = new MockSplitSource()
                .setBatchSize(1)
                .increaseAvailableSplits(1)
                .atSplitCompletion(FAIL);
        try (SplitSource source = new BufferingSplitSource(mockSource, 100)) {
            assertFutureFailsWithMockFailure(getNextBatch(source, 2));
            assertEquals(mockSource.getNextBatchInvocationCount(), 2);
        }
    }

    private static void assertFutureFailsWithMockFailure(ListenableFuture<?> future)
    {
        assertTrue(future.isDone());
        try {
            future.get();
            fail();
        }
        catch (Exception e) {
            assertContains(e.getMessage(), "Mock failure");
        }
    }

    private static <T> T requireFutureValue(Future<T> future)
    {
        return tryGetFutureValue(future).orElseThrow(AssertionError::new);
    }

    private static ListenableFuture<NextBatchResult> getNextBatch(SplitSource splitSource, int maxSize)
    {
        ListenableFuture<SplitBatch> future = splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), maxSize);
        return Futures.transform(future, NextBatchResult::new, directExecutor());
    }

    @SuppressWarnings("UnusedReturnValue")
    private static class NextBatchResult
    {
        private final SplitBatch splitBatch;

        public NextBatchResult(SplitBatch splitBatch)
        {
            this.splitBatch = requireNonNull(splitBatch, "splits is null");
        }

        public NextBatchResult assertSize(int expectedSize)
        {
            assertEquals(splitBatch.getSplits().size(), expectedSize);
            return this;
        }

        public NextBatchResult assertNoMoreSplits(boolean expectedNoMoreSplits)
        {
            assertEquals(splitBatch.isLastBatch(), expectedNoMoreSplits);
            return this;
        }
    }
}