CacheConnectionMockTest.java
package redis.clients.jedis.csc;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.util.Arrays;
import java.util.List;
import redis.clients.jedis.util.SafeEncoder;
import java.io.IOException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.ConnectionTestHelper;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.DefaultJedisSocketFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.PushConsumer;
import redis.clients.jedis.PushConsumerChainImpl;
import redis.clients.jedis.util.server.TcpMockServer;
/**
* Unit tests for CacheConnection that don't require a real Redis server. Uses TcpMockServer to
* simulate Redis protocol.
* <p>
* These tests verify CacheConnection-specific behavior (PushInvalidateConsumer registration).
* MaintenanceEventConsumer registration is tested in ConnectionMockTest.
* </p>
*/
public class CacheConnectionMockTest {
private TcpMockServer mockServer;
private Cache cache;
@BeforeEach
public void setUp() throws IOException {
mockServer = new TcpMockServer();
mockServer.start();
cache = CacheFactory.getCache(CacheConfig.builder().build());
}
@AfterEach
public void tearDown() throws IOException {
if (mockServer != null) {
mockServer.stop();
}
}
@Nested
class PushInvalidateConsumerTests {
@Test
public void pushInvalidateConsumerRegisteredWithConfigConstructor() {
DefaultJedisClientConfig config = DefaultJedisClientConfig.builder().resp3().build();
HostAndPort hostAndPort = new HostAndPort("localhost", mockServer.getPort());
DefaultJedisSocketFactory socketFactory = new DefaultJedisSocketFactory(hostAndPort, config);
CacheConnection conn = new CacheConnection(socketFactory, config, cache);
List<PushConsumer> consumers = ConnectionTestHelper.getPushConsumers(conn);
// Verify PushInvalidateConsumer is registered
assertThat(consumers, contains(is(PushConsumerChainImpl.PUBSUB_CONSUMER),
instanceOf(PushInvalidateConsumer.class)));
}
@Test
public void pushInvalidateConsumerRegisteredWithBuilder() {
DefaultJedisClientConfig config = DefaultJedisClientConfig.builder().resp3().build();
HostAndPort hostAndPort = new HostAndPort("localhost", mockServer.getPort());
DefaultJedisSocketFactory socketFactory = new DefaultJedisSocketFactory(hostAndPort, config);
CacheConnection conn = (CacheConnection) CacheConnection.builder(cache)
.socketFactory(socketFactory).clientConfig(config).build();
List<PushConsumer> consumers = ConnectionTestHelper.getPushConsumers(conn);
// Verify PushInvalidateConsumer is registered
assertThat(consumers, contains(is(PushConsumerChainImpl.PUBSUB_CONSUMER),
instanceOf(PushInvalidateConsumer.class)));
}
@Test
public void arbitraryPushNotificationDoesNotBreakConnection() {
DefaultJedisClientConfig config = DefaultJedisClientConfig.builder().resp3().build();
HostAndPort hostAndPort = new HostAndPort("localhost", mockServer.getPort());
DefaultJedisSocketFactory socketFactory = new DefaultJedisSocketFactory(hostAndPort, config);
try (CacheConnection conn = new CacheConnection(socketFactory, config, cache)) {
// Send arbitrary push notification (not "invalidate")
mockServer.sendPushMessageToAll("ARBITRARY_PUSH", "arg1", "arg2");
// Execute command after receiving arbitrary push notification
// If push notification handling is broken, this will throw an exception
assertDoesNotThrow(() -> conn.ping(),
"PING after arbitrary push notification should not throw exception");
assertTrue(conn.ping(), "PING should succeed");
// Verify connection is still healthy
assertFalse(conn.isBroken(), "Connection should not be broken");
assertTrue(conn.isConnected(), "Connection should still be connected");
}
}
@Test
public void invalidatePushMessageInvalidatesCacheForRedisKeys() {
DefaultJedisClientConfig config = DefaultJedisClientConfig.builder().resp3().build();
HostAndPort hostAndPort = new HostAndPort("localhost", mockServer.getPort());
DefaultJedisSocketFactory socketFactory = new DefaultJedisSocketFactory(hostAndPort, config);
// Create a spy of the cache to verify method invocations
Cache cacheSpy = spy(cache);
try (CacheConnection conn = new CacheConnection(socketFactory, config, cacheSpy)) {
// Send invalidate push notification with a single key
// RESP3 format: >2\r\n$10\r\ninvalidate\r\n*1\r\n$8\r\ntestkey1\r\n
// This represents: ["invalidate", ["testkey1"]]
String invalidateMessage = ">2\r\n$10\r\ninvalidate\r\n*1\r\n$8\r\ntestkey1\r\n";
mockServer.sendRawPushMessageToAll(invalidateMessage);
// Execute a command to trigger processing of the invalidate push message
assertDoesNotThrow(() -> conn.ping(),
"PING after invalidate push should not throw exception");
// Verify that cache.deleteByRedisKeys was invoked with the correct key
verify(cacheSpy).deleteByRedisKeys(argThat(keys -> keys != null && keys.size() == 1
&& Arrays.equals(SafeEncoder.encode("testkey1"), (byte[]) keys.get(0))));
}
}
}
}