// JedisClusterTest.java
package redis.clients.jedis;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static redis.clients.jedis.Protocol.CLUSTER_HASHSLOTS;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import io.redis.test.annotations.SinceRedisVersion;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import redis.clients.jedis.args.ClusterResetType;
import redis.clients.jedis.exceptions.*;
import redis.clients.jedis.util.ClientKillerUtil;
import redis.clients.jedis.util.JedisClusterTestUtil;
import redis.clients.jedis.util.JedisClusterCRC16;
import redis.clients.jedis.util.Pool;
public class JedisClusterTest extends JedisClusterTestBase {
// Timeout handed to the JedisCluster constructors and used as the pool max-wait. The value is in
// milliseconds (it is fed to Duration.ofMillis and to @Timeout with TimeUnit.MILLISECONDS).
private static final int DEFAULT_TIMEOUT = 2000; // milliseconds
// Redirection / attempt limit handed to the JedisCluster constructors in these tests.
private static final int DEFAULT_REDIRECTIONS = 5;
// Pool configuration with library defaults, shared by tests that do not need a custom pool.
private static final ConnectionPoolConfig DEFAULT_POOL_CONFIG = new ConnectionPoolConfig();
// Client configuration that only sets the password ("cluster") used by the test cluster.
private static final DefaultJedisClientConfig DEFAULT_CLIENT_CONFIG
= DefaultJedisClientConfig.builder().password("cluster").build();
/** Expects a direct write of key {@code "foo"} on node1 to be answered with a MOVED error. */
@Test
public void testThrowMovedException() {
  assertThrows(
      JedisMovedDataException.class,
      () -> node1.set("foo", "bar"));
}
/**
 * Verifies the slot and target node carried by the MOVED redirection raised when key
 * {@code "foo"} is written directly on node1.
 */
@Test
public void testMovedExceptionParameters() {
  // assertThrows fails the test when nothing is thrown and hands back the exception for
  // inspection, replacing the manual try / return / fail() dance.
  JedisMovedDataException moved =
      assertThrows(JedisMovedDataException.class, () -> node1.set("foo", "bar"));
  assertEquals(12182, moved.getSlot());
  assertEquals(new HostAndPort("127.0.0.1", 7381), moved.getTargetNode());
}
/**
 * Marks the slot of key {@code "test"} as migrating from node2 to node3 and expects a direct read
 * on node2 to be answered with an ASK error.
 */
@Test
public void testThrowAskException() {
  final int slot = JedisClusterCRC16.getSlot("test");
  final String destinationId = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
  node2.clusterSetSlotMigrating(slot, destinationId);

  assertThrows(JedisAskDataException.class, () -> node2.get("test"));
}
/**
 * Starting from a single seed node, the client must report three cluster nodes, both when the
 * seed is given as a set and when it is given as a single address.
 */
@Test
public void testDiscoverNodesAutomatically() {
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(new HostAndPort("127.0.0.1", 7379));

  try (JedisCluster fromSet = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    final int discovered = fromSet.getClusterNodes().size();
    assertEquals(3, discovered);
  }

  final HostAndPort singleSeed = new HostAndPort("127.0.0.1", 7379);
  try (JedisCluster fromSingle = new JedisCluster(singleSeed, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    final int discovered = fromSingle.getClusterNodes().size();
    assertEquals(3, discovered);
  }
}
/**
 * Same discovery check as the timeout-based variant, but using the client-config constructors:
 * one seed node must lead to three known cluster nodes.
 */
@Test
public void testDiscoverNodesAutomaticallyWithSocketConfig() {
  final HostAndPort seed = new HostAndPort("127.0.0.1", 7379);

  // Single-address constructor.
  try (JedisCluster fromSingle = new JedisCluster(seed, DEFAULT_CLIENT_CONFIG,
      DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG)) {
    final int discovered = fromSingle.getClusterNodes().size();
    assertEquals(3, discovered);
  }

  // Set-of-addresses constructor.
  final Set<HostAndPort> seeds = Collections.singleton(seed);
  try (JedisCluster fromSet = new JedisCluster(seeds, DEFAULT_CLIENT_CONFIG,
      DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG)) {
    final int discovered = fromSet.getClusterNodes().size();
    assertEquals(3, discovered);
  }
}
/**
 * Checks that the client name passed to the constructor is reported by {@code CLIENT GETNAME} on
 * a connection borrowed from every node pool.
 */
@Test
public void testSetClientName() {
  final String expectedName = "myAppName";
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(new HostAndPort("127.0.0.1", 7379));

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", expectedName, DEFAULT_POOL_CONFIG)) {
    jc.getClusterNodes().values().forEach(pool -> {
      try (Jedis jedis = new Jedis(pool.getResource())) {
        assertEquals(expectedName, jedis.clientGetname());
      }
    });
  }
}
/**
 * Checks that a client name set through {@link DefaultJedisClientConfig} is reported by
 * {@code CLIENT GETNAME} on a connection borrowed from every node pool.
 */
@Test
public void testSetClientNameWithConfig() {
  final String expectedName = "config-pattern-app";
  final HostAndPort seed = new HostAndPort("127.0.0.1", 7379);
  final DefaultJedisClientConfig namedConfig = DefaultJedisClientConfig.builder()
      .password("cluster")
      .clientName(expectedName)
      .build();

  try (JedisCluster jc = new JedisCluster(Collections.singleton(seed), namedConfig,
      DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG)) {
    for (Pool<Connection> pool : jc.getClusterNodes().values()) {
      try (Jedis jedis = new Jedis(pool.getResource())) {
        assertEquals(expectedName, jedis.clientGetname());
      }
    }
  }
}
/**
 * Writes two keys through the cluster client and reads them back over direct node connections:
 * {@code "foo"} is expected on node3 and {@code "test"} on node2, for both seed constructors.
 */
@Test
public void testCalculateConnectionPerSlot() {
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(new HostAndPort("127.0.0.1", 7379));

  try (JedisCluster fromSet = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    fromSet.set("foo", "bar");
    fromSet.set("test", "test");

    final String fooOnNode3 = node3.get("foo");
    assertEquals("bar", fooOnNode3);
    final String testOnNode2 = node2.get("test");
    assertEquals("test", testOnNode2);
  }

  final HostAndPort singleSeed = new HostAndPort("127.0.0.1", 7379);
  try (JedisCluster fromSingle = new JedisCluster(singleSeed, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    fromSingle.set("foo", "bar");
    fromSingle.set("test", "test");

    final String fooOnNode3 = node3.get("foo");
    assertEquals("bar", fooOnNode3);
    final String testOnNode2 = node2.get("test");
    assertEquals("test", testOnNode2);
  }
}
/**
 * Attaches {@code nodeSlave2} as a replica of node2 and checks that reads on the replica are
 * redirected with MOVED unless the connection has been switched to READONLY.
 *
 * @throws Exception if waiting for the cluster to become ready fails
 */
@Test
public void testReadonlyAndReadwrite() throws Exception {
  try {
    node1.clusterMeet(LOCAL_IP, nodeInfoSlave2.getPort());
    JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, nodeSlave2);

    // The "myself" line of node2's CLUSTER NODES output starts with node2's own id.
    for (String nodeInfo : node2.clusterNodes().split("\n")) {
      if (nodeInfo.contains("myself")) {
        nodeSlave2.clusterReplicate(nodeInfo.split(" ")[0]);
        break;
      }
    }

    // Default (read-write) mode: the replica redirects the read.
    assertThrows(JedisMovedDataException.class, () -> nodeSlave2.get("test"));

    // READONLY mode: the read must be served without a redirection.
    nodeSlave2.readonly();
    nodeSlave2.get("test");

    // Back to read-write mode: the replica redirects again.
    nodeSlave2.readwrite();
    assertThrows(JedisMovedDataException.class, () -> nodeSlave2.get("test"));
  } finally {
    // Always detach and wipe the replica so a failure here cannot leak state into other tests.
    nodeSlave2.clusterReset(ClusterResetType.SOFT);
    nodeSlave2.flushDB();
  }
}
/**
 * Attaches {@code nodeSlave2} as a replica of node2 and checks that a value written through the
 * cluster client can be read back with {@code executeCommandToReplica} when the client is
 * configured with {@code readOnlyForRedisClusterReplicas()}.
 *
 * @throws Exception if waiting for the cluster to become ready fails
 */
@Test
public void testReadFromReplicas() throws Exception {
  try {
    node1.clusterMeet(LOCAL_IP, nodeInfoSlave2.getPort());
    JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, nodeSlave2);

    // The "myself" line of node2's CLUSTER NODES output starts with node2's own id.
    for (String nodeInfo : node2.clusterNodes().split("\n")) {
      if (nodeInfo.contains("myself")) {
        nodeSlave2.clusterReplicate(nodeInfo.split(" ")[0]);
        break;
      }
    }

    DefaultJedisClientConfig readReplicasClientConfig = DefaultJedisClientConfig.builder()
        .password("cluster").readOnlyForRedisClusterReplicas().build();
    ClusterCommandObjects commandObjects = new ClusterCommandObjects();
    try (JedisCluster jedisCluster = new JedisCluster(nodeInfo1, readReplicasClientConfig,
        DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG)) {
      assertEquals("OK", jedisCluster.set("test", "read-from-replicas"));
      assertEquals("read-from-replicas",
          jedisCluster.executeCommandToReplica(commandObjects.get("test")));
      // TODO: ensure data being served from replica node(s)
    }
  } finally {
    // Always detach and wipe the replica so a failure here cannot leak state into other tests.
    nodeSlave2.clusterReset(ClusterResetType.SOFT);
    nodeSlave2.flushDB();
  }
}
/**
 * Migrates slot 15363 (the slot of key {@code "e"}) from node3 to node2 and checks the
 * redirections seen on direct node connections while the migration is in progress: node2
 * (importing) answers MOVED pointing at node3, node3 (migrating) answers ASK pointing at node2,
 * and the cluster client keeps working throughout.
 */
@Test
public void testMigrate() {
  Set<HostAndPort> jedisClusterNode = new HashSet<>();
  jedisClusterNode.add(nodeInfo1);
  try (JedisCluster jc = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
    String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes());
    node3.clusterSetSlotMigrating(15363, node2Id);
    node2.clusterSetSlotImporting(15363, node3Id);

    HostAndPort node3Address = new HostAndPort(LOCAL_IP, nodeInfo3.getPort());
    HostAndPort node2Address = new HostAndPort(LOCAL_IP, nodeInfo2.getPort());

    // assertThrows (instead of a bare try/catch) makes the test fail when no redirection happens.
    JedisMovedDataException movedOnSet =
        assertThrows(JedisMovedDataException.class, () -> node2.set("e", "e"));
    assertEquals(15363, movedOnSet.getSlot());
    assertEquals(node3Address, movedOnSet.getTargetNode());

    JedisAskDataException askOnSet =
        assertThrows(JedisAskDataException.class, () -> node3.set("e", "e"));
    assertEquals(15363, askOnSet.getSlot());
    assertEquals(node2Address, askOnSet.getTargetNode());

    jc.set("e", "e");

    JedisMovedDataException movedOnGet =
        assertThrows(JedisMovedDataException.class, () -> node2.get("e"));
    assertEquals(15363, movedOnGet.getSlot());
    assertEquals(node3Address, movedOnGet.getTargetNode());

    JedisAskDataException askOnGet =
        assertThrows(JedisAskDataException.class, () -> node3.get("e"));
    assertEquals(15363, askOnGet.getSlot());
    assertEquals(node2Address, askOnGet.getTargetNode());

    assertEquals("e", jc.get("e"));

    // Finish the migration: both nodes now agree that node2 owns the slot.
    node2.clusterSetSlotNode(15363, node2Id);
    node3.clusterSetSlotNode(15363, node2Id);
    // assertEquals("e", jc.get("e"));
    assertEquals("e", node2.get("e"));
    // assertEquals("e", node3.get("e"));
  }
}
/**
 * Joins node4 to the cluster, migrates slot 15363 (the slot of key {@code "e"}) from node3 to
 * node4 and checks the redirections seen on direct node connections while the migration is in
 * progress: node4 (importing) answers MOVED pointing at node3, node3 (migrating) answers ASK
 * pointing at node4, and the cluster client keeps working throughout.
 *
 * @throws InterruptedException if waiting for node4 to become ready is interrupted
 */
@Test
public void testMigrateToNewNode() throws InterruptedException {
  Set<HostAndPort> jedisClusterNode = new HashSet<>();
  jedisClusterNode.add(nodeInfo1);
  try (JedisCluster jc = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    node3.clusterMeet(LOCAL_IP, nodeInfo4.getPort());
    String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
    String node4Id = JedisClusterTestUtil.getNodeId(node4.clusterNodes());
    JedisClusterTestUtil.waitForClusterReady(node4);
    node3.clusterSetSlotMigrating(15363, node4Id);
    node4.clusterSetSlotImporting(15363, node3Id);

    HostAndPort node3Address = new HostAndPort(LOCAL_IP, nodeInfo3.getPort());
    HostAndPort node4Address = new HostAndPort(LOCAL_IP, nodeInfo4.getPort());

    // assertThrows (instead of a bare try/catch) makes the test fail when no redirection happens.
    JedisMovedDataException movedOnSet =
        assertThrows(JedisMovedDataException.class, () -> node4.set("e", "e"));
    assertEquals(15363, movedOnSet.getSlot());
    assertEquals(node3Address, movedOnSet.getTargetNode());

    // The original repeated this check twice verbatim; once is enough.
    JedisAskDataException askOnSet =
        assertThrows(JedisAskDataException.class, () -> node3.set("e", "e"));
    assertEquals(15363, askOnSet.getSlot());
    assertEquals(node4Address, askOnSet.getTargetNode());

    jc.set("e", "e");

    JedisMovedDataException movedOnGet =
        assertThrows(JedisMovedDataException.class, () -> node4.get("e"));
    assertEquals(15363, movedOnGet.getSlot());
    assertEquals(node3Address, movedOnGet.getTargetNode());

    JedisAskDataException askOnGet =
        assertThrows(JedisAskDataException.class, () -> node3.get("e"));
    assertEquals(15363, askOnGet.getSlot());
    assertEquals(node4Address, askOnGet.getTargetNode());

    assertEquals("e", jc.get("e"));

    // Finish the migration: both nodes now agree that node4 owns the slot.
    node4.clusterSetSlotNode(15363, node4Id);
    node3.clusterSetSlotNode(15363, node4Id);
    // assertEquals("e", jc.get("e"));
    assertEquals("e", node4.get("e"));
    // assertEquals("e", node3.get("e"));
  }
}
/**
 * Reassigns the slot of key {@code "51"} to node3 behind the client's back and checks that the
 * cluster client can still write and read that key.
 *
 * @throws InterruptedException if waiting for the cluster to become ready is interrupted
 */
@Test
public void testRecalculateSlotsWhenMoved() throws InterruptedException {
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(new HostAndPort("127.0.0.1", 7379));

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    final int movedSlot = JedisClusterCRC16.getSlot("51");

    // Drop the slot on node2 and node3, then hand it to node3.
    node2.clusterDelSlots(movedSlot);
    node3.clusterDelSlots(movedSlot);
    node3.clusterAddSlots(movedSlot);
    JedisClusterTestUtil.waitForClusterReady(node1, node2, node3);

    jc.set("51", "foo");
    final String stored = jc.get("51");
    assertEquals("foo", stored);
  }
}
/**
 * Puts the slot of key {@code "51"} into a node2-to-node3 migration (importing on node3,
 * migrating on node2) and checks that the cluster client can still write and read the key.
 */
@Test
public void testAskResponse() {
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(new HostAndPort("127.0.0.1", 7379));

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    final int migratingSlot = JedisClusterCRC16.getSlot("51");

    final String sourceId = JedisClusterTestUtil.getNodeId(node2.clusterNodes());
    node3.clusterSetSlotImporting(migratingSlot, sourceId);
    final String destinationId = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
    node2.clusterSetSlotMigrating(migratingSlot, destinationId);

    jc.set("51", "foo");
    final String stored = jc.get("51");
    assertEquals("foo", stored);
  }
}
/**
 * Client-config variant of the ASK test: with the slot of key {@code "51"} importing on node3 and
 * migrating on node2, the cluster client must still be able to write and read the key.
 */
@Test
public void testAskResponseWithConfig() {
  final Set<HostAndPort> seeds = Collections.singleton(new HostAndPort("127.0.0.1", 7379));

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_CLIENT_CONFIG, DEFAULT_REDIRECTIONS,
      DEFAULT_POOL_CONFIG)) {
    final int migratingSlot = JedisClusterCRC16.getSlot("51");

    final String sourceId = JedisClusterTestUtil.getNodeId(node2.clusterNodes());
    node3.clusterSetSlotImporting(migratingSlot, sourceId);
    final String destinationId = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
    node2.clusterSetSlotMigrating(migratingSlot, destinationId);

    jc.set("51", "foo");
    final String stored = jc.get("51");
    assertEquals("foo", stored);
  }
}
/**
 * Marks the slot of key {@code "51"} as migrating on node2 only, which sends the client into a
 * redirection loop, and expects the write to give up with a
 * {@link JedisClusterOperationException}.
 */
@Test
public void testRedisClusterMaxRedirections() {
  Set<HostAndPort> jedisClusterNode = new HashSet<>();
  jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
  try (JedisCluster jc = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    int slot51 = JedisClusterCRC16.getSlot("51");
    // This will cause an infinite redirection loop
    node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes()));
    // Only the write itself may satisfy the expectation; an exception raised while building the
    // client or preparing the slot must fail the test instead of passing it vacuously.
    assertThrows(JedisClusterOperationException.class, () -> jc.set("51", "foo"));
  }
}
/**
 * Client-config variant of the redirection-limit test: with the slot of key {@code "51"} marked
 * as migrating on node2 only, the write is expected to end in a
 * {@link JedisClusterOperationException}.
 */
@Test
public void testRedisClusterMaxRedirectionsWithConfig() {
  final Set<HostAndPort> seeds = Collections.singleton(new HostAndPort("127.0.0.1", 7379));

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_CLIENT_CONFIG, DEFAULT_REDIRECTIONS,
      DEFAULT_POOL_CONFIG)) {
    final int loopingSlot = JedisClusterCRC16.getSlot("51");
    final String destinationId = JedisClusterTestUtil.getNodeId(node3.clusterNodes());

    // This will cause an infinite redirection loop
    node2.clusterSetSlotMigrating(loopingSlot, destinationId);

    assertThrows(JedisClusterOperationException.class, () -> jc.set("51", "foo"));
  }
}
/**
 * Joins node4 to the three existing nodes, waits until all of them list it, then issues
 * {@code CLUSTER FORGET} on each and checks that node4 disappears from every node list.
 */
@Test
public void testClusterForgetNode() {
  final Jedis[] members = {node1, node2, node3};

  // at first, join node4 to cluster
  for (Jedis member : members) {
    member.clusterMeet("127.0.0.1", nodeInfo4.getPort());
  }
  final String forgottenId = JedisClusterTestUtil.getNodeId(node4.clusterNodes());

  for (Jedis member : members) {
    JedisClusterTestUtil.assertNodeIsKnown(member, forgottenId, 1000);
  }
  for (Jedis member : members) {
    assertNodeHandshakeEnded(member, 1000);
  }
  for (Jedis member : members) {
    assertEquals(4, member.clusterNodes().split("\n").length);
  }

  // do cluster forget
  for (Jedis member : members) {
    member.clusterForget(forgottenId);
  }
  for (Jedis member : members) {
    JedisClusterTestUtil.assertNodeIsUnknown(member, forgottenId, 1000);
  }
  for (Jedis member : members) {
    assertEquals(3, member.clusterNodes().split("\n").length);
  }
}
/**
 * Flushes all slots of node1, checks that it no longer reports a slot range, and finally gives
 * the original range back to node1.
 */
@Test
public void testClusterFlushSlots() {
  final String servedRange = getNodeServingSlotRange(node1.clusterNodes());
  assertNotNull(servedRange);

  try {
    node1.clusterFlushSlots();
    final String rangeAfterFlush = getNodeServingSlotRange(node1.clusterNodes());
    assertNull(rangeAfterFlush);
  } finally {
    // rollback: re-add every slot of the "<first>-<last>" range captured before the flush
    final String[] bounds = servedRange.split("-");
    final int first = Integer.parseInt(bounds[0]);
    final int last = Integer.parseInt(bounds[1]);
    final int[] restored = new int[last - first + 1];
    for (int offset = 0; offset < restored.length; offset++) {
      restored[offset] = first + offset;
    }
    node1.clusterAddSlots(restored);
  }
}
/**
 * Writes five keys sharing the hash tag {@code {bar}} through the cluster client and checks that
 * node1 reports five keys for the slot of {@code "foo{bar}"}.
 */
@Test
public void testClusterCountKeysInSlot() {
  final HostAndPort seed = new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort());
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(seed);

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    final int keyCount = 5;
    int written = 0;
    while (written < keyCount) {
      jc.set("foo{bar}" + written, "hello");
      written++;
    }

    final int taggedSlot = JedisClusterCRC16.getSlot("foo{bar}");
    assertEquals(keyCount, node1.clusterCountKeysInSlot(taggedSlot));
  }
}
/**
 * Sets only one side of a migration at a time (importing on node3, then migrating on node2) and
 * checks that key {@code "51"} stays readable through the cluster client once the slot has been
 * set back to stable.
 *
 * @throws InterruptedException declared by the original signature
 */
@Test
public void testStableSlotWhenMigratingNodeOrImportingNodeIsNotSpecified()
    throws InterruptedException {
  final HostAndPort seed = new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort());
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(seed);

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    final int slot = JedisClusterCRC16.getSlot("51");
    jc.set("51", "foo");

    // node2 is responsible of taking care of slot51 (7186)
    final String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes());
    node3.clusterSetSlotImporting(slot, node2Id);
    assertEquals("foo", jc.get("51"));
    node3.clusterSetSlotStable(slot);
    assertEquals("foo", jc.get("51"));

    final String node3Id = JedisClusterTestUtil.getNodeId(node3.clusterNodes());
    node2.clusterSetSlotMigrating(slot, node3Id);
    // assertEquals("foo", jc.get("51")); // it leads Max Redirections
    node2.clusterSetSlotStable(slot);
    assertEquals("foo", jc.get("51"));
  }
}
/**
 * Builds a cluster client with a pool whose {@code maxTotal} is 0 and expects a command to fail
 * with a {@link JedisException}, showing that the supplied pool configuration is applied.
 */
@Test
public void testIfPoolConfigAppliesToClusterPools() {
  final GenericObjectPoolConfig<Connection> emptyPool = new GenericObjectPoolConfig<>();
  emptyPool.setMaxTotal(0);
  emptyPool.setMaxWait(Duration.ofMillis(DEFAULT_TIMEOUT));

  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(new HostAndPort("127.0.0.1", 7379));

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", emptyPool)) {
    assertThrows(JedisException.class, () -> jc.set("52", "poolTestValue"));
  }
}
/**
 * Checks that closing the cluster client leaves it with no node pools.
 *
 * @throws IOException declared by the original signature
 */
@Test
public void testCloseable() throws IOException {
  final HostAndPort seed = new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort());
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(seed);

  final JedisCluster cluster = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG);
  cluster.set("51", "foo");
  cluster.close();

  final int remainingPools = cluster.getClusterNodes().size();
  assertEquals(0, remainingPools);
}
/**
 * Client-config variant of the close test: after an explicit {@code close()} the client reports
 * no node pools. The surrounding try-with-resources closes the client a second time on exit.
 */
@Test
public void testCloseableWithConfig() {
  final HostAndPort seed = nodeInfo1;

  try (JedisCluster cluster = new JedisCluster(seed, DEFAULT_CLIENT_CONFIG,
      DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG)) {
    cluster.set("51", "foo");
    cluster.close();

    final int remainingPools = cluster.getClusterNodes().size();
    assertEquals(0, remainingPools);
  }
}
/**
 * Checks that the socket timeout passed to the constructor (4000) is the one reported by a
 * connection borrowed from every node pool.
 */
@Test
public void testJedisClusterTimeout() {
  final HostAndPort seed = new HostAndPort(nodeInfo1.getHost(), nodeInfo1.getPort());
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(seed);

  try (JedisCluster jc = new JedisCluster(seeds, 4000, 4000, DEFAULT_REDIRECTIONS,
      "cluster", DEFAULT_POOL_CONFIG)) {
    jc.getClusterNodes().values().forEach(pool -> {
      try (Connection borrowed = pool.getResource()) {
        assertEquals(4000, borrowed.getSoTimeout());
      }
    });
  }
}
/**
 * Client-config variant of the timeout test: a socket timeout of 4000 set on the config must be
 * reported by a connection borrowed from every node pool.
 */
@Test
public void testJedisClusterTimeoutWithConfig() {
  final HostAndPort seed = nodeInfo1;
  final DefaultJedisClientConfig timeoutConfig = DefaultJedisClientConfig.builder()
      .connectionTimeoutMillis(4000)
      .socketTimeoutMillis(4000)
      .password("cluster")
      .build();

  try (JedisCluster jc = new JedisCluster(seed, timeoutConfig, DEFAULT_REDIRECTIONS,
      DEFAULT_POOL_CONFIG)) {
    for (Pool<Connection> pool : jc.getClusterNodes().values()) {
      try (Connection borrowed = pool.getResource()) {
        assertEquals(4000, borrowed.getSoTimeout());
      }
    }
  }
}
/**
 * Reads the same key through one shared cluster client from 50 concurrently submitted tasks and
 * checks that every task sees the stored value.
 *
 * @throws InterruptedException if waiting for a task result is interrupted
 * @throws ExecutionException if one of the submitted reads fails
 * @throws IOException declared by the original signature
 */
@Test
public void testJedisClusterRunsWithMultithreaded() throws InterruptedException,
    ExecutionException, IOException {
  Set<HostAndPort> jedisClusterNode = new HashSet<>();
  jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
  try (JedisCluster jc = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    jc.set("foo", "bar");

    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 100, 0, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(10));
    try {
      // FIXME : invalidate slot cache from JedisCluster to test
      // random connection also does work
      Callable<String> readFoo = () -> jc.get("foo");

      List<Future<String>> futures = new ArrayList<>();
      for (int i = 0; i < 50; i++) {
        // Keep every future: the original dropped them, so the loop below never asserted anything.
        futures.add(executor.submit(readFoo));
      }

      for (Future<String> future : futures) {
        assertEquals("bar", future.get());
      }
    } finally {
      // Stop the worker threads before the client is closed, on success and on failure alike.
      executor.shutdownNow();
    }
  }
}
/**
 * With a pool limited to one connection per node, kills the pooled connection to 127.0.0.1:7380
 * on the server side and then issues a read through the cluster client; the test must finish
 * within the configured timeout.
 *
 * @throws InterruptedException declared by the original signature
 */
@Test
@Timeout(value = DEFAULT_TIMEOUT * 2, unit = TimeUnit.MILLISECONDS)
public void testReturnConnectionOnJedisConnectionException() throws InterruptedException {
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(new HostAndPort("127.0.0.1", 7379));

  final ConnectionPoolConfig singleConnectionPool = new ConnectionPoolConfig();
  singleConnectionPool.setMaxTotal(1);

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", singleConnectionPool)) {
    try (Connection pooled = jc.getClusterNodes().get("127.0.0.1:7380").getResource()) {
      final Jedis victim = new Jedis(pooled);
      ClientKillerUtil.tagClient(victim, "DEAD");
      ClientKillerUtil.killClient(victim, "DEAD");
    }

    jc.get("test");
  }
}
/**
 * With a pool limited to one connection per node, forces a redirection loop for key {@code "e"}
 * and expects the read to fail with a {@link JedisClusterOperationException} within the timeout.
 */
@Test
@Timeout(value = DEFAULT_TIMEOUT, unit = TimeUnit.MILLISECONDS)
public void testReturnConnectionOnRedirection() {
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(new HostAndPort("127.0.0.1", 7379));

  final ConnectionPoolConfig singleConnectionPool = new ConnectionPoolConfig();
  singleConnectionPool.setMaxTotal(1);

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", singleConnectionPool)) {
    // This will cause an infinite redirection between node 2 and 3
    final String node2Id = JedisClusterTestUtil.getNodeId(node2.clusterNodes());
    node3.clusterSetSlotMigrating(15363, node2Id);

    assertThrows(JedisClusterOperationException.class, () -> jc.get("e"));
  }
}
/**
 * Seeds the client with {@code localhost:7379} and checks that three nodes are known and that
 * none of them is keyed by the {@code localhost} seed address.
 */
@Test
public void testLocalhostNodeNotAddedWhen127Present() {
  final HostAndPort localhostSeed = new HostAndPort("localhost", 7379);

  // cluster node is defined as 127.0.0.1; adding localhost should work,
  // but shouldn't show up.
  final Set<HostAndPort> seeds = new HashSet<>();
  seeds.add(localhostSeed);

  final ConnectionPoolConfig singleConnectionPool = new ConnectionPoolConfig();
  singleConnectionPool.setMaxTotal(1);

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", singleConnectionPool)) {
    final Map<String, ?> knownNodes = jc.getClusterNodes();
    assertEquals(3, knownNodes.size());

    final String localhostKey = JedisClusterInfoCache.getNodeKey(localhostSeed);
    assertFalse(knownNodes.containsKey(localhostKey));
  }
}
/**
 * Seeds the client with an unresolvable host followed by a valid node and checks that three
 * nodes are known and that the invalid seed is not among them.
 */
@Test
public void testInvalidStartNodeNotAdded() {
  final HostAndPort unreachableSeed = new HostAndPort("not-a-real-host", 7379);
  final HostAndPort validSeed = new HostAndPort("127.0.0.1", 7379);

  // Insertion order is kept: the invalid seed comes first in the set.
  final Set<HostAndPort> seeds =
      new LinkedHashSet<HostAndPort>(Arrays.asList(unreachableSeed, validSeed));

  final ConnectionPoolConfig singleConnectionPool = new ConnectionPoolConfig();
  singleConnectionPool.setMaxTotal(1);

  try (JedisCluster jc = new JedisCluster(seeds, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,
      DEFAULT_REDIRECTIONS, "cluster", singleConnectionPool)) {
    final Map<String, ?> knownNodes = jc.getClusterNodes();
    assertEquals(3, knownNodes.size());

    final String unreachableKey = JedisClusterInfoCache.getNodeKey(unreachableSeed);
    assertFalse(knownNodes.containsKey(unreachableKey));
  }
}
/**
 * Checks that {@code CLUSTER LINKS} on node1 returns at least three entries and that every entry
 * exposes exactly the six expected attribute names.
 */
@Test
@SinceRedisVersion("7.0.0")
public void clusterLinks2() {
  final List<String> attributeNames = Arrays.asList("direction", "node", "create-time", "events",
      "send-buffer-allocated", "send-buffer-used");
  final Set<String> expectedKeys = new HashSet<>(attributeNames);

  final List<Map<String, Object>> links = node1.clusterLinks();
  assertNotNull(links);
  assertTrue(links.size() >= 3);

  links.forEach(link -> {
    assertEquals(6, link.size());
    assertEquals(expectedKeys, link.keySet());
  });
}
/**
 * Rebuilds the cluster with four slot owners and then with three again, and checks after each
 * change that a command issued through an already-open client brings its node list up to date.
 *
 * @throws Exception if rebuilding the cluster fails
 */
@Test
public void clusterRefreshNodes() throws Exception {
  final Set<HostAndPort> seeds = new HashSet<>(Arrays.asList(nodeInfo1, nodeInfo2, nodeInfo3));

  try (JedisCluster cluster = new JedisCluster(seeds, DEFAULT_TIMEOUT,
      DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
    assertEquals(3, cluster.getClusterNodes().size());

    // Tear the current cluster down, then let node1 meet node2, node3 and node4.
    cleanUp();
    node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort());
    node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort());
    node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort());

    // Split the hash slots into four consecutive, equally sized ranges, one per node.
    final int slotsPerNode = CLUSTER_HASHSLOTS / 4;
    final int[] node1Slots = new int[slotsPerNode];
    final int[] node2Slots = new int[slotsPerNode];
    final int[] node3Slots = new int[slotsPerNode];
    final int[] node4Slots = new int[slotsPerNode];
    for (int slot = 0; slot < CLUSTER_HASHSLOTS; slot++) {
      switch (slot / slotsPerNode) {
        case 0:
          node1Slots[slot] = slot;
          break;
        case 1:
          node2Slots[slot - slotsPerNode] = slot;
          break;
        case 2:
          node3Slots[slot - 2 * slotsPerNode] = slot;
          break;
        default:
          node4Slots[slot - 3 * slotsPerNode] = slot;
          break;
      }
    }
    node1.clusterAddSlots(node1Slots);
    node2.clusterAddSlots(node2Slots);
    node3.clusterAddSlots(node3Slots);
    node4.clusterAddSlots(node4Slots);
    JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4);

    // cluster.set("key", "value"); will get JedisMovedDataException and renewSlotCache
    cluster.set("key", "value");
    assertEquals(4, cluster.getClusterNodes().size());
    final String node4Key = LOCAL_IP + ":" + nodeInfo4.getPort();
    assertTrue(cluster.getClusterNodes().containsKey(node4Key));

    // make 4 nodes to 3 nodes
    cleanUp();
    setUp();

    // cluster.set("bar", "foo") will get JedisMovedDataException and renewSlotCache
    cluster.set("bar", "foo");
    assertEquals(3, cluster.getClusterNodes().size());
  }
}
/**
 * Rebuilds the cluster with four slot owners and then with three again, and checks that a client
 * created with a one-second topology refresh period picks up each change without any command
 * being issued.
 *
 * @throws Exception if rebuilding the cluster fails or the wait is interrupted
 */
@Test
@Timeout(30)
public void clusterPeriodTopologyRefreshTest() throws Exception {
  final Set<HostAndPort> seeds = new HashSet<>(Arrays.asList(nodeInfo1, nodeInfo2, nodeInfo3));

  // we set topologyRefreshPeriod is 1s
  final Duration topologyRefreshPeriod = Duration.ofSeconds(1);
  // Wait three periods after each change because the refresh executor may run late.
  final long refreshWaitMillis = topologyRefreshPeriod.toMillis() * 3;

  try (JedisCluster cluster = new JedisCluster(seeds, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG,
      topologyRefreshPeriod, DEFAULT_REDIRECTIONS, Duration.ofSeconds(10))) {
    assertEquals(3, cluster.getClusterNodes().size());

    // Tear the current cluster down, then let node1 meet node2, node3 and node4.
    cleanUp();
    node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort());
    node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort());
    node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort());

    // Split the hash slots into four consecutive, equally sized ranges, one per node.
    final int slotsPerNode = CLUSTER_HASHSLOTS / 4;
    final int[] node1Slots = new int[slotsPerNode];
    final int[] node2Slots = new int[slotsPerNode];
    final int[] node3Slots = new int[slotsPerNode];
    final int[] node4Slots = new int[slotsPerNode];
    for (int slot = 0; slot < CLUSTER_HASHSLOTS; slot++) {
      switch (slot / slotsPerNode) {
        case 0:
          node1Slots[slot] = slot;
          break;
        case 1:
          node2Slots[slot - slotsPerNode] = slot;
          break;
        case 2:
          node3Slots[slot - 2 * slotsPerNode] = slot;
          break;
        default:
          node4Slots[slot - 3 * slotsPerNode] = slot;
          break;
      }
    }
    node1.clusterAddSlots(node1Slots);
    node2.clusterAddSlots(node2Slots);
    node3.clusterAddSlots(node3Slots);
    node4.clusterAddSlots(node4Slots);
    JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4);

    // Topology refresh 3 -> 4 nodes.
    Thread.sleep(refreshWaitMillis);
    assertEquals(4, cluster.getClusterNodes().size());
    final String node4Key = LOCAL_IP + ":" + nodeInfo4.getPort();
    assertTrue(cluster.getClusterNodes().containsKey(node4Key));

    // make 4 nodes to 3 nodes
    cleanUp();
    setUp();

    // Topology refresh 4 -> 3 nodes.
    Thread.sleep(refreshWaitMillis);
    assertEquals(3, cluster.getClusterNodes().size());
  }
}
/**
 * Extracts the ninth space-separated field of the first {@code CLUSTER NODES} line that contains
 * {@code "myself"}. On a line such as
 * {@code <id> 127.0.0.1:7380 master - 0 1394372400827 0 connected 5461-10922} that field is the
 * served slot range.
 *
 * @param infoOutput raw {@code CLUSTER NODES} output, one node per line
 * @return the ninth field of the {@code "myself"} line, or {@code null} when there is no such
 *         line or it has fewer than nine fields
 */
private static String getNodeServingSlotRange(String infoOutput) {
  final int slotRangeIndex = 8;
  for (String infoLine : infoOutput.split("\n")) {
    if (infoLine.contains("myself")) {
      String[] fields = infoLine.split(" ");
      // Check the length up front instead of catching ArrayIndexOutOfBoundsException.
      return fields.length > slotRangeIndex ? fields[slotRangeIndex] : null;
    }
  }
  return null;
}
/**
 * Polls the node every 100 ms until its {@code CLUSTER NODES} output no longer mentions a
 * handshake, or until the timeout has elapsed.
 *
 * @param node connection to the node to poll
 * @param timeoutMs maximum time to keep polling, in milliseconds
 * @throws JedisException if a handshake is still reported when the timeout elapses, or if the
 *         waiting thread is interrupted
 */
private void assertNodeHandshakeEnded(Jedis node, int timeoutMs) {
  int sleepInterval = 100;
  for (int sleepTime = 0; sleepTime <= timeoutMs; sleepTime += sleepInterval) {
    if (!isAnyNodeHandshaking(node)) {
      return;
    }
    try {
      Thread.sleep(sleepInterval);
    } catch (InterruptedException e) {
      // Restore the interrupt flag for the caller and stop polling instead of swallowing it.
      Thread.currentThread().interrupt();
      throw new JedisException("Interrupted while waiting for node handshake to end", e);
    }
  }
  throw new JedisException("Node handshaking is not ended");
}
/**
 * Tells whether any line of the node's {@code CLUSTER NODES} output contains the text
 * {@code "handshake"}.
 *
 * @param node connection to the node to query
 * @return {@code true} if at least one line mentions a handshake, {@code false} otherwise
 */
private boolean isAnyNodeHandshaking(Jedis node) {
  final String[] nodeLines = node.clusterNodes().split("\n");
  return Arrays.stream(nodeLines).anyMatch(line -> line.contains("handshake"));
}
}