TestSelectiveOrcReader.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc;
import com.facebook.presto.common.InvalidFunctionArgumentException;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.predicate.TupleDomainFilter;
import com.facebook.presto.common.predicate.TupleDomainFilter.BigintRange;
import com.facebook.presto.common.predicate.TupleDomainFilter.BigintValuesUsingHashTable;
import com.facebook.presto.common.predicate.TupleDomainFilter.BooleanValue;
import com.facebook.presto.common.predicate.TupleDomainFilter.BytesRange;
import com.facebook.presto.common.predicate.TupleDomainFilter.BytesValues;
import com.facebook.presto.common.predicate.TupleDomainFilter.DoubleRange;
import com.facebook.presto.common.predicate.TupleDomainFilter.FloatRange;
import com.facebook.presto.common.type.CharType;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.SqlDate;
import com.facebook.presto.common.type.SqlDecimal;
import com.facebook.presto.common.type.SqlTimestamp;
import com.facebook.presto.common.type.SqlVarbinary;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.OrcTester.OrcReaderSettings;
import com.facebook.presto.orc.metadata.CompressionKind;
import com.google.common.base.Strings;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Streams;
import com.google.common.primitives.Ints;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.joda.time.DateTimeZone;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.IntStream;
import static com.facebook.airlift.testing.Assertions.assertBetweenInclusive;
import static com.facebook.presto.common.predicate.TupleDomainFilter.IS_NOT_NULL;
import static com.facebook.presto.common.predicate.TupleDomainFilter.IS_NULL;
import static com.facebook.presto.common.predicate.TupleDomainFilterUtils.toBigintValues;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.CharType.createCharType;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.orc.NoOpOrcWriterStats.NOOP_WRITER_STATS;
import static com.facebook.presto.orc.OrcReader.MAX_BATCH_SIZE;
import static com.facebook.presto.orc.OrcTester.Format.DWRF;
import static com.facebook.presto.orc.OrcTester.HIVE_STORAGE_TIME_ZONE;
import static com.facebook.presto.orc.OrcTester.MAX_BLOCK_SIZE;
import static com.facebook.presto.orc.OrcTester.arrayType;
import static com.facebook.presto.orc.OrcTester.createCustomOrcSelectiveRecordReader;
import static com.facebook.presto.orc.OrcTester.mapType;
import static com.facebook.presto.orc.OrcTester.quickSelectiveOrcTester;
import static com.facebook.presto.orc.OrcTester.rowType;
import static com.facebook.presto.orc.OrcTester.writeOrcColumnsPresto;
import static com.facebook.presto.orc.TestingOrcPredicate.createOrcPredicate;
import static com.facebook.presto.orc.metadata.CompressionKind.NONE;
import static com.facebook.presto.orc.metadata.CompressionKind.ZLIB;
import static com.facebook.presto.orc.metadata.CompressionKind.ZSTD;
import static com.facebook.presto.testing.DateTimeTestingUtils.sqlTimestampOf;
import static com.facebook.presto.testing.TestingConnectorSession.SESSION;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.cycle;
import static com.google.common.collect.Iterables.limit;
import static com.google.common.collect.Lists.newArrayList;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.nCopies;
import static java.util.stream.Collectors.toList;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class TestSelectiveOrcReader
{
// Row count used for most of the generated test columns in this class.
private static final int NUM_ROWS = 31_234;
// Decimal types created with (2, 1), (4, 2) and (19, 8) as arguments to createDecimalType.
private static final DecimalType DECIMAL_TYPE_PRECISION_2 = DecimalType.createDecimalType(2, 1);
private static final DecimalType DECIMAL_TYPE_PRECISION_4 = DecimalType.createDecimalType(4, 2);
private static final DecimalType DECIMAL_TYPE_PRECISION_19 = DecimalType.createDecimalType(19, 8);
// CHAR type created with length argument 10.
private static final CharType CHAR_10 = createCharType(10);
// Tester obtained from OrcTester.quickSelectiveOrcTester(); every test method goes through it.
private final OrcTester tester = quickSelectiveOrcTester();
@BeforeClass
public void setUp()
{
    // Guard: the JVM default time zone must equal HIVE_STORAGE_TIME_ZONE before any test runs.
    DateTimeZone defaultZone = DateTimeZone.getDefault();
    assertEquals(defaultZone, HIVE_STORAGE_TIME_ZONE);
}
@Test
public void testBooleanSequence()
        throws Exception
{
    // Filters reused across the scenarios below.
    TupleDomainFilter trueFilter = BooleanValue.of(true, false);
    TupleDomainFilter trueFilterWithNulls = BooleanValue.of(true, true);

    // single column cycling through true, false, false
    List<Boolean> singleColumn = newArrayList(limit(cycle(ImmutableList.of(true, false, false)), NUM_ROWS));
    tester.testRoundTrip(BOOLEAN, singleColumn, trueFilter, IS_NULL);

    // two columns, filtered one at a time and then together
    List<Boolean> firstColumn = newArrayList(limit(cycle(ImmutableList.of(true, false, false)), NUM_ROWS));
    List<Boolean> secondColumn = newArrayList(limit(cycle(ImmutableList.of(true, true, false)), NUM_ROWS));
    tester.testRoundTripTypes(
            ImmutableList.of(BOOLEAN, BOOLEAN),
            ImmutableList.of(firstColumn, secondColumn),
            toSubfieldFilters(
                    ImmutableMap.of(0, trueFilter),
                    ImmutableMap.of(0, IS_NULL),
                    ImmutableMap.of(1, trueFilter),
                    ImmutableMap.of(
                            0, BooleanValue.of(false, false),
                            1, trueFilter)));

    // three short columns containing nulls; the middle column holds only nulls
    tester.testRoundTripTypes(
            ImmutableList.of(BOOLEAN, BOOLEAN, BOOLEAN),
            ImmutableList.of(
                    newArrayList(true, false, null, false, true),
                    newArrayList(null, null, null, null, null),
                    newArrayList(true, false, null, false, null)),
            toSubfieldFilters(ImmutableMap.of(
                    0, trueFilter,
                    1, trueFilterWithNulls,
                    2, trueFilterWithNulls)));
}
@Test
public void testByteValues()
        throws Exception
{
    List<Byte> byteValues = new ArrayList<>();
    for (int value : new int[] {1, 3, 5, 7, 11, 13, 17}) {
        byteValues.add((byte) value);
    }

    // single column
    tester.testRoundTrip(
            TINYINT,
            byteValues,
            BigintValuesUsingHashTable.of(1, 17, new long[] {1, 17}, false),
            IS_NULL);

    // the same values in two columns, with per-column and combined filters
    tester.testRoundTripTypes(
            ImmutableList.of(TINYINT, TINYINT),
            ImmutableList.of(byteValues, byteValues),
            toSubfieldFilters(
                    ImmutableMap.of(0, BigintRange.of(1, 17, false)),
                    ImmutableMap.of(0, IS_NULL),
                    ImmutableMap.of(1, IS_NULL),
                    ImmutableMap.of(
                            0, BigintRange.of(7, 17, false),
                            1, BigintRange.of(12, 14, false))));

    // three short columns containing nulls; the middle column holds only nulls
    tester.testRoundTripTypes(
            ImmutableList.of(TINYINT, TINYINT, TINYINT),
            ImmutableList.of(
                    toByteArray(newArrayList(1, 2, null, 3, 4)),
                    newArrayList(null, null, null, null, null),
                    toByteArray(newArrayList(5, 6, null, 7, null))),
            toSubfieldFilters(ImmutableMap.of(
                    0, BigintValuesUsingHashTable.of(1, 4, new long[] {1, 4}, false),
                    1, BigintValuesUsingHashTable.of(1, 5, new long[] {1, 5}, true),
                    2, BigintValuesUsingHashTable.of(5, 7, new long[] {5, 7}, true))));
}
@Test
public void testByteValuesRepeat()
        throws Exception
{
    List<Byte> byteValues = new ArrayList<>();
    for (int value : new int[] {1, 3, 5, 7, 11, 13, 17}) {
        byteValues.add((byte) value);
    }

    // NUM_ROWS values taken from repeatEach(4, ...) over an endless cycle of byteValues
    BigintRange filter = BigintRange.of(1, 14, true);
    tester.testRoundTrip(TINYINT, newArrayList(limit(repeatEach(4, cycle(byteValues)), NUM_ROWS)), filter);
}
@Test
public void testByteValuesPatchedBase()
throws Exception
{
List<Byte> byteValues = newArrayList(
limit(cycle(concat(
intsBetween(0, 18),
intsBetween(0, 18),
ImmutableList.of(NUM_ROWS, 20_000, 400_000, NUM_ROWS, 20_000))), NUM_ROWS))
.stream()
.map(Integer::byteValue)
.collect(toList());
tester.testRoundTrip(
TINYINT,
byteValues,
BigintRange.of(4, 14, true));
}
@Test
public void testDoubleSequence()
        throws Exception
{
    // Range filter applied to several columns in the multi-column scenarios below.
    TupleDomainFilter sharedRange = DoubleRange.of(1.0, false, true, 7.0, false, true, true);

    // single column
    tester.testRoundTrip(
            DOUBLE,
            doubleSequence(0, 0.1, NUM_ROWS),
            DoubleRange.of(0, false, false, 1_000, false, false, false),
            IS_NULL,
            IS_NOT_NULL);

    // two columns, both filtered
    tester.testRoundTripTypes(
            ImmutableList.of(DOUBLE, DOUBLE),
            ImmutableList.of(
                    doubleSequence(0, 0.1, 10_000),
                    doubleSequence(0, 0.1, 10_000)),
            toSubfieldFilters(ImmutableMap.of(
                    0, sharedRange,
                    1, DoubleRange.of(3.0, false, true, 9.0, false, true, false))));

    // three short columns containing nulls; the middle column holds only nulls
    tester.testRoundTripTypes(
            ImmutableList.of(DOUBLE, DOUBLE, DOUBLE),
            ImmutableList.of(
                    newArrayList(1.0, 2.0, null, 3.0, 4.0),
                    newArrayList(null, null, null, null, null),
                    newArrayList(1.0, 2.0, null, 3.0, null)),
            toSubfieldFilters(ImmutableMap.of(
                    0, sharedRange,
                    1, sharedRange,
                    2, sharedRange)));
}
@Test
public void testDoubleNaNInfinity()
        throws Exception
{
    List<Map<Subfield, TupleDomainFilter>> filters = toSubfieldFilters(
            DoubleRange.of(0, false, false, 1_000, false, false, false),
            IS_NULL,
            IS_NOT_NULL);

    // Each data set mixes finite values with NaN and/or the infinities; all use the same filters.
    List<List<Double>> dataSets = ImmutableList.of(
            ImmutableList.of(1000.0, -1.0, Double.POSITIVE_INFINITY),
            ImmutableList.of(-1000.0, Double.NEGATIVE_INFINITY, 1.0),
            ImmutableList.of(0.0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY),
            ImmutableList.of(Double.NaN, -1.0, 1.0),
            ImmutableList.of(Double.NaN, -1.0, Double.POSITIVE_INFINITY),
            ImmutableList.of(Double.NaN, Double.NEGATIVE_INFINITY, 1.0),
            ImmutableList.of(Double.NaN, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY));

    for (List<Double> values : dataSets) {
        tester.testRoundTrip(DOUBLE, values, filters);
    }
}
@Test
public void testLongSequence()
        throws Exception
{
    // NUM_ROWS is the same value as the 31_234 upper bound used here originally
    BigintRange filter = BigintRange.of(10, 100, false);
    testRoundTripNumeric(intsBetween(0, NUM_ROWS), filter);
}
@Test
public void testNegativeLongSequence()
        throws Exception
{
    // A flaw in the ORC encoding makes it impossible to represent timestamps strictly between
    // 1969-12-31 23:59:59.000 and 1970-01-01 00:00:00.000. Such values cannot round trip,
    // so the sequence stops at -999 and leaves them out of the test.
    BigintRange filter = BigintRange.of(-1000, -100, true);
    testRoundTripNumeric(intsBetween(-NUM_ROWS, -999), filter);
}
@Test
public void testLongSequenceWithHoles()
        throws Exception
{
    // same range as testLongSequence, passed through skipEvery(5, ...)
    BigintRange filter = BigintRange.of(10, 100, false);
    testRoundTripNumeric(skipEvery(5, intsBetween(0, NUM_ROWS)), filter);
}
@Test
public void testLongDirect()
        throws Exception
{
    Iterable<Integer> cycledValues = limit(cycle(ImmutableList.of(1, 3, 5, 7, 11, 13, 17)), NUM_ROWS);
    testRoundTripNumeric(cycledValues, BigintRange.of(4, 14, false));

    // read selected positions from an all-nulls column: the filter is on column 0 only,
    // while column 1 contains nothing but nulls
    Random random = new Random(0);
    List<Integer> filteredColumn = createList(NUM_ROWS, i -> random.nextInt(10));
    List<Integer> allNullsColumn = createList(NUM_ROWS, i -> null);
    tester.testRoundTripTypes(
            ImmutableList.of(INTEGER, INTEGER),
            ImmutableList.of(filteredColumn, allNullsColumn),
            toSubfieldFilters(ImmutableMap.of(0, BigintRange.of(0, 5, false))));
}
@Test
public void testLongDirect2()
        throws Exception
{
    // 0 .. NUM_ROWS - 1, shuffled with a fixed seed so the order is reproducible
    List<Integer> values = new ArrayList<>();
    for (int value = 0; value < NUM_ROWS; value++) {
        values.add(value);
    }
    Collections.shuffle(values, new Random(0));

    BigintRange filter = BigintRange.of(4, 14, false);
    testRoundTripNumeric(values, filter);
}
@Test
public void testLongDirectVarintScale()
        throws Exception
{
    // values from varintScaleSequence, shuffled with a fixed seed so the order is reproducible
    Random shuffleSource = new Random(0);
    List<Long> values = varintScaleSequence(NUM_ROWS);
    Collections.shuffle(values, shuffleSource);

    BigintRange filter = BigintRange.of(0, 1L << 60, false);
    testRoundTripNumeric(values, filter);
}
@Test
public void testLongShortRepeat()
        throws Exception
{
    // NUM_ROWS values taken from repeatEach(4, ...) over an endless cycle of the base values
    BigintRange filter = BigintRange.of(4, 14, true);
    testRoundTripNumeric(
            limit(repeatEach(4, cycle(ImmutableList.of(1, 3, 5, 7, 11, 13, 17))), NUM_ROWS),
            filter);
}
@Test
public void testLongPatchedBase()
        throws Exception
{
    // two runs of small values followed by a handful of much larger ones, cycled up to NUM_ROWS
    Iterable<Integer> values = limit(
            cycle(concat(
                    intsBetween(0, 18),
                    intsBetween(0, 18),
                    ImmutableList.of(NUM_ROWS, 20_000, 400_000, NUM_ROWS, 20_000))),
            NUM_ROWS);
    testRoundTripNumeric(values, toBigintValues(new long[] {0, 5, 10, 15, 20_000}, true));
}
@Test
public void testLongStrideDictionary()
        throws Exception
{
    // 1, then 9999 copies of 123, then 2, then another 9999 copies of 123
    List<Integer> repeatedRun = nCopies(9999, 123);
    Iterable<Integer> values = concat(
            ImmutableList.of(1),
            repeatedRun,
            ImmutableList.of(2),
            repeatedRun);
    testRoundTripNumeric(values, BigintRange.of(123, 123, true));
}
@Test
public void testFloats()
        throws Exception
{
    // Filters that appear more than once below. The two differ only in their third argument.
    TupleDomainFilter rangeA = FloatRange.of(1.0f, false, true, 7.0f, false, true, true);
    TupleDomainFilter rangeB = FloatRange.of(1.0f, false, false, 7.0f, false, true, true);

    // single column with filters
    tester.testRoundTrip(
            REAL,
            ImmutableList.copyOf(repeatEach(10, ImmutableList.of(-100.0f, 0.0f, 100.0f))),
            toSubfieldFilters(
                    FloatRange.of(0.0f, false, true, 100.0f, false, true, true),
                    FloatRange.of(-100.0f, false, true, 0.0f, false, true, false),
                    IS_NULL));

    // single column without filters, including positive infinity
    tester.testRoundTrip(
            REAL,
            ImmutableList.copyOf(repeatEach(10, ImmutableList.of(1000.0f, -1.23f, Float.POSITIVE_INFINITY))));

    // two columns built from the same base values
    List<Float> floatValues = ImmutableList.of(1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f, 9.0f);
    tester.testRoundTripTypes(
            ImmutableList.of(REAL, REAL),
            ImmutableList.of(
                    ImmutableList.copyOf(limit(repeatEach(4, cycle(floatValues)), 100)),
                    ImmutableList.copyOf(limit(repeatEach(4, cycle(floatValues)), 100))),
            toSubfieldFilters(
                    ImmutableMap.of(
                            0, rangeA,
                            1, FloatRange.of(3.0f, false, true, 9.0f, false, true, false)),
                    ImmutableMap.of(
                            1, rangeA)));

    // three short columns; the middle column holds only nulls
    tester.testRoundTripTypes(
            ImmutableList.of(REAL, REAL, REAL),
            ImmutableList.of(
                    newArrayList(1.0f, 2.0f, 3.0f, 4.0f, 5.0f),
                    newArrayList(null, null, null, null, null),
                    newArrayList(1.0f, 2.0f, null, 3.0f, null)),
            toSubfieldFilters(ImmutableMap.of(
                    0, FloatRange.of(2.0f, false, false, 7.0f, false, true, true),
                    1, rangeB,
                    2, rangeB)));
}
@Test
public void testFilterOrder()
        throws Exception
{
    // One Random feeds all three scenarios, so the statements below must stay in this order.
    Random random = new Random(0);
    TupleDomainFilter secondColumnRange = BigintRange.of(2, 200, true);

    // scenario 1: last argument lists columns [0, 1]
    List<Integer> firstCycle = newArrayList(limit(cycle(ImmutableList.of(1, 3, 5, 7, 11)), NUM_ROWS));
    tester.testRoundTripTypesWithOrder(
            ImmutableList.of(INTEGER, INTEGER),
            ImmutableList.of(firstCycle, randomIntegers(NUM_ROWS, random)),
            toSubfieldFilters(ImmutableMap.of(
                    0, BigintRange.of(1, 1, true),
                    1, secondColumnRange)),
            ImmutableList.of(ImmutableList.of(0, 1)));

    // scenario 2: column 0 is filtered on 100, which never occurs in its data; last argument lists only [0]
    List<Integer> secondCycle = newArrayList(limit(cycle(ImmutableList.of(1, 3, 5, 7, 11)), NUM_ROWS));
    tester.testRoundTripTypesWithOrder(
            ImmutableList.of(INTEGER, INTEGER),
            ImmutableList.of(secondCycle, randomIntegers(NUM_ROWS, random)),
            toSubfieldFilters(ImmutableMap.of(
                    0, BigintRange.of(100, 100, false),
                    1, secondColumnRange)),
            ImmutableList.of(ImmutableList.of(0)));

    // scenario 3: four columns, including an array column filtered on subscript c[1]
    List<Integer> thirdCycle = newArrayList(limit(cycle(ImmutableList.of(1, 3, 5, 7, 11)), NUM_ROWS));
    tester.testRoundTripTypesWithOrder(
            ImmutableList.of(INTEGER, INTEGER, DOUBLE, arrayType(INTEGER)),
            ImmutableList.of(
                    thirdCycle,
                    createList(NUM_ROWS, i -> random.nextInt(200)),
                    doubleSequence(0, 0.1, NUM_ROWS),
                    nCopies(NUM_ROWS, randomIntegers(5, random))),
            ImmutableList.of(
                    ImmutableMap.of(
                            0, toSubfieldFilter(BigintRange.of(1, 1, true)),
                            1, toSubfieldFilter(BigintRange.of(0, 200, true)),
                            2, toSubfieldFilter(DoubleRange.of(0, false, false, 0.1, false, false, true)),
                            3, toSubfieldFilter("c[1]", BigintRange.of(4, 4, false)))),
            ImmutableList.of(ImmutableList.of(0, 1, 2, 3)));
}
// Round-trips array columns (integer arrays, string arrays and nested arrays) with
// IS_NULL / IS_NOT_NULL filters on the whole column and with filters on individual
// subscripts such as c[1]. All random data below is drawn from one Random created with
// seed 0, so the generated values depend on the order of the statements.
@Test
public void testArrays()
throws Exception
{
Random random = new Random(0);
// non-null arrays of varying sizes; some arrays may be empty
tester.testRoundTrip(arrayType(INTEGER),
createList(NUM_ROWS, i -> randomIntegers(random.nextInt(10), random)),
IS_NULL, IS_NOT_NULL);
// range filters shared by the subscript scenarios below; both ranges include 0
BigintRange negative = BigintRange.of(Integer.MIN_VALUE, 0, false);
BigintRange nonNegative = BigintRange.of(0, Integer.MAX_VALUE, false);
// arrays of strings
tester.testRoundTrip(arrayType(VARCHAR),
createList(1000, i -> randomStrings(5 + random.nextInt(5), random)),
ImmutableList.of(
toSubfieldFilter("c[1]", IS_NULL),
toSubfieldFilter("c[1]", stringIn(true, "a", "b", "c", "d"))));
// arrays of strings generated by randomStringsWithNulls, same filters as above
tester.testRoundTrip(arrayType(VARCHAR),
createList(10, i -> randomStringsWithNulls(5 + random.nextInt(5), random)),
ImmutableList.of(
toSubfieldFilter("c[1]", IS_NULL),
toSubfieldFilter("c[1]", stringIn(true, "a", "b", "c", "d"))));
// non-empty non-null arrays of varying sizes
tester.testRoundTrip(arrayType(INTEGER),
createList(NUM_ROWS, i -> randomIntegers(5 + random.nextInt(5), random)),
ImmutableList.of(
toSubfieldFilter(IS_NULL),
toSubfieldFilter(IS_NOT_NULL),
// c[1] >= 0
toSubfieldFilter("c[1]", nonNegative),
// c[2] >= 0 AND c[4] >= 0
ImmutableMap.of(
new Subfield("c[2]"), nonNegative,
new Subfield("c[4]"), nonNegative)));
// non-null arrays of varying sizes; some arrays may be empty
// (integer column 0 is filtered alongside the array column 1)
tester.testRoundTripTypes(ImmutableList.of(INTEGER, arrayType(INTEGER)),
ImmutableList.of(
randomIntegers(NUM_ROWS, random),
createList(NUM_ROWS, i -> randomIntegers(random.nextInt(10), random))),
toSubfieldFilters(
ImmutableMap.of(0, nonNegative),
ImmutableMap.of(
0, nonNegative,
1, IS_NULL),
ImmutableMap.of(
0, nonNegative,
1, IS_NOT_NULL)));
// non-empty non-null arrays of varying sizes
tester.testRoundTripTypes(ImmutableList.of(INTEGER, arrayType(INTEGER)),
ImmutableList.of(
randomIntegers(NUM_ROWS, random),
createList(NUM_ROWS, i -> randomIntegers(5 + random.nextInt(5), random))),
ImmutableList.of(
// c[1] >= 0
ImmutableMap.of(
0, toSubfieldFilter(nonNegative),
1, toSubfieldFilter("c[1]", nonNegative)),
// c[3] >= 0
ImmutableMap.of(
0, toSubfieldFilter(nonNegative),
1, toSubfieldFilter("c[3]", nonNegative)),
// c[2] >= 0 AND c[4] <= 0
ImmutableMap.of(
0, toSubfieldFilter(nonNegative),
1, ImmutableMap.of(
new Subfield("c[2]"), nonNegative,
new Subfield("c[4]"), negative))));
// nested arrays
tester.testRoundTripTypes(ImmutableList.of(INTEGER, arrayType(arrayType(INTEGER))),
ImmutableList.of(
randomIntegers(NUM_ROWS, random),
createList(NUM_ROWS, i -> createList(random.nextInt(10), index -> randomIntegers(random.nextInt(5), random)))),
toSubfieldFilters(
ImmutableMap.of(0, nonNegative),
ImmutableMap.of(1, IS_NULL),
ImmutableMap.of(1, IS_NOT_NULL),
ImmutableMap.of(
0, nonNegative,
1, IS_NULL)));
// nested arrays with at least 3 outer and 3 inner elements, filtered on subscripts
tester.testRoundTripTypes(ImmutableList.of(INTEGER, arrayType(arrayType(INTEGER))),
ImmutableList.of(
randomIntegers(NUM_ROWS, random),
createList(NUM_ROWS, i -> createList(3 + random.nextInt(10), index -> randomIntegers(3 + random.nextInt(5), random)))),
ImmutableList.of(
// c[1] IS NULL
ImmutableMap.of(1, ImmutableMap.of(new Subfield("c[1]"), IS_NULL)),
// c[2] IS NOT NULL AND c[2][3] >= 0
ImmutableMap.of(1, ImmutableMap.of(
new Subfield("c[2]"), IS_NOT_NULL,
new Subfield("c[2][3]"), nonNegative)),
// column 0 >= 0 AND c[1] IS NULL
ImmutableMap.of(
0, toSubfieldFilter(nonNegative),
1, ImmutableMap.of(new Subfield("c[1]"), IS_NULL))));
}
@Test
public void testArraysWithSubfieldPruning()
        throws Exception
{
    // arrays of integers, identical in every row
    List<List<Integer>> fixedArrays = createList(NUM_ROWS, i -> ImmutableList.of(1, 2, 3, 4));
    tester.assertRoundTripWithSettings(
            arrayType(INTEGER),
            fixedArrays,
            ImmutableList.of(
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[1]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[1]", "c[2]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2]").build()));

    // arrays whose first two elements are random; required subfields combined with a column filter
    Random random = new Random(0);
    List<List<Integer>> partlyRandomArrays = createList(
            NUM_ROWS,
            i -> ImmutableList.of(random.nextInt(10), random.nextInt(10), 3, 4));
    tester.assertRoundTripWithSettings(
            arrayType(INTEGER),
            partlyRandomArrays,
            ImmutableList.of(
                    OrcReaderSettings.builder()
                            .addRequiredSubfields(0, "c[1]", "c[3]")
                            .setColumnFilters(ImmutableMap.of(0, ImmutableMap.of(new Subfield("c[1]"), BigintRange.of(0, 4, false))))
                            .build(),
                    OrcReaderSettings.builder()
                            .addRequiredSubfields(0, "c[2]", "c[3]")
                            .setColumnFilters(ImmutableMap.of(0, ImmutableMap.of(new Subfield("c[2]"), BigintRange.of(0, 4, false))))
                            .build()));

    // arrays of arrays
    List<List<List<Integer>>> nestedArrays = createList(
            NUM_ROWS,
            i -> nCopies(1 + random.nextInt(5), ImmutableList.of(1, 2, 3)));
    tester.assertRoundTripWithSettings(
            arrayType(arrayType(INTEGER)),
            nestedArrays,
            ImmutableList.of(
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[1][1]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2][2]", "c[4][2]", "c[5][3]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2][3]", "c[10][2]", "c[3][10]").build()));

    // arrays of maps
    List<List<Map<Integer, Integer>>> arraysOfMaps = createList(
            NUM_ROWS,
            i -> nCopies(5, ImmutableMap.of(1, 10, 2, 20)));
    tester.assertRoundTripWithSettings(
            arrayType(mapType(INTEGER, INTEGER)),
            arraysOfMaps,
            ImmutableList.of(
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[1][1]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2][1]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2][1]", "c[4][1]", "c[3][2]").build()));
}
@Test
public void testArrayIndexOutOfBounds()
        throws Exception
{
    // Every scenario filters on a subscript that some or all rows do not have, and expects
    // an InvalidFunctionArgumentException whose message contains this text.
    String expectedMessage = "Array subscript out of bounds";
    Random random = new Random(0);

    // non-null arrays of varying sizes
    try {
        tester.testRoundTrip(
                arrayType(INTEGER),
                createList(NUM_ROWS, i -> randomIntegers(random.nextInt(10), random)),
                ImmutableList.of(ImmutableMap.of(new Subfield("c[2]"), IS_NULL)));
        fail("Expected 'Array subscript out of bounds' exception");
    }
    catch (InvalidFunctionArgumentException e) {
        String message = e.getMessage();
        assertTrue(message.contains(expectedMessage));
    }

    // non-null nested arrays of varying sizes
    try {
        tester.testRoundTrip(
                arrayType(arrayType(INTEGER)),
                createList(NUM_ROWS, i -> ImmutableList.of(randomIntegers(random.nextInt(5), random), randomIntegers(random.nextInt(5), random))),
                ImmutableList.of(ImmutableMap.of(new Subfield("c[2][3]"), IS_NULL)));
        fail("Expected 'Array subscript out of bounds' exception");
    }
    catch (InvalidFunctionArgumentException e) {
        String message = e.getMessage();
        assertTrue(message.contains(expectedMessage));
    }

    // empty arrays
    try {
        tester.testRoundTrip(
                arrayType(INTEGER),
                nCopies(NUM_ROWS, ImmutableList.of()),
                ImmutableList.of(ImmutableMap.of(new Subfield("c[2]"), IS_NULL)));
        fail("Expected 'Array subscript out of bounds' exception");
    }
    catch (InvalidFunctionArgumentException e) {
        String message = e.getMessage();
        assertTrue(message.contains(expectedMessage));
    }

    // empty nested arrays
    try {
        tester.testRoundTrip(
                arrayType(arrayType(INTEGER)),
                nCopies(NUM_ROWS, ImmutableList.of()),
                ImmutableList.of(ImmutableMap.of(new Subfield("c[2][3]"), IS_NULL)));
        fail("Expected 'Array subscript out of bounds' exception");
    }
    catch (InvalidFunctionArgumentException e) {
        String message = e.getMessage();
        assertTrue(message.contains(expectedMessage));
    }
}
@Test
public void testArraysOfNulls()
        throws Exception
{
    // Data and filters are the same for every element type: NUM_ROWS arrays of five nulls,
    // filtered on c[2] IS NULL and on c[2] IS NOT NULL.
    List<List<Object>> arraysOfNulls = nCopies(NUM_ROWS, nCopies(5, null));
    List<Map<Subfield, TupleDomainFilter>> filters = ImmutableList.of(
            ImmutableMap.of(new Subfield("c[2]"), IS_NULL),
            ImmutableMap.of(new Subfield("c[2]"), IS_NOT_NULL));

    List<Type> elementTypes = ImmutableList.of(
            BOOLEAN,
            BIGINT,
            INTEGER,
            SMALLINT,
            TINYINT,
            DOUBLE,
            REAL,
            TIMESTAMP,
            DECIMAL_TYPE_PRECISION_19,
            DECIMAL_TYPE_PRECISION_4,
            VARCHAR,
            CHAR_10,
            VARBINARY,
            arrayType(INTEGER));
    for (Type elementType : elementTypes) {
        tester.testRoundTrip(arrayType(elementType), arraysOfNulls, filters);
    }
}
@Test
public void testStructs()
        throws Exception
{
    // One Random feeds both scenarios, so the statements below must stay in this order.
    Random random = new Random(0);
    TupleDomainFilter nonNegative = BigintRange.of(0, Integer.MAX_VALUE, false);
    TupleDomainFilter nonNegativeOrNull = BigintRange.of(0, Integer.MAX_VALUE, true);

    // integer column followed by a row(integer, boolean) column; no nulls
    tester.testRoundTripTypes(
            ImmutableList.of(INTEGER, rowType(INTEGER, BOOLEAN)),
            ImmutableList.of(
                    createList(NUM_ROWS, i -> random.nextInt()),
                    createList(NUM_ROWS, i -> ImmutableList.of(random.nextInt(), random.nextBoolean()))),
            ImmutableList.of(
                    ImmutableMap.of(0, toSubfieldFilter(nonNegative)),
                    ImmutableMap.of(1, toSubfieldFilter(IS_NULL)),
                    ImmutableMap.of(1, toSubfieldFilter(IS_NOT_NULL)),
                    ImmutableMap.of(1, toSubfieldFilter("c.field_0", nonNegative)),
                    ImmutableMap.of(1, toSubfieldFilter("c.field_0", IS_NULL))));

    // row column first: every 7th row value and every 11th integer value is null
    tester.testRoundTripTypes(
            ImmutableList.of(rowType(INTEGER, BOOLEAN), INTEGER),
            ImmutableList.of(
                    createList(NUM_ROWS, i -> i % 7 == 0 ? null : ImmutableList.of(random.nextInt(), random.nextBoolean())),
                    createList(NUM_ROWS, i -> i % 11 == 0 ? null : random.nextInt())),
            ImmutableList.of(
                    ImmutableMap.of(
                            0, toSubfieldFilter(IS_NOT_NULL),
                            1, toSubfieldFilter(IS_NULL)),
                    ImmutableMap.of(0, toSubfieldFilter("c.field_0", nonNegative)),
                    ImmutableMap.of(0, toSubfieldFilter("c.field_0", nonNegativeOrNull))));
}
@Test
public void testMaps()
        throws Exception
{
    // One Random feeds every scenario, so the statements below must stay in this order.
    Random random = new Random(0);
    TupleDomainFilter nonNegative = BigintRange.of(0, Integer.MAX_VALUE, false);

    // single map column, no filters
    tester.testRoundTrip(
            mapType(INTEGER, INTEGER),
            createList(NUM_ROWS, row -> createMap(row)));

    // map column with no nulls
    tester.testRoundTripTypes(
            ImmutableList.of(INTEGER, mapType(INTEGER, INTEGER)),
            ImmutableList.of(
                    createList(NUM_ROWS, row -> random.nextInt()),
                    createList(NUM_ROWS, row -> createMap(row))),
            toSubfieldFilters(
                    ImmutableMap.of(0, nonNegative),
                    ImmutableMap.of(1, IS_NOT_NULL),
                    ImmutableMap.of(1, IS_NULL)));

    // map column with nulls (every 5th row)
    tester.testRoundTripTypes(
            ImmutableList.of(INTEGER, mapType(INTEGER, INTEGER)),
            ImmutableList.of(
                    createList(NUM_ROWS, row -> random.nextInt()),
                    createList(NUM_ROWS, row -> row % 5 == 0 ? null : createMap(row))),
            toSubfieldFilters(
                    ImmutableMap.of(0, nonNegative),
                    ImmutableMap.of(1, IS_NOT_NULL),
                    ImmutableMap.of(1, IS_NULL),
                    ImmutableMap.of(0, nonNegative, 1, IS_NULL),
                    ImmutableMap.of(0, nonNegative, 1, IS_NOT_NULL)));

    // map column with filter, followed by another column with filter
    tester.testRoundTripTypes(
            ImmutableList.of(mapType(INTEGER, INTEGER), INTEGER),
            ImmutableList.of(
                    createList(NUM_ROWS, row -> row % 5 == 0 ? null : createMap(row)),
                    createList(NUM_ROWS, row -> random.nextInt())),
            toSubfieldFilters(
                    ImmutableMap.of(0, IS_NULL, 1, nonNegative),
                    ImmutableMap.of(0, IS_NOT_NULL, 1, nonNegative)));

    // empty maps, no filters
    tester.testRoundTripTypes(
            ImmutableList.of(INTEGER, mapType(INTEGER, INTEGER)),
            ImmutableList.of(
                    createList(NUM_ROWS, row -> random.nextInt()),
                    nCopies(NUM_ROWS, ImmutableMap.of())),
            ImmutableList.of());

    // read selected positions from all nulls map column
    tester.testRoundTripTypes(
            ImmutableList.of(INTEGER, mapType(INTEGER, INTEGER)),
            ImmutableList.of(
                    createList(NUM_ROWS, row -> random.nextInt(10)),
                    createList(NUM_ROWS, row -> null)),
            toSubfieldFilters(ImmutableMap.of(0, BigintRange.of(0, 5, false))));
}
@Test
public void testMapsWithSubfieldPruning()
        throws Exception
{
    // flat maps, identical in every row; "c[10]" is a key that is not present in the data
    List<Map<Integer, Integer>> flatMaps = createList(NUM_ROWS, row -> ImmutableMap.of(1, 10, 2, 20));
    tester.assertRoundTripWithSettings(
            mapType(INTEGER, INTEGER),
            flatMaps,
            ImmutableList.of(
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[1]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[10]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2]", "c[10]").build()));

    // maps of maps
    List<Map<Integer, Map<Integer, Integer>>> mapsOfMaps = createList(
            NUM_ROWS,
            row -> ImmutableMap.of(1, createMap(row), 2, createMap(row + 1)));
    tester.assertRoundTripWithSettings(
            mapType(INTEGER, mapType(INTEGER, INTEGER)),
            mapsOfMaps,
            ImmutableList.of(
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[1][1]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2][2]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2][1]", "c[10][1]").build()));

    // maps of arrays
    List<Map<Integer, List<Integer>>> mapsOfArrays = createList(
            NUM_ROWS,
            row -> ImmutableMap.of(1, nCopies(5, 10), 2, nCopies(5, 20)));
    tester.assertRoundTripWithSettings(
            mapType(INTEGER, arrayType(INTEGER)),
            mapsOfArrays,
            ImmutableList.of(
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[1][1]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2][2]", "c[2][3]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2][2]", "c[10][3]", "c[2][10]").build(),
                    OrcReaderSettings.builder().addRequiredSubfields(0, "c[2][2]", "c[1][2]").build()));
}
/**
 * Builds a deterministic map for the given seed. The map has between 1 and 7 entries;
 * its keys are {@code 0 .. size - 1} and each key {@code k} maps to {@code k + seed}.
 *
 * @param seed value that determines both the size and the values of the map
 * @return an immutable map with {@code Math.abs(seed % 7) + 1} entries
 */
private static Map<Integer, Integer> createMap(int seed)
{
    // Take the remainder before the absolute value. Math.abs(Integer.MIN_VALUE) is still
    // negative, so abs-then-remainder would yield a negative size (and an empty map) for that
    // seed. For every other seed both forms give the same size.
    int mapSize = Math.abs(seed % 7) + 1;
    return IntStream.range(0, mapSize).boxed().collect(toImmutableMap(Function.identity(), i -> i + seed));
}
/**
 * Builds a list of {@code size} elements by applying {@code createElement} to each index
 * from 0 (inclusive) to {@code size} (exclusive), in ascending order. Elements may be null.
 * A non-positive {@code size} yields an empty list.
 */
private static <T> List<T> createList(int size, Function<Integer, T> createElement)
{
    List<T> elements = new ArrayList<>();
    for (int index = 0; index < size; index++) {
        elements.add(createElement.apply(index));
    }
    return elements;
}
@Test
public void testDecimalSequence()
        throws Exception
{
    // single column, no filters
    tester.testRoundTrip(
            DECIMAL_TYPE_PRECISION_4,
            decimalSequence("-3000", "1", 60_00, 4, 2));
    tester.testRoundTrip(
            DECIMAL_TYPE_PRECISION_19,
            decimalSequence("-3000000000000000000", "100000000000000101", 60, 19, 8));

    // two DECIMAL_TYPE_PRECISION_2 columns, filter on the first column only
    tester.testRoundTripTypes(
            ImmutableList.of(DECIMAL_TYPE_PRECISION_2, DECIMAL_TYPE_PRECISION_2),
            ImmutableList.of(
                    decimalSequence("-30", "1", 60, 2, 1),
                    decimalSequence("-30", "1", 60, 2, 1)),
            toSubfieldFilters(ImmutableMap.of(0, BigintRange.of(10, 20, true))));

    // two DECIMAL_TYPE_PRECISION_2 columns, filters on both columns
    tester.testRoundTripTypes(
            ImmutableList.of(DECIMAL_TYPE_PRECISION_2, DECIMAL_TYPE_PRECISION_2),
            ImmutableList.of(
                    decimalSequence("-30", "1", 60, 2, 1),
                    decimalSequence("-30", "1", 60, 2, 1)),
            toSubfieldFilters(ImmutableMap.of(
                    0, BigintRange.of(10, 30, true),
                    1, BigintRange.of(15, 25, true))));

    // two DECIMAL_TYPE_PRECISION_19 columns, LongDecimalRange filters on both columns
    tester.testRoundTripTypes(
            ImmutableList.of(DECIMAL_TYPE_PRECISION_19, DECIMAL_TYPE_PRECISION_19),
            ImmutableList.of(
                    decimalSequence("-3000000000000000000", "100000000000000101", 60, 19, 8),
                    decimalSequence("-3000000000000000000", "100000000000000101", 60, 19, 8)),
            toSubfieldFilters(ImmutableMap.of(
                    0, TupleDomainFilter.LongDecimalRange.of(
                            -28999999999L, -28999999999L, false, true,
                            28999999999L, 28999999999L, false, true,
                            true),
                    1, TupleDomainFilter.LongDecimalRange.of(
                            1000000000L, 1000000000L, false, true,
                            28999999999L, 28999999999L, false, true,
                            true))));
}
// Round-trips VARCHAR columns over several data shapes (short lists with nulls, cycled
// values, stringified integers, long runs of a repeated value, empty strings) with string
// filters built by stringIn / stringBetween / stringEquals / stringNotEquals / stringLessThan.
// The last multi-column scenario draws from a Random created with seed 0.
@Test
public void testVarchars()
throws Exception
{
Random random = new Random(0);
// three short columns containing nulls; the second column holds only nulls
tester.testRoundTripTypes(
ImmutableList.of(VARCHAR, VARCHAR, VARCHAR),
ImmutableList.of(newArrayList("abc", "def", null, "hij", "klm"), newArrayList(null, null, null, null, null), newArrayList("abc", "def", null, null, null)),
toSubfieldFilters(ImmutableMap.of(
0, stringIn(true, "abc", "def"),
1, stringIn(true, "10", "11"),
2, stringIn(true, "def", "abc"))));
// direct and dictionary
tester.testRoundTrip(VARCHAR, newArrayList(limit(cycle(ImmutableList.of("apple", "apple pie", "apple\uD835\uDC03", "apple\uFFFD")), NUM_ROWS)),
stringIn(false, "apple", "apple pie"));
// direct and dictionary materialized
tester.testRoundTrip(VARCHAR,
intsBetween(0, NUM_ROWS).stream().map(Object::toString).collect(toList()),
stringIn(false, "10", "11"),
stringIn(true, "10", "11"),
stringBetween(false, "14", "10"));
// direct & dictionary with filters
tester.testRoundTripTypes(
ImmutableList.of(VARCHAR, VARCHAR),
ImmutableList.of(
intsBetween(0, NUM_ROWS).stream().map(Object::toString).collect(toList()),
newArrayList(limit(cycle(ImmutableList.of("A", "B", "C")), NUM_ROWS))),
toSubfieldFilters(ImmutableMap.of(
0, stringBetween(true, "16", "10"),
1, stringBetween(false, "B", "A"))));
// stripe dictionary: "a", 9999 copies of "123", "b", 9999 copies of "123"
tester.testRoundTrip(VARCHAR, newArrayList(concat(ImmutableList.of("a"), nCopies(9999, "123"), ImmutableList.of("b"), nCopies(9999, "123"))));
// column consisting only of empty strings
tester.testRoundTrip(VARCHAR, nCopies(NUM_ROWS, ""), stringEquals(false, ""));
// copy of AbstractOrcTester::testDwrfInvalidCheckpointsForRowGroupDictionary
List<Integer> values = newArrayList(limit(
cycle(concat(
ImmutableList.of(1), nCopies(9999, 123),
ImmutableList.of(2), nCopies(9999, 123),
ImmutableList.of(3), nCopies(9999, 123),
nCopies(1_000_000, null))),
200_000));
// NOTE(review): newArrayList(values) copies a list that is only streamed afterwards;
// the copy looks redundant. This call uses assertRoundTrip rather than testRoundTrip.
tester.assertRoundTrip(
VARCHAR,
newArrayList(values).stream()
.map(value -> value == null ? null : String.valueOf(value))
.collect(toList()));
// copy of AbstractOrcTester::testDwrfInvalidCheckpointsForStripeDictionary
tester.testRoundTrip(
VARCHAR,
newArrayList(limit(cycle(ImmutableList.of(1, 3, 5, 7, 11, 13, 17)), 200_000)).stream()
.map(Object::toString)
.collect(toList()));
// presentStream is null in some row groups & dictionary materialized
// (filter on the integer column only; the VARCHAR column holds random stringified integers)
Function<Integer, String> randomStrings = i -> String.valueOf(random.nextInt(NUM_ROWS));
tester.testRoundTripTypes(
ImmutableList.of(INTEGER, VARCHAR),
ImmutableList.of(createList(NUM_ROWS, i -> random.nextInt(NUM_ROWS)), newArrayList(createList(NUM_ROWS, randomStrings))),
toSubfieldFilters(ImmutableMap.of(0, BigintRange.of(10_000, 15_000, true))));
// dataStream is null and lengths are 0
tester.testRoundTrip(VARCHAR, newArrayList("", null), toSubfieldFilters(stringNotEquals(true, "")));
tester.testRoundTrip(VARCHAR, newArrayList("", ""), toSubfieldFilters(stringLessThan(true, "")));
}
@Test
public void testChars()
        throws Exception
{
    // multiple columns filter on not null
    List<String> varcharColumn = newArrayList(limit(cycle(ImmutableList.of("123456789", "23456789", "3456789")), NUM_ROWS));
    List<String> charColumn = newArrayList(limit(cycle(ImmutableList.of("12345", "23456", "34567")), NUM_ROWS));
    tester.testRoundTripTypes(
            ImmutableList.of(VARCHAR, createCharType(5)),
            ImmutableList.of(varcharColumn, charColumn),
            toSubfieldFilters(ImmutableMap.of(0, IS_NOT_NULL)));

    // CHAR(2) column without nulls, read with an IS NULL filter
    List<String> twoCharValues = newArrayList(limit(cycle(ImmutableList.of("aa", "bb", "cc", "dd")), NUM_ROWS));
    tester.testRoundTrip(createCharType(2), twoCharValues, IS_NULL);

    // CHAR(1) column with IN filters, with and without nulls allowed
    List<String> oneCharValues = newArrayList(limit(cycle(ImmutableList.of("a", "b", "c", "d")), NUM_ROWS));
    tester.testRoundTrip(
            createCharType(1),
            oneCharValues,
            stringIn(false, "a", "b"),
            stringIn(true, "a", "b"));

    // char with padding
    List<String> paddedNumbers = intsBetween(0, NUM_ROWS).stream()
            .map(number -> toCharValue(number, 10))
            .collect(toList());
    tester.testRoundTrip(CHAR_10, paddedNumbers);

    // char with filter
    // The values are padded with trailing spaces so that they are all 10 characters long
    List<String> paddedCycle = newArrayList(limit(cycle(ImmutableList.of(1, 3, 5, 7, 11, 13, 17)), NUM_ROWS)).stream()
            .map(number -> toCharValue(number, 10))
            .collect(toList());

    // Test with filter c IN {'1','3'}. Note that the values should not have any paddings since the filter passed to the reader is without paddings for CHAR(n).
    // The filter for the expected values is with the paddings because of the test limitations.
    tester.testRoundTrip(
            CHAR_10,
            paddedCycle,
            toSubfieldFilters(stringIn(true, toCharValue(1, 1), toCharValue(3, 1))),
            toSubfieldFilters(stringIn(true, toCharValue(1, 10), toCharValue(3, 10))));

    // char with 0 truncated length
    tester.testRoundTrip(CHAR_10, newArrayList(limit(cycle(toCharValue("", 10)), NUM_ROWS)));

    // VARCHAR values with the same IN filter used for both the reader and the expected values
    List<String> stripeDictionaryValues = newArrayList(concat(ImmutableList.of("a"), nCopies(9999, "123"), ImmutableList.of("b"), nCopies(9999, "123")));
    tester.testRoundTrip(
            VARCHAR,
            stripeDictionaryValues,
            toSubfieldFilters(stringIn(false, "a", "b")),
            toSubfieldFilters(stringIn(false, "a", "b")));
}
@Test
public void testVarBinaries()
        throws Exception
{
    // decimal strings as varbinary, filtered with an IN list
    tester.testRoundTrip(
            VARBINARY,
            createList(NUM_ROWS, i -> new SqlVarbinary(String.valueOf(i).getBytes(UTF_8))),
            stringIn(false, "10", "11"));

    // Two varbinary columns with range filters.
    // stringBetween is declared as (nullAllowed, upper, lower), so the upper bound is passed first;
    // passing the bounds the other way round builds an inverted range.
    tester.testRoundTripTypes(
            ImmutableList.of(VARBINARY, VARBINARY),
            ImmutableList.of(
                    createList(NUM_ROWS, i -> new SqlVarbinary(Ints.toByteArray(i))),
                    Streams.stream(limit(cycle(ImmutableList.of("A", "B", "C")), NUM_ROWS))
                            // explicit charset: the no-arg getBytes() depends on the platform default
                            .map(letter -> letter.getBytes(UTF_8))
                            .map(SqlVarbinary::new)
                            .collect(toImmutableList())),
            toSubfieldFilters(ImmutableMap.of(
                    0, stringBetween(true, "16", "10"),
                    1, stringBetween(false, "B", "A"))));

    // decimal strings as varbinary, filtered with a raw byte range
    tester.testRoundTrip(
            VARBINARY,
            createList(NUM_ROWS, i -> new SqlVarbinary(String.valueOf(i).getBytes(UTF_8))),
            bytesBetween(false, new byte[] {8}, new byte[] {9}));

    // every value is the empty byte array
    tester.testRoundTrip(
            VARBINARY,
            nCopies(NUM_ROWS, new SqlVarbinary(new byte[0])),
            bytesBetween(false, new byte[0], new byte[] {1}));

    // a short repeating cycle of small numbers
    tester.testRoundTrip(
            VARBINARY,
            ImmutableList.copyOf(limit(cycle(ImmutableList.of(1, 3, 5, 7, 11, 13, 17)), NUM_ROWS)).stream()
                    .map(String::valueOf)
                    .map(string -> string.getBytes(UTF_8))
                    .map(SqlVarbinary::new)
                    .collect(toImmutableList()),
            bytesBetween(false, new byte[] {1}, new byte[] {12}));
}
@Test
public void testMemoryTracking()
        throws Exception
{
    // NONE and ZSTD are checked against the same retained-memory window; ZLIB uses a higher one.
    long sharedLowerBound = 150_000L;
    long sharedUpperBound = 170_000L;
    testMemoryTracking(NONE, sharedLowerBound, sharedUpperBound);
    testMemoryTracking(ZSTD, sharedLowerBound, sharedUpperBound);
    testMemoryTracking(ZLIB, 220_000L, 240_000L);
}
/**
 * Writes an INTEGER column and two VARCHAR columns to a DWRF file with the given compression,
 * reads the file back through an {@link OrcSelectiveRecordReader}, and asserts after every
 * non-empty page that the bytes reported by the reader's memory context fall within the given
 * bounds.
 *
 * @param compression compression kind used when writing the file
 * @param lowerRetainedMemoryBound lower bound passed to {@code assertBetweenInclusive}
 * @param upperRetainedMemoryBound upper bound passed to {@code assertBetweenInclusive}
 */
private void testMemoryTracking(CompressionKind compression, long lowerRetainedMemoryBound, long upperRetainedMemoryBound)
        throws Exception
{
    List<Type> types = ImmutableList.of(INTEGER, VARCHAR, VARCHAR);

    List<Integer> intValues = newArrayList(limit(
            cycle(concat(
                    ImmutableList.of(1), nCopies(9999, 123),
                    ImmutableList.of(2), nCopies(9999, 123),
                    ImmutableList.of(3), nCopies(9999, 123),
                    nCopies(1_000_000, null))),
            NUM_ROWS));
    List<String> varcharDirectValues = newArrayList(limit(cycle(ImmutableList.of("A", "B", "C")), NUM_ROWS));
    List<String> varcharDictionaryValues = newArrayList(limit(cycle(ImmutableList.of("apple", "apple pie", "apple\uD835\uDC03", "apple\uFFFD")), NUM_ROWS));
    List<List<?>> values = ImmutableList.of(intValues, varcharDirectValues, varcharDictionaryValues);

    // every column is both included and output, in file order
    Map<Integer, Type> includedColumns = IntStream.range(0, types.size())
            .boxed()
            .collect(toImmutableMap(Function.identity(), types::get));
    List<Integer> outputColumns = IntStream.range(0, types.size())
            .boxed()
            .collect(toImmutableList());

    // default reader settings, built once instead of once per accessor
    OrcReaderSettings readerSettings = OrcReaderSettings.builder().build();
    OrcAggregatedMemoryContext systemMemoryUsage = new TestingHiveOrcAggregatedMemoryContext();

    // try-with-resources so the temporary file is closed even when an assertion fails
    try (TempFile tempFile = new TempFile()) {
        writeOrcColumnsPresto(tempFile.getFile(), DWRF, compression, Optional.empty(), types, values, NOOP_WRITER_STATS);
        OrcPredicate orcPredicate = createOrcPredicate(types, values, DWRF, false);

        try (OrcSelectiveRecordReader recordReader = createCustomOrcSelectiveRecordReader(
                tempFile.getFile(),
                DWRF.getOrcEncoding(),
                orcPredicate,
                types,
                MAX_BATCH_SIZE,
                ImmutableMap.of(),
                readerSettings.getFilterFunctions(),
                readerSettings.getFilterFunctionInputMapping(),
                readerSettings.getRequiredSubfields(),
                ImmutableMap.of(),
                ImmutableMap.of(),
                includedColumns,
                outputColumns,
                false,
                systemMemoryUsage,
                false)) {
            assertEquals(recordReader.getReaderPosition(), 0);
            assertEquals(recordReader.getFilePosition(), 0);

            int rowsProcessed = 0;
            for (Page page = recordReader.getNextPage(); page != null; page = recordReader.getNextPage()) {
                int positionCount = page.getPositionCount();
                if (positionCount == 0) {
                    continue;
                }

                // load the page's blocks before sampling the memory context
                page.getLoadedPage();
                assertBetweenInclusive(systemMemoryUsage.getBytes(), lowerRetainedMemoryBound, upperRetainedMemoryBound);

                rowsProcessed += positionCount;
            }
            assertEquals(rowsProcessed, NUM_ROWS);
        }
    }
}
/**
 * Checks {@code toZeroBasedColumnIndex} on a reader whose included and output columns use
 * arbitrary ids (3, 7 and -100). The assertions expect each id to map to its position in the
 * output column list, and an id that is not included to map to an empty result.
 */
@Test
public void testToZeroBasedColumnIndex()
        throws Exception
{
    List<Type> types = ImmutableList.of(INTEGER, INTEGER, INTEGER);

    List<Integer> column0 = ImmutableList.of(11111);
    List<Integer> column1 = ImmutableList.of(33333);
    List<Integer> column2 = ImmutableList.of(437856);
    List<List<?>> values = ImmutableList.of(column0, column1, column2);

    Map<Integer, Type> includedColumns = ImmutableMap.of(3, INTEGER, 7, INTEGER, -100, INTEGER);
    List<Integer> outputColumns = ImmutableList.of(-100, 7, 3);

    // default reader settings, built once instead of once per accessor
    OrcReaderSettings readerSettings = OrcReaderSettings.builder().build();

    // try-with-resources so the temporary file is closed even when an assertion fails
    try (TempFile tempFile = new TempFile()) {
        writeOrcColumnsPresto(tempFile.getFile(), DWRF, NONE, Optional.empty(), types, values, NOOP_WRITER_STATS);

        try (OrcSelectiveRecordReader recordReader = createCustomOrcSelectiveRecordReader(
                tempFile.getFile(),
                DWRF.getOrcEncoding(),
                OrcPredicate.TRUE,
                types,
                MAX_BATCH_SIZE,
                ImmutableMap.of(),
                readerSettings.getFilterFunctions(),
                readerSettings.getFilterFunctionInputMapping(),
                readerSettings.getRequiredSubfields(),
                ImmutableMap.of(),
                ImmutableMap.of(),
                includedColumns,
                outputColumns,
                false,
                new TestingHiveOrcAggregatedMemoryContext(),
                false)) {
            assertEquals(recordReader.toZeroBasedColumnIndex(3).getAsInt(), 2);
            assertEquals(recordReader.toZeroBasedColumnIndex(7).getAsInt(), 1);
            assertEquals(recordReader.toZeroBasedColumnIndex(-100).getAsInt(), 0);
            // column id 1 is not among the included columns
            assertFalse(recordReader.toZeroBasedColumnIndex(1).isPresent());
        }
    }
}
@Test
public void testOutputNotRequired()
        throws Exception
{
    // The IN list contains every input value, so the expected output is the input itself.
    List<String> cycledLetters = newArrayList(limit(cycle(ImmutableList.of("A", "B", "C")), NUM_ROWS));
    Map<Subfield, TupleDomainFilter> inFilter = toSubfieldFilter(stringIn(true, "A", "B", "C"));
    verifyOutputNotRequired(cycledLetters, inFilter, cycledLetters);
}
@Test
public void testOutputNotRequiredNonNullFilterNulls()
        throws Exception
{
    // Input alternates "A" and null starting with "A", so (NUM_ROWS + 1) / 2 rows are non-null.
    List<String> alternatingValues = newArrayList(limit(cycle(Arrays.asList("A", null)), NUM_ROWS));
    int nonNullCount = (NUM_ROWS + 1) / 2;
    List<String> nonNullValues = newArrayList(nCopies(nonNullCount, "A"));
    verifyOutputNotRequired(alternatingValues, toSubfieldFilter(IS_NOT_NULL), nonNullValues);
}
@Test
public void testOutputNotRequiredNonNullFilterNoNulls()
        throws Exception
{
    // The input has no nulls, so an IS NOT NULL filter is expected to keep every row.
    List<String> cycledLetters = newArrayList(limit(cycle(Arrays.asList("A", "B", "C")), NUM_ROWS));
    Map<Subfield, TupleDomainFilter> notNullFilter = toSubfieldFilter(IS_NOT_NULL);
    verifyOutputNotRequired(cycledLetters, notNullFilter, cycledLetters);
}
/**
 * Writes {@code inputValues} twice, as two VARCHAR columns of a DWRF file, applies
 * {@code filters} to column 0 while requesting only column 1 as output, and asserts that the
 * pages returned by the reader contain exactly {@code expectedValues}, in order.
 *
 * @param inputValues values written to both columns
 * @param filters subfield filters applied to column 0
 * @param expectedValues values expected in the output column after filtering
 */
private void verifyOutputNotRequired(List<String> inputValues, Map<Subfield, TupleDomainFilter> filters, List<String> expectedValues)
        throws Exception
{
    List<Type> types = ImmutableList.of(VARCHAR, VARCHAR);
    List<List<?>> values = ImmutableList.of(inputValues, inputValues);

    Map<Integer, Type> includedColumns = IntStream.range(0, types.size())
            .boxed()
            .collect(toImmutableMap(Function.identity(), types::get));
    // Do not output column 0 but only column 1
    List<Integer> outputColumns = ImmutableList.of(1);

    // default reader settings, built once instead of once per accessor
    OrcReaderSettings readerSettings = OrcReaderSettings.builder().build();

    // try-with-resources so the temporary file is closed even when an assertion fails
    try (TempFile tempFile = new TempFile()) {
        writeOrcColumnsPresto(tempFile.getFile(), DWRF, NONE, Optional.empty(), types, values, NOOP_WRITER_STATS);
        OrcPredicate orcPredicate = createOrcPredicate(types, values, DWRF, false);

        try (OrcSelectiveRecordReader recordReader = createCustomOrcSelectiveRecordReader(
                tempFile.getFile(),
                DWRF.getOrcEncoding(),
                orcPredicate,
                types,
                MAX_BATCH_SIZE,
                ImmutableMap.of(0, filters),
                readerSettings.getFilterFunctions(),
                readerSettings.getFilterFunctionInputMapping(),
                readerSettings.getRequiredSubfields(),
                ImmutableMap.of(),
                ImmutableMap.of(),
                includedColumns,
                outputColumns,
                false,
                new TestingHiveOrcAggregatedMemoryContext(),
                false)) {
            assertEquals(recordReader.getReaderPosition(), 0);
            assertEquals(recordReader.getFilePosition(), 0);

            int rowsProcessed = 0;
            for (Page page = recordReader.getNextPage(); page != null; page = recordReader.getNextPage()) {
                int positionCount = page.getPositionCount();
                if (positionCount == 0) {
                    continue;
                }

                page.getLoadedPage();

                // Column 1 is the only requested output column, so it is read from the page at block index 0
                assertBlockPositions(page.getBlock(0), expectedValues.subList(rowsProcessed, rowsProcessed + positionCount));

                rowsProcessed += positionCount;
            }
            assertEquals(rowsProcessed, expectedValues.size());
        }
    }
}
/**
 * Writes a VARCHAR column whose first {@code MAX_BATCH_SIZE} rows are 5000-digit strings and
 * whose remaining rows are empty, then checks that the first page has {@code MAX_BATCH_SIZE}
 * positions and that later pages are sized from the bytes-per-row seen in that first batch.
 */
@Test
public void testAdaptiveBatchSizes()
        throws Exception
{
    Type type = VARCHAR;
    List<Type> types = ImmutableList.of(type);

    int rowCount = 10000;
    int longStringLength = 5000;
    // Fixed seed so the generated file is identical on every run
    Random random = new Random(0);

    List<String> values = new ArrayList<>(rowCount);
    for (int i = 0; i < rowCount; ++i) {
        if (i < MAX_BATCH_SIZE) {
            StringBuilder builder = new StringBuilder(longStringLength);
            for (int j = 0; j < longStringLength; ++j) {
                builder.append(random.nextInt(10));
            }
            values.add(builder.toString());
        }
        else {
            values.add("");
        }
    }

    // try-with-resources so the temporary file is closed even when an assertion fails
    try (TempFile tempFile = new TempFile()) {
        writeOrcColumnsPresto(tempFile.getFile(), DWRF, NONE, Optional.empty(), types, ImmutableList.of(values), NOOP_WRITER_STATS);

        try (OrcSelectiveRecordReader recordReader = createCustomOrcSelectiveRecordReader(tempFile, OrcEncoding.DWRF, OrcPredicate.TRUE, type, MAX_BATCH_SIZE, false, false)) {
            assertEquals(recordReader.getFileRowCount(), rowCount);
            assertEquals(recordReader.getReaderRowCount(), rowCount);
            assertEquals(recordReader.getFilePosition(), 0);
            assertEquals(recordReader.getReaderPosition(), 0);

            // Size of the first batch should equal to the initial batch size (set to MAX_BATCH_SIZE)
            Page page = recordReader.getNextPage();
            assertNotNull(page);
            page = page.getLoadedPage();
            assertEquals(page.getPositionCount(), MAX_BATCH_SIZE);

            // Later batches should be adjusted based on maxCombinedBytesPerRow collected during the first batch read
            while (true) {
                page = recordReader.getNextPage();
                assertNotNull(page);
                page = page.getLoadedPage();
                if (recordReader.getReadPositions() >= rowCount) {
                    // the final batch is not checked against the adaptive size
                    break;
                }
                assertEquals(page.getPositionCount(), MAX_BLOCK_SIZE.toBytes() / (longStringLength + Integer.BYTES + Byte.BYTES));
            }
        }
    }
}
/**
 * Reads a two-row BIGINT file together with a constant VARCHAR column registered under a
 * negative column index, with a filter on the BIGINT column that keeps only the value 1. The
 * single returned row must carry the constant value in block 0 and the BIGINT value in block 1.
 */
@Test
public void testHiddenConstantColumns()
        throws Exception
{
    List<Type> types = ImmutableList.of(BIGINT);
    List<List<?>> values = ImmutableList.of(ImmutableList.of(1L, 2L));

    // Hidden columns like partition columns use negative indices (-13).
    int hiddenColumnIndex = -13;
    Map<Integer, Type> includedColumns = ImmutableMap.of(hiddenColumnIndex, VARCHAR, 0, BIGINT);
    List<Integer> outputColumns = ImmutableList.of(hiddenColumnIndex, 0);

    Slice constantSlice = Slices.utf8Slice("partition_value");
    Map<Integer, Object> constantValues = ImmutableMap.of(hiddenColumnIndex, constantSlice);

    OrcAggregatedMemoryContext systemMemoryUsage = new TestingHiveOrcAggregatedMemoryContext();

    TupleDomainFilter filter = BigintRange.of(1, 1, false);
    Map<Subfield, TupleDomainFilter> subFieldFilter = toSubfieldFilter(filter);
    OrcReaderSettings readerSettings = OrcReaderSettings.builder().setColumnFilters(ImmutableMap.of(0, subFieldFilter)).build();

    // try-with-resources so the temporary file is closed even when an assertion fails
    try (TempFile tempFile = new TempFile()) {
        writeOrcColumnsPresto(tempFile.getFile(), DWRF, ZSTD, Optional.empty(), types, values, NOOP_WRITER_STATS);

        try (OrcSelectiveRecordReader recordReader = createCustomOrcSelectiveRecordReader(
                tempFile.getFile(),
                DWRF.getOrcEncoding(),
                OrcPredicate.TRUE,
                types,
                1,
                readerSettings.getColumnFilters(),
                readerSettings.getFilterFunctions(),
                readerSettings.getFilterFunctionInputMapping(),
                readerSettings.getRequiredSubfields(),
                constantValues,
                ImmutableMap.of(),
                includedColumns,
                outputColumns,
                false,
                systemMemoryUsage,
                false)) {
            Page page = recordReader.getNextPage();
            // fail with an assertion rather than a NullPointerException when no page comes back
            assertNotNull(page);
            assertEquals(page.getPositionCount(), 1);

            Block partitionValueBlock = page.getBlock(0);
            int length = partitionValueBlock.getSliceLength(0);
            Slice varcharSlice = partitionValueBlock.getSlice(0, 0, length);
            assertEquals(varcharSlice, constantSlice);

            Block bigintBlock = page.getBlock(1);
            assertEquals(bigintBlock.getLong(0), 1);

            assertNull(recordReader.getNextPage());
        }
    }
}
/**
 * Runs the same filter against BIGINT, INTEGER, SMALLINT, DATE and TIMESTAMP renderings of the
 * given numbers, first one column at a time and then with all five columns in a single file.
 *
 * @param values numbers to write; each is narrowed to the target type's range
 * @param filter filter applied to the column(s) under test
 */
private void testRoundTripNumeric(Iterable<? extends Number> values, TupleDomainFilter filter)
        throws Exception
{
    List<Long> longValues = ImmutableList.copyOf(values).stream()
            .map(Number::longValue)
            .collect(toList());

    // Derive the narrower representations from the long values in a single pass
    List<Integer> intValues = new ArrayList<>();
    List<Short> shortValues = new ArrayList<>();
    List<SqlDate> dateValues = new ArrayList<>();
    List<SqlTimestamp> timestamps = new ArrayList<>();
    for (Long longValue : longValues) {
        // truncate values to int range
        intValues.add(longValue.intValue());
        // truncate values to short range
        shortValues.add(longValue.shortValue());
        dateValues.add(new SqlDate(longValue.intValue()));
        // mask to the non-negative int range before building the timestamp
        timestamps.add(sqlTimestampOf(longValue & Integer.MAX_VALUE, SESSION));
    }

    // single-column round trips, all with the same filter
    List<Map<Subfield, TupleDomainFilter>> singleColumnFilters = toSubfieldFilters(filter);
    tester.testRoundTrip(BIGINT, longValues, singleColumnFilters);
    tester.testRoundTrip(INTEGER, intValues, singleColumnFilters);
    tester.testRoundTrip(SMALLINT, shortValues, singleColumnFilters);
    tester.testRoundTrip(DATE, dateValues, singleColumnFilters);
    tester.testRoundTrip(TIMESTAMP, timestamps, singleColumnFilters);

    // the multi-column round trip uses reversed copies of the int, date and timestamp columns
    List<Integer> reversedIntValues = new ArrayList<>(intValues);
    List<SqlDate> reversedDateValues = new ArrayList<>(dateValues);
    List<SqlTimestamp> reversedTimestampValues = new ArrayList<>(timestamps);
    Collections.reverse(reversedIntValues);
    Collections.reverse(reversedDateValues);
    Collections.reverse(reversedTimestampValues);

    tester.testRoundTripTypes(
            ImmutableList.of(BIGINT, INTEGER, SMALLINT, DATE, TIMESTAMP),
            ImmutableList.of(
                    longValues,
                    reversedIntValues,
                    shortValues,
                    reversedDateValues,
                    reversedTimestampValues),
            toSubfieldFilters(
                    ImmutableMap.of(0, filter),
                    ImmutableMap.of(1, filter),
                    ImmutableMap.of(0, filter, 1, filter),
                    ImmutableMap.of(0, filter, 1, filter, 2, filter)));
}
/**
 * Builds a byte-range filter from the UTF-8 encodings of the two bounds.
 * Note the parameter order: the upper bound comes BEFORE the lower bound.
 *
 * @param nullAllowed whether the filter accepts nulls
 * @param upper upper bound of the range
 * @param lower lower bound of the range
 */
private static TupleDomainFilter stringBetween(boolean nullAllowed, String upper, String lower)
{
    // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
    // The two false flags are presumably the lower/upper "exclusive" markers, making both bounds inclusive.
    return BytesRange.of(lower.getBytes(UTF_8), false, upper.getBytes(UTF_8), false, nullAllowed);
}
/**
 * Builds a byte-range filter with no lower bound and the UTF-8 encoding of {@code upper} as the
 * upper bound.
 *
 * @param nullAllowed whether the filter accepts nulls
 * @param upper upper bound of the range
 */
private static TupleDomainFilter stringLessThan(boolean nullAllowed, String upper)
{
    // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
    return BytesRange.of(null, true, upper.getBytes(UTF_8), true, nullAllowed);
}
/**
 * Builds a byte-range filter whose lower and upper bounds are both the UTF-8 encoding of
 * {@code value}.
 *
 * @param nullAllowed whether the filter accepts nulls
 * @param value the value used for both bounds
 */
private static TupleDomainFilter stringEquals(boolean nullAllowed, String value)
{
    // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
    byte[] encoded = value.getBytes(UTF_8);
    return BytesRange.of(encoded, false, value.getBytes(UTF_8), false, nullAllowed);
}
/**
 * Builds a multi-range filter from two byte ranges that meet at the UTF-8 encoding of
 * {@code value}: one unbounded below and ending at the value, one starting at the value and
 * unbounded above.
 *
 * @param nullAllowed whether the filter (and each of its ranges) accepts nulls
 * @param value the value separating the two ranges
 */
private static TupleDomainFilter stringNotEquals(boolean nullAllowed, String value)
{
    // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
    return TupleDomainFilter.MultiRange.of(ImmutableList.of(
            BytesRange.of(null, false, value.getBytes(UTF_8), true, nullAllowed),
            BytesRange.of(value.getBytes(UTF_8), true, null, false, nullAllowed)), nullAllowed, false);
}
/**
 * Builds a {@code BytesValues} filter from the UTF-8 encodings of the given strings.
 *
 * @param nullAllowed whether the filter accepts nulls
 * @param values the strings to encode and pass to the filter
 */
private static TupleDomainFilter stringIn(boolean nullAllowed, String... values)
{
    // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
    byte[][] encodedValues = Arrays.stream(values)
            .map(value -> value.getBytes(UTF_8))
            .toArray(byte[][]::new);
    return BytesValues.of(encodedValues, nullAllowed);
}
/**
 * Returns the set of consecutive integers from {@code lowerInclusive} up to, but not including,
 * {@code upperExclusive}.
 */
private static ContiguousSet<Integer> intsBetween(int lowerInclusive, int upperExclusive)
{
    Range<Integer> halfOpenRange = Range.closedOpen(lowerInclusive, upperExclusive);
    return ContiguousSet.create(halfOpenRange, DiscreteDomain.integers());
}
/**
 * Builds a byte-range filter from raw byte bounds, passing {@code false} for both of the
 * range's boolean bound flags.
 *
 * @param nullAllowed whether the filter accepts nulls
 * @param lower lower bound of the range
 * @param upper upper bound of the range
 */
private static BytesRange bytesBetween(boolean nullAllowed, byte[] lower, byte[] upper)
{
    // presumably these are the "exclusive" markers for the lower and upper bound - confirm against BytesRange.of
    boolean lowerFlag = false;
    boolean upperFlag = false;
    return BytesRange.of(lower, lowerFlag, upper, upperFlag, nullAllowed);
}
/**
 * Returns a view of {@code iterable} that yields {@code n} consecutive elements, drops the
 * next one, and repeats until the source is exhausted.
 */
private static <T> Iterable<T> skipEvery(int n, Iterable<T> iterable)
{
    return () -> new AbstractIterator<T>()
    {
        private final Iterator<T> source = iterable.iterator();
        // number of elements consumed from the source since the last dropped element
        private int consumedInRun;

        @Override
        protected T computeNext()
        {
            while (source.hasNext()) {
                T candidate = source.next();
                consumedInRun++;
                if (consumedInRun <= n) {
                    return candidate;
                }
                // this was element n + 1 of the run: drop it and start a new run
                consumedInRun = 0;
            }
            return endOfData();
        }
    };
}
/**
 * Returns a view of {@code iterable} in which every element is yielded {@code n} times in a row
 * (once when {@code n} is less than two) before moving on to the next element.
 */
private static <T> Iterable<T> repeatEach(int n, Iterable<T> iterable)
{
    return () -> new AbstractIterator<T>()
    {
        private final Iterator<T> source = iterable.iterator();
        // how many times the current element has been yielded so far; 0 means "fetch a new one"
        private int repeatsDone;
        private T current;

        @Override
        protected T computeNext()
        {
            boolean startOfRun = repeatsDone == 0;
            if (startOfRun) {
                if (!source.hasNext()) {
                    return endOfData();
                }
                current = source.next();
            }

            int repeatsAfterThis = repeatsDone + 1;
            // wrap back to zero once the element has been yielded n times
            repeatsDone = repeatsAfterThis >= n ? 0 : repeatsAfterThis;
            return current;
        }
    };
}
/**
 * Renders {@code value} with {@code toString()} and pads the result on the right with spaces
 * until it is at least {@code minLength} characters long.
 */
private static String toCharValue(Object value, int minLength)
{
    String text = value.toString();
    char padding = ' ';
    return Strings.padEnd(text, minLength, padding);
}
/**
 * Returns the immutable list {@code start, start + step, start + 2 * step, ...} containing
 * {@code items} elements.
 */
private static List<Double> doubleSequence(double start, double step, int items)
{
    return IntStream.range(0, items)
            .mapToObj(index -> start + index * step)
            .collect(toImmutableList());
}
/**
 * Wraps {@code filter} in a single-entry immutable map keyed by a {@link Subfield} built from
 * the given path.
 */
private static Map<Subfield, TupleDomainFilter> toSubfieldFilter(String subfield, TupleDomainFilter filter)
{
    Subfield key = new Subfield(subfield);
    return ImmutableMap.of(key, filter);
}
/**
 * Wraps {@code filter} in a single-entry immutable map keyed by the subfield {@code "c"}.
 */
private static Map<Subfield, TupleDomainFilter> toSubfieldFilter(TupleDomainFilter filter)
{
    Subfield column = new Subfield("c");
    return ImmutableMap.of(column, filter);
}
/**
 * Converts each filter into its own single-entry subfield-filter map (see
 * {@code toSubfieldFilter(TupleDomainFilter)}) and returns the maps as an immutable list, in
 * argument order.
 */
private static List<Map<Subfield, TupleDomainFilter>> toSubfieldFilters(TupleDomainFilter... filters)
{
    ImmutableList.Builder<Map<Subfield, TupleDomainFilter>> subfieldFilters = ImmutableList.builder();
    for (TupleDomainFilter filter : filters) {
        subfieldFilters.add(toSubfieldFilter(filter));
    }
    return subfieldFilters.build();
}
/**
 * Converts each column-index-to-filter map into a column-index-to-subfield-filter map by
 * wrapping every filter with {@code toSubfieldFilter(TupleDomainFilter)}, and returns the
 * results as an immutable list, in argument order.
 *
 * @param filters one map per filter combination, keyed by column index
 */
// Safe: the varargs array is only read, never stored or written to, so no heap pollution can occur.
@SafeVarargs
private static List<Map<Integer, Map<Subfield, TupleDomainFilter>>> toSubfieldFilters(Map<Integer, TupleDomainFilter>... filters)
{
    return Arrays.stream(filters)
            .map(columnFilters -> Maps.transformValues(columnFilters, TestSelectiveOrcReader::toSubfieldFilter))
            .collect(toImmutableList());
}
/**
 * Returns a list of {@code size} integers, each drawn from {@code random.nextInt()}.
 */
private static List<Integer> randomIntegers(int size, Random random)
{
    Function<Integer, Integer> nextValue = ignoredIndex -> random.nextInt();
    return createList(size, nextValue);
}
/**
 * Returns a list of {@code size} strings, each produced by
 * {@code generateRandomStringWithLength(random, 10)}.
 */
private static List<String> randomStrings(int size, Random random)
{
    Function<Integer, String> nextValue = ignoredIndex -> generateRandomStringWithLength(random, 10);
    return createList(size, nextValue);
}
/**
 * Returns a list of {@code size} entries in which even indices are null and odd indices hold a
 * string produced by {@code generateRandomStringWithLength(random, 10)}.
 */
private static List<String> randomStringsWithNulls(int size, Random random)
{
    Function<Integer, String> nextValue = index -> {
        if (index % 2 == 0) {
            return null;
        }
        return generateRandomStringWithLength(random, 10);
    };
    return createList(size, nextValue);
}
/**
 * Fills a {@code length}-byte buffer from {@code random} and decodes it as UTF-8.
 * The decoded string is built from arbitrary bytes, so it is not guaranteed to contain
 * exactly {@code length} characters.
 */
private static String generateRandomStringWithLength(Random random, int length)
{
    byte[] randomBytes = new byte[length];
    random.nextBytes(randomBytes);
    String decoded = new String(randomBytes, UTF_8);
    return decoded;
}
/**
 * Returns {@code items} decimals with the given precision and scale whose unscaled values form
 * the arithmetic sequence {@code start, start + step, start + 2 * step, ...}.
 *
 * @param start unscaled value of the first element, as a decimal integer string
 * @param step difference between consecutive unscaled values, as a decimal integer string
 */
private static List<SqlDecimal> decimalSequence(String start, String step, int items, int precision, int scale)
{
    BigInteger increment = new BigInteger(step);
    BigInteger unscaled = new BigInteger(start);

    List<SqlDecimal> sequence = new ArrayList<>();
    int remaining = items;
    while (remaining > 0) {
        sequence.add(new SqlDecimal(unscaled, precision, scale));
        unscaled = unscaled.add(increment);
        remaining--;
    }
    return sequence;
}
/**
 * Returns {@code 2 * rows} longs. For each row index {@code i}, a base value is picked by
 * cycling through a fixed table of powers of two, and the list receives {@code base + i}
 * followed by {@code -base + i}.
 *
 * @param rows number of value pairs to generate
 */
private static List<Long> varintScaleSequence(int rows)
{
    // Parameterized instead of a raw ArrayList, so the list is type-checked at compile time
    List<Long> values = new ArrayList<>();
    // The last entry, 1L << 63, is Long.MIN_VALUE, so negating it below yields Long.MIN_VALUE again
    long[] numbers = new long[] {1L, 1L << 8, 1L << 13, 1L << 20, 1L << 27, 1L << 34, 1L << 40, 1L << 47, 1L << 53, 1L << 60, 1L << 63};
    for (int i = 0; i < rows; i++) {
        long base = numbers[i % numbers.length];
        values.add(base + i);
        values.add(-base + i);
    }
    return values;
}
/**
 * Narrows every integer to a byte with {@code Integer.byteValue()}, keeping nulls as nulls and
 * preserving order.
 */
private static List<Byte> toByteArray(List<Integer> integers)
{
    List<Byte> bytes = new ArrayList<>();
    for (Integer integer : integers) {
        if (integer == null) {
            bytes.add(null);
        }
        else {
            bytes.add(integer.byteValue());
        }
    }
    return bytes;
}
/**
 * Asserts that {@code block} has one position per expected value, that positions with a null
 * expected value are null, and that every other position is non-null and holds the UTF-8 bytes
 * of the expected string.
 *
 * @param block the block to verify
 * @param expectedValues expected values, one per position; null entries denote expected nulls
 */
private static void assertBlockPositions(Block block, List<String> expectedValues)
{
    assertEquals(block.getPositionCount(), expectedValues.size());
    for (int position = 0; position < block.getPositionCount(); position++) {
        String expected = expectedValues.get(position);
        if (expected == null) {
            assertTrue(block.isNull(position));
        }
        else {
            // explicit null check, so a null position is not judged by its slice contents alone
            assertFalse(block.isNull(position));
            // Explicit charset: the no-arg getBytes() depends on the platform default encoding
            assertEquals(block.getSlice(position, 0, block.getSliceLength(position)), Slices.wrappedBuffer(expected.getBytes(UTF_8)));
        }
    }
}
}