LearnerHandlerTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.server.TxnLogProposalIterator;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LearnerHandlerTest extends ZKTestCase {

    protected static final Logger LOG = LoggerFactory.getLogger(LearnerHandlerTest.class);

    class MockLearnerHandler extends LearnerHandler {

        boolean threadStarted = false;

        MockLearnerHandler(Socket sock, Leader leader) throws IOException {
            super(sock, new BufferedInputStream(sock.getInputStream()), leader);
        }

        protected void startSendingPackets() {
            threadStarted = true;
        }

        @Override
        protected boolean shouldSendMarkerPacketForLogging() {
            return false;
        }

    }

    class MockZKDatabase extends ZKDatabase {

        long lastProcessedZxid;
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        LinkedList<Proposal> committedLog = new LinkedList<>();
        LinkedList<Proposal> txnLog = new LinkedList<>();

        public MockZKDatabase(FileTxnSnapLog snapLog) {
            super(snapLog);
        }

        public long getDataTreeLastProcessedZxid() {
            return lastProcessedZxid;
        }

        public long getmaxCommittedLog() {
            if (!committedLog.isEmpty()) {
                return committedLog.getLast().getZxid();
            }
            return 0;
        }

        public long getminCommittedLog() {
            if (!committedLog.isEmpty()) {
                return committedLog.getFirst().getZxid();
            }
            return 0;
        }

        public List<Proposal> getCommittedLog() {
            return committedLog;
        }

        public ReentrantReadWriteLock getLogLock() {
            return lock;
        }

        public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid, long limit) {
            if (peerZxid >= txnLog.peekFirst().getZxid()) {
                return txnLog.iterator();
            } else {
                return Collections.emptyIterator();
            }

        }

        public long calculateTxnLogSizeLimit() {
            return 1;
        }

    }

    private MockLearnerHandler learnerHandler;
    private Socket sock;

    // Member variables for mocking Leader
    private Leader leader;
    private long currentZxid;

    // Member variables for mocking ZkDatabase
    private MockZKDatabase db;

    @BeforeEach
    public void setUp() throws Exception {
        db = new MockZKDatabase(null);
        sock = mock(Socket.class);

        // Intercept when startForwarding is called
        leader = mock(Leader.class);
        when(leader.startForwarding(ArgumentMatchers.any(LearnerHandler.class), ArgumentMatchers.anyLong())).thenAnswer(new Answer<Long>() {
            public Long answer(InvocationOnMock invocation) {
                currentZxid = invocation.getArgument(1);
                return 0L;
            }
        });
        when(leader.getZKDatabase()).thenReturn(db);

        learnerHandler = new MockLearnerHandler(sock, leader);
    }

    Proposal createProposal(long zxid) {
        QuorumPacket packet = new QuorumPacket();
        packet.setZxid(zxid);
        packet.setType(Leader.PROPOSAL);
        Proposal p = new Proposal(packet);
        return p;
    }

    /**
     * Validate that queued packets contains proposal in the following orders as
     * a given array of zxids
     *
     * @param zxids
     */
    public void queuedPacketMatches(long[] zxids) {
        int index = 0;
        for (QuorumPacket qp : learnerHandler.getQueuedPackets()) {
            if (qp.getType() == Leader.PROPOSAL) {
                assertZxidEquals(zxids[index++], qp.getZxid());
            }
        }
    }

    void reset() {
        learnerHandler.getQueuedPackets().clear();
        learnerHandler.threadStarted = false;
        learnerHandler.setFirstPacket(true);
    }

    /**
     * Check if op packet (first packet in the queue) match the expected value
     * @param type - type of packet
     * @param zxid - zxid in the op packet
     * @param currentZxid - last packet queued by syncFollower,
     *                      before invoking startForwarding()
     */
    public void assertOpType(int type, long zxid, long currentZxid) {
        Queue<QuorumPacket> packets = learnerHandler.getQueuedPackets();
        assertTrue(packets.size() > 0);
        assertEquals(type, packets.peek().getType());
        assertZxidEquals(zxid, packets.peek().getZxid());
        assertZxidEquals(currentZxid, this.currentZxid);
    }

    void assertZxidEquals(long expected, long value) {
        assertEquals(expected, value, "Expected 0x" + Long.toHexString(expected) + " but was 0x" + Long.toHexString(value));
    }

    /**
     * Test cases when leader has empty committedLog
     */
    @Test
    public void testEmptyCommittedLog() throws Exception {
        long peerZxid;

        // Peer has newer zxid
        peerZxid = 3;
        db.lastProcessedZxid = 1;
        db.committedLog.clear();
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send TRUNC and forward any packet starting lastProcessedZxid
        assertOpType(Leader.TRUNC, db.lastProcessedZxid, db.lastProcessedZxid);
        reset();

        // Peer is already sync
        peerZxid = 1;
        db.lastProcessedZxid = 1;
        db.committedLog.clear();
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send DIFF and forward any packet starting lastProcessedZxid
        assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
        assertEquals(1, learnerHandler.getQueuedPackets().size());
        reset();

        // Peer has 0 zxid (new machine turn up), txnlog
        // is disabled
        peerZxid = 0;
        db.setSnapshotSizeFactor(-1);
        db.lastProcessedZxid = 1;
        db.committedLog.clear();
        // We send SNAP
        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
        assertEquals(0, learnerHandler.getQueuedPackets().size());
        reset();

    }

    /**
     * Test cases when leader has committedLog
     */
    @Test
    public void testCommittedLog() throws Exception {
        long peerZxid;

        // Commit proposal may lag behind data tree, but it shouldn't affect
        // us in any case
        db.lastProcessedZxid = 6;
        db.committedLog.add(createProposal(2));
        db.committedLog.add(createProposal(3));
        db.committedLog.add(createProposal(5));

        // Peer has zxid that we have never seen
        peerZxid = 4;
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send TRUNC to 3 and forward any packet starting 5
        assertOpType(Leader.TRUNC, 3, 5);
        // DIFF + 1 proposals + 1 commit
        assertEquals(3, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{5});
        reset();

        // Peer is within committedLog range
        peerZxid = 2;
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send DIFF and forward any packet starting lastProcessedZxid
        assertOpType(Leader.DIFF, db.getmaxCommittedLog(), db.getmaxCommittedLog());
        // DIFF + 2 proposals + 2 commit
        assertEquals(5, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{3, 5});
        reset();

        // Peer miss the committedLog and txnlog is disabled
        peerZxid = 1;
        db.setSnapshotSizeFactor(-1);
        // We send SNAP
        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
        assertEquals(0, learnerHandler.getQueuedPackets().size());
        reset();
    }

    /**
     * Test cases when txnlog is enabled
     */
    @Test
    public void testTxnLog() throws Exception {
        long peerZxid;
        db.txnLog.add(createProposal(2));
        db.txnLog.add(createProposal(3));
        db.txnLog.add(createProposal(5));
        db.txnLog.add(createProposal(6));
        db.txnLog.add(createProposal(7));
        db.txnLog.add(createProposal(8));
        db.txnLog.add(createProposal(9));

        db.lastProcessedZxid = 9;
        db.committedLog.add(createProposal(6));
        db.committedLog.add(createProposal(7));
        db.committedLog.add(createProposal(8));

        // Peer has zxid that we have never seen
        peerZxid = 4;
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
        assertOpType(Leader.TRUNC, 3, db.getmaxCommittedLog());
        // DIFF + 4 proposals + 4 commit
        assertEquals(9, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{5, 6, 7, 8});
        reset();

        // Peer zxid is in txnlog range
        peerZxid = 3;
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send DIFF and forward any packet starting at maxCommittedLog
        assertOpType(Leader.DIFF, db.getmaxCommittedLog(), db.getmaxCommittedLog());
        // DIFF + 4 proposals + 4 commit
        assertEquals(9, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{5, 6, 7, 8});
        reset();

    }

    /**
     * Test case verifying TxnLogProposalIterator closure.
     */
    @Test
    public void testTxnLogProposalIteratorClosure() throws Exception {
        long peerZxid;

        // CommittedLog is empty, we will use txnlog up to lastProcessZxid
        db = new MockZKDatabase(null) {
            @Override
            public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid, long limit) {
                return TxnLogProposalIterator.EMPTY_ITERATOR;
            }
        };
        db.lastProcessedZxid = 7;
        db.txnLog.add(createProposal(2));
        db.txnLog.add(createProposal(3));
        when(leader.getZKDatabase()).thenReturn(db);

        // Peer zxid
        peerZxid = 4;
        assertTrue(learnerHandler.syncFollower(peerZxid, leader), "Couldn't identify snapshot transfer!");
        reset();
    }

    /**
     * Test cases when txnlog is enabled and committedLog is empty
     */
    @Test
    public void testTxnLogOnly() throws Exception {
        long peerZxid;

        // CommittedLog is empty, we will use txnlog up to lastProcessZxid
        db.lastProcessedZxid = 7;
        db.txnLog.add(createProposal(2));
        db.txnLog.add(createProposal(3));
        db.txnLog.add(createProposal(5));
        db.txnLog.add(createProposal(6));
        db.txnLog.add(createProposal(7));
        db.txnLog.add(createProposal(8));

        // Peer has zxid that we have never seen
        peerZxid = 4;
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send TRUNC to 3 and forward any packet starting at
        // lastProcessedZxid
        assertOpType(Leader.TRUNC, 3, db.lastProcessedZxid);
        // DIFF + 3 proposals + 3 commit
        assertEquals(7, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{5, 6, 7});
        reset();

        // Peer has zxid in txnlog range
        peerZxid = 2;
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send DIFF and forward any packet starting at lastProcessedZxid
        assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
        // DIFF + 4 proposals + 4 commit
        assertEquals(9, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{3, 5, 6, 7});
        reset();

        // Peer miss the txnlog
        peerZxid = 1;
        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
        // We send snap
        assertEquals(0, learnerHandler.getQueuedPackets().size());
        reset();
    }

    long getZxid(long epoch, long counter) {
        return ZxidUtils.makeZxid(epoch, counter);
    }

    /**
     * Test cases with zxids that are negative long
     */
    @Test
    public void testTxnLogWithNegativeZxid() throws Exception {
        long peerZxid;
        db.txnLog.add(createProposal(getZxid(0xf, 2)));
        db.txnLog.add(createProposal(getZxid(0xf, 3)));
        db.txnLog.add(createProposal(getZxid(0xf, 5)));
        db.txnLog.add(createProposal(getZxid(0xf, 6)));
        db.txnLog.add(createProposal(getZxid(0xf, 7)));
        db.txnLog.add(createProposal(getZxid(0xf, 8)));
        db.txnLog.add(createProposal(getZxid(0xf, 9)));

        db.lastProcessedZxid = getZxid(0xf, 9);
        db.committedLog.add(createProposal(getZxid(0xf, 6)));
        db.committedLog.add(createProposal(getZxid(0xf, 7)));
        db.committedLog.add(createProposal(getZxid(0xf, 8)));

        // Peer has zxid that we have never seen
        peerZxid = getZxid(0xf, 4);
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
        assertOpType(Leader.TRUNC, getZxid(0xf, 3), db.getmaxCommittedLog());
        // DIFF + 4 proposals + 4 commit
        assertEquals(9, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{getZxid(0xf, 5), getZxid(0xf, 6), getZxid(0xf, 7), getZxid(0xf, 8)});
        reset();

        // Peer zxid is in txnlog range
        peerZxid = getZxid(0xf, 3);
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send DIFF and forward any packet starting at maxCommittedLog
        assertOpType(Leader.DIFF, db.getmaxCommittedLog(), db.getmaxCommittedLog());
        // DIFF + 4 proposals + 4 commit
        assertEquals(9, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{getZxid(0xf, 5), getZxid(0xf, 6), getZxid(0xf, 7), getZxid(0xf, 8)});
        reset();
    }

    /**
     * Test cases when peer has new-epoch zxid
     */
    @Test
    public void testNewEpochZxid() throws Exception {
        long peerZxid;
        db.txnLog.add(createProposal(getZxid(0, 1)));
        db.txnLog.add(createProposal(getZxid(1, 1)));
        db.txnLog.add(createProposal(getZxid(1, 2)));

        // After leader election, lastProcessedZxid will point to new epoch
        db.lastProcessedZxid = getZxid(2, 0);
        db.committedLog.add(createProposal(getZxid(1, 1)));
        db.committedLog.add(createProposal(getZxid(1, 2)));

        // Peer has zxid of epoch 0
        peerZxid = getZxid(0, 0);
        // We should get snap, we can do better here, but the main logic is
        // that we should never send diff if we have never seen any txn older
        // than peer zxid
        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
        assertEquals(0, learnerHandler.getQueuedPackets().size());
        reset();

        // Peer has zxid of epoch 1
        peerZxid = getZxid(1, 0);
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
        assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
        // DIFF + 2 proposals + 2 commit
        assertEquals(5, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{getZxid(1, 1), getZxid(1, 2)});
        reset();

        // Peer has zxid of epoch 2, so it is already sync
        peerZxid = getZxid(2, 0);
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send DIFF to (2, 0) and forward any packet starting at (2, 0)
        assertOpType(Leader.DIFF, getZxid(2, 0), getZxid(2, 0));
        // DIFF only
        assertEquals(1, learnerHandler.getQueuedPackets().size());
        reset();

    }

    /**
     * Test cases when there is a duplicate txn in the committedLog. This
     * should never happen unless there is a bug in initialization code
     * but the learner should never see duplicate packets
     */
    @Test
    public void testDuplicatedTxn() throws Exception {
        long peerZxid;
        db.txnLog.add(createProposal(getZxid(0, 1)));
        db.txnLog.add(createProposal(getZxid(1, 1)));
        db.txnLog.add(createProposal(getZxid(1, 2)));
        db.txnLog.add(createProposal(getZxid(1, 1)));
        db.txnLog.add(createProposal(getZxid(1, 2)));

        // After leader election, lastProcessedZxid will point to new epoch
        db.lastProcessedZxid = getZxid(2, 0);
        db.committedLog.add(createProposal(getZxid(1, 1)));
        db.committedLog.add(createProposal(getZxid(1, 2)));
        db.committedLog.add(createProposal(getZxid(1, 1)));
        db.committedLog.add(createProposal(getZxid(1, 2)));

        // Peer has zxid of epoch 1
        peerZxid = getZxid(1, 0);
        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
        // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
        assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
        // DIFF + 2 proposals + 2 commit
        assertEquals(5, learnerHandler.getQueuedPackets().size());
        queuedPacketMatches(new long[]{getZxid(1, 1), getZxid(1, 2)});
        reset();

    }

    /**
     * Test cases when we have to TRUNC learner, but it may cross epoch boundary
     * so we need to send snap instead
     */
    @Test
    public void testCrossEpochTrunc() throws Exception {
        long peerZxid;
        db.txnLog.add(createProposal(getZxid(1, 1)));
        db.txnLog.add(createProposal(getZxid(2, 1)));
        db.txnLog.add(createProposal(getZxid(2, 2)));
        db.txnLog.add(createProposal(getZxid(4, 1)));

        // After leader election, lastProcessedZxid will point to new epoch
        db.lastProcessedZxid = getZxid(6, 0);

        // Peer has zxid (3, 1)
        peerZxid = getZxid(3, 1);
        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
        assertEquals(0, learnerHandler.getQueuedPackets().size());
        reset();
    }

    /**
     * Test cases when the leader's disk is slow. There can be a gap
     * between the txnLog and the committedLog. Make sure we detect this
     * and send a snap instead of a diff.
     */
    @Test
    public void testTxnLogGap() throws Exception {
        long peerZxid;
        db.txnLog.add(createProposal(2));
        db.txnLog.add(createProposal(3));
        db.txnLog.add(createProposal(4));

        db.lastProcessedZxid = 8;
        db.committedLog.add(createProposal(7));
        db.committedLog.add(createProposal(8));

        // Peer zxid is in txnlog range
        peerZxid = 3;
        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
        reset();
    }

}