TestOrcReaderPositions.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.function.SqlFunctionProperties;
import com.facebook.presto.common.predicate.FilterFunction;
import com.facebook.presto.common.relation.Predicate;
import com.facebook.presto.orc.cache.StorageOrcFileTailSource;
import com.facebook.presto.orc.metadata.CompressionKind;
import com.facebook.presto.orc.metadata.Footer;
import com.facebook.presto.orc.metadata.statistics.IntegerStatistics;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.io.Writable;
import org.apache.orc.NullMemoryManager;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.LongStream;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.orc.DwrfEncryptionProvider.NO_ENCRYPTION;
import static com.facebook.presto.orc.NoopOrcAggregatedMemoryContext.NOOP_ORC_AGGREGATED_MEMORY_CONTEXT;
import static com.facebook.presto.orc.OrcEncoding.ORC;
import static com.facebook.presto.orc.OrcReader.BATCH_SIZE_GROWTH_FACTOR;
import static com.facebook.presto.orc.OrcReader.INITIAL_BATCH_SIZE;
import static com.facebook.presto.orc.OrcReader.MAX_BATCH_SIZE;
import static com.facebook.presto.orc.OrcTester.Format.ORC_12;
import static com.facebook.presto.orc.OrcTester.MAX_BLOCK_SIZE;
import static com.facebook.presto.orc.OrcTester.createCustomOrcRecordReader;
import static com.facebook.presto.orc.OrcTester.createCustomOrcSelectiveRecordReader;
import static com.facebook.presto.orc.OrcTester.createOrcRecordWriter;
import static com.facebook.presto.orc.OrcTester.createSettableStructObjectInspector;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.SNAPPY;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class TestOrcReaderPositions
{
@Test
public void testEntireFile()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
createMultiStripeFile(tempFile.getFile());
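// the file holds 5 stripes of 20 rows each (values 0, 3, 6, ..., 297), so a
// TRUE predicate reads all 100 rows back in 5 batches of 20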
try (OrcBatchRecordReader reader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, BIGINT, MAX_BATCH_SIZE, false, false)) {
assertEquals(reader.getReaderRowCount(), 100);
assertEquals(reader.getReaderPosition(), 0);
assertEquals(reader.getFileRowCount(), reader.getReaderRowCount());
assertEquals(reader.getFilePosition(), reader.getReaderPosition());
for (int i = 0; i < 5; i++) {
assertEquals(reader.nextBatch(), 20);
assertEquals(reader.getReaderPosition(), i * 20L);
assertEquals(reader.getFilePosition(), reader.getReaderPosition());
assertCurrentBatch(reader, i);
}
assertEquals(reader.nextBatch(), -1);
assertEquals(reader.getReaderPosition(), 100);
assertEquals(reader.getFilePosition(), reader.getReaderPosition());
}
}
}
@Test
public void testStripeSkipping()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
createMultiStripeFile(tempFile.getFile());
// test reading second and fourth stripes
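// stripe n (0-based) holds values 60n, 60n + 3, ..., 60n + 57, so the second
// stripe has min = 60 / max = 117 and the fourth has min = 180 / max = 237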
OrcPredicate predicate = (numberOfRows, statisticsByColumnIndex) -> {
if (numberOfRows == 100) {
return true;
}
IntegerStatistics stats = statisticsByColumnIndex.get(0).getIntegerStatistics();
return ((stats.getMin() == 60) && (stats.getMax() == 117)) ||
((stats.getMin() == 180) && (stats.getMax() == 237));
};
try (OrcBatchRecordReader reader = createCustomOrcRecordReader(tempFile, ORC, predicate, BIGINT, MAX_BATCH_SIZE, false, false)) {
assertEquals(reader.getFileRowCount(), 100);
assertEquals(reader.getReaderRowCount(), 40);
assertEquals(reader.getFilePosition(), 0);
assertEquals(reader.getReaderPosition(), 0);
// second stripe
assertEquals(reader.nextBatch(), 20);
assertEquals(reader.getReaderPosition(), 0);
assertEquals(reader.getFilePosition(), 20);
assertCurrentBatch(reader, 1);
// fourth stripe
assertEquals(reader.nextBatch(), 20);
assertEquals(reader.getReaderPosition(), 20);
assertEquals(reader.getFilePosition(), 60);
assertCurrentBatch(reader, 3);
assertEquals(reader.nextBatch(), -1);
assertEquals(reader.getReaderPosition(), 40);
assertEquals(reader.getFilePosition(), 100);
}
}
}
@Test
public void testCompleteFileWithAppendRowNumber()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
// create single stripe file with multiple row groups
int rowCount = 142_000;
createSequentialFile(tempFile.getFile(), rowCount);
List<Long> expectedValues = new ArrayList<>();
expectedValues.addAll(LongStream.range(0, rowCount).collect(ArrayList::new, List::add, List::addAll));
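// the trailing 'true' flag requests the appended row-number column; since the
// file is sequential, each value should equal its own row number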
OrcSelectiveRecordReader reader = createCustomOrcSelectiveRecordReader(tempFile, ORC, OrcPredicate.TRUE, BIGINT, MAX_BATCH_SIZE, false, true);
verifyAppendNumber(expectedValues, reader);
}
}
@Test
public void testCompleteFileWithAppendRowNumberWithValidation()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
// create single stripe file with multiple row groups
int rowCount = 142_000;
createSequentialFile(tempFile.getFile(), rowCount);
List<Long> expectedValues = new ArrayList<>();
expectedValues.addAll(LongStream.range(0, rowCount).collect(ArrayList::new, List::add, List::addAll));
OrcSelectiveRecordReader reader = createCustomOrcSelectiveRecordReader(tempFile, ORC, OrcPredicate.TRUE, BIGINT, MAX_BATCH_SIZE, false, true);
verifyAppendNumber(expectedValues, reader);
}
}
@Test
public void testFilterFunctionWithAppendRowNumber()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
int rowCount = 100;
createSequentialFile(tempFile.getFile(), rowCount);
List<Long> expectedValues = LongStream.range(0, rowCount).boxed().filter(input -> input % 2 != 0)
.collect(ArrayList::new, List::add, List::addAll);
ConnectorSession session = new TestingConnectorSession(ImmutableList.of());
FilterFunction filter = new FilterFunction(session.getSqlFunctionProperties(), true, new IsOddPredicate());
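// IsOddPredicate keeps only rows whose value is odd, so the appended row
// numbers should be exactly the odd indices computed above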
OrcSelectiveRecordReader reader = createCustomOrcSelectiveRecordReader(tempFile.getFile(),
ORC,
OrcPredicate.TRUE,
ImmutableList.of(BIGINT),
MAX_BATCH_SIZE,
ImmutableMap.of(),
ImmutableList.of(filter),
ImmutableMap.of(0, 0),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(0, BIGINT),
ImmutableList.of(0),
false,
new TestingHiveOrcAggregatedMemoryContext(),
true);
verifyAppendNumber(expectedValues, reader);
}
}
@Test
public void testRowGroupSkippingWithAppendRowNumber()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
// create single stripe file with multiple row groups
int rowCount = 142_000;
createSequentialFile(tempFile.getFile(), rowCount);
// test reading two row groups from middle of file
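// with 10,000 rows per row group, min == 50_000 selects rows [50_000, 60_000)
// and min == 70_000 selects rows [70_000, 80_000)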
OrcPredicate predicate = (numberOfRows, statisticsByColumnIndex) -> {
if (numberOfRows == rowCount) {
return true;
}
IntegerStatistics stats = statisticsByColumnIndex.get(0).getIntegerStatistics();
return (stats.getMin() == 50_000) || (stats.getMin() == 70_000);
};
List<Long> expectedValues = new ArrayList<>();
expectedValues.addAll(LongStream.range(50_000, 60_000).collect(ArrayList::new, List::add, List::addAll));
expectedValues.addAll(LongStream.range(70_000, 80_000).collect(ArrayList::new, List::add, List::addAll));
OrcSelectiveRecordReader reader = createCustomOrcSelectiveRecordReader(tempFile, ORC, predicate, BIGINT, MAX_BATCH_SIZE, false, true);
verifyAppendNumber(expectedValues, reader);
}
}
@Test
public void testStripeSkippingWithAppendNumber()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
createMultiStripeFile(tempFile.getFile());
// Every stripe has 20 rows and there are 5 stripes in total
// test reading second and fourth stripes
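// only rows 20-39 (second stripe) and 60-79 (fourth stripe) survive, and the
// appended row numbers should reflect their original positions in the file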
OrcPredicate predicate = (numberOfRows, statisticsByColumnIndex) -> {
if (numberOfRows == 100) {
return true;
}
IntegerStatistics stats = statisticsByColumnIndex.get(0).getIntegerStatistics();
return ((stats.getMin() == 60) && (stats.getMax() == 117)) ||
((stats.getMin() == 180) && (stats.getMax() == 237));
};
List<Long> expectedValues = new ArrayList<>();
expectedValues.addAll(LongStream.range(20, 40).collect(ArrayList::new, List::add, List::addAll));
expectedValues.addAll(LongStream.range(60, 80).collect(ArrayList::new, List::add, List::addAll));
List<Long> actualValues = new ArrayList<>();
OrcSelectiveRecordReader reader = createCustomOrcSelectiveRecordReader(tempFile, ORC, predicate, BIGINT, MAX_BATCH_SIZE, false, true);
assertNotNull(reader);
Page returnPage;
while (true) {
returnPage = reader.getNextPage();
if (returnPage == null) {
break;
}
Block rowNumberBlock = returnPage.getBlock(1);
for (int i = 0; i < returnPage.getPositionCount(); i++) {
actualValues.add(rowNumberBlock.getLong(i));
}
}
assertEquals(actualValues, expectedValues);
}
}
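// drains the reader and asserts that the appended row-number block (block 1)
// mirrors the sequential data block (block 0) and matches the expected values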
private void verifyAppendNumber(List<Long> expectedValues, OrcSelectiveRecordReader reader)
throws IOException
{
assertNotNull(reader);
assertNotNull(expectedValues);
List<Long> actualValues = new ArrayList<>();
while (true) {
Page returnPage = reader.getNextPage();
if (returnPage == null) {
break;
}
Block dataBlock = returnPage.getBlock(0);
Block rowNumberBlock = returnPage.getBlock(1);
for (int i = 0; i < returnPage.getPositionCount(); i++) {
actualValues.add(dataBlock.getLong(i));
assertEquals(dataBlock.getLong(i), rowNumberBlock.getLong(i));
}
}
assertEquals(actualValues, expectedValues);
}
@Test
public void testRowGroupSkipping()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
// create single stripe file with multiple row groups
int rowCount = 142_000;
createSequentialFile(tempFile.getFile(), rowCount);
// test reading two row groups from middle of file
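// min == 50_000 and min == 60_000 match two adjacent 10,000-row row groups,
// together covering rows [50_000, 70_000)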
OrcPredicate predicate = (numberOfRows, statisticsByColumnIndex) -> {
if (numberOfRows == rowCount) {
return true;
}
IntegerStatistics stats = statisticsByColumnIndex.get(0).getIntegerStatistics();
return (stats.getMin() == 50_000) || (stats.getMin() == 60_000);
};
try (OrcBatchRecordReader reader = createCustomOrcRecordReader(tempFile, ORC, predicate, BIGINT, MAX_BATCH_SIZE, false, false)) {
assertEquals(reader.getFileRowCount(), rowCount);
assertEquals(reader.getReaderRowCount(), rowCount);
assertEquals(reader.getFilePosition(), 0);
assertEquals(reader.getReaderPosition(), 0);
long position = 50_000;
while (true) {
int batchSize = reader.nextBatch();
if (batchSize == -1) {
break;
}
Block block = reader.readBlock(0);
for (int i = 0; i < batchSize; i++) {
assertEquals(BIGINT.getLong(block, i), position + i);
}
assertEquals(reader.getFilePosition(), position);
assertEquals(reader.getReaderPosition(), position);
position += batchSize;
}
assertEquals(position, 70_000);
assertEquals(reader.getFilePosition(), rowCount);
assertEquals(reader.getReaderPosition(), rowCount);
}
}
}
@Test
public void testBatchSizesForVariableWidth()
throws Exception
{
// the test creates a table with one column and 10 row groups (i.e., 100K rows)
// the 1st row group has strings of length 300 each,
// the 2nd row group has strings of length 600 each,
// the 3rd row group has strings of length 900 each, and so on
// the test shows that when loading those strings,
// we are first bounded by MAX_BATCH_SIZE = 1024 rows because 1024 X 900B < 1MB
// and then bounded by MAX_BLOCK_SIZE = 1MB because 1024 X 1200B > 1MB
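// for example, in the 4th row group each value occupies 1200 + 4 + 1 = 1205 bytes
// (string bytes + offset + null flag, see below), so each batch should shrink to
// roughly 1MB / 1205B, i.e. about 870 rows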
try (TempFile tempFile = new TempFile()) {
// create single stripe file with multiple row groups
int rowsInRowGroup = 10000;
int rowGroupCounts = 10;
int baseStringBytes = 300;
int rowCount = rowsInRowGroup * rowGroupCounts;
createGrowingSequentialFile(tempFile.getFile(), rowCount, rowsInRowGroup, baseStringBytes);
try (OrcBatchRecordReader reader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, VARCHAR, MAX_BATCH_SIZE, false, false)) {
assertEquals(reader.getFileRowCount(), rowCount);
assertEquals(reader.getReaderRowCount(), rowCount);
assertEquals(reader.getFilePosition(), 0);
assertEquals(reader.getReaderPosition(), 0);
// each value's length = original value length + 4 bytes for the offset + 1 byte for the null flag
int currentStringBytes = baseStringBytes + Integer.BYTES + Byte.BYTES;
int rowCountsInCurrentRowGroup = 0;
while (true) {
int batchSize = reader.nextBatch();
if (batchSize == -1) {
break;
}
rowCountsInCurrentRowGroup += batchSize;
Block block = reader.readBlock(0);
if (MAX_BATCH_SIZE * currentStringBytes <= MAX_BLOCK_SIZE.toBytes()) {
// Either we are bounded by 1024 rows per batch, or it is the last batch in the row group
// For the first 3 row groups, the strings are of length 300, 600, and 900 respectively
// So the loaded data is bounded by MAX_BATCH_SIZE
assertTrue(block.getPositionCount() == MAX_BATCH_SIZE || rowCountsInCurrentRowGroup == rowsInRowGroup);
}
else {
// Either we are bounded by 1MB per batch, or it is the last batch in the row group
// From the 4th row group on, the strings have length >= 1200
// So the loaded data is bounded by MAX_BLOCK_SIZE
assertTrue(block.getPositionCount() == MAX_BLOCK_SIZE.toBytes() / currentStringBytes || rowCountsInCurrentRowGroup == rowsInRowGroup);
}
if (rowCountsInCurrentRowGroup == rowsInRowGroup) {
rowCountsInCurrentRowGroup = 0;
currentStringBytes += baseStringBytes;
}
else if (rowCountsInCurrentRowGroup > rowsInRowGroup) {
fail("read more rows in the current row group");
}
}
}
}
}
@Test
public void testBatchSizesForFixedWidth()
throws Exception
{
// The test creates a table with one column and 10 row groups.
// Each row group has bigints of length 8 in bytes.
// The test is to show that the loaded data is always bounded by MAX_BATCH_SIZE because 1024 X 8B < 1MB
try (TempFile tempFile = new TempFile()) {
// create single stripe file with multiple row groups
int rowsInRowGroup = 10_000;
int rowGroupCounts = 10;
int rowCount = rowsInRowGroup * rowGroupCounts;
createSequentialFile(tempFile.getFile(), rowCount);
try (OrcBatchRecordReader reader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, BIGINT, MAX_BATCH_SIZE, false, false)) {
assertEquals(reader.getFileRowCount(), rowCount);
assertEquals(reader.getReaderRowCount(), rowCount);
assertEquals(reader.getFilePosition(), 0);
assertEquals(reader.getReaderPosition(), 0);
int rowCountsInCurrentRowGroup = 0;
while (true) {
int batchSize = reader.nextBatch();
if (batchSize == -1) {
break;
}
rowCountsInCurrentRowGroup += batchSize;
Block block = reader.readBlock(0);
// 8 bytes per row; 1024 rows at most given 1024 X 8B < 1MB
assertTrue(block.getPositionCount() == MAX_BATCH_SIZE || rowCountsInCurrentRowGroup == rowsInRowGroup);
if (rowCountsInCurrentRowGroup == rowsInRowGroup) {
rowCountsInCurrentRowGroup = 0;
}
else if (rowCountsInCurrentRowGroup > rowsInRowGroup) {
fail("read more rows in the current row group");
}
}
}
}
}
@Test
public void testReadUserMetadata()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
Map<String, String> metadata = ImmutableMap.of(
"a", "ala",
"b", "ma",
"c", "kota");
createFileWithOnlyUserMetadata(tempFile.getFile(), metadata);
OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), new DataSize(1, MEGABYTE), new DataSize(1, MEGABYTE), new DataSize(1, MEGABYTE), true);
OrcReader orcReader = new OrcReader(
orcDataSource,
ORC,
new StorageOrcFileTailSource(),
new StorageStripeMetadataSource(),
NOOP_ORC_AGGREGATED_MEMORY_CONTEXT,
OrcReaderTestingUtils.createDefaultTestConfig(),
false,
NO_ENCRYPTION,
DwrfKeyProvider.EMPTY,
new RuntimeStats());
Footer footer = orcReader.getFooter();
Map<String, String> readMetadata = Maps.transformValues(footer.getUserMetadata(), Slice::toStringAscii);
assertEquals(readMetadata, metadata);
}
}
@Test
public void testBatchSizeGrowth()
throws Exception
{
try (TempFile tempFile = new TempFile()) {
// Create a file with 5 stripes of 20 rows each.
createMultiStripeFile(tempFile.getFile());
try (OrcBatchRecordReader reader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, BIGINT, INITIAL_BATCH_SIZE, false, false)) {
assertEquals(reader.getReaderRowCount(), 100);
assertEquals(reader.getReaderPosition(), 0);
assertEquals(reader.getFileRowCount(), reader.getReaderRowCount());
assertEquals(reader.getFilePosition(), reader.getReaderPosition());
// Since all columns are fixed size, all batches should have 20 rows
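// (with no variable-width column the batch size is presumably not throttled,
// and each 20-row stripe forms a single row group, capping every batch at 20)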
int totalReadRows = 0;
while (true) {
int batchSize = reader.nextBatch();
if (batchSize == -1) {
break;
}
assertEquals(batchSize, 20);
assertEquals(reader.getReaderPosition(), totalReadRows);
assertEquals(reader.getFilePosition(), reader.getReaderPosition());
assertCurrentBatch(reader, (int) reader.getReaderPosition(), batchSize);
totalReadRows += batchSize;
}
assertEquals(reader.getReaderPosition(), 100);
assertEquals(reader.getFilePosition(), reader.getReaderPosition());
}
try (OrcBatchRecordReader reader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, ImmutableList.of(BIGINT, VARCHAR), INITIAL_BATCH_SIZE, false, false)) {
assertEquals(reader.getReaderRowCount(), 100);
assertEquals(reader.getReaderPosition(), 0);
assertEquals(reader.getFileRowCount(), reader.getReaderRowCount());
assertEquals(reader.getFilePosition(), reader.getReaderPosition());
// Since there is a variable width column, the batch size should start from INITIAL_BATCH_SIZE
// and grow by BATCH_SIZE_GROWTH_FACTOR. For INITIAL_BATCH_SIZE = 1 and BATCH_SIZE_GROWTH_FACTOR = 2,
// the batchSize sequence should be 1, 2, 4, 8, 5, 20, 20, 20, 20
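// (1 + 2 + 4 + 8 = 15 rows leave 5 in the first 20-row row group, hence the 5;
// afterwards the batch size is capped by the 20 rows remaining per row group)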
int totalReadRows = 0;
int nextBatchSize = INITIAL_BATCH_SIZE;
int expectedBatchSize = INITIAL_BATCH_SIZE;
int rowCountsInCurrentRowGroup = 0;
while (true) {
int batchSize = reader.nextBatch();
if (batchSize == -1) {
break;
}
assertEquals(batchSize, expectedBatchSize);
assertEquals(reader.getReaderPosition(), totalReadRows);
assertEquals(reader.getFilePosition(), reader.getReaderPosition());
assertCurrentBatch(reader, (int) reader.getReaderPosition(), batchSize);
if (nextBatchSize > 20 - rowCountsInCurrentRowGroup) {
nextBatchSize *= BATCH_SIZE_GROWTH_FACTOR;
}
else {
nextBatchSize = batchSize * BATCH_SIZE_GROWTH_FACTOR;
}
rowCountsInCurrentRowGroup += batchSize;
totalReadRows += batchSize;
if (rowCountsInCurrentRowGroup == 20) {
rowCountsInCurrentRowGroup = 0;
}
else if (rowCountsInCurrentRowGroup > 20) {
fail("read more rows in the current row group");
}
expectedBatchSize = min(min(nextBatchSize, MAX_BATCH_SIZE), 20 - rowCountsInCurrentRowGroup);
}
assertEquals(reader.getReaderPosition(), 100);
assertEquals(reader.getFilePosition(), reader.getReaderPosition());
}
}
}
private static void assertCurrentBatch(OrcBatchRecordReader reader, int rowIndex, int batchSize)
throws IOException
{
Block block = reader.readBlock(0);
for (int i = 0; i < batchSize; i++) {
assertEquals(BIGINT.getLong(block, i), (rowIndex + i) * 3);
}
}
private static void assertCurrentBatch(OrcBatchRecordReader reader, int stripe)
throws IOException
{
Block block = reader.readBlock(0);
for (int i = 0; i < 20; i++) {
assertEquals(BIGINT.getLong(block, i), ((stripe * 20L) + i) * 3);
}
}
// write 5 stripes of 20 values each: (0,3,6,..,57), (60,..,117), .., (..297)
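// flushWriter() forces a stripe boundary after every 20 rows (i advances by 3,
// so i % 60 == 0 occurs once per 20 values)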
private static void createMultiStripeFile(File file)
throws IOException, ReflectiveOperationException, SerDeException
{
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, CompressionKind.NONE, ImmutableList.of(BIGINT, VARCHAR));
Serializer serde = new OrcSerde();
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector(ImmutableList.of(BIGINT, VARCHAR));
Object row = objectInspector.create();
StructField bigintField = objectInspector.getAllStructFieldRefs().get(0);
StructField varcharField = objectInspector.getAllStructFieldRefs().get(1);
for (int i = 0; i < 300; i += 3) {
if ((i > 0) && (i % 60 == 0)) {
flushWriter(writer);
}
objectInspector.setStructFieldData(row, bigintField, (long) i);
objectInspector.setStructFieldData(row, varcharField, String.valueOf(i));
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
}
private static void createFileWithOnlyUserMetadata(File file, Map<String, String> metadata)
throws IOException
{
Configuration conf = new Configuration();
OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf)
.memory(new NullMemoryManager())
.inspector(createSettableStructObjectInspector("test", BIGINT))
.compress(SNAPPY);
Writer writer = OrcFile.createWriter(new Path(file.toURI()), writerOptions);
for (Map.Entry<String, String> entry : metadata.entrySet()) {
writer.addUserMetadata(entry.getKey(), ByteBuffer.wrap(entry.getValue().getBytes(UTF_8)));
}
writer.close();
}
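// OrcOutputFormat$OrcRecordWriter keeps its underlying ORC Writer in a private
// 'writer' field, so reflection is used to reach writeIntermediateFooter(),
// which flushes the buffered rows as a completed stripe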
private static void flushWriter(FileSinkOperator.RecordWriter writer)
throws IOException, ReflectiveOperationException
{
Field field = OrcOutputFormat.class.getClassLoader()
.loadClass(OrcOutputFormat.class.getName() + "$OrcRecordWriter")
.getDeclaredField("writer");
field.setAccessible(true);
((Writer) field.get(writer)).writeIntermediateFooter();
}
private static void createSequentialFile(File file, int count)
throws IOException, SerDeException
{
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, CompressionKind.NONE, BIGINT);
Serializer serde = new OrcSerde();
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", BIGINT);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
for (int i = 0; i < count; i++) {
objectInspector.setStructFieldData(row, field, (long) i);
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
}
private static void createGrowingSequentialFile(File file, int count, int step, int initialLength)
throws IOException, SerDeException
{
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, CompressionKind.NONE, VARCHAR);
Serializer serde = new OrcSerde();
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", VARCHAR);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
StringBuilder builder = new StringBuilder();
for (int i = 0; i < initialLength; i++) {
builder.append("0");
}
String seedString = builder.toString();
// gradually grow the length of a cell
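// rows [0, step) get strings of initialLength bytes, rows [step, 2 * step)
// get 2 * initialLength bytes, and so on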
int previousLength = initialLength;
for (int i = 0; i < count; i++) {
if ((i / step + 1) * initialLength > previousLength) {
previousLength = (i / step + 1) * initialLength;
builder.append(seedString);
}
objectInspector.setStructFieldData(row, field, builder.toString());
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
}
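// selective-reader filter predicate that keeps rows whose first column holds
// an odd value; used by testFilterFunctionWithAppendRowNumber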
private static class IsOddPredicate
implements Predicate
{
@Override
public int[] getInputChannels()
{
return new int[] {0};
}
@Override
public boolean evaluate(SqlFunctionProperties properties, Page page, int position)
{
long number = page.getBlock(0).getLong(position);
return (number & 1) == 1;
}
}
}