TestPinotSplitManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.pinot;

import com.facebook.presto.pinot.query.PinotQueryGenerator;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.security.ConnectorIdentity;
import com.facebook.presto.sql.analyzer.FunctionsConfig;
import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.pinot.PinotSplit.SplitType.BROKER;
import static com.facebook.presto.pinot.PinotSplit.SplitType.SEGMENT;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static java.util.Locale.ENGLISH;
import static java.util.stream.Collectors.toList;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestPinotSplitManager
        extends TestPinotQueryBase
{
    private static final String LIMIT_KEYWORD_SPLITTER = "LIMIT ";

    // Test table and related info
    private final PinotConfig pinotConfig = new PinotConfig();
    private final PinotConnection pinotConnection = new PinotConnection(new MockPinotClusterInfoFetcher(pinotConfig), pinotConfig, Executors.newSingleThreadExecutor());
    private final PinotSplitManager pinotSplitManager = new PinotSplitManager(pinotConnectorId, pinotConnection);

    @Test
    public void testRealtimeSegmentSplitsOneSegmentPerServer()
    {
        testSegmentSplitsHelperNoFilter(realtimeOnlyTable, 1, 4, false);  // 2 servers with 2 segments each
    }

    private void testSegmentSplitsHelperNoFilter(PinotTableHandle table, int segmentsPerSplit, int expectedNumSplits, boolean expectFilter)
    {
        PinotConfig pinotConfig = new PinotConfig().setForbidBrokerQueries(false);
        SessionHolder sessionHolder = new SessionHolder(pinotConfig);
        PlanBuilder planBuilder = createPlanBuilder(sessionHolder);
        PlanNode plan = tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch);
        PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, sessionHolder.getConnectorSession()).get();
        List<PinotColumnHandle> expectedHandles = ImmutableList.copyOf(pinotQueryGeneratorResult.getContext().getAssignments().values());
        PinotQueryGenerator.GeneratedPinotQuery generatedSql = pinotQueryGeneratorResult.getGeneratedPinotQuery();
        PinotTableHandle pinotTableHandle = new PinotTableHandle(table.getConnectorId(), table.getSchemaName(), table.getTableName(), Optional.of(false), Optional.of(expectedHandles), Optional.of(generatedSql));
        List<PinotSplit> splits = getSplitsHelper(pinotTableHandle, segmentsPerSplit, false);
        assertSplits(splits, expectedNumSplits, SEGMENT);
        splits.forEach(s -> assertSegmentSplitWellFormed(s, expectFilter));
    }

    private void testSegmentSplitsHelperWithFilter(PinotTableHandle table, int segmentsPerSplit, int expectedNumSplits)
    {
        PinotConfig pinotConfig = new PinotConfig().setForbidBrokerQueries(false);
        SessionHolder sessionHolder = new SessionHolder(pinotConfig);
        PlanBuilder planBuilder = createPlanBuilder(sessionHolder);
        PlanNode plan = filter(planBuilder, tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch), getRowExpression("city = 'Boston'", sessionHolder));
        PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, sessionHolder.getConnectorSession()).get();
        List<PinotColumnHandle> expectedHandles = ImmutableList.copyOf(pinotQueryGeneratorResult.getContext().getAssignments().values());
        PinotQueryGenerator.GeneratedPinotQuery generatedSql = pinotQueryGeneratorResult.getGeneratedPinotQuery();
        PinotTableHandle pinotTableHandle = new PinotTableHandle(table.getConnectorId(), table.getSchemaName(), table.getTableName(), Optional.of(false), Optional.of(expectedHandles), Optional.of(generatedSql));
        List<PinotSplit> splits = getSplitsHelper(pinotTableHandle, segmentsPerSplit, false);
        assertSplits(splits, expectedNumSplits, SEGMENT);
        splits.forEach(s -> assertSegmentSplitWellFormed(s, true));
    }

    @Test
    public void testRealtimeSegmentLimitLarge()
    {
        testSegmentLimitLarge(realtimeOnlyTable, 1000, 5000);
    }

    private void testSegmentLimitLarge(PinotTableHandle table, int sessionLimitLarge, int configLimitLarge)
    {
        PinotConfig pinotConfig = new PinotConfig().setLimitLargeForSegment(configLimitLarge);
        SessionHolder sessionHolder = new SessionHolder(pinotConfig);
        ConnectorSession session = createSessionWithLimitLarge(sessionLimitLarge, pinotConfig);
        PlanBuilder planBuilder = createPlanBuilder(sessionHolder);
        PlanNode plan = tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch);
        PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, session).get();
        String[] limits = pinotQueryGeneratorResult.getGeneratedPinotQuery().getQuery().split("LIMIT ");
        assertEquals(Integer.parseInt(limits[1]), sessionLimitLarge);

        plan = tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch);
        pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, sessionHolder.getConnectorSession()).get();
        limits = pinotQueryGeneratorResult.getGeneratedPinotQuery().getQuery().split("LIMIT ");
        assertEquals(Integer.parseInt(limits[1]), configLimitLarge);
    }

    @Test
    public void testBrokerTopNLarge()
    {
        testBrokerTopNLarge(realtimeOnlyTable, 1000, 5000);
    }

    private void testBrokerTopNLarge(PinotTableHandle table, int sessionTopNLarge, int configTopNLarge)
    {
        PinotConfig pinotConfig = new PinotConfig().setTopNLarge(configTopNLarge);
        SessionHolder sessionHolder = new SessionHolder(pinotConfig);
        ConnectorSession session = createSessionWithTopNLarge(sessionTopNLarge, pinotConfig);
        PlanBuilder planBuilder = createPlanBuilder(sessionHolder);
        PlanNode tableScanNode =
                tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch);
        AggregationNode aggregationNode = planBuilder.aggregation(
                aggregationNodeBuilder -> aggregationNodeBuilder
                        .source(tableScanNode)
                        .singleGroupingSet(variable("city"), variable("regionid"))
                        .addAggregation(planBuilder.variable("sum_fare"), getRowExpression("sum(fare)", sessionHolder))
                        .addAggregation(planBuilder.variable("count_regionid"), getRowExpression("count(regionid)", sessionHolder)));

        PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(aggregationNode, session).get();
        String[] limits = pinotQueryGeneratorResult.getGeneratedPinotQuery().getQuery().split(LIMIT_KEYWORD_SPLITTER);
        assertEquals(Integer.parseInt(limits[1]), sessionTopNLarge);

        aggregationNode = planBuilder.aggregation(
                aggregationNodeBuilder -> aggregationNodeBuilder
                        .source(tableScanNode)
                        .singleGroupingSet(variable("city"), variable("regionid"))
                        .addAggregation(planBuilder.variable("sum_fare"), getRowExpression("sum(fare)", sessionHolder))
                        .addAggregation(planBuilder.variable("count_regionid"), getRowExpression("count(regionid)", sessionHolder)));
        pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(aggregationNode, sessionHolder.getConnectorSession()).get();
        limits = pinotQueryGeneratorResult.getGeneratedPinotQuery().getQuery().split(LIMIT_KEYWORD_SPLITTER);
        assertEquals(Integer.parseInt(limits[1]), configTopNLarge);
    }

    @Test
    public void testGetTimeBoundaryForTable()
    {
        assertEquals(pinotConnection.getTimeBoundary("hybrid").getOfflineTimePredicate().get(), "secondsSinceEpoch < '4562345'");
        assertEquals(pinotConnection.getTimeBoundary("hybrid").getOnlineTimePredicate().get(), "secondsSinceEpoch >= '4562345'");
        assertEquals(pinotConnection.getTimeBoundary("hybridTableWithTsTimeColumn").getOfflineTimePredicate().get(), "ts < '2022-05-29 23:56:53.312'");
        assertEquals(pinotConnection.getTimeBoundary("hybridTableWithTsTimeColumn").getOnlineTimePredicate().get(), "ts >= '2022-05-29 23:56:53.312'");
        assertFalse(pinotConnection.getTimeBoundary("unknown").getOfflineTimePredicate().isPresent());
        assertFalse(pinotConnection.getTimeBoundary("unknown").getOfflineTimePredicate().isPresent());
    }

    @Test
    public void testSplitsBroker()
    {
        PinotQueryGenerator.GeneratedPinotQuery generatedSql = new PinotQueryGenerator.GeneratedPinotQuery(realtimeOnlyTable.getTableName(), String.format("SELECT %s, COUNT(1) FROM %s GROUP BY %s TOP %d", city.getColumnName(), realtimeOnlyTable.getTableName(), city.getColumnName(), pinotConfig.getTopNLarge()), ImmutableList.of(0, 1), false, true);
        PinotTableHandle pinotTableHandle = new PinotTableHandle(realtimeOnlyTable.getConnectorId(), realtimeOnlyTable.getSchemaName(), realtimeOnlyTable.getTableName(), Optional.of(true), Optional.of(ImmutableList.of(city, derived("count"))), Optional.of(generatedSql));
        List<PinotSplit> splits = getSplitsHelper(pinotTableHandle, 1, false);
        assertSplits(splits, 1, BROKER);
    }

    @Test(expectedExceptions = PinotSplitManager.QueryNotAdequatelyPushedDownException.class)
    public void testBrokerNonShortQuery()
    {
        PinotQueryGenerator.GeneratedPinotQuery generatedSql = new PinotQueryGenerator.GeneratedPinotQuery(realtimeOnlyTable.getTableName(), String.format("SELECT %s FROM %s", city.getColumnName(), realtimeOnlyTable.getTableName()), ImmutableList.of(0), false, false);
        PinotTableHandle pinotTableHandle = new PinotTableHandle(realtimeOnlyTable.getConnectorId(), realtimeOnlyTable.getSchemaName(), realtimeOnlyTable.getTableName(), Optional.of(false), Optional.of(ImmutableList.of(city)), Optional.of(generatedSql));
        List<PinotSplit> splits = getSplitsHelper(pinotTableHandle, 1, true);
        assertSplits(splits, 1, BROKER);
    }

    @Test
    public void testRealtimeSegmentSplitsManySegmentPerServer()
    {
        testSegmentSplitsHelperNoFilter(realtimeOnlyTable, Integer.MAX_VALUE, 2, false);
    }

    @Test
    public void testOfflineSegmentSplitsManySegmentPerServer()
    {
        testSegmentSplitsHelperNoFilter(offlineOnlyTable, Integer.MAX_VALUE, 2, false);
    }

    @Test
    public void testHybridSegmentSplitsOneSegmentPerServer()
    {
        testSegmentSplitsHelperNoFilter(hybridTable, 1, 8, true);
        testSegmentSplitsHelperWithFilter(hybridTable, 1, 8);
    }

    private void assertSplits(List<PinotSplit> splits, int numSplitsExpected, PinotSplit.SplitType splitType)
    {
        assertEquals(splits.size(), numSplitsExpected);
        splits.forEach(s -> assertEquals(s.getSplitType(), splitType));
    }

    private void assertSegmentSplitWellFormed(PinotSplit split, boolean expectFilter)
    {
        assertEquals(split.getSplitType(), SEGMENT);
        assertTrue(split.getSegmentPinotQuery().isPresent());
        assertTrue(split.getSegmentHost().isPresent());
        assertTrue(split.getGrpcHost().isPresent());
        assertTrue(split.getGrpcHost().get().length() > 0);
        assertEquals(split.getGrpcHost().get(), split.getSegmentHost().get());
        assertTrue(split.getGrpcPort().isPresent());
        assertEquals(split.getGrpcPort().get().intValue(), MockPinotClusterInfoFetcher.DEFAULT_GRPC_PORT);
        assertFalse(split.getSegments().isEmpty());
        String sql = split.getSegmentPinotQuery().get();
        assertFalse(sql.contains("__")); // templates should be fully resolved
        List<String> splitOnWhere = Splitter.on(" WHERE ").splitToList(sql);
        // There should be exactly one WHERE clause and it should partition the sql into two
        assertEquals(splitOnWhere.size(), expectFilter ? 2 : 1, "Expected to find only one WHERE clause in " + sql);
    }

    public static ConnectorSession createSessionWithNumSplits(int numSegmentsPerSplit, boolean forbidSegmentQueries, PinotConfig pinotConfig)
    {
        return new TestingConnectorSession(
                "user",
                new ConnectorIdentity("user", Optional.empty(), Optional.empty()),
                Optional.of("test"),
                Optional.empty(),
                UTC_KEY,
                ENGLISH,
                System.currentTimeMillis(),
                new PinotSessionProperties(pinotConfig).getSessionProperties(),
                ImmutableMap.of(
                        PinotSessionProperties.NUM_SEGMENTS_PER_SPLIT,
                        numSegmentsPerSplit,
                        PinotSessionProperties.FORBID_SEGMENT_QUERIES,
                        forbidSegmentQueries),
                new FunctionsConfig().isLegacyTimestamp(),
                Optional.empty(),
                ImmutableSet.of(),
                Optional.empty(),
                ImmutableMap.of());
    }

    public static ConnectorSession createSessionWithLimitLarge(int limitLarge, PinotConfig pinotConfig)
    {
        return new TestingConnectorSession(
                "user",
                new ConnectorIdentity("user", Optional.empty(), Optional.empty()),
                Optional.of("test"),
                Optional.empty(),
                UTC_KEY,
                ENGLISH,
                System.currentTimeMillis(),
                new PinotSessionProperties(pinotConfig).getSessionProperties(),
                ImmutableMap.of(
                        PinotSessionProperties.LIMIT_LARGE_FOR_SEGMENT,
                        limitLarge),
                new FunctionsConfig().isLegacyTimestamp(),
                Optional.empty(),
                ImmutableSet.of(),
                Optional.empty(),
                ImmutableMap.of());
    }

    public static ConnectorSession createSessionWithTopNLarge(int topNLarge, PinotConfig pinotConfig)
    {
        return new TestingConnectorSession(
                "user",
                new ConnectorIdentity("user", Optional.empty(), Optional.empty()),
                Optional.of("test"),
                Optional.empty(),
                UTC_KEY,
                ENGLISH,
                System.currentTimeMillis(),
                new PinotSessionProperties(pinotConfig).getSessionProperties(),
                ImmutableMap.of(
                        PinotSessionProperties.TOPN_LARGE,
                        topNLarge),
                new FunctionsConfig().isLegacyTimestamp(),
                Optional.empty(),
                ImmutableSet.of(),
                Optional.empty(),
                ImmutableMap.of());
    }

    private List<PinotSplit> getSplitsHelper(PinotTableHandle pinotTable, int numSegmentsPerSplit, boolean forbidSegmentQueries)
    {
        PinotTableLayoutHandle pinotTableLayout = new PinotTableLayoutHandle(pinotTable);
        ConnectorSession session = createSessionWithNumSplits(numSegmentsPerSplit, forbidSegmentQueries, pinotConfig);
        ConnectorSplitSource splitSource = pinotSplitManager.getSplits(null, session, pinotTableLayout, null);
        List<PinotSplit> splits = new ArrayList<>();
        while (!splitSource.isFinished()) {
            splits.addAll(getFutureValue(splitSource.getNextBatch(NOT_PARTITIONED, 1000)).getSplits().stream().map(s -> (PinotSplit) s).collect(toList()));
        }

        return splits;
    }
}