TestInMemoryGroupedTopNBuilder.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;
import com.facebook.presto.RowPagesBuilder;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.array.ObjectBigArray;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.memory.TestingMemoryContext;
import com.facebook.presto.sql.gen.JoinCompiler;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.ints.IntArrayFIFOQueue;
import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
import org.openjdk.jol.info.ClassLayout;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.facebook.airlift.testing.Assertions.assertGreaterThan;
import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder;
import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_LAST;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager;
import static com.facebook.presto.operator.PageAssertions.assertPageEquals;
import static com.facebook.presto.operator.UpdateMemory.NOOP;
import static io.airlift.slice.SizeOf.sizeOf;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
public class TestInMemoryGroupedTopNBuilder
{
private static final long INSTANCE_SIZE = ClassLayout.parseClass(InMemoryGroupedTopNBuilder.class).instanceSize();
private static final long INT_FIFO_QUEUE_SIZE = ClassLayout.parseClass(IntArrayFIFOQueue.class).instanceSize();
private static final long OBJECT_OVERHEAD = ClassLayout.parseClass(Object.class).instanceSize();
private static final long PAGE_REFERENCE_INSTANCE_SIZE = ClassLayout.parseClass(TestPageReference.class).instanceSize();
@DataProvider
public static Object[][] produceRowNumbers()
{
return new Object[][] {{true}, {false}};
}
@DataProvider
public static Object[][] pageRowCounts()
{
// make either page or row count > 1024 to expand the big arrays
return new Object[][] {{10000, 20}, {20, 10000}};
}
@Test
public void testEmptyInput()
{
InMemoryGroupedTopNBuilder groupedTopNBuilder = new InMemoryGroupedTopNBuilder(
ImmutableList.of(BIGINT),
(left, leftPosition, right, rightPosition) -> {
throw new UnsupportedOperationException();
},
5,
false,
new TestingMemoryContext(100L),
new NoChannelGroupByHash());
assertFalse(groupedTopNBuilder.buildResult().iterator().hasNext());
}
@Test(dataProvider = "produceRowNumbers")
public void testMultiGroupTopN(boolean produceRowNumbers)
{
List<Type> types = ImmutableList.of(BIGINT, DOUBLE);
List<Page> input = rowPagesBuilder(types)
.row(1L, 0.3)
.row(2L, 0.2)
.row(3L, 0.9)
.row(3L, 0.1)
.pageBreak()
.row(1L, 0.4)
.pageBreak()
.row(1L, 0.5)
.row(1L, 0.6)
.row(4L, 0.6)
.row(2L, 0.8)
.row(2L, 0.7)
.pageBreak()
.row(2L, 0.9)
.build();
for (Page page : input) {
page.compact();
}
GroupByHash groupByHash = createGroupByHash(ImmutableList.of(types.get(0)), ImmutableList.of(0), NOOP);
InMemoryGroupedTopNBuilder groupedTopNBuilder = new InMemoryGroupedTopNBuilder(
types,
new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)),
2,
produceRowNumbers,
new TestingMemoryContext(100L),
groupByHash);
assertBuilderSize(groupByHash, types, ImmutableList.of(), ImmutableList.of(), groupedTopNBuilder.getEstimatedSizeInBytes());
// add 4 rows for the first page and created three heaps with 1, 1, 2 rows respectively
assertTrue(groupedTopNBuilder.processPage(input.get(0)).process());
assertBuilderSize(groupByHash, types, ImmutableList.of(4), ImmutableList.of(1, 1, 2), groupedTopNBuilder.getEstimatedSizeInBytes());
// add 1 row for the second page and the three heaps become 2, 1, 2 rows respectively
assertTrue(groupedTopNBuilder.processPage(input.get(1)).process());
assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1), ImmutableList.of(2, 1, 2), groupedTopNBuilder.getEstimatedSizeInBytes());
// add 2 new rows for the third page (which will be compacted into two rows only) and we have four heaps with 2, 2, 2, 1 rows respectively
assertTrue(groupedTopNBuilder.processPage(input.get(2)).process());
assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 2), ImmutableList.of(2, 2, 2, 1), groupedTopNBuilder.getEstimatedSizeInBytes());
// the last page will be discarded
assertTrue(groupedTopNBuilder.processPage(input.get(3)).process());
assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 2, 0), ImmutableList.of(2, 2, 2, 1), groupedTopNBuilder.getEstimatedSizeInBytes());
List<Page> output = ImmutableList.copyOf(groupedTopNBuilder.buildResult().iterator());
assertEquals(output.size(), 1);
Page expected = rowPagesBuilder(BIGINT, DOUBLE, BIGINT)
.row(1L, 0.3, 1)
.row(1L, 0.4, 2)
.row(2L, 0.2, 1)
.row(2L, 0.7, 2)
.row(3L, 0.1, 1)
.row(3L, 0.9, 2)
.row(4L, 0.6, 1)
.build()
.get(0);
if (produceRowNumbers) {
assertPageEquals(ImmutableList.of(BIGINT, DOUBLE, BIGINT), output.get(0), expected);
}
else {
assertPageEquals(types, output.get(0), new Page(expected.getBlock(0), expected.getBlock(1)));
}
assertBuilderSize(groupByHash, types, ImmutableList.of(0, 0, 0, 0), ImmutableList.of(0, 0, 0, 0), groupedTopNBuilder.getEstimatedSizeInBytes());
}
@Test(dataProvider = "produceRowNumbers")
public void testSingleGroupTopN(boolean produceRowNumbers)
{
List<Type> types = ImmutableList.of(BIGINT, DOUBLE);
List<Page> input = rowPagesBuilder(types)
.row(1L, 0.3)
.row(2L, 0.2)
.row(3L, 0.9)
.row(3L, 0.1)
.pageBreak()
.row(1L, 0.4)
.pageBreak()
.row(1L, 0.5)
.row(1L, 0.6)
.row(4L, 0.6)
.row(2L, 0.8)
.row(2L, 0.7)
.pageBreak()
.row(2L, 0.9)
.build();
for (Page page : input) {
page.compact();
}
InMemoryGroupedTopNBuilder groupedTopNBuilder = new InMemoryGroupedTopNBuilder(
types,
new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)),
5,
produceRowNumbers,
new TestingMemoryContext(100L),
new NoChannelGroupByHash());
GroupByHash groupByHash = groupedTopNBuilder.getGroupByHash();
assertBuilderSize(groupByHash, types, ImmutableList.of(), ImmutableList.of(), groupedTopNBuilder.getEstimatedSizeInBytes());
// add 4 rows for the first page and created a single heap with 4 rows
assertTrue(groupedTopNBuilder.processPage(input.get(0)).process());
assertBuilderSize(groupByHash, types, ImmutableList.of(4), ImmutableList.of(4), groupedTopNBuilder.getEstimatedSizeInBytes());
// add 1 row for the second page and the heap is with 5 rows
assertTrue(groupedTopNBuilder.processPage(input.get(1)).process());
assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1), ImmutableList.of(5), groupedTopNBuilder.getEstimatedSizeInBytes());
// update 1 new row from the third page (which will be compacted into a single row only)
assertTrue(groupedTopNBuilder.processPage(input.get(2)).process());
assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 1), ImmutableList.of(5), groupedTopNBuilder.getEstimatedSizeInBytes());
// the last page will be discarded
assertTrue(groupedTopNBuilder.processPage(input.get(3)).process());
assertBuilderSize(groupByHash, types, ImmutableList.of(4, 1, 1), ImmutableList.of(5), groupedTopNBuilder.getEstimatedSizeInBytes());
List<Page> output = ImmutableList.copyOf(groupedTopNBuilder.buildResult().iterator());
assertEquals(output.size(), 1);
Page expected = rowPagesBuilder(BIGINT, DOUBLE, BIGINT)
.row(3L, 0.1, 1)
.row(2L, 0.2, 2)
.row(1L, 0.3, 3)
.row(1L, 0.4, 4)
.row(1L, 0.5, 5)
.build()
.get(0);
if (produceRowNumbers) {
assertPageEquals(ImmutableList.of(BIGINT, DOUBLE, BIGINT), output.get(0), expected);
}
else {
assertPageEquals(types, output.get(0), new Page(expected.getBlock(0), expected.getBlock(1)));
}
assertBuilderSize(groupedTopNBuilder.getGroupByHash(), types, ImmutableList.of(0, 0, 0), ImmutableList.of(0), groupedTopNBuilder.getEstimatedSizeInBytes());
}
@Test
public void testYield()
{
List<Type> types = ImmutableList.of(BIGINT, DOUBLE);
Page input = rowPagesBuilder(types)
.row(1L, 0.3)
.row(1L, 0.2)
.row(1L, 0.9)
.row(1L, 0.1)
.build()
.get(0);
input.compact();
AtomicBoolean unblock = new AtomicBoolean();
GroupByHash groupByHash = createGroupByHash(ImmutableList.of(types.get(0)), ImmutableList.of(0), unblock::get);
InMemoryGroupedTopNBuilder groupedTopNBuilder = new InMemoryGroupedTopNBuilder(
types,
new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)),
5,
false,
new TestingMemoryContext(100L),
groupByHash);
assertBuilderSize(groupByHash, types, ImmutableList.of(), ImmutableList.of(), groupedTopNBuilder.getEstimatedSizeInBytes());
Work<?> work = groupedTopNBuilder.processPage(input);
assertFalse(work.process());
assertFalse(work.process());
unblock.set(true);
assertTrue(work.process());
List<Page> output = ImmutableList.copyOf(groupedTopNBuilder.buildResult().iterator());
assertEquals(output.size(), 1);
Page expected = rowPagesBuilder(types)
.row(1L, 0.1)
.row(1L, 0.2)
.row(1L, 0.3)
.row(1L, 0.9)
.build()
.get(0);
assertPageEquals(types, output.get(0), expected);
assertBuilderSize(groupByHash, types, ImmutableList.of(0), ImmutableList.of(), groupedTopNBuilder.getEstimatedSizeInBytes());
}
@Test
public void testAutoCompact()
{
List<Type> types = ImmutableList.of(BIGINT, DOUBLE);
List<Page> input = rowPagesBuilder(types)
.row(1L, 0.8)
.row(2L, 0.7)
.row(3L, 0.9)
.row(3L, 0.2)
.row(3L, 0.2)
.row(3L, 0.2)
.row(3L, 0.2)
.pageBreak()
.row(3L, 0.8)
.pageBreak()
.row(2L, 0.6)
.row(3L, 0.1)
.pageBreak()
.row(1L, 0.7)
.pageBreak()
.row(1L, 0.6)
.build();
InMemoryGroupedTopNBuilder groupedTopNBuilder = new InMemoryGroupedTopNBuilder(
types,
new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)),
1,
false,
new TestingMemoryContext(100L),
createGroupByHash(ImmutableList.of(types.get(0)), ImmutableList.of(0), NOOP));
// page 1:
// the first page will be compacted
assertTrue(groupedTopNBuilder.processPage(input.get(0)).process());
assertEquals(groupedTopNBuilder.getBufferedPages().size(), 1);
Page firstCompactPage = groupedTopNBuilder.getBufferedPages().get(0);
Page expected = rowPagesBuilder(types)
.row(1L, 0.8)
.row(2L, 0.7)
.row(3L, 0.2)
.build()
.get(0);
assertPageEquals(types, firstCompactPage, expected);
// page 2:
// the second page will be removed
assertTrue(groupedTopNBuilder.processPage(input.get(1)).process());
assertEquals(groupedTopNBuilder.getBufferedPages().size(), 1);
// assert the first page is not affected
assertEquals(firstCompactPage, groupedTopNBuilder.getBufferedPages().get(0));
// page 3:
// the third page will trigger another compaction of the first page
assertTrue(groupedTopNBuilder.processPage(input.get(2)).process());
List<Page> bufferedPages = groupedTopNBuilder.getBufferedPages();
assertEquals(bufferedPages.size(), 2);
// assert the previously compacted first page no longer exists
assertNotEquals(firstCompactPage, bufferedPages.get(0));
assertNotEquals(firstCompactPage, bufferedPages.get(1));
List<Page> expectedPages = rowPagesBuilder(types)
.row(1L, 0.8)
.pageBreak()
.row(2L, 0.6)
.row(3L, 0.1)
.build();
assertPageEquals(types, bufferedPages.get(0), expectedPages.get(0));
assertPageEquals(types, bufferedPages.get(1), expectedPages.get(1));
// page 4:
// the fourth page will remove the first page; also it leaves it with an empty slot
assertTrue(groupedTopNBuilder.processPage(input.get(3)).process());
bufferedPages = groupedTopNBuilder.getBufferedPages();
assertEquals(bufferedPages.size(), 2);
expectedPages = rowPagesBuilder(types)
.row(2L, 0.6)
.row(3L, 0.1)
.pageBreak()
.row(1L, 0.7)
.build();
assertPageEquals(types, bufferedPages.get(0), expectedPages.get(0));
assertPageEquals(types, bufferedPages.get(1), expectedPages.get(1));
// page 5:
// the fifth page will remove the fourth page and it will take the empty slot from the first page
assertTrue(groupedTopNBuilder.processPage(input.get(4)).process());
bufferedPages = groupedTopNBuilder.getBufferedPages();
assertEquals(bufferedPages.size(), 2);
// assert the fifth page indeed takes the first empty slot
expectedPages = rowPagesBuilder(types)
.row(1L, 0.6)
.pageBreak()
.row(2L, 0.6)
.row(3L, 0.1)
.build();
assertPageEquals(types, bufferedPages.get(0), expectedPages.get(0));
assertPageEquals(types, bufferedPages.get(1), expectedPages.get(1));
}
@Test(dataProvider = "pageRowCounts")
public void testLargePagesMemoryTracking(int pageCount, int rowCount)
{
// Memory tracking has been tested in other tests for various cases (e.g., compaction, row removal, etc).
// The purpose of this test is to verify:
// (1) the sizes of containers (e.g., big array, heap, etc) are properly tracked when they grow and
// (2) when we flush, the memory usage decreases accordingly.
List<Type> types = ImmutableList.of(BIGINT, DOUBLE);
// Create pageCount pages each page is with rowCount positions:
RowPagesBuilder rowPagesBuilder = rowPagesBuilder(types);
for (int i = 0; i < pageCount; i++) {
rowPagesBuilder.addSequencePage(rowCount, 0, rowCount * i);
}
List<Page> input = rowPagesBuilder.build();
GroupByHash groupByHash = createGroupByHash(ImmutableList.of(types.get(0)), ImmutableList.of(0), NOOP);
InMemoryGroupedTopNBuilder groupedTopNBuilder = new InMemoryGroupedTopNBuilder(
types,
new SimplePageWithPositionComparator(types, ImmutableList.of(1), ImmutableList.of(ASC_NULLS_LAST)),
pageCount * rowCount,
false,
new TestingMemoryContext(100L),
groupByHash);
// Assert memory usage gradually goes up
for (int i = 0; i < pageCount; i++) {
assertTrue(groupedTopNBuilder.processPage(input.get(i)).process());
assertBuilderSize(groupByHash, types, Collections.nCopies(i + 1, rowCount), Collections.nCopies(rowCount, i + 1), groupedTopNBuilder.getEstimatedSizeInBytes());
}
// Assert memory usage gradually goes down (i.e., proportional to the number of rows/pages we have produced)
int outputPageCount = 0;
int remainingRows = pageCount * rowCount;
Iterator<Page> output = groupedTopNBuilder.buildResult().iterator();
while (output.hasNext()) {
remainingRows -= output.next().getPositionCount();
assertBuilderSize(
groupByHash,
types,
remainingRows == 0 ? Collections.nCopies(pageCount, 0) : Collections.nCopies(pageCount, rowCount),
new ImmutableList.Builder<Integer>()
.addAll(Collections.nCopies((remainingRows + pageCount - 1) / pageCount, pageCount))
.addAll(Collections.nCopies(rowCount - (remainingRows + pageCount - 1) / pageCount, 0))
.build(),
groupedTopNBuilder.getEstimatedSizeInBytes());
outputPageCount++;
}
assertEquals(remainingRows, 0);
assertGreaterThan(outputPageCount, 3);
assertBuilderSize(groupByHash, types, Collections.nCopies(pageCount, 0), Collections.nCopies(rowCount, 0), groupedTopNBuilder.getEstimatedSizeInBytes());
}
private static GroupByHash createGroupByHash(List<Type> partitionTypes, List<Integer> partitionChannels, UpdateMemory updateMemory)
{
return GroupByHash.createGroupByHash(
partitionTypes,
Ints.toArray(partitionChannels),
Optional.empty(),
1,
false,
new JoinCompiler(createTestMetadataManager()),
updateMemory);
}
/**
* Assert the retained size in Bytes of {@param builder} with
* {@param groupByHash},
* a list of {@param types} of the input pages,
* a list of how many positions ({@param pagePositions}) of each page, and
* a list of how many rows ({}@param rowCounts}) of each group.
* Currently we do not assert the size of emptyPageReferenceSlots and assume the queue is always with INITIAL_CAPACITY = 4.
*/
private static void assertBuilderSize(
GroupByHash groupByHash,
List<Type> types,
List<Integer> pagePositions,
List<Integer> rowCounts,
long actualSizeInBytes)
{
ObjectBigArray<Object> pageReferences = new ObjectBigArray<>();
pageReferences.ensureCapacity(pagePositions.size());
long pageReferencesSizeInBytes = pageReferences.sizeOf();
ObjectBigArray<Object> groupedRows = new ObjectBigArray<>();
groupedRows.ensureCapacity(rowCounts.size());
long groupedRowsSizeInBytes = groupedRows.sizeOf();
int emptySlots = 4;
long emptyPageReferenceSlotsSizeInBytes = INT_FIFO_QUEUE_SIZE + sizeOf(new int[emptySlots]);
// build fake pages to get the real retained sizes
RowPagesBuilder rowPagesBuilder = rowPagesBuilder(types);
for (int pagePosition : pagePositions) {
if (pagePosition > 0) {
rowPagesBuilder.addSequencePage(pagePosition, new int[types.size()]);
}
}
long referencedPagesSizeInBytes = 0;
for (Page page : rowPagesBuilder.build()) {
// each page reference is with two arrays and a page
referencedPagesSizeInBytes += PAGE_REFERENCE_INSTANCE_SIZE +
page.getRetainedSizeInBytes() +
sizeOf(new Object[page.getPositionCount()]);
}
long rowHeapsSizeInBytes = 0;
for (int count : rowCounts) {
if (count > 0) {
rowHeapsSizeInBytes += new TestRowHeap(count).getEstimatedSizeInBytes();
}
}
long expectedSizeInBytes = INSTANCE_SIZE +
groupByHash.getEstimatedSize() +
referencedPagesSizeInBytes +
rowHeapsSizeInBytes +
pageReferencesSizeInBytes +
groupedRowsSizeInBytes +
(long) groupByHash.getGroupCount() * Integer.BYTES +
emptyPageReferenceSlotsSizeInBytes;
assertEquals(actualSizeInBytes, expectedSizeInBytes);
}
// this class is for memory tracking comparison
private static class TestRowHeap
extends ObjectHeapPriorityQueue<Object>
{
private static final long INSTANCE_SIZE = ClassLayout.parseClass(TestRowHeap.class).instanceSize();
// each Row is with two integers
private static final long ROW_ENTRY_SIZE = 2 * Integer.BYTES + OBJECT_OVERHEAD;
private TestRowHeap(int count)
{
super((left, right) -> 0);
for (int i = 0; i < count; i++) {
enqueue(new Object());
}
}
private long getEstimatedSizeInBytes()
{
return INSTANCE_SIZE + sizeOf(heap) + size() * ROW_ENTRY_SIZE;
}
}
// this class is for memory tracking comparison
private static class TestPageReference
{
// only need reference overhead
private Object page;
private Object reference;
private int usedPositionCount;
}
}