TestQueuesDb.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution.resourceGroups.db;

import com.facebook.presto.Session;
import com.facebook.presto.dispatcher.DispatchManager;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager;
import com.facebook.presto.resourceGroups.db.H2ResourceGroupsDao;
import com.facebook.presto.resourceGroups.reloading.ReloadingResourceGroupConfigurationManager;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.ResourceGroupInfo;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.testing.Assertions.assertContains;
import static com.facebook.airlift.testing.Closeables.closeQuietly;
import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_EXECUTION_TIME;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.FINISHED;
import static com.facebook.presto.execution.QueryState.QUEUED;
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.execution.TestQueryRunnerUtil.cancelQuery;
import static com.facebook.presto.execution.TestQueryRunnerUtil.createQuery;
import static com.facebook.presto.execution.TestQueryRunnerUtil.waitForQueryState;
import static com.facebook.presto.execution.TestQueues.createResourceGroupId;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.TEST_ENVIRONMENT;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.adhocSession;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.createQueryRunner;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.dashboardSession;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getDao;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getDbConfigUrl;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getSelectors;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.rejectingSession;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.waitForCompleteQueryCount;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.waitForRunningQueryCount;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_RESOURCE_GROUP;
import static com.facebook.presto.spi.StandardErrorCode.MISSING_RESOURCE_GROUP_SELECTOR;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_QUEUE_FULL;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

// run single threaded to avoid creating multiple query runners at once
@Test(singleThreaded = true)
public class TestQueuesDb
{
    // Copy of TestQueues with tests for db reconfiguration of resource groups
    private static final String LONG_LASTING_QUERY = "SELECT COUNT(*) FROM lineitem";
    private DistributedQueryRunner queryRunner;
    private H2ResourceGroupsDao dao;

    @BeforeMethod
    public void setup()
            throws Exception
    {
        String dbConfigUrl = getDbConfigUrl();
        dao = getDao(dbConfigUrl);
        queryRunner = createQueryRunner(dbConfigUrl, dao, ImmutableMap.of(), 1);
    }

    @AfterMethod(alwaysRun = true)
    public void tearDown()
    {
        closeQuietly(queryRunner);
        queryRunner = null;
    }

    @Test(timeOut = 60_000)
    public void testRunningQuery()
            throws Exception
    {
        queryRunner.execute("SELECT COUNT(*), clerk FROM orders GROUP BY clerk");
        while (true) {
            ResourceGroupInfo global = queryRunner.getCoordinator().getResourceGroupManager().get().getResourceGroupInfo(new ResourceGroupId(new ResourceGroupId("global"), "bi-user"), true, true, true);
            if (global.getSoftMemoryLimit().toBytes() > 0) {
                break;
            }
            TimeUnit.SECONDS.sleep(2);
        }
    }

    @Test(timeOut = 600_000)
    public void testBasic()
            throws Exception
    {
        // submit first "dashboard" query
        QueryId firstDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        // wait for the first "dashboard" query to start
        waitForQueryState(queryRunner, firstDashboardQuery, RUNNING);
        waitForRunningQueryCount(queryRunner, 1);
        // submit second "dashboard" query
        QueryId secondDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        MILLISECONDS.sleep(2000);
        // wait for the second "dashboard" query to be queued ("dashboard.${USER}" queue strategy only allows one "dashboard" query to be accepted for execution)
        waitForQueryState(queryRunner, secondDashboardQuery, QUEUED);
        waitForRunningQueryCount(queryRunner, 1);
        // Update db to allow for 1 more running query in dashboard resource group
        dao.updateResourceGroup(3, "user-${USER}", "1MB", 3, 4, 4, null, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
        dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, 2, 2, null, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
        waitForQueryState(queryRunner, secondDashboardQuery, RUNNING);
        QueryId thirdDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, thirdDashboardQuery, QUEUED);
        waitForRunningQueryCount(queryRunner, 2);
        // submit first non "dashboard" query
        QueryId firstNonDashboardQuery = createQuery(queryRunner, adhocSession(), LONG_LASTING_QUERY);
        // wait for the first non "dashboard" query to start
        waitForQueryState(queryRunner, firstNonDashboardQuery, RUNNING);
        waitForRunningQueryCount(queryRunner, 3);
        // submit second non "dashboard" query
        QueryId secondNonDashboardQuery = createQuery(queryRunner, adhocSession(), LONG_LASTING_QUERY);
        // wait for the second non "dashboard" query to start
        waitForQueryState(queryRunner, secondNonDashboardQuery, RUNNING);
        waitForRunningQueryCount(queryRunner, 4);
        // cancel first "dashboard" query, the second "dashboard" query and second non "dashboard" query should start running
        cancelQuery(queryRunner, firstDashboardQuery);
        waitForQueryState(queryRunner, firstDashboardQuery, FAILED);
        waitForQueryState(queryRunner, thirdDashboardQuery, RUNNING);
        waitForRunningQueryCount(queryRunner, 4);
        waitForCompleteQueryCount(queryRunner, 1);
    }

    //Disabling flaky test
    @Test(timeOut = 60_000, enabled = false)
    public void testTwoQueriesAtSameTime()
            throws Exception
    {
        dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 2, 1, 1, null, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
        QueryId firstDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        QueryId secondDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, firstDashboardQuery, RUNNING);
        waitForQueryState(queryRunner, secondDashboardQuery, QUEUED);
    }

    //@Test(timeOut = 90_000)
    public void testTooManyQueries()
            throws Exception
    {
        QueryId firstDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, firstDashboardQuery, RUNNING);

        QueryId secondDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, secondDashboardQuery, QUEUED);

        QueryId thirdDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, thirdDashboardQuery, FAILED);

        // Allow one more query to run and resubmit third query
        dao.updateResourceGroup(3, "user-${USER}", "1MB", 3, 4, 4, null, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
        dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, 2, 2, null, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);

        InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get();
        ReloadingResourceGroupConfigurationManager reloadingConfigurationManager = (ReloadingResourceGroupConfigurationManager) manager.getConfigurationManager();

        // Trigger reload to make the test more deterministic
        reloadingConfigurationManager.load();
        waitForQueryState(queryRunner, secondDashboardQuery, RUNNING);
        thirdDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, thirdDashboardQuery, QUEUED);

        // Lower running queries in dashboard resource groups and reload the config
        dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, 1, 1, null, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
        reloadingConfigurationManager.load();

        // Cancel query and verify that third query is still queued
        cancelQuery(queryRunner, firstDashboardQuery);
        waitForQueryState(queryRunner, firstDashboardQuery, FAILED);
        MILLISECONDS.sleep(2000);
        waitForQueryState(queryRunner, thirdDashboardQuery, QUEUED);
    }

    @Test(timeOut = 60_000)
    public void testRejection()
            throws Exception
    {
        InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get();
        ReloadingResourceGroupConfigurationManager reloadingConfigurationManager = (ReloadingResourceGroupConfigurationManager) manager.getConfigurationManager();
        // Verify the query cannot be submitted
        QueryId queryId = createQuery(queryRunner, rejectingSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, queryId, FAILED);
        DispatchManager dispatchManager = queryRunner.getCoordinator().getDispatchManager();
        assertEquals(dispatchManager.getQueryInfo(queryId).getErrorCode(), MISSING_RESOURCE_GROUP_SELECTOR.toErrorCode());
        int selectorCount = getSelectors(queryRunner).size();
        dao.insertSelector(4, 100_000, "user.*", "(?i).*reject.*", null, null, null, null);
        reloadingConfigurationManager.load();
        assertEquals(getSelectors(queryRunner).size(), selectorCount + 1);
        // Verify the query can be submitted
        queryId = createQuery(queryRunner, rejectingSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, queryId, RUNNING);
        dao.deleteSelector(4, "user.*", "(?i).*reject.*", null);
        reloadingConfigurationManager.load();
        // Verify the query cannot be submitted
        queryId = createQuery(queryRunner, rejectingSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, queryId, FAILED);
    }

    @Test(timeOut = 60_000)
    public void testQuerySystemTableResourceGroup()
            throws Exception
    {
        QueryId firstQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, firstQuery, RUNNING);
        MaterializedResult result = queryRunner.execute("SELECT resource_group_id FROM system.runtime.queries WHERE source = 'dashboard'");
        assertEquals(result.getOnlyValue(), ImmutableList.of("global", "user-user", "dashboard-user"));
    }

    @Test(timeOut = 60_000)
    public void testSelectorPriority()
            throws Exception
    {
        InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get();
        QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
        ReloadingResourceGroupConfigurationManager reloadingConfigurationManager = (ReloadingResourceGroupConfigurationManager) manager.getConfigurationManager();

        QueryId firstQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, firstQuery, RUNNING);

        Optional<ResourceGroupId> resourceGroup = queryManager.getFullQueryInfo(firstQuery).getResourceGroupId();
        assertTrue(resourceGroup.isPresent());
        assertEquals(resourceGroup.get().toString(), "global.user-user.dashboard-user");

        // create a new resource group that rejects all queries submitted to it
        dao.insertResourceGroup(10, "reject-all-queries", "1MB", 0, 0, 0, null, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);

        // add a new selector that has a higher priority than the existing dashboard selector and that routes queries to the "reject-all-queries" resource group
        dao.insertSelector(10, 200, "user.*", "(?i).*dashboard.*", null, null, null, null);

        // reload the configuration
        reloadingConfigurationManager.load();

        QueryId secondQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, secondQuery, FAILED);

        DispatchManager dispatchManager = queryRunner.getCoordinator().getDispatchManager();
        BasicQueryInfo basicQueryInfo = dispatchManager.getQueryInfo(secondQuery);
        assertEquals(basicQueryInfo.getErrorCode(), QUERY_QUEUE_FULL.toErrorCode());
    }

    @Test(timeOut = 60_000)
    public void testQueryExecutionTimeLimit()
            throws Exception
    {
        QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
        InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get();
        ReloadingResourceGroupConfigurationManager reloadingConfigurationManager = (ReloadingResourceGroupConfigurationManager) manager.getConfigurationManager();
        QueryId firstQuery = createQuery(
                queryRunner,
                testSessionBuilder()
                        .setCatalog("tpch")
                        .setSchema("sf100000")
                        .setSource("dashboard")
                        .setSystemProperty(QUERY_MAX_EXECUTION_TIME, "1ms")
                        .build(),
                LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, firstQuery, FAILED);
        assertEquals(queryManager.getFullQueryInfo(firstQuery).getErrorCode(), EXCEEDED_TIME_LIMIT.toErrorCode());
        assertContains(queryManager.getFullQueryInfo(firstQuery).getFailureInfo().getMessage(), "Query exceeded the maximum execution time limit of 1.00ms");
        // set max running queries to 0 for the dashboard resource group so that new queries get queued immediately
        dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, null, 0, null, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
        reloadingConfigurationManager.load();
        QueryId secondQuery = createQuery(
                queryRunner,
                testSessionBuilder()
                        .setCatalog("tpch")
                        .setSchema("sf100000")
                        .setSource("dashboard")
                        .setSystemProperty(QUERY_MAX_EXECUTION_TIME, "1ms")
                        .build(),
                LONG_LASTING_QUERY);
        //this query should immediately get queued
        waitForQueryState(queryRunner, secondQuery, QUEUED);
        // after a 5s wait this query should still be QUEUED, not FAILED as the max execution time should be enforced after the query starts running
        Thread.sleep(5_000);
        DispatchManager dispatchManager = queryRunner.getCoordinator().getDispatchManager();
        assertEquals(dispatchManager.getQueryInfo(secondQuery).getState(), QUEUED);
        // reconfigure the resource group to run the second query
        dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, null, 1, null, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
        reloadingConfigurationManager.load();
        // cancel the first one and let the second one start
        dispatchManager.cancelQuery(firstQuery);
        // wait until the second one is FAILED
        waitForQueryState(queryRunner, secondQuery, FAILED);
    }

    @Test
    public void testQueryTypeBasedSelection()
            throws InterruptedException
    {
        Session session = testSessionBuilder()
                .setCatalog("tpch")
                .setSchema("sf100000")
                .build();
        QueryId queryId = createQuery(queryRunner, session, "EXPLAIN " + LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, queryId, ImmutableSet.of(RUNNING, FINISHED));
        Optional<ResourceGroupId> resourceGroupId = queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryId).getResourceGroupId();
        assertTrue(resourceGroupId.isPresent(), "Query should have a resource group");
        assertEquals(resourceGroupId.get(), createResourceGroupId("explain"));
    }

    @Test
    public void testClientTagsBasedSelection()
            throws InterruptedException
    {
        assertResourceGroupWithClientTags(ImmutableSet.of("tag1"), createResourceGroupId("global", "bi-user"));
        assertResourceGroupWithClientTags(ImmutableSet.of("tag1", "tag2"), createResourceGroupId("global", "user-user", "adhoc-user"));
    }

    @Test
    public void testNonLeafGroup()
            throws Exception
    {
        Session session = testSessionBuilder()
                .setCatalog("tpch")
                .setSchema("sf100000")
                .setSource("non-leaf")
                .build();
        InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get();
        ReloadingResourceGroupConfigurationManager reloadingConfigurationManager = (ReloadingResourceGroupConfigurationManager) manager.getConfigurationManager();
        int originalSize = getSelectors(queryRunner).size();
        // Add a selector for a non leaf group
        dao.insertSelector(3, 100, "user.*", "(?i).*non-leaf.*", null, null, null, null);
        reloadingConfigurationManager.load();
        while (getSelectors(queryRunner).size() != originalSize + 1) {
            MILLISECONDS.sleep(500);
        }
        // Submit query with side effect of creating resource groups
        QueryId firstDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, firstDashboardQuery, RUNNING);
        cancelQuery(queryRunner, firstDashboardQuery);
        waitForQueryState(queryRunner, firstDashboardQuery, FAILED);
        // Submit a query to a non-leaf resource group
        QueryId invalidResourceGroupQuery = createQuery(queryRunner, session, LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, invalidResourceGroupQuery, FAILED);
        assertEquals(queryRunner.getCoordinator().getDispatchManager().getQueryInfo(invalidResourceGroupQuery).getErrorCode(), INVALID_RESOURCE_GROUP.toErrorCode());
    }

    private void assertResourceGroupWithClientTags(Set<String> clientTags, ResourceGroupId expectedResourceGroup)
            throws InterruptedException
    {
        Session session = testSessionBuilder()
                .setCatalog("tpch")
                .setSchema("sf100000")
                .setSource("client_tags")
                .setClientTags(clientTags)
                .build();
        QueryId queryId = createQuery(queryRunner, session, LONG_LASTING_QUERY);
        waitForQueryState(queryRunner, queryId, ImmutableSet.of(RUNNING, FINISHED));
        Optional<ResourceGroupId> resourceGroupId = queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryId).getResourceGroupId();
        assertTrue(resourceGroupId.isPresent(), "Query should have a resource group");
        assertEquals(resourceGroupId.get(), expectedResourceGroup, format("Expected: '%s' resource group, found: %s", expectedResourceGroup, resourceGroupId.get()));
    }
}