1"""
2Redis Keyspace Notifications support for redis-py.
3
4This module provides utilities for subscribing to and parsing Redis keyspace
5notifications. Keyspace notifications allow clients to receive events when
6keys are modified in Redis.
7
8Note: Keyspace notifications must be enabled on the Redis server via the
9``notify-keyspace-events`` configuration option. This is a server-side
10configuration that should be done by your infrastructure/operations team.
11See the Redis documentation for details:
12https://redis.io/docs/latest/develop/pubsub/keyspace-notifications/
13
14Standalone Redis Example:
15 >>> from redis import Redis
16 >>> from redis.keyspace_notifications import (
17 ... KeyspaceNotifications,
18 ... KeyspaceChannel,
19 ... EventType,
20 ... )
21 >>>
22 >>> r = Redis()
23 >>> # Server must have notify-keyspace-events configured (e.g., "KEA")
24 >>> ksn = KeyspaceNotifications(r)
25 >>>
26 >>> # Subscribe using Channel class (patterns auto-detected)
27 >>> channel = KeyspaceChannel("user:*")
28 >>> ksn.subscribe(channel)
29 >>>
30 >>> # Or use convenience methods for specific event types
31 >>> ksn.subscribe_keyevent(EventType.SET)
32 >>>
33 >>> for notification in ksn.listen():
34 ... print(f"Key: {notification.key}, Event: {notification.event_type}")
35
36Redis Cluster Example:
37 >>> from redis.cluster import RedisCluster
38 >>> from redis.keyspace_notifications import (
39 ... ClusterKeyspaceNotifications,
40 ... KeyspaceChannel,
41 ... EventType,
42 ... )
43 >>>
44 >>> rc = RedisCluster(host="localhost", port=7000)
45 >>> # Server must have notify-keyspace-events configured (e.g., "KEA")
46 >>> ksn = ClusterKeyspaceNotifications(rc)
47 >>>
48 >>> # Subscribe using Channel class (patterns auto-detected)
49 >>> channel = KeyspaceChannel("user:*")
50 >>> ksn.subscribe(channel)
51 >>>
52 >>> # Or use convenience methods for specific event types
53 >>> ksn.subscribe_keyevent(EventType.SET)
54 >>>
55 >>> for notification in ksn.listen():
56 ... print(f"Key: {notification.key}, Event: {notification.event_type}")
57"""
58
59from __future__ import annotations
60
61import logging
62import re
63import threading
64import time
65from abc import ABC, abstractmethod
66from collections.abc import Callable
67from dataclasses import dataclass
68from enum import Enum
69from typing import TYPE_CHECKING, Any, ClassVar, Union
70
71from redis.client import Redis
72from redis.cluster import RedisCluster
73from redis.exceptions import (
74 ConnectionError,
75 RedisError,
76 TimeoutError,
77)
78from redis.utils import safe_str
79
80logger = logging.getLogger(__name__)
81
82if TYPE_CHECKING:
83 from typing import TypeAlias
84
85# Type alias for channel arguments - can be a string, bytes, or Channel object
86# This is defined here and the actual types are added after class definitions
87ChannelT: TypeAlias = Union[
88 str,
89 bytes,
90 "KeyspaceChannel",
91 "KeyeventChannel",
92 "SubkeyspaceChannel",
93 "SubkeyeventChannel",
94 "SubkeyspaceitemChannel",
95 "SubkeyspaceeventChannel",
96]
97
98
99# Type alias for sync handlers
100SyncHandlerT = Callable[["KeyNotification"], None]
101
102
103# =============================================================================
104# Event Type Constants
105# =============================================================================
106# These are common Redis keyspace notification event types provided for
107# convenience. You can use any string as an event type - these constants
108# are not exhaustive and Redis may add new events in future versions.
109
110
111class EventType:
112 """
113 Common Redis keyspace notification event type constants.
114
115 These are provided for convenience and IDE autocomplete. You can use
116 any string as an event type - new Redis events will work without
117 needing library updates.
118 """
119
120 # String commands
121 SET = "set"
122 SETEX = "setex"
123 SETNX = "setnx"
124 SETRANGE = "setrange"
125 INCR = "incr"
126 INCRBY = "incrby"
127 INCRBYFLOAT = "incrbyfloat"
128 DECR = "decr"
129 DECRBY = "decrby"
130 APPEND = "append"
131
132 # Generic commands
133 DEL = "del"
134 UNLINK = "unlink"
135 RENAME = "rename"
136 RENAME_FROM = "rename_from"
137 RENAME_TO = "rename_to"
138 COPY_TO = "copy_to"
139 MOVE = "move"
140 RESTORE = "restore"
141
142 # Expiration events
143 EXPIRE = "expire"
144 EXPIREAT = "expireat"
145 PEXPIRE = "pexpire"
146 PEXPIREAT = "pexpireat"
147 EXPIRED = "expired"
148 PERSIST = "persist"
149
150 # Eviction events
151 EVICTED = "evicted"
152
153 # List commands
154 LPUSH = "lpush"
155 RPUSH = "rpush"
156 LPOP = "lpop"
157 RPOP = "rpop"
158 LINSERT = "linsert"
159 LSET = "lset"
160 LTRIM = "ltrim"
161 LMOVE = "lmove"
162 BLPOP = "blpop"
163 BRPOP = "brpop"
164 BLMOVE = "blmove"
165
166 # Set commands
167 SADD = "sadd"
168 SREM = "srem"
169 SPOP = "spop"
170 SMOVE = "smove"
171 SINTERSTORE = "sinterstore"
172 SUNIONSTORE = "sunionstore"
173 SDIFFSTORE = "sdiffstore"
174
175 # Sorted set commands
176 ZADD = "zadd"
177 ZINCRBY = "zincrby"
178 ZREM = "zrem"
179 ZREMRANGEBYRANK = "zremrangebyrank"
180 ZREMRANGEBYSCORE = "zremrangebyscore"
181 ZREMRANGEBYLEX = "zremrangebylex"
182 ZPOPMIN = "zpopmin"
183 ZPOPMAX = "zpopmax"
184 BZPOPMIN = "bzpopmin"
185 BZPOPMAX = "bzpopmax"
186 ZINTERSTORE = "zinterstore"
187 ZUNIONSTORE = "zunionstore"
188 ZDIFFSTORE = "zdiffstore"
189 ZRANGESTORE = "zrangestore"
190
191 # Hash commands
192 HSET = "hset"
193 HSETNX = "hsetnx"
194 HDEL = "hdel"
195 HINCRBY = "hincrby"
196 HINCRBYFLOAT = "hincrbyfloat"
197
198 # Stream commands
199 XADD = "xadd"
200 XTRIM = "xtrim"
201 XDEL = "xdel"
202 XGROUP_CREATE = "xgroup-create"
203 XGROUP_CREATECONSUMER = "xgroup-createconsumer"
204 XGROUP_DELCONSUMER = "xgroup-delconsumer"
205 XGROUP_DESTROY = "xgroup-destroy"
206 XGROUP_SETID = "xgroup-setid"
207 XSETID = "xsetid"
208 XCLAIM = "xclaim"
209 XAUTOCLAIM = "xautoclaim"
210 XREADGROUP = "xreadgroup"
211
212 # Other
213 NEW = "new" # Key created (when tracking new keys)
214 SORTSTORE = "sortstore"
215 GETEX = "getex"
216 GETDEL = "getdel"
217 SETIFGT = "setifgt"
218 SETIFLT = "setiflt"
219 SETIFEQ = "setifeq"
220 SETIFNE = "setifne"
221
222
223def _parse_length_prefixed_subkeys(s: str) -> list[str]:
224 """Parse a length-prefixed subkey list.
225
226 The wire format is ``<len>:<subkey>[,<len>:<subkey>...]``.
227
228 Returns:
229 A list of subkey strings.
230 """
231 subkeys: list[str] = []
232 pos = 0
233 while pos < len(s):
234 colon = s.index(":", pos)
235 length = int(s[pos:colon])
236 start = colon + 1
237 subkeys.append(s[start : start + length])
238 pos = start + length
239 if pos < len(s) and s[pos] == ",":
240 pos += 1 # skip comma separator
241 return subkeys
242
243
244@dataclass
245class KeyNotification:
246 """
247 Represents a parsed Redis keyspace, keyevent, or subkey notification.
248
249 This class provides convenient access to the notification details
250 like key, event type, database number, and affected subkeys.
251
252 Attributes:
253 key: The Redis key that was affected (for keyspace notifications)
254 or the key name from the message data (for keyevent notifications)
255 event_type: The type of operation that occurred (e.g., "set", "del").
256 This is a plain string, so new Redis events work automatically.
257 Compare against EventType constants or any string.
258 database: The database number where the event occurred
259 channel: The original channel name
260 is_keyspace: True if this is a keyspace notification, False for keyevent
261 data: The raw data payload from the notification message.
262 subkeys: List of affected subkeys (fields) for subkey notifications.
263 Empty list for regular keyspace/keyevent notifications.
264 """
265
266 # Regex patterns for parsing keyspace/keyevent channels
267 # Pattern: __keyspace@<db>__:<key> or __keyevent@<db>__:<event>
268 _KEYSPACE_PATTERN: ClassVar[re.Pattern] = re.compile(
269 r"^__keyspace@(\d+|\*)__:(.+)$"
270 )
271 _KEYEVENT_PATTERN: ClassVar[re.Pattern] = re.compile(
272 r"^__keyevent@(\d+|\*)__:(.+)$"
273 )
274 _SUBKEYSPACE_PATTERN: ClassVar[re.Pattern] = re.compile(
275 r"^__subkeyspace@(\d+|\*)__:(.+)$"
276 )
277 _SUBKEYEVENT_PATTERN: ClassVar[re.Pattern] = re.compile(
278 r"^__subkeyevent@(\d+|\*)__:(.+)$"
279 )
280 _SUBKEYSPACEITEM_PATTERN: ClassVar[re.Pattern] = re.compile(
281 r"^__subkeyspaceitem@(\d+|\*)__:(.+)$", re.DOTALL
282 )
283 _SUBKEYSPACEEVENT_PATTERN: ClassVar[re.Pattern] = re.compile(
284 r"^__subkeyspaceevent@(\d+|\*)__:(.+)$"
285 )
286
287 key: str
288 event_type: str
289 database: int
290 channel: str
291 is_keyspace: bool
292 data: str
293 subkeys: list[str] = None # type: ignore[assignment]
294
295 def __post_init__(self):
296 if self.subkeys is None:
297 self.subkeys = []
298
299 @classmethod
300 def from_message(
301 cls,
302 message: dict[str, Any] | None,
303 key_prefix: str | bytes | None = None,
304 ) -> KeyNotification | None:
305 """
306 Parse a pub/sub message into a KeyNotification.
307
308 Args:
309 message: A pub/sub message dict with 'channel', 'data', and 'type' keys
310 key_prefix: Optional prefix to filter and strip from keys.
311 If provided, only notifications for keys starting with
312 this prefix will be returned, and the prefix will be
313 stripped from the key.
314
315 Returns:
316 A KeyNotification if the message is a valid keyspace/keyevent
317 notification, None otherwise.
318
319 Example:
320 >>> message = {
321 ... 'type': 'pmessage',
322 ... 'pattern': '__keyspace@0__:user:*',
323 ... 'channel': '__keyspace@0__:user:123',
324 ... 'data': 'set'
325 ... }
326 >>> notification = KeyNotification.from_message(message)
327 >>> notification.key
328 'user:123'
329 >>> notification.event_type
330 'set'
331 """
332 if message is None:
333 return None
334
335 msg_type = message.get("type")
336 if msg_type not in ("message", "pmessage"):
337 return None
338
339 channel = message.get("channel")
340 data = message.get("data")
341
342 if channel is None or data is None:
343 return None
344
345 return cls.try_parse(channel, data, key_prefix)
346
347 @classmethod
348 def try_parse(
349 cls,
350 channel: str | bytes,
351 data: str | bytes,
352 key_prefix: str | bytes | None = None,
353 ) -> KeyNotification | None:
354 """
355 Try to parse a channel and data into a KeyNotification.
356
357 This is a lower-level method that takes the channel and data directly,
358 useful when working with callback-based subscription handlers.
359
360 Args:
361 channel: The channel name (e.g., "__keyspace@0__:mykey")
362 data: The message data (event type for keyspace, key for keyevent)
363 key_prefix: Optional prefix to filter and strip from keys
364
365 Returns:
366 A KeyNotification if valid, None otherwise.
367 """
368 channel = safe_str(channel)
369 data = safe_str(data)
370
371 return cls._parse(channel, data, key_prefix)
372
373 @classmethod
374 def _parse(
375 cls,
376 channel: str,
377 data: str,
378 key_prefix: str | bytes | None = None,
379 ) -> KeyNotification | None:
380 """Internal parsing logic."""
381 # Normalize key_prefix
382 key_prefix = safe_str(key_prefix) if key_prefix else None
383
384 # Try keyspace pattern first: __keyspace@<db>__:<key>
385 match = cls._KEYSPACE_PATTERN.match(channel)
386 if match:
387 db_str, key = match.groups()
388 database = int(db_str) if db_str != "*" else -1
389 event_type = data # For keyspace, the data is the event type
390
391 # Apply key prefix filter
392 if key_prefix:
393 if not key.startswith(key_prefix):
394 return None
395 key = key[len(key_prefix) :]
396
397 return cls(
398 key=key,
399 event_type=event_type,
400 database=database,
401 channel=channel,
402 is_keyspace=True,
403 data=data,
404 )
405
406 # Try keyevent pattern: __keyevent@<db>__:<event>
407 match = cls._KEYEVENT_PATTERN.match(channel)
408 if match:
409 db_str, event_type = match.groups()
410 database = int(db_str) if db_str != "*" else -1
411 key = data # For keyevent, the data is the key
412
413 # Apply key prefix filter
414 if key_prefix:
415 if not key.startswith(key_prefix):
416 return None
417 key = key[len(key_prefix) :]
418
419 return cls(
420 key=key,
421 event_type=event_type,
422 database=database,
423 channel=channel,
424 is_keyspace=False,
425 data=data,
426 )
427
428 # Try subkeyspace: channel=__subkeyspace@<db>__:<key>
429 # data=<event>|<subkey_len>:<subkey>[,<subkey_len>:<subkey>...]
430 match = cls._SUBKEYSPACE_PATTERN.match(channel)
431 if match:
432 db_str, key = match.groups()
433 database = int(db_str) if db_str != "*" else -1
434 pipe_idx = data.index("|")
435 event_type = data[:pipe_idx]
436 subkeys = _parse_length_prefixed_subkeys(data[pipe_idx + 1 :])
437
438 if key_prefix:
439 if not key.startswith(key_prefix):
440 return None
441 key = key[len(key_prefix) :]
442
443 return cls(
444 key=key,
445 event_type=event_type,
446 database=database,
447 channel=channel,
448 is_keyspace=True,
449 data=data,
450 subkeys=subkeys,
451 )
452
453 # Try subkeyevent: channel=__subkeyevent@<db>__:<event>
454 # data=<key_len>:<key>|<subkey_len>:<subkey>[,...]
455 match = cls._SUBKEYEVENT_PATTERN.match(channel)
456 if match:
457 db_str, event_type = match.groups()
458 database = int(db_str) if db_str != "*" else -1
459 # Parse key by length prefix
460 colon_idx = data.index(":")
461 key_len = int(data[:colon_idx])
462 key_start = colon_idx + 1
463 key = data[key_start : key_start + key_len]
464 # After key, expect '|' then subkeys
465 subkeys_start = key_start + key_len + 1 # +1 for '|'
466 subkeys = _parse_length_prefixed_subkeys(data[subkeys_start:])
467
468 if key_prefix:
469 if not key.startswith(key_prefix):
470 return None
471 key = key[len(key_prefix) :]
472
473 return cls(
474 key=key,
475 event_type=event_type,
476 database=database,
477 channel=channel,
478 is_keyspace=False,
479 data=data,
480 subkeys=subkeys,
481 )
482
483 # Try subkeyspaceitem: channel=__subkeyspaceitem@<db>__:<key>\n<subkey>
484 # data=<event>
485 match = cls._SUBKEYSPACEITEM_PATTERN.match(channel)
486 if match:
487 db_str, key_and_subkey = match.groups()
488 database = int(db_str) if db_str != "*" else -1
489 newline_idx = key_and_subkey.index("\n")
490 key = key_and_subkey[:newline_idx]
491 subkey = key_and_subkey[newline_idx + 1 :]
492 event_type = data
493
494 if key_prefix:
495 if not key.startswith(key_prefix):
496 return None
497 key = key[len(key_prefix) :]
498
499 return cls(
500 key=key,
501 event_type=event_type,
502 database=database,
503 channel=channel,
504 is_keyspace=True,
505 data=data,
506 subkeys=[subkey],
507 )
508
509 # Try subkeyspaceevent: channel=__subkeyspaceevent@<db>__:<event>|<key>
510 # data=<subkey_len>:<subkey>[,...]
511 match = cls._SUBKEYSPACEEVENT_PATTERN.match(channel)
512 if match:
513 db_str, event_and_key = match.groups()
514 database = int(db_str) if db_str != "*" else -1
515 pipe_idx = event_and_key.index("|")
516 event_type = event_and_key[:pipe_idx]
517 key = event_and_key[pipe_idx + 1 :]
518 subkeys = _parse_length_prefixed_subkeys(data)
519
520 if key_prefix:
521 if not key.startswith(key_prefix):
522 return None
523 key = key[len(key_prefix) :]
524
525 return cls(
526 key=key,
527 event_type=event_type,
528 database=database,
529 channel=channel,
530 is_keyspace=False,
531 data=data,
532 subkeys=subkeys,
533 )
534
535 return None
536
537 def key_starts_with(self, prefix: str | bytes) -> bool:
538 """Check if the key starts with the given prefix."""
539 prefix = safe_str(prefix)
540 return self.key.startswith(prefix)
541
542
543# =============================================================================
544# Channel Classes
545# =============================================================================
546
547
548class KeyspaceChannel:
549 """
550 Represents a keyspace notification channel for subscribing to events on keys.
551
552 Keyspace notifications publish the event type (e.g., "set", "del") as the message
553 when a key matching the pattern is modified.
554
555 This class can be used directly with subscribe()/psubscribe() as it implements
556 __str__ to return the channel string.
557
558 Attributes:
559 key_or_pattern: The key or pattern to monitor (use '*' for wildcards)
560 db: The database number (defaults to 0, the only database in Redis Cluster)
561 is_pattern: Whether this channel contains wildcards
562
563 Examples:
564 >>> channel = KeyspaceChannel("user:123", db=0)
565 >>> str(channel)
566 '__keyspace@0__:user:123'
567
568 >>> # Pattern subscription (wildcards are auto-detected)
569 >>> channel = KeyspaceChannel("user:*", db=0)
570 >>> str(channel)
571 '__keyspace@0__:user:*'
572
573 >>> # Use with KeyspaceNotifications
574 >>> notifications = KeyspaceNotifications(redis_client)
575 >>> notifications.subscribe(channel)
576 """
577
578 PREFIX: ClassVar[str] = "__keyspace@"
579
580 def __init__(self, key_or_pattern: str, db: int = 0):
581 """
582 Create a keyspace notification channel.
583
584 Args:
585 key_or_pattern: The key or pattern to monitor. Use '*' for wildcards.
586 db: The database number. Defaults to 0 (the only database in Redis Cluster).
587 """
588 self.key_or_pattern = key_or_pattern
589 self.db = db
590 self._channel_str = self._build_channel_string()
591
592 def _build_channel_string(self) -> str:
593 return f"{self.PREFIX}{self.db}__:{self.key_or_pattern}"
594
595 @property
596 def is_pattern(self) -> bool:
597 """Check if this channel contains wildcards and should use psubscribe."""
598 return _is_pattern(self.key_or_pattern)
599
600 def __str__(self) -> str:
601 return self._channel_str
602
603 def __repr__(self) -> str:
604 return f"KeyspaceChannel({self.key_or_pattern!r}, db={self.db})"
605
606 def __eq__(self, other: object) -> bool:
607 if isinstance(other, KeyspaceChannel):
608 return self._channel_str == other._channel_str
609 if isinstance(other, str):
610 return self._channel_str == other
611 return NotImplemented
612
613 def __hash__(self) -> int:
614 return hash(self._channel_str)
615
616
617class KeyeventChannel:
618 """
619 Represents a keyevent notification channel for subscribing to event types.
620
621 Keyevent notifications publish the key name as the message when the specified
622 event type occurs on any key.
623
624 This class can be used directly with subscribe()/psubscribe() as it implements
625 __str__ to return the channel string.
626
627 Attributes:
628 event: The event type to monitor
629 db: The database number (defaults to 0, the only database in Redis Cluster)
630 is_pattern: Whether this channel contains wildcards
631
632 Examples:
633 >>> channel = KeyeventChannel(EventType.SET, db=0)
634 >>> str(channel)
635 '__keyevent@0__:set'
636
637 >>> channel = KeyeventChannel.all_events(db=0)
638 >>> str(channel)
639 '__keyevent@0__:*'
640
641 >>> # Use with KeyspaceNotifications
642 >>> notifications = KeyspaceNotifications(redis_client)
643 >>> notifications.subscribe(channel)
644 """
645
646 PREFIX: ClassVar[str] = "__keyevent@"
647
648 def __init__(self, event: str, db: int = 0):
649 """
650 Create a keyevent notification channel.
651
652 Args:
653 event: The event type to monitor (e.g., EventType.SET or "set")
654 db: The database number. Defaults to 0 (the only database in Redis Cluster).
655 """
656 self.event = event
657 self.db = db
658 self._channel_str = self._build_channel_string()
659
660 def _build_channel_string(self) -> str:
661 return f"{self.PREFIX}{self.db}__:{self.event}"
662
663 @property
664 def is_pattern(self) -> bool:
665 """Check if this channel contains wildcards and should use psubscribe."""
666 return _is_pattern(self.event)
667
668 @classmethod
669 def all_events(cls, db: int = 0) -> "KeyeventChannel":
670 """
671 Create a keyevent pattern for subscribing to all event types.
672
673 This is equivalent to KeyeventChannel("*").
674
675 Args:
676 db: The database number. Defaults to 0 (the only database in Redis Cluster).
677
678 Returns:
679 A KeyeventChannel configured to receive all events.
680
681 Examples:
682 >>> channel = KeyeventChannel.all_events()
683 >>> str(channel)
684 '__keyevent@0__:*'
685 """
686 return cls("*", db=db)
687
688 def __str__(self) -> str:
689 return self._channel_str
690
691 def __repr__(self) -> str:
692 return f"KeyeventChannel({self.event!r}, db={self.db})"
693
694 def __eq__(self, other: object) -> bool:
695 if isinstance(other, KeyeventChannel):
696 return self._channel_str == other._channel_str
697 if isinstance(other, str):
698 return self._channel_str == other
699 return NotImplemented
700
701 def __hash__(self) -> int:
702 return hash(self._channel_str)
703
704
705class SubkeyspaceChannel:
706 """
707 Represents a subkeyspace notification channel for subscribing to
708 subkey-level events on keys (e.g., hash field changes).
709
710 The channel format is ``__subkeyspace@<db>__:<key>``.
711 The message payload is ``<event>|<subkey_len>:<subkey>[,...]``.
712
713 Examples:
714 >>> channel = SubkeyspaceChannel("myhash", db=0)
715 >>> str(channel)
716 '__subkeyspace@0__:myhash'
717 """
718
719 PREFIX: ClassVar[str] = "__subkeyspace@"
720
721 def __init__(self, key_or_pattern: str, db: int = 0):
722 self.key_or_pattern = key_or_pattern
723 self.db = db
724 self._channel_str = self._build_channel_string()
725
726 def _build_channel_string(self) -> str:
727 return f"{self.PREFIX}{self.db}__:{self.key_or_pattern}"
728
729 @property
730 def is_pattern(self) -> bool:
731 return _is_pattern(self.key_or_pattern)
732
733 def __str__(self) -> str:
734 return self._channel_str
735
736 def __repr__(self) -> str:
737 return f"SubkeyspaceChannel({self.key_or_pattern!r}, db={self.db})"
738
739 def __eq__(self, other: object) -> bool:
740 if isinstance(other, SubkeyspaceChannel):
741 return self._channel_str == other._channel_str
742 if isinstance(other, str):
743 return self._channel_str == other
744 return NotImplemented
745
746 def __hash__(self) -> int:
747 return hash(self._channel_str)
748
749
750class SubkeyeventChannel:
751 """
752 Represents a subkeyevent notification channel for subscribing to
753 specific event types with subkey-level detail.
754
755 The channel format is ``__subkeyevent@<db>__:<event>``.
756 The message payload is ``<key_len>:<key>|<subkey_len>:<subkey>[,...]``.
757
758 Examples:
759 >>> channel = SubkeyeventChannel("hdel", db=0)
760 >>> str(channel)
761 '__subkeyevent@0__:hdel'
762 """
763
764 PREFIX: ClassVar[str] = "__subkeyevent@"
765
766 def __init__(self, event: str, db: int = 0):
767 self.event = event
768 self.db = db
769 self._channel_str = self._build_channel_string()
770
771 def _build_channel_string(self) -> str:
772 return f"{self.PREFIX}{self.db}__:{self.event}"
773
774 @property
775 def is_pattern(self) -> bool:
776 return _is_pattern(self.event)
777
778 @classmethod
779 def all_events(cls, db: int = 0) -> SubkeyeventChannel:
780 """Create a channel for all subkeyevent types."""
781 return cls("*", db=db)
782
783 def __str__(self) -> str:
784 return self._channel_str
785
786 def __repr__(self) -> str:
787 return f"SubkeyeventChannel({self.event!r}, db={self.db})"
788
789 def __eq__(self, other: object) -> bool:
790 if isinstance(other, SubkeyeventChannel):
791 return self._channel_str == other._channel_str
792 if isinstance(other, str):
793 return self._channel_str == other
794 return NotImplemented
795
796 def __hash__(self) -> int:
797 return hash(self._channel_str)
798
799
800class SubkeyspaceitemChannel:
801 """
802 Represents a subkeyspaceitem notification channel for subscribing to
803 events on a specific subkey (field) of a specific key.
804
805 The channel format is ``__subkeyspaceitem@<db>__:<key>\\n<subkey>``.
806 The message payload is the event type (e.g., ``"hset"``).
807
808 Note:
809 The server only emits this notification when the key does not
810 contain a newline character.
811
812 Examples:
813 >>> channel = SubkeyspaceitemChannel("myhash", "myfield", db=0)
814 >>> str(channel)
815 '__subkeyspaceitem@0__:myhash\\nmyfield'
816 """
817
818 PREFIX: ClassVar[str] = "__subkeyspaceitem@"
819
820 def __init__(self, key_or_pattern: str, subkey_or_pattern: str, db: int = 0):
821 self.key_or_pattern = key_or_pattern
822 self.subkey_or_pattern = subkey_or_pattern
823 self.db = db
824 self._channel_str = self._build_channel_string()
825
826 def _build_channel_string(self) -> str:
827 return (
828 f"{self.PREFIX}{self.db}__:{self.key_or_pattern}\n{self.subkey_or_pattern}"
829 )
830
831 @property
832 def is_pattern(self) -> bool:
833 return _is_pattern(self.key_or_pattern) or _is_pattern(self.subkey_or_pattern)
834
835 def __str__(self) -> str:
836 return self._channel_str
837
838 def __repr__(self) -> str:
839 return (
840 f"SubkeyspaceitemChannel({self.key_or_pattern!r}, "
841 f"{self.subkey_or_pattern!r}, db={self.db})"
842 )
843
844 def __eq__(self, other: object) -> bool:
845 if isinstance(other, SubkeyspaceitemChannel):
846 return self._channel_str == other._channel_str
847 if isinstance(other, str):
848 return self._channel_str == other
849 return NotImplemented
850
851 def __hash__(self) -> int:
852 return hash(self._channel_str)
853
854
855class SubkeyspaceeventChannel:
856 """
857 Represents a subkeyspaceevent notification channel for subscribing to
858 a specific event on a specific key, receiving affected subkeys.
859
860 The channel format is ``__subkeyspaceevent@<db>__:<event>|<key>``.
861 The message payload is a length-prefixed subkey list.
862
863 Examples:
864 >>> channel = SubkeyspaceeventChannel("hset", "myhash", db=0)
865 >>> str(channel)
866 '__subkeyspaceevent@0__:hset|myhash'
867 """
868
869 PREFIX: ClassVar[str] = "__subkeyspaceevent@"
870
871 def __init__(self, event: str, key_or_pattern: str, db: int = 0):
872 self.event = event
873 self.key_or_pattern = key_or_pattern
874 self.db = db
875 self._channel_str = self._build_channel_string()
876
877 def _build_channel_string(self) -> str:
878 return f"{self.PREFIX}{self.db}__:{self.event}|{self.key_or_pattern}"
879
880 @property
881 def is_pattern(self) -> bool:
882 return _is_pattern(self.event) or _is_pattern(self.key_or_pattern)
883
884 def __str__(self) -> str:
885 return self._channel_str
886
887 def __repr__(self) -> str:
888 return (
889 f"SubkeyspaceeventChannel({self.event!r}, "
890 f"{self.key_or_pattern!r}, db={self.db})"
891 )
892
893 def __eq__(self, other: object) -> bool:
894 if isinstance(other, SubkeyspaceeventChannel):
895 return self._channel_str == other._channel_str
896 if isinstance(other, str):
897 return self._channel_str == other
898 return NotImplemented
899
900 def __hash__(self) -> int:
901 return hash(self._channel_str)
902
903
904class ChannelType(Enum):
905 """
906 Enum representing the type of a Redis keyspace notification channel.
907
908 Redis provides two types of keyspace notifications and four subkey
909 notification types:
910
911 - KEYSPACE: ``__keyspace@{db}__:{key}`` — data is the event type.
912 - KEYEVENT: ``__keyevent@{db}__:{event}`` — data is the key name.
913 - SUBKEYSPACE: ``__subkeyspace@{db}__:{key}`` — data is event + subkeys.
914 - SUBKEYEVENT: ``__subkeyevent@{db}__:{event}`` — data is key + subkeys.
915 - SUBKEYSPACEITEM: ``__subkeyspaceitem@{db}__:{key}\\n{subkey}`` — data
916 is the event type.
917 - SUBKEYSPACEEVENT: ``__subkeyspaceevent@{db}__:{event}|{key}`` — data
918 is a subkey list.
919
920 Examples:
921 >>> get_channel_type("__keyspace@0__:mykey")
922 ChannelType.KEYSPACE
923 >>> get_channel_type("__subkeyspace@0__:myhash")
924 ChannelType.SUBKEYSPACE
925 """
926
927 KEYSPACE = "keyspace"
928 KEYEVENT = "keyevent"
929 SUBKEYSPACE = "subkeyspace"
930 SUBKEYEVENT = "subkeyevent"
931 SUBKEYSPACEITEM = "subkeyspaceitem"
932 SUBKEYSPACEEVENT = "subkeyspaceevent"
933
934
935def get_channel_type(channel: str | bytes) -> ChannelType | None:
936 """
937 Determine the type of a Redis keyspace notification channel.
938
939 Args:
940 channel: The channel name to check (string or bytes).
941
942 Returns:
943 ChannelType.KEYSPACE if it's a keyspace notification channel,
944 ChannelType.KEYEVENT if it's a keyevent notification channel,
945 None if it's not a keyspace notification channel.
946
947 Examples:
948 >>> get_channel_type("__keyspace@0__:mykey")
949 ChannelType.KEYSPACE
950 >>> get_channel_type("__keyevent@0__:set")
951 ChannelType.KEYEVENT
952 >>> get_channel_type("regular_channel") is None
953 True
954 >>> get_channel_type(b"__keyspace@0__:mykey")
955 ChannelType.KEYSPACE
956 """
957 channel_str = safe_str(channel)
958 # Check subkey prefixes first (they are longer and more specific)
959 if channel_str.startswith(SubkeyspaceitemChannel.PREFIX):
960 return ChannelType.SUBKEYSPACEITEM
961 if channel_str.startswith(SubkeyspaceeventChannel.PREFIX):
962 return ChannelType.SUBKEYSPACEEVENT
963 if channel_str.startswith(SubkeyspaceChannel.PREFIX):
964 return ChannelType.SUBKEYSPACE
965 if channel_str.startswith(SubkeyeventChannel.PREFIX):
966 return ChannelType.SUBKEYEVENT
967 if channel_str.startswith(KeyspaceChannel.PREFIX):
968 return ChannelType.KEYSPACE
969 if channel_str.startswith(KeyeventChannel.PREFIX):
970 return ChannelType.KEYEVENT
971 return None
972
973
974def _is_pattern(
975 channel: str | bytes | KeyspaceChannel | KeyeventChannel,
976) -> bool:
977 """
978 Check if a channel string contains glob-style pattern characters.
979
980 Redis uses glob-style patterns for psubscribe:
981 - * matches any sequence of characters
982 - ? matches any single character
983 - [...] matches any character in the brackets
984
985 Args:
986 channel: The channel string to check. Can be a string, bytes,
987 or a KeyspaceChannel/KeyeventChannel object.
988
989 Returns:
990 True if the channel contains pattern characters, False otherwise.
991 """
992 # Handle Channel objects that have _channel_str attribute
993 # (KeyspaceChannel, KeyeventChannel)
994 if hasattr(channel, "_channel_str"):
995 channel = channel._channel_str
996 channel = safe_str(channel)
997 # Check for unescaped glob pattern characters.
998 # * and ? are always pattern characters.
999 # [ is only a pattern character when followed by a matching unescaped ],
1000 # forming a bracket expression like [abc] or [a-z]. A lone [ (e.g. in
1001 # a key named "my[key") is treated as a literal by Redis.
1002 i = 0
1003 while i < len(channel):
1004 char = channel[i]
1005 if char == "\\":
1006 # Skip escaped character
1007 i += 2
1008 continue
1009 if char in ("*", "?"):
1010 return True
1011 if char == "[":
1012 # Look for a matching unescaped ]
1013 j = i + 1
1014 while j < len(channel):
1015 if channel[j] == "\\":
1016 j += 2
1017 continue
1018 if channel[j] == "]":
1019 return True
1020 j += 1
1021 # No matching ] found — literal [
1022 i += 1
1023 return False
1024
1025
1026# =============================================================================
1027# Abstract Base Class for Keyspace Notifications
1028# =============================================================================
1029
1030
1031class KeyspaceNotificationsInterface(ABC):
1032 """
1033 Interface for keyspace notification managers.
1034
1035 This interface provides a consistent API for both standalone (KeyspaceNotifications)
1036 and cluster (ClusterKeyspaceNotifications) implementations, allowing the same
1037 code patterns to work with both standalone and cluster Redis deployments.
1038 """
1039
1040 @abstractmethod
1041 def subscribe(
1042 self,
1043 *channels: ChannelT,
1044 handler: SyncHandlerT | None = None,
1045 ):
1046 """Subscribe to keyspace notification channels."""
1047 pass
1048
1049 @abstractmethod
1050 def unsubscribe(self, *channels: ChannelT):
1051 """Unsubscribe from keyspace notification channels."""
1052 pass
1053
1054 @abstractmethod
1055 def subscribe_keyspace(
1056 self,
1057 key_or_pattern: str,
1058 db: int = 0,
1059 handler: SyncHandlerT | None = None,
1060 ):
1061 """Subscribe to keyspace notifications for specific keys."""
1062 pass
1063
1064 @abstractmethod
1065 def subscribe_keyevent(
1066 self,
1067 event: str,
1068 db: int = 0,
1069 handler: SyncHandlerT | None = None,
1070 ):
1071 """Subscribe to keyevent notifications for specific event types."""
1072 pass
1073
1074 @abstractmethod
1075 def subscribe_subkeyspace(
1076 self,
1077 key_or_pattern: str,
1078 db: int = 0,
1079 handler: SyncHandlerT | None = None,
1080 ):
1081 """Subscribe to subkeyspace notifications for specific keys."""
1082 pass
1083
1084 @abstractmethod
1085 def subscribe_subkeyevent(
1086 self,
1087 event: str,
1088 db: int = 0,
1089 handler: SyncHandlerT | None = None,
1090 ):
1091 """Subscribe to subkeyevent notifications for specific event types."""
1092 pass
1093
1094 @abstractmethod
1095 def subscribe_subkeyspaceitem(
1096 self,
1097 key_or_pattern: str,
1098 subkey_or_pattern: str,
1099 db: int = 0,
1100 handler: SyncHandlerT | None = None,
1101 ):
1102 """Subscribe to subkeyspaceitem notifications for a specific subkey."""
1103 pass
1104
1105 @abstractmethod
1106 def subscribe_subkeyspaceevent(
1107 self,
1108 event: str,
1109 key_or_pattern: str,
1110 db: int = 0,
1111 handler: SyncHandlerT | None = None,
1112 ):
1113 """Subscribe to subkeyspaceevent notifications for an event on a key."""
1114 pass
1115
1116 @abstractmethod
1117 def get_message(
1118 self,
1119 ignore_subscribe_messages: bool | None = None,
1120 timeout: float = 0.0,
1121 ) -> KeyNotification | None:
1122 """Get the next keyspace notification if one is available."""
1123 pass
1124
1125 @abstractmethod
1126 def listen(self):
1127 """Listen for keyspace notifications."""
1128 pass
1129
1130 @abstractmethod
1131 def close(self):
1132 """Close the notification manager and clean up resources."""
1133 pass
1134
1135 @abstractmethod
1136 def __enter__(self):
1137 pass
1138
1139 @abstractmethod
1140 def __exit__(self, _exc_type, _exc_val, _exc_tb):
1141 pass
1142
1143 @property
1144 @abstractmethod
1145 def subscribed(self) -> bool:
1146 """Check if there are any active subscriptions and not closed."""
1147 pass
1148
1149 @abstractmethod
1150 def run_in_thread(
1151 self,
1152 poll_timeout: float = 0.0,
1153 daemon: bool = False,
1154 exception_handler: Callable[
1155 [
1156 BaseException,
1157 KeyspaceNotificationsInterface,
1158 KeyspaceWorkerThread,
1159 ],
1160 None,
1161 ]
1162 | None = None,
1163 ) -> KeyspaceWorkerThread:
1164 """Start a background thread that polls for notifications."""
1165 pass
1166
1167
1168class AbstractKeyspaceNotifications(KeyspaceNotificationsInterface):
1169 """
1170 Abstract base class for keyspace notification managers.
1171
1172 Provides shared implementation for subscribe/unsubscribe logic.
1173 Subclasses must implement:
1174 - _execute_subscribe: Execute the subscribe operation
1175 - _execute_unsubscribe: Execute the unsubscribe operation
1176 - get_message: Get the next notification
1177 - listen: Generator for notifications
1178 - close: Clean up resources
1179 """
1180
1181 def __init__(
1182 self,
1183 key_prefix: str | bytes | None = None,
1184 ignore_subscribe_messages: bool = True,
1185 ):
1186 """
1187 Initialize the base keyspace notification manager.
1188
1189 Args:
1190 key_prefix: Optional prefix to filter and strip from keys in notifications
1191 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
1192 are not returned by get_message/listen
1193 """
1194 self.key_prefix = key_prefix
1195 self.ignore_subscribe_messages = ignore_subscribe_messages
1196 self._closed = False
1197
1198 def subscribe(
1199 self,
1200 *channels: ChannelT,
1201 handler: SyncHandlerT | None = None,
1202 ):
1203 """
1204 Subscribe to keyspace notification channels.
1205
1206 Automatically detects whether each channel is a pattern (contains
1207 wildcards like *, ?, [) or an exact channel name and uses the
1208 appropriate Redis subscribe command internally.
1209
1210 Args:
1211 *channels: Channels to subscribe to. Can be strings, KeyspaceChannel,
1212 or KeyeventChannel objects. Patterns are auto-detected.
1213 handler: Optional callback function that receives KeyNotification
1214 objects. If provided, notifications are passed to the handler
1215 instead of being returned by get_message()/listen().
1216 """
1217 # Wrap the handler to convert raw messages to KeyNotification objects
1218 wrapped_handler: Callable | None = None
1219 if handler is not None:
1220 # Capture key_prefix in closure for consistent filtering/stripping
1221 key_prefix = self.key_prefix
1222
1223 def _wrap_handler(message):
1224 notification = KeyNotification.from_message(
1225 message, key_prefix=key_prefix
1226 )
1227 if notification is not None:
1228 handler(notification)
1229
1230 wrapped_handler = _wrap_handler
1231
1232 patterns = {}
1233 exact_channels = {}
1234
1235 for channel in channels:
1236 if hasattr(channel, "_channel_str"):
1237 channel_str = str(channel)
1238 else:
1239 channel_str = safe_str(channel)
1240 if _is_pattern(channel):
1241 patterns[channel_str] = wrapped_handler
1242 else:
1243 exact_channels[channel_str] = wrapped_handler
1244
1245 # Delegate to subclass implementation first. For standalone Redis
1246 # this raises on failure, keeping tracking state clean. For cluster
1247 # implementations the operation is best-effort (partial failures are
1248 # logged, not raised) so tracking state is always updated afterwards.
1249 self._execute_subscribe(patterns, exact_channels)
1250 self._track_subscribe(patterns, exact_channels)
1251
1252 @abstractmethod
1253 def _execute_subscribe(
1254 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
1255 ) -> None:
1256 """
1257 Execute the subscribe operation.
1258
1259 Args:
1260 patterns: Dict mapping pattern strings to handlers (for psubscribe)
1261 exact_channels: Dict mapping channel strings to handlers (for subscribe)
1262 """
1263 pass
1264
1265 def unsubscribe(self, *channels: ChannelT):
1266 """
1267 Unsubscribe from keyspace notification channels.
1268
1269 Automatically detects whether each channel is a pattern or exact
1270 channel and uses the appropriate Redis unsubscribe command.
1271
1272 Args:
1273 *channels: Channels to unsubscribe from.
1274 """
1275 patterns = []
1276 exact_channels = []
1277
1278 for channel in channels:
1279 if hasattr(channel, "_channel_str"):
1280 channel_str = str(channel)
1281 else:
1282 channel_str = safe_str(channel)
1283 if _is_pattern(channel):
1284 patterns.append(channel_str)
1285 else:
1286 exact_channels.append(channel_str)
1287
1288 # Delegate to subclass implementation first. For standalone Redis
1289 # this raises on failure, keeping tracking state intact. For cluster
1290 # implementations the operation is best-effort (partial failures are
1291 # logged, not raised) so tracking state is always removed afterwards
1292 # — this is intentional: the user asked to unsubscribe, so
1293 # refresh_subscriptions should not re-subscribe these channels.
1294 self._execute_unsubscribe(patterns, exact_channels)
1295 self._untrack_subscribe(patterns, exact_channels)
1296
1297 @abstractmethod
1298 def _execute_unsubscribe(
1299 self, patterns: list[str], exact_channels: list[str]
1300 ) -> None:
1301 """
1302 Execute the unsubscribe operation.
1303
1304 Args:
1305 patterns: List of pattern strings to punsubscribe from
1306 exact_channels: List of channel strings to unsubscribe from
1307 """
1308 pass
1309
1310 def _track_subscribe(
1311 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
1312 ) -> None:
1313 """Track newly subscribed patterns/channels.
1314
1315 Override in subclasses that need to maintain their own subscription
1316 registry (e.g. cluster implementations that must re-subscribe
1317 new/failed-over nodes). The default is a no-op because standalone
1318 implementations delegate tracking to the underlying PubSub object.
1319 """
1320
1321 def _untrack_subscribe(
1322 self, patterns: list[str], exact_channels: list[str]
1323 ) -> None:
1324 """Remove patterns/channels from the subscription registry.
1325
1326 Override in subclasses that maintain their own subscription registry.
1327 The default is a no-op.
1328 """
1329
1330 def subscribe_keyspace(
1331 self,
1332 key_or_pattern: str,
1333 db: int = 0,
1334 handler: SyncHandlerT | None = None,
1335 ):
1336 """
1337 Subscribe to keyspace notifications for specific keys.
1338
1339 Args:
1340 key_or_pattern: The key or pattern to monitor. Use '*' for wildcards.
1341 db: The database number (default 0).
1342 handler: Optional callback for notifications.
1343
1344 Example:
1345 >>> ksn.subscribe_keyspace("user:123", db=0)
1346 >>> ksn.subscribe_keyspace("user:*", db=0)
1347 """
1348 channel = KeyspaceChannel(key_or_pattern, db=db)
1349 self.subscribe(channel, handler=handler)
1350
1351 def subscribe_keyevent(
1352 self,
1353 event: str,
1354 db: int = 0,
1355 handler: SyncHandlerT | None = None,
1356 ):
1357 """
1358 Subscribe to keyevent notifications for specific event types.
1359
1360 Args:
1361 event: The event type to monitor (e.g., EventType.SET or "set")
1362 db: The database number (default 0).
1363 handler: Optional callback for notifications.
1364
1365 Example:
1366 >>> ksn.subscribe_keyevent(EventType.SET)
1367 >>> ksn.subscribe_keyevent(EventType.EXPIRED, handler=my_handler)
1368 """
1369 channel = KeyeventChannel(event, db=db)
1370 self.subscribe(channel, handler=handler)
1371
1372 def subscribe_subkeyspace(
1373 self,
1374 key_or_pattern: str,
1375 db: int = 0,
1376 handler: SyncHandlerT | None = None,
1377 ):
1378 """
1379 Subscribe to subkeyspace notifications for specific keys.
1380
1381 Receives events with affected subkeys (fields) for the given key.
1382
1383 Args:
1384 key_or_pattern: The key or pattern to monitor.
1385 db: The database number (default 0).
1386 handler: Optional callback for notifications.
1387 """
1388 channel = SubkeyspaceChannel(key_or_pattern, db=db)
1389 self.subscribe(channel, handler=handler)
1390
1391 def subscribe_subkeyevent(
1392 self,
1393 event: str,
1394 db: int = 0,
1395 handler: SyncHandlerT | None = None,
1396 ):
1397 """
1398 Subscribe to subkeyevent notifications for specific event types.
1399
1400 Receives the affected key and subkeys when the given event occurs.
1401
1402 Args:
1403 event: The event type to monitor (e.g., "hset", "hdel").
1404 db: The database number (default 0).
1405 handler: Optional callback for notifications.
1406 """
1407 channel = SubkeyeventChannel(event, db=db)
1408 self.subscribe(channel, handler=handler)
1409
1410 def subscribe_subkeyspaceitem(
1411 self,
1412 key_or_pattern: str,
1413 subkey_or_pattern: str,
1414 db: int = 0,
1415 handler: SyncHandlerT | None = None,
1416 ):
1417 """
1418 Subscribe to subkeyspaceitem notifications for a specific subkey.
1419
1420 Receives the event type when the given subkey of the given key is
1421 modified.
1422
1423 Args:
1424 key_or_pattern: The key or pattern to monitor.
1425 subkey_or_pattern: The subkey (field) or pattern to monitor.
1426 db: The database number (default 0).
1427 handler: Optional callback for notifications.
1428 """
1429 channel = SubkeyspaceitemChannel(key_or_pattern, subkey_or_pattern, db=db)
1430 self.subscribe(channel, handler=handler)
1431
1432 def subscribe_subkeyspaceevent(
1433 self,
1434 event: str,
1435 key_or_pattern: str,
1436 db: int = 0,
1437 handler: SyncHandlerT | None = None,
1438 ):
1439 """
1440 Subscribe to subkeyspaceevent notifications for an event on a key.
1441
1442 Receives the affected subkeys when the given event occurs on the
1443 given key.
1444
1445 Args:
1446 event: The event type to monitor.
1447 key_or_pattern: The key or pattern to monitor.
1448 db: The database number (default 0).
1449 handler: Optional callback for notifications.
1450 """
1451 channel = SubkeyspaceeventChannel(event, key_or_pattern, db=db)
1452 self.subscribe(channel, handler=handler)
1453
1454 def __enter__(self):
1455 return self
1456
1457 def __exit__(self, _exc_type, _exc_val, _exc_tb):
1458 self.close()
1459 return False
1460
1461 def run_in_thread(
1462 self,
1463 poll_timeout: float = 0.0,
1464 daemon: bool = False,
1465 exception_handler: Callable[
1466 [
1467 BaseException,
1468 KeyspaceNotificationsInterface,
1469 KeyspaceWorkerThread,
1470 ],
1471 None,
1472 ]
1473 | None = None,
1474 ) -> KeyspaceWorkerThread:
1475 """
1476 Start a background thread that polls for notifications and triggers handlers.
1477
1478 This method spawns a thread that continuously calls get_message() to
1479 process incoming notifications. When a notification arrives, any
1480 registered handler for that channel/pattern is invoked automatically.
1481
1482 All subscriptions must have handlers registered before calling this method.
1483
1484 Args:
1485 poll_timeout: Timeout in seconds for get_message() calls. When no message
1486 is available, the thread waits up to this long before checking
1487 again. Default 0.0 (non-blocking). WARNING: the default
1488 causes a CPU spin-loop. It is preferred to pass a positive
1489 value (e.g. 0.1 or 1.0).
1490 daemon: If True, the thread will be a daemon thread and will be
1491 terminated when the main program exits. Default False.
1492 exception_handler: Optional callback invoked when an exception occurs
1493 in the worker thread. Receives (exception, notifications,
1494 thread) as arguments. If None, exceptions are raised.
1495
1496 Returns:
1497 KeyspaceWorkerThread: The started worker thread. Call stop() on it
1498 to stop the thread and close the notifications.
1499
1500 Raises:
1501 RedisError: If any subscription doesn't have a handler registered.
1502
1503 Example:
1504 >>> def my_handler(notification):
1505 ... print(f"Got: {notification.key} - {notification.event_type}")
1506 >>>
1507 >>> notifications.subscribe(KeyspaceChannel("user:*"), handler=my_handler)
1508 >>> thread = notifications.run_in_thread(poll_timeout=0.1, daemon=True)
1509 >>> # ... handlers are called automatically ...
1510 >>> thread.stop()
1511 """
1512 self._validate_all_handlers()
1513
1514 thread = KeyspaceWorkerThread(
1515 self,
1516 poll_timeout,
1517 daemon=daemon,
1518 exception_handler=exception_handler,
1519 )
1520 thread.start()
1521 return thread
1522
1523 @abstractmethod
1524 def _validate_all_handlers(self) -> None:
1525 """Raise :class:`~redis.RedisError` if any subscription lacks a handler.
1526
1527 Subclasses inspect their own subscription state to perform
1528 the validation.
1529 """
1530 pass
1531
1532
1533class KeyspaceWorkerThread(threading.Thread):
1534 """
1535 Background thread for processing keyspace notifications.
1536
1537 This thread continuously polls for notifications and invokes registered
1538 handlers. It works with both KeyspaceNotifications (standalone) and
1539 ClusterKeyspaceNotifications.
1540
1541 Example:
1542 >>> thread = notifications.run_in_thread(poll_timeout=0.1)
1543 >>> # ... handlers are called automatically ...
1544 >>> thread.stop()
1545 """
1546
1547 def __init__(
1548 self,
1549 notifications: KeyspaceNotificationsInterface,
1550 poll_timeout: float,
1551 daemon: bool = False,
1552 exception_handler: Callable[
1553 [
1554 BaseException,
1555 KeyspaceNotificationsInterface,
1556 KeyspaceWorkerThread,
1557 ],
1558 None,
1559 ]
1560 | None = None,
1561 ):
1562 super().__init__()
1563 self.daemon = daemon
1564 self.notifications = notifications
1565 self.poll_timeout = poll_timeout
1566 self.exception_handler = exception_handler
1567 self._running = threading.Event()
1568
1569 def run(self) -> None:
1570 """Main loop that polls for notifications and triggers handlers."""
1571 if self._running.is_set():
1572 return
1573 self._running.set()
1574 notifications = self.notifications
1575 poll_timeout = self.poll_timeout
1576 while self._running.is_set():
1577 try:
1578 notifications.get_message(
1579 ignore_subscribe_messages=True, timeout=poll_timeout
1580 )
1581 except BaseException as e:
1582 if self.exception_handler is None:
1583 raise
1584 self.exception_handler(e, notifications, self)
1585 notifications.close()
1586
1587 def stop(self) -> None:
1588 """
1589 Stop the worker thread.
1590
1591 This signals the thread to exit its run loop. The thread will close
1592 the notifications object before terminating.
1593 """
1594 self._running.clear()
1595
1596
1597# =============================================================================
1598# Standalone Keyspace Notification Manager
1599# =============================================================================
1600
1601
1602class KeyspaceNotifications(AbstractKeyspaceNotifications):
1603 """
1604 Manages keyspace notification subscriptions for standalone Redis.
1605
1606 For standalone Redis, keyspace notifications work with a single PubSub
1607 connection. This class wraps that connection and provides:
1608 - Automatic pattern vs exact channel detection
1609 - KeyNotification parsing with optional key_prefix filtering
1610 - Convenience methods for keyspace and keyevent subscriptions
1611 - Context manager and run_in_thread support
1612 """
1613
1614 def __init__(
1615 self,
1616 redis_client: Redis,
1617 key_prefix: str | bytes | None = None,
1618 ignore_subscribe_messages: bool = True,
1619 ):
1620 """
1621 Initialize the standalone keyspace notification manager.
1622
1623 Note: Keyspace notifications must be enabled on the Redis server via
1624 the ``notify-keyspace-events`` configuration option. This is a server-side
1625 configuration that should be done by your infrastructure/operations team.
1626
1627 Args:
1628 redis_client: A Redis client instance
1629 key_prefix: Optional prefix to filter and strip from keys in notifications
1630 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
1631 are not returned by get_message/listen
1632 """
1633 super().__init__(key_prefix, ignore_subscribe_messages)
1634 self.redis = redis_client
1635
1636 # Create the PubSub instance with ignore_subscribe_messages=False
1637 # so that the per-call argument in get_message() can control behavior
1638 self._pubsub = redis_client.pubsub(ignore_subscribe_messages=False)
1639
1640 def _execute_subscribe(
1641 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
1642 ) -> None:
1643 """Execute subscribe on the single pubsub connection."""
1644 if patterns:
1645 self._pubsub.psubscribe(**patterns)
1646 if exact_channels:
1647 self._pubsub.subscribe(**exact_channels)
1648
1649 def _execute_unsubscribe(
1650 self, patterns: list[str], exact_channels: list[str]
1651 ) -> None:
1652 """Execute unsubscribe on the single pubsub connection."""
1653 if patterns:
1654 self._pubsub.punsubscribe(*patterns)
1655 if exact_channels:
1656 self._pubsub.unsubscribe(*exact_channels)
1657
1658 def get_message(
1659 self,
1660 ignore_subscribe_messages: bool | None = None,
1661 timeout: float = 0.0,
1662 ) -> KeyNotification | None:
1663 """
1664 Get the next keyspace notification if one is available.
1665
1666 Note: If a handler was registered for the channel, pubsub will call
1667 the handler directly and this method returns None for that message.
1668
1669 Args:
1670 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages.
1671 Defaults to the value set in __init__ (True).
1672 timeout: Time to wait for a message.
1673
1674 Returns:
1675 A KeyNotification if a notification is available and no handler
1676 was registered for the channel, None otherwise.
1677 """
1678 if ignore_subscribe_messages is None:
1679 ignore_subscribe_messages = self.ignore_subscribe_messages
1680
1681 if self._closed:
1682 return None
1683
1684 # Pubsub's get_message will call wrapped handlers directly for channels
1685 # with registered handlers and return None. For channels without handlers,
1686 # it returns the raw message which we parse to KeyNotification.
1687 message = self._pubsub.get_message(
1688 ignore_subscribe_messages=ignore_subscribe_messages,
1689 timeout=timeout,
1690 )
1691
1692 if message is not None:
1693 return KeyNotification.from_message(message, key_prefix=self.key_prefix)
1694
1695 return None
1696
1697 def listen(self):
1698 """
1699 Listen for keyspace notifications.
1700
1701 This is a generator that yields KeyNotification objects as they arrive.
1702 It blocks until a notification is received.
1703
1704 Yields:
1705 KeyNotification objects for each keyspace/keyevent notification.
1706
1707 Example:
1708 >>> for notification in ksn.listen():
1709 ... print(f"{notification.key}: {notification.event_type}")
1710 """
1711 while self.subscribed:
1712 notification = self.get_message(timeout=1.0)
1713 if notification is not None:
1714 yield notification
1715
1716 @property
1717 def subscribed(self) -> bool:
1718 """Check if there are any active subscriptions and not closed."""
1719 return not self._closed and self._pubsub.subscribed
1720
1721 def _validate_all_handlers(self) -> None:
1722 """Raise if any subscription in the underlying PubSub lacks a handler."""
1723 for channel, handler in self._pubsub.channels.items():
1724 if handler is None:
1725 raise RedisError(f"Channel '{channel}' has no handler registered")
1726 for pattern, handler in self._pubsub.patterns.items():
1727 if handler is None:
1728 raise RedisError(f"Pattern '{pattern}' has no handler registered")
1729
1730 def close(self):
1731 """Close the pubsub connection and clean up resources."""
1732 self._closed = True
1733 try:
1734 self._pubsub.close()
1735 except Exception:
1736 pass
1737
1738
1739# =============================================================================
1740# Cluster-Aware Keyspace Notification Manager
1741# =============================================================================
1742
1743
1744class ClusterKeyspaceNotifications(AbstractKeyspaceNotifications):
1745 """
1746 Manages keyspace notification subscriptions across all nodes in a Redis Cluster.
1747
1748 In Redis Cluster, keyspace notifications are NOT broadcast between nodes.
1749 Each node only emits notifications for keys it owns. This class automatically
1750 subscribes to all primary nodes in the cluster and handles topology changes.
1751 """
1752
1753 def __init__(
1754 self,
1755 redis_cluster: RedisCluster,
1756 key_prefix: str | bytes | None = None,
1757 ignore_subscribe_messages: bool = True,
1758 ):
1759 """
1760 Initialize the cluster keyspace notification manager.
1761
1762 Note: Keyspace notifications must be enabled on all Redis cluster nodes via
1763 the ``notify-keyspace-events`` configuration option. This is a server-side
1764 configuration that should be done by your infrastructure/operations team.
1765
1766 Args:
1767 redis_cluster: A RedisCluster instance
1768 key_prefix: Optional prefix to filter and strip from keys in notifications
1769 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations
1770 are not returned by get_message/listen
1771 """
1772 super().__init__(key_prefix, ignore_subscribe_messages)
1773 self.cluster = redis_cluster
1774
1775 # Canonical subscription registry: pattern/channel -> wrapped handler.
1776 # In cluster mode there are multiple PubSub objects (one per node), so
1777 # this is the single source of truth used to (re-)subscribe new or
1778 # failed-over nodes.
1779 self._subscribed_patterns: dict[str, Any] = {}
1780 self._subscribed_channels: dict[str, Any] = {}
1781
1782 # Track subscriptions per node
1783 self._node_pubsubs: dict[str, Any] = {}
1784
1785 # Lock for topology refresh operations
1786 self._refresh_lock = threading.Lock()
1787
1788 # Current pubsub index for round-robin polling
1789 self._poll_index = 0
1790
1791 @property
1792 def subscribed(self) -> bool:
1793 """Check if there are any active subscriptions and not closed."""
1794 return not self._closed and bool(
1795 self._subscribed_patterns or self._subscribed_channels
1796 )
1797
1798 def _track_subscribe(
1799 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
1800 ) -> None:
1801 """Track newly subscribed patterns/channels in the cluster registry."""
1802 if patterns:
1803 self._subscribed_patterns.update(patterns)
1804 if exact_channels:
1805 self._subscribed_channels.update(exact_channels)
1806
1807 def _untrack_subscribe(
1808 self, patterns: list[str], exact_channels: list[str]
1809 ) -> None:
1810 """Remove patterns/channels from the cluster registry."""
1811 for p in patterns:
1812 self._subscribed_patterns.pop(p, None)
1813 for c in exact_channels:
1814 self._subscribed_channels.pop(c, None)
1815
1816 def _validate_all_handlers(self) -> None:
1817 """Raise if any subscription in the cluster registry lacks a handler."""
1818 for channel, handler in self._subscribed_channels.items():
1819 if handler is None:
1820 raise RedisError(f"Channel '{channel}' has no handler registered")
1821 for pattern, handler in self._subscribed_patterns.items():
1822 if handler is None:
1823 raise RedisError(f"Pattern '{pattern}' has no handler registered")
1824
1825 def _get_all_primary_nodes(self):
1826 """Get all primary nodes in the cluster."""
1827 return self.cluster.get_primaries()
1828
1829 def _cleanup_node(self, node_name: str) -> None:
1830 """Remove and close a node's PubSub.
1831
1832 Closing the ``PubSub`` disconnects its connection so it is not
1833 left in a subscribed state inside the connection pool.
1834 """
1835 pubsub = self._node_pubsubs.pop(node_name, None)
1836 if pubsub:
1837 try:
1838 pubsub.close()
1839 except Exception:
1840 pass
1841
1842 def _ensure_node_pubsub(self, node) -> Any:
1843 """Get or create a PubSub instance for a node."""
1844 if node.name not in self._node_pubsubs:
1845 redis_conn = self.cluster.get_redis_connection(node)
1846 # Always create PubSub with ignore_subscribe_messages=False
1847 # so that the per-call argument in get_message() can control
1848 # the behavior reliably
1849 pubsub = redis_conn.pubsub(ignore_subscribe_messages=False)
1850 self._node_pubsubs[node.name] = pubsub
1851 return self._node_pubsubs[node.name]
1852
1853 def _execute_subscribe(
1854 self, patterns: dict[str, Any], exact_channels: dict[str, Any]
1855 ) -> None:
1856 """Execute subscribe on all cluster nodes.
1857
1858 Patterns and exact channels are subscribed in a single pass over
1859 nodes so that a mid-batch node failure cannot create a
1860 partially-caught-up replacement. If a node fails during this
1861 call it is removed from ``_node_pubsubs`` and will be fully
1862 re-subscribed on the next ``refresh_subscriptions`` cycle.
1863
1864 If a newly discovered node is encountered (not yet in
1865 ``_node_pubsubs``), it is also subscribed to all *previously*
1866 tracked patterns/channels so it doesn't miss notifications for
1867 subscriptions that were established before this node joined.
1868 """
1869 if not patterns and not exact_channels:
1870 return
1871
1872 failed_nodes: list[str] = []
1873 for node in self._get_all_primary_nodes():
1874 is_new_node = node.name not in self._node_pubsubs
1875 pubsub = self._ensure_node_pubsub(node)
1876 try:
1877 # If this is a brand-new node, catch it up on existing
1878 # subscriptions before adding the new channels.
1879 if is_new_node:
1880 if self._subscribed_patterns:
1881 pubsub.psubscribe(**self._subscribed_patterns)
1882 if self._subscribed_channels:
1883 pubsub.subscribe(**self._subscribed_channels)
1884
1885 if patterns:
1886 pubsub.psubscribe(**patterns)
1887 if exact_channels:
1888 pubsub.subscribe(**exact_channels)
1889 except Exception:
1890 # Remove the broken pubsub so refresh_subscriptions can
1891 # re-create it later.
1892 self._cleanup_node(node.name)
1893 failed_nodes.append(node.name)
1894
1895 if failed_nodes:
1896 logger.warning(
1897 "Failed to subscribe on cluster nodes: %s. "
1898 "These nodes will be retried on the next refresh cycle.",
1899 ", ".join(failed_nodes),
1900 )
1901
1902 def _execute_unsubscribe(
1903 self, patterns: list[str], exact_channels: list[str]
1904 ) -> None:
1905 """Execute unsubscribe on all cluster nodes."""
1906 if patterns:
1907 self._unsubscribe_from_all_nodes(patterns, use_punsubscribe=True)
1908 if exact_channels:
1909 self._unsubscribe_from_all_nodes(exact_channels, use_punsubscribe=False)
1910
1911 def _unsubscribe_from_all_nodes(self, channels: list[str], use_punsubscribe: bool):
1912 """Unsubscribe from patterns/channels on all nodes.
1913
1914 Best-effort: tries every node so that a single broken connection
1915 does not prevent the remaining nodes from being unsubscribed.
1916 Broken pubsubs are cleaned up; the tracking state is still removed
1917 by the caller, so ``refresh_subscriptions`` will *not* re-subscribe
1918 these channels on replacement nodes.
1919 """
1920 failed_nodes: list[str] = []
1921 for node_name, pubsub in list(self._node_pubsubs.items()):
1922 try:
1923 if use_punsubscribe:
1924 pubsub.punsubscribe(*channels)
1925 else:
1926 pubsub.unsubscribe(*channels)
1927 except Exception:
1928 self._cleanup_node(node_name)
1929 failed_nodes.append(node_name)
1930
1931 if failed_nodes:
1932 logger.warning(
1933 "Failed to unsubscribe on cluster nodes: %s. "
1934 "These nodes will be re-created on the next refresh cycle.",
1935 ", ".join(failed_nodes),
1936 )
1937
1938 def get_message(
1939 self,
1940 ignore_subscribe_messages: bool | None = None,
1941 timeout: float = 0.0,
1942 ) -> KeyNotification | None:
1943 """
1944 Get the next keyspace notification if one is available.
1945
1946 This method polls all node pubsubs in round-robin fashion until
1947 a message is received or the timeout expires.
1948 If a connection error occurs, subscriptions are automatically refreshed.
1949
1950 Args:
1951 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages.
1952 Defaults to the value set in __init__ (True).
1953 timeout: Total time to wait for a message (distributed across all nodes)
1954
1955 Returns:
1956 A KeyNotification if a notification is available, None otherwise.
1957 """
1958 if self._closed:
1959 return None
1960
1961 total_nodes = len(self._node_pubsubs)
1962 if total_nodes == 0:
1963 # Sleep for the requested timeout so callers that loop
1964 # (run_in_thread, listen) don't spin the CPU when all node
1965 # connections have been cleaned up.
1966 if timeout > 0:
1967 time.sleep(timeout)
1968 return None
1969
1970 # Use instance default if not specified
1971 if ignore_subscribe_messages is None:
1972 ignore_subscribe_messages = self.ignore_subscribe_messages
1973
1974 # Handle timeout=0 as a single non-blocking poll over all pubsubs
1975 # This matches the expected semantics of PubSub.get_message(timeout=0)
1976 if timeout == 0.0:
1977 return self._poll_all_nodes_once(ignore_subscribe_messages)
1978
1979 # Calculate per-node timeout for each poll
1980 # Use a small timeout per node to allow round-robin polling
1981 per_node_timeout = min(0.1, timeout / max(total_nodes, 1))
1982
1983 start_time = time.monotonic()
1984 end_time = start_time + timeout
1985
1986 while True:
1987 # Check if we've exceeded the total timeout
1988 if time.monotonic() >= end_time:
1989 return None
1990
1991 pubsubs = list(self._node_pubsubs.values())
1992 if not pubsubs:
1993 return None
1994
1995 # Round-robin polling
1996 self._poll_index = self._poll_index % len(pubsubs)
1997 pubsub = pubsubs[self._poll_index]
1998 self._poll_index += 1
1999
2000 try:
2001 message = pubsub.get_message(
2002 ignore_subscribe_messages=ignore_subscribe_messages,
2003 timeout=per_node_timeout,
2004 )
2005 except (ConnectionError, TimeoutError, RedisError):
2006 # Connection error - refresh subscriptions and continue
2007 self._refresh_subscriptions_on_error()
2008 continue
2009
2010 if message is not None:
2011 # Note: If a handler was registered, PubSub already invoked it
2012 # and returned None, so we only reach here for handler-less subscriptions
2013 notification = KeyNotification.from_message(
2014 message, key_prefix=self.key_prefix
2015 )
2016 if notification is not None:
2017 return notification
2018 # If not a keyspace notification, continue checking other nodes
2019
2020 def _poll_all_nodes_once(
2021 self, ignore_subscribe_messages: bool
2022 ) -> KeyNotification | None:
2023 """
2024 Perform a single non-blocking poll over all node pubsubs.
2025
2026 This is used when timeout=0 to match the expected semantics of
2027 PubSub.get_message(timeout=0) - a non-blocking check for messages.
2028
2029 Returns:
2030 A KeyNotification if one is available, None otherwise.
2031 """
2032 had_error = False
2033 for pubsub in list(self._node_pubsubs.values()):
2034 try:
2035 message = pubsub.get_message(
2036 ignore_subscribe_messages=ignore_subscribe_messages,
2037 timeout=0.0,
2038 )
2039 except (ConnectionError, TimeoutError, RedisError):
2040 # Record the error but continue polling remaining healthy
2041 # nodes so that already-buffered notifications are not lost.
2042 had_error = True
2043 continue
2044
2045 if message is not None:
2046 # Note: If a handler was registered, PubSub already invoked it
2047 # and returned None, so we only reach here for handler-less subscriptions
2048 notification = KeyNotification.from_message(
2049 message, key_prefix=self.key_prefix
2050 )
2051 if notification is not None:
2052 # Refresh before returning if any node had an error,
2053 # so the next poll cycle has fresh state.
2054 if had_error:
2055 self._refresh_subscriptions_on_error()
2056 return notification
2057
2058 # Refresh after polling all nodes if any had errors
2059 if had_error:
2060 self._refresh_subscriptions_on_error()
2061 return None
2062
2063 def listen(self):
2064 """
2065 Listen for keyspace notifications from all cluster nodes.
2066
2067 This is a generator that yields KeyNotification objects as they arrive.
2068 It blocks until a notification is received.
2069
2070 Yields:
2071 KeyNotification objects for each keyspace/keyevent notification.
2072
2073 Example:
2074 >>> for notification in ksn.listen():
2075 ... print(f"{notification.key}: {notification.event_type}")
2076 """
2077 while self.subscribed:
2078 notification = self.get_message(timeout=1.0)
2079 if notification is not None:
2080 yield notification
2081
2082 def _refresh_subscriptions_on_error(self):
2083 """
2084 Refresh subscriptions after a connection error.
2085
2086 This is called automatically when a connection error occurs during
2087 get_message(). It checks if nodes changed before refreshing.
2088 """
2089 self._poll_index = 0 # Reset round-robin index
2090
2091 try:
2092 self.refresh_subscriptions()
2093 except Exception:
2094 logger.warning(
2095 "Failed to refresh cluster subscriptions, will retry on next error",
2096 exc_info=True,
2097 )
2098
2099 def _is_pubsub_connected(self, pubsub) -> bool:
2100 """Check if a pubsub connection is still alive."""
2101 try:
2102 conn = pubsub.connection
2103 if conn is None:
2104 return False
2105 return conn.is_connected
2106 except Exception:
2107 return False
2108
2109 def refresh_subscriptions(self):
2110 """
2111 Refresh subscriptions after a topology change.
2112
2113 This method is called automatically when topology changes are detected
2114 or when connection errors occur. You can also call it manually if needed.
2115
2116 This method:
2117 1. Discovers any new primary nodes and subscribes them
2118 2. Removes pubsubs for nodes that are no longer primaries
2119 3. Re-creates broken pubsub connections for existing nodes
2120 """
2121 with self._refresh_lock:
2122 current_primaries = {
2123 node.name: node for node in self._get_all_primary_nodes()
2124 }
2125
2126 # Remove pubsubs for nodes that are no longer primaries
2127 removed_nodes = set(self._node_pubsubs.keys()) - set(
2128 current_primaries.keys()
2129 )
2130 for node_name in removed_nodes:
2131 self._cleanup_node(node_name)
2132
2133 # Detect broken connections for existing nodes and remove them
2134 # so they get re-created below
2135 existing_nodes = set(self._node_pubsubs.keys()) & set(
2136 current_primaries.keys()
2137 )
2138 for node_name in existing_nodes:
2139 pubsub = self._node_pubsubs.get(node_name)
2140 if pubsub and not self._is_pubsub_connected(pubsub):
2141 # Connection is broken, remove it so it gets re-created
2142 self._cleanup_node(node_name)
2143
2144 # Subscribe new nodes (and nodes with broken connections) to existing
2145 # patterns/channels
2146 new_nodes = set(current_primaries.keys()) - set(self._node_pubsubs.keys())
2147 failed_nodes: list[str] = []
2148 for node_name in new_nodes:
2149 node = current_primaries[node_name]
2150 pubsub = self._ensure_node_pubsub(node)
2151
2152 try:
2153 if self._subscribed_patterns:
2154 pubsub.psubscribe(**self._subscribed_patterns)
2155 if self._subscribed_channels:
2156 pubsub.subscribe(**self._subscribed_channels)
2157 except Exception:
2158 # Subscription failed - remove from dict so retry is possible
2159 self._cleanup_node(node_name)
2160 failed_nodes.append(node_name)
2161
2162 # Raise after attempting all nodes so we don't skip any
2163 if failed_nodes:
2164 raise ConnectionError(
2165 f"Failed to subscribe to cluster nodes: {', '.join(failed_nodes)}"
2166 )
2167
2168 def close(self):
2169 """Close all pubsub connections and clean up resources."""
2170 self._closed = True
2171 for node_name in list(self._node_pubsubs.keys()):
2172 self._cleanup_node(node_name)
2173 self._subscribed_patterns.clear()
2174 self._subscribed_channels.clear()