DIFFSyncConsistencyTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Map;
import javax.security.sasl.SaslException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Checks that transactions delivered to followers through DIFF sync are not lost when the followers fail
 * right after receiving {@code UPTODATE}.
 *
 * <p>Scenario:
 * <ol>
 * <li>The leader logs a proposal while all followers are offline.</li>
 * <li>The followers rejoin, receive that proposal via DIFF sync, and are forced to fail once they read
 * {@code UPTODATE}.</li>
 * <li>The old leader is then shut down for good.</li>
 * </ol>
 * The servers that remain must still expose the transaction the old leader had already served.
 */
public class DIFFSyncConsistencyTest extends QuorumPeerTestBase {

    private static final int SERVER_COUNT = 3;

    private final MainThread[] mt = new MainThread[SERVER_COUNT];

    /**
     * Reproduces the "ACK NEWLEADER, then crash before persisting the DIFF" sequence and verifies that the
     * new leader still sees the znode created through the old leader.
     */
    @Test
    @Timeout(value = 120)
    public void testInconsistentDueToUncommittedLog() throws Exception {
        final int LEADER_TIMEOUT_MS = 10_000;
        // Upper bound on how long we wait for the old leader to ACK (log) its own proposal.
        final int LEADER_ACK_WAIT_MS = 2_000;
        final int[] clientPorts = new int[SERVER_COUNT];

        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < SERVER_COUNT; i++) {
            clientPorts[i] = PortAssignment.unique();
            String server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
                    + ":participant;127.0.0.1:" + clientPorts[i];
            sb.append(server).append('\n');
        }
        String currentQuorumCfgSection = sb.toString();

        for (int i = 0; i < SERVER_COUNT; i++) {
            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
                @Override
                public TestQPMain getTestQPMain() {
                    return new MockTestQPMain();
                }
            };
            mt[i].start();
        }

        for (int i = 0; i < SERVER_COUNT; i++) {
            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
                    "waiting for server " + i + " being up");
        }

        int leader = findLeader(mt);
        assertNotEquals(-1, leader, "no leader was elected among the initial servers");
        CountdownWatcher watch = new CountdownWatcher();
        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[leader], ClientBase.CONNECTION_TIMEOUT, watch);
        try {
            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);

            Map<Long, Proposal> outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
            // Raise the leader's tick time. This delays the leader going back to LOOKING, which gives us
            // time to propose a transaction while the followers are offline.
            int previousTick = mt[leader].main.quorumPeer.tickTime;
            mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
            // Let the leader's current tick run out so the new tick time takes effect.
            Thread.sleep(previousTick);

            LOG.info("LEADER ELECTED {}", leader);

            // Shut down the followers so the proposal we are about to make cannot reach them directly.
            // They must receive it later through DIFF sync.
            for (int i = 0; i < SERVER_COUNT; i++) {
                if (i != leader) {
                    mt[i].shutdown();
                }
            }

            // Send a create request to the old leader. Without followers it cannot be committed, so the
            // request is expected to fail; the leader still logs the proposal (verified below).
            try {
                zk.create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                fail("create /zk" + leader + " should have failed");
            } catch (KeeperException expected) {
                // Expected: the leader has no quorum to commit the create.
            }

            // Make sure the leader actually has the create in flight; there can be extra
            // closeSession proposals alongside it.
            assertTrue(outstanding.size() > 0);
            Proposal p = findProposalOfType(outstanding, OpCode.create);
            LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
            assertNotNull(p, "Old leader doesn't have 'create' proposal");

            // Wait until the leader has ACKed (i.e. synced to disk) its own proposal.
            int sleepTime = 0;
            Long longLeader = (long) leader;
            while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
                if (sleepTime > LEADER_ACK_WAIT_MS) {
                    fail("Transaction not synced to disk within " + LEADER_ACK_WAIT_MS + " ms "
                            + p.qvAcksetPairs.get(0).getAckset() + " expected " + leader);
                }
                Thread.sleep(100);
                sleepTime += 100;
            }

            // Start controlled followers that deliberately fail as soon as they receive UPTODATE from the
            // leader. Followers only persist DIFF-sync proposals after UPTODATE. This deterministically
            // simulates followers that ACK NEWLEADER (so the leader believes it has quorum support) but
            // crash before persisting the DIFF-sync proposals.
            for (int i = 0; i < SERVER_COUNT; i++) {
                if (i == leader) {
                    continue;
                }
                startAndAwaitQuorumPeer(i).setInjectError(true);
                LOG.info("Follower {} started.", i);
            }

            // Verify that the leader can see the node. If it can, the leader must have received a quorum
            // of NEWLEADER ACKs from the two followers and started serving requests. That in turn means
            // the DIFF sync to the followers has finished. Later we check that the followers keep the same
            // view once this leader is gone; anything else violates ZAB / sequential consistency.
            boolean leaderServedNode = false;
            for (int attempt = 0; attempt < 100 && !leaderServedNode; attempt++) {
                try {
                    Stat stat = zk.exists("/zk" + leader, false);
                    assertNotNull(stat, "server " + leader + " should have /zk");
                    leaderServedNode = true;
                } catch (KeeperException.ConnectionLossException e) {
                    // The leader may not have regained a quorum yet; retry after a short pause.
                    Thread.sleep(100);
                }
            }
            // Without this, the scenario's premise would be unverified and the final check meaningless.
            assertTrue(leaderServedNode, "old leader " + leader + " never served /zk" + leader
                    + " after the followers rejoined");

            // Shut down all servers.
            for (int i = 0; i < SERVER_COUNT; i++) {
                mt[i].shutdown();
            }
            waitForOne(zk, States.CONNECTING);

            // Restart every server except the old leader. Only the old leader synced the transaction to
            // disk. The old followers held it in memory only, because we made them fail at UPTODATE.
            for (int i = 0; i < SERVER_COUNT; i++) {
                if (i == leader) {
                    continue;
                }
                startAndAwaitQuorumPeer(i).setInjectError(false);
                LOG.info("Follower {} started again.", i);
            }

            // Wait until the restarted servers have formed a quorum. Otherwise findLeader can run before
            // the election completes, return -1, and silently skip the consistency check below.
            for (int i = 0; i < SERVER_COUNT; i++) {
                if (i != leader) {
                    assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
                            "waiting for server " + i + " being up");
                }
            }

            int newLeader = findLeader(mt);
            assertNotEquals(-1, newLeader, "no leader was elected among the restarted servers");
            assertNotEquals(leader, newLeader, "new leader is still the old leader " + leader + " !!");

            // Clients of the old leader saw "/zkX". If clients of the new leader do not, the quorum is
            // exposing inconsistent views, which violates ZAB.
            zk.close();
            zk = new ZooKeeper("127.0.0.1:" + clientPorts[newLeader], ClientBase.CONNECTION_TIMEOUT, watch);
            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
            Stat val = zk.exists("/zk" + leader, false);
            assertNotNull(val, "Data inconsistency detected! "
                    + "Server " + newLeader + " should have a view of /zk" + leader + "!");
        } finally {
            // Release the client even if an assertion above failed; closing an already-closed client is a no-op.
            zk.close();
        }
    }

    /**
     * Starts {@code mt[index]} and waits (polling every 100 ms, up to roughly 10 seconds) for it to create
     * its quorum peer.
     *
     * @param index index of the server to start
     * @return the server's peer, cast to {@link CustomQuorumPeer} because every server in this test is built
     *         from {@link MockTestQPMain}
     */
    private CustomQuorumPeer startAndAwaitQuorumPeer(int index) throws InterruptedException {
        mt[index].start();
        int sleepCount = 0;
        while (mt[index].getQuorumPeer() == null) {
            ++sleepCount;
            if (sleepCount > 100) {
                fail("Can't start follower " + index + " !");
            }
            Thread.sleep(100);
        }
        return (CustomQuorumPeer) mt[index].getQuorumPeer();
    }

    /**
     * Shuts down every server that was created, logging (rather than propagating) interruptions so the
     * remaining servers are still shut down.
     */
    @AfterEach
    public void tearDown() {
        for (MainThread thread : mt) {
            if (thread == null) {
                // The test failed before this server was created.
                continue;
            }
            try {
                thread.shutdown();
            } catch (InterruptedException e) {
                LOG.warn("Quorum Peer interrupted while shutting it down", e);
            }
        }
    }

    /**
     * Quorum peer whose follower can be told to fail right after it reads an {@code UPTODATE} packet.
     */
    static class CustomQuorumPeer extends QuorumPeer {

        // Written by the test thread, read by the follower's packet-reading path.
        private volatile boolean injectError = false;

        public CustomQuorumPeer() throws SaslException {

        }

        @Override
        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {

                @Override
                void readPacket(QuorumPacket pp) throws IOException {
                    // In a real deployment a network problem would surface as a SocketTimeoutException while
                    // reading a packet from the leader. Here we throw one deliberately, after the packet
                    // has been read, whenever error injection is on and the packet is UPTODATE.
                    super.readPacket(pp);
                    if (injectError && pp.getType() == Leader.UPTODATE) {
                        String type = LearnerHandler.packetToString(pp);
                        throw new SocketTimeoutException("Socket timeout while reading the packet for operation "
                                + type);
                    }
                }

            };
        }

        /** Enables or disables the {@code UPTODATE} failure injection for this peer's follower. */
        public void setInjectError(boolean injectError) {
            this.injectError = injectError;
        }

    }

    /** Server entry point that builds a {@link CustomQuorumPeer} instead of a plain {@link QuorumPeer}. */
    static class MockTestQPMain extends TestQPMain {

        @Override
        protected QuorumPeer getQuorumPeer() throws SaslException {
            return new CustomQuorumPeer();
        }

    }

    /**
     * Returns the first proposal whose request header has the given op code, or {@code null} if none.
     * Proposals without a request or header are skipped.
     */
    private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
        for (Proposal proposal : proposals.values()) {
            if (proposal.request != null && proposal.request.getHdr() != null
                    && proposal.request.getHdr().getType() == type) {
                return proposal;
            }
        }
        return null;
    }

    /**
     * Returns the index of the first server whose quorum peer currently has a non-null leader, or -1.
     * Servers that are missing or not yet initialised are skipped.
     */
    private int findLeader(MainThread[] mt) {
        for (int i = 0; i < mt.length; i++) {
            MainThread thread = mt[i];
            if (thread != null && thread.main != null && thread.main.quorumPeer != null
                    && thread.main.quorumPeer.leader != null) {
                return i;
            }
        }
        return -1;
    }
}