TestClusterSizeMonitor.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution;

import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.metadata.InMemoryNodeManager;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.spi.ConnectorId;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.facebook.airlift.concurrent.MoreFutures.addExceptionCallback;
import static com.facebook.airlift.concurrent.MoreFutures.addSuccessCallback;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public class TestClusterSizeMonitor
{
    public static final ConnectorId CONNECTOR_ID = new ConnectorId("dummy");
    public static final int DESIRED_WORKER_COUNT = 10;
    public static final int DESIRED_COORDINATOR_COUNT = 3;
    public static final int DESIRED_COORDINATOR_COUNT_ACTIVE = 2;
    public static final int DESIRED_RESOURCE_MANAGER_COUNT_ACTIVE = 1;
    public static final int DESIRED_WORKER_COUNT_ACTIVE = 10;
    public static final int DESIRED_COORDINATOR_SIDECAR_COUNT = 1;

    private InMemoryNodeManager nodeManager;
    private ClusterSizeMonitor monitor;
    private CountDownLatch minWorkersLatch;
    private CountDownLatch minCoordinatorSidecarsLatch;
    private AtomicInteger numWorkers;
    private AtomicInteger numCoordinators;
    private AtomicInteger numResourceManagers;
    private AtomicInteger numCoordinatorSidecars;
    private AtomicBoolean workersTimeout;
    private AtomicBoolean coordinatorSidecarsTimeout;

    @BeforeMethod
    public void setUp()
    {
        numWorkers = new AtomicInteger(0);
        numCoordinators = new AtomicInteger(0);
        numResourceManagers = new AtomicInteger(0);
        numCoordinatorSidecars = new AtomicInteger(0);

        workersTimeout = new AtomicBoolean();
        coordinatorSidecarsTimeout = new AtomicBoolean();

        nodeManager = new InMemoryNodeManager();
        monitor = new ClusterSizeMonitor(
                nodeManager,
                false,
                DESIRED_WORKER_COUNT,
                DESIRED_WORKER_COUNT_ACTIVE,
                new Duration(4, SECONDS),
                DESIRED_COORDINATOR_COUNT,
                DESIRED_COORDINATOR_COUNT_ACTIVE,
                new Duration(4, SECONDS),
                new Duration(4, SECONDS),
                DESIRED_RESOURCE_MANAGER_COUNT_ACTIVE,
                true);

        minWorkersLatch = new CountDownLatch(1);
        minCoordinatorSidecarsLatch = new CountDownLatch(1);

        monitor.start();
    }

    @AfterMethod
    public void tearDown()
    {
        monitor.stop();
    }

    @Test(timeOut = 60_000)
    public void testWaitForMinimumWorkers()
            throws InterruptedException
    {
        ListenableFuture<?> workersFuture = waitForMinimumWorkers();

        for (int i = numWorkers.get() + 1; i < DESIRED_WORKER_COUNT - 1; i++) {
            assertFalse(workersTimeout.get());
            addWorker(nodeManager);
        }
        assertFalse(monitor.hasRequiredWorkers());
        assertFalse(workersTimeout.get());
        assertEquals(minWorkersLatch.getCount(), 1);
        addWorker(nodeManager);
        minWorkersLatch.await(1, SECONDS);
        assertTrue(workersFuture.isDone());
        assertFalse(workersTimeout.get());
        assertTrue(monitor.hasRequiredWorkers());
    }

    @Test(timeOut = 10_000)
    public void testTimeoutWaitingForWorkers()
            throws InterruptedException
    {
        waitForMinimumWorkers();

        assertFalse(workersTimeout.get());
        addWorker(nodeManager);
        assertFalse(workersTimeout.get());
        assertEquals(minWorkersLatch.getCount(), 1);

        Thread.sleep(SECONDS.toMillis(5));

        assertTrue(workersTimeout.get());
        assertEquals(minWorkersLatch.getCount(), 0);
    }

    @Test(timeOut = 60_000)
    public void testWaitForMinimumCoordinatorSidecars()
            throws InterruptedException
    {
        ListenableFuture<?> coordinatorSidecarsFuture = waitForMinimumCoordinatorSidecars();
        assertFalse(monitor.hasRequiredCoordinatorSidecars());
        assertFalse(coordinatorSidecarsTimeout.get());
        assertEquals(minCoordinatorSidecarsLatch.getCount(), 1);
        addCoordinatorSidecar(nodeManager);
        minCoordinatorSidecarsLatch.await(1, SECONDS);
        assertTrue(coordinatorSidecarsFuture.isDone());
        assertFalse(coordinatorSidecarsTimeout.get());
        assertTrue(monitor.hasRequiredCoordinatorSidecars());
    }

    @Test(timeOut = 10_000)
    public void testTimeoutWaitingForCoordinatorSidecars()
            throws InterruptedException
    {
        waitForMinimumCoordinatorSidecars();
        assertFalse(coordinatorSidecarsTimeout.get());
        assertEquals(minCoordinatorSidecarsLatch.getCount(), 1);
        Thread.sleep(SECONDS.toMillis(5));
        assertTrue(coordinatorSidecarsTimeout.get());
        assertEquals(minCoordinatorSidecarsLatch.getCount(), 0);
    }

    @Test
    public void testHasRequiredCoordinatorSidecars()
            throws InterruptedException
    {
        assertFalse(monitor.hasRequiredCoordinatorSidecars());
        for (int i = numCoordinatorSidecars.get(); i < DESIRED_COORDINATOR_SIDECAR_COUNT; i++) {
            addCoordinatorSidecar(nodeManager);
        }
        assertTrue(monitor.hasRequiredCoordinatorSidecars());
    }

    @Test
    public void testHasRequiredCoordinatorSidecarsMoreThanOne()
            throws InterruptedException
    {
        assertFalse(monitor.hasRequiredCoordinatorSidecars());
        for (int i = numCoordinatorSidecars.get(); i < DESIRED_COORDINATOR_SIDECAR_COUNT + 1; i++) {
            addCoordinatorSidecar(nodeManager);
        }
        assertTrue(monitor.hasRequiredCoordinatorSidecars());
    }

    @Test
    public void testHasRequiredResourceManagers()
            throws InterruptedException
    {
        assertFalse(monitor.hasRequiredResourceManagers());
        for (int i = numResourceManagers.get(); i < DESIRED_RESOURCE_MANAGER_COUNT_ACTIVE; i++) {
            addResourceManager(nodeManager);
        }
        assertTrue(monitor.hasRequiredResourceManagers());
    }

    @Test
    public void testHasRequiredCoordinators()
            throws InterruptedException
    {
        assertFalse(monitor.hasRequiredCoordinators());
        for (int i = numResourceManagers.get(); i < DESIRED_COORDINATOR_COUNT_ACTIVE; i++) {
            addCoordinator(nodeManager);
        }
        assertTrue(monitor.hasRequiredCoordinators());
    }

    private ListenableFuture<?> waitForMinimumWorkers()
    {
        ListenableFuture<?> workersFuture = monitor.waitForMinimumWorkers();
        addSuccessCallback(workersFuture, () -> {
            assertFalse(workersTimeout.get());
            minWorkersLatch.countDown();
        });
        addExceptionCallback(workersFuture, () -> {
            assertTrue(workersTimeout.compareAndSet(false, true));
            minWorkersLatch.countDown();
        });
        return workersFuture;
    }

    private ListenableFuture<?> waitForMinimumCoordinatorSidecars()
    {
        ListenableFuture<?> coordinatorSidecarsFuture = monitor.waitForMinimumCoordinatorSidecars();
        addSuccessCallback(coordinatorSidecarsFuture, () -> {
            assertFalse(coordinatorSidecarsTimeout.get());
            minCoordinatorSidecarsLatch.countDown();
            minCoordinatorSidecarsLatch.countDown();
        });
        addExceptionCallback(coordinatorSidecarsFuture, () -> {
            assertTrue(coordinatorSidecarsTimeout.compareAndSet(false, true));
            minCoordinatorSidecarsLatch.countDown();
        });
        return coordinatorSidecarsFuture;
    }

    private void addWorker(InMemoryNodeManager nodeManager)
    {
        String identifier = "worker/" + numWorkers.incrementAndGet();
        nodeManager.addNode(CONNECTOR_ID, new InternalNode(identifier, URI.create("localhost/" + identifier), new NodeVersion("1"), false));
    }

    private void addCoordinator(InMemoryNodeManager nodeManager)
    {
        String identifier = "coordinator/" + numCoordinators.incrementAndGet();
        nodeManager.addNode(CONNECTOR_ID, new InternalNode(identifier, URI.create("localhost/" + identifier), new NodeVersion("1"), true));
    }

    private void addResourceManager(InMemoryNodeManager nodeManager)
    {
        String identifier = "resource_manager/" + numResourceManagers.incrementAndGet();
        nodeManager.addNode(
                CONNECTOR_ID,
                new InternalNode(
                        identifier,
                        URI.create("localhost/" + identifier),
                        new NodeVersion("1"),
                        false,
                        true,
                        false,
                        false));
    }

    private void addCoordinatorSidecar(InMemoryNodeManager nodeManager)
    {
        String identifier = "coordinator_sidecar/" + numCoordinatorSidecars.incrementAndGet();
        nodeManager.addNode(
                CONNECTOR_ID,
                new InternalNode(
                        identifier,
                        URI.create("localhost/" + identifier),
                        new NodeVersion("1"),
                        false,
                        false,
                        false,
                        true));
    }
}