TestStreamLayout.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.block.MapBlockBuilder;
import com.facebook.presto.common.type.MapType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.cache.StorageOrcFileTailSource;
import com.facebook.presto.orc.metadata.ColumnEncoding;
import com.facebook.presto.orc.metadata.CompressionKind;
import com.facebook.presto.orc.metadata.DwrfSequenceEncoding;
import com.facebook.presto.orc.metadata.Stream;
import com.facebook.presto.orc.metadata.Stream.StreamKind;
import com.facebook.presto.orc.metadata.StripeFooter;
import com.facebook.presto.orc.proto.DwrfProto;
import com.facebook.presto.orc.stream.StreamDataOutput;
import com.facebook.presto.orc.writer.ColumnSizeLayout;
import com.facebook.presto.orc.writer.StreamLayout.ByStreamSize;
import com.facebook.presto.orc.writer.StreamLayoutFactory;
import com.facebook.presto.orc.writer.StreamOrderingLayout;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import org.joda.time.DateTimeZone;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.IOException;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.SortedMap;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.orc.NoOpOrcWriterStats.NOOP_WRITER_STATS;
import static com.facebook.presto.orc.NoopOrcAggregatedMemoryContext.NOOP_ORC_AGGREGATED_MEMORY_CONTEXT;
import static com.facebook.presto.orc.OrcTester.arrayType;
import static com.facebook.presto.orc.OrcTester.createOrcWriter;
import static com.facebook.presto.orc.OrcTester.mapType;
import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DIRECT;
import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DWRF_MAP_FLAT;
import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.DATA;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.IN_MAP;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.LENGTH;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.PRESENT;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.Collections.shuffle;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
public class TestStreamLayout
{
private static StreamDataOutput createStream(int nodeId, int seqId, StreamKind streamKind, int length)
{
Stream stream = new Stream(nodeId, seqId, streamKind, length, true);
return new StreamDataOutput(Slices.allocate(1024), stream);
}
private static StreamDataOutput createStream(int nodeId, StreamKind streamKind, int length)
{
Stream stream = new Stream(nodeId, DEFAULT_SEQUENCE_ID, streamKind, length, true);
return new StreamDataOutput(Slices.allocate(1024), stream);
}
private static void verifyStream(Stream stream, int nodeId, StreamKind streamKind, int length)
{
assertEquals(stream.getColumn(), nodeId);
assertEquals(stream.getLength(), length);
assertEquals(stream.getStreamKind(), streamKind);
}
private static void verifyStream(Stream actual, int nodeId, int seqId, StreamKind streamKind, int length)
{
Stream expected = new Stream(nodeId, seqId, streamKind, length, true);
assertEquals(actual, expected);
}
@Test
public void testByStreamSize()
{
List<StreamDataOutput> streams = new ArrayList<>();
int length = 10_000;
for (int i = 0; i < 10; i++) {
streams.add(createStream(i, PRESENT, length - i));
streams.add(createStream(i, DATA, length - 100 - i));
}
shuffle(streams);
new ByStreamSize().reorder(streams);
assertEquals(streams.size(), 20);
Iterator<StreamDataOutput> iterator = streams.iterator();
for (int i = 9; i >= 0; i--) {
verifyStream(iterator.next().getStream(), i, DATA, length - 100 - i);
}
for (int i = 9; i >= 0; i--) {
verifyStream(iterator.next().getStream(), i, PRESENT, length - i);
}
assertFalse(iterator.hasNext());
}
@DataProvider
public static Object[][] testByColumnSizeDataProvider()
{
// Assume the following schema:
// Node 1, Column 0, Type: MAP (map1)
// Node 2, Column 0, Type: INT
// Node 3, Column 0, Type: LIST
// Node 4, Column 0, Type: INT
// Node 5, Column 1, Type: MAP (FLAT flatMap1)
// Node 6, Column 1, Type: INT (absent in flat maps)
// Node 7, Column 1, Type: LIST
// Node 8, Column 1, Type: INT
// Node 9, Column 2, Type: LIST (list1)
// Node 10, Column 2, Type: INT
// Node 11, Column 3, Name: m2, Type: MAP (FLAT flatMap2)
// Node 12, Column 3, Name: key, Type: INT (absent in flat maps)
// Node 13, Column 3, Name: item, Type: INT
// Node 14, Column 4, Type: INT (regular1)
// Node 15, Column 5, Type: INT (regular2)
// Node 16, Column 6, Type: INT (regular3)
final int map1 = 1;
final int map1Key = 2;
final int map1Val = 3;
final int map1ValElem = 4;
final int flatMap1 = 5;
final int flatMap1Val = 7;
final int flatMap1ValElem = 8;
final int list1 = 9;
final int list1Elem = 10;
final int flatMap2 = 11;
final int flatMap2Val = 13;
final int regular1 = 14;
final int regular2 = 15;
final int regular3 = 16;
// supply streams in the expected order, test will perform several reorder
// iterations with shuffling
return new Object[][] {
{
"split non-flatmap and flatmap columns into separate groups",
new StreamDataOutput[] {
createStream(map1, PRESENT, 0),
createStream(map1Key, PRESENT, 0),
createStream(list1, PRESENT, 0),
createStream(regular1, PRESENT, 0),
createStream(regular2, PRESENT, 0),
createStream(flatMap1, PRESENT, 0),
createStream(flatMap1Val, PRESENT, 0),
createStream(flatMap1ValElem, 1, PRESENT, 0),
createStream(flatMap1ValElem, 2, PRESENT, 0),
createStream(flatMap1ValElem, 3, PRESENT, 0),
createStream(flatMap2, PRESENT, 0),
createStream(flatMap2Val, 1, PRESENT, 0),
createStream(flatMap2Val, 2, PRESENT, 0)
}
},
{
"order columns by total column size in desc order",
new StreamDataOutput[] {
createStream(regular1, PRESENT, 5_000_000),
createStream(regular2, PRESENT, 4_000_000),
createStream(list1, PRESENT, 3_000_000),
createStream(list1Elem, PRESENT, 200),
createStream(map1, PRESENT, 10),
createStream(map1Key, PRESENT, 3_000_000),
createStream(flatMap2, PRESENT, 1),
createStream(flatMap2Val, 1, PRESENT, 5),
createStream(flatMap2Val, 1, DATA, 5),
createStream(flatMap2Val, 2, DATA, 5),
createStream(flatMap2Val, 3, PRESENT, 5),
createStream(flatMap1, PRESENT, 1),
createStream(flatMap1Val, PRESENT, 1),
createStream(flatMap1ValElem, 1, PRESENT, 1),
createStream(flatMap1ValElem, 1, DATA, 1),
createStream(flatMap1ValElem, 1, LENGTH, 1),
}
},
{
"group by sequence",
new StreamDataOutput[] {
createStream(flatMap1, PRESENT, 1),
createStream(flatMap1Val, 1, PRESENT, 1),
createStream(flatMap1ValElem, 1, PRESENT, 0),
createStream(flatMap1ValElem, 1, DATA, 0),
createStream(flatMap1ValElem, 1, LENGTH, 0),
createStream(flatMap1Val, 2, PRESENT, 1),
createStream(flatMap1ValElem, 2, PRESENT, 0),
createStream(flatMap1ValElem, 2, DATA, 0),
createStream(flatMap1ValElem, 2, LENGTH, 0),
createStream(flatMap2, PRESENT, 0),
createStream(flatMap2Val, 1, PRESENT, 0),
createStream(flatMap2Val, 1, DATA, 0),
createStream(flatMap2Val, 1, LENGTH, 0),
createStream(flatMap2Val, 2, PRESENT, 0),
createStream(flatMap2Val, 2, DATA, 0),
createStream(flatMap2Val, 2, LENGTH, 0),
}
},
{
"order sequence streams by column+sequence size in desc order",
new StreamDataOutput[] {
createStream(flatMap1, PRESENT, 1000),
// seq 2
createStream(flatMap1Val, 2, PRESENT, 20),
createStream(flatMap1ValElem, 2, PRESENT, 20),
createStream(flatMap1ValElem, 2, DATA, 20),
createStream(flatMap1ValElem, 2, LENGTH, 20),
// seq 1
createStream(flatMap1Val, 1, PRESENT, 10),
createStream(flatMap1ValElem, 1, PRESENT, 10),
createStream(flatMap1ValElem, 1, DATA, 10),
createStream(flatMap1ValElem, 1, LENGTH, 10),
//
createStream(flatMap2, PRESENT, 10),
// seq 1
createStream(flatMap2Val, 1, PRESENT, 30),
createStream(flatMap2Val, 1, DATA, 30),
createStream(flatMap2Val, 1, LENGTH, 30),
// seq 2
createStream(flatMap2Val, 2, PRESENT, 10),
createStream(flatMap2Val, 2, DATA, 10),
createStream(flatMap2Val, 2, LENGTH, 10),
}
},
{
"order by the node in asc order",
new StreamDataOutput[] {
createStream(list1, PRESENT, 5),
createStream(list1Elem, PRESENT, 5),
createStream(regular1, PRESENT, 10),
createStream(regular2, DATA, 10),
createStream(flatMap1, PRESENT, 5),
createStream(flatMap1Val, 1, DATA, 5),
createStream(flatMap2, PRESENT, 5),
createStream(flatMap2Val, 1, DATA, 5),
}
},
{
"order by stream kind",
new StreamDataOutput[] {
createStream(list1, PRESENT, 0),
createStream(list1Elem, PRESENT, 0),
createStream(list1Elem, DATA, 0),
createStream(list1Elem, LENGTH, 0),
}
},
};
}
@Test(dataProvider = "testByColumnSizeDataProvider")
public void testByColumnSize(String testName, StreamDataOutput[] streams)
{
List<StreamDataOutput> expectedStreams = ImmutableList.copyOf(streams);
List<StreamDataOutput> testStreams = new ArrayList<>(ImmutableList.copyOf(streams));
Map<Integer, Integer> nodeToColumn = ImmutableMap.<Integer, Integer>builder()
.put(1, 0)
.put(2, 0)
.put(3, 0)
.put(4, 0)
.put(5, 1)
.put(6, 1)
.put(7, 1)
.put(8, 1)
.put(9, 2)
.put(10, 2)
.put(11, 3)
.put(12, 3)
.put(13, 3)
.put(14, 4)
.put(15, 5)
.put(16, 6)
.build();
Map<Integer, ColumnEncoding> nodeIdToColumnEncodings = ImmutableMap.<Integer, ColumnEncoding>builder()
.put(5, new ColumnEncoding(DWRF_MAP_FLAT, 0))
.put(11, new ColumnEncoding(DWRF_MAP_FLAT, 0))
.build();
ColumnSizeLayout layout = new ColumnSizeLayout();
int seed = LocalDate.now(ZoneId.of("America/Los_Angeles")).getDayOfYear();
Random rnd = new Random(seed);
for (int i = 0; i < 25; i++) {
shuffle(testStreams, rnd);
layout.reorder(testStreams, nodeToColumn, nodeIdToColumnEncodings);
assertEquals(testStreams, expectedStreams);
}
}
@DataProvider(name = "testParams")
public static Object[][] testParams()
{
return new Object[][] {{true}, {false}};
}
@Test(dataProvider = "testParams")
public void testByStreamSizeStreamOrdering(boolean isEmptyMap)
{
List<StreamDataOutput> streams = createStreams(isEmptyMap);
ByStreamSize streamLayout = new ByStreamSize();
StreamOrderingLayout streamOrderingLayout = new StreamOrderingLayout(createStreamReorderingInput(), streamLayout);
streamOrderingLayout.reorder(streams, createNodeIdToColumnId(), createColumnEncodings(isEmptyMap));
Iterator<StreamDataOutput> iterator = streams.iterator();
if (!isEmptyMap) {
verifyFlatMapColumns(iterator);
}
// non flat map columns
verifyStream(iterator.next().getStream(), 5, 0, DATA, 1);
verifyStream(iterator.next().getStream(), 2, 0, LENGTH, 2);
verifyStream(iterator.next().getStream(), 1, 0, DATA, 3);
verifyStream(iterator.next().getStream(), 4, 0, LENGTH, 5);
verifyStream(iterator.next().getStream(), 3, 0, DATA, 8);
verifyStream(iterator.next().getStream(), 1, 0, PRESENT, 12);
if (!isEmptyMap) {
// flat map stream not reordered
verifyStream(iterator.next().getStream(), 11, 5, IN_MAP, 13);
verifyStream(iterator.next().getStream(), 11, 5, LENGTH, 14);
verifyStream(iterator.next().getStream(), 12, 5, DATA, 15);
}
assertFalse(iterator.hasNext());
}
@Test(dataProvider = "testParams")
public void testByColumnSizeStreamOrdering(boolean isEmptyMap)
{
List<StreamDataOutput> streams = createStreams(isEmptyMap);
ColumnSizeLayout layout = new ColumnSizeLayout();
StreamOrderingLayout streamOrderingLayout = new StreamOrderingLayout(createStreamReorderingInput(), layout);
streamOrderingLayout.reorder(streams, createNodeIdToColumnId(), createColumnEncodings(isEmptyMap));
Iterator<StreamDataOutput> iterator = streams.iterator();
if (!isEmptyMap) {
verifyFlatMapColumns(iterator);
}
// regular columns
// column 1 with total size 16, ordered by nodes
verifyStream(iterator.next().getStream(), 2, 0, LENGTH, 2);
verifyStream(iterator.next().getStream(), 3, 0, DATA, 8);
verifyStream(iterator.next().getStream(), 4, 0, LENGTH, 5);
verifyStream(iterator.next().getStream(), 5, 0, DATA, 1);
// column 0 with total size 15, ordered by stream kind
verifyStream(iterator.next().getStream(), 1, 0, PRESENT, 12);
verifyStream(iterator.next().getStream(), 1, 0, DATA, 3);
if (!isEmptyMap) {
// flat map stream are also ordered by node and kind
verifyStream(iterator.next().getStream(), 11, 5, LENGTH, 14);
verifyStream(iterator.next().getStream(), 11, 5, IN_MAP, 13);
verifyStream(iterator.next().getStream(), 12, 5, DATA, 15);
}
assertFalse(iterator.hasNext());
}
private static Map<Integer, ColumnEncoding> createColumnEncodings(boolean isEmptyMap)
{
SortedMap<Integer, DwrfSequenceEncoding> seqIdToEncodings1 = ImmutableSortedMap.of(
1, new DwrfSequenceEncoding(
DwrfProto.KeyInfo.newBuilder().setIntKey(100).build(),
new ColumnEncoding(DIRECT, 0)),
2, new DwrfSequenceEncoding(
DwrfProto.KeyInfo.newBuilder().setIntKey(200).build(),
new ColumnEncoding(DIRECT, 0)),
3, new DwrfSequenceEncoding(
DwrfProto.KeyInfo.newBuilder().setIntKey(300).build(),
new ColumnEncoding(DIRECT, 0)));
SortedMap<Integer, DwrfSequenceEncoding> seqIdToEncodings2 = ImmutableSortedMap.of(
1, new DwrfSequenceEncoding(
DwrfProto.KeyInfo.newBuilder().setIntKey(100).build(),
new ColumnEncoding(DIRECT, 0)),
2, new DwrfSequenceEncoding(
DwrfProto.KeyInfo.newBuilder().setIntKey(200).build(),
new ColumnEncoding(DIRECT, 0)),
3, new DwrfSequenceEncoding(
DwrfProto.KeyInfo.newBuilder().setIntKey(300).build(),
new ColumnEncoding(DIRECT, 0)),
4, new DwrfSequenceEncoding(
DwrfProto.KeyInfo.newBuilder().setIntKey(400).build(),
new ColumnEncoding(DIRECT, 0)),
5, new DwrfSequenceEncoding(
DwrfProto.KeyInfo.newBuilder().setIntKey(500).build(),
new ColumnEncoding(DIRECT, 0)));
return ImmutableMap.<Integer, ColumnEncoding>builder()
.put(1, new ColumnEncoding(DIRECT, 0))
.put(2, new ColumnEncoding(DIRECT, 0))
.put(3, new ColumnEncoding(DIRECT, 0))
.put(4, new ColumnEncoding(DIRECT, 0))
.put(5, new ColumnEncoding(DIRECT, 0))
.put(6, new ColumnEncoding(DWRF_MAP_FLAT, 0))
.put(8, new ColumnEncoding(DIRECT, 0, isEmptyMap ? Optional.empty() : Optional.of(seqIdToEncodings1)))
.put(9, new ColumnEncoding(DWRF_MAP_FLAT, 0))
.put(11, new ColumnEncoding(DIRECT, 0, isEmptyMap ? Optional.empty() : Optional.of(seqIdToEncodings2)))
.put(12, new ColumnEncoding(DIRECT, 0, isEmptyMap ? Optional.empty() : Optional.of(seqIdToEncodings2)))
.build();
}
private static Map<Integer, Integer> createNodeIdToColumnId()
{
// col Id 0, 1, 2, 3
// node Id 0 to 12
return ImmutableMap.<Integer, Integer>builder()
.put(1, 0)
.put(2, 1)
.put(3, 1)
.put(4, 1)
.put(5, 1)
.put(6, 2)
.put(7, 2)
.put(8, 2)
.put(9, 3)
.put(10, 3)
.put(11, 3)
.put(12, 3)
.build();
}
private static DwrfStreamOrderingConfig createStreamReorderingInput()
{
Map<Integer, List<DwrfProto.KeyInfo>> columnIdToFlatMapKeyIds = new HashMap<>();
columnIdToFlatMapKeyIds.put(
// column exists
// complete overlap between column encodings and key ordering
2, ImmutableList.of(
DwrfProto.KeyInfo.newBuilder().setIntKey(300).build(),
DwrfProto.KeyInfo.newBuilder().setIntKey(200).build(),
DwrfProto.KeyInfo.newBuilder().setIntKey(100).build()));
columnIdToFlatMapKeyIds.put(
// column exists,
// key 600 doesn't exist in the column encodings
// key 500 exists in the column encodings not present in the order list
3, ImmutableList.of(
DwrfProto.KeyInfo.newBuilder().setIntKey(100).build(),
DwrfProto.KeyInfo.newBuilder().setIntKey(200).build(),
DwrfProto.KeyInfo.newBuilder().setIntKey(400).build(),
DwrfProto.KeyInfo.newBuilder().setIntKey(300).build(),
DwrfProto.KeyInfo.newBuilder().setIntKey(600).build()));
columnIdToFlatMapKeyIds.put(// column does not exist in the input schema
4, ImmutableList.of(
DwrfProto.KeyInfo.newBuilder().setIntKey(100).build(),
DwrfProto.KeyInfo.newBuilder().setIntKey(200).build()));
return new DwrfStreamOrderingConfig(columnIdToFlatMapKeyIds);
}
private static void verifyFlatMapColumns(Iterator<StreamDataOutput> iterator)
{
// flat map stream are ordered by node and kind
// Kind order: DATA:1, LENGTH:2, IN_MAP:12
// column 2
verifyStream(iterator.next().getStream(), 8, 3, DATA, 7);
verifyStream(iterator.next().getStream(), 8, 3, IN_MAP, 6);
verifyStream(iterator.next().getStream(), 8, 2, DATA, 5);
verifyStream(iterator.next().getStream(), 8, 2, IN_MAP, 4);
verifyStream(iterator.next().getStream(), 8, 1, DATA, 3);
verifyStream(iterator.next().getStream(), 8, 1, IN_MAP, 2);
// column 3
verifyStream(iterator.next().getStream(), 11, 1, LENGTH, 2);
verifyStream(iterator.next().getStream(), 11, 1, IN_MAP, 1);
verifyStream(iterator.next().getStream(), 12, 1, DATA, 3);
verifyStream(iterator.next().getStream(), 11, 2, LENGTH, 5);
verifyStream(iterator.next().getStream(), 11, 2, IN_MAP, 4);
verifyStream(iterator.next().getStream(), 12, 2, DATA, 6);
verifyStream(iterator.next().getStream(), 11, 4, LENGTH, 11);
verifyStream(iterator.next().getStream(), 11, 4, IN_MAP, 10);
verifyStream(iterator.next().getStream(), 12, 4, DATA, 12);
verifyStream(iterator.next().getStream(), 11, 3, LENGTH, 8);
verifyStream(iterator.next().getStream(), 11, 3, IN_MAP, 7);
verifyStream(iterator.next().getStream(), 12, 3, DATA, 9);
}
private static List<StreamDataOutput> createStreams(boolean isEmptyMap)
{
// Assume the file has the following schema:
// column 0: 1INT
// column 1: 2MAP<3INT, 4LIST<5INT>> // non flat map
// column 2: 6MAP<7INT, 8FLOAT> // flat map
// column 3: 9MAP<10INT, 11LIST<12INT> // flat map
List<StreamDataOutput> streams = new ArrayList<>();
// column 0
streams.add(createStream(1, DATA, 3));
streams.add(createStream(1, PRESENT, 12));
// column 1 MAP<INT, LIST<INT>> <2, <3, 4<5>>>>
streams.add(createStream(2, LENGTH, 2)); // MAP
streams.add(createStream(3, DATA, 8)); // INT
streams.add(createStream(4, LENGTH, 5)); // LIST<INT>
streams.add(createStream(5, DATA, 1)); // INT
if (!isEmptyMap) {
// column 2 MAP<INT, FLOAT> <6 <7, 8>>
streams.add(createStream(8, 1, IN_MAP, 2));
streams.add(createStream(8, 1, DATA, 3));
streams.add(createStream(8, 2, IN_MAP, 4));
streams.add(createStream(8, 2, DATA, 5));
streams.add(createStream(8, 3, IN_MAP, 6));
streams.add(createStream(8, 3, DATA, 7));
// column 3 MAP<INT, LIST<INT> <9 <10, 11<12>>>
streams.add(createStream(11, 1, IN_MAP, 1));
streams.add(createStream(11, 1, LENGTH, 2));
streams.add(createStream(12, 1, DATA, 3));
streams.add(createStream(11, 2, IN_MAP, 4));
streams.add(createStream(11, 2, LENGTH, 5));
streams.add(createStream(12, 2, DATA, 6));
streams.add(createStream(11, 3, IN_MAP, 7));
streams.add(createStream(11, 3, LENGTH, 8));
streams.add(createStream(12, 3, DATA, 9));
streams.add(createStream(11, 4, IN_MAP, 10));
streams.add(createStream(11, 4, LENGTH, 11));
streams.add(createStream(12, 4, DATA, 12));
streams.add(createStream(11, 5, IN_MAP, 13));
streams.add(createStream(11, 5, LENGTH, 14));
streams.add(createStream(12, 5, DATA, 15));
}
return streams;
}
@DataProvider
public static Object[][] streamOrderingLayoutProvider()
{
// stream ordering test writes the following keys: 0, 1, 2
return new Object[][] {
{2L, 0L, 1L}, // full match
{new Long[0]}, // no keys
{0L}, // partially matching keys
{10L, 20L}, // full mismatch
{2L, 10L} // partial match + mismatch
};
}
@Test(dataProvider = "streamOrderingLayoutProvider")
public void testStreamOrderingLayoutEndToEndPrimitive(Object[] orderedKeys)
throws Exception
{
MapType mapType = (MapType) mapType(INTEGER, INTEGER);
// create a map with a single row and three keys 0, 1, 2
MapBlockBuilder mapBlockBuilder = (MapBlockBuilder) mapType.createBlockBuilder(null, 10);
BlockBuilder mapKeyBuilder = mapBlockBuilder.getKeyBlockBuilder();
BlockBuilder mapValueBuilder = mapBlockBuilder.getValueBlockBuilder();
mapBlockBuilder.beginDirectEntry();
for (int k = 0; k < 3; k++) {
INTEGER.writeLong(mapKeyBuilder, k);
INTEGER.writeLong(mapValueBuilder, k);
}
mapBlockBuilder.closeEntry();
Page page = new Page(mapBlockBuilder.build());
// inject a custom layout factory
StreamLayoutFactory streamLayoutFactory = createLongKeysStreamLayoutFactory(0, orderedKeys);
OrcWriterOptions writerOptions = OrcWriterOptions.builder()
.withFlattenedColumns(ImmutableSet.of(0))
.withStreamLayoutFactory(streamLayoutFactory)
.build();
try (TempFile tempFile = new TempFile()) {
try (OrcWriter orcWriter = createOrcWriter(
tempFile.getFile(),
OrcEncoding.DWRF,
CompressionKind.ZLIB,
Optional.empty(),
ImmutableList.of(mapType),
writerOptions,
NOOP_WRITER_STATS)) {
orcWriter.write(page);
}
assertFileStreamsOrder(orderedKeys, mapType, tempFile);
}
}
@Test(dataProvider = "streamOrderingLayoutProvider")
public void testStreamOrderingLayoutEndToEndComplexEmpty(Object[] orderedKeys)
throws Exception
{
MapType mapType = (MapType) mapType(INTEGER, arrayType(INTEGER));
// create an empty map
MapBlockBuilder mapBlockBuilder = (MapBlockBuilder) mapType.createBlockBuilder(null, 10);
mapBlockBuilder.beginDirectEntry();
mapBlockBuilder.closeEntry();
Page page = new Page(mapBlockBuilder.build());
// inject a custom layout factory
StreamLayoutFactory streamLayoutFactory = createLongKeysStreamLayoutFactory(0, orderedKeys);
OrcWriterOptions writerOptions = OrcWriterOptions.builder()
.withFlattenedColumns(ImmutableSet.of(0))
.withStreamLayoutFactory(streamLayoutFactory)
.build();
try (TempFile tempFile = new TempFile()) {
try (OrcWriter orcWriter = createOrcWriter(
tempFile.getFile(),
OrcEncoding.DWRF,
CompressionKind.ZLIB,
Optional.empty(),
ImmutableList.of(mapType),
writerOptions,
NOOP_WRITER_STATS)) {
orcWriter.write(page);
// no need to verify empty map
}
}
}
private static void assertFileStreamsOrder(Object[] orderedKeys, MapType mapType, TempFile tempFile)
throws IOException
{
// introspect the file to get access to the file meta information
CapturingOrcFileIntrospector introspector = new CapturingOrcFileIntrospector();
readFileWithIntrospector(mapType, introspector, tempFile);
assertEquals(introspector.getStripeFooterByStripeOffset().size(), 1);
StripeFooter stripeFooter = introspector.getStripeFooterByStripeOffset().values().iterator().next();
// get data streams, because only data streams are ordered
List<Stream> dataStreams = stripeFooter.getStreams().stream()
.filter(s -> s.getStreamKind() != StreamKind.ROW_INDEX)
.collect(toImmutableList());
// get sequence to key mapping for flat map value node
int node = 3; // map value node
ColumnEncoding columnEncoding = stripeFooter.getColumnEncodings().get(node);
SortedMap<Integer, DwrfSequenceEncoding> nodeSequences = columnEncoding.getAdditionalSequenceEncodings().get();
ImmutableMap.Builder<Long, Integer> keyToSequenceBuilder = ImmutableMap.builder();
for (Map.Entry<Integer, DwrfSequenceEncoding> entry : nodeSequences.entrySet()) {
long key = entry.getValue().getKey().getIntKey();
int sequence = entry.getKey();
keyToSequenceBuilder.put(key, sequence);
}
Map<Long, Integer> keyToSequence = keyToSequenceBuilder.build();
// remove mismatching keys that are not written by the writer
List<Long> filteredKeys = Arrays.stream(orderedKeys)
.map(k -> (Long) k)
.filter(k -> k >= 0 && k < 3)
.collect(toImmutableList());
// assert that the streams are indeed ordered
// ordered streams come at the head of the all data streams
Iterator<Stream> dataStreamsIterator = dataStreams.iterator();
for (Long key : filteredKeys) {
// there should be IN_MAP + DATA streams for each sequence, we don't care about what stream kind comes first
int sequence = keyToSequence.get(key);
assertEquals(dataStreamsIterator.next().getSequence(), sequence);
assertEquals(dataStreamsIterator.next().getSequence(), sequence);
}
}
private static void readFileWithIntrospector(Type type, CapturingOrcFileIntrospector introspector, TempFile tempFile)
throws IOException
{
OrcDataSource dataSource = new FileOrcDataSource(tempFile.getFile(),
new DataSize(1, MEGABYTE),
new DataSize(1, MEGABYTE),
new DataSize(1, MEGABYTE),
true);
OrcReaderOptions readerOptions = OrcReaderOptions.builder()
.withMaxMergeDistance(new DataSize(1, MEGABYTE))
.withTinyStripeThreshold(new DataSize(1, MEGABYTE))
.withMaxBlockSize(new DataSize(1, MEGABYTE))
.build();
OrcReader reader = new OrcReader(
dataSource,
OrcEncoding.DWRF,
new StorageOrcFileTailSource(),
StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()),
Optional.empty(),
NOOP_ORC_AGGREGATED_MEMORY_CONTEXT,
readerOptions,
false,
DwrfEncryptionProvider.NO_ENCRYPTION,
DwrfKeyProvider.EMPTY,
new RuntimeStats(),
Optional.of(introspector),
tempFile.getFile().lastModified());
OrcSelectiveRecordReader recordReader = reader.createSelectiveRecordReader(
ImmutableMap.of(0, type),
ImmutableList.of(0),
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
OrcPredicate.TRUE,
0,
dataSource.getSize(),
DateTimeZone.UTC,
NOOP_ORC_AGGREGATED_MEMORY_CONTEXT,
Optional.empty(),
1000);
while (recordReader.getNextPage() != null) {
// ignore
}
recordReader.close();
}
private static StreamLayoutFactory createLongKeysStreamLayoutFactory(int column, Object[] orderedKeys)
{
List<DwrfProto.KeyInfo> keyInfos = Arrays.stream(orderedKeys)
.map(key -> DwrfProto.KeyInfo.newBuilder()
.setIntKey((Long) key)
.build())
.collect(toImmutableList());
Map<Integer, List<DwrfProto.KeyInfo>> columnToKeys = ImmutableMap.of(column, keyInfos);
DwrfStreamOrderingConfig streamOrderingConfig = new DwrfStreamOrderingConfig(columnToKeys);
StreamLayoutFactory baseStreamLayoutFactory = new StreamLayoutFactory.ColumnSizeLayoutFactory();
return () -> new StreamOrderingLayout(streamOrderingConfig, baseStreamLayoutFactory.create());
}
}