Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/asyncio/keyspace_notifications.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

358 statements  

1""" 

2Async Redis Keyspace Notifications support for redis-py. 

3 

4This module provides async utilities for subscribing to and parsing Redis 

5keyspace notifications. 

6 

7Standalone Redis Example: 

8 >>> from redis.asyncio import Redis 

9 >>> from redis.asyncio.keyspace_notifications import ( 

10 ... AsyncKeyspaceNotifications, 

11 ... ) 

12 >>> from redis.keyspace_notifications import KeyspaceChannel, EventType 

13 >>> 

14 >>> async def main(): 

15 ... async with Redis() as r: 

16 ... async with AsyncKeyspaceNotifications(r) as ksn: 

17 ... channel = KeyspaceChannel("user:*") 

18 ... await ksn.subscribe(channel) 

19 ... async for notification in ksn.listen(): 

20 ... print(f"Key: {notification.key}, Event: {notification.event_type}") 

21 

22Redis Cluster Example: 

23 >>> from redis.asyncio.cluster import RedisCluster 

24 >>> from redis.asyncio.keyspace_notifications import ( 

25 ... AsyncClusterKeyspaceNotifications, 

26 ... ) 

27 >>> from redis.keyspace_notifications import KeyspaceChannel, EventType 

28 >>> 

29 >>> async def main(): 

30 ... async with RedisCluster(host="localhost", port=7000) as rc: 

31 ... async with AsyncClusterKeyspaceNotifications(rc) as ksn: 

32 ... channel = KeyspaceChannel("user:*") 

33 ... await ksn.subscribe(channel) 

34 ... async for notification in ksn.listen(): 

35 ... print(f"Key: {notification.key}, Event: {notification.event_type}") 

36""" 

37 

38from __future__ import annotations 

39 

40import asyncio 

41import inspect 

42import logging 

43import time 

44from abc import ABC, abstractmethod 

45from collections.abc import AsyncIterator, Awaitable, Callable 

46from typing import Any 

47 

48from redis._parsers.encoders import Encoder 

49from redis.asyncio.client import PubSub, Redis 

50from redis.asyncio.cluster import ClusterNode, RedisCluster 

51from redis.exceptions import ( 

52 ConnectionError, 

53 RedisError, 

54 TimeoutError, 

55) 

56from redis.keyspace_notifications import ( 

57 ChannelT, 

58 KeyeventChannel, 

59 KeyNotification, 

60 KeyspaceChannel, 

61 _is_pattern, 

62) 

63from redis.utils import safe_str 

64 

65logger = logging.getLogger(__name__) 

66 

67 

68class _ClusterNodePoolAdapter: 

69 """Thin adapter exposing the :class:`ConnectionPool` interface that 

70 :class:`PubSub` requires, backed by a :class:`ClusterNode`'s own 

71 connection pool. 

72 

73 Connections are acquired from the node via 

74 :meth:`ClusterNode.acquire_connection` and returned via 

75 :meth:`ClusterNode.release`. :meth:`PubSub.aclose` already 

76 disconnects the connection *before* calling :meth:`release`, so the 

77 connection is returned to the node's free-queue in a disconnected 

78 state — guaranteeing that a subscribed socket is never silently 

79 reused for regular commands. 

80 """ 

81 

82 def __init__(self, node: ClusterNode) -> None: 

83 self._node = node 

84 self.connection_kwargs = node.connection_kwargs 

85 

86 # -- methods used by PubSub ------------------------------------------------ 

87 

88 def get_encoder(self) -> Encoder: 

89 return self._node.get_encoder() 

90 

91 async def get_connection( 

92 self, command_name: str | None = None, *keys: Any, **options: Any 

93 ) -> Any: 

94 connection = self._node.acquire_connection() 

95 await connection.connect() 

96 return connection 

97 

98 async def release(self, connection: Any) -> None: 

99 # PubSub.aclose() disconnects the connection before calling 

100 # release(), so it is safe to put it back in the node's free 

101 # queue – it will reconnect lazily on next use. 

102 self._node.release(connection) 

103 

104 

105# Type alias for handlers that can be sync or async 

106AsyncHandlerT = Callable[[KeyNotification], None | Awaitable[None]] 

107 

108 

109# ============================================================================= 

110# Async Interface for Keyspace Notifications 

111# ============================================================================= 

112 

113 

114class AsyncKeyspaceNotificationsInterface(ABC): 

115 """ 

116 Async interface for keyspace notification managers. 

117 

118 This interface provides a consistent async API for both standalone 

119 (AsyncKeyspaceNotifications) and cluster (AsyncClusterKeyspaceNotifications) 

120 implementations. 

121 """ 

122 

123 @abstractmethod 

124 async def subscribe( 

125 self, 

126 *channels: ChannelT, 

127 handler: AsyncHandlerT | None = None, 

128 ): 

129 """Subscribe to keyspace notification channels.""" 

130 pass 

131 

132 @abstractmethod 

133 async def unsubscribe(self, *channels: ChannelT): 

134 """Unsubscribe from keyspace notification channels.""" 

135 pass 

136 

137 @abstractmethod 

138 async def subscribe_keyspace( 

139 self, 

140 key_or_pattern: str, 

141 db: int = 0, 

142 handler: AsyncHandlerT | None = None, 

143 ): 

144 """Subscribe to keyspace notifications for specific keys.""" 

145 pass 

146 

147 @abstractmethod 

148 async def subscribe_keyevent( 

149 self, 

150 event: str, 

151 db: int = 0, 

152 handler: AsyncHandlerT | None = None, 

153 ): 

154 """Subscribe to keyevent notifications for specific event types.""" 

155 pass 

156 

157 @abstractmethod 

158 async def get_message( 

159 self, 

160 ignore_subscribe_messages: bool | None = None, 

161 timeout: float = 0.0, 

162 ) -> KeyNotification | None: 

163 """Get the next keyspace notification if one is available.""" 

164 pass 

165 

166 @abstractmethod 

167 def listen(self) -> AsyncIterator[KeyNotification]: 

168 """Listen for keyspace notifications.""" 

169 pass 

170 

171 @abstractmethod 

172 async def aclose(self): 

173 """Close the notification manager and clean up resources.""" 

174 pass 

175 

176 @abstractmethod 

177 async def __aenter__(self): 

178 pass 

179 

180 @abstractmethod 

181 async def __aexit__(self, _exc_type, _exc_val, _exc_tb): 

182 pass 

183 

184 @property 

185 @abstractmethod 

186 def subscribed(self) -> bool: 

187 """Check if there are any active subscriptions and not closed.""" 

188 pass 

189 

190 @abstractmethod 

191 async def run( 

192 self, 

193 poll_timeout: float = 1.0, 

194 exception_handler: Callable[ 

195 [BaseException, AsyncKeyspaceNotificationsInterface], 

196 None | Awaitable[None], 

197 ] 

198 | None = None, 

199 ) -> None: 

200 """ 

201 Run the notification loop as a coroutine. 

202 

203 This is the async equivalent of run_in_thread() for sync notifications. 

204 Use asyncio.create_task() to run in the background. 

205 

206 The exception_handler can be either a sync or async function. 

207 """ 

208 pass 

209 

210 

211# ============================================================================= 

212# Abstract Base Class for Async Keyspace Notifications 

213# ============================================================================= 

214 

215 

216class AbstractAsyncKeyspaceNotifications(AsyncKeyspaceNotificationsInterface): 

217 """ 

218 Abstract base class for async keyspace notification managers. 

219 

220 Provides shared implementation for subscribe/unsubscribe logic. 

221 Subclasses must implement: 

222 - _execute_subscribe: Execute the subscribe operation 

223 - _execute_unsubscribe: Execute the unsubscribe operation 

224 - get_message: Get the next notification 

225 - listen: Async generator for notifications 

226 - aclose: Clean up resources 

227 """ 

228 

229 def __init__( 

230 self, 

231 key_prefix: str | bytes | None = None, 

232 ignore_subscribe_messages: bool = True, 

233 ): 

234 """ 

235 Initialize the base async keyspace notification manager. 

236 

237 Args: 

238 key_prefix: Optional prefix to filter and strip from keys in notifications 

239 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

240 are not returned by get_message/listen 

241 """ 

242 self.key_prefix = key_prefix 

243 self.ignore_subscribe_messages = ignore_subscribe_messages 

244 self._closed = False 

245 

246 async def subscribe( 

247 self, 

248 *channels: ChannelT, 

249 handler: AsyncHandlerT | None = None, 

250 ): 

251 """ 

252 Subscribe to keyspace notification channels. 

253 

254 Automatically detects whether each channel is a pattern (contains 

255 wildcards like *, ?, [) or an exact channel name and uses the 

256 appropriate Redis subscribe command internally. 

257 

258 The handler can be either a sync or async function. Note that a 

259 **sync** handler will be called directly on the event loop thread, 

260 so it must not perform blocking I/O or long-running computation — 

261 prefer an ``async`` handler whenever possible. 

262 """ 

263 # Wrap the handler to convert raw messages to KeyNotification objects 

264 wrapped_handler: Callable | None = None 

265 if handler is not None: 

266 key_prefix = self.key_prefix 

267 is_async_handler = inspect.iscoroutinefunction(handler) 

268 

269 if is_async_handler: 

270 # We've verified handler is async, so the result is awaitable 

271 async_handler = handler 

272 

273 async def _async_wrap_handler(message): 

274 notification = KeyNotification.from_message( 

275 message, key_prefix=key_prefix 

276 ) 

277 if notification is not None: 

278 await async_handler(notification) 

279 

280 wrapped_handler = _async_wrap_handler 

281 else: 

282 

283 def _sync_wrap_handler(message): 

284 notification = KeyNotification.from_message( 

285 message, key_prefix=key_prefix 

286 ) 

287 if notification is not None: 

288 handler(notification) 

289 

290 wrapped_handler = _sync_wrap_handler 

291 

292 patterns = {} 

293 exact_channels = {} 

294 

295 for channel in channels: 

296 if hasattr(channel, "_channel_str"): 

297 channel_str = str(channel) 

298 else: 

299 channel_str = safe_str(channel) 

300 if _is_pattern(channel): 

301 patterns[channel_str] = wrapped_handler 

302 else: 

303 exact_channels[channel_str] = wrapped_handler 

304 

305 # Delegate to subclass implementation first. For standalone Redis 

306 # this raises on failure, keeping tracking state clean. For cluster 

307 # implementations the operation is best-effort (partial failures are 

308 # logged, not raised) so tracking state is always updated afterwards. 

309 await self._execute_subscribe(patterns, exact_channels) 

310 self._track_subscribe(patterns, exact_channels) 

311 

312 @abstractmethod 

313 async def _execute_subscribe( 

314 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

315 ) -> None: 

316 """Execute the subscribe operation.""" 

317 pass 

318 

319 async def unsubscribe(self, *channels: ChannelT): 

320 """Unsubscribe from keyspace notification channels.""" 

321 patterns = [] 

322 exact_channels = [] 

323 

324 for channel in channels: 

325 if hasattr(channel, "_channel_str"): 

326 channel_str = str(channel) 

327 else: 

328 channel_str = safe_str(channel) 

329 if _is_pattern(channel): 

330 patterns.append(channel_str) 

331 else: 

332 exact_channels.append(channel_str) 

333 

334 # Delegate to subclass implementation first. For standalone Redis 

335 # this raises on failure, keeping tracking state intact. For cluster 

336 # implementations the operation is best-effort (partial failures are 

337 # logged, not raised) so tracking state is always removed afterwards 

338 # — this is intentional: the user asked to unsubscribe, so 

339 # refresh_subscriptions should not re-subscribe these channels. 

340 await self._execute_unsubscribe(patterns, exact_channels) 

341 self._untrack_subscribe(patterns, exact_channels) 

342 

343 @abstractmethod 

344 async def _execute_unsubscribe( 

345 self, patterns: list[str], exact_channels: list[str] 

346 ) -> None: 

347 """Execute the unsubscribe operation.""" 

348 pass 

349 

350 def _track_subscribe( 

351 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

352 ) -> None: 

353 """Track newly subscribed patterns/channels. 

354 

355 Override in subclasses that need to maintain their own subscription 

356 registry (e.g. cluster implementations that must re-subscribe 

357 new/failed-over nodes). The default is a no-op because standalone 

358 implementations delegate tracking to the underlying PubSub object. 

359 """ 

360 

361 def _untrack_subscribe( 

362 self, patterns: list[str], exact_channels: list[str] 

363 ) -> None: 

364 """Remove patterns/channels from the subscription registry. 

365 

366 Override in subclasses that maintain their own subscription registry. 

367 The default is a no-op. 

368 """ 

369 

370 async def subscribe_keyspace( 

371 self, 

372 key_or_pattern: str, 

373 db: int = 0, 

374 handler: AsyncHandlerT | None = None, 

375 ): 

376 """Subscribe to keyspace notifications for specific keys.""" 

377 channel = KeyspaceChannel(key_or_pattern, db=db) 

378 await self.subscribe(channel, handler=handler) 

379 

380 async def subscribe_keyevent( 

381 self, 

382 event: str, 

383 db: int = 0, 

384 handler: AsyncHandlerT | None = None, 

385 ): 

386 """Subscribe to keyevent notifications for specific event types.""" 

387 channel = KeyeventChannel(event, db=db) 

388 await self.subscribe(channel, handler=handler) 

389 

390 async def __aenter__(self): 

391 return self 

392 

393 async def __aexit__(self, _exc_type, _exc_val, _exc_tb): 

394 await self.aclose() 

395 return False 

396 

397 async def run( 

398 self, 

399 poll_timeout: float = 1.0, 

400 exception_handler: Callable[ 

401 [BaseException, AsyncKeyspaceNotificationsInterface], 

402 None | Awaitable[None], 

403 ] 

404 | None = None, 

405 ) -> None: 

406 """ 

407 Run the notification loop as a coroutine. 

408 

409 This continuously polls for notifications and triggers handlers. 

410 Use asyncio.create_task() to run in the background. 

411 

412 Args: 

413 poll_timeout: Timeout in seconds for each get_message call. 

414 exception_handler: Optional callback for handling exceptions. 

415 Can be sync or async. 

416 """ 

417 while self.subscribed: 

418 try: 

419 await self.get_message(timeout=poll_timeout) 

420 except asyncio.CancelledError: 

421 raise 

422 except BaseException as e: 

423 if exception_handler is not None: 

424 result = exception_handler(e, self) 

425 if inspect.isawaitable(result): 

426 await result 

427 else: 

428 raise 

429 

430 

431# ============================================================================= 

432# Standalone Async Keyspace Notification Manager 

433# ============================================================================= 

434 

435 

436class AsyncKeyspaceNotifications(AbstractAsyncKeyspaceNotifications): 

437 """ 

438 Manages keyspace notification subscriptions for standalone async Redis. 

439 

440 For standalone Redis, keyspace notifications work with a single PubSub 

441 connection. This class wraps that connection and provides: 

442 - Automatic pattern vs exact channel detection 

443 - KeyNotification parsing with optional key_prefix filtering 

444 - Convenience methods for keyspace and keyevent subscriptions 

445 - Context manager and run() coroutine support 

446 """ 

447 

448 def __init__( 

449 self, 

450 redis_client: Redis, 

451 key_prefix: str | bytes | None = None, 

452 ignore_subscribe_messages: bool = True, 

453 ): 

454 """ 

455 Initialize the standalone async keyspace notification manager. 

456 

457 Note: Keyspace notifications must be enabled on the Redis server via 

458 the ``notify-keyspace-events`` configuration option. 

459 

460 Args: 

461 redis_client: An async Redis client instance 

462 key_prefix: Optional prefix to filter and strip from keys in notifications 

463 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

464 are not returned by get_message/listen 

465 """ 

466 super().__init__(key_prefix, ignore_subscribe_messages) 

467 self.redis = redis_client 

468 # Create PubSub with ignore_subscribe_messages=False so per-call arg works 

469 self._pubsub: PubSub = redis_client.pubsub(ignore_subscribe_messages=False) 

470 

471 async def _execute_subscribe( 

472 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

473 ) -> None: 

474 """Execute subscribe on the single pubsub connection.""" 

475 if patterns: 

476 await self._pubsub.psubscribe(**patterns) 

477 if exact_channels: 

478 await self._pubsub.subscribe(**exact_channels) 

479 

480 async def _execute_unsubscribe( 

481 self, patterns: list[str], exact_channels: list[str] 

482 ) -> None: 

483 """Execute unsubscribe on the single pubsub connection.""" 

484 if patterns: 

485 await self._pubsub.punsubscribe(*patterns) 

486 if exact_channels: 

487 await self._pubsub.unsubscribe(*exact_channels) 

488 

489 async def get_message( 

490 self, 

491 ignore_subscribe_messages: bool | None = None, 

492 timeout: float = 0.0, 

493 ) -> KeyNotification | None: 

494 """ 

495 Get the next keyspace notification if one is available. 

496 

497 Args: 

498 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages. 

499 Defaults to the value set in __init__ (True). 

500 timeout: Time to wait for a message. 

501 

502 Returns: 

503 A KeyNotification if a notification is available and no handler 

504 was registered for the channel, None otherwise. 

505 """ 

506 if ignore_subscribe_messages is None: 

507 ignore_subscribe_messages = self.ignore_subscribe_messages 

508 

509 if self._closed: 

510 return None 

511 

512 message = await self._pubsub.get_message( 

513 ignore_subscribe_messages=ignore_subscribe_messages, 

514 timeout=timeout, 

515 ) 

516 

517 if message is not None: 

518 return KeyNotification.from_message(message, key_prefix=self.key_prefix) 

519 

520 return None 

521 

522 async def listen(self) -> AsyncIterator[KeyNotification]: 

523 """ 

524 Listen for keyspace notifications. 

525 

526 This is an async generator that yields KeyNotification objects as they arrive. 

527 

528 Yields: 

529 KeyNotification objects for each keyspace/keyevent notification. 

530 

531 Example: 

532 >>> async for notification in ksn.listen(): 

533 ... print(f"{notification.key}: {notification.event_type}") 

534 """ 

535 while self.subscribed: 

536 notification = await self.get_message(timeout=1.0) 

537 if notification is not None: 

538 yield notification 

539 

540 @property 

541 def subscribed(self) -> bool: 

542 """Check if there are any active subscriptions and not closed.""" 

543 return not self._closed and self._pubsub.subscribed 

544 

545 async def aclose(self): 

546 """Close the pubsub connection and clean up resources.""" 

547 self._closed = True 

548 try: 

549 await self._pubsub.aclose() 

550 except Exception: 

551 pass 

552 

553 

554# ============================================================================= 

555# Cluster-Aware Async Keyspace Notification Manager 

556# ============================================================================= 

557 

558 

559class AsyncClusterKeyspaceNotifications(AbstractAsyncKeyspaceNotifications): 

560 """ 

561 Manages keyspace notification subscriptions across all nodes in an async Redis Cluster. 

562 

563 In Redis Cluster, keyspace notifications are NOT broadcast between nodes. 

564 Each node only emits notifications for keys it owns. This class automatically 

565 subscribes to all primary nodes in the cluster and handles topology changes. 

566 """ 

567 

568 def __init__( 

569 self, 

570 redis_cluster: RedisCluster, 

571 key_prefix: str | bytes | None = None, 

572 ignore_subscribe_messages: bool = True, 

573 ): 

574 """ 

575 Initialize the async cluster keyspace notification manager. 

576 

577 Note: Keyspace notifications must be enabled on all Redis cluster nodes via 

578 the ``notify-keyspace-events`` configuration option. 

579 

580 Args: 

581 redis_cluster: An async RedisCluster instance 

582 key_prefix: Optional prefix to filter and strip from keys in notifications 

583 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

584 are not returned by get_message/listen 

585 """ 

586 super().__init__(key_prefix, ignore_subscribe_messages) 

587 self.cluster = redis_cluster 

588 

589 # Canonical subscription registry: pattern/channel -> wrapped handler. 

590 # In cluster mode there are multiple PubSub objects (one per node), so 

591 # this is the single source of truth used to (re-)subscribe new or 

592 # failed-over nodes. 

593 self._subscribed_patterns: dict[str, Any] = {} 

594 self._subscribed_channels: dict[str, Any] = {} 

595 

596 # Track subscriptions per node 

597 self._node_pubsubs: dict[str, PubSub] = {} 

598 

599 # Lock for topology refresh operations 

600 self._refresh_lock = asyncio.Lock() 

601 

602 # Current pubsub index for round-robin polling 

603 self._poll_index = 0 

604 

605 @property 

606 def subscribed(self) -> bool: 

607 """Check if there are any active subscriptions and not closed.""" 

608 return not self._closed and bool( 

609 self._subscribed_patterns or self._subscribed_channels 

610 ) 

611 

612 def _track_subscribe( 

613 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

614 ) -> None: 

615 """Track newly subscribed patterns/channels in the cluster registry.""" 

616 if patterns: 

617 self._subscribed_patterns.update(patterns) 

618 if exact_channels: 

619 self._subscribed_channels.update(exact_channels) 

620 

621 def _untrack_subscribe( 

622 self, patterns: list[str], exact_channels: list[str] 

623 ) -> None: 

624 """Remove patterns/channels from the cluster registry.""" 

625 for p in patterns: 

626 self._subscribed_patterns.pop(p, None) 

627 for c in exact_channels: 

628 self._subscribed_channels.pop(c, None) 

629 

630 def _get_all_primary_nodes(self) -> list[ClusterNode]: 

631 """Get all primary nodes in the cluster.""" 

632 return self.cluster.get_primaries() 

633 

634 async def _ensure_node_pubsub(self, node: ClusterNode) -> PubSub: 

635 """Get or create a PubSub instance for a node. 

636 

637 Uses a :class:`_ClusterNodePoolAdapter` to borrow a connection 

638 from the node's existing pool. When the ``PubSub`` is closed 

639 the connection is disconnected and returned to the node, 

640 ensuring no subscribed socket is left in the free queue. 

641 """ 

642 if node.name not in self._node_pubsubs: 

643 pool_adapter = _ClusterNodePoolAdapter(node) 

644 pubsub = PubSub( 

645 connection_pool=pool_adapter, # type: ignore[arg-type] 

646 ignore_subscribe_messages=False, 

647 ) 

648 self._node_pubsubs[node.name] = pubsub 

649 return self._node_pubsubs[node.name] 

650 

651 async def _cleanup_node(self, node_name: str) -> None: 

652 """Remove and close a node's PubSub. 

653 

654 ``PubSub.aclose()`` disconnects the connection and releases it 

655 back to the underlying :class:`ClusterNode` via the adapter. 

656 """ 

657 pubsub = self._node_pubsubs.pop(node_name, None) 

658 if pubsub: 

659 try: 

660 await pubsub.aclose() 

661 except Exception: 

662 pass 

663 

664 async def _execute_subscribe( 

665 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

666 ) -> None: 

667 """Execute subscribe on all cluster nodes. 

668 

669 Patterns and exact channels are subscribed in a single pass over 

670 nodes so that a mid-batch node failure cannot create a 

671 partially-caught-up replacement. If a node fails during this 

672 call it is removed from ``_node_pubsubs`` and will be fully 

673 re-subscribed on the next ``refresh_subscriptions`` cycle. 

674 

675 If a newly discovered node is encountered (not yet in 

676 ``_node_pubsubs``), it is also subscribed to all *previously* 

677 tracked patterns/channels so it doesn't miss notifications for 

678 subscriptions that were established before this node joined. 

679 """ 

680 if not patterns and not exact_channels: 

681 return 

682 

683 failed_nodes: list[str] = [] 

684 for node in self._get_all_primary_nodes(): 

685 is_new_node = node.name not in self._node_pubsubs 

686 pubsub = await self._ensure_node_pubsub(node) 

687 try: 

688 # If this is a brand-new node, catch it up on existing 

689 # subscriptions before adding the new channels. 

690 if is_new_node: 

691 if self._subscribed_patterns: 

692 await pubsub.psubscribe(**self._subscribed_patterns) 

693 if self._subscribed_channels: 

694 await pubsub.subscribe(**self._subscribed_channels) 

695 

696 if patterns: 

697 await pubsub.psubscribe(**patterns) 

698 if exact_channels: 

699 await pubsub.subscribe(**exact_channels) 

700 except Exception: 

701 # Remove the broken pubsub and its connection pool 

702 # so refresh_subscriptions can re-create both later. 

703 await self._cleanup_node(node.name) 

704 failed_nodes.append(node.name) 

705 

706 if failed_nodes: 

707 logger.warning( 

708 "Failed to subscribe on cluster nodes: %s. " 

709 "These nodes will be retried on the next refresh cycle.", 

710 ", ".join(failed_nodes), 

711 ) 

712 

713 async def _execute_unsubscribe( 

714 self, patterns: list[str], exact_channels: list[str] 

715 ) -> None: 

716 """Execute unsubscribe on all cluster nodes.""" 

717 if patterns: 

718 await self._unsubscribe_from_all_nodes(patterns, use_punsubscribe=True) 

719 if exact_channels: 

720 await self._unsubscribe_from_all_nodes( 

721 exact_channels, use_punsubscribe=False 

722 ) 

723 

724 async def _unsubscribe_from_all_nodes( 

725 self, channels: list[str], use_punsubscribe: bool 

726 ): 

727 """Unsubscribe from patterns/channels on all nodes. 

728 

729 Best-effort: tries every node so that a single broken connection 

730 does not prevent the remaining nodes from being unsubscribed. 

731 Broken pubsubs are cleaned up; the tracking state is still removed 

732 by the caller, so ``refresh_subscriptions`` will *not* re-subscribe 

733 these channels on replacement nodes. 

734 """ 

735 failed_nodes: list[str] = [] 

736 for node_name in list(self._node_pubsubs.keys()): 

737 pubsub = self._node_pubsubs.get(node_name) 

738 if pubsub is None: 

739 continue 

740 try: 

741 if use_punsubscribe: 

742 await pubsub.punsubscribe(*channels) 

743 else: 

744 await pubsub.unsubscribe(*channels) 

745 except Exception: 

746 await self._cleanup_node(node_name) 

747 failed_nodes.append(node_name) 

748 

749 if failed_nodes: 

750 logger.warning( 

751 "Failed to unsubscribe on cluster nodes: %s. " 

752 "These nodes will be re-created on the next refresh cycle.", 

753 ", ".join(failed_nodes), 

754 ) 

755 

756 async def get_message( 

757 self, 

758 ignore_subscribe_messages: bool | None = None, 

759 timeout: float = 0.0, 

760 ) -> KeyNotification | None: 

761 """ 

762 Get the next keyspace notification if one is available. 

763 

764 This method polls all node pubsubs in round-robin fashion until 

765 a message is received or the timeout expires. 

766 If a connection error occurs, subscriptions are automatically refreshed. 

767 

768 Args: 

769 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages. 

770 Defaults to the value set in __init__ (True). 

771 timeout: Total time to wait for a message (distributed across all nodes) 

772 

773 Returns: 

774 A KeyNotification if a notification is available, None otherwise. 

775 """ 

776 if self._closed: 

777 return None 

778 

779 total_nodes = len(self._node_pubsubs) 

780 if total_nodes == 0: 

781 # Sleep for the requested timeout so callers that loop 

782 # (run(), listen) don't spin the CPU when all node 

783 # connections have been cleaned up. 

784 if timeout > 0: 

785 await asyncio.sleep(timeout) 

786 return None 

787 

788 if ignore_subscribe_messages is None: 

789 ignore_subscribe_messages = self.ignore_subscribe_messages 

790 

791 # Handle timeout=0 as a single non-blocking poll over all pubsubs 

792 if timeout == 0.0: 

793 return await self._poll_all_nodes_once(ignore_subscribe_messages) 

794 

795 # Calculate per-node timeout for each poll 

796 per_node_timeout = min(0.1, timeout / max(total_nodes, 1)) 

797 

798 start_time = time.monotonic() 

799 end_time = start_time + timeout 

800 

801 while True: 

802 if time.monotonic() >= end_time: 

803 return None 

804 

805 pubsubs = list(self._node_pubsubs.values()) 

806 if not pubsubs: 

807 return None 

808 

809 # Round-robin polling 

810 self._poll_index = self._poll_index % len(pubsubs) 

811 pubsub = pubsubs[self._poll_index] 

812 self._poll_index += 1 

813 

814 try: 

815 message = await pubsub.get_message( 

816 ignore_subscribe_messages=ignore_subscribe_messages, 

817 timeout=per_node_timeout, 

818 ) 

819 except (ConnectionError, TimeoutError, RedisError): 

820 await self._refresh_subscriptions_on_error() 

821 continue 

822 

823 if message is not None: 

824 notification = KeyNotification.from_message( 

825 message, key_prefix=self.key_prefix 

826 ) 

827 if notification is not None: 

828 return notification 

829 

830 async def _poll_all_nodes_once( 

831 self, ignore_subscribe_messages: bool 

832 ) -> KeyNotification | None: 

833 """ 

834 Perform a single non-blocking poll over all node pubsubs. 

835 

836 Returns: 

837 A KeyNotification if one is available, None otherwise. 

838 """ 

839 had_error = False 

840 for pubsub in list(self._node_pubsubs.values()): 

841 try: 

842 message = await pubsub.get_message( 

843 ignore_subscribe_messages=ignore_subscribe_messages, 

844 timeout=0.0, 

845 ) 

846 except (ConnectionError, TimeoutError, RedisError): 

847 # Record the error but continue polling remaining healthy 

848 # nodes so that already-buffered notifications are not lost. 

849 had_error = True 

850 continue 

851 

852 if message is not None: 

853 notification = KeyNotification.from_message( 

854 message, key_prefix=self.key_prefix 

855 ) 

856 if notification is not None: 

857 # Refresh before returning if any node had an error, 

858 # so the next poll cycle has fresh state. 

859 if had_error: 

860 await self._refresh_subscriptions_on_error() 

861 return notification 

862 

863 # Refresh after polling all nodes if any had errors 

864 if had_error: 

865 await self._refresh_subscriptions_on_error() 

866 return None 

867 

868 async def listen(self) -> AsyncIterator[KeyNotification]: 

869 """ 

870 Listen for keyspace notifications from all cluster nodes. 

871 

872 This is an async generator that yields KeyNotification objects as they arrive. 

873 

874 Yields: 

875 KeyNotification objects for each keyspace/keyevent notification. 

876 

877 Example: 

878 >>> async for notification in ksn.listen(): 

879 ... print(f"{notification.key}: {notification.event_type}") 

880 """ 

881 while self.subscribed: 

882 notification = await self.get_message(timeout=1.0) 

883 if notification is not None: 

884 yield notification 

885 

886 async def _refresh_subscriptions_on_error(self): 

887 """ 

888 Refresh subscriptions after a connection error. 

889 

890 This is called automatically when a connection error occurs during 

891 get_message(). It checks if nodes changed before refreshing. 

892 """ 

893 self._poll_index = 0 # Reset round-robin index 

894 

895 try: 

896 await self.refresh_subscriptions() 

897 except Exception: 

898 logger.warning( 

899 "Failed to refresh cluster subscriptions, will retry on next error", 

900 exc_info=True, 

901 ) 

902 

903 def _is_pubsub_connected(self, pubsub) -> bool: 

904 """Check if a pubsub connection is still alive.""" 

905 try: 

906 conn = pubsub.connection 

907 if conn is None: 

908 return False 

909 # Async connections use is_connected property 

910 return conn.is_connected 

911 except Exception: 

912 return False 

913 

914 async def refresh_subscriptions(self): 

915 """ 

916 Refresh subscriptions after a topology change. 

917 

918 This method is called automatically when topology changes are detected 

919 or when connection errors occur. You can also call it manually if needed. 

920 

921 This method: 

922 1. Discovers any new primary nodes and subscribes them 

923 2. Removes pubsubs for nodes that are no longer primaries 

924 3. Re-creates broken pubsub connections for existing nodes 

925 """ 

926 async with self._refresh_lock: 

927 current_primaries = { 

928 node.name: node for node in self._get_all_primary_nodes() 

929 } 

930 

931 # Remove pubsubs for nodes that are no longer primaries 

932 removed_nodes = set(self._node_pubsubs.keys()) - set( 

933 current_primaries.keys() 

934 ) 

935 for node_name in removed_nodes: 

936 await self._cleanup_node(node_name) 

937 

938 # Detect broken connections for existing nodes and remove them 

939 # so they get re-created below 

940 existing_nodes = set(self._node_pubsubs.keys()) & set( 

941 current_primaries.keys() 

942 ) 

943 for node_name in existing_nodes: 

944 pubsub = self._node_pubsubs.get(node_name) 

945 if pubsub and not self._is_pubsub_connected(pubsub): 

946 # Connection is broken, remove it so it gets re-created 

947 await self._cleanup_node(node_name) 

948 

949 # Subscribe new nodes (and nodes with broken connections) 

950 # to existing patterns/channels 

951 new_nodes = set(current_primaries.keys()) - set(self._node_pubsubs.keys()) 

952 failed_nodes: list[str] = [] 

953 for node_name in new_nodes: 

954 node = current_primaries[node_name] 

955 pubsub = await self._ensure_node_pubsub(node) 

956 

957 try: 

958 if self._subscribed_patterns: 

959 await pubsub.psubscribe(**self._subscribed_patterns) 

960 if self._subscribed_channels: 

961 await pubsub.subscribe(**self._subscribed_channels) 

962 except Exception: 

963 # Subscription failed - remove from dict so retry is possible 

964 await self._cleanup_node(node_name) 

965 failed_nodes.append(node_name) 

966 

967 # Raise after attempting all nodes so we don't skip any 

968 if failed_nodes: 

969 raise ConnectionError( 

970 f"Failed to subscribe to cluster nodes: {', '.join(failed_nodes)}" 

971 ) 

972 

973 async def aclose(self): 

974 """Close all pubsub connections and clean up resources.""" 

975 self._closed = True 

976 for node_name in list(self._node_pubsubs.keys()): 

977 await self._cleanup_node(node_name) 

978 self._subscribed_patterns.clear() 

979 self._subscribed_channels.clear()