DisconnectedWatcherTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisconnectedWatcherTest extends ClientBase {

    protected static final Logger LOG = LoggerFactory.getLogger(DisconnectedWatcherTest.class);
    final int TIMEOUT = 5000;

    private class MyWatcher extends CountdownWatcher {

        LinkedBlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();

        public void process(WatchedEvent event) {
            super.process(event);
            if (event.getType() != Event.EventType.None) {
                try {
                    events.put(event);
                } catch (InterruptedException e) {
                    LOG.warn("ignoring interrupt during event.put");
                }
            }
        }

    }

    private CountdownWatcher watcher1;
    private ZooKeeper zk1;
    private MyWatcher watcher2;
    private ZooKeeper zk2;

    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        watcher1 = new CountdownWatcher();
        zk1 = createClient(watcher1);
        watcher2 = new MyWatcher();
    }

    @AfterEach
    public void tearDown() throws Exception {
        if (zk2 != null) {
            zk2.close();
        }
        if (zk1 != null) {
            zk1.close();
        }
        super.tearDown();
    }

    // @see jira issue ZOOKEEPER-961

    @Test
    public void testChildWatcherAutoResetWithChroot() throws Exception {
        zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zk2 = createClient(watcher2, hostPort + "/ch1");
        zk2.getChildren("/", true);

        // this call shouldn't trigger any error or watch
        zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // this should trigger the watch
        zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        WatchedEvent e = watcher2.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
        assertNotNull(e);
        assertEquals(EventType.NodeChildrenChanged, e.getType());
        assertEquals("/", e.getPath());

        MyWatcher childWatcher = new MyWatcher();
        zk2.getChildren("/", childWatcher);

        stopServer();
        watcher2.waitForDisconnected(3000);
        startServer();
        watcher2.waitForConnected(3000);
        watcher1.waitForConnected(3000);

        // this should trigger the watch
        zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
        assertNotNull(e);
        assertEquals(EventType.NodeChildrenChanged, e.getType());
        assertEquals("/", e.getPath());
    }

    @Test
    public void testDefaultWatcherAutoResetWithChroot() throws Exception {
        zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zk2 = createClient(watcher2, hostPort + "/ch1");
        zk2.getChildren("/", true);

        // this call shouldn't trigger any error or watch
        zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // this should trigger the watch
        zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        WatchedEvent e = watcher2.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
        assertNotNull(e);
        assertEquals(EventType.NodeChildrenChanged, e.getType());
        assertEquals("/", e.getPath());

        zk2.getChildren("/", true);

        stopServer();
        watcher2.waitForDisconnected(3000);
        startServer();
        watcher2.waitForConnected(3000);
        watcher1.waitForConnected(3000);

        // this should trigger the watch
        zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        e = watcher2.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
        assertNotNull(e);
        assertEquals(EventType.NodeChildrenChanged, e.getType());
        assertEquals("/", e.getPath());
    }

    @Test
    public void testDeepChildWatcherAutoResetWithChroot() throws Exception {
        zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.create("/ch1/here", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.create("/ch1/here/we", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.create("/ch1/here/we/are", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zk2 = createClient(watcher2, hostPort + "/ch1/here/we");
        zk2.getChildren("/are", true);

        // this should trigger the watch
        zk1.create("/ch1/here/we/are/now", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        WatchedEvent e = watcher2.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
        assertNotNull(e);
        assertEquals(EventType.NodeChildrenChanged, e.getType());
        assertEquals("/are", e.getPath());

        MyWatcher childWatcher = new MyWatcher();
        zk2.getChildren("/are", childWatcher);

        stopServer();
        watcher2.waitForDisconnected(3000);
        startServer();
        watcher2.waitForConnected(3000);
        watcher1.waitForConnected(3000);

        // this should trigger the watch
        zk1.create("/ch1/here/we/are/again", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
        assertNotNull(e);
        assertEquals(EventType.NodeChildrenChanged, e.getType());
        assertEquals("/are", e.getPath());
    }

    // @see jira issue ZOOKEEPER-706. Test auto reset of a large number of
    // watches which require multiple SetWatches calls.
    @Test
    @Timeout(value = 14, unit = TimeUnit.MINUTES)
    public void testManyChildWatchersAutoReset() throws Exception {
        zk2 = createClient(watcher2);

        // 110 character base path
        String pathBase = "/long-path-000000000-111111111-222222222-333333333-444444444-"
                + "555555555-666666666-777777777-888888888-999999999";

        zk1.create(pathBase, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Create 10,000 nodes. This should ensure the length of our
        // watches set below exceeds 1MB.
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            String path = zk1.create(pathBase + "/ch-", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
            paths.add(path);
        }
        LOG.info("Created 10,000 nodes.");

        MyWatcher childWatcher = new MyWatcher();

        // Set a combination of child/exists/data watches
        int i = 0;
        for (String path : paths) {
            if (i % 3 == 0) {
                zk2.getChildren(path, childWatcher);
            } else if (i % 3 == 1) {
                zk2.exists(path + "/foo", childWatcher);
            } else if (i % 3 == 2) {
                zk2.getData(path, childWatcher, null);
            }

            i++;
        }

        stopServer();
        watcher2.waitForDisconnected(30000);
        startServer();
        watcher2.waitForConnected(30000);
        watcher1.waitForConnected(30000);

        // Trigger the watches and ensure they properly propagate to the client
        i = 0;
        for (String path : paths) {
            if (i % 3 == 0) {
                zk1.create(path + "/ch", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
                assertNotNull(e);
                assertEquals(EventType.NodeChildrenChanged, e.getType());
                assertEquals(path, e.getPath());
            } else if (i % 3 == 1) {
                zk1.create(path + "/foo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
                assertNotNull(e);
                assertEquals(EventType.NodeCreated, e.getType());
                assertEquals(path + "/foo", e.getPath());
            } else if (i % 3 == 2) {
                zk1.setData(path, new byte[] { 1, 2, 3 }, -1);

                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
                assertNotNull(e);
                assertEquals(EventType.NodeDataChanged, e.getType());
                assertEquals(path, e.getPath());
            }

            i++;
        }
    }

}