MergingPageOutput.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.project;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.Type;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.openjdk.jol.info.ClassLayout;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import static com.facebook.presto.common.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
import static com.facebook.presto.operator.project.PageProcessor.MAX_BATCH_SIZE;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;
/**
* This class is intended to be used right after the PageProcessor to ensure
* that the size of the pages returned by FilterAndProject and ScanFilterAndProject
* is big enough so it does not introduce considerable synchronization overhead.
* <p>
* As long as the input page contains more than {@link MergingPageOutput#minRowCount} rows
* or is bigger than {@link MergingPageOutput#minPageSizeInBytes} it is returned as is without
* additional memory copy.
* <p>
* The page data that has been buffered so far before receiving a "big" page is being flushed
* before transferring a "big" page.
* <p>
* Although it is still possible that the {@link MergingPageOutput} may return a tiny page,
* this situation is considered to be rare due to the assumption that filter selectivity may not
* vary a lot based on the particular input page.
* <p>
* Considering the CPU time required to process(filter, project) a full (~1MB) page returned by a
* connector, the CPU cost of memory copying (< 50kb, < 1024 rows) is supposed to be negligible.
*/
@NotThreadSafe
public class MergingPageOutput
{
@VisibleForTesting
static final int INSTANCE_SIZE = ClassLayout.parseClass(MergingPageOutput.class).instanceSize();
private static final int MAX_MIN_PAGE_SIZE = 1024 * 1024;
private final List<Type> types;
@Nullable
private final PageBuilder pageBuilder; // when null, only page position counts are output
private final Queue<Page> outputQueue = new LinkedList<>();
private final long minPageSizeInBytes;
private final int minRowCount;
private int pendingPositionCount;
@Nullable
private Iterator<Optional<Page>> currentInput;
private boolean finishing;
public MergingPageOutput(Iterable<? extends Type> types, long minPageSizeInBytes, int minRowCount)
{
this(types, minPageSizeInBytes, minRowCount, DEFAULT_MAX_PAGE_SIZE_IN_BYTES);
}
public MergingPageOutput(Iterable<? extends Type> types, long minPageSizeInBytes, int minRowCount, int maxPageSizeInBytes)
{
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
checkArgument(minRowCount >= 0, "minRowCount must be greater or equal than zero");
checkArgument(minRowCount <= MAX_BATCH_SIZE, "minRowCount must be less than or equal to %s", MAX_BATCH_SIZE);
checkArgument(maxPageSizeInBytes > 0, "maxPageSizeInBytes must be greater than zero");
checkArgument(maxPageSizeInBytes >= minPageSizeInBytes, "maxPageSizeInBytes must be greater or equal than minPageSizeInBytes");
checkArgument(minPageSizeInBytes <= MAX_MIN_PAGE_SIZE, "minPageSizeInBytes must be less or equal than %d", MAX_MIN_PAGE_SIZE);
this.minPageSizeInBytes = minPageSizeInBytes;
this.minRowCount = minRowCount;
if (this.types.isEmpty()) {
pageBuilder = null; // position count only mode
}
else {
pageBuilder = PageBuilder.withMaxPageSize(maxPageSizeInBytes, this.types);
}
}
public boolean needsInput()
{
return currentInput == null && !finishing && outputQueue.isEmpty();
}
public void addInput(Iterator<Optional<Page>> input)
{
requireNonNull(input, "input is null");
checkState(!finishing, "output is in finishing state");
checkState(currentInput == null, "currentInput is present");
currentInput = input;
}
@Nullable
public Page getOutput()
{
if (isPositionCountOnly()) {
return producePositionCountOnlyOutput();
}
if (!outputQueue.isEmpty()) {
return outputQueue.poll();
}
while (currentInput != null) {
if (!currentInput.hasNext()) {
currentInput = null;
break;
}
if (!outputQueue.isEmpty()) {
break;
}
Optional<Page> next = currentInput.next();
if (next.isPresent()) {
process(next.get());
}
else {
break;
}
}
if (currentInput == null && finishing) {
flush();
}
return outputQueue.poll();
}
public void finish()
{
finishing = true;
}
public boolean isFinished()
{
return finishing && currentInput == null && outputQueue.isEmpty() && pendingPositionCount == 0 && (isPositionCountOnly() || pageBuilder.isEmpty());
}
private boolean isPositionCountOnly()
{
return pageBuilder == null;
}
/**
* Specialized implementation of {@link MergingPageOutput#getOutput()} that:
* 1. Doesn't use the {@link MergingPageOutput#pageBuilder} because we can accumulate small pages in an integer field
* 2. Can arbitrarily reorder output pages with no columns and therefore does not need to flush small accumulated input
* values when a sufficiently large page is encountered
* 3. Will periodically flush accumulated positionCounts once a combined sum of {@link PageProcessor#MAX_BATCH_SIZE} is
* reached, this avoids creating pages with huge position counts that might harm downstream operators
* 4. As a consequence of the above, will always produce either 0 or 1 output pages, which means it doesn't need or use {@link MergingPageOutput#outputQueue}
*/
@Nullable
private Page producePositionCountOnlyOutput()
{
while (currentInput != null) {
if (!currentInput.hasNext()) {
currentInput = null;
break;
}
Optional<Page> next = currentInput.next();
if (next.isPresent()) {
Page nextPage = next.get();
if (nextPage.getPositionCount() >= MAX_BATCH_SIZE) {
// Return pages exceeding the target size directly without accumulating
return nextPage;
}
// Accumulate pending positions for small pages
pendingPositionCount += nextPage.getPositionCount();
if (nextPage.getPositionCount() >= minRowCount || pendingPositionCount >= MAX_BATCH_SIZE) {
// Produce a combined positionCount output when individual pages meet the minRowCount
// or when accumulated pending positions has reached our output size limit
int outputPositions = min(pendingPositionCount, MAX_BATCH_SIZE);
pendingPositionCount -= outputPositions;
return new Page(outputPositions);
}
}
else {
break; // yield triggered
}
}
if (currentInput == null && finishing && pendingPositionCount > 0) {
// Flush the remaining output positions
Page result = new Page(pendingPositionCount);
pendingPositionCount = 0;
return result;
}
return null;
}
private void process(Page page)
{
requireNonNull(page, "page is null");
int inputPositions = page.getPositionCount();
if (inputPositions == 0) {
return;
}
// avoid memory copying for pages that are big enough
if (page.getSizeInBytes() >= minPageSizeInBytes || inputPositions >= minRowCount) {
flush();
outputQueue.add(page);
return;
}
buffer(page);
}
private void buffer(Page page)
{
checkArgument(!isPositionCountOnly(), "position count only pages should not be buffered");
pageBuilder.declarePositions(page.getPositionCount());
for (int channel = 0; channel < types.size(); channel++) {
Type type = types.get(channel);
Block block = page.getBlock(channel);
BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(channel);
for (int position = 0; position < page.getPositionCount(); position++) {
type.appendTo(block, position, blockBuilder);
}
}
if (pageBuilder.isFull()) {
flush();
}
}
private void flush()
{
if (!isPositionCountOnly() && !pageBuilder.isEmpty()) {
Page output = pageBuilder.build();
pageBuilder.reset();
outputQueue.add(output);
}
}
public long getRetainedSizeInBytes()
{
if (isPositionCountOnly()) {
return INSTANCE_SIZE; // position count only does not use the pageBuilder or outputQueue
}
long retainedSizeInBytes = INSTANCE_SIZE + pageBuilder.getRetainedSizeInBytes();
for (Page page : outputQueue) {
retainedSizeInBytes += page.getRetainedSizeInBytes();
}
return retainedSizeInBytes;
}
}