H2TestUtil.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.resourceGroups.db;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.Session;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.resourceGroups.ResourceGroupSelector;
import com.facebook.presto.resourceGroups.db.DbResourceGroupConfig;
import com.facebook.presto.resourceGroups.db.H2DaoProvider;
import com.facebook.presto.resourceGroups.db.H2ResourceGroupsDao;
import com.facebook.presto.resourceGroups.reloading.ReloadingResourceGroupConfigurationManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.resourceGroups.SchedulingPolicy;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import static com.facebook.airlift.json.JsonCodec.listJsonCodec;
import static com.facebook.presto.common.resourceGroups.QueryType.EXPLAIN;
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.execution.QueryState.TERMINAL_QUERY_STATES;
import static com.facebook.presto.spi.StandardErrorCode.CONFIGURATION_INVALID;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class H2TestUtil
{
private static final String CONFIGURATION_MANAGER_TYPE = "h2";
public static final String TEST_ENVIRONMENT = "test_environment";
public static final String TEST_ENVIRONMENT_2 = "test_environment_2";
public static final JsonCodec<List<String>> CLIENT_TAGS_CODEC = listJsonCodec(String.class);
private H2TestUtil() {}
public static Session adhocSession()
{
return testSessionBuilder()
.setCatalog("tpch")
.setSchema("sf100000")
.setSource("adhoc")
.build();
}
public static Session testSession(Identity identity)
{
return testSessionBuilder()
.setCatalog("tpch")
.setSchema("sf100000")
.setSource("abc")
.setIdentity(identity)
.build();
}
public static Session dashboardSession()
{
return testSessionBuilder()
.setCatalog("tpch")
.setSchema("sf100000")
.setSource("dashboard")
.build();
}
public static Session rejectingSession()
{
return testSessionBuilder()
.setCatalog("tpch")
.setSchema("sf100000")
.setSource("reject")
.build();
}
public static void waitForCompleteQueryCount(DistributedQueryRunner queryRunner, int expectedCount)
throws InterruptedException
{
waitForQueryCount(queryRunner, TERMINAL_QUERY_STATES, expectedCount);
}
public static void waitForRunningQueryCount(DistributedQueryRunner queryRunner, int expectedCount)
throws InterruptedException
{
waitForQueryCount(queryRunner, ImmutableSet.of(RUNNING), expectedCount);
}
public static void waitForQueryCount(DistributedQueryRunner queryRunner, Set<QueryState> countingStates, int expectedCount)
throws InterruptedException
{
QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
while (queryManager.getQueries().stream()
.filter(q -> countingStates.contains(q.getState())).count() != expectedCount) {
MILLISECONDS.sleep(500);
}
}
public static String getDbConfigUrl()
{
return "jdbc:h2:mem:test_" + System.nanoTime() + "_" + ThreadLocalRandom.current().nextInt();
}
public static H2ResourceGroupsDao getDao(String url)
{
DbResourceGroupConfig dbResourceGroupConfig = new DbResourceGroupConfig()
.setConfigDbUrl(url);
H2ResourceGroupsDao dao = new H2DaoProvider(dbResourceGroupConfig).get();
dao.createResourceGroupsTable();
dao.createSelectorsTable();
dao.createResourceGroupsGlobalPropertiesTable();
return dao;
}
public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao)
throws Exception
{
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, ImmutableMap.of(), 1, false, false);
}
public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, int coordinatorCount)
throws Exception
{
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, ImmutableMap.of(), coordinatorCount, false, false);
}
public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, Map<String, String> coordinatorProperties, int coordinatorCount)
throws Exception
{
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, coordinatorProperties, coordinatorCount, false, false);
}
public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, Map<String, String> coordinatorProperties, int coordinatorCount, boolean weightedFairSchedulingEnabled, boolean weightedSchedulingEnabled)
throws Exception
{
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, coordinatorProperties, coordinatorCount, weightedFairSchedulingEnabled, weightedSchedulingEnabled);
}
public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, String environment, Map<String, String> coordinatorProperties, int coordinatorCount, boolean weightedFairSchedulingEnabled, boolean weightedSchedulingEnabled)
throws Exception
{
DistributedQueryRunner queryRunner = DistributedQueryRunner
.builder(testSessionBuilder().setCatalog("tpch").setSchema("tiny").build())
.setNodeCount(2)
.setCoordinatorCount(coordinatorCount)
.setEnvironment(environment)
.setResourceManagerEnabled(true)
.setCoordinatorProperties(coordinatorProperties)
.build();
try {
Plugin h2ResourceGroupManagerPlugin = new H2ResourceGroupManagerPlugin();
queryRunner.installPlugin(h2ResourceGroupManagerPlugin);
for (int coordinator = 0; coordinator < coordinatorCount; coordinator++) {
queryRunner.getCoordinator(coordinator).getResourceGroupManager().get()
.forceSetConfigurationManager(CONFIGURATION_MANAGER_TYPE, ImmutableMap.of("resource-groups.config-db-url", dbConfigUrl, "node.environment", environment));
}
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");
setup(queryRunner, dao, environment, weightedFairSchedulingEnabled, weightedSchedulingEnabled);
queryRunner.waitForClusterToGetReady();
return queryRunner;
}
catch (Exception e) {
queryRunner.close();
throw e;
}
}
public static DistributedQueryRunner getSimpleQueryRunner()
throws Exception
{
String dbConfigUrl = getDbConfigUrl();
H2ResourceGroupsDao dao = getDao(dbConfigUrl);
return createQueryRunner(dbConfigUrl, dao);
}
private static void resourceGroupSetup(H2ResourceGroupsDao dao)
{
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
dao.insertResourceGroup(1, "global", "1MB", 100, 1000, 1000, null, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT);
dao.insertResourceGroup(2, "bi-${USER}", "1MB", 3, 2, 2, null, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(3, "user-${USER}", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(4, "adhoc-${USER}", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(5, "dashboard-${USER}", "1MB", 1, 1, 1, null, null, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(6, "no-queueing", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT_2);
dao.insertResourceGroup(7, "explain", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT);
dao.insertResourceGroup(8, "test", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(9, "test-${USER}", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, null, 8L, TEST_ENVIRONMENT);
dao.insertSelector(2, 10_000, "user.*", "test", null, null, null, null);
dao.insertSelector(4, 1_000, "user.*", "(?i).*adhoc.*", null, null, null, null);
dao.insertSelector(5, 100, "user.*", "(?i).*dashboard.*", null, null, null, null);
dao.insertSelector(4, 10, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1", "tag2")), null, null);
dao.insertSelector(2, 1, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1")), null, null);
dao.insertSelector(6, 6, ".*", ".*", null, null, null, null);
dao.insertSelector(7, 100_000, null, null, EXPLAIN.name(), null, null, null);
dao.insertSelector(9, 10_000, "user.*", "abc", null, null, null, null);
}
private static void resourceGroupSetupWithWeightedFairPolicy(H2ResourceGroupsDao dao)
{
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
dao.insertResourceGroup(1, "global", "1MB", 100, 1000, 1000, null, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT);
dao.insertResourceGroup(2, "bi-${USER}", "1MB", 3, 2, 2, null, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(3, "user-${USER}", "1MB", 3, 4, 4, SchedulingPolicy.WEIGHTED_FAIR.toString(), null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(4, "adhoc-${USER}", "1MB", 3, 3, 3, null, 1000, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(5, "dashboard-${USER}", "1MB", 1, 2, 2, null, 10, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(6, "no-queueing", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT_2);
dao.insertResourceGroup(7, "explain", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT);
dao.insertResourceGroup(8, "test", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(9, "test-${USER}", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, null, 8L, TEST_ENVIRONMENT);
dao.insertSelector(2, 10_000, "user.*", "test", null, null, null, null);
dao.insertSelector(4, 1_000, "user.*", "(?i).*adhoc.*", null, null, null, null);
dao.insertSelector(5, 100, "user.*", "(?i).*dashboard.*", null, null, null, null);
dao.insertSelector(4, 10, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1", "tag2")), null, null);
dao.insertSelector(2, 1, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1")), null, null);
dao.insertSelector(6, 6, ".*", ".*", null, null, null, null);
dao.insertSelector(7, 100_000, null, null, EXPLAIN.name(), null, null, null);
dao.insertSelector(9, 10_000, "user.*", "abc", null, null, null, null);
}
private static void resourceGroupSetupWithWeightedPolicy(H2ResourceGroupsDao dao)
{
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
dao.insertResourceGroup(1, "global", "1MB", 100, 1000, 1000, null, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT);
dao.insertResourceGroup(2, "bi-${USER}", "1MB", 3, 2, 2, null, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(3, "user-${USER}", "1MB", 3, 4, 4, SchedulingPolicy.WEIGHTED.toString(), null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(4, "adhoc-${USER}", "1MB", 3, 3, 3, null, 10, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(5, "dashboard-${USER}", "1MB", 1, 1, 2, null, 1000, null, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(6, "no-queueing", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT_2);
dao.insertResourceGroup(7, "explain", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT);
dao.insertResourceGroup(8, "test", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(9, "test-${USER}", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, null, 8L, TEST_ENVIRONMENT);
dao.insertSelector(2, 10_000, "user.*", "test", null, null, null, null);
dao.insertSelector(4, 1_000, "user.*", "(?i).*adhoc.*", null, null, null, null);
dao.insertSelector(5, 100, "user.*", "(?i).*dashboard.*", null, null, null, null);
dao.insertSelector(4, 10, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1", "tag2")), null, null);
dao.insertSelector(2, 1, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1")), null, null);
dao.insertSelector(6, 6, ".*", ".*", null, null, null, null);
dao.insertSelector(7, 100_000, null, null, EXPLAIN.name(), null, null, null);
dao.insertSelector(9, 10_000, "user.*", "abc", null, null, null, null);
}
private static void setup(DistributedQueryRunner queryRunner, H2ResourceGroupsDao dao, String environment, boolean weightedFairSchedulingEnabled, boolean weightedSchedulingEnabled)
throws InterruptedException
{
if (weightedFairSchedulingEnabled) {
resourceGroupSetupWithWeightedFairPolicy(dao);
}
else if (weightedSchedulingEnabled) {
resourceGroupSetupWithWeightedPolicy(dao);
}
else {
resourceGroupSetup(dao);
}
int expectedSelectors = 7;
if (environment.equals(TEST_ENVIRONMENT_2)) {
expectedSelectors = 1;
}
// Selectors are loaded last
for (int coordinator = 0; coordinator < queryRunner.getCoordinators().size(); coordinator++) {
while (getSelectors(queryRunner, coordinator).size() != expectedSelectors) {
MILLISECONDS.sleep(500);
}
}
}
public static List<ResourceGroupSelector> getSelectors(DistributedQueryRunner queryRunner)
{
checkState(queryRunner.getCoordinators().size() == 1, "Expected a single coordinator");
return getSelectors(queryRunner, 0);
}
public static List<ResourceGroupSelector> getSelectors(DistributedQueryRunner queryRunner, int coordinator)
{
try {
return ((ReloadingResourceGroupConfigurationManager) queryRunner.getCoordinator(coordinator).getResourceGroupManager().get().getConfigurationManager()).getSelectors();
}
catch (PrestoException e) {
if (e.getErrorCode() == CONFIGURATION_INVALID.toErrorCode()) {
return ImmutableList.of();
}
throw e;
}
}
}