// ClusterValuesCommandsTest.java

package redis.clients.jedis.commands.jedis;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.redis.test.annotations.ConditionalOnEnv;
import io.redis.test.annotations.SinceRedisVersion;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.BuilderFactory;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ScanIteration;
import redis.clients.jedis.args.GeoUnit;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.resps.ScanResult;
import redis.clients.jedis.util.KeyValue;
import redis.clients.jedis.util.TestEnvUtil;

/**
 * Integration tests that exercise value, pub/sub, scan and broadcast commands through the
 * {@code cluster} client inherited from {@link ClusterJedisCommandsTestBase}.
 *
 * <p>Every test talks to a live cluster through that client; none of them can run without it.
 */
public class ClusterValuesCommandsTest extends ClusterJedisCommandsTestBase {

  /** Number of hash slots queried by {@link #clusterSlotStatsAggregation()} (slots 0..16383). */
  private static final int CLUSTER_SLOT_COUNT = 16384;

  /** Number of letters used per position when generating two-letter scan keys. */
  private static final int ALPHABET_SIZE = 26;

  /**
   * Verifies that {@code exists} rejects {@code null} keys with a {@link NullPointerException},
   * both for the single-key form and for either position of the two-key form.
   */
  @Test
  public void nullKeys() {
    byte[] bfoo = new byte[]{0x0b, 0x0f, 0x00, 0x00};

    assertThrows(NullPointerException.class, () -> cluster.exists((byte[]) null));
    assertThrows(NullPointerException.class, () -> cluster.exists(bfoo, null));
    assertThrows(NullPointerException.class, () -> cluster.exists(null, bfoo));
  }

  /** Verifies the running value returned by successive {@code hincrByFloat} calls on one field. */
  @Test
  public void testHincrByFloat() {
    Double value = cluster.hincrByFloat("foo", "bar", 1.5d);
    assertEquals(Double.valueOf(1.5d), value);
    value = cluster.hincrByFloat("foo", "bar", -1.5d);
    assertEquals(Double.valueOf(0d), value);
    value = cluster.hincrByFloat("foo", "bar", -10.7d);
    assertEquals(Double.valueOf(-10.7d), value);
  }

  /**
   * Verifies that {@code georadiusStore} stores both matching members under the destination key.
   * Source and destination share the {@code {Sicily}} hash tag.
   */
  @Test
  public void georadiusStore() {
    // prepare data
    Map<String, GeoCoordinate> coordinateMap = new HashMap<>();
    coordinateMap.put("Palermo", new GeoCoordinate(13.361389, 38.115556));
    coordinateMap.put("Catania", new GeoCoordinate(15.087269, 37.502669));
    cluster.geoadd("{Sicily}", coordinateMap);

    long size = cluster.georadiusStore("{Sicily}", 15, 37, 200, GeoUnit.KM,
        GeoRadiusParam.geoRadiusParam(),
        GeoRadiusStoreParam.geoRadiusStoreParam().store("{Sicily}Store"));
    assertEquals(2, size);

    List<String> expected = Arrays.asList("Palermo", "Catania");
    assertEquals(expected, cluster.zrange("{Sicily}Store", 0, -1));
  }

  /**
   * Publishes {@code message} to {@code channel} from a freshly started background thread.
   *
   * <p>A failed publish is rethrown with context instead of being swallowed, so that it is
   * reported by the thread's uncaught-exception handler. Previously a failure was silent and the
   * only symptom was {@link #subscribe()} never returning.
   *
   * @param channel channel to publish to
   * @param message payload to publish
   */
  private void publishOne(final String channel, final String message) {
    Thread publisher = new Thread(() -> {
      try {
        cluster.publish(channel, message);
      } catch (Exception ex) {
        throw new IllegalStateException(
            "Failed to publish '" + message + "' to channel '" + channel + "'", ex);
      }
    }, "publish-" + channel);
    publisher.start();
  }

  /**
   * Subscribes to {@code foo}, publishes {@code exit} once the subscription is confirmed, and
   * unsubscribes when that message arrives. The callbacks assert the channel name and the
   * subscription counts they are handed.
   */
  @Test
  public void subscribe() throws InterruptedException {
    cluster.subscribe(new JedisPubSub() {
      @Override
      public void onMessage(String channel, String message) {
        assertEquals("foo", channel);
        assertEquals("exit", message);
        unsubscribe();
      }

      @Override
      public void onSubscribe(String channel, int subscribedChannels) {
        assertEquals("foo", channel);
        assertEquals(1, subscribedChannels);

        // now that I'm subscribed... publish
        publishOne("foo", "exit");
      }

      @Override
      public void onUnsubscribe(String channel, int subscribedChannels) {
        assertEquals("foo", channel);
        assertEquals(0, subscribedChannels);
      }
    }, "foo");
  }

  /** Verifies that a raw {@code PING} sent via {@code broadcastCommand} yields {@code PONG}. */
  @Test
  public void rawPingBroadcast() {
    String reply = cluster.broadcastCommand(
        new CommandObject<>(new CommandArguments(Protocol.Command.PING), BuilderFactory.STRING));
    assertEquals("PONG", reply);
  }

  /** Verifies that {@code ping()} yields {@code PONG}. */
  @Test
  public void pingBroadcast() {
    assertEquals("PONG", cluster.ping());
  }

  /** Verifies that {@code info()} returns a non-null reply with and without a section name. */
  @Test
  public void info() {
    String info = cluster.info();
    assertThat(info, notNullValue());

    info = cluster.info("server");
    assertThat(info, notNullValue());
  }

  /** Verifies that a key written before {@code flushAll()} is gone afterwards. */
  @Test
  public void flushAllBroadcast() {
    assertNull(cluster.get("foo"));
    assertEquals("OK", cluster.set("foo", "bar"));
    assertEquals("bar", cluster.get("foo"));
    cluster.flushAll();
    assertNull(cluster.get("foo"));
  }

  /**
   * Verifies that batch-wise scan iteration returns exactly the keys that were created, both
   * without a type filter and with the {@code string} type filter.
   */
  @Test
  public void scanIteration() {
    Set<String> allIn = createTwoLetterCounterKeys();

    assertEquals(allIn, drain(cluster.scanIteration(10, "*")));
    assertEquals(allIn, drain(cluster.scanIteration(10, "*", "string")));
  }

  /** Verifies that {@code ScanIteration.collect} gathers exactly the keys that were created. */
  @Test
  public void scanIterationCollect() {
    Set<String> allIn = createTwoLetterCounterKeys();

    assertEquals(allIn,
        cluster.scanIteration(100, "*").collect(new HashSet<>(ALPHABET_SIZE * ALPHABET_SIZE)));
  }

  /** Verifies that {@code dbSize()} reports at least the keys written by this test. */
  @Test
  public void dbSizeAggregation() {
    // Set some keys across the cluster (different hash slots)
    seedSampleKeys();

    // dbSize should return sum of keys across all shards
    long dbSize = cluster.dbSize();
    assertTrue(dbSize >= 3, "dbSize should count at least the 3 keys just written but was " + dbSize);
  }

  /** Verifies that a multi-key {@code mset} without hash tags stores every key/value pair. */
  @Test
  public void msetCrossShard() {
    // MSET with keys on different shards (MULTI_SHARD policy)
    // Using keys without hash tags to distribute across shards
    assertEquals("OK", cluster.mset("mset_key_a", "value_a", "mset_key_b", "value_b", "mset_key_c", "value_c"));

    // Verify all keys were set
    assertEquals("value_a", cluster.get("mset_key_a"));
    assertEquals("value_b", cluster.get("mset_key_b"));
    assertEquals("value_c", cluster.get("mset_key_c"));
  }

  /**
   * Verifies {@code scriptExists} for a single loaded SHA1 and for a pair made of one loaded and
   * one unknown SHA1.
   */
  @Test
  public void scriptExistsAggregation() {
    String script = "return 1";
    String sampleKey = "testKey";

    // Load a script to get its SHA1
    String sha1 = cluster.scriptLoad(script, sampleKey);

    // Verify it exists (single SHA1 check - returns Boolean, aggregated via AGG_LOGICAL_AND)
    assertTrue(cluster.scriptExists(sha1, sampleKey));

    // Test with multiple SHA1s - one exists, one doesn't
    String unknownSha1 = "0000000000000000000000000000000000000000";
    List<Boolean> results = cluster.scriptExists(sampleKey, sha1, unknownSha1);
    assertEquals(2, results.size());
    assertTrue(results.get(0));  // Known script exists
    assertFalse(results.get(1)); // Unknown script doesn't exist
  }

  /**
   * Verifies that broadcasting {@code CLUSTER SLOT-STATS SLOTSRANGE 0 16383} yields one merged
   * list with an entry for every slot.
   */
  @Test
  @SinceRedisVersion(value = "8.2.0", message = "CLUSTER SLOT-STATS requires Redis 8.2 or later")
  @ConditionalOnEnv(value = TestEnvUtil.ENV_REDIS_ENTERPRISE, enabled = false)
  public void clusterSlotStatsAggregation() {
    // Set some keys across the cluster to ensure slots have data
    seedSampleKeys();

    // Use broadcastCommand to send to all shards and aggregate with DEFAULT policy
    // CLUSTER SLOT-STATS SLOTSRANGE returns a list (array) of slot statistics from each shard
    // Each element is [slot_number, {key-count: N, cpu-usec: N, ...}]
    // The DEFAULT response policy should concatenate lists from all nodes
    List<Object> result = cluster.broadcastCommand(
        new CommandObject<>(
            new CommandArguments(Protocol.Command.CLUSTER).add("SLOT-STATS").add("SLOTSRANGE")
                .add(0).add(CLUSTER_SLOT_COUNT - 1),
            BuilderFactory.RAW_OBJECT_LIST));

    // Verify we got aggregated results from multiple shards
    assertThat(result, notNullValue());

    // The result should be a concatenated list containing slot statistics from all shards
    // In a 3-shard cluster, each shard returns stats only for slots it owns
    // so the merged list should contain entries from all 16384 slots
    assertFalse(result.isEmpty(), "Should have aggregated slot statistics from cluster nodes");

    // Verify the aggregated list contains entries from all shards
    assertEquals(CLUSTER_SLOT_COUNT, result.size(),
        "Aggregated list should contain slot statistics from multiple shards");
  }

  /**
   * Verifies that broadcasting {@code WAITAOF 0 0 100} yields a non-null pair whose two counts
   * are both non-negative.
   */
  @Test
  @SinceRedisVersion(value = "7.2.0", message = "WAITAOF requires Redis 7.2 or later")
  @ConditionalOnEnv(value = TestEnvUtil.ENV_REDIS_ENTERPRISE, enabled = false)
  public void waitAOFAggregation() {
    // Set some keys across the cluster to ensure there's data to sync
    seedSampleKeys();

    // Use broadcastCommand to send WAITAOF to all shards and aggregate with AGG_MIN policy
    // WAITAOF returns a KeyValue<Long, Long> where:
    //   - key = number of local AOF syncs
    //   - value = number of replica AOF syncs
    // The AGG_MIN response policy should return the minimum values across all shards
    KeyValue<Long, Long> result = cluster.broadcastCommand(
        new CommandObject<>(
            new CommandArguments(Protocol.Command.WAITAOF).add(0).add(0).add(100),
            BuilderFactory.LONG_LONG_PAIR));

    // Verify we got aggregated results
    assertThat(result, notNullValue());

    // With numLocal=0 and numReplicas=0, the command should return immediately
    // The minimum across all shards should be >= 0 for both values
    assertTrue(result.getKey() >= 0, "Local AOF sync count should be >= 0");
    assertTrue(result.getValue() >= 0, "Replica AOF sync count should be >= 0");
  }

  /** Writes the three sample keys {@code key1}..{@code key3} shared by the aggregation tests. */
  private void seedSampleKeys() {
    cluster.set("key1", "value1");
    cluster.set("key2", "value2");
    cluster.set("key3", "value3");
  }

  /**
   * Increments every two-letter key from {@code aa} to {@code zz} once, which creates them.
   *
   * @return the set of all key names that were written
   */
  private Set<String> createTwoLetterCounterKeys() {
    Set<String> keys = new HashSet<>(ALPHABET_SIZE * ALPHABET_SIZE);
    for (int i = 0; i < ALPHABET_SIZE; i++) {
      char first = (char) ('a' + i);
      for (int j = 0; j < ALPHABET_SIZE; j++) {
        char second = (char) ('a' + j);
        String key = new String(new char[]{first, second});
        cluster.incr(key);
        keys.add(key);
      }
    }
    return keys;
  }

  /**
   * Pulls batches from {@code scan} until it reports completion.
   *
   * @param scan iteration to exhaust
   * @return every key returned by any batch, de-duplicated
   */
  private Set<String> drain(ScanIteration scan) {
    Set<String> seen = new HashSet<>();
    while (!scan.isIterationCompleted()) {
      ScanResult<String> batch = scan.nextBatch();
      seen.addAll(batch.getResult());
    }
    return seen;
  }
}