ConfigWatcherPathTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.Test;
public class ConfigWatcherPathTest extends ClientBase {
private void join(Consumer<CompletableFuture<Void>> task) {
CompletableFuture<Void> future = new CompletableFuture<>();
task.accept(future);
future.join();
}
private AsyncCallback.DataCallback complete(CompletableFuture<Void> future) {
return (rc, path, ctx, data, stat) -> {
if (rc == 0) {
future.complete(null);
} else {
future.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
}
};
}
private void testConfigWatcherPathWithChroot(String chroot) throws Exception {
ZooKeeper zk1 = createClient(hostPort + chroot);
BlockingQueueWatcher configWatcher = new BlockingQueueWatcher();
// given|>config watcher: attach to config node multiple times
byte[] configData = zk1.getConfig(configWatcher, null);
join(future -> zk1.getConfig(configWatcher, complete(future), null));
// given|>default watcher: attach to config node multiple times
BlockingQueueWatcher defaultWatcher = new BlockingQueueWatcher();
zk1.getWatchManager().setDefaultWatcher(defaultWatcher);
zk1.getConfig(true, null);
zk1.getConfig(defaultWatcher, null);
// when: make change to config node
ZooKeeper zk2 = createClient();
zk2.addAuthInfo("digest", "super:test".getBytes());
zk2.setData(ZooDefs.CONFIG_NODE, configData, -1);
// then|>config watcher: only one event with path "/zookeeper/config"
WatchedEvent configEvent = configWatcher.takeEvent(Duration.ofSeconds(10));
assertEquals("/zookeeper/config", configEvent.getPath());
assertNull(configWatcher.pollEvent(Duration.ofMillis(10)));
// then|>default watcher: only one event with path "/zookeeper/config"
WatchedEvent defaultWatcherEvent = defaultWatcher.takeEvent(Duration.ofSeconds(10));
assertEquals("/zookeeper/config", defaultWatcherEvent.getPath());
assertNull(defaultWatcher.pollEvent(Duration.ofMillis(10)));
// given: all watchers fired
// when: make change to config node
zk2.setData(ZooDefs.CONFIG_NODE, configData, -1);
// then: no more events
assertNull(configWatcher.pollEvent(Duration.ofMillis(10)));
assertNull(defaultWatcher.pollEvent(Duration.ofMillis(10)));
}
@Test
public void testConfigWatcherPathWithNoChroot() throws Exception {
testConfigWatcherPathWithChroot("");
}
@Test
public void testConfigWatcherPathWithShortChroot() throws Exception {
testConfigWatcherPathWithChroot("/short");
}
@Test
public void testConfigWatcherPathWithLongChroot() throws Exception {
testConfigWatcherPathWithChroot("/pretty-long-chroot-path");
}
@Test
public void testConfigWatcherPathWithChrootZooKeeperTree() throws Exception {
testConfigWatcherPathWithChroot("/zookeeper");
testConfigWatcherPathWithChroot("/zookeeper/a");
testConfigWatcherPathWithChroot("/zookeeper/config");
testConfigWatcherPathWithChroot("/zookeeper/config/a");
}
@Test
public void testConfigWatcherPathWithChrootZoo() throws Exception {
// "/zoo" is prefix of "/zookeeper/config"
testConfigWatcherPathWithChroot("/zoo");
}
private void testDataWatcherPathWithChroot(String chroot) throws Exception {
assertTrue("/zookeeper/config".startsWith(chroot));
String leafPath = "/zookeeper/config".substring(chroot.length());
String dataPath = leafPath.isEmpty() ? "/" : leafPath;
PathUtils.validatePath(dataPath);
ZooKeeper zk1 = createClient(hostPort + chroot);
BlockingQueueWatcher dataWatcher = new BlockingQueueWatcher();
BlockingQueueWatcher configWatcher = new BlockingQueueWatcher();
// given|>config watcher: attach to config node multiple times
byte[] configData = zk1.getConfig(configWatcher, null);
zk1.getConfig(configWatcher, null);
// given|>data watcher: attach to config node through getData multiple times
zk1.getData(dataPath, dataWatcher, null);
join(future -> zk1.getData(dataPath, dataWatcher, complete(future), null));
// given|>default watcher: attach to config node through getData and getConfig multiple times
BlockingQueueWatcher defaultWatcher = new BlockingQueueWatcher();
zk1.getWatchManager().setDefaultWatcher(defaultWatcher);
zk1.getData(dataPath, true, null);
zk1.getData(dataPath, defaultWatcher, null);
zk1.getConfig(true, null);
zk1.getConfig(defaultWatcher, null);
// when: make change to config node
ZooKeeper zk2 = createClient();
zk2.addAuthInfo("digest", "super:test".getBytes());
zk2.setData(ZooDefs.CONFIG_NODE, configData, -1);
// then|>data watcher: only one event with path dataPath
WatchedEvent dataEvent = dataWatcher.takeEvent(Duration.ofSeconds(10));
assertEquals(dataPath, dataEvent.getPath());
assertNull(dataWatcher.pollEvent(Duration.ofMillis(10)));
// then|>config watcher: only one event with path "/zookeeper/config"
WatchedEvent configEvent = configWatcher.takeEvent(Duration.ofSeconds(10));
assertEquals("/zookeeper/config", configEvent.getPath());
assertNull(configWatcher.pollEvent(Duration.ofMillis(10)));
if (dataPath.equals("/zookeeper/config")) {
// then|>default watcher: only one event with path "/zookeeper/config"
WatchedEvent defaultWatcherEvent = defaultWatcher.takeEvent(Duration.ofSeconds(10));
assertEquals("/zookeeper/config", defaultWatcherEvent.getPath());
} else {
// then|>default watcher: two events with path dataPath and "/zookeeper/config"
Set<String> defaultWatcherPaths = new HashSet<>();
defaultWatcherPaths.add(dataPath);
defaultWatcherPaths.add("/zookeeper/config");
WatchedEvent defaultWatcherEvent1 = defaultWatcher.takeEvent(Duration.ofSeconds(10));
assertThat(defaultWatcherPaths, hasItem(defaultWatcherEvent1.getPath()));
defaultWatcherPaths.remove(defaultWatcherEvent1.getPath());
WatchedEvent defaultWatcherEvent2 = defaultWatcher.takeEvent(Duration.ofSeconds(10));
assertNotNull(defaultWatcherEvent2);
assertThat(defaultWatcherPaths, hasItem(defaultWatcherEvent2.getPath()));
}
assertNull(defaultWatcher.pollEvent(Duration.ofMillis(10)));
// given: all watchers fired
// when: make change to config node
zk2.setData(ZooDefs.CONFIG_NODE, configData, -1);
// then: no more events
assertNull(dataWatcher.pollEvent(Duration.ofMillis(10)));
assertNull(configWatcher.pollEvent(Duration.ofMillis(10)));
assertNull(defaultWatcher.pollEvent(Duration.ofMillis(10)));
}
@Test
public void testDataWatcherPathWithNoChroot() throws Exception {
testDataWatcherPathWithChroot("");
}
@Test
public void testDataWatcherPathWithChrootZooKeeper() throws Exception {
testDataWatcherPathWithChroot("/zookeeper");
}
@Test
public void testDataWatcherPathWithChrootZooKeeperConfig() throws Exception {
testDataWatcherPathWithChroot("/zookeeper/config");
}
@Test
public void testDataWatcherPathWithChrootAndConfigPath() throws Exception {
try (ZooKeeper zk1 = createClient(hostPort + "/root1"); ZooKeeper zk2 = createClient()) {
// given: watcher client path "/zookeeper/config" in chroot "/root1"
BlockingQueueWatcher dataWatcher = new BlockingQueueWatcher();
zk1.addWatch("/zookeeper/config", dataWatcher, AddWatchMode.PERSISTENT);
// and: watch for "/zookeeper/config" in server
BlockingQueueWatcher configWatcher = new BlockingQueueWatcher();
byte[] configData = zk1.getConfig(configWatcher, null);
// when: make change to config node
zk2.addAuthInfo("digest", "super:test".getBytes());
zk2.setData(ZooDefs.CONFIG_NODE, configData, -1);
// then: config watcher works normally
WatchedEvent configEvent = configWatcher.takeEvent(Duration.ofSeconds(10));
assertEquals("/zookeeper/config", configEvent.getPath());
// and: no data watcher for "/zookeeper/config" in chroot "/root1"
assertNull(dataWatcher.pollEvent(Duration.ofSeconds(1)));
}
}
}