WatchManagerTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.watch;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.DumbWatcher;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerMetrics;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WatchManagerTest extends ZKTestCase {

    // Logger for this test class.
    protected static final Logger LOG = LoggerFactory.getLogger(WatchManagerTest.class);

    // Prefix of the randomly numbered paths the worker threads operate on.
    private static final String PATH_PREFIX = "/path";

    // Watchers handed out by createOrGetWatcher, keyed by watcher id; replaced with an empty map in setUp.
    private ConcurrentHashMap<Integer, DumbWatcher> watchers;
    // Random source used by the worker threads to pick paths, watcher ids and sleep times; re-seeded in setUp.
    private Random r;

    /**
     * Supplies the arguments for the parameterized tests: one argument set per
     * watch manager implementation, each holding the implementation's fully
     * qualified class name.
     *
     * @return a stream with the class names of {@link WatchManager} and
     *         {@link WatchManagerOptimized}, in that order
     */
    public static Stream<Arguments> data() {
        final String defaultImpl = WatchManager.class.getName();
        final String optimizedImpl = WatchManagerOptimized.class.getName();
        return Stream.of(defaultImpl, optimizedImpl)
            .map(implName -> Arguments.of(implName));
    }

    /**
     * Per-test initialisation: resets all server metrics, seeds a new random
     * source from the current nano time and starts with an empty watcher map.
     */
    @BeforeEach
    public void setUp() {
        ServerMetrics.getMetrics().resetAll();
        final long seed = System.nanoTime();
        r = new Random(seed);
        watchers = new ConcurrentHashMap<>();
    }

    /**
     * Sets the system property named by
     * {@link WatchManagerFactory#ZOOKEEPER_WATCH_MANAGER_NAME} to the given
     * class name and then asks {@link WatchManagerFactory} for a watch manager.
     *
     * @param className fully qualified name of the watch manager implementation
     * @return the manager returned by the factory
     * @throws IOException propagated from the factory
     */
    public IWatchManager getWatchManager(String className) throws IOException {
        final String propertyKey = WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME;
        System.setProperty(propertyKey, className);
        final IWatchManager created = WatchManagerFactory.createWatchManager();
        return created;
    }

    /**
     * Returns the watcher registered under {@code watcherId}, creating and
     * registering a new {@link DumbWatcher} first when none exists yet.
     *
     * <p>Lookup and creation happen in a single atomic
     * {@link ConcurrentHashMap#computeIfAbsent} call, so concurrent callers
     * asking for the same id always receive the same instance and no
     * throw-away watcher is constructed when two threads race.
     *
     * @param watcherId id used both as the map key and as the constructor
     *                  argument of a newly created watcher
     * @return the watcher associated with {@code watcherId}
     */
    public DumbWatcher createOrGetWatcher(int watcherId) {
        // The lambda deliberately uses the primitive parameter so the same
        // DumbWatcher constructor is selected as before.
        return watchers.computeIfAbsent(watcherId, id -> new DumbWatcher(watcherId));
    }

    /**
     * Worker thread that, until {@link #shutdown()} is called, keeps adding a
     * randomly chosen watcher to a randomly chosen path and increments the
     * shared counter each time {@code addWatch} returns {@code true}.
     */
    public class AddWatcherWorker extends Thread {

        private final IWatchManager manager;
        private final int paths;
        private final int watchers;
        private final AtomicInteger watchesAdded;
        private volatile boolean stopped = false;

        /**
         * @param manager      watch manager the watches are added to
         * @param paths        exclusive upper bound of the random path suffix
         * @param watchers     exclusive upper bound of the random watcher id
         * @param watchesAdded counter incremented for every successful add
         */
        public AddWatcherWorker(
                IWatchManager manager, int paths, int watchers, AtomicInteger watchesAdded) {
            this.manager = manager;
            this.paths = paths;
            this.watchers = watchers;
            this.watchesAdded = watchesAdded;
        }

        @Override
        public void run() {
            for (;;) {
                if (stopped) {
                    return;
                }
                addRandomWatch();
            }
        }

        /** Performs one add attempt on a random (path, watcher) pair. */
        private void addRandomWatch() {
            final String target = PATH_PREFIX + r.nextInt(paths);
            final Watcher chosen = createOrGetWatcher(r.nextInt(watchers));
            final boolean added = manager.addWatch(target, chosen);
            if (added) {
                watchesAdded.incrementAndGet();
            }
        }

        /** Asks the worker to leave its loop before the next iteration. */
        public void shutdown() {
            stopped = true;
        }

    }

    /**
     * Worker thread that, until {@link #shutdown()} is called, repeatedly
     * triggers a {@code NodeDeleted} event on a randomly chosen path and adds
     * the size of the returned watcher set (when non-null) to a shared counter,
     * pausing for a random time below 10 ms between rounds.
     *
     * <p>If the thread is interrupted while pausing, the interrupt status is
     * restored and the worker exits instead of silently discarding the
     * interrupt.
     */
    public class WatcherTriggerWorker extends Thread {

        private final IWatchManager manager;
        private final int paths;
        private final AtomicInteger triggeredCount;
        private volatile boolean stopped = false;

        /**
         * @param manager        watch manager whose watches are triggered
         * @param paths          exclusive upper bound of the random path suffix
         * @param triggeredCount counter accumulating the number of triggered watchers
         */
        public WatcherTriggerWorker(
                IWatchManager manager, int paths, AtomicInteger triggeredCount) {
            this.manager = manager;
            this.paths = paths;
            this.triggeredCount = triggeredCount;
        }

        @Override
        public void run() {
            while (!stopped) {
                String path = PATH_PREFIX + r.nextInt(paths);
                WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1, null);
                if (s != null) {
                    triggeredCount.addAndGet(s.size());
                }
                try {
                    Thread.sleep(r.nextInt(10));
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread and stop working.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Asks the worker to leave its loop before the next iteration. */
        public void shutdown() {
            stopped = true;
        }

    }

    /**
     * Worker thread that, until {@link #shutdown()} is called, repeatedly tries
     * to remove a randomly chosen watcher from a randomly chosen path and
     * increments a shared counter each time {@code removeWatcher} returns
     * {@code true}, pausing for a random time below 10 ms between rounds.
     *
     * <p>If the thread is interrupted while pausing, the interrupt status is
     * restored and the worker exits instead of silently discarding the
     * interrupt.
     */
    public class RemoveWatcherWorker extends Thread {

        private final IWatchManager manager;
        private final int paths;
        private final int watchers;
        private final AtomicInteger watchesRemoved;
        private volatile boolean stopped = false;

        /**
         * @param manager        watch manager the watches are removed from
         * @param paths          exclusive upper bound of the random path suffix
         * @param watchers       exclusive upper bound of the random watcher id
         * @param watchesRemoved counter incremented for every successful removal
         */
        public RemoveWatcherWorker(
                IWatchManager manager, int paths, int watchers, AtomicInteger watchesRemoved) {
            this.manager = manager;
            this.paths = paths;
            this.watchers = watchers;
            this.watchesRemoved = watchesRemoved;
        }

        @Override
        public void run() {
            while (!stopped) {
                String path = PATH_PREFIX + r.nextInt(paths);
                Watcher watcher = createOrGetWatcher(r.nextInt(watchers));
                if (manager.removeWatcher(path, watcher)) {
                    watchesRemoved.incrementAndGet();
                }
                try {
                    Thread.sleep(r.nextInt(10));
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread and stop working.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Asks the worker to leave its loop before the next iteration. */
        public void shutdown() {
            stopped = true;
        }

    }

    /**
     * Worker thread that, until {@link #shutdown()} is called, repeatedly picks
     * a random watcher, marks it stale, removes it from the watch manager and
     * records it in a shared set (guarded by the set's own monitor), pausing
     * for a random time below 10 ms between rounds.
     *
     * <p>If the thread is interrupted while pausing, the interrupt status is
     * restored and the worker exits instead of silently discarding the
     * interrupt.
     */
    public class CreateDeadWatchersWorker extends Thread {

        private final IWatchManager manager;
        private final int watchers;
        private final Set<Watcher> removedWatchers;
        private volatile boolean stopped = false;

        /**
         * @param manager         watch manager the watchers are removed from
         * @param watchers        exclusive upper bound of the random watcher id
         * @param removedWatchers set collecting every watcher this worker removed;
         *                        mutated only while synchronized on the set itself
         */
        public CreateDeadWatchersWorker(
                IWatchManager manager, int watchers, Set<Watcher> removedWatchers) {
            this.manager = manager;
            this.watchers = watchers;
            this.removedWatchers = removedWatchers;
        }

        @Override
        public void run() {
            while (!stopped) {
                DumbWatcher watcher = createOrGetWatcher(r.nextInt(watchers));
                watcher.setStale();
                manager.removeWatcher(watcher);
                synchronized (removedWatchers) {
                    removedWatchers.add(watcher);
                }
                try {
                    Thread.sleep(r.nextInt(10));
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread and stop working.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Asks the worker to leave its loop before the next iteration. */
        public void shutdown() {
            stopped = true;
        }

    }

    /**
     * Concurrently add and trigger watches on a single path and verify that
     * the number of watches triggered equals the number of watches added.
     *
     * <p>Worker threads are joined, not merely flagged to stop, before the
     * counters are compared, so no in-flight add or trigger can race with the
     * final assertion. All workers are also stopped in a {@code finally} block
     * so they cannot outlive a failing or interrupted test.
     *
     * @param className fully qualified name of the watch manager implementation
     * @throws IOException          if the watch manager cannot be created
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @ParameterizedTest
    @MethodSource("data")
    @Timeout(value = 90)
    public void testAddAndTriggerWatcher(String className) throws IOException, InterruptedException {
        IWatchManager manager = getWatchManager(className);
        int paths = 1;
        int watchers = 10000;

        AtomicInteger watchTriggered = new AtomicInteger();
        AtomicInteger watchesAdded = new AtomicInteger();
        List<WatcherTriggerWorker> triggerWorkers = new ArrayList<>();
        List<AddWatcherWorker> addWorkers = new ArrayList<>();
        try {
            // 1. start 5 workers to trigger watchers on that path and
            //    count all the watchers that have been fired
            for (int i = 0; i < 5; i++) {
                WatcherTriggerWorker worker = new WatcherTriggerWorker(manager, paths, watchTriggered);
                triggerWorkers.add(worker);
                worker.start();
            }

            // 2. start 5 workers to add different watchers on the same path and
            //    count all the watchers being added
            for (int i = 0; i < 5; i++) {
                AddWatcherWorker worker = new AddWatcherWorker(manager, paths, watchers, watchesAdded);
                addWorkers.add(worker);
                worker.start();
            }

            while (watchesAdded.get() < 100000) {
                Thread.sleep(100);
            }

            // 3. stop all the addWorkers and wait until they have really exited,
            //    so watchesAdded can no longer change
            for (AddWatcherWorker worker : addWorkers) {
                worker.shutdown();
            }
            for (AddWatcherWorker worker : addWorkers) {
                worker.join();
            }

            // 4. keep the trigger workers running a bit longer to make sure
            //    all watchers added are fired
            Thread.sleep(500);

            // 5. stop all triggerWorkers and wait for them, so every triggered
            //    watcher has been counted before the comparison below
            for (WatcherTriggerWorker worker : triggerWorkers) {
                worker.shutdown();
            }
            for (WatcherTriggerWorker worker : triggerWorkers) {
                worker.join();
            }
        } finally {
            // shutdown() is idempotent; this only matters when the test is
            // leaving early because of a failure or an interrupt
            for (AddWatcherWorker worker : addWorkers) {
                worker.shutdown();
            }
            for (WatcherTriggerWorker worker : triggerWorkers) {
                worker.shutdown();
            }
        }

        // 6. make sure the total watch triggered is same as added
        assertTrue(watchesAdded.get() > 0);
        assertEquals(watchesAdded.get(), watchTriggered.get());
    }

    /**
     * Concurrently add and remove watches, then verify that the watches left
     * in the manager plus the watches removed equal the total watches added.
     *
     * <p>All workers are joined before the counters are compared, so the
     * totals are final when the assertions run. Workers are also stopped in a
     * {@code finally} block so they cannot outlive a failing or interrupted
     * test.
     *
     * @param className fully qualified name of the watch manager implementation
     * @throws IOException          if the watch manager cannot be created
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @ParameterizedTest
    @MethodSource("data")
    @Timeout(value = 90)
    public void testRemoveWatcherOnPath(String className) throws IOException, InterruptedException {
        IWatchManager manager = getWatchManager(className);
        int paths = 10;
        int watchers = 10000;

        AtomicInteger watchesRemoved = new AtomicInteger();
        AtomicInteger watchesAdded = new AtomicInteger();
        List<RemoveWatcherWorker> removeWorkers = new ArrayList<>();
        List<AddWatcherWorker> addWorkers = new ArrayList<>();
        try {
            // 1. start 5 workers to remove watchers on those paths and
            //    record the watchers that have been removed
            for (int i = 0; i < 5; i++) {
                RemoveWatcherWorker worker = new RemoveWatcherWorker(manager, paths, watchers, watchesRemoved);
                removeWorkers.add(worker);
                worker.start();
            }

            // 2. start 5 workers to add different watchers on different paths and
            //    record the watchers that have been added
            for (int i = 0; i < 5; i++) {
                AddWatcherWorker worker = new AddWatcherWorker(manager, paths, watchers, watchesAdded);
                addWorkers.add(worker);
                worker.start();
            }

            while (watchesAdded.get() < 100000) {
                Thread.sleep(100);
            }

            // 3. stop all workers
            for (RemoveWatcherWorker worker : removeWorkers) {
                worker.shutdown();
            }
            for (AddWatcherWorker worker : addWorkers) {
                worker.shutdown();
            }

            // 4. wait until every worker thread has exited, so neither the
            //    counters nor the manager can change any more
            for (RemoveWatcherWorker worker : removeWorkers) {
                worker.join();
            }
            for (AddWatcherWorker worker : addWorkers) {
                worker.join();
            }
        } finally {
            // shutdown() is idempotent; this only matters when the test is
            // leaving early because of a failure or an interrupt
            for (RemoveWatcherWorker worker : removeWorkers) {
                worker.shutdown();
            }
            for (AddWatcherWorker worker : addWorkers) {
                worker.shutdown();
            }
        }

        // 5. make sure left watches + removed watches = added watches
        assertTrue(watchesAdded.get() > 0);
        assertTrue(watchesRemoved.get() > 0);
        assertTrue(manager.size() > 0);
        assertEquals(watchesAdded.get(), watchesRemoved.get() + manager.size());
    }

    /**
     * Exercises add, contains and remove through the generic
     * {@link IWatchManager} interface for each watch manager implementation.
     */
    @ParameterizedTest
    @MethodSource("data")
    public void testAddRemoveWatcher(String className) throws IOException {
        final String watchedPath = "/node1";
        final String otherPath = "/node2";
        IWatchManager manager = getWatchManager(className);
        Watcher registered = new DumbWatcher();
        Watcher stranger = new DumbWatcher();

        // given: the registered watcher is watching the watched path
        manager.addWatch(watchedPath, registered);

        // then: a mismatching (path, watcher) pair is neither contained nor removable
        boolean strangerOnWatchedPath = manager.containsWatcher(watchedPath, stranger);
        assertFalse(strangerOnWatchedPath);
        boolean registeredOnOtherPath = manager.containsWatcher(otherPath, registered);
        assertFalse(registeredOnOtherPath);
        boolean strangerRemoved = manager.removeWatcher(watchedPath, stranger);
        assertFalse(strangerRemoved);
        boolean removedFromOtherPath = manager.removeWatcher(otherPath, registered);
        assertFalse(removedFromOtherPath);

        // then: the matching pair is contained and can be removed
        boolean containedBeforeRemoval = manager.containsWatcher(watchedPath, registered);
        assertTrue(containedBeforeRemoval);
        boolean removedFirstTime = manager.removeWatcher(watchedPath, registered);
        assertTrue(removedFirstTime);

        // then: once removed, the pair is neither contained nor removable again
        boolean containedAfterRemoval = manager.containsWatcher(watchedPath, registered);
        assertFalse(containedAfterRemoval);
        boolean removedSecondTime = manager.removeWatcher(watchedPath, registered);
        assertFalse(removedSecondTime);
    }

    /**
     * Checks {@code containsWatcher} for every (path, watcher, mode)
     * combination, and {@code removeWatcher} for the mismatching ones.
     */
    @Test
    public void testContainsMode() {
        final String node1 = "/node1";
        final String node2 = "/node2";
        final WatcherMode[] everyMode = {
            WatcherMode.STANDARD, WatcherMode.PERSISTENT, WatcherMode.PERSISTENT_RECURSIVE};
        final WatcherMode[] nonPersistentModes = {
            WatcherMode.STANDARD, WatcherMode.PERSISTENT_RECURSIVE};

        IWatchManager manager = new WatchManager();
        Watcher watcher1 = new DumbWatcher();
        Watcher watcher2 = new DumbWatcher();

        // given: watcher1 watches node1 in persistent mode, which is not the default mode
        assertTrue(manager.addWatch(node1, watcher1, WatcherMode.PERSISTENT));
        assertNotEquals(WatcherMode.PERSISTENT, WatcherMode.DEFAULT_WATCHER_MODE);

        // then: watcher1 is found on node1 without a mode, with null mode and with persistent mode
        assertTrue(manager.containsWatcher(node1, watcher1));
        assertTrue(manager.containsWatcher(node1, watcher1, null));
        assertTrue(manager.containsWatcher(node1, watcher1, WatcherMode.PERSISTENT));

        // then: a different watcher on node1 is never found and never removed
        assertFalse(manager.containsWatcher(node1, watcher2));
        assertFalse(manager.containsWatcher(node1, watcher2, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.containsWatcher(node1, watcher2, mode));
        }
        assertFalse(manager.removeWatcher(node1, watcher2));
        assertFalse(manager.removeWatcher(node1, watcher2, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.removeWatcher(node1, watcher2, mode));
        }

        // then: watcher1 on a different path is never found and never removed
        assertFalse(manager.containsWatcher(node2, watcher1));
        assertFalse(manager.containsWatcher(node2, watcher1, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.containsWatcher(node2, watcher1, mode));
        }
        assertFalse(manager.removeWatcher(node2, watcher1));
        assertFalse(manager.removeWatcher(node2, watcher1, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.removeWatcher(node2, watcher1, mode));
        }

        // then: the right pair with a mode that was not added is neither found nor removed
        for (WatcherMode mode : nonPersistentModes) {
            assertFalse(manager.containsWatcher(node1, watcher1, mode));
        }
        for (WatcherMode mode : nonPersistentModes) {
            assertFalse(manager.removeWatcher(node1, watcher1, mode));
        }

        // when: watcher1 is added to node1 in the remaining modes
        for (WatcherMode mode : nonPersistentModes) {
            assertTrue(manager.addWatch(node1, watcher1, mode));
        }

        // then: watcher1 is found on node1 in every mode
        assertTrue(manager.containsWatcher(node1, watcher1));
        assertTrue(manager.containsWatcher(node1, watcher1, null));
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.containsWatcher(node1, watcher1, mode));
        }
    }

    /**
     * Adds the same watcher repeatedly through
     * {@link WatchManager#addWatch(String, Watcher, WatcherMode)} and checks
     * that contains and remove still behave per mode afterwards.
     */
    @Test
    public void testAddModeRepeatedly() {
        final String node = "/node1";
        final WatcherMode[] everyMode = {
            WatcherMode.STANDARD, WatcherMode.PERSISTENT, WatcherMode.PERSISTENT_RECURSIVE};

        IWatchManager manager = new WatchManager();
        Watcher watcher = new DumbWatcher();

        // given: the watcher is added to the node in all modes
        for (WatcherMode mode : everyMode) {
            manager.addWatch(node, watcher, mode);
        }

        // when: the same additions are repeated, each one is reported as not added
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.addWatch(node, watcher, mode));
        }

        // then: mode by mode, the watch is present, removable exactly once, and gone afterwards
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.containsWatcher(node, watcher));
            assertTrue(manager.containsWatcher(node, watcher, null));
            assertTrue(manager.containsWatcher(node, watcher, mode));
            assertTrue(manager.removeWatcher(node, watcher, mode));
            assertFalse(manager.containsWatcher(node, watcher, mode));
            assertFalse(manager.removeWatcher(node, watcher, mode));
        }

        // then: nothing is left for the pair once every mode has been removed
        assertFalse(manager.containsWatcher(node, watcher));
        assertFalse(manager.containsWatcher(node, watcher, null));
        assertFalse(manager.removeWatcher(node, watcher));
        assertFalse(manager.removeWatcher(node, watcher, null));
    }

    /**
     * Removing a single (path, watcher, mode) entry through
     * {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} must
     * leave every other entry intact.
     */
    @Test
    public void testRemoveModeOne() {
        final String node1 = "/node1";
        final String node2 = "/node2";
        final WatcherMode[] everyMode = {
            WatcherMode.STANDARD, WatcherMode.PERSISTENT, WatcherMode.PERSISTENT_RECURSIVE};
        final WatcherMode[] persistentModes = {
            WatcherMode.PERSISTENT, WatcherMode.PERSISTENT_RECURSIVE};

        IWatchManager manager = new WatchManager();
        Watcher watcher1 = new DumbWatcher();
        Watcher watcher2 = new DumbWatcher();

        // given: watcher1 on node1 and watcher2 on node2, each in all modes
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.addWatch(node1, watcher1, mode));
        }
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.addWatch(node2, watcher2, mode));
        }

        // when: exactly one entry is removed
        assertTrue(manager.removeWatcher(node1, watcher1, WatcherMode.STANDARD));

        // then: every other entry is still contained ...
        for (WatcherMode mode : persistentModes) {
            assertTrue(manager.containsWatcher(node1, watcher1, mode));
        }
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.containsWatcher(node2, watcher2, mode));
        }

        // ... and can still be removed
        for (WatcherMode mode : persistentModes) {
            assertTrue(manager.removeWatcher(node1, watcher1, mode));
        }
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.removeWatcher(node2, watcher2, mode));
        }
    }

    /**
     * Removing through
     * {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} with a
     * {@code null} mode, or through the overload without a mode, must drop the
     * watcher in every mode.
     */
    @Test
    public void testRemoveModeAll() {
        final String node = "/node1";
        final WatcherMode[] everyMode = {
            WatcherMode.STANDARD, WatcherMode.PERSISTENT, WatcherMode.PERSISTENT_RECURSIVE};

        IWatchManager manager = new WatchManager();
        Watcher watcher = new DumbWatcher();

        // given: the watcher is added to the node in all modes
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.addWatch(node, watcher, mode));
        }

        // when: the watcher is removed using a null watcher mode
        assertTrue(manager.removeWatcher(node, watcher, null));

        // then: the pair is neither contained nor removable in any mode
        assertFalse(manager.containsWatcher(node, watcher));
        assertFalse(manager.containsWatcher(node, watcher, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.containsWatcher(node, watcher, mode));
        }
        assertFalse(manager.removeWatcher(node, watcher));
        assertFalse(manager.removeWatcher(node, watcher, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.removeWatcher(node, watcher, mode));
        }

        // given: the watcher is added to the node in all modes again
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.addWatch(node, watcher, mode));
        }

        // then: removing without a mode behaves the same as removing all modes
        assertTrue(manager.removeWatcher(node, watcher));

        assertFalse(manager.containsWatcher(node, watcher));
        assertFalse(manager.containsWatcher(node, watcher, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.containsWatcher(node, watcher, mode));
        }
        assertFalse(manager.removeWatcher(node, watcher));
        assertFalse(manager.removeWatcher(node, watcher, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.removeWatcher(node, watcher, mode));
        }
    }

    /**
     * {@link WatchManager#removeWatcher(String, Watcher)} (no mode argument)
     * must drop the watcher in every mode.
     */
    @Test
    public void testRemoveModeAllDefault() {
        final String node = "/node1";
        final WatcherMode[] everyMode = {
            WatcherMode.STANDARD, WatcherMode.PERSISTENT, WatcherMode.PERSISTENT_RECURSIVE};

        IWatchManager manager = new WatchManager();
        Watcher watcher = new DumbWatcher();

        // given: the watcher is added to the node in all modes
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.addWatch(node, watcher, mode));
        }

        // then: removing without a mode behaves the same as removing all modes
        assertTrue(manager.removeWatcher(node, watcher));

        assertFalse(manager.containsWatcher(node, watcher));
        assertFalse(manager.containsWatcher(node, watcher, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.containsWatcher(node, watcher, mode));
        }
        assertFalse(manager.removeWatcher(node, watcher));
        assertFalse(manager.removeWatcher(node, watcher, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.removeWatcher(node, watcher, mode));
        }
    }

    /**
     * Removing every mode one at a time through
     * {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} must
     * leave nothing behind for the pair.
     */
    @Test
    public void testRemoveModeAllIndividually() {
        final String node = "/node1";
        final WatcherMode[] everyMode = {
            WatcherMode.STANDARD, WatcherMode.PERSISTENT, WatcherMode.PERSISTENT_RECURSIVE};

        IWatchManager manager = new WatchManager();
        Watcher watcher = new DumbWatcher();

        // given: the watcher is added to the node in all modes
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.addWatch(node, watcher, mode));
        }

        // when: each mode is removed individually
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.removeWatcher(node, watcher, mode));
        }

        // then: the pair is neither contained nor removable in any mode
        assertFalse(manager.containsWatcher(node, watcher));
        assertFalse(manager.containsWatcher(node, watcher, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.containsWatcher(node, watcher, mode));
        }
        assertFalse(manager.removeWatcher(node, watcher));
        assertFalse(manager.removeWatcher(node, watcher, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.removeWatcher(node, watcher, mode));
        }
    }

    /**
     * Calling
     * {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} with a
     * mismatching (path, watcher) pair must not disturb existing watches.
     */
    @Test
    public void testRemoveModeMismatch() {
        final String node1 = "/node1";
        final String node2 = "/node2";
        final WatcherMode[] everyMode = {
            WatcherMode.STANDARD, WatcherMode.PERSISTENT, WatcherMode.PERSISTENT_RECURSIVE};

        IWatchManager manager = new WatchManager();
        Watcher watcher1 = new DumbWatcher();
        Watcher watcher2 = new DumbWatcher();

        // given: watcher1 on node1 and watcher2 on node2, each in all modes
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.addWatch(node1, watcher1, mode));
        }
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.addWatch(node2, watcher2, mode));
        }

        // when: removal is attempted for a pair that was never registered
        assertFalse(manager.removeWatcher(node1, watcher2));
        assertFalse(manager.removeWatcher(node1, watcher2, null));
        for (WatcherMode mode : everyMode) {
            assertFalse(manager.removeWatcher(node1, watcher2, mode));
        }

        // then: both registered pairs are still present in every mode
        assertTrue(manager.containsWatcher(node1, watcher1));
        assertTrue(manager.containsWatcher(node1, watcher1, null));
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.containsWatcher(node1, watcher1, mode));
        }
        assertTrue(manager.containsWatcher(node2, watcher2));
        assertTrue(manager.containsWatcher(node2, watcher2, null));
        for (WatcherMode mode : everyMode) {
            assertTrue(manager.containsWatcher(node2, watcher2, mode));
        }
    }

    /**
     * Concurrently add watches while closing watchers, to simulate client
     * connections being closed in production, and verify that no closed
     * watcher is still reported by the watch manager afterwards.
     *
     * <p>Worker threads are joined before the set of dead watchers is read, so
     * the unsynchronized iteration over it cannot race with a worker still
     * adding to it. The two cleaner-related system properties set by this test
     * are restored to their previous values on exit so they do not leak into
     * other tests running in the same JVM.
     *
     * @param className fully qualified name of the watch manager implementation
     * @throws IOException          if the watch manager cannot be created
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @ParameterizedTest
    @MethodSource("data")
    @Timeout(value = 90)
    public void testDeadWatchers(String className) throws IOException, InterruptedException {
        final String thresholdKey = "zookeeper.watcherCleanThreshold";
        final String intervalKey = "zookeeper.watcherCleanIntervalInSeconds";
        final String previousThreshold = System.getProperty(thresholdKey);
        final String previousInterval = System.getProperty(intervalKey);
        // NOTE(review): presumably read by the watch manager when it is created below - confirm
        System.setProperty(thresholdKey, "10");
        System.setProperty(intervalKey, "1");

        List<CreateDeadWatchersWorker> deadWorkers = new ArrayList<>();
        List<AddWatcherWorker> addWorkers = new ArrayList<>();
        try {
            IWatchManager manager = getWatchManager(className);
            int paths = 1;
            int watchers = 100000;

            // 1. start 5 workers to randomly mark those watchers as dead
            //    and remove them from the watch manager
            Set<Watcher> deadWatchers = new HashSet<>();
            for (int i = 0; i < 5; i++) {
                CreateDeadWatchersWorker worker = new CreateDeadWatchersWorker(manager, watchers, deadWatchers);
                deadWorkers.add(worker);
                worker.start();
            }

            // 2. start 5 workers to add different watchers on the same path
            AtomicInteger watchesAdded = new AtomicInteger();
            for (int i = 0; i < 5; i++) {
                AddWatcherWorker worker = new AddWatcherWorker(manager, paths, watchers, watchesAdded);
                addWorkers.add(worker);
                worker.start();
            }

            while (watchesAdded.get() < 50000) {
                Thread.sleep(100);
            }

            // 3. stop all workers and wait until every worker thread has exited
            for (CreateDeadWatchersWorker worker : deadWorkers) {
                worker.shutdown();
            }
            for (AddWatcherWorker worker : addWorkers) {
                worker.shutdown();
            }
            for (CreateDeadWatchersWorker worker : deadWorkers) {
                worker.join();
            }
            for (AddWatcherWorker worker : addWorkers) {
                worker.join();
            }

            // 4. the cleaner may wait as long as CleanerInterval+CleanerInterval/2+1,
            //    so we need to sleep at least that long
            Thread.sleep(2000);

            // 5. make sure the dead watchers are not in the existing watchers
            WatchesReport existingWatchers = manager.getWatches();
            for (Watcher w : deadWatchers) {
                assertFalse(existingWatchers.hasPaths(((ServerCnxn) w).getSessionId()));
            }
        } finally {
            // shutdown() is idempotent; this only matters when the test is
            // leaving early because of a failure or an interrupt
            for (CreateDeadWatchersWorker worker : deadWorkers) {
                worker.shutdown();
            }
            for (AddWatcherWorker worker : addWorkers) {
                worker.shutdown();
            }

            // put the system properties back the way this test found them
            if (previousThreshold == null) {
                System.clearProperty(thresholdKey);
            } else {
                System.setProperty(thresholdKey, previousThreshold);
            }
            if (previousInterval == null) {
                System.clearProperty(intervalKey);
            } else {
                System.setProperty(intervalKey, previousInterval);
            }
        }
    }

    /**
     * Asserts the summary values currently reported for one server metric.
     *
     * <p>The values are looked up in the current server metrics snapshot under
     * the keys {@code min_}, {@code max_}, {@code avg_}, {@code cnt_} and
     * {@code sum_} followed by {@code metricName}. The average is compared
     * with a tolerance of {@code 0.000001}; the other values must match exactly.
     *
     * @param metricName base name of the metric, without any prefix
     * @param min expected minimum
     * @param max expected maximum
     * @param avg expected average
     * @param cnt expected count
     * @param sum expected sum
     */
    private void checkMetrics(String metricName, long min, long max, double avg, long cnt, long sum) {
        final Map<String, Object> snapshot = MetricsUtils.currentServerMetrics();

        // Each value is looked up right before it is asserted, so the first
        // mismatch is the one that gets reported.
        final Object reportedMin = snapshot.get("min_" + metricName);
        assertEquals(min, reportedMin);

        final Object reportedMax = snapshot.get("max_" + metricName);
        assertEquals(max, reportedMax);

        final Double reportedAvg = (Double) snapshot.get("avg_" + metricName);
        assertEquals(avg, reportedAvg, 0.000001);

        final Object reportedCnt = snapshot.get("cnt_" + metricName);
        assertEquals(cnt, reportedCnt);

        final Object reportedSum = snapshot.get("sum_" + metricName);
        assertEquals(sum, reportedSum);
    }

    /**
     * Asserts the most recent event recorded by the given watcher.
     *
     * @param watcher the watcher whose last recorded event is inspected
     * @param path expected path of the most recent event
     * @param eventType expected type of the most recent event
     * @param zxid expected zxid of the most recent event
     */
    private void checkMostRecentWatchedEvent(DumbWatcher watcher, String path, EventType eventType, long zxid) {
        final Object lastPath = watcher.getMostRecentPath();
        assertEquals(path, lastPath);

        final Object lastEventType = watcher.getMostRecentEventType();
        assertEquals(eventType, lastEventType);

        // The zxid comparison stays inline so the numeric comparison is unchanged.
        assertEquals(zxid, watcher.getMostRecentZxid());
    }

    /**
     * Checks that triggering watches updates the per-event-type watch count
     * metrics, and that each watcher records the most recent event delivered
     * to it.
     *
     * @param className name of the watch manager implementation under test
     * @throws IOException if the watch manager cannot be created
     */
    @ParameterizedTest
    @MethodSource("data")
    public void testWatcherMetrics(String className) throws IOException {
        final IWatchManager watchManager = getWatchManager(className);
        ServerMetrics.getMetrics().resetAll();

        final DumbWatcher firstWatcher = new DumbWatcher(1);
        final DumbWatcher secondWatcher = new DumbWatcher(2);

        final String sharedPath = "/path1";
        final String firstOnlyPath = "/path2";
        final String unwatchedPath = "/path3";

        final String createdMetric = "node_created_watch_count";
        final String changedMetric = "node_changed_watch_count";
        final String deletedMetric = "node_deleted_watch_count";

        // /path1 is watched by both watchers, /path2 only by the first one
        watchManager.addWatch(sharedPath, firstWatcher);
        watchManager.addWatch(sharedPath, secondWatcher);
        watchManager.addWatch(firstOnlyPath, firstWatcher);

        // nobody watches /path3, so the metric stays at zero
        watchManager.triggerWatch(unwatchedPath, EventType.NodeCreated, 1, null);
        checkMetrics(createdMetric, 0L, 0L, 0D, 0L, 0L);
        // no event has been delivered yet, so the recorded zxid is still -1
        checkMostRecentWatchedEvent(firstWatcher, null, null, -1);
        checkMostRecentWatchedEvent(secondWatcher, null, null, -1);

        // two watchers on /path1, so two watches fire
        watchManager.triggerWatch(sharedPath, EventType.NodeCreated, 2, null);
        checkMetrics(createdMetric, 2L, 2L, 2D, 1L, 2L);
        checkMostRecentWatchedEvent(firstWatcher, sharedPath, EventType.NodeCreated, 2);
        checkMostRecentWatchedEvent(secondWatcher, sharedPath, EventType.NodeCreated, 2);

        // one watcher on /path2, so one more watch fires and the total becomes 3
        watchManager.triggerWatch(firstOnlyPath, EventType.NodeCreated, 3, null);
        checkMetrics(createdMetric, 1L, 2L, 1.5D, 2L, 3L);
        checkMostRecentWatchedEvent(firstWatcher, firstOnlyPath, EventType.NodeCreated, 3);
        checkMostRecentWatchedEvent(secondWatcher, sharedPath, EventType.NodeCreated, 2);

        // the watches on /path1 are no longer there, so nothing fires
        watchManager.triggerWatch(sharedPath, EventType.NodeDataChanged, 4, null);
        checkMetrics(changedMetric, 0L, 0L, 0D, 0L, 0L);
        checkMostRecentWatchedEvent(firstWatcher, firstOnlyPath, EventType.NodeCreated, 3);
        checkMostRecentWatchedEvent(secondWatcher, sharedPath, EventType.NodeCreated, 2);

        // register the same watches again: both watchers on /path1,
        // the first watcher on /path2
        watchManager.addWatch(sharedPath, firstWatcher);
        watchManager.addWatch(sharedPath, secondWatcher);
        watchManager.addWatch(firstOnlyPath, firstWatcher);

        watchManager.triggerWatch(sharedPath, EventType.NodeDataChanged, 5, null);
        checkMetrics(changedMetric, 2L, 2L, 2D, 1L, 2L);
        checkMostRecentWatchedEvent(firstWatcher, sharedPath, EventType.NodeDataChanged, 5);
        checkMostRecentWatchedEvent(secondWatcher, sharedPath, EventType.NodeDataChanged, 5);

        watchManager.triggerWatch(firstOnlyPath, EventType.NodeDeleted, 6, null);
        checkMetrics(deletedMetric, 1L, 1L, 1D, 1L, 1L);
        checkMostRecentWatchedEvent(firstWatcher, firstOnlyPath, EventType.NodeDeleted, 6);
        checkMostRecentWatchedEvent(secondWatcher, sharedPath, EventType.NodeDataChanged, 5);

        // firing other event types must not affect the node created watch count
        checkMetrics(createdMetric, 1L, 2L, 1.5D, 2L, 3L);
    }

}