// ClusterValuesCommandsTest.java
package redis.clients.jedis.commands.jedis;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import io.redis.test.annotations.ConditionalOnEnv;
import io.redis.test.annotations.SinceRedisVersion;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.BuilderFactory;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ScanIteration;
import redis.clients.jedis.args.GeoUnit;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.resps.ScanResult;
import redis.clients.jedis.util.KeyValue;
import redis.clients.jedis.util.TestEnvUtil;
/**
 * Integration tests for value-level commands executed against a Jedis cluster:
 * null-key validation, hash increments, geo radius storage, pub/sub,
 * broadcast commands, SCAN iteration, and cross-shard aggregation policies.
 */
public class ClusterValuesCommandsTest extends ClusterJedisCommandsTestBase {

  /**
   * Runs {@code call} and asserts that it throws {@link NullPointerException}.
   * Local helper instead of JUnit's {@code assertThrows} to avoid adding an import.
   */
  private static void assertThrowsNpe(Runnable call) {
    try {
      call.run();
      fail("Expected NullPointerException");
    } catch (NullPointerException expected) {
      // expected: binary commands must reject null keys
    }
  }

  @Test
  public void nullKeys() {
    byte[] bfoo = new byte[] { 0x0b, 0x0f, 0x00, 0x00 };
    assertThrowsNpe(() -> cluster.exists((byte[]) null));
    assertThrowsNpe(() -> cluster.exists(bfoo, null));
    assertThrowsNpe(() -> cluster.exists(null, bfoo));
  }

  @Test
  public void testHincrByFloat() {
    Double value = cluster.hincrByFloat("foo", "bar", 1.5d);
    assertEquals(Double.valueOf(1.5d), value);
    value = cluster.hincrByFloat("foo", "bar", -1.5d);
    assertEquals(Double.valueOf(0d), value);
    value = cluster.hincrByFloat("foo", "bar", -10.7d);
    assertEquals(Double.valueOf(-10.7d), value);
  }

  @Test
  public void georadiusStore() {
    // Prepare data: both the source and destination keys use the {Sicily}
    // hash tag so they map to the same cluster slot, as STORE requires.
    Map<String, GeoCoordinate> coordinateMap = new HashMap<>();
    coordinateMap.put("Palermo", new GeoCoordinate(13.361389, 38.115556));
    coordinateMap.put("Catania", new GeoCoordinate(15.087269, 37.502669));
    cluster.geoadd("{Sicily}", coordinateMap);

    long size = cluster.georadiusStore("{Sicily}", 15, 37, 200, GeoUnit.KM,
        GeoRadiusParam.geoRadiusParam(),
        GeoRadiusStoreParam.geoRadiusStoreParam().store("{Sicily}Store"));
    assertEquals(2, size);

    // The destination sorted set must hold both members, nearest first.
    List<String> expected = new ArrayList<>();
    expected.add("Palermo");
    expected.add("Catania");
    assertEquals(expected, cluster.zrange("{Sicily}Store", 0, -1));
  }

  /**
   * Publishes {@code message} on {@code channel} from a background thread so
   * the blocking subscriber in the test thread can receive it.
   */
  private void publishOne(final String channel, final String message) {
    Thread t = new Thread(() -> {
      try {
        cluster.publish(channel, message);
      } catch (Exception ignored) {
        // Best effort: if the publish fails, the subscriber never receives
        // "exit" and the test surfaces the problem as a hang/timeout.
      }
    });
    t.start();
  }

  @Test
  public void subscribe() throws InterruptedException {
    cluster.subscribe(new JedisPubSub() {
      @Override
      public void onMessage(String channel, String message) {
        assertEquals("foo", channel);
        assertEquals("exit", message);
        unsubscribe();
      }

      @Override
      public void onSubscribe(String channel, int subscribedChannels) {
        assertEquals("foo", channel);
        assertEquals(1, subscribedChannels);
        // now that I'm subscribed... publish
        publishOne("foo", "exit");
      }

      @Override
      public void onUnsubscribe(String channel, int subscribedChannels) {
        assertEquals("foo", channel);
        assertEquals(0, subscribedChannels);
      }
    }, "foo");
  }

  @Test
  public void rawPingBroadcast() {
    String reply = cluster.broadcastCommand(
        new CommandObject<>(new CommandArguments(Protocol.Command.PING), BuilderFactory.STRING));
    assertEquals("PONG", reply);
  }

  @Test
  public void pingBroadcast() {
    assertEquals("PONG", cluster.ping());
  }

  @Test
  public void info() {
    String info = cluster.info();
    assertThat(info, notNullValue());
    info = cluster.info("server");
    assertThat(info, notNullValue());
  }

  @Test
  public void flushAllBroadcast() {
    assertNull(cluster.get("foo"));
    assertEquals("OK", cluster.set("foo", "bar"));
    assertEquals("bar", cluster.get("foo"));
    cluster.flushAll();
    assertNull(cluster.get("foo"));
  }

  /**
   * Seeds the cluster with all 26*26 two-letter lowercase keys ("aa".."zz")
   * via INCR and returns the set of key names that were created.
   */
  private Set<String> seedTwoLetterKeys() {
    Set<String> keys = new HashSet<>(26 * 26);
    for (char first = 'a'; first <= 'z'; first++) {
      for (char second = 'a'; second <= 'z'; second++) {
        String key = new String(new char[] { first, second });
        cluster.incr(key);
        keys.add(key);
      }
    }
    return keys;
  }

  /** Drains every batch of {@code iteration} and returns all scanned keys. */
  private Set<String> drainScan(ScanIteration iteration) {
    Set<String> seen = new HashSet<>();
    while (!iteration.isIterationCompleted()) {
      ScanResult<String> batch = iteration.nextBatch();
      seen.addAll(batch.getResult());
    }
    return seen;
  }

  @Test
  public void scanIteration() {
    Set<String> allIn = seedTwoLetterKeys();

    // Plain SCAN must visit every seeded key exactly once overall.
    assertEquals(allIn, drainScan(cluster.scanIteration(10, "*")));

    // SCAN with a TYPE filter ("string") must see the same set, since INCR
    // created every key as a string.
    assertEquals(allIn, drainScan(cluster.scanIteration(10, "*", "string")));
  }

  @Test
  public void scanIterationCollect() {
    Set<String> allIn = seedTwoLetterKeys();
    assertEquals(allIn, cluster.scanIteration(100, "*").collect(new HashSet<>(26 * 26)));
  }

  @Test
  public void dbSizeAggregation() {
    // Set some keys across the cluster (different hash slots)
    cluster.set("key1", "value1");
    cluster.set("key2", "value2");
    cluster.set("key3", "value3");
    // dbSize should return sum of keys across all shards
    long dbSize = cluster.dbSize();
    assertTrue(dbSize >= 3);
  }

  @Test
  public void msetCrossShard() {
    // MSET with keys on different shards (MULTI_SHARD policy)
    // Using keys without hash tags to distribute across shards
    assertEquals("OK",
        cluster.mset("mset_key_a", "value_a", "mset_key_b", "value_b", "mset_key_c", "value_c"));
    // Verify all keys were set
    assertEquals("value_a", cluster.get("mset_key_a"));
    assertEquals("value_b", cluster.get("mset_key_b"));
    assertEquals("value_c", cluster.get("mset_key_c"));
  }

  @Test
  public void scriptExistsAggregation() {
    String script = "return 1";
    String sampleKey = "testKey";
    // Load a script to get its SHA1
    String sha1 = cluster.scriptLoad(script, sampleKey);
    // Verify it exists (single SHA1 check - returns Boolean, aggregated via AGG_LOGICAL_AND)
    assertTrue(cluster.scriptExists(sha1, sampleKey));
    // Test with multiple SHA1s - one exists, one doesn't
    String unknownSha1 = "0000000000000000000000000000000000000000";
    List<Boolean> results = cluster.scriptExists(sampleKey, sha1, unknownSha1);
    assertEquals(2, results.size());
    assertTrue(results.get(0)); // Known script exists
    assertFalse(results.get(1)); // Unknown script doesn't exist
  }

  @Test
  @SinceRedisVersion(value = "8.2.0", message = "CLUSTER SLOT-STATS requires Redis 8.2 or later")
  @ConditionalOnEnv(value = TestEnvUtil.ENV_REDIS_ENTERPRISE, enabled = false)
  public void clusterSlotStatsAggregation() {
    // Set some keys across the cluster to ensure slots have data
    cluster.set("key1", "value1");
    cluster.set("key2", "value2");
    cluster.set("key3", "value3");
    // Use broadcastCommand to send to all shards and aggregate with DEFAULT policy
    // CLUSTER SLOT-STATS SLOTSRANGE returns a list (array) of slot statistics from each shard
    // Each element is [slot_number, {key-count: N, cpu-usec: N, ...}]
    // The DEFAULT response policy should concatenate lists from all nodes
    List<Object> result = cluster.broadcastCommand(
        new CommandObject<>(
            new CommandArguments(Protocol.Command.CLUSTER)
                .add("SLOT-STATS").add("SLOTSRANGE").add(0).add(16383),
            BuilderFactory.RAW_OBJECT_LIST));
    // Verify we got aggregated results from multiple shards
    assertThat(result, notNullValue());
    // The result should be a concatenated list containing slot statistics from all shards
    // In a 3-shard cluster, each shard returns stats only for slots it owns
    // so the merged list should contain entries from all 16384 slots
    assertFalse(result.isEmpty(), "Should have aggregated slot statistics from cluster nodes");
    // Verify the aggregated list contains entries from all shards
    assertEquals(16384, result.size(),
        "Aggregated list should contain slot statistics from multiple shards");
  }

  @Test
  @SinceRedisVersion(value = "7.2.0", message = "WAITAOF requires Redis 7.2 or later")
  @ConditionalOnEnv(value = TestEnvUtil.ENV_REDIS_ENTERPRISE, enabled = false)
  public void waitAOFAggregation() {
    // Set some keys across the cluster to ensure there's data to sync
    cluster.set("key1", "value1");
    cluster.set("key2", "value2");
    cluster.set("key3", "value3");
    // Use broadcastCommand to send WAITAOF to all shards and aggregate with AGG_MIN policy
    // WAITAOF returns a KeyValue<Long, Long> where:
    // - key = number of local AOF syncs
    // - value = number of replica AOF syncs
    // The AGG_MIN response policy should return the minimum values across all shards
    KeyValue<Long, Long> result = cluster.broadcastCommand(
        new CommandObject<>(
            new CommandArguments(Protocol.Command.WAITAOF).add(0).add(0).add(100),
            BuilderFactory.LONG_LONG_PAIR));
    // Verify we got aggregated results
    assertThat(result, notNullValue());
    // With numLocal=0 and numReplicas=0, the command should return immediately
    // The minimum across all shards should be >= 0 for both values
    assertTrue(result.getKey() >= 0, "Local AOF sync count should be >= 0");
    assertTrue(result.getValue() >= 0, "Replica AOF sync count should be >= 0");
  }
}