1"""
2Async Redis Keyspace Notifications support for redis-py.
3
4This module provides async utilities for subscribing to and parsing Redis
5keyspace notifications.
6
7Standalone Redis Example:
8 >>> from redis.asyncio import Redis
9 >>> from redis.asyncio.keyspace_notifications import (
10 ... AsyncKeyspaceNotifications,
11 ... )
12 >>> from redis.keyspace_notifications import KeyspaceChannel, EventType
13 >>>
14 >>> async def main():
15 ... async with Redis() as r:
16 ... async with AsyncKeyspaceNotifications(r) as ksn:
17 ... channel = KeyspaceChannel("user:*")
18 ... await ksn.subscribe(channel)
19 ... async for notification in ksn.listen():
20 ... print(f"Key: {notification.key}, Event: {notification.event_type}")
21
22Redis Cluster Example:
23 >>> from redis.asyncio.cluster import RedisCluster
24 >>> from redis.asyncio.keyspace_notifications import (
25 ... AsyncClusterKeyspaceNotifications,
26 ... )
27 >>> from redis.keyspace_notifications import KeyspaceChannel, EventType
28 >>>
29 >>> async def main():
30 ... async with RedisCluster(host="localhost", port=7000) as rc:
31 ... async with AsyncClusterKeyspaceNotifications(rc) as ksn:
32 ... channel = KeyspaceChannel("user:*")
33 ... await ksn.subscribe(channel)
34 ... async for notification in ksn.listen():
35 ... print(f"Key: {notification.key}, Event: {notification.event_type}")
36"""
37
38from __future__ import annotations
39
40import asyncio
41import inspect
42import logging
43import time
44from abc import ABC, abstractmethod
45from collections.abc import AsyncIterator, Awaitable, Callable
46from typing import Any
47
48from redis.asyncio.client import PubSub, Redis
49from redis.asyncio.cluster import ClusterNode, RedisCluster, _ClusterNodePoolAdapter
50from redis.exceptions import (
51 ConnectionError,
52 RedisError,
53 TimeoutError,
54)
55from redis.keyspace_notifications import (
56 ChannelT,
57 KeyeventChannel,
58 KeyNotification,
59 KeyspaceChannel,
60 SubkeyeventChannel,
61 SubkeyspaceChannel,
62 SubkeyspaceeventChannel,
63 SubkeyspaceitemChannel,
64 _is_pattern,
65)
66from redis.utils import safe_str
67
68logger = logging.getLogger(__name__)
69
70
71# Type alias for handlers that can be sync or async
72AsyncHandlerT = Callable[[KeyNotification], None | Awaitable[None]]
73
74
75# =============================================================================
76# Async Interface for Keyspace Notifications
77# =============================================================================
78
79
80class AsyncKeyspaceNotificationsInterface(ABC):
81 """
82 Async interface for keyspace notification managers.
83
84 This interface provides a consistent async API for both standalone
85 (AsyncKeyspaceNotifications) and cluster (AsyncClusterKeyspaceNotifications)
86 implementations.
87 """
88
89 @abstractmethod
90 async def subscribe(
91 self,
92 *channels: ChannelT,
93 handler: AsyncHandlerT | None = None,
94 ):
95 """Subscribe to keyspace notification channels."""
96 pass
97
98 @abstractmethod
99 async def unsubscribe(self, *channels: ChannelT):
100 """Unsubscribe from keyspace notification channels."""
101 pass
102
103 @abstractmethod
104 async def subscribe_keyspace(
105 self,
106 key_or_pattern: str,
107 db: int = 0,
108 handler: AsyncHandlerT | None = None,
109 ):
110 """Subscribe to keyspace notifications for specific keys."""
111 pass
112
113 @abstractmethod
114 async def subscribe_keyevent(
115 self,
116 event: str,
117 db: int = 0,
118 handler: AsyncHandlerT | None = None,
119 ):
120 """Subscribe to keyevent notifications for specific event types."""
121 pass
122
123 @abstractmethod
124 async def subscribe_subkeyspace(
125 self,
126 key_or_pattern: str,
127 db: int = 0,
128 handler: AsyncHandlerT | None = None,
129 ):
130 """Subscribe to subkeyspace notifications for specific keys."""
131 pass
132
133 @abstractmethod
134 async def subscribe_subkeyevent(
135 self,
136 event: str,
137 db: int = 0,
138 handler: AsyncHandlerT | None = None,
139 ):
140 """Subscribe to subkeyevent notifications for specific event types."""
141 pass
142
143 @abstractmethod
144 async def subscribe_subkeyspaceitem(
145 self,
146 key_or_pattern: str,
147 subkey_or_pattern: str,
148 db: int = 0,
149 handler: AsyncHandlerT | None = None,
150 ):
151 """Subscribe to subkeyspaceitem notifications for a specific subkey."""
152 pass
153
154 @abstractmethod
155 async def subscribe_subkeyspaceevent(
156 self,
157 event: str,
158 key_or_pattern: str,
159 db: int = 0,
160 handler: AsyncHandlerT | None = None,
161 ):
162 """Subscribe to subkeyspaceevent notifications for an event on a key."""
163 pass
164
165 @abstractmethod
166 async def get_message(
167 self,
168 ignore_subscribe_messages: bool | None = None,
169 timeout: float = 0.0,
170 ) -> KeyNotification | None:
171 """Get the next keyspace notification if one is available."""
172 pass
173
174 @abstractmethod
175 def listen(self) -> AsyncIterator[KeyNotification]:
176 """Listen for keyspace notifications."""
177 pass
178
179 @abstractmethod
180 async def aclose(self):
181 """Close the notification manager and clean up resources."""
182 pass
183
184 @abstractmethod
185 async def __aenter__(self):
186 pass
187
188 @abstractmethod
189 async def __aexit__(self, _exc_type, _exc_val, _exc_tb):
190 pass
191
192 @property
193 @abstractmethod
194 def subscribed(self) -> bool:
195 """Check if there are any active subscriptions and not closed."""
196 pass
197
198 @abstractmethod
199 async def run(
200 self,
201 poll_timeout: float = 1.0,
202 exception_handler: Callable[
203 [BaseException, AsyncKeyspaceNotificationsInterface],
204 None | Awaitable[None],
205 ]
206 | None = None,
207 ) -> None:
208 """
209 Run the notification loop as a coroutine.
210
211 This is the async equivalent of run_in_thread() for sync notifications.
212 Use asyncio.create_task() to run in the background.
213
214 The exception_handler can be either a sync or async function.
215 """
216 pass
217
218
219# =============================================================================
220# Abstract Base Class for Async Keyspace Notifications
221# =============================================================================
222
223
224class AbstractAsyncKeyspaceNotifications(AsyncKeyspaceNotificationsInterface):
225 """
226 Abstract base class for async keyspace notification managers.
227
228 Provides shared implementation for subscribe/unsubscribe logic.
229 Subclasses must implement:
230 - _execute_subscribe: Execute the subscribe operation
231 - _execute_unsubscribe: Execute the unsubscribe operation
232 - get_message: Get the next notification
233 - listen: Async generator for notifications
234 - aclose: Clean up resources
235 """
236
237 def __init__(
238 self,
239 key_prefix: str | bytes | None = None,
240 ignore_subscribe_messages: bool = True,
241 ):
242 """
243 Initialize the base async keyspace notification manager.
244
245 Args:
246 key_prefix: Optional prefix to filter and strip from keys in notifications
247 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
248 are not returned by get_message/listen
249 """
250 self.key_prefix = key_prefix
251 self.ignore_subscribe_messages = ignore_subscribe_messages
252 self._closed = False
253
254 async def subscribe(
255 self,
256 *channels: ChannelT,
257 handler: AsyncHandlerT | None = None,
258 ):
259 """
260 Subscribe to keyspace notification channels.
261
262 Automatically detects whether each channel is a pattern (contains
263 wildcards like *, ?, [) or an exact channel name and uses the
264 appropriate Redis subscribe command internally.
265
266 The handler can be either a sync or async function. Note that a
267 **sync** handler will be called directly on the event loop thread,
268 so it must not perform blocking I/O or long-running computation —
269 prefer an ``async`` handler whenever possible.
270 """
271 # Wrap the handler to convert raw messages to KeyNotification objects
272 wrapped_handler: Callable | None = None
273 if handler is not None:
274 key_prefix = self.key_prefix
275 is_async_handler = inspect.iscoroutinefunction(handler)
276
277 if is_async_handler:
278 # We've verified handler is async, so the result is awaitable
279 async_handler = handler
280
281 async def _async_wrap_handler(message):
282 notification = KeyNotification.from_message(
283 message, key_prefix=key_prefix
284 )
285 if notification is not None:
286 await async_handler(notification)
287
288 wrapped_handler = _async_wrap_handler
289 else:
290
291 def _sync_wrap_handler(message):
292 notification = KeyNotification.from_message(
293 message, key_prefix=key_prefix
294 )
295 if notification is not None:
296 handler(notification)
297
298 wrapped_handler = _sync_wrap_handler
299
300 patterns = {}
301 exact_channels = {}
302
303 for channel in channels:
304 if hasattr(channel, "_channel_str"):
305 channel_str = str(channel)
306 else:
307 channel_str = safe_str(channel)
308 if _is_pattern(channel):
309 patterns[channel_str] = wrapped_handler
310 else:
311 exact_channels[channel_str] = wrapped_handler
312
313 # Delegate to subclass implementation first. For standalone Redis
314 # this raises on failure, keeping tracking state clean. For cluster
315 # implementations the operation is best-effort (partial failures are
316 # logged, not raised) so tracking state is always updated afterwards.
317 await self._execute_subscribe(patterns, exact_channels)
318 self._track_subscribe(patterns, exact_channels)
319
320 @abstractmethod
321 async def _execute_subscribe(
322 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
323 ) -> None:
324 """Execute the subscribe operation."""
325 pass
326
327 async def unsubscribe(self, *channels: ChannelT):
328 """Unsubscribe from keyspace notification channels."""
329 patterns = []
330 exact_channels = []
331
332 for channel in channels:
333 if hasattr(channel, "_channel_str"):
334 channel_str = str(channel)
335 else:
336 channel_str = safe_str(channel)
337 if _is_pattern(channel):
338 patterns.append(channel_str)
339 else:
340 exact_channels.append(channel_str)
341
342 # Delegate to subclass implementation first. For standalone Redis
343 # this raises on failure, keeping tracking state intact. For cluster
344 # implementations the operation is best-effort (partial failures are
345 # logged, not raised) so tracking state is always removed afterwards
346 # — this is intentional: the user asked to unsubscribe, so
347 # refresh_subscriptions should not re-subscribe these channels.
348 await self._execute_unsubscribe(patterns, exact_channels)
349 self._untrack_subscribe(patterns, exact_channels)
350
351 @abstractmethod
352 async def _execute_unsubscribe(
353 self, patterns: list[str], exact_channels: list[str]
354 ) -> None:
355 """Execute the unsubscribe operation."""
356 pass
357
358 def _track_subscribe(
359 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
360 ) -> None:
361 """Track newly subscribed patterns/channels.
362
363 Override in subclasses that need to maintain their own subscription
364 registry (e.g. cluster implementations that must re-subscribe
365 new/failed-over nodes). The default is a no-op because standalone
366 implementations delegate tracking to the underlying PubSub object.
367 """
368
369 def _untrack_subscribe(
370 self, patterns: list[str], exact_channels: list[str]
371 ) -> None:
372 """Remove patterns/channels from the subscription registry.
373
374 Override in subclasses that maintain their own subscription registry.
375 The default is a no-op.
376 """
377
378 async def subscribe_keyspace(
379 self,
380 key_or_pattern: str,
381 db: int = 0,
382 handler: AsyncHandlerT | None = None,
383 ):
384 """Subscribe to keyspace notifications for specific keys."""
385 channel = KeyspaceChannel(key_or_pattern, db=db)
386 await self.subscribe(channel, handler=handler)
387
388 async def subscribe_keyevent(
389 self,
390 event: str,
391 db: int = 0,
392 handler: AsyncHandlerT | None = None,
393 ):
394 """Subscribe to keyevent notifications for specific event types."""
395 channel = KeyeventChannel(event, db=db)
396 await self.subscribe(channel, handler=handler)
397
398 async def subscribe_subkeyspace(
399 self,
400 key_or_pattern: str,
401 db: int = 0,
402 handler: AsyncHandlerT | None = None,
403 ):
404 """Subscribe to subkeyspace notifications for specific keys."""
405 channel = SubkeyspaceChannel(key_or_pattern, db=db)
406 await self.subscribe(channel, handler=handler)
407
408 async def subscribe_subkeyevent(
409 self,
410 event: str,
411 db: int = 0,
412 handler: AsyncHandlerT | None = None,
413 ):
414 """Subscribe to subkeyevent notifications for specific event types."""
415 channel = SubkeyeventChannel(event, db=db)
416 await self.subscribe(channel, handler=handler)
417
418 async def subscribe_subkeyspaceitem(
419 self,
420 key_or_pattern: str,
421 subkey_or_pattern: str,
422 db: int = 0,
423 handler: AsyncHandlerT | None = None,
424 ):
425 """Subscribe to subkeyspaceitem notifications for a specific subkey."""
426 channel = SubkeyspaceitemChannel(key_or_pattern, subkey_or_pattern, db=db)
427 await self.subscribe(channel, handler=handler)
428
429 async def subscribe_subkeyspaceevent(
430 self,
431 event: str,
432 key_or_pattern: str,
433 db: int = 0,
434 handler: AsyncHandlerT | None = None,
435 ):
436 """Subscribe to subkeyspaceevent notifications for an event on a key."""
437 channel = SubkeyspaceeventChannel(event, key_or_pattern, db=db)
438 await self.subscribe(channel, handler=handler)
439
440 async def __aenter__(self):
441 return self
442
443 async def __aexit__(self, _exc_type, _exc_val, _exc_tb):
444 await self.aclose()
445 return False
446
447 async def run(
448 self,
449 poll_timeout: float = 1.0,
450 exception_handler: Callable[
451 [BaseException, AsyncKeyspaceNotificationsInterface],
452 None | Awaitable[None],
453 ]
454 | None = None,
455 ) -> None:
456 """
457 Run the notification loop as a coroutine.
458
459 This continuously polls for notifications and triggers handlers.
460 Use asyncio.create_task() to run in the background.
461
462 Args:
463 poll_timeout: Timeout in seconds for each get_message call.
464 exception_handler: Optional callback for handling exceptions.
465 Can be sync or async.
466 """
467 while self.subscribed:
468 try:
469 await self.get_message(timeout=poll_timeout)
470 except asyncio.CancelledError:
471 raise
472 except BaseException as e:
473 if exception_handler is not None:
474 result = exception_handler(e, self)
475 if inspect.isawaitable(result):
476 await result
477 else:
478 raise
479
480
481# =============================================================================
482# Standalone Async Keyspace Notification Manager
483# =============================================================================
484
485
486class AsyncKeyspaceNotifications(AbstractAsyncKeyspaceNotifications):
487 """
488 Manages keyspace notification subscriptions for standalone async Redis.
489
490 For standalone Redis, keyspace notifications work with a single PubSub
491 connection. This class wraps that connection and provides:
492 - Automatic pattern vs exact channel detection
493 - KeyNotification parsing with optional key_prefix filtering
494 - Convenience methods for keyspace and keyevent subscriptions
495 - Context manager and run() coroutine support
496 """
497
498 def __init__(
499 self,
500 redis_client: Redis,
501 key_prefix: str | bytes | None = None,
502 ignore_subscribe_messages: bool = True,
503 ):
504 """
505 Initialize the standalone async keyspace notification manager.
506
507 Note: Keyspace notifications must be enabled on the Redis server via
508 the ``notify-keyspace-events`` configuration option.
509
510 Args:
511 redis_client: An async Redis client instance
512 key_prefix: Optional prefix to filter and strip from keys in notifications
513 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
514 are not returned by get_message/listen
515 """
516 super().__init__(key_prefix, ignore_subscribe_messages)
517 self.redis = redis_client
518 # Create PubSub with ignore_subscribe_messages=False so per-call arg works
519 self._pubsub: PubSub = redis_client.pubsub(ignore_subscribe_messages=False)
520
521 async def _execute_subscribe(
522 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
523 ) -> None:
524 """Execute subscribe on the single pubsub connection."""
525 if patterns:
526 await self._pubsub.psubscribe(**patterns)
527 if exact_channels:
528 await self._pubsub.subscribe(**exact_channels)
529
530 async def _execute_unsubscribe(
531 self, patterns: list[str], exact_channels: list[str]
532 ) -> None:
533 """Execute unsubscribe on the single pubsub connection."""
534 if patterns:
535 await self._pubsub.punsubscribe(*patterns)
536 if exact_channels:
537 await self._pubsub.unsubscribe(*exact_channels)
538
539 async def get_message(
540 self,
541 ignore_subscribe_messages: bool | None = None,
542 timeout: float = 0.0,
543 ) -> KeyNotification | None:
544 """
545 Get the next keyspace notification if one is available.
546
547 Args:
548 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages.
549 Defaults to the value set in __init__ (True).
550 timeout: Time to wait for a message.
551
552 Returns:
553 A KeyNotification if a notification is available and no handler
554 was registered for the channel, None otherwise.
555 """
556 if ignore_subscribe_messages is None:
557 ignore_subscribe_messages = self.ignore_subscribe_messages
558
559 if self._closed:
560 return None
561
562 message = await self._pubsub.get_message(
563 ignore_subscribe_messages=ignore_subscribe_messages,
564 timeout=timeout,
565 )
566
567 if message is not None:
568 return KeyNotification.from_message(message, key_prefix=self.key_prefix)
569
570 return None
571
572 async def listen(self) -> AsyncIterator[KeyNotification]:
573 """
574 Listen for keyspace notifications.
575
576 This is an async generator that yields KeyNotification objects as they arrive.
577
578 Yields:
579 KeyNotification objects for each keyspace/keyevent notification.
580
581 Example:
582 >>> async for notification in ksn.listen():
583 ... print(f"{notification.key}: {notification.event_type}")
584 """
585 while self.subscribed:
586 notification = await self.get_message(timeout=1.0)
587 if notification is not None:
588 yield notification
589
590 @property
591 def subscribed(self) -> bool:
592 """Check if there are any active subscriptions and not closed."""
593 return not self._closed and self._pubsub.subscribed
594
595 async def aclose(self):
596 """Close the pubsub connection and clean up resources."""
597 self._closed = True
598 try:
599 await self._pubsub.aclose()
600 except Exception:
601 pass
602
603
604# =============================================================================
605# Cluster-Aware Async Keyspace Notification Manager
606# =============================================================================
607
608
609class AsyncClusterKeyspaceNotifications(AbstractAsyncKeyspaceNotifications):
610 """
611 Manages keyspace notification subscriptions across all nodes in an async Redis Cluster.
612
613 In Redis Cluster, keyspace notifications are NOT broadcast between nodes.
614 Each node only emits notifications for keys it owns. This class automatically
615 subscribes to all primary nodes in the cluster and handles topology changes.
616 """
617
618 def __init__(
619 self,
620 redis_cluster: RedisCluster,
621 key_prefix: str | bytes | None = None,
622 ignore_subscribe_messages: bool = True,
623 ):
624 """
625 Initialize the async cluster keyspace notification manager.
626
627 Note: Keyspace notifications must be enabled on all Redis cluster nodes via
628 the ``notify-keyspace-events`` configuration option.
629
630 Args:
631 redis_cluster: An async RedisCluster instance
632 key_prefix: Optional prefix to filter and strip from keys in notifications
633 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
634 are not returned by get_message/listen
635 """
636 super().__init__(key_prefix, ignore_subscribe_messages)
637 self.cluster = redis_cluster
638
639 # Canonical subscription registry: pattern/channel -> wrapped handler.
640 # In cluster mode there are multiple PubSub objects (one per node), so
641 # this is the single source of truth used to (re-)subscribe new or
642 # failed-over nodes.
643 self._subscribed_patterns: dict[str, Any] = {}
644 self._subscribed_channels: dict[str, Any] = {}
645
646 # Track subscriptions per node
647 self._node_pubsubs: dict[str, PubSub] = {}
648
649 # Lock for topology refresh operations
650 self._refresh_lock = asyncio.Lock()
651
652 # Current pubsub index for round-robin polling
653 self._poll_index = 0
654
655 @property
656 def subscribed(self) -> bool:
657 """Check if there are any active subscriptions and not closed."""
658 return not self._closed and bool(
659 self._subscribed_patterns or self._subscribed_channels
660 )
661
662 def _track_subscribe(
663 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
664 ) -> None:
665 """Track newly subscribed patterns/channels in the cluster registry."""
666 if patterns:
667 self._subscribed_patterns.update(patterns)
668 if exact_channels:
669 self._subscribed_channels.update(exact_channels)
670
671 def _untrack_subscribe(
672 self, patterns: list[str], exact_channels: list[str]
673 ) -> None:
674 """Remove patterns/channels from the cluster registry."""
675 for p in patterns:
676 self._subscribed_patterns.pop(p, None)
677 for c in exact_channels:
678 self._subscribed_channels.pop(c, None)
679
680 def _get_all_primary_nodes(self) -> list[ClusterNode]:
681 """Get all primary nodes in the cluster."""
682 return self.cluster.get_primaries()
683
684 async def _ensure_node_pubsub(self, node: ClusterNode) -> PubSub:
685 """Get or create a PubSub instance for a node.
686
687 Uses a :class:`_ClusterNodePoolAdapter` to borrow a connection
688 from the node's existing pool. When the ``PubSub`` is closed
689 the connection is disconnected and returned to the node,
690 ensuring no subscribed socket is left in the free queue.
691 """
692 if node.name not in self._node_pubsubs:
693 pool_adapter = _ClusterNodePoolAdapter(node)
694 pubsub = PubSub(
695 connection_pool=pool_adapter, # type: ignore[arg-type]
696 ignore_subscribe_messages=False,
697 )
698 self._node_pubsubs[node.name] = pubsub
699 return self._node_pubsubs[node.name]
700
701 async def _cleanup_node(self, node_name: str) -> None:
702 """Remove and close a node's PubSub.
703
704 ``PubSub.aclose()`` disconnects the connection and releases it
705 back to the underlying :class:`ClusterNode` via the adapter.
706 """
707 pubsub = self._node_pubsubs.pop(node_name, None)
708 if pubsub:
709 try:
710 await pubsub.aclose()
711 except Exception:
712 pass
713
714 async def _execute_subscribe(
715 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
716 ) -> None:
717 """Execute subscribe on all cluster nodes.
718
719 Patterns and exact channels are subscribed in a single pass over
720 nodes so that a mid-batch node failure cannot create a
721 partially-caught-up replacement. If a node fails during this
722 call it is removed from ``_node_pubsubs`` and will be fully
723 re-subscribed on the next ``refresh_subscriptions`` cycle.
724
725 If a newly discovered node is encountered (not yet in
726 ``_node_pubsubs``), it is also subscribed to all *previously*
727 tracked patterns/channels so it doesn't miss notifications for
728 subscriptions that were established before this node joined.
729 """
730 if not patterns and not exact_channels:
731 return
732
733 failed_nodes: list[str] = []
734 for node in self._get_all_primary_nodes():
735 is_new_node = node.name not in self._node_pubsubs
736 pubsub = await self._ensure_node_pubsub(node)
737 try:
738 # If this is a brand-new node, catch it up on existing
739 # subscriptions before adding the new channels.
740 if is_new_node:
741 if self._subscribed_patterns:
742 await pubsub.psubscribe(**self._subscribed_patterns)
743 if self._subscribed_channels:
744 await pubsub.subscribe(**self._subscribed_channels)
745
746 if patterns:
747 await pubsub.psubscribe(**patterns)
748 if exact_channels:
749 await pubsub.subscribe(**exact_channels)
750 except Exception:
751 # Remove the broken pubsub and its connection pool
752 # so refresh_subscriptions can re-create both later.
753 await self._cleanup_node(node.name)
754 failed_nodes.append(node.name)
755
756 if failed_nodes:
757 logger.warning(
758 "Failed to subscribe on cluster nodes: %s. "
759 "These nodes will be retried on the next refresh cycle.",
760 ", ".join(failed_nodes),
761 )
762
763 async def _execute_unsubscribe(
764 self, patterns: list[str], exact_channels: list[str]
765 ) -> None:
766 """Execute unsubscribe on all cluster nodes."""
767 if patterns:
768 await self._unsubscribe_from_all_nodes(patterns, use_punsubscribe=True)
769 if exact_channels:
770 await self._unsubscribe_from_all_nodes(
771 exact_channels, use_punsubscribe=False
772 )
773
774 async def _unsubscribe_from_all_nodes(
775 self, channels: list[str], use_punsubscribe: bool
776 ):
777 """Unsubscribe from patterns/channels on all nodes.
778
779 Best-effort: tries every node so that a single broken connection
780 does not prevent the remaining nodes from being unsubscribed.
781 Broken pubsubs are cleaned up; the tracking state is still removed
782 by the caller, so ``refresh_subscriptions`` will *not* re-subscribe
783 these channels on replacement nodes.
784 """
785 failed_nodes: list[str] = []
786 for node_name in list(self._node_pubsubs.keys()):
787 pubsub = self._node_pubsubs.get(node_name)
788 if pubsub is None:
789 continue
790 try:
791 if use_punsubscribe:
792 await pubsub.punsubscribe(*channels)
793 else:
794 await pubsub.unsubscribe(*channels)
795 except Exception:
796 await self._cleanup_node(node_name)
797 failed_nodes.append(node_name)
798
799 if failed_nodes:
800 logger.warning(
801 "Failed to unsubscribe on cluster nodes: %s. "
802 "These nodes will be re-created on the next refresh cycle.",
803 ", ".join(failed_nodes),
804 )
805
806 async def get_message(
807 self,
808 ignore_subscribe_messages: bool | None = None,
809 timeout: float = 0.0,
810 ) -> KeyNotification | None:
811 """
812 Get the next keyspace notification if one is available.
813
814 This method polls all node pubsubs in round-robin fashion until
815 a message is received or the timeout expires.
816 If a connection error occurs, subscriptions are automatically refreshed.
817
818 Args:
819 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages.
820 Defaults to the value set in __init__ (True).
821 timeout: Total time to wait for a message (distributed across all nodes)
822
823 Returns:
824 A KeyNotification if a notification is available, None otherwise.
825 """
826 if self._closed:
827 return None
828
829 total_nodes = len(self._node_pubsubs)
830 if total_nodes == 0:
831 # Sleep for the requested timeout so callers that loop
832 # (run(), listen) don't spin the CPU when all node
833 # connections have been cleaned up.
834 if timeout > 0:
835 await asyncio.sleep(timeout)
836 return None
837
838 if ignore_subscribe_messages is None:
839 ignore_subscribe_messages = self.ignore_subscribe_messages
840
841 # Handle timeout=0 as a single non-blocking poll over all pubsubs
842 if timeout == 0.0:
843 return await self._poll_all_nodes_once(ignore_subscribe_messages)
844
845 # Calculate per-node timeout for each poll
846 per_node_timeout = min(0.1, timeout / max(total_nodes, 1))
847
848 start_time = time.monotonic()
849 end_time = start_time + timeout
850
851 while True:
852 if time.monotonic() >= end_time:
853 return None
854
855 pubsubs = list(self._node_pubsubs.values())
856 if not pubsubs:
857 return None
858
859 # Round-robin polling
860 self._poll_index = self._poll_index % len(pubsubs)
861 pubsub = pubsubs[self._poll_index]
862 self._poll_index += 1
863
864 try:
865 message = await pubsub.get_message(
866 ignore_subscribe_messages=ignore_subscribe_messages,
867 timeout=per_node_timeout,
868 )
869 except (ConnectionError, TimeoutError, RedisError):
870 await self._refresh_subscriptions_on_error()
871 continue
872
873 if message is not None:
874 notification = KeyNotification.from_message(
875 message, key_prefix=self.key_prefix
876 )
877 if notification is not None:
878 return notification
879
880 async def _poll_all_nodes_once(
881 self, ignore_subscribe_messages: bool
882 ) -> KeyNotification | None:
883 """
884 Perform a single non-blocking poll over all node pubsubs.
885
886 Returns:
887 A KeyNotification if one is available, None otherwise.
888 """
889 had_error = False
890 for pubsub in list(self._node_pubsubs.values()):
891 try:
892 message = await pubsub.get_message(
893 ignore_subscribe_messages=ignore_subscribe_messages,
894 timeout=0.0,
895 )
896 except (ConnectionError, TimeoutError, RedisError):
897 # Record the error but continue polling remaining healthy
898 # nodes so that already-buffered notifications are not lost.
899 had_error = True
900 continue
901
902 if message is not None:
903 notification = KeyNotification.from_message(
904 message, key_prefix=self.key_prefix
905 )
906 if notification is not None:
907 # Refresh before returning if any node had an error,
908 # so the next poll cycle has fresh state.
909 if had_error:
910 await self._refresh_subscriptions_on_error()
911 return notification
912
913 # Refresh after polling all nodes if any had errors
914 if had_error:
915 await self._refresh_subscriptions_on_error()
916 return None
917
918 async def listen(self) -> AsyncIterator[KeyNotification]:
919 """
920 Listen for keyspace notifications from all cluster nodes.
921
922 This is an async generator that yields KeyNotification objects as they arrive.
923
924 Yields:
925 KeyNotification objects for each keyspace/keyevent notification.
926
927 Example:
928 >>> async for notification in ksn.listen():
929 ... print(f"{notification.key}: {notification.event_type}")
930 """
931 while self.subscribed:
932 notification = await self.get_message(timeout=1.0)
933 if notification is not None:
934 yield notification
935
936 async def _refresh_subscriptions_on_error(self):
937 """
938 Refresh subscriptions after a connection error.
939
940 This is called automatically when a connection error occurs during
941 get_message(). It checks if nodes changed before refreshing.
942 """
943 self._poll_index = 0 # Reset round-robin index
944
945 try:
946 await self.refresh_subscriptions()
947 except Exception:
948 logger.warning(
949 "Failed to refresh cluster subscriptions, will retry on next error",
950 exc_info=True,
951 )
952
953 def _is_pubsub_connected(self, pubsub) -> bool:
954 """Check if a pubsub connection is still alive."""
955 try:
956 conn = pubsub.connection
957 if conn is None:
958 return False
959 # Async connections use is_connected property
960 return conn.is_connected
961 except Exception:
962 return False
963
964 async def refresh_subscriptions(self):
965 """
966 Refresh subscriptions after a topology change.
967
968 This method is called automatically when topology changes are detected
969 or when connection errors occur. You can also call it manually if needed.
970
971 This method:
972 1. Discovers any new primary nodes and subscribes them
973 2. Removes pubsubs for nodes that are no longer primaries
974 3. Re-creates broken pubsub connections for existing nodes
975 """
976 async with self._refresh_lock:
977 current_primaries = {
978 node.name: node for node in self._get_all_primary_nodes()
979 }
980
981 # Remove pubsubs for nodes that are no longer primaries
982 removed_nodes = set(self._node_pubsubs.keys()) - set(
983 current_primaries.keys()
984 )
985 for node_name in removed_nodes:
986 await self._cleanup_node(node_name)
987
988 # Detect broken connections for existing nodes and remove them
989 # so they get re-created below
990 existing_nodes = set(self._node_pubsubs.keys()) & set(
991 current_primaries.keys()
992 )
993 for node_name in existing_nodes:
994 pubsub = self._node_pubsubs.get(node_name)
995 if pubsub and not self._is_pubsub_connected(pubsub):
996 # Connection is broken, remove it so it gets re-created
997 await self._cleanup_node(node_name)
998
999 # Subscribe new nodes (and nodes with broken connections)
1000 # to existing patterns/channels
1001 new_nodes = set(current_primaries.keys()) - set(self._node_pubsubs.keys())
1002 failed_nodes: list[str] = []
1003 for node_name in new_nodes:
1004 node = current_primaries[node_name]
1005 pubsub = await self._ensure_node_pubsub(node)
1006
1007 try:
1008 if self._subscribed_patterns:
1009 await pubsub.psubscribe(**self._subscribed_patterns)
1010 if self._subscribed_channels:
1011 await pubsub.subscribe(**self._subscribed_channels)
1012 except Exception:
1013 # Subscription failed - remove from dict so retry is possible
1014 await self._cleanup_node(node_name)
1015 failed_nodes.append(node_name)
1016
1017 # Raise after attempting all nodes so we don't skip any
1018 if failed_nodes:
1019 raise ConnectionError(
1020 f"Failed to subscribe to cluster nodes: {', '.join(failed_nodes)}"
1021 )
1022
1023 async def aclose(self):
1024 """Close all pubsub connections and clean up resources."""
1025 self._closed = True
1026 for node_name in list(self._node_pubsubs.keys()):
1027 await self._cleanup_node(node_name)
1028 self._subscribed_patterns.clear()
1029 self._subscribed_channels.clear()