ClusterPipeliningTest.java
package redis.clients.jedis;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static redis.clients.jedis.Protocol.CLUSTER_HASHSLOTS;
import java.util.*;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import redis.clients.jedis.args.*;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.*;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.resps.GeoRadiusResponse;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.resps.Tuple;
import redis.clients.jedis.util.AssertUtil;
import redis.clients.jedis.util.GeoCoordinateMatcher;
import redis.clients.jedis.util.GeoRadiusResponseMatcher;
import redis.clients.jedis.util.JedisClusterTestUtil;
import redis.clients.jedis.util.SafeEncoder;
public class ClusterPipeliningTest {
// Address passed to clusterMeet() in setUp() when joining the nodes together.
private static final String LOCAL_IP = "127.0.0.1";
// Client configuration shared by the clients created in these tests; it carries the password "cluster".
private static final DefaultJedisClientConfig DEFAULT_CLIENT_CONFIG
= DefaultJedisClientConfig.builder().password("cluster").build();
// Direct connections to the three nodes; created in setUp() and used for flushing and cluster administration.
private static Jedis node1;
private static Jedis node2;
private static Jedis node3;
// Endpoints of the first three entries returned by HostAndPorts.getClusterServers().
private static HostAndPort nodeInfo1 = HostAndPorts.getClusterServers().get(0);
private static HostAndPort nodeInfo2 = HostAndPorts.getClusterServers().get(1);
private static HostAndPort nodeInfo3 = HostAndPorts.getClusterServers().get(2);
// The three endpoints above as a set; handed to the pipeline/cluster/provider constructors in the tests.
private Set<HostAndPort> nodes = new HashSet<>(Arrays.asList(nodeInfo1, nodeInfo2, nodeInfo3));
/**
 * Opens an authenticated connection to each of the three nodes, flushes them, joins them via
 * {@code clusterMeet}, assigns every hash slot to one of the nodes and then waits for
 * {@link JedisClusterTestUtil#waitForClusterReady} to return.
 */
@BeforeAll
public static void setUp() throws InterruptedException {
  node1 = new Jedis(nodeInfo1);
  node1.auth("cluster");
  node1.flushAll();

  node2 = new Jedis(nodeInfo2);
  node2.auth("cluster");
  node2.flushAll();

  node3 = new Jedis(nodeInfo3);
  node3.auth("cluster");
  node3.flushAll();

  // Introduce the second and third node to the first one.
  node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort());
  node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort());

  // Partition the slot space: [0, third) -> node1, [third, 2 * third] -> node2, the rest -> node3.
  final int third = CLUSTER_HASHSLOTS / 3;
  final int[] lowerSlots = new int[third];
  final int[] middleSlots = new int[third + 1];
  final int[] upperSlots = new int[third];

  for (int slot = 0; slot < third; slot++) {
    lowerSlots[slot] = slot;
  }
  for (int slot = third; slot <= third * 2; slot++) {
    middleSlots[slot - third] = slot;
  }
  for (int slot = third * 2 + 1; slot < CLUSTER_HASHSLOTS; slot++) {
    upperSlots[slot - (third * 2 + 1)] = slot;
  }

  node1.clusterAddSlots(lowerSlots);
  node2.clusterAddSlots(middleSlots);
  node3.clusterAddSlots(upperSlots);

  JedisClusterTestUtil.waitForClusterReady(node1, node2, node3);
}
/** Calls {@code flushAll()} on each of the three nodes before every test. */
@BeforeEach
public void prepare() {
  for (Jedis node : new Jedis[] {node1, node2, node3}) {
    node.flushAll();
  }
}
/** Calls {@code flushDB()} on each of the three nodes after every test. */
@AfterEach
public void cleanUp() {
  for (Jedis node : new Jedis[] {node1, node2, node3}) {
    node.flushDB();
  }
}
/**
 * Flushes the three nodes, soft-resets their cluster state and finally closes the direct
 * connections that {@link #setUp()} opened, so no sockets are left open once the class is done.
 */
@AfterAll
public static void tearDown() throws InterruptedException {
  try {
    node1.flushDB();
    node2.flushDB();
    node3.flushDB();
    node1.clusterReset(ClusterResetType.SOFT);
    node2.clusterReset(ClusterResetType.SOFT);
    node3.clusterReset(ClusterResetType.SOFT);
  } finally {
    // Release the connections even when a flush or reset above failed.
    node1.close();
    node2.close();
    node3.close();
  }
}
/**
 * Builds a {@link ClusterPipeline} from the seed nodes and a client config, then checks that
 * three pipelined SETs followed by three GETs return the expected replies.
 */
@Test
public void constructorClientConfig() {
  try (ClusterPipeline pipeline = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
    Response<String> setKey1 = pipeline.set("key1", "value1");
    Response<String> setKey2 = pipeline.set("key2", "value2");
    Response<String> setKey3 = pipeline.set("key3", "value3");
    Response<String> getKey1 = pipeline.get("key1");
    Response<String> getKey2 = pipeline.get("key2");
    Response<String> getKey3 = pipeline.get("key3");

    pipeline.sync();

    for (Response<String> setReply : Arrays.asList(setKey1, setKey2, setKey3)) {
      assertEquals("OK", setReply.get());
    }
    assertEquals("value1", getKey1.get());
    assertEquals("value2", getKey2.get());
    assertEquals("value3", getKey3.get());
  }
}
/**
 * Same scenario as the client-config constructor test, but the pipeline is built with an
 * explicit {@link ConnectionPoolConfig} as third constructor argument.
 */
@Test
public void constructorPoolConfig() {
  try (ClusterPipeline pipeline =
      new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG, new ConnectionPoolConfig())) {
    Response<String> firstSet = pipeline.set("key1", "value1");
    Response<String> secondSet = pipeline.set("key2", "value2");
    Response<String> thirdSet = pipeline.set("key3", "value3");
    Response<String> firstGet = pipeline.get("key1");
    Response<String> secondGet = pipeline.get("key2");
    Response<String> thirdGet = pipeline.get("key3");

    pipeline.sync();

    // Replies to the writes.
    assertEquals("OK", firstSet.get());
    assertEquals("OK", secondSet.get());
    assertEquals("OK", thirdSet.get());

    // Replies to the reads, in the order they were queued.
    List<String> readBack = Arrays.asList(firstGet.get(), secondGet.get(), thirdGet.get());
    assertEquals("value1", readBack.get(0));
    assertEquals("value2", readBack.get(1));
    assertEquals("value3", readBack.get(2));
  }
}
/**
 * Builds the pipeline on top of an explicitly created {@link ClusterConnectionProvider} and
 * checks the replies of three SETs followed by three GETs.
 */
@Test
public void constructorConnectionProvider() {
  try (ClusterConnectionProvider connectionProvider =
          new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline pipe = new ClusterPipeline(connectionProvider)) {
    Response<String> write1 = pipe.set("key1", "value1");
    Response<String> write2 = pipe.set("key2", "value2");
    Response<String> write3 = pipe.set("key3", "value3");
    Response<String> read1 = pipe.get("key1");
    Response<String> read2 = pipe.get("key2");
    Response<String> read3 = pipe.get("key3");

    pipe.sync();

    for (Response<String> write : Arrays.asList(write1, write2, write3)) {
      assertEquals("OK", write.get());
    }
    assertEquals("value1", read1.get());
    assertEquals("value2", read2.get());
    assertEquals("value3", read3.get());
  }
}
/**
 * Obtains the pipeline from {@link JedisCluster#pipelined()} and checks the replies of three
 * SETs followed by three GETs.
 */
@Test
public void clusterPipelined() {
  // Insertion-ordered so commands are queued as key1, key2, key3.
  Map<String, String> expected = new LinkedHashMap<>();
  expected.put("key1", "value1");
  expected.put("key2", "value2");
  expected.put("key3", "value3");

  try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline pipeline = cluster.pipelined()) {
    List<Response<String>> setReplies = new ArrayList<>();
    for (Map.Entry<String, String> entry : expected.entrySet()) {
      setReplies.add(pipeline.set(entry.getKey(), entry.getValue()));
    }
    List<Response<String>> getReplies = new ArrayList<>();
    for (String key : expected.keySet()) {
      getReplies.add(pipeline.get(key));
    }

    pipeline.sync();

    for (Response<String> setReply : setReplies) {
      assertEquals("OK", setReply.get());
    }
    Iterator<String> expectedValues = expected.values().iterator();
    for (Response<String> getReply : getReplies) {
      assertEquals(expectedValues.next(), getReply.get());
    }
  }
}
/**
 * Uses one pipeline for two batches: the SETs are synced and verified first, then the GETs are
 * queued on the same pipeline, synced and verified.
 */
@Test
public void intermediateSync() {
  try (JedisCluster jedisCluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline pipe = jedisCluster.pipelined()) {
    // First batch: writes.
    Response<String> set1 = pipe.set("key1", "value1");
    Response<String> set2 = pipe.set("key2", "value2");
    Response<String> set3 = pipe.set("key3", "value3");
    pipe.sync();
    for (Response<String> setReply : Arrays.asList(set1, set2, set3)) {
      assertEquals("OK", setReply.get());
    }

    // Second batch on the same pipeline: reads.
    Response<String> get1 = pipe.get("key1");
    Response<String> get2 = pipe.get("key2");
    Response<String> get3 = pipe.get("key3");
    pipe.sync();
    assertEquals("value1", get1.get());
    assertEquals("value2", get2.get());
    assertEquals("value3", get3.get());
  }
}
/**
 * Like {@code intermediateSync}, but {@code sync()} is invoked 100 times after each batch; the
 * replies of each batch are checked once those calls have completed.
 */
@Test
public void intermediateSyncs() {
  final int syncCalls = 100;
  try (JedisCluster jedisCluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline pipe = jedisCluster.pipelined()) {
    Response<String> set1 = pipe.set("key1", "value1");
    Response<String> set2 = pipe.set("key2", "value2");
    Response<String> set3 = pipe.set("key3", "value3");
    int remaining = syncCalls;
    while (remaining-- > 0) {
      pipe.sync();
    }
    assertEquals("OK", set1.get());
    assertEquals("OK", set2.get());
    assertEquals("OK", set3.get());

    Response<String> get1 = pipe.get("key1");
    Response<String> get2 = pipe.get("key2");
    Response<String> get3 = pipe.get("key3");
    remaining = syncCalls;
    while (remaining-- > 0) {
      pipe.sync();
    }
    assertEquals("value1", get1.get());
    assertEquals("value2", get2.get());
    assertEquals("value3", get3.get());
  }
}
/**
 * Seeds one key of each basic type through {@link JedisCluster}, then reads and modifies them
 * through a pipeline and checks every typed {@link Response}.
 */
@Test
public void pipelineResponse() {
  try (JedisCluster jc = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
    jc.set("string", "foo");
    jc.lpush("list", "foo");
    jc.hset("hash", "foo", "bar");
    jc.zadd("zset", 1, "foo");
    jc.sadd("set", "foo");
    jc.setrange("setrange", 0, "0123456789");
    byte[] bytesForSetRange = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    jc.setrange("setrangebytes".getBytes(), 0, bytesForSetRange);
  }

  // The pipeline is a managed resource as well, so it is closed before its provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<String> string = p.get("string");
    Response<String> list = p.lpop("list");
    Response<String> hash = p.hget("hash", "foo");
    Response<List<String>> zset = p.zrange("zset", 0, -1);
    Response<String> set = p.spop("set");
    Response<Boolean> blist = p.exists("list"); // queued after the lpop above
    Response<Double> zincrby = p.zincrby("zset", 1, "foo");
    Response<Long> zcard = p.zcard("zset");
    p.lpush("list", "bar");
    Response<List<String>> lrange = p.lrange("list", 0, -1);
    Response<Map<String, String>> hgetAll = p.hgetAll("hash");
    p.sadd("set", "foo");
    Response<Set<String>> smembers = p.smembers("set");
    Response<List<Tuple>> zrangeWithScores = p.zrangeWithScores("zset", 0, -1);
    Response<String> getrange = p.getrange("setrange", 1, 3);
    Response<byte[]> getrangeBytes = p.getrange("setrangebytes".getBytes(), 6, 8);
    p.sync();

    assertEquals("foo", string.get());
    assertEquals("foo", list.get());
    assertEquals("bar", hash.get());
    assertEquals("foo", zset.get().iterator().next());
    assertEquals("foo", set.get());
    assertFalse(blist.get());
    assertEquals(Double.valueOf(2), zincrby.get());
    assertEquals(Long.valueOf(1), zcard.get());
    assertEquals(1, lrange.get().size());
    assertNotNull(hgetAll.get().get("foo"));
    assertEquals(1, smembers.get().size());
    assertEquals(1, zrangeWithScores.get().size());
    assertEquals("123", getrange.get());
    byte[] expectedGetRangeBytes = {6, 7, 8};
    assertArrayEquals(expectedGetRangeBytes, getrangeBytes.get());
  }
}
/**
 * Checks the {@code byte[]} variants of hgetAll, hkeys, hmget and hvals through a pipeline,
 * against a hash that holds exactly two fields.
 */
@Test
public void pipelineBinarySafeHashCommands() {
  try (JedisCluster jc = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
    jc.hset("key".getBytes(), "f1".getBytes(), "v111".getBytes());
    jc.hset("key".getBytes(), "f22".getBytes(), "v2222".getBytes());
  }

  // The pipeline is a managed resource as well, so it is closed before its provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Map<byte[], byte[]>> fmap = p.hgetAll("key".getBytes());
    Response<Set<byte[]>> fkeys = p.hkeys("key".getBytes());
    Response<List<byte[]>> fordered = p.hmget("key".getBytes(), "f22".getBytes(), "f1".getBytes());
    Response<List<byte[]>> fvals = p.hvals("key".getBytes());
    p.sync();

    assertNotNull(fmap.get());
    // byte[] uses identity-based equals/hashCode, so a lookup only works with the very same
    // array instance that is stored as key. The keys are therefore taken from the returned map
    // itself and compared by content afterwards. See
    // http://stackoverflow.com/questions/1058149/using-a-byte-array-as-hashmap-key-java
    Map<byte[], byte[]> map = fmap.get();
    Set<byte[]> mapKeys = map.keySet();
    Iterator<byte[]> iterMap = mapKeys.iterator();
    byte[] firstMapKey = iterMap.next();
    byte[] secondMapKey = iterMap.next();
    assertFalse(iterMap.hasNext());
    verifyHasBothValues(firstMapKey, secondMapKey, "f1".getBytes(), "f22".getBytes());
    byte[] firstMapValue = map.get(firstMapKey);
    byte[] secondMapValue = map.get(secondMapKey);
    verifyHasBothValues(firstMapValue, secondMapValue, "v111".getBytes(), "v2222".getBytes());

    assertNotNull(fkeys.get());
    Iterator<byte[]> iter = fkeys.get().iterator();
    byte[] firstKey = iter.next();
    byte[] secondKey = iter.next();
    assertFalse(iter.hasNext());
    verifyHasBothValues(firstKey, secondKey, "f1".getBytes(), "f22".getBytes());

    // hmget was queued with the fields in the order f22, f1.
    assertNotNull(fordered.get());
    assertArrayEquals("v2222".getBytes(), fordered.get().get(0));
    assertArrayEquals("v111".getBytes(), fordered.get().get(1));

    assertNotNull(fvals.get());
    assertEquals(2, fvals.get().size());
    byte[] firstValue = fvals.get().get(0);
    byte[] secondValue = fvals.get().get(1);
    verifyHasBothValues(firstValue, secondValue, "v111".getBytes(), "v2222".getBytes());
  }
}
/**
 * Asserts that the two given arrays differ in content and that each of them matches, by
 * content, one of the two expected values.
 */
private void verifyHasBothValues(byte[] firstKey, byte[] secondKey, byte[] value1, byte[] value2) {
  boolean sameContent = Arrays.equals(firstKey, secondKey);
  assertFalse(sameContent);

  boolean firstIsExpected = Arrays.equals(firstKey, value1) || Arrays.equals(firstKey, value2);
  assertTrue(firstIsExpected);

  boolean secondIsExpected = Arrays.equals(secondKey, value1) || Arrays.equals(secondKey, value2);
  assertTrue(secondIsExpected);
}
/**
 * Reading a {@link Response} before the pipeline has been synced must throw
 * {@link IllegalStateException}.
 */
@Test
public void pipelineResponseWithinPipeline() {
  // The pipeline is a managed resource as well, so it is closed before its provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<String> string = p.get("string");
    assertThrows(IllegalStateException.class, string::get);
    p.sync();
  }
}
/**
 * Queues a PUBLISH with String arguments and one with {@code byte[]} arguments and expects both
 * to report 0.
 */
@Test
public void pipelineWithPubSub() {
  // The pipeline is a managed resource as well, so it is closed before its provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline pipelined = new ClusterPipeline(provider)) {
    Response<Long> p1 = pipelined.publish("foo", "bar");
    Response<Long> p2 = pipelined.publish("foo".getBytes(), "bar".getBytes());
    pipelined.sync();
    assertEquals(0, p1.get().longValue());
    assertEquals(0, p2.get().longValue());
  }
}
/** A pipelined GET on a freshly generated random key must yield {@code null}. */
@Test
public void canRetrieveUnsetKey() {
  // The pipeline is a managed resource as well, so it is closed before its provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<String> shouldNotExist = p.get(UUID.randomUUID().toString());
    p.sync();
    assertNull(shouldNotExist.get());
  }
}
/**
 * A failing command in the middle of a pipeline (SMEMBERS on a key that was just SET as a
 * string) must surface as {@link JedisDataException} on its own response only; the GET queued
 * after it still returns its value.
 */
@Test
public void piplineWithError() {
  // The pipeline is a managed resource as well, so it is closed before its provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    p.set("foo", "bar");
    Response<Set<String>> error = p.smembers("foo");
    Response<String> r = p.get("foo");
    p.sync();

    assertThrows(JedisDataException.class, error::get);
    // Expected value first, so a failure message reports the two sides correctly.
    assertEquals("bar", r.get());
  }
}
/**
 * Pipelines plain SETs plus one SET with {@code SetParams.nx()} on an already written key; the
 * NX write is expected to return {@code null} and leave the earlier value in place.
 */
@Test
public void getSetParams() {
  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<String> r1 = p.set("key1", "value1");
    Response<String> r2 = p.set("key2", "value2");
    Response<String> r3 = p.set("key3", "value3");
    Response<String> r4 = p.set("key3", "value4", new SetParams().nx()); // Should not be updated
    Response<String> r5 = p.get("key1");
    Response<String> r6 = p.get("key2");
    Response<String> r7 = p.get("key3");
    p.sync();

    assertEquals("OK", r1.get());
    assertEquals("OK", r2.get());
    assertEquals("OK", r3.get());
    assertNull(r4.get());
    assertEquals("value1", r5.get());
    assertEquals("value2", r6.get());
    assertEquals("value3", r7.get());
  }
}
/**
 * Exercises the pipelined SORT variants on a five-element list: plain sort, sort-and-store into
 * the same key, sort with a limit, and descending sort-and-store.
 */
@Test
public void clusterPipelineSort() {
  // Mutable copy: the list is reversed in place further down.
  List<String> sorted = new ArrayList<>(Arrays.asList("1", "2", "3", "4", "5"));

  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Long> r1 = p.rpush("key1", "2", "3", "5", "1", "4");
    Response<List<String>> r2 = p.sort("key1");
    Response<Long> r3 = p.sort("key1", "key1");
    Response<List<String>> r4 = p.lrange("key1", 0, 4);
    Response<List<String>> r5 = p.sort("key1", new SortingParams().limit(0, 2));
    Response<Long> r6 = p.sort("key1", new SortingParams().desc(), "key1");
    Response<List<String>> r7 = p.lrange("key1", 0, 4);
    p.sync();

    assertEquals(Long.valueOf(5), r1.get());
    assertEquals(sorted, r2.get());
    assertEquals(Long.valueOf(5), r3.get());
    assertEquals(sorted, r4.get());
    assertEquals(2, r5.get().size());
    assertEquals(Long.valueOf(5), r6.get());
    // After the descending store, the list is expected in reverse order.
    Collections.reverse(sorted);
    assertEquals(sorted, r7.get());
  }
}
/**
 * Runs a sequence of list commands through one pipeline. The keys carry the hash tags
 * {@code {list}} and {@code {newlist}}; the trailing comments show the expected list content
 * after each step.
 */
@Test
public void clusterPipelineList() {
  List<String> vals = Collections.singletonList("foobar");

  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Long> r1 = p.lpush("my{list}", "hello", "hello", "foo", "foo"); // ["foo", "foo", "hello", "hello"]
    Response<Long> r2 = p.rpush("my{newlist}", "hello", "hello", "foo", "foo"); // ["hello", "hello", "foo", "foo"]
    Response<Long> r3 = p.lpos("my{list}", "foo");
    Response<Long> r4 = p.lpos("my{list}", "foo", new LPosParams().maxlen(1));
    Response<List<Long>> r5 = p.lpos("my{list}", "foo", new LPosParams().maxlen(1), 2);
    Response<String> r6 = p.ltrim("my{list}", 2, 3); // ["hello", "hello"]
    Response<Long> r7 = p.llen("my{list}");
    Response<String> r8 = p.lindex("my{list}", -1);
    Response<String> r9 = p.lset("my{list}", 1, "foobar"); // ["hello", "foobar"]
    Response<Long> r10 = p.lrem("my{list}", 1, "hello"); // ["foobar"]
    Response<List<String>> r11 = p.lrange("my{list}", 0, 10);
    Response<String> r12 = p.rpop("my{newlist}"); // ["hello", "hello", "foo"]
    Response<List<String>> r13 = p.lpop("my{list}", 1); // ["foobar"]
    Response<List<String>> r14 = p.rpop("my{newlist}", 2); // ["hello"]
    Response<Long> r15 = p.linsert("my{newlist}", ListPosition.AFTER, "hello", "world"); // ["hello", "world"]
    Response<Long> r16 = p.lpushx("myother{newlist}", "foo", "bar");
    Response<Long> r17 = p.rpushx("myother{newlist}", "foo", "bar");
    Response<String> r18 = p.rpoplpush("my{newlist}", "myother{newlist}");
    Response<String> r19 = p.lmove("my{newlist}", "myother{newlist}", ListDirection.LEFT, ListDirection.RIGHT);
    p.sync();

    assertEquals(Long.valueOf(4), r1.get());
    assertEquals(Long.valueOf(4), r2.get());
    assertEquals(Long.valueOf(0), r3.get());
    assertEquals(Long.valueOf(0), r4.get());
    assertEquals(1, r5.get().size());
    assertEquals("OK", r6.get());
    assertEquals(Long.valueOf(2), r7.get());
    assertEquals("hello", r8.get());
    assertEquals("OK", r9.get());
    assertEquals(Long.valueOf(1), r10.get());
    assertEquals(vals, r11.get());
    assertEquals("foo", r12.get());
    assertEquals(vals, r13.get());
    assertEquals(2, r14.get().size());
    assertEquals(Long.valueOf(2), r15.get());
    assertEquals(Long.valueOf(0), r16.get());
    assertEquals(Long.valueOf(0), r17.get());
    assertEquals("world", r18.get());
    assertEquals("hello", r19.get());
  }
}
/**
 * Runs the set commands (diff/inter/union with their store variants, membership checks, removal
 * and random access) through one pipeline. All keys share the hash tag {@code {set}}.
 */
@Test
public void clusterPipelineSet() {
  Set<String> diff = new HashSet<>(Arrays.asList("bar", "foo"));
  Set<String> union = new HashSet<>(Arrays.asList("hello", "world", "bar", "foo"));
  Set<String> inter = new HashSet<>(Arrays.asList("world", "hello"));

  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Long> r1 = p.sadd("my{set}", "hello", "hello", "world", "foo", "bar");
    p.sadd("mynew{set}", "hello", "hello", "world");
    Response<Set<String>> r2 = p.sdiff("my{set}", "mynew{set}");
    Response<Long> r3deprecated = p.sdiffStore("diffset{set}deprecated", "my{set}", "mynew{set}");
    Response<Long> r3 = p.sdiffstore("diffset{set}", "my{set}", "mynew{set}");
    Response<Set<String>> r4 = p.smembers("diffset{set}");
    Response<Set<String>> r5 = p.sinter("my{set}", "mynew{set}");
    Response<Long> r6 = p.sinterstore("interset{set}", "my{set}", "mynew{set}");
    Response<Set<String>> r7 = p.smembers("interset{set}");
    Response<Set<String>> r8 = p.sunion("my{set}", "mynew{set}");
    Response<Long> r9 = p.sunionstore("unionset{set}", "my{set}", "mynew{set}");
    Response<Set<String>> r10 = p.smembers("unionset{set}");
    Response<Boolean> r11 = p.sismember("my{set}", "foo");
    Response<List<Boolean>> r12 = p.smismember("my{set}", "foo", "foobar");
    Response<Long> r13 = p.srem("my{set}", "foo");
    Response<Set<String>> r14 = p.spop("my{set}", 1);
    Response<Long> r15 = p.scard("my{set}");
    Response<String> r16 = p.srandmember("my{set}");
    Response<List<String>> r17 = p.srandmember("my{set}", 2);
    // Response<Long> r18 = p.smove("my{set}", "mynew{set}", "hello");
    p.sync();

    assertEquals(Long.valueOf(4), r1.get());
    assertEquals(diff, r2.get());
    assertEquals(Long.valueOf(diff.size()), r3deprecated.get());
    assertEquals(Long.valueOf(diff.size()), r3.get());
    assertEquals(diff, r4.get());
    assertEquals(inter, r5.get());
    assertEquals(Long.valueOf(inter.size()), r6.get());
    assertEquals(inter, r7.get());
    assertEquals(union, r8.get());
    assertEquals(Long.valueOf(union.size()), r9.get());
    assertEquals(union, r10.get());
    assertTrue(r11.get());
    // "foo" is a member, "foobar" is not.
    assertEquals(Arrays.asList(true, false), r12.get());
    assertEquals(Long.valueOf(1), r13.get());
    // The popped/random members are not deterministic; only check they come from the full set.
    assertTrue(union.containsAll(r14.get()));
    assertEquals(Long.valueOf(2), r15.get());
    assertTrue(union.contains(r16.get()));
    assertTrue(union.containsAll(r17.get()));
    // assertEquals(Long.valueOf(1), r18.get());
  }
}
/**
 * Runs the sorted-set commands (add, rank, remove, range, random member, score, pop, count)
 * through one pipeline against the keys {@code myset} and {@code myotherset}.
 */
@Test
public void clusterPipelineSortedSet() {
  Map<String, Double> hm = new HashMap<>();
  hm.put("a1", 1d);
  hm.put("a2", 2d);
  hm.put("a3", 3d);
  Set<String> members = new HashSet<>(hm.keySet());
  Tuple max = new Tuple("a3", 3d);
  Tuple min = new Tuple("a1", 1d);

  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Long> r1 = p.zadd("myset", hm);
    Response<Long> r2 = p.zrank("myset", "a3");
    Response<Long> r3 = p.zrevrank("myset", "a3");
    Response<Long> r4 = p.zrem("myset", "a1");
    Response<Long> r5 = p.zadd("myset", 1d, "a1");
    Response<Long> r6 = p.zadd("myotherset", 2d, "a1", new ZAddParams().nx());
    Response<Double> r7 = p.zaddIncr("myset", 3d, "a4", new ZAddParams().xx()); // Should not update
    Response<List<String>> r8 = p.zrevrange("myset", 0, 0);
    Response<List<Tuple>> r9 = p.zrevrangeWithScores("myset", 0, 0);
    Response<String> r10 = p.zrandmember("myset");
    Response<List<String>> r11 = p.zrandmember("myset", 2);
    Response<List<Tuple>> r12 = p.zrandmemberWithScores("myset", 1);
    Response<Double> r13 = p.zscore("myset", "a1");
    Response<List<Double>> r14 = p.zmscore("myset", "a1", "a2");
    Response<Tuple> r15 = p.zpopmax("myset");
    Response<Tuple> r16 = p.zpopmin("myset");
    Response<Long> r17 = p.zcount("myotherset", 2, 5);
    Response<Long> r18 = p.zcount("myotherset", "(2", "5");
    p.zadd("myset", hm, new ZAddParams().nx()); // put back the members that were popped above
    Response<List<Tuple>> r19 = p.zpopmax("myset", 2);
    Response<List<Tuple>> r20 = p.zpopmin("myset", 1);
    p.sync();

    assertEquals(Long.valueOf(3), r1.get());
    assertEquals(Long.valueOf(2), r2.get());
    assertEquals(Long.valueOf(0), r3.get());
    assertEquals(Long.valueOf(1), r4.get());
    assertEquals(Long.valueOf(1), r5.get());
    assertEquals(Long.valueOf(1), r6.get());
    assertNull(r7.get());
    // Size and content are asserted separately so a failure names the part that is wrong.
    assertEquals(1, r8.get().size());
    assertTrue(r8.get().contains("a3"));
    assertEquals(1, r9.get().size());
    assertTrue(r9.get().contains(max));
    assertTrue(members.contains(r10.get()));
    assertTrue(members.containsAll(r11.get()));
    assertEquals(1, r12.get().size());
    assertEquals(Double.valueOf(1), r13.get());
    assertTrue(hm.values().containsAll(r14.get()));
    assertEquals(max, r15.get());
    assertEquals(min, r16.get());
    assertEquals(Long.valueOf(1), r17.get());
    assertEquals(Long.valueOf(0), r18.get());
    assertEquals(2, r19.get().size());
    assertTrue(r19.get().contains(max));
    assertEquals(1, r20.get().size());
    assertTrue(r20.get().contains(min));
  }
}
/**
 * Runs the hash commands (set, get, increment, delete, keys/values, random field, strlen)
 * through one pipeline against the keys {@code myhash}, {@code myotherhash} and
 * {@code mynewhash}.
 */
@Test
public void clusterPipelineHash() {
  Map<String, String> hm = new HashMap<>();
  hm.put("field2", "2");
  hm.put("field3", "5");
  Set<String> keys = Collections.singleton("field2");
  List<String> vals = Collections.singletonList("3.5");
  // "field2" is never written to "myhash", so hmget is expected to yield null for it.
  List<String> vals2 = Arrays.asList("hello", null);

  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Long> r1 = p.hset("myhash", "field1", "hello");
    Response<Long> r2 = p.hsetnx("myhash", "field1", "hello");
    Response<String> r3 = p.hget("myhash", "field1");
    Response<Long> r4 = p.hset("myotherhash", hm);
    Response<String> r5 = p.hmset("mynewhash", hm);
    p.hincrBy("mynewhash", "field2", 1);
    Response<Double> r6 = p.hincrByFloat("mynewhash", "field2", 0.5);
    Response<Long> r7 = p.hlen("myhash");
    Response<Long> r8 = p.hdel("mynewhash", "field3");
    Response<Boolean> r9 = p.hexists("mynewhash", "field3");
    Response<Set<String>> r10 = p.hkeys("mynewhash");
    Response<List<String>> r11 = p.hvals("mynewhash");
    Response<List<String>> r12 = p.hmget("myhash", "field1", "field2");
    Response<String> r13 = p.hrandfield("myotherhash");
    Response<List<String>> r14 = p.hrandfield("myotherhash", 4);
    Response<List<String>> r15 = p.hrandfield("myotherhash", -4);
    Response<Long> r16 = p.hstrlen("myhash", "field1");
    Response<List<Map.Entry<String, String>>> r17 = p.hrandfieldWithValues("myotherhash", 4);
    Response<List<Map.Entry<String, String>>> r18 = p.hrandfieldWithValues("myotherhash", -4);
    p.sync();

    assertEquals(Long.valueOf(1), r1.get());
    assertEquals(Long.valueOf(0), r2.get());
    assertEquals("hello", r3.get());
    assertEquals(Long.valueOf(2), r4.get());
    assertEquals("OK", r5.get());
    assertEquals(Double.valueOf(3.5), r6.get());
    assertEquals(Long.valueOf(1), r7.get());
    assertEquals(Long.valueOf(1), r8.get());
    assertFalse(r9.get());
    assertEquals(keys, r10.get());
    assertEquals(vals, r11.get());
    assertEquals(vals2, r12.get());
    AssertUtil.assertCollectionContains(hm.keySet(), r13.get());
    assertEquals(2, r14.get().size());
    assertEquals(4, r15.get().size());
    assertEquals(Long.valueOf(5), r16.get());
    assertEquals(2, r17.get().size());
    assertEquals(4, r18.get().size());
  }
}
/**
 * Runs the geo commands (add, dist, hash, pos, radius and radius-by-member with their read-only
 * and store variants) through one pipeline. The store variants work on keys that share the
 * hash tag <code>{#}</code>.
 */
@Test
public void clusterPipelineGeo() {
  Map<String, GeoCoordinate> hm = new HashMap<>();
  hm.put("place1", new GeoCoordinate(2.1909389952632, 41.433791470673));
  hm.put("place2", new GeoCoordinate(2.1873744593677, 41.406342043777));
  // "place3" is never added, hence the trailing null in the expected geohash reply.
  List<String> hashValues = Arrays.asList("sp3e9yg3kd0", "sp3e9cbc3t0", null);
  GeoRadiusParam params = new GeoRadiusParam().withCoord().withHash().withDist();
  GeoRadiusParam params2 = new GeoRadiusParam().count(1, true);
  GeoRadiusStoreParam storeParams = new GeoRadiusStoreParam().store("radius{#}");

  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Long> r1 = p.geoadd("barcelona", hm);
    p.geoadd("barcelona{#}", new GeoAddParams().nx(), hm);
    Response<Double> r2 = p.geodist("barcelona", "place1", "place2");
    Response<Double> r3 = p.geodist("barcelona", "place1", "place2", GeoUnit.KM);
    Response<List<String>> r4 = p.geohash("barcelona", "place1", "place2", "place3");
    Response<List<GeoCoordinate>> r5 = p.geopos("barcelona", "place1", "place2");
    Response<List<GeoRadiusResponse>> r6 = p.georadius("barcelona", 2.191, 41.433, 1000, GeoUnit.M);
    Response<List<GeoRadiusResponse>> r7 = p.georadiusReadonly("barcelona", 2.191, 41.433, 1000, GeoUnit.M);
    Response<List<GeoRadiusResponse>> r8 = p.georadius("barcelona", 2.191, 41.433, 1, GeoUnit.KM, params);
    Response<List<GeoRadiusResponse>> r9 = p.georadiusReadonly("barcelona", 2.191, 41.433, 1, GeoUnit.KM, params);
    Response<Long> r10 = p.georadiusStore("barcelona{#}", 2.191, 41.433, 1000, GeoUnit.M, params2, storeParams);
    Response<List<String>> r11 = p.zrange("radius{#}", 0, -1);
    Response<List<GeoRadiusResponse>> r12 = p.georadiusByMember("barcelona", "place1", 4, GeoUnit.KM);
    Response<List<GeoRadiusResponse>> r13 = p.georadiusByMemberReadonly("barcelona", "place1", 4, GeoUnit.KM);
    Response<List<GeoRadiusResponse>> r14 = p.georadiusByMember("barcelona", "place1", 4, GeoUnit.KM, params2);
    Response<List<GeoRadiusResponse>> r15 = p.georadiusByMemberReadonly("barcelona", "place1", 4, GeoUnit.KM, params2);
    Response<Long> r16 = p.georadiusByMemberStore("barcelona{#}", "place1", 4, GeoUnit.KM, params2, storeParams);
    Response<List<String>> r17 = p.zrange("radius{#}", 0, -1);
    p.sync();

    assertEquals(Long.valueOf(2), r1.get());
    assertEquals(Double.valueOf(3067.4157), r2.get());
    assertEquals(Double.valueOf(3.0674), r3.get());
    assertEquals(hashValues, r4.get());
    assertThat(r5.get(), contains(
        GeoCoordinateMatcher.atCoordinates(2.19093829393386841, 41.43379028184083523),
        GeoCoordinateMatcher.atCoordinates(2.18737632036209106, 41.40634178640635099))
    );

    // Size and first member are asserted separately so a failure names the part that is wrong.
    assertEquals(1, r6.get().size());
    assertEquals("place1", r6.get().get(0).getMemberByString());
    assertEquals(1, r7.get().size());
    assertEquals("place1", r7.get().get(0).getMemberByString());

    GeoRadiusResponse expectedResponse = new GeoRadiusResponse("place1".getBytes());
    expectedResponse.setCoordinate(new GeoCoordinate(2.19093829393386841, 41.43379028184083523));
    expectedResponse.setDistance(0.0881);
    expectedResponse.setRawScore(3471609698139488L);
    assertThat(r8.get().get(0), GeoRadiusResponseMatcher.ofResponse(expectedResponse));
    assertThat(r9.get().get(0), GeoRadiusResponseMatcher.ofResponse(expectedResponse));

    assertEquals(Long.valueOf(1), r10.get());
    assertEquals(1, r11.get().size());
    assertTrue(r11.get().contains("place1"));

    assertEquals(2, r12.get().size());
    assertEquals("place2", r12.get().get(0).getMemberByString());
    assertEquals(2, r13.get().size());
    assertEquals("place2", r13.get().get(0).getMemberByString());
    assertEquals(1, r14.get().size());
    assertEquals("place2", r14.get().get(0).getMemberByString());
    assertEquals(1, r15.get().size());
    assertEquals("place2", r15.get().get(0).getMemberByString());

    assertEquals(Long.valueOf(1), r16.get());
    assertEquals(1, r17.get().size());
    assertTrue(r17.get().contains("place2"));
  }
}
/**
 * Runs pfadd, pfcount and pfmerge through one pipeline on keys that share the hash tag
 * {@code {hll}}.
 */
@Test
public void clusterPipelineHyperLogLog() {
  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Long> r1 = p.pfadd("{hll}_1", "foo", "bar", "zap", "a");
    Response<Long> r2 = p.pfadd("{hll}_2", "foo", "bar", "zap");
    Response<Long> r3 = p.pfcount("{hll}_1", "{hll}_2");
    Response<String> r4 = p.pfmerge("{hll}3", "{hll}_1", "{hll}_2");
    Response<Long> r5 = p.pfcount("{hll}3");
    p.sync();

    assertEquals(Long.valueOf(1), r1.get());
    assertEquals(Long.valueOf(1), r2.get());
    assertEquals(Long.valueOf(4), r3.get());
    assertEquals("OK", r4.get());
    assertEquals(Long.valueOf(4), r5.get());
  }
}
/**
 * Runs string and bit commands (substr, strlen, bitcount, bitpos, bitfield, bitop, setbit,
 * getbit) through one pipeline.
 */
@Test
public void clusterPipelineStringsAndBits() {
  List<Long> fieldRes = Arrays.asList(1L, 0L);

  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<String> r1 = p.set("{mykey}", "foobar"); // foobar = 66 6f 6f 62 61 72
    p.set("my{otherkey}", "foo");
    Response<String> r2 = p.substr("{mykey}", 0, 2);
    Response<Long> r3 = p.strlen("{mykey}");
    Response<Long> r4 = p.bitcount("my{otherkey}");
    Response<Long> r5 = p.bitcount("my{otherkey}", 1, 1);
    Response<Long> r6 = p.bitpos("{mykey}", true);
    Response<Long> r7 = p.bitpos("{mykey}", false, new BitPosParams(1, 2));
    Response<List<Long>> r8 = p.bitfield("mynew{key}", "INCRBY", "i5", "100", "1", "GET", "u4", "0");
    Response<List<Long>> r9 = p.bitfieldReadonly("hello", "GET", "i8", "17");
    p.set("myother{mykey}", "abcdef");
    Response<Long> r10 = p.bitop(BitOP.AND, "dest{mykey}", "{mykey}", "myother{mykey}");
    Response<String> r11 = p.get("dest{mykey}");
    Response<Boolean> r12 = p.setbit("my{otherkey}", 7, true);
    Response<Boolean> r13 = p.getbit("my{otherkey}", 7);
    p.sync();

    assertEquals("OK", r1.get());
    assertEquals("foo", r2.get());
    assertEquals(Long.valueOf(6), r3.get());
    assertEquals(Long.valueOf(16), r4.get());
    assertEquals(Long.valueOf(6), r5.get());
    assertEquals(Long.valueOf(1), r6.get());
    assertEquals(Long.valueOf(8), r7.get());
    assertEquals(fieldRes, r8.get());
    // bitfieldReadonly queued a single GET, so only one value (0) is expected.
    assertEquals(fieldRes.subList(1, 2), r9.get());
    assertEquals(Long.valueOf(6), r10.get());
    assertEquals("`bc`ab", r11.get());
    assertFalse(r12.get());
    assertTrue(r13.get());
  }
}
/**
 * Runs a subset of the stream commands (xadd, xlen, xrange, xrevrange, xgroupCreate,
 * xgroupSetID) through one pipeline against the key {@code mystream}.
 */
@Test
public void clusterPipelineStream() {
  Map<String, String> hm = new HashMap<>();
  hm.put("one", "one");
  hm.put("two", "two");
  hm.put("three", "three");
  StreamEntryID streamId1 = new StreamEntryID("1638277876711-0");
  StreamEntryID streamId2 = new StreamEntryID("1638277959731-0");

  // Provider and pipeline are both managed resources so neither leaks after the test.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<StreamEntryID> r1 = p.xadd("mystream", streamId1, hm);
    Response<StreamEntryID> r2 = p.xadd("mystream", new XAddParams().id(new StreamEntryID("1638277959731-0")).maxLen(2).approximateTrimming(), hm);
    Response<Long> r3 = p.xlen("mystream");
    Response<List<StreamEntry>> r4 = p.xrange("mystream", streamId1, streamId2);
    Response<List<StreamEntry>> r5 = p.xrange("mystream", streamId1, streamId2, 1);
    Response<List<StreamEntry>> r6 = p.xrevrange("mystream", streamId2, streamId1);
    Response<List<StreamEntry>> r7 = p.xrevrange("mystream", streamId2, streamId1, 1);
    Response<String> r8 = p.xgroupCreate("mystream", "group", streamId1, false);
    Response<String> r9 = p.xgroupSetID("mystream", "group", streamId2);
    // More stream commands are missing
    p.sync();

    assertEquals(streamId1, r1.get());
    assertEquals(streamId2, r2.get());
    assertEquals(Long.valueOf(2), r3.get());

    // Size and entry ids are asserted separately so a failure names the part that is wrong.
    assertEquals(2, r4.get().size());
    assertEquals(0, r4.get().get(0).getID().compareTo(streamId1));
    assertEquals(0, r4.get().get(1).getID().compareTo(streamId2));

    assertEquals(1, r5.get().size());
    assertEquals(0, r5.get().get(0).getID().compareTo(streamId1));

    // xrevrange yields the entries newest first.
    assertEquals(2, r6.get().size());
    assertEquals(0, r6.get().get(1).getID().compareTo(streamId1));
    assertEquals(0, r6.get().get(0).getID().compareTo(streamId2));

    assertEquals(1, r7.get().size());
    assertEquals(0, r7.get().get(0).getID().compareTo(streamId2));

    assertEquals("OK", r8.get());
    assertEquals("OK", r9.get());
  }
}
/** Evaluates a script that takes no keys through the pipeline and checks its string reply. */
@Test
public void testEval() {
  String script = "return 'success!'";
  // The pipeline is declared as a resource too, so it is closed before the provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Object> result = p.eval(script);
    p.sync();
    assertEquals("success!", result.get());
  }
}
/**
 * Binary variant of the key-less EVAL test: the script is sent as bytes and the reply is
 * compared as a byte array.
 */
@Test
public void testEvalWithBinary() {
  String script = "return 'success!'";
  // The pipeline is declared as a resource too, so it is closed before the provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Object> result = p.eval(SafeEncoder.encode(script));
    p.sync();
    assertArrayEquals(SafeEncoder.encode("success!"), (byte[]) result.get());
  }
}
/**
 * Interleaves SET, EVAL and INCR on one key inside a single pipeline. The script applies
 * INCRBY ARGV[1] to KEYS[1] twice, so the value goes 0 -> 6 -> 7 (INCR) -> 13. The script has
 * no return statement, and both EVAL replies are expected to be null.
 */
@Test
public void testEvalKeyAndArg() {
  String key = "test";
  String arg = "3";
  String script = "redis.call('INCRBY', KEYS[1], ARGV[1]) redis.call('INCRBY', KEYS[1], ARGV[1])";
  // The pipeline is declared as a resource too, so it is closed before the provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    p.set(key, "0");
    Response<Object> result0 = p.eval(script, Collections.singletonList(key),
        Collections.singletonList(arg));
    p.incr(key);
    Response<Object> result1 = p.eval(script, Collections.singletonList(key),
        Collections.singletonList(arg));
    Response<String> result2 = p.get(key);
    p.sync();
    assertNull(result0.get());
    assertNull(result1.get());
    assertEquals("13", result2.get());
  }
}
/**
 * Binary variant of the SET / EVAL / INCR interleaving test: key, argument and script are all
 * passed as byte arrays. The script applies INCRBY ARGV[1] to KEYS[1] twice, so the value goes
 * 0 -> 6 -> 7 (INCR) -> 13; both EVAL replies are expected to be null.
 */
@Test
public void testEvalKeyAndArgWithBinary() {
  byte[] bKey = SafeEncoder.encode("test");
  byte[] bArg = SafeEncoder.encode("3");
  byte[] bScript = SafeEncoder.encode("redis.call('INCRBY', KEYS[1], ARGV[1]) redis.call('INCRBY', KEYS[1], ARGV[1])");
  // The pipeline is declared as a resource too, so it is closed before the provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline bP = new ClusterPipeline(provider)) {
    bP.set(bKey, SafeEncoder.encode("0"));
    Response<Object> bResult0 = bP.eval(bScript, Collections.singletonList(bKey),
        Collections.singletonList(bArg));
    bP.incr(bKey);
    Response<Object> bResult1 = bP.eval(bScript, Collections.singletonList(bKey),
        Collections.singletonList(bArg));
    Response<byte[]> bResult2 = bP.get(bKey);
    bP.sync();
    assertNull(bResult0.get());
    assertNull(bResult1.get());
    assertArrayEquals(SafeEncoder.encode("13"), bResult2.get());
  }
}
/**
 * Evaluates a script returning a two-element list of lists and checks that the first inner
 * list contains the key name and the second contains the integer 2 (as a {@code Long}).
 */
@Test
public void testEvalNestedLists() {
  String script = "return { {KEYS[1]} , {2} }";
  // The pipeline is declared as a resource too, so it is closed before the provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Object> result = p.eval(script, 1, "key1");
    p.sync();
    List<?> results = (List<?>) result.get();
    // The reply is untyped; the element types follow from the script above.
    @SuppressWarnings("unchecked")
    List<String> keyList = (List<String>) results.get(0);
    @SuppressWarnings("unchecked")
    List<Long> numberList = (List<Long>) results.get(1);
    MatcherAssert.assertThat(keyList, Matchers.hasItem("key1"));
    MatcherAssert.assertThat(numberList, Matchers.hasItem(2L));
  }
}
/**
 * Binary variant of the nested-list EVAL test: script and key are sent as bytes, and the first
 * inner list is expected to contain the key bytes while the second contains 2 (as a
 * {@code Long}).
 */
@Test
public void testEvalNestedListsWithBinary() {
  byte[] bScript = SafeEncoder.encode("return { {KEYS[1]} , {2} }");
  byte[] bKey = SafeEncoder.encode("key1");
  // The pipeline is declared as a resource too, so it is closed before the provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Object> result = p.eval(bScript, 1, bKey);
    p.sync();
    List<?> results = (List<?>) result.get();
    // The reply is untyped; the element types follow from the script above.
    @SuppressWarnings("unchecked")
    List<byte[]> keyList = (List<byte[]>) results.get(0);
    @SuppressWarnings("unchecked")
    List<Long> numberList = (List<Long>) results.get(1);
    MatcherAssert.assertThat(keyList, Matchers.hasItem(bKey));
    MatcherAssert.assertThat(numberList, Matchers.hasItem(2L));
  }
}
/**
 * Loads a script through {@link JedisCluster} using a sample key, then invokes it by SHA-1
 * through the pipeline with that same key and checks the string reply.
 */
@Test
public void testEvalsha() {
  String script = "return 'success!'";
  String sha1;
  try (JedisCluster jc = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
    sha1 = jc.scriptLoad(script, "sampleKey");
    assertTrue(jc.scriptExists(sha1, "sampleKey"));
  }
  // The pipeline is declared as a resource too, so it is closed before the provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    Response<Object> result = p.evalsha(sha1, 1, "sampleKey");
    p.sync();
    assertEquals("success!", result.get());
  }
}
/**
 * EVALSHA counterpart of the SET / EVAL / INCR interleaving test. The script is loaded through
 * {@link JedisCluster} with the test key, then run twice by SHA-1 inside the pipeline around
 * an INCR, so the value goes 0 -> 6 -> 7 -> 13. Both EVALSHA replies are expected to be null.
 */
@Test
public void testEvalshaKeyAndArg() {
  String key = "test";
  String arg = "3";
  String script = "redis.call('INCRBY', KEYS[1], ARGV[1]) redis.call('INCRBY', KEYS[1], ARGV[1])";
  String sha1;
  try (JedisCluster jc = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
    sha1 = jc.scriptLoad(script, key);
    assertTrue(jc.scriptExists(sha1, key));
  }
  // The pipeline is declared as a resource too, so it is closed before the provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    p.set(key, "0");
    Response<Object> result0 = p.evalsha(sha1, Arrays.asList(key), Arrays.asList(arg));
    p.incr(key);
    Response<Object> result1 = p.evalsha(sha1, Arrays.asList(key), Arrays.asList(arg));
    Response<String> result2 = p.get(key);
    p.sync();
    assertNull(result0.get());
    assertNull(result1.get());
    assertEquals("13", result2.get());
  }
}
/**
 * Binary EVALSHA counterpart of the SET / EVAL / INCR interleaving test: script, SHA-1, key and
 * argument are all byte arrays. The value goes 0 -> 6 -> 7 -> 13 and both EVALSHA replies are
 * expected to be null.
 */
@Test
public void testEvalshaKeyAndArgWithBinary() {
  byte[] bKey = SafeEncoder.encode("test");
  byte[] bArg = SafeEncoder.encode("3");
  String script = "redis.call('INCRBY', KEYS[1], ARGV[1]) redis.call('INCRBY', KEYS[1], ARGV[1])";
  byte[] bScript = SafeEncoder.encode(script);
  byte[] bSha1;
  try (JedisCluster jc = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
    bSha1 = jc.scriptLoad(bScript, bKey);
    assertTrue(jc.scriptExists(bSha1, bKey));
  }
  // The pipeline is declared as a resource too, so it is closed before the provider.
  try (ClusterConnectionProvider provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline p = new ClusterPipeline(provider)) {
    p.set(bKey, SafeEncoder.encode("0"));
    Response<Object> result0 = p.evalsha(bSha1, Arrays.asList(bKey), Arrays.asList(bArg));
    p.incr(bKey);
    Response<Object> result1 = p.evalsha(bSha1, Arrays.asList(bKey), Arrays.asList(bArg));
    Response<byte[]> result2 = p.get(bKey);
    p.sync();
    assertNull(result0.get());
    assertNull(result1.get());
    assertArrayEquals(SafeEncoder.encode("13"), result2.get());
  }
}
/**
 * Publishes to the same channel through the String and the byte[] overloads of the cluster
 * pipeline and expects both replies to be 0.
 */
@Test
public void spublishInPipeline() {
  // The pipeline is a resource as well, so it is closed before the cluster client.
  try (JedisCluster jedis = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG);
      ClusterPipeline pipelined = jedis.pipelined()) {
    Response<Long> p1 = pipelined.publish("foo", "bar");
    // SafeEncoder gives a fixed encoding instead of the platform default of String.getBytes().
    Response<Long> p2 = pipelined.publish(SafeEncoder.encode("foo"), SafeEncoder.encode("bar"));
    pipelined.sync();
    assertEquals(0, p1.get().longValue());
    assertEquals(0, p2.get().longValue());
  }
}
/**
 * Seeds ten string keys and ten list keys through the regular client, reads all of them back
 * through one pipeline, and compares every reply with the value that was written.
 */
@Test
public void simple() { // TODO: move into 'redis.clients.jedis.commands.unified.cluster' package
  try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
    final int perType = 10;
    final int total = perType + perType;

    // Seed the string keys first, then the list keys.
    for (int n = 0; n < perType; n++) {
      cluster.set("foo" + n, "bar" + n);
    }
    for (int n = 0; n < perType; n++) {
      cluster.rpush("foobar" + n, "foo" + n, "bar" + n);
    }

    List<Response<?>> actual = new ArrayList<>(total);
    List<Object> wanted = new ArrayList<>(total);

    // No explicit sync() here: the replies are only read after the try block has closed
    // the pipeline.
    try (ClusterPipeline pipe = cluster.pipelined()) {
      for (int n = 0; n < perType; n++) {
        wanted.add("bar" + n);
        actual.add(pipe.get("foo" + n));
      }
      for (int n = 0; n < perType; n++) {
        wanted.add(Arrays.asList("foo" + n, "bar" + n));
        actual.add(pipe.lrange("foobar" + n, 0, -1));
      }
    }

    for (int n = 0; n < total; n++) {
      Object want = wanted.get(n);
      Object got = actual.get(n).get();
      assertEquals(want, got);
    }
  }
}
/** Calling {@code multi()} on a {@link JedisCluster} must throw UnsupportedOperationException. */
@Test
public void transaction() {
  try (JedisCluster jedisCluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
    assertThrows(UnsupportedOperationException.class, jedisCluster::multi);
  }
}
/**
 * Opens, uses and closes one pipeline per iteration, as many times as the pool's configured
 * maximum, checking the "pool-" thread count before and after each pipeline. The whole run
 * must finish within the {@code @Timeout}.
 */
@Test
@Timeout(10)
public void multiple() {
  final int iterations = 100;
  ConnectionPoolConfig config = new ConnectionPoolConfig();
  config.setMaxTotal(iterations);
  try (JedisCluster jedisCluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, config)) {
    int round = 0;
    while (round < iterations) {
      assertThreadsCount();
      // The same text serves as key and value.
      String keyAndValue = String.valueOf(round);
      try (ClusterPipeline pipe = jedisCluster.pipelined()) {
        pipe.set(keyAndValue, keyAndValue);
        pipe.sync();
      }
      assertThreadsCount();
      round++;
    }
  }
}
/**
 * Reads keys back through pipelines in two scenarios: a single plain key, and several keys
 * that share the "{foo}" hash tag combined with an EVAL that reads the plain key "foo".
 */
@Test
public void testPipelineKeysAtSameNode() {
  try (JedisCluster jedisCluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
    // Scenario 1: one simple key.
    jedisCluster.set("foo", "bar");
    try (ClusterPipeline pipe = jedisCluster.pipelined()) {
      Response<String> plain = pipe.get("foo");
      pipe.sync();
      assertEquals("bar", plain.get());
    }

    // Scenario 2: multiple keys, all at the same node.
    final int keyCount = 3;
    final String hashTag = "{foo}:";
    List<String> taggedKeys = new ArrayList<>();
    for (int n = 0; n < keyCount; n++) {
      taggedKeys.add(hashTag + n);
    }
    for (int n = 0; n < keyCount; n++) {
      jedisCluster.set(taggedKeys.get(n), String.valueOf(n));
    }

    try (ClusterPipeline pipe = jedisCluster.pipelined()) {
      List<Response<String>> pending = new ArrayList<>();
      for (String taggedKey : taggedKeys) {
        pending.add(pipe.get(taggedKey));
      }
      Response<Object> viaScript = pipe.eval("return redis.call('get', KEYS[1])",
          Collections.singletonList("foo"), Collections.emptyList());
      pipe.sync();

      // Each tagged key was written with its own index as the value.
      for (int n = 0; n < pending.size(); n++) {
        assertEquals(String.valueOf(n), pending.get(n).get());
      }
      assertEquals("bar", String.valueOf(viaScript.get()));
    }
  }
}
/**
 * Asserts that at most 20 live threads whose name starts with "pool-" exist in the JVM.
 *
 * <p>The threads are enumerated from the top-most thread group, found by walking the parent
 * chain until it ends. Taking only the immediate parent would throw a NullPointerException when
 * the current group has no parent, and would miss sibling groups when it is nested deeper.
 */
private static void assertThreadsCount() {
  // Climb to the real root of the thread-group hierarchy.
  ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
  while (rootGroup.getParent() != null) {
    rootGroup = rootGroup.getParent();
  }

  // activeCount() is only an estimate and enumerate() silently drops threads that do not fit,
  // so grow the buffer until enumerate() leaves at least one free slot.
  Thread[] threads = new Thread[rootGroup.activeCount() + 1];
  int found = rootGroup.enumerate(threads);
  while (found == threads.length) {
    threads = new Thread[threads.length * 2];
    found = rootGroup.enumerate(threads);
  }

  // Only the first 'found' slots are populated.
  final int count = (int) Arrays.stream(threads, 0, found)
      .filter(thread -> thread.getName().startsWith("pool-"))
      .count();
  MatcherAssert.assertThat(count, Matchers.lessThanOrEqualTo(20));
}
}