TestMapFlatSelectiveStreamReader.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.orc;

import com.facebook.presto.Session;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.function.SqlFunctionProperties;
import com.facebook.presto.common.predicate.FilterFunction;
import com.facebook.presto.common.predicate.TupleDomainFilter;
import com.facebook.presto.common.relation.Predicate;
import com.facebook.presto.common.type.BooleanType;
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.NamedTypeSignature;
import com.facebook.presto.common.type.RowFieldName;
import com.facebook.presto.common.type.SqlVarbinary;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeSignatureParameter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.testng.annotations.Test;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.IntStream;

import static com.facebook.presto.common.predicate.TupleDomainFilter.IS_NOT_NULL;
import static com.facebook.presto.common.predicate.TupleDomainFilter.IS_NULL;
import static com.facebook.presto.common.predicate.TupleDomainFilterUtils.toBigintValues;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.orc.OrcTester.arrayType;
import static com.facebook.presto.orc.OrcTester.assertFileContentsPresto;
import static com.facebook.presto.orc.OrcTester.filterRows;
import static com.facebook.presto.orc.OrcTester.mapType;
import static com.facebook.presto.orc.TestMapFlatSelectiveStreamReader.ExpectedValuesBuilder.Frequency.ALL;
import static com.facebook.presto.orc.TestMapFlatSelectiveStreamReader.ExpectedValuesBuilder.Frequency.ALL_EXCEPT_FIRST;
import static com.facebook.presto.orc.TestMapFlatSelectiveStreamReader.ExpectedValuesBuilder.Frequency.NONE;
import static com.facebook.presto.orc.TestMapFlatSelectiveStreamReader.ExpectedValuesBuilder.Frequency.SOME;
import static com.facebook.presto.orc.TestingOrcPredicate.createOrcPredicate;
import static com.facebook.presto.testing.TestingEnvironment.FUNCTION_AND_TYPE_MANAGER;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.stream.Collectors.toList;

public class TestMapFlatSelectiveStreamReader
{
    // TODO: Add tests for timestamp as value type

    private static final Type STRUCT_TYPE = FUNCTION_AND_TYPE_MANAGER.getParameterizedType(
            StandardTypes.ROW,
            ImmutableList.of(
                    TypeSignatureParameter.of(new NamedTypeSignature(Optional.of(new RowFieldName("value1", false)), IntegerType.INTEGER.getTypeSignature())),
                    TypeSignatureParameter.of(new NamedTypeSignature(Optional.of(new RowFieldName("value2", false)), IntegerType.INTEGER.getTypeSignature())),
                    TypeSignatureParameter.of(new NamedTypeSignature(Optional.of(new RowFieldName("value3", false)), IntegerType.INTEGER.getTypeSignature()))));

    private static final int NUM_ROWS = 31_234;

    @Test
    public void testByte()
            throws Exception
    {
        runTest("test_flat_map/flat_map_byte.dwrf",
                TINYINT,
                ExpectedValuesBuilder.get(Integer::byteValue));
    }

    @Test
    public void testByteWithNull()
            throws Exception
    {
        runTest("test_flat_map/flat_map_byte_with_null.dwrf",
                TINYINT,
                ExpectedValuesBuilder.get(Integer::byteValue).setNullValuesFrequency(SOME));
    }

    @Test
    public void testShort()
            throws Exception
    {
        runTest("test_flat_map/flat_map_short.dwrf",
                SMALLINT,
                ExpectedValuesBuilder.get(Integer::shortValue));
    }

    @Test
    public void testInteger()
            throws Exception
    {
        runTest("test_flat_map/flat_map_int.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()));
    }

    @Test
    public void testIntegerWithSharedDictionary()
            throws Exception
    {
        runTest("test_flat_map/flat_map_dict_share_simple.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setNumRows(2048));
    }

    @Test
    public void testIntegerWithNull()
            throws Exception
    {
        runTest("test_flat_map/flat_map_int_with_null.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setNullValuesFrequency(SOME));
    }

    @Test
    public void testLong()
            throws Exception
    {
        runTest("test_flat_map/flat_map_long.dwrf",
                BIGINT,
                ExpectedValuesBuilder.get(Integer::longValue));
    }

    @Test
    public void testString()
            throws Exception
    {
        runTest("test_flat_map/flat_map_string.dwrf",
                VARCHAR,
                ExpectedValuesBuilder.get(i -> Integer.toString(i)));
    }

    @Test
    public void testStringWithNull()
            throws Exception
    {
        runTest("test_flat_map/flat_map_string_with_null.dwrf",
                VARCHAR,
                ExpectedValuesBuilder.get(i -> Integer.toString(i)).setNullValuesFrequency(SOME));
    }

    @Test
    public void testBinary()
            throws Exception
    {
        runTest("test_flat_map/flat_map_binary.dwrf",
                VARBINARY,
                ExpectedValuesBuilder.get(i -> new SqlVarbinary(Integer.toString(i).getBytes(StandardCharsets.UTF_8))));
    }

    @Test
    public void testBoolean()
            throws Exception
    {
        runTest("test_flat_map/flat_map_boolean.dwrf",
                INTEGER,
                BooleanType.BOOLEAN,
                ExpectedValuesBuilder.get(Function.identity(), TestMapFlatSelectiveStreamReader::intToBoolean));
    }

    @Test
    public void testBooleanWithNull()
            throws Exception
    {
        runTest("test_flat_map/flat_map_boolean_with_null.dwrf",
                INTEGER,
                BooleanType.BOOLEAN,
                ExpectedValuesBuilder.get(Function.identity(), TestMapFlatSelectiveStreamReader::intToBoolean).setNullValuesFrequency(SOME));
    }

    @Test
    public void testFloat()
            throws Exception
    {
        runTest("test_flat_map/flat_map_float.dwrf",
                INTEGER,
                REAL,
                ExpectedValuesBuilder.get(Function.identity(), Float::valueOf));
    }

    @Test
    public void testFloatWithNull()
            throws Exception
    {
        runTest("test_flat_map/flat_map_float_with_null.dwrf",
                INTEGER,
                REAL,
                ExpectedValuesBuilder.get(Function.identity(), Float::valueOf).setNullValuesFrequency(SOME));
    }

    @Test
    public void testDouble()
            throws Exception
    {
        runTest("test_flat_map/flat_map_double.dwrf",
                INTEGER,
                DoubleType.DOUBLE,
                ExpectedValuesBuilder.get(Function.identity(), Double::valueOf));
    }

    @Test
    public void testDoubleWithNull()
            throws Exception
    {
        runTest("test_flat_map/flat_map_double_with_null.dwrf",
                INTEGER,
                DoubleType.DOUBLE,
                ExpectedValuesBuilder.get(Function.identity(), Double::valueOf).setNullValuesFrequency(SOME));
    }

    @Test
    public void testList()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_list.dwrf",
                INTEGER,
                arrayType(INTEGER),
                ExpectedValuesBuilder.get(Function.identity(), TestMapFlatSelectiveStreamReader::intToList));
    }

    @Test
    public void testListWithNull()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_list_with_null.dwrf",
                INTEGER,
                arrayType(INTEGER),
                ExpectedValuesBuilder.get(Function.identity(), TestMapFlatSelectiveStreamReader::intToList).setNullValuesFrequency(SOME));
    }

    @Test
    public void testMap()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_map.dwrf",
                INTEGER,
                mapType(VARCHAR, REAL),
                ExpectedValuesBuilder.get(Function.identity(), TestMapFlatSelectiveStreamReader::intToMap));
    }

    @Test
    public void testMapWithNull()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_map_with_null.dwrf",
                INTEGER,
                mapType(VARCHAR, REAL),
                ExpectedValuesBuilder.get(Function.identity(), TestMapFlatSelectiveStreamReader::intToMap).setNullValuesFrequency(SOME));
    }

    @Test
    public void testMapWithSharedDictionary()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_dict_share_nested.dwrf",
                BIGINT,
                mapType(VARCHAR, INTEGER),
                ExpectedValuesBuilder.get(x -> (long) x, TestMapFlatSelectiveStreamReader::intToIntMap).setNumRows(2048));
    }

    @Test
    public void testStruct()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_struct.dwrf",
                INTEGER,
                STRUCT_TYPE,
                ExpectedValuesBuilder.get(Function.identity(), TestMapFlatSelectiveStreamReader::intToList));
    }

    @Test
    public void testStructWithNull()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_struct_with_null.dwrf",
                INTEGER,
                STRUCT_TYPE,
                ExpectedValuesBuilder.get(Function.identity(), TestMapFlatSelectiveStreamReader::intToList).setNullValuesFrequency(SOME));
    }

    // Some of the maps are null
    @Test
    public void testWithNulls()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_some_null_maps.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setNullRowsFrequency(SOME));
    }

    // All maps are null
    @Test
    public void testWithAllNulls()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_all_null_maps.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setNullRowsFrequency(ALL));
    }

    @Test
    public void testWithAllNullsExceptFirst()
            throws Exception
    {
        // A test case where every flat map is null except the first one
        runTest(
                "test_flat_map/flat_map_all_null_maps_except_first.dwrf",
                IntegerType.INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setNullRowsFrequency(ALL_EXCEPT_FIRST));
    }

    // Some maps are empty
    @Test
    public void testWithEmptyMaps()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_some_empty_maps.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setEmptyMapsFrequency(SOME));
    }

    // All maps are empty
    @Test
    public void testWithAllEmptyMaps()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_all_empty_maps.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setEmptyMapsFrequency(ALL));
    }

    // All maps are empty and encoding is not present
    @Test
    public void testWithAllEmptyMapsWithNoEncoding()
            throws Exception
    {
        runTest(
                "test_flat_map/flat_map_all_empty_maps_no_encoding.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setEmptyMapsFrequency(ALL));
    }

    @Test
    public void testMixedEncodings()
            throws Exception
    {
        // Values associated with one key are direct encoded, and all other keys are
        // dictionary encoded.  The dictionary encoded values have occasional values that only appear once
        // to ensure the IN_DICTIONARY stream is present, which means the checkpoints for dictionary encoded
        // values will have a different number of positions compared to direct encoded values.
        runTest("test_flat_map/flat_map_mixed_encodings.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setMixedEncodings());
    }

    @Test
    public void testIntegerWithMissingSequences()
            throws Exception
    {
        // The additional sequences IDs for a flat map aren't a consecutive range [1,N], the odd
        // sequence IDs have been removed.  This is to simulate the case where a file has been modified to delete
        // certain keys from the map by dropping the ColumnEncodings and the associated data.
        runTest("test_flat_map/flat_map_int_missing_sequences.dwrf",
                INTEGER,
                ExpectedValuesBuilder.get(Function.identity()).setMissingSequences());
    }

    @Test
    public void testIntegerWithMissingSequence0()
            throws Exception
    {
        // A test case where the (dummy) encoding for sequence 0 of the value node doesn't exist
        runTest("test_flat_map/flat_map_int_missing_sequence_0.dwrf",
                IntegerType.INTEGER,
                ExpectedValuesBuilder.get(Function.identity()));
    }

    private <K, V> void runTest(String testOrcFileName, Type type, ExpectedValuesBuilder<K, V> expectedValuesBuilder)
            throws Exception
    {
        runTest(testOrcFileName, type, type, expectedValuesBuilder);
    }

    private <K, V> void runTest(String testOrcFileName, Type keyType, Type valueType, ExpectedValuesBuilder<K, V> expectedValuesBuilder)
            throws Exception
    {
        List<Map<K, V>> expectedValues = expectedValuesBuilder.build();

        Type mapType = mapType(keyType, valueType);

        OrcPredicate orcPredicate = createOrcPredicate(0, mapType, expectedValues, OrcTester.Format.DWRF, true);

        runTest(testOrcFileName, mapType, expectedValues, orcPredicate, Optional.empty(), ImmutableList.of());
        runTest(testOrcFileName, mapType, expectedValues.stream().filter(Objects::isNull).collect(toList()), orcPredicate, Optional.of(IS_NULL), ImmutableList.of());
        runTest(testOrcFileName, mapType, expectedValues.stream().filter(Objects::nonNull).collect(toList()), orcPredicate, Optional.of(IS_NOT_NULL), ImmutableList.of());

        if (keyType != VARBINARY) {
            // read only some keys
            List<K> keys = expectedValues.stream().filter(Objects::nonNull).flatMap(v -> v.keySet().stream()).distinct().collect(toImmutableList());
            if (!keys.isEmpty()) {
                List<K> requiredKeys = ImmutableList.of(keys.get(0));
                runTest(testOrcFileName, mapType, pruneMaps(expectedValues, requiredKeys), orcPredicate, Optional.empty(), toSubfields(keyType, requiredKeys));

                List<Integer> keyIndices = ImmutableList.of(1, 3, 7, 11);
                requiredKeys = keyIndices.stream().filter(k -> k < keys.size()).map(keys::get).collect(toList());
                runTest(testOrcFileName, mapType, pruneMaps(expectedValues, requiredKeys), orcPredicate, Optional.empty(), toSubfields(keyType, requiredKeys));
            }
        }

        // read only some rows
        List<Integer> ids = IntStream.range(0, expectedValues.size()).map(i -> i % 10).boxed().collect(toImmutableList());
        ImmutableList<Type> types = ImmutableList.of(mapType, INTEGER);

        Map<Integer, Map<Subfield, TupleDomainFilter>> filters = ImmutableMap.of(1, ImmutableMap.of(new Subfield("c"), toBigintValues(new long[] {1, 5, 6}, true)));
        assertFileContentsPresto(
                types,
                OrcReaderTestingUtils.getResourceFile(testOrcFileName),
                filterRows(types, ImmutableList.of(expectedValues, ids), filters),
                OrcEncoding.DWRF,
                OrcPredicate.TRUE,
                Optional.of(filters),
                ImmutableList.of(),
                ImmutableMap.of(),
                ImmutableMap.of());

        TestingFilterFunction filterFunction = new TestingFilterFunction(mapType);
        assertFileContentsPresto(
                types,
                OrcReaderTestingUtils.getResourceFile(testOrcFileName),
                filterFunction.filterRows(ImmutableList.of(expectedValues, ids)),
                OrcEncoding.DWRF,
                OrcPredicate.TRUE,
                Optional.empty(),
                ImmutableList.of(filterFunction),
                ImmutableMap.of(0, 0),
                ImmutableMap.of());
    }

    private static <K, V> List<Map<K, V>> pruneMaps(List<Map<K, V>> maps, List<K> keys)
    {
        return maps.stream()
                .map(map -> map == null ? null : Maps.filterKeys(map, keys::contains))
                .collect(toList());
    }

    private static <K> List<Subfield> toSubfields(Type keyType, List<K> keys)
    {
        if (keyType == TINYINT || keyType == SMALLINT || keyType == INTEGER || keyType == BIGINT) {
            return keys.stream()
                    .map(Number.class::cast)
                    .mapToLong(Number::longValue)
                    .mapToObj(key -> new Subfield.LongSubscript(key))
                    .map(subscript -> new Subfield("c", ImmutableList.of(subscript)))
                    .collect(toImmutableList());
        }

        if (keyType == VARCHAR) {
            return keys.stream()
                    .map(String.class::cast)
                    .map(key -> new Subfield.StringSubscript(key))
                    .map(subscript -> new Subfield("c", ImmutableList.of(subscript)))
                    .collect(toImmutableList());
        }

        throw new UnsupportedOperationException("Unsupported key type: " + keyType);
    }

    private <K, V> void runTest(String testOrcFileName, Type mapType, List<Map<K, V>> expectedValues, OrcPredicate orcPredicate, Optional<TupleDomainFilter> filter, List<Subfield> subfields)
            throws Exception
    {
        List<Type> types = ImmutableList.of(mapType);
        Optional<Map<Integer, Map<Subfield, TupleDomainFilter>>> filters = filter.map(f -> ImmutableMap.of(new Subfield("c"), f))
                .map(f -> ImmutableMap.of(0, f));

        assertFileContentsPresto(
                types,
                OrcReaderTestingUtils.getResourceFile(testOrcFileName),
                filters.map(f -> filterRows(types, ImmutableList.of(expectedValues), f)).orElse(ImmutableList.of(expectedValues)),
                OrcEncoding.DWRF,
                orcPredicate,
                filters,
                ImmutableList.of(),
                ImmutableMap.of(),
                ImmutableMap.of(0, subfields));
    }

    private static boolean intToBoolean(int i)
    {
        return i % 2 == 0;
    }

    private static List<Integer> intToList(int i)
    {
        return ImmutableList.of(i * 3, i * 3 + 1, i * 3 + 2);
    }

    private static Map<String, Float> intToMap(int i)
    {
        return ImmutableMap.of(Integer.toString(i * 3), (float) (i * 3), Integer.toString(i * 3 + 1), (float) (i * 3 + 1), Integer.toString(i * 3 + 2), (float) (i * 3 + 2));
    }

    private static Map<String, Integer> intToIntMap(int i)
    {
        return ImmutableMap.of(Integer.toString(i * 3), i * 3, Integer.toString(i * 3 + 1), i * 3 + 1, Integer.toString(i * 3 + 2), i * 3 + 2);
    }

    static class ExpectedValuesBuilder<K, V>
    {
        enum Frequency
        {
            NONE,
            SOME,
            ALL,
            ALL_EXCEPT_FIRST
        }

        private final Function<Integer, K> keyConverter;
        private final Function<Integer, V> valueConverter;
        private Frequency nullValuesFrequency = NONE;
        private Frequency nullRowsFrequency = NONE;
        private Frequency emptyMapsFrequency = NONE;
        private boolean mixedEncodings;
        private boolean missingSequences;
        private int numRows = NUM_ROWS;

        private ExpectedValuesBuilder(Function<Integer, K> keyConverter, Function<Integer, V> valueConverter)
        {
            this.keyConverter = keyConverter;
            this.valueConverter = valueConverter;
        }

        public static <T> ExpectedValuesBuilder<T, T> get(Function<Integer, T> converter)
        {
            return new ExpectedValuesBuilder<>(converter, converter);
        }

        public static <K, V> ExpectedValuesBuilder<K, V> get(Function<Integer, K> keyConverter, Function<Integer, V> valueConverter)
        {
            return new ExpectedValuesBuilder<>(keyConverter, valueConverter);
        }

        public ExpectedValuesBuilder<K, V> setNullValuesFrequency(Frequency frequency)
        {
            this.nullValuesFrequency = frequency;

            return this;
        }

        public ExpectedValuesBuilder<K, V> setNullRowsFrequency(Frequency frequency)
        {
            this.nullRowsFrequency = frequency;

            return this;
        }

        public ExpectedValuesBuilder<K, V> setEmptyMapsFrequency(Frequency frequency)
        {
            this.emptyMapsFrequency = frequency;

            return this;
        }

        public ExpectedValuesBuilder<K, V> setMixedEncodings()
        {
            this.mixedEncodings = true;

            return this;
        }

        public ExpectedValuesBuilder<K, V> setMissingSequences()
        {
            this.missingSequences = true;

            return this;
        }

        public ExpectedValuesBuilder<K, V> setNumRows(int numRows)
        {
            this.numRows = numRows;
            return this;
        }

        public List<Map<K, V>> build()
        {
            List<Map<K, V>> result = new ArrayList<>(numRows);

            for (int i = 0; i < numRows; ++i) {
                if (passesFrequencyCheck(nullRowsFrequency, i)) {
                    result.add(null);
                }
                else if (passesFrequencyCheck(emptyMapsFrequency, i)) {
                    result.add(Collections.emptyMap());
                }
                else {
                    Map<K, V> row = new HashMap<>();

                    for (int j = 0; j < 3; j++) {
                        V value;
                        int key = (i * 3 + j) % 32;

                        if (missingSequences && key % 2 == 1) {
                            continue;
                        }

                        if (j == 0 && passesFrequencyCheck(nullValuesFrequency, i)) {
                            value = null;
                        }
                        else if (mixedEncodings && (key == 1 || j == 2)) {
                            // TODO: add comments to explain the condition
                            value = valueConverter.apply(i * 3 + j);
                        }
                        else {
                            value = valueConverter.apply((i * 3 + j) % 32);
                        }

                        row.put(keyConverter.apply(key), value);
                    }

                    result.add(row);
                }
            }

            return result;
        }

        private boolean passesFrequencyCheck(Frequency frequency, int i)
        {
            switch (frequency) {
                case NONE:
                    return false;
                case ALL:
                    return true;
                case SOME:
                    return i % 5 == 0;
                case ALL_EXCEPT_FIRST:
                    return i != 0;
                default:
                    throw new IllegalArgumentException("Got unexpected Frequency: " + frequency);
            }
        }
    }

    static class TestingFilterFunction
            extends FilterFunction
    {
        private static final Session TEST_SESSION = testSessionBuilder()
                .setCatalog("tpch")
                .setSchema("tiny")
                .build();

        private final Type mapType;

        public TestingFilterFunction(final Type mapType)
        {
            super(TEST_SESSION.getSqlFunctionProperties(), true, new Predicate()
            {
                @Override
                public int[] getInputChannels()
                {
                    return new int[] {0};
                }

                @Override
                public boolean evaluate(SqlFunctionProperties properties, Page page, int position)
                {
                    Block mapBlock = page.getBlock(0);
                    if (mapBlock.isNull(position)) {
                        return false;
                    }
                    Map map = (Map) mapType.getObjectValue(TEST_SESSION.getSqlFunctionProperties(), mapBlock, position);
                    return map.containsKey(1);
                }
            });
            this.mapType = mapType;
        }

        public List<List<?>> filterRows(List<List<?>> values)
        {
            List<Integer> passingRows = IntStream.range(0, values.get(0).size())
                    .filter(row -> values.get(0).get(row) != null)
                    .filter(row -> ((Map) values.get(0).get(row)).containsKey(1))
                    .boxed()
                    .collect(toList());
            return IntStream.range(0, values.size())
                    .mapToObj(column -> passingRows.stream().map(values.get(column)::get).collect(toList()))
                    .collect(toList());
        }
    }
}