TestExchangeClient.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.operator;

import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.Response;
import com.facebook.airlift.http.client.testing.TestingHttpClient;
import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.common.Page;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.memory.context.SimpleLocalMemoryContext;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.facebook.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.testing.Assertions.assertLessThan;
import static com.facebook.presto.common.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
import static com.facebook.presto.execution.buffer.TestingPagesSerdeFactory.testingPagesSerde;
import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static com.google.common.collect.Maps.uniqueIndex;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public class TestExchangeClient
{
    private ScheduledExecutorService scheduler;
    private ExecutorService pageBufferClientCallbackExecutor;
    private ExecutorService testingHttpClientExecutor;

    private static final PagesSerde PAGES_SERDE = testingPagesSerde();

    @BeforeClass
    public void setUp()
    {
        scheduler = newScheduledThreadPool(4, daemonThreadsNamed("test-%s"));
        pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
        testingHttpClientExecutor = newCachedThreadPool(daemonThreadsNamed("test-%s"));
    }

    @AfterClass(alwaysRun = true)
    public void tearDown()
    {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }

        if (pageBufferClientCallbackExecutor != null) {
            pageBufferClientCallbackExecutor.shutdownNow();
            pageBufferClientCallbackExecutor = null;
        }

        if (testingHttpClientExecutor != null) {
            testingHttpClientExecutor.shutdownNow();
            testingHttpClientExecutor = null;
        }
    }

    @Test
    public void testHappyPath()
    {
        testHappyPath(false, in -> in);
    }

    @Test
    public void testHappyPathChecksum()
    {
        testHappyPath(true, in -> in);
    }

    @Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "Received corrupted serialized page from host.*")
    public void testHappyPathChecksumFail()
    {
        testHappyPath(true, in -> {
            in[in.length - 1] = (byte) ~in[in.length - 1];
            return in;
        });
    }

    private void testHappyPath(boolean checksum, Function<byte[], byte[]> dataChanger)
    {
        DataSize bufferCapacity = new DataSize(32, MEGABYTE);
        DataSize maxResponseSize = new DataSize(10, MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize, testingPagesSerde(checksum), dataChanger);

        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));
        processor.setComplete(location);

        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);

        exchangeClient.addLocation(location, TaskId.valueOf("queryid.0.0.0.0"));
        exchangeClient.noMoreLocations();

        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(1));
        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(2));
        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(3));
        assertNull(getNextPage(exchangeClient));
        assertTrue(exchangeClient.isClosed());

        ExchangeClientStatus status = exchangeClient.getStatus();
        assertEquals(status.getBufferedPages(), 0);
        assertEquals(status.getBufferedBytes(), 0);

        // client should have sent only 2 requests: one to get all pages and once to get the done signal
        assertStatus(status.getPageBufferClientStatuses().get(0), location, "closed", 3, 3, 3, "not scheduled");
    }

    @Test(timeOut = 10000)
    public void testAddLocation()
            throws Exception
    {
        DataSize bufferCapacity = new DataSize(32, MEGABYTE);
        DataSize maxResponseSize = new DataSize(10, MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);

        URI location1 = URI.create("http://localhost:8081/foo");
        processor.addPage(location1, createPage(1));
        processor.addPage(location1, createPage(2));
        processor.addPage(location1, createPage(3));
        processor.setComplete(location1);
        exchangeClient.addLocation(location1, TaskId.valueOf("foo.0.0.0.0"));

        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(1));
        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(2));
        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(3));

        assertFalse(tryGetFutureValue(exchangeClient.isBlocked(), 10, MILLISECONDS).isPresent());
        assertFalse(exchangeClient.isClosed());

        URI location2 = URI.create("http://localhost:8082/bar");
        processor.addPage(location2, createPage(4));
        processor.addPage(location2, createPage(5));
        processor.addPage(location2, createPage(6));
        processor.setComplete(location2);
        exchangeClient.addLocation(location2, TaskId.valueOf("bar.0.0.0.0"));

        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(4));
        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(5));
        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(6));

        assertFalse(tryGetFutureValue(exchangeClient.isBlocked(), 10, MILLISECONDS).isPresent());
        assertFalse(exchangeClient.isClosed());

        exchangeClient.noMoreLocations();
        // The transition to closed may happen asynchronously, since it requires that all the HTTP clients
        // receive a final GONE response, so just spin until it's closed or the test times out.
        while (!exchangeClient.isClosed()) {
            Thread.sleep(1);
        }

        ImmutableMap<URI, PageBufferClientStatus> statuses = uniqueIndex(exchangeClient.getStatus().getPageBufferClientStatuses(), PageBufferClientStatus::getUri);
        assertStatus(statuses.get(location1), location1, "closed", 3, 3, 3, "not scheduled");
        assertStatus(statuses.get(location2), location2, "closed", 3, 3, 3, "not scheduled");
    }

    @Test
    public void testBufferLimit()
    {
        DataSize bufferCapacity = new DataSize(1, BYTE);
        DataSize maxResponseSize = new DataSize(1, BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location = URI.create("http://localhost:8080");

        // add a pages
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));
        processor.setComplete(location);

        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);

        exchangeClient.addLocation(location, TaskId.valueOf("taskid.0.0.0.0"));
        exchangeClient.noMoreLocations();
        assertFalse(exchangeClient.isClosed());

        long start = System.nanoTime();

        // start fetching pages
        exchangeClient.scheduleRequestIfNecessary();
        // wait for a page to be fetched
        do {
            // there is no thread coordination here, so sleep is the best we can do
            assertLessThan(Duration.nanosSince(start), new Duration(5, TimeUnit.SECONDS));
            sleepUninterruptibly(100, MILLISECONDS);
        }
        while (exchangeClient.getStatus().getBufferedPages() == 0);

        // client should have sent a single request for a single page
        assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 1, 1, 1, "not scheduled");

        // remove the page and wait for the client to fetch another page
        assertPageEquals(exchangeClient.pollPage(), createPage(1));
        do {
            assertLessThan(Duration.nanosSince(start), new Duration(5, TimeUnit.SECONDS));
            sleepUninterruptibly(100, MILLISECONDS);
        }
        while (exchangeClient.getStatus().getBufferedPages() == 0);

        // client should have sent a single request for a single page
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 2, 2, 2, "not scheduled");
        assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);

        // remove the page and wait for the client to fetch another page
        assertPageEquals(exchangeClient.pollPage(), createPage(2));
        do {
            assertLessThan(Duration.nanosSince(start), new Duration(5, TimeUnit.SECONDS));
            sleepUninterruptibly(100, MILLISECONDS);
        }
        while (exchangeClient.getStatus().getBufferedPages() == 0);

        // client should have sent a single request for a single page
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 3, 3, 3, "not scheduled");
        assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);

        // remove last page
        assertPageEquals(getNextPage(exchangeClient), createPage(3));

        //  wait for client to decide there are no more pages
        assertNull(getNextPage(exchangeClient));
        assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0);
        assertTrue(exchangeClient.isClosed());
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "closed", 3, 5, 5, "not scheduled");
    }

    @Test
    public void testClose()
            throws Exception
    {
        DataSize bufferCapacity = new DataSize(1, BYTE);
        DataSize maxResponseSize = new DataSize(1, BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));

        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);

        exchangeClient.addLocation(location, TaskId.valueOf("taskid.0.0.0.0"));
        exchangeClient.noMoreLocations();

        // fetch a page
        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(1));

        // close client while pages are still available
        exchangeClient.close();
        waitUntilEquals(exchangeClient::isFinished, true, new Duration(5, SECONDS));
        assertTrue(exchangeClient.isClosed());
        assertNull(exchangeClient.pollPage());
        assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0);

        // client should have sent only 2 requests: one to get all pages and once to get the done signal
        Optional<PageBufferClientStatus> clientStatusOptional = exchangeClient.getStatus().getPageBufferClientStatuses().stream().filter(pageBufferClientStatus -> pageBufferClientStatus.getUri().equals(location)).findFirst();
        assertTrue(clientStatusOptional.isPresent());
        assertStatus(clientStatusOptional.get(), "closed", "not scheduled");
    }

    @Test
    public void testInitialRequestLimit()
    {
        DataSize bufferCapacity = new DataSize(16, MEGABYTE);
        DataSize maxResponseSize = new DataSize(DEFAULT_MAX_PAGE_SIZE_IN_BYTES, BYTE);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize)
        {
            @Override
            public Response handle(Request request)
            {
                if (!awaitUninterruptibly(countDownLatch, 10, SECONDS)) {
                    throw new UncheckedTimeoutException();
                }
                return super.handle(request);
            }
        };

        List<URI> locations = new ArrayList<>();
        int numLocations = 16;
        List<DataSize> expectedMaxSizes = new ArrayList<>();

        // add pages
        for (int i = 0; i < numLocations; i++) {
            URI location = URI.create("http://localhost:" + (8080 + i));
            locations.add(location);

            processor.addPage(location, createPage(DEFAULT_MAX_PAGE_SIZE_IN_BYTES));
            processor.addPage(location, createPage(DEFAULT_MAX_PAGE_SIZE_IN_BYTES));
            processor.addPage(location, createPage(DEFAULT_MAX_PAGE_SIZE_IN_BYTES));

            processor.setComplete(location);

            expectedMaxSizes.add(maxResponseSize);
        }

        try (ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize)) {
            for (int i = 0; i < numLocations; i++) {
                exchangeClient.addLocation(locations.get(i), TaskId.valueOf("taskid.0.0." + i + ".0"));
            }
            exchangeClient.noMoreLocations();
            assertFalse(exchangeClient.isClosed());

            long start = System.nanoTime();
            countDownLatch.countDown();

            // wait for a page to be fetched
            do {
                // there is no thread coordination here, so sleep is the best we can do
                assertLessThan(Duration.nanosSince(start), new Duration(5, TimeUnit.SECONDS));
                sleepUninterruptibly(100, MILLISECONDS);
            }
            while (exchangeClient.getStatus().getBufferedPages() < 16);

            // Client should have sent 16 requests for a single page (0) and gotten them back
            // The memory limit should be hit immediately and then it doesn't fetch the third page from each
            assertEquals(exchangeClient.getStatus().getBufferedPages(), 16);
            assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);
            List<PageBufferClientStatus> pageBufferClientStatuses = exchangeClient.getStatus().getPageBufferClientStatuses();
            assertEquals(
                    16,
                    pageBufferClientStatuses.stream()
                            .filter(status -> status.getPagesReceived() == 1)
                            .mapToInt(PageBufferClientStatus::getPagesReceived)
                            .sum());
            assertEquals(processor.getRequestMaxSizes(), expectedMaxSizes);

            for (int i = 0; i < numLocations * 3; i++) {
                assertNotNull(getNextPage(exchangeClient));
            }

            do {
                // there is no thread coordination here, so sleep is the best we can do
                assertLessThan(Duration.nanosSince(start), new Duration(10, TimeUnit.SECONDS));
                sleepUninterruptibly(100, MILLISECONDS);
            }
            while (processor.getRequestMaxSizes().size() < 64);

            for (int i = 0; i < 48; i++) {
                expectedMaxSizes.add(maxResponseSize);
            }

            assertEquals(processor.getRequestMaxSizes(), expectedMaxSizes);
        }
    }

    @Test
    public void testRemoveRemoteSource()
            throws Exception
    {
        DataSize bufferCapacity = new DataSize(1, BYTE);
        DataSize maxResponseSize = new DataSize(1, BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location1 = URI.create("http://localhost:8081/foo.0.0.0.0");
        TaskId taskId1 = TaskId.valueOf("foo.0.0.0.0");
        URI location2 = URI.create("http://localhost:8082/bar.0.0.0.0");
        TaskId taskId2 = TaskId.valueOf("bar.0.0.0.0");

        processor.addPage(location1, createPage(1));
        processor.addPage(location1, createPage(2));
        processor.addPage(location1, createPage(3));

        ExchangeClient exchangeClient = createExchangeClient(processor, bufferCapacity, maxResponseSize);

        exchangeClient.addLocation(location1, taskId1);
        exchangeClient.addLocation(location2, taskId2);

        assertFalse(exchangeClient.isClosed());

        // Wait until exactly one page is buffered:
        //  * We cannot call ExchangeClient#pollPage() directly, since it will schedule the next request to buffer data,
        //    and this request to buffer data could win the race against the request to remove remote source.
        //  * Buffer capacity is set to 1 byte, so only one page can be buffered.
        waitUntilEquals(() -> exchangeClient.getStatus().getBufferedPages(), 1, new Duration(5, SECONDS));
        assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);

        // remove remote source
        exchangeClient.removeRemoteSource(taskId1);

        // the previously buffered page will still be read out
        assertPageEquals(getNextPage(exchangeClient), createPage(1));

        // client should not receive any further pages from removed remote source
        assertNull(exchangeClient.pollPage());
        assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);

        // add pages to another source
        processor.addPage(location2, createPage(4));
        processor.addPage(location2, createPage(5));
        processor.addPage(location2, createPage(6));
        processor.setComplete(location2);

        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(4));
        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(5));
        assertFalse(exchangeClient.isClosed());
        assertPageEquals(getNextPage(exchangeClient), createPage(6));

        assertFalse(tryGetFutureValue(exchangeClient.isBlocked(), 10, MILLISECONDS).isPresent());
        assertFalse(exchangeClient.isClosed());

        exchangeClient.noMoreLocations();
        // The transition to closed may happen asynchronously, since it requires that all the HTTP clients
        // receive a final GONE response, so just spin until it's closed or the test times out.
        while (!exchangeClient.isClosed()) {
            Thread.sleep(1);
        }

        ExchangeClientStatus exchangeClientStatus = exchangeClient.getStatus();
        Optional<PageBufferClientStatus> clientStatusOptional1 = exchangeClientStatus.getPageBufferClientStatuses()
                .stream()
                .filter(pageBufferClientStatus -> pageBufferClientStatus.getUri().equals(location1)).findFirst();
        assertTrue(clientStatusOptional1.isPresent());
        assertStatus(clientStatusOptional1.get(), "closed", "not scheduled");

        Optional<PageBufferClientStatus> clientStatusOptional2 = exchangeClientStatus.getPageBufferClientStatuses()
                .stream()
                .filter(pageBufferClientStatus -> pageBufferClientStatus.getUri().equals(location2)).findFirst();
        assertTrue(clientStatusOptional2.isPresent());
        assertStatus(clientStatusOptional2.get(), "closed", "not scheduled");
    }

    private static Page createPage(int size)
    {
        return new Page(BlockAssertions.createLongSequenceBlock(0, size));
    }

    private static SerializedPage getNextPage(ExchangeClient exchangeClient)
    {
        ListenableFuture<SerializedPage> futurePage = Futures.transform(exchangeClient.isBlocked(), ignored -> exchangeClient.pollPage(), directExecutor());
        return tryGetFutureValue(futurePage, 100, TimeUnit.SECONDS).orElse(null);
    }

    private static void assertPageEquals(SerializedPage actualPage, Page expectedPage)
    {
        assertNotNull(actualPage);
        assertEquals(actualPage.getPositionCount(), expectedPage.getPositionCount());
        assertEquals(PAGES_SERDE.deserialize(actualPage).getChannelCount(), expectedPage.getChannelCount());
    }

    private static void assertStatus(
            PageBufferClientStatus clientStatus,
            String status,
            String httpRequestState)
    {
        assertEquals(clientStatus.getState(), status, "status");
        assertEquals(clientStatus.getHttpRequestState(), httpRequestState, "httpRequestState");
    }

    private static void assertStatus(
            PageBufferClientStatus clientStatus,
            URI location,
            String status,
            int pagesReceived,
            int requestsScheduled,
            int requestsCompleted,
            String httpRequestState)
    {
        assertEquals(clientStatus.getUri(), location);
        assertEquals(clientStatus.getState(), status, "status");
        assertEquals(clientStatus.getPagesReceived(), pagesReceived, "pagesReceived");
        assertEquals(clientStatus.getRequestsScheduled(), requestsScheduled, "requestsScheduled");
        assertEquals(clientStatus.getRequestsCompleted(), requestsCompleted, "requestsCompleted");
        assertEquals(clientStatus.getHttpRequestState(), httpRequestState, "httpRequestState");
    }

    private <T> void waitUntilEquals(Supplier<T> actualSupplier, T expected, Duration timeout)
    {
        long nanoUntil = System.nanoTime() + timeout.toMillis() * 1_000_000;
        while (System.nanoTime() - nanoUntil < 0) {
            if (expected.equals(actualSupplier.get())) {
                return;
            }
            try {
                Thread.sleep(10);
            }
            catch (InterruptedException e) {
                // do nothing
            }
        }
        assertEquals(actualSupplier.get(), expected);
    }

    private ExchangeClient createExchangeClient(MockExchangeRequestProcessor processor, DataSize bufferCapacity, DataSize maxResponseSize)
    {
        return new ExchangeClient(
                bufferCapacity,
                maxResponseSize,
                1,
                new Duration(1, MINUTES),
                true,
                0.2,
                new HttpShuffleClientProvider(new TestingHttpClient(processor, testingHttpClientExecutor)),
                scheduler,
                new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test"),
                pageBufferClientCallbackExecutor);
    }
}