DistributedQueueTest.java
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.recipes.queue;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.NoSuchElementException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
/**
* Tests for {@link DistributedQueue}.
*/
public class DistributedQueueTest extends ClientBase {
@AfterEach
public void tearDown() throws Exception {
super.tearDown();
}
@Test
public void testOffer1() throws Exception {
String dir = "/testOffer1";
String testString = "Hello World";
final int numClients = 1;
ZooKeeper[] clients = new ZooKeeper[numClients];
DistributedQueue[] queueHandles = new DistributedQueue[numClients];
for (int i = 0; i < clients.length; i++) {
clients[i] = createClient();
queueHandles[i] = new DistributedQueue(clients[i], dir, null);
}
queueHandles[0].offer(testString.getBytes(UTF_8));
byte[] dequeuedBytes = queueHandles[0].remove();
assertEquals(new String(dequeuedBytes, UTF_8), testString);
}
@Test
public void testOffer2() throws Exception {
String dir = "/testOffer2";
String testString = "Hello World";
final int numClients = 2;
ZooKeeper[] clients = new ZooKeeper[numClients];
DistributedQueue[] queueHandles = new DistributedQueue[numClients];
for (int i = 0; i < clients.length; i++) {
clients[i] = createClient();
queueHandles[i] = new DistributedQueue(clients[i], dir, null);
}
queueHandles[0].offer(testString.getBytes(UTF_8));
byte[] dequeuedBytes = queueHandles[1].remove();
assertEquals(new String(dequeuedBytes, UTF_8), testString);
}
@Test
public void testTake1() throws Exception {
String dir = "/testTake1";
String testString = "Hello World";
final int numClients = 1;
ZooKeeper[] clients = new ZooKeeper[numClients];
DistributedQueue[] queueHandles = new DistributedQueue[numClients];
for (int i = 0; i < clients.length; i++) {
clients[i] = createClient();
queueHandles[i] = new DistributedQueue(clients[i], dir, null);
}
queueHandles[0].offer(testString.getBytes(UTF_8));
byte[] dequeuedBytes = queueHandles[0].take();
assertEquals(new String(dequeuedBytes, UTF_8), testString);
}
@Test
public void testRemove1() throws Exception {
String dir = "/testRemove1";
final int numClients = 1;
ZooKeeper[] clients = new ZooKeeper[numClients];
DistributedQueue[] queueHandles = new DistributedQueue[numClients];
for (int i = 0; i < clients.length; i++) {
clients[i] = createClient();
queueHandles[i] = new DistributedQueue(clients[i], dir, null);
}
try {
queueHandles[0].remove();
} catch (NoSuchElementException e) {
return;
}
fail();
}
public void createNremoveMtest(String dir, int n, int m) throws Exception {
String testString = "Hello World";
final int numClients = 2;
ZooKeeper[] clients = new ZooKeeper[numClients];
DistributedQueue[] queueHandles = new DistributedQueue[numClients];
for (int i = 0; i < clients.length; i++) {
clients[i] = createClient();
queueHandles[i] = new DistributedQueue(clients[i], dir, null);
}
for (int i = 0; i < n; i++) {
String offerString = testString + i;
queueHandles[0].offer(offerString.getBytes(UTF_8));
}
byte[] data = null;
for (int i = 0; i < m; i++) {
data = queueHandles[1].remove();
}
assertNotNull(data);
assertEquals(new String(data, UTF_8), testString + (m - 1));
}
@Test
public void testRemove2() throws Exception {
createNremoveMtest("/testRemove2", 10, 2);
}
@Test
public void testRemove3() throws Exception {
createNremoveMtest("/testRemove3", 1000, 1000);
}
public void createNremoveMelementTest(String dir, int n, int m) throws Exception {
String testString = "Hello World";
final int numClients = 2;
ZooKeeper[] clients = new ZooKeeper[numClients];
DistributedQueue[] queueHandles = new DistributedQueue[numClients];
for (int i = 0; i < clients.length; i++) {
clients[i] = createClient();
queueHandles[i] = new DistributedQueue(clients[i], dir, null);
}
for (int i = 0; i < n; i++) {
String offerString = testString + i;
queueHandles[0].offer(offerString.getBytes(UTF_8));
}
for (int i = 0; i < m; i++) {
queueHandles[1].remove();
}
assertEquals(new String(queueHandles[1].element(), UTF_8), testString + m);
}
@Test
public void testElement1() throws Exception {
createNremoveMelementTest("/testElement1", 1, 0);
}
@Test
public void testElement2() throws Exception {
createNremoveMelementTest("/testElement2", 10, 2);
}
@Test
public void testElement3() throws Exception {
createNremoveMelementTest("/testElement3", 1000, 500);
}
@Test
public void testElement4() throws Exception {
createNremoveMelementTest("/testElement4", 1000, 1000 - 1);
}
@Test
public void testTakeWait1() throws Exception {
String dir = "/testTakeWait1";
final String testString = "Hello World";
final int numClients = 1;
final ZooKeeper[] clients = new ZooKeeper[numClients];
final DistributedQueue[] queueHandles = new DistributedQueue[numClients];
for (int i = 0; i < clients.length; i++) {
clients[i] = createClient();
queueHandles[i] = new DistributedQueue(clients[i], dir, null);
}
final byte[][] takeResult = new byte[1][];
Thread takeThread = new Thread(() -> {
try {
takeResult[0] = queueHandles[0].take();
} catch (KeeperException | InterruptedException ignore) {
// no op
}
});
takeThread.start();
Thread.sleep(1000);
Thread offerThread = new Thread(() -> {
try {
queueHandles[0].offer(testString.getBytes(UTF_8));
} catch (KeeperException | InterruptedException ignore) {
// no op
}
});
offerThread.start();
offerThread.join();
takeThread.join();
assertNotNull(takeResult[0]);
assertEquals(new String(takeResult[0], UTF_8), testString);
}
@Test
public void testTakeWait2() throws Exception {
String dir = "/testTakeWait2";
final String testString = "Hello World";
final int numClients = 1;
final ZooKeeper[] clients = new ZooKeeper[numClients];
final DistributedQueue[] queueHandles = new DistributedQueue[numClients];
for (int i = 0; i < clients.length; i++) {
clients[i] = createClient();
queueHandles[i] = new DistributedQueue(clients[i], dir, null);
}
int numAttempts = 2;
for (int i = 0; i < numAttempts; i++) {
final byte[][] takeResult = new byte[1][];
final String threadTestString = testString + i;
Thread takeThread = new Thread(() -> {
try {
takeResult[0] = queueHandles[0].take();
} catch (KeeperException | InterruptedException ignore) {
// no op
}
});
takeThread.start();
Thread.sleep(1000);
Thread offerThread = new Thread(() -> {
try {
queueHandles[0].offer(threadTestString.getBytes(UTF_8));
} catch (KeeperException | InterruptedException ignore) {
// no op
}
});
offerThread.start();
offerThread.join();
takeThread.join();
assertNotNull(takeResult[0]);
assertEquals(new String(takeResult[0], UTF_8), threadTestString);
}
}
}