Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/asyncio/keyspace_notifications.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

369 statements  

1""" 

2Async Redis Keyspace Notifications support for redis-py. 

3 

4This module provides async utilities for subscribing to and parsing Redis 

5keyspace notifications. 

6 

7Standalone Redis Example: 

8 >>> from redis.asyncio import Redis 

9 >>> from redis.asyncio.keyspace_notifications import ( 

10 ... AsyncKeyspaceNotifications, 

11 ... ) 

12 >>> from redis.keyspace_notifications import KeyspaceChannel, EventType 

13 >>> 

14 >>> async def main(): 

15 ... async with Redis() as r: 

16 ... async with AsyncKeyspaceNotifications(r) as ksn: 

17 ... channel = KeyspaceChannel("user:*") 

18 ... await ksn.subscribe(channel) 

19 ... async for notification in ksn.listen(): 

20 ... print(f"Key: {notification.key}, Event: {notification.event_type}") 

21 

22Redis Cluster Example: 

23 >>> from redis.asyncio.cluster import RedisCluster 

24 >>> from redis.asyncio.keyspace_notifications import ( 

25 ... AsyncClusterKeyspaceNotifications, 

26 ... ) 

27 >>> from redis.keyspace_notifications import KeyspaceChannel, EventType 

28 >>> 

29 >>> async def main(): 

30 ... async with RedisCluster(host="localhost", port=7000) as rc: 

31 ... async with AsyncClusterKeyspaceNotifications(rc) as ksn: 

32 ... channel = KeyspaceChannel("user:*") 

33 ... await ksn.subscribe(channel) 

34 ... async for notification in ksn.listen(): 

35 ... print(f"Key: {notification.key}, Event: {notification.event_type}") 

36""" 

37 

38from __future__ import annotations 

39 

40import asyncio 

41import inspect 

42import logging 

43import time 

44from abc import ABC, abstractmethod 

45from collections.abc import AsyncIterator, Awaitable, Callable 

46from typing import Any 

47 

48from redis.asyncio.client import PubSub, Redis 

49from redis.asyncio.cluster import ClusterNode, RedisCluster, _ClusterNodePoolAdapter 

50from redis.exceptions import ( 

51 ConnectionError, 

52 RedisError, 

53 TimeoutError, 

54) 

55from redis.keyspace_notifications import ( 

56 ChannelT, 

57 KeyeventChannel, 

58 KeyNotification, 

59 KeyspaceChannel, 

60 SubkeyeventChannel, 

61 SubkeyspaceChannel, 

62 SubkeyspaceeventChannel, 

63 SubkeyspaceitemChannel, 

64 _is_pattern, 

65) 

66from redis.utils import safe_str 

67 

68logger = logging.getLogger(__name__) 

69 

70 

71# Type alias for handlers that can be sync or async 

72AsyncHandlerT = Callable[[KeyNotification], None | Awaitable[None]] 

73 

74 

75# ============================================================================= 

76# Async Interface for Keyspace Notifications 

77# ============================================================================= 

78 

79 

80class AsyncKeyspaceNotificationsInterface(ABC): 

81 """ 

82 Async interface for keyspace notification managers. 

83 

84 This interface provides a consistent async API for both standalone 

85 (AsyncKeyspaceNotifications) and cluster (AsyncClusterKeyspaceNotifications) 

86 implementations. 

87 """ 

88 

89 @abstractmethod 

90 async def subscribe( 

91 self, 

92 *channels: ChannelT, 

93 handler: AsyncHandlerT | None = None, 

94 ): 

95 """Subscribe to keyspace notification channels.""" 

96 pass 

97 

98 @abstractmethod 

99 async def unsubscribe(self, *channels: ChannelT): 

100 """Unsubscribe from keyspace notification channels.""" 

101 pass 

102 

103 @abstractmethod 

104 async def subscribe_keyspace( 

105 self, 

106 key_or_pattern: str, 

107 db: int = 0, 

108 handler: AsyncHandlerT | None = None, 

109 ): 

110 """Subscribe to keyspace notifications for specific keys.""" 

111 pass 

112 

113 @abstractmethod 

114 async def subscribe_keyevent( 

115 self, 

116 event: str, 

117 db: int = 0, 

118 handler: AsyncHandlerT | None = None, 

119 ): 

120 """Subscribe to keyevent notifications for specific event types.""" 

121 pass 

122 

123 @abstractmethod 

124 async def subscribe_subkeyspace( 

125 self, 

126 key_or_pattern: str, 

127 db: int = 0, 

128 handler: AsyncHandlerT | None = None, 

129 ): 

130 """Subscribe to subkeyspace notifications for specific keys.""" 

131 pass 

132 

133 @abstractmethod 

134 async def subscribe_subkeyevent( 

135 self, 

136 event: str, 

137 db: int = 0, 

138 handler: AsyncHandlerT | None = None, 

139 ): 

140 """Subscribe to subkeyevent notifications for specific event types.""" 

141 pass 

142 

143 @abstractmethod 

144 async def subscribe_subkeyspaceitem( 

145 self, 

146 key_or_pattern: str, 

147 subkey_or_pattern: str, 

148 db: int = 0, 

149 handler: AsyncHandlerT | None = None, 

150 ): 

151 """Subscribe to subkeyspaceitem notifications for a specific subkey.""" 

152 pass 

153 

154 @abstractmethod 

155 async def subscribe_subkeyspaceevent( 

156 self, 

157 event: str, 

158 key_or_pattern: str, 

159 db: int = 0, 

160 handler: AsyncHandlerT | None = None, 

161 ): 

162 """Subscribe to subkeyspaceevent notifications for an event on a key.""" 

163 pass 

164 

165 @abstractmethod 

166 async def get_message( 

167 self, 

168 ignore_subscribe_messages: bool | None = None, 

169 timeout: float = 0.0, 

170 ) -> KeyNotification | None: 

171 """Get the next keyspace notification if one is available.""" 

172 pass 

173 

174 @abstractmethod 

175 def listen(self) -> AsyncIterator[KeyNotification]: 

176 """Listen for keyspace notifications.""" 

177 pass 

178 

179 @abstractmethod 

180 async def aclose(self): 

181 """Close the notification manager and clean up resources.""" 

182 pass 

183 

184 @abstractmethod 

185 async def __aenter__(self): 

186 pass 

187 

188 @abstractmethod 

189 async def __aexit__(self, _exc_type, _exc_val, _exc_tb): 

190 pass 

191 

192 @property 

193 @abstractmethod 

194 def subscribed(self) -> bool: 

195 """Check if there are any active subscriptions and not closed.""" 

196 pass 

197 

198 @abstractmethod 

199 async def run( 

200 self, 

201 poll_timeout: float = 1.0, 

202 exception_handler: Callable[ 

203 [BaseException, AsyncKeyspaceNotificationsInterface], 

204 None | Awaitable[None], 

205 ] 

206 | None = None, 

207 ) -> None: 

208 """ 

209 Run the notification loop as a coroutine. 

210 

211 This is the async equivalent of run_in_thread() for sync notifications. 

212 Use asyncio.create_task() to run in the background. 

213 

214 The exception_handler can be either a sync or async function. 

215 """ 

216 pass 

217 

218 

219# ============================================================================= 

220# Abstract Base Class for Async Keyspace Notifications 

221# ============================================================================= 

222 

223 

224class AbstractAsyncKeyspaceNotifications(AsyncKeyspaceNotificationsInterface): 

225 """ 

226 Abstract base class for async keyspace notification managers. 

227 

228 Provides shared implementation for subscribe/unsubscribe logic. 

229 Subclasses must implement: 

230 - _execute_subscribe: Execute the subscribe operation 

231 - _execute_unsubscribe: Execute the unsubscribe operation 

232 - get_message: Get the next notification 

233 - listen: Async generator for notifications 

234 - aclose: Clean up resources 

235 """ 

236 

237 def __init__( 

238 self, 

239 key_prefix: str | bytes | None = None, 

240 ignore_subscribe_messages: bool = True, 

241 ): 

242 """ 

243 Initialize the base async keyspace notification manager. 

244 

245 Args: 

246 key_prefix: Optional prefix to filter and strip from keys in notifications 

247 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

248 are not returned by get_message/listen 

249 """ 

250 self.key_prefix = key_prefix 

251 self.ignore_subscribe_messages = ignore_subscribe_messages 

252 self._closed = False 

253 

254 async def subscribe( 

255 self, 

256 *channels: ChannelT, 

257 handler: AsyncHandlerT | None = None, 

258 ): 

259 """ 

260 Subscribe to keyspace notification channels. 

261 

262 Automatically detects whether each channel is a pattern (contains 

263 wildcards like *, ?, [) or an exact channel name and uses the 

264 appropriate Redis subscribe command internally. 

265 

266 The handler can be either a sync or async function. Note that a 

267 **sync** handler will be called directly on the event loop thread, 

268 so it must not perform blocking I/O or long-running computation — 

269 prefer an ``async`` handler whenever possible. 

270 """ 

271 # Wrap the handler to convert raw messages to KeyNotification objects 

272 wrapped_handler: Callable | None = None 

273 if handler is not None: 

274 key_prefix = self.key_prefix 

275 is_async_handler = inspect.iscoroutinefunction(handler) 

276 

277 if is_async_handler: 

278 # We've verified handler is async, so the result is awaitable 

279 async_handler = handler 

280 

281 async def _async_wrap_handler(message): 

282 notification = KeyNotification.from_message( 

283 message, key_prefix=key_prefix 

284 ) 

285 if notification is not None: 

286 await async_handler(notification) 

287 

288 wrapped_handler = _async_wrap_handler 

289 else: 

290 

291 def _sync_wrap_handler(message): 

292 notification = KeyNotification.from_message( 

293 message, key_prefix=key_prefix 

294 ) 

295 if notification is not None: 

296 handler(notification) 

297 

298 wrapped_handler = _sync_wrap_handler 

299 

300 patterns = {} 

301 exact_channels = {} 

302 

303 for channel in channels: 

304 if hasattr(channel, "_channel_str"): 

305 channel_str = str(channel) 

306 else: 

307 channel_str = safe_str(channel) 

308 if _is_pattern(channel): 

309 patterns[channel_str] = wrapped_handler 

310 else: 

311 exact_channels[channel_str] = wrapped_handler 

312 

313 # Delegate to subclass implementation first. For standalone Redis 

314 # this raises on failure, keeping tracking state clean. For cluster 

315 # implementations the operation is best-effort (partial failures are 

316 # logged, not raised) so tracking state is always updated afterwards. 

317 await self._execute_subscribe(patterns, exact_channels) 

318 self._track_subscribe(patterns, exact_channels) 

319 

320 @abstractmethod 

321 async def _execute_subscribe( 

322 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

323 ) -> None: 

324 """Execute the subscribe operation.""" 

325 pass 

326 

327 async def unsubscribe(self, *channels: ChannelT): 

328 """Unsubscribe from keyspace notification channels.""" 

329 patterns = [] 

330 exact_channels = [] 

331 

332 for channel in channels: 

333 if hasattr(channel, "_channel_str"): 

334 channel_str = str(channel) 

335 else: 

336 channel_str = safe_str(channel) 

337 if _is_pattern(channel): 

338 patterns.append(channel_str) 

339 else: 

340 exact_channels.append(channel_str) 

341 

342 # Delegate to subclass implementation first. For standalone Redis 

343 # this raises on failure, keeping tracking state intact. For cluster 

344 # implementations the operation is best-effort (partial failures are 

345 # logged, not raised) so tracking state is always removed afterwards 

346 # — this is intentional: the user asked to unsubscribe, so 

347 # refresh_subscriptions should not re-subscribe these channels. 

348 await self._execute_unsubscribe(patterns, exact_channels) 

349 self._untrack_subscribe(patterns, exact_channels) 

350 

351 @abstractmethod 

352 async def _execute_unsubscribe( 

353 self, patterns: list[str], exact_channels: list[str] 

354 ) -> None: 

355 """Execute the unsubscribe operation.""" 

356 pass 

357 

358 def _track_subscribe( 

359 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

360 ) -> None: 

361 """Track newly subscribed patterns/channels. 

362 

363 Override in subclasses that need to maintain their own subscription 

364 registry (e.g. cluster implementations that must re-subscribe 

365 new/failed-over nodes). The default is a no-op because standalone 

366 implementations delegate tracking to the underlying PubSub object. 

367 """ 

368 

369 def _untrack_subscribe( 

370 self, patterns: list[str], exact_channels: list[str] 

371 ) -> None: 

372 """Remove patterns/channels from the subscription registry. 

373 

374 Override in subclasses that maintain their own subscription registry. 

375 The default is a no-op. 

376 """ 

377 

378 async def subscribe_keyspace( 

379 self, 

380 key_or_pattern: str, 

381 db: int = 0, 

382 handler: AsyncHandlerT | None = None, 

383 ): 

384 """Subscribe to keyspace notifications for specific keys.""" 

385 channel = KeyspaceChannel(key_or_pattern, db=db) 

386 await self.subscribe(channel, handler=handler) 

387 

388 async def subscribe_keyevent( 

389 self, 

390 event: str, 

391 db: int = 0, 

392 handler: AsyncHandlerT | None = None, 

393 ): 

394 """Subscribe to keyevent notifications for specific event types.""" 

395 channel = KeyeventChannel(event, db=db) 

396 await self.subscribe(channel, handler=handler) 

397 

398 async def subscribe_subkeyspace( 

399 self, 

400 key_or_pattern: str, 

401 db: int = 0, 

402 handler: AsyncHandlerT | None = None, 

403 ): 

404 """Subscribe to subkeyspace notifications for specific keys.""" 

405 channel = SubkeyspaceChannel(key_or_pattern, db=db) 

406 await self.subscribe(channel, handler=handler) 

407 

408 async def subscribe_subkeyevent( 

409 self, 

410 event: str, 

411 db: int = 0, 

412 handler: AsyncHandlerT | None = None, 

413 ): 

414 """Subscribe to subkeyevent notifications for specific event types.""" 

415 channel = SubkeyeventChannel(event, db=db) 

416 await self.subscribe(channel, handler=handler) 

417 

418 async def subscribe_subkeyspaceitem( 

419 self, 

420 key_or_pattern: str, 

421 subkey_or_pattern: str, 

422 db: int = 0, 

423 handler: AsyncHandlerT | None = None, 

424 ): 

425 """Subscribe to subkeyspaceitem notifications for a specific subkey.""" 

426 channel = SubkeyspaceitemChannel(key_or_pattern, subkey_or_pattern, db=db) 

427 await self.subscribe(channel, handler=handler) 

428 

429 async def subscribe_subkeyspaceevent( 

430 self, 

431 event: str, 

432 key_or_pattern: str, 

433 db: int = 0, 

434 handler: AsyncHandlerT | None = None, 

435 ): 

436 """Subscribe to subkeyspaceevent notifications for an event on a key.""" 

437 channel = SubkeyspaceeventChannel(event, key_or_pattern, db=db) 

438 await self.subscribe(channel, handler=handler) 

439 

440 async def __aenter__(self): 

441 return self 

442 

443 async def __aexit__(self, _exc_type, _exc_val, _exc_tb): 

444 await self.aclose() 

445 return False 

446 

447 async def run( 

448 self, 

449 poll_timeout: float = 1.0, 

450 exception_handler: Callable[ 

451 [BaseException, AsyncKeyspaceNotificationsInterface], 

452 None | Awaitable[None], 

453 ] 

454 | None = None, 

455 ) -> None: 

456 """ 

457 Run the notification loop as a coroutine. 

458 

459 This continuously polls for notifications and triggers handlers. 

460 Use asyncio.create_task() to run in the background. 

461 

462 Args: 

463 poll_timeout: Timeout in seconds for each get_message call. 

464 exception_handler: Optional callback for handling exceptions. 

465 Can be sync or async. 

466 """ 

467 while self.subscribed: 

468 try: 

469 await self.get_message(timeout=poll_timeout) 

470 except asyncio.CancelledError: 

471 raise 

472 except BaseException as e: 

473 if exception_handler is not None: 

474 result = exception_handler(e, self) 

475 if inspect.isawaitable(result): 

476 await result 

477 else: 

478 raise 

479 

480 

481# ============================================================================= 

482# Standalone Async Keyspace Notification Manager 

483# ============================================================================= 

484 

485 

486class AsyncKeyspaceNotifications(AbstractAsyncKeyspaceNotifications): 

487 """ 

488 Manages keyspace notification subscriptions for standalone async Redis. 

489 

490 For standalone Redis, keyspace notifications work with a single PubSub 

491 connection. This class wraps that connection and provides: 

492 - Automatic pattern vs exact channel detection 

493 - KeyNotification parsing with optional key_prefix filtering 

494 - Convenience methods for keyspace and keyevent subscriptions 

495 - Context manager and run() coroutine support 

496 """ 

497 

498 def __init__( 

499 self, 

500 redis_client: Redis, 

501 key_prefix: str | bytes | None = None, 

502 ignore_subscribe_messages: bool = True, 

503 ): 

504 """ 

505 Initialize the standalone async keyspace notification manager. 

506 

507 Note: Keyspace notifications must be enabled on the Redis server via 

508 the ``notify-keyspace-events`` configuration option. 

509 

510 Args: 

511 redis_client: An async Redis client instance 

512 key_prefix: Optional prefix to filter and strip from keys in notifications 

513 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

514 are not returned by get_message/listen 

515 """ 

516 super().__init__(key_prefix, ignore_subscribe_messages) 

517 self.redis = redis_client 

518 # Create PubSub with ignore_subscribe_messages=False so per-call arg works 

519 self._pubsub: PubSub = redis_client.pubsub(ignore_subscribe_messages=False) 

520 

521 async def _execute_subscribe( 

522 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

523 ) -> None: 

524 """Execute subscribe on the single pubsub connection.""" 

525 if patterns: 

526 await self._pubsub.psubscribe(**patterns) 

527 if exact_channels: 

528 await self._pubsub.subscribe(**exact_channels) 

529 

530 async def _execute_unsubscribe( 

531 self, patterns: list[str], exact_channels: list[str] 

532 ) -> None: 

533 """Execute unsubscribe on the single pubsub connection.""" 

534 if patterns: 

535 await self._pubsub.punsubscribe(*patterns) 

536 if exact_channels: 

537 await self._pubsub.unsubscribe(*exact_channels) 

538 

539 async def get_message( 

540 self, 

541 ignore_subscribe_messages: bool | None = None, 

542 timeout: float = 0.0, 

543 ) -> KeyNotification | None: 

544 """ 

545 Get the next keyspace notification if one is available. 

546 

547 Args: 

548 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages. 

549 Defaults to the value set in __init__ (True). 

550 timeout: Time to wait for a message. 

551 

552 Returns: 

553 A KeyNotification if a notification is available and no handler 

554 was registered for the channel, None otherwise. 

555 """ 

556 if ignore_subscribe_messages is None: 

557 ignore_subscribe_messages = self.ignore_subscribe_messages 

558 

559 if self._closed: 

560 return None 

561 

562 message = await self._pubsub.get_message( 

563 ignore_subscribe_messages=ignore_subscribe_messages, 

564 timeout=timeout, 

565 ) 

566 

567 if message is not None: 

568 return KeyNotification.from_message(message, key_prefix=self.key_prefix) 

569 

570 return None 

571 

572 async def listen(self) -> AsyncIterator[KeyNotification]: 

573 """ 

574 Listen for keyspace notifications. 

575 

576 This is an async generator that yields KeyNotification objects as they arrive. 

577 

578 Yields: 

579 KeyNotification objects for each keyspace/keyevent notification. 

580 

581 Example: 

582 >>> async for notification in ksn.listen(): 

583 ... print(f"{notification.key}: {notification.event_type}") 

584 """ 

585 while self.subscribed: 

586 notification = await self.get_message(timeout=1.0) 

587 if notification is not None: 

588 yield notification 

589 

590 @property 

591 def subscribed(self) -> bool: 

592 """Check if there are any active subscriptions and not closed.""" 

593 return not self._closed and self._pubsub.subscribed 

594 

595 async def aclose(self): 

596 """Close the pubsub connection and clean up resources.""" 

597 self._closed = True 

598 try: 

599 await self._pubsub.aclose() 

600 except Exception: 

601 pass 

602 

603 

604# ============================================================================= 

605# Cluster-Aware Async Keyspace Notification Manager 

606# ============================================================================= 

607 

608 

609class AsyncClusterKeyspaceNotifications(AbstractAsyncKeyspaceNotifications): 

610 """ 

611 Manages keyspace notification subscriptions across all nodes in an async Redis Cluster. 

612 

613 In Redis Cluster, keyspace notifications are NOT broadcast between nodes. 

614 Each node only emits notifications for keys it owns. This class automatically 

615 subscribes to all primary nodes in the cluster and handles topology changes. 

616 """ 

617 

618 def __init__( 

619 self, 

620 redis_cluster: RedisCluster, 

621 key_prefix: str | bytes | None = None, 

622 ignore_subscribe_messages: bool = True, 

623 ): 

624 """ 

625 Initialize the async cluster keyspace notification manager. 

626 

627 Note: Keyspace notifications must be enabled on all Redis cluster nodes via 

628 the ``notify-keyspace-events`` configuration option. 

629 

630 Args: 

631 redis_cluster: An async RedisCluster instance 

632 key_prefix: Optional prefix to filter and strip from keys in notifications 

633 ignore_subscribe_messages: If True, subscribe/unsubscribe confirmations 

634 are not returned by get_message/listen 

635 """ 

636 super().__init__(key_prefix, ignore_subscribe_messages) 

637 self.cluster = redis_cluster 

638 

639 # Canonical subscription registry: pattern/channel -> wrapped handler. 

640 # In cluster mode there are multiple PubSub objects (one per node), so 

641 # this is the single source of truth used to (re-)subscribe new or 

642 # failed-over nodes. 

643 self._subscribed_patterns: dict[str, Any] = {} 

644 self._subscribed_channels: dict[str, Any] = {} 

645 

646 # Track subscriptions per node 

647 self._node_pubsubs: dict[str, PubSub] = {} 

648 

649 # Lock for topology refresh operations 

650 self._refresh_lock = asyncio.Lock() 

651 

652 # Current pubsub index for round-robin polling 

653 self._poll_index = 0 

654 

655 @property 

656 def subscribed(self) -> bool: 

657 """Check if there are any active subscriptions and not closed.""" 

658 return not self._closed and bool( 

659 self._subscribed_patterns or self._subscribed_channels 

660 ) 

661 

662 def _track_subscribe( 

663 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

664 ) -> None: 

665 """Track newly subscribed patterns/channels in the cluster registry.""" 

666 if patterns: 

667 self._subscribed_patterns.update(patterns) 

668 if exact_channels: 

669 self._subscribed_channels.update(exact_channels) 

670 

671 def _untrack_subscribe( 

672 self, patterns: list[str], exact_channels: list[str] 

673 ) -> None: 

674 """Remove patterns/channels from the cluster registry.""" 

675 for p in patterns: 

676 self._subscribed_patterns.pop(p, None) 

677 for c in exact_channels: 

678 self._subscribed_channels.pop(c, None) 

679 

680 def _get_all_primary_nodes(self) -> list[ClusterNode]: 

681 """Get all primary nodes in the cluster.""" 

682 return self.cluster.get_primaries() 

683 

684 async def _ensure_node_pubsub(self, node: ClusterNode) -> PubSub: 

685 """Get or create a PubSub instance for a node. 

686 

687 Uses a :class:`_ClusterNodePoolAdapter` to borrow a connection 

688 from the node's existing pool. When the ``PubSub`` is closed 

689 the connection is disconnected and returned to the node, 

690 ensuring no subscribed socket is left in the free queue. 

691 """ 

692 if node.name not in self._node_pubsubs: 

693 pool_adapter = _ClusterNodePoolAdapter(node) 

694 pubsub = PubSub( 

695 connection_pool=pool_adapter, # type: ignore[arg-type] 

696 ignore_subscribe_messages=False, 

697 ) 

698 self._node_pubsubs[node.name] = pubsub 

699 return self._node_pubsubs[node.name] 

700 

701 async def _cleanup_node(self, node_name: str) -> None: 

702 """Remove and close a node's PubSub. 

703 

704 ``PubSub.aclose()`` disconnects the connection and releases it 

705 back to the underlying :class:`ClusterNode` via the adapter. 

706 """ 

707 pubsub = self._node_pubsubs.pop(node_name, None) 

708 if pubsub: 

709 try: 

710 await pubsub.aclose() 

711 except Exception: 

712 pass 

713 

714 async def _execute_subscribe( 

715 self, patterns: dict[str, Any], exact_channels: dict[str, Any] 

716 ) -> None: 

717 """Execute subscribe on all cluster nodes. 

718 

719 Patterns and exact channels are subscribed in a single pass over 

720 nodes so that a mid-batch node failure cannot create a 

721 partially-caught-up replacement. If a node fails during this 

722 call it is removed from ``_node_pubsubs`` and will be fully 

723 re-subscribed on the next ``refresh_subscriptions`` cycle. 

724 

725 If a newly discovered node is encountered (not yet in 

726 ``_node_pubsubs``), it is also subscribed to all *previously* 

727 tracked patterns/channels so it doesn't miss notifications for 

728 subscriptions that were established before this node joined. 

729 """ 

730 if not patterns and not exact_channels: 

731 return 

732 

733 failed_nodes: list[str] = [] 

734 for node in self._get_all_primary_nodes(): 

735 is_new_node = node.name not in self._node_pubsubs 

736 pubsub = await self._ensure_node_pubsub(node) 

737 try: 

738 # If this is a brand-new node, catch it up on existing 

739 # subscriptions before adding the new channels. 

740 if is_new_node: 

741 if self._subscribed_patterns: 

742 await pubsub.psubscribe(**self._subscribed_patterns) 

743 if self._subscribed_channels: 

744 await pubsub.subscribe(**self._subscribed_channels) 

745 

746 if patterns: 

747 await pubsub.psubscribe(**patterns) 

748 if exact_channels: 

749 await pubsub.subscribe(**exact_channels) 

750 except Exception: 

751 # Remove the broken pubsub and its connection pool 

752 # so refresh_subscriptions can re-create both later. 

753 await self._cleanup_node(node.name) 

754 failed_nodes.append(node.name) 

755 

756 if failed_nodes: 

757 logger.warning( 

758 "Failed to subscribe on cluster nodes: %s. " 

759 "These nodes will be retried on the next refresh cycle.", 

760 ", ".join(failed_nodes), 

761 ) 

762 

763 async def _execute_unsubscribe( 

764 self, patterns: list[str], exact_channels: list[str] 

765 ) -> None: 

766 """Execute unsubscribe on all cluster nodes.""" 

767 if patterns: 

768 await self._unsubscribe_from_all_nodes(patterns, use_punsubscribe=True) 

769 if exact_channels: 

770 await self._unsubscribe_from_all_nodes( 

771 exact_channels, use_punsubscribe=False 

772 ) 

773 

774 async def _unsubscribe_from_all_nodes( 

775 self, channels: list[str], use_punsubscribe: bool 

776 ): 

777 """Unsubscribe from patterns/channels on all nodes. 

778 

779 Best-effort: tries every node so that a single broken connection 

780 does not prevent the remaining nodes from being unsubscribed. 

781 Broken pubsubs are cleaned up; the tracking state is still removed 

782 by the caller, so ``refresh_subscriptions`` will *not* re-subscribe 

783 these channels on replacement nodes. 

784 """ 

785 failed_nodes: list[str] = [] 

786 for node_name in list(self._node_pubsubs.keys()): 

787 pubsub = self._node_pubsubs.get(node_name) 

788 if pubsub is None: 

789 continue 

790 try: 

791 if use_punsubscribe: 

792 await pubsub.punsubscribe(*channels) 

793 else: 

794 await pubsub.unsubscribe(*channels) 

795 except Exception: 

796 await self._cleanup_node(node_name) 

797 failed_nodes.append(node_name) 

798 

799 if failed_nodes: 

800 logger.warning( 

801 "Failed to unsubscribe on cluster nodes: %s. " 

802 "These nodes will be re-created on the next refresh cycle.", 

803 ", ".join(failed_nodes), 

804 ) 

805 

806 async def get_message( 

807 self, 

808 ignore_subscribe_messages: bool | None = None, 

809 timeout: float = 0.0, 

810 ) -> KeyNotification | None: 

811 """ 

812 Get the next keyspace notification if one is available. 

813 

814 This method polls all node pubsubs in round-robin fashion until 

815 a message is received or the timeout expires. 

816 If a connection error occurs, subscriptions are automatically refreshed. 

817 

818 Args: 

819 ignore_subscribe_messages: If True, skip subscribe/unsubscribe messages. 

820 Defaults to the value set in __init__ (True). 

821 timeout: Total time to wait for a message (distributed across all nodes) 

822 

823 Returns: 

824 A KeyNotification if a notification is available, None otherwise. 

825 """ 

826 if self._closed: 

827 return None 

828 

829 total_nodes = len(self._node_pubsubs) 

830 if total_nodes == 0: 

831 # Sleep for the requested timeout so callers that loop 

832 # (run(), listen) don't spin the CPU when all node 

833 # connections have been cleaned up. 

834 if timeout > 0: 

835 await asyncio.sleep(timeout) 

836 return None 

837 

838 if ignore_subscribe_messages is None: 

839 ignore_subscribe_messages = self.ignore_subscribe_messages 

840 

841 # Handle timeout=0 as a single non-blocking poll over all pubsubs 

842 if timeout == 0.0: 

843 return await self._poll_all_nodes_once(ignore_subscribe_messages) 

844 

845 # Calculate per-node timeout for each poll 

846 per_node_timeout = min(0.1, timeout / max(total_nodes, 1)) 

847 

848 start_time = time.monotonic() 

849 end_time = start_time + timeout 

850 

851 while True: 

852 if time.monotonic() >= end_time: 

853 return None 

854 

855 pubsubs = list(self._node_pubsubs.values()) 

856 if not pubsubs: 

857 return None 

858 

859 # Round-robin polling 

860 self._poll_index = self._poll_index % len(pubsubs) 

861 pubsub = pubsubs[self._poll_index] 

862 self._poll_index += 1 

863 

864 try: 

865 message = await pubsub.get_message( 

866 ignore_subscribe_messages=ignore_subscribe_messages, 

867 timeout=per_node_timeout, 

868 ) 

869 except (ConnectionError, TimeoutError, RedisError): 

870 await self._refresh_subscriptions_on_error() 

871 continue 

872 

873 if message is not None: 

874 notification = KeyNotification.from_message( 

875 message, key_prefix=self.key_prefix 

876 ) 

877 if notification is not None: 

878 return notification 

879 

880 async def _poll_all_nodes_once( 

881 self, ignore_subscribe_messages: bool 

882 ) -> KeyNotification | None: 

883 """ 

884 Perform a single non-blocking poll over all node pubsubs. 

885 

886 Returns: 

887 A KeyNotification if one is available, None otherwise. 

888 """ 

889 had_error = False 

890 for pubsub in list(self._node_pubsubs.values()): 

891 try: 

892 message = await pubsub.get_message( 

893 ignore_subscribe_messages=ignore_subscribe_messages, 

894 timeout=0.0, 

895 ) 

896 except (ConnectionError, TimeoutError, RedisError): 

897 # Record the error but continue polling remaining healthy 

898 # nodes so that already-buffered notifications are not lost. 

899 had_error = True 

900 continue 

901 

902 if message is not None: 

903 notification = KeyNotification.from_message( 

904 message, key_prefix=self.key_prefix 

905 ) 

906 if notification is not None: 

907 # Refresh before returning if any node had an error, 

908 # so the next poll cycle has fresh state. 

909 if had_error: 

910 await self._refresh_subscriptions_on_error() 

911 return notification 

912 

913 # Refresh after polling all nodes if any had errors 

914 if had_error: 

915 await self._refresh_subscriptions_on_error() 

916 return None 

917 

918 async def listen(self) -> AsyncIterator[KeyNotification]: 

919 """ 

920 Listen for keyspace notifications from all cluster nodes. 

921 

922 This is an async generator that yields KeyNotification objects as they arrive. 

923 

924 Yields: 

925 KeyNotification objects for each keyspace/keyevent notification. 

926 

927 Example: 

928 >>> async for notification in ksn.listen(): 

929 ... print(f"{notification.key}: {notification.event_type}") 

930 """ 

931 while self.subscribed: 

932 notification = await self.get_message(timeout=1.0) 

933 if notification is not None: 

934 yield notification 

935 

936 async def _refresh_subscriptions_on_error(self): 

937 """ 

938 Refresh subscriptions after a connection error. 

939 

940 This is called automatically when a connection error occurs during 

941 get_message(). It checks if nodes changed before refreshing. 

942 """ 

943 self._poll_index = 0 # Reset round-robin index 

944 

945 try: 

946 await self.refresh_subscriptions() 

947 except Exception: 

948 logger.warning( 

949 "Failed to refresh cluster subscriptions, will retry on next error", 

950 exc_info=True, 

951 ) 

952 

953 def _is_pubsub_connected(self, pubsub) -> bool: 

954 """Check if a pubsub connection is still alive.""" 

955 try: 

956 conn = pubsub.connection 

957 if conn is None: 

958 return False 

959 # Async connections use is_connected property 

960 return conn.is_connected 

961 except Exception: 

962 return False 

963 

964 async def refresh_subscriptions(self): 

965 """ 

966 Refresh subscriptions after a topology change. 

967 

968 This method is called automatically when topology changes are detected 

969 or when connection errors occur. You can also call it manually if needed. 

970 

971 This method: 

972 1. Discovers any new primary nodes and subscribes them 

973 2. Removes pubsubs for nodes that are no longer primaries 

974 3. Re-creates broken pubsub connections for existing nodes 

975 """ 

976 async with self._refresh_lock: 

977 current_primaries = { 

978 node.name: node for node in self._get_all_primary_nodes() 

979 } 

980 

981 # Remove pubsubs for nodes that are no longer primaries 

982 removed_nodes = set(self._node_pubsubs.keys()) - set( 

983 current_primaries.keys() 

984 ) 

985 for node_name in removed_nodes: 

986 await self._cleanup_node(node_name) 

987 

988 # Detect broken connections for existing nodes and remove them 

989 # so they get re-created below 

990 existing_nodes = set(self._node_pubsubs.keys()) & set( 

991 current_primaries.keys() 

992 ) 

993 for node_name in existing_nodes: 

994 pubsub = self._node_pubsubs.get(node_name) 

995 if pubsub and not self._is_pubsub_connected(pubsub): 

996 # Connection is broken, remove it so it gets re-created 

997 await self._cleanup_node(node_name) 

998 

999 # Subscribe new nodes (and nodes with broken connections) 

1000 # to existing patterns/channels 

1001 new_nodes = set(current_primaries.keys()) - set(self._node_pubsubs.keys()) 

1002 failed_nodes: list[str] = [] 

1003 for node_name in new_nodes: 

1004 node = current_primaries[node_name] 

1005 pubsub = await self._ensure_node_pubsub(node) 

1006 

1007 try: 

1008 if self._subscribed_patterns: 

1009 await pubsub.psubscribe(**self._subscribed_patterns) 

1010 if self._subscribed_channels: 

1011 await pubsub.subscribe(**self._subscribed_channels) 

1012 except Exception: 

1013 # Subscription failed - remove from dict so retry is possible 

1014 await self._cleanup_node(node_name) 

1015 failed_nodes.append(node_name) 

1016 

1017 # Raise after attempting all nodes so we don't skip any 

1018 if failed_nodes: 

1019 raise ConnectionError( 

1020 f"Failed to subscribe to cluster nodes: {', '.join(failed_nodes)}" 

1021 ) 

1022 

1023 async def aclose(self): 

1024 """Close all pubsub connections and clean up resources.""" 

1025 self._closed = True 

1026 for node_name in list(self._node_pubsubs.keys()): 

1027 await self._cleanup_node(node_name) 

1028 self._subscribed_patterns.clear() 

1029 self._subscribed_channels.clear()