1"""
2Redis Keyspace Notifications support for redis-py.
3
4This module provides utilities for subscribing to and parsing Redis keyspace
5notifications. Keyspace notifications allow clients to receive events when
6keys are modified in Redis.
7
8Note: Keyspace notifications must be enabled on the Redis server via the
9``notify-keyspace-events`` configuration option. This is a server-side
10configuration that should be done by your infrastructure/operations team.
11See the Redis documentation for details:
12https://redis.io/docs/latest/develop/pubsub/keyspace-notifications/
13
14Standalone Redis Example:
15 >>> from redis import Redis
16 >>> from redis.keyspace_notifications import (
17 ... KeyspaceNotifications,
18 ... KeyspaceChannel,
19 ... EventType,
20 ... )
21 >>>
22 >>> r = Redis()
23 >>> # Server must have notify-keyspace-events configured (e.g., "KEA")
24 >>> ksn = KeyspaceNotifications(r)
25 >>>
26 >>> # Subscribe using Channel class (patterns auto-detected)
27 >>> channel = KeyspaceChannel("user:*")
28 >>> ksn.subscribe(channel)
29 >>>
30 >>> # Or use convenience methods for specific event types
31 >>> ksn.subscribe_keyevent(EventType.SET)
32 >>>
33 >>> for notification in ksn.listen():
34 ... print(f"Key: {notification.key}, Event: {notification.event_type}")
35
36Redis Cluster Example:
37 >>> from redis.cluster import RedisCluster
38 >>> from redis.keyspace_notifications import (
39 ... ClusterKeyspaceNotifications,
40 ... KeyspaceChannel,
41 ... EventType,
42 ... )
43 >>>
44 >>> rc = RedisCluster(host="localhost", port=7000)
45 >>> # Server must have notify-keyspace-events configured (e.g., "KEA")
46 >>> ksn = ClusterKeyspaceNotifications(rc)
47 >>>
48 >>> # Subscribe using Channel class (patterns auto-detected)
49 >>> channel = KeyspaceChannel("user:*")
50 >>> ksn.subscribe(channel)
51 >>>
52 >>> # Or use convenience methods for specific event types
53 >>> ksn.subscribe_keyevent(EventType.SET)
54 >>>
55 >>> for notification in ksn.listen():
56 ... print(f"Key: {notification.key}, Event: {notification.event_type}")
57"""
58
59from __future__ import annotations
60
61import logging
62import re
63import threading
64import time
65from abc import ABC, abstractmethod
66from collections.abc import Callable
67from dataclasses import dataclass
68from enum import Enum
69from typing import TYPE_CHECKING, Any, ClassVar, Union
70
71from redis.client import Redis
72from redis.cluster import RedisCluster
73from redis.exceptions import (
74 ConnectionError,
75 RedisError,
76 TimeoutError,
77)
78from redis.utils import safe_str
79
80logger = logging.getLogger(__name__)
81
82if TYPE_CHECKING:
83 from typing import TypeAlias
84
85# Type alias for channel arguments - can be a string, bytes, or Channel object
86# This is defined here and the actual types are added after class definitions
87ChannelT: TypeAlias = Union[str, bytes, "KeyspaceChannel", "KeyeventChannel"]
88
89
90# Type alias for sync handlers
91SyncHandlerT = Callable[["KeyNotification"], None]
92
93
94# =============================================================================
95# Event Type Constants
96# =============================================================================
97# These are common Redis keyspace notification event types provided for
98# convenience. You can use any string as an event type - these constants
99# are not exhaustive and Redis may add new events in future versions.
100
101
102class EventType:
103 """
104 Common Redis keyspace notification event type constants.
105
106 These are provided for convenience and IDE autocomplete. You can use
107 any string as an event type - new Redis events will work without
108 needing library updates.
109 """
110
111 # String commands
112 SET = "set"
113 SETEX = "setex"
114 SETNX = "setnx"
115 SETRANGE = "setrange"
116 INCR = "incr"
117 INCRBY = "incrby"
118 INCRBYFLOAT = "incrbyfloat"
119 DECR = "decr"
120 DECRBY = "decrby"
121 APPEND = "append"
122
123 # Generic commands
124 DEL = "del"
125 UNLINK = "unlink"
126 RENAME = "rename"
127 RENAME_FROM = "rename_from"
128 RENAME_TO = "rename_to"
129 COPY_TO = "copy_to"
130 MOVE = "move"
131 RESTORE = "restore"
132
133 # Expiration events
134 EXPIRE = "expire"
135 EXPIREAT = "expireat"
136 PEXPIRE = "pexpire"
137 PEXPIREAT = "pexpireat"
138 EXPIRED = "expired"
139 PERSIST = "persist"
140
141 # Eviction events
142 EVICTED = "evicted"
143
144 # List commands
145 LPUSH = "lpush"
146 RPUSH = "rpush"
147 LPOP = "lpop"
148 RPOP = "rpop"
149 LINSERT = "linsert"
150 LSET = "lset"
151 LTRIM = "ltrim"
152 LMOVE = "lmove"
153 BLPOP = "blpop"
154 BRPOP = "brpop"
155 BLMOVE = "blmove"
156
157 # Set commands
158 SADD = "sadd"
159 SREM = "srem"
160 SPOP = "spop"
161 SMOVE = "smove"
162 SINTERSTORE = "sinterstore"
163 SUNIONSTORE = "sunionstore"
164 SDIFFSTORE = "sdiffstore"
165
166 # Sorted set commands
167 ZADD = "zadd"
168 ZINCRBY = "zincrby"
169 ZREM = "zrem"
170 ZREMRANGEBYRANK = "zremrangebyrank"
171 ZREMRANGEBYSCORE = "zremrangebyscore"
172 ZREMRANGEBYLEX = "zremrangebylex"
173 ZPOPMIN = "zpopmin"
174 ZPOPMAX = "zpopmax"
175 BZPOPMIN = "bzpopmin"
176 BZPOPMAX = "bzpopmax"
177 ZINTERSTORE = "zinterstore"
178 ZUNIONSTORE = "zunionstore"
179 ZDIFFSTORE = "zdiffstore"
180 ZRANGESTORE = "zrangestore"
181
182 # Hash commands
183 HSET = "hset"
184 HSETNX = "hsetnx"
185 HDEL = "hdel"
186 HINCRBY = "hincrby"
187 HINCRBYFLOAT = "hincrbyfloat"
188
189 # Stream commands
190 XADD = "xadd"
191 XTRIM = "xtrim"
192 XDEL = "xdel"
193 XGROUP_CREATE = "xgroup-create"
194 XGROUP_CREATECONSUMER = "xgroup-createconsumer"
195 XGROUP_DELCONSUMER = "xgroup-delconsumer"
196 XGROUP_DESTROY = "xgroup-destroy"
197 XGROUP_SETID = "xgroup-setid"
198 XSETID = "xsetid"
199 XCLAIM = "xclaim"
200 XAUTOCLAIM = "xautoclaim"
201 XREADGROUP = "xreadgroup"
202
203 # Other
204 NEW = "new" # Key created (when tracking new keys)
205 SORTSTORE = "sortstore"
206 GETEX = "getex"
207 GETDEL = "getdel"
208 SETIFGT = "setifgt"
209 SETIFLT = "setiflt"
210 SETIFEQ = "setifeq"
211 SETIFNE = "setifne"
212
213
214@dataclass
215class KeyNotification:
216 """
217 Represents a parsed Redis keyspace or keyevent notification.
218
219 This class provides convenient access to the notification details
220 like key, event type, and database number.
221
222 Attributes:
223 key: The Redis key that was affected (for keyspace notifications)
224 or the key name from the message data (for keyevent notifications)
225 event_type: The type of operation that occurred (e.g., "set", "del").
226 This is a plain string, so new Redis events work automatically.
227 Compare against EventType constants or any string.
228 database: The database number where the event occurred
229 channel: The original channel name
230 is_keyspace: True if this is a keyspace notification, False for keyevent
231 """
232
233 # Regex patterns for parsing keyspace/keyevent channels
234 # Pattern: __keyspace@<db>__:<key> or __keyevent@<db>__:<event>
235 _KEYSPACE_PATTERN: ClassVar[re.Pattern] = re.compile(
236 r"^__keyspace@(\d+|\*)__:(.+)$"
237 )
238 _KEYEVENT_PATTERN: ClassVar[re.Pattern] = re.compile(
239 r"^__keyevent@(\d+|\*)__:(.+)$"
240 )
241
242 key: str
243 event_type: str
244 database: int
245 channel: str
246 is_keyspace: bool
247
248 @classmethod
249 def from_message(
250 cls,
251 message: dict[str, Any] | None,
252 key_prefix: str | bytes | None = None,
253 ) -> KeyNotification | None:
254 """
255 Parse a pub/sub message into a KeyNotification.
256
257 Args:
258 message: A pub/sub message dict with 'channel', 'data', and 'type' keys
259 key_prefix: Optional prefix to filter and strip from keys.
260 If provided, only notifications for keys starting with
261 this prefix will be returned, and the prefix will be
262 stripped from the key.
263
264 Returns:
265 A KeyNotification if the message is a valid keyspace/keyevent
266 notification, None otherwise.
267
268 Example:
269 >>> message = {
270 ... 'type': 'pmessage',
271 ... 'pattern': '__keyspace@0__:user:*',
272 ... 'channel': '__keyspace@0__:user:123',
273 ... 'data': 'set'
274 ... }
275 >>> notification = KeyNotification.from_message(message)
276 >>> notification.key
277 'user:123'
278 >>> notification.event_type
279 'set'
280 """
281 if message is None:
282 return None
283
284 msg_type = message.get("type")
285 if msg_type not in ("message", "pmessage"):
286 return None
287
288 channel = message.get("channel")
289 data = message.get("data")
290
291 if channel is None or data is None:
292 return None
293
294 return cls.try_parse(channel, data, key_prefix)
295
296 @classmethod
297 def try_parse(
298 cls,
299 channel: str | bytes,
300 data: str | bytes,
301 key_prefix: str | bytes | None = None,
302 ) -> KeyNotification | None:
303 """
304 Try to parse a channel and data into a KeyNotification.
305
306 This is a lower-level method that takes the channel and data directly,
307 useful when working with callback-based subscription handlers.
308
309 Args:
310 channel: The channel name (e.g., "__keyspace@0__:mykey")
311 data: The message data (event type for keyspace, key for keyevent)
312 key_prefix: Optional prefix to filter and strip from keys
313
314 Returns:
315 A KeyNotification if valid, None otherwise.
316 """
317 channel = safe_str(channel)
318 data = safe_str(data)
319
320 return cls._parse(channel, data, key_prefix)
321
322 @classmethod
323 def _parse(
324 cls,
325 channel: str,
326 data: str,
327 key_prefix: str | bytes | None = None,
328 ) -> KeyNotification | None:
329 """Internal parsing logic."""
330 # Normalize key_prefix
331 key_prefix = safe_str(key_prefix) if key_prefix else None
332
333 # Try keyspace pattern first: __keyspace@<db>__:<key>
334 match = cls._KEYSPACE_PATTERN.match(channel)
335 if match:
336 db_str, key = match.groups()
337 database = int(db_str) if db_str != "*" else -1
338 event_type = data # For keyspace, the data is the event type
339
340 # Apply key prefix filter
341 if key_prefix:
342 if not key.startswith(key_prefix):
343 return None
344 key = key[len(key_prefix) :]
345
346 return cls(
347 key=key,
348 event_type=event_type,
349 database=database,
350 channel=channel,
351 is_keyspace=True,
352 )
353
354 # Try keyevent pattern: __keyevent@<db>__:<event>
355 match = cls._KEYEVENT_PATTERN.match(channel)
356 if match:
357 db_str, event_type = match.groups()
358 database = int(db_str) if db_str != "*" else -1
359 key = data # For keyevent, the data is the key
360
361 # Apply key prefix filter
362 if key_prefix:
363 if not key.startswith(key_prefix):
364 return None
365 key = key[len(key_prefix) :]
366
367 return cls(
368 key=key,
369 event_type=event_type,
370 database=database,
371 channel=channel,
372 is_keyspace=False,
373 )
374
375 return None
376
377 def key_starts_with(self, prefix: str | bytes) -> bool:
378 """Check if the key starts with the given prefix."""
379 prefix = safe_str(prefix)
380 return self.key.startswith(prefix)
381
382
383# =============================================================================
384# Channel Classes
385# =============================================================================
386
387
388class KeyspaceChannel:
389 """
390 Represents a keyspace notification channel for subscribing to events on keys.
391
392 Keyspace notifications publish the event type (e.g., "set", "del") as the message
393 when a key matching the pattern is modified.
394
395 This class can be used directly with subscribe()/psubscribe() as it implements
396 __str__ to return the channel string.
397
398 Attributes:
399 key_or_pattern: The key or pattern to monitor (use '*' for wildcards)
400 db: The database number (defaults to 0, the only database in Redis Cluster)
401 is_pattern: Whether this channel contains wildcards
402
403 Examples:
404 >>> channel = KeyspaceChannel("user:123", db=0)
405 >>> str(channel)
406 '__keyspace@0__:user:123'
407
408 >>> # Pattern subscription (wildcards are auto-detected)
409 >>> channel = KeyspaceChannel("user:*", db=0)
410 >>> str(channel)
411 '__keyspace@0__:user:*'
412
413 >>> # Use with KeyspaceNotifications
414 >>> notifications = KeyspaceNotifications(redis_client)
415 >>> notifications.subscribe(channel)
416 """
417
418 PREFIX: ClassVar[str] = "__keyspace@"
419
420 def __init__(self, key_or_pattern: str, db: int = 0):
421 """
422 Create a keyspace notification channel.
423
424 Args:
425 key_or_pattern: The key or pattern to monitor. Use '*' for wildcards.
426 db: The database number. Defaults to 0 (the only database in Redis Cluster).
427 """
428 self.key_or_pattern = key_or_pattern
429 self.db = db
430 self._channel_str = self._build_channel_string()
431
432 def _build_channel_string(self) -> str:
433 return f"{self.PREFIX}{self.db}__:{self.key_or_pattern}"
434
435 @property
436 def is_pattern(self) -> bool:
437 """Check if this channel contains wildcards and should use psubscribe."""
438 return _is_pattern(self.key_or_pattern)
439
440 def __str__(self) -> str:
441 return self._channel_str
442
443 def __repr__(self) -> str:
444 return f"KeyspaceChannel({self.key_or_pattern!r}, db={self.db})"
445
446 def __eq__(self, other: object) -> bool:
447 if isinstance(other, KeyspaceChannel):
448 return self._channel_str == other._channel_str
449 if isinstance(other, str):
450 return self._channel_str == other
451 return NotImplemented
452
453 def __hash__(self) -> int:
454 return hash(self._channel_str)
455
456
457class KeyeventChannel:
458 """
459 Represents a keyevent notification channel for subscribing to event types.
460
461 Keyevent notifications publish the key name as the message when the specified
462 event type occurs on any key.
463
464 This class can be used directly with subscribe()/psubscribe() as it implements
465 __str__ to return the channel string.
466
467 Attributes:
468 event: The event type to monitor
469 db: The database number (defaults to 0, the only database in Redis Cluster)
470 is_pattern: Whether this channel contains wildcards
471
472 Examples:
473 >>> channel = KeyeventChannel(EventType.SET, db=0)
474 >>> str(channel)
475 '__keyevent@0__:set'
476
477 >>> channel = KeyeventChannel.all_events(db=0)
478 >>> str(channel)
479 '__keyevent@0__:*'
480
481 >>> # Use with KeyspaceNotifications
482 >>> notifications = KeyspaceNotifications(redis_client)
483 >>> notifications.subscribe(channel)
484 """
485
486 PREFIX: ClassVar[str] = "__keyevent@"
487
488 def __init__(self, event: str, db: int = 0):
489 """
490 Create a keyevent notification channel.
491
492 Args:
493 event: The event type to monitor (e.g., EventType.SET or "set")
494 db: The database number. Defaults to 0 (the only database in Redis Cluster).
495 """
496 self.event = event
497 self.db = db
498 self._channel_str = self._build_channel_string()
499
500 def _build_channel_string(self) -> str:
501 return f"{self.PREFIX}{self.db}__:{self.event}"
502
503 @property
504 def is_pattern(self) -> bool:
505 """Check if this channel contains wildcards and should use psubscribe."""
506 return _is_pattern(self.event)
507
508 @classmethod
509 def all_events(cls, db: int = 0) -> "KeyeventChannel":
510 """
511 Create a keyevent pattern for subscribing to all event types.
512
513 This is equivalent to KeyeventChannel("*").
514
515 Args:
516 db: The database number. Defaults to 0 (the only database in Redis Cluster).
517
518 Returns:
519 A KeyeventChannel configured to receive all events.
520
521 Examples:
522 >>> channel = KeyeventChannel.all_events()
523 >>> str(channel)
524 '__keyevent@0__:*'
525 """
526 return cls("*", db=db)
527
528 def __str__(self) -> str:
529 return self._channel_str
530
531 def __repr__(self) -> str:
532 return f"KeyeventChannel({self.event!r}, db={self.db})"
533
534 def __eq__(self, other: object) -> bool:
535 if isinstance(other, KeyeventChannel):
536 return self._channel_str == other._channel_str
537 if isinstance(other, str):
538 return self._channel_str == other
539 return NotImplemented
540
541 def __hash__(self) -> int:
542 return hash(self._channel_str)
543
544
545class ChannelType(Enum):
546 """
547 Enum representing the type of a Redis keyspace notification channel.
548
549 Redis provides two types of keyspace notifications:
550 - KEYSPACE: Notifies about events on specific keys. The channel format is
551 `__keyspace@{db}__:{key}` and the message data contains the event type.
552 - KEYEVENT: Notifies about specific event types. The channel format is
553 `__keyevent@{db}__:{event}` and the message data contains the key name.
554
555 Examples:
556 >>> get_channel_type("__keyspace@0__:mykey")
557 ChannelType.KEYSPACE
558 >>> get_channel_type("__keyevent@0__:set")
559 ChannelType.KEYEVENT
560 >>> get_channel_type("regular_channel") is None
561 True
562 """
563
564 KEYSPACE = "keyspace"
565 KEYEVENT = "keyevent"
566
567
568def get_channel_type(channel: str | bytes) -> ChannelType | None:
569 """
570 Determine the type of a Redis keyspace notification channel.
571
572 Args:
573 channel: The channel name to check (string or bytes).
574
575 Returns:
576 ChannelType.KEYSPACE if it's a keyspace notification channel,
577 ChannelType.KEYEVENT if it's a keyevent notification channel,
578 None if it's not a keyspace notification channel.
579
580 Examples:
581 >>> get_channel_type("__keyspace@0__:mykey")
582 ChannelType.KEYSPACE
583 >>> get_channel_type("__keyevent@0__:set")
584 ChannelType.KEYEVENT
585 >>> get_channel_type("regular_channel") is None
586 True
587 >>> get_channel_type(b"__keyspace@0__:mykey")
588 ChannelType.KEYSPACE
589 """
590 channel_str = safe_str(channel)
591 if channel_str.startswith(KeyspaceChannel.PREFIX):
592 return ChannelType.KEYSPACE
593 if channel_str.startswith(KeyeventChannel.PREFIX):
594 return ChannelType.KEYEVENT
595 return None
596
597
598def _is_pattern(
599 channel: str | bytes | KeyspaceChannel | KeyeventChannel,
600) -> bool:
601 """
602 Check if a channel string contains glob-style pattern characters.
603
604 Redis uses glob-style patterns for psubscribe:
605 - * matches any sequence of characters
606 - ? matches any single character
607 - [...] matches any character in the brackets
608
609 Args:
610 channel: The channel string to check. Can be a string, bytes,
611 or a KeyspaceChannel/KeyeventChannel object.
612
613 Returns:
614 True if the channel contains pattern characters, False otherwise.
615 """
616 # Handle Channel objects that have _channel_str attribute
617 # (KeyspaceChannel, KeyeventChannel)
618 if hasattr(channel, "_channel_str"):
619 channel = channel._channel_str
620 channel = safe_str(channel)
621 # Check for unescaped glob pattern characters.
622 # * and ? are always pattern characters.
623 # [ is only a pattern character when followed by a matching unescaped ],
624 # forming a bracket expression like [abc] or [a-z]. A lone [ (e.g. in
625 # a key named "my[key") is treated as a literal by Redis.
626 i = 0
627 while i < len(channel):
628 char = channel[i]
629 if char == "\\":
630 # Skip escaped character
631 i += 2
632 continue
633 if char in ("*", "?"):
634 return True
635 if char == "[":
636 # Look for a matching unescaped ]
637 j = i + 1
638 while j < len(channel):
639 if channel[j] == "\\":
640 j += 2
641 continue
642 if channel[j] == "]":
643 return True
644 j += 1
645 # No matching ] found — literal [
646 i += 1
647 return False
648
649
650# =============================================================================
651# Abstract Base Class for Keyspace Notifications
652# =============================================================================
653
654
655class KeyspaceNotificationsInterface(ABC):
656 """
657 Interface for keyspace notification managers.
658
659 This interface provides a consistent API for both standalone (KeyspaceNotifications)
660 and cluster (ClusterKeyspaceNotifications) implementations, allowing the same
661 code patterns to work with both standalone and cluster Redis deployments.
662 """
663
664 @abstractmethod
665 def subscribe(
666 self,
667 *channels: ChannelT,
668 handler: SyncHandlerT | None = None,
669 ):
670 """Subscribe to keyspace notification channels."""
671 pass
672
673 @abstractmethod
674 def unsubscribe(self, *channels: ChannelT):
675 """Unsubscribe from keyspace notification channels."""
676 pass
677
678 @abstractmethod
679 def subscribe_keyspace(
680 self,
681 key_or_pattern: str,
682 db: int = 0,
683 handler: SyncHandlerT | None = None,
684 ):
685 """Subscribe to keyspace notifications for specific keys."""
686 pass
687
688 @abstractmethod
689 def subscribe_keyevent(
690 self,
691 event: str,
692 db: int = 0,
693 handler: SyncHandlerT | None = None,
694 ):
695 """Subscribe to keyevent notifications for specific event types."""
696 pass
697
698 @abstractmethod
699 def get_message(
700 self,
701 ignore_subscribe_messages: bool | None = None,
702 timeout: float = 0.0,
703 ) -> KeyNotification | None:
704 """Get the next keyspace notification if one is available."""
705 pass
706
707 @abstractmethod
708 def listen(self):
709 """Listen for keyspace notifications."""
710 pass
711
712 @abstractmethod
713 def close(self):
714 """Close the notification manager and clean up resources."""
715 pass
716
717 @abstractmethod
718 def __enter__(self):
719 pass
720
721 @abstractmethod
722 def __exit__(self, _exc_type, _exc_val, _exc_tb):
723 pass
724
725 @property
726 @abstractmethod
727 def subscribed(self) -> bool:
728 """Check if there are any active subscriptions and not closed."""
729 pass
730
731 @abstractmethod
732 def run_in_thread(
733 self,
734 poll_timeout: float = 0.0,
735 daemon: bool = False,
736 exception_handler: Callable[
737 [
738 BaseException,
739 KeyspaceNotificationsInterface,
740 KeyspaceWorkerThread,
741 ],
742 None,
743 ]
744 | None = None,
745 ) -> KeyspaceWorkerThread:
746 """Start a background thread that polls for notifications."""
747 pass
748
749
750class AbstractKeyspaceNotifications(KeyspaceNotificationsInterface):
751 """
752 Abstract base class for keyspace notification managers.
753
754 Provides shared implementation for subscribe/unsubscribe logic.
755 Subclasses must implement:
756 - _execute_subscribe: Execute the subscribe operation
757 - _execute_unsubscribe: Execute the unsubscribe operation
758 - get_message: Get the next notification
759 - listen: Generator for notifications
760 - close: Clean up resources
761 """
762
763 def __init__(
764 self,
765 key_prefix: str | bytes | None = None,
766 ignore_subscribe_messages: bool = True,
767 ):
768 """
769 Initialize the base keyspace notification manager.
770
771 Args:
772 key_prefix: Optional prefix to filter and strip from keys in notifications
773 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
774 are not returned by get_message/listen
775 """
776 self.key_prefix = key_prefix
777 self.ignore_subscribe_messages = ignore_subscribe_messages
778 self._closed = False
779
780 def subscribe(
781 self,
782 *channels: ChannelT,
783 handler: SyncHandlerT | None = None,
784 ):
785 """
786 Subscribe to keyspace notification channels.
787
788 Automatically detects whether each channel is a pattern (contains
789 wildcards like *, ?, [) or an exact channel name and uses the
790 appropriate Redis subscribe command internally.
791
792 Args:
793 *channels: Channels to subscribe to. Can be strings, KeyspaceChannel,
794 or KeyeventChannel objects. Patterns are auto-detected.
795 handler: Optional callback function that receives KeyNotification
796 objects. If provided, notifications are passed to the handler
797 instead of being returned by get_message()/listen().
798 """
799 # Wrap the handler to convert raw messages to KeyNotification objects
800 wrapped_handler: Callable | None = None
801 if handler is not None:
802 # Capture key_prefix in closure for consistent filtering/stripping
803 key_prefix = self.key_prefix
804
805 def _wrap_handler(message):
806 notification = KeyNotification.from_message(
807 message, key_prefix=key_prefix
808 )
809 if notification is not None:
810 handler(notification)
811
812 wrapped_handler = _wrap_handler
813
814 patterns = {}
815 exact_channels = {}
816
817 for channel in channels:
818 if hasattr(channel, "_channel_str"):
819 channel_str = str(channel)
820 else:
821 channel_str = safe_str(channel)
822 if _is_pattern(channel):
823 patterns[channel_str] = wrapped_handler
824 else:
825 exact_channels[channel_str] = wrapped_handler
826
827 # Delegate to subclass implementation first. For standalone Redis
828 # this raises on failure, keeping tracking state clean. For cluster
829 # implementations the operation is best-effort (partial failures are
830 # logged, not raised) so tracking state is always updated afterwards.
831 self._execute_subscribe(patterns, exact_channels)
832 self._track_subscribe(patterns, exact_channels)
833
834 @abstractmethod
835 def _execute_subscribe(
836 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
837 ) -> None:
838 """
839 Execute the subscribe operation.
840
841 Args:
842 patterns: Dict mapping pattern strings to handlers (for psubscribe)
843 exact_channels: Dict mapping channel strings to handlers (for subscribe)
844 """
845 pass
846
847 def unsubscribe(self, *channels: ChannelT):
848 """
849 Unsubscribe from keyspace notification channels.
850
851 Automatically detects whether each channel is a pattern or exact
852 channel and uses the appropriate Redis unsubscribe command.
853
854 Args:
855 *channels: Channels to unsubscribe from.
856 """
857 patterns = []
858 exact_channels = []
859
860 for channel in channels:
861 if hasattr(channel, "_channel_str"):
862 channel_str = str(channel)
863 else:
864 channel_str = safe_str(channel)
865 if _is_pattern(channel):
866 patterns.append(channel_str)
867 else:
868 exact_channels.append(channel_str)
869
870 # Delegate to subclass implementation first. For standalone Redis
871 # this raises on failure, keeping tracking state intact. For cluster
872 # implementations the operation is best-effort (partial failures are
873 # logged, not raised) so tracking state is always removed afterwards
874 # — this is intentional: the user asked to unsubscribe, so
875 # refresh_subscriptions should not re-subscribe these channels.
876 self._execute_unsubscribe(patterns, exact_channels)
877 self._untrack_subscribe(patterns, exact_channels)
878
879 @abstractmethod
880 def _execute_unsubscribe(
881 self, patterns: list[str], exact_channels: list[str]
882 ) -> None:
883 """
884 Execute the unsubscribe operation.
885
886 Args:
887 patterns: List of pattern strings to punsubscribe from
888 exact_channels: List of channel strings to unsubscribe from
889 """
890 pass
891
892 def _track_subscribe(
893 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
894 ) -> None:
895 """Track newly subscribed patterns/channels.
896
897 Override in subclasses that need to maintain their own subscription
898 registry (e.g. cluster implementations that must re-subscribe
899 new/failed-over nodes). The default is a no-op because standalone
900 implementations delegate tracking to the underlying PubSub object.
901 """
902
903 def _untrack_subscribe(
904 self, patterns: list[str], exact_channels: list[str]
905 ) -> None:
906 """Remove patterns/channels from the subscription registry.
907
908 Override in subclasses that maintain their own subscription registry.
909 The default is a no-op.
910 """
911
912 def subscribe_keyspace(
913 self,
914 key_or_pattern: str,
915 db: int = 0,
916 handler: SyncHandlerT | None = None,
917 ):
918 """
919 Subscribe to keyspace notifications for specific keys.
920
921 Args:
922 key_or_pattern: The key or pattern to monitor. Use '*' for wildcards.
923 db: The database number (default 0).
924 handler: Optional callback for notifications.
925
926 Example:
927 >>> ksn.subscribe_keyspace("user:123", db=0)
928 >>> ksn.subscribe_keyspace("user:*", db=0)
929 """
930 channel = KeyspaceChannel(key_or_pattern, db=db)
931 self.subscribe(channel, handler=handler)
932
933 def subscribe_keyevent(
934 self,
935 event: str,
936 db: int = 0,
937 handler: SyncHandlerT | None = None,
938 ):
939 """
940 Subscribe to keyevent notifications for specific event types.
941
942 Args:
943 event: The event type to monitor (e.g., EventType.SET or "set")
944 db: The database number (default 0).
945 handler: Optional callback for notifications.
946
947 Example:
948 >>> ksn.subscribe_keyevent(EventType.SET)
949 >>> ksn.subscribe_keyevent(EventType.EXPIRED, handler=my_handler)
950 """
951 channel = KeyeventChannel(event, db=db)
952 self.subscribe(channel, handler=handler)
953
954 def __enter__(self):
955 return self
956
957 def __exit__(self, _exc_type, _exc_val, _exc_tb):
958 self.close()
959 return False
960
961 def run_in_thread(
962 self,
963 poll_timeout: float = 0.0,
964 daemon: bool = False,
965 exception_handler: Callable[
966 [
967 BaseException,
968 KeyspaceNotificationsInterface,
969 KeyspaceWorkerThread,
970 ],
971 None,
972 ]
973 | None = None,
974 ) -> KeyspaceWorkerThread:
975 """
976 Start a background thread that polls for notifications and triggers handlers.
977
978 This method spawns a thread that continuously calls get_message() to
979 process incoming notifications. When a notification arrives, any
980 registered handler for that channel/pattern is invoked automatically.
981
982 All subscriptions must have handlers registered before calling this method.
983
984 Args:
985 poll_timeout: Timeout in seconds for get_message() calls. When no message
986 is available, the thread waits up to this long before checking
987 again. Default 0.0 (non-blocking). WARNING: the default
988 causes a CPU spin-loop. It is preferred to pass a positive
989 value (e.g. 0.1 or 1.0).
990 daemon: If True, the thread will be a daemon thread and will be
991 terminated when the main program exits. Default False.
992 exception_handler: Optional callback invoked when an exception occurs
993 in the worker thread. Receives (exception, notifications,
994 thread) as arguments. If None, exceptions are raised.
995
996 Returns:
997 KeyspaceWorkerThread: The started worker thread. Call stop() on it
998 to stop the thread and close the notifications.
999
1000 Raises:
1001 RedisError: If any subscription doesn't have a handler registered.
1002
1003 Example:
1004 >>> def my_handler(notification):
1005 ... print(f"Got: {notification.key} - {notification.event_type}")
1006 >>>
1007 >>> notifications.subscribe(KeyspaceChannel("user:*"), handler=my_handler)
1008 >>> thread = notifications.run_in_thread(poll_timeout=0.1, daemon=True)
1009 >>> # ... handlers are called automatically ...
1010 >>> thread.stop()
1011 """
1012 self._validate_all_handlers()
1013
1014 thread = KeyspaceWorkerThread(
1015 self,
1016 poll_timeout,
1017 daemon=daemon,
1018 exception_handler=exception_handler,
1019 )
1020 thread.start()
1021 return thread
1022
1023 @abstractmethod
1024 def _validate_all_handlers(self) -> None:
1025 """Raise :class:`~redis.RedisError` if any subscription lacks a handler.
1026
1027 Subclasses inspect their own subscription state to perform
1028 the validation.
1029 """
1030 pass
1031
1032
1033class KeyspaceWorkerThread(threading.Thread):
1034 """
1035 Background thread for processing keyspace notifications.
1036
1037 This thread continuously polls for notifications and invokes registered
1038 handlers. It works with both KeyspaceNotifications (standalone) and
1039 ClusterKeyspaceNotifications.
1040
1041 Example:
1042 >>> thread = notifications.run_in_thread(poll_timeout=0.1)
1043 >>> # ... handlers are called automatically ...
1044 >>> thread.stop()
1045 """
1046
1047 def __init__(
1048 self,
1049 notifications: KeyspaceNotificationsInterface,
1050 poll_timeout: float,
1051 daemon: bool = False,
1052 exception_handler: Callable[
1053 [
1054 BaseException,
1055 KeyspaceNotificationsInterface,
1056 KeyspaceWorkerThread,
1057 ],
1058 None,
1059 ]
1060 | None = None,
1061 ):
1062 super().__init__()
1063 self.daemon = daemon
1064 self.notifications = notifications
1065 self.poll_timeout = poll_timeout
1066 self.exception_handler = exception_handler
1067 self._running = threading.Event()
1068
1069 def run(self) -> None:
1070 """Main loop that polls for notifications and triggers handlers."""
1071 if self._running.is_set():
1072 return
1073 self._running.set()
1074 notifications = self.notifications
1075 poll_timeout = self.poll_timeout
1076 while self._running.is_set():
1077 try:
1078 notifications.get_message(
1079 ignore_subscribe_messages=True, timeout=poll_timeout
1080 )
1081 except BaseException as e:
1082 if self.exception_handler is None:
1083 raise
1084 self.exception_handler(e, notifications, self)
1085 notifications.close()
1086
1087 def stop(self) -> None:
1088 """
1089 Stop the worker thread.
1090
1091 This signals the thread to exit its run loop. The thread will close
1092 the notifications object before terminating.
1093 """
1094 self._running.clear()
1095
1096
1097# =============================================================================
1098# Standalone Keyspace Notification Manager
1099# =============================================================================
1100
1101
1102class KeyspaceNotifications(AbstractKeyspaceNotifications):
1103 """
1104 Manages keyspace notification subscriptions for standalone Redis.
1105
1106 For standalone Redis, keyspace notifications work with a single PubSub
1107 connection. This class wraps that connection and provides:
1108 - Automatic pattern vs exact channel detection
1109 - KeyNotification parsing with optional key_prefix filtering
1110 - Convenience methods for keyspace and keyevent subscriptions
1111 - Context manager and run_in_thread support
1112 """
1113
1114 def __init__(
1115 self,
1116 redis_client: Redis,
1117 key_prefix: str | bytes | None = None,
1118 ignore_subscribe_messages: bool = True,
1119 ):
1120 """
1121 Initialize the standalone keyspace notification manager.
1122
1123 Note: Keyspace notifications must be enabled on the Redis server via
1124 the ``notify-keyspace-events`` configuration option. This is a server-side
1125 configuration that should be done by your infrastructure/operations team.
1126
1127 Args:
1128 redis_client: A Redis client instance
1129 key_prefix: Optional prefix to filter and strip from keys in notifications
1130 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
1131 are not returned by get_message/listen
1132 """
1133 super().__init__(key_prefix, ignore_subscribe_messages)
1134 self.redis = redis_client
1135
1136 # Create the PubSub instance with ignore_subscribe_messages=False
1137 # so that the per-call argument in get_message() can control behavior
1138 self._pubsub = redis_client.pubsub(ignore_subscribe_messages=False)
1139
1140 def _execute_subscribe(
1141 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
1142 ) -> None:
1143 """Execute subscribe on the single pubsub connection."""
1144 if patterns:
1145 self._pubsub.psubscribe(**patterns)
1146 if exact_channels:
1147 self._pubsub.subscribe(**exact_channels)
1148
1149 def _execute_unsubscribe(
1150 self, patterns: list[str], exact_channels: list[str]
1151 ) -> None:
1152 """Execute unsubscribe on the single pubsub connection."""
1153 if patterns:
1154 self._pubsub.punsubscribe(*patterns)
1155 if exact_channels:
1156 self._pubsub.unsubscribe(*exact_channels)
1157
1158 def get_message(
1159 self,
1160 ignore_subscribe_messages: bool | None = None,
1161 timeout: float = 0.0,
1162 ) -> KeyNotification | None:
1163 """
1164 Get the next keyspace notification if one is available.
1165
1166 Note: If a handler was registered for the channel, pubsub will call
1167 the handler directly and this method returns None for that message.
1168
1169 Args:
1170 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages.
1171 Defaults to the value set in __init__ (True).
1172 timeout: Time to wait for a message.
1173
1174 Returns:
1175 A KeyNotification if a notification is available and no handler
1176 was registered for the channel, None otherwise.
1177 """
1178 if ignore_subscribe_messages is None:
1179 ignore_subscribe_messages = self.ignore_subscribe_messages
1180
1181 if self._closed:
1182 return None
1183
1184 # Pubsub's get_message will call wrapped handlers directly for channels
1185 # with registered handlers and return None. For channels without handlers,
1186 # it returns the raw message which we parse to KeyNotification.
1187 message = self._pubsub.get_message(
1188 ignore_subscribe_messages=ignore_subscribe_messages,
1189 timeout=timeout,
1190 )
1191
1192 if message is not None:
1193 return KeyNotification.from_message(message, key_prefix=self.key_prefix)
1194
1195 return None
1196
1197 def listen(self):
1198 """
1199 Listen for keyspace notifications.
1200
1201 This is a generator that yields KeyNotification objects as they arrive.
1202 It blocks until a notification is received.
1203
1204 Yields:
1205 KeyNotification objects for each keyspace/keyevent notification.
1206
1207 Example:
1208 >>> for notification in ksn.listen():
1209 ... print(f"{notification.key}: {notification.event_type}")
1210 """
1211 while self.subscribed:
1212 notification = self.get_message(timeout=1.0)
1213 if notification is not None:
1214 yield notification
1215
1216 @property
1217 def subscribed(self) -> bool:
1218 """Check if there are any active subscriptions and not closed."""
1219 return not self._closed and self._pubsub.subscribed
1220
1221 def _validate_all_handlers(self) -> None:
1222 """Raise if any subscription in the underlying PubSub lacks a handler."""
1223 for channel, handler in self._pubsub.channels.items():
1224 if handler is None:
1225 raise RedisError(f"Channel '{channel}' has no handler registered")
1226 for pattern, handler in self._pubsub.patterns.items():
1227 if handler is None:
1228 raise RedisError(f"Pattern '{pattern}' has no handler registered")
1229
1230 def close(self):
1231 """Close the pubsub connection and clean up resources."""
1232 self._closed = True
1233 try:
1234 self._pubsub.close()
1235 except Exception:
1236 pass
1237
1238
1239# =============================================================================
1240# Cluster-Aware Keyspace Notification Manager
1241# =============================================================================
1242
1243
1244class ClusterKeyspaceNotifications(AbstractKeyspaceNotifications):
1245 """
1246 Manages keyspace notification subscriptions across all nodes in a Redis Cluster.
1247
1248 In Redis Cluster, keyspace notifications are NOT broadcast between nodes.
1249 Each node only emits notifications for keys it owns. This class automatically
1250 subscribes to all primary nodes in the cluster and handles topology changes.
1251 """
1252
1253 def __init__(
1254 self,
1255 redis_cluster: RedisCluster,
1256 key_prefix: str | bytes | None = None,
1257 ignore_subscribe_messages: bool = True,
1258 ):
1259 """
1260 Initialize the cluster keyspace notification manager.
1261
1262 Note: Keyspace notifications must be enabled on all Redis cluster nodes via
1263 the ``notify-keyspace-events`` configuration option. This is a server-side
1264 configuration that should be done by your infrastructure/operations team.
1265
1266 Args:
1267 redis_cluster: A RedisCluster instance
1268 key_prefix: Optional prefix to filter and strip from keys in notifications
1269 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
1270 are not returned by get_message/listen
1271 """
1272 super().__init__(key_prefix, ignore_subscribe_messages)
1273 self.cluster = redis_cluster
1274
1275 # Canonical subscription registry: pattern/channel -> wrapped handler.
1276 # In cluster mode there are multiple PubSub objects (one per node), so
1277 # this is the single source of truth used to (re-)subscribe new or
1278 # failed-over nodes.
1279 self._subscribed_patterns: dict[str, Any] = {}
1280 self._subscribed_channels: dict[str, Any] = {}
1281
1282 # Track subscriptions per node
1283 self._node_pubsubs: dict[str, Any] = {}
1284
1285 # Lock for topology refresh operations
1286 self._refresh_lock = threading.Lock()
1287
1288 # Current pubsub index for round-robin polling
1289 self._poll_index = 0
1290
1291 @property
1292 def subscribed(self) -> bool:
1293 """Check if there are any active subscriptions and not closed."""
1294 return not self._closed and bool(
1295 self._subscribed_patterns or self._subscribed_channels
1296 )
1297
1298 def _track_subscribe(
1299 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
1300 ) -> None:
1301 """Track newly subscribed patterns/channels in the cluster registry."""
1302 if patterns:
1303 self._subscribed_patterns.update(patterns)
1304 if exact_channels:
1305 self._subscribed_channels.update(exact_channels)
1306
1307 def _untrack_subscribe(
1308 self, patterns: list[str], exact_channels: list[str]
1309 ) -> None:
1310 """Remove patterns/channels from the cluster registry."""
1311 for p in patterns:
1312 self._subscribed_patterns.pop(p, None)
1313 for c in exact_channels:
1314 self._subscribed_channels.pop(c, None)
1315
1316 def _validate_all_handlers(self) -> None:
1317 """Raise if any subscription in the cluster registry lacks a handler."""
1318 for channel, handler in self._subscribed_channels.items():
1319 if handler is None:
1320 raise RedisError(f"Channel '{channel}' has no handler registered")
1321 for pattern, handler in self._subscribed_patterns.items():
1322 if handler is None:
1323 raise RedisError(f"Pattern '{pattern}' has no handler registered")
1324
1325 def _get_all_primary_nodes(self):
1326 """Get all primary nodes in the cluster."""
1327 return self.cluster.get_primaries()
1328
1329 def _cleanup_node(self, node_name: str) -> None:
1330 """Remove and close a node's PubSub.
1331
1332 Closing the ``PubSub`` disconnects its connection so it is not
1333 left in a subscribed state inside the connection pool.
1334 """
1335 pubsub = self._node_pubsubs.pop(node_name, None)
1336 if pubsub:
1337 try:
1338 pubsub.close()
1339 except Exception:
1340 pass
1341
1342 def _ensure_node_pubsub(self, node) -> Any:
1343 """Get or create a PubSub instance for a node."""
1344 if node.name not in self._node_pubsubs:
1345 redis_conn = self.cluster.get_redis_connection(node)
1346 # Always create PubSub with ignore_subscribe_messages=False
1347 # so that the per-call argument in get_message() can control
1348 # the behavior reliably
1349 pubsub = redis_conn.pubsub(ignore_subscribe_messages=False)
1350 self._node_pubsubs[node.name] = pubsub
1351 return self._node_pubsubs[node.name]
1352
1353 def _execute_subscribe(
1354 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
1355 ) -> None:
1356 """Execute subscribe on all cluster nodes.
1357
1358 Patterns and exact channels are subscribed in a single pass over
1359 nodes so that a mid-batch node failure cannot create a
1360 partially-caught-up replacement. If a node fails during this
1361 call it is removed from ``_node_pubsubs`` and will be fully
1362 re-subscribed on the next ``refresh_subscriptions`` cycle.
1363
1364 If a newly discovered node is encountered (not yet in
1365 ``_node_pubsubs``), it is also subscribed to all *previously*
1366 tracked patterns/channels so it doesn't miss notifications for
1367 subscriptions that were established before this node joined.
1368 """
1369 if not patterns and not exact_channels:
1370 return
1371
1372 failed_nodes: list[str] = []
1373 for node in self._get_all_primary_nodes():
1374 is_new_node = node.name not in self._node_pubsubs
1375 pubsub = self._ensure_node_pubsub(node)
1376 try:
1377 # If this is a brand-new node, catch it up on existing
1378 # subscriptions before adding the new channels.
1379 if is_new_node:
1380 if self._subscribed_patterns:
1381 pubsub.psubscribe(**self._subscribed_patterns)
1382 if self._subscribed_channels:
1383 pubsub.subscribe(**self._subscribed_channels)
1384
1385 if patterns:
1386 pubsub.psubscribe(**patterns)
1387 if exact_channels:
1388 pubsub.subscribe(**exact_channels)
1389 except Exception:
1390 # Remove the broken pubsub so refresh_subscriptions can
1391 # re-create it later.
1392 self._cleanup_node(node.name)
1393 failed_nodes.append(node.name)
1394
1395 if failed_nodes:
1396 logger.warning(
1397 "Failed to subscribe on cluster nodes: %s. "
1398 "These nodes will be retried on the next refresh cycle.",
1399 ", ".join(failed_nodes),
1400 )
1401
1402 def _execute_unsubscribe(
1403 self, patterns: list[str], exact_channels: list[str]
1404 ) -> None:
1405 """Execute unsubscribe on all cluster nodes."""
1406 if patterns:
1407 self._unsubscribe_from_all_nodes(patterns, use_punsubscribe=True)
1408 if exact_channels:
1409 self._unsubscribe_from_all_nodes(exact_channels, use_punsubscribe=False)
1410
1411 def _unsubscribe_from_all_nodes(self, channels: list[str], use_punsubscribe: bool):
1412 """Unsubscribe from patterns/channels on all nodes.
1413
1414 Best-effort: tries every node so that a single broken connection
1415 does not prevent the remaining nodes from being unsubscribed.
1416 Broken pubsubs are cleaned up; the tracking state is still removed
1417 by the caller, so ``refresh_subscriptions`` will *not* re-subscribe
1418 these channels on replacement nodes.
1419 """
1420 failed_nodes: list[str] = []
1421 for node_name, pubsub in list(self._node_pubsubs.items()):
1422 try:
1423 if use_punsubscribe:
1424 pubsub.punsubscribe(*channels)
1425 else:
1426 pubsub.unsubscribe(*channels)
1427 except Exception:
1428 self._cleanup_node(node_name)
1429 failed_nodes.append(node_name)
1430
1431 if failed_nodes:
1432 logger.warning(
1433 "Failed to unsubscribe on cluster nodes: %s. "
1434 "These nodes will be re-created on the next refresh cycle.",
1435 ", ".join(failed_nodes),
1436 )
1437
1438 def get_message(
1439 self,
1440 ignore_subscribe_messages: bool | None = None,
1441 timeout: float = 0.0,
1442 ) -> KeyNotification | None:
1443 """
1444 Get the next keyspace notification if one is available.
1445
1446 This method polls all node pubsubs in round-robin fashion until
1447 a message is received or the timeout expires.
1448 If a connection error occurs, subscriptions are automatically refreshed.
1449
1450 Args:
1451 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages.
1452 Defaults to the value set in __init__ (True).
1453 timeout: Total time to wait for a message (distributed across all nodes)
1454
1455 Returns:
1456 A KeyNotification if a notification is available, None otherwise.
1457 """
1458 if self._closed:
1459 return None
1460
1461 total_nodes = len(self._node_pubsubs)
1462 if total_nodes == 0:
1463 # Sleep for the requested timeout so callers that loop
1464 # (run_in_thread, listen) don't spin the CPU when all node
1465 # connections have been cleaned up.
1466 if timeout > 0:
1467 time.sleep(timeout)
1468 return None
1469
1470 # Use instance default if not specified
1471 if ignore_subscribe_messages is None:
1472 ignore_subscribe_messages = self.ignore_subscribe_messages
1473
1474 # Handle timeout=0 as a single non-blocking poll over all pubsubs
1475 # This matches the expected semantics of PubSub.get_message(timeout=0)
1476 if timeout == 0.0:
1477 return self._poll_all_nodes_once(ignore_subscribe_messages)
1478
1479 # Calculate per-node timeout for each poll
1480 # Use a small timeout per node to allow round-robin polling
1481 per_node_timeout = min(0.1, timeout / max(total_nodes, 1))
1482
1483 start_time = time.monotonic()
1484 end_time = start_time + timeout
1485
1486 while True:
1487 # Check if we've exceeded the total timeout
1488 if time.monotonic() >= end_time:
1489 return None
1490
1491 pubsubs = list(self._node_pubsubs.values())
1492 if not pubsubs:
1493 return None
1494
1495 # Round-robin polling
1496 self._poll_index = self._poll_index % len(pubsubs)
1497 pubsub = pubsubs[self._poll_index]
1498 self._poll_index += 1
1499
1500 try:
1501 message = pubsub.get_message(
1502 ignore_subscribe_messages=ignore_subscribe_messages,
1503 timeout=per_node_timeout,
1504 )
1505 except (ConnectionError, TimeoutError, RedisError):
1506 # Connection error - refresh subscriptions and continue
1507 self._refresh_subscriptions_on_error()
1508 continue
1509
1510 if message is not None:
1511 # Note: If a handler was registered, PubSub already invoked it
1512 # and returned None, so we only reach here for handler-less subscriptions
1513 notification = KeyNotification.from_message(
1514 message, key_prefix=self.key_prefix
1515 )
1516 if notification is not None:
1517 return notification
1518 # If not a keyspace notification, continue checking other nodes
1519
1520 def _poll_all_nodes_once(
1521 self, ignore_subscribe_messages: bool
1522 ) -> KeyNotification | None:
1523 """
1524 Perform a single non-blocking poll over all node pubsubs.
1525
1526 This is used when timeout=0 to match the expected semantics of
1527 PubSub.get_message(timeout=0) - a non-blocking check for messages.
1528
1529 Returns:
1530 A KeyNotification if one is available, None otherwise.
1531 """
1532 had_error = False
1533 for pubsub in list(self._node_pubsubs.values()):
1534 try:
1535 message = pubsub.get_message(
1536 ignore_subscribe_messages=ignore_subscribe_messages,
1537 timeout=0.0,
1538 )
1539 except (ConnectionError, TimeoutError, RedisError):
1540 # Record the error but continue polling remaining healthy
1541 # nodes so that already-buffered notifications are not lost.
1542 had_error = True
1543 continue
1544
1545 if message is not None:
1546 # Note: If a handler was registered, PubSub already invoked it
1547 # and returned None, so we only reach here for handler-less subscriptions
1548 notification = KeyNotification.from_message(
1549 message, key_prefix=self.key_prefix
1550 )
1551 if notification is not None:
1552 # Refresh before returning if any node had an error,
1553 # so the next poll cycle has fresh state.
1554 if had_error:
1555 self._refresh_subscriptions_on_error()
1556 return notification
1557
1558 # Refresh after polling all nodes if any had errors
1559 if had_error:
1560 self._refresh_subscriptions_on_error()
1561 return None
1562
1563 def listen(self):
1564 """
1565 Listen for keyspace notifications from all cluster nodes.
1566
1567 This is a generator that yields KeyNotification objects as they arrive.
1568 It blocks until a notification is received.
1569
1570 Yields:
1571 KeyNotification objects for each keyspace/keyevent notification.
1572
1573 Example:
1574 >>> for notification in ksn.listen():
1575 ... print(f"{notification.key}: {notification.event_type}")
1576 """
1577 while self.subscribed:
1578 notification = self.get_message(timeout=1.0)
1579 if notification is not None:
1580 yield notification
1581
1582 def _refresh_subscriptions_on_error(self):
1583 """
1584 Refresh subscriptions after a connection error.
1585
1586 This is called automatically when a connection error occurs during
1587 get_message(). It checks if nodes changed before refreshing.
1588 """
1589 self._poll_index = 0 # Reset round-robin index
1590
1591 try:
1592 self.refresh_subscriptions()
1593 except Exception:
1594 logger.warning(
1595 "Failed to refresh cluster subscriptions, will retry on next error",
1596 exc_info=True,
1597 )
1598
1599 def _is_pubsub_connected(self, pubsub) -> bool:
1600 """Check if a pubsub connection is still alive."""
1601 try:
1602 conn = pubsub.connection
1603 if conn is None:
1604 return False
1605 return conn.is_connected
1606 except Exception:
1607 return False
1608
1609 def refresh_subscriptions(self):
1610 """
1611 Refresh subscriptions after a topology change.
1612
1613 This method is called automatically when topology changes are detected
1614 or when connection errors occur. You can also call it manually if needed.
1615
1616 This method:
1617 1. Discovers any new primary nodes and subscribes them
1618 2. Removes pubsubs for nodes that are no longer primaries
1619 3. Re-creates broken pubsub connections for existing nodes
1620 """
1621 with self._refresh_lock:
1622 current_primaries = {
1623 node.name: node for node in self._get_all_primary_nodes()
1624 }
1625
1626 # Remove pubsubs for nodes that are no longer primaries
1627 removed_nodes = set(self._node_pubsubs.keys()) - set(
1628 current_primaries.keys()
1629 )
1630 for node_name in removed_nodes:
1631 self._cleanup_node(node_name)
1632
1633 # Detect broken connections for existing nodes and remove them
1634 # so they get re-created below
1635 existing_nodes = set(self._node_pubsubs.keys()) & set(
1636 current_primaries.keys()
1637 )
1638 for node_name in existing_nodes:
1639 pubsub = self._node_pubsubs.get(node_name)
1640 if pubsub and not self._is_pubsub_connected(pubsub):
1641 # Connection is broken, remove it so it gets re-created
1642 self._cleanup_node(node_name)
1643
1644 # Subscribe new nodes (and nodes with broken connections) to existing
1645 # patterns/channels
1646 new_nodes = set(current_primaries.keys()) - set(self._node_pubsubs.keys())
1647 failed_nodes: list[str] = []
1648 for node_name in new_nodes:
1649 node = current_primaries[node_name]
1650 pubsub = self._ensure_node_pubsub(node)
1651
1652 try:
1653 if self._subscribed_patterns:
1654 pubsub.psubscribe(**self._subscribed_patterns)
1655 if self._subscribed_channels:
1656 pubsub.subscribe(**self._subscribed_channels)
1657 except Exception:
1658 # Subscription failed - remove from dict so retry is possible
1659 self._cleanup_node(node_name)
1660 failed_nodes.append(node_name)
1661
1662 # Raise after attempting all nodes so we don't skip any
1663 if failed_nodes:
1664 raise ConnectionError(
1665 f"Failed to subscribe to cluster nodes: {', '.join(failed_nodes)}"
1666 )
1667
1668 def close(self):
1669 """Close all pubsub connections and clean up resources."""
1670 self._closed = True
1671 for node_name in list(self._node_pubsubs.keys()):
1672 self._cleanup_node(node_name)
1673 self._subscribed_patterns.clear()
1674 self._subscribed_channels.clear()