TestSpoolingOutputBuffer.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.buffer;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.execution.QueryIdGenerator;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.execution.buffer.BufferResult.emptyResults;
import static com.facebook.presto.execution.buffer.BufferState.OPEN;
import static com.facebook.presto.execution.buffer.BufferState.TERMINAL_BUFFER_STATES;
import static com.facebook.presto.execution.buffer.BufferTestUtils.MAX_WAIT;
import static com.facebook.presto.execution.buffer.BufferTestUtils.NO_WAIT;
import static com.facebook.presto.execution.buffer.BufferTestUtils.acknowledgeBufferResult;
import static com.facebook.presto.execution.buffer.BufferTestUtils.addPage;
import static com.facebook.presto.execution.buffer.BufferTestUtils.addPages;
import static com.facebook.presto.execution.buffer.BufferTestUtils.assertBufferResultEquals;
import static com.facebook.presto.execution.buffer.BufferTestUtils.createBufferResult;
import static com.facebook.presto.execution.buffer.BufferTestUtils.createPage;
import static com.facebook.presto.execution.buffer.BufferTestUtils.getBufferResult;
import static com.facebook.presto.execution.buffer.BufferTestUtils.getFuture;
import static com.facebook.presto.execution.buffer.BufferTestUtils.sizeOfPages;
import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.PARTITIONED;
import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.SPOOLING;
import static io.airlift.units.DataSize.Unit.BYTE;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class TestSpoolingOutputBuffer
{
private static final String TASK_INSTANCE_ID = "task-instance-id";
private static final DataSize THRESHOLD = sizeOfPages(3);
private static final List<BigintType> TYPES = ImmutableList.of(BIGINT);
private static final OutputBufferId BUFFER_ID = new OutputBufferId(0);
private static final OutputBufferId INVALID_BUFFER_ID = new OutputBufferId(1);
private static final OutputBuffers OUTPUT_BUFFERS = OutputBuffers.createSpoolingOutputBuffers();
private static final QueryIdGenerator queryIdGenerator = new QueryIdGenerator();
private static SpoolingOutputBufferFactory spoolingOutputBufferFactory;
private ScheduledExecutorService stateNotificationExecutor;
@BeforeClass
public void setUp()
{
stateNotificationExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-%s"));
FeaturesConfig featuresConfig = new FeaturesConfig();
featuresConfig.setSpoolingOutputBufferThreshold(THRESHOLD);
spoolingOutputBufferFactory = new SpoolingOutputBufferFactory(featuresConfig);
}
@AfterClass(alwaysRun = true)
public void tearDown()
throws IOException
{
if (stateNotificationExecutor != null) {
stateNotificationExecutor.shutdownNow();
stateNotificationExecutor = null;
}
spoolingOutputBufferFactory.shutdown();
}
@Test
public void testSimpleInMemory()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
// add three pages
for (int i = 0; i < 2; i++) {
addPage(buffer, createPage(i));
}
compareTotalBuffered(buffer, 2);
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(1), MAX_WAIT), bufferResult(0, createPage(0)));
compareTotalBuffered(buffer, 2);
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 1, sizeOfPages(1), MAX_WAIT), bufferResult(1, createPage(1)));
compareTotalBuffered(buffer, 1);
buffer.setNoMorePages();
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 2, sizeOfPages(1), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 2, true));
compareTotalBuffered(buffer, 0);
}
@Test
public void testSimple()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
List<Page> pages = new LinkedList<>();
// add five pages to storage
for (int i = 0; i < 5; i++) {
pages.add(createPage(i));
}
addPages(buffer, pages);
compareTotalBuffered(buffer, 5);
// get one page
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(1), MAX_WAIT), bufferResult(0, createPage(0)));
compareTotalBuffered(buffer, 5);
// get two more pages
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 1, sizeOfPages(2), MAX_WAIT), bufferResult(1, createPage(1), createPage(2)));
compareTotalBuffered(buffer, 5);
// add three more pages to storage
pages = new LinkedList<>();
for (int i = 5; i < 8; i++) {
pages.add(createPage(i));
}
addPages(buffer, pages);
compareTotalBuffered(buffer, 8);
// get three pages from both first and second file
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 3, sizeOfPages(3), MAX_WAIT), bufferResult(3, createPage(3), createPage(4), createPage(5)));
compareTotalBuffered(buffer, 8);
// get the last three pages
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 6, sizeOfPages(2), MAX_WAIT), bufferResult(6, createPage(6), createPage(7)));
// first file removed, but second file remains
compareTotalBuffered(buffer, 3);
// add one page in memory
addPage(buffer, createPage(8));
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 8, sizeOfPages(2), MAX_WAIT), bufferResult(8, createPage(8)));
// second file removed from storage
compareTotalBuffered(buffer, 1);
ListenableFuture<BufferResult> pendingRead = buffer.get(BUFFER_ID, 9, sizeOfPages(1).toBytes());
assertFalse(pendingRead.isDone());
// in memory page removed
compareTotalBuffered(buffer, 0);
buffer.setNoMorePages();
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 9, sizeOfPages(1), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 9, true));
assertEquals(buffer.getInfo().getTotalPagesSent(), 9);
}
@Test
void testUnevenMaxSize()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
List<Page> pages = new LinkedList<>();
// add five pages to storage
for (int i = 0; i < 5; i++) {
pages.add(createPage(i));
}
addPages(buffer, pages);
// add three pages to memory
for (int i = 5; i < 8; i++) {
addPage(buffer, createPage(i));
}
DataSize unevenMaxSize = new DataSize(sizeOfPages(3).toBytes() + 5, BYTE);
// should not try to read the in memory pages
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, unevenMaxSize, MAX_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
compareTotalBuffered(buffer, 8);
}
@Test
void testGetOutOfOrder()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
List<Page> pages = new LinkedList<>();
// add five pages to storage
for (int i = 0; i < 5; i++) {
pages.add(createPage(i));
}
addPages(buffer, pages);
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(3), MAX_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
compareTotalBuffered(buffer, 5);
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 3, sizeOfPages(3), MAX_WAIT), bufferResult(3, createPage(3), createPage(4)));
compareTotalBuffered(buffer, 5);
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 2, sizeOfPages(3), MAX_WAIT), emptyResults(TASK_INSTANCE_ID, 2, false));
compareTotalBuffered(buffer, 5);
}
@Test
public void testSimplePendingRead()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
// attempt to get a page
ListenableFuture<BufferResult> future = buffer.get(BUFFER_ID, 0, sizeOfPages(2).toBytes());
assertFalse(future.isDone());
// add three pages
List<Page> pages = new LinkedList<>();
for (int i = 0; i < 3; i++) {
pages.add(createPage(i));
}
addPages(buffer, pages);
// pending read should have two pages added
assertBufferResultEquals(TYPES, getFuture(future, MAX_WAIT), bufferResult(0, createPage(0), createPage(1)));
// checks we can still read first three pages
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(3), MAX_WAIT), createBufferResult(TASK_INSTANCE_ID, 0, pages));
acknowledgeBufferResult(buffer, BUFFER_ID, 2);
compareTotalBuffered(buffer, 3);
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 2, sizeOfPages(3), MAX_WAIT), bufferResult(2, createPage(2)));
// file should be removed after acknowledging all three pages
acknowledgeBufferResult(buffer, BUFFER_ID, 3);
compareTotalBuffered(buffer, 0);
// attempt to read, but nothing can be read
future = buffer.get(BUFFER_ID, 3, sizeOfPages(3).toBytes());
assertFalse(future.isDone());
}
@Test
public void testMultiplePendingReads()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
// attempt to get a page
ListenableFuture<BufferResult> oldPendingRead = buffer.get(BUFFER_ID, 0, sizeOfPages(3).toBytes());
assertFalse(oldPendingRead.isDone());
ListenableFuture<BufferResult> newPendingRead = buffer.get(BUFFER_ID, 0, sizeOfPages(3).toBytes());
assertFalse(newPendingRead.isDone());
assertTrue(oldPendingRead.isDone());
// add three pages
List<Page> pages = new LinkedList<>();
for (int i = 0; i < 3; i++) {
pages.add(createPage(i));
}
addPages(buffer, pages);
assertBufferResultEquals(TYPES, getFuture(oldPendingRead, MAX_WAIT), emptyResults(TASK_INSTANCE_ID, 0, false));
assertBufferResultEquals(TYPES, getFuture(newPendingRead, MAX_WAIT), createBufferResult(TASK_INSTANCE_ID, 0, pages));
}
@Test
public void testAddAfterPendingRead()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
ListenableFuture<BufferResult> pendingRead = buffer.get(BUFFER_ID, 0, sizeOfPages(5).toBytes());
assertFalse(pendingRead.isDone());
List<Page> pages = new LinkedList<>();
// add five pages to storage
for (int i = 0; i < 5; i++) {
pages.add(createPage(i));
}
addPages(buffer, pages);
compareTotalBuffered(buffer, 5);
assertBufferResultEquals(TYPES, getFuture(pendingRead, MAX_WAIT), createBufferResult(TASK_INSTANCE_ID, 0, pages));
compareTotalBuffered(buffer, 5);
acknowledgeBufferResult(buffer, BUFFER_ID, 5);
buffer.setNoMorePages();
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 5, sizeOfPages(1), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 5, true));
}
@Test
public void testNoMorePagesAfterPendingRead()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
ListenableFuture<BufferResult> pendingRead = buffer.get(BUFFER_ID, 0, sizeOfPages(5).toBytes());
assertFalse(pendingRead.isDone());
buffer.setNoMorePages();
assertBufferResultEquals(TYPES, getFuture(pendingRead, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 0, true));
}
@Test
public void testDestroyAfterPendingRead()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
ListenableFuture<BufferResult> pendingRead = buffer.get(BUFFER_ID, 0, sizeOfPages(5).toBytes());
assertFalse(pendingRead.isDone());
buffer.destroy();
assertTrue(pendingRead.isDone());
assertBufferResultEquals(TYPES, getFuture(pendingRead, NO_WAIT), emptyResults(TASK_INSTANCE_ID, 0, false));
}
@Test
public void testAcknowledgeSimple()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
for (int i = 0; i < 3; i++) {
addPage(buffer, createPage(i));
}
// get the three elements from the first buffer
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(10), MAX_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
// acknowledge pages 0 and 1
acknowledgeBufferResult(buffer, BUFFER_ID, 2);
compareTotalBuffered(buffer, 3);
// acknowledge page 2
acknowledgeBufferResult(buffer, BUFFER_ID, 3);
compareTotalBuffered(buffer, 0);
// acknowledge more pages will fail
try {
acknowledgeBufferResult(buffer, BUFFER_ID, 4);
}
catch (IllegalArgumentException e) {
assertEquals(e.getMessage(), "Invalid sequenceId");
}
compareTotalBuffered(buffer, 0);
// fill the buffer
for (int i = 3; i < 6; i++) {
addPage(buffer, createPage(i));
}
// acknowledge previously acknowledged pages does nothing
acknowledgeBufferResult(buffer, BUFFER_ID, 3);
compareTotalBuffered(buffer, 3);
}
@Test
public void testAcknowledgeStorageAndMemory()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
// add three pages to file
for (int i = 0; i < 3; i++) {
addPage(buffer, createPage(i));
}
// add two pages in memory
for (int i = 3; i < 5; i++) {
addPage(buffer, createPage(i));
}
// remove file and one page from memory
acknowledgeBufferResult(buffer, BUFFER_ID, 4);
compareTotalBuffered(buffer, 1);
}
@Test
public void testDuplicateGet()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
// add three pages
for (int i = 0; i < 3; i++) {
addPage(buffer, createPage(i));
}
// get the three elements
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(10), MAX_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
compareTotalBuffered(buffer, 3);
// get the same three elements
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(10), MAX_WAIT), bufferResult(0, createPage(0), createPage(1), createPage(2)));
compareTotalBuffered(buffer, 3);
// acknowledge the 3 pages
acknowledgeBufferResult(buffer, BUFFER_ID, 3);
compareTotalBuffered(buffer, 0);
// attempt to get the three elements again
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(10), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 0, false));
compareTotalBuffered(buffer, 0);
}
@Test
public void testAddAfterNoMorePages()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
for (int i = 0; i < 2; i++) {
addPage(buffer, createPage(i));
}
compareTotalBuffered(buffer, 2);
buffer.setNoMorePages();
// should not be added
addPage(buffer, createPage(2));
compareTotalBuffered(buffer, 2);
// get the two pages added
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(3), MAX_WAIT), bufferResult(0, createPage(0), createPage(1)));
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 2, sizeOfPages(3), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 2, true));
}
@Test
public void testAddAfterDestroy()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
for (int i = 0; i < 2; i++) {
addPage(buffer, createPage(i));
}
compareTotalBuffered(buffer, 2);
buffer.destroy();
// nothing in buffer
compareTotalBuffered(buffer, 0);
// should not be added
addPage(buffer, createPage(2));
compareTotalBuffered(buffer, 0);
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(3), NO_WAIT), emptyResults(TASK_INSTANCE_ID, 0, true));
}
@Test
public void testAbort()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
// add three pages into a file
for (int i = 0; i < 3; i++) {
addPage(buffer, createPage(i));
}
// add two page in memory
for (int i = 3; i < 5; i++) {
addPage(buffer, createPage(i));
}
try {
buffer.abort(INVALID_BUFFER_ID);
}
catch (IllegalArgumentException e) {
assertEquals(e.getMessage(), "Invalid bufferId");
}
compareTotalBuffered(buffer, 5);
buffer.abort(BUFFER_ID);
compareTotalBuffered(buffer, 0);
}
@Test
public void testSetOutputBuffers()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
OutputBuffers newBuffers = new OutputBuffers(SPOOLING, 1, true, ImmutableMap.of());
buffer.setOutputBuffers(newBuffers);
OutputBuffers invalidBuffers = new OutputBuffers(PARTITIONED, 1, true, ImmutableMap.of());
try {
buffer.setOutputBuffers(invalidBuffers);
}
catch (IllegalArgumentException e) {
assertEquals(e.getMessage(), "Invalid output buffers type");
}
}
@Test
public void testBufferCompletion()
{
SpoolingOutputBuffer buffer = createSpoolingOutputBuffer();
assertFalse(buffer.isFinished());
// add three pages into a file
List<Page> pages = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Page page = createPage(i);
addPage(buffer, page);
pages.add(createPage(i));
}
// add two page in memory
for (int i = 3; i < 5; i++) {
Page page = createPage(i);
addPage(buffer, page);
pages.add(createPage(i));
}
compareTotalBuffered(buffer, 5);
buffer.setNoMorePages();
// get and acknowledge 5 pages
assertBufferResultEquals(TYPES, getBufferResult(buffer, BUFFER_ID, 0, sizeOfPages(5), MAX_WAIT), createBufferResult(TASK_INSTANCE_ID, 0, pages));
// there are no more pages and no more buffers, but buffer is not finished because it didn't receive an acknowledgement yet
assertFalse(buffer.isFinished());
// ask the buffer to finish
buffer.destroy();
// verify that the buffer is finished
assertTrue(buffer.isFinished());
}
private SpoolingOutputBuffer createSpoolingOutputBuffer()
{
TaskId taskId = new TaskId(queryIdGenerator.createNextQueryId().toString(), 0, 0, 0, 0);
return spoolingOutputBufferFactory.createSpoolingOutputBuffer(
taskId,
TASK_INSTANCE_ID,
OUTPUT_BUFFERS,
new StateMachine<>("bufferState", stateNotificationExecutor, OPEN, TERMINAL_BUFFER_STATES));
}
private static BufferResult bufferResult(long token, Page firstPage, Page... otherPages)
{
List<Page> pages = ImmutableList.<Page>builder().add(firstPage).add(otherPages).build();
return createBufferResult(TASK_INSTANCE_ID, token, pages);
}
private void compareTotalBuffered(OutputBuffer buffer, int expectedBufferedPages)
{
assertEquals(buffer.getInfo().getTotalBufferedBytes(), (int) sizeOfPages(expectedBufferedPages).getValue());
assertEquals(buffer.getInfo().getTotalBufferedPages(), expectedBufferedPages);
}
}