PersistentRecursiveWatcherTest.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.test;

import static org.apache.zookeeper.AddWatchMode.PERSISTENT;
import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentRecursiveWatcherTest extends ClientBase {
    private static final Logger LOG = LoggerFactory.getLogger(PersistentRecursiveWatcherTest.class);
    private BlockingQueue<WatchedEvent> events;
    private Watcher persistentWatcher;

    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();

        events = new LinkedBlockingQueue<>();
        persistentWatcher = event -> events.add(event);
    }

    @Test
    public void testBasic()
            throws IOException, InterruptedException, KeeperException {
        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
            zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
            internalTestBasic(zk);
        }
    }

    @Test
    public void testBasicAsync()
            throws IOException, InterruptedException, KeeperException {
        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
            final CountDownLatch latch = new CountDownLatch(1);
            AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {
                if (rc == KeeperException.Code.OK.intValue()) {
                    latch.countDown();
                }
            };
            zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE, cb, null);
            assertTrue(latch.await(5, TimeUnit.SECONDS));
            internalTestBasic(zk);
        }
    }

    private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
        zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        Stat stat = new Stat();
        zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        assertEvent(events, EventType.NodeCreated, "/a/b", stat);

        zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        assertEvent(events, EventType.NodeCreated, "/a/b/c", stat);

        zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        assertEvent(events, EventType.NodeCreated, "/a/b/c/d", stat);

        zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat);

        stat = zk.setData("/a/b/c/d/e", new byte[0], -1);
        assertEvent(events, EventType.NodeDataChanged, "/a/b/c/d/e", stat);

        zk.delete("/a/b/c/d/e", -1);
        assertEvent(events, EventType.NodeDeleted, "/a/b/c/d/e", zk.exists("/a/b/c/d", false).getPzxid());

        zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat);
    }

    @Test
    public void testRemoval()
            throws IOException, InterruptedException, KeeperException {
        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
            zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Stat stat = new Stat();
            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
            assertEvent(events, EventType.NodeCreated, "/a/b", stat);
            zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
            assertEvent(events, EventType.NodeCreated, "/a/b/c", stat);

            zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
            zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            assertEvent(events, EventType.PersistentWatchRemoved, "/a/b", WatchedEvent.NO_ZXID);
        }
    }

    @Test
    public void testNoChildEvents() throws Exception {
        try (ZooKeeper zk = createClient()) {
            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE);

            BlockingQueue<WatchedEvent> childEvents = new LinkedBlockingQueue<>();
            zk.getChildren("/a", childEvents::add);

            Stat createABStat = new Stat();
            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABStat);
            Stat createABCStat = new Stat();
            zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABCStat);

            assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", createABStat.getPzxid());

            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b", createABStat);
            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c", createABCStat);
            assertTrue(events.isEmpty());
        }
    }

    @Test
    public void testDisconnect() throws Exception {
        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
            zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
            stopServer();
            assertEvent(events, EventType.None, KeeperState.Disconnected, null, WatchedEvent.NO_ZXID);
            startServer();
            assertEvent(events, EventType.None, KeeperState.SyncConnected, null, WatchedEvent.NO_ZXID);
            internalTestBasic(zk);
        }
    }

    @Test
    public void testMultiClient()
            throws IOException, InterruptedException, KeeperException {
        try (ZooKeeper zk1 = createClient(new CountdownWatcher(), hostPort); ZooKeeper zk2 = createClient(new CountdownWatcher(), hostPort)) {

            zk1.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk1.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
            Stat stat = zk1.setData("/a/b/c", "one".getBytes(), -1);
            assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());

            stat = zk2.setData("/a/b/c", "two".getBytes(), -1);
            assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
            stat = zk2.setData("/a/b/c", "three".getBytes(), -1);
            assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
            stat = zk2.setData("/a/b/c", "four".getBytes(), -1);
            assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
        }
    }

    @Test
    public void testSamePathWithDifferentWatchModes() throws Exception {
        try (ZooKeeper zk = createClient()) {
            BlockingQueue<WatchedEvent> dataEvents = new LinkedBlockingQueue<>();
            BlockingQueue<WatchedEvent> childEvents = new LinkedBlockingQueue<>();
            BlockingQueue<WatchedEvent> persistentEvents = new LinkedBlockingQueue<>();
            BlockingQueue<WatchedEvent> recursiveEvents = new LinkedBlockingQueue<>();

            zk.addWatch("/a", persistentEvents::add, PERSISTENT);
            zk.addWatch("/a", recursiveEvents::add, PERSISTENT_RECURSIVE);
            zk.exists("/a", dataEvents::add);

            Stat stat = new Stat();
            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
            assertEvent(dataEvents, Watcher.Event.EventType.NodeCreated, "/a", stat);
            assertEvent(persistentEvents, Watcher.Event.EventType.NodeCreated, "/a", stat);
            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeCreated, "/a", stat);

            zk.getData("/a", dataEvents::add, null);
            stat = zk.setData("/a", new byte[0], -1);
            assertEvent(dataEvents, Watcher.Event.EventType.NodeDataChanged, "/a", stat);
            assertEvent(persistentEvents, Watcher.Event.EventType.NodeDataChanged, "/a", stat);
            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDataChanged, "/a", stat);

            zk.getChildren("/a", childEvents::add);
            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
            assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", stat);
            assertEvent(persistentEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", stat);
            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeCreated, "/a/b", stat);

            zk.getChildren("/a", childEvents::add);
            zk.delete("/a/b", -1);
            stat = zk.exists("/a", false);
            assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", stat.getPzxid());
            assertEvent(persistentEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", stat.getPzxid());
            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDeleted, "/a/b", stat.getPzxid());

            zk.getChildren("/a", childEvents::add);
            zk.getData("/a", dataEvents::add, null);
            zk.exists("/a", dataEvents::add);
            zk.delete("/a", -1);
            stat = zk.exists("/", false);
            assertEvent(childEvents, Watcher.Event.EventType.NodeDeleted, "/a", stat.getPzxid());
            assertEvent(dataEvents, Watcher.Event.EventType.NodeDeleted, "/a", stat.getPzxid());
            assertEvent(dataEvents, Watcher.Event.EventType.NodeDeleted, "/a", stat.getPzxid());
            assertEvent(persistentEvents, Watcher.Event.EventType.NodeDeleted, "/a", stat.getPzxid());
            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDeleted, "/a", stat.getPzxid());
        }
    }

    @Test
    public void testRootWatcher()
            throws IOException, InterruptedException, KeeperException {
        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
            zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE);
            Stat stat = new Stat();

            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
            assertEvent(events, EventType.NodeCreated, "/a", stat.getMzxid());

            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
            assertEvent(events, EventType.NodeCreated, "/a/b", stat.getMzxid());

            zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
            assertEvent(events, EventType.NodeCreated, "/b", stat.getMzxid());

            zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
            assertEvent(events, EventType.NodeCreated, "/b/c", stat.getMzxid());
        }
    }

    private void assertEvent(BlockingQueue<WatchedEvent> events, EventType eventType, String path, Stat stat)
        throws InterruptedException {
        assertEvent(events, eventType, path, stat.getMzxid());
    }

    private void assertEvent(BlockingQueue<WatchedEvent> events, EventType eventType, String path, long zxid)
        throws InterruptedException {
        assertEvent(events, eventType, KeeperState.SyncConnected, path, zxid);
    }

    private void assertEvent(BlockingQueue<WatchedEvent> events, EventType eventType, KeeperState keeperState,
        String path, long zxid) throws InterruptedException {
        WatchedEvent actualEvent = events.poll(5, TimeUnit.SECONDS);
        assertNotNull(actualEvent);
        WatchedEvent expectedEvent = new WatchedEvent(
            eventType,
            keeperState,
            path,
            zxid
        );
        TestUtils.assertWatchedEventEquals(expectedEvent, actualEvent);
    }
}