SubkeyNotificationsBinaryTestBase.java
package redis.clients.jedis.commands.unified;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static redis.clients.jedis.util.PubSubHelpers.concat;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.RedisProtocol;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.util.PubSubHelpers;
import redis.clients.jedis.util.PubSubHelpers.CapturingBinaryPubSub;
/**
 * Byte-level integration tests of the Redis 8.8 Subkey Notifications feature, driven through
 * {@link UnifiedJedis}.
 * <p>
 * Each test subscribes a dedicated client on a background thread, issues a binary {@code HSET}
 * whose key and/or field contains delimiter characters or non-UTF-8 bytes, and then compares the
 * raw bytes of the notification that was captured against the expected encoding.
 */
@Tag("integration")
public abstract class SubkeyNotificationsBinaryTestBase extends UnifiedJedisCommandsTestBase {

  /** Name of the server setting that is switched on for the test and restored afterwards. */
  private static final String NOTIFY_SETTING = "notify-keyspace-events";

  /** Value of {@link #NOTIFY_SETTING} read before the test changed it. */
  private String savedNotifyConfig;

  /** Client that is blocked in subscribe/psubscribe on {@link #listenerThread}. */
  private UnifiedJedis listenerClient;

  /** Listener that records subscription state and incoming messages. */
  private CapturingBinaryPubSub capture;

  /** Background thread running the blocking subscribe call. */
  private Thread listenerThread;

  public SubkeyNotificationsBinaryTestBase(RedisProtocol protocol) {
    super(protocol);
  }

  /**
   * Remembers the current notification setting and then switches on the flags used by these
   * tests. The test is aborted (not failed) when the server rejects the flag string.
   */
  @BeforeEach
  public void enableSubkeyNotifications() {
    // Take the snapshot first so that teardown can put the original value back.
    try (Jedis control = new Jedis(endpoint.getHostAndPort(),
        endpoint.getClientConfigBuilder().autoNegotiateProtocol(false).build())) {
      savedNotifyConfig = control.configGet(NOTIFY_SETTING).getOrDefault(NOTIFY_SETTING, "");
    }

    try {
      jedis.configSet(NOTIFY_SETTING, "AKEhSTIV");
    } catch (JedisDataException rejected) {
      String reason = "Server does not support subkey notification flags (STIV): "
          + rejected.getMessage();
      Assumptions.abort(reason);
    }
  }

  /**
   * Tears down whatever the test started: cancels subscriptions, waits briefly for the listener
   * thread, closes the subscriber client and restores the saved notification setting. Every step
   * is best-effort so that one failing step does not prevent the following ones.
   */
  @AfterEach
  public void closeSubscriber() {
    cancelSubscriptions();
    awaitListenerThread();
    releaseListenerClient();
    restoreNotifyConfig();
  }

  /** Asks the listener to drop channel and pattern subscriptions, ignoring any failure. */
  private void cancelSubscriptions() {
    if (capture == null) {
      return;
    }
    try {
      capture.unsubscribe();
    } catch (Exception ignored) {
      // best-effort
    }
    try {
      capture.punsubscribe();
    } catch (Exception ignored) {
      // best-effort
    }
  }

  /** Waits up to two seconds for the listener thread to finish. */
  private void awaitListenerThread() {
    if (listenerThread == null) {
      return;
    }
    try {
      listenerThread.join(2_000L);
    } catch (InterruptedException interrupted) {
      // Keep the interrupt visible to the caller while the remaining cleanup still runs.
      Thread.currentThread().interrupt();
    }
  }

  /** Closes the subscriber client, ignoring any failure. */
  private void releaseListenerClient() {
    if (listenerClient == null) {
      return;
    }
    try {
      listenerClient.close();
    } catch (Exception ignored) {
      // best-effort
    }
  }

  /** Writes the value captured in setup back to the server, ignoring any failure. */
  private void restoreNotifyConfig() {
    if (jedis == null || savedNotifyConfig == null) {
      return;
    }
    try {
      jedis.configSet(NOTIFY_SETTING, savedNotifyConfig);
    } catch (Exception ignored) {
      // best-effort
    }
  }

  /**
   * Subscribes to {@code __subkeyspace@0__:<key>} and expects the payload
   * {@code hset|<len>:<field>} for a field containing {@code '|'} and a 0xFE byte.
   */
  @Test
  public void subkeyspace_subkeyContainingPipeAndNonUtf8() throws InterruptedException {
    byte[] key = utf8("bin-subkeyspace-" + System.nanoTime());
    byte[] subkey = { 'a', '|', 'b', (byte) 0xFE, 'c' };
    byte[] topic = concat(utf8("__subkeyspace@0__:"), key);
    byte[] wanted = concat(utf8("hset|"), lengthTag(subkey), subkey);

    subscribeChannels(topic);
    jedis.hset(key, subkey, utf8("v1"));

    byte[] received = capture.expectMessageOn(topic);
    assertThat(received, equalTo(wanted));
  }

  /**
   * Subscribes to {@code __subkeyevent@0__:hset} and expects the payload
   * {@code <len>:<key>|<len>:<field>} when both key and field contain {@code '|'}.
   */
  @Test
  public void subkeyevent_keyAndSubkeyContainingPipe() throws InterruptedException {
    byte[] key = { 'h', '|', 'k', (byte) 0x80 };
    byte[] subkey = { 'f', '|', 'd' };
    byte[] topic = utf8("__subkeyevent@0__:hset");
    byte[] wanted = concat(lengthTag(key), key, utf8("|"), lengthTag(subkey), subkey);

    subscribeChannels(topic);
    jedis.hset(key, subkey, utf8("v1"));

    byte[] received = capture.expectMessageOn(topic);
    assertThat(received, equalTo(wanted));
  }

  /**
   * Pattern-subscribes to {@code __subkeyspaceitem@0__:<key>\n*} and expects the message on the
   * channel {@code __subkeyspaceitem@0__:<key>\n<field>} to carry the payload {@code hset}.
   */
  @Test
  public void subkeyspaceitem_channelContainsBinaryBytes() throws InterruptedException {
    byte[] key = { 'i', (byte) 0xC3, (byte) 0xA9 };
    byte[] subkey = { 'f', (byte) 0x80, 'd' };
    byte[] glob = concat(utf8("__subkeyspaceitem@0__:"), key, utf8("\n*"));
    byte[] deliveredOn = concat(utf8("__subkeyspaceitem@0__:"), key, utf8("\n"), subkey);

    subscribePatterns(glob);
    jedis.hset(key, subkey, utf8("v1"));

    byte[] received = capture.expectMessageOn(deliveredOn);
    assertThat(received, equalTo(utf8("hset")));
  }

  /**
   * Subscribes to {@code __subkeyspaceevent@0__:hset|<key>} with a key containing {@code '|'}
   * and a 0xFF byte, and expects the payload {@code <len>:<field>}.
   */
  @Test
  public void subkeyspaceevent_keyContainingPipeInChannel() throws InterruptedException {
    byte[] key = { 'k', '|', 'k', (byte) 0xFF };
    byte[] subkey = { 'f', 'l', 'd' };
    byte[] topic = concat(utf8("__subkeyspaceevent@0__:hset|"), key);
    byte[] wanted = concat(lengthTag(subkey), subkey);

    subscribeChannels(topic);
    jedis.hset(key, subkey, utf8("v1"));

    byte[] received = capture.expectMessageOn(topic);
    assertThat(received, equalTo(wanted));
  }

  /**
   * Same payload check as the pipe variant of the {@code __subkeyevent@0__:hset} test, but with
   * a newline inside both key and field.
   */
  @Test
  public void subkeyevent_keyAndSubkeyContainingNewline() throws InterruptedException {
    byte[] key = { 'h', '\n', 'k' };
    byte[] subkey = { 'f', '\n', 'd' };
    byte[] topic = utf8("__subkeyevent@0__:hset");
    byte[] wanted = concat(lengthTag(key), key, utf8("|"), lengthTag(subkey), subkey);

    subscribeChannels(topic);
    jedis.hset(key, subkey, utf8("v1"));

    byte[] received = capture.expectMessageOn(topic);
    assertThat(received, equalTo(wanted));
  }

  // -------------------------------------------------------------------- helpers

  /** Starts a background channel subscription and blocks until it is reported as subscribed. */
  private void subscribeChannels(byte[]... channels) throws InterruptedException {
    prepareListener();
    startListener(() -> listenerClient.subscribe(capture, channels),
      "subkey-uj-bin-sub-" + System.nanoTime());
  }

  /** Starts a background pattern subscription and blocks until it is reported as subscribed. */
  private void subscribePatterns(byte[]... patterns) throws InterruptedException {
    prepareListener();
    startListener(() -> listenerClient.psubscribe(capture, patterns),
      "subkey-uj-bin-psub-" + System.nanoTime());
  }

  /** Creates a fresh capturing listener and the client it will be attached to. */
  private void prepareListener() {
    capture = new CapturingBinaryPubSub();
    listenerClient = createTestClient();
  }

  /**
   * Runs the blocking subscribe call on a daemon thread and waits for the subscription signal
   * exposed by the capturing listener.
   */
  private void startListener(Runnable blockingCall, String threadName)
      throws InterruptedException {
    listenerThread = new Thread(blockingCall, threadName);
    listenerThread.setDaemon(true);
    listenerThread.start();
    PubSubHelpers.awaitSubscribed(capture.subscribed);
  }

  /** Encodes {@code text} as UTF-8 bytes. */
  private static byte[] utf8(String text) {
    return text.getBytes(StandardCharsets.UTF_8);
  }

  /** Builds the {@code <length>:} tag that the tests expect in front of {@code data}. */
  private static byte[] lengthTag(byte[] data) {
    return utf8(data.length + ":");
  }
}