TestAsyncQueue.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.facebook.presto.hive.util;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.presto.hive.util.AsyncQueue.BorrowResult;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.airlift.testing.Assertions.assertContains;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestAsyncQueue
{
    private ExecutorService executor;

    @BeforeClass
    public void setUpClass()
    {
        executor = Executors.newFixedThreadPool(8, Threads.daemonThreadsNamed("test-async-queue-%s"));
    }

    @AfterClass(alwaysRun = true)
    public void tearDownClass()
    {
        executor.shutdownNow();
    }

    @Test(timeOut = 10_000)
    public void testGetPartial()
            throws Exception
    {
        AsyncQueue<String> queue = new AsyncQueue<>(4, executor);

        queue.offer("1");
        queue.offer("2");
        queue.offer("3");
        assertEquals(queue.getBatchAsync(100).get(), ImmutableList.of("1", "2", "3"));

        queue.finish();
        assertTrue(queue.isFinished());
    }

    @Test(timeOut = 10_000)
    public void testFullQueue()
            throws Exception
    {
        AsyncQueue<String> queue = new AsyncQueue<>(4, executor);

        assertTrue(queue.offer("1").isDone());
        assertTrue(queue.offer("2").isDone());
        assertTrue(queue.offer("3").isDone());

        assertFalse(queue.offer("4").isDone());
        assertFalse(queue.offer("5").isDone());
        ListenableFuture<?> offerFuture = queue.offer("6");
        assertFalse(offerFuture.isDone());

        assertEquals(queue.getBatchAsync(2).get(), ImmutableList.of("1", "2"));
        assertFalse(offerFuture.isDone());

        assertEquals(queue.getBatchAsync(1).get(), ImmutableList.of("3"));
        offerFuture.get();

        offerFuture = queue.offer("7");
        assertFalse(offerFuture.isDone());

        queue.finish();
        offerFuture.get();
        assertFalse(queue.isFinished());
        assertEquals(queue.getBatchAsync(4).get(), ImmutableList.of("4", "5", "6", "7"));
        assertTrue(queue.isFinished());
    }

    @Test(timeOut = 10_000)
    public void testEmptyQueue()
            throws Exception
    {
        AsyncQueue<String> queue = new AsyncQueue<>(4, executor);

        assertTrue(queue.offer("1").isDone());
        assertTrue(queue.offer("2").isDone());
        assertTrue(queue.offer("3").isDone());
        assertEquals(queue.getBatchAsync(2).get(), ImmutableList.of("1", "2"));
        assertEquals(queue.getBatchAsync(2).get(), ImmutableList.of("3"));
        ListenableFuture<?> batchFuture = queue.getBatchAsync(2);
        assertFalse(batchFuture.isDone());

        assertTrue(queue.offer("4").isDone());
        assertEquals(batchFuture.get(), ImmutableList.of("4"));

        batchFuture = queue.getBatchAsync(2);
        assertFalse(batchFuture.isDone());
        queue.finish();
        batchFuture.get();
        assertTrue(queue.isFinished());
    }

    @Test(timeOut = 10_000)
    public void testOfferAfterFinish()
            throws Exception
    {
        AsyncQueue<String> queue = new AsyncQueue<>(4, executor);

        assertTrue(queue.offer("1").isDone());
        assertTrue(queue.offer("2").isDone());
        assertTrue(queue.offer("3").isDone());
        assertFalse(queue.offer("4").isDone());

        queue.finish();
        assertTrue(queue.offer("5").isDone());
        assertTrue(queue.offer("6").isDone());
        assertTrue(queue.offer("7").isDone());
        assertFalse(queue.isFinished());

        assertEquals(queue.getBatchAsync(100).get(), ImmutableList.of("1", "2", "3", "4"));
        assertTrue(queue.isFinished());
    }

    @Test
    public void testBorrow()
            throws Exception
    {
        // The numbers are chosen so that depletion of elements can happen.
        // Size is 5. Two threads each borrowing 3 can deplete the queue.
        // The third thread may try to borrow when the queue is already empty.
        // We also want to confirm that isFinished won't return true even if queue is depleted.

        AsyncQueue<Integer> queue = new AsyncQueue<>(4, executor);
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);
        queue.offer(4);
        queue.offer(5);

        // Repeatedly remove up to 3 elements and re-insert them.
        Runnable runnable = () -> {
            for (int i = 0; i < 700; i++) {
                getFutureValue(queue.borrowBatchAsync(3, elements -> new BorrowResult<>(elements, null)));
            }
        };

        Future<?> future1 = executor.submit(runnable);
        Future<?> future2 = executor.submit(runnable);
        Future<?> future3 = executor.submit(runnable);
        future1.get();
        future2.get();
        future3.get();

        queue.finish();
        assertFalse(queue.isFinished());

        AtomicBoolean done = new AtomicBoolean();
        executor.submit(() -> {
            while (!done.get()) {
                assertFalse(queue.isFinished() || done.get());
            }
        });

        future1 = executor.submit(runnable);
        future2 = executor.submit(runnable);
        future3 = executor.submit(runnable);
        future1.get();
        future2.get();
        future3.get();
        done.set(true);

        assertFalse(queue.isFinished());
        ArrayList<Integer> list = new ArrayList<>(queue.getBatchAsync(100).get());
        list.sort(Integer::compare);
        assertEquals(list, ImmutableList.of(1, 2, 3, 4, 5));
        assertTrue(queue.isFinished());
    }

    @Test
    public void testBorrowThrows()
            throws Exception
    {
        // It doesn't matter the exact behavior when the caller-supplied function to borrow fails.
        // However, it must not block pending futures.

        AsyncQueue<Integer> queue = new AsyncQueue<>(4, executor);
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);
        queue.offer(4);
        queue.offer(5);

        ListenableFuture<?> future1 = queue.offer(6);
        assertFalse(future1.isDone());

        Runnable runnable = () -> {
            getFutureValue(queue.borrowBatchAsync(1, elements -> {
                throw new RuntimeException("test fail");
            }));
        };

        try {
            executor.submit(runnable).get();
            fail("expected failure");
        }
        catch (ExecutionException e) {
            assertContains(e.getMessage(), "test fail");
        }

        ListenableFuture<?> future2 = queue.offer(7);
        assertFalse(future1.isDone());
        assertFalse(future2.isDone());
        queue.finish();
        future1.get();
        future2.get();
        assertTrue(queue.offer(8).isDone());

        try {
            executor.submit(runnable).get();
            fail("expected failure");
        }
        catch (ExecutionException e) {
            assertContains(e.getMessage(), "test fail");
        }

        assertTrue(queue.offer(9).isDone());

        assertFalse(queue.isFinished());
        ArrayList<Integer> list = new ArrayList<>(queue.getBatchAsync(100).get());
        // 1 and 2 were removed by borrow call; 8 and 9 were never inserted because insertion happened after finish.
        assertEquals(list, ImmutableList.of(3, 4, 5, 6, 7));
        assertTrue(queue.isFinished());
    }
}