RemoveWatchesTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.WatcherType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies removing watches using ZooKeeper client apis
 */
public class RemoveWatchesTest extends ClientBase {

    private static final Logger LOG = LoggerFactory.getLogger(RemoveWatchesTest.class);
    private ZooKeeper zk1 = null;
    private ZooKeeper zk2 = null;

    /**
     * Runs the parent fixture's setup and then creates the two clients
     * ({@code zk1} and {@code zk2}) that the tests in this class operate on.
     *
     * @throws Exception if the parent setup or client creation fails
     */
    @BeforeEach
    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.zk1 = createClient();
        this.zk2 = createClient();
    }

    /**
     * Closes both clients (when present) and then runs the parent fixture's
     * teardown.
     *
     * <p>The steps are chained with {@code finally} so that a failure while
     * closing one client cannot prevent the other client from being closed
     * or the parent teardown from running.
     *
     * @throws Exception if closing a client or the parent teardown fails
     */
    @AfterEach
    @Override
    public void tearDown() throws Exception {
        try {
            // zk1 may have been closed and nulled out by the test itself.
            if (zk1 != null) {
                zk1.close();
            }
        } finally {
            try {
                if (zk2 != null) {
                    zk2.close();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Issues a removeWatches request for a specific watcher and checks that it
     * completes with the expected result code.
     *
     * <p>In synchronous mode an expected non-OK code must surface as a
     * {@link KeeperException} carrying that code and the given path. In
     * asynchronous mode the result is checked through a {@code MyCallback}
     * (defined elsewhere in this file).
     *
     * @param zk client used to send the request
     * @param path znode path the watcher was registered on
     * @param watcher watcher to remove
     * @param watcherType type of watch to remove
     * @param local passed through to the client's removeWatches call
     * @param rc result code the request is expected to finish with
     * @param useAsync whether to use the callback based api
     * @throws KeeperException if a synchronous request expected to succeed
     *         fails, or the asynchronous result code differs from {@code rc}
     */
    private void removeWatches(
        ZooKeeper zk,
        String path,
        Watcher watcher,
        WatcherType watcherType,
        boolean local,
        KeeperException.Code rc,
        boolean useAsync) throws InterruptedException, KeeperException {
        LOG.info("Sending removeWatches req using zk {} path: {} type: {} watcher: {} ", zk, path, watcherType, watcher);

        if (!useAsync) {
            if (rc == Code.OK) {
                // Success expected: any exception propagates to the caller.
                zk.removeWatches(path, watcher, watcherType, local);
                return;
            }
            try {
                zk.removeWatches(path, watcher, watcherType, local);
                fail("expect exception code " + rc);
            } catch (KeeperException ex) {
                assertEquals(rc, ex.code());
                assertEquals(path, ex.getPath());
            }
            return;
        }

        final int expectedRc = rc.intValue();
        MyCallback callback = new MyCallback(expectedRc, path);
        zk.removeWatches(path, watcher, watcherType, local, callback, null);
        assertTrue(callback.matches(), "Didn't succeeds removeWatch operation");
        if (expectedRc != callback.rc) {
            throw KeeperException.create(KeeperException.Code.get(callback.rc));
        }
    }

    /**
     * Issues a removeAllWatches request for a path and checks that it
     * completes with the expected result code.
     *
     * <p>In synchronous mode an expected non-OK code must surface as a
     * {@link KeeperException} carrying that code and the given path. In
     * asynchronous mode the result is checked through a {@code MyCallback}
     * (defined elsewhere in this file).
     *
     * @param zk client used to send the request
     * @param path znode path whose watches are removed
     * @param watcherType type of watches to remove
     * @param local passed through to the client's removeAllWatches call
     * @param rc result code the request is expected to finish with
     * @param useAsync whether to use the callback based api
     * @throws KeeperException if a synchronous request expected to succeed
     *         fails, or the asynchronous result code differs from {@code rc}
     */
    private void removeAllWatches(
        ZooKeeper zk,
        String path,
        WatcherType watcherType,
        boolean local,
        KeeperException.Code rc,
        boolean useAsync) throws InterruptedException, KeeperException {
        // Name the api actually invoked so the log can be told apart from removeWatches(...).
        LOG.info("Sending removeAllWatches req using zk {} path: {} type: {} ", zk, path, watcherType);
        if (useAsync) {
            MyCallback c1 = new MyCallback(rc.intValue(), path);
            zk.removeAllWatches(path, watcherType, local, c1, null);
            assertTrue(c1.matches(), "Didn't succeeds removeWatch operation");
            if (rc.intValue() != c1.rc) {
                throw KeeperException.create(KeeperException.Code.get(c1.rc));
            }
        } else if (rc != Code.OK) {
            try {
                zk.removeAllWatches(path, watcherType, local);
                fail("expect exception code " + rc);
            } catch (KeeperException ex) {
                assertEquals(rc, ex.code());
                assertEquals(path, ex.getPath());
            }
        } else {
            // Success expected: any exception propagates to the caller.
            zk.removeAllWatches(path, watcherType, local);
        }
    }

    /**
     * Asserts that {@code isServerSessionWatcher} reports a watcher for the
     * client's session on {@code path} for every given watcher type.
     */
    private void assertWatchers(ZooKeeper zk, String path, WatcherType... watcherTypes) {
        for (int i = 0; i < watcherTypes.length; i++) {
            final WatcherType type = watcherTypes[i];
            assertTrue(
                isServerSessionWatcher(zk.getSessionId(), path, type),
                String.format("expect watcher for path %s and type %s", path, type));
        }
    }

    /**
     * Asserts that {@code isServerSessionWatcher} reports no watcher for the
     * client's session on {@code path} for any of the given watcher types.
     */
    private void assertNoWatchers(ZooKeeper zk, String path, WatcherType... watcherTypes) {
        for (int i = 0; i < watcherTypes.length; i++) {
            final WatcherType type = watcherTypes[i];
            assertFalse(
                isServerSessionWatcher(zk.getSessionId(), path, type),
                String.format("expect no watcher for path %s and type %s", path, type));
        }
    }

    /**
     * Walks every concrete watcher type (skipping {@code WatcherType.Any}) and
     * asserts that the listed types have no watcher on {@code path} while all
     * remaining types do.
     */
    private void assertWatchersExcept(ZooKeeper zk, String path, WatcherType... watcherTypes) {
        final List<WatcherType> excluded = Arrays.asList(watcherTypes);
        for (WatcherType candidate : WatcherType.values()) {
            if (candidate != WatcherType.Any) {
                if (!excluded.contains(candidate)) {
                    assertWatchers(zk, path, candidate);
                } else {
                    assertNoWatchers(zk, path, candidate);
                }
            }
        }
    }

    /**
     * Test verifies removal of single watcher when there is server connection.
     *
     * <p>Two ephemeral nodes each get one exists-watch from {@code zk2}. The
     * watch on /node1 is removed first and only the /node2 watch must remain;
     * the /node2 watch is then removed as well. Finally the owning session
     * {@code zk1} is closed and the removed /node1 watcher must not have
     * recorded any event.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveSingleWatcher(boolean useAsync) throws Exception {
        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zk1.create("/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        MyWatcher w1 = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", w1, "/node1");
        assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
        MyWatcher w2 = new MyWatcher("/node2", 1);
        // w2 is registered on /node2, so log that path (previously logged /node1).
        LOG.info("Adding data watcher {} on path {}", w2, "/node2");
        assertNotNull(zk2.exists("/node2", w2), "Didn't set data watches");
        removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK, useAsync);
        assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher");
        assertEquals("/node2", zk2.getDataWatches().get(0), "Didn't find data watcher");
        removeWatches(zk2, "/node2", w2, WatcherType.Any, false, Code.OK, useAsync);
        assertTrue(w2.matches(), "Didn't remove data watcher");
        // closing session should remove ephemeral nodes and trigger data
        // watches if any
        if (zk1 != null) {
            zk1.close();
            // prevent tearDown from closing the session a second time
            zk1 = null;
        }

        List<EventType> events = w1.getEventsAfterWatchRemoval();
        assertFalse(events.contains(EventType.NodeDeleted), "Shouldn't get NodeDeletedEvent after watch removal");
        assertEquals(0, events.size(), "Shouldn't get NodeDeletedEvent after watch removal");
    }

    /**
     * Registers two exists-watches on the same ephemeral node while connected,
     * removes them one after the other and checks that the watcher removed
     * first records no event once the owning session is closed.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testMultipleDataWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
        final String path = "/node1";
        zk1.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        MyWatcher first = new MyWatcher(path, 1);
        LOG.info("Adding data watcher {} on path {}", first, path);
        assertNotNull(zk2.exists(path, first), "Didn't set data watches");

        MyWatcher second = new MyWatcher(path, 1);
        LOG.info("Adding data watcher {} on path {}", second, path);
        assertNotNull(zk2.exists(path, second), "Didn't set data watches");

        // Drop the second watcher; the first one must still be registered.
        removeWatches(zk2, path, second, WatcherType.Data, false, Code.OK, useAsync);
        assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher");
        assertEquals(path, zk2.getDataWatches().get(0), "Didn't find data watcher");

        removeWatches(zk2, path, first, WatcherType.Any, false, Code.OK, useAsync);
        assertTrue(second.matches(), "Didn't remove data watcher");

        // Closing the owning session deletes the ephemeral node, which would
        // fire any data watch that is still registered.
        if (zk1 != null) {
            zk1.close();
            zk1 = null;
        }

        List<EventType> eventsAfterRemoval = second.getEventsAfterWatchRemoval();
        assertEquals(0, eventsAfterRemoval.size(), "Shouldn't get NodeDeletedEvent after watch removal");
    }

    /**
     * Registers two child watchers on the same node while connected, removes
     * both, then creates a child node and checks that the watcher removed
     * first does not record a children-changed event.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testMultipleChildWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
        final String path = "/node1";
        zk1.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        MyWatcher first = new MyWatcher(path, 1);
        LOG.info("Adding child watcher {} on path {}", first, path);
        zk2.getChildren(path, first);

        MyWatcher second = new MyWatcher(path, 1);
        LOG.info("Adding child watcher {} on path {}", second, path);
        zk2.getChildren(path, second);

        removeWatches(zk2, path, second, WatcherType.Children, false, Code.OK, useAsync);
        assertTrue(second.matches(), "Didn't remove child watcher");
        assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher");

        removeWatches(zk2, path, first, WatcherType.Any, false, Code.OK, useAsync);
        assertTrue(first.matches(), "Didn't remove child watcher");

        // A new child would trigger NodeChildrenChanged on any remaining child watch.
        zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Poll up to 30 times, 100 ms apart, leaving early if the first
        // watcher records an event.
        for (int attemptsLeft = 30; attemptsLeft > 0; attemptsLeft--) {
            if (first.getEventsAfterWatchRemoval().size() > 0) {
                break;
            }
            Thread.sleep(100);
        }

        List<EventType> secondEvents = second.getEventsAfterWatchRemoval();
        assertEquals(0, secondEvents.size(), "Shouldn't get NodeChildrenChanged event");
    }

    /**
     * Registers two watchers, each as both an exists-watch and a child watch
     * on /node1, removes each of them with WatcherType.Any and then expects
     * both watchers to report a match after a child node is created.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
        final String path = "/node1";
        zk1.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher first = new MyWatcher(path, 2);
        MyWatcher second = new MyWatcher(path, 2);

        // Exists-watches for both watchers.
        LOG.info("Adding data watcher {} on path {}", first, path);
        assertNotNull(zk2.exists(path, first), "Didn't set data watches");
        LOG.info("Adding data watcher {} on path {}", second, path);
        assertNotNull(zk2.exists(path, second), "Didn't set data watches");

        // Child watches for both watchers.
        LOG.info("Adding child watcher {} on path {}", first, path);
        zk2.getChildren(path, first);
        LOG.info("Adding child watcher {} on path {}", second, path);
        zk2.getChildren(path, second);

        removeWatches(zk2, path, first, WatcherType.Any, false, Code.OK, useAsync);
        removeWatches(zk2, path, second, WatcherType.Any, false, Code.OK, useAsync);

        zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        assertTrue(first.matches(), "Didn't remove data watcher");
        assertTrue(second.matches(), "Didn't remove child watcher");
    }

    /**
     * Registers two watchers as both exists-watches and child watches on
     * /node1 and removes only their data watches. The child watches must
     * survive: creating a child has to deliver exactly one
     * NodeChildrenChanged event to each watcher.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllDataWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
        final String path = "/node1";
        zk1.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher first = new MyWatcher(path, 1);
        MyWatcher second = new MyWatcher(path, 1);

        LOG.info("Adding data watcher {} on path {}", first, path);
        assertNotNull(zk2.exists(path, first), "Didn't set data watches");
        LOG.info("Adding data watcher {} on path {}", second, path);
        assertNotNull(zk2.exists(path, second), "Didn't set data watches");

        LOG.info("Adding child watcher {} on path {}", first, path);
        zk2.getChildren(path, first);
        LOG.info("Adding child watcher {} on path {}", second, path);
        zk2.getChildren(path, second);

        removeWatches(zk2, path, first, WatcherType.Data, false, Code.OK, useAsync);
        removeWatches(zk2, path, second, WatcherType.Data, false, Code.OK, useAsync);

        zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        assertTrue(first.matches(), "Didn't remove data watcher");
        assertTrue(second.matches(), "Didn't remove data watcher");

        // Poll up to 10 times, one second apart, until both surviving child
        // watchers have recorded an event.
        for (int attemptsLeft = 10; attemptsLeft > 0; attemptsLeft--) {
            boolean firstNotified = first.getEventsAfterWatchRemoval().size() > 0;
            if (firstNotified && second.getEventsAfterWatchRemoval().size() > 0) {
                break;
            }
            Thread.sleep(1000);
        }

        List<EventType> firstEvents = first.getEventsAfterWatchRemoval();
        assertEquals(1, firstEvents.size(), "Didn't get NodeChildrenChanged event");
        assertTrue(firstEvents.contains(EventType.NodeChildrenChanged), "Didn't get NodeChildrenChanged event");

        List<EventType> secondEvents = second.getEventsAfterWatchRemoval();
        assertEquals(1, secondEvents.size(), "Didn't get NodeChildrenChanged event");
        assertTrue(secondEvents.contains(EventType.NodeChildrenChanged), "Didn't get NodeChildrenChanged event");
    }

    /**
     * Registers two watchers as both exists-watches and child watches on
     * /node1 and removes only their child watches. The data watches must
     * survive: updating the node's data has to deliver exactly one
     * NodeDataChanged event to each watcher.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllChildWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
        final String path = "/node1";
        zk1.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher first = new MyWatcher(path, 1);
        MyWatcher second = new MyWatcher(path, 1);

        LOG.info("Adding data watcher {} on path {}", first, path);
        assertNotNull(zk2.exists(path, first), "Didn't set data watches");
        LOG.info("Adding data watcher {} on path {}", second, path);
        assertNotNull(zk2.exists(path, second), "Didn't set data watches");

        LOG.info("Adding child watcher {} on path {}", first, path);
        zk2.getChildren(path, first);
        LOG.info("Adding child watcher {} on path {}", second, path);
        zk2.getChildren(path, second);

        removeWatches(zk2, path, first, WatcherType.Children, false, Code.OK, useAsync);
        removeWatches(zk2, path, second, WatcherType.Children, false, Code.OK, useAsync);

        zk1.setData(path, "test".getBytes(), -1);
        assertTrue(first.matches(), "Didn't remove child watcher");
        assertTrue(second.matches(), "Didn't remove child watcher");

        // Poll up to 10 times, one second apart, until both surviving data
        // watchers have recorded an event.
        for (int attemptsLeft = 10; attemptsLeft > 0; attemptsLeft--) {
            boolean firstNotified = first.getEventsAfterWatchRemoval().size() > 0;
            if (firstNotified && second.getEventsAfterWatchRemoval().size() > 0) {
                break;
            }
            Thread.sleep(1000);
        }

        List<EventType> firstEvents = first.getEventsAfterWatchRemoval();
        assertEquals(1, firstEvents.size(), "Didn't get NodeDataChanged event");
        assertTrue(firstEvents.contains(EventType.NodeDataChanged), "Didn't get NodeDataChanged event");

        List<EventType> secondEvents = second.getEventsAfterWatchRemoval();
        assertEquals(1, secondEvents.size(), "Didn't get NodeDataChanged event");
        assertTrue(secondEvents.contains(EventType.NodeDataChanged), "Didn't get NodeDataChanged event");
    }

    /**
     * Removes two persistent watchers with WatcherType.Persistent and checks
     * that every other watch on the same path keeps working.
     *
     * <p>After the removal, asking to remove the same watchers under any other
     * watcher type must yield NOWATCHER. The data, children and
     * persistent-recursive watches registered alongside must still receive
     * their events.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllPersistentWatchers(boolean useAsync) throws InterruptedException, KeeperException {
        final String path = "/node1";
        zk1.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // The two persistent watchers that will be removed.
        BlockingDeque<WatchedEvent> firstPersistentEvents = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondPersistentEvents = new LinkedBlockingDeque<>();
        Watcher firstPersistent = firstPersistentEvents::add;
        Watcher secondPersistent = secondPersistentEvents::add;
        zk2.addWatch(path, firstPersistent, AddWatchMode.PERSISTENT);
        zk2.addWatch(path, secondPersistent, AddWatchMode.PERSISTENT);

        // Watches of the other kinds, which must be left untouched.
        BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> childrenEvents = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
        zk2.getData(path, dataEvents::add, null);
        zk2.getChildren(path, childrenEvents::add);
        zk2.addWatch(path, recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);

        removeWatches(zk2, path, firstPersistent, WatcherType.Persistent, false, Code.OK, useAsync);
        removeWatches(zk2, path, secondPersistent, WatcherType.Persistent, false, Code.OK, useAsync);
        // Neither watcher is registered under any other type.
        for (WatcherType otherType : Arrays.asList(
            WatcherType.Data, WatcherType.Children, WatcherType.PersistentRecursive)) {
            removeWatches(zk2, path, firstPersistent, otherType, false, Code.NOWATCHER, useAsync);
            removeWatches(zk2, path, secondPersistent, otherType, false, Code.NOWATCHER, useAsync);
        }

        zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.setData(path, null, -1);

        assertEvent(firstPersistentEvents, EventType.PersistentWatchRemoved, path);
        assertEvent(secondPersistentEvents, EventType.PersistentWatchRemoved, path);
        assertEvent(dataEvents, EventType.NodeDataChanged, path);
        assertEvent(childrenEvents, EventType.NodeChildrenChanged, path);
        assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/node2");
        assertEvent(recursiveEvents, EventType.NodeDataChanged, path);
    }

    /**
     * Removes two persistent-recursive watchers with
     * WatcherType.PersistentRecursive and checks that every other watch on the
     * same path keeps working.
     *
     * <p>After the removal, asking to remove the same watchers under any other
     * watcher type must yield NOWATCHER. The data, children and persistent
     * watches registered alongside must still receive their events.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllPersistentRecursiveWatchers(boolean useAsync) throws InterruptedException, KeeperException {
        final String path = "/node1";
        zk1.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // The two recursive watchers that will be removed.
        BlockingDeque<WatchedEvent> firstRecursiveEvents = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondRecursiveEvents = new LinkedBlockingDeque<>();
        Watcher firstRecursive = firstRecursiveEvents::add;
        Watcher secondRecursive = secondRecursiveEvents::add;
        zk2.addWatch(path, firstRecursive, AddWatchMode.PERSISTENT_RECURSIVE);
        zk2.addWatch(path, secondRecursive, AddWatchMode.PERSISTENT_RECURSIVE);

        // Watches of the other kinds, which must be left untouched.
        BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> childrenEvents = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
        zk2.getData(path, dataEvents::add, null);
        zk2.getChildren(path, childrenEvents::add);
        zk2.addWatch(path, persistentEvents::add, AddWatchMode.PERSISTENT);

        removeWatches(zk2, path, firstRecursive, WatcherType.PersistentRecursive, false, Code.OK, useAsync);
        removeWatches(zk2, path, secondRecursive, WatcherType.PersistentRecursive, false, Code.OK, useAsync);
        // Neither watcher is registered under any other type.
        for (WatcherType otherType : Arrays.asList(
            WatcherType.Data, WatcherType.Children, WatcherType.Persistent)) {
            removeWatches(zk2, path, firstRecursive, otherType, false, Code.NOWATCHER, useAsync);
            removeWatches(zk2, path, secondRecursive, otherType, false, Code.NOWATCHER, useAsync);
        }

        assertEvent(firstRecursiveEvents, EventType.PersistentWatchRemoved, path);
        assertEvent(secondRecursiveEvents, EventType.PersistentWatchRemoved, path);

        zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.setData(path, "test".getBytes(), -1);

        assertEvent(dataEvents, EventType.NodeDataChanged, path);
        assertEvent(childrenEvents, EventType.NodeChildrenChanged, path);
        assertEvent(persistentEvents, EventType.NodeChildrenChanged, path);
        assertEvent(persistentEvents, EventType.NodeDataChanged, path);
    }
    /**
     * Test verifies that removing a watcher which was never registered is
     * reported as NOWATCHER.
     *
     * <p>Watchers {@code w1} and {@code w2} are registered on the server, but
     * the removal requests use a third, never registered watcher {@code w3},
     * for several watcher types and also for a path without any watch.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testNoWatcherException(boolean useAsync) throws IOException, InterruptedException, KeeperException {
        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher w1 = new MyWatcher("/node1", 2);
        MyWatcher w2 = new MyWatcher("/node1", 2);
        LOG.info("Adding data watcher {} on path {}", w1, "/node1");
        assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
        // w2 is registered on /node2, so log that path (previously logged /node1).
        // /node2 is never created in this test, hence exists() is expected to return null.
        LOG.info("Adding data watcher {} on path {}", w2, "/node2");
        assertNull(zk2.exists("/node2", w2), "Didn't set data watches");
        LOG.info("Adding child watcher {} on path {}", w1, "/node1");
        zk2.getChildren("/node1", w1);
        LOG.info("Adding child watcher {} on path {}", w2, "/node1");
        zk2.getChildren("/node1", w2);

        // New Watcher which will be used for removal
        MyWatcher w3 = new MyWatcher("/node1", 2);

        removeWatches(zk2, "/node1", w3, WatcherType.Any, false, Code.NOWATCHER, useAsync);
        removeWatches(zk2, "/node1", w3, WatcherType.Children, false, Code.NOWATCHER, useAsync);
        removeWatches(zk2, "/node1", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync);
        removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync);
    }

    /**
     * Uses WatcherType.Any to remove a watcher that is registered only as a
     * data watch and checks that another watcher's data and child watches on
     * the same path stay registered until that watcher is removed as well.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAnyDataWatcher(boolean useAsync) throws Exception {
        final String path = "/node1";
        zk1.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher dataOnly = new MyWatcher(path, 1);
        MyWatcher dataAndChild = new MyWatcher(path, 2);

        // Two data watches on the same path ...
        LOG.info("Adding data watcher {} on path {}", dataOnly, path);
        assertNotNull(zk2.exists(path, dataOnly), "Didn't set data watches");
        LOG.info("Adding data watcher {} on path {}", dataAndChild, path);
        assertNotNull(zk2.exists(path, dataAndChild), "Didn't set data watches");

        // ... plus one child watch for the second watcher.
        LOG.info("Adding child watcher {} on path {}", dataAndChild, path);
        zk2.getChildren(path, dataAndChild);

        removeWatches(zk2, path, dataOnly, WatcherType.Any, false, Code.OK, useAsync);
        assertTrue(dataOnly.matches(), "Didn't remove data watcher");
        assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher");
        assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher");

        removeWatches(zk2, path, dataAndChild, WatcherType.Any, false, Code.OK, useAsync);
        assertTrue(dataAndChild.matches(), "Didn't remove child watcher");
    }

    /**
     * Test verifies WatcherType.Any - removes only the configured child watcher
     * function.
     *
     * <p>{@code w1} is registered as data and child watcher, {@code w2} only
     * as child watcher. Removing {@code w2} must leave {@code w1}'s data and
     * child watches in place until {@code w1} is removed as well.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAnyChildWatcher(boolean useAsync) throws Exception {
        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher w1 = new MyWatcher("/node1", 2);
        MyWatcher w2 = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", w1, "/node1");
        assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
        // Add multiple child watches; each log line names the watcher that is
        // actually registered (the two log arguments were swapped before).
        LOG.info("Adding child watcher {} on path {}", w2, "/node1");
        zk2.getChildren("/node1", w2);
        LOG.info("Adding child watcher {} on path {}", w1, "/node1");
        zk2.getChildren("/node1", w1);
        removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK, useAsync);
        assertTrue(w2.matches(), "Didn't remove child watcher");
        assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher");
        assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher");
        removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync);
        assertTrue(w1.matches(), "Didn't remove watchers");
    }

    /**
     * Test verifies when there is no server connection. Remove watches when
     * local=true, otw should retain it.
     *
     * <p>With the server stopped, a removal with {@code local=true} is
     * expected to succeed, while a removal with {@code local=false} is
     * expected to fail with CONNECTIONLOSS and leave the watcher untouched.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveWatcherWhenNoConnection(boolean useAsync) throws Exception {
        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher w1 = new MyWatcher("/node1", 2);
        MyWatcher w2 = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", w1, "/node1");
        assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
        // Add multiple child watches
        LOG.info("Adding child watcher {} on path {}", w1, "/node1");
        zk2.getChildren("/node1", w1);
        // w2 is the watcher registered here, so log w2 (previously logged w1).
        LOG.info("Adding child watcher {} on path {}", w2, "/node1");
        zk2.getChildren("/node1", w2);
        stopServer();
        removeWatches(zk2, "/node1", w2, WatcherType.Any, true, Code.OK, useAsync);
        assertTrue(w2.matches(), "Didn't remove child watcher");
        assertFalse(w1.matches(), "Shouldn't remove data watcher");
        // local=false needs the server, so the request must fail and keep w1
        removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.CONNECTIONLOSS, useAsync);
        assertFalse(w1.matches(), "Shouldn't remove data watcher");

        // when local=true, here if connection not available, simply removes
        // from local session
        removeWatches(zk2, "/node1", w1, WatcherType.Any, true, Code.OK, useAsync);
        assertTrue(w1.matches(), "Didn't remove data watcher");
    }

    /**
     * Sets exists-watches on 50 paths that are never created, removes every
     * one of them again and checks that the client's exist-watch list grows
     * to 50 and shrinks back to empty.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testManyPreNodeWatchers(boolean useAsync) throws Exception {
        final int total = 50;
        final String prefix = "/node";
        List<MyWatcher> watchers = new ArrayList<>(total);

        // Register one exists-watch per (non-existent) node.
        for (int idx = 0; idx < total; idx++) {
            String target = prefix + idx;
            MyWatcher current = new MyWatcher(target, 1);
            watchers.add(current);
            LOG.info("Adding pre node watcher {} on path {}", current, target);
            zk1.exists(target, current);
        }
        assertEquals(total, zk1.getExistWatches().size(), "Failed to add watchers!");

        // Remove them one by one, each removal must be observed by its watcher.
        for (int idx = 0; idx < total; idx++) {
            MyWatcher current = watchers.get(idx);
            removeWatches(zk1, prefix + idx, current, WatcherType.Data, false, Code.OK, useAsync);
            assertTrue(current.matches(), "Didn't remove data watcher");
        }
        assertEquals(0, zk1.getExistWatches().size(), "Didn't remove watch references!");
    }

    /**
     * Test verifies many child watchers. Also, verifies internal datastructure
     * 'watchManager.childWatches'.
     *
     * <p>Creates 50 nodes, sets one child watch on each, removes every watch
     * again and checks that the client's child-watch list grows to 50 and
     * shrinks back to empty.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testManyChildWatchers(boolean useAsync) throws Exception {
        int count = 50;
        List<MyWatcher> wList = new ArrayList<>(count);
        String path = "/node";

        // Create all nodes first
        for (int i = 0; i < count; i++) {
            zk1.create(path + i, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // Child watcher
        for (int i = 0; i < count; i++) {
            String nodePath = path + i;
            MyWatcher w = new MyWatcher(nodePath, 1);
            wList.add(w);
            LOG.info("Adding child watcher {} on path {}", w, nodePath);
            zk1.getChildren(nodePath, w);
        }
        assertEquals(count, zk1.getChildWatches().size(), "Failed to add watchers!");
        for (int i = 0; i < count; i++) {
            final MyWatcher watcher = wList.get(i);
            removeWatches(zk1, path + i, watcher, WatcherType.Children, false, Code.OK, useAsync);
            assertTrue(watcher.matches(), "Didn't remove child watcher");
        }
        assertEquals(0, zk1.getChildWatches().size(), "Didn't remove watch references!");
    }

    /**
     * Test verifies many data watchers. Also, verifies internal datastructure
     * 'watchManager.dataWatches'.
     *
     * <p>Creates 50 nodes, sets one data watch on each, removes every watch
     * again and checks that the client's data-watch list grows to 50 and
     * shrinks back to empty.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testManyDataWatchers(boolean useAsync) throws Exception {
        int count = 50;
        List<MyWatcher> wList = new ArrayList<>(count);
        String path = "/node";

        // Data watcher
        for (int i = 0; i < count; i++) {
            String nodePath = path + i;
            MyWatcher w = new MyWatcher(nodePath, 1);
            wList.add(w);
            zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            LOG.info("Adding data watcher {} on path {}", w, nodePath);
            zk1.getData(nodePath, w, null);
        }
        assertEquals(count, zk1.getDataWatches().size(), "Failed to add watchers!");
        for (int i = 0; i < count; i++) {
            final MyWatcher watcher = wList.get(i);
            removeWatches(zk1, path + i, watcher, WatcherType.Data, false, Code.OK, useAsync);
            assertTrue(watcher.matches(), "Didn't remove data watcher");
        }
        assertEquals(0, zk1.getDataWatches().size(), "Didn't remove watch references!");
    }

    /**
     * Test verifies removal of many watchers locally when there is no
     * connection, using {@link WatcherType#Any}. Each watcher is registered as
     * both a child and a data watcher on its znode, the server is stopped, and
     * the watchers are then removed with the {@code local} flag set. Also
     * verifies the internal watchManager datastructures through
     * {@code zk1.getChildWatches()} and {@code zk1.getDataWatches()}.
     *
     * @param useAsync forwarded to the {@code removeWatches} helper
     * @throws Exception if any ZooKeeper operation fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testManyWatchersWhenNoConnection(boolean useAsync) throws Exception {
        final int count = 3;
        final String path = "/node";
        List<MyWatcher> wList = new ArrayList<>(count);

        // Child watcher
        for (int i = 0; i < count; i++) {
            zk1.create(path + i, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        for (int i = 0; i < count; i++) {
            String nodePath = path + i;
            // Latch count of 2: one child-watch removal plus one data-watch removal per watcher.
            MyWatcher w = new MyWatcher(nodePath, 2);
            wList.add(w);
            LOG.info("Adding child watcher {} on path {}", w, nodePath);
            zk1.getChildren(nodePath, w);
        }
        assertEquals(count, zk1.getChildWatches().size(), "Failed to add watchers!");

        // Data watcher
        for (int i = 0; i < count; i++) {
            String nodePath = path + i;
            MyWatcher w = wList.get(i);
            LOG.info("Adding data watcher {} on path {}", w, nodePath);
            zk1.getData(nodePath, w, null);
        }
        assertEquals(count, zk1.getDataWatches().size(), "Failed to add watchers!");
        stopServer();
        for (int i = 0; i < count; i++) {
            final MyWatcher watcher = wList.get(i);
            removeWatches(zk1, path + i, watcher, WatcherType.Any, true, Code.OK, useAsync);
            assertTrue(watcher.matches(), "Didn't remove watcher");
        }
        assertEquals(0, zk1.getChildWatches().size(), "Didn't remove watch references!");
        assertEquals(0, zk1.getDataWatches().size(), "Didn't remove watch references!");
    }

    /**
     * Test verifies removing watchers through clients that connect with a
     * namespace (chroot) suffix appended to the connect string.
     *
     * <p>{@code w1} is registered as both a data watcher (via {@code exists})
     * and a child watcher, {@code w2} only as a child watcher. Removing
     * {@code w1} with {@link WatcherType#Any} must leave exactly one child
     * watch path behind, which is then removed through {@code w2}.
     *
     * @param useAsync forwarded to the {@code removeWatches} helper
     * @throws Exception if any ZooKeeper operation fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testChRootRemoveWatcher(boolean useAsync) throws Exception {
        // creating the subtree for chRoot clients.
        final String chRoot = "/appsX";
        final String node = "/node1";
        zk1.create(chRoot, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // zk1 was dereferenced just above, so only zk2 still needs a null guard.
        zk1.close();
        if (zk2 != null) {
            zk2.close();
        }
        // Creating chRoot client.
        zk1 = createClient(this.hostPort + chRoot);
        zk2 = createClient(this.hostPort + chRoot);

        LOG.info("Creating child znode /node1 using chRoot client");
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // w1 waits for two removal events (data + child), w2 for one (child).
        MyWatcher w1 = new MyWatcher(node, 2);
        MyWatcher w2 = new MyWatcher(node, 1);
        LOG.info("Adding data watcher {} on path {}", w1, node);
        assertNotNull(zk2.exists(node, w1), "Didn't set data watches");
        // Add multiple child watches
        LOG.info("Adding child watcher {} on path {}", w2, node);
        zk2.getChildren(node, w2);
        LOG.info("Adding child watcher {} on path {}", w1, node);
        zk2.getChildren(node, w1);
        removeWatches(zk2, node, w1, WatcherType.Any, false, Code.OK, useAsync);
        assertTrue(w1.matches(), "Didn't remove child watcher");
        assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher");
        removeWatches(zk2, node, w2, WatcherType.Any, false, Code.OK, useAsync);
        assertTrue(w2.matches(), "Didn't remove child watcher");
    }

    /**
     * Verify that if a given watcher doesn't exist, the server properly
     * returns an error code for it.
     *
     * In our Java client implementation, we check that a given watch exists at
     * two points:
     *
     * 1) before submitting the RemoveWatches request
     * 2) after a successful server response, when the watcher needs to be
     *    removed
     *
     * Since this can be racy (i.e. a watch can fire while a RemoveWatches
     * request is in-flight), we need to verify that the watch was actually
     * removed (i.e. from ZKDatabase and DataTree) and return NOWATCHER if
     * needed.
     *
     * Also, other implementations might not do a client side check before
     * submitting a RemoveWatches request. If we don't do a server side check,
     * we would just return ZOK even if no watch was removed.
     *
     * The test substitutes a {@link MyWatchManager} into a spied client so that
     * the return code handed to the watch manager can be inspected afterwards.
     *
     * @param useAsync forwarded to the {@code removeWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testNoWatcherServerException(boolean useAsync) throws KeeperException, InterruptedException, IOException, TimeoutException {
        CountdownWatcher watcher = new CountdownWatcher();
        ZooKeeper zk = spy(new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher));
        try {
            MyWatchManager watchManager = new MyWatchManager(false, watcher);
            doReturn(watchManager).when(zk).getWatchManager();

            watcher.waitForConnected(CONNECTION_TIMEOUT);

            removeWatches(zk, "/nowatchhere", watcher, WatcherType.Data, false, Code.NOWATCHER, useAsync);

            assertThat("Server didn't return NOWATCHER", watchManager.lastReturnCode, is(Code.NOWATCHER.intValue()));
        } finally {
            // This client is constructed directly by the test, so the test is
            // responsible for closing it; otherwise the connection outlives the test.
            zk.close();
        }
    }

    /**
     * Test verifies that removing all watches on a path where the client has
     * registered no watcher is expected to yield {@link Code#NOWATCHER}.
     *
     * @param useAsync forwarded to the {@code removeAllWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllNoWatcherException(boolean useAsync) throws IOException, InterruptedException, KeeperException {
        final String watchlessNode = "/node1";
        // The znode exists, but zk2 never sets a watch on it.
        zk1.create(watchlessNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        removeAllWatches(zk2, watchlessNode, WatcherType.Any, false, Code.NOWATCHER, useAsync);
    }

    /**
     * Test verifies that passing a null watcher to {@code removeWatches} is
     * rejected with an {@link IllegalArgumentException}, for both the
     * four-argument call and the six-argument (callback) call.
     *
     * @param useAsync when true the callback overload is exercised, otherwise
     *                 the overload without callback
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 30)
    public void testNullWatcherReference(boolean useAsync) throws Exception {
        final String node = "/node1";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        try {
            if (!useAsync) {
                zk1.removeWatches(node, null, WatcherType.Data, false);
            } else {
                zk1.removeWatches(node, null, WatcherType.Data, false, null, null);
            }
        } catch (IllegalArgumentException expected) {
            // expected
            return;
        }
        // Reaching this point means the call returned normally.
        fail("Must throw IllegalArgumentException as watcher is null!");
    }

    /**
     * Test verifies {@link WatcherType#Data} removal of a single watcher: when
     * two data watchers are set on the same path, removing one of them notifies
     * only that watcher of the removal, and the other one still receives the
     * subsequent data change.
     *
     * @param useAsync forwarded to the {@code removeWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveWhenMultipleDataWatchesOnAPath(boolean useAsync) throws Exception {
        final String node = "/node1";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        final CountDownLatch dataChanged = new CountDownLatch(1);
        final CountDownLatch watchRemoved = new CountDownLatch(1);
        // w1 only reacts to its own removal, w2 only to a data change.
        Watcher w1 = event -> {
            if (EventType.DataWatchRemoved == event.getType()) {
                watchRemoved.countDown();
            }
        };
        Watcher w2 = event -> {
            if (EventType.NodeDataChanged == event.getType()) {
                dataChanged.countDown();
            }
        };
        // Add multiple data watches
        LOG.info("Adding data watcher {} on path {}", w1, node);
        assertNotNull(zk2.exists(node, w1), "Didn't set data watches");
        LOG.info("Adding data watcher {} on path {}", w2, node);
        assertNotNull(zk2.exists(node, w2), "Didn't set data watches");

        removeWatches(zk2, node, w1, WatcherType.Data, false, Code.OK, useAsync);
        boolean removalSeen = watchRemoved.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
        assertTrue(removalSeen, "Didn't remove data watcher");

        zk1.setData(node, "test".getBytes(), -1);
        LOG.info("Waiting for data watchers to be notified");
        boolean changeSeen = dataChanged.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
        assertTrue(changeSeen, "Didn't get data watch notification!");
    }

    /**
     * Test verifies {@link WatcherType#Children} removal of a single watcher:
     * when two child watchers are set on the same path, removing one of them
     * notifies only that watcher of the removal, and the other one still
     * receives the subsequent children change.
     *
     * @param useAsync forwarded to the {@code removeWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveWhenMultipleChildWatchesOnAPath(boolean useAsync) throws Exception {
        final String node = "/node1";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        final CountDownLatch childrenChanged = new CountDownLatch(1);
        final CountDownLatch watchRemoved = new CountDownLatch(1);
        // w1 only reacts to its own removal, w2 only to a children change.
        Watcher w1 = event -> {
            if (EventType.ChildWatchRemoved == event.getType()) {
                watchRemoved.countDown();
            }
        };
        Watcher w2 = event -> {
            if (EventType.NodeChildrenChanged == event.getType()) {
                childrenChanged.countDown();
            }
        };
        // Add multiple child watches
        LOG.info("Adding child watcher {} on path {}", w1, node);
        assertEquals(0, zk2.getChildren(node, w1).size(), "Didn't set child watches");
        LOG.info("Adding child watcher {} on path {}", w2, node);
        assertEquals(0, zk2.getChildren(node, w2).size(), "Didn't set child watches");

        removeWatches(zk2, node, w1, WatcherType.Children, false, Code.OK, useAsync);
        boolean removalSeen = watchRemoved.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
        assertTrue(removalSeen, "Didn't remove child watcher");

        zk1.create(node + "/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        LOG.info("Waiting for child watchers to be notified");
        boolean changeSeen = childrenChanged.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
        assertTrue(changeSeen, "Didn't get child watch notification!");
    }

    /**
     * Test verifies {@link WatcherType#Persistent} - removes only the configured watcher function.
     *
     * <p>Two persistent watchers are added on the same path; after one is
     * removed it receives the removal event and nothing else, while the other
     * still observes the children change.
     *
     * @param useAsync forwarded to the {@code removeWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveWhenMultiplePersistentWatchesOnAPath(boolean useAsync) throws Exception {
        final String node = "/node1";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        BlockingDeque<WatchedEvent> removedWatcherEvents = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> remainingWatcherEvents = new LinkedBlockingDeque<>();
        Watcher toRemove = removedWatcherEvents::add;
        Watcher toKeep = remainingWatcherEvents::add;
        // Add multiple persistent watches
        zk2.addWatch(node, toRemove, AddWatchMode.PERSISTENT);
        zk2.addWatch(node, toKeep, AddWatchMode.PERSISTENT);

        removeWatches(zk2, node, toRemove, WatcherType.Persistent, false, Code.OK, useAsync);
        assertEvent(removedWatcherEvents, EventType.PersistentWatchRemoved, node);

        zk1.create(node + "/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertEvent(remainingWatcherEvents, EventType.NodeChildrenChanged, node);
        assertNoEvent(removedWatcherEvents);
    }

    /**
     * Test verifies {@link WatcherType#PersistentRecursive} - removes only the configured watcher function.
     *
     * <p>Two persistent recursive watchers are added on the same path; after
     * one is removed it receives the removal event and nothing else, while the
     * other still observes the creation of a child znode.
     *
     * @param useAsync forwarded to the {@code removeWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveWhenMultiplePersistentRecursiveWatchesOnAPath(boolean useAsync) throws Exception {
        final String node = "/node1";
        final String child = node + "/node2";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        BlockingDeque<WatchedEvent> removedWatcherEvents = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> remainingWatcherEvents = new LinkedBlockingDeque<>();
        Watcher toRemove = removedWatcherEvents::add;
        Watcher toKeep = remainingWatcherEvents::add;
        // Add multiple persistent recursive watches
        zk2.addWatch(node, toRemove, AddWatchMode.PERSISTENT_RECURSIVE);
        zk2.addWatch(node, toKeep, AddWatchMode.PERSISTENT_RECURSIVE);

        removeWatches(zk2, node, toRemove, WatcherType.PersistentRecursive, false, Code.OK, useAsync);
        assertEvent(removedWatcherEvents, EventType.PersistentWatchRemoved, node);

        zk1.create(child, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertEvent(remainingWatcherEvents, EventType.NodeCreated, child);
        assertNoEvent(removedWatcherEvents);
    }

    /**
     * Test verifies {@link OpCode#checkWatches} {@link WatcherType#Persistent} using {@link WatcherType#Data}.
     *
     * <p>Trying to remove a persistent watcher as a {@link WatcherType#Data}
     * watcher is expected to yield {@link Code#NOWATCHER} and to leave the
     * persistent watch in place, so it still sees both later events.
     *
     * @param useAsync forwarded to the {@code removeWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemovePersistentWatchesOnAPathPartially(boolean useAsync) throws Exception {
        final String node = "/node1";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        BlockingDeque<WatchedEvent> observed = new LinkedBlockingDeque<>();
        Watcher watcher = observed::add;
        zk2.addWatch(node, watcher, AddWatchMode.PERSISTENT);

        // State before the removal attempt.
        assertWatchers(zk2, node, WatcherType.Persistent);
        assertNoWatchers(zk2, node, WatcherType.Data);
        removeWatches(zk2, node, watcher, WatcherType.Data, false, Code.NOWATCHER, useAsync);
        // State must be unchanged afterwards.
        assertWatchers(zk2, node, WatcherType.Persistent);
        assertNoWatchers(zk2, node, WatcherType.Data);

        zk1.create(node + "/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.setData(node, null, -1);

        assertEvent(observed, EventType.NodeChildrenChanged, node);
        assertEvent(observed, EventType.NodeDataChanged, node);

        assertNoEvent(observed);
    }

    /**
     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Data}.
     *
     * <p>All other watcher types shouldn't be removed: after the data watches
     * are removed, the child, persistent and persistent recursive watchers
     * still receive their events while the data watchers receive none.
     *
     * @param useAsync forwarded to the {@code removeAllWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllDataWatchesOnAPath(boolean useAsync) throws Exception {
        final String node = "/node1";
        final String child = node + "/child";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        BlockingDeque<WatchedEvent> firstData = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondData = new LinkedBlockingDeque<>();
        // Add multiple data watches
        zk2.getData(node, firstData::add, null);
        zk2.getData(node, secondData::add, null);

        // One watcher of every other kind, all expected to survive.
        BlockingDeque<WatchedEvent> children = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> persistent = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> recursive = new LinkedBlockingDeque<>();
        zk2.getChildren(node, children::add);
        zk2.addWatch(node, persistent::add, AddWatchMode.PERSISTENT);
        zk2.addWatch(node, recursive::add, AddWatchMode.PERSISTENT_RECURSIVE);

        assertWatchers(zk2, node, WatcherType.values());
        removeAllWatches(zk2, node, WatcherType.Data, false, Code.OK, useAsync);
        assertEvent(firstData, EventType.DataWatchRemoved, node);
        assertEvent(secondData, EventType.DataWatchRemoved, node);
        assertWatchersExcept(zk2, node, WatcherType.Data);

        zk1.create(child, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.setData(node, null, -1);

        assertEvent(children, EventType.NodeChildrenChanged, node);

        assertEvent(persistent, EventType.NodeChildrenChanged, node);
        assertEvent(persistent, EventType.NodeDataChanged, node);

        assertEvent(recursive, EventType.NodeCreated, child);
        assertEvent(recursive, EventType.NodeDataChanged, node);

        assertNoEvent(firstData);
        assertNoEvent(secondData);
    }

    /**
     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Children}.
     *
     * <p>All other watcher types shouldn't be removed: after the child watches
     * are removed, the data, persistent and persistent recursive watchers
     * still receive their events while the child watchers receive none.
     *
     * @param useAsync forwarded to the {@code removeAllWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllChildWatchesOnAPath(boolean useAsync) throws Exception {
        final String node = "/node1";
        final String child = node + "/child";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        BlockingDeque<WatchedEvent> firstChildren = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondChildren = new LinkedBlockingDeque<>();
        // Add multiple child watches
        zk2.getChildren(node, firstChildren::add);
        zk2.getChildren(node, secondChildren::add);

        // One watcher of every other kind, all expected to survive.
        BlockingDeque<WatchedEvent> data = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> persistent = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> recursive = new LinkedBlockingDeque<>();
        zk2.getData(node, data::add, null);
        zk2.addWatch(node, persistent::add, AddWatchMode.PERSISTENT);
        zk2.addWatch(node, recursive::add, AddWatchMode.PERSISTENT_RECURSIVE);

        assertWatchers(zk2, node, WatcherType.values());
        removeAllWatches(zk2, node, WatcherType.Children, false, Code.OK, useAsync);
        assertEvent(firstChildren, EventType.ChildWatchRemoved, node);
        assertEvent(secondChildren, EventType.ChildWatchRemoved, node);
        assertWatchersExcept(zk2, node, WatcherType.Children);

        zk1.create(child, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.setData(node, null, -1);

        assertEvent(data, EventType.NodeDataChanged, node);
        assertEvent(persistent, EventType.NodeChildrenChanged, node);
        assertEvent(persistent, EventType.NodeDataChanged, node);
        assertEvent(recursive, EventType.NodeCreated, child);
        assertEvent(recursive, EventType.NodeDataChanged, node);

        assertNoEvent(firstChildren);
        assertNoEvent(secondChildren);
    }

    /**
     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Persistent}.
     *
     * <p>All other watcher types shouldn't be removed: after the persistent
     * watches are removed, the data, child and persistent recursive watchers
     * still receive their events while the persistent watchers receive none.
     *
     * @param useAsync forwarded to the {@code removeAllWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllPersistentWatchesOnAPath(boolean useAsync) throws Exception {
        final String node = "/node1";
        final String child = node + "/child1";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        BlockingDeque<WatchedEvent> firstPersistent = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondPersistent = new LinkedBlockingDeque<>();
        // Add multiple persistent watches
        zk2.addWatch(node, firstPersistent::add, AddWatchMode.PERSISTENT);
        zk2.addWatch(node, secondPersistent::add, AddWatchMode.PERSISTENT);

        // One watcher of every other kind, all expected to survive.
        BlockingDeque<WatchedEvent> data = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> children = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> recursive = new LinkedBlockingDeque<>();
        zk2.getData(node, data::add, null);
        zk2.getChildren(node, children::add, null);
        zk2.addWatch(node, recursive::add, AddWatchMode.PERSISTENT_RECURSIVE);

        assertWatchers(zk2, node, WatcherType.values());
        removeAllWatches(zk2, node, WatcherType.Persistent, false, Code.OK, useAsync);
        assertEvent(firstPersistent, EventType.PersistentWatchRemoved, node);
        assertEvent(secondPersistent, EventType.PersistentWatchRemoved, node);
        assertWatchersExcept(zk2, node, WatcherType.Persistent);

        zk1.create(child, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.setData(node, null, -1);

        assertEvent(data, EventType.NodeDataChanged, node);
        assertEvent(children, EventType.NodeChildrenChanged, node);
        assertEvent(recursive, EventType.NodeCreated, child);
        assertEvent(recursive, EventType.NodeDataChanged, node);

        assertNoEvent(firstPersistent);
        assertNoEvent(secondPersistent);
    }

    /**
     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Persistent} using {@link WatcherType#Data}.
     *
     * <p>Removing all {@link WatcherType#Data} watches from a path that only
     * carries a persistent watch is expected to yield {@link Code#NOWATCHER}
     * and to leave the persistent watch in place, so it still sees both later
     * events.
     *
     * @param useAsync forwarded to the {@code removeAllWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllPersistentWatchesOnAPathPartially(boolean useAsync) throws Exception {
        final String node = "/node1";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        BlockingDeque<WatchedEvent> observed = new LinkedBlockingDeque<>();
        zk2.addWatch(node, observed::add, AddWatchMode.PERSISTENT);

        // State before the removal attempt.
        assertWatchers(zk2, node, WatcherType.Persistent);
        assertNoWatchers(zk2, node, WatcherType.Data);
        removeAllWatches(zk2, node, WatcherType.Data, false, Code.NOWATCHER, useAsync);
        // State must be unchanged afterwards.
        assertWatchers(zk2, node, WatcherType.Persistent);
        assertNoWatchers(zk2, node, WatcherType.Data);

        zk1.create(node + "/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.setData(node, null, -1);

        assertEvent(observed, EventType.NodeChildrenChanged, node);
        assertEvent(observed, EventType.NodeDataChanged, node);

        assertNoEvent(observed);
    }

    /**
     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#PersistentRecursive}.
     *
     * <p>All other watcher types shouldn't be removed: after the persistent
     * recursive watches are removed, the data, child and persistent watchers
     * still receive their events while the recursive watchers receive none.
     *
     * @param useAsync forwarded to the {@code removeAllWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllPersistentRecursiveWatchesOnAPath(boolean useAsync) throws Exception {
        final String node = "/node1";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        BlockingDeque<WatchedEvent> firstRecursive = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondRecursive = new LinkedBlockingDeque<>();
        // Add multiple persistent recursive watches
        zk2.addWatch(node, firstRecursive::add, AddWatchMode.PERSISTENT_RECURSIVE);
        zk2.addWatch(node, secondRecursive::add, AddWatchMode.PERSISTENT_RECURSIVE);

        // One watcher of every other kind, all expected to survive.
        BlockingDeque<WatchedEvent> data = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> children = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> persistent = new LinkedBlockingDeque<>();
        zk2.getData(node, data::add, null);
        zk2.getChildren(node, children::add, null);
        zk2.addWatch(node, persistent::add, AddWatchMode.PERSISTENT);

        assertWatchers(zk2, node, WatcherType.values());
        removeAllWatches(zk2, node, WatcherType.PersistentRecursive, false, Code.OK, useAsync);
        assertEvent(firstRecursive, EventType.PersistentWatchRemoved, node);
        assertEvent(secondRecursive, EventType.PersistentWatchRemoved, node);
        assertWatchersExcept(zk2, node, WatcherType.PersistentRecursive);

        zk1.create(node + "/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.setData(node, null, -1);

        assertEvent(data, EventType.NodeDataChanged, node);
        assertEvent(children, EventType.NodeChildrenChanged, node);
        assertEvent(persistent, EventType.NodeChildrenChanged, node);
        assertEvent(persistent, EventType.NodeDataChanged, node);

        assertNoEvent(firstRecursive);
        assertNoEvent(secondRecursive);
    }

    /**
     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Any}.
     *
     * <p>All watcher types should be removed: two watchers of each kind (child,
     * data, persistent, persistent recursive) are registered on one path, each
     * must receive its removal event, and none may receive anything for the
     * changes made afterwards.
     *
     * @param useAsync forwarded to the {@code removeAllWatches} helper
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 90)
    public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception {
        final String node = "/node1";
        zk1.create(node, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Add multiple child watches
        BlockingDeque<WatchedEvent> firstChild = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondChild = new LinkedBlockingDeque<>();
        zk2.getChildren(node, firstChild::add);
        zk2.getChildren(node, secondChild::add);

        // Add multiple data watches
        BlockingDeque<WatchedEvent> firstData = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondData = new LinkedBlockingDeque<>();
        zk2.getData(node, firstData::add, null);
        zk2.exists(node, secondData::add);

        // Add multiple persistent watches
        BlockingDeque<WatchedEvent> firstPersistent = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondPersistent = new LinkedBlockingDeque<>();
        zk2.addWatch(node, firstPersistent::add, AddWatchMode.PERSISTENT);
        zk2.addWatch(node, secondPersistent::add, AddWatchMode.PERSISTENT);

        // Add multiple recursive watches
        BlockingDeque<WatchedEvent> firstRecursive = new LinkedBlockingDeque<>();
        BlockingDeque<WatchedEvent> secondRecursive = new LinkedBlockingDeque<>();
        zk2.addWatch(node, firstRecursive::add, AddWatchMode.PERSISTENT_RECURSIVE);
        zk2.addWatch(node, secondRecursive::add, AddWatchMode.PERSISTENT_RECURSIVE);

        assertWatchers(zk2, node, WatcherType.values());
        removeAllWatches(zk2, node, WatcherType.Any, false, Code.OK, useAsync);

        // Every watcher is told about its own removal.
        assertEvent(firstChild, EventType.ChildWatchRemoved, node);
        assertEvent(secondChild, EventType.ChildWatchRemoved, node);

        assertEvent(firstData, EventType.DataWatchRemoved, node);
        assertEvent(secondData, EventType.DataWatchRemoved, node);

        assertEvent(firstPersistent, EventType.PersistentWatchRemoved, node);
        assertEvent(secondPersistent, EventType.PersistentWatchRemoved, node);

        assertEvent(firstRecursive, EventType.PersistentWatchRemoved, node);
        assertEvent(secondRecursive, EventType.PersistentWatchRemoved, node);
        assertNoWatchers(zk2, node, WatcherType.values());

        zk1.create(node + "/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.setData(node, null, -1);

        // Nothing may be delivered for the changes made after the removal.
        assertNoEvent(firstChild);
        assertNoEvent(secondChild);
        assertNoEvent(firstData);
        assertNoEvent(secondData);
        assertNoEvent(firstPersistent);
        assertNoEvent(secondPersistent);
        assertNoEvent(firstRecursive);
        assertNoEvent(secondRecursive);
    }

    /**
     * Watch manager stub that records the return code passed to
     * {@code removeWatches} instead of removing anything.
     */
    private static class MyWatchManager extends ZKWatchManager {

        /** Return code seen by the most recent {@code removeWatches} call. */
        int lastReturnCode;

        MyWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher) {
            super(disableAutoWatchReset, defaultWatcher);
        }

        // NOTE(review): presumably replaces the superclass's client-side
        // existence check so the request still reaches the server - confirm
        // against ZKWatchManager.
        void containsWatcher(String path, Watcher watcher, WatcherType watcherType) {
            // prevent contains watcher
        }

        /**
         * Remembers {@code rc} and reports that nothing was removed.
         *
         * @return always {@code false}
         */
        @Override
        protected boolean removeWatches(
            Map<String, Set<Watcher>> pathVsWatcher, Watcher watcher, String path,
            boolean local, int rc, Set<Watcher> removedWatchers) {
            this.lastReturnCode = rc;
            return false;
        }
    }

    private static class MyWatcher implements Watcher {

        private final String path;
        private String eventPath;
        private CountDownLatch latch;
        private List<EventType> eventsAfterWatchRemoval = new ArrayList<>();
        MyWatcher(String path, int count) {
            this.path = path;
            latch = new CountDownLatch(count);
        }

        public void process(WatchedEvent event) {
            LOG.debug("Event path : {}, eventPath : {}", path, event.getPath());
            this.eventPath = event.getPath();
            // notifies watcher removal
            if (latch.getCount() == 0) {
                if (event.getType() != EventType.None) {
                    eventsAfterWatchRemoval.add(event.getType());
                }
            }
            if (event.getType() == EventType.ChildWatchRemoved || event.getType() == EventType.DataWatchRemoved) {
                latch.countDown();
            }
        }

        /**
         * Returns true if the watcher was triggered.  Try to avoid using this
         * method with assertFalse statements.  A false return depends on a timed
         * out wait on a latch, which makes tests run long.
         *
         * @return true if the watcher was triggered, false otherwise
         * @throws InterruptedException if interrupted while waiting on latch
         */
        public boolean matches() throws InterruptedException {
            if (!latch.await(CONNECTION_TIMEOUT / 5, TimeUnit.MILLISECONDS)) {
                LOG.error("Failed waiting to remove the watches");
                return false;
            }
            LOG.debug("Client path : {} eventPath : {}", path, eventPath);
            return path.equals(eventPath);
        }

        public List<EventType> getEventsAfterWatchRemoval() {
            return eventsAfterWatchRemoval;
        }

    }

    private class MyCallback implements AsyncCallback.VoidCallback {

        private final String path;
        private final int rc;
        private String eventPath;
        int eventRc;
        private CountDownLatch latch = new CountDownLatch(1);

        public MyCallback(int rc, String path) {
            this.rc = rc;
            this.path = path;
        }

        @Override
        public void processResult(int rc, String eventPath, Object ctx) {
            System.out.println("latch:" + path + " " + eventPath);
            this.eventPath = eventPath;
            this.eventRc = rc;
            this.latch.countDown();
        }

        /**
         * Returns true if the callback was triggered.  Try to avoid using this
         * method with assertFalse statements.  A false return depends on a timed
         * out wait on a latch, which makes tests run long.
         *
         * @return true if the watcher was triggered, false otherwise
         * @throws InterruptedException if interrupted while waiting on latch
         */
        public boolean matches() throws InterruptedException {
            if (!latch.await(CONNECTION_TIMEOUT / 5, TimeUnit.MILLISECONDS)) {
                return false;
            }
            return path.equals(eventPath) && rc == eventRc;
        }

    }

    /**
     * Checks if a session is registered with the server as a watcher.
     *
     * <p>Looks for a server connection whose session id equals
     * {@code sessionId} and asks the server's data tree whether that
     * connection is a watcher of the given type on the given path.
     *
     * @param sessionId the session ID to check
     * @param path the path to check for watchers
     * @param type the type of watcher
     * @return the data tree's answer for the first connection with a matching
     *         session id, or false when no connection has that session id
     */
    private boolean isServerSessionWatcher(long sessionId, String path, WatcherType type) {
        // Iterate over a copy of the connections rather than the factory's own iterable.
        Set<ServerCnxn> snapshot = new HashSet<>();
        CollectionUtils.addAll(snapshot, serverFactory.getConnections().iterator());
        for (ServerCnxn candidate : snapshot) {
            if (candidate.getSessionId() != sessionId) {
                continue;
            }
            return serverFactory.getZooKeeperServer()
                    .getZKDatabase()
                    .getDataTree()
                    .containsWatcher(path, type, candidate);
        }
        return false;
    }

    /**
     * Asserts next event from queue has given event type and path.
     *
     * <p>Waits up to five seconds for an event to become available and fails
     * if none arrives.
     *
     * @param events    queue the watcher under test appends its events to
     * @param eventType expected type of the next event
     * @param path      expected path of the next event
     * @throws InterruptedException if interrupted while waiting for an event
     */
    private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
            throws InterruptedException {
        final WatchedEvent next = events.poll(5, TimeUnit.SECONDS);
        assertNotNull(next);
        final Watcher.Event.EventType actualType = next.getType();
        assertEquals(eventType, actualType);
        final String actualPath = next.getPath();
        assertEquals(path, actualPath);
    }

    /**
     * Asserts no event from queue in a short period.
     *
     * @param events queue the watcher under test appends its events to
     * @throws InterruptedException if interrupted while polling the queue
     */
    private void assertNoEvent(BlockingQueue<WatchedEvent> events) throws InterruptedException {
        // Short timeout so we don't hurt CI too much. It will fail finally given enough run if there are bugs.
        final WatchedEvent unexpected = events.poll(10, TimeUnit.MILLISECONDS);
        assertNull(unexpected);
    }
}