RaceConditionTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.net.SocketException;
import javax.security.sasl.SaslException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.txn.DeleteTxn;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This test class contains test cases related to race condition in complete
 * ZooKeeper
 */
public class RaceConditionTest extends QuorumPeerTestBase {

    protected static final Logger LOG = LoggerFactory.getLogger(RaceConditionTest.class);
    private static int SERVER_COUNT = 3;
    private MainThread[] mt;

    /**
     * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2380.
     * Deadlock while shutting down the ZooKeeper
     */

    @Test
    @Timeout(value = 30)
    public void testRaceConditionBetweenLeaderAndAckRequestProcessor() throws Exception {
        mt = startQuorum();
        // get leader
        QuorumPeer leader = getLeader(mt);
        long oldLeaderCurrentEpoch = leader.getCurrentEpoch();
        assertNotNull(leader, "Leader should not be null");
        // shutdown 2 followers so that leader does not have majority and goes
        // into looking state or following/leading state.
        shutdownFollowers(mt);
        /**
         * <pre>
         * Verify that there is no deadlock in following ways:
         * 1) If leader is in LOOKING or FOLLOWING, we are sure there is no deadlock.
         * 2) If leader in in LEADING state then we have to check that this LEADING state is
         * after the leader election, not the old LEADING state.
         * </pre>
         */
        boolean leaderStateChanged = ClientBase
            .waitForServerState(leader, 15000, QuorumStats.Provider.LOOKING_STATE, QuorumStats.Provider.FOLLOWING_STATE);
        // Wait for the old leader to start completely
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + leader.getClientPort(), CONNECTION_TIMEOUT),
            "Failed to bring up the old leader server");
        assertTrue(leaderStateChanged || (leader.getCurrentEpoch() > oldLeaderCurrentEpoch),
            "Leader failed to transition to new state. Current state is " + leader.getServerState());
    }

    @AfterEach
    public void tearDown() {
        // stop all severs
        if (null != mt) {
            for (int i = 0; i < SERVER_COUNT; i++) {
                try {
                    // With the defect, leader hangs here also, but with fix
                    // it does not
                    mt[i].shutdown();
                } catch (InterruptedException e) {
                    LOG.warn("Quorum Peer interrupted while shutting it down", e);
                }
            }
        }
    }

    private MainThread[] startQuorum() throws IOException {
        final int[] clientPorts = new int[SERVER_COUNT];
        StringBuilder sb = new StringBuilder();
        String server;

        for (int i = 0; i < SERVER_COUNT; i++) {
            clientPorts[i] = PortAssignment.unique();
            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
                     + ":participant;127.0.0.1:" + clientPorts[i];
            sb.append(server + "\n");
        }
        String currentQuorumCfgSection = sb.toString();
        MainThread[] mt = new MainThread[SERVER_COUNT];

        // start all the servers
        for (int i = 0; i < SERVER_COUNT; i++) {
            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
                @Override
                public TestQPMain getTestQPMain() {
                    return new MockTestQPMain();
                }
            };
            mt[i].start();
        }

        // ensure all servers started
        for (int i = 0; i < SERVER_COUNT; i++) {
            assertTrue(
                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
                "waiting for server " + i + " being up");
        }
        return mt;
    }

    private QuorumPeer getLeader(MainThread[] mt) {
        for (int i = mt.length - 1; i >= 0; i--) {
            QuorumPeer quorumPeer = mt[i].getQuorumPeer();
            if (quorumPeer != null && ServerState.LEADING == quorumPeer.getPeerState()) {
                return quorumPeer;
            }
        }
        return null;
    }

    private void shutdownFollowers(MainThread[] mt) {
        for (int i = 0; i < mt.length; i++) {
            CustomQuorumPeer quorumPeer = (CustomQuorumPeer) mt[i].getQuorumPeer();
            if (quorumPeer != null && ServerState.FOLLOWING == quorumPeer.getPeerState()) {
                quorumPeer.setStopPing(true);
            }
        }
    }

    private static class CustomQuorumPeer extends QuorumPeer {

        private boolean stopPing;

        public CustomQuorumPeer() throws SaslException {
        }

        public void setStopPing(boolean stopPing) {
            this.stopPing = stopPing;
        }

        @Override
        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {

            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
                @Override
                protected void processPacket(QuorumPacket qp) throws Exception {
                    if (stopPing && qp.getType() == Leader.PING) {
                        LOG.info("Follower skipped ping");
                        throw new SocketException("Socket time out while sending the ping response");
                    } else {
                        super.processPacket(qp);
                    }
                }
            };
        }

        @Override
        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
            LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, this, this.getZkDb()) {
                @Override
                protected void setupRequestProcessors() {
                    /**
                     * This method is overridden to make a place to inject
                     * MockSyncRequestProcessor
                     */
                    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
                    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
                    commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
                    commitProcessor.start();
                    ProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(this, commitProcessor);
                    proposalProcessor.initialize();
                    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
                    prepRequestProcessor.start();
                    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
                }

            };
            return new Leader(this, zk);
        }

    }

    private static class MockSyncRequestProcessor extends SyncRequestProcessor {

        public MockSyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
            super(zks, nextProcessor);
        }

        @Override
        public void shutdown() {
            /**
             * Add a request so that something is there for SyncRequestProcessor
             * to process, while we are in shutdown flow
             */
            DeleteTxn deleteTxn = new DeleteTxn("/deadLockIssue");
            Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete, RequestRecord.fromRecord(deleteTxn), null);
            processRequest(request);
            super.shutdown();
        }

    }

    private static class MockProposalRequestProcessor extends ProposalRequestProcessor {

        public MockProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
            super(zks, nextProcessor);

            /**
             * The only purpose here is to inject the mocked
             * SyncRequestProcessor
             */
            AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
            syncProcessor = new MockSyncRequestProcessor(zks, ackProcessor);
        }

    }

    private static class MockTestQPMain extends TestQPMain {

        @Override
        protected QuorumPeer getQuorumPeer() throws SaslException {
            return new CustomQuorumPeer();
        }

    }

}