ObserverMasterTestBase.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.test;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.server.util.PortForwarder;

public class ObserverMasterTestBase extends QuorumPeerTestBase implements Watcher {

    protected CountDownLatch latch;
    protected ZooKeeper zk;
    protected int CLIENT_PORT_QP1;
    protected int CLIENT_PORT_QP2;
    protected int CLIENT_PORT_OBS;
    protected int OM_PORT;
    protected MainThread q1;
    protected MainThread q2;
    protected MainThread q3;
    protected WatchedEvent lastEvent = null;

    protected PortForwarder setUp(final int omProxyPort, final Boolean testObserverMaster) throws IOException {
        ClientBase.setupTestEnv();
        final int PORT_QP1 = PortAssignment.unique();
        final int PORT_QP2 = PortAssignment.unique();
        final int PORT_OBS = PortAssignment.unique();
        final int PORT_QP_LE1 = PortAssignment.unique();
        final int PORT_QP_LE2 = PortAssignment.unique();
        final int PORT_OBS_LE = PortAssignment.unique();

        CLIENT_PORT_QP1 = PortAssignment.unique();
        CLIENT_PORT_QP2 = PortAssignment.unique();
        CLIENT_PORT_OBS = PortAssignment.unique();

        OM_PORT = PortAssignment.unique();

        String quorumCfgSection =
                "server.1=127.0.0.1:" + (PORT_QP1)
                        + ":" + (PORT_QP_LE1) + ";" +  CLIENT_PORT_QP1
                        + "\nserver.2=127.0.0.1:" + (PORT_QP2)
                        + ":" + (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2
                        + "\nserver.3=127.0.0.1:" + (PORT_OBS)
                        + ":" + (PORT_OBS_LE) + ":observer" + ";" + CLIENT_PORT_OBS;

        String extraCfgs = testObserverMaster ? String.format("observerMasterPort=%d%n", OM_PORT) : "";
        String extraCfgsObs = testObserverMaster ? String.format("observerMasterPort=%d%n", omProxyPort <= 0 ? OM_PORT : omProxyPort) : "";

        PortForwarder forwarder = null;
        if (testObserverMaster && omProxyPort >= 0) {
            forwarder = new PortForwarder(omProxyPort, OM_PORT);
        }

        q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, extraCfgs);
        q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs);
        q3 = new MainThread(3, CLIENT_PORT_OBS, quorumCfgSection, extraCfgsObs);
        q1.start();
        q2.start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT),
                "waiting for server 1 being up");
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT),
                "waiting for server 2 being up");
        return forwarder;
    }

    protected void shutdown() throws InterruptedException {
        LOG.info("Shutting down all servers");

        zk.close();

        q1.shutdown();
        q2.shutdown();
        q3.shutdown();

        assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT),
                "Waiting for server 1 to shut down");
        assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT),
                "Waiting for server 2 to shut down");
        assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT),
                "Waiting for server 3 to shut down");
    }

    /**
     * Implementation of watcher interface.
     */
    public void process(WatchedEvent event) {
        lastEvent = event;
        if (latch != null) {
            latch.countDown();
        }
        LOG.info("Latch got event :: {}", event);
    }
}