WatcherCleanerTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.watch;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.ServerMetrics;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WatcherCleanerTest extends ZKTestCase {
private static final Logger LOG = LoggerFactory.getLogger(WatcherCleanerTest.class);
public static class MyDeadWatcherListener implements IDeadWatcherListener {
private CountDownLatch latch;
private int delayMs;
private Set<Integer> deadWatchers = new HashSet<>();
public void setCountDownLatch(CountDownLatch latch) {
this.latch = latch;
}
public void setDelayMs(int delayMs) {
this.delayMs = delayMs;
}
@Override
public void processDeadWatchers(Set<Integer> deadWatchers) {
if (delayMs > 0) {
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
}
}
this.deadWatchers.clear();
this.deadWatchers.addAll(deadWatchers);
latch.countDown();
}
public Set<Integer> getDeadWatchers() {
return deadWatchers;
}
public boolean wait(int maxWaitMs) {
try {
return latch.await(maxWaitMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
return false;
}
}
@Test
public void testProcessDeadWatchersBasedOnThreshold() {
MyDeadWatcherListener listener = new MyDeadWatcherListener();
int threshold = 3;
WatcherCleaner cleaner = new WatcherCleaner(listener, threshold, 60, 1, 10);
cleaner.start();
int i = 0;
while (i++ < threshold - 1) {
cleaner.addDeadWatcher(i);
}
// not trigger processDeadWatchers yet
assertEquals(0, listener.getDeadWatchers().size());
listener.setCountDownLatch(new CountDownLatch(1));
// add another dead watcher to trigger the process
cleaner.addDeadWatcher(i);
assertTrue(listener.wait(1000));
assertEquals(threshold, listener.getDeadWatchers().size());
}
@Test
public void testProcessDeadWatchersBasedOnTime() {
MyDeadWatcherListener listener = new MyDeadWatcherListener();
WatcherCleaner cleaner = new WatcherCleaner(listener, 10, 1, 1, 10);
cleaner.start();
cleaner.addDeadWatcher(1);
// not trigger processDeadWatchers yet
assertEquals(0, listener.getDeadWatchers().size());
listener.setCountDownLatch(new CountDownLatch(1));
assertTrue(listener.wait(2000));
assertEquals(1, listener.getDeadWatchers().size());
// won't trigger event if there is no dead watchers
listener.setCountDownLatch(new CountDownLatch(1));
assertFalse(listener.wait(2000));
}
@Test
public void testMaxInProcessingDeadWatchers() {
MyDeadWatcherListener listener = new MyDeadWatcherListener();
int delayMs = 1000;
listener.setDelayMs(delayMs);
WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 60, 1, 1);
cleaner.start();
listener.setCountDownLatch(new CountDownLatch(2));
long startTime = Time.currentElapsedTime();
cleaner.addDeadWatcher(1);
cleaner.addDeadWatcher(2);
long time = Time.currentElapsedTime() - startTime;
System.out.println("time used " + time);
assertTrue(Time.currentElapsedTime() - startTime >= delayMs);
assertTrue(listener.wait(5000));
}
@Test
public void testDeadWatcherMetrics() throws InterruptedException {
ServerMetrics.getMetrics().resetAll();
MyDeadWatcherListener listener = new MyDeadWatcherListener();
WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 1, 1, 1);
listener.setDelayMs(20);
cleaner.start();
listener.setCountDownLatch(new CountDownLatch(3));
//the dead watchers will be added one by one and cleared one by one because we set both watchCleanThreshold and
//maxInProcessingDeadWatchers to 1
cleaner.addDeadWatcher(1);
cleaner.addDeadWatcher(2);
cleaner.addDeadWatcher(3);
assertTrue(listener.wait(5000));
Map<String, Object> values = MetricsUtils.currentServerMetrics();
// Adding dead watcher should be stalled twice
waitForMetric("add_dead_watcher_stall_time", greaterThan(0L));
waitForMetric("dead_watchers_queued", is(3L));
waitForMetric("dead_watchers_cleared", is(3L));
waitForMetric("cnt_dead_watchers_cleaner_latency", is(3L));
//Each latency should be a little over 20 ms, allow 5 ms deviation
waitForMetric("avg_dead_watchers_cleaner_latency", closeTo(20, 5));
waitForMetric("min_dead_watchers_cleaner_latency", closeTo(20, 5));
waitForMetric("max_dead_watchers_cleaner_latency", closeTo(20, 5));
waitForMetric("p50_dead_watchers_cleaner_latency", closeTo(20, 5));
waitForMetric("p95_dead_watchers_cleaner_latency", closeTo(20, 5));
waitForMetric("p99_dead_watchers_cleaner_latency", closeTo(20, 5));
}
}