Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/keyspace_notifications.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

797 statements  

1""" 

2Redis Keyspace Notifications support for redis-py. 

3 

4This module provides utilities for subscribing to and parsing Redis keyspace 

5notifications. Keyspace notifications allow clients to receive events when 

6keys are modified in Redis. 

7 

8Note: Keyspace notifications must be enabled on the Redis server via the 

9``notify-keyspace-events`` configuration option. This is a server-side 

10configuration that should be done by your infrastructure/operations team. 

11See the Redis documentation for details: 

12https://redis.io/docs/latest/develop/pubsub/keyspace-notifications/ 

13 

14Standalone Redis Example: 

15 >>> from redis import Redis 

16 >>> from redis.keyspace_notifications import ( 

17 ... KeyspaceNotifications, 

18 ... KeyspaceChannel, 

19 ... EventType, 

20 ... ) 

21 >>> 

22 >>> r = Redis() 

23 >>> # Server must have notify-keyspace-events configured (e.g., "KEA") 

24 >>> ksn = KeyspaceNotifications(r) 

25 >>> 

26 >>> # Subscribe using Channel class (patterns auto-detected) 

27 >>> channel = KeyspaceChannel("user:*") 

28 >>> ksn.subscribe(channel) 

29 >>> 

30 >>> # Or use convenience methods for specific event types 

31 >>> ksn.subscribe_keyevent(EventType.SET) 

32 >>> 

33 >>> for notification in ksn.listen(): 

34 ... print(f"Key: {notification.key}, Event: {notification.event_type}") 

35 

36Redis Cluster Example: 

37 >>> from redis.cluster import RedisCluster 

38 >>> from redis.keyspace_notifications import ( 

39 ... ClusterKeyspaceNotifications, 

40 ... KeyspaceChannel, 

41 ... EventType, 

42 ... ) 

43 >>> 

44 >>> rc = RedisCluster(host="localhost", port=7000) 

45 >>> # Server must have notify-keyspace-events configured (e.g., "KEA") 

46 >>> ksn = ClusterKeyspaceNotifications(rc) 

47 >>> 

48 >>> # Subscribe using Channel class (patterns auto-detected) 

49 >>> channel = KeyspaceChannel("user:*") 

50 >>> ksn.subscribe(channel) 

51 >>> 

52 >>> # Or use convenience methods for specific event types 

53 >>> ksn.subscribe_keyevent(EventType.SET) 

54 >>> 

55 >>> for notification in ksn.listen(): 

56 ... print(f"Key: {notification.key}, Event: {notification.event_type}") 

57""" 

58 

59from __future__ import annotations 

60 

61import logging 

62import re 

63import threading 

64import time 

65from abc import ABC, abstractmethod 

66from collections.abc import Callable 

67from dataclasses import dataclass 

68from enum import Enum 

69from typing import TYPE_CHECKING, Any, ClassVar, Union 

70 

71from redis.client import Redis 

72from redis.cluster import RedisCluster 

73from redis.exceptions import ( 

74 ConnectionError, 

75 RedisError, 

76 TimeoutError, 

77) 

78from redis.utils import safe_str 

79 

80logger = logging.getLogger(__name__) 

81 

82if TYPE_CHECKING: 

83 from typing import TypeAlias 

84 

85# Type alias for channel arguments - can be a string, bytes, or Channel object 

86# This is defined here and the actual types are added after class definitions 

87ChannelT: TypeAlias = Union[ 

88 str, 

89 bytes, 

90 "KeyspaceChannel", 

91 "KeyeventChannel", 

92 "SubkeyspaceChannel", 

93 "SubkeyeventChannel", 

94 "SubkeyspaceitemChannel", 

95 "SubkeyspaceeventChannel", 

96] 

97 

98 

99# Type alias for sync handlers 

100SyncHandlerT = Callable[["KeyNotification"], None] 

101 

102 

103# ============================================================================= 

104# Event Type Constants 

105# ============================================================================= 

106# These are common Redis keyspace notification event types provided for 

107# convenience. You can use any string as an event type - these constants 

108# are not exhaustive and Redis may add new events in future versions. 

109 

110 

111class EventType: 

112 """ 

113 Common Redis keyspace notification event type constants. 

114 

115 These are provided for convenience and IDE autocomplete. You can use 

116 any string as an event type - new Redis events will work without 

117 needing library updates. 

118 """ 

119 

120 # String commands 

121 SET = "set" 

122 SETEX = "setex" 

123 SETNX = "setnx" 

124 SETRANGE = "setrange" 

125 INCR = "incr" 

126 INCRBY = "incrby" 

127 INCRBYFLOAT = "incrbyfloat" 

128 DECR = "decr" 

129 DECRBY = "decrby" 

130 APPEND = "append" 

131 

132 # Generic commands 

133 DEL = "del" 

134 UNLINK = "unlink" 

135 RENAME = "rename" 

136 RENAME_FROM = "rename_from" 

137 RENAME_TO = "rename_to" 

138 COPY_TO = "copy_to" 

139 MOVE = "move" 

140 RESTORE = "restore" 

141 

142 # Expiration events 

143 EXPIRE = "expire" 

144 EXPIREAT = "expireat" 

145 PEXPIRE = "pexpire" 

146 PEXPIREAT = "pexpireat" 

147 EXPIRED = "expired" 

148 PERSIST = "persist" 

149 

150 # Eviction events 

151 EVICTED = "evicted" 

152 

153 # List commands 

154 LPUSH = "lpush" 

155 RPUSH = "rpush" 

156 LPOP = "lpop" 

157 RPOP = "rpop" 

158 LINSERT = "linsert" 

159 LSET = "lset" 

160 LTRIM = "ltrim" 

161 LMOVE = "lmove" 

162 BLPOP = "blpop" 

163 BRPOP = "brpop" 

164 BLMOVE = "blmove" 

165 

166 # Set commands 

167 SADD = "sadd" 

168 SREM = "srem" 

169 SPOP = "spop" 

170 SMOVE = "smove" 

171 SINTERSTORE = "sinterstore" 

172 SUNIONSTORE = "sunionstore" 

173 SDIFFSTORE = "sdiffstore" 

174 

175 # Sorted set commands 

176 ZADD = "zadd" 

177 ZINCRBY = "zincrby" 

178 ZREM = "zrem" 

179 ZREMRANGEBYRANK = "zremrangebyrank" 

180 ZREMRANGEBYSCORE = "zremrangebyscore" 

181 ZREMRANGEBYLEX = "zremrangebylex" 

182 ZPOPMIN = "zpopmin" 

183 ZPOPMAX = "zpopmax" 

184 BZPOPMIN = "bzpopmin" 

185 BZPOPMAX = "bzpopmax" 

186 ZINTERSTORE = "zinterstore" 

187 ZUNIONSTORE = "zunionstore" 

188 ZDIFFSTORE = "zdiffstore" 

189 ZRANGESTORE = "zrangestore" 

190 

191 # Hash commands 

192 HSET = "hset" 

193 HSETNX = "hsetnx" 

194 HDEL = "hdel" 

195 HINCRBY = "hincrby" 

196 HINCRBYFLOAT = "hincrbyfloat" 

197 

198 # Stream commands 

199 XADD = "xadd" 

200 XTRIM = "xtrim" 

201 XDEL = "xdel" 

202 XGROUP_CREATE = "xgroup-create" 

203 XGROUP_CREATECONSUMER = "xgroup-createconsumer" 

204 XGROUP_DELCONSUMER = "xgroup-delconsumer" 

205 XGROUP_DESTROY = "xgroup-destroy" 

206 XGROUP_SETID = "xgroup-setid" 

207 XSETID = "xsetid" 

208 XCLAIM = "xclaim" 

209 XAUTOCLAIM = "xautoclaim" 

210 XREADGROUP = "xreadgroup" 

211 

212 # Other 

213 NEW = "new" # Key created (when tracking new keys) 

214 SORTSTORE = "sortstore" 

215 GETEX = "getex" 

216 GETDEL = "getdel" 

217 SETIFGT = "setifgt" 

218 SETIFLT = "setiflt" 

219 SETIFEQ = "setifeq" 

220 SETIFNE = "setifne" 

221 

222 

223def _parse_length_prefixed_subkeys(s: str) -> list[str]: 

224 """Parse a length-prefixed subkey list. 

225 

226 The wire format is ``<len>:<subkey>[,<len>:<subkey>...]``. 

227 

228 Returns: 

229 A list of subkey strings. 

230 """ 

231 subkeys: list[str] = [] 

232 pos = 0 

233 while pos < len(s): 

234 colon = s.index(":", pos) 

235 length = int(s[pos:colon]) 

236 start = colon + 1 

237 subkeys.append(s[start : start + length]) 

238 pos = start + length 

239 if pos < len(s) and s[pos] == ",": 

240 pos += 1 # skip comma separator 

241 return subkeys 

242 

243 

244@dataclass 

245class KeyNotification: 

246 """ 

247 Represents a parsed Redis keyspace, keyevent, or subkey notification. 

248 

249 This class provides convenient access to the notification details 

250 like key, event type, database number, and affected subkeys. 

251 

252 Attributes: 

253 key: The Redis key that was affected (for keyspace notifications) 

254 or the key name from the message data (for keyevent notifications) 

255 event_type: The type of operation that occurred (e.g., "set", "del"). 

256 This is a plain string, so new Redis events work automatically. 

257 Compare against EventType constants or any string. 

258 database: The database number where the event occurred 

259 channel: The original channel name 

260 is_keyspace: True if this is a keyspace notification, False for keyevent 

261 data: The raw data payload from the notification message. 

262 subkeys: List of affected subkeys (fields) for subkey notifications. 

263 Empty list for regular keyspace/keyevent notifications. 

264 """ 

265 

266 # Regex patterns for parsing keyspace/keyevent channels 

267 # Pattern: __keyspace@<db>__:<key> or __keyevent@<db>__:<event> 

268 _KEYSPACE_PATTERN: ClassVar[re.Pattern] = re.compile( 

269 r"^__keyspace@(\d+|\*)__:(.+)$" 

270 ) 

271 _KEYEVENT_PATTERN: ClassVar[re.Pattern] = re.compile( 

272 r"^__keyevent@(\d+|\*)__:(.+)$" 

273 ) 

274 _SUBKEYSPACE_PATTERN: ClassVar[re.Pattern] = re.compile( 

275 r"^__subkeyspace@(\d+|\*)__:(.+)$" 

276 ) 

277 _SUBKEYEVENT_PATTERN: ClassVar[re.Pattern] = re.compile( 

278 r"^__subkeyevent@(\d+|\*)__:(.+)$" 

279 ) 

280 _SUBKEYSPACEITEM_PATTERN: ClassVar[re.Pattern] = re.compile( 

281 r"^__subkeyspaceitem@(\d+|\*)__:(.+)$", re.DOTALL 

282 ) 

283 _SUBKEYSPACEEVENT_PATTERN: ClassVar[re.Pattern] = re.compile( 

284 r"^__subkeyspaceevent@(\d+|\*)__:(.+)$" 

285 ) 

286 

287 key: str 

288 event_type: str 

289 database: int 

290 channel: str 

291 is_keyspace: bool 

292 data: str 

293 subkeys: list[str] = None # type: ignore[assignment] 

294 

295 def __post_init__(self): 

296 if self.subkeys is None: 

297 self.subkeys = [] 

298 

299 @classmethod 

300 def from_message( 

301 cls, 

302 message: dict[str, Any] | None, 

303 key_prefix: str | bytes | None = None, 

304 ) -> KeyNotification | None: 

305 """ 

306 Parse a pub/sub message into a KeyNotification. 

307 

308 Args: 

309 message: A pub/sub message dict with 'channel', 'data', and 'type' keys 

310 key_prefix: Optional prefix to filter and strip from keys. 

311 If provided, only notifications for keys starting with 

312 this prefix will be returned, and the prefix will be 

313 stripped from the key. 

314 

315 Returns: 

316 A KeyNotification if the message is a valid keyspace/keyevent 

317 notification, None otherwise. 

318 

319 Example: 

320 >>> message = { 

321 ... 'type': 'pmessage', 

322 ... 'pattern': '__keyspace@0__:user:*', 

323 ... 'channel': '__keyspace@0__:user:123', 

324 ... 'data': 'set' 

325 ... } 

326 >>> notification = KeyNotification.from_message(message) 

327 >>> notification.key 

328 'user:123' 

329 >>> notification.event_type 

330 'set' 

331 """ 

332 if message is None: 

333 return None 

334 

335 msg_type = message.get("type") 

336 if msg_type not in ("message", "pmessage"): 

337 return None 

338 

339 channel = message.get("channel") 

340 data = message.get("data") 

341 

342 if channel is None or data is None: 

343 return None 

344 

345 return cls.try_parse(channel, data, key_prefix) 

346 

347 @classmethod 

348 def try_parse( 

349 cls, 

350 channel: str | bytes, 

351 data: str | bytes, 

352 key_prefix: str | bytes | None = None, 

353 ) -> KeyNotification | None: 

354 """ 

355 Try to parse a channel and data into a KeyNotification. 

356 

357 This is a lower-level method that takes the channel and data directly, 

358 useful when working with callback-based subscription handlers. 

359 

360 Args: 

361 channel: The channel name (e.g., "__keyspace@0__:mykey") 

362 data: The message data (event type for keyspace, key for keyevent) 

363 key_prefix: Optional prefix to filter and strip from keys 

364 

365 Returns: 

366 A KeyNotification if valid, None otherwise. 

367 """ 

368 channel = safe_str(channel) 

369 data = safe_str(data) 

370 

371 return cls._parse(channel, data, key_prefix) 

372 

373 @classmethod 

374 def _parse( 

375 cls, 

376 channel: str, 

377 data: str, 

378 key_prefix: str | bytes | None = None, 

379 ) -> KeyNotification | None: 

380 """Internal parsing logic.""" 

381 # Normalize key_prefix 

382 key_prefix = safe_str(key_prefix) if key_prefix else None 

383 

384 # Try keyspace pattern first: __keyspace@<db>__:<key> 

385 match = cls._KEYSPACE_PATTERN.match(channel) 

386 if match: 

387 db_str, key = match.groups() 

388 database = int(db_str) if db_str != "*" else -1 

389 event_type = data # For keyspace, the data is the event type 

390 

391 # Apply key prefix filter 

392 if key_prefix: 

393 if not key.startswith(key_prefix): 

394 return None 

395 key = key[len(key_prefix) :] 

396 

397 return cls( 

398 key=key, 

399 event_type=event_type, 

400 database=database, 

401 channel=channel, 

402 is_keyspace=True, 

403 data=data, 

404 ) 

405 

406 # Try keyevent pattern: __keyevent@<db>__:<event> 

407 match = cls._KEYEVENT_PATTERN.match(channel) 

408 if match: 

409 db_str, event_type = match.groups() 

410 database = int(db_str) if db_str != "*" else -1 

411 key = data # For keyevent, the data is the key 

412 

413 # Apply key prefix filter 

414 if key_prefix: 

415 if not key.startswith(key_prefix): 

416 return None 

417 key = key[len(key_prefix) :] 

418 

419 return cls( 

420 key=key, 

421 event_type=event_type, 

422 database=database, 

423 channel=channel, 

424 is_keyspace=False, 

425 data=data, 

426 ) 

427 

428 # Try subkeyspace: channel=__subkeyspace@<db>__:<key> 

429 # data=<event>|<subkey_len>:<subkey>[,<subkey_len>:<subkey>...] 

430 match = cls._SUBKEYSPACE_PATTERN.match(channel) 

431 if match: 

432 db_str, key = match.groups() 

433 database = int(db_str) if db_str != "*" else -1 

434 pipe_idx = data.index("|") 

435 event_type = data[:pipe_idx] 

436 subkeys = _parse_length_prefixed_subkeys(data[pipe_idx + 1 :]) 

437 

438 if key_prefix: 

439 if not key.startswith(key_prefix): 

440 return None 

441 key = key[len(key_prefix) :] 

442 

443 return cls( 

444 key=key, 

445 event_type=event_type, 

446 database=database, 

447 channel=channel, 

448 is_keyspace=True, 

449 data=data, 

450 subkeys=subkeys, 

451 ) 

452 

453 # Try subkeyevent: channel=__subkeyevent@<db>__:<event> 

454 # data=<key_len>:<key>|<subkey_len>:<subkey>[,...] 

455 match = cls._SUBKEYEVENT_PATTERN.match(channel) 

456 if match: 

457 db_str, event_type = match.groups() 

458 database = int(db_str) if db_str != "*" else -1 

459 # Parse key by length prefix 

460 colon_idx = data.index(":") 

461 key_len = int(data[:colon_idx]) 

462 key_start = colon_idx + 1 

463 key = data[key_start : key_start + key_len] 

464 # After key, expect '|' then subkeys 

465 subkeys_start = key_start + key_len + 1 # +1 for '|' 

466 subkeys = _parse_length_prefixed_subkeys(data[subkeys_start:]) 

467 

468 if key_prefix: 

469 if not key.startswith(key_prefix): 

470 return None 

471 key = key[len(key_prefix) :] 

472 

473 return cls( 

474 key=key, 

475 event_type=event_type, 

476 database=database, 

477 channel=channel, 

478 is_keyspace=False, 

479 data=data, 

480 subkeys=subkeys, 

481 ) 

482 

483 # Try subkeyspaceitem: channel=__subkeyspaceitem@<db>__:<key>\n<subkey> 

484 # data=<event> 

485 match = cls._SUBKEYSPACEITEM_PATTERN.match(channel) 

486 if match: 

487 db_str, key_and_subkey = match.groups() 

488 database = int(db_str) if db_str != "*" else -1 

489 newline_idx = key_and_subkey.index("\n") 

490 key = key_and_subkey[:newline_idx] 

491 subkey = key_and_subkey[newline_idx + 1 :] 

492 event_type = data 

493 

494 if key_prefix: 

495 if not key.startswith(key_prefix): 

496 return None 

497 key = key[len(key_prefix) :] 

498 

499 return cls( 

500 key=key, 

501 event_type=event_type, 

502 database=database, 

503 channel=channel, 

504 is_keyspace=True, 

505 data=data, 

506 subkeys=[subkey], 

507 ) 

508 

509 # Try subkeyspaceevent: channel=__subkeyspaceevent@<db>__:<event>|<key> 

510 # data=<subkey_len>:<subkey>[,...] 

511 match = cls._SUBKEYSPACEEVENT_PATTERN.match(channel) 

512 if match: 

513 db_str, event_and_key = match.groups() 

514 database = int(db_str) if db_str != "*" else -1 

515 pipe_idx = event_and_key.index("|") 

516 event_type = event_and_key[:pipe_idx] 

517 key = event_and_key[pipe_idx + 1 :] 

518 subkeys = _parse_length_prefixed_subkeys(data) 

519 

520 if key_prefix: 

521 if not key.startswith(key_prefix): 

522 return None 

523 key = key[len(key_prefix) :] 

524 

525 return cls( 

526 key=key, 

527 event_type=event_type, 

528 database=database, 

529 channel=channel, 

530 is_keyspace=False, 

531 data=data, 

532 subkeys=subkeys, 

533 ) 

534 

535 return None 

536 

537 def key_starts_with(self, prefix: str | bytes) -> bool: 

538 """Check if the key starts with the given prefix.""" 

539 prefix = safe_str(prefix) 

540 return self.key.startswith(prefix) 

541 

542 

543# ============================================================================= 

544# Channel Classes 

545# ============================================================================= 

546 

547 

548class KeyspaceChannel: 

549 """ 

550 Represents a keyspace notification channel for subscribing to events on keys. 

551 

552 Keyspace notifications publish the event type (e.g., "set", "del") as the message 

553 when a key matching the pattern is modified. 

554 

555 This class can be used directly with subscribe()/psubscribe() as it implements 

556 __str__ to return the channel string. 

557 

558 Attributes: 

559 key_or_pattern: The key or pattern to monitor (use '*' for wildcards) 

560 db: The database number (defaults to 0, the only database in Redis Cluster) 

561 is_pattern: Whether this channel contains wildcards 

562 

563 Examples: 

564 >>> channel = KeyspaceChannel("user:123", db=0) 

565 >>> str(channel) 

566 '__keyspace@0__:user:123' 

567 

568 >>> # Pattern subscription (wildcards are auto-detected) 

569 >>> channel = KeyspaceChannel("user:*", db=0) 

570 >>> str(channel) 

571 '__keyspace@0__:user:*' 

572 

573 >>> # Use with KeyspaceNotifications 

574 >>> notifications = KeyspaceNotifications(redis_client) 

575 >>> notifications.subscribe(channel) 

576 """ 

577 

578 PREFIX: ClassVar[str] = "__keyspace@" 

579 

580 def __init__(self, key_or_pattern: str, db: int = 0): 

581 """ 

582 Create a keyspace notification channel. 

583 

584 Args: 

585 key_or_pattern: The key or pattern to monitor. Use '*' for wildcards. 

586 db: The database number. Defaults to 0 (the only database in Redis Cluster). 

587 """ 

588 self.key_or_pattern = key_or_pattern 

589 self.db = db 

590 self._channel_str = self._build_channel_string() 

591 

592 def _build_channel_string(self) -> str: 

593 return f"{self.PREFIX}{self.db}__:{self.key_or_pattern}" 

594 

595 @property 

596 def is_pattern(self) -> bool: 

597 """Check if this channel contains wildcards and should use psubscribe.""" 

598 return _is_pattern(self.key_or_pattern) 

599 

600 def __str__(self) -> str: 

601 return self._channel_str 

602 

603 def __repr__(self) -> str: 

604 return f"KeyspaceChannel({self.key_or_pattern!r}, db={self.db})" 

605 

606 def __eq__(self, other: object) -> bool: 

607 if isinstance(other, KeyspaceChannel): 

608 return self._channel_str == other._channel_str 

609 if isinstance(other, str): 

610 return self._channel_str == other 

611 return NotImplemented 

612 

613 def __hash__(self) -> int: 

614 return hash(self._channel_str) 

615 

616 

617class KeyeventChannel: 

618 """ 

619 Represents a keyevent notification channel for subscribing to event types. 

620 

621 Keyevent notifications publish the key name as the message when the specified 

622 event type occurs on any key. 

623 

624 This class can be used directly with subscribe()/psubscribe() as it implements 

625 __str__ to return the channel string. 

626 

627 Attributes: 

628 event: The event type to monitor 

629 db: The database number (defaults to 0, the only database in Redis Cluster) 

630 is_pattern: Whether this channel contains wildcards 

631 

632 Examples: 

633 >>> channel = KeyeventChannel(EventType.SET, db=0) 

634 >>> str(channel) 

635 '__keyevent@0__:set' 

636 

637 >>> channel = KeyeventChannel.all_events(db=0) 

638 >>> str(channel) 

639 '__keyevent@0__:*' 

640 

641 >>> # Use with KeyspaceNotifications 

642 >>> notifications = KeyspaceNotifications(redis_client) 

643 >>> notifications.subscribe(channel) 

644 """ 

645 

646 PREFIX: ClassVar[str] = "__keyevent@" 

647 

648 def __init__(self, event: str, db: int = 0): 

649 """ 

650 Create a keyevent notification channel. 

651 

652 Args: 

653 event: The event type to monitor (e.g., EventType.SET or "set") 

654 db: The database number. Defaults to 0 (the only database in Redis Cluster). 

655 """ 

656 self.event = event 

657 self.db = db 

658 self._channel_str = self._build_channel_string() 

659 

660 def _build_channel_string(self) -> str: 

661 return f"{self.PREFIX}{self.db}__:{self.event}" 

662 

663 @property 

664 def is_pattern(self) -> bool: 

665 """Check if this channel contains wildcards and should use psubscribe.""" 

666 return _is_pattern(self.event) 

667 

668 @classmethod 

669 def all_events(cls, db: int = 0) -> "KeyeventChannel": 

670 """ 

671 Create a keyevent pattern for subscribing to all event types. 

672 

673 This is equivalent to KeyeventChannel("*"). 

674 

675 Args: 

676 db: The database number. Defaults to 0 (the only database in Redis Cluster). 

677 

678 Returns: 

679 A KeyeventChannel configured to receive all events. 

680 

681 Examples: 

682 >>> channel = KeyeventChannel.all_events() 

683 >>> str(channel) 

684 '__keyevent@0__:*' 

685 """ 

686 return cls("*", db=db) 

687 

688 def __str__(self) -> str: 

689 return self._channel_str 

690 

691 def __repr__(self) -> str: 

692 return f"KeyeventChannel({self.event!r}, db={self.db})" 

693 

694 def __eq__(self, other: object) -> bool: 

695 if isinstance(other, KeyeventChannel): 

696 return self._channel_str == other._channel_str 

697 if isinstance(other, str): 

698 return self._channel_str == other 

699 return NotImplemented 

700 

701 def __hash__(self) -> int: 

702 return hash(self._channel_str) 

703 

704 

705class SubkeyspaceChannel: 

706 """ 

707 Represents a subkeyspace notification channel for subscribing to 

708 subkey-level events on keys (e.g., hash field changes). 

709 

710 The channel format is ``__subkeyspace@<db>__:<key>``. 

711 The message payload is ``<event>|<subkey_len>:<subkey>[,...]``. 

712 

713 Examples: 

714 >>> channel = SubkeyspaceChannel("myhash", db=0) 

715 >>> str(channel) 

716 '__subkeyspace@0__:myhash' 

717 """ 

718 

719 PREFIX: ClassVar[str] = "__subkeyspace@" 

720 

721 def __init__(self, key_or_pattern: str, db: int = 0): 

722 self.key_or_pattern = key_or_pattern 

723 self.db = db 

724 self._channel_str = self._build_channel_string() 

725 

726 def _build_channel_string(self) -> str: 

727 return f"{self.PREFIX}{self.db}__:{self.key_or_pattern}" 

728 

729 @property 

730 def is_pattern(self) -> bool: 

731 return _is_pattern(self.key_or_pattern) 

732 

733 def __str__(self) -> str: 

734 return self._channel_str 

735 

736 def __repr__(self) -> str: 

737 return f"SubkeyspaceChannel({self.key_or_pattern!r}, db={self.db})" 

738 

739 def __eq__(self, other: object) -> bool: 

740 if isinstance(other, SubkeyspaceChannel): 

741 return self._channel_str == other._channel_str 

742 if isinstance(other, str): 

743 return self._channel_str == other 

744 return NotImplemented 

745 

746 def __hash__(self) -> int: 

747 return hash(self._channel_str) 

748 

749 

750class SubkeyeventChannel: 

751 """ 

752 Represents a subkeyevent notification channel for subscribing to 

753 specific event types with subkey-level detail. 

754 

755 The channel format is ``__subkeyevent@<db>__:<event>``. 

756 The message payload is ``<key_len>:<key>|<subkey_len>:<subkey>[,...]``. 

757 

758 Examples: 

759 >>> channel = SubkeyeventChannel("hdel", db=0) 

760 >>> str(channel) 

761 '__subkeyevent@0__:hdel' 

762 """ 

763 

764 PREFIX: ClassVar[str] = "__subkeyevent@" 

765 

766 def __init__(self, event: str, db: int = 0): 

767 self.event = event 

768 self.db = db 

769 self._channel_str = self._build_channel_string() 

770 

771 def _build_channel_string(self) -> str: 

772 return f"{self.PREFIX}{self.db}__:{self.event}" 

773 

774 @property 

775 def is_pattern(self) -> bool: 

776 return _is_pattern(self.event) 

777 

778 @classmethod 

779 def all_events(cls, db: int = 0) -> SubkeyeventChannel: 

780 """Create a channel for all subkeyevent types.""" 

781 return cls("*", db=db) 

782 

783 def __str__(self) -> str: 

784 return self._channel_str 

785 

786 def __repr__(self) -> str: 

787 return f"SubkeyeventChannel({self.event!r}, db={self.db})" 

788 

789 def __eq__(self, other: object) -> bool: 

790 if isinstance(other, SubkeyeventChannel): 

791 return self._channel_str == other._channel_str 

792 if isinstance(other, str): 

793 return self._channel_str == other 

794 return NotImplemented 

795 

796 def __hash__(self) -> int: 

797 return hash(self._channel_str) 

798 

799 

800class SubkeyspaceitemChannel: 

801 """ 

802 Represents a subkeyspaceitem notification channel for subscribing to 

803 events on a specific subkey (field) of a specific key. 

804 

805 The channel format is ``__subkeyspaceitem@<db>__:<key>\\n<subkey>``. 

806 The message payload is the event type (e.g., ``"hset"``). 

807 

808 Note: 

809 The server only emits this notification when the key does not 

810 contain a newline character. 

811 

812 Examples: 

813 >>> channel = SubkeyspaceitemChannel("myhash", "myfield", db=0) 

814 >>> str(channel) 

815 '__subkeyspaceitem@0__:myhash\\nmyfield' 

816 """ 

817 

818 PREFIX: ClassVar[str] = "__subkeyspaceitem@" 

819 

820 def __init__(self, key_or_pattern: str, subkey_or_pattern: str, db: int = 0): 

821 self.key_or_pattern = key_or_pattern 

822 self.subkey_or_pattern = subkey_or_pattern 

823 self.db = db 

824 self._channel_str = self._build_channel_string() 

825 

826 def _build_channel_string(self) -> str: 

827 return ( 

828 f"{self.PREFIX}{self.db}__:{self.key_or_pattern}\n{self.subkey_or_pattern}" 

829 ) 

830 

831 @property 

832 def is_pattern(self) -> bool: 

833 return _is_pattern(self.key_or_pattern) or _is_pattern(self.subkey_or_pattern) 

834 

835 def __str__(self) -> str: 

836 return self._channel_str 

837 

838 def __repr__(self) -> str: 

839 return ( 

840 f"SubkeyspaceitemChannel({self.key_or_pattern!r}, " 

841 f"{self.subkey_or_pattern!r}, db={self.db})" 

842 ) 

843 

844 def __eq__(self, other: object) -> bool: 

845 if isinstance(other, SubkeyspaceitemChannel): 

846 return self._channel_str == other._channel_str 

847 if isinstance(other, str): 

848 return self._channel_str == other 

849 return NotImplemented 

850 

851 def __hash__(self) -> int: 

852 return hash(self._channel_str) 

853 

854 

855class SubkeyspaceeventChannel: 

856 """ 

857 Represents a subkeyspaceevent notification channel for subscribing to 

858 a specific event on a specific key, receiving affected subkeys. 

859 

860 The channel format is ``__subkeyspaceevent@<db>__:<event>|<key>``. 

861 The message payload is a length-prefixed subkey list. 

862 

863 Examples: 

864 >>> channel = SubkeyspaceeventChannel("hset", "myhash", db=0) 

865 >>> str(channel) 

866 '__subkeyspaceevent@0__:hset|myhash' 

867 """ 

868 

869 PREFIX: ClassVar[str] = "__subkeyspaceevent@" 

870 

871 def __init__(self, event: str, key_or_pattern: str, db: int = 0): 

872 self.event = event 

873 self.key_or_pattern = key_or_pattern 

874 self.db = db 

875 self._channel_str = self._build_channel_string() 

876 

877 def _build_channel_string(self) -> str: 

878 return f"{self.PREFIX}{self.db}__:{self.event}|{self.key_or_pattern}" 

879 

880 @property 

881 def is_pattern(self) -> bool: 

882 return _is_pattern(self.event) or _is_pattern(self.key_or_pattern) 

883 

884 def __str__(self) -> str: 

885 return self._channel_str 

886 

887 def __repr__(self) -> str: 

888 return ( 

889 f"SubkeyspaceeventChannel({self.event!r}, " 

890 f"{self.key_or_pattern!r}, db={self.db})" 

891 ) 

892 

893 def __eq__(self, other: object) -> bool: 

894 if isinstance(other, SubkeyspaceeventChannel): 

895 return self._channel_str == other._channel_str 

896 if isinstance(other, str): 

897 return self._channel_str == other 

898 return NotImplemented 

899 

900 def __hash__(self) -> int: 

901 return hash(self._channel_str) 

902 

903 

904class ChannelType(Enum): 

905 """ 

906 Enum representing the type of a Redis keyspace notification channel. 

907 

908 Redis provides two types of keyspace notifications and four subkey 

909 notification types: 

910 

911 - KEYSPACE: ``__keyspace@{db}__:{key}`` — data is the event type. 

912 - KEYEVENT: ``__keyevent@{db}__:{event}`` — data is the key name. 

913 - SUBKEYSPACE: ``__subkeyspace@{db}__:{key}`` — data is event + subkeys. 

914 - SUBKEYEVENT: ``__subkeyevent@{db}__:{event}`` — data is key + subkeys. 

915 - SUBKEYSPACEITEM: ``__subkeyspaceitem@{db}__:{key}\\n{subkey}`` — data 

916 is the event type. 

917 - SUBKEYSPACEEVENT: ``__subkeyspaceevent@{db}__:{event}|{key}`` — data 

918 is a subkey list. 

919 

920 Examples: 

921 >>> get_channel_type("__keyspace@0__:mykey") 

922 ChannelType.KEYSPACE 

923 >>> get_channel_type("__subkeyspace@0__:myhash") 

924 ChannelType.SUBKEYSPACE 

925 """ 

926 

927 KEYSPACE = "keyspace" 

928 KEYEVENT = "keyevent" 

929 SUBKEYSPACE = "subkeyspace" 

930 SUBKEYEVENT = "subkeyevent" 

931 SUBKEYSPACEITEM = "subkeyspaceitem" 

932 SUBKEYSPACEEVENT = "subkeyspaceevent" 

933 

934 

935def get_channel_type(channel: str | bytes) -> ChannelType | None: 

936 """ 

937 Determine the type of a Redis keyspace notification channel. 

938 

939 Args: 

940 channel: The channel name to check (string or bytes). 

941 

942 Returns: 

943 ChannelType.KEYSPACE if it's a keyspace notification channel, 

944 ChannelType.KEYEVENT if it's a keyevent notification channel, 

945 None if it's not a keyspace notification channel. 

946 

947 Examples: 

948 >>> get_channel_type("__keyspace@0__:mykey") 

949 ChannelType.KEYSPACE 

950 >>> get_channel_type("__keyevent@0__:set") 

951 ChannelType.KEYEVENT 

952 >>> get_channel_type("regular_channel") is None 

953 True 

954 >>> get_channel_type(b"__keyspace@0__:mykey") 

955 ChannelType.KEYSPACE 

956 """ 

957 channel_str = safe_str(channel) 

958 # Check subkey prefixes first (they are longer and more specific) 

959 if channel_str.startswith(SubkeyspaceitemChannel.PREFIX): 

960 return ChannelType.SUBKEYSPACEITEM 

961 if channel_str.startswith(SubkeyspaceeventChannel.PREFIX): 

962 return ChannelType.SUBKEYSPACEEVENT 

963 if channel_str.startswith(SubkeyspaceChannel.PREFIX): 

964 return ChannelType.SUBKEYSPACE 

965 if channel_str.startswith(SubkeyeventChannel.PREFIX): 

966 return ChannelType.SUBKEYEVENT 

967 if channel_str.startswith(KeyspaceChannel.PREFIX): 

968 return ChannelType.KEYSPACE 

969 if channel_str.startswith(KeyeventChannel.PREFIX): 

970 return ChannelType.KEYEVENT 

971 return None 

972 

973 

974def _is_pattern( 

975 channel: str | bytes | KeyspaceChannel | KeyeventChannel, 

976) -> bool: 

977 """ 

978 Check if a channel string contains glob-style pattern characters. 

979 

980 Redis uses glob-style patterns for psubscribe: 

981 - * matches any sequence of characters 

982 - ? matches any single character 

983 - [...] matches any character in the brackets 

984 

985 Args: 

986 channel: The channel string to check. Can be a string, bytes, 

987 or a KeyspaceChannel/KeyeventChannel object. 

988 

989 Returns: 

990 True if the channel contains pattern characters, False otherwise. 

991 """ 

992 # Handle Channel objects that have _channel_str attribute 

993 # (KeyspaceChannel, KeyeventChannel) 

994 if hasattr(channel, "_channel_str"): 

995 channel = channel._channel_str 

996 channel = safe_str(channel) 

997 # Check for unescaped glob pattern characters. 

998 # * and ? are always pattern characters. 

999 # [ is only a pattern character when followed by a matching unescaped ], 

1000 # forming a bracket expression like [abc] or [a-z]. A lone [ (e.g. in 

1001 # a key named "my[key") is treated as a literal by Redis. 

1002 i = 0 

1003 while i < len(channel): 

1004 char = channel[i] 

1005 if char == "\\": 

1006 # Skip escaped character 

1007 i += 2 

1008 continue 

1009 if char in ("*", "?"): 

1010 return True 

1011 if char == "[": 

1012 # Look for a matching unescaped ] 

1013 j = i + 1 

1014 while j < len(channel): 

1015 if channel[j] == "\\": 

1016 j += 2 

1017 continue 

1018 if channel[j] == "]": 

1019 return True 

1020 j += 1 

1021 # No matching ] found — literal [ 

1022 i += 1 

1023 return False 

1024 

1025 

1026# ============================================================================= 

1027# Abstract Base Class for Keyspace Notifications 

1028# ============================================================================= 

1029 

1030 

1031class KeyspaceNotificationsInterface(ABC): 

1032 """ 

1033 Interface for keyspace notification managers. 

1034 

1035 This interface provides a consistent API for both standalone (KeyspaceNotifications) 

1036 and cluster (ClusterKeyspaceNotifications) implementations, allowing the same 

1037 code patterns to work with both standalone and cluster Redis deployments. 

1038 """ 

1039 

1040 @abstractmethod 

1041 def subscribe( 

1042 self, 

1043 *channels: ChannelT, 

1044 handler: SyncHandlerT | None = None, 

1045 ): 

1046 """Subscribe to keyspace notification channels.""" 

1047 pass 

1048 

1049 @abstractmethod 

1050 def unsubscribe(self, *channels: ChannelT): 

1051 """Unsubscribe from keyspace notification channels.""" 

1052 pass 

1053 

1054 @abstractmethod 

1055 def subscribe_keyspace( 

1056 self, 

1057 key_or_pattern: str, 

1058 db: int = 0, 

1059 handler: SyncHandlerT | None = None, 

1060 ): 

1061 """Subscribe to keyspace notifications for specific keys.""" 

1062 pass 

1063 

1064 @abstractmethod 

1065 def subscribe_keyevent( 

1066 self, 

1067 event: str, 

1068 db: int = 0, 

1069 handler: SyncHandlerT | None = None, 

1070 ): 

1071 """Subscribe to keyevent notifications for specific event types.""" 

1072 pass 

1073 

1074 @abstractmethod 

1075 def subscribe_subkeyspace( 

1076 self, 

1077 key_or_pattern: str, 

1078 db: int = 0, 

1079 handler: SyncHandlerT | None = None, 

1080 ): 

1081 """Subscribe to subkeyspace notifications for specific keys.""" 

1082 pass 

1083 

1084 @abstractmethod 

1085 def subscribe_subkeyevent( 

1086 self, 

1087 event: str, 

1088 db: int = 0, 

1089 handler: SyncHandlerT | None = None, 

1090 ): 

1091 """Subscribe to subkeyevent notifications for specific event types.""" 

1092 pass 

1093 

1094 @abstractmethod 

1095 def subscribe_subkeyspaceitem( 

1096 self, 

1097 key_or_pattern: str, 

1098 subkey_or_pattern: str, 

1099 db: int = 0, 

1100 handler: SyncHandlerT | None = None, 

1101 ): 

1102 """Subscribe to subkeyspaceitem notifications for a specific subkey.""" 

1103 pass 

1104 

1105 @abstractmethod 

1106 def subscribe_subkeyspaceevent( 

1107 self, 

1108 event: str, 

1109 key_or_pattern: str, 

1110 db: int = 0, 

1111 handler: SyncHandlerT | None = None, 

1112 ): 

1113 """Subscribe to subkeyspaceevent notifications for an event on a key.""" 

1114 pass 

1115 

1116 @abstractmethod 

1117 def get_message( 

1118 self, 

1119 ignore_subscribe_messages: bool | None = None, 

1120 timeout: float = 0.0, 

1121 ) -> KeyNotification | None: 

1122 """Get the next keyspace notification if one is available.""" 

1123 pass 

1124 

1125 @abstractmethod 

1126 def listen(self): 

1127 """Listen for keyspace notifications.""" 

1128 pass 

1129 

1130 @abstractmethod 

1131 def close(self): 

1132 """Close the notification manager and clean up resources.""" 

1133 pass 

1134 

1135 @abstractmethod 

1136 def __enter__(self): 

1137 pass 

1138 

1139 @abstractmethod 

1140 def __exit__(self, _exc_type, _exc_val, _exc_tb): 

1141 pass 

1142 

1143 @property 

1144 @abstractmethod 

1145 def subscribed(self) -> bool: 

1146 """Check if there are any active subscriptions and not closed.""" 

1147 pass 

1148 

1149 @abstractmethod 

1150 def run_in_thread( 

1151 self, 

1152 poll_timeout: float = 0.0, 

1153 daemon: bool = False, 

1154 exception_handler: Callable[ 

1155 [ 

1156 BaseException, 

1157 KeyspaceNotificationsInterface, 

1158 KeyspaceWorkerThread, 

1159 ], 

1160 None, 

1161 ] 

1162 | None = None, 

1163 ) -> KeyspaceWorkerThread: 

1164 """Start a background thread that polls for notifications.""" 

1165 pass 

1166 

1167 

1168class AbstractKeyspaceNotifications(KeyspaceNotificationsInterface): 

1169 """ 

1170 Abstract base class for keyspace notification managers. 

1171 

1172 Provides shared implementation for subscribe/unsubscribe logic. 

1173 Subclasses must implement: 

1174 - _execute_subscribe: Execute the subscribe operation 

1175 - _execute_unsubscribe: Execute the unsubscribe operation 

1176 - get_message: Get the next notification 

1177 - listen: Generator for notifications 

1178 - close: Clean up resources 

1179 """ 

1180 

1181 def __init__( 

1182 self, 

1183 key_prefix: str | bytes | None = None, 

1184 ignore_subscribe_messages: bool = True, 

1185 ): 

1186 """ 

1187 Initialize the base keyspace notification manager. 

1188 

1189 Args: 

1190 key_prefix: Optional prefix to filter and strip from keys in notifications 

1191 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

1192 are not returned by get_message/listen 

1193 """ 

1194 self.key_prefix = key_prefix 

1195 self.ignore_subscribe_messages = ignore_subscribe_messages 

1196 self._closed = False 

1197 

1198 def subscribe( 

1199 self, 

1200 *channels: ChannelT, 

1201 handler: SyncHandlerT | None = None, 

1202 ): 

1203 """ 

1204 Subscribe to keyspace notification channels. 

1205 

1206 Automatically detects whether each channel is a pattern (contains 

1207 wildcards like *, ?, [) or an exact channel name and uses the 

1208 appropriate Redis subscribe command internally. 

1209 

1210 Args: 

1211 *channels: Channels to subscribe to. Can be strings, KeyspaceChannel, 

1212 or KeyeventChannel objects. Patterns are auto-detected. 

1213 handler: Optional callback function that receives KeyNotification 

1214 objects. If provided, notifications are passed to the handler 

1215 instead of being returned by get_message()/listen(). 

1216 """ 

1217 # Wrap the handler to convert raw messages to KeyNotification objects 

1218 wrapped_handler: Callable | None = None 

1219 if handler is not None: 

1220 # Capture key_prefix in closure for consistent filtering/stripping 

1221 key_prefix = self.key_prefix 

1222 

1223 def _wrap_handler(message): 

1224 notification = KeyNotification.from_message( 

1225 message, key_prefix=key_prefix 

1226 ) 

1227 if notification is not None: 

1228 handler(notification) 

1229 

1230 wrapped_handler = _wrap_handler 

1231 

1232 patterns = {} 

1233 exact_channels = {} 

1234 

1235 for channel in channels: 

1236 if hasattr(channel, "_channel_str"): 

1237 channel_str = str(channel) 

1238 else: 

1239 channel_str = safe_str(channel) 

1240 if _is_pattern(channel): 

1241 patterns[channel_str] = wrapped_handler 

1242 else: 

1243 exact_channels[channel_str] = wrapped_handler 

1244 

1245 # Delegate to subclass implementation first. For standalone Redis 

1246 # this raises on failure, keeping tracking state clean. For cluster 

1247 # implementations the operation is best-effort (partial failures are 

1248 # logged, not raised) so tracking state is always updated afterwards. 

1249 self._execute_subscribe(patterns, exact_channels) 

1250 self._track_subscribe(patterns, exact_channels) 

1251 

1252 @abstractmethod 

1253 def _execute_subscribe( 

1254 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

1255 ) -> None: 

1256 """ 

1257 Execute the subscribe operation. 

1258 

1259 Args: 

1260 patterns: Dict mapping pattern strings to handlers (for psubscribe) 

1261 exact_channels: Dict mapping channel strings to handlers (for subscribe) 

1262 """ 

1263 pass 

1264 

1265 def unsubscribe(self, *channels: ChannelT): 

1266 """ 

1267 Unsubscribe from keyspace notification channels. 

1268 

1269 Automatically detects whether each channel is a pattern or exact 

1270 channel and uses the appropriate Redis unsubscribe command. 

1271 

1272 Args: 

1273 *channels: Channels to unsubscribe from. 

1274 """ 

1275 patterns = [] 

1276 exact_channels = [] 

1277 

1278 for channel in channels: 

1279 if hasattr(channel, "_channel_str"): 

1280 channel_str = str(channel) 

1281 else: 

1282 channel_str = safe_str(channel) 

1283 if _is_pattern(channel): 

1284 patterns.append(channel_str) 

1285 else: 

1286 exact_channels.append(channel_str) 

1287 

1288 # Delegate to subclass implementation first. For standalone Redis 

1289 # this raises on failure, keeping tracking state intact. For cluster 

1290 # implementations the operation is best-effort (partial failures are 

1291 # logged, not raised) so tracking state is always removed afterwards 

1292 # — this is intentional: the user asked to unsubscribe, so 

1293 # refresh_subscriptions should not re-subscribe these channels. 

1294 self._execute_unsubscribe(patterns, exact_channels) 

1295 self._untrack_subscribe(patterns, exact_channels) 

1296 

1297 @abstractmethod 

1298 def _execute_unsubscribe( 

1299 self, patterns: list[str], exact_channels: list[str] 

1300 ) -> None: 

1301 """ 

1302 Execute the unsubscribe operation. 

1303 

1304 Args: 

1305 patterns: List of pattern strings to punsubscribe from 

1306 exact_channels: List of channel strings to unsubscribe from 

1307 """ 

1308 pass 

1309 

1310 def _track_subscribe( 

1311 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

1312 ) -> None: 

1313 """Track newly subscribed patterns/channels. 

1314 

1315 Override in subclasses that need to maintain their own subscription 

1316 registry (e.g. cluster implementations that must re-subscribe 

1317 new/failed-over nodes). The default is a no-op because standalone 

1318 implementations delegate tracking to the underlying PubSub object. 

1319 """ 

1320 

1321 def _untrack_subscribe( 

1322 self, patterns: list[str], exact_channels: list[str] 

1323 ) -> None: 

1324 """Remove patterns/channels from the subscription registry. 

1325 

1326 Override in subclasses that maintain their own subscription registry. 

1327 The default is a no-op. 

1328 """ 

1329 

1330 def subscribe_keyspace( 

1331 self, 

1332 key_or_pattern: str, 

1333 db: int = 0, 

1334 handler: SyncHandlerT | None = None, 

1335 ): 

1336 """ 

1337 Subscribe to keyspace notifications for specific keys. 

1338 

1339 Args: 

1340 key_or_pattern: The key or pattern to monitor. Use '*' for wildcards. 

1341 db: The database number (default 0). 

1342 handler: Optional callback for notifications. 

1343 

1344 Example: 

1345 >>> ksn.subscribe_keyspace("user:123", db=0) 

1346 >>> ksn.subscribe_keyspace("user:*", db=0) 

1347 """ 

1348 channel = KeyspaceChannel(key_or_pattern, db=db) 

1349 self.subscribe(channel, handler=handler) 

1350 

1351 def subscribe_keyevent( 

1352 self, 

1353 event: str, 

1354 db: int = 0, 

1355 handler: SyncHandlerT | None = None, 

1356 ): 

1357 """ 

1358 Subscribe to keyevent notifications for specific event types. 

1359 

1360 Args: 

1361 event: The event type to monitor (e.g., EventType.SET or "set") 

1362 db: The database number (default 0). 

1363 handler: Optional callback for notifications. 

1364 

1365 Example: 

1366 >>> ksn.subscribe_keyevent(EventType.SET) 

1367 >>> ksn.subscribe_keyevent(EventType.EXPIRED, handler=my_handler) 

1368 """ 

1369 channel = KeyeventChannel(event, db=db) 

1370 self.subscribe(channel, handler=handler) 

1371 

1372 def subscribe_subkeyspace( 

1373 self, 

1374 key_or_pattern: str, 

1375 db: int = 0, 

1376 handler: SyncHandlerT | None = None, 

1377 ): 

1378 """ 

1379 Subscribe to subkeyspace notifications for specific keys. 

1380 

1381 Receives events with affected subkeys (fields) for the given key. 

1382 

1383 Args: 

1384 key_or_pattern: The key or pattern to monitor. 

1385 db: The database number (default 0). 

1386 handler: Optional callback for notifications. 

1387 """ 

1388 channel = SubkeyspaceChannel(key_or_pattern, db=db) 

1389 self.subscribe(channel, handler=handler) 

1390 

1391 def subscribe_subkeyevent( 

1392 self, 

1393 event: str, 

1394 db: int = 0, 

1395 handler: SyncHandlerT | None = None, 

1396 ): 

1397 """ 

1398 Subscribe to subkeyevent notifications for specific event types. 

1399 

1400 Receives the affected key and subkeys when the given event occurs. 

1401 

1402 Args: 

1403 event: The event type to monitor (e.g., "hset", "hdel"). 

1404 db: The database number (default 0). 

1405 handler: Optional callback for notifications. 

1406 """ 

1407 channel = SubkeyeventChannel(event, db=db) 

1408 self.subscribe(channel, handler=handler) 

1409 

1410 def subscribe_subkeyspaceitem( 

1411 self, 

1412 key_or_pattern: str, 

1413 subkey_or_pattern: str, 

1414 db: int = 0, 

1415 handler: SyncHandlerT | None = None, 

1416 ): 

1417 """ 

1418 Subscribe to subkeyspaceitem notifications for a specific subkey. 

1419 

1420 Receives the event type when the given subkey of the given key is 

1421 modified. 

1422 

1423 Args: 

1424 key_or_pattern: The key or pattern to monitor. 

1425 subkey_or_pattern: The subkey (field) or pattern to monitor. 

1426 db: The database number (default 0). 

1427 handler: Optional callback for notifications. 

1428 """ 

1429 channel = SubkeyspaceitemChannel(key_or_pattern, subkey_or_pattern, db=db) 

1430 self.subscribe(channel, handler=handler) 

1431 

1432 def subscribe_subkeyspaceevent( 

1433 self, 

1434 event: str, 

1435 key_or_pattern: str, 

1436 db: int = 0, 

1437 handler: SyncHandlerT | None = None, 

1438 ): 

1439 """ 

1440 Subscribe to subkeyspaceevent notifications for an event on a key. 

1441 

1442 Receives the affected subkeys when the given event occurs on the 

1443 given key. 

1444 

1445 Args: 

1446 event: The event type to monitor. 

1447 key_or_pattern: The key or pattern to monitor. 

1448 db: The database number (default 0). 

1449 handler: Optional callback for notifications. 

1450 """ 

1451 channel = SubkeyspaceeventChannel(event, key_or_pattern, db=db) 

1452 self.subscribe(channel, handler=handler) 

1453 

1454 def __enter__(self): 

1455 return self 

1456 

1457 def __exit__(self, _exc_type, _exc_val, _exc_tb): 

1458 self.close() 

1459 return False 

1460 

1461 def run_in_thread( 

1462 self, 

1463 poll_timeout: float = 0.0, 

1464 daemon: bool = False, 

1465 exception_handler: Callable[ 

1466 [ 

1467 BaseException, 

1468 KeyspaceNotificationsInterface, 

1469 KeyspaceWorkerThread, 

1470 ], 

1471 None, 

1472 ] 

1473 | None = None, 

1474 ) -> KeyspaceWorkerThread: 

1475 """ 

1476 Start a background thread that polls for notifications and triggers handlers. 

1477 

1478 This method spawns a thread that continuously calls get_message() to 

1479 process incoming notifications. When a notification arrives, any 

1480 registered handler for that channel/pattern is invoked automatically. 

1481 

1482 All subscriptions must have handlers registered before calling this method. 

1483 

1484 Args: 

1485 poll_timeout: Timeout in seconds for get_message() calls. When no message 

1486 is available, the thread waits up to this long before checking 

1487 again. Default 0.0 (non-blocking). WARNING: the default 

1488 causes a CPU spin-loop. It is preferred to pass a positive 

1489 value (e.g. 0.1 or 1.0). 

1490 daemon: If True, the thread will be a daemon thread and will be 

1491 terminated when the main program exits. Default False. 

1492 exception_handler: Optional callback invoked when an exception occurs 

1493 in the worker thread. Receives (exception, notifications, 

1494 thread) as arguments. If None, exceptions are raised. 

1495 

1496 Returns: 

1497 KeyspaceWorkerThread: The started worker thread. Call stop() on it 

1498 to stop the thread and close the notifications. 

1499 

1500 Raises: 

1501 RedisError: If any subscription doesn't have a handler registered. 

1502 

1503 Example: 

1504 >>> def my_handler(notification): 

1505 ... print(f"Got: {notification.key} - {notification.event_type}") 

1506 >>> 

1507 >>> notifications.subscribe(KeyspaceChannel("user:*"), handler=my_handler) 

1508 >>> thread = notifications.run_in_thread(poll_timeout=0.1, daemon=True) 

1509 >>> # ... handlers are called automatically ... 

1510 >>> thread.stop() 

1511 """ 

1512 self._validate_all_handlers() 

1513 

1514 thread = KeyspaceWorkerThread( 

1515 self, 

1516 poll_timeout, 

1517 daemon=daemon, 

1518 exception_handler=exception_handler, 

1519 ) 

1520 thread.start() 

1521 return thread 

1522 

1523 @abstractmethod 

1524 def _validate_all_handlers(self) -> None: 

1525 """Raise :class:`~redis.RedisError` if any subscription lacks a handler. 

1526 

1527 Subclasses inspect their own subscription state to perform 

1528 the validation. 

1529 """ 

1530 pass 

1531 

1532 

1533class KeyspaceWorkerThread(threading.Thread): 

1534 """ 

1535 Background thread for processing keyspace notifications. 

1536 

1537 This thread continuously polls for notifications and invokes registered 

1538 handlers. It works with both KeyspaceNotifications (standalone) and 

1539 ClusterKeyspaceNotifications. 

1540 

1541 Example: 

1542 >>> thread = notifications.run_in_thread(poll_timeout=0.1) 

1543 >>> # ... handlers are called automatically ... 

1544 >>> thread.stop() 

1545 """ 

1546 

1547 def __init__( 

1548 self, 

1549 notifications: KeyspaceNotificationsInterface, 

1550 poll_timeout: float, 

1551 daemon: bool = False, 

1552 exception_handler: Callable[ 

1553 [ 

1554 BaseException, 

1555 KeyspaceNotificationsInterface, 

1556 KeyspaceWorkerThread, 

1557 ], 

1558 None, 

1559 ] 

1560 | None = None, 

1561 ): 

1562 super().__init__() 

1563 self.daemon = daemon 

1564 self.notifications = notifications 

1565 self.poll_timeout = poll_timeout 

1566 self.exception_handler = exception_handler 

1567 self._running = threading.Event() 

1568 

1569 def run(self) -> None: 

1570 """Main loop that polls for notifications and triggers handlers.""" 

1571 if self._running.is_set(): 

1572 return 

1573 self._running.set() 

1574 notifications = self.notifications 

1575 poll_timeout = self.poll_timeout 

1576 while self._running.is_set(): 

1577 try: 

1578 notifications.get_message( 

1579 ignore_subscribe_messages=True, timeout=poll_timeout 

1580 ) 

1581 except BaseException as e: 

1582 if self.exception_handler is None: 

1583 raise 

1584 self.exception_handler(e, notifications, self) 

1585 notifications.close() 

1586 

1587 def stop(self) -> None: 

1588 """ 

1589 Stop the worker thread. 

1590 

1591 This signals the thread to exit its run loop. The thread will close 

1592 the notifications object before terminating. 

1593 """ 

1594 self._running.clear() 

1595 

1596 

1597# ============================================================================= 

1598# Standalone Keyspace Notification Manager 

1599# ============================================================================= 

1600 

1601 

1602class KeyspaceNotifications(AbstractKeyspaceNotifications): 

1603 """ 

1604 Manages keyspace notification subscriptions for standalone Redis. 

1605 

1606 For standalone Redis, keyspace notifications work with a single PubSub 

1607 connection. This class wraps that connection and provides: 

1608 - Automatic pattern vs exact channel detection 

1609 - KeyNotification parsing with optional key_prefix filtering 

1610 - Convenience methods for keyspace and keyevent subscriptions 

1611 - Context manager and run_in_thread support 

1612 """ 

1613 

1614 def __init__( 

1615 self, 

1616 redis_client: Redis, 

1617 key_prefix: str | bytes | None = None, 

1618 ignore_subscribe_messages: bool = True, 

1619 ): 

1620 """ 

1621 Initialize the standalone keyspace notification manager. 

1622 

1623 Note: Keyspace notifications must be enabled on the Redis server via 

1624 the ``notify-keyspace-events`` configuration option. This is a server-side 

1625 configuration that should be done by your infrastructure/operations team. 

1626 

1627 Args: 

1628 redis_client: A Redis client instance 

1629 key_prefix: Optional prefix to filter and strip from keys in notifications 

1630 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

1631 are not returned by get_message/listen 

1632 """ 

1633 super().__init__(key_prefix, ignore_subscribe_messages) 

1634 self.redis = redis_client 

1635 

1636 # Create the PubSub instance with ignore_subscribe_messages=False 

1637 # so that the per-call argument in get_message() can control behavior 

1638 self._pubsub = redis_client.pubsub(ignore_subscribe_messages=False) 

1639 

1640 def _execute_subscribe( 

1641 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

1642 ) -> None: 

1643 """Execute subscribe on the single pubsub connection.""" 

1644 if patterns: 

1645 self._pubsub.psubscribe(**patterns) 

1646 if exact_channels: 

1647 self._pubsub.subscribe(**exact_channels) 

1648 

1649 def _execute_unsubscribe( 

1650 self, patterns: list[str], exact_channels: list[str] 

1651 ) -> None: 

1652 """Execute unsubscribe on the single pubsub connection.""" 

1653 if patterns: 

1654 self._pubsub.punsubscribe(*patterns) 

1655 if exact_channels: 

1656 self._pubsub.unsubscribe(*exact_channels) 

1657 

1658 def get_message( 

1659 self, 

1660 ignore_subscribe_messages: bool | None = None, 

1661 timeout: float = 0.0, 

1662 ) -> KeyNotification | None: 

1663 """ 

1664 Get the next keyspace notification if one is available. 

1665 

1666 Note: If a handler was registered for the channel, pubsub will call 

1667 the handler directly and this method returns None for that message. 

1668 

1669 Args: 

1670 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages. 

1671 Defaults to the value set in __init__ (True). 

1672 timeout: Time to wait for a message. 

1673 

1674 Returns: 

1675 A KeyNotification if a notification is available and no handler 

1676 was registered for the channel, None otherwise. 

1677 """ 

1678 if ignore_subscribe_messages is None: 

1679 ignore_subscribe_messages = self.ignore_subscribe_messages 

1680 

1681 if self._closed: 

1682 return None 

1683 

1684 # Pubsub's get_message will call wrapped handlers directly for channels 

1685 # with registered handlers and return None. For channels without handlers, 

1686 # it returns the raw message which we parse to KeyNotification. 

1687 message = self._pubsub.get_message( 

1688 ignore_subscribe_messages=ignore_subscribe_messages, 

1689 timeout=timeout, 

1690 ) 

1691 

1692 if message is not None: 

1693 return KeyNotification.from_message(message, key_prefix=self.key_prefix) 

1694 

1695 return None 

1696 

1697 def listen(self): 

1698 """ 

1699 Listen for keyspace notifications. 

1700 

1701 This is a generator that yields KeyNotification objects as they arrive. 

1702 It blocks until a notification is received. 

1703 

1704 Yields: 

1705 KeyNotification objects for each keyspace/keyevent notification. 

1706 

1707 Example: 

1708 >>> for notification in ksn.listen(): 

1709 ... print(f"{notification.key}: {notification.event_type}") 

1710 """ 

1711 while self.subscribed: 

1712 notification = self.get_message(timeout=1.0) 

1713 if notification is not None: 

1714 yield notification 

1715 

1716 @property 

1717 def subscribed(self) -> bool: 

1718 """Check if there are any active subscriptions and not closed.""" 

1719 return not self._closed and self._pubsub.subscribed 

1720 

1721 def _validate_all_handlers(self) -> None: 

1722 """Raise if any subscription in the underlying PubSub lacks a handler.""" 

1723 for channel, handler in self._pubsub.channels.items(): 

1724 if handler is None: 

1725 raise RedisError(f"Channel '{channel}' has no handler registered") 

1726 for pattern, handler in self._pubsub.patterns.items(): 

1727 if handler is None: 

1728 raise RedisError(f"Pattern '{pattern}' has no handler registered") 

1729 

1730 def close(self): 

1731 """Close the pubsub connection and clean up resources.""" 

1732 self._closed = True 

1733 try: 

1734 self._pubsub.close() 

1735 except Exception: 

1736 pass 

1737 

1738 

1739# ============================================================================= 

1740# Cluster-Aware Keyspace Notification Manager 

1741# ============================================================================= 

1742 

1743 

1744class ClusterKeyspaceNotifications(AbstractKeyspaceNotifications): 

1745 """ 

1746 Manages keyspace notification subscriptions across all nodes in a Redis Cluster. 

1747 

1748 In Redis Cluster, keyspace notifications are NOT broadcast between nodes. 

1749 Each node only emits notifications for keys it owns. This class automatically 

1750 subscribes to all primary nodes in the cluster and handles topology changes. 

1751 """ 

1752 

1753 def __init__( 

1754 self, 

1755 redis_cluster: RedisCluster, 

1756 key_prefix: str | bytes | None = None, 

1757 ignore_subscribe_messages: bool = True, 

1758 ): 

1759 """ 

1760 Initialize the cluster keyspace notification manager. 

1761 

1762 Note: Keyspace notifications must be enabled on all Redis cluster nodes via 

1763 the ``notify-keyspace-events`` configuration option. This is a server-side 

1764 configuration that should be done by your infrastructure/operations team. 

1765 

1766 Args: 

1767 redis_cluster: A RedisCluster instance 

1768 key_prefix: Optional prefix to filter and strip from keys in notifications 

1769 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

1770 are not returned by get_message/listen 

1771 """ 

1772 super().__init__(key_prefix, ignore_subscribe_messages) 

1773 self.cluster = redis_cluster 

1774 

1775 # Canonical subscription registry: pattern/channel -> wrapped handler. 

1776 # In cluster mode there are multiple PubSub objects (one per node), so 

1777 # this is the single source of truth used to (re-)subscribe new or 

1778 # failed-over nodes. 

1779 self._subscribed_patterns: dict[str, Any] = {} 

1780 self._subscribed_channels: dict[str, Any] = {} 

1781 

1782 # Track subscriptions per node 

1783 self._node_pubsubs: dict[str, Any] = {} 

1784 

1785 # Lock for topology refresh operations 

1786 self._refresh_lock = threading.Lock() 

1787 

1788 # Current pubsub index for round-robin polling 

1789 self._poll_index = 0 

1790 

1791 @property 

1792 def subscribed(self) -> bool: 

1793 """Check if there are any active subscriptions and not closed.""" 

1794 return not self._closed and bool( 

1795 self._subscribed_patterns or self._subscribed_channels 

1796 ) 

1797 

1798 def _track_subscribe( 

1799 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

1800 ) -> None: 

1801 """Track newly subscribed patterns/channels in the cluster registry.""" 

1802 if patterns: 

1803 self._subscribed_patterns.update(patterns) 

1804 if exact_channels: 

1805 self._subscribed_channels.update(exact_channels) 

1806 

1807 def _untrack_subscribe( 

1808 self, patterns: list[str], exact_channels: list[str] 

1809 ) -> None: 

1810 """Remove patterns/channels from the cluster registry.""" 

1811 for p in patterns: 

1812 self._subscribed_patterns.pop(p, None) 

1813 for c in exact_channels: 

1814 self._subscribed_channels.pop(c, None) 

1815 

1816 def _validate_all_handlers(self) -> None: 

1817 """Raise if any subscription in the cluster registry lacks a handler.""" 

1818 for channel, handler in self._subscribed_channels.items(): 

1819 if handler is None: 

1820 raise RedisError(f"Channel '{channel}' has no handler registered") 

1821 for pattern, handler in self._subscribed_patterns.items(): 

1822 if handler is None: 

1823 raise RedisError(f"Pattern '{pattern}' has no handler registered") 

1824 

1825 def _get_all_primary_nodes(self): 

1826 """Get all primary nodes in the cluster.""" 

1827 return self.cluster.get_primaries() 

1828 

1829 def _cleanup_node(self, node_name: str) -> None: 

1830 """Remove and close a node's PubSub. 

1831 

1832 Closing the ``PubSub`` disconnects its connection so it is not 

1833 left in a subscribed state inside the connection pool. 

1834 """ 

1835 pubsub = self._node_pubsubs.pop(node_name, None) 

1836 if pubsub: 

1837 try: 

1838 pubsub.close() 

1839 except Exception: 

1840 pass 

1841 

1842 def _ensure_node_pubsub(self, node) -> Any: 

1843 """Get or create a PubSub instance for a node.""" 

1844 if node.name not in self._node_pubsubs: 

1845 redis_conn = self.cluster.get_redis_connection(node) 

1846 # Always create PubSub with ignore_subscribe_messages=False 

1847 # so that the per-call argument in get_message() can control 

1848 # the behavior reliably 

1849 pubsub = redis_conn.pubsub(ignore_subscribe_messages=False) 

1850 self._node_pubsubs[node.name] = pubsub 

1851 return self._node_pubsubs[node.name] 

1852 

1853 def _execute_subscribe( 

1854 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

1855 ) -> None: 

1856 """Execute subscribe on all cluster nodes. 

1857 

1858 Patterns and exact channels are subscribed in a single pass over 

1859 nodes so that a mid-batch node failure cannot create a 

1860 partially-caught-up replacement. If a node fails during this 

1861 call it is removed from ``_node_pubsubs`` and will be fully 

1862 re-subscribed on the next ``refresh_subscriptions`` cycle. 

1863 

1864 If a newly discovered node is encountered (not yet in 

1865 ``_node_pubsubs``), it is also subscribed to all *previously* 

1866 tracked patterns/channels so it doesn't miss notifications for 

1867 subscriptions that were established before this node joined. 

1868 """ 

1869 if not patterns and not exact_channels: 

1870 return 

1871 

1872 failed_nodes: list[str] = [] 

1873 for node in self._get_all_primary_nodes(): 

1874 is_new_node = node.name not in self._node_pubsubs 

1875 pubsub = self._ensure_node_pubsub(node) 

1876 try: 

1877 # If this is a brand-new node, catch it up on existing 

1878 # subscriptions before adding the new channels. 

1879 if is_new_node: 

1880 if self._subscribed_patterns: 

1881 pubsub.psubscribe(**self._subscribed_patterns) 

1882 if self._subscribed_channels: 

1883 pubsub.subscribe(**self._subscribed_channels) 

1884 

1885 if patterns: 

1886 pubsub.psubscribe(**patterns) 

1887 if exact_channels: 

1888 pubsub.subscribe(**exact_channels) 

1889 except Exception: 

1890 # Remove the broken pubsub so refresh_subscriptions can 

1891 # re-create it later. 

1892 self._cleanup_node(node.name) 

1893 failed_nodes.append(node.name) 

1894 

1895 if failed_nodes: 

1896 logger.warning( 

1897 "Failed to subscribe on cluster nodes: %s. " 

1898 "These nodes will be retried on the next refresh cycle.", 

1899 ", ".join(failed_nodes), 

1900 ) 

1901 

1902 def _execute_unsubscribe( 

1903 self, patterns: list[str], exact_channels: list[str] 

1904 ) -> None: 

1905 """Execute unsubscribe on all cluster nodes.""" 

1906 if patterns: 

1907 self._unsubscribe_from_all_nodes(patterns, use_punsubscribe=True) 

1908 if exact_channels: 

1909 self._unsubscribe_from_all_nodes(exact_channels, use_punsubscribe=False) 

1910 

1911 def _unsubscribe_from_all_nodes(self, channels: list[str], use_punsubscribe: bool): 

1912 """Unsubscribe from patterns/channels on all nodes. 

1913 

1914 Best-effort: tries every node so that a single broken connection 

1915 does not prevent the remaining nodes from being unsubscribed. 

1916 Broken pubsubs are cleaned up; the tracking state is still removed 

1917 by the caller, so ``refresh_subscriptions`` will *not* re-subscribe 

1918 these channels on replacement nodes. 

1919 """ 

1920 failed_nodes: list[str] = [] 

1921 for node_name, pubsub in list(self._node_pubsubs.items()): 

1922 try: 

1923 if use_punsubscribe: 

1924 pubsub.punsubscribe(*channels) 

1925 else: 

1926 pubsub.unsubscribe(*channels) 

1927 except Exception: 

1928 self._cleanup_node(node_name) 

1929 failed_nodes.append(node_name) 

1930 

1931 if failed_nodes: 

1932 logger.warning( 

1933 "Failed to unsubscribe on cluster nodes: %s. " 

1934 "These nodes will be re-created on the next refresh cycle.", 

1935 ", ".join(failed_nodes), 

1936 ) 

1937 

1938 def get_message( 

1939 self, 

1940 ignore_subscribe_messages: bool | None = None, 

1941 timeout: float = 0.0, 

1942 ) -> KeyNotification | None: 

1943 """ 

1944 Get the next keyspace notification if one is available. 

1945 

1946 This method polls all node pubsubs in round-robin fashion until 

1947 a message is received or the timeout expires. 

1948 If a connection error occurs, subscriptions are automatically refreshed. 

1949 

1950 Args: 

1951 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages. 

1952 Defaults to the value set in __init__ (True). 

1953 timeout: Total time to wait for a message (distributed across all nodes) 

1954 

1955 Returns: 

1956 A KeyNotification if a notification is available, None otherwise. 

1957 """ 

1958 if self._closed: 

1959 return None 

1960 

1961 total_nodes = len(self._node_pubsubs) 

1962 if total_nodes == 0: 

1963 # Sleep for the requested timeout so callers that loop 

1964 # (run_in_thread, listen) don't spin the CPU when all node 

1965 # connections have been cleaned up. 

1966 if timeout > 0: 

1967 time.sleep(timeout) 

1968 return None 

1969 

1970 # Use instance default if not specified 

1971 if ignore_subscribe_messages is None: 

1972 ignore_subscribe_messages = self.ignore_subscribe_messages 

1973 

1974 # Handle timeout=0 as a single non-blocking poll over all pubsubs 

1975 # This matches the expected semantics of PubSub.get_message(timeout=0) 

1976 if timeout == 0.0: 

1977 return self._poll_all_nodes_once(ignore_subscribe_messages) 

1978 

1979 # Calculate per-node timeout for each poll 

1980 # Use a small timeout per node to allow round-robin polling 

1981 per_node_timeout = min(0.1, timeout / max(total_nodes, 1)) 

1982 

1983 start_time = time.monotonic() 

1984 end_time = start_time + timeout 

1985 

1986 while True: 

1987 # Check if we've exceeded the total timeout 

1988 if time.monotonic() >= end_time: 

1989 return None 

1990 

1991 pubsubs = list(self._node_pubsubs.values()) 

1992 if not pubsubs: 

1993 return None 

1994 

1995 # Round-robin polling 

1996 self._poll_index = self._poll_index % len(pubsubs) 

1997 pubsub = pubsubs[self._poll_index] 

1998 self._poll_index += 1 

1999 

2000 try: 

2001 message = pubsub.get_message( 

2002 ignore_subscribe_messages=ignore_subscribe_messages, 

2003 timeout=per_node_timeout, 

2004 ) 

2005 except (ConnectionError, TimeoutError, RedisError): 

2006 # Connection error - refresh subscriptions and continue 

2007 self._refresh_subscriptions_on_error() 

2008 continue 

2009 

2010 if message is not None: 

2011 # Note: If a handler was registered, PubSub already invoked it 

2012 # and returned None, so we only reach here for handler-less subscriptions 

2013 notification = KeyNotification.from_message( 

2014 message, key_prefix=self.key_prefix 

2015 ) 

2016 if notification is not None: 

2017 return notification 

2018 # If not a keyspace notification, continue checking other nodes 

2019 

2020 def _poll_all_nodes_once( 

2021 self, ignore_subscribe_messages: bool 

2022 ) -> KeyNotification | None: 

2023 """ 

2024 Perform a single non-blocking poll over all node pubsubs. 

2025 

2026 This is used when timeout=0 to match the expected semantics of 

2027 PubSub.get_message(timeout=0) - a non-blocking check for messages. 

2028 

2029 Returns: 

2030 A KeyNotification if one is available, None otherwise. 

2031 """ 

2032 had_error = False 

2033 for pubsub in list(self._node_pubsubs.values()): 

2034 try: 

2035 message = pubsub.get_message( 

2036 ignore_subscribe_messages=ignore_subscribe_messages, 

2037 timeout=0.0, 

2038 ) 

2039 except (ConnectionError, TimeoutError, RedisError): 

2040 # Record the error but continue polling remaining healthy 

2041 # nodes so that already-buffered notifications are not lost. 

2042 had_error = True 

2043 continue 

2044 

2045 if message is not None: 

2046 # Note: If a handler was registered, PubSub already invoked it 

2047 # and returned None, so we only reach here for handler-less subscriptions 

2048 notification = KeyNotification.from_message( 

2049 message, key_prefix=self.key_prefix 

2050 ) 

2051 if notification is not None: 

2052 # Refresh before returning if any node had an error, 

2053 # so the next poll cycle has fresh state. 

2054 if had_error: 

2055 self._refresh_subscriptions_on_error() 

2056 return notification 

2057 

2058 # Refresh after polling all nodes if any had errors 

2059 if had_error: 

2060 self._refresh_subscriptions_on_error() 

2061 return None 

2062 

2063 def listen(self): 

2064 """ 

2065 Listen for keyspace notifications from all cluster nodes. 

2066 

2067 This is a generator that yields KeyNotification objects as they arrive. 

2068 It blocks until a notification is received. 

2069 

2070 Yields: 

2071 KeyNotification objects for each keyspace/keyevent notification. 

2072 

2073 Example: 

2074 >>> for notification in ksn.listen(): 

2075 ... print(f"{notification.key}: {notification.event_type}") 

2076 """ 

2077 while self.subscribed: 

2078 notification = self.get_message(timeout=1.0) 

2079 if notification is not None: 

2080 yield notification 

2081 

2082 def _refresh_subscriptions_on_error(self): 

2083 """ 

2084 Refresh subscriptions after a connection error. 

2085 

2086 This is called automatically when a connection error occurs during 

2087 get_message(). It checks if nodes changed before refreshing. 

2088 """ 

2089 self._poll_index = 0 # Reset round-robin index 

2090 

2091 try: 

2092 self.refresh_subscriptions() 

2093 except Exception: 

2094 logger.warning( 

2095 "Failed to refresh cluster subscriptions, will retry on next error", 

2096 exc_info=True, 

2097 ) 

2098 

2099 def _is_pubsub_connected(self, pubsub) -> bool: 

2100 """Check if a pubsub connection is still alive.""" 

2101 try: 

2102 conn = pubsub.connection 

2103 if conn is None: 

2104 return False 

2105 return conn.is_connected 

2106 except Exception: 

2107 return False 

2108 

2109 def refresh_subscriptions(self): 

2110 """ 

2111 Refresh subscriptions after a topology change. 

2112 

2113 This method is called automatically when topology changes are detected 

2114 or when connection errors occur. You can also call it manually if needed. 

2115 

2116 This method: 

2117 1. Discovers any new primary nodes and subscribes them 

2118 2. Removes pubsubs for nodes that are no longer primaries 

2119 3. Re-creates broken pubsub connections for existing nodes 

2120 """ 

2121 with self._refresh_lock: 

2122 current_primaries = { 

2123 node.name: node for node in self._get_all_primary_nodes() 

2124 } 

2125 

2126 # Remove pubsubs for nodes that are no longer primaries 

2127 removed_nodes = set(self._node_pubsubs.keys()) - set( 

2128 current_primaries.keys() 

2129 ) 

2130 for node_name in removed_nodes: 

2131 self._cleanup_node(node_name) 

2132 

2133 # Detect broken connections for existing nodes and remove them 

2134 # so they get re-created below 

2135 existing_nodes = set(self._node_pubsubs.keys()) & set( 

2136 current_primaries.keys() 

2137 ) 

2138 for node_name in existing_nodes: 

2139 pubsub = self._node_pubsubs.get(node_name) 

2140 if pubsub and not self._is_pubsub_connected(pubsub): 

2141 # Connection is broken, remove it so it gets re-created 

2142 self._cleanup_node(node_name) 

2143 

2144 # Subscribe new nodes (and nodes with broken connections) to existing 

2145 # patterns/channels 

2146 new_nodes = set(current_primaries.keys()) - set(self._node_pubsubs.keys()) 

2147 failed_nodes: list[str] = [] 

2148 for node_name in new_nodes: 

2149 node = current_primaries[node_name] 

2150 pubsub = self._ensure_node_pubsub(node) 

2151 

2152 try: 

2153 if self._subscribed_patterns: 

2154 pubsub.psubscribe(**self._subscribed_patterns) 

2155 if self._subscribed_channels: 

2156 pubsub.subscribe(**self._subscribed_channels) 

2157 except Exception: 

2158 # Subscription failed - remove from dict so retry is possible 

2159 self._cleanup_node(node_name) 

2160 failed_nodes.append(node_name) 

2161 

2162 # Raise after attempting all nodes so we don't skip any 

2163 if failed_nodes: 

2164 raise ConnectionError( 

2165 f"Failed to subscribe to cluster nodes: {', '.join(failed_nodes)}" 

2166 ) 

2167 

2168 def close(self): 

2169 """Close all pubsub connections and clean up resources.""" 

2170 self._closed = True 

2171 for node_name in list(self._node_pubsubs.keys()): 

2172 self._cleanup_node(node_name) 

2173 self._subscribed_patterns.clear() 

2174 self._subscribed_channels.clear()