TestMergingPageOutput.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.operator.project;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.Type;
import com.google.common.collect.ImmutableList;
import org.testng.annotations.Test;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;

import static com.facebook.presto.SequencePageBuilder.createSequencePage;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.execution.buffer.PageSplitterUtil.splitPage;
import static com.facebook.presto.operator.PageAssertions.assertPageEquals;
import static com.google.common.collect.Iterators.transform;
import static java.lang.Math.toIntExact;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;

public class TestMergingPageOutput
{
    private static final List<Type> TYPES = ImmutableList.of(BIGINT, REAL, DOUBLE);

    @Test
    public void testMinPageSizeThreshold()
    {
        Page page = createSequencePage(TYPES, 10);

        MergingPageOutput output = new MergingPageOutput(TYPES, page.getSizeInBytes(), PageProcessor.MAX_BATCH_SIZE, Integer.MAX_VALUE);

        assertTrue(output.needsInput());
        assertNull(output.getOutput());

        output.addInput(createPagesIterator(page));
        assertFalse(output.needsInput());
        assertSame(output.getOutput(), page);
    }

    @Test
    public void testMinRowCountThreshold()
    {
        Page page = createSequencePage(TYPES, 10);

        MergingPageOutput output = new MergingPageOutput(TYPES, 1024 * 1024, page.getPositionCount(), Integer.MAX_VALUE);

        assertTrue(output.needsInput());
        assertNull(output.getOutput());

        output.addInput(createPagesIterator(page));
        assertFalse(output.needsInput());
        assertSame(output.getOutput(), page);
    }

    @Test
    public void testBufferSmallPages()
    {
        int singlePageRowCount = 10;
        Page page = createSequencePage(TYPES, singlePageRowCount * 2);
        List<Page> splits = splitPage(page, page.getSizeInBytes() / 2);

        MergingPageOutput output = new MergingPageOutput(TYPES, page.getSizeInBytes() + 1, page.getPositionCount() + 1, Integer.MAX_VALUE);

        assertTrue(output.needsInput());
        assertNull(output.getOutput());

        output.addInput(createPagesIterator(splits.get(0)));
        assertFalse(output.needsInput());
        assertNull(output.getOutput());
        assertTrue(output.needsInput());

        output.addInput(createPagesIterator(splits.get(1)));
        assertFalse(output.needsInput());
        assertNull(output.getOutput());

        output.finish();

        assertFalse(output.needsInput());
        assertPageEquals(TYPES, output.getOutput(), page);
    }

    @Test
    public void testFlushOnBigPage()
    {
        Page smallPage = createSequencePage(TYPES, 10);
        Page bigPage = createSequencePage(TYPES, 100);

        MergingPageOutput output = new MergingPageOutput(TYPES, bigPage.getSizeInBytes(), bigPage.getPositionCount(), Integer.MAX_VALUE);

        assertTrue(output.needsInput());
        assertNull(output.getOutput());

        output.addInput(createPagesIterator(smallPage));
        assertFalse(output.needsInput());
        assertNull(output.getOutput());
        assertTrue(output.needsInput());

        output.addInput(createPagesIterator(bigPage));
        assertFalse(output.needsInput());
        assertPageEquals(TYPES, output.getOutput(), smallPage);
        assertFalse(output.needsInput());
        assertSame(output.getOutput(), bigPage);
    }

    @Test
    public void testFlushOnFullPage()
    {
        int singlePageRowCount = 10;
        List<Type> types = ImmutableList.of(BIGINT);
        Page page = createSequencePage(types, singlePageRowCount * 2);
        List<Page> splits = splitPage(page, page.getSizeInBytes() / 2);

        MergingPageOutput output = new MergingPageOutput(types, page.getSizeInBytes() / 2 + 1, page.getPositionCount() / 2 + 1, toIntExact(page.getSizeInBytes()));

        assertTrue(output.needsInput());
        assertNull(output.getOutput());

        output.addInput(createPagesIterator(splits.get(0)));
        assertFalse(output.needsInput());
        assertNull(output.getOutput());
        assertTrue(output.needsInput());

        output.addInput(createPagesIterator(splits.get(1)));
        assertFalse(output.needsInput());
        assertPageEquals(types, output.getOutput(), page);

        output.addInput(createPagesIterator(splits.get(0), splits.get(1)));
        assertFalse(output.needsInput());
        assertPageEquals(types, output.getOutput(), page);
    }

    @Test
    public void testPositionCountOnly()
    {
        int minRowCount = 256;
        MergingPageOutput output = new MergingPageOutput(ImmutableList.of(), 1024 * 1024, minRowCount);

        Page bufferedPage = new Page(minRowCount - 1);
        output.addInput(createPagesIterator(bufferedPage));
        assertEquals(output.getRetainedSizeInBytes(), MergingPageOutput.INSTANCE_SIZE); // no page builder or outputQueue items
        assertNull(output.getOutput());
        assertTrue(output.needsInput());

        Page hugePage = new Page(PageProcessor.MAX_BATCH_SIZE * 2);
        output.addInput(createPagesIterator(hugePage));
        // Sufficiently large pages are passed through directly and not combined with pending counts
        assertSame(output.getOutput(), hugePage);
        assertNull(output.getOutput());

        // Pages >= minRowCount are passed through, but combined with accumulated input
        assertTrue(output.needsInput());
        output.addInput(createPagesIterator(new Page(minRowCount)));
        assertEquals(output.getOutput().getPositionCount(), minRowCount + bufferedPage.getPositionCount());
        assertNull(output.getOutput());

        // Small inputs accumulate until MAX_BATCH_SIZE is reached
        int combinedPositions = 0;
        while (combinedPositions + 100 < PageProcessor.MAX_BATCH_SIZE) {
            combinedPositions += 100;
            assertTrue(output.needsInput());
            output.addInput(createPagesIterator(new Page(100)));
            assertNull(output.getOutput());
        }
        assertTrue(output.needsInput());
        combinedPositions += 100;
        output.addInput(createPagesIterator(new Page(100)));
        assertEquals(output.getOutput().getPositionCount(), PageProcessor.MAX_BATCH_SIZE);
        output.finish();
        assertEquals(output.getOutput().getPositionCount(), combinedPositions - PageProcessor.MAX_BATCH_SIZE);
        assertTrue(output.isFinished());
    }

    private static Iterator<Optional<Page>> createPagesIterator(Page... pages)
    {
        return createPagesIterator(ImmutableList.copyOf(pages));
    }

    private static Iterator<Optional<Page>> createPagesIterator(List<Page> pages)
    {
        return transform(pages.iterator(), Optional::of);
    }
}