WatcherFuncTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class WatcherFuncTest extends ClientBase {

    private static class SimpleWatcher implements Watcher {

        private LinkedBlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
        private CountDownLatch latch;

        public SimpleWatcher(CountDownLatch latch) {
            this.latch = latch;
        }

        public void process(WatchedEvent event) {
            if (event.getState() == KeeperState.SyncConnected) {
                if (latch != null) {
                    latch.countDown();
                }
            }

            if (event.getType() == EventType.None) {
                return;
            }
            try {
                events.put(event);
            } catch (InterruptedException e) {
                assertTrue(false, "interruption unexpected");
            }
        }

        public void verify(List<WatchedEvent> expected) throws InterruptedException {
            List<WatchedEvent> actual = new ArrayList<>();
            WatchedEvent event;
            while (actual.size() < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) {
                actual.add(event);
            }
            assertEquals(expected.size(), actual.size());
            for (int i = 0; i < expected.size(); i++) {
                TestUtils.assertWatchedEventEquals(expected.get(i), actual.get(i));
            }
            events.clear();
        }

    }

    private SimpleWatcher client_dwatch;
    private volatile CountDownLatch client_latch;
    private ZooKeeper client;
    private SimpleWatcher lsnr_dwatch;
    private volatile CountDownLatch lsnr_latch;
    private ZooKeeper lsnr;

    private List<WatchedEvent> expected;

    @BeforeEach
    @Override
    public void setUp() throws Exception {
        super.setUp();

        client_latch = new CountDownLatch(1);
        client_dwatch = new SimpleWatcher(client_latch);
        client = createClient(client_dwatch, client_latch);

        lsnr_latch = new CountDownLatch(1);
        lsnr_dwatch = new SimpleWatcher(lsnr_latch);
        lsnr = createClient(lsnr_dwatch, lsnr_latch);

        expected = new ArrayList<>();
    }

    @AfterEach
    @Override
    public void tearDown() throws Exception {
        client.close();
        lsnr.close();
        super.tearDown();
    }

    protected ZooKeeper createClient(Watcher watcher, CountDownLatch latch) throws IOException, InterruptedException {
        ZooKeeper zk = new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher);
        if (!latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
            fail("Unable to connect to server");
        }
        return zk;
    }

    private void verify() throws InterruptedException {
        lsnr_dwatch.verify(expected);
        expected.clear();
    }

    private void addEvent(List<WatchedEvent> events, EventType eventType, String path, Stat stat) {
        addEvent(events, eventType, path, stat.getMzxid());
    }

    private void addEvent(List<WatchedEvent> events, EventType eventType, String path, long zxid) {
        events.add(new WatchedEvent(eventType, KeeperState.SyncConnected, path, zxid));
    }

    private long delete(String path) throws InterruptedException, KeeperException {
        client.delete(path, -1);
        int lastSlash = path.lastIndexOf('/');
        String parent = (lastSlash == 0)
            ? "/"
            : path.substring(0, lastSlash);
        // the deletion's zxid will be reflected in the parent's Pzxid
        return client.exists(parent, false).getPzxid();
    }

    @Test
    public void testExistsSync() throws IOException, InterruptedException, KeeperException {
        assertNull(lsnr.exists("/foo", true));
        assertNull(lsnr.exists("/foo/bar", true));

        Stat stat = new Stat();
        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        addEvent(expected, EventType.NodeCreated, "/foo", stat);
        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        addEvent(expected, EventType.NodeCreated, "/foo/bar", stat);

        verify();

        assertNotNull(lsnr.exists("/foo", true));
        assertNotNull(lsnr.exists("/foo/bar", true));

        try {
            assertNull(lsnr.exists("/car", true));
            client.setData("/car", "missing".getBytes(), -1);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/car", e.getPath());
        }

        try {
            assertNull(lsnr.exists("/foo/car", true));
            client.setData("/foo/car", "missing".getBytes(), -1);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/foo/car", e.getPath());
        }

        stat = client.setData("/foo", "parent".getBytes(), -1);
        addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
        stat = client.setData("/foo/bar", "child".getBytes(), -1);
        addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat);

        verify();

        assertNotNull(lsnr.exists("/foo", true));
        assertNotNull(lsnr.exists("/foo/bar", true));

        long deleteZxid = delete("/foo/bar");
        addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid);
        deleteZxid = delete("/foo");
        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);

        verify();
    }

    @Test
    public void testGetDataSync() throws IOException, InterruptedException, KeeperException {
        try {
            lsnr.getData("/foo", true, null);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/foo", e.getPath());
        }
        try {
            lsnr.getData("/foo/bar", true, null);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/foo/bar", e.getPath());
        }

        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertNotNull(lsnr.getData("/foo", true, null));
        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertNotNull(lsnr.getData("/foo/bar", true, null));

        Stat stat = client.setData("/foo", "parent".getBytes(), -1);
        addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
        stat = client.setData("/foo/bar", "child".getBytes(), -1);
        addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat);

        verify();

        assertNotNull(lsnr.getData("/foo", true, null));
        assertNotNull(lsnr.getData("/foo/bar", true, null));

        long deleteZxid = delete("/foo/bar");
        addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid);
        deleteZxid = delete("/foo");
        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);

        verify();
    }

    @Test
    public void testGetChildrenSync() throws IOException, InterruptedException, KeeperException {
        try {
            lsnr.getChildren("/foo", true);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/foo", e.getPath());
        }
        try {
            lsnr.getChildren("/foo/bar", true);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/foo/bar", e.getPath());
        }

        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertNotNull(lsnr.getChildren("/foo", true));

        Stat stat = new Stat();
        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo
        assertNotNull(lsnr.getChildren("/foo/bar", true));

        client.setData("/foo", "parent".getBytes(), -1);
        client.setData("/foo/bar", "child".getBytes(), -1);

        assertNotNull(lsnr.exists("/foo", true));

        assertNotNull(lsnr.getChildren("/foo", true));
        assertNotNull(lsnr.getChildren("/foo/bar", true));

        long deleteZxid = delete("/foo/bar");
        addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); // /foo/bar childwatch
        addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo
        deleteZxid = delete("/foo");
        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);

        verify();
    }

    @Test
    public void testExistsSyncWObj() throws IOException, InterruptedException, KeeperException {
        SimpleWatcher w1 = new SimpleWatcher(null);
        SimpleWatcher w2 = new SimpleWatcher(null);
        SimpleWatcher w3 = new SimpleWatcher(null);
        SimpleWatcher w4 = new SimpleWatcher(null);

        List<WatchedEvent> e2 = new ArrayList<>();

        assertNull(lsnr.exists("/foo", true));
        assertNull(lsnr.exists("/foo", w1));

        assertNull(lsnr.exists("/foo/bar", w2));
        assertNull(lsnr.exists("/foo/bar", w3));
        assertNull(lsnr.exists("/foo/bar", w3));
        assertNull(lsnr.exists("/foo/bar", w4));

        Stat stat = new Stat();
        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        addEvent(expected, EventType.NodeCreated, "/foo", stat);
        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        addEvent(e2, EventType.NodeCreated, "/foo/bar", stat);

        lsnr_dwatch.verify(expected);
        w1.verify(expected);
        w2.verify(e2);
        w3.verify(e2);
        w4.verify(e2);
        expected.clear();
        e2.clear();

        // default not registered
        assertNotNull(lsnr.exists("/foo", w1));

        assertNotNull(lsnr.exists("/foo/bar", w2));
        assertNotNull(lsnr.exists("/foo/bar", w3));
        assertNotNull(lsnr.exists("/foo/bar", w4));
        assertNotNull(lsnr.exists("/foo/bar", w4));

        stat = client.setData("/foo", "parent".getBytes(), -1);
        addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
        stat = client.setData("/foo/bar", "child".getBytes(), -1);
        addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat);

        lsnr_dwatch.verify(new ArrayList<>()); // not reg so should = 0
        w1.verify(expected);
        w2.verify(e2);
        w3.verify(e2);
        w4.verify(e2);
        expected.clear();
        e2.clear();

        assertNotNull(lsnr.exists("/foo", true));
        assertNotNull(lsnr.exists("/foo", w1));
        assertNotNull(lsnr.exists("/foo", w1));

        assertNotNull(lsnr.exists("/foo/bar", w2));
        assertNotNull(lsnr.exists("/foo/bar", w2));
        assertNotNull(lsnr.exists("/foo/bar", w3));
        assertNotNull(lsnr.exists("/foo/bar", w4));

        long deleteZxid = delete("/foo/bar");
        addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid);
        deleteZxid = delete("/foo");
        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);

        lsnr_dwatch.verify(expected);
        w1.verify(expected);
        w2.verify(e2);
        w3.verify(e2);
        w4.verify(e2);
        expected.clear();
        e2.clear();
    }

    @Test
    public void testGetDataSyncWObj() throws IOException, InterruptedException, KeeperException {
        SimpleWatcher w1 = new SimpleWatcher(null);
        SimpleWatcher w2 = new SimpleWatcher(null);
        SimpleWatcher w3 = new SimpleWatcher(null);
        SimpleWatcher w4 = new SimpleWatcher(null);

        List<WatchedEvent> e2 = new ArrayList<>();

        try {
            lsnr.getData("/foo", w1, null);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/foo", e.getPath());
        }
        try {
            lsnr.getData("/foo/bar", w2, null);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/foo/bar", e.getPath());
        }

        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertNotNull(lsnr.getData("/foo", true, null));
        assertNotNull(lsnr.getData("/foo", w1, null));
        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertNotNull(lsnr.getData("/foo/bar", w2, null));
        assertNotNull(lsnr.getData("/foo/bar", w3, null));
        assertNotNull(lsnr.getData("/foo/bar", w4, null));
        assertNotNull(lsnr.getData("/foo/bar", w4, null));

        Stat stat = client.setData("/foo", "parent".getBytes(), -1);
        addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
        stat = client.setData("/foo/bar", "child".getBytes(), -1);
        addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat);

        lsnr_dwatch.verify(expected);
        w1.verify(expected);
        w2.verify(e2);
        w3.verify(e2);
        w4.verify(e2);
        expected.clear();
        e2.clear();

        assertNotNull(lsnr.getData("/foo", true, null));
        assertNotNull(lsnr.getData("/foo", w1, null));
        assertNotNull(lsnr.getData("/foo/bar", w2, null));
        assertNotNull(lsnr.getData("/foo/bar", w3, null));
        assertNotNull(lsnr.getData("/foo/bar", w3, null));
        assertNotNull(lsnr.getData("/foo/bar", w4, null));

        long deleteZxid = delete("/foo/bar");
        addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid);
        deleteZxid = delete("/foo");
        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);

        lsnr_dwatch.verify(expected);
        w1.verify(expected);
        w2.verify(e2);
        w3.verify(e2);
        w4.verify(e2);
        expected.clear();
        e2.clear();
    }

    @Test
    public void testGetChildrenSyncWObj() throws IOException, InterruptedException, KeeperException {
        SimpleWatcher w1 = new SimpleWatcher(null);
        SimpleWatcher w2 = new SimpleWatcher(null);
        SimpleWatcher w3 = new SimpleWatcher(null);
        SimpleWatcher w4 = new SimpleWatcher(null);

        List<WatchedEvent> e2 = new ArrayList<>();

        try {
            lsnr.getChildren("/foo", true);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/foo", e.getPath());
        }
        try {
            lsnr.getChildren("/foo/bar", true);
            fail();
        } catch (KeeperException e) {
            assertEquals(KeeperException.Code.NONODE, e.code());
            assertEquals("/foo/bar", e.getPath());
        }

        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertNotNull(lsnr.getChildren("/foo", true));
        assertNotNull(lsnr.getChildren("/foo", w1));

        Stat stat = new Stat();
        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo
        assertNotNull(lsnr.getChildren("/foo/bar", w2));
        assertNotNull(lsnr.getChildren("/foo/bar", w2));
        assertNotNull(lsnr.getChildren("/foo/bar", w3));
        assertNotNull(lsnr.getChildren("/foo/bar", w4));

        client.setData("/foo", "parent".getBytes(), -1);
        client.setData("/foo/bar", "child".getBytes(), -1);

        assertNotNull(lsnr.exists("/foo", true));
        assertNotNull(lsnr.exists("/foo", w1));
        assertNotNull(lsnr.exists("/foo", true));
        assertNotNull(lsnr.exists("/foo", w1));

        assertNotNull(lsnr.getChildren("/foo", true));
        assertNotNull(lsnr.getChildren("/foo", w1));
        assertNotNull(lsnr.getChildren("/foo/bar", w2));
        assertNotNull(lsnr.getChildren("/foo/bar", w3));
        assertNotNull(lsnr.getChildren("/foo/bar", w4));
        assertNotNull(lsnr.getChildren("/foo/bar", w4));

        long deleteZxid = delete("/foo/bar");
        addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid);
        addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo
        deleteZxid = delete("/foo");
        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);

        lsnr_dwatch.verify(expected);
        w1.verify(expected);
        w2.verify(e2);
        w3.verify(e2);
        w4.verify(e2);
        expected.clear();
        e2.clear();
    }

}