// FuzzySnapshotRelatedTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.sasl.SaslException;
import org.apache.jute.OutputArchive;
import org.apache.zookeeper.AsyncCallback.MultiCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.DataNode;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases used to catch corner cases due to fuzzy snapshot.
 */
public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(FuzzySnapshotRelatedTest.class);

    // One server thread per ensemble member, indexed by server id; populated in setup().
    MainThread[] mt = null;
    // One client per ensemble member; zk[i] is created against clientPorts[i].
    ZooKeeper[] zk = null;
    // Client port assigned to each server, indexed by server id.
    int[] clientPorts = null;
    // Index of the server whose quorum peer reported a non-null leader in setup(); -1 if none did.
    int leaderId;
    // Index of the first server that is not the leader; -1 if none was found in setup().
    int followerA;

    /**
     * Enables digests, starts a three-server quorum whose peers are built by
     * {@link CustomizedQPMain}, connects one client to every server and then
     * records which server is the leader and which is the first non-leader.
     */
    @BeforeEach
    public void setup() throws Exception {
        ZooKeeperServer.setDigestEnabled(true);

        LOG.info("Start up a 3 server quorum");
        final int serverCount = 3;
        clientPorts = new int[serverCount];
        final StringBuilder quorumCfg = new StringBuilder();

        // One "server.N=..." line per member; the client port is allocated first,
        // followed by the two other ports that appear in the server line.
        for (int sid = 0; sid < serverCount; sid++) {
            clientPorts[sid] = PortAssignment.unique();
            quorumCfg.append("server.").append(sid)
                    .append("=127.0.0.1:").append(PortAssignment.unique())
                    .append(":").append(PortAssignment.unique())
                    .append(":participant;127.0.0.1:").append(clientPorts[sid])
                    .append("\n");
        }
        final String currentQuorumCfgSection = quorumCfg.toString();

        // start servers
        mt = new MainThread[serverCount];
        zk = new ZooKeeper[serverCount];
        for (int sid = 0; sid < serverCount; sid++) {
            final MainThread server = new MainThread(sid, clientPorts[sid], currentQuorumCfgSection, false) {
                @Override
                public TestQPMain getTestQPMain() {
                    return new CustomizedQPMain();
                }
            };
            mt[sid] = server;
            server.start();
            zk[sid] = new ZooKeeper("127.0.0.1:" + clientPorts[sid], ClientBase.CONNECTION_TIMEOUT, this);
        }
        QuorumPeerMainTest.waitForAll(zk, States.CONNECTED);
        LOG.info("all servers started");

        // Locate the leader and remember the first server that is not the leader.
        leaderId = -1;
        followerA = -1;
        for (int sid = 0; sid < serverCount; sid++) {
            final boolean isLeader = mt[sid].main.quorumPeer.leader != null;
            if (isLeader) {
                leaderId = sid;
            } else if (followerA == -1) {
                followerA = sid;
            }
        }
    }

    /**
     * Disables digests again, shuts down every server that was started and
     * closes every client that was created.
     *
     * Array slots may still be {@code null} when {@code setup()} failed part
     * way through, so each element is checked before use. The clients are
     * closed in a {@code finally} block so that a failing server shutdown does
     * not leave client connections open.
     */
    @AfterEach
    public void tearDown() throws Exception {
        ZooKeeperServer.setDigestEnabled(false);

        try {
            if (mt != null) {
                for (MainThread t : mt) {
                    if (t != null) {
                        t.shutdown();
                    }
                }
            }
        } finally {
            if (zk != null) {
                for (ZooKeeper z : zk) {
                    if (z != null) {
                        z.close();
                    }
                }
            }
        }
    }

    /**
     * Takes a snapshot on follower A while a two-create multi-op is being
     * applied (the hook fires when the second node is about to be created),
     * restarts the follower, and checks that the follower's data for the
     * second node matches what the leader returns.
     */
    @Test
    public void testMultiOpConsistency() throws Exception {
        LOG.info("Create a parent node");
        final String path = "/testMultiOpConsistency";
        createEmptyNode(zk[followerA], path, CreateMode.PERSISTENT);

        LOG.info("Hook to catch the 2nd sub create node txn in multi-op");
        final QuorumPeer followerPeer = mt[followerA].main.quorumPeer;
        final CustomDataTree followerTree = (CustomDataTree) followerPeer.getZkDb().getDataTree();
        final ZooKeeperServer zkServer = followerPeer.getActiveServer();

        final String node1 = path + "/1";
        final String node2 = path + "/2";

        final NodeCreateListener snapshotOnCreate = new NodeCreateListener() {
            @Override
            public void process(String createdPath) {
                LOG.info("Take a snapshot");
                try {
                    zkServer.takeSnapshot(true);
                } catch (final IOException e) {
                    // ignored as it should never reach here because of System.exit() call
                }
            }
        };
        followerTree.addNodeCreateListener(node2, snapshotOnCreate);

        LOG.info("Issue a multi op to create 2 nodes");
        final List<Op> createBoth = Arrays.asList(
                Op.create(node1, node1.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                Op.create(node2, node2.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
        zk[followerA].multi(createBoth);

        LOG.info("Restart the server");
        mt[followerA].shutdown();
        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);

        mt[followerA].start();
        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);

        LOG.info("Make sure the node consistent with leader");
        final String dataOnLeader = new String(zk[leaderId].getData(node2, null, null));
        final String dataOnFollower = new String(zk[followerA].getData(node2, null, null));
        assertEquals(dataOnLeader, dataOnFollower);
    }

    /**
     * It's possible that, during a SNAP sync, the parent is serialized before
     * the child gets deleted while the snapshot is being sent over.
     *
     * In that case we need to make sure the pzxid gets correctly updated
     * when applying the txns received.
     *
     * The test sets the {@link LearnerHandler#FORCE_SNAP_SYNC} system property
     * and restores its previous value when done, so the setting does not leak
     * into other tests running in the same JVM.
     */
    @Test
    public void testPZxidUpdatedDuringSnapSyncing() throws Exception {
        LOG.info("Enable force snapshot sync");
        // System.setProperty returns the previous value (or null), which is restored below.
        final String previousForceSnapSync = System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
        try {
            final String parent = "/testPZxidUpdatedWhenDeletingNonExistNode";
            final String child = parent + "/child";
            createEmptyNode(zk[leaderId], parent, CreateMode.PERSISTENT);
            createEmptyNode(zk[leaderId], child, CreateMode.EPHEMERAL);
            // create another child to test closeSession
            createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL);

            LOG.info("shutdown follower {}", followerA);
            mt[followerA].shutdown();
            QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);

            LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
            addSerializeListener(leaderId, parent, child);

            LOG.info("Restart follower A to trigger a SNAP sync with leader");
            mt[followerA].start();
            QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);

            LOG.info("Check and make sure the pzxid of the parent is the same on leader and follower A");
            compareStat(parent, leaderId, followerA);
        } finally {
            if (previousForceSnapSync == null) {
                System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
            } else {
                System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, previousForceSnapSync);
            }
        }
    }

    /**
     * While a fuzzy snapshot is being taken, the parent may be serialized
     * before a child is deleted within the fuzzy range.
     *
     * When that happens the pzxid has to be updated correctly while the txns
     * are replayed; this is checked by comparing the parent's stat on the
     * leader and on follower A after the follower reloads from its snapshot.
     */
    @Test
    public void testPZxidUpdatedWhenLoadingSnapshot() throws Exception {

        final String parent = "/testPZxidUpdatedDuringTakingSnapshot";
        final String child = parent + "/child";
        final ZooKeeper followerClient = zk[followerA];
        createEmptyNode(followerClient, parent, CreateMode.PERSISTENT);
        createEmptyNode(followerClient, child, CreateMode.EPHEMERAL);
        // a second child, created through the leader's client, to test closeSession
        createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL);

        LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
        addSerializeListener(followerA, parent, child);

        LOG.info("Take snapshot on follower A");
        final MainThread follower = mt[followerA];
        final ZooKeeperServer followerServer = follower.main.quorumPeer.getActiveServer();
        followerServer.takeSnapshot(true);

        LOG.info("Restarting follower A to load snapshot");
        follower.shutdown();
        QuorumPeerMainTest.waitForOne(followerClient, States.CLOSED);
        follower.start();
        // the listener installed by addSerializeListener closes the old client, so open a new one
        final String followerAddress = "127.0.0.1:" + clientPorts[followerA];
        zk[followerA] = new ZooKeeper(followerAddress, ClientBase.CONNECTION_TIMEOUT, this);

        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);

        LOG.info("Check and make sure the pzxid of the parent is the same on leader and follower A");
        compareStat(parent, leaderId, followerA);
    }

    /**
     * Verifies that no digest mismatch is reported when a multi-op is applied
     * while the zxid digest is being written into a snapshot.
     *
     * Two latches coordinate the race: the digest listener issues an async
     * multi-op and waits (up to 3 seconds) until its setData part reaches the
     * data tree; the setData hook in turn waits (up to 3 seconds) until the
     * digest has been written before the setData is applied. The mismatch
     * metric is checked right after the snapshot and again after follower A
     * has been restarted.
     */
    @Test
    public void testMultiOpDigestConsistentDuringSnapshot() throws Exception {
        ServerMetrics.getMetrics().resetAll();

        LOG.info("Create some txns");
        final String path = "/testMultiOpDigestConsistentDuringSnapshot";
        createEmptyNode(zk[followerA], path, CreateMode.PERSISTENT);

        CustomDataTree dt =
                (CustomDataTree) mt[followerA].main.quorumPeer.getZkDb().getDataTree();
        final CountDownLatch setDataLatch = new CountDownLatch(1);
        final CountDownLatch continueSetDataLatch = new CountDownLatch(1);
        final ZooKeeper followerZk = zk[followerA];
        dt.setDigestSerializeListener(new DigestSerializeListener() {
            @Override
            public void process() {
                LOG.info("Trigger a multi op in async");
                followerZk.multi(Arrays.asList(
                        Op.create("/multi0", "/multi0".getBytes(),
                                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                        Op.setData(path, "new data".getBytes(), -1)
                ), new MultiCallback() {
                    @Override
                    public void processResult(int rc, String path, Object ctx,
                            List<OpResult> opResults) {}
                }, null);

                LOG.info("Wait for the signal to continue");
                try {
                    // A timeout means the race was not exercised; make that visible in the log.
                    if (!setDataLatch.await(3, TimeUnit.SECONDS)) {
                        LOG.warn("Timed out waiting for set data txn");
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while waiting for set data txn", e);
                }
            }

            @Override
            public void finished() {
                LOG.info("Finished writing digest out, continue");
                continueSetDataLatch.countDown();
            }
        });

        dt.setDataListener(new SetDataTxnListener() {
            @Override
            public void process() {
                setDataLatch.countDown();
                try {
                    if (!continueSetDataLatch.await(3, TimeUnit.SECONDS)) {
                        LOG.warn("Timed out waiting for continue signal");
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while waiting for continue signal", e);
                }
            }
        });

        LOG.info("Trigger a snapshot");
        ZooKeeperServer zkServer = mt[followerA].main.quorumPeer.getActiveServer();
        zkServer.takeSnapshot(true);
        checkNoMismatchReported();

        LOG.info("Restart the server to load the snapshot again");
        mt[followerA].shutdown();
        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
        mt[followerA].start();
        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);

        LOG.info("Make sure there is nothing caught in the digest mismatch");
        checkNoMismatchReported();

    }

    /**
     * Reads the {@code digest_mismatches_count} entry from the current server
     * metrics and fails the test if it is greater than zero.
     */
    private void checkNoMismatchReported() {
        final Object reported = MetricsUtils.currentServerMetrics().get("digest_mismatches_count");
        final long mismatch = (Long) reported;

        assertFalse(mismatch > 0, "The mismatch count should be zero but is: " + mismatch);
    }

    /**
     * Registers a serialize listener on server {@code sid}'s data tree that,
     * once {@code parent} has been serialized, deletes {@code child} through
     * that server's client and then closes the client.
     *
     * @param sid    index of the server whose data tree and client are used
     * @param parent path whose serialization triggers the listener
     * @param child  path deleted once the parent has been serialized
     */
    private void addSerializeListener(int sid, String parent, String child) {
        final ZooKeeper zkClient = zk[sid];
        CustomDataTree dt = (CustomDataTree) mt[sid].main.quorumPeer.getZkDb().getDataTree();
        dt.addListener(parent, new NodeSerializeListener() {
            @Override
            public void nodeSerialized(String path) {
                try {
                    zkClient.delete(child, -1);
                    zkClient.close();
                    LOG.info("Deleted the child node after the parent is serialized");
                } catch (Exception e) {
                    // Pass the path for the placeholder and the exception last so
                    // SLF4J logs both the node and the stack trace.
                    LOG.error("Error when deleting node {}", child, e);
                }
            }
        });
    }

    /**
     * Opens a fresh client to each of the two servers, reads the {@link Stat}
     * of {@code path} from both and asserts that the two stats are equal.
     *
     * The clients are created inside the {@code try} block so that any client
     * already opened is closed even when a later constructor call or the
     * connection wait fails.
     *
     * @param path           node whose stat is compared
     * @param sid            index of the first server
     * @param compareWithSid index of the second server
     */
    private void compareStat(String path, int sid, int compareWithSid) throws Exception {
        ZooKeeper[] compareZk = new ZooKeeper[2];
        try {
            compareZk[0] = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
                    ClientBase.CONNECTION_TIMEOUT, this);
            compareZk[1] = new ZooKeeper("127.0.0.1:" + clientPorts[compareWithSid],
                    ClientBase.CONNECTION_TIMEOUT, this);
            QuorumPeerMainTest.waitForAll(compareZk, States.CONNECTED);

            Stat stat1 = new Stat();
            compareZk[0].getData(path, null, stat1);

            Stat stat2 = new Stat();
            compareZk[1].getData(path, null, stat2);

            assertEquals(stat1, stat2);
        } finally {
            for (ZooKeeper z : compareZk) {
                // a slot stays null when its client was never created
                if (z != null) {
                    z.close();
                }
            }
        }
    }

    /**
     * Takes a snapshot on follower A at the moment a new session is being
     * committed, restarts the follower, and checks that every session known to
     * the leader is also known to the follower afterwards.
     *
     * The extra client created for this test is closed in a {@code finally}
     * block so it does not outlive the test.
     */
    @Test
    public void testGlobalSessionConsistency() throws Exception {
        LOG.info("Hook to catch the commitSession event on followerA");
        CustomizedQPMain followerAMain = (CustomizedQPMain) mt[followerA].main;
        final ZooKeeperServer zkServer = followerAMain.quorumPeer.getActiveServer();

        // only take snapshot for the next global session we're going to create
        final AtomicBoolean shouldTakeSnapshot = new AtomicBoolean(true);
        followerAMain.setCommitSessionListener(new CommitSessionListener() {
            @Override
            public void process(long sessionId) {
                LOG.info("Take snapshot");
                if (shouldTakeSnapshot.getAndSet(false)) {
                    try {
                        zkServer.takeSnapshot(true);
                    } catch (IOException e) {
                        // ignored as it should never reach here because of System.exit() call
                    }
                }
            }
        });

        LOG.info("Create a global session");
        ZooKeeper globalClient = new ZooKeeper(
                "127.0.0.1:" + clientPorts[followerA],
                ClientBase.CONNECTION_TIMEOUT,
                this);
        try {
            QuorumPeerMainTest.waitForOne(globalClient, States.CONNECTED);

            LOG.info("Restart followerA to load the data from disk");
            mt[followerA].shutdown();
            QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);

            mt[followerA].start();
            QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);

            LOG.info("Make sure the global sessions are consistent with leader");

            Map<Long, Integer> globalSessionsOnLeader = mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
            Map<Long, Integer> globalSessionsOnFollowerA = mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
            LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(), globalSessionsOnFollowerA.keySet());
            assertTrue(globalSessionsOnFollowerA.keySet().containsAll(globalSessionsOnLeader.keySet()));
        } finally {
            // closed only after the comparison so the session is still present when it is checked
            globalClient.close();
        }
    }

    /**
     * Creates {@code path} with a zero-length payload and an open ACL.
     *
     * @param zk   client used to issue the create
     * @param path path of the node to create
     * @param mode create mode of the new node
     */
    private void createEmptyNode(ZooKeeper zk, String path, CreateMode mode) throws Exception {
        final byte[] noData = new byte[0];
        zk.create(path, noData, Ids.OPEN_ACL_UNSAFE, mode);
    }

    /**
     * Hook that {@link CustomDataTree#createNode} calls for a registered path
     * before it delegates to the real node creation.
     */
    interface NodeCreateListener {

        /**
         * @param path the path passed to {@code createNode}
         */
        void process(String path);

    }

    /**
     * Hook around {@link CustomDataTree#serializeZxidDigest}: one callback
     * before the digest is written and one after.
     */
    interface DigestSerializeListener {
        /** Called before the zxid digest is serialized. */
        void process();

        /** Called after the zxid digest has been serialized. */
        void finished();
    }

    /**
     * Hook that {@link CustomDataTree#setData} calls before it delegates to
     * the real setData.
     */
    interface SetDataTxnListener {
        /** Called before the data of a node is updated. */
        void process();
    }

    /**
     * A {@link DataTree} that lets tests hook into node serialization, node
     * creation, zxid digest serialization and setData.
     *
     * The per-path listener maps are never reassigned and are therefore
     * {@code final}. The two single-listener fields are {@code volatile}
     * because testMultiOpDigestConsistentDuringSnapshot installs them from the
     * test thread while the setData hook is expected to fire on another thread.
     */
    static class CustomDataTree extends DataTree {

        // listeners fired before a node at the given path is created
        final Map<String, NodeCreateListener> nodeCreateListeners = new HashMap<>();
        // listeners fired after the node at the given path has been serialized
        final Map<String, NodeSerializeListener> listeners = new HashMap<>();
        volatile DigestSerializeListener digestListener;
        volatile SetDataTxnListener setListener;

        /**
         * Serializes the node and then notifies the listener registered for
         * {@code path}, if any.
         */
        @Override
        public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
            super.serializeNodeData(oa, path, node);
            NodeSerializeListener listener = listeners.get(path);
            if (listener != null) {
                listener.nodeSerialized(path);
            }
        }

        /** Registers a listener fired after {@code path} has been serialized. */
        public void addListener(String path, NodeSerializeListener listener) {
            listeners.put(path, listener);
        }

        /**
         * Notifies the listener registered for {@code path}, if any, and then
         * creates the node.
         */
        @Override
        public void createNode(
                final String path,
                byte[] data,
                List<ACL> acl,
                long ephemeralOwner,
                int parentCVersion,
                long zxid,
                long time,
                Stat outputStat) throws NoNodeException, NodeExistsException {
            NodeCreateListener listener = nodeCreateListeners.get(path);
            if (listener != null) {
                listener.process(path);
            }
            super.createNode(path, data, acl, ephemeralOwner, parentCVersion, zxid, time, outputStat);
        }

        /** Registers a listener fired before {@code path} is created. */
        public void addNodeCreateListener(String path, NodeCreateListener listener) {
            nodeCreateListeners.put(path, listener);
        }

        /** Installs the listener invoked around zxid digest serialization. */
        public void setDigestSerializeListener(DigestSerializeListener listener) {
            this.digestListener = listener;
        }

        /** Installs the listener invoked before every setData. */
        public void setDataListener(SetDataTxnListener listener) {
            this.setListener = listener;
        }

        /**
         * Calls the digest listener's {@code process()} before and
         * {@code finished()} after serializing the zxid digest.
         */
        @Override
        public boolean serializeZxidDigest(OutputArchive oa) throws IOException {
            // Read the field once so both callbacks go to the same listener.
            final DigestSerializeListener listener = digestListener;
            if (listener != null) {
                listener.process();
            }
            boolean result = super.serializeZxidDigest(oa);
            if (listener != null) {
                listener.finished();
            }
            return result;
        }

        /** Notifies the setData listener, if any, and then updates the node. */
        @Override
        public Stat setData(String path, byte[] data, int version, long zxid,
                long time) throws NoNodeException {
            final SetDataTxnListener listener = setListener;
            if (listener != null) {
                listener.process();
            }

            return super.setData(path, data, version, zxid, time);
        }
    }

    /**
     * Hook that {@link CustomDataTree#serializeNodeData} calls for a
     * registered path after the node has been serialized.
     */
    interface NodeSerializeListener {

        /**
         * @param path the path of the node that was just serialized
         */
        void nodeSerialized(String path);

    }

    /**
     * Hook called by the session tracker that {@link CustomizedQPMain}
     * installs on followers, before a session commit is delegated to the
     * real tracker.
     */
    interface CommitSessionListener {

        /**
         * @param sessionId id of the session being committed
         */
        void process(long sessionId);

    }

    /**
     * A {@link TestQPMain} whose quorum peer always uses a
     * {@link CustomDataTree} and whose followers report every
     * {@code commitSession} call to an optional {@link CommitSessionListener}.
     */
    static class CustomizedQPMain extends TestQPMain {

        // Installed by the test after the server is already running and read from
        // the session tracker's commitSession, hence volatile.
        volatile CommitSessionListener commitSessionListener;

        /** Installs the listener notified before each session commit on a follower. */
        public void setCommitSessionListener(CommitSessionListener listener) {
            this.commitSessionListener = listener;
        }

        @Override
        protected QuorumPeer getQuorumPeer() throws SaslException {
            return new QuorumPeer() {
                // Ignores the database passed in and installs one backed by CustomDataTree.
                @Override
                public void setZKDatabase(ZKDatabase database) {
                    super.setZKDatabase(new ZKDatabase(this.getTxnFactory()) {
                        @Override
                        public DataTree createDataTree() {
                            return new CustomDataTree();
                        }
                    });
                }

                @Override
                protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
                    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb()) {
                        @Override
                        public void createSessionTracker() {
                            sessionTracker = new LearnerSessionTracker(
                                    this,
                                    getZKDatabase().getSessionWithTimeOuts(),
                                    this.tickTime,
                                    self.getMyId(),
                                    self.areLocalSessionsEnabled(),
                                    getZooKeeperServerListener()) {

                                // Notify the test hook first, then commit the session as usual.
                                @Override
                                public synchronized boolean commitSession(
                                        long sessionId, int sessionTimeout) {
                                    final CommitSessionListener listener = commitSessionListener;
                                    if (listener != null) {
                                        listener.process(sessionId);
                                    }
                                    return super.commitSession(sessionId, sessionTimeout);
                                }
                            };
                        }
                    });
                }
            };
        }

    }

}