ZooKeeperServerControllerEndToEndTest.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.controller;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperServerControllerEndToEndTest extends ControllerTestBase {
    /** Logger used by the nested watcher helper classes in this test. */
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperServerControllerEndToEndTest.class);
    /** Client under test; assigned by {@code initClient} and closed in {@code cleanup}. */
    private ZooKeeper zkClient;
    /** Znode path used by the tests in this class. */
    private static final String AnyPath = "/Any";
    /** Payload passed when a test creates the node at {@code AnyPath}. */
    private static final byte[] AnyData = new byte[] {0x0, 0x1};

    /**
     * Closes the client opened by the current test, if any, and then runs the base-class cleanup.
     *
     * <p>The base-class cleanup runs in a {@code finally} block so that it is still executed when
     * closing the client throws.
     *
     * @throws InterruptedException if closing the client is interrupted (may also be propagated
     *         from the base-class cleanup)
     */
    @After
    @Override
    public void cleanup() throws InterruptedException {
        try {
            if (zkClient != null) {
                zkClient.close();
            }
        } finally {
            // Always release what the base class set up, even if close() failed above.
            super.cleanup();
        }
    }

    /**
     * Creates a new client for {@code localhost} on the port taken from {@code config} and stores
     * it in {@code zkClient}.
     *
     * @param watcher watcher handed to the new client
     * @throws IOException propagated from the client constructor
     */
    private void initClient(Watcher watcher) throws IOException {
        final int sessionTimeoutMs = 10000;
        final String connectString = "localhost:" + config.getClientPortAddress().getPort();
        zkClient = new ZooKeeper(connectString, sessionTimeoutMs, watcher);
    }

    /** Basic validation: a client can connect and its watcher observes {@code SyncConnected}. */
    @Test
    public void verifyClientConnects() throws Exception {
        BlockingStateWatcher connectedWatcher =
                new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
        initClient(connectedWatcher);

        // Returns once SyncConnected has been delivered; throws if the wait times out.
        connectedWatcher.waitForEvent();
    }

    /**
     * Sends CLOSECONNECTION for this client's session through the controller and expects the
     * watcher to observe {@code Disconnected} followed by {@code SyncConnected}.
     */
    @Test
    public void verifyClientDisconnectsAndReconnects() throws Exception {
        // Arrange: establish the initial connection.
        BlockingStateWatcher stateWatcher =
                new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
        initClient(stateWatcher);
        stateWatcher.waitForEvent();

        // Arm the watcher with the ordered transitions before triggering the disconnect.
        Watcher.Event.KeeperState[] expectedOrder = {
                Watcher.Event.KeeperState.Disconnected,
                Watcher.Event.KeeperState.SyncConnected
        };
        stateWatcher.reset(expectedOrder);

        // Act: force the disconnection for this session via the controller.
        String sessionId = String.valueOf(zkClient.getSessionId());
        boolean commandSent = commandClient.trySendCommand(ControlCommand.Action.CLOSECONNECTION, sessionId);
        Assert.assertTrue(commandSent);

        // Assert: both transitions arrive, in order, before the default timeout.
        stateWatcher.waitForEvent();
    }

    /**
     * Sends EXPIRESESSION for this client's session through the controller and expects the
     * watcher to observe {@code Disconnected} followed by {@code Expired}.
     */
    @Test
    public void verifySessionExpiration() throws Exception {
        // Arrange: establish the initial connection.
        BlockingStateWatcher stateWatcher =
                new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
        initClient(stateWatcher);
        stateWatcher.waitForEvent();

        // Arm the watcher with the ordered transitions before triggering the expiration.
        Watcher.Event.KeeperState[] expectedOrder = {
                Watcher.Event.KeeperState.Disconnected,
                Watcher.Event.KeeperState.Expired
        };
        stateWatcher.reset(expectedOrder);

        // Act: force the expiration of this session via the controller.
        String sessionId = String.valueOf(zkClient.getSessionId());
        boolean commandSent = commandClient.trySendCommand(ControlCommand.Action.EXPIRESESSION, sessionId);
        Assert.assertTrue(commandSent);

        // Assert: both transitions arrive, in order, before the default timeout.
        stateWatcher.waitForEvent();
    }

    /**
     * Creates an ephemeral node, then sends EXPIRESESSION without a session argument and expects
     * the client's watcher to observe {@code Expired}.
     */
    @Test
    public void verifyGlobalSessionExpiration() throws Exception {
        // Connect and wait for SyncConnected.
        BlockingStateWatcher sessionWatcher =
                new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
        initClient(sessionWatcher);
        sessionWatcher.waitForEvent();

        // Add an ephemeral node (upgrades session to global), watching for its creation.
        BlockingPathWatcher creationWatcher =
                new BlockingPathWatcher(AnyPath, Watcher.Event.EventType.NodeCreated);
        zkClient.exists(AnyPath, creationWatcher);
        String createdPath = zkClient.create(AnyPath, AnyData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Assert.assertEquals(AnyPath, createdPath);
        creationWatcher.waitForEvent();

        // Force expire all sessions and wait for this client to be told.
        sessionWatcher.reset(Watcher.Event.KeeperState.Expired);
        boolean commandSent = commandClient.trySendCommand(ControlCommand.Action.EXPIRESESSION);
        Assert.assertTrue(commandSent);
        sessionWatcher.waitForEvent();
    }

    /**
     * Tells the server to reject connections, expects a new client not to reach
     * {@code SyncConnected} within 100 ms, then resets the server and expects the same client to
     * connect.
     *
     * <p>This test is disabled. It carries {@code @Test} together with {@code @Ignore} so that
     * JUnit discovers it and reports it as skipped; with {@code @Ignore} alone the method is not
     * a test at all and is silently omitted from reports.
     */
    @Test
    @Ignore
    public void verifyRejectAcceptSessions() throws Exception {
        // Tell the server to reject new requests.
        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.REJECTCONNECTIONS));
        EventWaiter watcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
        initClient(watcher);
        try {
            watcher.waitForEvent(100);
            Assert.fail("should have failed connecting");
        } catch (TimeoutException expected) {
            // Expected: no SyncConnected event may arrive while connections are rejected.
        }
        // Now accept requests. We should get a connection quickly.
        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.RESET));
        watcher.waitForEvent();
    }

    /**
     * Issues a single {@code exists} call for {@code AnyPath} and measures how long it takes.
     *
     * <p>The measurement uses {@link System#nanoTime()}, which is intended for elapsed-time
     * measurement, rather than the wall clock, which can jump if the system time is adjusted.
     *
     * @return the elapsed time of the call in whole milliseconds
     * @throws Exception propagated from the client call
     */
    private long timedTransaction() throws Exception {
        long startNanos = System.nanoTime();
        zkClient.exists(AnyPath, false);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    /**
     * Sends ADDDELAY with 200 ms, times one request, sends RESET, times another request, and
     * asserts that the delayed request took more than 200 ms longer than the one after the reset.
     */
    @Test
    public void verifyAddDelay() throws Exception {
        EventWaiter connectedWaiter = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
        initClient(connectedWaiter);
        connectedWaiter.waitForEvent();

        // One round trip before any measurement is compared; its duration is discarded.
        timedTransaction();

        // Add 200 ms of delay to each response and time a request.
        final int delayMs = 200;
        boolean delayAdded = commandClient.trySendCommand(ControlCommand.Action.ADDDELAY, String.valueOf(delayMs));
        Assert.assertTrue(delayAdded);
        long withDelayMs = timedTransaction();

        // Remove the delay again and time another request.
        boolean wasReset = commandClient.trySendCommand(ControlCommand.Action.RESET);
        Assert.assertTrue(wasReset);
        long afterResetMs = timedTransaction();

        // NOTE(review): a strict '>' on a timing difference may be sensitive to scheduling
        // jitter - confirm this is stable on slow build machines.
        long extraMs = withDelayMs - afterResetMs;
        Assert.assertTrue(extraMs > delayMs);
    }

    /**
     * Sends FAILREQUESTS without a count and expects two consecutive requests to throw
     * {@code KeeperException}; after RESET a request is expected to complete without throwing.
     */
    @Test
    public void verifyFailAllRequests() throws Exception {
        // Connect and wait for SyncConnected.
        BlockingStateWatcher connectedWatcher =
                new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
        initClient(connectedWatcher);
        connectedWatcher.waitForEvent();

        // Tell the server to fail requests.
        boolean failing = commandClient.trySendCommand(ControlCommand.Action.FAILREQUESTS);
        Assert.assertTrue(failing);

        // Both requests must fail: the second one because we haven't reset yet.
        String[] failureMessages = {"should have failed", "should still fail"};
        for (String failureMessage : failureMessages) {
            try {
                zkClient.exists(AnyPath, null);
                Assert.fail(failureMessage);
            } catch (KeeperException expected) {
                // Expected while FAILREQUESTS is in effect.
            }
        }

        // Reset; future requests should succeed.
        boolean wasReset = commandClient.trySendCommand(ControlCommand.Action.RESET);
        Assert.assertTrue(wasReset);

        zkClient.exists(AnyPath, null);
    }

    /**
     * Sends FAILREQUESTS with a count of 1 and expects exactly the first request to throw
     * {@code KeeperException}; the next request is expected to complete without a reset.
     */
    @Test
    public void verifyFailRequestCount() throws Exception {
        // Connect and wait for SyncConnected.
        BlockingStateWatcher connectedWatcher =
                new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
        initClient(connectedWatcher);
        connectedWatcher.waitForEvent();

        // Tell the server to fail 1 request.
        final String failCount = "1";
        boolean failing = commandClient.trySendCommand(ControlCommand.Action.FAILREQUESTS, failCount);
        Assert.assertTrue(failing);

        try {
            zkClient.exists(AnyPath, null);
            Assert.fail("should have failed");
        } catch (KeeperException expected) {
            // Expected: this is the single request the server was told to fail.
        }

        // Have not reset; should succeed.
        zkClient.exists(AnyPath, null);
    }

    /**
     * Sends NORESPONSE, issues an async create whose callback is expected never to notify, then
     * sends RESET and expects the next synchronous call to fail with a connection loss, after
     * which the client reconnects and sees the created node.
     */
    @Test
    public void verifyServerEatsAllResponses() throws Exception {
        // Connect and wait for SyncConnected.
        BlockingStateWatcher stateWatcher =
                new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
        initClient(stateWatcher);
        stateWatcher.waitForEvent();

        // No data yet.
        Assert.assertNull(zkClient.exists(AnyPath, null));

        // Tell the server to eat responses.
        boolean responsesDisabled = commandClient.trySendCommand(ControlCommand.Action.NORESPONSE);
        Assert.assertTrue(responsesDisabled);

        BlockingPathWatcher createCallback =
                new BlockingPathWatcher(AnyPath, Watcher.Event.EventType.NodeCreated);
        try {
            // This async call should succeed in setting the data, but never send a response.
            zkClient.create(AnyPath, AnyData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, createCallback, null);
            createCallback.waitForEvent(500);
            Assert.fail("should time out since the event should never come");
        } catch (TimeoutException expected) {
            // Expected: the waiter is never notified.
        }

        // Re-enable responses and arm the state watcher for the reconnect.
        boolean wasReset = commandClient.trySendCommand(ControlCommand.Action.RESET);
        Assert.assertTrue(wasReset);
        stateWatcher.reset(Watcher.Event.KeeperState.SyncConnected);

        try {
            // Even though we get a good response, the client doesn't know about
            // the transaction id (xid). This should terminate the connection and
            // throw a KeeperException.
            zkClient.exists(AnyPath, false);
            Assert.fail("should have failed with bad xid");
        } catch (KeeperException connectionFailure) {
            // The client believes it has fallen behind so deems this a connection loss.
            Assert.assertTrue(connectionFailure instanceof KeeperException.ConnectionLossException);
        }

        // The client should reconnect and be healthy after this.
        stateWatcher.waitForEvent();
        Assert.assertNotNull(zkClient.exists(AnyPath, false));
    }

    /**
     * Our watcher interface is called back on a potentially separate thread.
     * Tests should be logically consolidated into a single method in the following format:
     * for each action in my test
     *     Setup test action
     *     Kick off async action
     *     await state change
     *     verify state
     *
     * To enable this logical pattern, the watcher has an ordered set of states to wait on.
     * When all the states have arrived (in order), the notifier is unblocked.
     */
    private abstract class EventWaiter implements Watcher, AsyncCallback.StringCallback {
        private final int DEFAULT_WAIT_DURATION = 10000;
        private CountDownLatch eventNotification;

        public EventWaiter() {
            reset();
        }

        protected void reset() {
            eventNotification = new CountDownLatch(1);
        }

        @Override
        public void process(WatchedEvent event) {
            // NO-OP. Derived classes should override if required.
            LOG.info("WatchedEvent: {}", event);
        }

        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            // NO-OP. Derived classes to implement if required.
            LOG.info("StringCallback: {}, {}, {}, {}", rc, path, ctx, name);
        }

        public void notifyListener() {
            eventNotification.countDown();
        }

        public void waitForEvent() throws InterruptedException, TimeoutException {
            waitForEvent(DEFAULT_WAIT_DURATION);
        }

        public void waitForEvent(int waitDurationInMs) throws InterruptedException, TimeoutException {
            // Wait ten seconds and throw if we time out.
            if (!eventNotification.await(waitDurationInMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("Timed out waiting for event");
            }
        }
    }

    /**
     * Waiter that is armed with an ordered list of connection states and notifies its listener
     * once every state in the list has been observed, in order.
     *
     * <p>An incoming state that does not match the head of the list is ignored; it neither
     * advances nor resets the remaining expectations.
     */
    private class BlockingStateWatcher extends EventWaiter {
        /** Guards {@code statesToWaitFor}; final so the monitor can never be swapped out. */
        private final Object lockMe = new Object();
        /** Remaining states still to be observed, head first. */
        private LinkedList<Event.KeeperState> statesToWaitFor;

        /**
         * Creates a watcher armed with a single expected state.
         *
         * @param stateToNotifyOn the state whose arrival notifies the listener
         */
        public BlockingStateWatcher(Event.KeeperState stateToNotifyOn) {
            reset(stateToNotifyOn);
        }

        /**
         * Consumes the head of the expected-state list when the event's state matches it, and
         * notifies the listener when that makes the list empty.
         */
        @Override
        public void process(WatchedEvent event) {
            LOG.info("State transition: {}", event.getState());

            boolean shouldNotify = false;
            synchronized (lockMe) {
                if (!statesToWaitFor.isEmpty() && statesToWaitFor.getFirst() == event.getState()) {
                    statesToWaitFor.removeFirst();
                    shouldNotify = statesToWaitFor.isEmpty();
                }
            }

            // Notify outside the lock; the decision was already taken under it.
            if (shouldNotify) {
                notifyListener();
            }
        }

        /**
         * Re-arms the watcher with a single expected state.
         *
         * @param stateToNotifyOn the state whose arrival notifies the listener
         */
        public void reset(Event.KeeperState stateToNotifyOn) {
            reset(new Event.KeeperState[] {stateToNotifyOn});
        }

        /**
         * Re-arms the watcher with an ordered sequence of expected states and a fresh latch.
         *
         * @param orderedStatesToWaitOn states to observe, in order; must be non-null and non-empty
         * @throws IllegalArgumentException if the array is null or empty
         */
        public void reset(Event.KeeperState[] orderedStatesToWaitOn) {
            if (orderedStatesToWaitOn == null) {
                throw new IllegalArgumentException("orderedStatesToWaitOn can't be null.");
            }

            if (orderedStatesToWaitOn.length <= 0) {
                throw new IllegalArgumentException("orderedStatesToWaitOn length must be positive.");
            }

            synchronized (lockMe) {
                // Replace the latch and the expectations together under the lock.
                super.reset();
                statesToWaitFor = new LinkedList<>();

                for (Event.KeeperState state : orderedStatesToWaitOn) {
                    statesToWaitFor.add(state);
                }
            }
        }
    }

    /**
     * Waiter that notifies its listener when an event of the required type arrives for the
     * configured path.
     *
     * <p>Paths are compared with {@code equals}: znode paths are case-sensitive, so an event for
     * a path that differs only in letter case must not satisfy the wait.
     */
    private class BlockingPathWatcher extends EventWaiter {
        /** Path an event must carry to trigger the notification. */
        private String pathToNotifyOn;
        /** Event type an event must carry to trigger the notification. */
        private Event.EventType requiredEventType;

        /**
         * Creates a watcher armed for the given path and event type.
         *
         * @param pathToNotifyOn path to match; if {@code null} no event ever matches
         * @param requiredEventType event type to match
         */
        public BlockingPathWatcher(String pathToNotifyOn, Event.EventType requiredEventType) {
            reset(pathToNotifyOn, requiredEventType);
        }

        /**
         * Re-arms the watcher with a fresh latch and a new path / event type to match.
         *
         * @param pathToNotifyOn path to match; if {@code null} no event ever matches
         * @param requiredEventType event type to match
         */
        public void reset(String pathToNotifyOn, Event.EventType requiredEventType) {
            super.reset();
            this.pathToNotifyOn = pathToNotifyOn;
            this.requiredEventType = requiredEventType;
        }

        /** Notifies the listener when both the event type and the exact path match. */
        @Override
        public void process(WatchedEvent event) {
            LOG.info("WatchEvent {} for path {}", event.getType(), event.getPath());
            if (pathToNotifyOn != null && event.getType() == requiredEventType
                    && pathToNotifyOn.equals(event.getPath())) {
                notifyListener();
            }
        }
    }

}