RemoveWatchesCmdTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Testing remove watches using command line
*/
public class RemoveWatchesCmdTest extends ClientBase {
private static final Logger LOG = LoggerFactory.getLogger(RemoveWatchesCmdTest.class);
private ZooKeeper zk;
private ZooKeeperMain zkMain;
@BeforeEach
@Override
public void setUp() throws Exception {
super.setUp();
zk = createClient();
zkMain = new ZooKeeperMain(zk);
}
@AfterEach
@Override
public void tearDown() throws Exception {
if (zk != null) {
zk.close();
}
super.tearDown();
}
/**
* Test verifies default options. When there is no passed options,
* removewatches command will use default options - WatcherType.ANY and
* local=false
*/
@Test
@Timeout(value = 30)
public void testRemoveWatchesWithNoPassedOptions() throws Exception {
List<EventType> expectedEvents = new ArrayList<>();
expectedEvents.add(EventType.ChildWatchRemoved);
expectedEvents.add(EventType.DataWatchRemoved);
MyWatcher myWatcher = new MyWatcher("/testnode1", expectedEvents, 2);
zk.create("/testnode1", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/testnode2", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
LOG.info("Adding childwatcher to /testnode1 and /testnode2");
zk.getChildren("/testnode1", myWatcher);
zk.getChildren("/testnode2", myWatcher);
LOG.info("Adding datawatcher to /testnode1 and /testnode2");
zk.getData("/testnode1", myWatcher, null);
zk.getData("/testnode2", myWatcher, null);
String cmdstring = "removewatches /testnode1";
LOG.info("Remove watchers using shell command : {}", cmdstring);
zkMain.cl.parseCommand(cmdstring);
assertTrue(zkMain.processZKCmd(zkMain.cl), "Removewatches cmd fails to remove child watches");
LOG.info("Waiting for the DataWatchRemoved event");
myWatcher.matches();
// verifying that other path child watches are not affected
assertTrue(zk.getChildWatches().contains("/testnode2"), "Failed to find child watches for the path testnode2");
assertTrue(zk.getDataWatches().contains("/testnode2"), "Failed to find data watches for the path testnode2");
}
/**
* Test verifies deletion of NodeDataChanged watches
*/
@Test
@Timeout(value = 30)
public void testRemoveNodeDataChangedWatches() throws Exception {
LOG.info("Adding data watcher using getData()");
List<EventType> expectedEvents = new ArrayList<>();
expectedEvents.add(EventType.DataWatchRemoved);
MyWatcher myWatcher = new MyWatcher("/testnode1", expectedEvents, 1);
zk.create("/testnode1", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.getData("/testnode1", myWatcher, null);
String cmdstring = "removewatches /testnode1 -d";
LOG.info("Remove watchers using shell command : {}", cmdstring);
zkMain.cl.parseCommand(cmdstring);
assertTrue(zkMain.processZKCmd(zkMain.cl), "Removewatches cmd fails to remove data watches");
LOG.info("Waiting for the DataWatchRemoved event");
myWatcher.matches();
// verifying that other path data watches are removed
assertEquals(0, zk.getDataWatches().size(), "Data watches are not removed : " + zk.getDataWatches());
}
/**
* Test verifies deletion of NodeCreated data watches
*/
@Test
@Timeout(value = 30)
public void testRemoveNodeCreatedWatches() throws Exception {
List<EventType> expectedEvents = new ArrayList<>();
expectedEvents.add(EventType.DataWatchRemoved);
MyWatcher myWatcher1 = new MyWatcher("/testnode1", expectedEvents, 1);
MyWatcher myWatcher2 = new MyWatcher("/testnode1/testnode2", expectedEvents, 1);
// Adding pre-created watcher
LOG.info("Adding NodeCreated watcher");
zk.exists("/testnode1", myWatcher1);
zk.exists("/testnode1/testnode2", myWatcher2);
String cmdstring1 = "removewatches /testnode1 -d";
LOG.info("Remove watchers using shell command : {}", cmdstring1);
zkMain.cl.parseCommand(cmdstring1);
assertTrue(zkMain.processZKCmd(zkMain.cl), "Removewatches cmd fails to remove pre-create watches");
myWatcher1.matches();
assertEquals(1, zk.getExistWatches().size(), "Failed to remove pre-create watches :" + zk.getExistWatches());
assertTrue(zk.getExistWatches().contains("/testnode1/testnode2"), "Failed to remove pre-create watches :" + zk.getExistWatches());
String cmdstring2 = "removewatches /testnode1/testnode2 -d";
LOG.info("Remove watchers using shell command : {}", cmdstring2);
zkMain.cl.parseCommand(cmdstring2);
assertTrue(zkMain.processZKCmd(zkMain.cl), "Removewatches cmd fails to remove data watches");
myWatcher2.matches();
assertEquals(0, zk.getExistWatches().size(), "Failed to remove pre-create watches : " + zk.getExistWatches());
}
/**
* Test verifies deletion of NodeChildrenChanged watches
*/
@Test
@Timeout(value = 30)
public void testRemoveNodeChildrenChangedWatches() throws Exception {
List<EventType> expectedEvents = new ArrayList<>();
expectedEvents.add(EventType.ChildWatchRemoved);
MyWatcher myWatcher = new MyWatcher("/testnode1", expectedEvents, 1);
zk.create("/testnode1", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
LOG.info("Adding child changed watcher");
zk.getChildren("/testnode1", myWatcher);
String cmdstring = "removewatches /testnode1 -c";
LOG.info("Remove watchers using shell command : {}", cmdstring);
zkMain.cl.parseCommand(cmdstring);
assertTrue(zkMain.processZKCmd(zkMain.cl), "Removewatches cmd fails to remove child watches");
myWatcher.matches();
assertEquals(0, zk.getChildWatches().size(), "Failed to remove child watches : " + zk.getChildWatches());
}
/**
* Test verifies deletion of NodeDeleted watches
*/
@Test
@Timeout(value = 30)
public void testRemoveNodeDeletedWatches() throws Exception {
LOG.info("Adding NodeDeleted watcher");
List<EventType> expectedEvents = new ArrayList<>();
expectedEvents.add(EventType.ChildWatchRemoved);
expectedEvents.add(EventType.NodeDeleted);
MyWatcher myWatcher = new MyWatcher("/testnode1", expectedEvents, 1);
zk.create("/testnode1", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/testnode1/testnode2", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.getChildren("/testnode1/testnode2", myWatcher);
zk.getChildren("/testnode1", myWatcher);
String cmdstring = "removewatches /testnode1 -c";
LOG.info("Remove watchers using shell command : {}", cmdstring);
zkMain.cl.parseCommand(cmdstring);
assertTrue(zkMain.processZKCmd(zkMain.cl), "Removewatches cmd fails to remove child watches");
LOG.info("Waiting for the ChildWatchRemoved event");
myWatcher.matches();
assertEquals(1, zk.getChildWatches().size(), "Failed to remove child watches : " + zk.getChildWatches());
assertTrue(zk.getChildWatches().contains("/testnode1/testnode2"), "Failed to remove child watches :" + zk.getChildWatches());
// verify node delete watcher
zk.delete("/testnode1/testnode2", -1);
myWatcher.matches();
}
/**
* Test verifies deletion of any watches
*/
@Test
@Timeout(value = 30)
public void testRemoveAnyWatches() throws Exception {
verifyRemoveAnyWatches(false);
}
/**
* Test verifies deletion of watches locally when there is no server
* connection
*/
@Test
@Timeout(value = 30)
public void testRemoveWatchesLocallyWhenNoServerConnection() throws Exception {
verifyRemoveAnyWatches(true);
}
private void verifyRemoveAnyWatches(boolean local) throws Exception {
final Map<String, List<EventType>> pathVsEvent = new HashMap<>();
LOG.info("Adding NodeChildrenChanged, NodeDataChanged watchers");
final CountDownLatch watcherLatch = new CountDownLatch(2);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case ChildWatchRemoved:
case DataWatchRemoved:
addWatchNotifications(pathVsEvent, event);
watcherLatch.countDown();
break;
case NodeChildrenChanged:
case NodeDataChanged:
addWatchNotifications(pathVsEvent, event);
break;
}
}
private void addWatchNotifications(Map<String, List<EventType>> pathVsEvent, WatchedEvent event) {
pathVsEvent.computeIfAbsent(event.getPath(), k -> new ArrayList<>())
.add(event.getType());
}
};
zk.create("/testnode1", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.getChildren("/testnode1", watcher);
zk.getData("/testnode1", watcher, null);
String cmdstring = "removewatches /testnode1 -a";
if (local) {
LOG.info("Stopping ZK server to verify deletion of watches locally");
stopServer();
cmdstring = "removewatches /testnode1 -a -l";
}
LOG.info("Remove watchers using shell command : {}", cmdstring);
zkMain.cl.parseCommand(cmdstring);
assertTrue(zkMain.processZKCmd(zkMain.cl), "Removewatches cmd fails to remove child/data watches");
LOG.info("Waiting for the WatchRemoved events");
watcherLatch.await(10, TimeUnit.SECONDS);
assertEquals(1, pathVsEvent.size(), "Didn't receives WatchRemoved events!");
assertTrue(pathVsEvent.get("/testnode1").contains(EventType.DataWatchRemoved), "Didn't receives DataWatchRemoved!");
assertTrue(pathVsEvent.get("/testnode1").contains(EventType.ChildWatchRemoved), "Didn't receives ChildWatchRemoved!");
}
private static class MyWatcher implements Watcher {
private final String path;
private String eventPath;
private final CountDownLatch latch;
private final List<EventType> expectedEvents = new ArrayList<>();
MyWatcher(String path, List<EventType> expectedEvents, int count) {
this.path = path;
this.latch = new CountDownLatch(count);
this.expectedEvents.addAll(expectedEvents);
}
public void process(WatchedEvent event) {
LOG.debug("Event path : {}, eventPath : {}", path, event.getPath());
this.eventPath = event.getPath();
if (expectedEvents.contains(event.getType())) {
latch.countDown();
}
}
public boolean matches() throws InterruptedException {
if (!latch.await(CONNECTION_TIMEOUT / 3, TimeUnit.MILLISECONDS)) {
LOG.error("Failed to get watch notifications!");
return false;
}
LOG.debug("Client path : {} eventPath : {}", path, eventPath);
return path.equals(eventPath);
}
}
}