ClusterTopologyRefreshTest.java

package redis.clients.jedis.scenario;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.providers.ClusterConnectionProvider;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.Mockito.*;

public class ClusterTopologyRefreshTest {

  private static final Logger log = LoggerFactory.getLogger(ClusterTopologyRefreshTest.class);

  private static EndpointConfig endpoint;

  private final FaultInjectionClient faultClient = new FaultInjectionClient();

  @BeforeAll
  public static void beforeClass() {
    try {
      ClusterTopologyRefreshTest.endpoint = HostAndPorts.getRedisEndpoint("re-single-shard-oss-cluster");
    } catch (IllegalArgumentException e) {
      log.warn("Skipping test because no Redis endpoint is configured");
      assumeTrue(false);
    }
  }

  @Test
  public void testWithPool() {
    Set<HostAndPort> jedisClusterNode = new HashSet<>();
    jedisClusterNode.add(endpoint.getHostAndPort());

    JedisClientConfig config = endpoint.getClientConfigBuilder()
        .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS)
        .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build();


    ClusterConnectionProvider provider = new ClusterConnectionProvider(jedisClusterNode, config, RecommendedSettings.poolConfig);
    ClusterConnectionProvider spyProvider = spy(provider);

    try (JedisCluster client = new JedisCluster(spyProvider,
        RecommendedSettings.MAX_RETRIES, RecommendedSettings.MAX_TOTAL_RETRIES_DURATION)) {
      assertEquals(1,
          client.getClusterNodes().size(),"Was this BDB used to run this test before?");

      AtomicLong commandsExecuted = new AtomicLong();

      // Start thread that imitates an application that uses the client
      FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
        long i = commandsExecuted.getAndIncrement();
        client.set(String.valueOf(i), String.valueOf(i));
        return true;
      });

      Thread t = new Thread(fakeApp);
      t.start();

      HashMap<String, Object> params = new HashMap<>();
      params.put("bdb_id", endpoint.getBdbId());
      params.put("actions", "[\"reshard\",\"failover\"]");

      FaultInjectionClient.TriggerActionResponse actionResponse = null;

      try {
        log.info("Triggering Resharding and Failover");
        actionResponse = faultClient.triggerAction("sequence_of_actions", params);
      } catch (IOException e) {
        fail("Fault Injection Server error:" + e.getMessage());
      }

      log.info("Action id: {}", actionResponse.getActionId());
      fakeApp.setAction(actionResponse);

      try {
        t.join();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }

      assertTrue(fakeApp.capturedExceptions().isEmpty());

      log.info("Commands executed: {}", commandsExecuted.get());
      for (long i = 0; i < commandsExecuted.get(); i++) {
        assertTrue(client.exists(String.valueOf(i)));
      }

      verify(spyProvider, atLeast(2)).renewSlotCache(any(Connection.class));
    }
  }

}