/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.presto.Session;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.security.AllowAllAccessControl;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.split.SplitSource.SplitBatch;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.TableProperties;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.IntStream;

import static com.facebook.presto.hive.HiveCommonSessionProperties.AFFINITY_SCHEDULING_FILE_SECTION_SIZE;
import static com.facebook.presto.hive.HiveCommonSessionProperties.NODE_SELECTION_STRATEGY;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.TARGET_SPLIT_SIZE_BYTES;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

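/**
 * Tests that the Iceberg split manager prunes data files using partition values and
 * column statistics, and that split sizing and soft-affinity scheduling honor the
 * corresponding table and session properties.
 */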
public class TestIcebergSplitManager
        extends AbstractTestQueryFramework
{
    protected TestIcebergSplitManager() {}

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return IcebergQueryRunner.builder().build().getQueryRunner();
    }

    @Test
    public void testGetSplitsByPartitionMixNormalColumnsWithFilterPushdown()
    {
        testGetSplitsByPartitionMixNormalColumns("get_splits_with_filter_pushdown", true);
    }

    @Test
    public void testGetSplitsByPartitionMixNormalColumnsWithoutFilterPushdown()
    {
        testGetSplitsByPartitionMixNormalColumns("get_splits_without_filter_pushdown", false);
    }

    @Test
    public void testGetSplitsByNonIdentityPartitionColumnsWithFilterPushdown()
    {
        testGetSplitsByNonIdentityPartitionColumns("get_splits_by_nonidentity_with_filter_pushdown", true);
    }

    @Test
    public void testGetSplitsByNonIdentityPartitionColumnsWithoutFilterPushdown()
    {
        testGetSplitsByNonIdentityPartitionColumns("get_splits_by_nonidentity_without_filter_pushdown", false);
    }

    private void testGetSplitsByPartitionMixNormalColumns(String tableName, boolean filterPushdown)
    {
        assertUpdate("CREATE TABLE " + tableName + " (a varchar, b integer, r row(c int, d varchar)) WITH(partitioning = ARRAY['a'])");

        // First, build an Iceberg table consisting of three files:
        //      file1: partition column a = 'var1', column b values in [1, 3]
        //      file2: partition column a = 'var2', column b values in [8, 10]
        //      file3: partition column a = 'var1', column b values in [2, 9]
        assertUpdate("INSERT INTO " + tableName + " VALUES ('var1', 1, (1001, 't1')), ('var1', 3, (1003, 't3'))", 2);
        assertUpdate("INSERT INTO " + tableName + " VALUES ('var2', 8, (1008, 't8')), ('var2', 10, (1010, 't10'))", 2);
        assertUpdate("INSERT INTO " + tableName + " VALUES ('var1', 2, (1002, 't2')), ('var1', 9, (1009, 't9'))", 2);

        TransactionManager transactionManager = getQueryRunner().getTransactionManager();
        SplitManager splitManager = getQueryRunner().getSplitManager();

        // Get a plan for query on <tableName> with filter `a = 'var1'`
        // There should be 2 splits after file scanning with the filter
        validateSplitsPlannedForSql(splitManager, transactionManager, filterPushdown,
                "select * from " + tableName + " where a = 'var1'",
                2);

        // Get a plan for query on <tableName> with filter `a = 'var1' and b = 5`
        // There should be only 1 split after file scanning with the filter
        validateSplitsPlannedForSql(splitManager, transactionManager, filterPushdown,
                "select * from " + tableName + " where a = 'var1' and b = 5",
                1);

        // Get a plan for query on <tableName> with filter `b = 8`
        // There should be 2 splits after file scanning with the filter
        validateSplitsPlannedForSql(splitManager, transactionManager, filterPushdown,
                "select * from " + tableName + " where b = 8",
                2);

        // Get a plan for query on <tableName> with filter `b = 6`
        // There should be only 1 split after file scanning with the filter
        validateSplitsPlannedForSql(splitManager, transactionManager, filterPushdown,
                "select * from " + tableName + " where b = 6",
                1);

        // Get a plan for query on <tableName> with filter `a = 'var1' and r.c = 1008`
        // There should be 2 splits after file scanning with the filter
        validateSplitsPlannedForSql(splitManager, transactionManager, filterPushdown,
                "select * from " + tableName + " where a = 'var1' and r.c = 1008",
                2);

        assertQuerySucceeds("DROP TABLE " + tableName);
    }

    private void testGetSplitsByNonIdentityPartitionColumns(String tableName, boolean filterPushdown)
    {
        // Use the 'year()' partition transform as an example
        assertUpdate("CREATE TABLE " + tableName + " (a date, b integer, r row(c int, d varchar)) WITH (partitioning = ARRAY['year(a)'])");

        // First, build an Iceberg table consisting of three files:
        //      file1: partition year(a) = 1984, column a values in ['1984-01-08', '1984-07-08'], column b values in [1, 3]
        //      file2: partition year(a) = 2001, column a values in ['2001-01-01', '2001-11-01'], column b values in [8, 10]
        //      file3: partition year(a) = 1984, column a values in ['1984-06-08', '1984-12-08'], column b values in [2, 9]
        assertUpdate("INSERT INTO " + tableName + " VALUES (date '1984-01-08', 1, (1001, 't1')), (date '1984-07-08', 3, (1003, 't3'))", 2);
        assertUpdate("INSERT INTO " + tableName + " VALUES (date '2001-01-01', 8, (1008, 't8')), (date '2001-11-01', 10, (1010, 't10'))", 2);
        assertUpdate("INSERT INTO " + tableName + " VALUES (date '1984-06-08', 2, (1002, 't2')), (date '1984-12-08', 9, (1009, 't9'))", 2);

        TransactionManager transactionManager = getQueryRunner().getTransactionManager();
        SplitManager splitManager = getQueryRunner().getSplitManager();

        // Get a plan for query on <tableName> with filter `a = date '1984-03-01'`
        // There should be only 1 split after file scanning with the filter
        validateSplitsPlannedForSql(splitManager, transactionManager, filterPushdown,
                "select * from " + tableName + " where a = date '1984-03-01'",
                1);

        // Get a plan for query on <tableName> with filter `a = date '1984-06-09'`
        // There should be 2 splits after file scanning with the filter
        validateSplitsPlannedForSql(splitManager, transactionManager, filterPushdown,
                "select * from " + tableName + " where a = date '1984-06-09'",
                2);

        // Get a plan for query on <tableName> with filter `a = date '1984-06-09' and b = 8`
        // There should be only 1 split after file scanning with the filter
        validateSplitsPlannedForSql(splitManager, transactionManager, filterPushdown,
                "select * from " + tableName + " where a = date '1984-06-09' and b = 8",
                1);

        assertQuerySucceeds("DROP TABLE " + tableName);
    }

    @Test
    public void testSplitSchedulingWithTablePropertyAndSession()
    {
        Session session = Session.builder(getSession())
                .setCatalogSessionProperty("iceberg", IcebergSessionProperties.TARGET_SPLIT_SIZE_BYTES, "0")
                .build();
        assertQuerySucceeds("CREATE TABLE test_split_size as SELECT * FROM UNNEST(sequence(1, 512)) as t(i)");
        // verify that the session property hasn't propagated into the table
        assertEquals(getQueryRunner().execute("SELECT value FROM \"test_split_size$properties\" WHERE key = 'read.split.target-size'").getOnlyValue(),
                Long.toString(TableProperties.SPLIT_SIZE_DEFAULT));
        assertQuerySucceeds("ALTER TABLE test_split_size SET PROPERTIES (\"read.split.target-size\" = 1)");
        String selectQuery = "SELECT * FROM test_split_size";
        long maxSplits = getSplitsForSql(session, selectQuery).size();

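        // Each doubling of the target split size should roughly halve the number of
        // planned splits, since the table holds a fixed amount of data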
        IntStream.range(1, 5)
                .mapToObj(i -> Math.pow(2, i))
                .forEach(splitSize -> {
                    assertQuerySucceeds("ALTER TABLE test_split_size SET PROPERTIES (\"read.split.target-size\" =" + splitSize.intValue() + ")");
                    assertEquals(getSplitsForSql(session, selectQuery).size(), (double) maxSplits / splitSize, 5);
                });
        // The table property should now be set to a split size of 16.
        // Override it with a session property of 1 and verify we get the same number of
        // splits as when the table property was set to 1.
        Session minSplitSession = Session.builder(session)
                .setCatalogSessionProperty("iceberg", TARGET_SPLIT_SIZE_BYTES, "1")
                .build();
        assertEquals(getSplitsForSql(minSplitSession, selectQuery).size(), maxSplits);
        assertQuerySucceeds("DROP TABLE test_split_size");
    }

    @Test
    public void testSoftAffinitySchedulingSectionConfig()
    {
        Session maxIdentifiers = Session.builder(getSession())
                .setCatalogSessionProperty("iceberg", AFFINITY_SCHEDULING_FILE_SECTION_SIZE, "1B")
                .setCatalogSessionProperty("iceberg", TARGET_SPLIT_SIZE_BYTES, "1")
                .setCatalogSessionProperty("iceberg", NODE_SELECTION_STRATEGY, "SOFT_AFFINITY")
                .build();
        assertQuerySucceeds("CREATE TABLE test_affinity_section_scheduling as SELECT * FROM UNNEST(sequence(1, 512)) as t(i)");
        String selectQuery = "SELECT * FROM test_affinity_section_scheduling";
        Function<Session, Set<String>> getIdentifiers = (session) -> {
            List<Split> splits = getSplitsForSql(session, selectQuery);
            Set<String> allIdentifiers = new HashSet<>();
            splits.stream().map(Split::getConnectorSplit)
                    .forEach(connectorSplit -> connectorSplit.getPreferredNodes(identifier -> {
                        allIdentifiers.add(identifier);
                        return ImmutableList.of();
                    }));
            return allIdentifiers;
        };
        Set<String> maxSplitsIds = getIdentifiers.apply(maxIdentifiers);
        Set<String> halfSplitIds = getIdentifiers.apply(Session.builder(maxIdentifiers)
                .setCatalogSessionProperty("iceberg", AFFINITY_SCHEDULING_FILE_SECTION_SIZE, "2B").build());
        assertEquals((double) halfSplitIds.size() / maxSplitsIds.size(), 0.5, 1E-10);

        Set<String> singleSplitId = getIdentifiers.apply(Session.builder(maxIdentifiers)
                .setCatalogSessionProperty("iceberg", AFFINITY_SCHEDULING_FILE_SECTION_SIZE, "1GB").build());
        assertEquals(singleSplitId.size(), 1);
        assertQuerySucceeds("DROP TABLE test_affinity_section_scheduling");
    }

    private Session sessionWithFilterPushdown(boolean pushdown)
    {
        return Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(ICEBERG_CATALOG, PUSHDOWN_FILTER_ENABLED, pushdown ? "true" : "false")
                .build();
    }

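    /**
     * Plans the given SQL, rebinds the single table scan's handle to a fresh
     * transaction, and drains the split source, returning all planned splits.
     */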
    private List<Split> getSplitsForSql(Session session, String sql)
    {
        TransactionManager transactionManager = getQueryRunner().getTransactionManager();
        SplitManager splitManager = getQueryRunner().getSplitManager();

        List<TableScanNode> tableScanNodes = getTableScanFromOptimizedPlanOfSql(sql, session);
        assertNotNull(tableScanNodes);
        assertEquals(tableScanNodes.size(), 1);

        TransactionId transactionId = transactionManager.beginTransaction(false);
        session = session.beginTransactionId(transactionId, transactionManager, new AllowAllAccessControl());
        TableHandle tableHandle = tableScanNodes.get(0).getTable();
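        // Rebind the table handle to the connector transaction begun above so the
        // split manager can resolve the table within this transaction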
        TableHandle newTableHandle = new TableHandle(tableHandle.getConnectorId(),
                tableHandle.getConnectorHandle(),
                transactionManager.getConnectorTransaction(transactionId, tableHandle.getConnectorId()),
                tableHandle.getLayout(),
                tableHandle.getDynamicFilter());

        try (SplitSource splitSource = splitManager.getSplits(session, newTableHandle, SplitSchedulingStrategy.UNGROUPED_SCHEDULING, WarningCollector.NOOP)) {
            ImmutableList.Builder<Split> splits = ImmutableList.builder();
            while (!splitSource.isFinished()) {
                splits.addAll(splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1024).get().getSplits());
            }
            assertTrue(splitSource.isFinished());
            return splits.build();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before wrapping, so callers can still observe it
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        finally {
            transactionManager.asyncAbort(transactionId);
        }
    }

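    /**
     * Plans the given SQL with filter pushdown enabled or disabled, schedules its
     * splits, and asserts that exactly the expected number of splits is produced.
     */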
    private void validateSplitsPlannedForSql(SplitManager splitManager,
            TransactionManager transactionManager,
            boolean filterPushdown,
            String sql,
            int expectedSplitCount)
    {
        Session session = sessionWithFilterPushdown(filterPushdown);
        List<TableScanNode> tableScanNodes = getTableScanFromOptimizedPlanOfSql(sql, session);
        assertNotNull(tableScanNodes);
        assertEquals(tableScanNodes.size(), 1);

        TransactionId transactionId = transactionManager.beginTransaction(false);
        session = session.beginTransactionId(transactionId, transactionManager, new AllowAllAccessControl());
        TableHandle tableHandle = tableScanNodes.get(0).getTable();
        TableHandle newTableHandle = new TableHandle(tableHandle.getConnectorId(),
                tableHandle.getConnectorHandle(),
                transactionManager.getConnectorTransaction(transactionId, tableHandle.getConnectorId()),
                tableHandle.getLayout(),
                tableHandle.getDynamicFilter());

        try (SplitSource splitSource = splitManager.getSplits(session, newTableHandle, SplitSchedulingStrategy.UNGROUPED_SCHEDULING, WarningCollector.NOOP)) {
            SplitBatch splitBatch = splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1024).get();
            assertTrue(splitSource.isFinished());
            assertEquals(splitBatch.getSplits().size(), expectedSplitCount);
        }
        catch (Exception e) {
            fail("Should not throw exception when getting split source: " + e.getMessage(), e);
        }
        finally {
            transactionManager.asyncAbort(transactionId);
        }
    }

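    /**
     * Returns all TableScanNodes found in the optimized plan of the given SQL.
     */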
    private List<TableScanNode> getTableScanFromOptimizedPlanOfSql(String sql, Session session)
    {
        PlanNode planNode = plan(sql, session).getRoot();

        TableScanNodesExtractingVisitor tableScanNodesExtractingVisitor = new TableScanNodesExtractingVisitor();
        tableScanNodesExtractingVisitor.visitPlan(planNode, null);
        return tableScanNodesExtractingVisitor.getTableScanNodes();
    }

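    /**
     * Plan visitor that recursively collects every TableScanNode in a plan tree.
     */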
    private static class TableScanNodesExtractingVisitor
            extends InternalPlanVisitor<Void, Void>
    {
        private final List<TableScanNode> tableScanNodes = new ArrayList<>();

        public List<TableScanNode> getTableScanNodes()
        {
            return tableScanNodes;
        }

        @Override
        public Void visitPlan(PlanNode node, Void context)
        {
            if (node.getSources() != null) {
                for (PlanNode source : node.getSources()) {
                    source.accept(this, context);
                }
            }
            return null;
        }

        @Override
        public Void visitTableScan(TableScanNode node, Void context)
        {
            tableScanNodes.add(node);
            return null;
        }
    }
}