QuorumPeerTestBase.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 *
 */

package org.apache.zookeeper.server.quorum;

import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.server.admin.JettyAdminServer;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.QuorumBase;
import org.junit.jupiter.api.AfterEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Has some common functionality for tests that work with QuorumPeers. Override
 * process(WatchedEvent) to implement the Watcher interface
 */
public class QuorumPeerTestBase extends ZKTestCase implements Watcher {

    protected static final Logger LOG = LoggerFactory.getLogger(QuorumPeerTestBase.class);

    public static final int TIMEOUT = 5000;

    protected Servers servers;
    protected int numServers = 0;

    @AfterEach
    public void tearDown() throws Exception {
        if (servers == null) {
            LOG.info("No servers to shutdown!");
            return;
        }

        if (servers.zk != null) {
            servers.shutdownAllClients();
        }

        if (servers.mt != null) {
            servers.shutDownAllServers();
        }
    }

    public void process(WatchedEvent event) {
        // ignore for this test
    }

    public static class TestQPMain extends QuorumPeerMain {

        public void shutdown() {
            // ensure it closes - in particular wait for thread to exit
            if (quorumPeer != null) {
                QuorumBase.shutdown(quorumPeer);
            }
        }

    }

    public static class MainThread implements Runnable {

        final File confFile;
        final File tmpDir;

        public static final int UNSET_STATIC_CLIENTPORT = -1;
        // standalone mode doesn't need myid
        public static final int UNSET_MYID = -1;

        volatile TestQPMain main;

        File baseDir;
        private int myid;
        private int clientPort;
        private String quorumCfgSection;
        private Map<String, String> otherConfigs;

        /**
         * Create a MainThread
         *
         * @param myid
         * @param clientPort
         * @param quorumCfgSection
         * @param otherConfigs
         * @param tickTime initLimit will be 10 and syncLimit will be 5
         * @throws IOException
         */
        public MainThread(int myid, int clientPort, String quorumCfgSection, Map<String, String> otherConfigs, int tickTime) throws IOException {
            baseDir = ClientBase.createTmpDir();
            this.myid = myid;
            this.clientPort = clientPort;
            this.quorumCfgSection = quorumCfgSection;
            this.otherConfigs = otherConfigs;
            LOG.info("id = {} tmpDir = {} clientPort = {}", myid, baseDir, clientPort);
            confFile = new File(baseDir, "zoo.cfg");

            FileWriter fwriter = new FileWriter(confFile);
            fwriter.write("tickTime=" + tickTime + "\n");
            fwriter.write("initLimit=10\n");
            fwriter.write("syncLimit=5\n");
            fwriter.write("connectToLearnerMasterLimit=5\n");

            tmpDir = new File(baseDir, "data");
            if (!tmpDir.mkdir()) {
                throw new IOException("Unable to mkdir " + tmpDir);
            }

            // Convert windows path to UNIX to avoid problems with "\"
            String dir = tmpDir.toString();
            String osname = java.lang.System.getProperty("os.name");
            if (osname.toLowerCase().contains("windows")) {
                dir = dir.replace('\\', '/');
            }
            fwriter.write("dataDir=" + dir + "\n");

            fwriter.write("clientPort=" + clientPort + "\n");

            // write extra configurations
            Set<Entry<String, String>> entrySet = otherConfigs.entrySet();
            for (Entry<String, String> entry : entrySet) {
                fwriter.write(entry.getKey() + "=" + entry.getValue() + "\n");
            }

            fwriter.write(quorumCfgSection + "\n");
            fwriter.flush();
            fwriter.close();

            File myidFile = new File(tmpDir, "myid");
            fwriter = new FileWriter(myidFile);
            fwriter.write(Integer.toString(myid));
            fwriter.flush();
            fwriter.close();
        }

        public MainThread(int myid, String quorumCfgSection) throws IOException {
            this(myid, quorumCfgSection, true);
        }

        public MainThread(int myid, String quorumCfgSection, Integer secureClientPort, boolean writeDynamicConfigFile) throws IOException {
            this(myid, UNSET_STATIC_CLIENTPORT, JettyAdminServer.DEFAULT_PORT, secureClientPort, quorumCfgSection, null, null, writeDynamicConfigFile, null);
        }

        public MainThread(int myid, String quorumCfgSection, boolean writeDynamicConfigFile) throws IOException {
            this(myid, UNSET_STATIC_CLIENTPORT, quorumCfgSection, writeDynamicConfigFile);
        }

        public MainThread(int myid, int clientPort, String quorumCfgSection, boolean writeDynamicConfigFile) throws IOException {
            this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, null, writeDynamicConfigFile);
        }

        public MainThread(int myid, int clientPort, String quorumCfgSection, String peerType, boolean writeDynamicConfigFile) throws IOException {
            this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, peerType, writeDynamicConfigFile);
        }

        public MainThread(int myid, int clientPort, String quorumCfgSection, boolean writeDynamicConfigFile, String version) throws IOException {
            this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, null, null, writeDynamicConfigFile, version);
        }

        public MainThread(int myid, int clientPort, String quorumCfgSection, String configs) throws IOException {
            this(myid, clientPort, JettyAdminServer.DEFAULT_PORT, quorumCfgSection, configs, null, true);
        }

        public MainThread(int myid, int clientPort, int adminServerPort, String quorumCfgSection, String configs) throws IOException {
            this(myid, clientPort, adminServerPort, quorumCfgSection, configs, null, true);
        }

        public MainThread(int myid, int clientPort, int adminServerPort, String quorumCfgSection, String configs, String peerType, boolean writeDynamicConfigFile) throws IOException {
            this(myid, clientPort, adminServerPort, quorumCfgSection, configs, peerType, writeDynamicConfigFile, null);
        }

        public MainThread(int myid, int clientPort, int adminServerPort, String quorumCfgSection, String configs, String peerType, boolean writeDynamicConfigFile, String version) throws IOException {
            this(myid, clientPort, adminServerPort, null, quorumCfgSection, configs, peerType, writeDynamicConfigFile, version);
        }

        public MainThread(int myid, int clientPort, int adminServerPort, Integer secureClientPort, String quorumCfgSection, String configs, String peerType, boolean writeDynamicConfigFile, String version) throws IOException {
            tmpDir = ClientBase.createTmpDir();
            LOG.info("id = {} tmpDir = {} clientPort = {} adminServerPort = {}", myid, tmpDir, clientPort, adminServerPort);

            File dataDir = new File(tmpDir, "data");
            if (!dataDir.mkdir()) {
                throw new IOException("Unable to mkdir " + dataDir);
            }

            confFile = new File(tmpDir, "zoo.cfg");

            FileWriter fwriter = new FileWriter(confFile);
            fwriter.write("tickTime=4000\n");
            fwriter.write("initLimit=10\n");
            fwriter.write("syncLimit=5\n");
            fwriter.write("connectToLearnerMasterLimit=5\n");
            if (configs != null) {
                fwriter.write(configs);
            }

            // Convert windows path to UNIX to avoid problems with "\"
            String dir = PathUtils.normalizeFileSystemPath(dataDir.toString());

            fwriter.write("dataDir=" + dir + "\n");
            fwriter.write("admin.serverPort=" + adminServerPort + "\n");

            // For backward compatibility test, some tests create dynamic configuration
            // without setting client port.
            // This could happen both in static file or dynamic file.
            if (clientPort != UNSET_STATIC_CLIENTPORT) {
                fwriter.write("clientPort=" + clientPort + "\n");
            }

            if (secureClientPort != null) {
                fwriter.write("secureClientPort=" + secureClientPort + "\n");
            }

            if (peerType != null) {
                fwriter.write("peerType=" + peerType + "\n");
            }

            if (writeDynamicConfigFile) {
                String dynamicConfigFilename = createDynamicFile(quorumCfgSection, version);
                fwriter.write("dynamicConfigFile=" + dynamicConfigFilename + "\n");
            } else {
                fwriter.write(quorumCfgSection);
            }
            fwriter.flush();
            fwriter.close();

            File myidFile = new File(dataDir, "myid");
            fwriter = new FileWriter(myidFile);
            fwriter.write(Integer.toString(myid));
            fwriter.flush();
            fwriter.close();

            ClientBase.createInitializeFile(dataDir);
        }

        private String createDynamicFile(String quorumCfgSection, String version) throws IOException {
            String filename = "zoo.cfg.dynamic";
            if (version != null) {
                filename = filename + "." + version;
            }

            File dynamicConfigFile = new File(tmpDir, filename);
            String dynamicConfigFilename = PathUtils.normalizeFileSystemPath(dynamicConfigFile.toString());

            FileWriter fDynamicConfigWriter = new FileWriter(dynamicConfigFile);
            fDynamicConfigWriter.write(quorumCfgSection);
            fDynamicConfigWriter.flush();
            fDynamicConfigWriter.close();

            return dynamicConfigFilename;
        }

        public File[] getDynamicFiles() {
            return getFilesWithPrefix("zoo.cfg.dynamic");
        }

        public File[] getFilesWithPrefix(final String prefix) {
            return tmpDir.listFiles(new FilenameFilter() {
                @Override
                public boolean accept(File dir, String name) {
                    return name.startsWith(prefix);
                }
            });
        }

        public File getFileByName(String filename) {
            File f = new File(tmpDir.getPath(), filename);
            return f.isFile() ? f : null;
        }

        public void writeTempDynamicConfigFile(String nextQuorumCfgSection, String version) throws IOException {
            File nextDynamicConfigFile = new File(tmpDir, "zoo.cfg" + QuorumPeerConfig.nextDynamicConfigFileSuffix);
            FileWriter fwriter = new FileWriter(nextDynamicConfigFile);
            fwriter.write(nextQuorumCfgSection + "\n" + "version=" + version);
            fwriter.flush();
            fwriter.close();
        }

        public MainThread(int myid, int clientPort, String quorumCfgSection) throws IOException {
            this(myid, clientPort, quorumCfgSection, new HashMap<>());
        }

        public MainThread(int myid, int clientPort, String quorumCfgSection, Map<String, String> otherConfigs) throws IOException {
            this(myid, clientPort, quorumCfgSection, otherConfigs, 4000);
        }

        Thread currentThread;

        public synchronized void start() {
            main = getTestQPMain();
            currentThread = new Thread(this);
            currentThread.start();
        }

        /**
         * start the QuorumPeer with the passed TestQPMain
         *
         * @param testQPMain the TestQPMain to use
         */

        public synchronized void start(final TestQPMain testQPMain) {
            main = testQPMain;
            currentThread = new Thread(this);
            currentThread.start();
        }

        public TestQPMain getTestQPMain() {
            return new TestQPMain();
        }

        public void run() {
            String[] args = new String[1];
            args[0] = confFile.toString();
            try {
                main.initializeAndRun(args);
            } catch (Exception e) {
                // test will still fail even though we just log/ignore
                LOG.error("unexpected exception in run", e);
            } finally {
                currentThread = null;
            }
        }

        public void shutdown() throws InterruptedException {
            Thread t = currentThread;
            if (t != null && t.isAlive()) {
                main.shutdown();
                t.join(500);
            }
        }

        public void join(long timeout) throws InterruptedException {
            Thread t = currentThread;
            if (t != null) {
                t.join(timeout);
            }
        }

        public boolean isAlive() {
            Thread t = currentThread;
            return t != null && t.isAlive();
        }

        public void reinitialize() throws IOException {
            File dataDir = main.quorumPeer.getTxnFactory().getDataLogDir();
            ClientBase.recursiveDelete(dataDir);
            ClientBase.createInitializeFile(dataDir.getParentFile());
        }

        public boolean isQuorumPeerRunning() {
            return main.quorumPeer != null;
        }

        public String getPropFromStaticFile(String key) throws IOException {
            Properties props = new Properties();
            props.load(new FileReader(confFile));
            return props.getProperty(key, "");
        }

        public QuorumPeer getQuorumPeer() {
            return main.quorumPeer;
        }

        public void deleteBaseDir() {
            ClientBase.recursiveDelete(baseDir);
        }

        public int getMyid() {
            return myid;
        }

        public int getClientPort() {
            return clientPort;
        }

        public String getQuorumCfgSection() {
            return quorumCfgSection;
        }

        public Map<String, String> getOtherConfigs() {
            return otherConfigs;
        }

        public File getConfFile() {
            return confFile;
        }

    }

    // This class holds the servers and clients for those servers
    public static class Servers {

        public MainThread[] mt;
        public ZooKeeper[] zk;
        public int[] clientPorts;
        public int[] adminPorts;

        public void shutDownAllServers() throws InterruptedException {
            for (MainThread t : mt) {
                t.shutdown();
            }
        }

        public void shutdownAllClients() throws InterruptedException {
            for (ZooKeeper zk : zk) {
                zk.close(5000);
            }
        }

        public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
            int index = 0;
            for (MainThread t : mt) {
                if (!t.isAlive()) {
                    System.setProperty("zookeeper.admin.serverPort", String.valueOf(adminPorts[index]));
                    t.start();
                    index++;
                }
            }
            for (int i = 0; i < zk.length; i++) {
                restartClient(i, watcher);
            }
        }

        public void restartClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException {
            if (zk[clientIndex] != null) {
                zk[clientIndex].close();
            }
            zk[clientIndex] = new ZooKeeper(
                    "127.0.0.1:" + clientPorts[clientIndex],
                    ClientBase.CONNECTION_TIMEOUT,
                    watcher);
        }

        public int findLeader() {
            for (int i = 0; i < mt.length; i++) {
                if (mt[i].main.quorumPeer.leader != null) {
                    LOG.info("Leader is {}", i);
                    return i;
                }
            }
            LOG.info("Cannot find Leader");
            return -1;
        }

        public int findAnyFollower() {
            for (int i = 0; i < mt.length; i++) {
                if (mt[i].main.quorumPeer.follower != null) {
                    LOG.info("Follower is {}", i);
                    return i;
                }
            }
            LOG.info("Cannot find any follower");
            return -1;
        }

        public int findAnyObserver() {
            for (int i = 0; i < mt.length; i++) {
                if (mt[i].main.quorumPeer.observer != null) {
                    LOG.info("Observer is {}", i);
                    return i;
                }
            }
            LOG.info("Cannot find any observer");
            return -1;
        }
    }

    protected Servers LaunchServers(int numServers) throws IOException, InterruptedException {
        return LaunchServers(numServers, (Integer) null);
    }

    protected Servers LaunchServers(int numServers, Map<String, String> otherConfigs)
        throws IOException, InterruptedException {
        return LaunchServers(numServers, 0, null, otherConfigs);
    }

    protected Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
        return LaunchServers(numServers, 0, tickTime);
    }

    protected Servers LaunchServers(int numServers, int numObservers, Integer tickTime)
        throws IOException, InterruptedException {
        return LaunchServers(numServers, numObservers, tickTime, new HashMap<>());
    }

    /** * This is a helper function for launching a set of servers
     *
     * @param numServers the number of participant servers
     * @param numObservers the number of observer servers
     * @param tickTime A ticktime to pass to MainThread
     * @param otherConfigs any zoo.cfg configuration
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    protected Servers LaunchServers(int numServers, int numObservers, Integer tickTime,
        Map<String, String> otherConfigs) throws IOException, InterruptedException {
        int SERVER_COUNT = numServers + numObservers;
        QuorumPeerMainTest.Servers svrs = new QuorumPeerMainTest.Servers();
        svrs.clientPorts = new int[SERVER_COUNT];
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < SERVER_COUNT; i++) {
            svrs.clientPorts[i] = PortAssignment.unique();
            String role = i < numServers ? "participant" : "observer";
            sb.append(String.format("server.%d=127.0.0.1:%d:%d:%s;127.0.0.1:%d\n",
                    i, PortAssignment.unique(), PortAssignment.unique(), role,
                    svrs.clientPorts[i]));
        }

        svrs.adminPorts = new int[SERVER_COUNT];
        for (int i = 0; i < SERVER_COUNT; i++) {
            svrs.adminPorts[i] = PortAssignment.unique();
        }

        String quorumCfgSection = sb.toString();

        svrs.mt = new MainThread[SERVER_COUNT];
        svrs.zk = new ZooKeeper[SERVER_COUNT];
        for (int i = 0; i < SERVER_COUNT; i++) {
            if (tickTime != null) {
                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, otherConfigs, tickTime);
            } else {
                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, otherConfigs);
            }
            System.setProperty("zookeeper.admin.serverPort", String.valueOf(svrs.adminPorts[i]));
            svrs.mt[i].start();
            svrs.restartClient(i, this);
        }

        waitForAll(svrs, ZooKeeper.States.CONNECTED);

        return svrs;
    }

    public static void waitForOne(ZooKeeper zk, ZooKeeper.States state) throws InterruptedException {
        int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
        while (zk.getState() != state) {
            if (iterations-- == 0) {
                throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state);
            }
            Thread.sleep(500);
        }
    }

    protected void waitForAll(Servers servers, ZooKeeper.States state) throws InterruptedException {
        waitForAll(servers.zk, state);
    }

    public static void waitForAll(ZooKeeper[] zks, ZooKeeper.States state) throws InterruptedException {
        int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
        boolean someoneNotConnected = true;
        while (someoneNotConnected) {
            if (iterations-- == 0) {
                logStates(zks);
                ClientBase.logAllStackTraces();
                throw new RuntimeException("Waiting too long");
            }

            someoneNotConnected = false;
            for (ZooKeeper zk : zks) {
                if (zk.getState() != state) {
                    someoneNotConnected = true;
                    break;
                }
            }
            Thread.sleep(1000);
        }
    }

    public static void logStates(ZooKeeper[] zks) {
        StringBuilder sbBuilder = new StringBuilder("Connection States: {");
        for (int i = 0; i < zks.length; i++) {
            sbBuilder.append(i + " : " + zks[i].getState() + ", ");
        }
        sbBuilder.append('}');
        LOG.error(sbBuilder.toString());
    }

}