TestOptimizedPartitionedOutputOperator.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.repartition;
import com.facebook.presto.CompressionCodec;
import com.facebook.presto.Session;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.block.VariableWidthBlock;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.buffer.BufferState;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
import com.facebook.presto.execution.buffer.PartitionedOutputBuffer;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.memory.context.SimpleLocalMemoryContext;
import com.facebook.presto.operator.DriverContext;
import com.facebook.presto.operator.HashGenerator;
import com.facebook.presto.operator.InterpretedHashGenerator;
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.PageAssertions;
import com.facebook.presto.operator.PartitionFunction;
import com.facebook.presto.operator.PrecomputedHashGenerator;
import com.facebook.presto.operator.exchange.LocalPartitionGenerator;
import com.facebook.presto.operator.repartition.OptimizedPartitionedOutputOperator.OptimizedPartitionedOutputFactory;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.sql.planner.OutputPartitioning;
import com.facebook.presto.testing.TestingTaskContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.testing.Assertions.assertBetweenInclusive;
import static com.facebook.presto.block.BlockAssertions.Encoding.DICTIONARY;
import static com.facebook.presto.block.BlockAssertions.Encoding.RUN_LENGTH;
import static com.facebook.presto.block.BlockAssertions.createLongDictionaryBlock;
import static com.facebook.presto.block.BlockAssertions.createLongSequenceBlock;
import static com.facebook.presto.block.BlockAssertions.createMapType;
import static com.facebook.presto.block.BlockAssertions.createRLEBlock;
import static com.facebook.presto.block.BlockAssertions.createRandomLongsBlock;
import static com.facebook.presto.block.BlockAssertions.createRandomStringBlock;
import static com.facebook.presto.block.BlockAssertions.wrapBlock;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DecimalType.createDecimalType;
import static com.facebook.presto.common.type.Decimals.MAX_SHORT_PRECISION;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.RowType.withDefaultFieldNames;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.execution.buffer.BufferState.OPEN;
import static com.facebook.presto.execution.buffer.BufferState.TERMINAL_BUFFER_STATES;
import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.PARTITIONED;
import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static com.facebook.presto.operator.PageAssertions.assertPageEquals;
import static com.facebook.presto.operator.PageAssertions.mergePages;
import static com.facebook.presto.operator.PageAssertions.updateBlockTypesWithHashBlockAndNullBlock;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.facebook.presto.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.testng.Assert.assertEquals;
public class TestOptimizedPartitionedOutputOperator
{
// Shared thread pools; both use daemon thread factories (see the name patterns below).
private static final ExecutorService EXECUTOR = newCachedThreadPool(daemonThreadsNamed("test-EXECUTOR-%s"));
private static final ScheduledExecutorService SCHEDULER = newScheduledThreadPool(1, daemonThreadsNamed("test-%s"));
// 1 GB. NOTE(review): presumably the memory limit given to the test task/operator context; its use is outside this view.
private static final DataSize MAX_MEMORY = new DataSize(1, GIGABYTE);
// Serde built from a fresh BlockEncodingManager with CompressionCodec.NONE.
private static final PagesSerde PAGES_SERDE = new PagesSerdeFactory(new BlockEncodingManager(), CompressionCodec.NONE).createPagesSerde();
// Sizing constants. POSITION_COUNT is passed to every fixture block and multi-block page declared below.
private static final int PARTITION_COUNT = 16;
private static final int PAGE_COUNT = 50;
private static final int POSITION_COUNT = 100;
// Seeded with 0, so the sequence it produces is the same on every run.
private static final Random RANDOM = new Random(0);
// Run-length-encoded block built from a single null BIGINT value, sized with POSITION_COUNT.
private static final Block NULL_BLOCK = new RunLengthEncodedBlock(BIGINT.createBlockBuilder(null, 1).appendNull().build(), POSITION_COUNT);
// Fixture blocks in three encodings: plain long sequence, dictionary, and RLE.
private static final Block TESTING_BLOCK = createLongSequenceBlock(0, POSITION_COUNT);
private static final Block TESTING_DICTIONARY_BLOCK = createLongDictionaryBlock(0, POSITION_COUNT);
// The RLE value is the first long of a separate Random seeded with 0; it is not drawn from RANDOM.
private static final Block TESTING_RLE_BLOCK = createRLEBlock(new Random(0).nextLong(), POSITION_COUNT);
// Single-block pages, one per fixture block above.
private static final Page TESTING_PAGE = new Page(TESTING_BLOCK);
private static final Page TESTING_PAGE_WITH_DICTIONARY_BLOCK = new Page(TESTING_DICTIONARY_BLOCK);
private static final Page TESTING_PAGE_WITH_RLE_BLOCK = new Page(TESTING_RLE_BLOCK);
// Two-block pages: a fixture block followed by NULL_BLOCK.
private static final Page TESTING_PAGE_WITH_NULL_BLOCK = new Page(POSITION_COUNT, TESTING_BLOCK, NULL_BLOCK);
private static final Page TESTING_PAGE_WITH_NULL_AND_DICTIONARY_BLOCK = new Page(POSITION_COUNT, TESTING_DICTIONARY_BLOCK, NULL_BLOCK);
private static final Page TESTING_PAGE_WITH_NULL_AND_RLE_BLOCK = new Page(POSITION_COUNT, TESTING_RLE_BLOCK, NULL_BLOCK);
// NOTE(review): presumably the tolerated ratio between estimated and actual output size; its use is outside this view.
private static final double OUTPUT_SIZE_ESTIMATION_ERROR_ALLOWANCE = 1.2;
/**
 * Calls {@code testPartitionedSinglePage} for each primitive type, first with a
 * one-column layout and then with a two-column layout.
 */
@Test
public void testPartitionedSinglePagePrimitiveTypes()
{
    // Column types in the order the scenarios run.
    List<Type> firstColumnTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION + 1),
            SMALLINT,
            INTEGER,
            REAL,
            BOOLEAN,
            VARCHAR);
    // Second column of the two-column layouts: same types, except the decimal
    // uses a precision one below MAX_SHORT_PRECISION instead of one above it.
    List<Type> secondColumnTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION - 1),
            SMALLINT,
            INTEGER,
            REAL,
            BOOLEAN,
            VARCHAR);

    for (Type columnType : firstColumnTypes) {
        testPartitionedSinglePage(ImmutableList.of(columnType));
    }

    for (int index = 0; index < firstColumnTypes.size(); index++) {
        testPartitionedSinglePage(ImmutableList.of(firstColumnTypes.get(index), secondColumnTypes.get(index)));
    }
}
/**
 * Calls {@code testPartitionedSinglePage} with array-typed columns at nesting
 * depths one (two columns), two and three (one column each).
 */
@Test
public void testPartitionedSinglePageForArray()
{
    // Leaf element types in the order the scenarios run.
    List<Type> elementTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION + 1),
            SMALLINT,
            INTEGER,
            BOOLEAN,
            VARCHAR);

    // Two columns of array(T).
    for (Type elementType : elementTypes) {
        testPartitionedSinglePage(ImmutableList.of(new ArrayType(elementType), new ArrayType(elementType)));
    }

    // One column of array(array(T)).
    for (Type elementType : elementTypes) {
        testPartitionedSinglePage(ImmutableList.of(new ArrayType(new ArrayType(elementType))));
    }

    // One column of array(array(array(T))).
    for (Type elementType : elementTypes) {
        testPartitionedSinglePage(ImmutableList.of(new ArrayType(new ArrayType(new ArrayType(elementType)))));
    }
}
/**
 * Calls {@code testPartitionedSinglePage} with map-typed columns in three shapes,
 * each covering BIGINT, DECIMAL, SMALLINT, INTEGER, BOOLEAN and VARCHAR:
 * map(K, V), map(K, map(K, V)) and map(map(K, V), array(map(K, V))).
 */
@Test
public void testPartitionedSinglePageForMap()
{
    // map(K, V)
    testPartitionedSinglePage(ImmutableList.of(createMapType(BIGINT, BIGINT)));
    testPartitionedSinglePage(ImmutableList.of(createMapType(createDecimalType(MAX_SHORT_PRECISION + 1), createDecimalType(MAX_SHORT_PRECISION + 1))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(SMALLINT, SMALLINT)));
    testPartitionedSinglePage(ImmutableList.of(createMapType(INTEGER, INTEGER)));
    testPartitionedSinglePage(ImmutableList.of(createMapType(BOOLEAN, BOOLEAN)));
    testPartitionedSinglePage(ImmutableList.of(createMapType(VARCHAR, VARCHAR)));

    // map(K, map(K, V))
    testPartitionedSinglePage(ImmutableList.of(createMapType(BIGINT, createMapType(BIGINT, BIGINT))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(createDecimalType(MAX_SHORT_PRECISION + 1), createMapType(BIGINT, createDecimalType(MAX_SHORT_PRECISION + 1)))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(SMALLINT, createMapType(SMALLINT, SMALLINT))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(INTEGER, createMapType(INTEGER, INTEGER))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(BOOLEAN, createMapType(BOOLEAN, BOOLEAN))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(VARCHAR, createMapType(VARCHAR, VARCHAR))));

    // map(map(K, V), array(map(K, V))); the last two cases cover BOOLEAN and VARCHAR
    // so this group spans the same six types as the groups above.
    testPartitionedSinglePage(ImmutableList.of(createMapType(createMapType(BIGINT, BIGINT), new ArrayType(createMapType(BIGINT, BIGINT)))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(createMapType(BIGINT, createDecimalType(MAX_SHORT_PRECISION + 1)), new ArrayType(createMapType(BIGINT, createDecimalType(MAX_SHORT_PRECISION + 1))))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(createMapType(SMALLINT, SMALLINT), new ArrayType(createMapType(SMALLINT, SMALLINT)))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(createMapType(INTEGER, INTEGER), new ArrayType(createMapType(INTEGER, INTEGER)))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(createMapType(BOOLEAN, BOOLEAN), new ArrayType(createMapType(BOOLEAN, BOOLEAN)))));
    testPartitionedSinglePage(ImmutableList.of(createMapType(createMapType(VARCHAR, VARCHAR), new ArrayType(createMapType(VARCHAR, VARCHAR)))));
}
/**
 * Calls {@code testPartitionedSinglePage} with a single row-typed column in five
 * shapes: flat rows, rows holding a nested row, rows of two arrays, rows of an
 * array plus a map, and rows of an array plus a map whose values are rows.
 */
@Test
public void testPartitionedSinglePageForRow()
{
    // Per-scenario leaf types, in the order the scenarios run.
    List<Type> fieldTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION + 1),
            INTEGER,
            SMALLINT,
            BOOLEAN,
            VARCHAR);
    // Map key type per scenario; the decimal scenario keys its maps by BIGINT.
    List<Type> mapKeyTypes = ImmutableList.of(BIGINT, BIGINT, INTEGER, SMALLINT, BOOLEAN, VARCHAR);
    // Nested row per scenario; arity and field types are not uniform across scenarios.
    List<Type> nestedRowTypes = ImmutableList.of(
            withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT)),
            withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT)),
            withDefaultFieldNames(ImmutableList.of(INTEGER, INTEGER)),
            withDefaultFieldNames(ImmutableList.of(SMALLINT, SMALLINT)),
            withDefaultFieldNames(ImmutableList.of(BOOLEAN, BOOLEAN, BOOLEAN)),
            withDefaultFieldNames(ImmutableList.of(VARCHAR, VARCHAR, VARCHAR)));
    int scenarioCount = fieldTypes.size();

    // row(T, T, T)
    for (Type fieldType : fieldTypes) {
        testPartitionedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(fieldType, fieldType, fieldType))));
    }

    // row(T, row(...))
    for (int index = 0; index < scenarioCount; index++) {
        testPartitionedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(fieldTypes.get(index), nestedRowTypes.get(index)))));
    }

    // row(array(T), array(T))
    for (Type fieldType : fieldTypes) {
        testPartitionedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(fieldType), new ArrayType(fieldType)))));
    }

    // row(array(T), map(K, T))
    for (int index = 0; index < scenarioCount; index++) {
        Type keyType = mapKeyTypes.get(index);
        Type valueType = fieldTypes.get(index);
        testPartitionedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(valueType), createMapType(keyType, valueType)))));
    }

    // row(array(T), map(K, row(T, T)))
    for (int index = 0; index < scenarioCount; index++) {
        Type keyType = mapKeyTypes.get(index);
        Type valueType = fieldTypes.get(index);
        testPartitionedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(valueType), createMapType(keyType, withDefaultFieldNames(ImmutableList.of(valueType, valueType)))))));
    }
}
/**
 * Calls {@code testPartitionedMultiplePages} for each primitive type, first with
 * a one-column layout and then with a two-column layout.
 */
@Test
public void testPartitionedMultiplePagesPrimitiveTypes()
{
    // Column types in the order the scenarios run.
    List<Type> leadingTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION + 1),
            SMALLINT,
            INTEGER,
            REAL,
            BOOLEAN,
            VARCHAR);
    // Partner column for the two-column layouts; only the decimal differs
    // (precision one below MAX_SHORT_PRECISION rather than one above).
    List<Type> trailingTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION - 1),
            SMALLINT,
            INTEGER,
            REAL,
            BOOLEAN,
            VARCHAR);

    for (Type leadingType : leadingTypes) {
        testPartitionedMultiplePages(ImmutableList.of(leadingType));
    }

    for (int position = 0; position < leadingTypes.size(); position++) {
        testPartitionedMultiplePages(ImmutableList.of(leadingTypes.get(position), trailingTypes.get(position)));
    }
}
/**
 * Calls {@code testPartitionedMultiplePages} with array-typed columns at nesting
 * depths one (two columns), two and three (one column each).
 */
@Test
public void testPartitionedMultiplePagesForArray()
{
    // Leaf element types in the order the scenarios run.
    List<Type> leafTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION + 1),
            SMALLINT,
            INTEGER,
            BOOLEAN,
            VARCHAR);

    // Two columns of array(T).
    for (Type leafType : leafTypes) {
        testPartitionedMultiplePages(ImmutableList.of(new ArrayType(leafType), new ArrayType(leafType)));
    }

    // One column of array(array(T)).
    for (Type leafType : leafTypes) {
        testPartitionedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(leafType))));
    }

    // One column of array(array(array(T))).
    for (Type leafType : leafTypes) {
        testPartitionedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(new ArrayType(leafType)))));
    }
}
/**
 * Calls {@code testPartitionedMultiplePages} with map-typed columns in three shapes,
 * each covering BIGINT, DECIMAL, SMALLINT, INTEGER, BOOLEAN and VARCHAR:
 * map(K, V), map(K, map(K, V)) and map(map(K, V), array(map(K, V))).
 */
@Test
public void testPartitionedMultiplePagesForMap()
{
    // map(K, V)
    testPartitionedMultiplePages(ImmutableList.of(createMapType(BIGINT, BIGINT)));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(createDecimalType(MAX_SHORT_PRECISION + 1), createDecimalType(MAX_SHORT_PRECISION + 1))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(SMALLINT, SMALLINT)));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(INTEGER, INTEGER)));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(BOOLEAN, BOOLEAN)));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(VARCHAR, VARCHAR)));

    // map(K, map(K, V))
    testPartitionedMultiplePages(ImmutableList.of(createMapType(BIGINT, createMapType(BIGINT, BIGINT))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(createDecimalType(MAX_SHORT_PRECISION + 1), createMapType(BIGINT, createDecimalType(MAX_SHORT_PRECISION + 1)))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(SMALLINT, createMapType(SMALLINT, SMALLINT))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(INTEGER, createMapType(INTEGER, INTEGER))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(BOOLEAN, createMapType(BOOLEAN, BOOLEAN))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(VARCHAR, createMapType(VARCHAR, VARCHAR))));

    // map(map(K, V), array(map(K, V))); the last two cases cover BOOLEAN and VARCHAR
    // so this group spans the same six types as the groups above.
    testPartitionedMultiplePages(ImmutableList.of(createMapType(createMapType(BIGINT, BIGINT), new ArrayType(createMapType(BIGINT, BIGINT)))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(createMapType(BIGINT, createDecimalType(MAX_SHORT_PRECISION + 1)), new ArrayType(createMapType(BIGINT, createDecimalType(MAX_SHORT_PRECISION + 1))))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(createMapType(SMALLINT, SMALLINT), new ArrayType(createMapType(SMALLINT, SMALLINT)))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(createMapType(INTEGER, INTEGER), new ArrayType(createMapType(INTEGER, INTEGER)))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(createMapType(BOOLEAN, BOOLEAN), new ArrayType(createMapType(BOOLEAN, BOOLEAN)))));
    testPartitionedMultiplePages(ImmutableList.of(createMapType(createMapType(VARCHAR, VARCHAR), new ArrayType(createMapType(VARCHAR, VARCHAR)))));
}
/**
 * Calls {@code testPartitionedMultiplePages} with a single row-typed column in
 * five shapes: flat rows, rows holding a nested row, rows of two arrays, rows of
 * an array plus a map, and rows of an array plus a map whose values are rows.
 */
@Test
public void testPartitionedMultiplePagesForRow()
{
    // Per-scenario leaf types, in the order the scenarios run.
    List<Type> leafTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION + 1),
            INTEGER,
            SMALLINT,
            BOOLEAN,
            VARCHAR);
    // Map key type per scenario; the decimal scenario keys its maps by BIGINT.
    List<Type> keyTypes = ImmutableList.of(BIGINT, BIGINT, INTEGER, SMALLINT, BOOLEAN, VARCHAR);
    // Nested row per scenario; arity and field types are not uniform across scenarios.
    List<Type> innerRows = ImmutableList.of(
            withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT)),
            withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT)),
            withDefaultFieldNames(ImmutableList.of(INTEGER, INTEGER)),
            withDefaultFieldNames(ImmutableList.of(SMALLINT, SMALLINT)),
            withDefaultFieldNames(ImmutableList.of(BOOLEAN, BOOLEAN, BOOLEAN)),
            withDefaultFieldNames(ImmutableList.of(VARCHAR, VARCHAR, VARCHAR)));
    int total = leafTypes.size();

    // row(T, T, T)
    for (Type leafType : leafTypes) {
        testPartitionedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(leafType, leafType, leafType))));
    }

    // row(T, row(...))
    for (int position = 0; position < total; position++) {
        testPartitionedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(leafTypes.get(position), innerRows.get(position)))));
    }

    // row(array(T), array(T))
    for (Type leafType : leafTypes) {
        testPartitionedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(leafType), new ArrayType(leafType)))));
    }

    // row(array(T), map(K, T))
    for (int position = 0; position < total; position++) {
        Type mapKey = keyTypes.get(position);
        Type leafType = leafTypes.get(position);
        testPartitionedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(leafType), createMapType(mapKey, leafType)))));
    }

    // row(array(T), map(K, row(T, T)))
    for (int position = 0; position < total; position++) {
        Type mapKey = keyTypes.get(position);
        Type leafType = leafTypes.get(position);
        testPartitionedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(leafType), createMapType(mapKey, withDefaultFieldNames(ImmutableList.of(leafType, leafType)))))));
    }
}
/**
 * Calls {@code testReplicatedSinglePage} for each listed primitive type, first
 * with a one-column layout and then with a two-column layout.
 */
@Test
public void testReplicatedSinglePagePrimitiveTypes()
{
    // Column types in the order the scenarios run.
    List<Type> primaryTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION + 1),
            SMALLINT,
            INTEGER,
            BOOLEAN,
            VARCHAR);
    // Companion column for the two-column layouts; only the decimal differs
    // (precision one below MAX_SHORT_PRECISION rather than one above).
    List<Type> companionTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION - 1),
            SMALLINT,
            INTEGER,
            BOOLEAN,
            VARCHAR);

    for (Type primaryType : primaryTypes) {
        testReplicatedSinglePage(ImmutableList.of(primaryType));
    }

    for (int slot = 0; slot < primaryTypes.size(); slot++) {
        testReplicatedSinglePage(ImmutableList.of(primaryTypes.get(slot), companionTypes.get(slot)));
    }
}
/**
 * Calls {@code testReplicatedSinglePage} with array-typed columns at nesting
 * depths one (two columns), two and three (one column each).
 */
@Test
public void testReplicatedSinglePageForArray()
{
    // Leaf element types in the order the scenarios run.
    List<Type> innermostTypes = ImmutableList.of(
            BIGINT,
            createDecimalType(MAX_SHORT_PRECISION + 1),
            SMALLINT,
            INTEGER,
            BOOLEAN,
            VARCHAR);

    // Two columns of array(T).
    for (Type innermost : innermostTypes) {
        testReplicatedSinglePage(ImmutableList.of(new ArrayType(innermost), new ArrayType(innermost)));
    }

    // One column of array(array(T)).
    for (Type innermost : innermostTypes) {
        testReplicatedSinglePage(ImmutableList.of(new ArrayType(new ArrayType(innermost))));
    }

    // One column of array(array(array(T))).
    for (Type innermost : innermostTypes) {
        testReplicatedSinglePage(ImmutableList.of(new ArrayType(new ArrayType(new ArrayType(innermost)))));
    }
}
/**
 * Calls {@code testReplicatedSinglePage} with map-typed columns in three shapes,
 * each covering BIGINT, DECIMAL, SMALLINT, INTEGER, BOOLEAN and VARCHAR:
 * map(K, V), map(K, map(K, V)) and map(map(K, V), array(map(K, V))).
 */
@Test
public void testReplicatedSinglePageForMap()
{
    // map(K, V)
    testReplicatedSinglePage(ImmutableList.of(createMapType(BIGINT, BIGINT)));
    testReplicatedSinglePage(ImmutableList.of(createMapType(createDecimalType(MAX_SHORT_PRECISION + 1), createDecimalType(MAX_SHORT_PRECISION + 1))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(SMALLINT, SMALLINT)));
    testReplicatedSinglePage(ImmutableList.of(createMapType(INTEGER, INTEGER)));
    testReplicatedSinglePage(ImmutableList.of(createMapType(BOOLEAN, BOOLEAN)));
    testReplicatedSinglePage(ImmutableList.of(createMapType(VARCHAR, VARCHAR)));

    // map(K, map(K, V))
    testReplicatedSinglePage(ImmutableList.of(createMapType(BIGINT, createMapType(BIGINT, BIGINT))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(createDecimalType(MAX_SHORT_PRECISION + 1), createMapType(BIGINT, createDecimalType(MAX_SHORT_PRECISION + 1)))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(SMALLINT, createMapType(SMALLINT, SMALLINT))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(INTEGER, createMapType(INTEGER, INTEGER))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(BOOLEAN, createMapType(BOOLEAN, BOOLEAN))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(VARCHAR, createMapType(VARCHAR, VARCHAR))));

    // map(map(K, V), array(map(K, V))); the last two cases cover BOOLEAN and VARCHAR
    // so this group spans the same six types as the groups above.
    testReplicatedSinglePage(ImmutableList.of(createMapType(createMapType(BIGINT, BIGINT), new ArrayType(createMapType(BIGINT, BIGINT)))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(createMapType(BIGINT, createDecimalType(MAX_SHORT_PRECISION + 1)), new ArrayType(createMapType(BIGINT, createDecimalType(MAX_SHORT_PRECISION + 1))))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(createMapType(SMALLINT, SMALLINT), new ArrayType(createMapType(SMALLINT, SMALLINT)))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(createMapType(INTEGER, INTEGER), new ArrayType(createMapType(INTEGER, INTEGER)))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(createMapType(BOOLEAN, BOOLEAN), new ArrayType(createMapType(BOOLEAN, BOOLEAN)))));
    testReplicatedSinglePage(ImmutableList.of(createMapType(createMapType(VARCHAR, VARCHAR), new ArrayType(createMapType(VARCHAR, VARCHAR)))));
}
@Test
public void testReplicatedSinglePageForRow()
{
    // Decimal type whose precision is one above MAX_SHORT_PRECISION; reused by every decimal case below.
    Type wideDecimal = createDecimalType(MAX_SHORT_PRECISION + 1);

    // Rows made of three scalar fields
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT, BIGINT))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(wideDecimal, wideDecimal, wideDecimal))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(INTEGER, INTEGER, INTEGER))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(SMALLINT, SMALLINT, SMALLINT))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(BOOLEAN, BOOLEAN, BOOLEAN))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(VARCHAR, VARCHAR, VARCHAR))));

    // Rows made of a scalar field followed by a nested row
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(BIGINT, withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(wideDecimal, withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(INTEGER, withDefaultFieldNames(ImmutableList.of(INTEGER, INTEGER))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(SMALLINT, withDefaultFieldNames(ImmutableList.of(SMALLINT, SMALLINT))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(BOOLEAN, withDefaultFieldNames(ImmutableList.of(BOOLEAN, BOOLEAN, BOOLEAN))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(VARCHAR, withDefaultFieldNames(ImmutableList.of(VARCHAR, VARCHAR, VARCHAR))))));

    // Rows made of two array fields
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BIGINT), new ArrayType(BIGINT)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(wideDecimal), new ArrayType(wideDecimal)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(INTEGER), new ArrayType(INTEGER)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(SMALLINT), new ArrayType(SMALLINT)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BOOLEAN), new ArrayType(BOOLEAN)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(VARCHAR), new ArrayType(VARCHAR)))));

    // Rows made of an array field and a map field (the decimal case keeps BIGINT map keys)
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BIGINT), createMapType(BIGINT, BIGINT)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(wideDecimal), createMapType(BIGINT, wideDecimal)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(INTEGER), createMapType(INTEGER, INTEGER)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(SMALLINT), createMapType(SMALLINT, SMALLINT)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BOOLEAN), createMapType(BOOLEAN, BOOLEAN)))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(VARCHAR), createMapType(VARCHAR, VARCHAR)))));

    // Rows made of an array field and a map whose values are rows (the decimal case keeps BIGINT map keys)
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BIGINT), createMapType(BIGINT, withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT)))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(wideDecimal), createMapType(BIGINT, withDefaultFieldNames(ImmutableList.of(wideDecimal, wideDecimal)))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(INTEGER), createMapType(INTEGER, withDefaultFieldNames(ImmutableList.of(INTEGER, INTEGER)))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(SMALLINT), createMapType(SMALLINT, withDefaultFieldNames(ImmutableList.of(SMALLINT, SMALLINT)))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BOOLEAN), createMapType(BOOLEAN, withDefaultFieldNames(ImmutableList.of(BOOLEAN, BOOLEAN)))))));
    testReplicatedSinglePage(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(VARCHAR), createMapType(VARCHAR, withDefaultFieldNames(ImmutableList.of(VARCHAR, VARCHAR)))))));
}
@Test
public void testReplicatedMultiplePagesPrimitiveTypes()
{
    // Decimal types with precision one above and one below MAX_SHORT_PRECISION.
    Type wideDecimal = createDecimalType(MAX_SHORT_PRECISION + 1);
    Type narrowDecimal = createDecimalType(MAX_SHORT_PRECISION - 1);

    // Single data column
    testReplicatedMultiplePages(ImmutableList.of(BIGINT));
    testReplicatedMultiplePages(ImmutableList.of(wideDecimal));
    testReplicatedMultiplePages(ImmutableList.of(SMALLINT));
    testReplicatedMultiplePages(ImmutableList.of(INTEGER));
    testReplicatedMultiplePages(ImmutableList.of(BOOLEAN));
    testReplicatedMultiplePages(ImmutableList.of(VARCHAR));

    // Two data columns
    testReplicatedMultiplePages(ImmutableList.of(BIGINT, BIGINT));
    testReplicatedMultiplePages(ImmutableList.of(wideDecimal, narrowDecimal));
    testReplicatedMultiplePages(ImmutableList.of(SMALLINT, SMALLINT));
    testReplicatedMultiplePages(ImmutableList.of(INTEGER, INTEGER));
    testReplicatedMultiplePages(ImmutableList.of(BOOLEAN, BOOLEAN));
    testReplicatedMultiplePages(ImmutableList.of(VARCHAR, VARCHAR));
}
@Test
public void testReplicatedMultiplePagesForArray()
{
    // Decimal type whose precision is one above MAX_SHORT_PRECISION; reused by every decimal case below.
    Type wideDecimal = createDecimalType(MAX_SHORT_PRECISION + 1);

    // Two columns of single-level arrays
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(BIGINT), new ArrayType(BIGINT)));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(wideDecimal), new ArrayType(wideDecimal)));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(SMALLINT), new ArrayType(SMALLINT)));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(INTEGER), new ArrayType(INTEGER)));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(BOOLEAN), new ArrayType(BOOLEAN)));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(VARCHAR), new ArrayType(VARCHAR)));

    // One column of arrays nested two levels deep
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(BIGINT))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(wideDecimal))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(SMALLINT))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(INTEGER))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(BOOLEAN))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(VARCHAR))));

    // One column of arrays nested three levels deep
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(new ArrayType(BIGINT)))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(new ArrayType(wideDecimal)))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(new ArrayType(SMALLINT)))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(new ArrayType(INTEGER)))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(new ArrayType(BOOLEAN)))));
    testReplicatedMultiplePages(ImmutableList.of(new ArrayType(new ArrayType(new ArrayType(VARCHAR)))));
}
@Test
public void testReplicatedMultiplePagesForMap()
{
    // Decimal type whose precision is one above MAX_SHORT_PRECISION; reused by every decimal case below.
    Type wideDecimal = createDecimalType(MAX_SHORT_PRECISION + 1);

    // Maps of scalar keys to scalar values
    testReplicatedMultiplePages(ImmutableList.of(createMapType(BIGINT, BIGINT)));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(wideDecimal, wideDecimal)));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(SMALLINT, SMALLINT)));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(INTEGER, INTEGER)));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(BOOLEAN, BOOLEAN)));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(VARCHAR, VARCHAR)));

    // Maps of scalar keys to nested maps (the decimal case keeps BIGINT keys in the inner map)
    testReplicatedMultiplePages(ImmutableList.of(createMapType(BIGINT, createMapType(BIGINT, BIGINT))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(wideDecimal, createMapType(BIGINT, wideDecimal))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(SMALLINT, createMapType(SMALLINT, SMALLINT))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(INTEGER, createMapType(INTEGER, INTEGER))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(BOOLEAN, createMapType(BOOLEAN, BOOLEAN))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(VARCHAR, createMapType(VARCHAR, VARCHAR))));

    // Maps keyed by maps, with arrays of maps as values.
    // The last two cases previously repeated the INTEGER case verbatim; they now cover BOOLEAN and VARCHAR,
    // matching the type sequence used by the two groups above.
    testReplicatedMultiplePages(ImmutableList.of(createMapType(createMapType(BIGINT, BIGINT), new ArrayType(createMapType(BIGINT, BIGINT)))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(createMapType(BIGINT, wideDecimal), new ArrayType(createMapType(BIGINT, wideDecimal)))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(createMapType(SMALLINT, SMALLINT), new ArrayType(createMapType(SMALLINT, SMALLINT)))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(createMapType(INTEGER, INTEGER), new ArrayType(createMapType(INTEGER, INTEGER)))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(createMapType(BOOLEAN, BOOLEAN), new ArrayType(createMapType(BOOLEAN, BOOLEAN)))));
    testReplicatedMultiplePages(ImmutableList.of(createMapType(createMapType(VARCHAR, VARCHAR), new ArrayType(createMapType(VARCHAR, VARCHAR)))));
}
@Test
public void testReplicatedMultiplePagesForRow()
{
    // Decimal type whose precision is one above MAX_SHORT_PRECISION; reused by every decimal case below.
    Type wideDecimal = createDecimalType(MAX_SHORT_PRECISION + 1);

    // Rows made of three scalar fields
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT, BIGINT))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(wideDecimal, wideDecimal, wideDecimal))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(INTEGER, INTEGER, INTEGER))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(SMALLINT, SMALLINT, SMALLINT))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(BOOLEAN, BOOLEAN, BOOLEAN))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(VARCHAR, VARCHAR, VARCHAR))));

    // Rows made of a scalar field followed by a nested row
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(BIGINT, withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(wideDecimal, withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(INTEGER, withDefaultFieldNames(ImmutableList.of(INTEGER, INTEGER))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(SMALLINT, withDefaultFieldNames(ImmutableList.of(SMALLINT, SMALLINT))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(BOOLEAN, withDefaultFieldNames(ImmutableList.of(BOOLEAN, BOOLEAN, BOOLEAN))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(VARCHAR, withDefaultFieldNames(ImmutableList.of(VARCHAR, VARCHAR, VARCHAR))))));

    // Rows made of two array fields
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BIGINT), new ArrayType(BIGINT)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(wideDecimal), new ArrayType(wideDecimal)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(INTEGER), new ArrayType(INTEGER)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(SMALLINT), new ArrayType(SMALLINT)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BOOLEAN), new ArrayType(BOOLEAN)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(VARCHAR), new ArrayType(VARCHAR)))));

    // Rows made of an array field and a map field (the decimal case keeps BIGINT map keys)
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BIGINT), createMapType(BIGINT, BIGINT)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(wideDecimal), createMapType(BIGINT, wideDecimal)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(INTEGER), createMapType(INTEGER, INTEGER)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(SMALLINT), createMapType(SMALLINT, SMALLINT)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BOOLEAN), createMapType(BOOLEAN, BOOLEAN)))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(VARCHAR), createMapType(VARCHAR, VARCHAR)))));

    // Rows made of an array field and a map whose values are rows (the decimal case keeps BIGINT map keys)
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BIGINT), createMapType(BIGINT, withDefaultFieldNames(ImmutableList.of(BIGINT, BIGINT)))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(wideDecimal), createMapType(BIGINT, withDefaultFieldNames(ImmutableList.of(wideDecimal, wideDecimal)))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(INTEGER), createMapType(INTEGER, withDefaultFieldNames(ImmutableList.of(INTEGER, INTEGER)))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(SMALLINT), createMapType(SMALLINT, withDefaultFieldNames(ImmutableList.of(SMALLINT, SMALLINT)))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(BOOLEAN), createMapType(BOOLEAN, withDefaultFieldNames(ImmutableList.of(BOOLEAN, BOOLEAN)))))));
    testReplicatedMultiplePages(ImmutableList.of(withDefaultFieldNames(ImmutableList.of(new ArrayType(VARCHAR), createMapType(VARCHAR, withDefaultFieldNames(ImmutableList.of(VARCHAR, VARCHAR)))))));
}
/**
 * Runs the partitioned comparison on a single BIGINT page created with zero positions.
 */
@Test
public void testEmptyPage()
{
    List<Type> dataTypes = ImmutableList.of(BIGINT);
    List<Type> types = updateBlockTypesWithHashBlockAndNullBlock(dataTypes, true, false);
    Page emptyPage = PageAssertions.createPageWithRandomData(dataTypes, 0, true, false, 0.0f, 0.0f, false, ImmutableList.of());

    testPartitioned(types, ImmutableList.of(emptyPage), new DataSize(128, MEGABYTE));
}
/**
 * Runs the zero-block partitioned comparison on a page created from an empty type list
 * with a position count of one.
 */
@Test
public void testPageWithNoBlocks()
{
    List<Type> noDataTypes = ImmutableList.of();
    List<Type> types = updateBlockTypesWithHashBlockAndNullBlock(noDataTypes, false, false);
    Page blocklessPage = PageAssertions.createPageWithRandomData(noDataTypes, 1, false, false, 0.0f, 0.0f, false, ImmutableList.of());

    testPartitionedForZeroBlocks(types, ImmutableList.of(blocklessPage), new DataSize(128, MEGABYTE));
}
/**
 * Test BlockFlattener's allocator does not return borrowed arrays among blocks.
 * <p>
 * The page holds a hash column followed by three doubly dictionary-wrapped string blocks whose
 * underlying random string blocks are created with 10, 100 and 1000 entries respectively.
 */
@Test
public void testPageWithBlocksOfDifferentPositionCounts()
{
    Block[] blocks = {
            // Precomputed hash block
            createRandomLongsBlock(POSITION_COUNT, 0.0f),
            // Wrapped blocks whose base blocks have an increasing number of positions
            wrapBlock(createRandomStringBlock(10, 0.2f, 10), POSITION_COUNT, ImmutableList.of(DICTIONARY, DICTIONARY)),
            wrapBlock(createRandomStringBlock(100, 0.2f, 10), POSITION_COUNT, ImmutableList.of(DICTIONARY, DICTIONARY)),
            wrapBlock(createRandomStringBlock(1000, 0.2f, 10), POSITION_COUNT, ImmutableList.of(DICTIONARY, DICTIONARY)),
    };
    Page page = new Page(blocks);
    List<Type> types = ImmutableList.of(BIGINT, VARCHAR, VARCHAR, VARCHAR);

    // Same page, first with a 128MB memory limit and then with a 1kB one.
    testPartitioned(types, ImmutableList.of(page), new DataSize(128, MEGABYTE));
    testPartitioned(types, ImmutableList.of(page), new DataSize(1, KILOBYTE));
}
/**
 * Runs the partitioned comparison on a page whose VARCHAR column is a variable width block
 * built over a slice view (see createVariableWidthBlockOverSliceView).
 */
@Test
public void testPageWithVariableWidthBlocksOfSliceViews()
{
    Block[] blocks = {
            // Precomputed hash block
            createRandomLongsBlock(POSITION_COUNT, 0.0f),
            // Variable width block backed by a view into a larger slice
            createVariableWidthBlockOverSliceView(POSITION_COUNT),
    };
    Page page = new Page(blocks);
    List<Type> types = ImmutableList.of(BIGINT, VARCHAR);

    // Same page, first with a 128MB memory limit and then with a 1kB one.
    testPartitioned(types, ImmutableList.of(page), new DataSize(128, MEGABYTE));
    testPartitioned(types, ImmutableList.of(page), new DataSize(1, KILOBYTE));
}
/**
 * Runs the partitioned comparison on three single random pages of the given types: one made of
 * plain blocks and two made of block views wrapped in alternating Dictionary/RLE encodings.
 * Every page is checked with a 128MB and a 1kB memory limit.
 *
 * @param targetTypes types of the data columns, excluding the hash column added for partitioning
 */
private void testPartitionedSinglePage(List<Type> targetTypes)
{
    List<Type> types = updateBlockTypesWithHashBlockAndNullBlock(targetTypes, true, false);

    // The large limit lets the buffer hold the whole page; the small one forces flushing every few rows.
    DataSize wholePageLimit = new DataSize(128, MEGABYTE);
    DataSize frequentFlushLimit = new DataSize(1, KILOBYTE);

    // Plain blocks: no block views, no Dictionary/RLE blocks
    Page plainPage = PageAssertions.createPageWithRandomData(targetTypes, POSITION_COUNT, true, false, 0.2f, 0.2f, false, ImmutableList.of());
    testPartitioned(types, ImmutableList.of(plainPage), wholePageLimit);
    testPartitioned(types, ImmutableList.of(plainPage), frequentFlushLimit);

    // Block views wrapped in Dictionary/RLE blocks, dictionary first
    Page dictionaryFirstPage = PageAssertions.createPageWithRandomData(targetTypes, POSITION_COUNT, true, false, 0.2f, 0.2f, true, ImmutableList.of(DICTIONARY, RUN_LENGTH, DICTIONARY, RUN_LENGTH));
    testPartitioned(types, ImmutableList.of(dictionaryFirstPage), wholePageLimit);
    testPartitioned(types, ImmutableList.of(dictionaryFirstPage), frequentFlushLimit);

    // Block views wrapped in Dictionary/RLE blocks, run-length first
    Page runLengthFirstPage = PageAssertions.createPageWithRandomData(targetTypes, POSITION_COUNT, true, false, 0.2f, 0.2f, true, ImmutableList.of(RUN_LENGTH, DICTIONARY, RUN_LENGTH, DICTIONARY));
    testPartitioned(types, ImmutableList.of(runLengthFirstPage), wholePageLimit);
    testPartitioned(types, ImmutableList.of(runLengthFirstPage), frequentFlushLimit);
}
/**
 * Runs the partitioned comparison on two batches of random pages of the given types. The first
 * batch is built in PAGE_COUNT rounds and the second in PAGE_COUNT / 3 rounds; each round adds
 * three pages. Both batches are checked with a 128MB and a 1kB memory limit.
 *
 * @param targetTypes types of the data columns, excluding the hash column added for partitioning
 */
private void testPartitionedMultiplePages(List<Type> targetTypes)
{
    List<Type> types = updateBlockTypesWithHashBlockAndNullBlock(targetTypes, true, false);
    List<Page> pages = new ArrayList<>();

    for (int round = 0; round < PAGE_COUNT; round++) {
        addPlainAndWrappedRandomPages(pages, targetTypes);
    }
    testPartitioned(types, pages, new DataSize(128, MEGABYTE));
    testPartitioned(types, pages, new DataSize(1, KILOBYTE));

    // Start over with a smaller batch
    pages.clear();
    for (int round = 0; round < PAGE_COUNT / 3; round++) {
        addPlainAndWrappedRandomPages(pages, targetTypes);
    }
    testPartitioned(types, pages, new DataSize(128, MEGABYTE));
    testPartitioned(types, pages, new DataSize(1, KILOBYTE));
}

/**
 * Appends three random pages to {@code pages}: one of plain blocks, then two of block views wrapped
 * as (DICTIONARY, DICTIONARY, RUN_LENGTH) and (RUN_LENGTH, DICTIONARY, DICTIONARY). Each page gets
 * POSITION_COUNT plus a random extra number of positions below POSITION_COUNT.
 */
private void addPlainAndWrappedRandomPages(List<Page> pages, List<Type> targetTypes)
{
    pages.add(PageAssertions.createPageWithRandomData(targetTypes, POSITION_COUNT + RANDOM.nextInt(POSITION_COUNT), true, false, 0.2f, 0.2f, false, ImmutableList.of()));
    pages.add(PageAssertions.createPageWithRandomData(targetTypes, POSITION_COUNT + RANDOM.nextInt(POSITION_COUNT), true, false, 0.2f, 0.2f, true, ImmutableList.of(DICTIONARY, DICTIONARY, RUN_LENGTH)));
    pages.add(PageAssertions.createPageWithRandomData(targetTypes, POSITION_COUNT + RANDOM.nextInt(POSITION_COUNT), true, false, 0.2f, 0.2f, true, ImmutableList.of(RUN_LENGTH, DICTIONARY, DICTIONARY)));
}
/**
 * Runs the replicated comparison on one random page of the given types, with a 128MB and a 1kB
 * memory limit.
 *
 * @param targetTypes types of the data columns, excluding the extra hash and null columns
 */
private void testReplicatedSinglePage(List<Type> targetTypes)
{
    // The last block contains only nulls, which forces every row to be replicated.
    List<Type> types = updateBlockTypesWithHashBlockAndNullBlock(targetTypes, true, true);
    Page page = PageAssertions.createPageWithRandomData(targetTypes, POSITION_COUNT, true, true, 0.2f, 0.2f, false, ImmutableList.of());
    List<Page> singlePage = ImmutableList.of(page);

    testReplicated(types, singlePage, new DataSize(128, MEGABYTE));
    testReplicated(types, singlePage, new DataSize(1, KILOBYTE));
}
/**
 * Runs the replicated comparison on PAGE_COUNT random pages of the given types, with a 128MB and
 * a 1kB memory limit. Each page gets POSITION_COUNT plus a random extra number of positions below
 * POSITION_COUNT.
 *
 * @param targetTypes types of the data columns, excluding the extra hash and null columns
 */
private void testReplicatedMultiplePages(List<Type> targetTypes)
{
    List<Type> types = updateBlockTypesWithHashBlockAndNullBlock(targetTypes, true, true);

    List<Page> pages = new ArrayList<>();
    for (int created = 0; created < PAGE_COUNT; created++) {
        int positionCount = POSITION_COUNT + RANDOM.nextInt(POSITION_COUNT);
        Page page = PageAssertions.createPageWithRandomData(targetTypes, positionCount, true, true, 0.2f, 0.2f, false, ImmutableList.of());
        pages.add(page);
    }

    testReplicated(types, pages, new DataSize(128, MEGABYTE));
    testReplicated(types, pages, new DataSize(1, KILOBYTE));
}
/**
 * Variant of the partitioned comparison for pages without blocks: no partition channels are
 * configured and hashing uses an InterpretedHashGenerator built over no types and no channels.
 */
private void testPartitionedForZeroBlocks(List<Type> types, List<Page> pages, DataSize maxMemory)
{
    List<Integer> noPartitionChannels = ImmutableList.of();
    HashGenerator hashOverNoChannels = new InterpretedHashGenerator(ImmutableList.of(), new int[0]);
    testPartitioned(types, pages, maxMemory, noPartitionChannels, hashOverNoChannels);
}
/**
 * Partitioned comparison using channel 0 as the only partition channel and a
 * PrecomputedHashGenerator reading channel 0.
 */
private void testPartitioned(List<Type> types, List<Page> pages, DataSize maxMemory)
{
    List<Integer> hashChannelOnly = ImmutableList.of(0);
    HashGenerator precomputedHash = new PrecomputedHashGenerator(0);
    testPartitioned(types, pages, maxMemory, hashChannelOnly, precomputedHash);
}
/**
 * Feeds the pages through an OptimizedPartitionedOutputOperator and checks that, for every
 * partition, the merged pages captured by the output buffer equal the merged pages computed here
 * by applying the same partition function row by row.
 *
 * @param types types of all channels of the input pages
 * @param pages input pages, added to the operator in order
 * @param maxMemory memory limit handed to the operator factory
 * @param partitionChannel channels passed to the operator as partition channels
 * @param hashGenerator generator backing the partition function
 */
private void testPartitioned(List<Type> types, List<Page> pages, DataSize maxMemory, List<Integer> partitionChannel, HashGenerator hashGenerator)
{
    TestingPartitionedOutputBuffer outputBuffer = createPartitionedOutputBuffer();
    PartitionFunction partitionFunction = new LocalPartitionGenerator(hashGenerator, PARTITION_COUNT);
    OptimizedPartitionedOutputOperator operator = createOptimizedPartitionedOutputOperator(types, partitionChannel, partitionFunction, outputBuffer, OptionalInt.empty(), maxMemory);

    Map<Integer, List<Page>> expectedByPartition = new HashMap<>();
    for (Page inputPage : pages) {
        // Group the row indexes of this page by the partition they are assigned to
        Map<Integer, List<Integer>> rowsByPartition = new HashMap<>();
        for (int row = 0; row < inputPage.getPositionCount(); row++) {
            rowsByPartition.computeIfAbsent(partitionFunction.getPartition(inputPage, row), ignored -> new ArrayList<>()).add(row);
        }

        // Record the expected slice of this page for each partition before handing the page to the operator
        rowsByPartition.forEach((partition, rows) -> {
            if (rows.isEmpty()) {
                return;
            }
            expectedByPartition.computeIfAbsent(partition, ignored -> new ArrayList<>()).add(copyPositions(inputPage, rows));
        });

        operator.addInput(inputPage);
    }
    operator.finish();

    // Both maps are views that merge the per-partition page lists into one page per partition
    Map<Integer, Page> expectedPages = Maps.transformValues(expectedByPartition, partitionPages -> mergePages(types, partitionPages));
    Map<Integer, Page> actualPages = Maps.transformValues(outputBuffer.getPages(), partitionPages -> mergePages(types, partitionPages));

    assertEquals(actualPages.size(), expectedPages.size());
    assertEquals(actualPages.keySet(), expectedPages.keySet());
    for (Integer partition : expectedPages.keySet()) {
        assertPageEquals(types, actualPages.get(partition), expectedPages.get(partition));
    }
}
/**
 * Feeds the pages through an OptimizedPartitionedOutputOperator whose null channel is the last
 * channel, then checks that all PARTITION_COUNT partitions received output and that the merged
 * output of each partition equals the merged input pages.
 *
 * @param types types of all channels of the input pages; the last one is used as the null channel
 * @param pages input pages, added to the operator in order
 * @param maxMemory memory limit handed to the operator factory
 */
private void testReplicated(List<Type> types, List<Page> pages, DataSize maxMemory)
{
    TestingPartitionedOutputBuffer outputBuffer = createPartitionedOutputBuffer();
    PartitionFunction partitionFunction = new LocalPartitionGenerator(new PrecomputedHashGenerator(0), PARTITION_COUNT);
    OptionalInt lastChannelAsNullChannel = OptionalInt.of(types.size() - 1);
    OptimizedPartitionedOutputOperator operator = createOptimizedPartitionedOutputOperator(types, ImmutableList.of(0), partitionFunction, outputBuffer, lastChannelAsNullChannel, maxMemory);

    pages.forEach(operator::addInput);
    operator.finish();

    Map<Integer, List<Page>> actualPageLists = outputBuffer.getPages();
    assertEquals(actualPageLists.size(), PARTITION_COUNT);

    Page expectedPage = mergePages(types, pages);
    for (List<Page> partitionPages : actualPageLists.values()) {
        assertPageEquals(types, mergePages(types, partitionPages), expectedPage);
    }
}
/**
 * Processes TESTING_PAGE PAGE_COUNT times without replication and verifies the reported
 * output size and position count.
 */
@Test
public void testOutputForSimplePage()
{
    OptimizedPartitionedOutputOperator operator = createOptimizedPartitionedOutputOperator(ImmutableList.of(BIGINT), false);
    processPages(operator, TESTING_PAGE);

    long expectedSizeInBytes = PAGE_COUNT * TESTING_PAGE.getLogicalSizeInBytes();
    long expectedPositionCount = PAGE_COUNT * TESTING_PAGE.getPositionCount();
    verifyOutputSizes(operator, expectedSizeInBytes, expectedPositionCount);
}
/**
 * Processes TESTING_PAGE_WITH_DICTIONARY_BLOCK PAGE_COUNT times without replication and
 * verifies the reported output size and position count.
 */
@Test
public void testOutputSizeForPageWithDictionary()
{
    OptimizedPartitionedOutputOperator operator = createOptimizedPartitionedOutputOperator(ImmutableList.of(BIGINT), false);
    processPages(operator, TESTING_PAGE_WITH_DICTIONARY_BLOCK);

    long expectedSizeInBytes = PAGE_COUNT * TESTING_PAGE_WITH_DICTIONARY_BLOCK.getLogicalSizeInBytes();
    long expectedPositionCount = PAGE_COUNT * TESTING_PAGE_WITH_DICTIONARY_BLOCK.getPositionCount();
    verifyOutputSizes(operator, expectedSizeInBytes, expectedPositionCount);
}
/**
 * Processes TESTING_PAGE_WITH_RLE_BLOCK PAGE_COUNT times without replication and verifies
 * the reported output size and position count.
 */
@Test
public void testOutputForPageWithRunLength()
{
    OptimizedPartitionedOutputOperator operator = createOptimizedPartitionedOutputOperator(ImmutableList.of(BIGINT), false);
    processPages(operator, TESTING_PAGE_WITH_RLE_BLOCK);

    long expectedSizeInBytes = PAGE_COUNT * TESTING_PAGE_WITH_RLE_BLOCK.getLogicalSizeInBytes();
    long expectedPositionCount = PAGE_COUNT * TESTING_PAGE_WITH_RLE_BLOCK.getPositionCount();
    verifyOutputSizes(operator, expectedSizeInBytes, expectedPositionCount);
}
/**
 * Processes TESTING_PAGE_WITH_NULL_BLOCK PAGE_COUNT times with replication enabled and verifies
 * the reported output size and position count, both scaled by PARTITION_COUNT.
 */
@Test
public void testOutputForSimplePageReplicated()
{
    OptimizedPartitionedOutputOperator operator = createOptimizedPartitionedOutputOperator(ImmutableList.of(BIGINT), true);
    processPages(operator, TESTING_PAGE_WITH_NULL_BLOCK);

    // The size estimate is based on TESTING_PAGE rather than TESTING_PAGE_WITH_NULL_BLOCK,
    // because the null Block should not be sent over the wire.
    long expectedSizeInBytes = PARTITION_COUNT * PAGE_COUNT * TESTING_PAGE.getLogicalSizeInBytes();
    long expectedPositionCount = PARTITION_COUNT * PAGE_COUNT * TESTING_PAGE_WITH_NULL_BLOCK.getPositionCount();
    verifyOutputSizes(operator, expectedSizeInBytes, expectedPositionCount);
}
/**
 * Processes TESTING_PAGE_WITH_NULL_AND_DICTIONARY_BLOCK PAGE_COUNT times with replication enabled
 * and verifies the reported output size and position count, both scaled by PARTITION_COUNT.
 */
@Test
public void testOutputForPageWithDictionaryReplicated()
{
    OptimizedPartitionedOutputOperator operator = createOptimizedPartitionedOutputOperator(ImmutableList.of(BIGINT), true);
    processPages(operator, TESTING_PAGE_WITH_NULL_AND_DICTIONARY_BLOCK);

    // Use TESTING_PAGE_WITH_DICTIONARY_BLOCK instead of TESTING_PAGE_WITH_NULL_AND_DICTIONARY_BLOCK's logical size
    // to estimate the output data size, because the null Block should not be sent over the wire.
    long expectedSizeInBytes = PARTITION_COUNT * PAGE_COUNT * TESTING_PAGE_WITH_DICTIONARY_BLOCK.getLogicalSizeInBytes();
    // The position count comes from the page that is actually processed, as in the other replicated tests.
    long expectedPositionCount = PARTITION_COUNT * PAGE_COUNT * TESTING_PAGE_WITH_NULL_AND_DICTIONARY_BLOCK.getPositionCount();
    verifyOutputSizes(operator, expectedSizeInBytes, expectedPositionCount);
}
/**
 * Processes TESTING_PAGE_WITH_NULL_AND_RLE_BLOCK PAGE_COUNT times with replication enabled and
 * verifies the reported output size and position count, both scaled by PARTITION_COUNT.
 */
@Test
public void testOutputForPageWithRunLengthReplicated()
{
    OptimizedPartitionedOutputOperator operator = createOptimizedPartitionedOutputOperator(ImmutableList.of(BIGINT), true);
    processPages(operator, TESTING_PAGE_WITH_NULL_AND_RLE_BLOCK);

    // The size estimate is based on TESTING_PAGE_WITH_RLE_BLOCK rather than TESTING_PAGE_WITH_NULL_AND_RLE_BLOCK,
    // because the null Block should not be sent over the wire.
    long expectedSizeInBytes = PARTITION_COUNT * PAGE_COUNT * TESTING_PAGE_WITH_RLE_BLOCK.getLogicalSizeInBytes();
    long expectedPositionCount = PARTITION_COUNT * PAGE_COUNT * TESTING_PAGE_WITH_NULL_AND_RLE_BLOCK.getPositionCount();
    verifyOutputSizes(operator, expectedSizeInBytes, expectedPositionCount);
}
/**
 * Adds the same page to the operator PAGE_COUNT times and then finishes the operator.
 *
 * @param operator operator under test
 * @param testingPageWithRleBlock page to feed repeatedly; despite the name, callers pass any of the testing pages
 */
private static void processPages(OptimizedPartitionedOutputOperator operator, Page testingPageWithRleBlock)
{
    for (int sent = 1; sent <= PAGE_COUNT; sent++) {
        operator.addInput(testingPageWithRleBlock);
    }
    operator.finish();
}
/**
 * Checks the output statistics recorded on the operator's context.
 * <p>
 * The output data size must lie between the expected size divided by and multiplied by
 * OUTPUT_SIZE_ESTIMATION_ERROR_ALLOWANCE (inclusive); the output position count must match exactly.
 *
 * @param operator operator whose context is inspected
 * @param expectedSizeInBytes estimated output size in bytes
 * @param expectedPositionCount exact number of output positions
 */
private static void verifyOutputSizes(OptimizedPartitionedOutputOperator operator, long expectedSizeInBytes, long expectedPositionCount)
{
    OperatorContext context = operator.getOperatorContext();

    long smallestAcceptedSize = (long) (expectedSizeInBytes / OUTPUT_SIZE_ESTIMATION_ERROR_ALLOWANCE);
    long largestAcceptedSize = (long) (expectedSizeInBytes * OUTPUT_SIZE_ESTIMATION_ERROR_ALLOWANCE);
    assertBetweenInclusive(context.getOutputDataSize().getTotalCount(), smallestAcceptedSize, largestAcceptedSize);

    assertEquals(context.getOutputPositions().getTotalCount(), expectedPositionCount);
}
/**
 * Builds a new page containing only the given positions of {@code page}, in the given order.
 *
 * @param page source page
 * @param positions row indexes to copy from every block of the page
 * @return a page with {@code positions.size()} rows and the same number of channels as {@code page}
 */
private Page copyPositions(Page page, List<Integer> positions)
{
    // Unbox the positions once rather than once per channel.
    int[] positionArray = positions.stream().mapToInt(Integer::intValue).toArray();

    Block[] blocks = new Block[page.getChannelCount()];
    for (int channel = 0; channel < blocks.length; channel++) {
        blocks[channel] = page.getBlock(channel).copyPositions(positionArray, 0, positionArray.length);
    }
    return new Page(positionArray.length, blocks);
}
/**
 * Creates a capturing output buffer with one buffer id per partition (ids 0 to PARTITION_COUNT - 1,
 * each mapped to the partition of the same number) and a no-op lifespan completion callback.
 */
private TestingPartitionedOutputBuffer createPartitionedOutputBuffer()
{
    OutputBuffers outputBuffers = createInitialEmptyOutputBuffers(PARTITIONED);
    for (int partition = 0; partition < PARTITION_COUNT; partition++) {
        OutputBuffers.OutputBufferId bufferId = new OutputBuffers.OutputBufferId(partition);
        outputBuffers = outputBuffers.withBuffer(bufferId, partition);
    }
    OutputBuffers completeBuffers = outputBuffers.withNoMoreBufferIds();

    // Don't let the output buffer block
    DataSize unlimitedSize = new DataSize(Long.MAX_VALUE, BYTE);

    TestingPartitionedOutputBuffer result = createPartitionedBuffer(completeBuffers, unlimitedSize);
    result.registerLifespanCompletionCallback(ignore -> {});
    return result;
}
/**
 * Creates an operator over a fresh capturing output buffer, partitioned on channel 0 with a
 * PrecomputedHashGenerator and limited to MAX_MEMORY.
 *
 * @param types channel types of the input pages
 * @param replicateAllRows when true, the types are extended through
 *        updateBlockTypesWithHashBlockAndNullBlock(types, false, true) and the last of the resulting
 *        channels is used as the null channel; when false, no null channel is configured
 */
private OptimizedPartitionedOutputOperator createOptimizedPartitionedOutputOperator(List<Type> types, boolean replicateAllRows)
{
    TestingPartitionedOutputBuffer outputBuffer = createPartitionedOutputBuffer();
    PartitionFunction partitionFunction = new LocalPartitionGenerator(new PrecomputedHashGenerator(0), PARTITION_COUNT);

    List<Type> operatorTypes = types;
    OptionalInt nullChannel = OptionalInt.empty();
    if (replicateAllRows) {
        operatorTypes = updateBlockTypesWithHashBlockAndNullBlock(types, false, true);
        nullChannel = OptionalInt.of(operatorTypes.size() - 1);
    }

    return createOptimizedPartitionedOutputOperator(
            operatorTypes,
            ImmutableList.of(0),
            partitionFunction,
            outputBuffer,
            nullChannel,
            MAX_MEMORY);
}
/**
 * Builds an OptimizedPartitionedOutputOperator through OptimizedPartitionedOutputFactory, using an
 * uncompressed page serde and a freshly created driver context.
 *
 * @param types channel types of the input pages
 * @param partitionChannel channels passed to OutputPartitioning as partition channels
 * @param partitionFunction function assigning rows to partitions
 * @param buffer output buffer the operator writes to
 * @param nullChannel null channel passed to OutputPartitioning; empty when there is none
 * @param maxMemory memory limit handed to the operator factory
 */
private OptimizedPartitionedOutputOperator createOptimizedPartitionedOutputOperator(
        List<Type> types,
        List<Integer> partitionChannel,
        PartitionFunction partitionFunction,
        PartitionedOutputBuffer buffer,
        OptionalInt nullChannel,
        DataSize maxMemory)
{
    PagesSerdeFactory uncompressedSerde = new PagesSerdeFactory(new BlockEncodingManager(), CompressionCodec.NONE);
    OutputPartitioning partitioning = new OutputPartitioning(partitionFunction, partitionChannel, ImmutableList.of(Optional.empty()), false, nullChannel);
    OptimizedPartitionedOutputFactory factory = new OptimizedPartitionedOutputFactory(buffer, maxMemory);
    PlanNodeId planNodeId = new PlanNodeId("plan-node-0");

    return (OptimizedPartitionedOutputOperator) factory
            .createOutputOperator(0, planNodeId, types, Function.identity(), Optional.of(partitioning), uncompressedSerde)
            .createOperator(createDriverContext());
}
/**
 * Creates a driver context for a test session on catalog "tpch" and schema TINY_SCHEMA_NAME,
 * inside a testing task context whose memory pool size is MAX_MEMORY.
 */
private DriverContext createDriverContext()
{
    Session session = testSessionBuilder().setCatalog("tpch").setSchema(TINY_SCHEMA_NAME).build();

    return TestingTaskContext.builder(EXECUTOR, SCHEDULER, session)
            .setMemoryPoolSize(MAX_MEMORY)
            .build()
            .addPipelineContext(0, true, true, false)
            .addDriverContext();
}
/**
 * Instantiates the capturing output buffer with a state machine starting in OPEN and a supplier
 * that hands out a new simple local memory context on every call.
 *
 * @param buffers output buffer layout
 * @param dataSize maximum buffer size
 */
private TestingPartitionedOutputBuffer createPartitionedBuffer(OutputBuffers buffers, DataSize dataSize)
{
    StateMachine<BufferState> bufferState = new StateMachine<>("bufferState", SCHEDULER, OPEN, TERMINAL_BUFFER_STATES);
    Supplier<LocalMemoryContext> memoryContextSupplier = () -> new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test");

    return new TestingPartitionedOutputBuffer("task-instance-id", bufferState, buffers, dataSize, memoryContextSupplier, SCHEDULER);
}
/**
 * Creates a VariableWidthBlock of {@code entries} one-byte values backed by a slice view.
 * <p>
 * A buffer of {@code 2 * entries} bytes is written, and the block is built over the view that
 * starts in the middle of that buffer and spans its second half.
 *
 * @param entries number of positions in the returned block
 */
private static Block createVariableWidthBlockOverSliceView(int entries)
{
    int totalBytes = entries * 2;
    DynamicSliceOutput output = new DynamicSliceOutput(totalBytes);
    for (int value = 0; value < totalBytes; value++) {
        output.writeByte(value);
    }

    // View over the second half of the written bytes
    Slice secondHalf = output.slice().slice(entries, entries);

    // Offsets 0..entries, so that every position covers exactly one byte of the view
    int[] offsets = IntStream.range(0, entries + 1).toArray();
    return new VariableWidthBlock(entries, secondHalf, offsets, Optional.empty());
}
/**
 * PartitionedOutputBuffer that, instead of buffering, deserializes every enqueued page and keeps it
 * in memory per partition so that tests can inspect what the operator produced.
 */
private static class TestingPartitionedOutputBuffer
        extends PartitionedOutputBuffer
{
    // Deserialized pages keyed by partition number, in arrival order
    private final Map<Integer, List<Page>> pages = new HashMap<>();

    public TestingPartitionedOutputBuffer(
            String taskInstanceId,
            StateMachine<BufferState> state,
            OutputBuffers outputBuffers,
            DataSize maxBufferSize,
            Supplier<LocalMemoryContext> systemMemoryContextSupplier,
            Executor notificationExecutor)
    {
        super(taskInstanceId, state, outputBuffers, maxBufferSize.toBytes(), systemMemoryContextSupplier, notificationExecutor);
    }

    /**
     * Deserializes the given pages with PAGES_SERDE and appends them to the list kept for
     * {@code partitionNumber}. The list is created even when {@code pages} is empty.
     */
    @Override
    public void enqueue(Lifespan lifespan, int partitionNumber, List<SerializedPage> pages)
    {
        List<Page> partitionPages = this.pages.computeIfAbsent(partitionNumber, ignored -> new ArrayList<>());
        for (SerializedPage serializedPage : pages) {
            partitionPages.add(PAGES_SERDE.deserialize(serializedPage));
        }
    }

    /**
     * Returns the live map of captured pages by partition number.
     */
    public Map<Integer, List<Page>> getPages()
    {
        return pages;
    }
}
}