TestDictionaryAwarePageProjection.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.operator.project;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.block.DictionaryBlock;
import com.facebook.presto.common.block.LazyBlock;
import com.facebook.presto.common.block.LongArrayBlock;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.function.SqlFunctionProperties;
import com.facebook.presto.operator.DriverYieldSignal;
import com.facebook.presto.operator.Work;
import com.google.common.collect.ImmutableList;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.testing.Assertions.assertGreaterThan;
import static com.facebook.airlift.testing.Assertions.assertInstanceOf;
import static com.facebook.presto.block.BlockAssertions.assertBlockEquals;
import static com.facebook.presto.block.BlockAssertions.createLongSequenceBlock;
import static com.facebook.presto.common.block.DictionaryId.randomDictionaryId;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestDictionaryAwarePageProjection
{
    // Shared single-threaded scheduler (daemon threads named "test-%s") that the
    // yield-driven helpers in this class pass to DriverYieldSignal.setWithDelay(...).
    private static final ScheduledExecutorService executor = newSingleThreadScheduledExecutor(daemonThreadsNamed("test-%s"));

    /**
     * Supplies the {@code forceYield} flag to the parameterized tests: one
     * invocation with {@code true} and one with {@code false}.
     */
    @DataProvider(name = "forceYield")
    public static Object[][] forceYield()
    {
        Object[] withForcedYield = {true};
        Object[] withoutForcedYield = {false};
        return new Object[][] {withForcedYield, withoutForcedYield};
    }

    /**
     * Checks the values reported by a {@link DictionaryAwarePageProjection} built
     * around {@code TestPageProjection}: it must be deterministic and read channel 3,
     * which is exactly what the wrapped test projection declares.
     */
    @Test
    public void testDelegateMethods()
    {
        DictionaryAwarePageProjection projection = createProjection();
        // boolean checks read better (and fail with a clearer message) via assertTrue
        assertTrue(projection.isDeterministic());
        assertEquals(projection.getInputChannels().getInputChannels(), ImmutableList.of(3));
    }

    /**
     * Projects a plain long sequence block and expects the result to be an
     * instance of the input block's own class.
     */
    @Test(dataProvider = "forceYield")
    public void testSimpleBlock(boolean forceYield)
    {
        Block sequence = createLongSequenceBlock(0, 100);
        Class<? extends Block> inputType = sequence.getClass();
        testProject(sequence, inputType, forceYield);
    }

    /**
     * Projects a run-length encoded block of 100 positions and expects the
     * result to stay run-length encoded.
     */
    @Test(dataProvider = "forceYield")
    public void testRleBlock(boolean forceYield)
    {
        RunLengthEncodedBlock rle = new RunLengthEncodedBlock(createLongSequenceBlock(42, 43), 100);
        testProject(rle, RunLengthEncodedBlock.class, forceYield);
    }

    /**
     * Projects a run-length encoded block built from the sequence starting at -43
     * and expects every projection variant to fail.
     */
    @Test(dataProvider = "forceYield")
    public void testRleBlockWithFailure(boolean forceYield)
    {
        RunLengthEncodedBlock rle = new RunLengthEncodedBlock(createLongSequenceBlock(-43, -42), 100);
        testProjectFails(rle, RunLengthEncodedBlock.class, forceYield);
    }

    /**
     * Projects a dictionary block (dictionary size 10, 100 positions) and expects
     * a dictionary block back.
     */
    @Test(dataProvider = "forceYield")
    public void testDictionaryBlock(boolean forceYield)
    {
        testProject(createDictionaryBlock(10, 100), DictionaryBlock.class, forceYield);
    }

    /**
     * Projects a dictionary block whose dictionary holds negative values and
     * expects every projection variant to fail.
     */
    @Test(dataProvider = "forceYield")
    public void testDictionaryBlockWithFailure(boolean forceYield)
    {
        testProjectFails(createDictionaryBlockWithFailure(10, 100), DictionaryBlock.class, forceYield);
    }

    /**
     * Projects a dictionary block whose failing dictionary entries are never
     * referenced by any position.
     */
    @Test(dataProvider = "forceYield")
    public void testDictionaryBlockProcessingWithUnusedFailure(boolean forceYield)
    {
        DictionaryBlock withUnusedEntries = createDictionaryBlockWithUnusedEntries(10, 100);

        // a failure while processing the dictionary causes a fallback to normal
        // columnar processing, hence the plain long array result
        Class<? extends Block> fallbackResultType = LongArrayBlock.class;
        testProject(withUnusedEntries, fallbackResultType, forceYield);
    }

    /**
     * After a dictionary block has been projected once, projecting the very same
     * block again must complete in a single {@code process()} call even though a
     * yield is forced.
     */
    @Test
    public void testDictionaryProcessingIgnoreYield()
    {
        DictionaryAwarePageProjection projection = createProjection();
        DictionaryBlock dictionaryBlock = createDictionaryBlock(10, 100);

        // first projection goes through the regular yielding path
        testProjectRange(dictionaryBlock, DictionaryBlock.class, projection, true);

        // the same input block bypasses yield on each of the following projections
        for (int repetition = 0; repetition < 3; repetition++) {
            testProjectFastReturnIgnoreYield(dictionaryBlock, projection);
        }
    }

    /**
     * Alternates between a block whose dictionary is larger than the block
     * (100 entries, 20 positions) and one whose dictionary is smaller
     * (10 entries, 100 positions) on a single projection instance, checking the
     * result type produced for each.
     */
    @Test(dataProvider = "forceYield")
    public void testDictionaryProcessingEnableDisable(boolean forceYield)
    {
        DictionaryAwarePageProjection projection = createProjection();
        DictionaryBlock ineffectiveBlock = createDictionaryBlock(100, 20);
        DictionaryBlock effectiveBlock = createDictionaryBlock(10, 100);

        // the same sequence is exercised twice on the same projection instance
        for (int round = 0; round < 2; round++) {
            // round 0: the function always processes the first dictionary
            // round 1: the last dictionary was effective, so dictionary processing is enabled again
            testProjectRange(ineffectiveBlock, DictionaryBlock.class, projection, forceYield);
            testProjectFastReturnIgnoreYield(ineffectiveBlock, projection);
            // dictionary processing can reuse the last dictionary;
            // in this case the yield signal is not even checked, so forceYield is set to false
            testProjectList(ineffectiveBlock, DictionaryBlock.class, projection, false);

            // the last dictionary was not effective, so dictionary processing is disabled
            testProjectRange(effectiveBlock, LongArrayBlock.class, projection, forceYield);
            testProjectList(effectiveBlock, LongArrayBlock.class, projection, forceYield);
        }
    }

    /**
     * Builds a dictionary block over {@code createLongSequenceBlock(0, dictionarySize)}
     * in which position {@code i} refers to dictionary id {@code i % dictionarySize}.
     */
    private static DictionaryBlock createDictionaryBlock(int dictionarySize, int blockSize)
    {
        Block values = createLongSequenceBlock(0, dictionarySize);
        int[] dictionaryIds = new int[blockSize];
        for (int position = 0; position < dictionaryIds.length; position++) {
            dictionaryIds[position] = position % dictionarySize;
        }
        return new DictionaryBlock(values, dictionaryIds);
    }

    /**
     * Builds a dictionary block over {@code createLongSequenceBlock(-10, dictionarySize - 10)}
     * in which position {@code i} refers to dictionary id {@code i % dictionarySize}.
     */
    private static DictionaryBlock createDictionaryBlockWithFailure(int dictionarySize, int blockSize)
    {
        Block values = createLongSequenceBlock(-10, dictionarySize - 10);
        int[] dictionaryIds = new int[blockSize];
        for (int position = 0; position < dictionaryIds.length; position++) {
            dictionaryIds[position] = position % dictionarySize;
        }
        return new DictionaryBlock(values, dictionaryIds);
    }

    /**
     * Builds a dictionary block over {@code createLongSequenceBlock(-10, dictionarySize)}
     * in which position {@code i} refers to dictionary id {@code (i % dictionarySize) + 10},
     * so the first ten dictionary ids are never referenced.
     */
    private static DictionaryBlock createDictionaryBlockWithUnusedEntries(int dictionarySize, int blockSize)
    {
        Block values = createLongSequenceBlock(-10, dictionarySize);
        int[] dictionaryIds = new int[blockSize];
        for (int position = 0; position < dictionaryIds.length; position++) {
            // shift by 10 to skip the leading dictionary ids
            dictionaryIds[position] = (position % dictionarySize) + 10;
        }
        return new DictionaryBlock(values, dictionaryIds);
    }

    /**
     * Drives {@code work} to completion while forcing a yield before every
     * {@code process()} call. Fails if the work completes without yielding at
     * least once, or if it has yielded more than 1,000,000 times.
     *
     * @return the result of the completed work
     */
    private static List<Block> projectWithYield(Work<List<Block>> work, DriverYieldSignal yieldSignal)
    {
        int yields = 0;

        // arm the yield signal before the first attempt
        yieldSignal.setWithDelay(1, executor);
        yieldSignal.forceYieldForTesting();
        while (!work.process()) {
            yields++;
            if (yields > 1_000_000) {
                fail("projection is not making progress");
            }
            // clear the signal and arm it again for the next attempt
            yieldSignal.reset();
            yieldSignal.setWithDelay(1, executor);
            yieldSignal.forceYieldForTesting();
        }

        assertGreaterThan(yields, 0);
        return work.getResult();
    }

    /**
     * Runs the range and list projection checks against {@code block}, first
     * directly and then wrapped in a lazy block, using a fresh projection for
     * every check.
     */
    private static void testProject(Block block, Class<? extends Block> expectedResultType, boolean forceYield)
    {
        // eager input
        testProjectRange(block, expectedResultType, createProjection(), forceYield);
        testProjectList(block, expectedResultType, createProjection(), forceYield);

        // lazy input; each check gets its own wrapper
        Block lazyForRange = lazyWrapper(block);
        testProjectRange(lazyForRange, expectedResultType, createProjection(), forceYield);
        Block lazyForList = lazyWrapper(block);
        testProjectList(lazyForList, expectedResultType, createProjection(), forceYield);
    }

    /**
     * Asserts that the range and list projection checks throw
     * {@link NegativeValueException} for {@code block}, both directly and when
     * wrapped in a lazy block, using a fresh projection for every check.
     */
    private static void testProjectFails(Block block, Class<? extends Block> expectedResultType, boolean forceYield)
    {
        // eager input
        assertThrows(NegativeValueException.class, () -> {
            testProjectRange(block, expectedResultType, createProjection(), forceYield);
        });
        assertThrows(NegativeValueException.class, () -> {
            testProjectList(block, expectedResultType, createProjection(), forceYield);
        });

        // lazy input; the wrapper is created inside the asserted code
        assertThrows(NegativeValueException.class, () -> {
            testProjectRange(lazyWrapper(block), expectedResultType, createProjection(), forceYield);
        });
        assertThrows(NegativeValueException.class, () -> {
            testProjectList(lazyWrapper(block), expectedResultType, createProjection(), forceYield);
        });
    }

    /**
     * Projects the position range (offset 5, size 10) of {@code block} and checks
     * that the output equals the same region of the input and has the expected
     * block type. With {@code forceYield} the work is driven through
     * {@code projectWithYield}; otherwise it must finish in one {@code process()} call.
     */
    private static void testProjectRange(Block block, Class<? extends Block> expectedResultType, DictionaryAwarePageProjection projection, boolean forceYield)
    {
        int rangeOffset = 5;
        int rangeSize = 10;

        DriverYieldSignal signal = new DriverYieldSignal();
        Page input = new Page(block);
        SelectedPositions range = SelectedPositions.positionsRange(rangeOffset, rangeSize);
        Work<List<Block>> projectionWork = projection.project(null, signal, input, range);

        if (!forceYield) {
            assertTrue(projectionWork.process());
        }
        List<Block> output = forceYield ? projectWithYield(projectionWork, signal) : projectionWork.getResult();

        assertBlockEquals(BIGINT, output.get(0), block.getRegion(rangeOffset, rangeSize));
        assertInstanceOf(output.get(0), expectedResultType);
    }

    /**
     * Projects the explicit position list {0, 2, 4, 6, 8, 10} of {@code block} and
     * checks that the output equals a copy of those input positions and has the
     * expected block type. With {@code forceYield} the work is driven through
     * {@code projectWithYield}; otherwise it must finish in one {@code process()} call.
     */
    private static void testProjectList(Block block, Class<? extends Block> expectedResultType, DictionaryAwarePageProjection projection, boolean forceYield)
    {
        DriverYieldSignal signal = new DriverYieldSignal();
        int[] selected = {0, 2, 4, 6, 8, 10};
        Page input = new Page(block);
        SelectedPositions positionList = SelectedPositions.positionsList(selected, 0, selected.length);
        Work<List<Block>> projectionWork = projection.project(null, signal, input, positionList);

        if (!forceYield) {
            assertTrue(projectionWork.process());
        }
        List<Block> output = forceYield ? projectWithYield(projectionWork, signal) : projectionWork.getResult();

        assertBlockEquals(BIGINT, output.get(0), block.copyPositions(selected, 0, selected.length));
        assertInstanceOf(output.get(0), expectedResultType);
    }

    /**
     * Projects the position range (offset 5, size 10) of {@code block} with a
     * forced yield in place and asserts that the work still completes in a single
     * {@code process()} call, producing a dictionary block equal to the input region.
     */
    private static void testProjectFastReturnIgnoreYield(Block block, DictionaryAwarePageProjection projection)
    {
        int rangeOffset = 5;
        int rangeSize = 10;

        DriverYieldSignal signal = new DriverYieldSignal();
        SelectedPositions range = SelectedPositions.positionsRange(rangeOffset, rangeSize);
        Work<List<Block>> projectionWork = projection.project(null, signal, new Page(block), range);
        signal.setWithDelay(1, executor);
        signal.forceYieldForTesting();

        // the yield signal is ignored given the block has already been loaded
        boolean finished = projectionWork.process();
        assertTrue(finished);
        Block projected = projectionWork.getResult().get(0);
        signal.reset();

        assertBlockEquals(BIGINT, projected, block.getRegion(rangeOffset, rangeSize));
        assertInstanceOf(projected, DictionaryBlock.class);
    }

    /**
     * Creates a new {@link DictionaryAwarePageProjection} around a fresh
     * {@code TestPageProjection}; the id function returns a random dictionary id
     * for every block it is given.
     */
    private static DictionaryAwarePageProjection createProjection()
    {
        PageProjection delegate = new TestPageProjection();
        return new DictionaryAwarePageProjection(delegate, ignoredBlock -> randomDictionaryId());
    }

    /**
     * Wraps {@code block} in a {@link LazyBlock} with the same position count whose
     * loader installs {@code block} as the loaded content.
     */
    private static LazyBlock lazyWrapper(Block block)
    {
        int positionCount = block.getPositionCount();
        return new LazyBlock(positionCount, lazy -> lazy.setBlock(block));
    }

    /**
     * Deterministic test projection that copies the selected long values of the
     * page's first block into a new BIGINT block, throwing
     * {@link NegativeValueException} on the first negative value it reads.
     * It declares channel 3 as its only input channel.
     */
    private static class TestPageProjection
            implements PageProjection
    {
        @Override
        public boolean isDeterministic()
        {
            return true;
        }

        @Override
        public InputChannels getInputChannels()
        {
            return new InputChannels(3);
        }

        @Override
        public Work<List<Block>> project(SqlFunctionProperties properties, DriverYieldSignal yieldSignal, Page page, SelectedPositions selectedPositions)
        {
            return new TestPageProjectionWork(yieldSignal, page, selectedPositions);
        }

        /**
         * Resumable unit of work for a single {@code project} call. After writing
         * each value it checks the yield signal; when the signal is set it records
         * where to resume and returns {@code false} from {@link #process()}.
         *
         * <p>Declared {@code static} because it uses no state of the enclosing
         * projection instance, so it does not need a hidden reference to it.
         */
        private static class TestPageProjectionWork
                implements Work<List<Block>>
        {
            private final DriverYieldSignal yieldSignal;
            private final Block block;
            private final SelectedPositions selectedPositions;

            private BlockBuilder blockBuilder;
            // number of selected entries already written; relative to the selection offset
            private int nextIndexOrPosition;
            private Block result;

            public TestPageProjectionWork(DriverYieldSignal yieldSignal, Page page, SelectedPositions selectedPositions)
            {
                this.yieldSignal = yieldSignal;
                this.block = page.getBlock(0);
                this.selectedPositions = selectedPositions;
                this.blockBuilder = BIGINT.createBlockBuilder(null, selectedPositions.size());
            }

            /**
             * Continues copying from where the previous call stopped.
             *
             * @return {@code true} once all selected values have been written and the
             *         result is available; {@code false} if the yield signal interrupted the copy
             * @throws NegativeValueException if a selected value is negative
             */
            @Override
            public boolean process()
            {
                // must not be called again after completion
                assertNull(result);

                int offset = selectedPositions.getOffset();
                int end = offset + selectedPositions.size();
                if (selectedPositions.isList()) {
                    int[] positions = selectedPositions.getPositions();
                    for (int index = nextIndexOrPosition + offset; index < end; index++) {
                        blockBuilder.writeLong(verifyPositive(block.getLong(positions[index])));
                        if (yieldSignal.isSet()) {
                            // the current entry is already written; resume with the next one
                            nextIndexOrPosition = index + 1 - offset;
                            return false;
                        }
                    }
                }
                else {
                    for (int position = nextIndexOrPosition + offset; position < end; position++) {
                        blockBuilder.writeLong(verifyPositive(block.getLong(position)));
                        if (yieldSignal.isSet()) {
                            // the current position is already written; resume with the next one
                            nextIndexOrPosition = position + 1 - offset;
                            return false;
                        }
                    }
                }
                result = blockBuilder.build();
                blockBuilder = blockBuilder.newBlockBuilderLike(null);
                return true;
            }

            /**
             * Returns the projected block as a single-element list; only valid after
             * {@link #process()} has returned {@code true}.
             */
            @Override
            public List<Block> getResult()
            {
                assertNotNull(result);
                return ImmutableList.of(result);
            }
        }

        /**
         * Returns {@code value} unchanged when it is zero or greater.
         *
         * @throws NegativeValueException if {@code value} is negative
         */
        private static long verifyPositive(long value)
        {
            if (value < 0) {
                throw new NegativeValueException(value);
            }
            return value;
        }
    }

    /**
     * Unchecked exception whose message embeds the negative value it was
     * created with.
     */
    private static class NegativeValueException
            extends RuntimeException
    {
        public NegativeValueException(long value)
        {
            super(messageFor(value));
        }

        // Renders the exception message for the given value.
        private static String messageFor(long negativeValue)
        {
            return "value is negative: " + negativeValue;
        }
    }
}