TestDiscoveryNodeManager.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;
import com.facebook.airlift.discovery.client.ServiceDescriptor;
import com.facebook.airlift.discovery.client.ServiceSelector;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.testing.TestingHttpClient;
import com.facebook.airlift.http.client.testing.TestingResponse;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.failureDetector.NoOpFailureDetector;
import com.facebook.presto.operator.TestingDriftClient;
import com.facebook.presto.server.InternalCommunicationConfig;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.annotation.concurrent.GuardedBy;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static com.facebook.airlift.discovery.client.ServiceDescriptor.serviceDescriptor;
import static com.facebook.airlift.discovery.client.ServiceSelectorConfig.DEFAULT_POOL;
import static com.facebook.airlift.http.client.HttpStatus.OK;
import static com.facebook.airlift.testing.Assertions.assertEqualsIgnoreOrder;
import static com.facebook.presto.server.ServerConfig.POOL_TYPE;
import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.facebook.presto.spi.NodeState.INACTIVE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotSame;
@Test(singleThreaded = true)
public class TestDiscoveryNodeManager
{
private final NodeInfo workerNodeInfo = new NodeInfo("test");
private final NodeInfo coordinatorNodeInfo = new NodeInfo("test");
private final NodeInfo resourceManagerNodeInfo = new NodeInfo("test");
private final NodeInfo catalogServerNodeInfo = new NodeInfo("test");
private final NodeInfo coordinatorSidecarNodeInfo = new NodeInfo("test");
private final InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig();
private NodeVersion expectedVersion;
private Set<InternalNode> activeNodes;
private Set<InternalNode> workerNodes;
private Set<InternalNode> inactiveNodes;
private InternalNode coordinator;
private InternalNode inActiveCoordinator;
private InternalNode inActiveResourceManager;
private InternalNode resourceManager;
private InternalNode catalogServer;
private InternalNode inActiveCatalogServer;
private InternalNode coordinatorSidecar;
private InternalNode inActiveCoordinatorSidecar;
private InternalNode workerNode1;
private final PrestoNodeServiceSelector selector = new PrestoNodeServiceSelector();
private HttpClient testHttpClient;
private InternalNode workerNode2;
private InternalNode workerNode3;
private InternalNode inActiveWorkerNode1;
private InternalNode inActiveWorkerNode2;
@BeforeMethod
public void setup()
{
testHttpClient = new TestingHttpClient(input -> new TestingResponse(OK, ArrayListMultimap.create(), ACTIVE.name().getBytes()));
expectedVersion = new NodeVersion("1");
coordinator = new InternalNode(coordinatorNodeInfo.getNodeId(), URI.create("https://192.0.2.8"), expectedVersion, true);
resourceManager = new InternalNode(
resourceManagerNodeInfo.getNodeId(),
URI.create("https://192.0.2.9"),
expectedVersion,
false,
true,
false,
false);
catalogServer = new InternalNode(
catalogServerNodeInfo.getNodeId(),
URI.create("https://192.0.3.1"),
expectedVersion,
false,
false,
true,
false);
coordinatorSidecar = new InternalNode(
coordinatorSidecarNodeInfo.getNodeId(),
URI.create("https://192.0.3.1"),
expectedVersion,
false,
false,
false,
true);
workerNode1 = new InternalNode(workerNodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false);
workerNode2 = new InternalNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.1:8080"), expectedVersion, false);
workerNode3 = new InternalNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.3"), expectedVersion, false);
inActiveResourceManager = new InternalNode(
resourceManagerNodeInfo.getNodeId(),
URI.create("https://192.0.2.9"),
new NodeVersion("2"),
false,
true,
false,
false);
inActiveCatalogServer = new InternalNode(
catalogServerNodeInfo.getNodeId(),
URI.create("https://192.0.3.2"),
new NodeVersion("2"),
false,
false,
true,
false);
inActiveCoordinatorSidecar = new InternalNode(
coordinatorSidecarNodeInfo.getNodeId(),
URI.create("https://192.0.3.2"),
new NodeVersion("2"),
false,
false,
false,
true);
inActiveCoordinator = new InternalNode(UUID.randomUUID().toString(), URI.create("https://192.0.3.1"), new NodeVersion("2"), true);
inActiveWorkerNode1 = new InternalNode(UUID.randomUUID().toString(), URI.create("https://192.0.3.9"), NodeVersion.UNKNOWN, false);
inActiveWorkerNode2 = new InternalNode(UUID.randomUUID().toString(), URI.create("https://192.0.4.9"), new NodeVersion("2"), false);
workerNodes = ImmutableSet.of(workerNode1, workerNode2, workerNode3);
activeNodes = ImmutableSet.<InternalNode>builder()
.addAll(workerNodes)
.add(coordinator)
.add(resourceManager)
.add(catalogServer)
.add(coordinatorSidecar)
.build();
inactiveNodes = ImmutableSet.of(
inActiveCoordinator,
inActiveResourceManager,
inActiveCatalogServer,
inActiveCoordinatorSidecar,
inActiveWorkerNode1,
inActiveWorkerNode2);
selector.announceNodes(activeNodes, inactiveNodes);
}
@Test
public void testGetAllNodesForWorkerNode()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, workerNodeInfo, new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
AllNodes allNodes = manager.getAllNodes();
Set<InternalNode> activeNodes = allNodes.getActiveNodes();
assertEqualsIgnoreOrder(activeNodes, ImmutableSet.of(resourceManager, catalogServer, coordinatorSidecar));
for (InternalNode actual : activeNodes) {
for (InternalNode expected : this.activeNodes) {
assertNotSame(actual, expected);
}
}
assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE));
Set<InternalNode> inactiveNodes = allNodes.getInactiveNodes();
assertEqualsIgnoreOrder(inactiveNodes, ImmutableSet.of(inActiveResourceManager, inActiveCatalogServer, inActiveCoordinatorSidecar));
for (InternalNode actual : inactiveNodes) {
for (InternalNode expected : this.inactiveNodes) {
assertNotSame(actual, expected);
}
}
assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE));
}
finally {
manager.stop();
}
}
@Test
public void testGetAllNodesForCoordinator()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, coordinatorNodeInfo, new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
AllNodes allNodes = manager.getAllNodes();
Set<InternalNode> activeNodes = allNodes.getActiveNodes();
assertEqualsIgnoreOrder(activeNodes, this.activeNodes);
for (InternalNode actual : activeNodes) {
for (InternalNode expected : this.activeNodes) {
assertNotSame(actual, expected);
}
}
assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE));
Set<InternalNode> inactiveNodes = allNodes.getInactiveNodes();
assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes);
for (InternalNode actual : inactiveNodes) {
for (InternalNode expected : this.inactiveNodes) {
assertNotSame(actual, expected);
}
}
assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE));
}
finally {
manager.stop();
}
}
@Test
public void testGetAllNodesForResourceManager()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, resourceManagerNodeInfo, new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
AllNodes allNodes = manager.getAllNodes();
Set<InternalNode> activeNodes = allNodes.getActiveNodes();
assertEqualsIgnoreOrder(activeNodes, this.activeNodes);
for (InternalNode actual : activeNodes) {
for (InternalNode expected : this.activeNodes) {
assertNotSame(actual, expected);
}
}
assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE));
Set<InternalNode> inactiveNodes = allNodes.getInactiveNodes();
assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes);
for (InternalNode actual : inactiveNodes) {
for (InternalNode expected : this.inactiveNodes) {
assertNotSame(actual, expected);
}
}
assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE));
}
finally {
manager.stop();
}
}
@Test
public void testGetAllNodesForCoordinatorSidecar()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, coordinatorSidecarNodeInfo, new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
AllNodes allNodes = manager.getAllNodes();
Set<InternalNode> activeNodes = allNodes.getActiveNodes();
assertEqualsIgnoreOrder(activeNodes, this.activeNodes);
for (InternalNode actual : activeNodes) {
for (InternalNode expected : this.activeNodes) {
assertNotSame(actual, expected);
}
}
assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE));
Set<InternalNode> inactiveNodes = allNodes.getInactiveNodes();
assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes);
for (InternalNode actual : inactiveNodes) {
for (InternalNode expected : this.inactiveNodes) {
assertNotSame(actual, expected);
}
}
assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE));
}
finally {
manager.stop();
}
}
@Test
public void testGetCurrentNode()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, workerNodeInfo, new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
assertEquals(manager.getCurrentNode(), workerNode1);
}
finally {
manager.stop();
}
}
@Test
public void testGetCoordinators()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, resourceManagerNodeInfo, new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
assertEquals(manager.getCoordinators(), ImmutableSet.of(coordinator));
}
finally {
manager.stop();
}
}
@Test
public void testGetResourceManagers()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, workerNodeInfo, new NoOpFailureDetector(), Optional.of(host -> false), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
assertEquals(manager.getResourceManagers(), ImmutableSet.of(resourceManager));
}
finally {
manager.stop();
}
}
@Test
public void testGetCatalogServers()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, workerNodeInfo, new NoOpFailureDetector(), Optional.of(host -> false), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
assertEquals(manager.getCatalogServers(), ImmutableSet.of(catalogServer));
}
finally {
manager.stop();
}
}
@Test
public void testGetCoordinatorSidecar()
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, coordinatorSidecarNodeInfo, new NoOpFailureDetector(), Optional.of(host -> false), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
assertEquals(manager.getCoordinatorSidecars(), ImmutableSet.of(coordinatorSidecar));
}
finally {
manager.stop();
}
}
@SuppressWarnings("ResultOfObjectAllocationIgnored")
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".* current node not returned .*")
public void testGetCurrentNodeRequired()
{
new DiscoveryNodeManager(selector, new NodeInfo("test"), new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
}
@Test(timeOut = 60000)
public void testNodeChangeListener()
throws Exception
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, coordinatorNodeInfo, new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig);
try {
manager.startPollingNodeStates();
BlockingQueue<AllNodes> notifications = new ArrayBlockingQueue<>(100);
manager.addNodeChangeListener(notifications::add);
AllNodes allNodes = notifications.take();
assertEquals(allNodes.getActiveNodes(), activeNodes);
assertEquals(allNodes.getInactiveNodes(), inactiveNodes);
selector.announceNodes(ImmutableSet.of(workerNode1), ImmutableSet.of(coordinator));
allNodes = notifications.take();
assertEquals(allNodes.getActiveNodes(), ImmutableSet.of(workerNode1, coordinator));
assertEquals(allNodes.getActiveCoordinators(), ImmutableSet.of(coordinator));
selector.announceNodes(activeNodes, inactiveNodes);
allNodes = notifications.take();
assertEquals(allNodes.getActiveNodes(), activeNodes);
assertEquals(allNodes.getInactiveNodes(), inactiveNodes);
}
finally {
manager.stop();
}
}
public static class PrestoNodeServiceSelector
implements ServiceSelector
{
@GuardedBy("this")
private List<ServiceDescriptor> descriptors = ImmutableList.of();
private synchronized void announceNodes(Set<InternalNode> activeNodes, Set<InternalNode> inactiveNodes)
{
ImmutableList.Builder<ServiceDescriptor> descriptors = ImmutableList.builder();
for (InternalNode node : Iterables.concat(activeNodes, inactiveNodes)) {
descriptors.add(serviceDescriptor("presto")
.setNodeId(node.getNodeIdentifier())
.addProperty("http", node.getInternalUri().toString())
.addProperty("node_version", node.getNodeVersion().toString())
.addProperty("coordinator", String.valueOf(node.isCoordinator()))
.addProperty("resource_manager", String.valueOf(node.isResourceManager()))
.addProperty("catalog_server", String.valueOf(node.isCatalogServer()))
.addProperty("sidecar", String.valueOf(node.isCoordinatorSidecar()))
.addProperty(POOL_TYPE, node.getPoolType().name())
.build());
}
this.descriptors = descriptors.build();
}
@Override
public String getType()
{
return "presto";
}
@Override
public String getPool()
{
return DEFAULT_POOL;
}
@Override
public synchronized List<ServiceDescriptor> selectAllServices()
{
return descriptors;
}
@Override
public ListenableFuture<List<ServiceDescriptor>> refresh()
{
throw new UnsupportedOperationException();
}
}
}