// ClusterCommandExecutorTest.java
package redis.clients.jedis.executors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import redis.clients.jedis.*;
import redis.clients.jedis.util.JedisClusterCRC16;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.ReflectionTestUtil;
public class ClusterCommandExecutorTest {
// One-second duration, passed as the third ClusterCommandExecutor constructor argument by the
// tests in this class that exercise retries.
// NOTE(review): presumably the total retry time budget - confirm against ClusterCommandExecutor.
private static final Duration ONE_SECOND = Duration.ofSeconds(1);
// Keyed command object (GET on "testkey", paired with BuilderFactory.STRING) handed to
// executeCommand by the keyed-command tests.
private static final CommandObject<String> STR_COM_OBJECT = new CommandObject<>(
new CommandArguments(Protocol.Command.GET).key("testkey"), BuilderFactory.STRING);
// Keyless command object for testing keyless command execution with WRITE flag (FLUSHDB is
// keyless and WRITE)
private static final CommandObject<String> KEYLESS_WRITE_COM_OBJECT = new CommandObject<>(
new CommandArguments(Protocol.Command.FLUSHDB), BuilderFactory.STRING);
/**
 * Calls the private {@code executeKeylessCommand(CommandObject)} method of the given executor
 * through {@link ReflectionTestUtil}, so tests can drive the keyless path directly.
 *
 * @param executor executor whose private method is invoked
 * @param commandObject command handed to {@code executeKeylessCommand}
 * @return the value produced by the reflective invocation, typed as {@code T}
 */
@SuppressWarnings("unchecked")
private static <T> T invokeExecuteKeylessCommand(ClusterCommandExecutor executor,
    CommandObject<T> commandObject) {
  // Signature of the target method: a single CommandObject parameter.
  final Class<?>[] parameterTypes = { CommandObject.class };
  return ReflectionTestUtil.invokeMethod(executor, "executeKeylessCommand", parameterTypes,
    commandObject);
}
/**
 * Happy path: the first execution attempt succeeds, so the stubbed reply is returned and the
 * executor never needs to sleep.
 */
@Test
public void runSuccessfulExecute() {
  final Connection pooledConnection = mock(Connection.class);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(pooledConnection);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10,
      Duration.ZERO, StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      // Always succeed with a fixed reply.
      return (T) "foo";
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  assertEquals("foo", executor.executeCommand(STR_COM_OBJECT));
}
/**
 * The first execution attempt fails with a connection error and the second one succeeds; the
 * command must still return the reply, without the executor sleeping in between.
 */
@Test
public void runFailOnFirstExecSuccessOnSecondExec() {
  final Connection pooledConnection = mock(Connection.class);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(pooledConnection);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    // Number of times execute() has been entered so far.
    private int attempts = 0;

    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      attempts++;
      if (attempts == 1) {
        // Only the very first attempt is broken.
        throw new JedisConnectionException("Borkenz");
      }
      return (T) "foo";
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  assertEquals("foo", executor.executeCommand(STR_COM_OBJECT));
}
/**
 * Every execution attempt fails with a connection error. With three allowed attempts the
 * executor must give up with a {@link JedisClusterOperationException}, and the interactions
 * with the connection provider and the sleep hook must happen in the verified order.
 */
@Test
public void runAlwaysFailing() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  Connection connection = mock(Connection.class);
  when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(connection);
  // Records sleep requests instead of actually sleeping.
  final LongConsumer sleep = mock(LongConsumer.class);
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 3, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      throw new JedisConnectionException("Connection failed");
    }

    @Override
    protected void sleep(long sleepMillis) {
      sleep.accept(sleepMillis);
    }
  };

  // Same idiom the other tests in this class use for expected failures.
  assertThrows(JedisClusterOperationException.class,
      () -> testMe.executeCommand(STR_COM_OBJECT), "cluster command did not fail");

  // Expected order: two connection lookups, one sleep, a slot cache renewal, then the final
  // connection lookup.
  InOrder inOrder = inOrder(connectionHandler, sleep);
  inOrder.verify(connectionHandler, times(2))
      .getConnection(ArgumentMatchers.any(CommandArguments.class));
  inOrder.verify(sleep).accept(ArgumentMatchers.anyLong());
  inOrder.verify(connectionHandler).renewSlotCache();
  inOrder.verify(connectionHandler).getConnection(ArgumentMatchers.any(CommandArguments.class));
  inOrder.verifyNoMoreInteractions();
}
/**
 * The first execution attempt reports a MOVED redirection and the retry succeeds. Verifies that
 * the provider is asked for the keyed connection, then for a slot cache renewal, then for a
 * connection to the redirect target, and nothing else.
 */
@Test
public void runMovedSuccess() {
  final HostAndPort redirectTarget = new HostAndPort(null, 0);
  final Connection pooledConnection = mock(Connection.class);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(pooledConnection);
  when(provider.getConnection(redirectTarget)).thenReturn(pooledConnection);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    // Flips to true once the single MOVED reply has been produced.
    private boolean movedAlreadyReported;

    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      if (!movedAlreadyReported) {
        movedAlreadyReported = true;
        // Report slot 0 as moved to the redirect target.
        throw new JedisMovedDataException("", redirectTarget, 0);
      }
      return (T) "foo";
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  assertEquals("foo", executor.executeCommand(STR_COM_OBJECT));

  final InOrder ordered = inOrder(provider);
  ordered.verify(provider).getConnection(ArgumentMatchers.any(CommandArguments.class));
  ordered.verify(provider).renewSlotCache(ArgumentMatchers.any());
  ordered.verify(provider).getConnection(redirectTarget);
  ordered.verifyNoMoreInteractions();
}
/**
 * The first execution attempt reports an ASK redirection and the retry succeeds. Verifies that
 * the provider is asked for the keyed connection and then for a connection to the ASK target,
 * and that the connection is closed afterwards.
 */
@Test
public void runAskSuccess() {
  final HostAndPort redirectTarget = new HostAndPort(null, 0);
  final Connection pooledConnection = mock(Connection.class);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(pooledConnection);
  when(provider.getConnection(redirectTarget)).thenReturn(pooledConnection);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    // Flips to true once the single ASK reply has been produced.
    private boolean askAlreadyReported;

    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      if (!askAlreadyReported) {
        askAlreadyReported = true;
        // ASK redirection for slot 0 towards the redirect target.
        throw new JedisAskDataException("", redirectTarget, 0);
      }
      return (T) "foo";
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  assertEquals("foo", executor.executeCommand(STR_COM_OBJECT));

  final InOrder ordered = inOrder(provider, pooledConnection);
  ordered.verify(provider).getConnection(ArgumentMatchers.any(CommandArguments.class));
  ordered.verify(provider).getConnection(redirectTarget);
  // NOTE(review): verification of connection.asking() is disabled in this test; the reason is
  // not visible here.
  ordered.verify(pooledConnection).close(); // From the finally clause in runWithRetries()
  ordered.verifyNoMoreInteractions();
}
// requires 'execute(Connection connection, CommandObject<T> commandObject)' separately
/**
 * Scenario:
 * <ol>
 * <li>The first attempt hits the wrong node and gets a {@link JedisMovedDataException}.</li>
 * <li>Every later attempt fails with a {@link JedisConnectionException}, because all nodes are
 * now down.</li>
 * <li>The executor keeps retrying until the allowed attempts (5) are used up and then fails
 * with a {@link JedisClusterOperationException}.</li>
 * </ol>
 */
@Test
public void runMovedThenAllNodesFailing() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  // Connection returned for keyed lookups until the slot cache is renewed; answers with MOVED.
  final Connection redirecter = mock(Connection.class);
  when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(redirecter);
  // Connection returned for host-and-port lookups; always fails.
  final Connection failer = mock(Connection.class);
  when(connectionHandler.getConnection(ArgumentMatchers.any(HostAndPort.class)))
      .thenReturn(failer);
  // After a full slot cache renewal, keyed lookups also return the failing connection.
  Mockito.doAnswer((InvocationOnMock invocation) -> {
    when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
        .thenReturn(failer);
    return null;
  }).when(connectionHandler).renewSlotCache();
  // Records sleep requests instead of actually sleeping.
  final LongConsumer sleep = mock(LongConsumer.class);
  final HostAndPort movedTarget = new HostAndPort(null, 0);
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 5, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      if (redirecter == connection) {
        // First attempt, report moved
        throw new JedisMovedDataException("Moved", movedTarget, 0);
      }
      if (failer == connection) {
        // Second attempt in response to the move, report failure
        throw new JedisConnectionException("Connection failed");
      }
      throw new IllegalStateException("Should have thrown jedis exception");
    }

    @Override
    protected void sleep(long sleepMillis) {
      sleep.accept(sleepMillis);
    }
  };

  // Same idiom the other tests in this class use for expected failures.
  assertThrows(JedisClusterOperationException.class,
      () -> testMe.executeCommand(STR_COM_OBJECT), "cluster command did not fail");

  InOrder inOrder = inOrder(connectionHandler, sleep);
  inOrder.verify(connectionHandler).getConnection(ArgumentMatchers.any(CommandArguments.class));
  inOrder.verify(connectionHandler).renewSlotCache(redirecter);
  inOrder.verify(connectionHandler, times(2)).getConnection(movedTarget);
  inOrder.verify(sleep).accept(ArgumentMatchers.anyLong());
  inOrder.verify(connectionHandler).renewSlotCache();
  inOrder.verify(connectionHandler, times(2))
      .getConnection(ArgumentMatchers.any(CommandArguments.class));
  inOrder.verify(sleep).accept(ArgumentMatchers.anyLong());
  inOrder.verify(connectionHandler).renewSlotCache();
  inOrder.verifyNoMoreInteractions();
}
// requires 'execute(Connection connection, CommandObject<T> commandObject)' separately
/**
 * We have two nodes, master and replica, and master has just gone down permanently.
 * <ol>
 * <li>We try to contact master =&gt; JedisConnectionException</li>
 * <li>We try to contact master =&gt; JedisConnectionException</li>
 * <li>sleep and renew</li>
 * <li>We try to contact replica =&gt; Success, because it has now failed over</li>
 * </ol>
 */
@Test
public void runMasterFailingReplicaRecovering() {
  // The two mocks are told apart inside execute() through their stubbed toString() values.
  final Connection master = mock(Connection.class);
  when(master.toString()).thenReturn("master");
  final Connection replica = mock(Connection.class);
  when(replica.toString()).thenReturn("replica");
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(master);
  // Once the slot cache is renewed, keyed lookups hand out the replica instead of the master.
  Mockito.doAnswer((InvocationOnMock invocation) -> {
    when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
        .thenReturn(replica);
    return null;
  }).when(connectionHandler).renewSlotCache();
  // Accumulates the requested sleep time instead of actually sleeping.
  final AtomicLong totalSleepMs = new AtomicLong();
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 5, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      assertNotNull(connection);
      if (connection.toString().equals("master")) {
        throw new JedisConnectionException("Master is down");
      }
      // A JUnit assertion rather than the `assert` keyword, so the check also runs when the
      // JVM is started without -ea.
      assertEquals("replica", connection.toString());
      return (T) "Success!";
    }

    @Override
    protected void sleep(long sleepMillis) {
      totalSleepMs.addAndGet(sleepMillis);
    }
  };

  assertEquals("Success!", testMe.executeCommand(STR_COM_OBJECT));

  InOrder inOrder = inOrder(connectionHandler);
  inOrder.verify(connectionHandler, times(2))
      .getConnection(ArgumentMatchers.any(CommandArguments.class));
  inOrder.verify(connectionHandler).renewSlotCache();
  inOrder.verify(connectionHandler).getConnection(ArgumentMatchers.any(CommandArguments.class));
  inOrder.verifyNoMoreInteractions();
  // Some back-off time must have been requested between the failures and the recovery.
  MatcherAssert.assertThat(totalSleepMs.get(), Matchers.greaterThan(0L));
}
/**
 * When the connection provider itself throws a {@link JedisClusterOperationException}, that
 * exception type reaches the caller of {@code executeCommand} and the executor never sleeps.
 */
@Test
public void runRethrowsJedisNoReachableClusterNodeException() {
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenThrow(JedisClusterOperationException.class);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10,
      Duration.ZERO, StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      return null;
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  assertThrows(JedisClusterOperationException.class,
      () -> executor.executeCommand(STR_COM_OBJECT));
}
/**
 * The executor is built with {@link Duration#ZERO} and every attempt takes about 2 ms before
 * failing, so the deadline is already exceeded after the first attempt. The command must fail
 * with a {@link JedisClusterOperationException} after exactly one connection lookup and without
 * requesting any sleep time.
 */
@Test
public void runStopsRetryingAfterTimeout() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  Connection connection = mock(Connection.class);
  when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(connection);
  // Accumulates the requested sleep time instead of actually sleeping.
  final AtomicLong totalSleepMs = new AtomicLong();
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 3, Duration.ZERO,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      try {
        // exceed deadline
        Thread.sleep(2L);
      } catch (InterruptedException e) {
        // Restore the interrupt flag so the test runner can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
      throw new JedisConnectionException("Connection failed");
    }

    @Override
    protected void sleep(long sleepMillis) {
      totalSleepMs.addAndGet(sleepMillis);
    }
  };

  // Same idiom the other tests in this class use for expected failures.
  assertThrows(JedisClusterOperationException.class,
      () -> testMe.executeCommand(STR_COM_OBJECT), "cluster command did not fail");

  InOrder inOrder = inOrder(connectionHandler);
  inOrder.verify(connectionHandler).getConnection(ArgumentMatchers.any(CommandArguments.class));
  inOrder.verifyNoMoreInteractions();
  assertEquals(0L, totalSleepMs.get());
}
/**
 * Keyless happy path: with a single primary node pool available, the keyless command returns
 * the stubbed reply and the executor never sleeps.
 */
@Test
public void runSuccessfulExecuteKeylessCommand() {
  final Connection pooledConnection = mock(Connection.class);
  final ConnectionPool nodePool = mock(ConnectionPool.class);
  when(nodePool.getResource()).thenReturn(pooledConnection);

  final Map<String, ConnectionPool> primaryNodes = new HashMap<>();
  primaryNodes.put("localhost:6379", nodePool);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getPrimaryNodesConnectionMap()).thenReturn(primaryNodes);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10,
      Duration.ZERO, StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      return (T) "OK";
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  assertEquals("OK", invokeExecuteKeylessCommand(executor, KEYLESS_WRITE_COM_OBJECT));
}
/**
 * A keyless command must obtain its connection from the primary-nodes connection map: the map
 * is fetched, a connection is taken from the pool, and that connection is closed afterwards,
 * in that order and with no further interactions.
 */
@Test
public void runKeylessCommandUsesConnectionMapRoundRobin() {
  final Connection pooledConnection = mock(Connection.class);
  final ConnectionPool nodePool = mock(ConnectionPool.class);
  when(nodePool.getResource()).thenReturn(pooledConnection);

  final Map<String, ConnectionPool> primaryNodes = new HashMap<>();
  primaryNodes.put("localhost:6379", nodePool);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getPrimaryNodesConnectionMap()).thenReturn(primaryNodes);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10,
      Duration.ZERO, StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      return (T) "OK";
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  invokeExecuteKeylessCommand(executor, KEYLESS_WRITE_COM_OBJECT);

  // Map lookup first, then the pool hand-out, then the connection release.
  final InOrder ordered = inOrder(provider, nodePool, pooledConnection);
  ordered.verify(provider).getPrimaryNodesConnectionMap();
  ordered.verify(nodePool).getResource();
  ordered.verify(pooledConnection).close();
  ordered.verifyNoMoreInteractions();
}
/**
 * A MOVED reply to a keyless command is not followed: the {@link JedisMovedDataException}
 * reaches the caller with its target node and slot intact, after a single attempt.
 */
@Test
public void runKeylessCommandThrowsOnRedirections() {
  final int movedSlot = 12345;
  final HostAndPort redirectTarget = new HostAndPort("127.0.0.1", 6380);

  final Connection pooledConnection = mock(Connection.class);
  final ConnectionPool nodePool = mock(ConnectionPool.class);
  when(nodePool.getResource()).thenReturn(pooledConnection);

  final Map<String, ConnectionPool> primaryNodes = new HashMap<>();
  primaryNodes.put("localhost:6379", nodePool);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getPrimaryNodesConnectionMap()).thenReturn(primaryNodes);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      // Every attempt answers with a MOVED redirection.
      throw new JedisMovedDataException("MOVED " + movedSlot, redirectTarget, movedSlot);
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  // The redirection must surface as an exception instead of being followed.
  final JedisMovedDataException thrown = assertThrows(JedisMovedDataException.class,
      () -> invokeExecuteKeylessCommand(executor, KEYLESS_WRITE_COM_OBJECT));

  // The exception carries the redirection details unchanged.
  assertEquals(redirectTarget, thrown.getTargetNode());
  assertEquals(movedSlot, thrown.getSlot());

  // Exactly one attempt was made, and its connection was released.
  verify(provider, times(1)).getPrimaryNodesConnectionMap();
  verify(nodePool, times(1)).getResource();
  verify(pooledConnection).close();
}
/**
 * An ASK reply to a keyless command is not followed: the {@link JedisAskDataException} reaches
 * the caller with its target node and slot intact, after a single attempt.
 */
@Test
public void runKeylessCommandThrowsAskDataExceptionOnAskRedirection() {
  final int askSlot = 9999;
  final HostAndPort redirectTarget = new HostAndPort("127.0.0.1", 6381);

  final Connection pooledConnection = mock(Connection.class);
  final ConnectionPool nodePool = mock(ConnectionPool.class);
  when(nodePool.getResource()).thenReturn(pooledConnection);

  final Map<String, ConnectionPool> primaryNodes = new HashMap<>();
  primaryNodes.put("localhost:6379", nodePool);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getPrimaryNodesConnectionMap()).thenReturn(primaryNodes);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      // Every attempt answers with an ASK redirection.
      throw new JedisAskDataException("ASK " + askSlot, redirectTarget, askSlot);
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  // The redirection must surface as an exception instead of being followed.
  final JedisAskDataException thrown = assertThrows(JedisAskDataException.class,
      () -> invokeExecuteKeylessCommand(executor, KEYLESS_WRITE_COM_OBJECT));

  // The exception carries the redirection details unchanged.
  assertEquals(redirectTarget, thrown.getTargetNode());
  assertEquals(askSlot, thrown.getSlot());

  // Exactly one attempt was made, and its connection was released.
  verify(provider, times(1)).getPrimaryNodesConnectionMap();
  verify(nodePool, times(1)).getResource();
  verify(pooledConnection).close();
}
/**
 * Every attempt of a keyless command fails with a connection error. With three allowed
 * attempts the executor must give up with a {@link JedisClusterOperationException} after using
 * (and closing) three connections, sleeping once and renewing the slot cache once.
 */
@Test
public void runKeylessCommandFailsAfterMaxAttempts() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  Map<String, ConnectionPool> connectionMap = new HashMap<>();
  ConnectionPool pool = mock(ConnectionPool.class);
  Connection connection1 = mock(Connection.class);
  Connection connection2 = mock(Connection.class);
  Connection connection3 = mock(Connection.class);
  // Records sleep requests instead of actually sleeping.
  final LongConsumer sleep = mock(LongConsumer.class);
  connectionMap.put("localhost:6379", pool);
  when(connectionHandler.getPrimaryNodesConnectionMap()).thenReturn(connectionMap);
  // The single pool hands out a different connection on each of the three attempts.
  when(pool.getResource()).thenReturn(connection1, connection2, connection3);
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 3, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      throw new JedisConnectionException("Connection failed");
    }

    @Override
    protected void sleep(long sleepMillis) {
      sleep.accept(sleepMillis);
    }
  };

  // Same idiom the other tests in this class use for expected failures.
  assertThrows(JedisClusterOperationException.class,
      () -> invokeExecuteKeylessCommand(testMe, KEYLESS_WRITE_COM_OBJECT),
      "keyless command did not fail");

  // Verify that we tried connection map access and performed slot cache renewal
  // getPrimaryNodesConnectionMap() called 3 times (once for each connection attempt)
  // getResource() called 3 times, sleep called once, renewSlotCache called once
  verify(connectionHandler, times(3)).getPrimaryNodesConnectionMap();
  verify(pool, times(3)).getResource();
  verify(connection1).close();
  verify(connection2).close();
  verify(connection3).close();
  verify(sleep).accept(ArgumentMatchers.anyLong());
  verify(connectionHandler).renewSlotCache();
}
/**
 * With an empty primary-nodes connection map, a keyless command must fail with a
 * {@link JedisClusterOperationException} carrying the message "No cluster nodes available.",
 * without executing anything or sleeping.
 */
@Test
public void runKeylessCommandFailsWithEmptyConnectionMap() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  Map<String, ConnectionPool> emptyConnectionMap = new HashMap<>();
  when(connectionHandler.getPrimaryNodesConnectionMap()).thenReturn(emptyConnectionMap);
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 3, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      return (T) "should_not_reach_here";
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  // Same idiom the other tests in this class use for expected failures.
  JedisClusterOperationException e = assertThrows(JedisClusterOperationException.class,
      () -> invokeExecuteKeylessCommand(testMe, KEYLESS_WRITE_COM_OBJECT),
      "keyless command should fail with empty connection map");
  assertEquals("No cluster nodes available.", e.getMessage());

  // Verify that getPrimaryNodesConnectionMap() was called
  verify(connectionHandler).getPrimaryNodesConnectionMap();
}
/**
 * With three primary node pools, four consecutive keyless commands must be spread over all
 * three nodes: four executions in total, touching three distinct connections.
 */
@Test
public void runKeylessCommandRoundRobinDistribution() {
  // One pool (with its own connection) per node.
  final Connection firstConnection = mock(Connection.class);
  final Connection secondConnection = mock(Connection.class);
  final Connection thirdConnection = mock(Connection.class);
  final ConnectionPool firstPool = mock(ConnectionPool.class);
  final ConnectionPool secondPool = mock(ConnectionPool.class);
  final ConnectionPool thirdPool = mock(ConnectionPool.class);
  when(firstPool.getResource()).thenReturn(firstConnection);
  when(secondPool.getResource()).thenReturn(secondConnection);
  when(thirdPool.getResource()).thenReturn(thirdConnection);

  final Map<String, ConnectionPool> primaryNodes = new HashMap<>();
  primaryNodes.put("localhost:6379", firstPool);
  primaryNodes.put("localhost:6380", secondPool);
  primaryNodes.put("localhost:6381", thirdPool);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getPrimaryNodesConnectionMap()).thenReturn(primaryNodes);

  // Every connection handed to execute(), in call order.
  final List<Connection> seenConnections = new ArrayList<>();
  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10,
      Duration.ZERO, StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      seenConnections.add(conn);
      return (T) "OK";
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  // One command more than there are nodes, so the selection has to wrap around.
  for (int call = 0; call < 4; call++) {
    invokeExecuteKeylessCommand(executor, KEYLESS_WRITE_COM_OBJECT);
  }

  assertEquals(4, seenConnections.size());
  final Set<Connection> distinctConnections = new HashSet<>(seenConnections);
  assertEquals(3, distinctConnections.size(),
      "Round-robin should distribute across multiple nodes");
}
/**
 * Runs 100 keyless commands against three primary node pools and checks that each one returns
 * "OK" and that the primary-nodes connection map is consulted once per command.
 * <p>
 * NOTE(review): the test name refers to a circular node-selection counter that is presumably
 * kept within {@code [0, nodeCount - 1]} by the executor; that implementation is not visible
 * from this test, which only observes that repeated executions keep succeeding.
 */
@Test
public void runKeylessCommandCircularCounterNeverOverflows() {
  // One pool (with its own connection) per node.
  final Connection firstConnection = mock(Connection.class);
  final Connection secondConnection = mock(Connection.class);
  final Connection thirdConnection = mock(Connection.class);
  final ConnectionPool firstPool = mock(ConnectionPool.class);
  final ConnectionPool secondPool = mock(ConnectionPool.class);
  final ConnectionPool thirdPool = mock(ConnectionPool.class);
  when(firstPool.getResource()).thenReturn(firstConnection);
  when(secondPool.getResource()).thenReturn(secondConnection);
  when(thirdPool.getResource()).thenReturn(thirdConnection);

  final Map<String, ConnectionPool> primaryNodes = new HashMap<>();
  primaryNodes.put("node1:6379", firstPool);
  primaryNodes.put("node2:6379", secondPool);
  primaryNodes.put("node3:6379", thirdPool);
  final ClusterConnectionProvider provider = mock(ClusterConnectionProvider.class);
  when(provider.getPrimaryNodesConnectionMap()).thenReturn(primaryNodes);

  final ClusterCommandExecutor executor = new ClusterCommandExecutor(provider, 10,
      Duration.ZERO, StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection conn, CommandObject<T> command) {
      return (T) "OK";
    }

    @Override
    protected void sleep(long unusedMillis) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  // Many more executions than nodes, so node selection wraps around repeatedly.
  int remaining = 100;
  while (remaining > 0) {
    assertEquals("OK", invokeExecuteKeylessCommand(executor, KEYLESS_WRITE_COM_OBJECT));
    remaining--;
  }

  // One connection-map lookup per executed command.
  verify(provider, times(100)).getPrimaryNodesConnectionMap();
}
/**
 * With four primary node pools, 40 keyless commands must be distributed evenly: each node's
 * connection is used exactly 10 times and the connection map is consulted once per command.
 */
@Test
public void runKeylessCommandEvenDistributionRoundRobin() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  // Use ordered map to ensure consistent iteration order for testing
  Map<String, ConnectionPool> connectionMap = new java.util.LinkedHashMap<>();
  // Create 4 pools to test even distribution
  ConnectionPool pool1 = mock(ConnectionPool.class);
  ConnectionPool pool2 = mock(ConnectionPool.class);
  ConnectionPool pool3 = mock(ConnectionPool.class);
  ConnectionPool pool4 = mock(ConnectionPool.class);
  Connection connection1 = mock(Connection.class);
  Connection connection2 = mock(Connection.class);
  Connection connection3 = mock(Connection.class);
  Connection connection4 = mock(Connection.class);
  connectionMap.put("node1:6379", pool1);
  connectionMap.put("node2:6379", pool2);
  connectionMap.put("node3:6379", pool3);
  connectionMap.put("node4:6379", pool4);
  when(connectionHandler.getPrimaryNodesConnectionMap()).thenReturn(connectionMap);
  when(pool1.getResource()).thenReturn(connection1);
  when(pool2.getResource()).thenReturn(connection2);
  when(pool3.getResource()).thenReturn(connection3);
  when(pool4.getResource()).thenReturn(connection4);
  // Track connection usage count; pre-seeded with 0 so an unused node reads as 0, not null.
  Map<Connection, Integer> connectionUsage = new HashMap<>();
  connectionUsage.put(connection1, 0);
  connectionUsage.put(connection2, 0);
  connectionUsage.put(connection3, 0);
  connectionUsage.put(connection4, 0);
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      // Count one more use of this connection.
      connectionUsage.merge(connection, 1, Integer::sum);
      return (T) "OK";
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };
  // Execute commands - should be evenly distributed
  int totalCommands = 40; // Multiple of 4 for perfect distribution
  for (int i = 0; i < totalCommands; i++) {
    invokeExecuteKeylessCommand(testMe, KEYLESS_WRITE_COM_OBJECT);
  }
  // Verify even distribution - each node should get exactly 10 commands
  int expectedPerNode = totalCommands / 4;
  assertEquals(expectedPerNode, connectionUsage.get(connection1).intValue(),
      "Node 1 should receive exactly " + expectedPerNode + " commands");
  assertEquals(expectedPerNode, connectionUsage.get(connection2).intValue(),
      "Node 2 should receive exactly " + expectedPerNode + " commands");
  assertEquals(expectedPerNode, connectionUsage.get(connection3).intValue(),
      "Node 3 should receive exactly " + expectedPerNode + " commands");
  assertEquals(expectedPerNode, connectionUsage.get(connection4).intValue(),
      "Node 4 should receive exactly " + expectedPerNode + " commands");
  // Verify total commands executed
  int totalExecuted = connectionUsage.values().stream().mapToInt(Integer::intValue).sum();
  assertEquals(totalCommands, totalExecuted, "Total commands executed should match");
  // Verify that getPrimaryNodesConnectionMap() was called for each execution
  verify(connectionHandler, times(totalCommands)).getPrimaryNodesConnectionMap();
}
/**
 * With three primary node pools registered in insertion order, nine keyless commands must hit
 * the nodes in the exact repeating sequence node1, node2, node3.
 */
@Test
public void runKeylessCommandRoundRobinSequence() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  // Use LinkedHashMap to ensure consistent iteration order
  Map<String, ConnectionPool> connectionMap = new java.util.LinkedHashMap<>();
  // Create 3 pools for simpler sequence verification
  ConnectionPool pool1 = mock(ConnectionPool.class);
  ConnectionPool pool2 = mock(ConnectionPool.class);
  ConnectionPool pool3 = mock(ConnectionPool.class);
  Connection connection1 = mock(Connection.class);
  Connection connection2 = mock(Connection.class);
  Connection connection3 = mock(Connection.class);
  connectionMap.put("node1:6379", pool1);
  connectionMap.put("node2:6379", pool2);
  connectionMap.put("node3:6379", pool3);
  when(connectionHandler.getPrimaryNodesConnectionMap()).thenReturn(connectionMap);
  when(pool1.getResource()).thenReturn(connection1);
  when(pool2.getResource()).thenReturn(connection2);
  when(pool3.getResource()).thenReturn(connection3);
  // Track the exact sequence of connections used
  List<String> connectionSequence = new ArrayList<>();
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      // Translate the connection identity back into its node label.
      if (connection == connection1) {
        connectionSequence.add("node1");
      } else if (connection == connection2) {
        connectionSequence.add("node2");
      } else if (connection == connection3) {
        connectionSequence.add("node3");
      }
      return (T) "OK";
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };
  // Execute 9 commands to see 3 complete cycles
  for (int i = 0; i < 9; i++) {
    invokeExecuteKeylessCommand(testMe, KEYLESS_WRITE_COM_OBJECT);
  }
  // Verify the round-robin sequence: three full node1 -> node2 -> node3 cycles
  List<String> expectedSequence = new ArrayList<>();
  for (int cycle = 0; cycle < 3; cycle++) {
    expectedSequence.add("node1");
    expectedSequence.add("node2");
    expectedSequence.add("node3");
  }
  assertEquals(expectedSequence, connectionSequence,
      "Round-robin should follow exact sequence: node1 -> node2 -> node3 -> node1 -> ...");
}
/**
 * A keyless execution of a read-only command (GET) must look up its connection through
 * {@code getConnectionMap()}, i.e. all nodes including replicas, and must never consult
 * {@code getPrimaryNodesConnectionMap()}.
 */
@Test
public void runKeylessCommandWithReadOnlyCommandUsesAllNodesConnectionMap() {
  final String expectedReply = "readonly_result";

  // One pool/connection pair backs both the primary and the replica entry
  Connection sharedConnection = mock(Connection.class);
  ConnectionPool sharedPool = mock(ConnectionPool.class);
  when(sharedPool.getResource()).thenReturn(sharedConnection);

  // Connection map containing every node (primary and replica)
  Map<String, ConnectionPool> everyNode = new HashMap<>();
  everyNode.put("primary:6379", sharedPool);
  everyNode.put("replica:6380", sharedPool);

  // Only getConnectionMap() is stubbed: read-only commands are expected to use it
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  when(connectionHandler.getConnectionMap()).thenReturn(everyNode);

  // GET carries the READONLY flag
  CommandArguments getArguments = new CommandArguments(Protocol.Command.GET).key("testkey");
  CommandObject<String> readOnlyCommandObject = new CommandObject<>(getArguments,
      BuilderFactory.STRING);

  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      return (T) expectedReply;
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  String actualReply = invokeExecuteKeylessCommand(testMe, readOnlyCommandObject);
  assertEquals(expectedReply, actualReply);

  // All-nodes map consulted once, primaries-only map never
  verify(connectionHandler, Mockito.never()).getPrimaryNodesConnectionMap();
  verify(connectionHandler).getConnectionMap();
  // The connection was borrowed from the pool and released afterwards
  verify(sharedPool).getResource();
  verify(sharedConnection).close();
}
/**
 * A keyless execution of a write command (SET) must look up its connection through
 * {@code getPrimaryNodesConnectionMap()} only, and must never consult
 * {@code getConnectionMap()}.
 */
@Test
public void runKeylessCommandWithWriteCommandUsesPrimaryNodesConnectionMap() {
  final String expectedReply = "write_result";

  Connection primaryConnection = mock(Connection.class);
  ConnectionPool primaryPool = mock(ConnectionPool.class);
  when(primaryPool.getResource()).thenReturn(primaryConnection);

  // Connection map restricted to primary nodes
  Map<String, ConnectionPool> primariesOnly = new HashMap<>();
  primariesOnly.put("primary:6379", primaryPool);

  // Only getPrimaryNodesConnectionMap() is stubbed: write commands are expected to use it
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  when(connectionHandler.getPrimaryNodesConnectionMap()).thenReturn(primariesOnly);

  // SET carries the WRITE flag, not READONLY
  CommandArguments setArguments = new CommandArguments(Protocol.Command.SET).key("testkey")
      .add("value");
  CommandObject<String> writeCommandObject = new CommandObject<>(setArguments,
      BuilderFactory.STRING);

  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      return (T) expectedReply;
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  String actualReply = invokeExecuteKeylessCommand(testMe, writeCommandObject);
  assertEquals(expectedReply, actualReply);

  // Primaries-only map consulted once, all-nodes map never
  verify(connectionHandler, Mockito.never()).getConnectionMap();
  verify(connectionHandler).getPrimaryNodesConnectionMap();
  // The connection was borrowed from the pool and released afterwards
  verify(primaryPool).getResource();
  verify(primaryConnection).close();
}
/**
 * Argument source for the ONE_SUCCEEDED parameterized tests. Supplies SCRIPT KILL and FUNCTION
 * KILL command objects; for these commands a broadcast is expected to report success as soon as
 * at least one node succeeds.
 */
static java.util.stream.Stream<CommandObject<String>> oneSucceededPolicyCommands() {
  CommandObject<String> scriptKill = new CommandObject<>(
      new CommandArguments(Protocol.Command.SCRIPT).add("KILL"), BuilderFactory.STRING);
  CommandObject<String> functionKill = new CommandObject<>(
      new CommandArguments(Protocol.Command.FUNCTION).add("KILL"), BuilderFactory.STRING);
  return java.util.stream.Stream.of(scriptKill, functionKill);
}
/**
 * Regression test for broadcastCommand ignoring the ONE_SUCCEEDED response policy on partial
 * failure.
 * <p>
 * The bug: when any node throws, isErrored is set to true, subsequent successful replies are
 * skipped, and the method unconditionally throws JedisBroadcastException. For commands with the
 * ONE_SUCCEEDED policy (SCRIPT KILL, FUNCTION KILL) the broadcast must instead succeed when at
 * least one node succeeded.
 * <p>
 * Here nodes 1 and 3 fail and node 2 answers OK. The test fails while the bug exists and passes
 * once it is fixed.
 */
@ParameterizedTest
@MethodSource("oneSucceededPolicyCommands")
public void broadcastCommandShouldSucceedWithOneSucceededPolicyWhenSomeNodesFail(
    CommandObject<String> killCommand) {
  // Three primaries, each with its own pool and connection
  Connection firstConnection = mock(Connection.class);
  Connection secondConnection = mock(Connection.class);
  Connection thirdConnection = mock(Connection.class);
  ConnectionPool firstPool = mock(ConnectionPool.class);
  ConnectionPool secondPool = mock(ConnectionPool.class);
  ConnectionPool thirdPool = mock(ConnectionPool.class);
  when(firstPool.getResource()).thenReturn(firstConnection);
  when(secondPool.getResource()).thenReturn(secondConnection);
  when(thirdPool.getResource()).thenReturn(thirdConnection);

  Map<String, ConnectionPool> primaries = new java.util.LinkedHashMap<>();
  primaries.put("node1:6379", firstPool);
  primaries.put("node2:6379", secondPool);
  primaries.put("node3:6379", thirdPool);

  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  when(connectionHandler.getPrimaryNodesConnectionMap()).thenReturn(primaries);

  // One entry per execute() invocation: "FAIL" or "OK"
  List<String> nodeResults = new ArrayList<>();
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO,
      StaticCommandFlagsRegistry.registry()) {
    int callCount = 0;

    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      int invocation = ++callCount;
      if (invocation != 1 && invocation != 3) {
        // Second node succeeds
        nodeResults.add("OK");
        return (T) "OK";
      }
      // First and third nodes fail (e.g., no script/function running)
      nodeResults.add("FAIL");
      throw new JedisDataException("NOTBUSY No scripts in execution right now.");
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  // ONE_SUCCEEDED: a single OK reply is enough for the broadcast to succeed.
  // While the bug exists this call throws JedisBroadcastException instead.
  String result = testMe.broadcastCommand(killCommand, true);

  assertEquals("OK", result, "broadcastCommand should return OK when at least one node succeeds "
      + "with ONE_SUCCEEDED policy");
  // Every node must have been asked, and one of them must have answered OK
  assertEquals(3, nodeResults.size(), "Should have queried all 3 nodes");
  assertTrue(nodeResults.contains("OK"), "At least one node should have succeeded");
}
/**
 * Verifies that broadcastCommand fails, rather than following the redirection, when the only
 * node answers with a MOVED redirection (broadcastCommand uses followRedirections=false).
 * <p>
 * Only the fact that an exception is thrown is asserted; the concrete exception type surfaced by
 * the reply aggregator and its target node/slot details are not inspected. The test also checks
 * that the connection was borrowed exactly once and closed afterwards.
 */
@Test
public void broadcastCommandThrowsMovedDataExceptionOnRedirection() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  Map<String, ConnectionPool> connectionMap = new java.util.LinkedHashMap<>();
  ConnectionPool pool1 = mock(ConnectionPool.class);
  Connection connection1 = mock(Connection.class);
  final HostAndPort movedTarget = new HostAndPort("127.0.0.1", 6382);
  final int slot = 7777;
  connectionMap.put("node1:6379", pool1);
  when(connectionHandler.getPrimaryNodesConnectionMap()).thenReturn(connectionMap);
  when(pool1.getResource()).thenReturn(connection1);
  CommandObject<String> flushdbCommand = new CommandObject<>(
      new CommandArguments(Protocol.Command.FLUSHDB), BuilderFactory.STRING);
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      // Simulate MOVED redirection
      throw new JedisMovedDataException("MOVED " + slot, movedTarget, slot);
    }

    @Override
    protected void sleep(long ignored) {
      // Any retry/backoff would mean the redirection was followed
      throw new RuntimeException("This test should never sleep");
    }
  };
  // broadcastCommand uses followRedirections=false, so the redirection is recorded by the
  // aggregator as an error for that node. With only one node, the whole broadcast must fail.
  assertThrows(Exception.class, () -> testMe.broadcastCommand(flushdbCommand, true),
      "broadcastCommand should have thrown an exception when redirection occurs");
  // Verify that we tried to execute the command exactly once and released the connection
  verify(pool1, times(1)).getResource();
  verify(connection1).close();
}
/**
 * Verifies that broadcastCommand fails, rather than following the redirection, when the only
 * node answers with an ASK redirection (broadcastCommand uses followRedirections=false).
 * <p>
 * Only the fact that an exception is thrown is asserted; the concrete exception type surfaced by
 * the reply aggregator and its target node/slot details are not inspected. The test also checks
 * that the connection was borrowed exactly once and closed afterwards.
 */
@Test
public void broadcastCommandThrowsAskDataExceptionOnRedirection() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  Map<String, ConnectionPool> connectionMap = new java.util.LinkedHashMap<>();
  ConnectionPool pool1 = mock(ConnectionPool.class);
  Connection connection1 = mock(Connection.class);
  final HostAndPort askTarget = new HostAndPort("127.0.0.1", 6383);
  final int slot = 8888;
  connectionMap.put("node1:6379", pool1);
  when(connectionHandler.getPrimaryNodesConnectionMap()).thenReturn(connectionMap);
  when(pool1.getResource()).thenReturn(connection1);
  CommandObject<String> flushdbCommand = new CommandObject<>(
      new CommandArguments(Protocol.Command.FLUSHDB), BuilderFactory.STRING);
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, ONE_SECOND,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      // Simulate ASK redirection
      throw new JedisAskDataException("ASK " + slot, askTarget, slot);
    }

    @Override
    protected void sleep(long ignored) {
      // Any retry/backoff would mean the redirection was followed
      throw new RuntimeException("This test should never sleep");
    }
  };
  // broadcastCommand uses followRedirections=false, so the redirection is recorded by the
  // aggregator as an error for that node. With only one node, the whole broadcast must fail.
  assertThrows(Exception.class, () -> testMe.broadcastCommand(flushdbCommand, true),
      "broadcastCommand should have thrown an exception when redirection occurs");
  // Verify that we tried to execute the command exactly once and released the connection
  verify(pool1, times(1)).getResource();
  verify(connection1).close();
}
/**
 * Regression test for executeMultiShardCommand ignoring the ONE_SUCCEEDED response policy on
 * partial failure.
 * <p>
 * The bug mirrors the one in broadcastCommand: when any shard throws, isErrored is set to true,
 * subsequent successful replies are skipped, and JedisBroadcastException is thrown
 * unconditionally. With the ONE_SUCCEEDED policy the call must succeed when at least one shard
 * succeeded.
 * <p>
 * Here the first shard fails and the remaining ones answer OK. The test fails while the bug
 * exists and passes once it is fixed.
 */
@Test
public void executeMultiShardCommandShouldSucceedWithOneSucceededPolicyWhenSomeShardsFail() {
  Connection shardConnection = mock(Connection.class);
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(shardConnection);

  // Three SCRIPT KILL commands (ONE_SUCCEEDED policy) stand in for three shards
  List<CommandObject<String>> commandObjects = new ArrayList<>();
  for (int shard = 0; shard < 3; shard++) {
    CommandArguments scriptKill = new CommandArguments(Protocol.Command.SCRIPT).add("KILL");
    commandObjects.add(new CommandObject<>(scriptKill, BuilderFactory.STRING));
  }

  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO,
      StaticCommandFlagsRegistry.registry()) {
    int callCount = 0;

    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      boolean firstInvocation = ++callCount == 1;
      if (!firstInvocation) {
        // Other shards succeed
        return (T) "OK";
      }
      // First shard fails
      throw new JedisDataException("NOTBUSY No scripts in execution right now.");
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  // ONE_SUCCEEDED: a single OK reply is enough for the multi-shard command to succeed.
  // While the bug exists this call throws JedisBroadcastException instead.
  String result = testMe.executeMultiShardCommand(commandObjects);

  assertEquals("OK", result, "executeMultiShardCommand should return OK when at least one shard "
      + "succeeds with ONE_SUCCEEDED policy");
}
/*
 * Historical context for the MGET multi-shard ordering test that follows: MGET multi-shard used
 * to silently return values in the wrong order. mgetMultiShard grouped keys by hash slot using a
 * HashMap, which doesn't preserve insertion order, so the aggregated result concatenated the
 * per-slot lists in arbitrary order. That broke MGET's contract that returned values correspond
 * positionally to the input keys: callers using index-based access (e.g., values.get(i) for
 * keys[i]) would silently get wrong values when keys spanned multiple slots. The test was
 * written to run MGET with many different key combinations in order to hit at least one case
 * where the HashMap iteration order differs from insertion order, proving the bug existed.
 * NOTE: the test fails while the bug exists and passes once it is fixed (e.g., by using a
 * LinkedHashMap to preserve insertion order).
 */
/**
 * This test verifies that MGET multi-shard returns values in the correct order matching the input
 * keys, even when keys span multiple hash slots. The fix groups consecutive keys with the same
 * slot together, but keeps non-consecutive keys with the same slot in separate commands to
 * preserve order. For example, keys mapping to slots [A, B, A] result in 3 separate commands, not
 * 2, ensuring the concatenated results match the input key order.
 * <p>
 * Up to 100 generated key triples are tried; triples in which two keys share a slot are skipped.
 * The test fails if every triple was skipped, so it can never pass without asserting anything.
 */
@Test
public void mgetMultiShardReturnsValuesInCorrectOrderWhenKeysSpanMultipleSlots() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  Connection connection = mock(Connection.class);
  when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(connection);
  ClusterCommandObjects commandObjects = new ClusterCommandObjects();
  // The executor does not depend on the keys under test, so it is created once up front.
  // It answers every command with "value_for_<key>" for each key argument, in argument order.
  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10,
      Duration.ZERO, StaticCommandFlagsRegistry.registry()) {
    @Override
    @SuppressWarnings("unchecked")
    public <T> T execute(Connection conn, CommandObject<T> commandObject) {
      List<String> values = new ArrayList<>();
      CommandArguments args = commandObject.getArguments();
      boolean isFirst = true;
      for (redis.clients.jedis.args.Rawable arg : args) {
        if (isFirst) {
          // The first argument is the command name itself, not a key
          isFirst = false;
          continue;
        }
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String key = new String(arg.getRaw(), java.nio.charset.StandardCharsets.UTF_8);
        values.add("value_for_" + key);
      }
      return (T) values;
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };
  int testedCombinations = 0;
  // Try many different key combinations including ones where keys hash to different slots
  for (int i = 0; i < 100; i++) {
    // Generate unique keys that are likely to hash to different slots
    String key1 = "testkey_a_" + i;
    String key2 = "testkey_b_" + i;
    String key3 = "testkey_c_" + i;
    int slot1 = JedisClusterCRC16.getSlot(key1);
    int slot2 = JedisClusterCRC16.getSlot(key2);
    int slot3 = JedisClusterCRC16.getSlot(key3);
    // Only test if all keys hash to different slots (the challenging case)
    if (slot1 == slot2 || slot2 == slot3 || slot1 == slot3) {
      continue;
    }
    testedCombinations++;
    // Use the standard MGET multi-shard API
    List<CommandObject<List<String>>> mgetCommands = commandObjects.mgetMultiShard(key1, key2,
        key3);
    List<String> inputOrder = java.util.Arrays.asList(key1, key2, key3);
    // Use the standard executeMultiShardCommand method
    List<String> result = testMe.executeMultiShardCommand(mgetCommands);
    // Expected correct order based on input - MGET contract says values must match key positions
    List<String> expectedCorrectOrder = java.util.Arrays.asList("value_for_" + key1,
        "value_for_" + key2, "value_for_" + key3);
    // Verify all values are present
    assertEquals(3, result.size(), "Should have 3 values");
    assertTrue(result.contains("value_for_" + key1));
    assertTrue(result.contains("value_for_" + key2));
    assertTrue(result.contains("value_for_" + key3));
    // THE KEY ASSERTION: Values must be in the same order as input keys
    // With the fix (grouping consecutive keys only), this should pass for all key combinations
    assertEquals(expectedCorrectOrder, result,
        "MGET multi-shard should return values in the same order as input keys. " + "Input keys: "
            + inputOrder + ", slots: [" + slot1 + ", " + slot2 + ", " + slot3 + "]");
  }
  // Guard against a vacuous pass: at least one triple must have reached the assertions above
  assertTrue(testedCombinations > 0,
      "At least one key combination spanning three distinct slots should have been tested");
}
/**
 * This test verifies that MGET multi-shard returns values in the correct order when keys have
 * interleaved hash slots (e.g., [slotA, slotB, slotA]). This is the critical case that the fix
 * addresses: without the fix, keys with the same slot would be grouped together, producing 2
 * commands instead of 3, which would return values in wrong order.
 * <p>
 * The slots are forced by statically mocking {@code JedisClusterCRC16.getSlot} for the three
 * keys; the static mock is released when the try-with-resources block ends.
 */
@Test
public void mgetMultiShardReturnsValuesInCorrectOrderForInterleavedSlots() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  Connection connection = mock(Connection.class);
  when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(connection);
  String key1 = "key1";
  String key2 = "key2";
  String key3 = "key3";
  // Mock JedisClusterCRC16.getSlot to return interleaved slots: [slotA, slotB, slotA]
  try (org.mockito.MockedStatic<JedisClusterCRC16> mockedStatic = Mockito
      .mockStatic(JedisClusterCRC16.class)) {
    mockedStatic.when(() -> JedisClusterCRC16.getSlot(key1)).thenReturn(100);
    mockedStatic.when(() -> JedisClusterCRC16.getSlot(key2)).thenReturn(200);
    mockedStatic.when(() -> JedisClusterCRC16.getSlot(key3)).thenReturn(100);
    ClusterCommandObjects commandObjects = new ClusterCommandObjects();
    // The fix should create 3 separate commands (not 2) to preserve order
    List<CommandObject<List<String>>> mgetCommands = commandObjects.mgetMultiShard(key1, key2,
        key3);
    assertEquals(3, mgetCommands.size(),
        "Should have 3 separate commands for interleaved slots [A, B, A], not 2");
    // Execute MGET with the standard executeMultiShardCommand.
    // The executor answers with "value_for_<key>" for each key argument, in argument order.
    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10,
        Duration.ZERO, StaticCommandFlagsRegistry.registry()) {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T execute(Connection conn, CommandObject<T> commandObject) {
        List<String> values = new ArrayList<>();
        CommandArguments args = commandObject.getArguments();
        boolean isFirst = true;
        for (redis.clients.jedis.args.Rawable arg : args) {
          if (isFirst) {
            // The first argument is the command name itself, not a key
            isFirst = false;
            continue;
          }
          // Decode explicitly as UTF-8 instead of relying on the platform default charset
          String key = new String(arg.getRaw(), java.nio.charset.StandardCharsets.UTF_8);
          values.add("value_for_" + key);
        }
        return (T) values;
      }

      @Override
      protected void sleep(long ignored) {
        throw new RuntimeException("This test should never sleep");
      }
    };
    // Use the standard executeMultiShardCommand method
    List<String> result = testMe.executeMultiShardCommand(mgetCommands);
    // Expected correct order based on input
    List<String> expectedCorrectOrder = java.util.Arrays.asList("value_for_" + key1,
        "value_for_" + key2, "value_for_" + key3);
    // THE KEY ASSERTION: Values must be in the same order as input keys
    assertEquals(expectedCorrectOrder, result,
        "MGET multi-shard should return values in the same order as input keys. " + "Input keys: ["
            + key1 + ", " + key2 + ", " + key3 + "], slots: [100, 200, 100]");
  }
}
/**
 * This test verifies that when keys are sorted by hash slot (consecutive keys belong to the same
 * slot), they are combined into a single command for optimal batching. For example, keys mapping
 * to slots [A, A, B, B] should result in 2 commands (not 4), with keys grouped as:
 * command1=[key1, key2], command2=[key3, key4].
 * <p>
 * The test asserts the number of generated commands and the order of the aggregated values. The
 * slots are forced by statically mocking {@code JedisClusterCRC16.getSlot} for the four keys.
 */
@Test
public void mgetMultiShardCombinesConsecutiveKeysWithSameSlotIntoOneCommand() {
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  Connection connection = mock(Connection.class);
  when(connectionHandler.getConnection(ArgumentMatchers.any(CommandArguments.class)))
      .thenReturn(connection);
  String key1 = "key1";
  String key2 = "key2";
  String key3 = "key3";
  String key4 = "key4";
  // Mock JedisClusterCRC16.getSlot to return sorted/grouped slots: [A, A, B, B]
  try (org.mockito.MockedStatic<JedisClusterCRC16> mockedStatic = Mockito
      .mockStatic(JedisClusterCRC16.class)) {
    mockedStatic.when(() -> JedisClusterCRC16.getSlot(key1)).thenReturn(100);
    mockedStatic.when(() -> JedisClusterCRC16.getSlot(key2)).thenReturn(100);
    mockedStatic.when(() -> JedisClusterCRC16.getSlot(key3)).thenReturn(200);
    mockedStatic.when(() -> JedisClusterCRC16.getSlot(key4)).thenReturn(200);
    ClusterCommandObjects commandObjects = new ClusterCommandObjects();
    // Consecutive keys with the same slot should be combined into one command
    List<CommandObject<List<String>>> mgetCommands = commandObjects.mgetMultiShard(key1, key2,
        key3, key4);
    assertEquals(2, mgetCommands.size(),
        "Should have 2 commands for sorted slots [A, A, B, B], not 4");
    // Execute MGET with the standard executeMultiShardCommand.
    // The executor answers with "value_for_<key>" for each key argument, in argument order.
    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10,
        Duration.ZERO, StaticCommandFlagsRegistry.registry()) {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T execute(Connection conn, CommandObject<T> commandObject) {
        List<String> values = new ArrayList<>();
        CommandArguments args = commandObject.getArguments();
        boolean isFirst = true;
        for (redis.clients.jedis.args.Rawable arg : args) {
          if (isFirst) {
            // The first argument is the command name itself, not a key
            isFirst = false;
            continue;
          }
          // Decode explicitly as UTF-8 instead of relying on the platform default charset
          String key = new String(arg.getRaw(), java.nio.charset.StandardCharsets.UTF_8);
          values.add("value_for_" + key);
        }
        return (T) values;
      }

      @Override
      protected void sleep(long ignored) {
        throw new RuntimeException("This test should never sleep");
      }
    };
    // Use the standard executeMultiShardCommand method
    List<String> result = testMe.executeMultiShardCommand(mgetCommands);
    // Expected correct order based on input
    List<String> expectedCorrectOrder = java.util.Arrays.asList("value_for_" + key1,
        "value_for_" + key2, "value_for_" + key3, "value_for_" + key4);
    // Verify values are in the correct order
    assertEquals(expectedCorrectOrder, result,
        "MGET multi-shard should return values in the same order as input keys");
  }
}
/**
 * Regression test for the race condition in RoundRobinConnectionResolver in which the round-robin
 * counter could cause an IndexOutOfBoundsException after a topology change.
 * <p>
 * How the bug happened:
 * <ol>
 * <li>Thread A, holding a 5-node list, sets the counter to 4 (valid for its list).</li>
 * <li>Thread B, holding a 3-node list after a topology change, reads the counter value 4.</li>
 * <li>Thread B uses 4 as an index into a 3-element list -> IndexOutOfBoundsException.</li>
 * </ol>
 * <p>
 * The fix applies modulo with the current list size after reading the counter, so the index is
 * always valid for the node list the current thread sees. This test simulates the shrink
 * sequentially: four executions against five nodes, then five executions against two nodes.
 */
@Test
public void runKeylessCommandDoesNotThrowIndexOutOfBoundsOnTopologyChange() {
  // Large topology: node1..node5. Small topology: node1 and node2 only, sharing the same pools.
  Map<String, ConnectionPool> fiveNodeTopology = new java.util.LinkedHashMap<>();
  Map<String, ConnectionPool> twoNodeTopology = new java.util.LinkedHashMap<>();
  for (int node = 1; node <= 5; node++) {
    ConnectionPool nodePool = mock(ConnectionPool.class);
    Connection nodeConnection = mock(Connection.class);
    when(nodePool.getResource()).thenReturn(nodeConnection);
    String nodeKey = "node" + node + ":6379";
    fiveNodeTopology.put(nodeKey, nodePool);
    if (node <= 2) {
      twoNodeTopology.put(nodeKey, nodePool);
    }
  }

  // The first 4 lookups see the large topology, every later lookup sees the small one
  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  when(connectionHandler.getPrimaryNodesConnectionMap())
      .thenReturn(fiveNodeTopology)
      .thenReturn(fiveNodeTopology)
      .thenReturn(fiveNodeTopology)
      .thenReturn(fiveNodeTopology)
      .thenReturn(twoNodeTopology)
      .thenReturn(twoNodeTopology)
      .thenReturn(twoNodeTopology)
      .thenReturn(twoNodeTopology)
      .thenReturn(twoNodeTopology);

  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      return (T) "OK";
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  // Phase 1: four executions against 5 nodes advance the round-robin counter
  for (int run = 0; run < 4; run++) {
    assertEquals("OK", invokeExecuteKeylessCommand(testMe, KEYLESS_WRITE_COM_OBJECT));
  }

  // The internal counter could now be at value 4 (pointing to the 5th node).
  // Phase 2: the topology shrinks to 2 nodes. None of these executions may throw
  // IndexOutOfBoundsException; the fix applies modulo with the current list size.
  for (int run = 0; run < 5; run++) {
    String reply = invokeExecuteKeylessCommand(testMe, KEYLESS_WRITE_COM_OBJECT);
    assertEquals("OK", reply, "Should succeed even when counter value exceeds new list size");
  }

  // 4 + 5 executions, one topology lookup each, so both maps were used
  verify(connectionHandler, times(9)).getPrimaryNodesConnectionMap();
}
/**
 * Checks that the round-robin counter survives integer overflow. Once the counter reaches
 * Integer.MAX_VALUE the next increment wraps to Integer.MIN_VALUE; the fix uses Math.abs so the
 * modulo result is always non-negative. The counter is pushed close to the limit via reflection
 * and five commands are executed across the wrap-around.
 */
@Test
public void runKeylessCommandHandlesCounterOverflowGracefully() {
  // A single pool/connection pair serves all three node entries
  Connection sharedConnection = mock(Connection.class);
  ConnectionPool sharedPool = mock(ConnectionPool.class);
  when(sharedPool.getResource()).thenReturn(sharedConnection);

  Map<String, ConnectionPool> primaries = new java.util.LinkedHashMap<>();
  primaries.put("node1:6379", sharedPool);
  primaries.put("node2:6379", sharedPool);
  primaries.put("node3:6379", sharedPool);

  ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
  when(connectionHandler.getPrimaryNodesConnectionMap()).thenReturn(primaries);

  ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO,
      StaticCommandFlagsRegistry.registry()) {
    @Override
    public <T> T execute(Connection connection, CommandObject<T> commandObject) {
      return (T) "OK";
    }

    @Override
    protected void sleep(long ignored) {
      throw new RuntimeException("This test should never sleep");
    }
  };

  // Reach into the executor: resolver field first, then the resolver's counter field
  ConnectionResolver resolver = ReflectionTestUtil.getField(testMe,
      "roundRobinConnectionResolver");
  java.util.concurrent.atomic.AtomicInteger rrCounter = ReflectionTestUtil.getField(resolver,
      "roundRobinCounter");

  // Start at MAX_VALUE - 1 so that the executions below cross the overflow boundary
  rrCounter.set(Integer.MAX_VALUE - 1);

  // None of these executions may throw, even though the counter overflows along the way
  int remaining = 5;
  while (remaining > 0) {
    String reply = invokeExecuteKeylessCommand(testMe, KEYLESS_WRITE_COM_OBJECT);
    assertEquals("OK", reply, "Should handle counter overflow gracefully");
    remaining--;
  }
}
}