DistributedQueryRunner.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests;
import com.facebook.airlift.discovery.server.testing.TestingDiscoveryServer;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.jetty.JettyHttpClient;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.testing.Assertions;
import com.facebook.presto.Session;
import com.facebook.presto.Session.SessionBuilder;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.metadata.AllNodes;
import com.facebook.presto.metadata.Catalog;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.NodePoolType;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.testing.TestingAccessControlManager;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
import com.google.inject.Module;
import io.airlift.units.Duration;
import org.intellij.lang.annotations.Language;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.client.PrestoHeaders.PRESTO_USER;
import static com.facebook.presto.spi.NodePoolType.INTERMEDIATE;
import static com.facebook.presto.spi.NodePoolType.LEAF;
import static com.facebook.presto.testing.TestingSession.TESTING_CATALOG;
import static com.facebook.presto.testing.TestingSession.createBogusTestingCatalog;
import static com.facebook.presto.tests.AbstractTestQueries.TEST_CATALOG_PROPERTIES;
import static com.facebook.presto.tests.AbstractTestQueries.TEST_SYSTEM_PROPERTIES;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.units.Duration.nanosSince;
import static java.lang.String.format;
import static java.lang.System.nanoTime;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
public class DistributedQueryRunner
implements QueryRunner
{
private static final Logger log = Logger.get(DistributedQueryRunner.class);
private static final String ENVIRONMENT = "testing";
private static final String DEFAULT_USER = "user";
private static final SqlParserOptions DEFAULT_SQL_PARSER_OPTIONS = new SqlParserOptions();
private final TestingDiscoveryServer discoveryServer;
private final List<TestingPrestoServer> coordinators;
private final int coordinatorCount;
private final List<TestingPrestoServer> servers;
private final List<Process> externalWorkers;
private final List<Module> extraModules;
private final Closer closer = Closer.create();
private final HttpClient client = new JettyHttpClient();
private final List<TestingPrestoClient> prestoClients;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private Optional<TestingPrestoServer> catalogServer = Optional.empty();
private Optional<List<TestingPrestoServer>> resourceManagers;
private Optional<TestingPrestoServer> coordinatorSidecar = Optional.empty();
private final int resourceManagerCount;
private final AtomicReference<Handle> testFunctionNamespacesHandle = new AtomicReference<>();
@Deprecated
public DistributedQueryRunner(Session defaultSession, int nodeCount)
throws Exception
{
this(defaultSession, nodeCount, ImmutableMap.of());
}
@Deprecated
public DistributedQueryRunner(Session defaultSession, int nodeCount, Map<String, String> extraProperties)
throws Exception
{
this(
false,
false,
false,
false,
defaultSession,
nodeCount,
1,
1,
extraProperties,
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
DEFAULT_SQL_PARSER_OPTIONS,
ENVIRONMENT,
Optional.empty(),
Optional.empty(),
ImmutableList.of());
}
public static Builder builder(Session defaultSession)
{
return new Builder(defaultSession);
}
private DistributedQueryRunner(
boolean resourceManagerEnabled,
boolean catalogServerEnabled,
boolean coordinatorSidecarEnabled,
boolean skipLoadingResourceGroupConfigurationManager,
Session defaultSession,
int nodeCount,
int coordinatorCount,
int resourceManagerCount,
Map<String, String> extraProperties,
Map<String, String> coordinatorProperties,
Map<String, String> resourceManagerProperties,
Map<String, String> catalogServerProperties,
Map<String, String> coordinatorSidecarProperties,
SqlParserOptions parserOptions,
String environment,
Optional<Path> dataDirectory,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher,
List<Module> extraModules)
throws Exception
{
requireNonNull(defaultSession, "defaultSession is null");
this.extraModules = requireNonNull(extraModules, "extraModules is null");
try {
long start = nanoTime();
discoveryServer = new TestingDiscoveryServer(environment);
this.coordinatorCount = coordinatorCount;
this.resourceManagerCount = resourceManagerCount;
closer.register(() -> closeUnchecked(discoveryServer));
log.info("Created TestingDiscoveryServer in %s", nanosSince(start).convertToMostSuccinctTimeUnit());
URI discoveryUrl = discoveryServer.getBaseUrl();
log.info("Discovery URL %s", discoveryUrl);
ImmutableList.Builder<TestingPrestoServer> servers = ImmutableList.builder();
ImmutableList.Builder<TestingPrestoServer> coordinators = ImmutableList.builder();
ImmutableList.Builder<TestingPrestoServer> resourceManagers = ImmutableList.builder();
Map<String, String> extraCoordinatorProperties = new HashMap<>();
if (externalWorkerLauncher.isPresent()) {
ImmutableList.Builder<Process> externalWorkersBuilder = ImmutableList.builder();
for (int i = 0; i < nodeCount; i++) {
externalWorkersBuilder.add(externalWorkerLauncher.get().apply(i, discoveryUrl));
}
externalWorkers = externalWorkersBuilder.build();
closer.register(() -> {
for (Process worker : externalWorkers) {
worker.destroyForcibly();
}
});
// Don't use coordinator as worker
extraCoordinatorProperties.put("node-scheduler.include-coordinator", "false");
}
else {
externalWorkers = ImmutableList.of();
for (int i = (coordinatorCount + (resourceManagerEnabled ? resourceManagerCount : 0)); i < nodeCount; i++) {
// We are simply splitting the nodes into leaf and intermediate for testing purpose
NodePoolType workerPool = i % 2 == 0 ? LEAF : INTERMEDIATE;
Map<String, String> workerProperties = new HashMap<>(extraProperties);
workerProperties.put("pool-type", workerPool.name());
TestingPrestoServer worker = closer.register(createTestingPrestoServer(
discoveryUrl,
false,
resourceManagerEnabled,
false,
catalogServerEnabled,
false,
coordinatorSidecarEnabled,
false,
skipLoadingResourceGroupConfigurationManager,
workerProperties,
parserOptions,
environment,
dataDirectory,
extraModules));
servers.add(worker);
}
}
extraCoordinatorProperties.put("experimental.iterative-optimizer-enabled", "true");
extraCoordinatorProperties.putAll(extraProperties);
extraCoordinatorProperties.putAll(coordinatorProperties);
if (resourceManagerEnabled) {
for (int i = 0; i < resourceManagerCount; i++) {
Map<String, String> rmProperties = new HashMap<>(resourceManagerProperties);
if (resourceManagerProperties.get("raft.isEnabled") != null) {
int raftPort = Integer.valueOf(resourceManagerProperties.get("raft.port")) + i;
rmProperties.replace("raft.port", String.valueOf(raftPort));
}
TestingPrestoServer resourceManager = closer.register(createTestingPrestoServer(
discoveryUrl,
true,
true,
false,
false,
false,
false,
false,
skipLoadingResourceGroupConfigurationManager,
rmProperties,
parserOptions,
environment,
dataDirectory,
extraModules));
servers.add(resourceManager);
resourceManagers.add(resourceManager);
}
}
if (catalogServerEnabled) {
catalogServer = Optional.of(closer.register(createTestingPrestoServer(
discoveryUrl,
false,
false,
true,
true,
false,
false,
false,
skipLoadingResourceGroupConfigurationManager,
catalogServerProperties,
parserOptions,
environment,
dataDirectory,
extraModules)));
servers.add(catalogServer.get());
}
if (coordinatorSidecarEnabled) {
coordinatorSidecar = Optional.of(closer.register(createTestingPrestoServer(
discoveryUrl,
false,
false,
false,
false,
true,
true,
false,
skipLoadingResourceGroupConfigurationManager,
coordinatorSidecarProperties,
parserOptions,
environment,
dataDirectory,
extraModules)));
servers.add(coordinatorSidecar.get());
}
for (int i = 0; i < coordinatorCount; i++) {
TestingPrestoServer coordinator = closer.register(createTestingPrestoServer(
discoveryUrl,
false,
resourceManagerEnabled,
false,
catalogServerEnabled,
false,
false,
true,
skipLoadingResourceGroupConfigurationManager,
extraCoordinatorProperties,
parserOptions,
environment,
dataDirectory,
extraModules));
servers.add(coordinator);
coordinators.add(coordinator);
extraCoordinatorProperties.remove("http-server.http.port");
}
this.servers = servers.build();
this.coordinators = coordinators.build();
this.resourceManagers = Optional.of(resourceManagers.build());
}
catch (Exception e) {
try {
throw closer.rethrow(e, Exception.class);
}
finally {
closer.close();
}
}
// copy session using property manager in coordinator
defaultSession = defaultSession.toSessionRepresentation().toSession(coordinators.get(0).getMetadata().getSessionPropertyManager());
ImmutableList.Builder<TestingPrestoClient> prestoClientsBuilder = ImmutableList.builder();
for (int i = 0; i < coordinatorCount; i++) {
prestoClientsBuilder.add(closer.register(new TestingPrestoClient(coordinators.get(i), defaultSession)));
}
prestoClients = prestoClientsBuilder.build();
try {
waitForAllNodesGloballyVisible();
}
catch (TimeoutException e) {
closer.close();
throw e;
}
long start = nanoTime();
for (TestingPrestoServer server : servers) {
server.getMetadata().registerBuiltInFunctions(AbstractTestQueries.CUSTOM_FUNCTIONS);
}
log.info("Added functions in %s", nanosSince(start).convertToMostSuccinctTimeUnit());
for (TestingPrestoServer server : servers) {
// add bogus catalog for testing procedures and session properties
Catalog bogusTestingCatalog = createBogusTestingCatalog(TESTING_CATALOG);
server.getCatalogManager().registerCatalog(bogusTestingCatalog);
SessionPropertyManager sessionPropertyManager = server.getMetadata().getSessionPropertyManager();
sessionPropertyManager.addSystemSessionProperties(TEST_SYSTEM_PROPERTIES);
sessionPropertyManager.addConnectorSessionProperties(bogusTestingCatalog.getConnectorId(), TEST_CATALOG_PROPERTIES);
}
}
public void waitForClusterToGetReady()
throws InterruptedException
{
for (int i = 0; i < coordinators.size(); i++) {
NodeState state = NodeState.INACTIVE;
while (state != NodeState.ACTIVE) {
MILLISECONDS.sleep(10);
state = getCoordinatorInfoState(i);
}
}
int availableCoordinators = 0;
if (getResourceManager().isPresent()) {
while (availableCoordinators != coordinators.size()) {
MILLISECONDS.sleep(10);
availableCoordinators = getResourceManager().get().getNodeManager().getCoordinators().size();
}
}
}
private NodeState getCoordinatorInfoState(int coordinator)
{
URI uri = URI.create(getCoordinator(coordinator).getBaseUrl().toString() + "/v1/info/state");
Request request = prepareGet()
.setHeader(PRESTO_USER, DEFAULT_USER)
.setUri(uri)
.build();
NodeState state = client.execute(request, createJsonResponseHandler(jsonCodec(NodeState.class)));
return state;
}
public NodeState getWorkerInfoState(int worker)
{
URI uri = URI.create(getWorker(worker).getBaseUrl().toString() + "/v1/info/state");
Request request = prepareGet()
.setHeader(PRESTO_USER, DEFAULT_USER)
.setUri(uri)
.build();
NodeState state = client.execute(request, createJsonResponseHandler(jsonCodec(NodeState.class)));
return state;
}
public int sendWorkerRequest(int worker, String body)
{
try {
URL url = new URL(getWorker(worker).getBaseUrl().toString() + "/v1/info/state");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "application/json");
try (OutputStream os = connection.getOutputStream()) {
byte[] input = body.getBytes(StandardCharsets.UTF_8);
os.write(input, 0, input.length);
}
return connection.getResponseCode();
}
catch (Exception e) {
e.printStackTrace();
return 500;
}
}
private static TestingPrestoServer createTestingPrestoServer(
URI discoveryUri,
boolean resourceManager,
boolean resourceManagerEnabled,
boolean catalogServer,
boolean catalogServerEnabled,
boolean coordinatorSidecar,
boolean coordinatorSidecarEnabled,
boolean coordinator,
boolean skipLoadingResourceGroupConfigurationManager,
Map<String, String> extraProperties,
SqlParserOptions parserOptions,
String environment,
Optional<Path> dataDirectory,
List<Module> extraModules)
throws Exception
{
long start = nanoTime();
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put("query.client.timeout", "10m")
.put("exchange.http-client.idle-timeout", "1h")
.put("task.max-index-memory", "16kB") // causes index joins to fault load
.put("datasources", "system")
.put("distributed-index-joins-enabled", "true")
.put("exchange.checksum-enabled", "true");
if (coordinator) {
propertiesBuilder.put("node-scheduler.include-coordinator", extraProperties.getOrDefault("node-scheduler.include-coordinator", "true"));
propertiesBuilder.put("join-distribution-type", "PARTITIONED");
}
HashMap<String, String> properties = new HashMap<>(propertiesBuilder.build());
properties.putAll(extraProperties);
TestingPrestoServer server = new TestingPrestoServer(
resourceManager,
resourceManagerEnabled,
catalogServer,
catalogServerEnabled,
coordinatorSidecar,
coordinatorSidecarEnabled,
coordinator,
skipLoadingResourceGroupConfigurationManager,
properties,
environment,
discoveryUri,
parserOptions,
extraModules,
dataDirectory);
String nodeRole = "worker";
if (coordinator) {
nodeRole = "coordinator";
}
else if (resourceManager) {
nodeRole = "resourceManager";
}
else if (catalogServer) {
nodeRole = "catalogServer";
}
else if (coordinatorSidecar) {
nodeRole = "coordinatorSidecar";
}
log.info("Created %s TestingPrestoServer in %s: %s", nodeRole, nanosSince(start).convertToMostSuccinctTimeUnit(), server.getBaseUrl());
return server;
}
private void waitForAllNodesGloballyVisible()
throws Exception
{
long startTimeInMs = nanoTime();
int expectedActiveNodes = externalWorkers.size() + servers.size();
Duration timeout = new Duration(100, SECONDS);
for (int serverIndex = 0; serverIndex < servers.size(); ) {
TestingPrestoServer server = servers.get(serverIndex);
AllNodes allNodes = server.refreshNodes();
int activeNodeCount = allNodes.getActiveNodes().size();
if (!allNodes.getInactiveNodes().isEmpty()) {
throwTimeoutIfNotReady(
startTimeInMs,
timeout,
format("Timed out waiting for all nodes to be globally visible. Inactive nodes: %s", allNodes.getInactiveNodes()));
MILLISECONDS.sleep(10);
serverIndex = 0;
}
else if ((server.isCoordinator() || server.isResourceManager()) && activeNodeCount != expectedActiveNodes) {
throwTimeoutIfNotReady(
startTimeInMs,
timeout,
format("Timed out waiting for all nodes to be globally visible. Node count: %s, expected: %s", activeNodeCount, expectedActiveNodes));
MILLISECONDS.sleep(10);
serverIndex = 0;
}
else {
log.info("Server %s has %s active nodes", server.getBaseUrl(), activeNodeCount);
serverIndex++;
}
}
log.info("Announced servers in %s", nanosSince(startTimeInMs).convertToMostSuccinctTimeUnit());
}
private static void throwTimeoutIfNotReady(long startTimeInMs, Duration timeout, String message)
throws TimeoutException
{
if (nanosSince(startTimeInMs).compareTo(timeout) >= 0) {
throw new TimeoutException(message);
}
}
public TestingPrestoClient getRandomClient()
{
return prestoClients.get(getRandomCoordinatorIndex());
}
private int getRandomCoordinatorIndex()
{
return ThreadLocalRandom.current().nextInt(prestoClients.size());
}
@Override
public int getNodeCount()
{
return servers.size() + externalWorkers.size();
}
@Override
public Session getDefaultSession()
{
return getRandomClient().getDefaultSession();
}
@Override
public TransactionManager getTransactionManager()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getTransactionManager();
}
@Override
public Metadata getMetadata()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getMetadata();
}
@Override
public SplitManager getSplitManager()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getSplitManager();
}
@Override
public PageSourceManager getPageSourceManager()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getPageSourceManager();
}
@Override
public NodePartitioningManager getNodePartitioningManager()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getNodePartitioningManager();
}
@Override
public ConnectorPlanOptimizerManager getPlanOptimizerManager()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getPlanOptimizerManager();
}
@Override
public StatsCalculator getStatsCalculator()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getStatsCalculator();
}
@Override
public List<EventListener> getEventListeners()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getEventListeners();
}
@Override
public TestingAccessControlManager getAccessControl()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getAccessControl();
}
@Override
public PlanCheckerProviderManager getPlanCheckerProviderManager()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getPlanCheckerProviderManager();
}
@Override
public ExpressionOptimizerManager getExpressionManager()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getExpressionManager();
}
public TestingPrestoServer getCoordinator()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0);
}
public TestingPrestoServer getCoordinator(int coordinator)
{
checkState(coordinator < coordinators.size(), format("Expected coordinator index %d < %d", coordinator, coordinatorCount));
return coordinators.get(coordinator);
}
private TestingPrestoServer getWorker(int worker)
{
checkState(worker < servers.size(), format("Expected worker index %d < %d", worker, servers.size()));
return servers.get(worker);
}
public List<TestingPrestoServer> getCoordinators()
{
return coordinators;
}
public Optional<TestingPrestoServer> getResourceManager()
{
return resourceManagers.isPresent() && !resourceManagers.get().isEmpty() ? Optional.of(resourceManagers.get().get(0)) : Optional.empty();
}
public Optional<TestingPrestoServer> getCatalogServer()
{
return catalogServer;
}
public Optional<TestingPrestoServer> getCoordinatorSidecar()
{
return coordinatorSidecar;
}
public TestingPrestoServer getResourceManager(int resourceManager)
{
checkState(resourceManager < resourceManagers.get().size(), format("Expected resource manager index %d < %d", resourceManager, resourceManagerCount));
return resourceManagers.get().get(resourceManager);
}
public List<TestingPrestoServer> getResourceManagers()
{
return resourceManagers.get();
}
public List<TestingPrestoServer> getCoordinatorWorkers()
{
return getServers().stream().filter(server -> !server.isResourceManager()).collect(ImmutableList.toImmutableList());
}
public List<TestingPrestoServer> getServers()
{
return ImmutableList.copyOf(servers);
}
@Override
public void installPlugin(Plugin plugin)
{
installPlugin(plugin, false);
}
@Override
public void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
installCoordinatorPlugin(plugin, false);
}
public void createCatalog(String catalogName, String connectorName)
{
createCatalog(catalogName, connectorName, ImmutableMap.of());
}
@Override
public void createCatalog(String catalogName, String connectorName, Map<String, String> properties)
{
long start = nanoTime();
Set<ConnectorId> connectorIds = new HashSet<>();
for (TestingPrestoServer server : servers) {
connectorIds.add(server.createCatalog(catalogName, connectorName, properties));
}
ConnectorId connectorId = getOnlyElement(connectorIds);
log.info("Created catalog %s (%s) in %s", catalogName, connectorId, nanosSince(start));
// wait for all nodes to announce the new catalog
start = nanoTime();
while (!isConnectorVisibleToAllNodes(connectorId)) {
Assertions.assertLessThan(nanosSince(start), new Duration(100, SECONDS), "waiting for connector " + connectorId + " to be initialized in every node");
try {
MILLISECONDS.sleep(10);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
log.info("Announced catalog %s (%s) in %s", catalogName, connectorId, nanosSince(start));
}
@Override
public void loadFunctionNamespaceManager(String functionNamespaceManagerName, String catalogName, Map<String, String> properties)
{
for (TestingPrestoServer server : servers) {
server.getMetadata().getFunctionAndTypeManager().loadFunctionNamespaceManager(functionNamespaceManagerName, catalogName, properties, server.getPluginNodeManager());
}
}
/**
* This method exists only because it is currently impossible to create a function namespace from the query engine,
* and therefore the query runner needs to be aware of the H2 handle in order to create function namespaces when
* required by the tests.
* <p>
* TODO: Remove when there is a generic way of creating function namespaces as if creating schemas.
*/
public void enableTestFunctionNamespaces(List<String> catalogNames, Map<String, String> additionalProperties)
{
enableTestFunctionNamespaces(catalogNames, additionalProperties, false);
}
public void enableTestFunctionNamespacesOnCoordinators(List<String> catalogNames, Map<String, String> additionalProperties)
{
enableTestFunctionNamespaces(catalogNames, additionalProperties, true);
}
public void createTestFunctionNamespace(String catalogName, String schemaName)
{
checkState(testFunctionNamespacesHandle.get() != null, "Test function namespaces not enabled");
testFunctionNamespacesHandle.get().execute("INSERT INTO function_namespaces SELECT ?, ?", catalogName, schemaName);
}
private boolean isConnectorVisibleToAllNodes(ConnectorId connectorId)
{
if (!externalWorkers.isEmpty()) {
return true;
}
for (TestingPrestoServer server : servers) {
server.refreshNodes();
Set<InternalNode> activeNodesWithConnector = server.getActiveNodesWithConnector(connectorId);
if (((server.isCoordinator() && server.nodeSchedulerIncludeCoordinator()) || server.isResourceManager()) && activeNodesWithConnector.size() != servers.size()) {
return false;
}
}
return true;
}
@Override
public List<QualifiedObjectName> listTables(Session session, String catalog, String schema)
{
lock.readLock().lock();
try {
return getRandomClient().listTables(session, catalog, schema);
}
finally {
lock.readLock().unlock();
}
}
@Override
public boolean tableExists(Session session, String table)
{
lock.readLock().lock();
try {
return getRandomClient().tableExists(session, table);
}
finally {
lock.readLock().unlock();
}
}
@Override
public MaterializedResult execute(@Language("SQL") String sql)
{
return execute(getRandomCoordinatorIndex(), sql);
}
@Override
public MaterializedResult execute(Session session, @Language("SQL") String sql)
{
return execute(getRandomCoordinatorIndex(), session, sql);
}
public ResultWithQueryId<MaterializedResult> executeWithQueryId(Session session, @Language("SQL") String sql)
{
return executeWithQueryId(getRandomCoordinatorIndex(), session, sql);
}
@Override
public MaterializedResultWithPlan executeWithPlan(Session session, String sql, WarningCollector warningCollector)
{
ResultWithQueryId<MaterializedResult> resultWithQueryId = executeWithQueryId(session, sql);
return new MaterializedResultWithPlan(resultWithQueryId.getResult().toTestTypes(), getQueryPlan(resultWithQueryId.getQueryId()));
}
public MaterializedResult execute(int coordinator, @Language("SQL") String sql)
{
checkArgument(coordinator >= 0 && coordinator < coordinators.size());
lock.readLock().lock();
try {
return prestoClients.get(coordinator).execute(sql).getResult();
}
finally {
lock.readLock().unlock();
}
}
public MaterializedResult execute(int coordinator, Session session, @Language("SQL") String sql)
{
checkArgument(coordinator >= 0 && coordinator < coordinators.size());
lock.readLock().lock();
try {
return prestoClients.get(coordinator).execute(session, sql).getResult();
}
finally {
lock.readLock().unlock();
}
}
public ResultWithQueryId<MaterializedResult> executeWithQueryId(int coordinator, Session session, @Language("SQL") String sql)
{
checkArgument(coordinator >= 0 && coordinator < coordinators.size());
lock.readLock().lock();
try {
return prestoClients.get(coordinator).execute(session, sql);
}
finally {
lock.readLock().unlock();
}
}
@Override
public Plan createPlan(Session session, String sql, WarningCollector warningCollector)
{
QueryId queryId = executeWithQueryId(session, sql).getQueryId();
Plan queryPlan = getQueryPlan(queryId);
checkState(coordinators.size() == 1, "Expected a single coordinator");
coordinators.get(0).getQueryManager().cancelQuery(queryId);
return queryPlan;
}
public List<BasicQueryInfo> getQueries()
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getQueryManager().getQueries();
}
public QueryInfo getQueryInfo(QueryId queryId)
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getQueryManager().getFullQueryInfo(queryId);
}
public Plan getQueryPlan(QueryId queryId)
{
checkState(coordinators.size() == 1, "Expected a single coordinator");
return coordinators.get(0).getQueryPlan(queryId);
}
@Override
public Lock getExclusiveLock()
{
return lock.writeLock();
}
@Override
public final synchronized void close()
{
cancelAllQueries();
try {
closer.close();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public void cancelAllQueries()
{
for (TestingPrestoServer coordinator : coordinators) {
QueryManager queryManager = coordinator.getQueryManager();
for (BasicQueryInfo queryInfo : queryManager.getQueries()) {
if (!queryInfo.getState().isDone()) {
queryManager.cancelQuery(queryInfo.getQueryId());
}
}
}
}
private void enableTestFunctionNamespaces(List<String> catalogNames, Map<String, String> additionalProperties, boolean coordinatorOnly)
{
checkState(testFunctionNamespacesHandle.get() == null, "Test function namespaces already enabled");
String databaseName = nanoTime() + "_" + ThreadLocalRandom.current().nextInt();
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("database-name", databaseName)
.putAll(additionalProperties)
.build();
installPlugin(new H2FunctionNamespaceManagerPlugin(), coordinatorOnly);
for (String catalogName : catalogNames) {
loadFunctionNamespaceManager("h2", catalogName, properties, coordinatorOnly);
}
Handle handle = Jdbi.open(H2ConnectionModule.getJdbcUrl(databaseName));
testFunctionNamespacesHandle.set(handle);
closer.register(handle);
}
private void loadFunctionNamespaceManager(
String functionNamespaceManagerName,
String catalogName,
Map<String, String> properties,
boolean coordinatorOnly)
{
for (TestingPrestoServer server : servers) {
if (coordinatorOnly && !server.isCoordinator()) {
continue;
}
server.getMetadata().getFunctionAndTypeManager().loadFunctionNamespaceManager(functionNamespaceManagerName, catalogName, properties, server.getPluginNodeManager());
}
}
private void installPlugin(Plugin plugin, boolean coordinatorOnly)
{
long start = nanoTime();
for (TestingPrestoServer server : servers) {
if (coordinatorOnly && !server.isCoordinator()) {
continue;
}
server.installPlugin(plugin);
}
log.info("Installed plugin %s in %s", plugin.getClass().getSimpleName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}
private void installCoordinatorPlugin(CoordinatorPlugin plugin, boolean coordinatorOnly)
{
long start = nanoTime();
for (TestingPrestoServer server : servers) {
if (coordinatorOnly && !server.isCoordinator()) {
continue;
}
server.installCoordinatorPlugin(plugin);
}
log.info("Installed plugin %s in %s", plugin.getClass().getSimpleName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}
@Override
public void loadSessionPropertyProvider(String sessionPropertyProviderName, Map<String, String> properties)
{
for (TestingPrestoServer server : servers) {
server.getMetadata().getSessionPropertyManager().loadSessionPropertyProvider(
sessionPropertyProviderName,
properties,
Optional.ofNullable(server.getMetadata().getFunctionAndTypeManager()),
Optional.ofNullable(server.getPluginNodeManager()));
}
}
@Override
public void loadTypeManager(String typeManagerName)
{
for (TestingPrestoServer server : servers) {
server.getMetadata().getFunctionAndTypeManager().loadTypeManager(typeManagerName);
}
}
@Override
public void loadPlanCheckerProviderManager(String planCheckerProviderName, Map<String, String> properties)
{
for (TestingPrestoServer server : servers) {
server.getPlanCheckerProviderManager().loadPlanCheckerProvider(planCheckerProviderName, properties, server.getPluginNodeManager());
}
}
private static void closeUnchecked(AutoCloseable closeable)
{
try {
closeable.close();
}
catch (Exception e) {
throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
public static class Builder
{
private Session defaultSession;
private int nodeCount = 4;
private int coordinatorCount = 1;
private Map<String, String> extraProperties = ImmutableMap.of();
private Map<String, String> coordinatorProperties = ImmutableMap.of();
private Map<String, String> resourceManagerProperties = ImmutableMap.of();
private Map<String, String> catalogServerProperties = ImmutableMap.of();
private Map<String, String> coordinatorSidecarProperties = ImmutableMap.of();
private SqlParserOptions parserOptions = DEFAULT_SQL_PARSER_OPTIONS;
private String environment = ENVIRONMENT;
private Optional<Path> dataDirectory = Optional.empty();
private Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher = Optional.empty();
private boolean resourceManagerEnabled;
private boolean catalogServerEnabled;
private boolean coordinatorSidecarEnabled;
private boolean skipLoadingResourceGroupConfigurationManager;
private List<Module> extraModules = ImmutableList.of();
private int resourceManagerCount = 1;
protected Builder(Session defaultSession)
{
this.defaultSession = requireNonNull(defaultSession, "defaultSession is null");
}
public Builder amendSession(Function<SessionBuilder, SessionBuilder> amendSession)
{
SessionBuilder builder = Session.builder(defaultSession);
this.defaultSession = amendSession.apply(builder).build();
return this;
}
public Builder setNodeCount(int nodeCount)
{
this.nodeCount = nodeCount;
return this;
}
public Builder setCoordinatorCount(int coordinatorCount)
{
this.coordinatorCount = coordinatorCount;
return this;
}
public Builder setExtraProperties(Map<String, String> extraProperties)
{
this.extraProperties = extraProperties;
return this;
}
/**
* Sets extra properties being equal to a map containing given key and value.
* Note, that calling this method OVERWRITES previously set property values.
* As a result, it should only be used when only one extra property needs to be set.
*/
public Builder setSingleExtraProperty(String key, String value)
{
return setExtraProperties(ImmutableMap.of(key, value));
}
public Builder setCoordinatorProperties(Map<String, String> coordinatorProperties)
{
this.coordinatorProperties = coordinatorProperties;
return this;
}
public Builder setResourceManagerProperties(Map<String, String> resourceManagerProperties)
{
this.resourceManagerProperties = resourceManagerProperties;
return this;
}
public Builder setCatalogServerProperties(Map<String, String> catalogServerProperties)
{
this.catalogServerProperties = catalogServerProperties;
return this;
}
public Builder setCoordinatorSidecarProperties(Map<String, String> coordinatorSidecarProperties)
{
this.coordinatorSidecarProperties = coordinatorSidecarProperties;
return this;
}
/**
* Sets coordinator properties being equal to a map containing given key and value.
* Note, that calling this method OVERWRITES previously set property values.
* As a result, it should only be used when only one coordinator property needs to be set.
*/
public Builder setSingleCoordinatorProperty(String key, String value)
{
return setCoordinatorProperties(ImmutableMap.of(key, value));
}
public Builder setParserOptions(SqlParserOptions parserOptions)
{
this.parserOptions = parserOptions;
return this;
}
public Builder setEnvironment(String environment)
{
this.environment = environment;
return this;
}
public Builder setDataDirectory(Optional<Path> dataDirectory)
{
this.dataDirectory = requireNonNull(dataDirectory, "dataDirectory is null");
return this;
}
public Builder setExternalWorkerLauncher(Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
{
this.externalWorkerLauncher = requireNonNull(externalWorkerLauncher, "externalWorkerLauncher is null");
return this;
}
public Builder setResourceManagerEnabled(boolean resourceManagerEnabled)
{
this.resourceManagerEnabled = resourceManagerEnabled;
return this;
}
public Builder setCatalogServerEnabled(boolean catalogServerEnabled)
{
this.catalogServerEnabled = catalogServerEnabled;
return this;
}
public Builder setCoordinatorSidecarEnabled(boolean coordinatorSidecarEnabled)
{
this.coordinatorSidecarEnabled = coordinatorSidecarEnabled;
return this;
}
public Builder setExtraModules(List<Module> extraModules)
{
this.extraModules = extraModules;
return this;
}
public Builder setResourceManagerCount(int resourceManagerCount)
{
this.resourceManagerCount = resourceManagerCount;
return this;
}
public Builder setSkipLoadingResourceGroupConfigurationManager(boolean skipLoadingResourceGroupConfigurationManager)
{
this.skipLoadingResourceGroupConfigurationManager = skipLoadingResourceGroupConfigurationManager;
return this;
}
public DistributedQueryRunner build()
throws Exception
{
return new DistributedQueryRunner(
resourceManagerEnabled,
catalogServerEnabled,
coordinatorSidecarEnabled,
skipLoadingResourceGroupConfigurationManager,
defaultSession,
nodeCount,
coordinatorCount,
resourceManagerCount,
extraProperties,
coordinatorProperties,
resourceManagerProperties,
catalogServerProperties,
coordinatorSidecarProperties,
parserOptions,
environment,
dataDirectory,
externalWorkerLauncher,
extraModules);
}
}
}