BenchmarkWindowOperator.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;
import com.facebook.presto.RowPagesBuilder;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.block.SortOrder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.testing.TestingTaskContext;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import io.airlift.units.DataSize;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.block.BlockAssertions.createLongSequenceBlock;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.operator.BenchmarkWindowOperator.Context.ROWS_PER_PAGE;
import static com.facebook.presto.operator.BenchmarkWindowOperator.Context.TOTAL_PAGES;
import static com.facebook.presto.operator.TestWindowOperator.ROW_NUMBER;
import static com.facebook.presto.operator.TestWindowOperator.createFactoryUnbounded;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.openjdk.jmh.annotations.Mode.AverageTime;
import static org.openjdk.jmh.annotations.Scope.Thread;
import static org.testng.Assert.assertEquals;
@State(Thread)
@OutputTimeUnit(MILLISECONDS)
@BenchmarkMode(AverageTime)
@Fork(3)
@Warmup(iterations = 5)
@Measurement(iterations = 10, time = 2, timeUnit = SECONDS)
public class BenchmarkWindowOperator
{
@State(Thread)
public static class Context
{
public static final int NUMBER_OF_GROUP_COLUMNS = 2;
public static final int TOTAL_PAGES = 140;
public static final int ROWS_PER_PAGE = 10000;
private static final List<Integer> PARTITION_CHANNELS = Ints.asList(0, 1);
@Param({"10", "20", "100"})
public int rowsPerPartition;
@Param({"0", "1", "2", "3"})
public int numberOfPregroupedColumns;
@Param({"10", "50", "100"})
public int partitionsPerGroup;
private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
private OperatorFactory operatorFactory;
private List<Page> pages;
@Setup
public void setup()
{
executor = newCachedThreadPool(daemonThreadsNamed("test-executor-%s"));
scheduledExecutor = newScheduledThreadPool(2, daemonThreadsNamed("test-scheduledExecutor-%s"));
createOperatorFactoryAndGenerateTestData(numberOfPregroupedColumns);
}
private void createOperatorFactoryAndGenerateTestData(int numberOfPreGroupedColumns)
{
pages = generateTestData();
if (numberOfPreGroupedColumns == 0) {
// Ungrouped
operatorFactory = createFactoryUnbounded(
ImmutableList.of(BIGINT, BIGINT, BIGINT, BIGINT),
Ints.asList(0, 1, 2, 3),
ROW_NUMBER,
PARTITION_CHANNELS,
Ints.asList(),
Ints.asList(3),
ImmutableList.of(SortOrder.ASC_NULLS_LAST),
0,
new DummySpillerFactory(),
false);
}
else if (numberOfPreGroupedColumns < NUMBER_OF_GROUP_COLUMNS) {
// Partially grouped
operatorFactory = createFactoryUnbounded(
ImmutableList.of(BIGINT, BIGINT, BIGINT, BIGINT),
Ints.asList(0, 1, 2, 3),
ROW_NUMBER,
PARTITION_CHANNELS,
Ints.asList(1),
Ints.asList(3),
ImmutableList.of(SortOrder.ASC_NULLS_LAST),
0,
new DummySpillerFactory(),
false);
}
else {
// Fully grouped and (potentially) sorted
operatorFactory = createFactoryUnbounded(
ImmutableList.of(BIGINT, BIGINT, BIGINT, BIGINT),
Ints.asList(0, 1, 2, 3),
ROW_NUMBER,
PARTITION_CHANNELS,
Ints.asList(0, 1),
Ints.asList(3),
ImmutableList.of(SortOrder.ASC_NULLS_LAST),
(numberOfPreGroupedColumns - NUMBER_OF_GROUP_COLUMNS),
new DummySpillerFactory(),
false);
}
}
private List<Page> generateTestData()
{
List<Type> typesArray = new ArrayList<>();
int currentPartitionIdentifier = 1;
typesArray.add(BIGINT);
typesArray.add(BIGINT);
typesArray.add(BIGINT);
typesArray.add(BIGINT);
RowPagesBuilder pagesBuilder = buildPages(currentPartitionIdentifier, typesArray);
return pagesBuilder.build();
}
private RowPagesBuilder buildPages(int currentPartitionIdentifier, List<Type> typesArray)
{
int groupIdentifier = 100;
RowPagesBuilder rowPagesBuilder = RowPagesBuilder.rowPagesBuilder(false, ImmutableList.of(0), typesArray);
for (int i = 0; i < TOTAL_PAGES; i++) {
BlockBuilder firstColumnBlockBuilder = BIGINT.createBlockBuilder(null, ROWS_PER_PAGE);
BlockBuilder secondColumnBlockBuilder = BIGINT.createBlockBuilder(null, ROWS_PER_PAGE);
int currentNumberOfRowsInPartition = 0;
int numberOfPartitionsInCurrentGroup = 0;
int currentGroupIdentifier = groupIdentifier++;
for (int j = 0; j < ROWS_PER_PAGE; j++) {
if (currentNumberOfRowsInPartition == rowsPerPartition) {
++currentPartitionIdentifier;
++numberOfPartitionsInCurrentGroup;
currentNumberOfRowsInPartition = 0;
}
if (numberOfPartitionsInCurrentGroup == partitionsPerGroup) {
numberOfPartitionsInCurrentGroup = 0;
currentGroupIdentifier = groupIdentifier++;
}
firstColumnBlockBuilder.writeLong(currentGroupIdentifier);
secondColumnBlockBuilder.writeLong(currentPartitionIdentifier);
++currentNumberOfRowsInPartition;
}
rowPagesBuilder.addBlocksPage(
firstColumnBlockBuilder.build(),
secondColumnBlockBuilder.build(),
createLongSequenceBlock(0, ROWS_PER_PAGE),
createLongSequenceBlock(0, ROWS_PER_PAGE));
}
return rowPagesBuilder;
}
public TaskContext createTaskContext()
{
return TestingTaskContext.createTaskContext(executor, scheduledExecutor, TEST_SESSION, new DataSize(2, GIGABYTE));
}
public OperatorFactory getOperatorFactory()
{
return operatorFactory;
}
public List<Page> getPages()
{
return pages;
}
}
@Benchmark
public List<Page> benchmark(BenchmarkWindowOperator.Context context)
{
DriverContext driverContext = context.createTaskContext().addPipelineContext(0, true, true, false).addDriverContext();
Operator operator = context.getOperatorFactory().createOperator(driverContext);
Iterator<Page> input = context.getPages().iterator();
ImmutableList.Builder<Page> outputPages = ImmutableList.builder();
boolean finishing = false;
for (int loops = 0; !operator.isFinished() && loops < 1_000_000; loops++) {
if (operator.needsInput()) {
if (input.hasNext()) {
Page inputPage = input.next();
operator.addInput(inputPage);
}
else if (!finishing) {
operator.finish();
finishing = true;
}
}
Page outputPage = operator.getOutput();
if (outputPage != null) {
outputPages.add(outputPage);
}
}
return outputPages.build();
}
@Test
public void verifyUnGroupedWithMultiplePartitions()
{
verify(10, 0, false);
}
@Test
public void verifyUnGroupedWithSinglePartition()
{
verify(10, 0, true);
}
@Test
public void verifyPartiallyGroupedWithMultiplePartitions()
{
verify(10, 1, false);
}
@Test
public void verifyPartiallyGroupedWithSinglePartition()
{
verify(10, 1, true);
}
@Test
public void verifyFullyGroupedWithMultiplePartitions()
{
verify(10, 2, false);
}
@Test
public void verifyFullyGroupedWithSinglePartition()
{
verify(10, 2, true);
}
@Test
public void verifyFullyGroupedAndFullySortedWithMultiplePartitions()
{
verify(10, 3, false);
}
@Test
public void verifyFullyGroupedAndFullySortedWithSinglePartition()
{
verify(10, 3, true);
}
private void verify(
int numberOfRowsPerPartition,
int numberOfPreGroupedColumns,
boolean useSinglePartition)
{
Context context = new Context();
context.rowsPerPartition = numberOfRowsPerPartition;
context.numberOfPregroupedColumns = numberOfPreGroupedColumns;
if (useSinglePartition) {
context.partitionsPerGroup = 1;
context.rowsPerPartition = ROWS_PER_PAGE;
}
context.setup();
assertEquals(TOTAL_PAGES, context.getPages().size());
for (int i = 0; i < TOTAL_PAGES; i++) {
assertEquals(ROWS_PER_PAGE, context.getPages().get(i).getPositionCount());
}
benchmark(context);
}
public static void main(String[] args)
throws RunnerException
{
Options options = new OptionsBuilder()
.verbosity(VerboseMode.NORMAL)
.include(".*" + BenchmarkWindowOperator.class.getSimpleName() + ".*")
.build();
new Runner(options).run();
}
}