Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/keyspace_notifications.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

590 statements  

1""" 

2Redis Keyspace Notifications support for redis-py. 

3 

4This module provides utilities for subscribing to and parsing Redis keyspace 

5notifications. Keyspace notifications allow clients to receive events when 

6keys are modified in Redis. 

7 

8Note: Keyspace notifications must be enabled on the Redis server via the 

9``notify-keyspace-events`` configuration option. This is a server-side 

10configuration that should be done by your infrastructure/operations team. 

11See the Redis documentation for details: 

12https://redis.io/docs/latest/develop/pubsub/keyspace-notifications/ 

13 

14Standalone Redis Example: 

15 >>> from redis import Redis 

16 >>> from redis.keyspace_notifications import ( 

17 ... KeyspaceNotifications, 

18 ... KeyspaceChannel, 

19 ... EventType, 

20 ... ) 

21 >>> 

22 >>> r = Redis() 

23 >>> # Server must have notify-keyspace-events configured (e.g., "KEA") 

24 >>> ksn = KeyspaceNotifications(r) 

25 >>> 

26 >>> # Subscribe using Channel class (patterns auto-detected) 

27 >>> channel = KeyspaceChannel("user:*") 

28 >>> ksn.subscribe(channel) 

29 >>> 

30 >>> # Or use convenience methods for specific event types 

31 >>> ksn.subscribe_keyevent(EventType.SET) 

32 >>> 

33 >>> for notification in ksn.listen(): 

34 ... print(f"Key: {notification.key}, Event: {notification.event_type}") 

35 

36Redis Cluster Example: 

37 >>> from redis.cluster import RedisCluster 

38 >>> from redis.keyspace_notifications import ( 

39 ... ClusterKeyspaceNotifications, 

40 ... KeyspaceChannel, 

41 ... EventType, 

42 ... ) 

43 >>> 

44 >>> rc = RedisCluster(host="localhost", port=7000) 

45 >>> # Server must have notify-keyspace-events configured (e.g., "KEA") 

46 >>> ksn = ClusterKeyspaceNotifications(rc) 

47 >>> 

48 >>> # Subscribe using Channel class (patterns auto-detected) 

49 >>> channel = KeyspaceChannel("user:*") 

50 >>> ksn.subscribe(channel) 

51 >>> 

52 >>> # Or use convenience methods for specific event types 

53 >>> ksn.subscribe_keyevent(EventType.SET) 

54 >>> 

55 >>> for notification in ksn.listen(): 

56 ... print(f"Key: {notification.key}, Event: {notification.event_type}") 

57""" 

58 

59from __future__ import annotations 

60 

61import logging 

62import re 

63import threading 

64import time 

65from abc import ABC, abstractmethod 

66from collections.abc import Callable 

67from dataclasses import dataclass 

68from enum import Enum 

69from typing import TYPE_CHECKING, Any, ClassVar, Union 

70 

71from redis.client import Redis 

72from redis.cluster import RedisCluster 

73from redis.exceptions import ( 

74 ConnectionError, 

75 RedisError, 

76 TimeoutError, 

77) 

78from redis.utils import safe_str 

79 

80logger = logging.getLogger(__name__) 

81 

if TYPE_CHECKING:
    # Imported for static type checkers only; it is not needed at runtime
    # because this module uses ``from __future__ import annotations``.
    from typing import TypeAlias

# Anything accepted as a channel argument: a raw channel name (str or bytes)
# or one of the channel helper classes defined later in this module. The
# class names are spelled as strings because they are forward references.
ChannelT: TypeAlias = Union[str, bytes, "KeyspaceChannel", "KeyeventChannel"]

# Signature of a synchronous notification handler: it is called with the
# parsed KeyNotification and returns nothing.
SyncHandlerT = Callable[["KeyNotification"], None]

92 

93 

94# ============================================================================= 

95# Event Type Constants 

96# ============================================================================= 

97# These are common Redis keyspace notification event types provided for 

98# convenience. You can use any string as an event type - these constants 

99# are not exhaustive and Redis may add new events in future versions. 

100 

101 

class EventType:
    """
    Namespace of commonly used keyspace notification event names.

    Each attribute is a plain string holding the event name. The list is a
    convenience (e.g. for IDE autocomplete) and is not exhaustive: any
    string can be used wherever an event type is expected, so events that
    are not listed here work without changes to this class.
    """

    # --- String commands ---
    SET = "set"
    SETEX = "setex"
    SETNX = "setnx"
    SETRANGE = "setrange"
    INCR = "incr"
    INCRBY = "incrby"
    INCRBYFLOAT = "incrbyfloat"
    DECR = "decr"
    DECRBY = "decrby"
    APPEND = "append"

    # --- Generic commands ---
    DEL = "del"
    UNLINK = "unlink"
    RENAME = "rename"
    RENAME_FROM = "rename_from"
    RENAME_TO = "rename_to"
    COPY_TO = "copy_to"
    MOVE = "move"
    RESTORE = "restore"

    # --- Expiration events ---
    EXPIRE = "expire"
    EXPIREAT = "expireat"
    PEXPIRE = "pexpire"
    PEXPIREAT = "pexpireat"
    EXPIRED = "expired"
    PERSIST = "persist"

    # --- Eviction events ---
    EVICTED = "evicted"

    # --- List commands ---
    LPUSH = "lpush"
    RPUSH = "rpush"
    LPOP = "lpop"
    RPOP = "rpop"
    LINSERT = "linsert"
    LSET = "lset"
    LTRIM = "ltrim"
    LMOVE = "lmove"
    BLPOP = "blpop"
    BRPOP = "brpop"
    BLMOVE = "blmove"

    # --- Set commands ---
    SADD = "sadd"
    SREM = "srem"
    SPOP = "spop"
    SMOVE = "smove"
    SINTERSTORE = "sinterstore"
    SUNIONSTORE = "sunionstore"
    SDIFFSTORE = "sdiffstore"

    # --- Sorted set commands ---
    ZADD = "zadd"
    ZINCRBY = "zincrby"
    ZREM = "zrem"
    ZREMRANGEBYRANK = "zremrangebyrank"
    ZREMRANGEBYSCORE = "zremrangebyscore"
    ZREMRANGEBYLEX = "zremrangebylex"
    ZPOPMIN = "zpopmin"
    ZPOPMAX = "zpopmax"
    BZPOPMIN = "bzpopmin"
    BZPOPMAX = "bzpopmax"
    ZINTERSTORE = "zinterstore"
    ZUNIONSTORE = "zunionstore"
    ZDIFFSTORE = "zdiffstore"
    ZRANGESTORE = "zrangestore"

    # --- Hash commands ---
    HSET = "hset"
    HSETNX = "hsetnx"
    HDEL = "hdel"
    HINCRBY = "hincrby"
    HINCRBYFLOAT = "hincrbyfloat"

    # --- Stream commands ---
    XADD = "xadd"
    XTRIM = "xtrim"
    XDEL = "xdel"
    XGROUP_CREATE = "xgroup-create"
    XGROUP_CREATECONSUMER = "xgroup-createconsumer"
    XGROUP_DELCONSUMER = "xgroup-delconsumer"
    XGROUP_DESTROY = "xgroup-destroy"
    XGROUP_SETID = "xgroup-setid"
    XSETID = "xsetid"
    XCLAIM = "xclaim"
    XAUTOCLAIM = "xautoclaim"
    XREADGROUP = "xreadgroup"

    # --- Other ---
    NEW = "new"  # Key created (when tracking new keys)
    SORTSTORE = "sortstore"
    GETEX = "getex"
    GETDEL = "getdel"
    SETIFGT = "setifgt"
    SETIFLT = "setiflt"
    SETIFEQ = "setifeq"
    SETIFNE = "setifne"

212 

213 

@dataclass
class KeyNotification:
    """
    A parsed Redis keyspace or keyevent notification.

    Attributes:
        key: The affected key. For keyspace notifications it is taken from
            the channel name; for keyevent notifications it is the message
            data. If a ``key_prefix`` was supplied while parsing, the prefix
            has already been stripped.
        event_type: The event name (e.g. "set", "del") as a plain string, so
            it can be compared against EventType constants or any string.
        database: The database number from the channel name, or -1 when the
            channel carries ``*`` in the database position.
        channel: The channel name the notification was parsed from.
        is_keyspace: True for ``__keyspace@...`` channels, False for
            ``__keyevent@...`` channels.
    """

    # Channel layouts: __keyspace@<db>__:<key> and __keyevent@<db>__:<event>.
    # re.DOTALL together with \Z (instead of "$") makes the trailing group
    # capture everything up to the true end of the string. Without them a
    # key containing "\n" would not match at all, and a key ending in "\n"
    # would silently lose that newline.
    _KEYSPACE_PATTERN: ClassVar[re.Pattern] = re.compile(
        r"^__keyspace@(\d+|\*)__:(.+)\Z", re.DOTALL
    )
    _KEYEVENT_PATTERN: ClassVar[re.Pattern] = re.compile(
        r"^__keyevent@(\d+|\*)__:(.+)\Z", re.DOTALL
    )

    key: str
    event_type: str
    database: int
    channel: str
    is_keyspace: bool

    @classmethod
    def from_message(
        cls,
        message: dict[str, Any] | None,
        key_prefix: str | bytes | None = None,
    ) -> KeyNotification | None:
        """
        Parse a pub/sub message dict into a KeyNotification.

        Args:
            message: A pub/sub message dict with 'type', 'channel' and
                'data' entries, or None.
            key_prefix: Optional prefix to filter on. When given, only keys
                starting with the prefix produce a notification, and the
                prefix is removed from the returned key.

        Returns:
            A KeyNotification, or None when ``message`` is None, its type is
            not 'message'/'pmessage', its channel or data is missing, the
            channel is not a keyspace/keyevent channel, or the key does not
            match ``key_prefix``.

        Example:
            >>> message = {
            ...     'type': 'pmessage',
            ...     'pattern': '__keyspace@0__:user:*',
            ...     'channel': '__keyspace@0__:user:123',
            ...     'data': 'set'
            ... }
            >>> notification = KeyNotification.from_message(message)
            >>> notification.key
            'user:123'
            >>> notification.event_type
            'set'
        """
        if message is None:
            return None

        # Only real deliveries are parsed; other message types (such as
        # subscribe confirmations) are ignored.
        if message.get("type") not in ("message", "pmessage"):
            return None

        channel = message.get("channel")
        data = message.get("data")
        if channel is None or data is None:
            return None

        return cls.try_parse(channel, data, key_prefix)

    @classmethod
    def try_parse(
        cls,
        channel: str | bytes,
        data: str | bytes,
        key_prefix: str | bytes | None = None,
    ) -> KeyNotification | None:
        """
        Parse a channel name and its message data into a KeyNotification.

        Lower-level entry point for callers that already have the channel
        and data separately (e.g. callback-based handlers).

        Args:
            channel: The channel name (e.g. "__keyspace@0__:mykey").
            data: The message data (event type for keyspace channels, key
                for keyevent channels).
            key_prefix: Optional prefix to filter and strip from keys.

        Returns:
            A KeyNotification if the channel is a keyspace/keyevent channel
            and the key passes the prefix filter, None otherwise.
        """
        return cls._parse(safe_str(channel), safe_str(data), key_prefix)

    @staticmethod
    def _strip_prefix(key: str, key_prefix: str | None) -> str | None:
        """Return ``key`` without ``key_prefix``, or None if it doesn't match.

        A falsy prefix disables filtering and returns ``key`` unchanged.
        """
        if not key_prefix:
            return key
        if not key.startswith(key_prefix):
            return None
        return key[len(key_prefix) :]

    @classmethod
    def _parse(
        cls,
        channel: str,
        data: str,
        key_prefix: str | bytes | None = None,
    ) -> KeyNotification | None:
        """Parse already-normalized ``channel``/``data`` strings."""
        # An empty prefix is treated the same as no prefix.
        key_prefix = safe_str(key_prefix) if key_prefix else None

        match = cls._KEYSPACE_PATTERN.match(channel)
        if match:
            # Keyspace: the key is in the channel, the event is the data.
            is_keyspace = True
            db_str, key = match.groups()
            event_type = data
        else:
            match = cls._KEYEVENT_PATTERN.match(channel)
            if not match:
                return None
            # Keyevent: the event is in the channel, the key is the data.
            is_keyspace = False
            db_str, event_type = match.groups()
            key = data

        key = cls._strip_prefix(key, key_prefix)
        if key is None:
            return None

        return cls(
            key=key,
            event_type=event_type,
            # "*" in the database position has no number; report it as -1.
            database=int(db_str) if db_str != "*" else -1,
            channel=channel,
            is_keyspace=is_keyspace,
        )

    def key_starts_with(self, prefix: str | bytes) -> bool:
        """Return True if the notification's key starts with ``prefix``."""
        return self.key.startswith(safe_str(prefix))

381 

382 

383# ============================================================================= 

384# Channel Classes 

385# ============================================================================= 

386 

387 

class KeyspaceChannel:
    """
    A keyspace notification channel (``__keyspace@<db>__:<key-or-pattern>``).

    On a keyspace channel the published message is the event type
    (e.g. "set", "del") for a key matching the channel.

    ``str()`` of an instance yields the channel string, so the object can be
    handed to subscribe()/psubscribe() directly. The channel string is built
    once in ``__init__``; equality and hashing are based on it.

    Attributes:
        key_or_pattern: The key or pattern to monitor (use '*' for wildcards)
        db: The database number (defaults to 0, the only database in Redis Cluster)
        is_pattern: Whether ``key_or_pattern`` contains wildcards

    Examples:
        >>> channel = KeyspaceChannel("user:123", db=0)
        >>> str(channel)
        '__keyspace@0__:user:123'

        >>> # Pattern subscription (wildcards are auto-detected)
        >>> channel = KeyspaceChannel("user:*", db=0)
        >>> str(channel)
        '__keyspace@0__:user:*'

        >>> # Use with KeyspaceNotifications
        >>> notifications = KeyspaceNotifications(redis_client)
        >>> notifications.subscribe(channel)
    """

    PREFIX: ClassVar[str] = "__keyspace@"

    def __init__(self, key_or_pattern: str, db: int = 0):
        """
        Build the channel for ``key_or_pattern`` in database ``db``.

        Args:
            key_or_pattern: The key or pattern to monitor. Use '*' for wildcards.
            db: The database number. Defaults to 0 (the only database in Redis Cluster).
        """
        self.key_or_pattern = key_or_pattern
        self.db = db
        # Computed once here; __str__, __eq__ and __hash__ all use this value.
        self._channel_str = self._build_channel_string()

    def _build_channel_string(self) -> str:
        """Assemble ``<PREFIX><db>__:<key_or_pattern>``."""
        db_part = f"{self.PREFIX}{self.db}__"
        return f"{db_part}:{self.key_or_pattern}"

    @property
    def is_pattern(self) -> bool:
        """True when the key part has wildcards, i.e. psubscribe is needed."""
        return _is_pattern(self.key_or_pattern)

    def __str__(self) -> str:
        return self._channel_str

    def __repr__(self) -> str:
        return "KeyspaceChannel({!r}, db={})".format(self.key_or_pattern, self.db)

    def __eq__(self, other: object) -> bool:
        # Comparable with another KeyspaceChannel or with a raw channel
        # string; anything else is left to the other operand.
        if isinstance(other, KeyspaceChannel):
            other = other._channel_str
        elif not isinstance(other, str):
            return NotImplemented
        return self._channel_str == other

    def __hash__(self) -> int:
        # Same hash as the channel string, matching the str equality above.
        return hash(self._channel_str)

455 

456 

class KeyeventChannel:
    """
    A keyevent notification channel (``__keyevent@<db>__:<event>``).

    On a keyevent channel the published message is the name of the key on
    which the given event type occurred.

    ``str()`` of an instance yields the channel string, so the object can be
    handed to subscribe()/psubscribe() directly. The channel string is built
    once in ``__init__``; equality and hashing are based on it.

    Attributes:
        event: The event type to monitor
        db: The database number (defaults to 0, the only database in Redis Cluster)
        is_pattern: Whether ``event`` contains wildcards

    Examples:
        >>> channel = KeyeventChannel(EventType.SET, db=0)
        >>> str(channel)
        '__keyevent@0__:set'

        >>> channel = KeyeventChannel.all_events(db=0)
        >>> str(channel)
        '__keyevent@0__:*'

        >>> # Use with KeyspaceNotifications
        >>> notifications = KeyspaceNotifications(redis_client)
        >>> notifications.subscribe(channel)
    """

    PREFIX: ClassVar[str] = "__keyevent@"

    def __init__(self, event: str, db: int = 0):
        """
        Build the channel for ``event`` in database ``db``.

        Args:
            event: The event type to monitor (e.g., EventType.SET or "set")
            db: The database number. Defaults to 0 (the only database in Redis Cluster).
        """
        self.event = event
        self.db = db
        # Computed once here; __str__, __eq__ and __hash__ all use this value.
        self._channel_str = self._build_channel_string()

    def _build_channel_string(self) -> str:
        """Assemble ``<PREFIX><db>__:<event>``."""
        db_part = f"{self.PREFIX}{self.db}__"
        return f"{db_part}:{self.event}"

    @property
    def is_pattern(self) -> bool:
        """True when the event part has wildcards, i.e. psubscribe is needed."""
        return _is_pattern(self.event)

    @classmethod
    def all_events(cls, db: int = 0) -> "KeyeventChannel":
        """
        Alternate constructor: a channel matching every event type.

        Equivalent to ``KeyeventChannel("*", db=db)``.

        Args:
            db: The database number. Defaults to 0 (the only database in Redis Cluster).

        Returns:
            A KeyeventChannel whose event part is the ``*`` wildcard.

        Examples:
            >>> channel = KeyeventChannel.all_events()
            >>> str(channel)
            '__keyevent@0__:*'
        """
        wildcard = "*"
        return cls(wildcard, db=db)

    def __str__(self) -> str:
        return self._channel_str

    def __repr__(self) -> str:
        return "KeyeventChannel({!r}, db={})".format(self.event, self.db)

    def __eq__(self, other: object) -> bool:
        # Comparable with another KeyeventChannel or with a raw channel
        # string; anything else is left to the other operand.
        if isinstance(other, KeyeventChannel):
            other = other._channel_str
        elif not isinstance(other, str):
            return NotImplemented
        return self._channel_str == other

    def __hash__(self) -> int:
        # Same hash as the channel string, matching the str equality above.
        return hash(self._channel_str)

543 

544 

class ChannelType(Enum):
    """
    The two kinds of Redis keyspace notification channel.

    - KEYSPACE: events on specific keys. The channel format is
      `__keyspace@{db}__:{key}` and the message data contains the event type.
    - KEYEVENT: specific event types. The channel format is
      `__keyevent@{db}__:{event}` and the message data contains the key name.

    Examples:
        >>> get_channel_type("__keyspace@0__:mykey") is ChannelType.KEYSPACE
        True
        >>> get_channel_type("__keyevent@0__:set") is ChannelType.KEYEVENT
        True
        >>> get_channel_type("regular_channel") is None
        True
    """

    KEYSPACE = "keyspace"
    KEYEVENT = "keyevent"

566 

567 

def get_channel_type(channel: str | bytes) -> ChannelType | None:
    """
    Classify a channel name as keyspace, keyevent, or neither.

    The decision is made purely on the channel's prefix
    (``KeyspaceChannel.PREFIX`` / ``KeyeventChannel.PREFIX``); the rest of
    the name is not validated.

    Args:
        channel: The channel name to check (string or bytes).

    Returns:
        ChannelType.KEYSPACE for a keyspace notification channel,
        ChannelType.KEYEVENT for a keyevent notification channel,
        None for any other channel.

    Examples:
        >>> get_channel_type("__keyspace@0__:mykey") is ChannelType.KEYSPACE
        True
        >>> get_channel_type("__keyevent@0__:set") is ChannelType.KEYEVENT
        True
        >>> get_channel_type("regular_channel") is None
        True
        >>> get_channel_type(b"__keyspace@0__:mykey") is ChannelType.KEYSPACE
        True
    """
    name = safe_str(channel)
    # Checked in order; the first matching prefix decides the type.
    prefix_table = (
        (KeyspaceChannel.PREFIX, ChannelType.KEYSPACE),
        (KeyeventChannel.PREFIX, ChannelType.KEYEVENT),
    )
    for prefix, channel_type in prefix_table:
        if name.startswith(prefix):
            return channel_type
    return None

596 

597 

def _is_pattern(
    channel: str | bytes | KeyspaceChannel | KeyeventChannel,
) -> bool:
    """
    Check if a channel string contains glob-style pattern characters.

    Redis uses glob-style patterns for psubscribe:
    - * matches any sequence of characters
    - ? matches any single character
    - [...] matches any character in the brackets

    A backslash escapes the character that follows it, so escaped
    characters never count as pattern characters.

    Args:
        channel: The channel string to check. Can be a string, bytes,
            or a KeyspaceChannel/KeyeventChannel object.

    Returns:
        True if the channel contains an unescaped ``*`` or ``?``, or an
        unescaped ``[`` that is followed somewhere later by an unescaped
        ``]``; False otherwise.
    """
    # Channel objects (KeyspaceChannel, KeyeventChannel) carry their full
    # channel string in _channel_str.
    if hasattr(channel, "_channel_str"):
        channel = channel._channel_str
    text = safe_str(channel)

    # Single left-to-right pass. "[" only counts as a pattern character when
    # an unescaped "]" appears after it, so remember that an opening bracket
    # was seen and report a pattern on the first later "]". This gives the
    # same answer as scanning ahead from every "[", but stays linear even
    # for inputs with many unmatched "[" characters.
    seen_open_bracket = False
    pos = 0
    length = len(text)
    while pos < length:
        char = text[pos]
        if char == "\\":
            # Skip the backslash and the character it escapes.
            pos += 2
            continue
        if char in ("*", "?"):
            return True
        if char == "[":
            seen_open_bracket = True
        elif char == "]" and seen_open_bracket:
            return True
        pos += 1
    # Any "[" seen without a closing "]" (e.g. a key named "my[key") is
    # treated as a literal.
    return False

648 

649 

650# ============================================================================= 

651# Abstract Base Class for Keyspace Notifications 

652# ============================================================================= 

653 

654 

class KeyspaceNotificationsInterface(ABC):
    """
    Contract implemented by every keyspace notification manager.

    Both the standalone (KeyspaceNotifications) and the cluster
    (ClusterKeyspaceNotifications) managers expose this API, so calling code
    can be written once and used with either deployment type.

    Every member below is abstract; the class cannot be instantiated.
    """

    @abstractmethod
    def subscribe(
        self,
        *channels: ChannelT,
        handler: SyncHandlerT | None = None,
    ):
        """Subscribe to keyspace notification channels."""

    @abstractmethod
    def unsubscribe(self, *channels: ChannelT):
        """Unsubscribe from keyspace notification channels."""

    @abstractmethod
    def subscribe_keyspace(
        self,
        key_or_pattern: str,
        db: int = 0,
        handler: SyncHandlerT | None = None,
    ):
        """Subscribe to keyspace notifications for specific keys."""

    @abstractmethod
    def subscribe_keyevent(
        self,
        event: str,
        db: int = 0,
        handler: SyncHandlerT | None = None,
    ):
        """Subscribe to keyevent notifications for specific event types."""

    @abstractmethod
    def get_message(
        self,
        ignore_subscribe_messages: bool | None = None,
        timeout: float = 0.0,
    ) -> KeyNotification | None:
        """Get the next keyspace notification if one is available."""

    @abstractmethod
    def listen(self):
        """Listen for keyspace notifications."""

    @abstractmethod
    def close(self):
        """Close the notification manager and clean up resources."""

    @abstractmethod
    def __enter__(self):
        ...

    @abstractmethod
    def __exit__(self, _exc_type, _exc_val, _exc_tb):
        ...

    @property
    @abstractmethod
    def subscribed(self) -> bool:
        """Check if there are any active subscriptions and not closed."""

    @abstractmethod
    def run_in_thread(
        self,
        poll_timeout: float = 0.0,
        daemon: bool = False,
        exception_handler: Callable[
            [
                BaseException,
                KeyspaceNotificationsInterface,
                KeyspaceWorkerThread,
            ],
            None,
        ]
        | None = None,
    ) -> KeyspaceWorkerThread:
        """Start a background thread that polls for notifications."""

748 

749 

750class AbstractKeyspaceNotifications(KeyspaceNotificationsInterface): 

751 """ 

752 Abstract base class for keyspace notification managers. 

753 

754 Provides shared implementation for subscribe/unsubscribe logic. 

755 Subclasses must implement: 

756 - _execute_subscribe: Execute the subscribe operation 

757 - _execute_unsubscribe: Execute the unsubscribe operation 

758 - get_message: Get the next notification 

759 - listen: Generator for notifications 

760 - close: Clean up resources 

761 """ 

762 

763 def __init__( 

764 self, 

765 key_prefix: str | bytes | None = None, 

766 ignore_subscribe_messages: bool = True, 

767 ): 

768 """ 

769 Initialize the base keyspace notification manager. 

770 

771 Args: 

772 key_prefix: Optional prefix to filter and strip from keys in notifications 

773 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

774 are not returned by get_message/listen 

775 """ 

776 self.key_prefix = key_prefix 

777 self.ignore_subscribe_messages = ignore_subscribe_messages 

778 self._closed = False 

779 

780 def subscribe( 

781 self, 

782 *channels: ChannelT, 

783 handler: SyncHandlerT | None = None, 

784 ): 

785 """ 

786 Subscribe to keyspace notification channels. 

787 

788 Automatically detects whether each channel is a pattern (contains 

789 wildcards like *, ?, [) or an exact channel name and uses the 

790 appropriate Redis subscribe command internally. 

791 

792 Args: 

793 *channels: Channels to subscribe to. Can be strings, KeyspaceChannel, 

794 or KeyeventChannel objects. Patterns are auto-detected. 

795 handler: Optional callback function that receives KeyNotification 

796 objects. If provided, notifications are passed to the handler 

797 instead of being returned by get_message()/listen(). 

798 """ 

799 # Wrap the handler to convert raw messages to KeyNotification objects 

800 wrapped_handler: Callable | None = None 

801 if handler is not None: 

802 # Capture key_prefix in closure for consistent filtering/stripping 

803 key_prefix = self.key_prefix 

804 

805 def _wrap_handler(message): 

806 notification = KeyNotification.from_message( 

807 message, key_prefix=key_prefix 

808 ) 

809 if notification is not None: 

810 handler(notification) 

811 

812 wrapped_handler = _wrap_handler 

813 

814 patterns = {} 

815 exact_channels = {} 

816 

817 for channel in channels: 

818 if hasattr(channel, "_channel_str"): 

819 channel_str = str(channel) 

820 else: 

821 channel_str = safe_str(channel) 

822 if _is_pattern(channel): 

823 patterns[channel_str] = wrapped_handler 

824 else: 

825 exact_channels[channel_str] = wrapped_handler 

826 

827 # Delegate to subclass implementation first. For standalone Redis 

828 # this raises on failure, keeping tracking state clean. For cluster 

829 # implementations the operation is best-effort (partial failures are 

830 # logged, not raised) so tracking state is always updated afterwards. 

831 self._execute_subscribe(patterns, exact_channels) 

832 self._track_subscribe(patterns, exact_channels) 

833 

834 @abstractmethod 

835 def _execute_subscribe( 

836 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

837 ) -> None: 

838 """ 

839 Execute the subscribe operation. 

840 

841 Args: 

842 patterns: Dict mapping pattern strings to handlers (for psubscribe) 

843 exact_channels: Dict mapping channel strings to handlers (for subscribe) 

844 """ 

845 pass 

846 

847 def unsubscribe(self, *channels: ChannelT): 

848 """ 

849 Unsubscribe from keyspace notification channels. 

850 

851 Automatically detects whether each channel is a pattern or exact 

852 channel and uses the appropriate Redis unsubscribe command. 

853 

854 Args: 

855 *channels: Channels to unsubscribe from. 

856 """ 

857 patterns = [] 

858 exact_channels = [] 

859 

860 for channel in channels: 

861 if hasattr(channel, "_channel_str"): 

862 channel_str = str(channel) 

863 else: 

864 channel_str = safe_str(channel) 

865 if _is_pattern(channel): 

866 patterns.append(channel_str) 

867 else: 

868 exact_channels.append(channel_str) 

869 

870 # Delegate to subclass implementation first. For standalone Redis 

871 # this raises on failure, keeping tracking state intact. For cluster 

872 # implementations the operation is best-effort (partial failures are 

873 # logged, not raised) so tracking state is always removed afterwards 

874 # — this is intentional: the user asked to unsubscribe, so 

875 # refresh_subscriptions should not re-subscribe these channels. 

876 self._execute_unsubscribe(patterns, exact_channels) 

877 self._untrack_subscribe(patterns, exact_channels) 

878 

879 @abstractmethod 

880 def _execute_unsubscribe( 

881 self, patterns: list[str], exact_channels: list[str] 

882 ) -> None: 

883 """ 

884 Execute the unsubscribe operation. 

885 

886 Args: 

887 patterns: List of pattern strings to punsubscribe from 

888 exact_channels: List of channel strings to unsubscribe from 

889 """ 

890 pass 

891 

892 def _track_subscribe( 

893 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

894 ) -> None: 

895 """Track newly subscribed patterns/channels. 

896 

897 Override in subclasses that need to maintain their own subscription 

898 registry (e.g. cluster implementations that must re-subscribe 

899 new/failed-over nodes). The default is a no-op because standalone 

900 implementations delegate tracking to the underlying PubSub object. 

901 """ 

902 

903 def _untrack_subscribe( 

904 self, patterns: list[str], exact_channels: list[str] 

905 ) -> None: 

906 """Remove patterns/channels from the subscription registry. 

907 

908 Override in subclasses that maintain their own subscription registry. 

909 The default is a no-op. 

910 """ 

911 

912 def subscribe_keyspace( 

913 self, 

914 key_or_pattern: str, 

915 db: int = 0, 

916 handler: SyncHandlerT | None = None, 

917 ): 

918 """ 

919 Subscribe to keyspace notifications for specific keys. 

920 

921 Args: 

922 key_or_pattern: The key or pattern to monitor. Use '*' for wildcards. 

923 db: The database number (default 0). 

924 handler: Optional callback for notifications. 

925 

926 Example: 

927 >>> ksn.subscribe_keyspace("user:123", db=0) 

928 >>> ksn.subscribe_keyspace("user:*", db=0) 

929 """ 

930 channel = KeyspaceChannel(key_or_pattern, db=db) 

931 self.subscribe(channel, handler=handler) 

932 

933 def subscribe_keyevent( 

934 self, 

935 event: str, 

936 db: int = 0, 

937 handler: SyncHandlerT | None = None, 

938 ): 

939 """ 

940 Subscribe to keyevent notifications for specific event types. 

941 

942 Args: 

943 event: The event type to monitor (e.g., EventType.SET or "set") 

944 db: The database number (default 0). 

945 handler: Optional callback for notifications. 

946 

947 Example: 

948 >>> ksn.subscribe_keyevent(EventType.SET) 

949 >>> ksn.subscribe_keyevent(EventType.EXPIRED, handler=my_handler) 

950 """ 

951 channel = KeyeventChannel(event, db=db) 

952 self.subscribe(channel, handler=handler) 

953 

954 def __enter__(self): 

955 return self 

956 

957 def __exit__(self, _exc_type, _exc_val, _exc_tb): 

958 self.close() 

959 return False 

960 

def run_in_thread(
    self,
    poll_timeout: float = 0.0,
    daemon: bool = False,
    exception_handler: Callable[
        [
            BaseException,
            KeyspaceNotificationsInterface,
            KeyspaceWorkerThread,
        ],
        None,
    ]
    | None = None,
) -> KeyspaceWorkerThread:
    """
    Spawn a worker thread that polls for notifications and runs handlers.

    The worker repeatedly calls get_message() on this object. Handlers
    registered for a channel/pattern are invoked as part of that call, so
    every subscription must already have a handler before this method is
    called; this is checked up front via _validate_all_handlers().

    Args:
        poll_timeout: Timeout in seconds passed to each get_message() call.
            Default 0.0 (non-blocking). WARNING: the default causes a CPU
            spin-loop. It is preferred to pass a positive value
            (e.g. 0.1 or 1.0).
        daemon: If True, the worker is a daemon thread and is terminated
            when the main program exits. Default False.
        exception_handler: Optional callback invoked when an exception
            occurs in the worker thread. Receives (exception,
            notifications, thread). If None, exceptions are raised.

    Returns:
        KeyspaceWorkerThread: The already-started worker. Call stop() on
        it to stop the thread and close the notifications.

    Raises:
        RedisError: If any subscription doesn't have a handler registered.

    Example:
        >>> def my_handler(notification):
        ...     print(f"Got: {notification.key} - {notification.event_type}")
        >>>
        >>> notifications.subscribe(KeyspaceChannel("user:*"), handler=my_handler)
        >>> thread = notifications.run_in_thread(poll_timeout=0.1, daemon=True)
        >>> # ... handlers are called automatically ...
        >>> thread.stop()
    """
    # Fail fast: refuse to start a worker that would silently drop messages
    # for handler-less subscriptions.
    self._validate_all_handlers()

    worker = KeyspaceWorkerThread(
        self,
        poll_timeout,
        daemon=daemon,
        exception_handler=exception_handler,
    )
    worker.start()
    return worker

1022 

@abstractmethod
def _validate_all_handlers(self) -> None:
    """Check that every active subscription has a handler.

    Concrete subclasses look at their own subscription bookkeeping and
    raise :class:`~redis.RedisError` when a subscription has no handler.
    This base implementation does nothing.
    """

1031 

1032 

class KeyspaceWorkerThread(threading.Thread):
    """
    Background thread for processing keyspace notifications.

    This thread continuously polls for notifications and invokes registered
    handlers. It works with both KeyspaceNotifications (standalone) and
    ClusterKeyspaceNotifications.

    Example:
        >>> thread = notifications.run_in_thread(poll_timeout=0.1)
        >>> # ... handlers are called automatically ...
        >>> thread.stop()
    """

    def __init__(
        self,
        notifications: KeyspaceNotificationsInterface,
        poll_timeout: float,
        daemon: bool = False,
        exception_handler: Callable[
            [
                BaseException,
                KeyspaceNotificationsInterface,
                KeyspaceWorkerThread,
            ],
            None,
        ]
        | None = None,
    ):
        """
        Prepare (but do not start) the worker.

        Args:
            notifications: Object whose get_message() is polled and whose
                close() is called when the worker finishes.
            poll_timeout: Timeout in seconds passed to every get_message() call.
            daemon: Whether the thread is a daemon thread.
            exception_handler: Optional callback receiving
                (exception, notifications, thread) for any exception raised
                while polling. If None, the exception propagates out of run().
        """
        super().__init__()
        self.daemon = daemon
        self.notifications = notifications
        self.poll_timeout = poll_timeout
        self.exception_handler = exception_handler
        # Set while the polling loop should keep going; cleared by stop().
        self._running = threading.Event()

    def run(self) -> None:
        """Main loop that polls for notifications and triggers handlers.

        The notifications object is closed when the loop ends for any reason:
        a stop() request, an exception with no exception_handler installed,
        or an exception raised by the exception_handler itself.
        """
        if self._running.is_set():
            # Already running; do not start a second polling loop.
            return
        self._running.set()
        notifications = self.notifications
        poll_timeout = self.poll_timeout
        try:
            while self._running.is_set():
                try:
                    notifications.get_message(
                        ignore_subscribe_messages=True, timeout=poll_timeout
                    )
                except BaseException as e:
                    if self.exception_handler is None:
                        raise
                    self.exception_handler(e, notifications, self)
        finally:
            # Without this, an exception escaping the loop would skip the
            # cleanup and leave the subscriptions' connections open.
            self._running.clear()
            notifications.close()

    def stop(self) -> None:
        """
        Stop the worker thread.

        This signals the thread to exit its run loop. The thread will close
        the notifications object before terminating.
        """
        self._running.clear()

1095 

1096 

1097# ============================================================================= 

1098# Standalone Keyspace Notification Manager 

1099# ============================================================================= 

1100 

1101 

class KeyspaceNotifications(AbstractKeyspaceNotifications):
    """
    Keyspace notification manager for a standalone Redis server.

    A standalone server needs only one PubSub connection. This class wraps
    that connection and provides:
    - Automatic pattern vs exact channel detection
    - KeyNotification parsing with optional key_prefix filtering
    - Convenience methods for keyspace and keyevent subscriptions
    - Context manager and run_in_thread support
    """

    def __init__(
        self,
        redis_client: Redis,
        key_prefix: str | bytes | None = None,
        ignore_subscribe_messages: bool = True,
    ):
        """
        Initialize the standalone keyspace notification manager.

        Note: Keyspace notifications must be enabled on the Redis server via
        the ``notify-keyspace-events`` configuration option. This is a
        server-side configuration that should be done by your
        infrastructure/operations team.

        Args:
            redis_client: A Redis client instance
            key_prefix: Optional prefix to filter and strip from keys in
                notifications
            ignore_subscribe_messages: If True, subscribe/unsubscribe
                confirmations are not returned by get_message/listen
        """
        super().__init__(key_prefix, ignore_subscribe_messages)
        self.redis = redis_client

        # The PubSub itself never filters confirmations: the decision is made
        # per get_message() call through its own argument.
        self._pubsub = redis_client.pubsub(ignore_subscribe_messages=False)

    def _execute_subscribe(
        self, patterns: dict[str, Any], exact_channels: dict[str, Any]
    ) -> None:
        """Issue PSUBSCRIBE / SUBSCRIBE on the single pubsub connection."""
        pubsub = self._pubsub
        if patterns:
            pubsub.psubscribe(**patterns)
        if exact_channels:
            pubsub.subscribe(**exact_channels)

    def _execute_unsubscribe(
        self, patterns: list[str], exact_channels: list[str]
    ) -> None:
        """Issue PUNSUBSCRIBE / UNSUBSCRIBE on the single pubsub connection."""
        pubsub = self._pubsub
        if patterns:
            pubsub.punsubscribe(*patterns)
        if exact_channels:
            pubsub.unsubscribe(*exact_channels)

    def get_message(
        self,
        ignore_subscribe_messages: bool | None = None,
        timeout: float = 0.0,
    ) -> KeyNotification | None:
        """
        Return the next keyspace notification, if one is available.

        Note: If a handler was registered for the channel, pubsub will call
        the handler directly and this method returns None for that message.

        Args:
            ignore_subscribe_messages: If True, skip subscribe/unsubscribe
                messages. When None, the value given to __init__ is used.
            timeout: Time to wait for a message.

        Returns:
            The message parsed by KeyNotification.from_message when the
            pubsub returned one, otherwise None (also when already closed).
        """
        skip_confirmations = (
            self.ignore_subscribe_messages
            if ignore_subscribe_messages is None
            else ignore_subscribe_messages
        )

        if self._closed:
            return None

        # Messages for channels with a handler are consumed inside the pubsub
        # call (which then yields None); only handler-less messages come back
        # raw and need parsing here.
        raw = self._pubsub.get_message(
            ignore_subscribe_messages=skip_confirmations,
            timeout=timeout,
        )
        if raw is None:
            return None
        return KeyNotification.from_message(raw, key_prefix=self.key_prefix)

    def listen(self):
        """
        Listen for keyspace notifications.

        Generator that keeps polling (one second per poll) while this object
        is subscribed, yielding each notification as it arrives.

        Yields:
            KeyNotification objects for each keyspace/keyevent notification.

        Example:
            >>> for notification in ksn.listen():
            ...     print(f"{notification.key}: {notification.event_type}")
        """
        while self.subscribed:
            received = self.get_message(timeout=1.0)
            if received is None:
                continue
            yield received

    @property
    def subscribed(self) -> bool:
        """Whether this object is open and its pubsub reports subscriptions."""
        if self._closed:
            return False
        return self._pubsub.subscribed

    def _validate_all_handlers(self) -> None:
        """Raise RedisError for the first channel or pattern without a handler."""
        registries = (
            ("Channel", self._pubsub.channels),
            ("Pattern", self._pubsub.patterns),
        )
        for label, registry in registries:
            for name, handler in registry.items():
                if handler is None:
                    raise RedisError(f"{label} '{name}' has no handler registered")

    def close(self):
        """Mark this object closed and close the pubsub connection."""
        self._closed = True
        try:
            self._pubsub.close()
        except Exception:
            # Best-effort cleanup: a failing close must not break shutdown.
            pass

1237 

1238 

1239# ============================================================================= 

1240# Cluster-Aware Keyspace Notification Manager 

1241# ============================================================================= 

1242 

1243 

1244class ClusterKeyspaceNotifications(AbstractKeyspaceNotifications): 

1245 """ 

1246 Manages keyspace notification subscriptions across all nodes in a Redis Cluster. 

1247 

1248 In Redis Cluster, keyspace notifications are NOT broadcast between nodes. 

1249 Each node only emits notifications for keys it owns. This class automatically 

1250 subscribes to all primary nodes in the cluster and handles topology changes. 

1251 """ 

1252 

def __init__(
    self,
    redis_cluster: RedisCluster,
    key_prefix: str | bytes | None = None,
    ignore_subscribe_messages: bool = True,
):
    """
    Initialize the cluster keyspace notification manager.

    Note: Keyspace notifications must be enabled on all Redis cluster nodes
    via the ``notify-keyspace-events`` configuration option. This is a
    server-side configuration that should be done by your
    infrastructure/operations team.

    Args:
        redis_cluster: A RedisCluster instance
        key_prefix: Optional prefix to filter and strip from keys in
            notifications
        ignore_subscribe_messages: If True, subscribe/unsubscribe
            confirmations are not returned by get_message/listen
    """
    super().__init__(key_prefix, ignore_subscribe_messages)
    self.cluster = redis_cluster

    # Position of the next pubsub to poll in round-robin order.
    self._poll_index = 0

    # Serializes topology refresh operations.
    self._refresh_lock = threading.Lock()

    # One PubSub per node, keyed by node name.
    self._node_pubsubs: dict[str, Any] = {}

    # Canonical registry (pattern/channel -> wrapped handler). With one
    # PubSub per node there is no single PubSub to consult, so these dicts
    # are the source of truth used to (re-)subscribe new or failed-over nodes.
    self._subscribed_patterns: dict[str, Any] = {}
    self._subscribed_channels: dict[str, Any] = {}

1290 

@property
def subscribed(self) -> bool:
    """True while not closed and at least one pattern or channel is tracked."""
    if self._closed:
        return False
    return bool(self._subscribed_patterns or self._subscribed_channels)

1297 

def _track_subscribe(
    self, patterns: dict[str, Any], exact_channels: dict[str, Any]
) -> None:
    """Record new patterns/channels (and their handlers) in the registry."""
    pending = (
        (self._subscribed_patterns, patterns),
        (self._subscribed_channels, exact_channels),
    )
    for registry, additions in pending:
        if additions:
            registry.update(additions)

1306 

def _untrack_subscribe(
    self, patterns: list[str], exact_channels: list[str]
) -> None:
    """Forget the given patterns/channels; unknown names are ignored."""
    forget_pattern = self._subscribed_patterns.pop
    for name in patterns:
        forget_pattern(name, None)

    forget_channel = self._subscribed_channels.pop
    for name in exact_channels:
        forget_channel(name, None)

1315 

def _validate_all_handlers(self) -> None:
    """Raise RedisError for the first tracked channel/pattern with no handler.

    Channels are checked before patterns.
    """
    for name, handler in self._subscribed_channels.items():
        if handler is not None:
            continue
        raise RedisError(f"Channel '{name}' has no handler registered")

    for name, handler in self._subscribed_patterns.items():
        if handler is not None:
            continue
        raise RedisError(f"Pattern '{name}' has no handler registered")

1324 

def _get_all_primary_nodes(self):
    """Return whatever the cluster client currently reports as its primaries."""
    primaries = self.cluster.get_primaries()
    return primaries

1328 

def _cleanup_node(self, node_name: str) -> None:
    """Forget a node's PubSub and close it, ignoring close errors.

    Closing the ``PubSub`` disconnects its connection so it is not
    left in a subscribed state inside the connection pool.
    """
    stale = self._node_pubsubs.pop(node_name, None)
    if not stale:
        # Nothing registered for this node.
        return
    try:
        stale.close()
    except Exception:
        # Best-effort: the entry is already removed from the registry.
        pass

1341 

def _ensure_node_pubsub(self, node) -> Any:
    """Return the node's PubSub, creating and registering it on first use."""
    try:
        return self._node_pubsubs[node.name]
    except KeyError:
        pass

    client = self.cluster.get_redis_connection(node)
    # Created with ignore_subscribe_messages=False so that the per-call
    # argument of get_message() is what decides the filtering.
    created = client.pubsub(ignore_subscribe_messages=False)
    self._node_pubsubs[node.name] = created
    return created

1352 

def _execute_subscribe(
    self, patterns: dict[str, Any], exact_channels: dict[str, Any]
) -> None:
    """Execute subscribe on all cluster nodes.

    Patterns and exact channels are subscribed in a single pass over
    nodes so that a mid-batch node failure cannot create a
    partially-caught-up replacement. If a node fails during this
    call it is removed from ``_node_pubsubs`` and will be fully
    re-subscribed on the next ``refresh_subscriptions`` cycle.

    If a newly discovered node is encountered (not yet in
    ``_node_pubsubs``), it is also subscribed to all *previously*
    tracked patterns/channels so it doesn't miss notifications for
    subscriptions that were established before this node joined.

    Failures are handled per node: any exception raised while creating a
    node's PubSub or while subscribing on it only affects that node; the
    remaining nodes are still processed. Failed node names are reported in
    a single warning at the end.

    Args:
        patterns: Mapping of pattern -> handler to PSUBSCRIBE.
        exact_channels: Mapping of channel -> handler to SUBSCRIBE.
    """
    if not patterns and not exact_channels:
        return

    failed_nodes: list[str] = []
    for node in self._get_all_primary_nodes():
        is_new_node = node.name not in self._node_pubsubs
        try:
            # Creating the PubSub is inside the try block so that one
            # unreachable node cannot abort the loop for the others.
            pubsub = self._ensure_node_pubsub(node)

            # If this is a brand-new node, catch it up on existing
            # subscriptions before adding the new channels.
            if is_new_node:
                if self._subscribed_patterns:
                    pubsub.psubscribe(**self._subscribed_patterns)
                if self._subscribed_channels:
                    pubsub.subscribe(**self._subscribed_channels)

            if patterns:
                pubsub.psubscribe(**patterns)
            if exact_channels:
                pubsub.subscribe(**exact_channels)
        except Exception:
            logger.debug(
                "Subscribe failed on cluster node %s", node.name, exc_info=True
            )
            # Remove the broken pubsub so refresh_subscriptions can
            # re-create it later.
            self._cleanup_node(node.name)
            failed_nodes.append(node.name)

    if failed_nodes:
        logger.warning(
            "Failed to subscribe on cluster nodes: %s. "
            "These nodes will be retried on the next refresh cycle.",
            ", ".join(failed_nodes),
        )

1401 

def _execute_unsubscribe(
    self, patterns: list[str], exact_channels: list[str]
) -> None:
    """Unsubscribe the given patterns, then the given channels, on all nodes."""
    batches = ((patterns, True), (exact_channels, False))
    for names, is_pattern in batches:
        if names:
            self._unsubscribe_from_all_nodes(names, use_punsubscribe=is_pattern)

1410 

def _unsubscribe_from_all_nodes(self, channels: list[str], use_punsubscribe: bool):
    """Unsubscribe the given names on every node's PubSub (best-effort).

    Every node is attempted, so one broken connection does not stop the
    remaining nodes from being unsubscribed. A PubSub that raises is
    cleaned up via ``_cleanup_node``. This method does not touch the
    subscription registry; per the original note, the caller removes the
    tracking state, so ``refresh_subscriptions`` will *not* re-subscribe
    these channels on replacement nodes.

    Args:
        channels: Pattern or channel names to drop.
        use_punsubscribe: True to call ``punsubscribe``, False for
            ``unsubscribe``.
    """
    command = "punsubscribe" if use_punsubscribe else "unsubscribe"
    broken: list[str] = []

    # Iterate over a snapshot: _cleanup_node mutates the dict.
    for node_name, pubsub in list(self._node_pubsubs.items()):
        try:
            getattr(pubsub, command)(*channels)
        except Exception:
            self._cleanup_node(node_name)
            broken.append(node_name)

    if not broken:
        return
    logger.warning(
        "Failed to unsubscribe on cluster nodes: %s. "
        "These nodes will be re-created on the next refresh cycle.",
        ", ".join(broken),
    )

1437 

def get_message(
    self,
    ignore_subscribe_messages: bool | None = None,
    timeout: float = 0.0,
) -> KeyNotification | None:
    """
    Get the next keyspace notification if one is available.

    This method polls all node pubsubs in round-robin fashion until
    a message is received or the timeout expires.
    If a connection error occurs, subscriptions are automatically refreshed.

    If no node pubsub exists while subscriptions are still tracked (for
    example because every node failed and was cleaned up), a refresh is
    attempted after waiting so the subscriptions can be re-established;
    the call itself still returns None.

    Args:
        ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages.
            Defaults to the value set in __init__ (True).
        timeout: Total time to wait for a message (distributed across all
            nodes). 0 performs a single non-blocking poll of every node.

    Returns:
        A KeyNotification if a notification is available, None otherwise.
    """
    if self._closed:
        return None

    total_nodes = len(self._node_pubsubs)
    if total_nodes == 0:
        # Sleep for the requested timeout so callers that loop
        # (run_in_thread, listen) don't spin the CPU when all node
        # connections have been cleaned up.
        if timeout > 0:
            time.sleep(timeout)
        # With no pubsub left, no poll can fail and trigger the usual
        # refresh-on-error path, so try to rebuild the connections here.
        # The closed flag is re-checked because close() may have run
        # during the sleep.
        if not self._closed and (
            self._subscribed_patterns or self._subscribed_channels
        ):
            self._refresh_subscriptions_on_error()
        return None

    # Use instance default if not specified
    if ignore_subscribe_messages is None:
        ignore_subscribe_messages = self.ignore_subscribe_messages

    # Handle timeout=0 as a single non-blocking poll over all pubsubs
    # This matches the expected semantics of PubSub.get_message(timeout=0)
    if timeout == 0.0:
        return self._poll_all_nodes_once(ignore_subscribe_messages)

    # Use a small timeout per node to allow round-robin polling
    per_node_timeout = min(0.1, timeout / max(total_nodes, 1))

    end_time = time.monotonic() + timeout

    while True:
        # Stop once the total budget is used up.
        remaining = end_time - time.monotonic()
        if remaining <= 0:
            return None

        pubsubs = list(self._node_pubsubs.values())
        if not pubsubs:
            return None

        # Round-robin polling; the modulo keeps the index valid when the
        # number of pubsubs changed since the previous poll.
        self._poll_index = self._poll_index % len(pubsubs)
        pubsub = pubsubs[self._poll_index]
        self._poll_index += 1

        try:
            message = pubsub.get_message(
                ignore_subscribe_messages=ignore_subscribe_messages,
                # Never wait longer than what is left of the total timeout.
                timeout=min(per_node_timeout, remaining),
            )
        except (ConnectionError, TimeoutError, RedisError):
            # Connection error - refresh subscriptions and continue
            self._refresh_subscriptions_on_error()
            continue

        if message is not None:
            # Note: If a handler was registered, PubSub already invoked it
            # and returned None, so we only reach here for handler-less
            # subscriptions
            notification = KeyNotification.from_message(
                message, key_prefix=self.key_prefix
            )
            if notification is not None:
                return notification
            # If not a keyspace notification, continue checking other nodes

1519 

def _poll_all_nodes_once(
    self, ignore_subscribe_messages: bool
) -> KeyNotification | None:
    """
    Perform a single non-blocking poll over all node pubsubs.

    This is used when timeout=0 to match the expected semantics of
    PubSub.get_message(timeout=0) - a non-blocking check for messages.

    Each node is polled at most once per call. Polling starts at the
    shared round-robin position (``_poll_index``) and, when a notification
    is returned, the position moves just past the node that produced it,
    so a node with a constant stream of messages cannot starve the others.

    If any node raised a connection-type error, the remaining nodes are
    still polled and subscriptions are refreshed once before returning.

    Args:
        ignore_subscribe_messages: Passed through to each PubSub poll.

    Returns:
        A KeyNotification if one is available, None otherwise.
    """
    pubsubs = list(self._node_pubsubs.values())
    if not pubsubs:
        return None

    count = len(pubsubs)
    start = self._poll_index % count
    had_error = False
    found = None
    next_index = start

    for offset in range(count):
        pubsub = pubsubs[(start + offset) % count]
        try:
            message = pubsub.get_message(
                ignore_subscribe_messages=ignore_subscribe_messages,
                timeout=0.0,
            )
        except (ConnectionError, TimeoutError, RedisError):
            # Record the error but continue polling remaining healthy
            # nodes so that already-buffered notifications are not lost.
            had_error = True
            continue

        if message is None:
            continue

        # Note: If a handler was registered, PubSub already invoked it
        # and returned None, so we only reach here for handler-less
        # subscriptions
        notification = KeyNotification.from_message(
            message, key_prefix=self.key_prefix
        )
        if notification is not None:
            found = notification
            next_index = (start + offset + 1) % count
            break

    if had_error:
        # Refresh so the next poll cycle has fresh state. The refresh
        # helper resets the round-robin position itself, because the set
        # of pubsubs may have changed.
        self._refresh_subscriptions_on_error()
    elif found is not None:
        self._poll_index = next_index
    return found

1562 

def listen(self):
    """
    Listen for keyspace notifications from all cluster nodes.

    Generator that keeps calling get_message() with a one-second timeout
    while this object is subscribed, yielding each notification as it
    arrives and skipping polls that produced nothing.

    Yields:
        KeyNotification objects for each keyspace/keyevent notification.

    Example:
        >>> for notification in ksn.listen():
        ...     print(f"{notification.key}: {notification.event_type}")
    """
    while self.subscribed:
        received = self.get_message(timeout=1.0)
        if received is None:
            continue
        yield received

1581 

def _refresh_subscriptions_on_error(self):
    """
    Refresh subscriptions after a connection error, without raising.

    Called from the polling methods when a node's PubSub raised. The
    round-robin position is reset first, then refresh_subscriptions() is
    run; if that fails, the failure is logged as a warning (with
    traceback) instead of being propagated.
    """
    # Restart round-robin from the first pubsub.
    self._poll_index = 0

    try:
        self.refresh_subscriptions()
    except Exception:
        logger.warning(
            "Failed to refresh cluster subscriptions, will retry on next error",
            exc_info=True,
        )

1598 

def _is_pubsub_connected(self, pubsub) -> bool:
    """Report whether the pubsub has a connection that says it is connected.

    Returns False when the pubsub has no connection or when inspecting it
    raises; otherwise returns the connection's ``is_connected`` value.
    """
    try:
        connection = pubsub.connection
        return False if connection is None else connection.is_connected
    except Exception:
        return False

1608 

def refresh_subscriptions(self):
    """
    Refresh subscriptions after a topology change.

    This method is called automatically when connection errors occur
    during polling. You can also call it manually if needed.

    This method, while holding the refresh lock:
    1. Removes pubsubs for nodes that are no longer primaries
    2. Removes pubsubs whose connection is broken so they get re-created
    3. Creates a pubsub for every primary that has none and subscribes it
       to all tracked patterns/channels

    Every node in step 3 is attempted even if an earlier one fails,
    including a failure while creating the node's PubSub.

    Raises:
        ConnectionError: After all nodes were attempted, if creating a
            pubsub or subscribing failed on at least one node. The message
            lists the failed node names.
    """
    with self._refresh_lock:
        current_primaries = {
            node.name: node for node in self._get_all_primary_nodes()
        }

        # Remove pubsubs for nodes that are no longer primaries
        for node_name in set(self._node_pubsubs) - set(current_primaries):
            self._cleanup_node(node_name)

        # Detect broken connections for existing nodes and remove them
        # so they get re-created below
        for node_name in set(self._node_pubsubs) & set(current_primaries):
            pubsub = self._node_pubsubs.get(node_name)
            if pubsub and not self._is_pubsub_connected(pubsub):
                self._cleanup_node(node_name)

        # Subscribe new nodes (and nodes with broken connections) to existing
        # patterns/channels. Sorted so the processing order and the error
        # message are deterministic.
        new_nodes = sorted(set(current_primaries) - set(self._node_pubsubs))
        failed_nodes: list[str] = []
        for node_name in new_nodes:
            try:
                # Creating the PubSub is inside the try block so that one
                # unreachable node cannot abort the loop for the others.
                pubsub = self._ensure_node_pubsub(current_primaries[node_name])
                if self._subscribed_patterns:
                    pubsub.psubscribe(**self._subscribed_patterns)
                if self._subscribed_channels:
                    pubsub.subscribe(**self._subscribed_channels)
            except Exception:
                # Subscription failed - remove from dict so retry is possible
                self._cleanup_node(node_name)
                failed_nodes.append(node_name)

        # Raise after attempting all nodes so we don't skip any
        if failed_nodes:
            raise ConnectionError(
                f"Failed to subscribe to cluster nodes: {', '.join(failed_nodes)}"
            )

1667 

def close(self):
    """Mark this object closed, close every node pubsub, drop the registry."""
    self._closed = True

    # Iterate over a snapshot: _cleanup_node removes entries from the dict.
    for node_name in tuple(self._node_pubsubs):
        self._cleanup_node(node_name)

    for registry in (self._subscribed_patterns, self._subscribed_channels):
        registry.clear()