// ClusterTopologyRefreshIT.java
package redis.clients.jedis.scenario;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.Mockito.*;
@Tags({ @Tag("scenario") })
public class ClusterTopologyRefreshIT {
private static final Logger log = LoggerFactory.getLogger(ClusterTopologyRefreshIT.class);
private static EndpointConfig endpoint;
private final FaultInjectionClient faultClient = new FaultInjectionClient();
@BeforeAll
public static void beforeClass() {
try {
ClusterTopologyRefreshIT.endpoint = Endpoints.getRedisEndpoint("re-single-shard-oss-cluster");
} catch (IllegalArgumentException e) {
log.warn("Skipping test because no Redis endpoint is configured");
assumeTrue(false);
}
}
@Test
public void testWithPool() {
Set<HostAndPort> jedisClusterNode = new HashSet<>();
jedisClusterNode.add(endpoint.getHostAndPort());
JedisClientConfig config = endpoint.getClientConfigBuilder()
.socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS)
.connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build();
try (RedisClusterClient client = RedisClusterClient.builder().nodes(jedisClusterNode)
.clientConfig(config).maxAttempts(RecommendedSettings.MAX_RETRIES)
.maxTotalRetriesDuration(RecommendedSettings.MAX_TOTAL_RETRIES_DURATION).build()) {
Set<String> initialNodes = client.getClusterNodes().keySet();
assertEquals(1, initialNodes.size(), "Was this BDB used to run this test before?");
AtomicLong commandsExecuted = new AtomicLong();
// Start thread that imitates an application that uses the client
FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
long i = commandsExecuted.getAndIncrement();
client.set(String.valueOf(i), String.valueOf(i));
return true;
});
Thread t = new Thread(fakeApp);
t.start();
HashMap<String, Object> params = new HashMap<>();
params.put("bdb_id", endpoint.getBdbId());
params.put("actions", "[\"reshard\",\"failover\"]");
FaultInjectionClient.TriggerActionResponse actionResponse = null;
try {
log.info("Triggering Resharding and Failover");
actionResponse = faultClient.triggerAction("sequence_of_actions", params);
} catch (IOException e) {
fail("Fault Injection Server error:" + e.getMessage());
}
log.info("Action id: {}", actionResponse.getActionId());
fakeApp.setAction(actionResponse);
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertTrue(fakeApp.capturedExceptions().isEmpty());
log.info("Commands executed: {}", commandsExecuted.get());
for (long i = 0; i < commandsExecuted.get(); i++) {
assertTrue(client.exists(String.valueOf(i)));
}
Set<String> afterReshardNodes = client.getClusterNodes().keySet();
assertThat("After set should have more nodes than initial set", afterReshardNodes.size(),
greaterThan(initialNodes.size()));
boolean hasNewNode = afterReshardNodes.stream().anyMatch(n -> !initialNodes.contains(n));
assertThat("After set should have a node not in initial set", hasNewNode, is(true));
}
}
}