TestPagesSerde.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.buffer;
import com.facebook.presto.CompressionCodec;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.page.PagesSerde;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.stream.LongStream;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.operator.PageAssertions.assertPageEquals;
import static com.facebook.presto.spi.page.PagesSerdeUtil.readPages;
import static com.facebook.presto.spi.page.PagesSerdeUtil.writePages;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class TestPagesSerde
{
@DataProvider(name = "testCompressionCodec")
public Object[][] createTestCompressionCodec()
{
return new Object[][] {
{CompressionCodec.GZIP},
{CompressionCodec.LZ4},
{CompressionCodec.LZO},
{CompressionCodec.SNAPPY},
{CompressionCodec.ZLIB},
{CompressionCodec.ZSTD},
{CompressionCodec.NONE}
};
}
@Test(dataProvider = "testCompressionCodec")
public void testRoundTrip(CompressionCodec codec)
{
PagesSerde serde = new TestingPagesSerdeFactory(codec).createPagesSerde();
BlockBuilder expectedBlockBuilder = VARCHAR.createBlockBuilder(null, 5);
VARCHAR.writeString(expectedBlockBuilder, "alice");
VARCHAR.writeString(expectedBlockBuilder, "bob");
VARCHAR.writeString(expectedBlockBuilder, "charlie");
VARCHAR.writeString(expectedBlockBuilder, "dave");
Block expectedBlock = expectedBlockBuilder.build();
Page expectedPage = new Page(expectedBlock, expectedBlock, expectedBlock);
DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1024);
writePages(serde, sliceOutput, expectedPage, expectedPage, expectedPage);
List<Type> types = ImmutableList.of(VARCHAR, VARCHAR, VARCHAR);
Iterator<Page> pageIterator = readPages(serde, sliceOutput.slice().getInput());
assertPageEquals(types, pageIterator.next(), expectedPage);
assertPageEquals(types, pageIterator.next(), expectedPage);
assertPageEquals(types, pageIterator.next(), expectedPage);
assertFalse(pageIterator.hasNext());
}
@Test(dataProvider = "testCompressionCodec")
public void testBigintSerializedSize(CompressionCodec codec)
{
BlockBuilder builder = BIGINT.createBlockBuilder(null, 5);
// empty page
Page page = new Page(builder.build());
int pageSize = serializedSize(ImmutableList.of(BIGINT), page, codec);
assertEquals(pageSize, 56); // page overhead ideally 35 but since a 0 sized block will be a RLEBlock we have an overhead of 13
// page with one value
BIGINT.writeLong(builder, 123);
pageSize = 35; // Now we have moved to the normal block implementation so the page size overhead is 35
page = new Page(builder.build());
int firstValueSize = serializedSize(ImmutableList.of(BIGINT), page, codec) - pageSize;
assertEquals(firstValueSize, 17); // value size + value overhead
// page with two values
BIGINT.writeLong(builder, 456);
page = new Page(builder.build());
int secondValueSize = serializedSize(ImmutableList.of(BIGINT), page, codec) - (pageSize + firstValueSize);
assertEquals(secondValueSize, 8); // value size (value overhead is shared with previous value)
}
@Test(dataProvider = "testCompressionCodec")
public void testVarcharSerializedSize(CompressionCodec codec)
{
BlockBuilder builder = VARCHAR.createBlockBuilder(null, 5);
// empty page
Page page = new Page(builder.build());
int pageSize = serializedSize(ImmutableList.of(VARCHAR), page, codec);
assertEquals(pageSize, 52); // page overhead
// page with one value
VARCHAR.writeString(builder, "alice");
page = new Page(builder.build());
int firstValueSize = serializedSize(ImmutableList.of(VARCHAR), page, codec) - pageSize;
assertEquals(firstValueSize, 4 + 5); // length + "alice"
// page with two values
VARCHAR.writeString(builder, "bob");
page = new Page(builder.build());
int secondValueSize = serializedSize(ImmutableList.of(VARCHAR), page, codec) - (pageSize + firstValueSize);
assertEquals(secondValueSize, 4 + 3); // length + "bob" (null shared with first entry)
}
@Test(dataProvider = "testCompressionCodec")
public void testRoundTripSizeForCompactPageStaysWithinTwentyPercent(CompressionCodec codec)
{
PagesSerde serde = new TestingPagesSerdeFactory(codec).createPagesSerde();
BlockBuilder variableWidthBlockBuilder1 = VARCHAR.createBlockBuilder(null, 128);
BlockBuilder variableWidthBlockBuilder2 = VARCHAR.createBlockBuilder(null, 256);
BlockBuilder bigintBlockBuilder = BIGINT.createBlockBuilder(null, 128);
Block emptyVariableWidthBlock = VARCHAR.createBlockBuilder(null, 128).build();
LongStream.range(0, 100).forEach(value -> {
VARCHAR.writeString(variableWidthBlockBuilder1, UUID.randomUUID().toString());
VARCHAR.writeString(variableWidthBlockBuilder2, UUID.randomUUID().toString());
VARCHAR.writeString(variableWidthBlockBuilder2, UUID.randomUUID().toString());
BIGINT.writeLong(bigintBlockBuilder, value);
});
Page compactPage = new Page(
emptyVariableWidthBlock,
variableWidthBlockBuilder1.build(),
variableWidthBlockBuilder2.build(),
bigintBlockBuilder.build())
.compact();
Page deserializedPage = serde.deserialize(serde.serialize(compactPage));
double expectedMaxSize = compactPage.getRetainedSizeInBytes() * 1.2; // 120%
double actualSize = deserializedPage.getRetainedSizeInBytes();
assertTrue(actualSize < expectedMaxSize, "Expected round trip size difference less than 20% of original page");
}
private static int serializedSize(List<? extends Type> types, Page expectedPage, CompressionCodec codec)
{
PagesSerde serde = new TestingPagesSerdeFactory(codec).createPagesSerde();
DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1024);
writePages(serde, sliceOutput, expectedPage);
Slice slice = sliceOutput.slice();
Iterator<Page> pageIterator = readPages(serde, slice.getInput());
if (pageIterator.hasNext()) {
assertPageEquals(types, pageIterator.next(), expectedPage);
}
else {
assertEquals(expectedPage.getPositionCount(), 0);
}
assertFalse(pageIterator.hasNext());
return slice.length();
}
}