// ClusterShardedPublishSubscribeCommandsTest.java
package redis.clients.jedis.commands.jedis;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItems;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.HashMap;
import java.util.Map;
import io.redis.test.annotations.SinceRedisVersion;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.BinaryJedisShardedPubSub;
import redis.clients.jedis.Connection;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisShardedPubSub;
import redis.clients.jedis.util.JedisClusterCRC16;
import redis.clients.jedis.util.SafeEncoder;
/**
 * Sharded Pub/Sub tests run through the {@code cluster} client: {@code ssubscribe},
 * {@code spublish}, {@code sunsubscribe}, {@code pubsubShardChannels} and
 * {@code pubsubShardNumSub}, using both the String and the binary listener types.
 *
 * <p>Every test drives itself from the listener callbacks: a subscription confirmation
 * triggers either a publish or a query, and the listener unsubscribes once it has seen
 * what it was waiting for.
 */
@SinceRedisVersion(value = "7.0.0", message = "Sharded Pub/Sub")
public class ClusterShardedPublishSubscribeCommandsTest extends ClusterJedisCommandsTestBase {

  /**
   * Calls {@code cluster.spublish(channel, message)} from a freshly started thread and
   * returns immediately; the publisher thread is not joined.
   *
   * @param channel shard channel to publish on
   * @param message payload to publish
   */
  private void publishOne(final String channel, final String message) {
    new Thread(() -> cluster.spublish(channel, message)).start();
  }

  /**
   * Subscribes to {@code "foo"}, publishes {@code "exit"} once the subscription is
   * confirmed, and checks the channel, payload and subscription counts seen by each callback.
   */
  @Test
  public void subscribe() throws InterruptedException {
    final JedisShardedPubSub listener = new JedisShardedPubSub() {
      @Override
      public void onSSubscribe(String channel, int subscribedChannels) {
        assertEquals("foo", channel);
        assertEquals(1, subscribedChannels);
        // The subscription is confirmed at this point, so the publish can go out.
        publishOne("foo", "exit");
      }

      @Override
      public void onSMessage(String channel, String message) {
        assertEquals("foo", channel);
        assertEquals("exit", message);
        sunsubscribe();
      }

      @Override
      public void onSUnsubscribe(String channel, int subscribedChannels) {
        assertEquals("foo", channel);
        assertEquals(0, subscribedChannels);
      }
    };

    cluster.ssubscribe(listener, "foo");
  }

  /**
   * Subscribes to two channels; each confirmation publishes on the confirmed channel and
   * each received message unsubscribes from the channel it arrived on.
   */
  @Test
  public void subscribeMany() {
    final JedisShardedPubSub listener = new JedisShardedPubSub() {
      @Override
      public void onSSubscribe(String channel, int subscribedChannels) {
        publishOne(channel, "exit");
      }

      @Override
      public void onSMessage(String channel, String message) {
        sunsubscribe(channel);
      }
    };

    cluster.ssubscribe(listener, "{foo}", "{foo}bar");
  }

  /**
   * After the third subscription confirmation, asks the node obtained for the slot of
   * {@code "testchan"} for its shard channels and expects all three subscribed channels
   * to be listed.
   */
  @Test
  public void pubSubChannels() {
    final JedisShardedPubSub listener = new JedisShardedPubSub() {
      private int confirmed = 0;

      @Override
      public void onSSubscribe(String channel, int subscribedChannels) {
        // Act only on the confirmation that completes the set of three channels.
        if (++confirmed != 3) {
          return;
        }
        final int slot = JedisClusterCRC16.getSlot("testchan");
        try (Connection nodeConnection = cluster.getConnectionFromSlot(slot);
            Jedis probe = new Jedis(nodeConnection)) {
          assertThat(probe.pubsubShardChannels(),
              hasItems("{testchan}1", "{testchan}2", "{testchan}3"));
        }
        sunsubscribe();
      }
    };

    cluster.ssubscribe(listener, "{testchan}1", "{testchan}2", "{testchan}3");
  }

  /**
   * Same flow as {@link #pubSubChannels()}, but the shard channels are queried with the
   * pattern {@code "*testchan*"}.
   */
  @Test
  public void pubSubChannelsWithPattern() {
    final JedisShardedPubSub listener = new JedisShardedPubSub() {
      private int confirmed = 0;

      @Override
      public void onSSubscribe(String channel, int subscribedChannels) {
        // Act only on the confirmation that completes the set of three channels.
        if (++confirmed != 3) {
          return;
        }
        final int slot = JedisClusterCRC16.getSlot("testchan");
        try (Connection nodeConnection = cluster.getConnectionFromSlot(slot);
            Jedis probe = new Jedis(nodeConnection)) {
          assertThat(probe.pubsubShardChannels("*testchan*"),
              hasItems("{testchan}1", "{testchan}2", "{testchan}3"));
        }
        sunsubscribe();
      }
    };

    cluster.ssubscribe(listener, "{testchan}1", "{testchan}2", "{testchan}3");
  }

  /**
   * After the second subscription confirmation, queries the subscriber count of both
   * channels through a second client and expects exactly one subscriber on each.
   */
  @Test
  public void pubSubNumSub() {
    final Map<String, Long> oneSubscriberEach = new HashMap<>();
    oneSubscriberEach.put("{testchannel}1", 1L);
    oneSubscriberEach.put("{testchannel}2", 1L);

    final JedisShardedPubSub listener = new JedisShardedPubSub() {
      private int confirmed = 0;

      @Override
      public void onSSubscribe(String channel, int subscribedChannels) {
        // Act only on the confirmation that completes the pair of channels.
        if (++confirmed != 2) {
          return;
        }
        final int slot = JedisClusterCRC16.getSlot("testchannel");
        try (Connection nodeConnection = cluster.getConnectionFromSlot(slot);
            Jedis probe = new Jedis(nodeConnection)) {
          assertEquals(oneSubscriberEach,
              probe.pubsubShardNumSub("{testchannel}1", "{testchannel}2"));
        }
        sunsubscribe();
      }
    };

    cluster.ssubscribe(listener, "{testchannel}1", "{testchannel}2");
  }

  /**
   * Binary counterpart of {@link #subscribe()}: channel and payload are compared as the
   * {@link SafeEncoder}-encoded bytes of {@code "foo"} and {@code "exit"}.
   */
  @Test
  public void binarySubscribe() {
    final BinaryJedisShardedPubSub listener = new BinaryJedisShardedPubSub() {
      @Override
      public void onSSubscribe(byte[] channel, int subscribedChannels) {
        assertArrayEquals(SafeEncoder.encode("foo"), channel);
        assertEquals(1, subscribedChannels);
        // publishOne takes Strings, so the channel bytes are decoded first.
        publishOne(SafeEncoder.encode(channel), "exit");
      }

      @Override
      public void onSMessage(byte[] channel, byte[] message) {
        assertArrayEquals(SafeEncoder.encode("foo"), channel);
        assertArrayEquals(SafeEncoder.encode("exit"), message);
        sunsubscribe();
      }

      @Override
      public void onSUnsubscribe(byte[] channel, int subscribedChannels) {
        assertArrayEquals(SafeEncoder.encode("foo"), channel);
        assertEquals(0, subscribedChannels);
      }
    };

    cluster.ssubscribe(listener, SafeEncoder.encode("foo"));
  }

  /**
   * Binary counterpart of {@link #subscribeMany()}: publishes on every confirmed channel
   * and unsubscribes from each channel as its message arrives.
   */
  @Test
  public void binarySubscribeMany() {
    final BinaryJedisShardedPubSub listener = new BinaryJedisShardedPubSub() {
      @Override
      public void onSSubscribe(byte[] channel, int subscribedChannels) {
        // publishOne takes Strings, so the channel bytes are decoded first.
        publishOne(SafeEncoder.encode(channel), "exit");
      }

      @Override
      public void onSMessage(byte[] channel, byte[] message) {
        sunsubscribe(channel);
      }
    };

    cluster.ssubscribe(listener, SafeEncoder.encode("{foo}"), SafeEncoder.encode("{foo}bar"));
  }
}