// PersistentWatcherTest.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.test;
import static org.apache.zookeeper.AddWatchMode.PERSISTENT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PersistentWatcherTest extends ClientBase {
private static final Logger LOG = LoggerFactory.getLogger(PersistentWatcherTest.class);
private BlockingQueue<WatchedEvent> events;
private Watcher persistentWatcher;
@Override
@BeforeEach
public void setUp() throws Exception {
super.setUp();
events = new LinkedBlockingQueue<>();
persistentWatcher = event -> events.add(event);
}
@Test
public void testBasic()
throws IOException, InterruptedException, KeeperException {
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
zk.addWatch("/a/b", persistentWatcher, PERSISTENT);
internalTestBasic(zk);
}
}
@Test
public void testNullWatch()
throws IOException, InterruptedException, KeeperException {
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
assertThrows(IllegalArgumentException.class, () -> {
zk.addWatch("/a/b", null, PERSISTENT);
});
assertThrows(IllegalArgumentException.class, () -> {
AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {};
zk.addWatch("/a/b", null, PERSISTENT, cb, null);
});
}
}
@Test
public void testDefaultWatcher()
throws IOException, InterruptedException, KeeperException {
CountdownWatcher watcher = new CountdownWatcher() {
@Override
public synchronized void process(WatchedEvent event) {
super.process(event);
events.add(event);
}
};
try (ZooKeeper zk = createClient(watcher, hostPort)) {
zk.addWatch("/a/b", PERSISTENT);
events.clear(); // clear any events added during client connection
internalTestBasic(zk);
}
}
@Test
public void testBasicAsync()
throws IOException, InterruptedException, KeeperException {
CountdownWatcher watcher = new CountdownWatcher() {
@Override
public synchronized void process(WatchedEvent event) {
super.process(event);
events.add(event);
}
};
try (ZooKeeper zk = createClient(watcher, hostPort)) {
final CountDownLatch latch = new CountDownLatch(1);
AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {
if (rc == KeeperException.Code.OK.intValue()) {
latch.countDown();
}
};
zk.addWatch("/a/b", persistentWatcher, PERSISTENT, cb, null);
assertTrue(latch.await(5, TimeUnit.SECONDS));
events.clear(); // clear any events added during client connection
internalTestBasic(zk);
}
}
@Test
public void testAsyncDefaultWatcher()
throws IOException, InterruptedException, KeeperException {
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
final CountDownLatch latch = new CountDownLatch(1);
AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {
if (rc == KeeperException.Code.OK.intValue()) {
latch.countDown();
}
};
zk.addWatch("/a/b", persistentWatcher, PERSISTENT, cb, null);
assertTrue(latch.await(5, TimeUnit.SECONDS));
internalTestBasic(zk);
}
}
private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.setData("/a/b", new byte[0], -1);
zk.delete("/a/b/c", -1);
zk.delete("/a/b", -1);
zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b");
assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
}
@Test
public void testRemoval()
throws IOException, InterruptedException, KeeperException {
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
zk.addWatch("/a/b", persistentWatcher, PERSISTENT);
zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
zk.delete("/a/b/c", -1);
zk.delete("/a/b", -1);
assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b");
}
}
@Test
public void testDisconnect() throws Exception {
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
zk.addWatch("/a/b", persistentWatcher, PERSISTENT);
stopServer();
assertEvent(events, Watcher.Event.EventType.None, null);
startServer();
assertEvent(events, Watcher.Event.EventType.None, null);
internalTestBasic(zk);
}
}
@Test
public void testMultiClient()
throws IOException, InterruptedException, KeeperException {
try (ZooKeeper zk1 = createClient(new CountdownWatcher(), hostPort);
ZooKeeper zk2 = createClient(new CountdownWatcher(), hostPort)) {
zk1.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.addWatch("/a/b", persistentWatcher, PERSISTENT);
zk1.setData("/a/b", "one".getBytes(), -1);
Thread.sleep(1000); // give some time for the event to arrive
zk2.setData("/a/b", "two".getBytes(), -1);
zk2.setData("/a/b", "three".getBytes(), -1);
zk2.setData("/a/b", "four".getBytes(), -1);
assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
}
}
@Test
public void testRootWatcher()
throws IOException, InterruptedException, KeeperException {
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
zk.addWatch("/", persistentWatcher, PERSISTENT);
zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.setData("/a", new byte[0], -1);
zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/");
assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/");
}
}
private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
throws InterruptedException {
WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
assertNotNull(event);
assertEquals(eventType, event.getType());
assertEquals(path, event.getPath());
}
}