FLEMalformedNotificationMessageTest.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FLEMalformedNotificationMessageTest extends ZKTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(FLEMalformedNotificationMessageTest.class);
    private static final byte[] CONFIG_BYTES = "my very invalid config string".getBytes();
    private static final int CONFIG_BYTES_LENGTH = CONFIG_BYTES.length;

    int count;
    HashMap<Long, QuorumServer> peers;
    File tmpdir[];
    int port[];

    QuorumCnxManager mockCnxManager;
    FLETestUtils.LEThread leaderElectionThread;
    QuorumPeer peerRunningLeaderElection;


    @BeforeEach
    public void setUp() throws Exception {
        count = 3;

        peers = new HashMap<>(count);
        tmpdir = new File[count];
        port = new int[count];

        LOG.info("FLEMalformedNotificationMessageTest: {}, {}", getTestName(), count);
        for (int i = 0; i < count; i++) {
            int clientport = PortAssignment.unique();
            peers.put((long) i,
                      new QuorumServer(i,
                                       new InetSocketAddress(clientport),
                                       new InetSocketAddress(PortAssignment.unique())));
            tmpdir[i] = ClientBase.createTmpDir();
            port[i] = clientport;
        }

        /*
         * Start server 0
         */
        peerRunningLeaderElection = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
        peerRunningLeaderElection.startLeaderElection();
        leaderElectionThread = new FLETestUtils.LEThread(peerRunningLeaderElection, 0);
        leaderElectionThread.start();
    }


    @AfterEach
    public void tearDown() throws Exception {
        peerRunningLeaderElection.shutdown();
        mockCnxManager.halt();
    }


    @Test
    public void testTooShortPartialNotificationMessage() throws Exception {

        /*
         * Start mock server 1, send a message too short to be compatible with any protocol version
         * This simulates the case when only some parts of the whole message is received.
         */
        startMockServer(1);
        byte requestBytes[] = new byte[12];
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
        requestBuffer.clear();
        requestBuffer.putInt(ServerState.LOOKING.ordinal());   // state
        requestBuffer.putLong(0);                              // leader
        mockCnxManager.toSend(0L, requestBuffer);

        /*
         * Assert that the message receiver thread in leader election is still healthy:
         * we are sending valid votes and waiting for the leader election to be finished.
         */
        sendValidNotifications(1, 0);
        leaderElectionThread.join(5000);
        if (leaderElectionThread.isAlive()) {
            fail("Leader election thread didn't join, something went wrong.");
        }
    }


    @Test
    public void testNotificationMessageWithNegativeConfigLength() throws Exception {

        /*
         * Start mock server 1, send a message with negative configLength field
         */
        startMockServer(1);
        byte requestBytes[] = new byte[48];
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
        requestBuffer.clear();
        requestBuffer.putInt(ServerState.LOOKING.ordinal());   // state
        requestBuffer.putLong(0);                              // leader
        requestBuffer.putLong(0);                              // zxid
        requestBuffer.putLong(0);                              // electionEpoch
        requestBuffer.putLong(0);                              // epoch
        requestBuffer.putInt(FastLeaderElection.Notification.CURRENTVERSION);   // version
        requestBuffer.putInt(-123);                            // configData.length
        mockCnxManager.toSend(0L, requestBuffer);

        /*
         * Assert that the message receiver thread in leader election is still healthy:
         * we are sending valid votes and waiting for the leader election to be finished.
         */
        sendValidNotifications(1, 0);
        leaderElectionThread.join(5000);
        if (leaderElectionThread.isAlive()) {
            fail("Leader election thread didn't join, something went wrong.");
        }
    }


    @Test
    public void testNotificationMessageWithInvalidConfigLength() throws Exception {

        /*
         * Start mock server 1, send a message with an invalid configLength field
         * (instead of sending CONFIG_BYTES_LENGTH, we send 10000)
         */
        startMockServer(1);
        byte requestBytes[] = new byte[48 + CONFIG_BYTES_LENGTH];
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
        requestBuffer.clear();
        requestBuffer.putInt(ServerState.LOOKING.ordinal());   // state
        requestBuffer.putLong(0);                              // leader
        requestBuffer.putLong(0);                              // zxid
        requestBuffer.putLong(0);                              // electionEpoch
        requestBuffer.putLong(0);                              // epoch
        requestBuffer.putInt(FastLeaderElection.Notification.CURRENTVERSION);   // version
        requestBuffer.putInt(10000);                           // configData.length
        requestBuffer.put(CONFIG_BYTES);                       // configData
        mockCnxManager.toSend(0L, requestBuffer);

        /*
         * Assert that the message receiver thread in leader election is still healthy:
         * we are sending valid votes and waiting for the leader election to be finished.
         */
        sendValidNotifications(1, 0);
        leaderElectionThread.join(5000);
        if (leaderElectionThread.isAlive()) {
            fail("Leader election thread didn't join, something went wrong.");
        }
    }


    @Test
    public void testNotificationMessageWithInvalidConfig() throws Exception {

        /*
         * Start mock server 1, send a message with an invalid config field
         * (the receiver should not be able to parse the config part of the message)
         */
        startMockServer(1);
        ByteBuffer requestBuffer = FastLeaderElection.buildMsg(ServerState.LOOKING.ordinal(), 1, 0, 0, 0, CONFIG_BYTES);
        mockCnxManager.toSend(0L, requestBuffer);

        /*
         * Assert that the message receiver thread in leader election is still healthy:
         * we are sending valid votes and waiting for the leader election to be finished.
         */
        sendValidNotifications(1, 0);
        leaderElectionThread.join(5000);
        if (leaderElectionThread.isAlive()) {
            fail("Leader election thread didn't join, something went wrong.");
        }
    }


    @Test
    public void testNotificationMessageWithBadProtocol() throws Exception {

        /*
         * Start mock server 1, send an invalid 30 bytes long message
         * (the receiver should not be able to parse the message and should skip it)
         * This simulates the case when only some parts of the whole message is received.
         */
        startMockServer(1);
        byte requestBytes[] = new byte[30];
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
        requestBuffer.clear();
        requestBuffer.putInt(ServerState.LOOKING.ordinal());   // state
        requestBuffer.putLong(1);                              // leader
        requestBuffer.putLong(0);                              // zxid
        requestBuffer.putLong(0);                              // electionEpoch
        requestBuffer.putShort((short) 0);                      // this is the first two bytes of a proper
                                                               // 8 bytes Long we should send here
        mockCnxManager.toSend(0L, requestBuffer);

        /*
         * Assert that the message receiver thread in leader election is still healthy:
         * we are sending valid votes and waiting for the leader election to be finished.
         */
        sendValidNotifications(1, 0);
        leaderElectionThread.join(5000);
        if (leaderElectionThread.isAlive()) {
            fail("Leader election thread didn't join, something went wrong.");
        }
    }


    void startMockServer(int sid) throws IOException {
        QuorumPeer peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid], port[sid], 3, sid, 1000, 2, 2, 2);
        mockCnxManager = peer.createCnxnManager();
        mockCnxManager.listener.start();
    }


    void sendValidNotifications(int fromSid, int toSid) throws InterruptedException {
        mockCnxManager.toSend((long) toSid, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), fromSid, 0, 0));
        mockCnxManager.recvQueue.take();
        mockCnxManager.toSend((long) toSid, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), toSid, 0, 0));
    }

}