BenchmarkUnnestOperator.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.operator.unnest.UnnestOperator;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.testing.TestingTaskContext;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager;
import static com.facebook.presto.operator.PageAssertions.createPageWithRandomData;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Fork(3)
@Warmup(iterations = 8, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 8, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
public class BenchmarkUnnestOperator
{
private static final int TOTAL_POSITIONS = 10000;
@Benchmark
public List<Page> unnest(BenchmarkData context)
{
DriverContext driverContext = context.createTaskContext().addPipelineContext(0, true, true, false).addDriverContext();
Operator operator = context.getOperatorFactory().createOperator(driverContext);
Iterator<Page> input = context.getPages().iterator();
ImmutableList.Builder<Page> outputPages = ImmutableList.builder();
boolean finishing = false;
for (int loops = 0; !operator.isFinished() && loops < 1_000_000; loops++) {
if (operator.needsInput()) {
if (input.hasNext()) {
Page inputPage = input.next();
operator.addInput(inputPage);
}
else if (!finishing) {
operator.finish();
finishing = true;
}
}
Page outputPage = operator.getOutput();
if (outputPage != null) {
outputPages.add(outputPage);
}
}
return outputPages.build();
}
@State(Scope.Thread)
public static class BenchmarkData
{
@Param({
"bigint",
"varchar"
})
private String replicateType = "bigint";
@Param({
"array(varchar)",
"array(integer)",
"map(varchar,varchar)",
"array(row(varchar,varchar,varchar))",
"array(array(varchar))",
"array(bigint)|array(bigint)",
"array(varchar)|array(varchar)"
})
private String nestedType = "array(bigint)";
@SuppressWarnings("unused")
@Param({"0.0", "0.2"})
private float primitiveNullsRatio; // % of nulls in input primitive elements
@SuppressWarnings("unused")
@Param({"0.0", "0.05"})
private float rowNullsRatio; // % of nulls in row type elements
@Param("1000")
private int positionsPerPage = 1000;
@SuppressWarnings("unused")
@Param({"false", "true"})
private boolean withOrdinality;
private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
private OperatorFactory operatorFactory;
private List<Page> pages = new ArrayList<>();
@Setup
public void setup()
{
executor = newCachedThreadPool(daemonThreadsNamed("test-executor-%s"));
scheduledExecutor = newScheduledThreadPool(2, daemonThreadsNamed("test-scheduledExecutor-%s"));
Metadata metadata = createTestMetadataManager();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> replicatedTypesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> unnestTypesBuilder = ImmutableList.builder();
ImmutableList.Builder<Integer> replicatedChannelsBuilder = ImmutableList.builder();
ImmutableList.Builder<Integer> unnestChannelsBuilder = ImmutableList.builder();
String[] replicatedTypes = replicateType.split("\\|");
for (int i = 0; i < replicatedTypes.length; i++) {
Type replicateType = getType(metadata, replicatedTypes[i]).get();
typesBuilder.add(replicateType);
replicatedTypesBuilder.add(replicateType);
replicatedChannelsBuilder.add(i);
}
String[] unnestTypes = nestedType.split("\\|");
for (int i = 0; i < unnestTypes.length; i++) {
Type unnestType = getType(metadata, unnestTypes[i]).get();
typesBuilder.add(unnestType);
unnestTypesBuilder.add(unnestType);
unnestChannelsBuilder.add(i + replicatedTypes.length);
}
int pageCount = TOTAL_POSITIONS / positionsPerPage;
for (int i = 0; i < pageCount; i++) {
pages.add(createPageWithRandomData(
typesBuilder.build(),
positionsPerPage,
false,
false,
primitiveNullsRatio,
rowNullsRatio,
false,
ImmutableList.of()));
}
operatorFactory = new UnnestOperator.UnnestOperatorFactory(
0,
new PlanNodeId("test"),
replicatedChannelsBuilder.build(),
replicatedTypesBuilder.build(),
unnestChannelsBuilder.build(),
unnestTypesBuilder.build(),
withOrdinality);
}
public Optional<Type> getType(Metadata metadata, String typeString)
{
if (typeString.equals("NONE")) {
return Optional.empty();
}
TypeSignature signature = TypeSignature.parseTypeSignature(typeString);
return Optional.of(metadata.getType(signature));
}
@TearDown
public void cleanup()
{
executor.shutdownNow();
scheduledExecutor.shutdownNow();
}
public TaskContext createTaskContext()
{
return TestingTaskContext.createTaskContext(executor, scheduledExecutor, TEST_SESSION, new DataSize(2, GIGABYTE));
}
public OperatorFactory getOperatorFactory()
{
return operatorFactory;
}
public List<Page> getPages()
{
return pages;
}
}
public static void main(String[] args)
throws RunnerException
{
BenchmarkData data = new BenchmarkData();
data.setup();
new BenchmarkUnnestOperator().unnest(data);
Options options = new OptionsBuilder()
.verbosity(VerboseMode.NORMAL)
.include(".*" + BenchmarkUnnestOperator.class.getSimpleName() + ".*")
.addProfiler(GCProfiler.class)
.build();
new Runner(options).run();
}
}