Zab1_0Test.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.server.quorum.ZabUtils.MockLeader;
import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
import static org.apache.zookeeper.server.quorum.ZabUtils.createMockLeader;
import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.TestUtils;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Zab1_0Test extends ZKTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class);

    @BeforeEach
    public void setUp() {
        System.setProperty("zookeeper.admin.enableServer", "false");
    }

    private static final class LeadThread extends Thread {

        private final Leader leader;

        private LeadThread(Leader leader) {
            this.leader = leader;
        }

        public void run() {
            try {
                leader.lead();
            } catch (InterruptedException e) {
                LOG.info("Leader thread interrupted", e);
            } catch (Exception e) {
                LOG.warn("Unexpected exception in leader thread", e);
            } finally {
                leader.shutdown("lead ended");
            }
        }

    }

    public static final class FollowerMockThread extends Thread {

        private final Leader leader;
        private final long followerSid;
        public long epoch = -1;
        public String msg = null;
        private boolean onlyGetEpochToPropose;

        private FollowerMockThread(long followerSid, Leader leader, boolean onlyGetEpochToPropose) {
            this.leader = leader;
            this.followerSid = followerSid;
            this.onlyGetEpochToPropose = onlyGetEpochToPropose;
        }

        public void run() {
            if (onlyGetEpochToPropose) {
                try {
                    epoch = leader.getEpochToPropose(followerSid, 0);
                } catch (Exception e) {
                }
            } else {
                try {
                    leader.waitForEpochAck(followerSid, new StateSummary(0, 0));
                    msg = "FollowerMockThread (id = " + followerSid + ")  returned from waitForEpochAck";
                } catch (Exception e) {
                }
            }
        }

    }
    @Test
    public void testLeaderInConnectingFollowers(@TempDir File testData) throws Exception {
        File tmpDir = File.createTempFile("test", "dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        Leader leader = null;
        try {
            QuorumPeer peer = createQuorumPeer(tmpDir);
            leader = createLeader(tmpDir, peer);
            peer.leader = leader;
            peer.setAcceptedEpoch(5);

            FollowerMockThread f1 = new FollowerMockThread(1, leader, true);
            FollowerMockThread f2 = new FollowerMockThread(2, leader, true);
            f1.start();
            f2.start();

            // wait until followers time out in getEpochToPropose - they shouldn't return
            // normally because the leader didn't execute getEpochToPropose and so its epoch was not
            // accounted for
            f1.join(leader.self.getInitLimit() * leader.self.getTickTime() + 5000);
            f2.join(leader.self.getInitLimit() * leader.self.getTickTime() + 5000);

            // even though followers timed out, their ids are in connectingFollowers, and their
            // epoch were accounted for, so the leader should not block and since it started with
            // accepted epoch = 5 it should now have 6
            try {
                long epoch = leader.getEpochToPropose(leader.self.getMyId(), leader.self.getAcceptedEpoch());
                assertEquals(6, epoch, "leader got wrong epoch from getEpochToPropose");
            } catch (Exception e) {
                fail("leader timed out in getEpochToPropose");
            }
        } finally {
            if (leader != null) {
                leader.shutdown("end of test");
            }
            TestUtils.deleteFileRecursively(tmpDir);
        }
    }

    /**
     * In this test, the leader sets the last accepted epoch to 5. The call
     * to getEpochToPropose should set epoch to 6 and wait until another
     * follower executes it. If in getEpochToPropose we don't check if
     * lastAcceptedEpoch == epoch, then the call from the subsequent
     * follower with lastAcceptedEpoch = 6 doesn't change the value
     * of epoch, and the test fails. It passes with the fix to predicate.
     *
     * https://issues.apache.org/jira/browse/ZOOKEEPER-1343
     *
     *
     * @throws Exception
     */

    @Test
    public void testLastAcceptedEpoch(@TempDir File testData) throws Exception {
        File tmpDir = File.createTempFile("test", "dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        Leader leader = null;
        LeadThread leadThread = null;
        try {
            QuorumPeer peer = createQuorumPeer(tmpDir);
            leader = createMockLeader(tmpDir, peer);
            peer.leader = leader;
            peer.setAcceptedEpoch(5);
            leadThread = new LeadThread(leader);
            leadThread.start();

            while (((MockLeader) leader).getCurrentEpochToPropose() != 6) {
                Thread.sleep(20);
            }

            try {
                long epoch = leader.getEpochToPropose(1, 6);
                assertEquals(7, epoch, "New proposed epoch is wrong");
            } catch (Exception e) {
                fail("Timed out in getEpochToPropose");
            }

        } finally {
            if (leader != null) {
                leader.shutdown("end of test");
            }
            if (leadThread != null) {
                leadThread.interrupt();
                leadThread.join();
            }
            TestUtils.deleteFileRecursively(tmpDir);
        }
    }

    @Test
    public void testLeaderInElectingFollowers(@TempDir File testData) throws Exception {
        File tmpDir = File.createTempFile("test", "dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        Leader leader = null;
        try {
            QuorumPeer peer = createQuorumPeer(tmpDir);
            leader = createLeader(tmpDir, peer);
            peer.leader = leader;

            FollowerMockThread f1 = new FollowerMockThread(1, leader, false);
            FollowerMockThread f2 = new FollowerMockThread(2, leader, false);

            // things needed for waitForEpochAck to run (usually in leader.lead(), but we're not running leader here)
            leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid());

            f1.start();
            f2.start();

            // wait until followers time out in waitForEpochAck - they shouldn't return
            // normally because the leader didn't execute waitForEpochAck
            f1.join(leader.self.getInitLimit() * leader.self.getTickTime() + 5000);
            f2.join(leader.self.getInitLimit() * leader.self.getTickTime() + 5000);

            // make sure that they timed out and didn't return normally
            assertTrue(f1.msg == null, f1.msg + " without waiting for leader");
            assertTrue(f2.msg == null, f2.msg + " without waiting for leader");
        } finally {
            if (leader != null) {
                leader.shutdown("end of test");
            }
            TestUtils.deleteFileRecursively(tmpDir);
        }
    }

    static Socket[] getSocketPair() throws IOException {
        ServerSocket ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
        InetSocketAddress endPoint = (InetSocketAddress) ss.getLocalSocketAddress();
        Socket s = new Socket(endPoint.getAddress(), endPoint.getPort());
        return new Socket[]{s, ss.accept()};
    }
    static void readPacketSkippingPing(InputArchive ia, QuorumPacket qp) throws IOException {
        while (true) {
            ia.readRecord(qp, null);
            if (qp.getType() != Leader.PING) {
                return;
            }
        }
    }

    public interface LeaderConversation {

        void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws Exception;

    }

    public interface PopulatedLeaderConversation {

        void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid) throws Exception;

    }

    public interface FollowerConversation {

        void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;

    }

    public interface ObserverConversation {

        void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) throws Exception;

    }

    public void testLeaderConversation(LeaderConversation conversation, File testData) throws Exception {
        Socket[] pair = getSocketPair();
        Socket leaderSocket = pair[0];
        Socket followerSocket = pair[1];
        File tmpDir = File.createTempFile("test", "dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        LeadThread leadThread = null;
        Leader leader = null;
        try {
            QuorumPeer peer = createQuorumPeer(tmpDir);
            leader = createLeader(tmpDir, peer);
            peer.leader = leader;
            leadThread = new LeadThread(leader);
            leadThread.start();

            while (leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) {
                Thread.sleep(20);
            }

            LearnerHandler lh = new LearnerHandler(leaderSocket, new BufferedInputStream(leaderSocket.getInputStream()), leader);
            lh.start();
            leaderSocket.setSoTimeout(4000);

            InputArchive ia = BinaryInputArchive.getArchive(followerSocket.getInputStream());
            OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket.getOutputStream());

            conversation.converseWithLeader(ia, oa, leader);
        } finally {
            if (leader != null) {
                leader.shutdown("end of test");
            }
            if (leadThread != null) {
                leadThread.interrupt();
                leadThread.join();
            }
            TestUtils.deleteFileRecursively(tmpDir);
        }
    }

    public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversation, int ops, File testData) throws Exception {
        Socket[] pair = getSocketPair();
        Socket leaderSocket = pair[0];
        Socket followerSocket = pair[1];
        File tmpDir = File.createTempFile("test", "dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        LeadThread leadThread = null;
        Leader leader = null;
        try {
            // Setup a database with two znodes
            FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
            ZKDatabase zkDb = new ZKDatabase(snapLog);

            assertTrue(ops >= 1);
            long zxid = ZxidUtils.makeZxid(1, 0);
            for (int i = 1; i <= ops; i++) {
                zxid = ZxidUtils.makeZxid(1, i);
                String path = "/foo-" + i;
                zkDb.processTxn(new TxnHeader(13, 1000 + i, zxid, 30 + i, ZooDefs.OpCode.create),
                        new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                Stat stat = new Stat();
                assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null)));
            }
            assertTrue(zxid > ZxidUtils.makeZxid(1, 0));

            // Generate snapshot and close files.
            snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false);
            snapLog.close();

            QuorumPeer peer = createQuorumPeer(tmpDir);

            leader = createLeader(tmpDir, peer);
            peer.leader = leader;

            // Set the last accepted epoch and current epochs to be 1
            peer.setAcceptedEpoch(1);
            peer.setCurrentEpoch(1);

            leadThread = new LeadThread(leader);
            leadThread.start();

            while (leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) {
                Thread.sleep(20);
            }

            LearnerHandler lh = new LearnerHandler(leaderSocket, new BufferedInputStream(leaderSocket.getInputStream()), leader);
            lh.start();
            leaderSocket.setSoTimeout(4000);

            InputArchive ia = BinaryInputArchive.getArchive(followerSocket.getInputStream());
            OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket.getOutputStream());

            conversation.converseWithLeader(ia, oa, leader, zxid);
        } finally {
            if (leader != null) {
                leader.shutdown("end of test");
            }
            if (leadThread != null) {
                leadThread.interrupt();
                leadThread.join();
            }
            TestUtils.deleteFileRecursively(tmpDir);
        }
    }

    public void testFollowerConversation(FollowerConversation conversation, File testData) throws Exception {
        File tmpDir = File.createTempFile("test", "dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        Thread followerThread = null;
        ConversableFollower follower = null;
        QuorumPeer peer = null;
        try {
            peer = createQuorumPeer(tmpDir);
            follower = createFollower(tmpDir, peer);
            peer.follower = follower;

            ServerSocket ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
            QuorumServer leaderQS = new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress());
            follower.setLeaderQuorumServer(leaderQS);
            final Follower followerForThread = follower;

            followerThread = new Thread() {
                public void run() {
                    try {
                        followerForThread.followLeader();
                    } catch (InterruptedException e) {
                        LOG.info("Follower thread interrupted", e);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception in follower thread", e);
                    }
                }
            };
            followerThread.start();
            Socket leaderSocket = ss.accept();

            InputArchive ia = BinaryInputArchive.getArchive(leaderSocket.getInputStream());
            OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket.getOutputStream());

            conversation.converseWithFollower(ia, oa, follower);
        } finally {
            if (follower != null) {
                follower.shutdown();
            }
            if (followerThread != null) {
                followerThread.interrupt();
                followerThread.join();
            }
            if (peer != null) {
                peer.shutdown();
            }
            TestUtils.deleteFileRecursively(tmpDir);
        }
    }

    public void testObserverConversation(ObserverConversation conversation, File testData) throws Exception {
        File tmpDir = File.createTempFile("test", "dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        Thread observerThread = null;
        ConversableObserver observer = null;
        QuorumPeer peer = null;
        try {
            peer = createQuorumPeer(tmpDir);
            peer.setSyncEnabled(true);
            observer = createObserver(tmpDir, peer);
            peer.observer = observer;

            ServerSocket ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
            QuorumServer leaderQS = new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress());
            observer.setLeaderQuorumServer(leaderQS);
            final Observer observerForThread = observer;

            observerThread = new Thread() {
                public void run() {
                    try {
                        observerForThread.observeLeader();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            observerThread.start();
            Socket leaderSocket = ss.accept();

            InputArchive ia = BinaryInputArchive.getArchive(leaderSocket.getInputStream());
            OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket.getOutputStream());

            conversation.converseWithObserver(ia, oa, observer);
        } finally {
            if (observer != null) {
                observer.shutdown();
            }
            if (observerThread != null) {
                observerThread.interrupt();
                observerThread.join();
            }
            if (peer != null) {
                peer.shutdown();
            }
            TestUtils.deleteFileRecursively(tmpDir);
        }
    }

    @Test
    public void testUnnecessarySnap(@TempDir File testData) throws Exception {
        testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
            @Override
            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid) throws Exception {

                assertEquals(1, l.self.getAcceptedEpoch());
                assertEquals(1, l.self.getCurrentEpoch());

                /* we test a normal run. everything should work out well. */
                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.LEADERINFO, qp.getType());
                assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
                assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
                assertEquals(2, l.self.getAcceptedEpoch());
                assertEquals(1, l.self.getCurrentEpoch());

                byte[] epochBytes = new byte[4];
                final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
                wrappedEpochBytes.putInt(1);
                qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.DIFF, qp.getType());

            }
        }, 2, testData);
    }

    // We want to track the change with a callback rather than depending on timing
    class TrackerWatcher implements Watcher {

        boolean changed;
        synchronized void waitForChange() throws InterruptedException {
            while (!changed) {
                wait();
            }
        }
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == EventType.NodeDataChanged) {
                synchronized (this) {
                    changed = true;
                    notifyAll();
                }
            }
        }
        public synchronized boolean changed() {
            return changed;
        }

    }

    @Test
    public void testNormalFollowerRun(@TempDir File testData) throws Exception {
        testFollowerConversation(new FollowerConversation() {
            @Override
            public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception {
                File tmpDir = File.createTempFile("test", "dir", testData);
                tmpDir.delete();
                tmpDir.mkdir();
                File logDir = f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
                File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
                //Spy on ZK so we can check if a snapshot happened or not.
                f.zk = spy(f.zk);
                try {
                    assertEquals(0, f.self.getAcceptedEpoch());
                    assertEquals(0, f.self.getCurrentEpoch());

                    // Setup a database with a single /foo node
                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                    Stat stat = new Stat();
                    assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));

                    QuorumPacket qp = new QuorumPacket();
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.FOLLOWERINFO, qp.getType());
                    assertEquals(qp.getZxid(), 0);
                    LearnerInfo learnInfo = new LearnerInfo();
                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
                    assertEquals(learnInfo.getProtocolVersion(), 0x10000);
                    assertEquals(learnInfo.getServerid(), 0);

                    // We are simulating an established leader, so the epoch is 1
                    qp.setType(Leader.LEADERINFO);
                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
                    byte[] protoBytes = new byte[4];
                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
                    qp.setData(protoBytes);
                    oa.writeRecord(qp, null);

                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACKEPOCH, qp.getType());
                    assertEquals(0, qp.getZxid());
                    assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
                    assertEquals(1, f.self.getAcceptedEpoch());
                    assertEquals(0, f.self.getCurrentEpoch());

                    // Send the snapshot we created earlier
                    qp.setType(Leader.SNAP);
                    qp.setData(new byte[0]);
                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
                    oa.writeRecord(qp, null);
                    zkDb.serializeSnapshot(oa);
                    oa.writeString("BenWasHere", null);
                    Thread.sleep(10); //Give it some time to process the snap
                    //No Snapshot taken yet, the SNAP was applied in memory
                    verify(f.zk, never()).takeSnapshot();

                    qp.setType(Leader.NEWLEADER);
                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
                    oa.writeRecord(qp, null);

                    // Get the ack of the new leader
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                    assertEquals(1, f.self.getAcceptedEpoch());
                    assertEquals(1, f.self.getCurrentEpoch());
                    //Make sure that we did take the snapshot now
                    verify(f.zk).takeSnapshot(true);
                    assertEquals(firstZxid, f.fzk.getLastProcessedZxid());

                    // Make sure the data was recorded in the filesystem ok
                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                    long lastZxid = zkDb2.loadDataBase();
                    assertEquals("data1", new String(zkDb2.getData("/foo", stat, null)));
                    assertEquals(firstZxid, lastZxid);

                    // Propose an update
                    long proposalZxid = ZxidUtils.makeZxid(1, 1000);
                    proposeSetData(qp, proposalZxid, "data2", 2);
                    oa.writeRecord(qp, null);

                    TrackerWatcher watcher = new TrackerWatcher();

                    // The change should not have happened yet, since we haven't committed
                    assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));

                    // The change should happen now
                    qp.setType(Leader.COMMIT);
                    qp.setZxid(proposalZxid);
                    oa.writeRecord(qp, null);

                    qp.setType(Leader.UPTODATE);
                    qp.setZxid(0);
                    oa.writeRecord(qp, null);

                    // Read the uptodate ack
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(proposalZxid, qp.getZxid());

                    watcher.waitForChange();
                    assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null)));

                    // check and make sure the change is persisted
                    zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                    lastZxid = zkDb2.loadDataBase();
                    assertEquals("data2", new String(zkDb2.getData("/foo", stat, null)));
                    assertEquals(proposalZxid, lastZxid);
                } finally {
                    TestUtils.deleteFileRecursively(tmpDir);
                }

            }

            private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException {
                qp.setType(Leader.PROPOSAL);
                qp.setZxid(zxid);
                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
                SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeRecord(hdr, null);
                boa.writeRecord(sdt, null);
                qp.setData(baos.toByteArray());
            }
        }, testData);
    }

    @Test
    public void testNormalFollowerRunWithDiff(@TempDir File testData) throws Exception {
        testFollowerConversation(new FollowerConversation() {
            @Override
            public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception {
                File tmpDir = File.createTempFile("test", "dir", testData);
                tmpDir.delete();
                tmpDir.mkdir();
                File logDir = f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
                File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
                //Spy on ZK so we can check if a snapshot happened or not.
                f.zk = spy(f.zk);
                try {
                    assertEquals(0, f.self.getAcceptedEpoch());
                    assertEquals(0, f.self.getCurrentEpoch());

                    // Setup a database with a single /foo node
                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                    Stat stat = new Stat();
                    assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));

                    QuorumPacket qp = new QuorumPacket();
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.FOLLOWERINFO, qp.getType());
                    assertEquals(qp.getZxid(), 0);
                    LearnerInfo learnInfo = new LearnerInfo();
                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
                    assertEquals(learnInfo.getProtocolVersion(), 0x10000);
                    assertEquals(learnInfo.getServerid(), 0);

                    // We are simulating an established leader, so the epoch is 1
                    qp.setType(Leader.LEADERINFO);
                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
                    byte[] protoBytes = new byte[4];
                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
                    qp.setData(protoBytes);
                    oa.writeRecord(qp, null);

                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACKEPOCH, qp.getType());
                    assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid());
                    assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt());
                    assertEquals(1, f.self.getAcceptedEpoch());
                    assertEquals(0, f.self.getCurrentEpoch());

                    // Send a diff
                    qp.setType(Leader.DIFF);
                    qp.setData(new byte[0]);
                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
                    oa.writeRecord(qp, null);
                    final long createSessionZxid = ZxidUtils.makeZxid(1, 2);
                    proposeNewSession(qp, createSessionZxid, 0x333);
                    oa.writeRecord(qp, null);
                    qp.setType(Leader.COMMIT);
                    qp.setZxid(createSessionZxid);
                    oa.writeRecord(qp, null);
                    qp.setType(Leader.NEWLEADER);
                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
                    qp.setData(null);
                    oa.writeRecord(qp, null);
                    qp.setType(Leader.UPTODATE);
                    qp.setZxid(0);
                    oa.writeRecord(qp, null);

                    // Get the ack of the new leader
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                    assertEquals(1, f.self.getAcceptedEpoch());
                    assertEquals(1, f.self.getCurrentEpoch());
                    assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());

                    // Read the uptodate ack
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

                    // Make sure the data was recorded in the filesystem ok
                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                    zkDb2.loadDataBase();
                    LOG.info("zkdb2 sessions:{}", zkDb2.getSessions());
                    LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts());
                    assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
                    //Snapshot was never taken during very simple sync
                    verify(f.zk, never()).takeSnapshot();
                } finally {
                    TestUtils.deleteFileRecursively(tmpDir);
                }

            }

            private void proposeNewSession(QuorumPacket qp, long zxid, long sessionId) throws IOException {
                qp.setType(Leader.PROPOSAL);
                qp.setZxid(zxid);
                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.createSession);
                CreateSessionTxn cst = new CreateSessionTxn(30000);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeRecord(hdr, null);
                boa.writeRecord(cst, null);
                qp.setData(baos.toByteArray());
            }
        }, testData);
    }

    @Test
    public void testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File testData) throws Exception {
        testFollowerConversation(new FollowerConversation() {
            @Override
            public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception {
                File tmpDir = File.createTempFile("test", "dir", testData);
                tmpDir.delete();
                tmpDir.mkdir();
                File logDir = f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
                File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
                //Spy on ZK so we can check if a snapshot happened or not.
                f.zk = spy(f.zk);
                try {
                    assertEquals(0, f.self.getAcceptedEpoch());
                    assertEquals(0, f.self.getCurrentEpoch());

                    // Setup a database with a single /foo node
                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                    Stat stat = new Stat();
                    assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));

                    QuorumPacket qp = new QuorumPacket();
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.FOLLOWERINFO, qp.getType());
                    assertEquals(qp.getZxid(), 0);
                    LearnerInfo learnInfo = new LearnerInfo();
                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
                    assertEquals(learnInfo.getProtocolVersion(), 0x10000);
                    assertEquals(learnInfo.getServerid(), 0);

                    // We are simulating an established leader, so the epoch is 1
                    qp.setType(Leader.LEADERINFO);
                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
                    byte[] protoBytes = new byte[4];
                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
                    qp.setData(protoBytes);
                    oa.writeRecord(qp, null);

                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACKEPOCH, qp.getType());
                    assertEquals(0, qp.getZxid());
                    assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
                    assertEquals(1, f.self.getAcceptedEpoch());
                    assertEquals(0, f.self.getCurrentEpoch());

                    // Send the snapshot we created earlier
                    qp.setType(Leader.SNAP);
                    qp.setData(new byte[0]);
                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
                    oa.writeRecord(qp, null);
                    zkDb.serializeSnapshot(oa);
                    oa.writeString("BenWasHere", null);
                    Thread.sleep(10); //Give it some time to process the snap
                    //No Snapshot taken yet, the SNAP was applied in memory
                    verify(f.zk, never()).takeSnapshot();

                    // Leader sends an outstanding proposal
                    long proposalZxid = ZxidUtils.makeZxid(1, 1001);
                    proposeSetData(qp, proposalZxid, "data2", 2);
                    oa.writeRecord(qp, null);

                    qp.setType(Leader.NEWLEADER);
                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
                    qp.setData(null);
                    oa.writeRecord(qp, null);

                    // Get the ack of the new leader
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                    assertEquals(1, f.self.getAcceptedEpoch());
                    assertEquals(1, f.self.getCurrentEpoch());
                    //Make sure that we did take the snapshot now
                    verify(f.zk).takeSnapshot(true);
                    assertEquals(firstZxid, f.fzk.getLastProcessedZxid());

                    // The outstanding proposal has not been persisted yet
                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                    long lastZxid = zkDb2.loadDataBase();
                    assertEquals("data1", new String(zkDb2.getData("/foo", stat, null)));
                    assertEquals(firstZxid, lastZxid);

                    TrackerWatcher watcher = new TrackerWatcher();

                    // The change should not have happened yet
                    assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));

                    // Leader commits proposalZxid right after it sends NEWLEADER to follower
                    qp.setType(Leader.COMMIT);
                    qp.setZxid(proposalZxid);
                    qp.setData(null);
                    oa.writeRecord(qp, null);

                    qp.setType(Leader.UPTODATE);
                    qp.setZxid(0);
                    qp.setData(null);
                    oa.writeRecord(qp, null);

                    // Read the uptodate ack
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(proposalZxid, qp.getZxid());

                    // The change should happen now
                    watcher.waitForChange();
                    assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null)));

                    // check and make sure the change is persisted
                    zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                    lastZxid = zkDb2.loadDataBase();
                    assertEquals("data2", new String(zkDb2.getData("/foo", stat, null)));
                    assertEquals(proposalZxid, lastZxid);
                } finally {
                    TestUtils.deleteFileRecursively(tmpDir);
                }
            }

            private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException {
                qp.setType(Leader.PROPOSAL);
                qp.setZxid(zxid);
                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
                SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeRecord(hdr, null);
                boa.writeRecord(sdt, null);
                qp.setData(baos.toByteArray());
            }
        }, testData);
    }

    @Test
    public void testNormalRun(@TempDir File testData) throws Exception {
        testLeaderConversation(new LeaderConversation() {
            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException {
                assertEquals(0, l.self.getAcceptedEpoch());
                assertEquals(0, l.self.getCurrentEpoch());

                /* we test a normal run. everything should work out well. */
                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.LEADERINFO, qp.getType());
                assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
                assertEquals(1, l.self.getAcceptedEpoch());
                assertEquals(0, l.self.getCurrentEpoch());

                qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.DIFF, qp.getType());

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.NEWLEADER, qp.getType());
                assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                assertEquals(1, l.self.getAcceptedEpoch());
                assertCurrentEpochGotUpdated(1, l.self, ClientBase.CONNECTION_TIMEOUT);

                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.UPTODATE, qp.getType());
            }
        }, testData);
    }

    @Test
    public void testTxnTimeout(@TempDir File testData) throws Exception {
        testLeaderConversation(new LeaderConversation() {
            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException, org.apache.zookeeper.server.quorum.Leader.XidRolloverException {
                assertEquals(0, l.self.getAcceptedEpoch());
                assertEquals(0, l.self.getCurrentEpoch());

                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.LEADERINFO, qp.getType());
                assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
                assertEquals(1, l.self.getAcceptedEpoch());
                assertEquals(0, l.self.getCurrentEpoch());

                qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.DIFF, qp.getType());

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.NEWLEADER, qp.getType());
                assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                assertEquals(1, l.self.getAcceptedEpoch());
                assertCurrentEpochGotUpdated(1, l.self, ClientBase.CONNECTION_TIMEOUT);

                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.UPTODATE, qp.getType());

                long zxid = l.zk.getZxid();
                l.propose(new Request(1, 1, ZooDefs.OpCode.create, new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create), new CreateTxn("/test", "hola".getBytes(), null, true, 0), zxid));

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.PROPOSAL, qp.getType());

                LOG.info("Proposal sent.");

                for (int i = 0; i < (2 * ZabUtils.SYNC_LIMIT) + 2; i++) {
                    try {
                        ia.readRecord(qp, null);
                        LOG.info("Ping received: {}", i);
                        qp = new QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(), null);
                        oa.writeRecord(qp, null);
                    } catch (EOFException e) {
                        return;
                    }
                }
                fail("Connection hasn't been closed by leader after transaction times out.");
            }
        }, testData);
    }

    private void deserializeSnapshot(InputArchive ia) throws IOException {
        ZKDatabase zkdb = new ZKDatabase(null);
        zkdb.deserializeSnapshot(ia);
        String signature = ia.readString("signature");
        assertEquals("BenWasHere", signature);
    }

    @Test
    public void testNormalObserverRun(@TempDir File testData) throws Exception {
        testObserverConversation(new ObserverConversation() {
            @Override
            public void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) throws Exception {
                File tmpDir = File.createTempFile("test", "dir", testData);
                tmpDir.delete();
                tmpDir.mkdir();
                File logDir = o.zk.getTxnLogFactory().getDataLogDir().getParentFile();
                File snapDir = o.zk.getTxnLogFactory().getSnapDir().getParentFile();
                try {
                    assertEquals(0, o.self.getAcceptedEpoch());
                    assertEquals(0, o.self.getCurrentEpoch());

                    // Setup a database with a single /foo node
                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
                    final long foo1Zxid = ZxidUtils.makeZxid(1, 1);
                    final long foo2Zxid = ZxidUtils.makeZxid(1, 2);
                    zkDb.processTxn(new TxnHeader(13, 1313, foo1Zxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                    zkDb.processTxn(new TxnHeader(13, 1313, foo2Zxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo2", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                    Stat stat = new Stat();
                    assertEquals("data1", new String(zkDb.getData("/foo1", stat, null)));
                    assertEquals("data1", new String(zkDb.getData("/foo2", stat, null)));

                    QuorumPacket qp = new QuorumPacket();
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.OBSERVERINFO, qp.getType());
                    assertEquals(qp.getZxid(), 0);
                    LearnerInfo learnInfo = new LearnerInfo();
                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
                    assertEquals(learnInfo.getProtocolVersion(), 0x10000);
                    assertEquals(learnInfo.getServerid(), 0);

                    // We are simulating an established leader, so the epoch is 1
                    qp.setType(Leader.LEADERINFO);
                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
                    byte[] protoBytes = new byte[4];
                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
                    qp.setData(protoBytes);
                    oa.writeRecord(qp, null);

                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACKEPOCH, qp.getType());
                    assertEquals(0, qp.getZxid());
                    assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
                    assertEquals(1, o.self.getAcceptedEpoch());
                    assertEquals(0, o.self.getCurrentEpoch());

                    // Send the snapshot we created earlier
                    qp.setType(Leader.SNAP);
                    qp.setData(new byte[0]);
                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
                    oa.writeRecord(qp, null);
                    zkDb.serializeSnapshot(oa);
                    oa.writeString("BenWasHere", null);
                    qp.setType(Leader.NEWLEADER);
                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
                    oa.writeRecord(qp, null);

                    // Get the ack of the new leader
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                    assertEquals(1, o.self.getAcceptedEpoch());
                    assertEquals(1, o.self.getCurrentEpoch());

                    assertEquals(foo2Zxid, o.zk.getLastProcessedZxid());

                    // Make sure the data was recorded in the filesystem ok
                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                    long lastZxid = zkDb2.loadDataBase();
                    assertEquals("data1", new String(zkDb2.getData("/foo1", stat, null)));
                    assertEquals(foo2Zxid, lastZxid);

                    // Register watch
                    TrackerWatcher watcher = new TrackerWatcher();
                    assertEquals("data1", new String(o.zk.getZKDatabase().getData("/foo2", stat, watcher)));

                    // Propose /foo1 update
                    long proposalZxid = ZxidUtils.makeZxid(1, 1000);
                    proposeSetData(qp, "/foo1", proposalZxid, "data2", 2);
                    oa.writeRecord(qp, null);

                    // Commit /foo1 update
                    qp.setType(Leader.COMMIT);
                    qp.setZxid(proposalZxid);
                    oa.writeRecord(qp, null);

                    // Inform /foo2 update
                    long informZxid = ZxidUtils.makeZxid(1, 1001);
                    proposeSetData(qp, "/foo2", informZxid, "data2", 2);
                    qp.setType(Leader.INFORM);
                    oa.writeRecord(qp, null);

                    qp.setType(Leader.UPTODATE);
                    qp.setZxid(0);
                    oa.writeRecord(qp, null);

                    // Read the uptodate ack
                    readPacketSkippingPing(ia, qp);
                    assertEquals(Leader.ACK, qp.getType());
                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

                    // Data should get updated
                    watcher.waitForChange();
                    assertEquals("data2", new String(o.zk.getZKDatabase().getData("/foo1", stat, null)));
                    assertEquals("data2", new String(o.zk.getZKDatabase().getData("/foo2", stat, null)));

                    // Shutdown sequence guarantee that all pending requests
                    // in sync request processor get flush to disk
                    o.zk.shutdown();

                    zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                    lastZxid = zkDb2.loadDataBase();
                    assertEquals("data2", new String(zkDb2.getData("/foo1", stat, null)));
                    assertEquals("data2", new String(zkDb2.getData("/foo2", stat, null)));
                    assertEquals(informZxid, lastZxid);
                } finally {
                    TestUtils.deleteFileRecursively(tmpDir);
                }

            }

            private void proposeSetData(QuorumPacket qp, String path, long zxid, String data, int version) throws IOException {
                qp.setType(Leader.PROPOSAL);
                qp.setZxid(zxid);
                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
                SetDataTxn sdt = new SetDataTxn(path, data.getBytes(), version);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeRecord(hdr, null);
                boa.writeRecord(sdt, null);
                qp.setData(baos.toByteArray());
            }
        }, testData);
    }

    @Test
    public void testLeaderBehind(@TempDir File testData) throws Exception {
        testLeaderConversation(new LeaderConversation() {
            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException {
                /* we test a normal run. everything should work out well. */
                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                /* we are going to say we last acked epoch 20 */
                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0), liBytes, null);
                oa.writeRecord(qp, null);
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.LEADERINFO, qp.getType());
                assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
                assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
                qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
                oa.writeRecord(qp, null);
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.DIFF, qp.getType());
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.NEWLEADER, qp.getType());
                assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());

                qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.UPTODATE, qp.getType());
            }
        }, testData);
    }

    /**
     * Tests that when a quorum of followers send LearnerInfo but do not ack the epoch (which is sent
     * by the leader upon receipt of LearnerInfo from a quorum), the leader does not start using this epoch
     * as it would in the normal case (when a quorum do ack the epoch). This tests ZK-1192
     * @throws Exception
     */
    @Test
    public void testAbandonBeforeACKEpoch(@TempDir File testData) throws Exception {
        testLeaderConversation(new LeaderConversation() {
            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException {
                /* we test a normal run. everything should work out well. */
                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
                oa.writeRecord(qp, null);
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.LEADERINFO, qp.getType());
                assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000);
                Thread.sleep(l.self.getInitLimit() * l.self.getTickTime() + 5000);

                // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced
                assertEquals(0, l.self.getCurrentEpoch());
            }
        }, testData);
    }

    static class ConversableFollower extends Follower {

        ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
            super(self, zk);
        }

        QuorumServer leaderQuorumServer;
        public void setLeaderQuorumServer(QuorumServer quorumServer) {
            leaderQuorumServer = quorumServer;
        }

        @Override
        protected QuorumServer findLeader() {
            return leaderQuorumServer;
        }

    }
    private ConversableFollower createFollower(File tmpDir, QuorumPeer peer) throws IOException {
        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
        peer.setTxnFactory(logFactory);
        ZKDatabase zkDb = new ZKDatabase(logFactory);
        FollowerZooKeeperServer zk = new FollowerZooKeeperServer(logFactory, peer, zkDb);
        peer.setZKDatabase(zkDb);
        return new ConversableFollower(peer, zk);
    }

    static class ConversableObserver extends Observer {

        ConversableObserver(QuorumPeer self, ObserverZooKeeperServer zk) {
            super(self, zk);
        }

        QuorumServer leaderQuorumServer;
        public void setLeaderQuorumServer(QuorumServer quorumServer) {
            leaderQuorumServer = quorumServer;
        }

        @Override
        protected QuorumServer findLeader() {
            return leaderQuorumServer;
        }

    }

    private ConversableObserver createObserver(File tmpDir, QuorumPeer peer) throws IOException {
        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
        peer.setTxnFactory(logFactory);
        ZKDatabase zkDb = new ZKDatabase(logFactory);
        ObserverZooKeeperServer zk = new ObserverZooKeeperServer(logFactory, peer, zkDb);
        peer.setZKDatabase(zkDb);
        return new ConversableObserver(peer, zk);
    }

    private String readContentsOfFile(File f) throws IOException {
        return new BufferedReader(new FileReader(f)).readLine();
    }

    @Test
    public void testInitialAcceptedCurrent(@TempDir File testData) throws Exception {
        File tmpDir = File.createTempFile("test", ".dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        try {
            FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
            File version2 = new File(tmpDir, "version-2");
            version2.mkdir();
            logFactory.save(new DataTree(), new ConcurrentHashMap<>(), false);
            long zxid = ZxidUtils.makeZxid(3, 3);
            logFactory.append(new Request(1, 1, ZooDefs.OpCode.error, new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error), new ErrorTxn(1), zxid));
            logFactory.commit();
            ZKDatabase zkDb = new ZKDatabase(logFactory);
            QuorumPeer peer = QuorumPeer.testingQuorumPeer();
            peer.setZKDatabase(zkDb);
            peer.setTxnFactory(logFactory);
            peer.getLastLoggedZxid();
            assertEquals(3, peer.getAcceptedEpoch());
            assertEquals(3, peer.getCurrentEpoch());
            assertEquals(3, Integer.parseInt(readContentsOfFile(new File(version2, QuorumPeer.CURRENT_EPOCH_FILENAME))));
            assertEquals(3, Integer.parseInt(readContentsOfFile(new File(version2, QuorumPeer.ACCEPTED_EPOCH_FILENAME))));
        } finally {
            TestUtils.deleteFileRecursively(tmpDir);
        }
    }

    /*
     * Epoch is first written to file then updated in memory. Give some time to
     * write the epoch in file and then go for assert.
     */
    private void assertCurrentEpochGotUpdated(int expected, QuorumPeer self, long timeout)
            throws IOException {
        long elapsedTime = 0;
        long waitInterval = 10;
        while (self.getCurrentEpoch() != expected && elapsedTime < timeout) {
            try {
                Thread.sleep(waitInterval);
            } catch (InterruptedException e) {
                fail("CurrentEpoch update failed");
            }
            elapsedTime = elapsedTime + waitInterval;
        }
        assertEquals(expected, self.getCurrentEpoch(), "CurrentEpoch update failed");
    }
}