// ReadOnlyModeTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.ByteArrayOutputStream;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NotReadOnlyException;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for ZooKeeper read-only mode.
 *
 * <p>Each test starts a quorum through {@link QuorumUtil}, takes peers down or
 * brings them back up, and checks what a client created with
 * {@code canBeReadOnly = true} can observe and do. The
 * {@code readonlymode.enabled} system property is switched on before every
 * test and off again afterwards.
 *
 * <p>Every client opened by a test is closed in a {@code finally} block so a
 * failing assertion does not leave a client behind.
 */
public class ReadOnlyModeTest extends ZKTestCase {

    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(ReadOnlyModeTest.class);
    private static final int CONNECTION_TIMEOUT = QuorumBase.CONNECTION_TIMEOUT;

    /** Matches the log line that {@link #testSeekForRwServer()} expects to be emitted. */
    private static final Pattern MAJORITY_SERVER_FOUND = Pattern.compile(".*Majority server found.*");

    private final QuorumUtil qu = new QuorumUtil(1);

    @BeforeEach
    public void setUp() throws Exception {
        System.setProperty("readonlymode.enabled", "true");
    }

    @AfterEach
    public void tearDown() throws Exception {
        System.setProperty("readonlymode.enabled", "false");
        qu.tearDown();
    }

    /**
     * Opens a client against the whole ensemble that is allowed to attach to a
     * read-only server.
     *
     * @param watcher default watcher that receives the connection events
     * @return the newly created client; the caller is responsible for closing it
     */
    private ZooKeeper newReadOnlyCapableClient(CountdownWatcher watcher) throws Exception {
        return new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
    }

    /**
     * Scans captured log output line by line for the "Majority server found" message.
     *
     * @param log captured log text
     * @return {@code true} if at least one line matches {@link #MAJORITY_SERVER_FOUND}
     */
    private static boolean containsMajorityServerFound(String log) throws Exception {
        try (LineNumberReader reader = new LineNumberReader(new StringReader(log))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (MAJORITY_SERVER_FOUND.matcher(line).matches()) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Test write operations using multi request.
     */
    @Test
    @Timeout(value = 90)
    public void testMultiTransaction() throws Exception {
        qu.enableLocalSession(true);
        qu.startQuorum();

        final String data = "Data to be read in RO mode";
        final String node1 = "/tnode1";
        final String node2 = "/tnode2";

        CountdownWatcher watcher = new CountdownWatcher();
        ZooKeeper zk = newReadOnlyCapableClient(watcher);
        try {
            watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected

            zk.create(node1, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.close();
            watcher.waitForDisconnected(CONNECTION_TIMEOUT);

            watcher.reset();
            qu.shutdown(2);
            zk = newReadOnlyCapableClient(watcher);
            watcher.waitForConnected(CONNECTION_TIMEOUT);
            assertEquals(States.CONNECTEDREADONLY, zk.getState(), "Should be in r-o mode");

            // read operation during r/o mode
            String remoteData = new String(zk.getData(node1, false, null));
            assertEquals(data, remoteData, "Failed to read data in r-o mode");

            try {
                Transaction transaction = zk.transaction();
                transaction.setData(node1, "no way".getBytes(), -1);
                transaction.create(node2, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                transaction.commit();
                fail("Write operation using multi-transaction" + " api has succeeded during RO mode");
            } catch (NotReadOnlyException e) {
                // ok
            }

            // the rejected transaction must not have left the second node behind
            assertNull(zk.exists(node2, false), "Should not have created the znode:" + node2);
        } finally {
            // closing again after an earlier close() is harmless
            zk.close();
        }
    }

    /**
     * Basic test of read-only client functionality. Tries to read and write
     * during read-only mode, then regains a quorum and tries to write again.
     */
    @Test
    @Timeout(value = 90)
    public void testReadOnlyClient() throws Exception {
        qu.enableLocalSession(true);
        qu.startQuorum();

        final String data = "Data to be read in RO mode";
        final String node = "/tnode";

        CountdownWatcher watcher = new CountdownWatcher();
        ZooKeeper zk = newReadOnlyCapableClient(watcher);
        try {
            watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected

            zk.create(node, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            watcher.reset();
            qu.shutdown(2);
            zk.close();

            // Re-connect the client (in case we were connected to the shut down
            // server and the local session was not persisted).
            zk = newReadOnlyCapableClient(watcher);
            watcher.waitForConnected(CONNECTION_TIMEOUT);

            // read operation during r/o mode
            String remoteData = new String(zk.getData(node, false, null));
            assertEquals(data, remoteData);

            try {
                zk.setData(node, "no way".getBytes(), -1);
                fail("Write operation has succeeded during RO mode");
            } catch (NotReadOnlyException e) {
                // ok
            }

            watcher.reset();
            qu.start(2);
            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT),
                "waiting for server up");
            zk.close();
            watcher.reset();

            // Re-connect the client (in case we were connected to the shut down
            // server and the local session was not persisted).
            zk = newReadOnlyCapableClient(watcher);
            watcher.waitForConnected(CONNECTION_TIMEOUT);
            zk.setData(node, "We're in the quorum now".getBytes(), -1);
        } finally {
            zk.close();
        }
    }

    /**
     * Ensures that upon connection to a read-only server client receives
     * ConnectedReadOnly state notification.
     */
    @Test
    @Timeout(value = 90)
    public void testConnectionEvents() throws Exception {
        qu.enableLocalSession(true);
        qu.startQuorum();

        CountdownWatcher watcher = new CountdownWatcher();
        ZooKeeper zk = newReadOnlyCapableClient(watcher);
        try {
            // retry the first write for up to 30 attempts, one second apart
            boolean success = false;
            for (int i = 0; i < 30; i++) {
                try {
                    zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    success = true;
                    break;
                } catch (KeeperException.ConnectionLossException e) {
                    Thread.sleep(1000);
                }
            }
            assertTrue(success, "Did not succeed in connecting in 30s");
            assertFalse(watcher.readOnlyConnected, "The connection should not be read-only yet");

            // kill peer and wait no more than 5 seconds for read-only server
            // to be started (which should take one tickTime (2 seconds))
            qu.shutdown(2);

            // Release the first client before replacing it; it has its own
            // watcher, so it cannot affect the events checked below.
            zk.close();

            // Re-connect the client (in case we were connected to the shut down
            // server and the local session was not persisted).
            watcher = new CountdownWatcher();
            zk = newReadOnlyCapableClient(watcher);
            long start = Time.currentElapsedTime();
            while (zk.getState() != States.CONNECTEDREADONLY) {
                Thread.sleep(200);
                // TODO this was originally 5 seconds, but realistically, on random/slow/virt hosts, there is no way to guarantee this
                assertTrue(Time.currentElapsedTime() - start < 30000, "Can't connect to the server");
            }

            watcher.waitForReadOnlyConnected(5000);
        } finally {
            zk.close();
        }
    }

    /**
     * Tests a situation when client firstly connects to a read-only server and
     * then connects to a majority server. Transition should be transparent for
     * the user.
     */
    @Test
    @Timeout(value = 90)
    public void testSessionEstablishment() throws Exception {
        qu.enableLocalSession(true);
        qu.startQuorum();

        qu.shutdown(2);

        CountdownWatcher watcher = new CountdownWatcher();
        ZooKeeper zk = newReadOnlyCapableClient(watcher);
        try {
            watcher.waitForConnected(CONNECTION_TIMEOUT);
            assertSame(States.CONNECTEDREADONLY, zk.getState(), "should be in r/o mode");
            long fakeId = zk.getSessionId();
            LOG.info("Connected as r/o mode with state {} and session id {}", zk.getState(), fakeId);

            watcher.reset();
            qu.start(2);
            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT),
                "waiting for server up");
            LOG.info("Server 127.0.0.1:{} is up", qu.getPeer(2).clientPort);
            // ZOOKEEPER-2722: wait until we can connect to a read-write server after the quorum
            // is formed. Otherwise, it is possible that client first connects to a read-only server,
            // then drops the connection because of shutting down of the read-only server caused
            // by leader election / quorum forming between the read-only server and the newly started
            // server. If we happen to execute the zk.create after the read-only server is shutdown and
            // before the quorum is formed, we will get a ConnectLossException.
            watcher.waitForSyncConnected(CONNECTION_TIMEOUT);
            assertEquals(States.CONNECTED, zk.getState(), "Should be in read-write mode");
            LOG.info("Connected as rw mode with state {} and session id {}", zk.getState(), zk.getSessionId());
            zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            assertFalse(zk.getSessionId() == fakeId, "fake session and real session have same id");
        } finally {
            zk.close();
        }
    }

    /**
     * Checks session handling when local sessions are not enabled up front:
     * after a peer is shut down, an existing client must not reconnect and a
     * new client must not connect; once local sessions are enabled on peer 1,
     * a new client must be able to connect.
     */
    @Test
    @Timeout(value = 90)
    public void testGlobalSessionInRO() throws Exception {
        qu.startQuorum();

        CountdownWatcher watcher = new CountdownWatcher();
        ZooKeeper zk = newReadOnlyCapableClient(watcher);
        try {
            watcher.waitForConnected(CONNECTION_TIMEOUT);
            LOG.info("global session created 0x{}", Long.toHexString(zk.getSessionId()));

            watcher.reset();
            qu.shutdown(2);
            try {
                watcher.waitForConnected(CONNECTION_TIMEOUT);
                fail("Should not be able to renew a global session");
            } catch (TimeoutException expected) {
                // expected: the existing session must not get reconnected
            }
            zk.close();

            watcher.reset();
            zk = newReadOnlyCapableClient(watcher);
            try {
                watcher.waitForConnected(CONNECTION_TIMEOUT);
                fail("Should not be able to create a global session");
            } catch (TimeoutException expected) {
                // expected: a brand-new session must not get connected either
            }
            zk.close();

            qu.getPeer(1).peer.enableLocalSessions(true);
            zk = newReadOnlyCapableClient(watcher);
            try {
                watcher.waitForConnected(CONNECTION_TIMEOUT);
            } catch (TimeoutException e) {
                // keep the timeout as the cause so the failure report shows it
                fail("Should be able to create a local session", e);
            }
        } finally {
            zk.close();
        }
    }

    /**
     * Ensures that client seeks for r/w servers while it's connected to r/o
     * server.
     */
    @SuppressWarnings("deprecation")
    @Test
    @Timeout(value = 90)
    public void testSeekForRwServer() throws Exception {
        qu.enableLocalSession(true);
        qu.startQuorum();

        try (LoggerTestTool loggerTestTool = new LoggerTestTool("org.apache.zookeeper")) {
            ByteArrayOutputStream os = loggerTestTool.getOutputStream();

            qu.shutdown(2);
            CountdownWatcher watcher = new CountdownWatcher();
            ZooKeeper zk = newReadOnlyCapableClient(watcher);
            try {
                watcher.waitForConnected(CONNECTION_TIMEOUT);

                // if we don't suspend a peer it will rejoin a quorum
                qu.getPeer(1).peer.setSuspended(true);
                try {
                    // start two servers to form a quorum; client should detect this and
                    // connect to one of them
                    watcher.reset();
                    qu.start(2);
                    qu.start(3);
                    // result is not asserted here; the connection wait below is the actual check
                    ClientBase.waitForServerUp(qu.getConnString(), 2000);
                    watcher.waitForConnected(CONNECTION_TIMEOUT);
                    zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } finally {
                    // resume poor fellow, even when one of the steps above failed
                    qu.getPeer(1).peer.setSuspended(false);
                }

                String log = os.toString();
                assertFalse(StringUtils.isEmpty(log), "OutputStream doesn't have any log messages");
                assertTrue(containsMajorityServerFound(log),
                    "Majority server wasn't found while connected to r/o server");
            } finally {
                zk.close();
            }
        }
    }
}