AsyncOps.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.AsyncCallback.ACLCallback;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.Create2Callback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.MultiCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
public class AsyncOps {
/**
 * This is the base class for all of the async callback classes. It will
 * verify the expected value against the actual value.
 *
 * Basic operation is that the subclasses will generate an "expected" value
 * which is defined by the "toString" method of the subclass. This is
 * passed through to the verify clause by specifying it as the ctx object
 * of each async call (processResult methods get the ctx as part of
 * the callback). Additionally the callback will also overwrite any
 * instance fields with matching parameter arguments to the processResult
 * method. The cb instance can then compare the expected to the
 * actual value by again calling toString and comparing the two.
 *
 * The format of each expected value differs (is defined) by subclass.
 * Generally the expected value starts with the result code (rc) and path
 * of the node being operated on, followed by the fields specific to
 * each operation type (cb subclass). For example ChildrenCB specifies
 * a list of the expected children suffixed onto the rc and path. See
 * the toString() method of each subclass for details of its format.
 */
public abstract static class AsyncCB {

    protected final ZooKeeper zk;

    /** Maximum time, in milliseconds, that {@link #verify()} waits on the latch. */
    protected long defaultTimeoutMillis = 30000;

    /** the latch is used to await the results from the server */
    CountDownLatch latch;

    /** Expected result code before the call; overwritten with the received one by the callback. */
    Code rc = Code.OK;

    /** Node path; overwritten with the path reported to the callback. */
    String path = "/foo";

    /** The ctx object handed back by the callback, i.e. the string captured when the call was issued. */
    String expected;

    /**
     * @param zk    client used by subclasses to issue the async calls
     * @param latch counted down once per received callback
     */
    public AsyncCB(ZooKeeper zk, CountDownLatch latch) {
        this.zk = zk;
        this.latch = latch;
    }

    public void setRC(Code rc) {
        this.rc = rc;
    }

    public void setPath(String path) {
        this.path = path;
    }

    /**
     * Records the outcome of a callback and releases one count of the latch.
     * The fields are written before the count-down so that a thread returning
     * from {@code latch.await} observes them.
     *
     * @param rc   result code reported to the callback
     * @param path path reported to the callback
     * @param ctx  context object of the call; stored as the expected string
     */
    public void processResult(Code rc, String path, Object ctx) {
        this.rc = rc;
        this.path = path;
        this.expected = (String) ctx;
        latch.countDown();
    }

    /** String format is rc:path:&lt;suffix&gt; where &lt;suffix&gt; is defined by each
     * subclass individually. */
    @Override
    public String toString() {
        return rc + ":" + path + ":";
    }

    /**
     * Waits up to {@link #defaultTimeoutMillis} for the latch, fails if it has
     * not reached zero, and then asserts that the string captured at call time
     * equals the string rendered from the fields the callback filled in.
     */
    protected void verify() {
        try {
            latch.await(defaultTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // keep the interrupt status visible to whoever runs this test
            Thread.currentThread().interrupt();
            fail("unexpected interrupt");
        }
        // on the lookout for timeout; compare by value rather than by boxed identity
        assertEquals(0L, latch.getCount(), "timed out waiting for the async callback");
        String actual = toString();
        assertEquals(expected, actual);
    }
}
/**
 * Callback used with the async {@code create} call that reports a node name.
 * Its string form is the base {@code rc:path:} prefix followed by {@code name}.
 */
public static class StringCB extends AsyncCB implements StringCallback {

    byte[] data = new byte[10];
    List<ACL> acl = Ids.CREATOR_ALL_ACL;
    CreateMode flags = CreateMode.PERSISTENT;
    /** Expected name before the call; replaced by the name passed to the callback. */
    String name = path;

    StringCB(ZooKeeper zk) {
        this(zk, new CountDownLatch(1));
    }

    StringCB(ZooKeeper zk, CountDownLatch latch) {
        super(zk, latch);
    }

    /** Sets the path and keeps the expected name in step with it. */
    public void setPath(String path) {
        super.setPath(path);
        this.name = path;
    }

    /** @return the part of the path after its last '/' */
    public String nodeName() {
        int lastSlash = path.lastIndexOf('/');
        return path.substring(lastSlash + 1);
    }

    public void processResult(int rc, String path, Object ctx, String name) {
        this.name = name;
        super.processResult(Code.get(rc), path, ctx);
    }

    /** Issues the create with the configured {@code flags}; does not wait for the result. */
    public AsyncCB create() {
        submitCreate(flags);
        return this;
    }

    /** Issues the create with {@code CreateMode.EPHEMERAL}; does not wait for the result. */
    public AsyncCB createEphemeral() {
        submitCreate(CreateMode.EPHEMERAL);
        return this;
    }

    public void verifyCreate() {
        create();
        verify();
    }

    public void verifyCreateEphemeral() {
        createEphemeral();
        verify();
    }

    /** Creates the node through a fresh StringCB, then expects NODEEXISTS from this one. */
    public void verifyCreateFailure_NodeExists() {
        new StringCB(zk).verifyCreate();
        expectFailure(Code.NODEEXISTS);
        submitCreate(flags);
        verify();
    }

    /** Expects NONODE when creating "/bar" below the current path. */
    public void verifyCreateFailure_NoNode() {
        expectFailure(Code.NONODE);
        path = path + "/bar";
        submitCreate(flags);
        verify();
    }

    /** Creates an ephemeral node through a fresh StringCB, then expects the child create below this path to be refused. */
    public void verifyCreateFailure_NoChildForEphemeral() {
        new StringCB(zk).verifyCreateEphemeral();
        expectFailure(Code.NOCHILDRENFOREPHEMERALS);
        path = path + "/bar";
        submitCreate(flags);
        verify();
    }

    @Override
    public String toString() {
        return super.toString() + name;
    }

    /** Sends the async create for the current path, passing the current string form as ctx. */
    private void submitCreate(CreateMode mode) {
        zk.create(path, data, acl, mode, this, toString());
    }

    /** Sets the expected result code and clears the expected name. */
    private void expectFailure(Code failure) {
        rc = failure;
        name = null;
    }
}
/**
 * Callback used with the async {@code getACL} call.
 * Its string form is the base {@code rc:path:} prefix followed by the ACL
 * permissions, the version, the data and four Stat fields (see {@link #toString()}).
 */
public static class ACLCB extends AsyncCB implements ACLCallback {

    List<ACL> acl = Ids.CREATOR_ALL_ACL;
    int version = 0;
    Stat stat = new Stat();
    byte[] data = "testing".getBytes();

    ACLCB(ZooKeeper zk) {
        this(zk, new CountDownLatch(1));
    }

    ACLCB(ZooKeeper zk, CountDownLatch latch) {
        super(zk, latch);
        // the Stat fields that toString() renders all start at zero
        stat.setAversion(0);
        stat.setCversion(0);
        stat.setEphemeralOwner(0);
        stat.setVersion(0);
    }

    public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
        this.acl = acl;
        this.stat = stat;
        super.processResult(Code.get(rc), path, ctx);
    }

    /** Creates the node through a StringCB, then reads its ACL and verifies the result. */
    public void verifyGetACL() {
        new StringCB(zk).verifyCreate();
        submitGetAcl();
        verify();
    }

    /** Expects NONODE with neither ACL nor Stat. */
    public void verifyGetACLFailure_NoNode() {
        rc = Code.NONODE;
        stat = null;
        acl = null;
        submitGetAcl();
        verify();
    }

    /**
     * Renders the permission value of every ACL entry, each followed by "::".
     *
     * @param acls entries to render, may be null
     * @return the concatenated permissions, or an empty string for null
     */
    public String toString(List<ACL> acls) {
        StringBuilder perms = new StringBuilder();
        if (acls != null) {
            for (ACL entry : acls) {
                perms.append(entry.getPerms());
                perms.append("::");
            }
        }
        return perms.toString();
    }

    @Override
    public String toString() {
        String prefix = super.toString() + toString(acl) + ":";
        return prefix
                + ":" + version
                + ":" + new String(data)
                + ":" + statSuffix();
    }

    /** Sends the async getACL for the current path, passing the current string form as ctx. */
    private void submitGetAcl() {
        zk.getACL(path, stat, this, toString());
    }

    /** Renders aversion:cversion:ephemeralOwner:version, or "null" when there is no Stat. */
    private String statSuffix() {
        if (stat == null) {
            return "null";
        }
        return stat.getAversion()
                + ":" + stat.getCversion()
                + ":" + stat.getEphemeralOwner()
                + ":" + stat.getVersion();
    }
}
/**
 * Callback used with the async {@code getChildren} call that reports only the
 * child names. Its string form is the base {@code rc:path:} prefix followed by
 * the {@code toString()} of the children list.
 */
public static class ChildrenCB extends AsyncCB implements ChildrenCallback {

    /** Expected children before the call; replaced by a sorted copy of the received list. */
    List<String> children = new ArrayList<>();

    ChildrenCB(ZooKeeper zk) {
        this(zk, new CountDownLatch(1));
    }

    ChildrenCB(ZooKeeper zk, CountDownLatch latch) {
        super(zk, latch);
    }

    /**
     * Stores a sorted copy of the received children (empty when {@code children}
     * is null) and records the rest of the result.
     */
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        // Sort a private copy: the list passed in belongs to the caller and must not be
        // reordered in place (it might also be unmodifiable).
        List<String> sorted = new ArrayList<>();
        if (children != null) {
            sorted.addAll(children);
        }
        Collections.sort(sorted);
        this.children = sorted;
        super.processResult(Code.get(rc), path, ctx);
    }

    /** Creates a node through a fresh StringCB (using its own path) and returns that callback. */
    public StringCB createNode() {
        StringCB parent = new StringCB(zk);
        parent.verifyCreate();
        return parent;
    }

    /** Creates a child named "bar" below {@code parent}. */
    public StringCB createNode(StringCB parent) {
        String childName = "bar";
        return createNode(parent, childName);
    }

    /** Creates a child named {@code childName} below {@code parent} and returns its callback. */
    public StringCB createNode(StringCB parent, String childName) {
        StringCB child = new StringCB(zk);
        child.setPath(parent.path + "/" + childName);
        child.verifyCreate();
        return child;
    }

    public void verifyGetChildrenEmpty() {
        StringCB parent = createNode();
        path = parent.path;
        verify();
    }

    public void verifyGetChildrenSingle() {
        StringCB parent = createNode();
        StringCB child = createNode(parent);
        path = parent.path;
        children.add(child.nodeName());
        verify();
    }

    public void verifyGetChildrenTwo() {
        StringCB parent = createNode();
        StringCB child1 = createNode(parent, "child1");
        StringCB child2 = createNode(parent, "child2");
        path = parent.path;
        // added in sorted order, matching the sorted list built by processResult
        children.add(child1.nodeName());
        children.add(child2.nodeName());
        verify();
    }

    public void verifyGetChildrenFailure_NoNode() {
        rc = KeeperException.Code.NONODE;
        verify();
    }

    /** Issues the async getChildren for the current path, then runs the base verification. */
    @Override
    public void verify() {
        zk.getChildren(path, false, this, toString());
        super.verify();
    }

    @Override
    public String toString() {
        return super.toString() + children.toString();
    }
}
/**
 * Callback used with the async {@code getChildren} call that also reports a
 * Stat. The Stat is not recorded; the string form is the base {@code rc:path:}
 * prefix followed by the {@code toString()} of the children list.
 */
public static class Children2CB extends AsyncCB implements Children2Callback {

    /** Expected children before the call; replaced by a sorted copy of the received list. */
    List<String> children = new ArrayList<>();

    Children2CB(ZooKeeper zk) {
        this(zk, new CountDownLatch(1));
    }

    Children2CB(ZooKeeper zk, CountDownLatch latch) {
        super(zk, latch);
    }

    /**
     * Stores a sorted copy of the received children (empty when {@code children}
     * is null) and records the rest of the result; {@code stat} is ignored.
     */
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        // Sort a private copy: the list passed in belongs to the caller and must not be
        // reordered in place (it might also be unmodifiable).
        List<String> sorted = new ArrayList<>();
        if (children != null) {
            sorted.addAll(children);
        }
        Collections.sort(sorted);
        this.children = sorted;
        super.processResult(Code.get(rc), path, ctx);
    }

    /** Creates a node through a fresh StringCB (using its own path) and returns that callback. */
    public StringCB createNode() {
        StringCB parent = new StringCB(zk);
        parent.verifyCreate();
        return parent;
    }

    /** Creates a child named "bar" below {@code parent}. */
    public StringCB createNode(StringCB parent) {
        String childName = "bar";
        return createNode(parent, childName);
    }

    /** Creates a child named {@code childName} below {@code parent} and returns its callback. */
    public StringCB createNode(StringCB parent, String childName) {
        StringCB child = new StringCB(zk);
        child.setPath(parent.path + "/" + childName);
        child.verifyCreate();
        return child;
    }

    public void verifyGetChildrenEmpty() {
        StringCB parent = createNode();
        path = parent.path;
        verify();
    }

    public void verifyGetChildrenSingle() {
        StringCB parent = createNode();
        StringCB child = createNode(parent);
        path = parent.path;
        children.add(child.nodeName());
        verify();
    }

    public void verifyGetChildrenTwo() {
        StringCB parent = createNode();
        StringCB child1 = createNode(parent, "child1");
        StringCB child2 = createNode(parent, "child2");
        path = parent.path;
        // added in sorted order, matching the sorted list built by processResult
        children.add(child1.nodeName());
        children.add(child2.nodeName());
        verify();
    }

    public void verifyGetChildrenFailure_NoNode() {
        rc = KeeperException.Code.NONODE;
        verify();
    }

    /** Issues the async getChildren for the current path, then runs the base verification. */
    @Override
    public void verify() {
        zk.getChildren(path, false, this, toString());
        super.verify();
    }

    @Override
    public String toString() {
        return super.toString() + children.toString();
    }
}
/**
 * Callback used with the async {@code create} call that reports both a node
 * name and a Stat. Its string form is the base {@code rc:path:} prefix followed
 * by the name and four Stat fields (see {@link #toString()}).
 */
public static class Create2CB extends AsyncCB implements Create2Callback {

    byte[] data = new byte[10];
    List<ACL> acl = Ids.CREATOR_ALL_ACL;
    CreateMode flags = CreateMode.PERSISTENT;
    /** Expected name before the call; replaced by the name passed to the callback. */
    String name = path;
    Stat stat = new Stat();

    Create2CB(ZooKeeper zk) {
        this(zk, new CountDownLatch(1));
    }

    Create2CB(ZooKeeper zk, CountDownLatch latch) {
        super(zk, latch);
    }

    /** Sets the path and keeps the expected name in step with it. */
    public void setPath(String path) {
        super.setPath(path);
        this.name = path;
    }

    /** @return the part of the path after its last '/' */
    public String nodeName() {
        int lastSlash = path.lastIndexOf('/');
        return path.substring(lastSlash + 1);
    }

    public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
        this.name = name;
        this.stat = stat;
        super.processResult(Code.get(rc), path, ctx);
    }

    /** Issues the create with the configured {@code flags}; does not wait for the result. */
    public AsyncCB create() {
        submitCreate();
        return this;
    }

    public void verifyCreate() {
        create();
        verify();
    }

    /** Creates the node through a fresh Create2CB, then expects NODEEXISTS from this one. */
    public void verifyCreateFailure_NodeExists() {
        new Create2CB(zk).verifyCreate();
        expectFailure(Code.NODEEXISTS);
        submitCreate();
        verify();
    }

    /** Expects NONODE when creating "/bar" below the current path. */
    public void verifyCreateFailure_NoNode() {
        expectFailure(Code.NONODE);
        path = path + "/bar";
        submitCreate();
        verify();
    }

    /** Creates an ephemeral node through a StringCB, then expects the child create below this path to be refused. */
    public void verifyCreateFailure_NoChildForEphemeral() {
        new StringCB(zk).verifyCreateEphemeral();
        expectFailure(Code.NOCHILDRENFOREPHEMERALS);
        path = path + "/bar";
        submitCreate();
        verify();
    }

    @Override
    public String toString() {
        return super.toString() + name + ":" + statSuffix();
    }

    /** Sends the async create for the current path, passing the current string form as ctx. */
    private void submitCreate() {
        zk.create(path, data, acl, flags, this, toString());
    }

    /** Sets the expected result code and clears the expected name and Stat. */
    private void expectFailure(Code failure) {
        rc = failure;
        name = null;
        stat = null;
    }

    /** Renders aversion:cversion:ephemeralOwner:version, or "null" when there is no Stat. */
    private String statSuffix() {
        if (stat == null) {
            return "null";
        }
        return stat.getAversion()
                + ":" + stat.getCversion()
                + ":" + stat.getEphemeralOwner()
                + ":" + stat.getVersion();
    }
}
/**
 * Callback used with the async {@code getData} call.
 * Its string form is the base {@code rc:path:} prefix followed by ":", the
 * data rendered as a String and four Stat fields (see {@link #toString()}).
 */
public static class DataCB extends AsyncCB implements DataCallback {

    byte[] data = new byte[10];
    Stat stat = new Stat();

    DataCB(ZooKeeper zk) {
        this(zk, new CountDownLatch(1));
    }

    DataCB(ZooKeeper zk, CountDownLatch latch) {
        super(zk, latch);
        // the Stat fields that toString() renders all start at zero
        stat.setAversion(0);
        stat.setCversion(0);
        stat.setEphemeralOwner(0);
        stat.setVersion(0);
    }

    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        this.data = data;
        this.stat = stat;
        super.processResult(Code.get(rc), path, ctx);
    }

    /** Creates the node through a StringCB, then reads its data and verifies the result. */
    public void verifyGetData() {
        new StringCB(zk).verifyCreate();
        submitGetData();
        verify();
    }

    /** Expects NONODE with neither data nor Stat. */
    public void verifyGetDataFailure_NoNode() {
        rc = KeeperException.Code.NONODE;
        data = null;
        stat = null;
        submitGetData();
        verify();
    }

    @Override
    public String toString() {
        String prefix = super.toString();
        String payload = (data == null) ? "null" : new String(data);
        return prefix + ":" + payload + ":" + statSuffix();
    }

    /** Sends the async getData (no watch) for the current path, passing the current string form as ctx. */
    private void submitGetData() {
        zk.getData(path, false, this, toString());
    }

    /** Renders aversion:cversion:ephemeralOwner:version, or "null" when there is no Stat. */
    private String statSuffix() {
        if (stat == null) {
            return "null";
        }
        return stat.getAversion()
                + ":" + stat.getCversion()
                + ":" + stat.getEphemeralOwner()
                + ":" + stat.getVersion();
    }
}
/**
 * Callback used with the async {@code setACL}, {@code setData} and
 * {@code exists} calls, all of which report a Stat. Its string form is the base
 * {@code rc:path:} prefix followed by the version, the data and four Stat
 * fields (see {@link #toString()}).
 */
public static class StatCB extends AsyncCB implements StatCallback {

    List<ACL> acl = Ids.CREATOR_ALL_ACL;
    int version = 0;
    Stat stat = new Stat();
    byte[] data = "testing".getBytes();

    StatCB(ZooKeeper zk) {
        this(zk, new CountDownLatch(1));
    }

    StatCB(ZooKeeper zk, CountDownLatch latch) {
        super(zk, latch);
        // the Stat fields that toString() renders all start at zero
        stat.setAversion(0);
        stat.setCversion(0);
        stat.setEphemeralOwner(0);
        stat.setVersion(0);
    }

    public void processResult(int rc, String path, Object ctx, Stat stat) {
        this.stat = stat;
        super.processResult(Code.get(rc), path, ctx);
    }

    /** Creates the node, sets its ACL at the current version and expects aversion 1 afterwards. */
    public void verifySetACL() {
        stat.setAversion(1);
        new StringCB(zk).verifyCreate();
        submitSetAcl(version);
        verify();
    }

    /** Expects NONODE and no Stat when no node was created first. */
    public void verifySetACLFailure_NoNode() {
        expectFailure(KeeperException.Code.NONODE);
        submitSetAcl(version);
        verify();
    }

    /** Creates the node, then expects BADVERSION when passing {@code version + 1}. */
    public void verifySetACLFailure_BadVersion() {
        new StringCB(zk).verifyCreate();
        expectFailure(Code.BADVERSION);
        submitSetAcl(version + 1);
        verify();
    }

    /** Issues the async setData at the current version; does not wait for the result. */
    public void setData() {
        submitSetData(version);
    }

    /** Creates the node, sets its data at the current version and expects version 1 afterwards. */
    public void verifySetData() {
        stat.setVersion(1);
        new StringCB(zk).verifyCreate();
        setData();
        verify();
    }

    /** Expects NONODE and no Stat when no node was created first. */
    public void verifySetDataFailure_NoNode() {
        expectFailure(KeeperException.Code.NONODE);
        submitSetData(version);
        verify();
    }

    /** Creates the node, then expects BADVERSION when passing {@code version + 1}. */
    public void verifySetDataFailure_BadVersion() {
        new StringCB(zk).verifyCreate();
        expectFailure(Code.BADVERSION);
        submitSetData(version + 1);
        verify();
    }

    /** Creates the node, then checks that exists reports it with the initial Stat values. */
    public void verifyExists() {
        new StringCB(zk).verifyCreate();
        submitExists();
        verify();
    }

    /** Expects NONODE and no Stat when no node was created first. */
    public void verifyExistsFailure_NoNode() {
        expectFailure(KeeperException.Code.NONODE);
        submitExists();
        verify();
    }

    @Override
    public String toString() {
        String prefix = super.toString() + version;
        return prefix + ":" + new String(data) + ":" + statSuffix();
    }

    /** Sets the expected result code and clears the expected Stat. */
    private void expectFailure(Code failure) {
        rc = failure;
        stat = null;
    }

    /** Sends the async setACL for the current path with the given version. */
    private void submitSetAcl(int aclVersion) {
        zk.setACL(path, acl, aclVersion, this, toString());
    }

    /** Sends the async setData for the current path with the given version. */
    private void submitSetData(int dataVersion) {
        zk.setData(path, data, dataVersion, this, toString());
    }

    /** Sends the async exists (no watch) for the current path. */
    private void submitExists() {
        zk.exists(path, false, this, toString());
    }

    /** Renders aversion:cversion:ephemeralOwner:version, or "null" when there is no Stat. */
    private String statSuffix() {
        if (stat == null) {
            return "null";
        }
        return stat.getAversion()
                + ":" + stat.getCversion()
                + ":" + stat.getEphemeralOwner()
                + ":" + stat.getVersion();
    }
}
/**
 * Callback used with the async {@code delete} and {@code sync} calls, which
 * report no payload. Its string form is the base {@code rc:path:} prefix
 * followed by the version.
 */
public static class VoidCB extends AsyncCB implements VoidCallback {

    int version = 0;

    VoidCB(ZooKeeper zk) {
        this(zk, new CountDownLatch(1));
    }

    VoidCB(ZooKeeper zk, CountDownLatch latch) {
        super(zk, latch);
    }

    /** Converts the numeric result code and hands the result to the base class. */
    public void processResult(int rc, String path, Object ctx) {
        Code code = Code.get(rc);
        super.processResult(code, path, ctx);
    }

    /** Issues the async delete at the current version; does not wait for the result. */
    public void delete() {
        submitDelete(version);
    }

    /** Creates the node through a StringCB, deletes it and verifies the result. */
    public void verifyDelete() {
        new StringCB(zk).verifyCreate();
        delete();
        verify();
    }

    /** Expects NONODE when no node was created first. */
    public void verifyDeleteFailure_NoNode() {
        rc = Code.NONODE;
        submitDelete(version);
        verify();
    }

    /** Creates the node, then expects BADVERSION when passing {@code version + 1}. */
    public void verifyDeleteFailure_BadVersion() {
        new StringCB(zk).verifyCreate();
        rc = Code.BADVERSION;
        submitDelete(version + 1);
        verify();
    }

    /**
     * Issues two creates without waiting for them: one at the StringCB's own
     * path and one at this path plus "/bar". It then expects NOTEMPTY from the delete.
     */
    public void verifyDeleteFailure_NotEmpty() {
        StringCB creator = new StringCB(zk);
        creator.create();
        creator.setPath(path + "/bar");
        creator.create();
        rc = Code.NOTEMPTY;
        submitDelete(version);
        verify();
    }

    /** Issues the async sync for the current path; does not wait for the result. */
    public void sync() {
        zk.sync(path, this, toString());
    }

    public void verifySync() {
        sync();
        verify();
    }

    @Override
    public String toString() {
        return super.toString() + version;
    }

    /** Sends the async delete for the current path with the given version. */
    private void submitDelete(int deleteVersion) {
        zk.delete(path, deleteVersion, this, toString());
    }
}
public static class MultiCB implements MultiCallback {
ZooKeeper zk;
int rc;
List<OpResult> opResults;
final CountDownLatch latch = new CountDownLatch(1);
MultiCB(ZooKeeper zk) {
this.zk = zk;
}
public void processResult(int rc, String path, Object ctx, List<OpResult> opResults) {
this.rc = rc;
this.opResults = opResults;
latch.countDown();
}
void latch_await() {
try {
latch.await(10000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
fail("unexpected interrupt");
}
assertSame(0L, latch.getCount());
}
public void verifyMulti() {
List<Op> ops = Arrays.asList(
Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
Op.delete("/multi", -1));
zk.multi(ops, this, null);
latch_await();
assertEquals(this.rc, KeeperException.Code.OK.intValue());
assertTrue(this.opResults.get(0) instanceof OpResult.CreateResult);
assertTrue(this.opResults.get(1) instanceof OpResult.DeleteResult);
}
public void verifyMultiFailure_AllErrorResult() {
List<Op> ops = Arrays.asList(
Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
Op.delete("/nonexist1", -1), Op.setData("/multi", "test".getBytes(), -1));
zk.multi(ops, this, null);
latch_await();
assertTrue(this.opResults.get(0) instanceof OpResult.ErrorResult);
assertTrue(this.opResults.get(1) instanceof OpResult.ErrorResult);
assertTrue(this.opResults.get(2) instanceof OpResult.ErrorResult);
}
public void verifyMultiFailure_NoSideEffect() throws KeeperException, InterruptedException {
List<Op> ops = Arrays.asList(
Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
Op.delete("/nonexist1", -1));
zk.multi(ops, this, null);
latch_await();
assertTrue(this.opResults.get(0) instanceof OpResult.ErrorResult);
assertNull(zk.exists("/multi", false));
}
public void verifyMultiSequential_NoSideEffect() throws Exception {
StringCB scb = new StringCB(zk);
scb.verifyCreate();
String path = scb.path + "-";
String seqPath = path + "0000000002";
zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
assertNotNull(zk.exists(path + "0000000001", false));
List<Op> ops = Arrays.asList(
Op.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL),
Op.delete("/nonexist", -1));
zk.multi(ops, this, null);
latch_await();
assertNull(zk.exists(seqPath, false));
zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
assertNotNull(zk.exists(seqPath, false));
}
}
}