ZxidRolloverTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.apache.zookeeper.test.ClientTest;
import org.apache.zookeeper.test.QuorumUtil;
import org.apache.zookeeper.test.QuorumUtil.PeerStruct;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verify ZOOKEEPER-1277 - ensure that we handle epoch rollover correctly.
 */
public class ZxidRolloverTest extends ZKTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(ZxidRolloverTest.class);

    private QuorumUtil qu;
    private ZooKeeperServer zksLeader;
    // One client (and its connection watcher) per quorum peer: index i talks to peer i + 1.
    private final ZooKeeper[] zkClients = new ZooKeeper[3];
    private final CountdownWatcher[] zkClientWatchers = new CountdownWatcher[3];
    // 1-based peer ids of the current leader and of one follower; refreshed by checkLeader().
    private int idxLeader;
    private int idxFollower;

    /**
     * Return the client connected to the given peer.
     *
     * @param idx the 1-based id of the peer the client talks to
     * @return the client, or {@code null} if it has not been created yet
     */
    private ZooKeeper getClient(int idx) {
        return zkClients[idx - 1];
    }

    @BeforeEach
    public void setUp() throws Exception {
        System.setProperty("zookeeper.admin.enableServer", "false");
        System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true");

        // set the snap count to something low so that we force log rollover
        // and verify that is working as part of the epoch rollover.
        SyncRequestProcessor.setSnapCount(7);

        qu = new QuorumUtil(1);
        // The peers are started before any client exists, so the client
        // connectivity checks performed by startAll() are no-ops here.
        startAll();

        for (int i = 0; i < zkClients.length; i++) {
            zkClientWatchers[i] = new CountdownWatcher();
            PeerStruct peer = qu.getPeer(i + 1);
            zkClients[i] = new ZooKeeper(
                    "127.0.0.1:" + peer.clientPort,
                    ClientTest.CONNECTION_TIMEOUT,
                    zkClientWatchers[i]);
        }
        waitForClientsConnected();
    }

    /**
     * Block until every client's watcher has seen a connection, then reset the
     * watchers so a later call waits for the next (re)connection.
     */
    private void waitForClientsConnected() throws Exception {
        for (int i = 0; i < zkClients.length; i++) {
            zkClientWatchers[i].waitForConnected(ClientTest.CONNECTION_TIMEOUT);
            zkClientWatchers[i].reset();
        }
    }

    /**
     * Ensure all clients are able to talk to the service.
     */
    private void checkClientsConnected() throws Exception {
        for (int i = 0; i < zkClients.length; i++) {
            checkClientConnected(i + 1);
        }
    }

    /**
     * Ensure the client is able to talk to the server.
     *
     * @param idx the idx of the server the client is talking to
     */
    private void checkClientConnected(int idx) throws Exception {
        ZooKeeper zk = getClient(idx);
        if (zk == null) {
            return;
        }
        try {
            assertNull(zk.exists("/foofoofoo-connected", false));
        } catch (ConnectionLossException e) {
            // second chance...
            // in some cases, leader change in particular, the timing is
            // very tricky to get right in order to assure that the client has
            // disconnected and reconnected. In some cases the client will
            // disconnect, then attempt to reconnect before the server is
            // back, in which case we'll see another connloss on the operation
            // in the try, this catches that case and waits for the server
            // to come back
            PeerStruct peer = qu.getPeer(idx);
            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + peer.clientPort, ClientBase.CONNECTION_TIMEOUT),
                    "Waiting for server up");

            assertNull(zk.exists("/foofoofoo-connected", false));
        }
    }

    /**
     * Ensure all clients are disconnected from the service.
     */
    private void checkClientsDisconnected() throws Exception {
        for (int i = 0; i < zkClients.length; i++) {
            checkClientDisconnected(i + 1);
        }
    }

    /**
     * Ensure the client is unable to talk to the server, i.e. an operation
     * fails with a {@link KeeperException}.
     *
     * @param idx the idx of the server the client is talking to
     */
    private void checkClientDisconnected(int idx) throws Exception {
        ZooKeeper zk = getClient(idx);
        if (zk == null) {
            return;
        }
        try {
            assertNull(zk.exists("/foofoofoo-disconnected", false));
            fail("expected client to be disconnected");
        } catch (KeeperException e) {
            // success
        }
    }

    /**
     * Start every peer, locate the leader and verify all clients can reach the service.
     */
    private void startAll() throws Exception {
        qu.startAll();
        checkLeader();
        // all clients should be connected
        checkClientsConnected();
    }

    /**
     * Start a single peer, wait until every server in the ensemble answers, then
     * re-locate the leader and verify all clients can reach the service.
     *
     * @param idx the 1-based id of the peer to start
     */
    private void start(int idx) throws Exception {
        qu.start(idx);
        for (String hp : qu.getConnString().split(",")) {
            assertTrue(ClientBase.waitForServerUp(hp, ClientTest.CONNECTION_TIMEOUT), "waiting for server up");
        }

        checkLeader();
        // all clients should be connected
        checkClientsConnected();
    }

    /**
     * Locate the current leader and record its peer id, the id of one follower,
     * and the leader's active server.
     */
    private void checkLeader() {
        // NOTE: assumes the quorum has exactly zkClients.length peers numbered from 1
        // (setUp creates one client per peer).
        idxLeader = 0;
        for (int i = 1; i <= zkClients.length; i++) {
            if (qu.getPeer(i).peer.leader != null) {
                idxLeader = i;
                break;
            }
        }
        // Fail with a clear message instead of walking past the last peer.
        assertTrue(idxLeader != 0, "no leader found among the quorum peers");
        idxFollower = (idxLeader == 1 ? 2 : 1);

        zksLeader = qu.getPeer(idxLeader).peer.getActiveServer();
    }

    /**
     * Stop every peer and verify no client can reach the service any more.
     */
    private void shutdownAll() throws Exception {
        qu.shutdownAll();
        // all clients should be disconnected
        checkClientsDisconnected();
    }

    /**
     * Stop a single peer and verify that its client has lost its connection.
     *
     * @param idx the 1-based id of the peer to stop
     */
    private void shutdown(int idx) throws Exception {
        qu.shutdown(idx);

        // leader will shutdown, remaining followers will elect a new leader
        PeerStruct peer = qu.getPeer(idx);
        assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + peer.clientPort, ClientBase.CONNECTION_TIMEOUT),
                "Waiting for server down");

        // if idx is the leader then everyone will get disconnected,
        // otherwise if idx is a follower then just that client will get
        // disconnected
        if (idx == idxLeader) {
            checkClientDisconnected(idx);
            try {
                checkClientsDisconnected();
            } catch (AssertionError e) {
                // the clients may or may not have already reconnected
                // to the recovered cluster, force a check, but ignore
            }
        } else {
            checkClientDisconnected(idx);
        }
    }

    /**
     * Reset the leader's zxid so that the low 32 bits (the counter) become
     * {@code 0xfffffffc} while the high 32 bits (the epoch) are preserved. Only a
     * few more zxids can then be issued before the counter rolls over.
     */
    private void adjustEpochNearEnd() {
        zksLeader.setZxid((zksLeader.getZxid() & 0xffffffff00000000L) | 0xfffffffcL);
    }

    /**
     * Clear the system properties set in {@link #setUp()}, close every client
     * that was created and stop the ensemble. This also works when setUp failed
     * part-way, so the original failure is not masked by a NullPointerException.
     */
    @AfterEach
    public void tearDown() throws Exception {
        System.clearProperty("zookeeper.admin.enableServer");
        System.clearProperty("zookeeper.test.allowDiscontinuousProposals");
        LOG.info("tearDown starting");
        try {
            for (ZooKeeper zk : zkClients) {
                // entries stay null if setUp failed before creating every client
                if (zk != null) {
                    zk.close();
                }
            }
        } finally {
            // always stop the ensemble, even if closing a client failed
            if (qu != null) {
                qu.shutdownAll();
            }
        }
    }

    /**
     * Create the znodes, this may fail if the lower 32 roll over, if so
     * wait for the clients to be re-connected after the re-election
     *
     * @param zk the client used to create the nodes
     * @param start suffix of the first node, {@code /foo<start>}
     * @param count maximum number of nodes to create
     * @return the number of nodes created before the first connection loss (or {@code count})
     */
    private int createNodes(ZooKeeper zk, int start, int count) throws Exception {
        LOG.info("Creating nodes {} thru {}", start, (start + count));
        int j = 0;
        try {
            for (int i = start; i < start + count; i++) {
                zk.create("/foo" + i, new byte[0], Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);
                j++;
            }
        } catch (ConnectionLossException e) {
            // this is ok - the leader has dropped leadership
            waitForClientsConnected();
        }
        return j;
    }

    /**
     * Verify the expected znodes were created and that the last znode, which
     * caused the roll-over, did not.
     *
     * @param zk the client used to read the nodes
     * @param start suffix of the first expected node
     * @param count number of nodes expected to exist; {@code /foo<start + count>} must not exist
     */
    private void checkNodes(ZooKeeper zk, int start, int count) throws Exception {
        LOG.info("Validating nodes {} thru {}", start, (start + count));
        for (int i = start; i < start + count; i++) {
            String path = "/foo" + i;
            // single round trip per node: reuse the Stat for both the check and the log
            org.apache.zookeeper.data.Stat stat = zk.exists(path, false);
            assertNotNull(stat, "expected znode " + path + " to exist");
            LOG.debug("Exists zxid:{}", Long.toHexString(stat.getCzxid()));
        }
        assertNull(zk.exists("/foo" + (start + count), false));
    }

    /**
     * Prior to the fix this test would hang for a while, then fail with
     * connection loss.
     */
    @Test
    public void testSimpleRolloverFollower() throws Exception {
        adjustEpochNearEnd();

        ZooKeeper zk = getClient(idxFollower);
        int countCreated = createNodes(zk, 0, 10);

        checkNodes(zk, 0, countCreated);
    }

    /**
     * Similar to testSimpleRolloverFollower, but ensure the cluster comes back,
     * has the right data, and is able to serve new requests.
     */
    @Test
    public void testRolloverThenRestart() throws Exception {
        ZooKeeper zk = getClient(idxFollower);

        int countCreated = createNodes(zk, 0, 10);

        adjustEpochNearEnd();

        countCreated += createNodes(zk, countCreated, 10);

        shutdownAll();
        startAll();
        zk = getClient(idxLeader);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        adjustEpochNearEnd();

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        shutdownAll();
        startAll();
        zk = getClient(idxFollower);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        shutdownAll();
        startAll();
        zk = getClient(idxLeader);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        // sanity check
        assertTrue(countCreated > 0);
        assertTrue(countCreated < 60);
    }

    /**
     * Similar to testRolloverThenRestart, but ensure a follower comes back,
     * has the right data, and is able to serve new requests.
     */
    @Test
    public void testRolloverThenFollowerRestart() throws Exception {
        ZooKeeper zk = getClient(idxFollower);

        int countCreated = createNodes(zk, 0, 10);

        adjustEpochNearEnd();

        countCreated += createNodes(zk, countCreated, 10);

        shutdown(idxFollower);
        start(idxFollower);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        adjustEpochNearEnd();

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        shutdown(idxFollower);
        start(idxFollower);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        shutdown(idxFollower);
        start(idxFollower);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        // sanity check
        assertTrue(countCreated > 0);
        assertTrue(countCreated < 60);
    }

    /**
     * Similar to testRolloverThenRestart, but ensure leadership can change,
     * comes back, has the right data, and is able to serve new requests.
     */
    @Test
    public void testRolloverThenLeaderRestart() throws Exception {
        ZooKeeper zk = getClient(idxLeader);

        int countCreated = createNodes(zk, 0, 10);

        adjustEpochNearEnd();

        checkNodes(zk, 0, countCreated);

        // start() re-runs checkLeader(), so idxLeader may now name a different peer
        shutdown(idxLeader);
        start(idxLeader);
        zk = getClient(idxLeader);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        adjustEpochNearEnd();

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        shutdown(idxLeader);
        start(idxLeader);
        zk = getClient(idxLeader);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        shutdown(idxLeader);
        start(idxLeader);
        zk = getClient(idxFollower);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        // sanity check
        assertTrue(countCreated > 0);
        assertTrue(countCreated < 50);
    }

    /**
     * Similar to testRolloverThenRestart, but ensure we can survive multiple
     * epoch rollovers between restarts.
     */
    @Test
    public void testMultipleRollover() throws Exception {
        ZooKeeper zk = getClient(idxFollower);

        int countCreated = createNodes(zk, 0, 10);

        adjustEpochNearEnd();

        countCreated += createNodes(zk, countCreated, 10);

        adjustEpochNearEnd();

        countCreated += createNodes(zk, countCreated, 10);

        adjustEpochNearEnd();

        countCreated += createNodes(zk, countCreated, 10);

        adjustEpochNearEnd();

        countCreated += createNodes(zk, countCreated, 10);

        shutdownAll();
        startAll();
        zk = getClient(idxFollower);

        adjustEpochNearEnd();

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        shutdown(idxLeader);
        start(idxLeader);
        zk = getClient(idxFollower);

        checkNodes(zk, 0, countCreated);
        countCreated += createNodes(zk, countCreated, 10);

        // sanity check
        assertTrue(countCreated > 0);
        assertTrue(countCreated < 70);
    }

}