TestingPrestoServer.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server.testing;
import com.facebook.airlift.bootstrap.Bootstrap;
import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.airlift.discovery.client.Announcer;
import com.facebook.airlift.discovery.client.DiscoveryModule;
import com.facebook.airlift.discovery.client.ServiceAnnouncement;
import com.facebook.airlift.discovery.client.ServiceSelectorManager;
import com.facebook.airlift.discovery.client.testing.TestingDiscoveryModule;
import com.facebook.airlift.event.client.EventModule;
import com.facebook.airlift.http.server.TheServlet;
import com.facebook.airlift.http.server.testing.TestingHttpServer;
import com.facebook.airlift.http.server.testing.TestingHttpServerModule;
import com.facebook.airlift.jaxrs.JaxrsModule;
import com.facebook.airlift.jmx.testing.TestingJmxModule;
import com.facebook.airlift.json.JsonModule;
import com.facebook.airlift.json.smile.SmileModule;
import com.facebook.airlift.node.testing.TestingNodeModule;
import com.facebook.airlift.tracetoken.TraceTokenModule;
import com.facebook.drift.server.DriftServer;
import com.facebook.drift.transport.netty.server.DriftNettyServerTransport;
import com.facebook.presto.ClientRequestFilterManager;
import com.facebook.presto.ClientRequestFilterModule;
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.dispatcher.DispatchManager;
import com.facebook.presto.dispatcher.QueryPrerequisitesManagerModule;
import com.facebook.presto.eventlistener.EventListenerConfig;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.SqlQueryManager;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.TaskManager;
import com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.memory.ClusterMemoryManager;
import com.facebook.presto.memory.LocalMemoryManager;
import com.facebook.presto.metadata.AllNodes;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.nodeManager.PluginNodeManager;
import com.facebook.presto.resourcemanager.ResourceManagerClusterStateProvider;
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.server.GracefulShutdownHandler;
import com.facebook.presto.server.PluginManager;
import com.facebook.presto.server.ServerInfoResource;
import com.facebook.presto.server.ServerMainModule;
import com.facebook.presto.server.ShutdownAction;
import com.facebook.presto.server.security.ServerSecurityModule;
import com.facebook.presto.spi.ClientRequestFilterFactory;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.testing.ProcedureTester;
import com.facebook.presto.testing.TestingAccessControlManager;
import com.facebook.presto.testing.TestingEventListenerManager;
import com.facebook.presto.testing.TestingTempStorageManager;
import com.facebook.presto.testing.TestingWarningCollectorModule;
import com.facebook.presto.transaction.TransactionManager;
import com.facebook.presto.ttl.clusterttlprovidermanagers.ClusterTtlProviderManagerModule;
import com.facebook.presto.ttl.nodettlfetchermanagers.NodeTtlFetcherManagerModule;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Scopes;
import org.weakref.jmx.guice.MBeanModule;
import javax.annotation.concurrent.GuardedBy;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static com.facebook.airlift.configuration.ConditionalModule.installModuleIf;
import static com.facebook.airlift.discovery.client.ServiceAnnouncement.serviceAnnouncement;
import static com.facebook.airlift.json.JsonBinder.jsonBinder;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static java.lang.Integer.parseInt;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.Files.isDirectory;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class TestingPrestoServer
implements Closeable
{
private final Injector injector;
private final Path dataDirectory;
private final boolean preserveData;
private final LifeCycleManager lifeCycleManager;
private final PluginManager pluginManager;
private final ConnectorManager connectorManager;
private final TestingHttpServer server;
private final CatalogManager catalogManager;
private final TransactionManager transactionManager;
private final SqlParser sqlParser;
private final Metadata metadata;
private final StatsCalculator statsCalculator;
private final TestingEventListenerManager eventListenerManager;
private final TestingAccessControlManager accessControl;
private final ProcedureTester procedureTester;
private final Optional<InternalResourceGroupManager<?>> resourceGroupManager;
private final SplitManager splitManager;
private final PageSourceManager pageSourceManager;
private final NodePartitioningManager nodePartitioningManager;
private final ConnectorPlanOptimizerManager planOptimizerManager;
private final ClusterMemoryPoolManager clusterMemoryManager;
private final LocalMemoryManager localMemoryManager;
private final InternalNodeManager nodeManager;
private final ServiceSelectorManager serviceSelectorManager;
private final Announcer announcer;
private final DispatchManager dispatchManager;
private final QueryManager queryManager;
private final TaskManager taskManager;
private final GracefulShutdownHandler gracefulShutdownHandler;
private final ShutdownAction shutdownAction;
private final ExpressionOptimizerManager expressionManager;
private final RequestBlocker requestBlocker;
private final boolean resourceManager;
private final boolean catalogServer;
private final boolean coordinatorSidecar;
private final boolean coordinator;
private final boolean nodeSchedulerIncludeCoordinator;
private final ServerInfoResource serverInfoResource;
private final ResourceManagerClusterStateProvider clusterStateProvider;
private final PlanCheckerProviderManager planCheckerProviderManager;
private final NodeManager pluginNodeManager;
private final ClientRequestFilterManager clientRequestFilterManager;
public static class TestShutdownAction
implements ShutdownAction
{
private final CountDownLatch shutdownCalled = new CountDownLatch(1);
@GuardedBy("this")
private boolean isShutdown;
@Override
public synchronized void onShutdown()
{
isShutdown = true;
shutdownCalled.countDown();
}
public void waitForShutdownComplete(long millis)
throws InterruptedException
{
shutdownCalled.await(millis, MILLISECONDS);
}
public synchronized boolean isShutdown()
{
return isShutdown;
}
}
public TestingPrestoServer()
throws Exception
{
this(ImmutableList.of());
}
public TestingPrestoServer(List<Module> additionalModules)
throws Exception
{
this(true, ImmutableMap.of(), null, null, new SqlParserOptions(), additionalModules);
}
public TestingPrestoServer(
boolean coordinator,
Map<String, String> properties,
String environment,
URI discoveryUri,
SqlParserOptions parserOptions,
List<Module> additionalModules)
throws Exception
{
this(coordinator, properties, environment, discoveryUri, parserOptions, additionalModules, Optional.empty());
}
public TestingPrestoServer(
boolean coordinator,
Map<String, String> properties,
String environment,
URI discoveryUri,
SqlParserOptions parserOptions,
List<Module> additionalModules,
Optional<Path> dataDirectory)
throws Exception
{
this(
false,
false,
false,
false,
false,
false,
coordinator,
false,
properties,
environment,
discoveryUri,
parserOptions,
additionalModules,
dataDirectory);
}
public TestingPrestoServer(
boolean resourceManager,
boolean resourceManagerEnabled,
boolean catalogServer,
boolean catalogServerEnabled,
boolean coordinatorSidecar,
boolean coordinatorSidecarEnabled,
boolean coordinator,
boolean skipLoadingResourceGroupConfigurationManager,
Map<String, String> properties,
String environment,
URI discoveryUri,
SqlParserOptions parserOptions,
List<Module> additionalModules,
Optional<Path> dataDirectory)
throws Exception
{
this.resourceManager = resourceManager;
this.catalogServer = catalogServer;
this.coordinatorSidecar = coordinatorSidecar;
this.coordinator = coordinator;
this.dataDirectory = dataDirectory.orElseGet(TestingPrestoServer::tempDirectory);
this.preserveData = dataDirectory.isPresent();
properties = new HashMap<>(properties);
this.nodeSchedulerIncludeCoordinator = (properties.getOrDefault("node-scheduler.include-coordinator", "true")).equals("true");
String coordinatorPort = properties.remove("http-server.http.port");
if (coordinatorPort == null) {
coordinatorPort = "0";
}
Map<String, String> serverProperties = getServerProperties(resourceManagerEnabled, catalogServerEnabled, coordinatorSidecarEnabled, properties, environment, discoveryUri);
ImmutableList.Builder<Module> modules = ImmutableList.<Module>builder()
.add(new TestingNodeModule(Optional.ofNullable(environment)))
.add(new TestingHttpServerModule(parseInt(coordinator ? coordinatorPort : "0")))
.add(new JsonModule())
.add(installModuleIf(
FeaturesConfig.class,
FeaturesConfig::isJsonSerdeCodeGenerationEnabled,
binder -> jsonBinder(binder).addModuleBinding().to(AfterburnerModule.class)))
.add(new SmileModule())
.add(new JaxrsModule(true))
.add(new MBeanModule())
.add(new TestingJmxModule())
.add(new EventModule())
.add(new TraceTokenModule())
.add(new ServerSecurityModule())
.add(new ServerMainModule(parserOptions))
.add(new TestingWarningCollectorModule())
.add(new QueryPrerequisitesManagerModule())
.add(new NodeTtlFetcherManagerModule())
.add(new ClusterTtlProviderManagerModule())
.add(new ClientRequestFilterModule())
.add(binder -> {
binder.bind(TestingAccessControlManager.class).in(Scopes.SINGLETON);
binder.bind(TestingEventListenerManager.class).in(Scopes.SINGLETON);
binder.bind(TestingTempStorageManager.class).in(Scopes.SINGLETON);
binder.bind(AccessControlManager.class).to(TestingAccessControlManager.class).in(Scopes.SINGLETON);
binder.bind(EventListenerManager.class).to(TestingEventListenerManager.class).in(Scopes.SINGLETON);
binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON);
binder.bind(TempStorageManager.class).to(TestingTempStorageManager.class).in(Scopes.SINGLETON);
binder.bind(AccessControl.class).to(AccessControlManager.class).in(Scopes.SINGLETON);
binder.bind(ShutdownAction.class).to(TestShutdownAction.class).in(Scopes.SINGLETON);
binder.bind(GracefulShutdownHandler.class).in(Scopes.SINGLETON);
binder.bind(ProcedureTester.class).in(Scopes.SINGLETON);
binder.bind(RequestBlocker.class).in(Scopes.SINGLETON);
newSetBinder(binder, Filter.class, TheServlet.class).addBinding()
.to(RequestBlocker.class).in(Scopes.SINGLETON);
});
if (discoveryUri != null) {
requireNonNull(environment, "environment required when discoveryUri is present");
modules.add(new DiscoveryModule());
}
else {
modules.add(new TestingDiscoveryModule());
}
modules.addAll(additionalModules);
Bootstrap app = new Bootstrap(modules.build());
Map<String, String> optionalProperties = new HashMap<>();
if (environment != null) {
optionalProperties.put("node.environment", environment);
}
injector = app
.doNotInitializeLogging()
.setRequiredConfigurationProperties(serverProperties)
.setOptionalConfigurationProperties(optionalProperties)
.quiet()
.initialize();
injector.getInstance(Announcer.class).start();
lifeCycleManager = injector.getInstance(LifeCycleManager.class);
pluginManager = injector.getInstance(PluginManager.class);
connectorManager = injector.getInstance(ConnectorManager.class);
server = injector.getInstance(TestingHttpServer.class);
catalogManager = injector.getInstance(CatalogManager.class);
transactionManager = injector.getInstance(TransactionManager.class);
sqlParser = injector.getInstance(SqlParser.class);
metadata = injector.getInstance(Metadata.class);
accessControl = injector.getInstance(TestingAccessControlManager.class);
procedureTester = injector.getInstance(ProcedureTester.class);
splitManager = injector.getInstance(SplitManager.class);
pageSourceManager = injector.getInstance(PageSourceManager.class);
expressionManager = injector.getInstance(ExpressionOptimizerManager.class);
if (coordinator) {
dispatchManager = injector.getInstance(DispatchManager.class);
queryManager = injector.getInstance(QueryManager.class);
ResourceGroupManager<?> resourceGroupManager = injector.getInstance(ResourceGroupManager.class);
this.resourceGroupManager = resourceGroupManager instanceof InternalResourceGroupManager
? Optional.of((InternalResourceGroupManager<?>) resourceGroupManager)
: Optional.empty();
if (!skipLoadingResourceGroupConfigurationManager) {
resourceGroupManager.loadConfigurationManager();
}
nodePartitioningManager = injector.getInstance(NodePartitioningManager.class);
planOptimizerManager = injector.getInstance(ConnectorPlanOptimizerManager.class);
clusterMemoryManager = injector.getInstance(ClusterMemoryManager.class);
statsCalculator = injector.getInstance(StatsCalculator.class);
eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class));
clusterStateProvider = null;
planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class);
expressionManager.loadExpressionOptimizerFactories();
}
else if (resourceManager) {
dispatchManager = null;
queryManager = injector.getInstance(QueryManager.class);
resourceGroupManager = Optional.empty();
nodePartitioningManager = injector.getInstance(NodePartitioningManager.class);
planOptimizerManager = injector.getInstance(ConnectorPlanOptimizerManager.class);
clusterMemoryManager = null;
statsCalculator = null;
eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class));
clusterStateProvider = injector.getInstance(ResourceManagerClusterStateProvider.class);
planCheckerProviderManager = null;
}
else if (coordinatorSidecar) {
dispatchManager = null;
queryManager = null;
resourceGroupManager = Optional.empty();
nodePartitioningManager = null;
planOptimizerManager = null;
clusterMemoryManager = null;
statsCalculator = null;
eventListenerManager = null;
clusterStateProvider = null;
planCheckerProviderManager = null;
}
else if (catalogServer) {
dispatchManager = null;
queryManager = null;
resourceGroupManager = Optional.empty();
nodePartitioningManager = null;
planOptimizerManager = null;
clusterMemoryManager = null;
statsCalculator = null;
eventListenerManager = null;
clusterStateProvider = null;
planCheckerProviderManager = null;
}
else {
dispatchManager = null;
queryManager = null;
resourceGroupManager = Optional.empty();
nodePartitioningManager = null;
planOptimizerManager = null;
clusterMemoryManager = null;
statsCalculator = null;
eventListenerManager = null;
clusterStateProvider = null;
planCheckerProviderManager = null;
}
localMemoryManager = injector.getInstance(LocalMemoryManager.class);
nodeManager = injector.getInstance(InternalNodeManager.class);
serviceSelectorManager = injector.getInstance(ServiceSelectorManager.class);
gracefulShutdownHandler = injector.getInstance(GracefulShutdownHandler.class);
taskManager = injector.getInstance(TaskManager.class);
shutdownAction = injector.getInstance(ShutdownAction.class);
announcer = injector.getInstance(Announcer.class);
requestBlocker = injector.getInstance(RequestBlocker.class);
serverInfoResource = injector.getInstance(ServerInfoResource.class);
pluginNodeManager = injector.getInstance(PluginNodeManager.class);
clientRequestFilterManager = injector.getInstance(ClientRequestFilterManager.class);
// Announce Thrift server address
DriftServer driftServer = injector.getInstance(DriftServer.class);
driftServer.start();
updateThriftServerAddressAnnouncement(announcer, driftServerPort(driftServer), nodeManager);
announcer.forceAnnounce();
refreshNodes();
}
private Map<String, String> getServerProperties(
boolean resourceManagerEnabled,
boolean catalogServerEnabled,
boolean coordinatorSidecarEnabled,
Map<String, String> properties,
String environment,
URI discoveryUri)
{
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put("coordinator", String.valueOf(coordinator));
serverProperties.put("resource-manager", String.valueOf(resourceManager));
serverProperties.put("resource-manager-enabled", String.valueOf(resourceManagerEnabled));
serverProperties.put("catalog-server", String.valueOf(catalogServer));
serverProperties.put("catalog-server-enabled", String.valueOf(catalogServerEnabled));
serverProperties.put("coordinator-sidecar-enabled", String.valueOf(coordinatorSidecarEnabled));
serverProperties.put("presto.version", "testversion");
serverProperties.put("task.concurrency", "4");
serverProperties.put("task.max-worker-threads", "4");
serverProperties.put("exchange.client-threads", "4");
serverProperties.put("optimizer.ignore-stats-calculator-failures", "false");
serverProperties.put("internal-communication.shared-secret", "internal-shared-secret");
if (coordinator || resourceManager || catalogServer || coordinatorSidecar) {
// enabling failure detector in tests can make them flakey
serverProperties.put("failure-detector.enabled", "false");
}
if (discoveryUri != null) {
requireNonNull(environment, "environment required when discoveryUri is present");
serverProperties.put("discovery.uri", discoveryUri.toString());
}
// Add these last so explicitly specified properties override the defaults
serverProperties.putAll(properties);
return ImmutableMap.copyOf(serverProperties);
}
@Override
public void close()
throws IOException
{
try {
if (lifeCycleManager != null) {
lifeCycleManager.stop();
}
}
catch (Exception e) {
throwIfUnchecked(e);
throw new RuntimeException(e);
}
finally {
if (isDirectory(dataDirectory) && !preserveData) {
deleteRecursively(dataDirectory, ALLOW_INSECURE);
}
}
}
public PluginManager getPluginManager()
{
return pluginManager;
}
public void installPlugin(Plugin plugin)
{
pluginManager.installPlugin(plugin);
}
public void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
pluginManager.installCoordinatorPlugin(plugin);
}
public DispatchManager getDispatchManager()
{
return dispatchManager;
}
public QueryManager getQueryManager()
{
return queryManager;
}
public Plan getQueryPlan(QueryId queryId)
{
checkState(coordinator, "not a coordinator");
checkState(queryManager instanceof SqlQueryManager);
return ((SqlQueryManager) queryManager).getQueryPlan(queryId);
}
public void addFinalQueryInfoListener(QueryId queryId, StateChangeListener<QueryInfo> stateChangeListener)
{
checkState(coordinator, "not a coordinator");
checkState(queryManager instanceof SqlQueryManager);
((SqlQueryManager) queryManager).addFinalQueryInfoListener(queryId, stateChangeListener);
}
public ConnectorId createCatalog(String catalogName, String connectorName)
{
return createCatalog(catalogName, connectorName, ImmutableMap.of());
}
public ConnectorId createCatalog(String catalogName, String connectorName, Map<String, String> properties)
{
ConnectorId connectorId = connectorManager.createConnection(catalogName, connectorName, properties);
if (coordinator && !nodeSchedulerIncludeCoordinator) {
return connectorId;
}
updateConnectorIdAnnouncement(announcer, connectorId, nodeManager);
return connectorId;
}
public Path getDataDirectory()
{
return dataDirectory;
}
public URI getBaseUrl()
{
return server.getBaseUrl();
}
public URI resolve(String path)
{
return server.getBaseUrl().resolve(path);
}
public HostAndPort getAddress()
{
return HostAndPort.fromParts(getBaseUrl().getHost(), getBaseUrl().getPort());
}
public HostAndPort getHttpsAddress()
{
URI httpsUri = server.getHttpServerInfo().getHttpsUri();
return HostAndPort.fromParts(httpsUri.getHost(), httpsUri.getPort());
}
public CatalogManager getCatalogManager()
{
return catalogManager;
}
public TransactionManager getTransactionManager()
{
return transactionManager;
}
public SqlParser getSqlParser()
{
return sqlParser;
}
public Metadata getMetadata()
{
return metadata;
}
public StatsCalculator getStatsCalculator()
{
checkState(coordinator, "not a coordinator");
return statsCalculator;
}
public List<EventListener> getEventListeners()
{
checkState(coordinator, "not a coordinator");
return eventListenerManager.getEventListeners();
}
public TestingAccessControlManager getAccessControl()
{
return accessControl;
}
public ProcedureTester getProcedureTester()
{
return procedureTester;
}
public SplitManager getSplitManager()
{
return splitManager;
}
public PageSourceManager getPageSourceManager()
{
return pageSourceManager;
}
public Optional<InternalResourceGroupManager<?>> getResourceGroupManager()
{
return resourceGroupManager;
}
public InternalNodeManager getNodeManager()
{
return nodeManager;
}
public NodeManager getPluginNodeManager()
{
return pluginNodeManager;
}
public NodePartitioningManager getNodePartitioningManager()
{
return nodePartitioningManager;
}
public ConnectorPlanOptimizerManager getPlanOptimizerManager()
{
return planOptimizerManager;
}
public LocalMemoryManager getLocalMemoryManager()
{
return localMemoryManager;
}
public ClusterMemoryManager getClusterMemoryManager()
{
checkState(coordinator, "not a coordinator");
checkState(clusterMemoryManager instanceof ClusterMemoryManager);
return (ClusterMemoryManager) clusterMemoryManager;
}
public PlanCheckerProviderManager getPlanCheckerProviderManager()
{
return planCheckerProviderManager;
}
public GracefulShutdownHandler getGracefulShutdownHandler()
{
return gracefulShutdownHandler;
}
public ServerInfoResource getServerInfoResource()
{
return serverInfoResource;
}
public TaskManager getTaskManager()
{
return taskManager;
}
public ShutdownAction getShutdownAction()
{
return shutdownAction;
}
public ExpressionOptimizerManager getExpressionManager()
{
return expressionManager;
}
public boolean isCoordinator()
{
return coordinator;
}
public boolean nodeSchedulerIncludeCoordinator()
{
return nodeSchedulerIncludeCoordinator;
}
public boolean isResourceManager()
{
return resourceManager;
}
public final AllNodes refreshNodes()
{
serviceSelectorManager.forceRefresh();
nodeManager.refreshNodes();
return nodeManager.getAllNodes();
}
public final ResourceManagerClusterStateProvider getClusterStateProvider()
{
return clusterStateProvider;
}
public Set<InternalNode> getActiveNodesWithConnector(ConnectorId connectorId)
{
return nodeManager.getActiveConnectorNodes(connectorId);
}
public <T> T getInstance(Key<T> key)
{
return injector.getInstance(key);
}
public void stopResponding()
{
requestBlocker.block();
}
public void startResponding()
{
requestBlocker.unblock();
}
private static void updateConnectorIdAnnouncement(Announcer announcer, ConnectorId connectorId, InternalNodeManager nodeManager)
{
//
// This code was copied from PrestoServer, and is a hack that should be removed when the connectorId property is removed
//
// get existing announcement
ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements());
// update connectorIds property
Map<String, String> properties = new LinkedHashMap<>(announcement.getProperties());
String property = nullToEmpty(properties.get("connectorIds"));
Set<String> connectorIds = new LinkedHashSet<>(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(property));
connectorIds.add(connectorId.toString());
properties.put("connectorIds", Joiner.on(',').join(connectorIds));
// update announcement
announcer.removeServiceAnnouncement(announcement.getId());
announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build());
announcer.forceAnnounce();
nodeManager.refreshNodes();
}
// TODO: announcement does not work for coordinator
private static void updateThriftServerAddressAnnouncement(Announcer announcer, int thriftPort, InternalNodeManager nodeManager)
{
// get existing announcement
ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements());
// update announcement and thrift port property
Map<String, String> properties = new LinkedHashMap<>(announcement.getProperties());
properties.put("thriftServerPort", String.valueOf(thriftPort));
announcer.removeServiceAnnouncement(announcement.getId());
announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build());
announcer.forceAnnounce();
nodeManager.refreshNodes();
}
private static ServiceAnnouncement getPrestoAnnouncement(Set<ServiceAnnouncement> announcements)
{
for (ServiceAnnouncement announcement : announcements) {
if (announcement.getType().equals("presto")) {
return announcement;
}
}
throw new RuntimeException("Presto announcement not found: " + announcements);
}
private static Path tempDirectory()
{
try {
return createTempDirectory("PrestoTest");
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static class RequestBlocker
implements Filter
{
private static final Object monitor = new Object();
private volatile boolean blocked;
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException
{
synchronized (monitor) {
while (blocked) {
try {
monitor.wait();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
chain.doFilter(request, response);
}
public void block()
{
synchronized (monitor) {
blocked = true;
}
}
public void unblock()
{
synchronized (monitor) {
blocked = false;
monitor.notifyAll();
}
}
@Override
public void init(FilterConfig filterConfig) {}
@Override
public void destroy() {}
}
private static int driftServerPort(DriftServer server)
{
return ((DriftNettyServerTransport) server.getServerTransport()).getPort();
}
public ClientRequestFilterManager getClientRequestFilterManager(List<ClientRequestFilterFactory> requestFilterFactory)
{
requestFilterFactory.forEach(clientRequestFilterManager::registerClientRequestFilterFactory);
clientRequestFilterManager.loadClientRequestFilters();
return clientRequestFilterManager;
}
}