TestBroadcastOutputBuffer.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.buffer;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.memory.context.MemoryReservationHandler;
import com.facebook.presto.memory.context.SimpleLocalMemoryContext;
import com.facebook.presto.spi.page.PagesSerde;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.execution.buffer.BufferResult.emptyResults;
import static com.facebook.presto.execution.buffer.BufferState.OPEN;
import static com.facebook.presto.execution.buffer.BufferState.TERMINAL_BUFFER_STATES;
import static com.facebook.presto.execution.buffer.BufferTestUtils.MAX_WAIT;
import static com.facebook.presto.execution.buffer.BufferTestUtils.NO_WAIT;
import static com.facebook.presto.execution.buffer.BufferTestUtils.acknowledgeBufferResult;
import static com.facebook.presto.execution.buffer.BufferTestUtils.addPage;
import static com.facebook.presto.execution.buffer.BufferTestUtils.assertBufferResultEquals;
import static com.facebook.presto.execution.buffer.BufferTestUtils.assertFinished;
import static com.facebook.presto.execution.buffer.BufferTestUtils.assertFutureIsDone;
import static com.facebook.presto.execution.buffer.BufferTestUtils.assertQueueClosed;
import static com.facebook.presto.execution.buffer.BufferTestUtils.assertQueueState;
import static com.facebook.presto.execution.buffer.BufferTestUtils.createBufferResult;
import static com.facebook.presto.execution.buffer.BufferTestUtils.createPage;
import static com.facebook.presto.execution.buffer.BufferTestUtils.enqueuePage;
import static com.facebook.presto.execution.buffer.BufferTestUtils.getBufferResult;
import static com.facebook.presto.execution.buffer.BufferTestUtils.getFuture;
import static com.facebook.presto.execution.buffer.BufferTestUtils.sizeOfPages;
import static com.facebook.presto.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID;
import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.BROADCAST;
import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static com.facebook.presto.execution.buffer.TestingPagesSerdeFactory.testingPagesSerde;
import static com.facebook.presto.memory.context.AggregatedMemoryContext.newRootAggregatedMemoryContext;
import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.Unit.BYTE;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class TestBroadcastOutputBuffer
{
// Serde obtained from testingPagesSerde(); tests in this class use it to measure the retained size of a serialized page.
private static final PagesSerde PAGES_SERDE = testingPagesSerde();
// Task instance id passed to emptyResults(...) / createBufferResult(...) when building expected results.
private static final String TASK_INSTANCE_ID = "task-instance-id";
// Column types handed to assertBufferResultEquals(...): a single BIGINT column.
private static final ImmutableList<BigintType> TYPES = ImmutableList.of(BIGINT);
// Ids (0, 1, 2) of the output buffers that the tests declare and read from.
private static final OutputBufferId FIRST = new OutputBufferId(0);
private static final OutputBufferId SECOND = new OutputBufferId(1);
private static final OutputBufferId THIRD = new OutputBufferId(2);
// Created in setUp() and shut down in tearDown().
// NOTE(review): presumably handed to the buffers under test by createBroadcastBuffer — confirm there.
private ScheduledExecutorService stateNotificationExecutor;
@BeforeClass
public void setUp()
{
    // One scheduled pool of size 5 for the whole class; threads come from daemonThreadsNamed("test-%s").
    ScheduledExecutorService executor = newScheduledThreadPool(5, daemonThreadsNamed("test-%s"));
    stateNotificationExecutor = executor;
}
@AfterClass(alwaysRun = true)
public void tearDown()
{
    // Nothing to release when the executor was never created or has already been cleared.
    if (stateNotificationExecutor == null) {
        return;
    }
    stateNotificationExecutor.shutdownNow();
    stateNotificationExecutor = null;
}
/**
 * Verifies that creating a broadcast buffer with a zero-byte {@link DataSize} is rejected with an
 * {@link IllegalArgumentException}, both for a finalized buffer set and for an initial empty one.
 */
@Test
public void testInvalidConstructorArg()
{
    // zero-byte size with a single declared buffer and no more buffer ids
    try {
        createBroadcastBuffer(createInitialEmptyOutputBuffers(BROADCAST).withBuffer(FIRST, BROADCAST_PARTITION_ID).withNoMoreBufferIds(), new DataSize(0, BYTE));
        // the failure message names the exception type that the catch clause actually expects
        fail("Expected IllegalArgumentException");
    }
    catch (IllegalArgumentException ignored) {
        // expected
    }

    // zero-byte size with the initial empty buffer set
    try {
        createBroadcastBuffer(createInitialEmptyOutputBuffers(BROADCAST), new DataSize(0, BYTE));
        fail("Expected IllegalArgumentException");
    }
    catch (IllegalArgumentException ignored) {
        // expected
    }
}
/**
 * Walks a broadcast buffer through a full life cycle: pages are added before any output buffer is
 * declared, the FIRST and then the SECOND output buffer are added (each is expected to see every page),
 * the writer blocks once the buffer is full, the buffer set is finalized, no-more-pages is signalled,
 * and finally both output buffers are drained and aborted so the buffer reports finished.
 * The statements are strictly order-dependent: every assertQueueState call checks the counts produced
 * by the steps before it.
 */
@Test
public void testSimple()
{
OutputBuffers outputBuffers = createInitialEmptyOutputBuffers(BROADCAST);
BroadcastOutputBuffer buffer = createBroadcastBuffer(outputBuffers, sizeOfPages(10));
// add three items
for (int i = 0; i < 3; i++) {
addPage(buffer, createPage(i));
}
outputBuffers = createInitialEmptyOutputBuffers(BROADCAST).withBuffer(FIRST, BROADCAST_PARTITION_ID);
// add a queue
buffer.setOutputBuffers(outputBuffers);
assertQueueState(buffer, FIRST, 3, 0);
// get the three elements
assertBufferResultEquals(TYPES, getBufferResult(buffer, FIRST, 0, sizeOfPages(10), NO_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
// pages not acknowledged yet so state is the same
assertQueueState(buffer, FIRST, 3, 0);
// acknowledge first three pages (the returned future is cancelled because only the acknowledgement matters here)
buffer.get(FIRST, 3, 10L).cancel(true);
// pages now acknowledged
assertQueueState(buffer, FIRST, 0, 3);
// fill the buffer (we already added 3 pages)
for (int i = 3; i < 10; i++) {
addPage(buffer, createPage(i));
}
assertQueueState(buffer, FIRST, 7, 3);
// try to add one more page, which should block
ListenableFuture<?> future = enqueuePage(buffer, createPage(10));
assertFalse(future.isDone());
assertQueueState(buffer, FIRST, 8, 3);
// remove a page
assertBufferResultEquals(TYPES, getBufferResult(buffer, FIRST, 3, sizeOfPages(1), NO_WAIT), bufferResult(3, createPage(3)));
// page not acknowledged yet so sent count is the same
assertQueueState(buffer, FIRST, 8, 3);
// we should still be blocked
assertFalse(future.isDone());
//
// add another buffer and verify it sees all pages
outputBuffers = outputBuffers.withBuffer(SECOND, BROADCAST_PARTITION_ID);
buffer.setOutputBuffers(outputBuffers);
assertQueueState(buffer, SECOND, 11, 0);
assertBufferResultEquals(TYPES, getBufferResult(buffer, SECOND, 0, sizeOfPages(10), NO_WAIT), bufferResult(0, createPage(0),
createPage(1),
createPage(2),
createPage(3),
createPage(4),
createPage(5),
createPage(6),
createPage(7),
createPage(8),
createPage(9)));
// page not acknowledged yet so sent count is still zero
assertQueueState(buffer, SECOND, 11, 0);
// acknowledge the 10 pages
buffer.get(SECOND, 10, 10L).cancel(true);
assertQueueState(buffer, SECOND, 1, 10);
//
// tell shared buffer there will be no more queues
outputBuffers = outputBuffers.withNoMoreBufferIds();
buffer.setOutputBuffers(outputBuffers);
// queues consumed the first three pages, so they should be dropped now and the blocked page future from above should be done
assertQueueState(buffer, FIRST, 8, 3);
assertQueueState(buffer, SECOND, 1, 10);
assertFutureIsDone(future);
// we should be able to add 3 more pages (the third will be queued)
// although the first queue fetched the 4th page, the page has not been acknowledged yet
addPage(buffer, createPage(11));
addPage(buffer, createPage(12));
future = enqueuePage(buffer, createPage(13));
assertFalse(future.isDone());
assertQueueState(buffer, FIRST, 11, 3);
assertQueueState(buffer, SECOND, 4, 10);
// acknowledge the receipt of the 3rd page and try to remove the 4th page from the first queue
assertBufferResultEquals(TYPES, getBufferResult(buffer, FIRST, 4, sizeOfPages(1), NO_WAIT), bufferResult(4, createPage(4)));
// the blocked page future above should be done
assertFutureIsDone(future);
assertQueueState(buffer, FIRST, 10, 4);
assertQueueState(buffer, SECOND, 4, 10);
//
// finish the buffer
assertFalse(buffer.isFinished());
buffer.setNoMorePages();
assertQueueState(buffer, FIRST, 10, 4);
assertQueueState(buffer, SECOND, 4, 10);
// not fully finished until all pages are consumed
assertFalse(buffer.isFinished());
// remove a page, not finished
assertBufferResultEquals(TYPES, getBufferResult(buffer, FIRST, 5, sizeOfPages(1), NO_WAIT), bufferResult(5, createPage(5)));
assertQueueState(buffer, FIRST, 9, 5);
assertQueueState(buffer, SECOND, 4, 10);
assertFalse(buffer.isFinished());
// remove all remaining pages from first queue, should not be finished
BufferResult x = getBufferResult(buffer, FIRST, 6, sizeOfPages(10), NO_WAIT);
assertBufferResultEquals(TYPES, x, bufferResult(6, createPage(6),
createPage(7),
createPage(8),
createPage(9),
createPage(10),
createPage(11),
createPage(12),
createPage(13)));
assertQueueState(buffer, FIRST, 8, 6);
// requesting token 14 (past the last page) is expected to return an empty result
assertBufferResultEquals(TYPES, getBufferResult(buffer, FIRST, 14, sizeOfPages(10), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 14, true));
// finish first queue
buffer.abort(FIRST);
assertQueueClosed(buffer, FIRST, 14);
assertQueueState(buffer, SECOND, 4, 10);
assertFalse(buffer.isFinished());
// remove all remaining pages from second queue, should be finished
assertBufferResultEquals(TYPES, getBufferResult(buffer, SECOND, 10, sizeOfPages(10), NO_WAIT), bufferResult(10, createPage(10),
createPage(11),
createPage(12),
createPage(13)));
assertQueueState(buffer, SECOND, 4, 10);
assertBufferResultEquals(TYPES, getBufferResult(buffer, SECOND, 14, sizeOfPages(10), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 14, true));
buffer.abort(SECOND);
assertQueueClosed(buffer, FIRST, 14);
assertQueueClosed(buffer, SECOND, 14);
assertFinished(buffer);
// reads after the buffer is finished still return the same empty result for both queues
assertBufferResultEquals(TYPES, getBufferResult(buffer, FIRST, 14, sizeOfPages(10), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 14, true));
assertBufferResultEquals(TYPES, getBufferResult(buffer, SECOND, 14, sizeOfPages(10), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 14, true));
}
// TODO: remove this after PR #7987 is landed
/**
 * Exercises explicit acknowledgement through acknowledgeBufferResult: partial acknowledgement,
 * full acknowledgement, rejection of a sequence id beyond the delivered pages, and a repeated
 * acknowledgement of pages that were already acknowledged.
 */
@Test
public void testAcknowledge()
{
    OutputBuffers outputBuffers = createInitialEmptyOutputBuffers(BROADCAST);
    BroadcastOutputBuffer buffer = createBroadcastBuffer(outputBuffers, sizeOfPages(10));

    // add three items
    for (int i = 0; i < 3; i++) {
        addPage(buffer, createPage(i));
    }

    outputBuffers = createInitialEmptyOutputBuffers(BROADCAST).withBuffer(FIRST, BROADCAST_PARTITION_ID);

    // add a queue
    buffer.setOutputBuffers(outputBuffers);
    assertQueueState(buffer, FIRST, 3, 0);

    // get the three elements
    assertBufferResultEquals(TYPES, getBufferResult(buffer, FIRST, 0, sizeOfPages(10), NO_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));

    // acknowledge pages 0 and 1
    acknowledgeBufferResult(buffer, FIRST, 2);
    // only page 2 is not removed
    assertQueueState(buffer, FIRST, 1, 2);

    // acknowledge page 2
    acknowledgeBufferResult(buffer, FIRST, 3);
    // nothing left
    assertQueueState(buffer, FIRST, 0, 3);

    // acknowledging more pages than were delivered must fail
    try {
        acknowledgeBufferResult(buffer, FIRST, 4);
        // without this the test would silently pass if no exception were thrown
        fail("Expected IllegalArgumentException");
    }
    catch (IllegalArgumentException e) {
        assertEquals(e.getMessage(), "Invalid sequence id");
    }

    // add three more pages
    for (int i = 3; i < 6; i++) {
        addPage(buffer, createPage(i));
    }
    assertQueueState(buffer, FIRST, 3, 3);

    // getting new pages will again acknowledge the previously acknowledged pages but this is ok
    buffer.get(FIRST, 3, 1L).cancel(true);
    assertQueueState(buffer, FIRST, 3, 3);
}
@Test
public void testSharedBufferFull()
{
    // buffer sized for two pages, with no output buffers declared yet
    OutputBuffers noBuffersYet = createInitialEmptyOutputBuffers(BROADCAST);
    BroadcastOutputBuffer sharedBuffer = createBroadcastBuffer(noBuffersYet, sizeOfPages(2));

    // pages 1 and 2 fill the buffer
    for (int value = 1; value <= 2; value++) {
        addPage(sharedBuffer, createPage(value));
    }

    // a third page goes through enqueuePage, the helper used for pages expected to block
    enqueuePage(sharedBuffer, createPage(3));
}
@Test
public void testDuplicateRequests()
{
    OutputBuffers singleConsumer = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(singleConsumer, sizeOfPages(10));

    // add three items
    for (int pageValue = 0; pageValue < 3; pageValue++) {
        addPage(buffer, createPage(pageValue));
    }

    // the queue declared at creation time holds all three pages
    assertQueueState(buffer, FIRST, 3, 0);

    // asking for token 0 twice must return the same three pages both times;
    // nothing has been acknowledged, so the queue state stays the same after each read
    for (int attempt = 0; attempt < 2; attempt++) {
        BufferResult actual = getBufferResult(buffer, FIRST, 0, sizeOfPages(10), NO_WAIT);
        assertBufferResultEquals(TYPES, actual, bufferResult(0, createPage(0), createPage(1), createPage(2)));
        assertQueueState(buffer, FIRST, 3, 0);
    }

    // acknowledge the pages
    buffer.get(FIRST, 3, 10L).cancel(true);

    // a stale request for token 0 now yields an empty result
    BufferResult stale = getBufferResult(buffer, FIRST, 0, sizeOfPages(10), NO_WAIT);
    assertBufferResultEquals(TYPES, stale, emptyResults(TASK_INSTANCE_ID, 0, false));

    // all three pages are acknowledged and nothing is left buffered
    assertQueueState(buffer, FIRST, 0, 3);
}
/**
 * Verifies that once the buffer set has been finalized with withNoMoreBufferIds(), supplying a set that
 * contains an additional buffer is rejected with an {@link IllegalArgumentException}.
 */
@Test
public void testAddQueueAfterCreation()
{
    BroadcastOutputBuffer buffer = createBroadcastBuffer(
            createInitialEmptyOutputBuffers(BROADCAST)
                    .withBuffer(FIRST, BROADCAST_PARTITION_ID)
                    .withNoMoreBufferIds(),
            sizeOfPages(10));
    assertFalse(buffer.isFinished());

    try {
        buffer.setOutputBuffers(createInitialEmptyOutputBuffers(BROADCAST)
                .withBuffer(FIRST, BROADCAST_PARTITION_ID)
                .withBuffer(SECOND, BROADCAST_PARTITION_ID)
                .withNoMoreBufferIds());
        // the failure message names the exception type that the catch clause actually expects
        fail("Expected IllegalArgumentException from addQueue after noMoreQueues has been called");
    }
    catch (IllegalArgumentException ignored) {
        // expected
    }
}
@Test
public void testAddAfterFinish()
{
    OutputBuffers finalBuffers = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(finalBuffers, sizeOfPages(10));

    // signal no more pages first, then attempt two further additions
    buffer.setNoMorePages();
    for (int attempt = 0; attempt < 2; attempt++) {
        addPage(buffer, createPage(0));
    }

    // the reported number of pages sent is expected to remain zero
    assertEquals(buffer.getInfo().getTotalPagesSent(), 0);
}
@Test
public void testAddQueueAfterNoMoreQueues()
{
    OutputBuffers initialBuffers = createInitialEmptyOutputBuffers(BROADCAST);
    BroadcastOutputBuffer buffer = createBroadcastBuffer(initialBuffers, sizeOfPages(10));
    assertFalse(buffer.isFinished());

    // The first round tells the buffer that no more queues will be added; the second and third rounds
    // repeat the same call to ensure it raises no exception and the buffer stays finished.
    for (int round = 0; round < 3; round++) {
        buffer.setOutputBuffers(createInitialEmptyOutputBuffers(BROADCAST).withNoMoreBufferIds());
        assertTrue(buffer.isFinished());
    }
}
@Test
public void testAddAfterDestroy()
{
    OutputBuffers finalBuffers = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(finalBuffers, sizeOfPages(10));

    // destroy first, then attempt two further additions
    buffer.destroy();
    for (int attempt = 0; attempt < 2; attempt++) {
        addPage(buffer, createPage(0));
    }

    // the reported number of pages sent is expected to remain zero
    assertEquals(buffer.getInfo().getTotalPagesSent(), 0);
}
@Test
public void testGetBeforeCreate()
{
    OutputBuffers noBuffersYet = createInitialEmptyOutputBuffers(BROADCAST);
    BroadcastOutputBuffer buffer = createBroadcastBuffer(noBuffersYet, sizeOfPages(10));
    assertFalse(buffer.isFinished());

    // read from FIRST, which has not been declared yet; the read must stay pending
    ListenableFuture<BufferResult> pendingRead = buffer.get(FIRST, 0L, 1L);
    assertFalse(pendingRead.isDone());

    // once a page arrives, the pending read completes with that page
    addPage(buffer, createPage(33));
    assertTrue(pendingRead.isDone());
    assertBufferResultEquals(TYPES, getFuture(pendingRead, NO_WAIT), bufferResult(0, createPage(33)));
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*does not contain.*\\[0]")
public void testSetFinalBuffersWithoutDeclaringUsedBuffer()
{
    OutputBuffers noBuffersYet = createInitialEmptyOutputBuffers(BROADCAST);
    BroadcastOutputBuffer buffer = createBroadcastBuffer(noBuffersYet, sizeOfPages(10));
    assertFalse(buffer.isFinished());

    // read from FIRST even though it has never been declared
    ListenableFuture<BufferResult> pendingRead = buffer.get(FIRST, 0L, 1L);
    assertFalse(pendingRead.isDone());

    // supply a single page and signal that no more pages will follow
    addPage(buffer, createPage(33));
    buffer.setNoMorePages();

    // the pending read now holds the page
    assertTrue(pendingRead.isDone());
    assertBufferResultEquals(TYPES, getFuture(pendingRead, NO_WAIT), bufferResult(0, createPage(33)));

    // acknowledge the page by asking for token 1, then abort the queue
    BufferResult afterLastPage = getBufferResult(buffer, FIRST, 1, sizeOfPages(10), NO_WAIT);
    assertBufferResultEquals(TYPES, afterLastPage, emptyResults(TASK_INSTANCE_ID, 1, true));
    buffer.abort(FIRST);

    // finalizing with a set that omits the used buffer is the call expected to throw
    buffer.setOutputBuffers(createInitialEmptyOutputBuffers(BROADCAST).withNoMoreBufferIds());
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "No more buffers already set")
public void testUseUndeclaredBufferAfterFinalBuffersSet()
{
    // only FIRST is declared, and the buffer set is final
    OutputBuffers finalBuffers = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(finalBuffers, sizeOfPages(10));
    assertFalse(buffer.isFinished());

    // reading from the undeclared SECOND buffer is the call expected to throw
    buffer.get(SECOND, 0L, 1L);
}
@Test
public void testAbortBeforeCreate()
{
    OutputBuffers noBuffersYet = createInitialEmptyOutputBuffers(BROADCAST);
    BroadcastOutputBuffer buffer = createBroadcastBuffer(noBuffersYet, sizeOfPages(2));
    assertFalse(buffer.isFinished());

    // read from FIRST before it has been declared; the read stays pending
    ListenableFuture<BufferResult> pendingRead = buffer.get(FIRST, 0, 1L);
    assertFalse(pendingRead.isDone());

    // aborting FIRST completes the pending read
    buffer.abort(FIRST);
    assertTrue(pendingRead.isDone());

    // a fresh read at token 0 returns an empty result
    BufferResult afterAbort = getBufferResult(buffer, FIRST, 0, sizeOfPages(10), NO_WAIT);
    assertBufferResultEquals(TYPES, afterAbort, emptyResults(TASK_INSTANCE_ID, 0, true));
}
@Test
public void testFullBufferBlocksWriter()
{
    // two declared consumers, final buffer set, room for two pages
    OutputBuffers twoConsumers = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withBuffer(SECOND, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(twoConsumers, sizeOfPages(2));

    // pages 1 and 2 fill the buffer
    for (int value = 1; value <= 2; value++) {
        addPage(buffer, createPage(value));
    }

    // a third page goes through enqueuePage, the helper used for pages expected to block
    enqueuePage(buffer, createPage(3));
}
@Test
public void testAcknowledgementFreesWriters()
{
    OutputBuffers twoConsumers = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withBuffer(SECOND, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(twoConsumers, sizeOfPages(2));

    // pages 1 and 2 fill the buffer
    for (int value = 1; value <= 2; value++) {
        addPage(buffer, createPage(value));
    }
    assertQueueState(buffer, FIRST, 2, 0);

    // the third page leaves the writer blocked, yet it is visible in both queues
    ListenableFuture<?> writerBlocked = enqueuePage(buffer, createPage(3));
    assertFalse(writerBlocked.isDone());
    assertQueueState(buffer, FIRST, 3, 0);
    assertQueueState(buffer, SECOND, 3, 0);

    // acknowledging on FIRST alone frees no space, so the writer stays blocked
    buffer.get(FIRST, 2, 10L).cancel(true);
    assertFalse(writerBlocked.isDone());

    // once SECOND acknowledges as well, space is freed and the writer is released
    buffer.get(SECOND, 2, 10L).cancel(true);
    assertFutureIsDone(writerBlocked);
    assertQueueState(buffer, SECOND, 1, 2);
}
@Test
public void testAbort()
{
    OutputBuffers twoConsumers = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withBuffer(SECOND, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(twoConsumers, sizeOfPages(10));

    // fill the buffer with ten pages and signal that no more will follow
    for (int pageValue = 0; pageValue < 10; pageValue++) {
        addPage(buffer, createPage(pageValue));
    }
    buffer.setNoMorePages();

    // FIRST reads one page, is aborted, and afterwards only sees an empty result
    BufferResult firstRead = getBufferResult(buffer, FIRST, 0, sizeOfPages(1), NO_WAIT);
    assertBufferResultEquals(TYPES, firstRead, bufferResult(0, createPage(0)));
    buffer.abort(FIRST);
    assertQueueClosed(buffer, FIRST, 0);
    BufferResult firstAfterAbort = getBufferResult(buffer, FIRST, 1, sizeOfPages(1), NO_WAIT);
    assertBufferResultEquals(TYPES, firstAfterAbort, emptyResults(TASK_INSTANCE_ID, 0, true));

    // SECOND goes through the same steps; aborting it finishes the whole buffer
    BufferResult secondRead = getBufferResult(buffer, SECOND, 0, sizeOfPages(1), NO_WAIT);
    assertBufferResultEquals(TYPES, secondRead, bufferResult(0, createPage(0)));
    buffer.abort(SECOND);
    assertQueueClosed(buffer, SECOND, 0);
    assertFinished(buffer);
    BufferResult secondAfterAbort = getBufferResult(buffer, SECOND, 1, sizeOfPages(1), NO_WAIT);
    assertBufferResultEquals(TYPES, secondAfterAbort, emptyResults(TASK_INSTANCE_ID, 0, true));
}
@Test
public void testFinishClosesEmptyQueues()
{
    OutputBuffers twoConsumers = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withBuffer(SECOND, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(twoConsumers, sizeOfPages(10));

    // signal no more pages while both queues are still empty
    buffer.setNoMorePages();
    assertQueueState(buffer, FIRST, 0, 0);
    assertQueueState(buffer, SECOND, 0, 0);

    // abort both queues, then verify each one is closed
    buffer.abort(FIRST);
    buffer.abort(SECOND);
    assertQueueClosed(buffer, FIRST, 0);
    assertQueueClosed(buffer, SECOND, 0);
}
@Test
public void testAbortFreesReader()
{
    OutputBuffers twoConsumers = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withBuffer(SECOND, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(twoConsumers, sizeOfPages(5));
    assertFalse(buffer.isFinished());

    // a read on the empty buffer has to wait for a page
    ListenableFuture<BufferResult> firstRead = buffer.get(FIRST, 0, 10L);
    assertFalse(firstRead.isDone());

    // adding one page completes that read with exactly that page
    addPage(buffer, createPage(0));
    assertTrue(firstRead.isDone());
    assertBufferResultEquals(TYPES, getFuture(firstRead, NO_WAIT), bufferResult(0, createPage(0)));

    // the next read blocks again because no further page is available
    ListenableFuture<BufferResult> blockedRead = buffer.get(FIRST, 1, 10L);
    assertFalse(blockedRead.isDone());

    // aborting the queue releases the blocked reader
    buffer.abort(FIRST);

    // broadcast buffer does not return a "complete" result in this case, but it doesn't matter
    assertBufferResultEquals(TYPES, getFuture(blockedRead, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 1, false));

    // further requests will see a completed result
    assertQueueClosed(buffer, FIRST, 1);
}
@Test
public void testFinishFreesReader()
{
    OutputBuffers singleConsumer = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(singleConsumer, sizeOfPages(5));
    assertFalse(buffer.isFinished());

    // a read on the empty buffer has to wait for a page
    ListenableFuture<BufferResult> firstRead = buffer.get(FIRST, 0, 10L);
    assertFalse(firstRead.isDone());

    // adding one page completes that read with exactly that page
    addPage(buffer, createPage(0));
    assertBufferResultEquals(TYPES, getFuture(firstRead, NO_WAIT), bufferResult(0, createPage(0)));

    // the next read blocks again because no further page is available
    ListenableFuture<BufferResult> blockedRead = buffer.get(FIRST, 1, 10L);
    assertFalse(blockedRead.isDone());

    // signalling no more pages releases the blocked reader
    buffer.setNoMorePages();
    assertQueueState(buffer, FIRST, 0, 1);
    assertBufferResultEquals(TYPES, getFuture(blockedRead, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 1, true));
}
@Test
public void testFinishFreesWriter()
{
    OutputBuffers singleConsumer = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(singleConsumer, sizeOfPages(5));
    assertFalse(buffer.isFinished());

    // fill the buffer
    for (int pageValue = 0; pageValue < 5; pageValue++) {
        addPage(buffer, createPage(pageValue));
    }

    // enqueue two additional pages beyond the buffer's capacity
    ListenableFuture<?> sixthPageWrite = enqueuePage(buffer, createPage(5));
    ListenableFuture<?> seventhPageWrite = enqueuePage(buffer, createPage(6));

    // get and acknowledge one page
    BufferResult head = getBufferResult(buffer, FIRST, 0, sizeOfPages(1), MAX_WAIT);
    assertBufferResultEquals(TYPES, head, bufferResult(0, createPage(0)));
    buffer.get(FIRST, 1, 1L).cancel(true);

    // both writers remain blocked because the buffer is still full
    assertFalse(sixthPageWrite.isDone());
    assertFalse(seventhPageWrite.isDone());

    // finish the query; the buffer itself is not finished yet
    buffer.setNoMorePages();
    assertFalse(buffer.isFinished());

    // both writers are released
    assertFutureIsDone(sixthPageWrite);
    assertFutureIsDone(seventhPageWrite);

    // get and acknowledge the last 6 pages
    BufferResult remaining = getBufferResult(buffer, FIRST, 1, sizeOfPages(100), NO_WAIT);
    assertBufferResultEquals(TYPES, remaining,
            bufferResult(1, createPage(1), createPage(2), createPage(3), createPage(4), createPage(5), createPage(6)));
    BufferResult afterLastPage = getBufferResult(buffer, FIRST, 7, sizeOfPages(100), NO_WAIT);
    assertBufferResultEquals(TYPES, afterLastPage, emptyResults(TASK_INSTANCE_ID, 7, true));
    buffer.abort(FIRST);

    // verify finished
    assertFinished(buffer);
}
@Test
public void testDestroyFreesReader()
{
    OutputBuffers singleConsumer = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(singleConsumer, sizeOfPages(5));
    assertFalse(buffer.isFinished());

    // a read on the empty buffer has to wait for a page
    ListenableFuture<BufferResult> firstRead = buffer.get(FIRST, 0, 10L);
    assertFalse(firstRead.isDone());

    // adding one page completes that read with exactly that page
    addPage(buffer, createPage(0));
    assertBufferResultEquals(TYPES, getFuture(firstRead, NO_WAIT), bufferResult(0, createPage(0)));

    // the next read blocks again because no further page is available
    ListenableFuture<BufferResult> blockedRead = buffer.get(FIRST, 1, 10L);
    assertFalse(blockedRead.isDone());

    // destroying the buffer closes the queue and releases the blocked reader
    buffer.destroy();
    assertQueueClosed(buffer, FIRST, 1);
    assertBufferResultEquals(TYPES, getFuture(blockedRead, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 1, false));
}
@Test
public void testDestroyFreesWriter()
{
    OutputBuffers singleConsumer = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(singleConsumer, sizeOfPages(5));
    assertFalse(buffer.isFinished());

    // fill the buffer
    for (int pageValue = 0; pageValue < 5; pageValue++) {
        addPage(buffer, createPage(pageValue));
    }

    // two more pages are queued behind the full buffer
    ListenableFuture<?> sixthPageWrite = enqueuePage(buffer, createPage(5));
    ListenableFuture<?> seventhPageWrite = enqueuePage(buffer, createPage(6));

    // get and acknowledge one page
    BufferResult head = getBufferResult(buffer, FIRST, 0, sizeOfPages(1), MAX_WAIT);
    assertBufferResultEquals(TYPES, head, bufferResult(0, createPage(0)));
    buffer.get(FIRST, 1, 1L).cancel(true);

    // both writers remain blocked because the buffer is still full
    assertFalse(sixthPageWrite.isDone());
    assertFalse(seventhPageWrite.isDone());

    // destroy the buffer (i.e., cancel the query)
    buffer.destroy();
    assertFinished(buffer);

    // both writers are released
    assertFutureIsDone(sixthPageWrite);
    assertFutureIsDone(seventhPageWrite);
}
@Test
public void testFailDoesNotFreeReader()
{
    OutputBuffers singleConsumer = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(singleConsumer, sizeOfPages(5));
    assertFalse(buffer.isFinished());

    // a read on the empty buffer has to wait for a page
    ListenableFuture<BufferResult> firstRead = buffer.get(FIRST, 0, 10L);
    assertFalse(firstRead.isDone());

    // adding one page completes that read with exactly that page
    addPage(buffer, createPage(0));
    assertBufferResultEquals(TYPES, getFuture(firstRead, NO_WAIT), bufferResult(0, createPage(0)));

    // the next read blocks again because no further page is available
    ListenableFuture<BufferResult> blockedRead = buffer.get(FIRST, 1, 10L);
    assertFalse(blockedRead.isDone());

    // failing the buffer must leave the blocked read pending
    buffer.fail();
    assertFalse(blockedRead.isDone());

    // a read issued after the failure blocks as well
    assertFalse(buffer.get(FIRST, 1, 10L).isDone());
}
@Test
public void testFailFreesWriter()
{
    OutputBuffers singleConsumer = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();
    BroadcastOutputBuffer buffer = createBroadcastBuffer(singleConsumer, sizeOfPages(5));
    assertFalse(buffer.isFinished());

    // fill the buffer
    for (int pageValue = 0; pageValue < 5; pageValue++) {
        addPage(buffer, createPage(pageValue));
    }

    // two more pages are queued behind the full buffer
    ListenableFuture<?> sixthPageWrite = enqueuePage(buffer, createPage(5));
    ListenableFuture<?> seventhPageWrite = enqueuePage(buffer, createPage(6));

    // get and acknowledge one page
    BufferResult head = getBufferResult(buffer, FIRST, 0, sizeOfPages(1), MAX_WAIT);
    assertBufferResultEquals(TYPES, head, bufferResult(0, createPage(0)));
    buffer.get(FIRST, 1, 1L).cancel(true);

    // both writers remain blocked because the buffer is still full
    assertFalse(sixthPageWrite.isDone());
    assertFalse(seventhPageWrite.isDone());

    // fail the buffer (i.e., cancel the query); unlike destroy, the buffer is not reported as finished
    buffer.fail();
    assertFalse(buffer.isFinished());

    // both writers are released
    assertFutureIsDone(sixthPageWrite);
    assertFutureIsDone(seventhPageWrite);
}
@Test
public void testAddBufferAfterFail()
{
    // start with only FIRST declared; the buffer set is still open
    OutputBuffers onlyFirst = createInitialEmptyOutputBuffers(BROADCAST)
            .withBuffer(FIRST, BROADCAST_PARTITION_ID);
    BroadcastOutputBuffer buffer = createBroadcastBuffer(onlyFirst, sizeOfPages(5));
    assertFalse(buffer.isFinished());

    // a read on the empty buffer has to wait for a page
    ListenableFuture<BufferResult> firstRead = buffer.get(FIRST, 0, 10L);
    assertFalse(firstRead.isDone());

    // adding one page completes that read with exactly that page
    addPage(buffer, createPage(0));
    assertBufferResultEquals(TYPES, getFuture(firstRead, NO_WAIT), bufferResult(0, createPage(0)));

    // fail the buffer
    buffer.fail();

    // declaring SECOND after the failure is accepted
    OutputBuffers firstAndSecond = onlyFirst.withBuffer(SECOND, BROADCAST_PARTITION_ID);
    buffer.setOutputBuffers(firstAndSecond);

    // reads on both queues block
    assertFalse(buffer.get(FIRST, 1, 10L).isDone());
    assertFalse(buffer.get(SECOND, 0, 10L).isDone());

    // finalizing the buffer set after the failure is accepted too
    OutputBuffers finalBuffers = firstAndSecond.withNoMoreBufferIds();
    buffer.setOutputBuffers(finalBuffers);

    // reads on both queues still block
    assertFalse(buffer.get(FIRST, 1, 10L).isDone());
    assertFalse(buffer.get(SECOND, 0, 10L).isDone());
}
/**
 * Verifies that a buffer with no more pages and a final buffer set is not reported as finished merely
 * because its single consumer fetched every page; it finishes only after the consumer's queue is aborted.
 */
@Test
public void testBufferCompletion()
{
    BroadcastOutputBuffer buffer = createBroadcastBuffer(
            createInitialEmptyOutputBuffers(BROADCAST)
                    .withBuffer(FIRST, BROADCAST_PARTITION_ID)
                    .withNoMoreBufferIds(),
            sizeOfPages(5));

    assertFalse(buffer.isFinished());

    // fill the buffer, remembering the pages so the expected result can be built from them
    List<Page> pages = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        Page page = createPage(i);
        addPage(buffer, page);
        pages.add(page);
    }

    buffer.setNoMorePages();

    // fetch all 5 pages; they are not acknowledged by this read
    assertBufferResultEquals(TYPES, getBufferResult(buffer, FIRST, 0, sizeOfPages(5), MAX_WAIT), createBufferResult(TASK_INSTANCE_ID, 0, pages));

    // there are no more pages and no more buffers, but buffer is not finished because it didn't receive an acknowledgement yet
    assertFalse(buffer.isFinished());

    // ask the buffer to finish
    buffer.abort(FIRST);

    // verify that the buffer is finished
    assertTrue(buffer.isFinished());
}
@Test
public void testSharedBufferBlocking()
{
    // the mock reservation handler is built around this not-yet-completed future
    SettableFuture<?> memoryBlocked = SettableFuture.create();
    MockMemoryReservationHandler handler = new MockMemoryReservationHandler(memoryBlocked);
    AggregatedMemoryContext rootContext = newRootAggregatedMemoryContext(handler, 0L);

    Page samplePage = createPage(1);
    long serializedPageBytes = PAGES_SERDE.serialize(samplePage).getRetainedSizeInBytes();

    // create a buffer that can only hold two pages
    OutputBuffers noBuffersYet = createInitialEmptyOutputBuffers(BROADCAST);
    DataSize twoPages = new DataSize(serializedPageBytes * 2, BYTE);
    BroadcastOutputBuffer buffer = createBroadcastBuffer(noBuffersYet, twoPages, rootContext, directExecutor());
    OutputBufferMemoryManager memoryManager = buffer.getMemoryManager();

    // adding the first page will block as no memory is available (MockMemoryReservationHandler will return a future that is not done)
    enqueuePage(buffer, samplePage);

    // complete the future and notify the memory manager that memory is available
    memoryBlocked.set(null);
    memoryManager.onMemoryAvailable();
    assertTrue(memoryManager.getBufferBlockedFuture().isDone(), "buffer shouldn't be blocked");

    // we should be able to add one more page after more memory is available
    addPage(buffer, samplePage);

    // the buffer is full now
    enqueuePage(buffer, samplePage);
}
@Test
public void testSharedBufferBlocking2()
{
    // the handler starts out with an already-completed future
    SettableFuture<?> completedFuture = SettableFuture.create();
    completedFuture.set(null);
    MockMemoryReservationHandler reservationHandler = new MockMemoryReservationHandler(completedFuture);
    AggregatedMemoryContext rootContext = newRootAggregatedMemoryContext(reservationHandler, 0L);

    Page samplePage = createPage(1);
    long pageSize = PAGES_SERDE.serialize(samplePage).getRetainedSizeInBytes();

    // capacity is exactly two pages
    DataSize twoPages = new DataSize(pageSize * 2, BYTE);
    BroadcastOutputBuffer outputBuffer = createBroadcastBuffer(createInitialEmptyOutputBuffers(BROADCAST), twoPages, rootContext, directExecutor());
    OutputBufferMemoryManager manager = outputBuffer.getMemoryManager();

    // memory is available, so two pages go in and fill the buffer
    addPage(outputBuffer, samplePage);
    addPage(outputBuffer, samplePage);

    // swap in an incomplete future to simulate an exhausted memory pool
    SettableFuture<?> pendingFuture = SettableFuture.create();
    reservationHandler.updateBlockedFuture(pendingFuture);

    // one extra byte pushes usage past the buffer capacity
    manager.updateMemoryUsage(1L);

    // the pool frees up again
    pendingFuture.set(null);
    manager.onMemoryAvailable();

    // pool memory alone is not enough: the buffer itself is still over capacity
    assertFalse(manager.getBufferBlockedFuture().isDone(), "buffer should be blocked");

    // release both pages plus the extra byte accounted above
    long accountedBytes = pageSize * 2 + 1;
    manager.updateMemoryUsage(-accountedBytes);

    // buffer space and pool memory are both available now
    assertTrue(manager.getBufferBlockedFuture().isDone(), "buffer shouldn't be blocked");

    // two more pages fit again
    addPage(outputBuffer, samplePage);
    addPage(outputBuffer, samplePage);

    // and a third one exceeds the capacity
    enqueuePage(outputBuffer, samplePage);
}
@Test
public void testSharedBufferBlockingNoBlockOnFull()
{
    Page samplePage = createPage(1);
    long serializedPageBytes = PAGES_SERDE.serialize(samplePage).getRetainedSizeInBytes();

    // every reservation returns this (initially incomplete) future
    SettableFuture<?> memoryFuture = SettableFuture.create();
    AggregatedMemoryContext rootContext = newRootAggregatedMemoryContext(new MockMemoryReservationHandler(memoryFuture), 0L);

    // capacity is exactly two pages
    DataSize twoPages = new DataSize(serializedPageBytes * 2, BYTE);
    BroadcastOutputBuffer outputBuffer = createBroadcastBuffer(createInitialEmptyOutputBuffers(BROADCAST), twoPages, rootContext, directExecutor());
    OutputBufferMemoryManager manager = outputBuffer.getMemoryManager();
    manager.setNoBlockOnFull();

    // setNoBlockOnFull() does not lift blocking on memory: the first page still blocks
    // because the handler's future is not done
    enqueuePage(outputBuffer, samplePage);

    // complete the future and notify the manager
    memoryFuture.set(null);
    manager.onMemoryAvailable();
    assertTrue(manager.getBufferBlockedFuture().isDone(), "buffer shouldn't be blocked");

    // second page goes in once memory is available
    addPage(outputBuffer, samplePage);

    // the buffer is at capacity, yet with setNoBlockOnFull() another page is still accepted
    addPage(outputBuffer, samplePage);
}
/**
 * {@link MemoryReservationHandler} stub for tests: {@code reserveMemory} always returns the
 * currently configured future, and {@code tryReserveMemory} always succeeds. Neither call
 * inspects its arguments.
 */
private static class MockMemoryReservationHandler
        implements MemoryReservationHandler
{
    // future handed back by reserveMemory(); replaceable through updateBlockedFuture()
    private ListenableFuture<?> blockedFuture;

    /**
     * @param blockedFuture future returned from every {@code reserveMemory} call; must not be null
     */
    public MockMemoryReservationHandler(ListenableFuture<?> blockedFuture)
    {
        this.blockedFuture = requireNonNull(blockedFuture, "blockedFuture is null");
    }

    /**
     * Ignores all arguments and returns the currently configured future.
     */
    @Override
    public ListenableFuture<?> reserveMemory(String allocationTag, long delta, boolean enforceBroadcastMemoryLimit)
    {
        return blockedFuture;
    }

    /**
     * Ignores all arguments and always reports success.
     */
    @Override
    public boolean tryReserveMemory(String allocationTag, long delta, boolean enforceBroadcastMemoryLimit)
    {
        return true;
    }

    /**
     * Replaces the future returned by subsequent {@code reserveMemory} calls.
     *
     * @param blockedFuture the new future; must not be null
     */
    public void updateBlockedFuture(ListenableFuture<?> blockedFuture)
    {
        // same null-check message as the constructor, so a failure names the offending argument
        this.blockedFuture = requireNonNull(blockedFuture, "blockedFuture is null");
    }
}
/**
 * Builds a {@link BroadcastOutputBuffer} in the OPEN state whose local memory contexts are
 * created from the supplied aggregated context, then applies the given output buffers
 * and registers a no-op lifespan completion callback.
 */
private BroadcastOutputBuffer createBroadcastBuffer(OutputBuffers outputBuffers, DataSize dataSize, AggregatedMemoryContext memoryContext, Executor notificationExecutor)
{
    BroadcastOutputBuffer result = new BroadcastOutputBuffer(
            TASK_INSTANCE_ID,
            new StateMachine<>("bufferState", stateNotificationExecutor, OPEN, TERMINAL_BUFFER_STATES),
            dataSize.toBytes(),
            () -> {
                return memoryContext.newLocalMemoryContext("test");
            },
            notificationExecutor);

    // configure the freshly created buffer before handing it to the test
    result.setOutputBuffers(outputBuffers);
    result.registerLifespanCompletionCallback(lifespan -> {});
    return result;
}
@Test
public void testBufferFinishesWhenClientBuffersDestroyed()
{
    // three client buffers, with the set of client buffers already closed
    BroadcastOutputBuffer outputBuffer = createBroadcastBuffer(
            createInitialEmptyOutputBuffers(BROADCAST)
                    .withBuffer(FIRST, BROADCAST_PARTITION_ID)
                    .withBuffer(SECOND, BROADCAST_PARTITION_ID)
                    .withBuffer(THIRD, BROADCAST_PARTITION_ID)
                    .withNoMoreBufferIds(),
            sizeOfPages(5));

    // leave pages pending so that closing is exercised with unread data in the buffer
    int position = 0;
    while (position < 5) {
        addPage(outputBuffer, createPage(position));
        position++;
    }

    // aborting the client buffers one by one: the output buffer must stay
    // unfinished until the last client buffer has been aborted
    outputBuffer.abort(FIRST);
    assertFalse(outputBuffer.isFinished());

    outputBuffer.abort(SECOND);
    assertFalse(outputBuffer.isFinished());

    outputBuffer.abort(THIRD);
    assertTrue(outputBuffer.isFinished());
}
@Test
public void testForceFreeMemory()
        throws Throwable
{
    BroadcastOutputBuffer outputBuffer = createBroadcastBuffer(
            createInitialEmptyOutputBuffers(BROADCAST)
                    .withBuffer(FIRST, BROADCAST_PARTITION_ID)
                    .withNoMoreBufferIds(),
            sizeOfPages(5));

    // buffer three pages so that there is memory to free
    for (int remaining = 3; remaining > 0; remaining--) {
        addPage(outputBuffer, createPage(1), 0);
    }

    OutputBufferMemoryManager manager = outputBuffer.getMemoryManager();
    assertTrue(manager.getBufferedBytes() > 0);

    // forcing the release must drop the accounted bytes to zero
    outputBuffer.forceFreeMemory();
    assertEquals(manager.getBufferedBytes(), 0);

    // a page added after forceFreeMemory() must not be accounted for
    addPage(outputBuffer, createPage(1));
    assertEquals(manager.getBufferedBytes(), 0);
}
/**
 * Builds a {@link BroadcastOutputBuffer} in the OPEN state that uses the shared state
 * notification executor and a standalone simple memory context per supplier call, then
 * applies the given output buffers and registers a no-op lifespan completion callback.
 */
private BroadcastOutputBuffer createBroadcastBuffer(OutputBuffers outputBuffers, DataSize dataSize)
{
    BroadcastOutputBuffer result = new BroadcastOutputBuffer(
            TASK_INSTANCE_ID,
            new StateMachine<>("bufferState", stateNotificationExecutor, OPEN, TERMINAL_BUFFER_STATES),
            dataSize.toBytes(),
            () -> {
                return new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test");
            },
            stateNotificationExecutor);

    // configure the freshly created buffer before handing it to the test
    result.setOutputBuffers(outputBuffers);
    result.registerLifespanCompletionCallback(lifespan -> {});
    return result;
}
/**
 * Creates a {@link BufferResult} for the test task instance that starts at {@code token}
 * and contains {@code firstPage} followed by {@code otherPages}, in that order.
 */
private static BufferResult bufferResult(long token, Page firstPage, Page... otherPages)
{
    ImmutableList.Builder<Page> collected = ImmutableList.builder();
    collected.add(firstPage);
    collected.add(otherPages);
    return createBufferResult(TASK_INSTANCE_ID, token, collected.build());
}
}