TestClientBuffer.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution.buffer;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.buffer.ClientBuffer.PagesSupplier;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
import com.facebook.presto.execution.buffer.SerializedPageReference.PagesReleasedListener;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.execution.buffer.BufferResult.emptyResults;
import static com.facebook.presto.execution.buffer.BufferTestUtils.NO_WAIT;
import static com.facebook.presto.execution.buffer.BufferTestUtils.PAGES_SERDE;
import static com.facebook.presto.execution.buffer.BufferTestUtils.assertBufferResultEquals;
import static com.facebook.presto.execution.buffer.BufferTestUtils.createBufferResult;
import static com.facebook.presto.execution.buffer.BufferTestUtils.createPage;
import static com.facebook.presto.execution.buffer.BufferTestUtils.getFuture;
import static com.facebook.presto.execution.buffer.BufferTestUtils.sizeOfPages;
import static com.facebook.presto.execution.buffer.SerializedPageReference.dereferencePages;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestClientBuffer
{
    private static final String TASK_INSTANCE_ID = "task-instance-id";
    private static final ImmutableList<BigintType> TYPES = ImmutableList.of(BIGINT);
    private static final OutputBufferId BUFFER_ID = new OutputBufferId(33);
    private static final String INVALID_SEQUENCE_ID = "Invalid sequence id";
    private static final PagesReleasedListener NOOP_RELEASE_LISTENER = (lifespan, releasedPagesCount, releasedSizeInBytes) -> {};

    @Test
    public void testSimplePushBuffer()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);

        // add three pages to the buffer
        for (int i = 0; i < 3; i++) {
            addPage(buffer, createPage(i));
        }
        assertBufferInfo(buffer, 3, 0);

        // get the pages elements from the buffer
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 0, sizeOfPages(10).toBytes(), NO_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
        // pages not acknowledged yet so state is the same
        assertBufferInfo(buffer, 3, 0);

        // acknowledge first three pages in the buffer
        buffer.getPages(3, sizeOfPages(10).toBytes()).cancel(true);
        // pages now acknowledged
        assertBufferInfo(buffer, 0, 3);

        // add 3 more pages
        for (int i = 3; i < 6; i++) {
            addPage(buffer, createPage(i));
        }
        assertBufferInfo(buffer, 3, 3);

        // remove a page
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 3, sizeOfPages(1).toBytes(), NO_WAIT), bufferResult(3, createPage(3)));
        // page not acknowledged yet so sent count is the same
        assertBufferInfo(buffer, 3, 3);

        // set no more pages
        buffer.setNoMorePages();
        // state should not change
        assertBufferInfo(buffer, 3, 3);

        // remove a page
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 4, sizeOfPages(1).toBytes(), NO_WAIT), bufferResult(4, createPage(4)));
        assertBufferInfo(buffer, 2, 4);

        // remove last pages from, should not be finished
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 5, sizeOfPages(30).toBytes(), NO_WAIT), bufferResult(5, createPage(5)));
        assertBufferInfo(buffer, 1, 5);

        // acknowledge all pages from the buffer, should return a finished buffer result
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 6, sizeOfPages(10).toBytes(), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 6, true));
        assertBufferInfo(buffer, 0, 6);

        // buffer is not destroyed until explicitly destroyed
        buffer.destroy();
        assertBufferDestroyed(buffer, 6);
    }

    @Test
    public void testSimplePullBuffer()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);

        // create a page supplier with 3 initial pages
        TestingPagesSupplier supplier = new TestingPagesSupplier();
        for (int i = 0; i < 3; i++) {
            supplier.addPage(createPage(i));
        }
        assertEquals(supplier.getBufferedPages(), 3);

        // get the pages elements from the buffer
        assertBufferResultEquals(TYPES, getBufferResult(buffer, supplier, 0, sizeOfPages(10).toBytes(), NO_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
        // 3 pages are moved to the client buffer, but not acknowledged yet
        assertEquals(supplier.getBufferedPages(), 0);
        assertBufferInfo(buffer, 3, 0);

        // acknowledge first three pages in the buffer
        ListenableFuture<BufferResult> pendingRead = buffer.getPages(3, sizeOfPages(1).toBytes());
        // pages now acknowledged
        assertEquals(supplier.getBufferedPages(), 0);
        assertBufferInfo(buffer, 0, 3);
        assertFalse(pendingRead.isDone());

        // add 3 more pages
        for (int i = 3; i < 6; i++) {
            supplier.addPage(createPage(i));
        }
        assertEquals(supplier.getBufferedPages(), 3);

        // notify the buffer that there is more data, and verify previous read completed
        buffer.loadPagesIfNecessary(supplier);
        assertBufferResultEquals(TYPES, getFuture(pendingRead, NO_WAIT), bufferResult(3, createPage(3)));
        // 1 page wad moved to the client buffer, but not acknowledged yet
        assertEquals(supplier.getBufferedPages(), 2);
        assertBufferInfo(buffer, 1, 3);

        // set no more pages
        supplier.setNoMorePages();
        // state should not change
        assertEquals(supplier.getBufferedPages(), 2);
        assertBufferInfo(buffer, 1, 3);

        // remove a page
        assertBufferResultEquals(TYPES, getBufferResult(buffer, supplier, 4, sizeOfPages(1).toBytes(), NO_WAIT), bufferResult(4, createPage(4)));
        assertBufferInfo(buffer, 1, 4);
        assertEquals(supplier.getBufferedPages(), 1);

        // remove last pages from, should not be finished
        assertBufferResultEquals(TYPES, getBufferResult(buffer, supplier, 5, sizeOfPages(30).toBytes(), NO_WAIT), bufferResult(5, createPage(5)));
        assertBufferInfo(buffer, 1, 5);
        assertEquals(supplier.getBufferedPages(), 0);

        // acknowledge all pages from the buffer, should return a finished buffer result
        assertBufferResultEquals(TYPES, getBufferResult(buffer, supplier, 6, sizeOfPages(10).toBytes(), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 6, true));
        assertBufferInfo(buffer, 0, 6);
        assertEquals(supplier.getBufferedPages(), 0);

        // buffer is not destroyed until explicitly destroyed
        buffer.destroy();
        assertBufferDestroyed(buffer, 6);
    }

    @Test
    public void testBufferResults()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);

        long totalSizeOfPagesInBytes = 0;
        for (int i = 0; i < 3; i++) {
            Page page = createPage(i);
            SerializedPageReference pageReference = new SerializedPageReference(PAGES_SERDE.serialize(page), 1, Lifespan.taskWide());
            totalSizeOfPagesInBytes = totalSizeOfPagesInBytes + pageReference.getSerializedPage().getRetainedSizeInBytes();
            addPage(buffer, page);
        }
        // Everything buffered
        assertBufferInfo(buffer, 3, 0, totalSizeOfPagesInBytes);

        BufferResult bufferResult = getBufferResult(buffer, 0, sizeOfPages(1).toBytes(), NO_WAIT);
        long remainingBytes = totalSizeOfPagesInBytes - bufferResult.getBufferedBytes();
        assertEquals(bufferResult.getBufferedBytes(), sizeOfPages(1).toBytes());
        assertBufferInfo(buffer, 3, 0, totalSizeOfPagesInBytes);

        buffer.acknowledgePages(bufferResult.getNextToken());
        assertBufferInfo(buffer, 2, 1, remainingBytes);
    }

    @Test
    public void testDuplicateRequests()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);

        // add three pages
        for (int i = 0; i < 3; i++) {
            addPage(buffer, createPage(i));
        }
        assertBufferInfo(buffer, 3, 0);

        // get the three pages
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 0, sizeOfPages(10).toBytes(), NO_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
        // pages not acknowledged yet so state is the same
        assertBufferInfo(buffer, 3, 0);

        // get the three pages again
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 0, sizeOfPages(10).toBytes(), NO_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
        // pages not acknowledged yet so state is the same
        assertBufferInfo(buffer, 3, 0);

        // acknowledge the pages
        buffer.getPages(3, 10L).cancel(true);

        // attempt to get the three elements again, which will return an empty result
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 0, sizeOfPages(10).toBytes(), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 0, false));
        // pages not acknowledged yet so state is the same
        assertBufferInfo(buffer, 0, 3);
    }

    @Test
    public void testAddAfterNoMorePages()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        buffer.setNoMorePages();
        addPage(buffer, createPage(0));
        addPage(buffer, createPage(0));
        assertBufferInfo(buffer, 0, 0);
    }

    @Test
    public void testAddAfterDestroy()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        buffer.destroy();
        addPage(buffer, createPage(0));
        addPage(buffer, createPage(0));
        assertBufferDestroyed(buffer, 0);
    }

    @Test
    public void testDestroy()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);

        // add 5 pages the buffer
        for (int i = 0; i < 5; i++) {
            addPage(buffer, createPage(i));
        }
        buffer.setNoMorePages();

        // read a page
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 0, sizeOfPages(1).toBytes(), NO_WAIT), bufferResult(0, createPage(0)));

        // destroy without acknowledgement
        buffer.destroy();
        assertBufferDestroyed(buffer, 0);

        // follow token from previous read, which should return a finished result
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 1, sizeOfPages(1).toBytes(), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 0, true));
    }

    @Test
    public void testNoMorePagesFreesReader()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);

        // attempt to get a page
        ListenableFuture<BufferResult> future = buffer.getPages(0, sizeOfPages(10).toBytes());

        // verify we are waiting for a page
        assertFalse(future.isDone());

        // add one item
        addPage(buffer, createPage(0));

        // verify we got one page
        assertBufferResultEquals(TYPES, getFuture(future, NO_WAIT), bufferResult(0, createPage(0)));

        // attempt to get another page, and verify we are blocked
        future = buffer.getPages(1, sizeOfPages(10).toBytes());
        assertFalse(future.isDone());

        // finish the buffer
        buffer.setNoMorePages();
        assertBufferInfo(buffer, 0, 1);

        // verify the future completed
        assertBufferResultEquals(TYPES, getFuture(future, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 1, true));
    }

    @Test
    public void testDestroyFreesReader()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);

        // attempt to get a page
        ListenableFuture<BufferResult> future = buffer.getPages(0, sizeOfPages(10).toBytes());

        // verify we are waiting for a page
        assertFalse(future.isDone());

        // add one item
        addPage(buffer, createPage(0));
        assertTrue(future.isDone());

        // verify we got one page
        assertBufferResultEquals(TYPES, getFuture(future, NO_WAIT), bufferResult(0, createPage(0)));

        // attempt to get another page, and verify we are blocked
        future = buffer.getPages(1, sizeOfPages(10).toBytes());
        assertFalse(future.isDone());

        // destroy the buffer
        buffer.destroy();

        // verify the future completed
        // buffer does not return a "complete" result in this case, but it doesn't matter
        assertBufferResultEquals(TYPES, getFuture(future, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 1, false));

        // further requests will see a completed result
        assertBufferDestroyed(buffer, 1);
    }

    @Test
    public void testInvalidTokenFails()
    {
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, NOOP_RELEASE_LISTENER);
        addPage(buffer, createPage(0));
        addPage(buffer, createPage(1));
        buffer.getPages(1, sizeOfPages(10).toBytes()).cancel(true);
        assertBufferInfo(buffer, 1, 1);

        // request negative token
        assertInvalidSequenceId(buffer, -1);
        assertBufferInfo(buffer, 1, 1);

        // request token off end of buffer
        assertInvalidSequenceId(buffer, 10);
        assertBufferInfo(buffer, 1, 1);
    }

    @Test
    public void testReferenceCount()
    {
        AtomicInteger releasedPages = new AtomicInteger(0);
        PagesReleasedListener onPagesReleased = (lifespan, releasedPagesCount, releasedSizeInBytes) -> {
            releasedPages.addAndGet(releasedPagesCount);
        };
        ClientBuffer buffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID, onPagesReleased);

        // add 2 pages and verify they are referenced
        addPage(buffer, createPage(0));
        addPage(buffer, createPage(1));
        assertEquals(releasedPages.get(), 0);
        assertBufferInfo(buffer, 2, 0);

        // read one page
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 0, sizeOfPages(0).toBytes(), NO_WAIT), bufferResult(0, createPage(0)));
        assertEquals(releasedPages.get(), 0);
        assertBufferInfo(buffer, 2, 0);

        // acknowledge first page
        assertBufferResultEquals(TYPES, getBufferResult(buffer, 1, sizeOfPages(1).toBytes(), NO_WAIT), bufferResult(1, createPage(1)));
        assertEquals(releasedPages.get(), 1);
        assertBufferInfo(buffer, 1, 1);

        // destroy the buffer
        buffer.destroy();
        assertEquals(releasedPages.get(), 2);
        assertBufferDestroyed(buffer, 1);
    }

    private static void assertInvalidSequenceId(ClientBuffer buffer, int sequenceId)
    {
        try {
            buffer.getPages(sequenceId, sizeOfPages(10).toBytes());
            fail("Expected " + INVALID_SEQUENCE_ID);
        }
        catch (IllegalArgumentException e) {
            assertEquals(e.getMessage(), INVALID_SEQUENCE_ID);
        }
    }

    private static BufferResult getBufferResult(ClientBuffer buffer, long sequenceId, long maxSizeInBytes, Duration maxWait)
    {
        ListenableFuture<BufferResult> future = buffer.getPages(sequenceId, maxSizeInBytes);
        return getFuture(future, maxWait);
    }

    private static BufferResult getBufferResult(ClientBuffer buffer, PagesSupplier supplier, long sequenceId, long maxSizeInBytes, Duration maxWait)
    {
        ListenableFuture<BufferResult> future = buffer.getPages(sequenceId, maxSizeInBytes, Optional.of(supplier));
        return getFuture(future, maxWait);
    }

    private static void addPage(ClientBuffer buffer, Page page)
    {
        addPage(buffer, page, NOOP_RELEASE_LISTENER);
    }

    private static void addPage(ClientBuffer buffer, Page page, PagesReleasedListener onPagesReleased)
    {
        SerializedPageReference serializedPageReference = new SerializedPageReference(PAGES_SERDE.serialize(page), 1, Lifespan.taskWide());
        addPage(buffer, serializedPageReference, onPagesReleased);
    }

    private static void addPage(ClientBuffer buffer, SerializedPageReference page, PagesReleasedListener onPagesReleased)
    {
        buffer.enqueuePages(ImmutableList.of(page));
        dereferencePages(ImmutableList.of(page), onPagesReleased);
    }

    private static void assertBufferInfo(
            ClientBuffer buffer,
            int bufferedPages,
            int pagesSent)
    {
        assertEquals(
                buffer.getInfo(),
                new BufferInfo(
                        BUFFER_ID,
                        false,
                        bufferedPages,
                        pagesSent,
                        new PageBufferInfo(
                                BUFFER_ID.getId(),
                                bufferedPages,
                                sizeOfPages(bufferedPages).toBytes(),
                                bufferedPages + pagesSent, // every page has one row
                                bufferedPages + pagesSent)));
        assertFalse(buffer.isDestroyed());
    }

    private static void assertBufferInfo(
            ClientBuffer buffer,
            int bufferedPages,
            int pagesSent,
            long bufferedBytes)
    {
        assertEquals(
                buffer.getInfo(),
                new BufferInfo(
                        BUFFER_ID,
                        false,
                        bufferedPages,
                        pagesSent,
                        new PageBufferInfo(
                                BUFFER_ID.getId(),
                                bufferedPages,
                                sizeOfPages(bufferedPages).toBytes(),
                                bufferedPages + pagesSent, // every page has one row
                                bufferedPages + pagesSent)));
        assertFalse(buffer.isDestroyed());
    }

    private static BufferResult bufferResult(long token, Page firstPage, Page... otherPages)
    {
        List<Page> pages = ImmutableList.<Page>builder().add(firstPage).add(otherPages).build();
        return createBufferResult(TASK_INSTANCE_ID, token, pages);
    }

    private static void assertBufferDestroyed(ClientBuffer buffer, int pagesSent)
    {
        BufferInfo bufferInfo = buffer.getInfo();
        assertEquals(bufferInfo.getBufferedPages(), 0);
        assertEquals(bufferInfo.getPagesSent(), pagesSent);
        assertTrue(bufferInfo.isFinished());
        assertTrue(buffer.isDestroyed());
    }

    @ThreadSafe
    private static class TestingPagesSupplier
            implements PagesSupplier
    {
        @GuardedBy("this")
        private final Deque<SerializedPageReference> buffer = new ArrayDeque<>();

        @GuardedBy("this")
        private boolean noMorePages;

        @Override
        public synchronized boolean mayHaveMorePages()
        {
            return !noMorePages || !buffer.isEmpty();
        }

        synchronized void setNoMorePages()
        {
            this.noMorePages = true;
        }

        synchronized int getBufferedPages()
        {
            return buffer.size();
        }

        public synchronized void addPage(Page page)
        {
            requireNonNull(page, "page is null");
            checkState(!noMorePages);
            buffer.add(new SerializedPageReference(PAGES_SERDE.serialize(page), 1, Lifespan.taskWide()));
        }

        @Override
        public synchronized List<SerializedPageReference> getPages(long maxSizeInBytes)
        {
            List<SerializedPageReference> pages = new ArrayList<>();
            long bytesRemoved = 0;

            while (true) {
                SerializedPageReference page = buffer.peek();
                if (page == null) {
                    break;
                }
                bytesRemoved += page.getRetainedSizeInBytes();
                // break (and don't add) if this page would exceed the limit
                if (!pages.isEmpty() && bytesRemoved > maxSizeInBytes) {
                    break;
                }
                // this should not happen since we have a lock
                checkState(buffer.poll() == page, "Buffer corrupted");
                pages.add(page);
            }

            return ImmutableList.copyOf(pages);
        }
    }
}