ObserverMasterTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.test;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.management.Attribute;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.InvalidAttributeValueException;
import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.RuntimeMBeanException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DummyWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.admin.Commands;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.util.PortForwarder;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ObserverMasterTest extends ObserverMasterTestBase {

    protected static final Logger LOG = LoggerFactory.getLogger(ObserverMasterTest.class);

    /**
     * This test ensures two things:
     * 1. That Observers can successfully proxy requests to the ensemble.
     * 2. That Observers don't participate in leader elections.
     * The second is tested by constructing an ensemble where a leader would
     * be elected if and only if an Observer voted.
     *
     * @param testObserverMaster when true, the observer is additionally expected to be
     *                           connected to the observer-master port ({@code OM_PORT})
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testObserver(boolean testObserverMaster) throws Exception {
        // We expect two notifications before we want to continue
        latch = new CountDownLatch(2);
        setUp(-1, testObserverMaster);
        q3.start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT),
                "waiting for server 3 being up");

        validateObserverSyncTimeMetrics();

        if (testObserverMaster) {
            int masterPort = q3.getQuorumPeer().observer.getSocket().getPort();
            LOG.info("port {} {}", masterPort, OM_PORT);
            assertEquals(OM_PORT, masterPort, "observer failed to connect to observer master");
        }

        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
        zk.create("/obstest", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Assert that commands are getting forwarded correctly
        assertEquals("test", new String(zk.getData("/obstest", null, null)));

        // Now check that other commands don't blow everything up
        zk.sync("/", null, null);
        zk.setData("/obstest", "test2".getBytes(), -1);
        zk.getChildren("/", false);

        assertEquals(States.CONNECTED, zk.getState());

        LOG.info("Shutting down server 2");
        // Now kill one of the other real servers
        q2.shutdown();

        assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT),
                "Waiting for server 2 to shut down");

        LOG.info("Server 2 down");

        // Now the resulting ensemble shouldn't be quorate
        latch.await();
        assertNotEquals(KeeperState.SyncConnected, lastEvent.getState(), "Client is still connected to non-quorate cluster");

        LOG.info("Latch returned");

        try {
            // Any successful read here means the observer answered without a quorum.
            // (Comparing against a payload would be meaningless: "/obstest" was already
            // overwritten with "test2" above.)
            byte[] data = zk.getData("/obstest", null, null);
            fail("Shouldn't get a response when cluster not quorate! Got: " + new String(data));
        } catch (ConnectionLossException c) {
            LOG.info("Connection loss exception caught - ensemble not quorate (this is expected)");
        }

        latch = new CountDownLatch(1);

        LOG.info("Restarting server 2");

        // Bring it back
        q2.start();

        LOG.info("Waiting for server 2 to come up");
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT),
                "waiting for server 2 being up");

        LOG.info("Server 2 started, waiting for latch");

        latch.await();
        // It's possible our session expired - but this is ok, shows we
        // were able to talk to the ensemble
        assertTrue(KeeperState.SyncConnected == lastEvent.getState() || KeeperState.Expired == lastEvent.getState(),
                "Client didn't reconnect to quorate ensemble (state was " + lastEvent.getState() + ")");

        LOG.info("perform a revalidation test");
        int leaderProxyPort = PortAssignment.unique();
        int obsProxyPort = PortAssignment.unique();
        int leaderPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP2 : CLIENT_PORT_QP1;
        PortForwarder leaderPF = new PortForwarder(leaderProxyPort, leaderPort);

        latch = new CountDownLatch(1);
        ZooKeeper client = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort), ClientBase.CONNECTION_TIMEOUT, this);
        PortForwarder obsPF = null;
        try {
            latch.await();
            client.create("/revalidtest", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            assertNotNull(client.exists("/revalidtest", null), "Read-after write failed");

            // Only now open the path to the observer, then cut the leader path so the
            // client has to reconnect (and revalidate its session) via the observer.
            latch = new CountDownLatch(2);
            obsPF = new PortForwarder(obsProxyPort, CLIENT_PORT_OBS);
            shutdownQuietly(leaderPF);
            latch.await();
            assertEquals("test", new String(client.getData("/revalidtest", null, null)));
        } finally {
            // Close the client while the observer path is still up so the session is
            // closed cleanly, then tear down the forwarder.
            client.close();
            if (obsPF != null) {
                obsPF.shutdown();
            }
        }

        shutdown();
    }

    /**
     * Verifies that a client can move from a participant to the observer and keep its
     * session. The client is given two forwarded addresses (participant first, observer
     * second); it writes an ephemeral node through the participant path, the participant
     * path is then cut, and the node must still be readable through the observer.
     *
     * @param testObserverMaster passed through to {@code setUp}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testRevalidation(boolean testObserverMaster) throws Exception {
        setUp(-1, testObserverMaster);
        q3.start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT),
                "waiting for server 3 being up");
        final int leaderProxyPort = PortAssignment.unique();
        final int obsProxyPort = PortAssignment.unique();

        int leaderPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP2 : CLIENT_PORT_QP1;
        PortForwarder leaderPF = new PortForwarder(leaderProxyPort, leaderPort);

        latch = new CountDownLatch(1);
        zk = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort), ClientBase.CONNECTION_TIMEOUT, this);
        latch.await();
        zk.create("/revalidtest", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        assertNotNull(zk.exists("/revalidtest", null), "Read-after write failed");

        latch = new CountDownLatch(2);
        PortForwarder obsPF = new PortForwarder(obsProxyPort, CLIENT_PORT_OBS);
        try {
            shutdownQuietly(leaderPF);
            latch.await();
            assertEquals("test", new String(zk.getData("/revalidtest", null, null)));
        } finally {
            obsPF.shutdown();
        }

        shutdown();
    }

    /**
     * Pre-populates some data before the observer starts, then drives concurrent
     * asynchronous creates (with periodic syncs) through both the observer and a
     * non-leader participant, waiting until every create callback has fired.
     *
     * @param testObserverMaster passed through to {@code setUp}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testInOrderCommits(boolean testObserverMaster) throws Exception {
        setUp(-1, testObserverMaster);

        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, null);
        for (int i = 0; i < 10; i++) {
            zk.create("/bulk" + i, "Initial data of some size".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        zk.close();

        q3.start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT),
                "waiting for observer to be up");

        latch = new CountDownLatch(1);
        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, this);
        latch.await();
        assertEquals(States.CONNECTED, zk.getState());

        zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        final long zxid = q1.getQuorumPeer().getLastLoggedZxid();

        // wait for change to propagate
        waitFor("Timeout waiting for observer sync", () -> zxid == q3.getQuorumPeer().getLastLoggedZxid(), 30);

        ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
        int followerPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP1 : CLIENT_PORT_QP2;
        ZooKeeper fZk = new ZooKeeper("127.0.0.1:" + followerPort, ClientBase.CONNECTION_TIMEOUT, this);
        try {
            final int numTransactions = 10001;
            // Both writers block on this gate so they start issuing creates together.
            CountDownLatch gate = new CountDownLatch(1);
            CountDownLatch oAsyncLatch = new CountDownLatch(numTransactions);
            Thread oAsyncWriteThread = new Thread(new AsyncWriter(obsZk, numTransactions, true, oAsyncLatch, "/obs", gate));
            CountDownLatch fAsyncLatch = new CountDownLatch(numTransactions);
            Thread fAsyncWriteThread = new Thread(new AsyncWriter(fZk, numTransactions, true, fAsyncLatch, "/follower", gate));

            LOG.info("ASYNC WRITES");
            oAsyncWriteThread.start();
            fAsyncWriteThread.start();
            gate.countDown();

            oAsyncLatch.await();
            fAsyncLatch.await();

            oAsyncWriteThread.join(ClientBase.CONNECTION_TIMEOUT);
            if (oAsyncWriteThread.isAlive()) {
                LOG.error("observer asyncWriteThread is still alive");
            }
            fAsyncWriteThread.join(ClientBase.CONNECTION_TIMEOUT);
            if (fAsyncWriteThread.isAlive()) {
                LOG.error("follower asyncWriteThread is still alive");
            }
        } finally {
            obsZk.close();
            fZk.close();
        }

        shutdown();
    }

    /**
     * Exercises observer-related admin surfaces: the {@code mntr} command output, the
     * per-peer synced-observers metric, terminating the observer's learner connection
     * over JMX, and changing the observer's {@code LearnerMaster} attribute over JMX.
     *
     * @param testObserverMaster when true, changing {@code LearnerMaster} is expected to
     *                           move the observer; when false it is expected to be rejected
     *                           with an {@link IllegalArgumentException}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testAdminCommands(boolean testObserverMaster) throws IOException, MBeanException, InstanceNotFoundException, ReflectionException, InterruptedException, MalformedObjectNameException, AttributeNotFoundException, InvalidAttributeValueException, KeeperException {
        // flush all beans, then start
        for (ZKMBeanInfo beanInfo : MBeanRegistry.getInstance().getRegisteredBeans()) {
            MBeanRegistry.getInstance().unregister(beanInfo);
        }

        JMXEnv.setUp();
        try {
            setUp(-1, testObserverMaster);
            q3.start();
            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT),
                    "waiting for observer to be up");

            // Assert that commands are getting forwarded correctly
            zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
            zk.create("/obstest", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            assertEquals("test", new String(zk.getData("/obstest", null, null)));

            // test stats collection
            final Map<String, String> emptyMap = Collections.emptyMap();
            Map<String, Object> stats = Commands.runGetCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap, null, null).toMap();
            assertTrue(stats.containsKey("observer_master_id"), "observer not emitting observer_master_id");

            // check the stats for both participants
            assertSyncedObserversMetric(q1, testObserverMaster);
            assertSyncedObserversMetric(q2, testObserverMaster);

            // test admin commands for disconnection
            final String learnerIdTag = "id:" + q3.getQuorumPeer().getMyId();
            ObjectName connBean = null;
            for (ObjectName bean : JMXEnv.conn().queryNames(new ObjectName(MBeanRegistry.DOMAIN + ":*"), null)) {
                String canonicalName = bean.getCanonicalName();
                if (canonicalName.contains("Learner_Connections") && canonicalName.contains(learnerIdTag)) {
                    connBean = bean;
                    break;
                }
            }
            assertNotNull(connBean, "could not find connection bean");

            latch = new CountDownLatch(1);
            JMXEnv.conn().invoke(connBean, "terminateConnection", new Object[0], null);
            assertTrue(latch.await(CONNECTION_TIMEOUT / 2, TimeUnit.MILLISECONDS),
                    "server failed to disconnect on terminate");
            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT),
                    "waiting for server 3 being up");

            final String obsBeanName = String.format("org.apache.ZooKeeperService:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Observer", q3.getQuorumPeer().getMyId(), q3.getQuorumPeer().getMyId());
            Set<ObjectName> names = JMXEnv.conn().queryNames(new ObjectName(obsBeanName), null);
            assertEquals(1, names.size(), "expecting singular observer bean");
            ObjectName obsBean = names.iterator().next();

            // The two participants have ids 1 and 2, so "3 - id" names the other one.
            if (testObserverMaster) {
                // show we can move the observer using the id
                long observerMasterId = q3.getQuorumPeer().observer.getLearnerMasterId();
                latch = new CountDownLatch(1);
                JMXEnv.conn().setAttribute(obsBean, new Attribute("LearnerMaster", Long.toString(3 - observerMasterId)));
                assertTrue(latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS),
                        "server failed to disconnect on learner master change");
                assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT),
                        "waiting for server 3 being up");
            } else {
                // show we get an error
                final long leaderId = q1.getQuorumPeer().leader == null ? 2 : 1;
                try {
                    JMXEnv.conn().setAttribute(obsBean, new Attribute("LearnerMaster", Long.toString(3 - leaderId)));
                    fail("should have seen an exception on previous command");
                } catch (RuntimeMBeanException e) {
                    assertEquals(IllegalArgumentException.class, e.getCause().getClass(), "mbean failed for the wrong reason");
                }
            }

            shutdown();
        } finally {
            // Always release the JMX environment, even when an assertion above fails.
            JMXEnv.tearDown();
        }
    }

    /**
     * Asserts the synced-observers metric reported by one participant.
     * With an observer master, a non-leader is expected to report one synced observer
     * and the leader zero; without one, the leader reports one and a non-leader reports
     * no value ({@code null}).
     *
     * @param peer               the participant to inspect
     * @param testObserverMaster whether the ensemble was set up with an observer master
     */
    private static void assertSyncedObserversMetric(MainThread peer, boolean testObserverMaster) {
        boolean isLeader = peer.getQuorumPeer().leader != null;
        Integer actual = peer.getQuorumPeer().getSynced_observers_metric();
        if (testObserverMaster) {
            assertEquals(Integer.valueOf(isLeader ? 0 : 1), actual);
        } else if (isLeader) {
            assertEquals(Integer.valueOf(1), actual);
        } else {
            assertNull(actual);
        }
    }

    /**
     * Shuts down a port forwarder, logging rather than propagating any failure.
     * Used where a forwarder is cut mid-test to force a client to fail over; whether the
     * forwarder itself stops cleanly is not what those tests check.
     *
     * @param forwarder the forwarder to stop
     */
    private static void shutdownQuietly(PortForwarder forwarder) {
        try {
            forwarder.shutdown();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.warn("Ignoring failure while shutting down port forwarder", e);
        }
    }

    /**
     * Builds a {@code server.N=...} config line on 127.0.0.1 with two freshly assigned
     * ports, the given role ({@code participant}/{@code observer}) and client port.
     */
    private String createServerString(String type, long serverId, int clientPort) {
        return "server." + serverId + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":" + type + ";" + clientPort;
    }

    /** Asserts that the server listening on the given local client port comes up. */
    private void waitServerUp(int clientPort) {
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT),
                "waiting for server being up");
    }

    /**
     * Creates a {@link ZooKeeperAdmin} authenticated as the digest super user, enabling
     * reconfig first. Note: sets a JVM-wide system property that is not cleared here.
     */
    private ZooKeeperAdmin createAdmin(int clientPort) throws IOException {
        System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
        QuorumPeerConfig.setReconfigEnabled(true);
        ZooKeeperAdmin admin = new ZooKeeperAdmin(
            "127.0.0.1:" + clientPort,
            ClientBase.CONNECTION_TIMEOUT,
            DummyWatcher.INSTANCE);
        admin.addAuthInfo("digest", "super:test".getBytes());
        return admin;
    }

    // This test is known to be flaky and fail due to "reconfig already in progress".
    // TODO: Investigate intermittent testDynamicReconfig failures.
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Disabled
    public void testDynamicReconfig(boolean testObserverMaster) throws InterruptedException, IOException, KeeperException {
        if (!testObserverMaster) {
            return;
        }

        ClientBase.setupTestEnv();

        // create a quorum running with different observer master port
        // to make it easier to choose which server the observer is
        // following with
        //
        // we have setObserverMaster function but it's broken, use this
        // solution before we fixed that
        int clientPort1 = PortAssignment.unique();
        int clientPort2 = PortAssignment.unique();
        int omPort1 = PortAssignment.unique();
        int omPort2 = PortAssignment.unique();
        String quorumCfgSection = createServerString("participant", 1, clientPort1)
                                          + "\n"
                                          + createServerString("participant", 2, clientPort2);

        MainThread s1 = new MainThread(1, clientPort1, quorumCfgSection, String.format("observerMasterPort=%d%n", omPort1));
        MainThread s2 = new MainThread(2, clientPort2, quorumCfgSection, String.format("observerMasterPort=%d%n", omPort2));
        s1.start();
        s2.start();
        waitServerUp(clientPort1);
        waitServerUp(clientPort2);

        // create observer to follow non-leader observer master
        int nonLeaderOMPort = s1.getQuorumPeer().leader == null ? omPort1 : omPort2;
        int observerClientPort = PortAssignment.unique();
        int observerId = 10;
        MainThread observer = new MainThread(
            observerId,
            observerClientPort,
            quorumCfgSection + "\n" + createServerString("observer", observerId, observerClientPort),
            String.format("observerMasterPort=%d%n", nonLeaderOMPort));
        LOG.info("starting observer");
        observer.start();
        waitServerUp(observerClientPort);

        // create a client to the observer
        final LinkedBlockingQueue<KeeperState> states = new LinkedBlockingQueue<>();
        ZooKeeper observerClient = new ZooKeeper(
            "127.0.0.1:" + observerClientPort,
            ClientBase.CONNECTION_TIMEOUT,
            event -> {
                try {
                    states.put(event.getState());
                } catch (InterruptedException ignore) {
                    Thread.currentThread().interrupt();
                }
            });

        // wait for connected
        KeeperState state = states.poll(1000, TimeUnit.MILLISECONDS);
        assertEquals(KeeperState.SyncConnected, state);

        // issue reconfig command
        ArrayList<String> newServers = new ArrayList<>();
        String server = "server.3=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":participant;localhost:" + PortAssignment.unique();
        newServers.add(server);
        ZooKeeperAdmin admin = createAdmin(clientPort1);
        ReconfigTest.reconfig(admin, newServers, null, null, -1);

        // make sure the observer has the new config
        ReconfigTest.testServerHasConfig(observerClient, newServers, null);

        // shouldn't be disconnected during reconfig, so expect to not
        // receive any new event
        state = states.poll(1000, TimeUnit.MILLISECONDS);
        assertNull(state);

        admin.close();
        observerClient.close();
        observer.shutdown();
        s2.shutdown();
        s1.shutdown();
    }

    /**
     * Issues {@code numTransactions} asynchronous persistent creates named
     * {@code root + i}, counting down {@code writerLatch} once per completed callback.
     * Every 100th create is logged and, when {@code issueSync} is set, followed by a
     * {@code sync} on {@code root + "0"}. If {@code gate} is non-null the writer waits
     * on it before starting, so several writers can be released together.
     */
    static class AsyncWriter implements Runnable {

        private final ZooKeeper client;
        private final int numTransactions;
        private final boolean issueSync;
        private final CountDownLatch writerLatch;
        private final String root;
        private final CountDownLatch gate;

        AsyncWriter(ZooKeeper client, int numTransactions, boolean issueSync, CountDownLatch writerLatch, String root, CountDownLatch gate) {
            this.client = client;
            this.numTransactions = numTransactions;
            this.issueSync = issueSync;
            this.writerLatch = writerLatch;
            this.root = root;
            this.gate = gate;
        }

        @Override
        public void run() {
            if (gate != null) {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.error("Gate interrupted");
                    return;
                }
            }
            for (int i = 0; i < numTransactions; i++) {
                final boolean pleaseLog = i % 100 == 0;
                client.create(root + i, "inner thread".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                        (rc, path, ctx, name) -> {
                            // Surface failed creates instead of silently counting them as done.
                            if (rc != KeeperException.Code.OK.intValue()) {
                                LOG.warn("async create of {} failed: {}", path, KeeperException.Code.get(rc));
                            }
                            writerLatch.countDown();
                            if (pleaseLog) {
                                LOG.info("wrote {}", path);
                            }
                        }, null);
                if (pleaseLog) {
                    LOG.info("async wrote {}{}", root, i);
                    if (issueSync) {
                        client.sync(root + "0", null, null);
                    }
                }
            }
        }

    }

    /**
     * Asserts that the {@code observer_sync_time} summary is exported as exactly five
     * keys and that each of its avg/min/max/cnt/sum values is present.
     */
    private void validateObserverSyncTimeMetrics() {
        final String name = "observer_sync_time";
        final Map<String, Object> metrics = MetricsUtils.currentServerMetrics();

        assertEquals(5, metrics.keySet().stream().filter(key -> key.contains(name)).count());
        for (String stat : new String[] {"avg", "min", "max", "cnt", "sum"}) {
            String key = String.format("%s_%s", stat, name);
            assertNotNull(metrics.get(key), "missing metric " + key);
        }
    }
}