TestQueryTaskLimit.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests;
import com.facebook.presto.Session;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.QUEUED;
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.execution.TestQueryRunnerUtil.createQuery;
import static com.facebook.presto.execution.TestQueryRunnerUtil.waitForQueryState;
import static com.facebook.presto.execution.TestQueues.newSession;
import static com.facebook.presto.spi.StandardErrorCode.CLUSTER_HAS_TOO_MANY_RUNNING_TASKS;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Test(singleThreaded = true)
public class TestQueryTaskLimit
{
public static final String LONG_LASTING_QUERY = "SELECT COUNT(*) FROM lineitem";
private ExecutorService executor;
private Session defaultSession;
@BeforeClass
public void setUp()
{
executor = newCachedThreadPool();
defaultSession = testSessionBuilder()
.setCatalog("tpch")
.setSchema("sf1000")
.build();
}
@AfterClass(alwaysRun = true)
public void shutdown()
throws Exception
{
executor.shutdownNow();
assertTrue(executor.awaitTermination(10, SECONDS));
executor = null;
}
@Test(timeOut = 90_000, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*Query killed because the cluster is overloaded with too many tasks.*")
public void testExceedTaskLimit()
throws Exception
{
ImmutableMap<String, String> extraProperties = ImmutableMap.<String, String>builder()
.put("max-total-running-task-count-to-kill-query", "4")
.put("max-query-running-task-count", "4")
.build();
try (DistributedQueryRunner queryRunner = createQueryRunner(defaultSession, extraProperties)) {
Future<?> query = executor.submit(() -> queryRunner.execute("SELECT COUNT(*), clerk FROM orders GROUP BY clerk"));
waitForQueryToBeKilled(queryRunner);
query.get();
}
}
@Test(timeOut = 90_000)
public void testQueuingWhenTaskLimitExceeds()
throws Exception
{
ImmutableMap<String, String> extraProperties = ImmutableMap.<String, String>builder()
.put("experimental.spill-enabled", "false")
.put("experimental.max-total-running-task-count-to-not-execute-new-query", "2")
.build();
try (DistributedQueryRunner queryRunner = createQueryRunner(defaultSession, extraProperties)) {
QueryId firstQuery = createQuery(queryRunner, newSession("test", ImmutableSet.of(), null),
LONG_LASTING_QUERY);
waitForQueryState(queryRunner, firstQuery, RUNNING);
// wait for the first query to schedule more than 2 tasks, so exceed resource group manager's total task limit
confirmQueryScheduledTasksGreaterThan(queryRunner, firstQuery, 2);
QueryId secondQuery = createQuery(queryRunner, newSession("test", ImmutableSet.of(), null), LONG_LASTING_QUERY);
// When current running tasks exceeded limit, the following query would be queued
waitForQueryState(queryRunner, secondQuery, QUEUED);
// Cancel the first query
queryRunner.getCoordinator().getDispatchManager().cancelQuery(firstQuery);
waitForQueryState(queryRunner, firstQuery, FAILED);
// When first query is cancelled, the second query would be pass to run
waitForQueryState(queryRunner, secondQuery, RUNNING);
}
}
public static DistributedQueryRunner createQueryRunner(Session session, Map<String, String> properties)
throws Exception
{
DistributedQueryRunner queryRunner = new DistributedQueryRunner(session, 2, properties);
try {
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");
return queryRunner;
}
catch (Exception e) {
queryRunner.close();
throw e;
}
}
private void waitForQueryToBeKilled(DistributedQueryRunner queryRunner)
throws InterruptedException
{
while (true) {
for (BasicQueryInfo info : queryRunner.getCoordinator().getQueryManager().getQueries()) {
if (info.getState().isDone()) {
assertNotNull(info.getErrorCode());
assertEquals(info.getErrorCode().getCode(), CLUSTER_HAS_TOO_MANY_RUNNING_TASKS.toErrorCode().getCode());
MILLISECONDS.sleep(100);
return;
}
}
MILLISECONDS.sleep(10);
}
}
private void confirmQueryScheduledTasksGreaterThan(DistributedQueryRunner queryRunner, QueryId queryId, int minTaskCount)
throws InterruptedException
{
while (true) {
BasicQueryInfo info = queryRunner.getCoordinator().getDispatchManager().getQueryInfo(queryId);
if (info.getQueryStats().getRunningTasks() > minTaskCount) {
// still wait for a while to ensure resource group manager to refresh
MILLISECONDS.sleep(10);
break;
}
MILLISECONDS.sleep(100);
}
}
}