1"""
2Async Redis Keyspace Notifications support for redis-py.
3
4This module provides async utilities for subscribing to and parsing Redis
5keyspace notifications.
6
7Standalone Redis Example:
8 >>> from redis.asyncio import Redis
9 >>> from redis.asyncio.keyspace_notifications import (
10 ... AsyncKeyspaceNotifications,
11 ... )
12 >>> from redis.keyspace_notifications import KeyspaceChannel, EventType
13 >>>
14 >>> async def main():
15 ... async with Redis() as r:
16 ... async with AsyncKeyspaceNotifications(r) as ksn:
17 ... channel = KeyspaceChannel("user:*")
18 ... await ksn.subscribe(channel)
19 ... async for notification in ksn.listen():
20 ... print(f"Key: {notification.key}, Event: {notification.event_type}")
21
22Redis Cluster Example:
23 >>> from redis.asyncio.cluster import RedisCluster
24 >>> from redis.asyncio.keyspace_notifications import (
25 ... AsyncClusterKeyspaceNotifications,
26 ... )
27 >>> from redis.keyspace_notifications import KeyspaceChannel, EventType
28 >>>
29 >>> async def main():
30 ... async with RedisCluster(host="localhost", port=7000) as rc:
31 ... async with AsyncClusterKeyspaceNotifications(rc) as ksn:
32 ... channel = KeyspaceChannel("user:*")
33 ... await ksn.subscribe(channel)
34 ... async for notification in ksn.listen():
35 ... print(f"Key: {notification.key}, Event: {notification.event_type}")
36"""
37
38from __future__ import annotations
39
40import asyncio
41import inspect
42import logging
43import time
44from abc import ABC, abstractmethod
45from collections.abc import AsyncIterator, Awaitable, Callable
46from typing import Any
47
48from redis._parsers.encoders import Encoder
49from redis.asyncio.client import PubSub, Redis
50from redis.asyncio.cluster import ClusterNode, RedisCluster
51from redis.exceptions import (
52 ConnectionError,
53 RedisError,
54 TimeoutError,
55)
56from redis.keyspace_notifications import (
57 ChannelT,
58 KeyeventChannel,
59 KeyNotification,
60 KeyspaceChannel,
61 _is_pattern,
62)
63from redis.utils import safe_str
64
65logger = logging.getLogger(__name__)
66
67
68class _ClusterNodePoolAdapter:
69 """Thin adapter exposing the :class:`ConnectionPool` interface that
70 :class:`PubSub` requires, backed by a :class:`ClusterNode`'s own
71 connection pool.
72
73 Connections are acquired from the node via
74 :meth:`ClusterNode.acquire_connection` and returned via
75 :meth:`ClusterNode.release`. :meth:`PubSub.aclose` already
76 disconnects the connection *before* calling :meth:`release`, so the
77 connection is returned to the node's free-queue in a disconnected
78 state — guaranteeing that a subscribed socket is never silently
79 reused for regular commands.
80 """
81
82 def __init__(self, node: ClusterNode) -> None:
83 self._node = node
84 self.connection_kwargs = node.connection_kwargs
85
86 # -- methods used by PubSub ------------------------------------------------
87
88 def get_encoder(self) -> Encoder:
89 return self._node.get_encoder()
90
91 async def get_connection(
92 self, command_name: str | None = None, *keys: Any, **options: Any
93 ) -> Any:
94 connection = self._node.acquire_connection()
95 await connection.connect()
96 return connection
97
98 async def release(self, connection: Any) -> None:
99 # PubSub.aclose() disconnects the connection before calling
100 # release(), so it is safe to put it back in the node's free
101 # queue – it will reconnect lazily on next use.
102 self._node.release(connection)
103
104
105# Type alias for handlers that can be sync or async
106AsyncHandlerT = Callable[[KeyNotification], None | Awaitable[None]]
107
108
109# =============================================================================
110# Async Interface for Keyspace Notifications
111# =============================================================================
112
113
114class AsyncKeyspaceNotificationsInterface(ABC):
115 """
116 Async interface for keyspace notification managers.
117
118 This interface provides a consistent async API for both standalone
119 (AsyncKeyspaceNotifications) and cluster (AsyncClusterKeyspaceNotifications)
120 implementations.
121 """
122
123 @abstractmethod
124 async def subscribe(
125 self,
126 *channels: ChannelT,
127 handler: AsyncHandlerT | None = None,
128 ):
129 """Subscribe to keyspace notification channels."""
130 pass
131
132 @abstractmethod
133 async def unsubscribe(self, *channels: ChannelT):
134 """Unsubscribe from keyspace notification channels."""
135 pass
136
137 @abstractmethod
138 async def subscribe_keyspace(
139 self,
140 key_or_pattern: str,
141 db: int = 0,
142 handler: AsyncHandlerT | None = None,
143 ):
144 """Subscribe to keyspace notifications for specific keys."""
145 pass
146
147 @abstractmethod
148 async def subscribe_keyevent(
149 self,
150 event: str,
151 db: int = 0,
152 handler: AsyncHandlerT | None = None,
153 ):
154 """Subscribe to keyevent notifications for specific event types."""
155 pass
156
157 @abstractmethod
158 async def get_message(
159 self,
160 ignore_subscribe_messages: bool | None = None,
161 timeout: float = 0.0,
162 ) -> KeyNotification | None:
163 """Get the next keyspace notification if one is available."""
164 pass
165
166 @abstractmethod
167 def listen(self) -> AsyncIterator[KeyNotification]:
168 """Listen for keyspace notifications."""
169 pass
170
171 @abstractmethod
172 async def aclose(self):
173 """Close the notification manager and clean up resources."""
174 pass
175
176 @abstractmethod
177 async def __aenter__(self):
178 pass
179
180 @abstractmethod
181 async def __aexit__(self, _exc_type, _exc_val, _exc_tb):
182 pass
183
184 @property
185 @abstractmethod
186 def subscribed(self) -> bool:
187 """Check if there are any active subscriptions and not closed."""
188 pass
189
190 @abstractmethod
191 async def run(
192 self,
193 poll_timeout: float = 1.0,
194 exception_handler: Callable[
195 [BaseException, AsyncKeyspaceNotificationsInterface],
196 None | Awaitable[None],
197 ]
198 | None = None,
199 ) -> None:
200 """
201 Run the notification loop as a coroutine.
202
203 This is the async equivalent of run_in_thread() for sync notifications.
204 Use asyncio.create_task() to run in the background.
205
206 The exception_handler can be either a sync or async function.
207 """
208 pass
209
210
211# =============================================================================
212# Abstract Base Class for Async Keyspace Notifications
213# =============================================================================
214
215
216class AbstractAsyncKeyspaceNotifications(AsyncKeyspaceNotificationsInterface):
217 """
218 Abstract base class for async keyspace notification managers.
219
220 Provides shared implementation for subscribe/unsubscribe logic.
221 Subclasses must implement:
222 - _execute_subscribe: Execute the subscribe operation
223 - _execute_unsubscribe: Execute the unsubscribe operation
224 - get_message: Get the next notification
225 - listen: Async generator for notifications
226 - aclose: Clean up resources
227 """
228
229 def __init__(
230 self,
231 key_prefix: str | bytes | None = None,
232 ignore_subscribe_messages: bool = True,
233 ):
234 """
235 Initialize the base async keyspace notification manager.
236
237 Args:
238 key_prefix: Optional prefix to filter and strip from keys in notifications
239 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
240 are not returned by get_message/listen
241 """
242 self.key_prefix = key_prefix
243 self.ignore_subscribe_messages = ignore_subscribe_messages
244 self._closed = False
245
246 async def subscribe(
247 self,
248 *channels: ChannelT,
249 handler: AsyncHandlerT | None = None,
250 ):
251 """
252 Subscribe to keyspace notification channels.
253
254 Automatically detects whether each channel is a pattern (contains
255 wildcards like *, ?, [) or an exact channel name and uses the
256 appropriate Redis subscribe command internally.
257
258 The handler can be either a sync or async function. Note that a
259 **sync** handler will be called directly on the event loop thread,
260 so it must not perform blocking I/O or long-running computation —
261 prefer an ``async`` handler whenever possible.
262 """
263 # Wrap the handler to convert raw messages to KeyNotification objects
264 wrapped_handler: Callable | None = None
265 if handler is not None:
266 key_prefix = self.key_prefix
267 is_async_handler = inspect.iscoroutinefunction(handler)
268
269 if is_async_handler:
270 # We've verified handler is async, so the result is awaitable
271 async_handler = handler
272
273 async def _async_wrap_handler(message):
274 notification = KeyNotification.from_message(
275 message, key_prefix=key_prefix
276 )
277 if notification is not None:
278 await async_handler(notification)
279
280 wrapped_handler = _async_wrap_handler
281 else:
282
283 def _sync_wrap_handler(message):
284 notification = KeyNotification.from_message(
285 message, key_prefix=key_prefix
286 )
287 if notification is not None:
288 handler(notification)
289
290 wrapped_handler = _sync_wrap_handler
291
292 patterns = {}
293 exact_channels = {}
294
295 for channel in channels:
296 if hasattr(channel, "_channel_str"):
297 channel_str = str(channel)
298 else:
299 channel_str = safe_str(channel)
300 if _is_pattern(channel):
301 patterns[channel_str] = wrapped_handler
302 else:
303 exact_channels[channel_str] = wrapped_handler
304
305 # Delegate to subclass implementation first. For standalone Redis
306 # this raises on failure, keeping tracking state clean. For cluster
307 # implementations the operation is best-effort (partial failures are
308 # logged, not raised) so tracking state is always updated afterwards.
309 await self._execute_subscribe(patterns, exact_channels)
310 self._track_subscribe(patterns, exact_channels)
311
312 @abstractmethod
313 async def _execute_subscribe(
314 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
315 ) -> None:
316 """Execute the subscribe operation."""
317 pass
318
319 async def unsubscribe(self, *channels: ChannelT):
320 """Unsubscribe from keyspace notification channels."""
321 patterns = []
322 exact_channels = []
323
324 for channel in channels:
325 if hasattr(channel, "_channel_str"):
326 channel_str = str(channel)
327 else:
328 channel_str = safe_str(channel)
329 if _is_pattern(channel):
330 patterns.append(channel_str)
331 else:
332 exact_channels.append(channel_str)
333
334 # Delegate to subclass implementation first. For standalone Redis
335 # this raises on failure, keeping tracking state intact. For cluster
336 # implementations the operation is best-effort (partial failures are
337 # logged, not raised) so tracking state is always removed afterwards
338 # — this is intentional: the user asked to unsubscribe, so
339 # refresh_subscriptions should not re-subscribe these channels.
340 await self._execute_unsubscribe(patterns, exact_channels)
341 self._untrack_subscribe(patterns, exact_channels)
342
343 @abstractmethod
344 async def _execute_unsubscribe(
345 self, patterns: list[str], exact_channels: list[str]
346 ) -> None:
347 """Execute the unsubscribe operation."""
348 pass
349
350 def _track_subscribe(
351 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
352 ) -> None:
353 """Track newly subscribed patterns/channels.
354
355 Override in subclasses that need to maintain their own subscription
356 registry (e.g. cluster implementations that must re-subscribe
357 new/failed-over nodes). The default is a no-op because standalone
358 implementations delegate tracking to the underlying PubSub object.
359 """
360
361 def _untrack_subscribe(
362 self, patterns: list[str], exact_channels: list[str]
363 ) -> None:
364 """Remove patterns/channels from the subscription registry.
365
366 Override in subclasses that maintain their own subscription registry.
367 The default is a no-op.
368 """
369
370 async def subscribe_keyspace(
371 self,
372 key_or_pattern: str,
373 db: int = 0,
374 handler: AsyncHandlerT | None = None,
375 ):
376 """Subscribe to keyspace notifications for specific keys."""
377 channel = KeyspaceChannel(key_or_pattern, db=db)
378 await self.subscribe(channel, handler=handler)
379
380 async def subscribe_keyevent(
381 self,
382 event: str,
383 db: int = 0,
384 handler: AsyncHandlerT | None = None,
385 ):
386 """Subscribe to keyevent notifications for specific event types."""
387 channel = KeyeventChannel(event, db=db)
388 await self.subscribe(channel, handler=handler)
389
390 async def __aenter__(self):
391 return self
392
393 async def __aexit__(self, _exc_type, _exc_val, _exc_tb):
394 await self.aclose()
395 return False
396
397 async def run(
398 self,
399 poll_timeout: float = 1.0,
400 exception_handler: Callable[
401 [BaseException, AsyncKeyspaceNotificationsInterface],
402 None | Awaitable[None],
403 ]
404 | None = None,
405 ) -> None:
406 """
407 Run the notification loop as a coroutine.
408
409 This continuously polls for notifications and triggers handlers.
410 Use asyncio.create_task() to run in the background.
411
412 Args:
413 poll_timeout: Timeout in seconds for each get_message call.
414 exception_handler: Optional callback for handling exceptions.
415 Can be sync or async.
416 """
417 while self.subscribed:
418 try:
419 await self.get_message(timeout=poll_timeout)
420 except asyncio.CancelledError:
421 raise
422 except BaseException as e:
423 if exception_handler is not None:
424 result = exception_handler(e, self)
425 if inspect.isawaitable(result):
426 await result
427 else:
428 raise
429
430
431# =============================================================================
432# Standalone Async Keyspace Notification Manager
433# =============================================================================
434
435
436class AsyncKeyspaceNotifications(AbstractAsyncKeyspaceNotifications):
437 """
438 Manages keyspace notification subscriptions for standalone async Redis.
439
440 For standalone Redis, keyspace notifications work with a single PubSub
441 connection. This class wraps that connection and provides:
442 - Automatic pattern vs exact channel detection
443 - KeyNotification parsing with optional key_prefix filtering
444 - Convenience methods for keyspace and keyevent subscriptions
445 - Context manager and run() coroutine support
446 """
447
448 def __init__(
449 self,
450 redis_client: Redis,
451 key_prefix: str | bytes | None = None,
452 ignore_subscribe_messages: bool = True,
453 ):
454 """
455 Initialize the standalone async keyspace notification manager.
456
457 Note: Keyspace notifications must be enabled on the Redis server via
458 the ``notify-keyspace-events`` configuration option.
459
460 Args:
461 redis_client: An async Redis client instance
462 key_prefix: Optional prefix to filter and strip from keys in notifications
463 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
464 are not returned by get_message/listen
465 """
466 super().__init__(key_prefix, ignore_subscribe_messages)
467 self.redis = redis_client
468 # Create PubSub with ignore_subscribe_messages=False so per-call arg works
469 self._pubsub: PubSub = redis_client.pubsub(ignore_subscribe_messages=False)
470
471 async def _execute_subscribe(
472 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
473 ) -> None:
474 """Execute subscribe on the single pubsub connection."""
475 if patterns:
476 await self._pubsub.psubscribe(**patterns)
477 if exact_channels:
478 await self._pubsub.subscribe(**exact_channels)
479
480 async def _execute_unsubscribe(
481 self, patterns: list[str], exact_channels: list[str]
482 ) -> None:
483 """Execute unsubscribe on the single pubsub connection."""
484 if patterns:
485 await self._pubsub.punsubscribe(*patterns)
486 if exact_channels:
487 await self._pubsub.unsubscribe(*exact_channels)
488
489 async def get_message(
490 self,
491 ignore_subscribe_messages: bool | None = None,
492 timeout: float = 0.0,
493 ) -> KeyNotification | None:
494 """
495 Get the next keyspace notification if one is available.
496
497 Args:
498 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages.
499 Defaults to the value set in __init__ (True).
500 timeout: Time to wait for a message.
501
502 Returns:
503 A KeyNotification if a notification is available and no handler
504 was registered for the channel, None otherwise.
505 """
506 if ignore_subscribe_messages is None:
507 ignore_subscribe_messages = self.ignore_subscribe_messages
508
509 if self._closed:
510 return None
511
512 message = await self._pubsub.get_message(
513 ignore_subscribe_messages=ignore_subscribe_messages,
514 timeout=timeout,
515 )
516
517 if message is not None:
518 return KeyNotification.from_message(message, key_prefix=self.key_prefix)
519
520 return None
521
522 async def listen(self) -> AsyncIterator[KeyNotification]:
523 """
524 Listen for keyspace notifications.
525
526 This is an async generator that yields KeyNotification objects as they arrive.
527
528 Yields:
529 KeyNotification objects for each keyspace/keyevent notification.
530
531 Example:
532 >>> async for notification in ksn.listen():
533 ... print(f"{notification.key}: {notification.event_type}")
534 """
535 while self.subscribed:
536 notification = await self.get_message(timeout=1.0)
537 if notification is not None:
538 yield notification
539
540 @property
541 def subscribed(self) -> bool:
542 """Check if there are any active subscriptions and not closed."""
543 return not self._closed and self._pubsub.subscribed
544
545 async def aclose(self):
546 """Close the pubsub connection and clean up resources."""
547 self._closed = True
548 try:
549 await self._pubsub.aclose()
550 except Exception:
551 pass
552
553
554# =============================================================================
555# Cluster-Aware Async Keyspace Notification Manager
556# =============================================================================
557
558
559class AsyncClusterKeyspaceNotifications(AbstractAsyncKeyspaceNotifications):
560 """
561 Manages keyspace notification subscriptions across all nodes in an async Redis Cluster.
562
563 In Redis Cluster, keyspace notifications are NOT broadcast between nodes.
564 Each node only emits notifications for keys it owns. This class automatically
565 subscribes to all primary nodes in the cluster and handles topology changes.
566 """
567
568 def __init__(
569 self,
570 redis_cluster: RedisCluster,
571 key_prefix: str | bytes | None = None,
572 ignore_subscribe_messages: bool = True,
573 ):
574 """
575 Initialize the async cluster keyspace notification manager.
576
577 Note: Keyspace notifications must be enabled on all Redis cluster nodes via
578 the ``notify-keyspace-events`` configuration option.
579
580 Args:
581 redis_cluster: An async RedisCluster instance
582 key_prefix: Optional prefix to filter and strip from keys in notifications
583 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
584 are not returned by get_message/listen
585 """
586 super().__init__(key_prefix, ignore_subscribe_messages)
587 self.cluster = redis_cluster
588
589 # Canonical subscription registry: pattern/channel -> wrapped handler.
590 # In cluster mode there are multiple PubSub objects (one per node), so
591 # this is the single source of truth used to (re-)subscribe new or
592 # failed-over nodes.
593 self._subscribed_patterns: dict[str, Any] = {}
594 self._subscribed_channels: dict[str, Any] = {}
595
596 # Track subscriptions per node
597 self._node_pubsubs: dict[str, PubSub] = {}
598
599 # Lock for topology refresh operations
600 self._refresh_lock = asyncio.Lock()
601
602 # Current pubsub index for round-robin polling
603 self._poll_index = 0
604
605 @property
606 def subscribed(self) -> bool:
607 """Check if there are any active subscriptions and not closed."""
608 return not self._closed and bool(
609 self._subscribed_patterns or self._subscribed_channels
610 )
611
612 def _track_subscribe(
613 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
614 ) -> None:
615 """Track newly subscribed patterns/channels in the cluster registry."""
616 if patterns:
617 self._subscribed_patterns.update(patterns)
618 if exact_channels:
619 self._subscribed_channels.update(exact_channels)
620
621 def _untrack_subscribe(
622 self, patterns: list[str], exact_channels: list[str]
623 ) -> None:
624 """Remove patterns/channels from the cluster registry."""
625 for p in patterns:
626 self._subscribed_patterns.pop(p, None)
627 for c in exact_channels:
628 self._subscribed_channels.pop(c, None)
629
630 def _get_all_primary_nodes(self) -> list[ClusterNode]:
631 """Get all primary nodes in the cluster."""
632 return self.cluster.get_primaries()
633
634 async def _ensure_node_pubsub(self, node: ClusterNode) -> PubSub:
635 """Get or create a PubSub instance for a node.
636
637 Uses a :class:`_ClusterNodePoolAdapter` to borrow a connection
638 from the node's existing pool. When the ``PubSub`` is closed
639 the connection is disconnected and returned to the node,
640 ensuring no subscribed socket is left in the free queue.
641 """
642 if node.name not in self._node_pubsubs:
643 pool_adapter = _ClusterNodePoolAdapter(node)
644 pubsub = PubSub(
645 connection_pool=pool_adapter, # type: ignore[arg-type]
646 ignore_subscribe_messages=False,
647 )
648 self._node_pubsubs[node.name] = pubsub
649 return self._node_pubsubs[node.name]
650
651 async def _cleanup_node(self, node_name: str) -> None:
652 """Remove and close a node's PubSub.
653
654 ``PubSub.aclose()`` disconnects the connection and releases it
655 back to the underlying :class:`ClusterNode` via the adapter.
656 """
657 pubsub = self._node_pubsubs.pop(node_name, None)
658 if pubsub:
659 try:
660 await pubsub.aclose()
661 except Exception:
662 pass
663
664 async def _execute_subscribe(
665 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
666 ) -> None:
667 """Execute subscribe on all cluster nodes.
668
669 Patterns and exact channels are subscribed in a single pass over
670 nodes so that a mid-batch node failure cannot create a
671 partially-caught-up replacement. If a node fails during this
672 call it is removed from ``_node_pubsubs`` and will be fully
673 re-subscribed on the next ``refresh_subscriptions`` cycle.
674
675 If a newly discovered node is encountered (not yet in
676 ``_node_pubsubs``), it is also subscribed to all *previously*
677 tracked patterns/channels so it doesn't miss notifications for
678 subscriptions that were established before this node joined.
679 """
680 if not patterns and not exact_channels:
681 return
682
683 failed_nodes: list[str] = []
684 for node in self._get_all_primary_nodes():
685 is_new_node = node.name not in self._node_pubsubs
686 pubsub = await self._ensure_node_pubsub(node)
687 try:
688 # If this is a brand-new node, catch it up on existing
689 # subscriptions before adding the new channels.
690 if is_new_node:
691 if self._subscribed_patterns:
692 await pubsub.psubscribe(**self._subscribed_patterns)
693 if self._subscribed_channels:
694 await pubsub.subscribe(**self._subscribed_channels)
695
696 if patterns:
697 await pubsub.psubscribe(**patterns)
698 if exact_channels:
699 await pubsub.subscribe(**exact_channels)
700 except Exception:
701 # Remove the broken pubsub and its connection pool
702 # so refresh_subscriptions can re-create both later.
703 await self._cleanup_node(node.name)
704 failed_nodes.append(node.name)
705
706 if failed_nodes:
707 logger.warning(
708 "Failed to subscribe on cluster nodes: %s. "
709 "These nodes will be retried on the next refresh cycle.",
710 ", ".join(failed_nodes),
711 )
712
713 async def _execute_unsubscribe(
714 self, patterns: list[str], exact_channels: list[str]
715 ) -> None:
716 """Execute unsubscribe on all cluster nodes."""
717 if patterns:
718 await self._unsubscribe_from_all_nodes(patterns, use_punsubscribe=True)
719 if exact_channels:
720 await self._unsubscribe_from_all_nodes(
721 exact_channels, use_punsubscribe=False
722 )
723
724 async def _unsubscribe_from_all_nodes(
725 self, channels: list[str], use_punsubscribe: bool
726 ):
727 """Unsubscribe from patterns/channels on all nodes.
728
729 Best-effort: tries every node so that a single broken connection
730 does not prevent the remaining nodes from being unsubscribed.
731 Broken pubsubs are cleaned up; the tracking state is still removed
732 by the caller, so ``refresh_subscriptions`` will *not* re-subscribe
733 these channels on replacement nodes.
734 """
735 failed_nodes: list[str] = []
736 for node_name in list(self._node_pubsubs.keys()):
737 pubsub = self._node_pubsubs.get(node_name)
738 if pubsub is None:
739 continue
740 try:
741 if use_punsubscribe:
742 await pubsub.punsubscribe(*channels)
743 else:
744 await pubsub.unsubscribe(*channels)
745 except Exception:
746 await self._cleanup_node(node_name)
747 failed_nodes.append(node_name)
748
749 if failed_nodes:
750 logger.warning(
751 "Failed to unsubscribe on cluster nodes: %s. "
752 "These nodes will be re-created on the next refresh cycle.",
753 ", ".join(failed_nodes),
754 )
755
756 async def get_message(
757 self,
758 ignore_subscribe_messages: bool | None = None,
759 timeout: float = 0.0,
760 ) -> KeyNotification | None:
761 """
762 Get the next keyspace notification if one is available.
763
764 This method polls all node pubsubs in round-robin fashion until
765 a message is received or the timeout expires.
766 If a connection error occurs, subscriptions are automatically refreshed.
767
768 Args:
769 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages.
770 Defaults to the value set in __init__ (True).
771 timeout: Total time to wait for a message (distributed across all nodes)
772
773 Returns:
774 A KeyNotification if a notification is available, None otherwise.
775 """
776 if self._closed:
777 return None
778
779 total_nodes = len(self._node_pubsubs)
780 if total_nodes == 0:
781 # Sleep for the requested timeout so callers that loop
782 # (run(), listen) don't spin the CPU when all node
783 # connections have been cleaned up.
784 if timeout > 0:
785 await asyncio.sleep(timeout)
786 return None
787
788 if ignore_subscribe_messages is None:
789 ignore_subscribe_messages = self.ignore_subscribe_messages
790
791 # Handle timeout=0 as a single non-blocking poll over all pubsubs
792 if timeout == 0.0:
793 return await self._poll_all_nodes_once(ignore_subscribe_messages)
794
795 # Calculate per-node timeout for each poll
796 per_node_timeout = min(0.1, timeout / max(total_nodes, 1))
797
798 start_time = time.monotonic()
799 end_time = start_time + timeout
800
801 while True:
802 if time.monotonic() >= end_time:
803 return None
804
805 pubsubs = list(self._node_pubsubs.values())
806 if not pubsubs:
807 return None
808
809 # Round-robin polling
810 self._poll_index = self._poll_index % len(pubsubs)
811 pubsub = pubsubs[self._poll_index]
812 self._poll_index += 1
813
814 try:
815 message = await pubsub.get_message(
816 ignore_subscribe_messages=ignore_subscribe_messages,
817 timeout=per_node_timeout,
818 )
819 except (ConnectionError, TimeoutError, RedisError):
820 await self._refresh_subscriptions_on_error()
821 continue
822
823 if message is not None:
824 notification = KeyNotification.from_message(
825 message, key_prefix=self.key_prefix
826 )
827 if notification is not None:
828 return notification
829
830 async def _poll_all_nodes_once(
831 self, ignore_subscribe_messages: bool
832 ) -> KeyNotification | None:
833 """
834 Perform a single non-blocking poll over all node pubsubs.
835
836 Returns:
837 A KeyNotification if one is available, None otherwise.
838 """
839 had_error = False
840 for pubsub in list(self._node_pubsubs.values()):
841 try:
842 message = await pubsub.get_message(
843 ignore_subscribe_messages=ignore_subscribe_messages,
844 timeout=0.0,
845 )
846 except (ConnectionError, TimeoutError, RedisError):
847 # Record the error but continue polling remaining healthy
848 # nodes so that already-buffered notifications are not lost.
849 had_error = True
850 continue
851
852 if message is not None:
853 notification = KeyNotification.from_message(
854 message, key_prefix=self.key_prefix
855 )
856 if notification is not None:
857 # Refresh before returning if any node had an error,
858 # so the next poll cycle has fresh state.
859 if had_error:
860 await self._refresh_subscriptions_on_error()
861 return notification
862
863 # Refresh after polling all nodes if any had errors
864 if had_error:
865 await self._refresh_subscriptions_on_error()
866 return None
867
868 async def listen(self) -> AsyncIterator[KeyNotification]:
869 """
870 Listen for keyspace notifications from all cluster nodes.
871
872 This is an async generator that yields KeyNotification objects as they arrive.
873
874 Yields:
875 KeyNotification objects for each keyspace/keyevent notification.
876
877 Example:
878 >>> async for notification in ksn.listen():
879 ... print(f"{notification.key}: {notification.event_type}")
880 """
881 while self.subscribed:
882 notification = await self.get_message(timeout=1.0)
883 if notification is not None:
884 yield notification
885
886 async def _refresh_subscriptions_on_error(self):
887 """
888 Refresh subscriptions after a connection error.
889
890 This is called automatically when a connection error occurs during
891 get_message(). It checks if nodes changed before refreshing.
892 """
893 self._poll_index = 0 # Reset round-robin index
894
895 try:
896 await self.refresh_subscriptions()
897 except Exception:
898 logger.warning(
899 "Failed to refresh cluster subscriptions, will retry on next error",
900 exc_info=True,
901 )
902
903 def _is_pubsub_connected(self, pubsub) -> bool:
904 """Check if a pubsub connection is still alive."""
905 try:
906 conn = pubsub.connection
907 if conn is None:
908 return False
909 # Async connections use is_connected property
910 return conn.is_connected
911 except Exception:
912 return False
913
914 async def refresh_subscriptions(self):
915 """
916 Refresh subscriptions after a topology change.
917
918 This method is called automatically when topology changes are detected
919 or when connection errors occur. You can also call it manually if needed.
920
921 This method:
922 1. Discovers any new primary nodes and subscribes them
923 2. Removes pubsubs for nodes that are no longer primaries
924 3. Re-creates broken pubsub connections for existing nodes
925 """
926 async with self._refresh_lock:
927 current_primaries = {
928 node.name: node for node in self._get_all_primary_nodes()
929 }
930
931 # Remove pubsubs for nodes that are no longer primaries
932 removed_nodes = set(self._node_pubsubs.keys()) - set(
933 current_primaries.keys()
934 )
935 for node_name in removed_nodes:
936 await self._cleanup_node(node_name)
937
938 # Detect broken connections for existing nodes and remove them
939 # so they get re-created below
940 existing_nodes = set(self._node_pubsubs.keys()) & set(
941 current_primaries.keys()
942 )
943 for node_name in existing_nodes:
944 pubsub = self._node_pubsubs.get(node_name)
945 if pubsub and not self._is_pubsub_connected(pubsub):
946 # Connection is broken, remove it so it gets re-created
947 await self._cleanup_node(node_name)
948
949 # Subscribe new nodes (and nodes with broken connections)
950 # to existing patterns/channels
951 new_nodes = set(current_primaries.keys()) - set(self._node_pubsubs.keys())
952 failed_nodes: list[str] = []
953 for node_name in new_nodes:
954 node = current_primaries[node_name]
955 pubsub = await self._ensure_node_pubsub(node)
956
957 try:
958 if self._subscribed_patterns:
959 await pubsub.psubscribe(**self._subscribed_patterns)
960 if self._subscribed_channels:
961 await pubsub.subscribe(**self._subscribed_channels)
962 except Exception:
963 # Subscription failed - remove from dict so retry is possible
964 await self._cleanup_node(node_name)
965 failed_nodes.append(node_name)
966
967 # Raise after attempting all nodes so we don't skip any
968 if failed_nodes:
969 raise ConnectionError(
970 f"Failed to subscribe to cluster nodes: {', '.join(failed_nodes)}"
971 )
972
973 async def aclose(self):
974 """Close all pubsub connections and clean up resources."""
975 self._closed = True
976 for node_name in list(self._node_pubsubs.keys()):
977 await self._cleanup_node(node_name)
978 self._subscribed_patterns.clear()
979 self._subscribed_channels.clear()