MultiDbClientTest.java
package redis.clients.jedis;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.AfterEach;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.*;
import redis.clients.jedis.MultiDbConfig.DatabaseConfig;
import redis.clients.jedis.exceptions.JedisValidationException;
import redis.clients.jedis.mcf.DatabaseSwitchEvent;
import redis.clients.jedis.mcf.SwitchReason;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
/**
* Basic tests for MultiDbClient functionality.
*/
@Tag("integration")
public class MultiDbClientTest {
private MultiDbClient client;
private static final EndpointConfig endpoint1 = HostAndPorts.getRedisEndpoint("redis-failover-1");
private static final EndpointConfig endpoint2 = HostAndPorts.getRedisEndpoint("redis-failover-2");
private static final ToxiproxyClient tp = new ToxiproxyClient("localhost", 8474);
private static Proxy redisProxy1;
private static Proxy redisProxy2;
@BeforeAll
public static void setupAdminClients() throws IOException {
if (tp.getProxyOrNull("redis-1") != null) {
tp.getProxy("redis-1").delete();
}
if (tp.getProxyOrNull("redis-2") != null) {
tp.getProxy("redis-2").delete();
}
redisProxy1 = tp.createProxy("redis-1", "0.0.0.0:29379", "redis-failover-1:9379");
redisProxy2 = tp.createProxy("redis-2", "0.0.0.0:29380", "redis-failover-2:9380");
}
@BeforeEach
void setUp() {
// Create a simple resilient client with mock endpoints for testing
MultiDbConfig clientConfig = MultiDbConfig.builder()
.database(endpoint1.getHostAndPort(), 100.0f, endpoint1.getClientConfigBuilder().build())
.database(endpoint2.getHostAndPort(), 50.0f, endpoint2.getClientConfigBuilder().build())
.build();
client = MultiDbClient.builder().multiDbConfig(clientConfig).build();
}
@AfterEach
void tearDown() {
if (client != null) {
client.close();
}
}
@Test
void testAddRemoveDatabaseWithEndpointInterface() {
Endpoint newEndpoint = new HostAndPort("unavailable", 6381);
assertDoesNotThrow(
() -> client.addDatabase(newEndpoint, 25.0f, DefaultJedisClientConfig.builder().build()));
assertThat(client.getDatabaseEndpoints(), hasItems(newEndpoint));
assertDoesNotThrow(() -> client.removeDatabase(newEndpoint));
assertThat(client.getDatabaseEndpoints(), not(hasItems(newEndpoint)));
}
@Test
void testAddRemoveDatabaseWithDatabaseConfig() {
// todo : (@ggivo) Replace HostAndPort with Endpoint
HostAndPort newEndpoint = new HostAndPort("unavailable", 6381);
DatabaseConfig newConfig = DatabaseConfig
.builder(newEndpoint, DefaultJedisClientConfig.builder().build()).weight(25.0f).build();
assertDoesNotThrow(() -> client.addDatabase(newConfig));
assertThat(client.getDatabaseEndpoints(), hasItems(newEndpoint));
assertDoesNotThrow(() -> client.removeDatabase(newEndpoint));
assertThat(client.getDatabaseEndpoints(), not(hasItems(newEndpoint)));
}
@Test
void testSetActiveDatabase() {
Endpoint endpoint = client.getActiveDatabaseEndpoint();
awaitIsHealthy(endpoint1.getHostAndPort());
awaitIsHealthy(endpoint2.getHostAndPort());
// Ensure we have a healthy endpoint to switch to
Endpoint newEndpoint = client.getDatabaseEndpoints().stream()
.filter(e -> e.equals(endpoint) && client.isHealthy(e)).findFirst().orElse(null);
assertNotNull(newEndpoint);
// Switch to the new endpoint
client.setActiveDatabase(newEndpoint);
assertEquals(newEndpoint, client.getActiveDatabaseEndpoint());
}
@Test
void testBuilderWithMultipleEndpointTypes() {
MultiDbConfig clientConfig = MultiDbConfig.builder()
.database(endpoint1.getHostAndPort(), 100.0f, DefaultJedisClientConfig.builder().build())
.database(DatabaseConfig
.builder(endpoint2.getHostAndPort(), DefaultJedisClientConfig.builder().build())
.weight(50.0f).build())
.build();
try (MultiDbClient testClient = MultiDbClient.builder().multiDbConfig(clientConfig).build()) {
assertThat(testClient.getDatabaseEndpoints().size(), equalTo(2));
assertThat(testClient.getDatabaseEndpoints(),
hasItems(endpoint1.getHostAndPort(), endpoint2.getHostAndPort()));
}
}
@Test
public void testForceActiveDatabase() {
Endpoint endpoint = client.getActiveDatabaseEndpoint();
// Ensure we have a healthy endpoint to switch to
awaitIsHealthy(endpoint1.getHostAndPort());
awaitIsHealthy(endpoint2.getHostAndPort());
Endpoint newEndpoint = client.getDatabaseEndpoints().stream()
.filter(e -> e.equals(endpoint) && client.isHealthy(e)).findFirst().orElse(null);
assertNotNull(newEndpoint);
// Force switch to the new endpoint for 10 seconds
client.forceActiveDatabase(newEndpoint, Duration.ofMillis(100).toMillis());
// Verify the active endpoint has changed
assertEquals(newEndpoint, client.getActiveDatabaseEndpoint());
}
@Test
public void testForceActiveDatabaseWithNonHealthyEndpoint() {
Endpoint newEndpoint = new HostAndPort("unavailable", 6381);
client.addDatabase(newEndpoint, 25.0f, DefaultJedisClientConfig.builder().build());
assertThrows(JedisValidationException.class,
() -> client.forceActiveDatabase(newEndpoint, Duration.ofMillis(100).toMillis()));
}
@Test
public void testForceActiveDatabaseWithNonExistingEndpoint() {
Endpoint newEndpoint = new HostAndPort("unavailable", 6381);
assertThrows(JedisValidationException.class,
() -> client.forceActiveDatabase(newEndpoint, Duration.ofMillis(100).toMillis()));
}
@Test
public void testWithDatabaseSwitchListener() {
MultiDbConfig endpointsConfig = MultiDbConfig.builder()
.database(DatabaseConfig
.builder(endpoint1.getHostAndPort(), endpoint1.getClientConfigBuilder().build())
.weight(100.0f).build())
.database(DatabaseConfig
.builder(endpoint2.getHostAndPort(), endpoint2.getClientConfigBuilder().build())
.weight(50.0f).build())
.build();
Consumer<DatabaseSwitchEvent> eventConsumer;
List<DatabaseSwitchEvent> events = new ArrayList<>();
eventConsumer = events::add;
try (MultiDbClient testClient = MultiDbClient.builder().databaseSwitchListener(eventConsumer)
.multiDbConfig(endpointsConfig).build()) {
assertThat(events.size(), equalTo(0));
awaitIsHealthy(endpoint2.getHostAndPort());
testClient.setActiveDatabase(endpoint2.getHostAndPort());
assertThat(events.size(), equalTo(1));
assertThat(events.get(0).getEndpoint(), equalTo(endpoint2.getHostAndPort()));
assertThat(events.get(0).getReason(), equalTo(SwitchReason.FORCED));
}
}
private void awaitIsHealthy(HostAndPort hostAndPort) {
await().atMost(Duration.ofSeconds(1)).until(() -> client.isHealthy(hostAndPort));
}
}