LocalSessionRequestTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Validate that open/close session request of a local session to not propagate
 * to other machines in the quorum. We verify this by checking that
 * these request doesn't show up in committedLog on other machines.
 */
public class LocalSessionRequestTest extends ZKTestCase {

    protected static final Logger LOG = LoggerFactory.getLogger(LocalSessionRequestTest.class);
    // Need to be short since we need to wait for session to expire
    public static final int CONNECTION_TIMEOUT = 4000;

    private final QuorumBase qb = new QuorumBase();

    @BeforeEach
    public void setUp() throws Exception {
        LOG.info("STARTING quorum {}", getClass().getName());
        qb.localSessionsEnabled = true;
        qb.localSessionsUpgradingEnabled = true;
        qb.setUp();
        ClientBase.waitForServerUp(qb.hostPort, 10000);
    }

    @AfterEach
    public void tearDown() throws Exception {
        LOG.info("STOPPING quorum {}", getClass().getName());
        qb.tearDown();
    }

    @Test
    public void testLocalSessionsOnFollower() throws Exception {
        testOpenCloseSession(false);
    }

    @Test
    public void testLocalSessionsOnLeader() throws Exception {
        testOpenCloseSession(true);
    }

    /**
     * Walk through the target peer committedLog.
     * @param sessionId
     * @param peerId
     */
    private void validateRequestLog(long sessionId, int peerId) {
        String session = Long.toHexString(sessionId);
        LOG.info("Searching for txn of session 0x " + session + " on peer " + peerId);
        String peerType = peerId == qb.getLeaderIndex() ? "leader" : "follower";
        QuorumPeer peer = qb.getPeerList().get(peerId);
        ZKDatabase db = peer.getActiveServer().getZKDatabase();
        for (Proposal p : db.getCommittedLog()) {
            assertFalse(p.getRequest().sessionId == sessionId,
                    "Should not see " + Request.op2String(p.getRequest().type)
                            + " request from local session 0x" + session + " on the " + peerType);
        }
    }

    /**
     * Test that a CloseSession request generated by both the server (client
     * disconnect) or by the client (client explicitly issue close()) doesn't
     * get committed by the ensemble
     */
    public void testOpenCloseSession(boolean onLeader) throws Exception {
        int leaderIdx = qb.getLeaderIndex();
        assertFalse(leaderIdx == -1, "No leader in quorum?");
        int followerIdx = (leaderIdx + 1) % 5;
        int testPeerIdx = onLeader ? leaderIdx : followerIdx;
        int verifyPeerIdx = onLeader ? followerIdx : leaderIdx;

        String[] hostPorts = qb.hostPort.split(",");

        CountdownWatcher watcher = new CountdownWatcher();
        DisconnectableZooKeeper client = new DisconnectableZooKeeper(hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
        watcher.waitForConnected(CONNECTION_TIMEOUT);

        long localSessionId1 = client.getSessionId();

        // Cut the connection, so the server will create closeSession as part
        // of expiring the session.
        client.dontReconnect();
        client.disconnect();
        watcher.reset();

        // We don't validate right away, will do another session create first

        ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx], CONNECTION_TIMEOUT);
        watcher.waitForConnected(CONNECTION_TIMEOUT);

        long localSessionId2 = zk.getSessionId();

        // Send closeSession request.
        zk.close();
        watcher.reset();

        // This should be enough time for the first session to expire and for
        // the closeSession request to propagate to other machines (if there is a bug)
        // Since it is time sensitive, we have false negative when test
        // machine is under load
        Thread.sleep(CONNECTION_TIMEOUT * 2);

        // Validate that we don't see any txn from the first session
        validateRequestLog(localSessionId1, verifyPeerIdx);

        // Validate that we don't see any txn from the second session
        validateRequestLog(localSessionId2, verifyPeerIdx);

        qb.shutdownServers();

    }

}