QuorumTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DummyWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LearnerHandler;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a selection of {@link ClientTest} scenarios against the five-server ensemble
 * (s1..s5) started by {@link QuorumBase}. It also runs quorum-specific tests: leader
 * socket shutdown, sessions moving between servers, and restarting the leader of a
 * separate {@link QuorumUtil} ensemble.
 */
public class QuorumTest extends ZKTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(QuorumTest.class);
    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;

    /** Number of asynchronous setData calls issued in each phase of testLeaderShutdown. */
    private static final int ASYNC_SET_DATA_COUNT = 5000;

    private final QuorumBase qb = new QuorumBase();
    private final ClientTest ct = new ClientTest();
    // Only created by tests that need their own ensemble; torn down in tearDown() when non-null.
    private QuorumUtil qu;

    @BeforeEach
    public void setUp() throws Exception {
        qb.setUp();
        ct.hostPort = qb.hostPort;
        ct.setUpAll();
    }

    /**
     * Tears down the client fixture and both ensembles. Each step runs in a {@code finally}
     * block, so a failure in one teardown does not leave the remaining servers running.
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            ct.tearDownAll();
        } finally {
            try {
                qb.tearDown();
            } finally {
                if (qu != null) {
                    qu.tearDown();
                }
            }
        }
    }

    @Test
    public void testDeleteWithChildren() throws Exception {
        ct.testDeleteWithChildren();
    }

    @Test
    public void testPing() throws Exception {
        ct.testPing();
    }

    @Test
    public void testSequentialNodeNames() throws IOException, InterruptedException, KeeperException {
        ct.testSequentialNodeNames();
    }

    @Test
    public void testACLs() throws Exception {
        ct.testACLs();
    }

    @Test
    public void testClientwithoutWatcherObj() throws IOException, InterruptedException, KeeperException {
        ct.testClientwithoutWatcherObj();
    }

    @Test
    public void testClientWithWatcherObj() throws IOException, InterruptedException, KeeperException {
        ct.testClientWithWatcherObj();
    }

    @Test
    public void testGetView() {
        assertEquals(5, qb.s1.getView().size());
        assertEquals(5, qb.s2.getView().size());
        assertEquals(5, qb.s3.getView().size());
        assertEquals(5, qb.s4.getView().size());
        assertEquals(5, qb.s5.getView().size());
    }

    @Test
    public void testViewContains() {
        // Test view contains self
        assertTrue(qb.s1.viewContains(qb.s1.getMyId()));

        // Test view contains other servers
        assertTrue(qb.s1.viewContains(qb.s2.getMyId()));

        // Test view does not contain non-existent servers
        assertFalse(qb.s1.viewContains(-1L));
    }

    // Updated by the async setData callbacks in testLeaderShutdown: counter counts completions,
    // errors counts non-zero result codes. Neither is asserted on; the test only checks that
    // every server is still alive afterwards.
    volatile int counter = 0;
    volatile int errors = 0;

    /**
     * Issues a batch of async writes and then shuts down the input side of every forwarding
     * follower's socket held by the leader. It keeps writing and finally checks that all
     * five servers report {@code isAlive()}.
     */
    @Test
    public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException {
        try (ZooKeeper zk = new DisconnectableZooKeeper(
                qb.hostPort,
                ClientBase.CONNECTION_TIMEOUT,
                DummyWatcher.INSTANCE)) {
            zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Leader leader = findQuorumBaseLeader();
            assertNotNull(leader);

            submitAsyncSetData(zk, "/blah/blah", ASYNC_SET_DATA_COUNT);
            for (LearnerHandler f : leader.getForwardingFollowers()) {
                f.getSocket().shutdownInput();
            }
            submitAsyncSetData(zk, "/blah/blah", ASYNC_SET_DATA_COUNT);

            // check that every server (leader and followers) is still alive
            assertTrue(qb.s1.isAlive());
            assertTrue(qb.s2.isAlive());
            assertTrue(qb.s3.isAlive());
            assertTrue(qb.s4.isAlive());
            assertTrue(qb.s5.isAlive());
        }
    }

    /**
     * Returns the first non-null {@link Leader} among {@code qb.s1} .. {@code qb.s5},
     * or {@code null} if none of them currently has one.
     */
    private Leader findQuorumBaseLeader() {
        Leader[] candidates = {qb.s1.leader, qb.s2.leader, qb.s3.leader, qb.s4.leader, qb.s5.leader};
        for (Leader candidate : candidates) {
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Issues {@code count} asynchronous {@code setData} calls (version -1, empty data) on
     * {@code path}. Each completion increments {@code counter}, and each non-zero result
     * code increments {@code errors}. Does not wait for the completions.
     */
    private void submitAsyncSetData(ZooKeeper zk, String path, int count) {
        AsyncCallback.StatCallback callback = (rc, cbPath, ctx, stat) -> {
            counter++;
            if (rc != 0) {
                errors++;
            }
        };
        for (int i = 0; i < count; i++) {
            zk.setData(path, new byte[0], -1, callback, null);
        }
    }

    @Test
    public void testMultipleWatcherObjs() throws IOException, InterruptedException, KeeperException {
        ct.testMultipleWatcherObjs();
    }

    /**
     * Make sure that we can change sessions among servers and maintain consistent view
     * using {@link ZooKeeper#sync(String)}.
     */
    @Test
    public void testSessionMovedWithSynchronousSync() throws Exception {
        testSessionMoved(true);
    }

    /**
     * Make sure that we can change sessions among servers and maintain consistent view
     * using {@link ZooKeeper#sync(String, AsyncCallback.VoidCallback, Object)}.
     */
    @Test
    public void testSessionMovedWithAsynchronousSync() throws Exception {
        testSessionMoved(false);
    }

    /**
     * Moves one session around the ensemble twice. At each step a new handle reattaches
     * the session on the next server, writes and syncs through it, and then the test
     * checks that the previous handle fails with a connection loss.
     *
     * @param synchronousSync {@code true} to sync via the synchronous API,
     *        {@code false} to use the asynchronous variant
     */
    public void testSessionMoved(boolean synchronousSync) throws Exception {
        String[] hostPorts = qb.hostPort.split(",");
        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
            hostPorts[0],
            ClientBase.CONNECTION_TIMEOUT,
            DummyWatcher.INSTANCE);
        try {
            zk.create("/sessionMoveTest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            // we want to loop through the list twice
            for (int i = 0; i < hostPorts.length * 2; i++) {
                String nextHostPort = hostPorts[(i + 1) % hostPorts.length];
                zk.dontReconnect();
                // This should stomp the zk handle
                DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(
                    nextHostPort,
                    ClientBase.CONNECTION_TIMEOUT,
                    DummyWatcher.INSTANCE,
                    zk.getSessionId(),
                    zk.getSessionPasswd());
                zknew.setData("/", new byte[1], -1);
                syncClient(zknew, synchronousSync);
                LOG.info("{} Sync succeed", nextHostPort);
                try {
                    zk.setData("/", new byte[1], -1);
                    fail("Should have lost the connection");
                } catch (KeeperException.ConnectionLossException expected) {
                    // expected: the old handle must lose its connection once the session moved
                }
                zk = zknew;
            }
        } finally {
            // closes the most recent handle (the one that currently owns the session)
            zk.close();
        }
    }

    /** Watcher that records whether it has ever seen a {@link KeeperState#Disconnected} event. */
    private static class DiscoWatcher implements Watcher {

        volatile boolean zkDisco = false;

        @Override
        public void process(WatchedEvent event) {
            if (event.getState() == KeeperState.Disconnected) {
                zkDisco = true;
            }
        }

    }

    /**
     * Make sure the previous connection is closed after the session moves during a
     * multiop: the old handle must see a connection loss rather than repeatedly getting
     * SessionMovedException.
     */
    @Test
    public void testSessionMovedWithMultiOp() throws Exception {
        String[] hostPorts = qb.hostPort.split(",");
        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
            hostPorts[0],
            ClientBase.CONNECTION_TIMEOUT,
            DummyWatcher.INSTANCE);
        ZooKeeper zknew = null;
        try {
            zk.multi(Arrays.asList(Op.create("/testSessionMovedWithMultiOp", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)));

            // session moved to the next server
            zknew = new ZooKeeper(
                hostPorts[1],
                ClientBase.CONNECTION_TIMEOUT,
                DummyWatcher.INSTANCE,
                zk.getSessionId(),
                zk.getSessionPasswd());

            zknew.multi(Arrays.asList(Op.create("/testSessionMovedWithMultiOp-1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)));

            // try to issue the multi op again from the old connection
            // expect to have ConnectionLossException instead of keep
            // getting SessionMovedException
            try {
                zk.multi(Arrays.asList(Op.create("/testSessionMovedWithMultiOp-Failed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)));
                fail("Should have lost the connection");
            } catch (KeeperException.ConnectionLossException expected) {
                // expected: the old connection must be dropped after the session moved
            }
        } finally {
            // same close order as before: old handle first, then the new one
            zk.close();
            if (zknew != null) {
                zknew.close();
            }
        }
    }

    /**
     * Connect to two different servers with two different handles using the same session and
     * make sure we cannot do any changes.
     */
    @Test
    @Disabled
    public void testSessionMove() throws Exception {
        String[] hps = qb.hostPort.split(",");
        DiscoWatcher oldWatcher = new DiscoWatcher();
        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hps[0], ClientBase.CONNECTION_TIMEOUT, oldWatcher);
        zk.create("/t1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zk.dontReconnect();
        // This should stomp the zk handle
        DiscoWatcher watcher = new DiscoWatcher();
        DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hps[1], ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(), zk.getSessionPasswd());
        zknew.create("/t2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        try {
            zk.create("/t3", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            fail("Should have lost the connection");
        } catch (KeeperException.ConnectionLossException e) {
            // wait up to 30 seconds for the disco to be delivered
            for (int i = 0; i < 30; i++) {
                if (oldWatcher.zkDisco) {
                    break;
                }
                Thread.sleep(1000);
            }
            assertTrue(oldWatcher.zkDisco);
        }

        ArrayList<ZooKeeper> toClose = new ArrayList<>();
        toClose.add(zknew);
        // Let's just make sure it can still move
        for (int i = 0; i < 10; i++) {
            zknew.dontReconnect();
            zknew = new DisconnectableZooKeeper(hps[1], ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(), zk.getSessionId(), zk.getSessionPasswd());
            toClose.add(zknew);
            zknew.create("/t-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
        for (ZooKeeper z : toClose) {
            z.close();
        }
        zk.close();
    }

    /**
     * Restarts the leader of a fresh ensemble and checks that a client can connect to a
     * follower afterwards. See ZOOKEEPER-790 for details.
     */
    @Test
    public void testFollowersStartAfterLeader() throws Exception {
        qu = new QuorumUtil(1);
        CountdownWatcher watcher = new CountdownWatcher();
        qu.startQuorum();

        int index = findLeaderIndex();

        // break the quorum by shutting down the leader
        qu.shutdown(index);

        // try to reestablish the quorum
        qu.start(index);

        // Connect the client after services are restarted (otherwise we would get
        // SessionExpiredException as the previous local session was not persisted).
        ZooKeeper zk = new ZooKeeper(
                followerConnectString(index),
                ClientBase.CONNECTION_TIMEOUT,
                watcher);
        try {
            watcher.waitForConnected(CONNECTION_TIMEOUT);
        } catch (TimeoutException e) {
            fail("client could not connect to reestablished quorum: giving up after "
                    + CONNECTION_TIMEOUT + " ms.", e);
        } finally {
            zk.close();
        }
    }

    // skip superhammer and clientcleanup as they are too expensive for quorum

    /**
     * Tests if a multiop submitted to a non-leader propagates to the leader properly
     * (see ZOOKEEPER-1124).
     *
     * The test works as follows. It has a client connect to a follower and submit a multiop
     * to the follower. It then verifies that the multiop successfully gets committed by the leader.
     *
     * Without the fix in ZOOKEEPER-1124, this fails with a ConnectionLoss KeeperException.
     */
    @Test
    public void testMultiToFollower() throws Exception {
        qu = new QuorumUtil(1);
        CountdownWatcher watcher = new CountdownWatcher();
        qu.startQuorum();

        int index = findLeaderIndex();

        ZooKeeper zk = new ZooKeeper(
                followerConnectString(index),
                ClientBase.CONNECTION_TIMEOUT,
                watcher);
        try {
            watcher.waitForConnected(CONNECTION_TIMEOUT);

            zk.multi(Arrays.asList(
                    Op.create("/multi0", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.create("/multi1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)));
            // read every node back to verify the multi was applied
            zk.getData("/multi0", false, null);
            zk.getData("/multi1", false, null);
            zk.getData("/multi2", false, null);
        } finally {
            zk.close();
        }
    }

    /**
     * Returns the 1-based index of the peer in {@code qu} that currently has a leader.
     * The scan stops at the first index for which {@code qu.getPeer} returns {@code null}
     * and fails the test if no leader was found, instead of running off the end of the
     * ensemble.
     */
    private int findLeaderIndex() {
        for (int index = 1; qu.getPeer(index) != null; index++) {
            if (qu.getPeer(index).peer.leader != null) {
                return index;
            }
        }
        fail("no peer of the QuorumUtil ensemble currently has a leader");
        return -1; // unreachable: fail() always throws
    }

    /**
     * Returns a {@code 127.0.0.1:port} connect string for a non-leader peer of {@code qu}:
     * peer 2 when the leader is peer 1, otherwise peer 1.
     */
    private String followerConnectString(int leaderIndex) {
        int followerIndex = (leaderIndex == 1) ? 2 : 1;
        return "127.0.0.1:" + qu.getPeer(followerIndex).peer.getClientPort();
    }

}