ReconfigDuringLeaderSyncTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
    private static int SERVER_COUNT = 3;
    private MainThread[] mt;
    private static boolean bakAsyncSending;

    public void setup(boolean asyncSending) {
        System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
        Learner.setAsyncSending(asyncSending);
        QuorumPeerConfig.setReconfigEnabled(true);
    }

    @BeforeAll
    public static void saveAsyncSendingFlag() {
        bakAsyncSending = Learner.getAsyncSending();
    }

    @AfterAll
    public static void resetAsyncSendingFlag() {
        Learner.setAsyncSending(bakAsyncSending);
    }

    /**
     * <pre>
     * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.
     * Cluster crashes when reconfig a new node as a participant.
     * </pre>
     *
     * This issue occurs when reconfig's PROPOSAL and COMMITANDACTIVATE come in
     * between the snapshot and the UPTODATE. In this case processReconfig was
     * not invoked on the newly added node, and zoo.cfg.dynamic.next wasn't
     * deleted.
     */

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testDuringLeaderSync(boolean asyncSending) throws Exception {
        setup(asyncSending);
        final int[] clientPorts = new int[SERVER_COUNT + 1];
        StringBuilder sb = new StringBuilder();
        String[] serverConfig = new String[SERVER_COUNT + 1];

        for (int i = 0; i < SERVER_COUNT; i++) {
            clientPorts[i] = PortAssignment.unique();
            serverConfig[i] = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
                              + ":participant;127.0.0.1:" + clientPorts[i];
            sb.append(serverConfig[i] + "\n");
        }
        String currentQuorumCfgSection = sb.toString();
        mt = new MainThread[SERVER_COUNT + 1];

        // start 3 servers
        for (int i = 0; i < SERVER_COUNT; i++) {
            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
            mt[i].start();
        }

        // ensure all servers started
        for (int i = 0; i < SERVER_COUNT; i++) {
            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
                    "waiting for server " + i + " being up");
        }
        CountdownWatcher watch = new CountdownWatcher();
        ZooKeeperAdmin preReconfigClient = new ZooKeeperAdmin(
            "127.0.0.1:" + clientPorts[0],
            ClientBase.CONNECTION_TIMEOUT,
            watch);
        preReconfigClient.addAuthInfo("digest", "super:test".getBytes());
        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);

        // new server joining
        int joinerId = SERVER_COUNT;
        clientPorts[joinerId] = PortAssignment.unique();
        serverConfig[joinerId] = "server." + joinerId + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
                                 + ":participant;127.0.0.1:" + clientPorts[joinerId];

        // Find leader id.
        int leaderId = -1;
        for (int i = 0; i < SERVER_COUNT; i++) {
            if (mt[i].main.quorumPeer.leader != null) {
                leaderId = i;
                break;
            }
        }
        assertFalse(leaderId == -1);

        // Joiner initial config consists of itself and the leader.
        sb = new StringBuilder();
        sb.append(serverConfig[leaderId] + "\n").append(serverConfig[joinerId] + "\n");

        /**
         * This server will delay the response to a NEWLEADER message, and run
         * reconfig command so that message at this processed in bellow order
         *
         * <pre>
         * NEWLEADER
         * reconfig's PROPOSAL
         * reconfig's COMMITANDACTIVATE
         * UPTODATE
         * </pre>
         */
        mt[joinerId] = new MainThread(joinerId, clientPorts[joinerId], sb.toString(), false) {
            @Override
            public TestQPMain getTestQPMain() {
                return new MockTestQPMain();
            }
        };
        mt[joinerId].start();
        CustomQuorumPeer qp = getCustomQuorumPeer(mt[joinerId]);

        // delete any already existing .next file
        String nextDynamicConfigFilename = qp.getNextDynamicConfigFilename();
        File nextDynaFile = new File(nextDynamicConfigFilename);
        nextDynaFile.delete();

        // call reconfig API when the new server has received
        // Leader.NEWLEADER
        while (true) {
            if (qp.isNewLeaderMessage()) {
                preReconfigClient.reconfigure(serverConfig[joinerId], null, null, -1, null, null);
                break;
            } else {
                // sleep for 10 millisecond and then again check
                Thread.sleep(10);
            }
        }
        watch = new CountdownWatcher();
        ZooKeeper postReconfigClient = new ZooKeeper(
                "127.0.0.1:" + clientPorts[joinerId],
                ClientBase.CONNECTION_TIMEOUT,
                watch);
        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
        // do one successful operation on the newly added node
        postReconfigClient.create("/reconfigIssue", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertFalse(nextDynaFile.exists(), "zoo.cfg.dynamic.next is not deleted.");

        // verify that joiner has up-to-date config, including all four servers.
        for (long j = 0; j <= SERVER_COUNT; j++) {
            assertNotNull(qp.getQuorumVerifier().getVotingMembers().get(j),
                    "server " + j + " is not present in the new quorum");
        }

        // close clients
        preReconfigClient.close();
        postReconfigClient.close();
    }

    private static CustomQuorumPeer getCustomQuorumPeer(MainThread mt) {
        while (true) {
            QuorumPeer quorumPeer = mt.getQuorumPeer();
            if (null != quorumPeer) {
                return (CustomQuorumPeer) quorumPeer;
            } else {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @AfterEach
    public void tearDown() {
        // stop all severs
        if (null != mt) {
            for (int i = 0; i < mt.length; i++) {
                try {
                    mt[i].shutdown();
                } catch (InterruptedException e) {
                    LOG.warn("Quorum Peer interrupted while shutting it down", e);
                }
            }
        }
    }

    private static class CustomQuorumPeer extends QuorumPeer {

        private boolean newLeaderMessage = false;

        public CustomQuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit) throws IOException {
            super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false, ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1), new QuorumMaj(quorumPeers));
        }

        /**
         * If true, after 100 millisecond NEWLEADER response is send to leader
         *
         * @return
         */
        public boolean isNewLeaderMessage() {
            return newLeaderMessage;
        }

        @Override
        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {

            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {

                @Override
                void writePacket(QuorumPacket pp, boolean flush) throws IOException {
                    if (pp != null && pp.getType() == Leader.ACK) {
                        newLeaderMessage = true;
                        try {
                            /**
                             * Delaying the ACK message, a follower sends as
                             * response to a NEWLEADER message, so that the
                             * leader has a chance to send the reconfig and only
                             * then the UPTODATE message.
                             */
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    super.writePacket(pp, flush);
                }
            };
        }

    }

    private static class MockTestQPMain extends TestQPMain {

        @Override
        public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
            quorumPeer = new CustomQuorumPeer(config.getQuorumVerifier().getAllMembers(), config.getDataDir(), config.getDataLogDir(), config.getClientPortAddress().getPort(), config.getElectionAlg(), config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit(), config.getConnectToLearnerMasterLimit());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            quorumPeer.start();
            try {
                quorumPeer.join();
            } catch (InterruptedException e) {
                LOG.warn("Quorum Peer interrupted", e);
            }
        }

    }

}