Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 18%

1786 statements  

1import logging 

2import random 

3import socket 

4import sys 

5import threading 

6import time 

7import weakref 

8from abc import ABC, abstractmethod 

9from collections import OrderedDict, defaultdict 

10from concurrent.futures import Future, ThreadPoolExecutor 

11from copy import copy 

12from enum import Enum 

13from itertools import chain 

14from types import MethodType 

15from typing import ( 

16 TYPE_CHECKING, 

17 Any, 

18 Callable, 

19 Dict, 

20 List, 

21 Literal, 

22 Optional, 

23 Set, 

24 Tuple, 

25 Type, 

26 Union, 

27) 

28 

29if TYPE_CHECKING: 

30 from redis.keyspace_notifications import ClusterKeyspaceNotifications 

31 

32from redis._parsers import CommandsParser, Encoder 

33from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy 

34from redis._parsers.helpers import parse_scan 

35from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

36from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface 

37from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis 

38from redis.commands import READ_COMMANDS, RedisClusterCommands 

39from redis.commands.helpers import list_or_args 

40from redis.commands.policies import PolicyResolver, StaticPolicyResolver 

41from redis.connection import ( 

42 Connection, 

43 ConnectionPool, 

44 parse_url, 

45) 

46from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

47from redis.event import ( 

48 AfterPooledConnectionsInstantiationEvent, 

49 AfterPubSubConnectionInstantiationEvent, 

50 AfterSlotsCacheRefreshEvent, 

51 ClientType, 

52 EventDispatcher, 

53 EventListenerInterface, 

54) 

55from redis.exceptions import ( 

56 AskError, 

57 AuthenticationError, 

58 ClusterDownError, 

59 ClusterError, 

60 ConnectionError, 

61 CrossSlotTransactionError, 

62 DataError, 

63 ExecAbortError, 

64 InvalidPipelineStack, 

65 MaxConnectionsError, 

66 MovedError, 

67 RedisClusterException, 

68 RedisError, 

69 ResponseError, 

70 SlotNotCoveredError, 

71 TimeoutError, 

72 TryAgainError, 

73 WatchError, 

74) 

75from redis.lock import Lock 

76from redis.maint_notifications import ( 

77 MaintNotificationsConfig, 

78 OSSMaintNotificationsHandler, 

79) 

80from redis.observability.recorder import ( 

81 record_error_count, 

82 record_operation_duration, 

83) 

84from redis.retry import Retry 

85from redis.utils import ( 

86 DEFAULT_RESP_VERSION, 

87 check_protocol_version, 

88 deprecated_args, 

89 deprecated_function, 

90 dict_merge, 

91 list_keys_to_dict, 

92 merge_result, 

93 safe_str, 

94 str_if_bytes, 

95 truncate_text, 

96) 

97 

98logger = logging.getLogger(__name__) 

99 

100 

101def is_debug_log_enabled(): 

102 return logger.isEnabledFor(logging.DEBUG) 

103 

104 

105def get_node_name(host: str, port: Union[str, int]) -> str: 

106 return f"{host}:{port}" 

107 

108 

109@deprecated_args( 

110 allowed_args=["redis_node"], 

111 reason="Use get_connection(redis_node) instead", 

112 version="5.3.0", 

113) 

114def get_connection(redis_node: Redis, *args, **options) -> Connection: 

115 return redis_node.connection or redis_node.connection_pool.get_connection() 

116 

117 

118def parse_scan_result(command, res, **options): 

119 cursors = {} 

120 ret = [] 

121 for node_name, response in res.items(): 

122 cursor, r = parse_scan(response, **options) 

123 cursors[node_name] = cursor 

124 ret += r 

125 

126 return cursors, ret 

127 

128 

129def parse_pubsub_numsub(command, res, **options): 

130 numsub_d = OrderedDict() 

131 for numsub_tups in res.values(): 

132 for channel, numsubbed in numsub_tups: 

133 try: 

134 numsub_d[channel] += numsubbed 

135 except KeyError: 

136 numsub_d[channel] = numsubbed 

137 

138 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()] 

139 return ret_numsub 

140 

141 

142def parse_cluster_slots( 

143 resp: Any, **options: Any 

144) -> Dict[Tuple[int, int], Dict[str, Any]]: 

145 current_host = options.get("current_host", "") 

146 

147 def fix_server(*args: Any) -> Tuple[str, Any]: 

148 return str_if_bytes(args[0]) or current_host, args[1] 

149 

150 slots = {} 

151 for slot in resp: 

152 start, end, primary = slot[:3] 

153 replicas = slot[3:] 

154 slots[start, end] = { 

155 "primary": fix_server(*primary), 

156 "replicas": [fix_server(*replica) for replica in replicas], 

157 } 

158 

159 return slots 

160 
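For reference, a minimal sketch of the mapping this helper builds from a raw CLUSTER SLOTS reply (the addresses and node ids below are hypothetical):

    # raw entry: [start_slot, end_slot, [host, port, node_id], *replica_entries]
    raw = [[0, 5460, [b"10.0.0.1", 6379, b"id1"], [b"10.0.0.2", 6379, b"id2"]]]
    parse_cluster_slots(raw)
    # -> {(0, 5460): {"primary": ("10.0.0.1", 6379),
    #                 "replicas": [("10.0.0.2", 6379)]}}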

161 

162def parse_cluster_shards(resp, **options): 

163 """ 

164 Parse CLUSTER SHARDS response. 

165 

166 Normalises the output so that all dictionary keys are strings regardless 

167 of protocol version (RESP2 returns bytes keys for node attributes, 

168 RESP3 returns bytes keys at every level). 

169 """ 

170 if isinstance(resp[0], dict): 

171 # RESP3 – native dicts with bytes keys; decode them. 

172 shards = [] 

173 for item in resp: 

174 shard = { 

175 "slots": item.get("slots") or item.get(b"slots", []), 

176 "nodes": [], 

177 } 

178 raw_nodes = item.get("nodes") or item.get(b"nodes", []) 

179 for node in raw_nodes: 

180 if isinstance(node, dict): 

181 shard["nodes"].append({str_if_bytes(k): v for k, v in node.items()}) 

182 else: 

183 shard["nodes"].append(node) 

184 shards.append(shard) 

185 return shards 

186 

187 # RESP2 – flat list structure; build dicts with string keys. 

188 shards = [] 

189 for x in resp: 

190 shard = {"slots": [], "nodes": []} 

191 for i in range(0, len(x[1]), 2): 

192 shard["slots"].append((x[1][i], (x[1][i + 1]))) 

193 nodes = x[3] 

194 for node in nodes: 

195 dict_node = {} 

196 for i in range(0, len(node), 2): 

197 dict_node[str_if_bytes(node[i])] = node[i + 1] 

198 shard["nodes"].append(dict_node) 

199 shards.append(shard) 

200 

201 return shards 

202 

203 

204def parse_cluster_myshardid(resp, **options): 

205 """ 

206 Parse CLUSTER MYSHARDID response. 

207 """ 

208 return resp.decode("utf-8") 

209 

210 

211PRIMARY = "primary" 

212REPLICA = "replica" 

213SLOT_ID = "slot-id" 

214 

215REDIS_ALLOWED_KEYS = ( 

216 "connection_class", 

217 "connection_pool", 

218 "connection_pool_class", 

219 "client_name", 

220 "credential_provider", 

221 "db", 

222 "decode_responses", 

223 "encoding", 

224 "encoding_errors", 

225 "host", 

226 "lib_name", 

227 "lib_version", 

228 "max_connections", 

229 "nodes_flag", 

230 "redis_connect_func", 

231 "password", 

232 "port", 

233 "timeout", 

234 "queue_class", 

235 "retry", 

236 "retry_on_timeout", 

237 "protocol", 

238 "socket_connect_timeout", 

239 "socket_keepalive", 

240 "socket_keepalive_options", 

241 "socket_timeout", 

242 "ssl", 

243 "ssl_ca_certs", 

244 "ssl_ca_data", 

245 "ssl_ca_path", 

246 "ssl_certfile", 

247 "ssl_cert_reqs", 

248 "ssl_include_verify_flags", 

249 "ssl_exclude_verify_flags", 

250 "ssl_keyfile", 

251 "ssl_password", 

252 "ssl_check_hostname", 

253 "unix_socket_path", 

254 "username", 

255 "cache", 

256 "cache_config", 

257 "maint_notifications_config", 

258) 

259KWARGS_DISABLED_KEYS = ("host", "port", "retry") 

260 

261 

262def cleanup_kwargs(**kwargs): 

263 """ 

264 Remove unsupported or disabled keys from kwargs 

265 """ 

266 connection_kwargs = { 

267 k: v 

268 for k, v in kwargs.items() 

269 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS 

270 } 

271 

272 return connection_kwargs 

273 

274 

275class MaintNotificationsAbstractRedisCluster: 

276 """ 

277 Abstract class for handling maintenance notifications logic. 

278 This class is expected to be used as base class together with RedisCluster. 

279 

280 This class is intended to be used with multiple inheritance! 

281 

282 All logic related to maintenance notifications is encapsulated in this class. 

283 """ 

284 

285 def __init__( 

286 self, 

287 maint_notifications_config: Optional[MaintNotificationsConfig], 

288 **kwargs, 

289 ): 

290 # Initialize maintenance notifications 

291 is_protocol_supported = check_protocol_version( 

292 kwargs.get("protocol", DEFAULT_RESP_VERSION), 3 

293 ) 

294 

295 if ( 

296 maint_notifications_config 

297 and maint_notifications_config.enabled 

298 and not is_protocol_supported 

299 ): 

300 raise RedisError( 

301 "Maintenance notifications handlers on connection are only supported with RESP version 3" 

302 ) 

303 if maint_notifications_config is None and is_protocol_supported: 

304 maint_notifications_config = MaintNotificationsConfig() 

305 

306 self.maint_notifications_config = maint_notifications_config 

307 

308 if self.maint_notifications_config and self.maint_notifications_config.enabled: 

309 self._oss_cluster_maint_notifications_handler = ( 

310 OSSMaintNotificationsHandler(self, self.maint_notifications_config) 

311 ) 

312 # Update connection kwargs for all future nodes connections 

313 self._update_connection_kwargs_for_maint_notifications( 

314 self._oss_cluster_maint_notifications_handler 

315 ) 

316 # Update existing nodes connections - they are created as part of the RedisCluster constructor 

317 for node in self.get_nodes(): 

318 if node.redis_connection is None: 

319 continue 

320 node.redis_connection.connection_pool.update_maint_notifications_config( 

321 self.maint_notifications_config, 

322 oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler, 

323 ) 

324 else: 

325 self._oss_cluster_maint_notifications_handler = None 

326 

327 def _update_connection_kwargs_for_maint_notifications( 

328 self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler 

329 ): 

330 """ 

331 Update the connection kwargs for all future connections. 

332 """ 

333 self.nodes_manager.connection_kwargs.update( 

334 { 

335 "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler, 

336 } 

337 ) 

338 

339 

340class AbstractRedisCluster: 

341 RedisClusterRequestTTL = 16 

342 

343 PRIMARIES = "primaries" 

344 REPLICAS = "replicas" 

345 ALL_NODES = "all" 

346 RANDOM = "random" 

347 DEFAULT_NODE = "default-node" 

348 

349 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE} 

350 

351 COMMAND_FLAGS = dict_merge( 

352 list_keys_to_dict( 

353 [ 

354 "ACL CAT", 

355 "ACL DELUSER", 

356 "ACL DRYRUN", 

357 "ACL GENPASS", 

358 "ACL GETUSER", 

359 "ACL HELP", 

360 "ACL LIST", 

361 "ACL LOG", 

362 "ACL LOAD", 

363 "ACL SAVE", 

364 "ACL SETUSER", 

365 "ACL USERS", 

366 "ACL WHOAMI", 

367 "AUTH", 

368 "CLIENT LIST", 

369 "CLIENT SETINFO", 

370 "CLIENT SETNAME", 

371 "CLIENT GETNAME", 

372 "CONFIG SET", 

373 "CONFIG REWRITE", 

374 "CONFIG RESETSTAT", 

375 "TIME", 

376 "PUBSUB CHANNELS", 

377 "PUBSUB NUMPAT", 

378 "PUBSUB NUMSUB", 

379 "PUBSUB SHARDCHANNELS", 

380 "PUBSUB SHARDNUMSUB", 

381 "PING", 

382 "INFO", 

383 "SHUTDOWN", 

384 "KEYS", 

385 "DBSIZE", 

386 "BGSAVE", 

387 "SLOWLOG GET", 

388 "SLOWLOG LEN", 

389 "SLOWLOG RESET", 

390 "WAIT", 

391 "WAITAOF", 

392 "SAVE", 

393 "MEMORY PURGE", 

394 "MEMORY MALLOC-STATS", 

395 "MEMORY STATS", 

396 "LASTSAVE", 

397 "CLIENT TRACKINGINFO", 

398 "CLIENT PAUSE", 

399 "CLIENT UNPAUSE", 

400 "CLIENT UNBLOCK", 

401 "CLIENT ID", 

402 "CLIENT REPLY", 

403 "CLIENT GETREDIR", 

404 "CLIENT INFO", 

405 "CLIENT KILL", 

406 "READONLY", 

407 "CLUSTER INFO", 

408 "CLUSTER MEET", 

409 "CLUSTER MYSHARDID", 

410 "CLUSTER NODES", 

411 "CLUSTER REPLICAS", 

412 "CLUSTER RESET", 

413 "CLUSTER SET-CONFIG-EPOCH", 

414 "CLUSTER SLOTS", 

415 "CLUSTER SHARDS", 

416 "CLUSTER COUNT-FAILURE-REPORTS", 

417 "CLUSTER KEYSLOT", 

418 "COMMAND", 

419 "COMMAND COUNT", 

420 "COMMAND LIST", 

421 "COMMAND GETKEYS", 

422 "CONFIG GET", 

423 "DEBUG", 

424 "RANDOMKEY", 

425 "READONLY", 

426 "READWRITE", 

427 "TIME", 

428 "TFUNCTION LOAD", 

429 "TFUNCTION DELETE", 

430 "TFUNCTION LIST", 

431 "TFCALL", 

432 "TFCALLASYNC", 

433 "LATENCY HISTORY", 

434 "LATENCY LATEST", 

435 "LATENCY RESET", 

436 "MODULE LIST", 

437 "MODULE LOAD", 

438 "MODULE UNLOAD", 

439 "MODULE LOADEX", 

440 ], 

441 DEFAULT_NODE, 

442 ), 

443 list_keys_to_dict( 

444 [ 

445 "FLUSHALL", 

446 "FLUSHDB", 

447 "FUNCTION DELETE", 

448 "FUNCTION FLUSH", 

449 "FUNCTION LIST", 

450 "FUNCTION LOAD", 

451 "FUNCTION RESTORE", 

452 "SCAN", 

453 "SCRIPT EXISTS", 

454 "SCRIPT FLUSH", 

455 "SCRIPT LOAD", 

456 ], 

457 PRIMARIES, 

458 ), 

459 list_keys_to_dict(["FUNCTION DUMP"], RANDOM), 

460 list_keys_to_dict( 

461 [ 

462 "CLUSTER COUNTKEYSINSLOT", 

463 "CLUSTER DELSLOTS", 

464 "CLUSTER DELSLOTSRANGE", 

465 "CLUSTER GETKEYSINSLOT", 

466 "CLUSTER SETSLOT", 

467 ], 

468 SLOT_ID, 

469 ), 

470 ) 

471 

472 SEARCH_COMMANDS = ( 

473 [ 

474 "FT.CREATE", 

475 "FT.SEARCH", 

476 "FT.AGGREGATE", 

477 "FT.EXPLAIN", 

478 "FT.EXPLAINCLI", 

479 "FT,PROFILE", 

480 "FT.ALTER", 

481 "FT.DROPINDEX", 

482 "FT.ALIASADD", 

483 "FT.ALIASUPDATE", 

484 "FT.ALIASDEL", 

485 "FT.TAGVALS", 

486 "FT.SUGADD", 

487 "FT.SUGGET", 

488 "FT.SUGDEL", 

489 "FT.SUGLEN", 

490 "FT.SYNUPDATE", 

491 "FT.SYNDUMP", 

492 "FT.SPELLCHECK", 

493 "FT.DICTADD", 

494 "FT.DICTDEL", 

495 "FT.DICTDUMP", 

496 "FT.INFO", 

497 "FT._LIST", 

498 "FT.CONFIG", 

499 "FT.ADD", 

500 "FT.DEL", 

501 "FT.DROP", 

502 "FT.GET", 

503 "FT.MGET", 

504 "FT.SYNADD", 

505 ], 

506 ) 

507 

508 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { 

509 "CLUSTER SLOTS": parse_cluster_slots, 

510 "CLUSTER SHARDS": parse_cluster_shards, 

511 "CLUSTER MYSHARDID": parse_cluster_myshardid, 

512 } 

513 

514 RESULT_CALLBACKS = dict_merge( 

515 list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub), 

516 list_keys_to_dict( 

517 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values())) 

518 ), 

519 list_keys_to_dict( 

520 ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result 

521 ), 

522 list_keys_to_dict( 

523 [ 

524 "PING", 

525 "CONFIG SET", 

526 "CONFIG REWRITE", 

527 "CONFIG RESETSTAT", 

528 "CLIENT SETNAME", 

529 "BGSAVE", 

530 "SLOWLOG RESET", 

531 "SAVE", 

532 "MEMORY PURGE", 

533 "CLIENT PAUSE", 

534 "CLIENT UNPAUSE", 

535 ], 

536 lambda command, res: all(res.values()) if isinstance(res, dict) else res, 

537 ), 

538 list_keys_to_dict( 

539 ["DBSIZE", "WAIT"], 

540 lambda command, res: sum(res.values()) if isinstance(res, dict) else res, 

541 ), 

542 list_keys_to_dict( 

543 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0 

544 ), 

545 list_keys_to_dict(["SCAN"], parse_scan_result), 

546 list_keys_to_dict( 

547 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop() 

548 ), 

549 list_keys_to_dict( 

550 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())] 

551 ), 

552 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())), 

553 ) 

554 

555 ERRORS_ALLOW_RETRY = ( 

556 ConnectionError, 

557 TimeoutError, 

558 ClusterDownError, 

559 SlotNotCoveredError, 

560 ) 

561 

562 def replace_default_node(self, target_node: "ClusterNode" = None) -> None: 

563 """Replace the default cluster node. 

564 A random cluster node will be chosen if target_node isn't passed, and primaries 

565 will be prioritized. The default node will not be changed if there are no other 

566 nodes in the cluster. 

567 

568 Args: 

569 target_node (ClusterNode, optional): Target node to replace the default 

570 node. Defaults to None. 

571 """ 

572 if target_node: 

573 self.nodes_manager.default_node = target_node 

574 else: 

575 curr_node = self.get_default_node() 

576 primaries = [node for node in self.get_primaries() if node != curr_node] 

577 if primaries: 

578 # Choose a primary if the cluster contains different primaries 

579 self.nodes_manager.default_node = random.choice(primaries) 

580 else: 

581 # Otherwise, choose a replica if the cluster contains any other replicas 

582 replicas = [node for node in self.get_replicas() if node != curr_node] 

583 if replicas: 

584 self.nodes_manager.default_node = random.choice(replicas) 

585 

586 

587class RedisCluster( 

588 AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands 

589): 

590 # Type discrimination marker for @overload self-type pattern 

591 _is_async_client: Literal[False] = False 

592 

593 @classmethod 

594 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster": 

595 """ 

596 Return a Redis client object configured from the given URL 

597 

598 For example:: 

599 

600 redis://[[username]:[password]]@localhost:6379/0 

601 rediss://[[username]:[password]]@localhost:6379/0 

602 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

603 

604 Three URL schemes are supported: 

605 

606 - `redis://` creates a TCP socket connection. See more at: 

607 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

608 - `rediss://` creates an SSL-wrapped TCP socket connection. See more at: 

609 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

610 - ``unix://``: creates a Unix Domain Socket connection. 

611 

612 The username, password, hostname, path and all querystring values 

613 are passed through urllib.parse.unquote in order to replace any 

614 percent-encoded values with their corresponding characters. 

615 

616 There are several ways to specify a database number. The first value 

617 found will be used: 

618 

619 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

620 2. If using the redis:// or rediss:// schemes, the path argument 

621 of the url, e.g. redis://localhost/0 

622 3. A ``db`` keyword argument to this function. 

623 

624 If none of these options are specified, the default db=0 is used. 

625 

626 All querystring options are cast to their appropriate Python types. 

627 Boolean arguments can be specified with string values "True"/"False" 

628 or "Yes"/"No". Values that cannot be properly cast cause a 

629 ``ValueError`` to be raised. Once parsed, the querystring arguments 

630 and keyword arguments are passed to the ``ConnectionPool``'s 

631 class initializer. In the case of conflicting arguments, querystring 

632 arguments always win. 

633 

634 """ 

635 return cls(url=url, **kwargs) 

636 
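A minimal usage sketch of from_url, assuming a cluster node is reachable at the placeholder address:

    from redis.cluster import RedisCluster

    rc = RedisCluster.from_url("redis://localhost:6379/0")
    print(rc.get_nodes())  # nodes discovered during bootstrapping
    rc.close()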

637 @deprecated_args( 

638 args_to_warn=["read_from_replicas"], 

639 reason="Please configure the 'load_balancing_strategy' instead", 

640 version="5.3.0", 

641 ) 

642 @deprecated_args( 

643 args_to_warn=[ 

644 "cluster_error_retry_attempts", 

645 ], 

646 reason="Please configure the 'retry' object instead", 

647 version="6.0.0", 

648 ) 

649 def __init__( 

650 self, 

651 host: Optional[str] = None, 

652 port: int = 6379, 

653 startup_nodes: Optional[List["ClusterNode"]] = None, 

654 cluster_error_retry_attempts: int = 3, 

655 retry: Optional["Retry"] = None, 

656 require_full_coverage: bool = True, 

657 reinitialize_steps: int = 5, 

658 read_from_replicas: bool = False, 

659 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None, 

660 dynamic_startup_nodes: bool = True, 

661 url: Optional[str] = None, 

662 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

663 cache: Optional[CacheInterface] = None, 

664 cache_config: Optional[CacheConfig] = None, 

665 event_dispatcher: Optional[EventDispatcher] = None, 

666 policy_resolver: PolicyResolver = StaticPolicyResolver(), 

667 maint_notifications_config: Optional[MaintNotificationsConfig] = None, 

668 **kwargs, 

669 ): 

670 """ 

671 Initialize a new RedisCluster client. 

672 

673 :param startup_nodes: 

674 List of nodes from which initial bootstrapping can be done 

675 :param host: 

676 Can be used to point to a startup node 

677 :param port: 

678 Can be used to point to a startup node 

679 :param require_full_coverage: 

680 When set to False: the client will not require a 

681 full coverage of the slots. However, if not all slots are covered, 

682 and at least one node has 'cluster-require-full-coverage' set to 

683 'yes,' the server will throw a ClusterDownError for some key-based 

684 commands. See - 

685 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters 

686 When set to True (default value): all slots must be covered to construct the 

687 cluster client. If not all slots are covered, RedisClusterException 

688 will be thrown. 

689 :param read_from_replicas: 

690 @deprecated - please use load_balancing_strategy instead 

691 Enable read from replicas in READONLY mode. You can read possibly 

692 stale data. 

693 When set to True, read commands will be distributed between the 

694 primary and its replicas in a round-robin manner. 

695 :param load_balancing_strategy: 

696 Enable read from replicas in READONLY mode and defines the load balancing 

697 strategy that will be used for cluster node selection. 

698 The data read from replicas is eventually consistent with the data in primary nodes. 

699 :param dynamic_startup_nodes: 

700 Set the RedisCluster's startup nodes to all of the discovered nodes. 

701 If true (default value), the cluster's discovered nodes will be used to 

702 determine the cluster nodes-slots mapping in the next topology refresh. 

703 It will remove the initially passed startup nodes if their endpoints aren't 

704 listed in the CLUSTER SLOTS output. 

705 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

706 specific IP addresses, it is best to set it to false. 

707 :param cluster_error_retry_attempts: 

708 @deprecated - Please configure the 'retry' object instead 

709 In case 'retry' object is set - this argument is ignored! 

710 

711 Number of times to retry before raising an error when 

712 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or 

713 :class:`~.ClusterDownError` are encountered 

714 :param retry: 

715 A retry object that defines the retry strategy and the number of 

716 retries for the cluster client. 

717 In the current implementation of the cluster client (starting from redis-py version 6.0.0), 

718 the retry object is not yet fully utilized; instead, it is used only to determine 

719 the number of retries for the cluster client. 

720 In future releases the retry object will be used to handle the cluster client retries! 

721 :param reinitialize_steps: 

722 Specifies the number of MOVED errors that need to occur before 

723 reinitializing the whole cluster topology. If a MOVED error occurs 

724 and the cluster does not need to be reinitialized on this current 

725 error handling, only the MOVED slot will be patched with the 

726 redirected node. 

727 To reinitialize the cluster on every MOVED error, set 

728 reinitialize_steps to 1. 

729 To avoid reinitializing the cluster on moved errors, set 

730 reinitialize_steps to 0. 

731 :param address_remap: 

732 An optional callable which, when provided with an internal network 

733 address of a node, e.g. a `(host, port)` tuple, will return the address 

734 where the node is reachable. This can be used to map the addresses at 

735 which the nodes _think_ they are, to addresses at which a client may 

736 reach them, such as when they sit behind a proxy. 

737 

738 :param maint_notifications_config: 

739 Configures the nodes connections to support maintenance notifications - see 

740 `redis.maint_notifications.MaintNotificationsConfig` for details. 

741 Only supported with RESP3. 

742 If not provided and protocol is RESP3, the maintenance notifications 

743 will be enabled by default (logic is included in the NodesManager 

744 initialization). 

745 :**kwargs: 

746 Extra arguments that will be passed to the Redis instances when they are created 

747 (see the official redis-py docs for supported kwargs - the only limitation 

748 is that you can't provide a 'retry' object as part of kwargs. 

749 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) 

750 Some kwargs are not supported and will raise a 

751 RedisClusterException: 

752 - db (Redis does not support database SELECT in cluster mode) 

753 

754 """ 

755 if startup_nodes is None: 

756 startup_nodes = [] 

757 

758 if "db" in kwargs: 

759 # Argument 'db' is not possible to use in cluster mode 

760 raise RedisClusterException( 

761 "Argument 'db' is not possible to use in cluster mode" 

762 ) 

763 

764 if "retry" in kwargs: 

765 # The 'retry' argument cannot be passed via kwargs in cluster mode: 

766 # the kwargs are forwarded to the lower-level connections to the cluster nodes, 

767 # where a retry configuration with no retries allowed is provided. 

768 # Retries should be handled at the cluster client level. 

769 raise RedisClusterException( 

770 "The 'retry' argument cannot be used in kwargs when running in cluster mode." 

771 ) 

772 

773 # Get the startup node/s 

774 from_url = False 

775 if url is not None: 

776 from_url = True 

777 url_options = parse_url(url) 

778 if "path" in url_options: 

779 raise RedisClusterException( 

780 "RedisCluster does not currently support Unix Domain " 

781 "Socket connections" 

782 ) 

783 if "db" in url_options and url_options["db"] != 0: 

784 # Argument 'db' is not possible to use in cluster mode 

785 raise RedisClusterException( 

786 "A ``db`` querystring option can only be 0 in cluster mode" 

787 ) 

788 kwargs.update(url_options) 

789 host = kwargs.get("host") 

790 port = kwargs.get("port", port) 

791 startup_nodes.append(ClusterNode(host, port)) 

792 elif host is not None and port is not None: 

793 startup_nodes.append(ClusterNode(host, port)) 

794 elif len(startup_nodes) == 0: 

795 # No startup node was provided 

796 raise RedisClusterException( 

797 "RedisCluster requires at least one node to discover the " 

798 "cluster. Please provide one of the followings:\n" 

799 "1. host and port, for example:\n" 

800 " RedisCluster(host='localhost', port=6379)\n" 

801 "2. list of startup nodes, for example:\n" 

802 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," 

803 " ClusterNode('localhost', 6378)])" 

804 ) 

805 # Update the connection arguments 

806 # Whenever a new connection is established, RedisCluster's on_connect 

807 # method should be run 

808 # If the user passed on_connect function we'll save it and run it 

809 # inside the RedisCluster.on_connect() function 

810 self.user_on_connect_func = kwargs.pop("redis_connect_func", None) 

811 kwargs.update({"redis_connect_func": self.on_connect}) 

812 kwargs = cleanup_kwargs(**kwargs) 

813 if retry: 

814 self.retry = retry 

815 else: 

816 self.retry = Retry( 

817 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

818 retries=cluster_error_retry_attempts, 

819 ) 

820 

821 self.encoder = Encoder( 

822 kwargs.get("encoding", "utf-8"), 

823 kwargs.get("encoding_errors", "strict"), 

824 kwargs.get("decode_responses", False), 

825 ) 

826 protocol = kwargs.get("protocol", DEFAULT_RESP_VERSION) 

827 if (cache_config or cache) and not check_protocol_version(protocol, 3): 

828 raise RedisError("Client caching is only supported with RESP version 3") 

829 

830 if maint_notifications_config and not check_protocol_version(protocol, 3): 

831 raise RedisError( 

832 "Maintenance notifications are only supported with RESP version 3" 

833 ) 

834 if check_protocol_version(protocol, 3) and maint_notifications_config is None: 

835 maint_notifications_config = MaintNotificationsConfig() 

836 

837 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

838 self.node_flags = self.__class__.NODE_FLAGS.copy() 

839 self.read_from_replicas = read_from_replicas 

840 self.load_balancing_strategy = load_balancing_strategy 

841 self.reinitialize_counter = 0 

842 self.reinitialize_steps = reinitialize_steps 

843 if event_dispatcher is None: 

844 self._event_dispatcher = EventDispatcher() 

845 else: 

846 self._event_dispatcher = event_dispatcher 

847 self.startup_nodes = startup_nodes 

848 

849 self.nodes_manager = NodesManager( 

850 startup_nodes=startup_nodes, 

851 from_url=from_url, 

852 require_full_coverage=require_full_coverage, 

853 dynamic_startup_nodes=dynamic_startup_nodes, 

854 address_remap=address_remap, 

855 cache=cache, 

856 cache_config=cache_config, 

857 event_dispatcher=self._event_dispatcher, 

858 maint_notifications_config=maint_notifications_config, 

859 **kwargs, 

860 ) 

861 

862 self.cluster_response_callbacks = CaseInsensitiveDict( 

863 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS 

864 ) 

865 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS) 

866 

867 # For backward compatibility, mapping from existing policies to new one 

868 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = { 

869 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS, 

870 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS, 

871 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES, 

872 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS, 

873 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE, 

874 SLOT_ID: RequestPolicy.DEFAULT_KEYED, 

875 } 

876 

877 self._policies_callback_mapping: dict[ 

878 Union[RequestPolicy, ResponsePolicy], Callable 

879 ] = { 

880 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [ 

881 self.get_random_primary_or_all_nodes(command_name) 

882 ], 

883 RequestPolicy.DEFAULT_KEYED: lambda command, 

884 *args: self.get_nodes_from_slot(command, *args), 

885 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()], 

886 RequestPolicy.ALL_SHARDS: self.get_primaries, 

887 RequestPolicy.ALL_NODES: self.get_nodes, 

888 RequestPolicy.ALL_REPLICAS: self.get_replicas, 

889 RequestPolicy.MULTI_SHARD: lambda *args, 

890 **kwargs: self._split_multi_shard_command(*args, **kwargs), 

891 RequestPolicy.SPECIAL: self.get_special_nodes, 

892 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res, 

893 ResponsePolicy.DEFAULT_KEYED: lambda res: res, 

894 } 

895 

896 self._policy_resolver = policy_resolver 

897 self.commands_parser = CommandsParser(self) 

898 

899 # Nodes on which the last FT.AGGREGATE command was executed (used for FT.CURSOR). 

900 self._aggregate_nodes = None 

901 self._lock = threading.RLock() 

902 

903 MaintNotificationsAbstractRedisCluster.__init__( 

904 self, maint_notifications_config, **kwargs 

905 ) 

906 
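A minimal construction sketch tying several of the parameters above together; the node addresses are placeholders, and LoadBalancingStrategy.ROUND_ROBIN refers to the enum defined later in this module:

    from redis.cluster import ClusterNode, LoadBalancingStrategy, RedisCluster

    rc = RedisCluster(
        startup_nodes=[ClusterNode("127.0.0.1", 7000), ClusterNode("127.0.0.1", 7001)],
        load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN,  # spread reads over primaries and replicas
        decode_responses=True,
    )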

907 def __enter__(self): 

908 return self 

909 

910 def __exit__(self, exc_type, exc_value, traceback): 

911 self.close() 

912 

913 def __del__(self): 

914 try: 

915 self.close() 

916 except Exception: 

917 pass 

918 

919 def disconnect_connection_pools(self): 

920 for node in self.get_nodes(): 

921 if node.redis_connection: 

922 try: 

923 node.redis_connection.connection_pool.disconnect() 

924 except OSError: 

925 # Client was already disconnected. do nothing 

926 pass 

927 

928 def on_connect(self, connection): 

929 """ 

930 Initialize the connection, authenticate and select a database and send 

931 READONLY if it is set during object initialization. 

932 """ 

933 connection.on_connect() 

934 

935 if self.read_from_replicas or self.load_balancing_strategy: 

936 # Sending READONLY command to server to configure connection as 

937 # readonly. Since each cluster node may change its server type due 

938 # to a failover, we should establish a READONLY connection 

939 # regardless of the server type. If this is a primary connection, 

940 # READONLY would not affect executing write commands. 

941 connection.send_command("READONLY") 

942 if str_if_bytes(connection.read_response()) != "OK": 

943 raise ConnectionError("READONLY command failed") 

944 

945 if self.user_on_connect_func is not None: 

946 self.user_on_connect_func(connection) 

947 

948 def get_redis_connection(self, node: "ClusterNode") -> Redis: 

949 if not node.redis_connection: 

950 with self._lock: 

951 if not node.redis_connection: 

952 self.nodes_manager.create_redis_connections([node]) 

953 return node.redis_connection 

954 

955 def get_node(self, host=None, port=None, node_name=None): 

956 return self.nodes_manager.get_node(host, port, node_name) 

957 

958 def get_primaries(self): 

959 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

960 

961 def get_replicas(self): 

962 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

963 

964 def get_random_node(self): 

965 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

966 

967 def get_random_primary_or_all_nodes(self, command_name): 

968 """ 

969 Returns a random node if read_from_replicas is enabled and the command is a read command; otherwise returns a random primary. 

970 """ 

971 if self.read_from_replicas and command_name in READ_COMMANDS: 

972 return self.get_random_node() 

973 

974 return self.get_random_primary_node() 

975 

976 def get_nodes(self): 

977 return list(self.nodes_manager.nodes_cache.values()) 

978 

979 def get_node_from_key(self, key, replica=False): 

980 """ 

981 Get the node that holds the key's slot. 

982 If replica set to True but the slot doesn't have any replicas, None is 

983 returned. 

984 """ 

985 slot = self.keyslot(key) 

986 slot_cache = self.nodes_manager.slots_cache.get(slot) 

987 if slot_cache is None or len(slot_cache) == 0: 

988 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') 

989 if replica and len(self.nodes_manager.slots_cache[slot]) < 2: 

990 return None 

991 elif replica: 

992 node_idx = 1 

993 else: 

994 # primary 

995 node_idx = 0 

996 

997 return slot_cache[node_idx] 

998 

999 def get_default_node(self): 

1000 """ 

1001 Get the cluster's default node 

1002 """ 

1003 return self.nodes_manager.default_node 

1004 

1005 def get_nodes_from_slot(self, command: str, *args): 

1006 """ 

1007 Returns a list of nodes that hold the specified keys' slots. 

1008 """ 

1009 # get the node that holds the key's slot 

1010 slot = self.determine_slot(*args) 

1011 node = self.nodes_manager.get_node_from_slot( 

1012 slot, 

1013 self.read_from_replicas and command in READ_COMMANDS, 

1014 self.load_balancing_strategy if command in READ_COMMANDS else None, 

1015 ) 

1016 return [node] 

1017 

1018 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]: 

1019 """ 

1020 Splits the command with Multi-Shard policy, to the multiple commands 

1021 """ 

1022 keys = self._get_command_keys(*args) 

1023 commands = [] 

1024 

1025 for key in keys: 

1026 commands.append( 

1027 { 

1028 "args": (args[0], key), 

1029 "kwargs": kwargs, 

1030 } 

1031 ) 

1032 

1033 return commands 

1034 

1035 def get_special_nodes(self) -> Optional[list["ClusterNode"]]: 

1036 """ 

1037 Returns a list of nodes for commands with a special policy. 

1038 """ 

1039 if not self._aggregate_nodes: 

1040 raise RedisClusterException( 

1041 "Cannot execute FT.CURSOR commands without FT.AGGREGATE" 

1042 ) 

1043 

1044 return self._aggregate_nodes 

1045 

1046 def get_random_primary_node(self) -> "ClusterNode": 

1047 """ 

1048 Returns a random primary node 

1049 """ 

1050 return random.choice(self.get_primaries()) 

1051 

1052 def _evaluate_all_succeeded(self, res): 

1053 """ 

1054 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED 

1055 """ 

1056 first_successful_response = None 

1057 

1058 if isinstance(res, dict): 

1059 for key, value in res.items(): 

1060 if value: 

1061 if first_successful_response is None: 

1062 first_successful_response = {key: value} 

1063 else: 

1064 return {key: False} 

1065 else: 

1066 for response in res: 

1067 if response: 

1068 if first_successful_response is None: 

1069 # Dynamically resolve type 

1070 first_successful_response = type(response)(response) 

1071 else: 

1072 return type(response)(False) 

1073 

1074 return first_successful_response 

1075 

1076 def set_default_node(self, node): 

1077 """ 

1078 Set the default node of the cluster. 

1079 :param node: 'ClusterNode' 

1080 :return True if the default node was set, else False 

1081 """ 

1082 if node is None or self.get_node(node_name=node.name) is None: 

1083 return False 

1084 self.nodes_manager.default_node = node 

1085 return True 

1086 

1087 def set_retry(self, retry: Retry) -> None: 

1088 self.retry = retry 

1089 

1090 def monitor(self, target_node=None): 

1091 """ 

1092 Returns a Monitor object for the specified target node. 

1093 The default cluster node will be selected if no target node was 

1094 specified. 

1095 Monitor is useful for handling the MONITOR command sent to the Redis server. 

1096 The next_command() method returns one command from the monitor; 

1097 listen() yields commands from the monitor. 

1098 """ 

1099 if target_node is None: 

1100 target_node = self.get_default_node() 

1101 if target_node.redis_connection is None: 

1102 raise RedisClusterException( 

1103 f"Cluster Node {target_node.name} has no redis_connection" 

1104 ) 

1105 return target_node.redis_connection.monitor() 

1106 

1107 def pubsub(self, node=None, host=None, port=None, **kwargs): 

1108 """ 

1109 Allows passing a ClusterNode, or host&port, to get a pubsub instance 

1110 connected to the specified node 

1111 """ 

1112 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs) 

1113 

1114 def keyspace_notifications( 

1115 self, 

1116 key_prefix: Union[str, bytes, None] = None, 

1117 ignore_subscribe_messages: bool = True, 

1118 ) -> "ClusterKeyspaceNotifications": 

1119 """ 

1120 Return a :class:`~redis.keyspace_notifications.ClusterKeyspaceNotifications` 

1121 object for subscribing to keyspace and keyevent notifications across 

1122 all primary nodes in the cluster. 

1123 

1124 Note: Keyspace notifications must be enabled on all Redis cluster nodes 

1125 via the ``notify-keyspace-events`` configuration option. 

1126 

1127 Args: 

1128 key_prefix: Optional prefix to filter and strip from keys in 

1129 notifications. 

1130 ignore_subscribe_messages: If True, subscribe/unsubscribe 

1131 confirmations are not returned by 

1132 get_message/listen. 

1133 """ 

1134 from redis.keyspace_notifications import ClusterKeyspaceNotifications 

1135 

1136 return ClusterKeyspaceNotifications( 

1137 self, 

1138 key_prefix=key_prefix, 

1139 ignore_subscribe_messages=ignore_subscribe_messages, 

1140 ) 

1141 
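A minimal sketch of preparing the cluster before subscribing, reusing the rc client from the construction sketch above; it assumes notify-keyspace-events is switched on via CONFIG SET on every node:

    # enable keyspace/keyevent notifications on all nodes first
    rc.config_set("notify-keyspace-events", "KEA", target_nodes=RedisCluster.ALL_NODES)
    notifications = rc.keyspace_notifications(key_prefix="cache:")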

1142 def pipeline(self, transaction=None, shard_hint=None): 

1143 """ 

1144 Cluster impl: 

1145 Pipelines do not work in cluster mode the same way they 

1146 do in normal mode. Create a clone of this object so 

1147 that simulating pipelines will work correctly. Each 

1148 command will be called directly when used, and calling 

1149 execute() will only return the result stack. 

1150 """ 

1151 if shard_hint: 

1152 raise RedisClusterException("shard_hint is deprecated in cluster mode") 

1153 

1154 return ClusterPipeline( 

1155 nodes_manager=self.nodes_manager, 

1156 commands_parser=self.commands_parser, 

1157 startup_nodes=self.nodes_manager.startup_nodes, 

1158 result_callbacks=self.result_callbacks, 

1159 cluster_response_callbacks=self.cluster_response_callbacks, 

1160 read_from_replicas=self.read_from_replicas, 

1161 load_balancing_strategy=self.load_balancing_strategy, 

1162 reinitialize_steps=self.reinitialize_steps, 

1163 retry=self.retry, 

1164 lock=self._lock, 

1165 transaction=transaction, 

1166 event_dispatcher=self._event_dispatcher, 

1167 ) 

1168 
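A minimal pipeline sketch, reusing the rc client from the construction sketch above; commands are queued on the clone and execute() returns the result stack:

    with rc.pipeline() as pipe:
        pipe.set("foo", "1")
        pipe.incr("foo")
        results = pipe.execute()  # e.g. [True, 2]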

1169 def lock( 

1170 self, 

1171 name, 

1172 timeout=None, 

1173 sleep=0.1, 

1174 blocking=True, 

1175 blocking_timeout=None, 

1176 lock_class=None, 

1177 thread_local=True, 

1178 raise_on_release_error: bool = True, 

1179 ): 

1180 """ 

1181 Return a new Lock object using key ``name`` that mimics 

1182 the behavior of threading.Lock. 

1183 

1184 If specified, ``timeout`` indicates a maximum life for the lock. 

1185 By default, it will remain locked until release() is called. 

1186 

1187 ``sleep`` indicates the amount of time to sleep per loop iteration 

1188 when the lock is in blocking mode and another client is currently 

1189 holding the lock. 

1190 

1191 ``blocking`` indicates whether calling ``acquire`` should block until 

1192 the lock has been acquired or to fail immediately, causing ``acquire`` 

1193 to return False and the lock not being acquired. Defaults to True. 

1194 Note this value can be overridden by passing a ``blocking`` 

1195 argument to ``acquire``. 

1196 

1197 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

1198 spend trying to acquire the lock. A value of ``None`` indicates 

1199 continue trying forever. ``blocking_timeout`` can be specified as a 

1200 float or integer, both representing the number of seconds to wait. 

1201 

1202 ``lock_class`` forces the specified lock implementation. Note that as 

1203 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

1204 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

1205 you have created your own custom lock class. 

1206 

1207 ``thread_local`` indicates whether the lock token is placed in 

1208 thread-local storage. By default, the token is placed in thread local 

1209 storage so that a thread only sees its token, not a token set by 

1210 another thread. Consider the following timeline: 

1211 

1212 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

1213 thread-1 sets the token to "abc" 

1214 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

1215 Lock instance. 

1216 time: 5, thread-1 has not yet completed. redis expires the lock 

1217 key. 

1218 time: 5, thread-2 acquired `my-lock` now that it's available. 

1219 thread-2 sets the token to "xyz" 

1220 time: 6, thread-1 finishes its work and calls release(). if the 

1221 token is *not* stored in thread local storage, then 

1222 thread-1 would see the token value as "xyz" and would be 

1223 able to successfully release thread-2's lock. 

1224 

1225 ``raise_on_release_error`` indicates whether to raise an exception when 

1226 the lock is no longer owned when exiting the context manager. By default, 

1227 this is True, meaning an exception will be raised. If False, the warning 

1228 will be logged and the exception will be suppressed. 

1229 

1230 In some use cases it's necessary to disable thread local storage. For 

1231 example, if you have code where one thread acquires a lock and passes 

1232 that lock instance to a worker thread to release later. If thread 

1233 local storage isn't disabled in this case, the worker thread won't see 

1234 the token set by the thread that acquired the lock. Our assumption 

1235 is that these cases aren't common and as such default to using 

1236 thread local storage.""" 

1237 if lock_class is None: 

1238 lock_class = Lock 

1239 return lock_class( 

1240 self, 

1241 name, 

1242 timeout=timeout, 

1243 sleep=sleep, 

1244 blocking=blocking, 

1245 blocking_timeout=blocking_timeout, 

1246 thread_local=thread_local, 

1247 raise_on_release_error=raise_on_release_error, 

1248 ) 

1249 
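A minimal locking sketch mirroring the parameters described above, reusing the rc client from the earlier construction sketch:

    with rc.lock("resource-lock", timeout=10, blocking_timeout=5):
        ...  # critical section; the lock auto-expires after 10 seconds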

1250 def set_response_callback(self, command, callback): 

1251 """Set a custom Response Callback""" 

1252 self.cluster_response_callbacks[command] = callback 

1253 

1254 def _determine_nodes( 

1255 self, *args, request_policy: RequestPolicy, **kwargs 

1256 ) -> List["ClusterNode"]: 

1257 """ 

1258 Determines a nodes the command should be executed on. 

1259 """ 

1260 command = args[0].upper() 

1261 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: 

1262 command = f"{args[0]} {args[1]}".upper() 

1263 

1264 nodes_flag = kwargs.pop("nodes_flag", None) 

1265 if nodes_flag is not None: 

1266 # nodes flag passed by the user 

1267 command_flag = nodes_flag 

1268 else: 

1269 # get the nodes group for this command if it was predefined 

1270 command_flag = self.command_flags.get(command) 

1271 

1272 if command_flag in self._command_flags_mapping: 

1273 request_policy = self._command_flags_mapping[command_flag] 

1274 

1275 policy_callback = self._policies_callback_mapping[request_policy] 

1276 

1277 if request_policy == RequestPolicy.DEFAULT_KEYED: 

1278 nodes = policy_callback(command, *args) 

1279 elif request_policy == RequestPolicy.MULTI_SHARD: 

1280 nodes = policy_callback(*args, **kwargs) 

1281 elif request_policy == RequestPolicy.DEFAULT_KEYLESS: 

1282 nodes = policy_callback(args[0]) 

1283 else: 

1284 nodes = policy_callback() 

1285 

1286 if args[0].lower() == "ft.aggregate": 

1287 self._aggregate_nodes = nodes 

1288 

1289 return nodes 

1290 

1291 def _should_reinitialized(self): 

1292 # To reinitialize the cluster on every MOVED error, 

1293 # set reinitialize_steps to 1. 

1294 # To avoid reinitializing the cluster on moved errors, set 

1295 # reinitialize_steps to 0. 

1296 if self.reinitialize_steps == 0: 

1297 return False 

1298 else: 

1299 return self.reinitialize_counter % self.reinitialize_steps == 0 

1300 

1301 def keyslot(self, key): 

1302 """ 

1303 Calculate keyslot for a given key. 

1304 See Keys distribution model in https://redis.io/topics/cluster-spec 

1305 """ 

1306 k = self.encoder.encode(key) 

1307 return key_slot(k) 

1308 
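key_slot() implements the CRC16-mod-16384 mapping from the cluster spec; hash tags ({...}) restrict hashing to the tagged substring. A small sketch, reusing the rc client from the earlier construction sketch:

    rc.keyslot("foo")         # 12182, i.e. CRC16("foo") % 16384
    rc.keyslot("{user:1}:a")  # only "user:1" is hashed ...
    rc.keyslot("{user:1}:b")  # ... so both keys land in the same slot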

1309 def _get_command_keys(self, *args): 

1310 """ 

1311 Get the keys in the command. If the command has no keys in it, None is 

1312 returned. 

1313 

1314 NOTE: Due to a bug in redis<7.0, this function does not work properly 

1315 for EVAL or EVALSHA when the `numkeys` arg is 0. 

1316 - issue: https://github.com/redis/redis/issues/9493 

1317 - fix: https://github.com/redis/redis/pull/9733 

1318 

1319 So, don't use this function with EVAL or EVALSHA. 

1320 """ 

1321 redis_conn = self.get_default_node().redis_connection 

1322 return self.commands_parser.get_keys(redis_conn, *args) 

1323 

1324 def determine_slot(self, *args) -> Optional[int]: 

1325 """ 

1326 Figure out what slot to use based on args. 

1327 

1328 Raises a RedisClusterException if there's a missing key and we can't 

1329 determine what slots to map the command to; or, if the keys don't 

1330 all map to the same key slot. 

1331 """ 

1332 command = args[0] 

1333 if self.command_flags.get(command) == SLOT_ID: 

1334 # The command contains the slot ID 

1335 return args[1] 

1336 

1337 # Get the keys in the command 

1338 

1339 # CLIENT TRACKING is a special case. 

1340 # It doesn't have any keys, it needs to be sent to the provided nodes 

1341 # By default it will be sent to all nodes. 

1342 if command.upper() == "CLIENT TRACKING": 

1343 return None 

1344 

1345 # EVAL and EVALSHA are common enough that it's wasteful to go to the 

1346 # redis server to parse the keys. Besides, there is a bug in redis<7.0 

1347 # where `self._get_command_keys()` fails anyway. So, we special case 

1348 # EVAL/EVALSHA. 

1349 if command.upper() in ("EVAL", "EVALSHA"): 

1350 # command syntax: EVAL "script body" num_keys ... 

1351 if len(args) <= 2: 

1352 raise RedisClusterException(f"Invalid args in command: {args}") 

1353 num_actual_keys = int(args[2]) 

1354 eval_keys = args[3 : 3 + num_actual_keys] 

1355 # if there are 0 keys, that means the script can be run on any node 

1356 # so we can just return a random slot 

1357 if len(eval_keys) == 0: 

1358 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1359 keys = eval_keys 

1360 else: 

1361 keys = self._get_command_keys(*args) 

1362 if keys is None or len(keys) == 0: 

1363 # FCALL can call a function with 0 keys, that means the function 

1364 # can be run on any node so we can just return a random slot 

1365 if command.upper() in ("FCALL", "FCALL_RO"): 

1366 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1367 raise RedisClusterException( 

1368 "No way to dispatch this command to Redis Cluster. " 

1369 "Missing key.\nYou can execute the command by specifying " 

1370 f"target nodes.\nCommand: {args}" 

1371 ) 

1372 

1373 # single key command 

1374 if len(keys) == 1: 

1375 return self.keyslot(keys[0]) 

1376 

1377 # multi-key command; we need to make sure all keys are mapped to 

1378 # the same slot 

1379 slots = {self.keyslot(key) for key in keys} 

1380 if len(slots) != 1: 

1381 raise RedisClusterException( 

1382 f"{command} - all keys must map to the same key slot" 

1383 ) 

1384 

1385 return slots.pop() 

1386 
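Because of the same-slot rule above, multi-key commands only succeed when all their keys hash to one slot; hash tags are the usual way to guarantee that. A sketch, reusing the rc client from the earlier construction sketch:

    rc.mset({"{user:1}:name": "a", "{user:1}:age": "30"})  # one slot: OK
    rc.mget("{user:1}:name", "{user:2}:name")              # cross-slot: raises RedisClusterException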

1387 def get_encoder(self): 

1388 """ 

1389 Get the connections' encoder 

1390 """ 

1391 return self.encoder 

1392 

1393 def get_connection_kwargs(self): 

1394 """ 

1395 Get the connections' key-word arguments 

1396 """ 

1397 return self.nodes_manager.connection_kwargs 

1398 

1399 def _is_nodes_flag(self, target_nodes): 

1400 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1401 

1402 def _parse_target_nodes(self, target_nodes): 

1403 if isinstance(target_nodes, list): 

1404 nodes = target_nodes 

1405 elif isinstance(target_nodes, ClusterNode): 

1406 # Supports passing a single ClusterNode as a variable 

1407 nodes = [target_nodes] 

1408 elif isinstance(target_nodes, dict): 

1409 # Supports dictionaries of the format {node_name: node}. 

1410 # It enables to execute commands with multi nodes as follows: 

1411 # rc.cluster_save_config(rc.get_primaries()) 

1412 nodes = target_nodes.values() 

1413 else: 

1414 raise TypeError( 

1415 "target_nodes type can be one of the following: " 

1416 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1417 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1418 f"The passed type is {type(target_nodes)}" 

1419 ) 

1420 return nodes 

1421 

1422 def execute_command(self, *args, **kwargs): 

1423 return self._internal_execute_command(*args, **kwargs) 

1424 

1425 def _internal_execute_command(self, *args, **kwargs): 

1426 """ 

1427 Wrapper for ERRORS_ALLOW_RETRY error handling. 

1428 

1429 It will retry the number of times specified by the retries property of 

1430 the "self.retry" config option, which defaults to 3 unless manually 

1431 configured. 

1432 

1433 If all retry attempts are exhausted, the command raises the last exception. 

1434 

1435 Key argument :target_nodes: can be passed with the following types: 

1436 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM 

1437 ClusterNode 

1438 list<ClusterNode> 

1439 dict<Any, ClusterNode> 

1440 """ 

1441 target_nodes_specified = False 

1442 is_default_node = False 

1443 target_nodes = None 

1444 passed_targets = kwargs.pop("target_nodes", None) 

1445 command_policies = self._policy_resolver.resolve(args[0].lower()) 

1446 

1447 if passed_targets is not None and not self._is_nodes_flag(passed_targets): 

1448 target_nodes = self._parse_target_nodes(passed_targets) 

1449 target_nodes_specified = True 

1450 

1451 if not command_policies and not target_nodes_specified: 

1452 command = args[0].upper() 

1453 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: 

1454 command = f"{args[0]} {args[1]}".upper() 

1455 

1456 # We can only resolve key properties if the command is not 

1457 # in a list of pre-defined request policies 

1458 command_flag = self.command_flags.get(command) 

1459 if not command_flag: 

1460 # Fallback to default policy 

1461 if not self.get_default_node(): 

1462 slot = None 

1463 else: 

1464 slot = self.determine_slot(*args) 

1465 if slot is None: 

1466 command_policies = CommandPolicies() 

1467 else: 

1468 command_policies = CommandPolicies( 

1469 request_policy=RequestPolicy.DEFAULT_KEYED, 

1470 response_policy=ResponsePolicy.DEFAULT_KEYED, 

1471 ) 

1472 else: 

1473 if command_flag in self._command_flags_mapping: 

1474 command_policies = CommandPolicies( 

1475 request_policy=self._command_flags_mapping[command_flag] 

1476 ) 

1477 else: 

1478 command_policies = CommandPolicies() 

1479 elif not command_policies and target_nodes_specified: 

1480 command_policies = CommandPolicies() 

1481 

1482 # If an error that allows retrying was thrown, the nodes and slots 

1483 # cache were reinitialized. We will retry executing the command with 

1484 # the updated cluster setup only when the target nodes can be 

1485 # determined again with the new cache tables. Therefore, when target 

1486 # nodes were passed to this function, we cannot retry the command 

1487 # execution since the nodes may not be valid anymore after the tables 

1488 # were reinitialized. So in case of passed target nodes, 

1489 # retry_attempts will be set to 0. 

1490 retry_attempts = 0 if target_nodes_specified else self.retry.get_retries() 

1491 # Add one for the first execution 

1492 execute_attempts = 1 + retry_attempts 

1493 failure_count = 0 

1494 

1495 # Start timing for observability 

1496 start_time = time.monotonic() 

1497 

1498 for _ in range(execute_attempts): 

1499 try: 

1500 res = {} 

1501 if not target_nodes_specified: 

1502 # Determine the nodes to execute the command on 

1503 target_nodes = self._determine_nodes( 

1504 *args, 

1505 request_policy=command_policies.request_policy, 

1506 nodes_flag=passed_targets, 

1507 ) 

1508 

1509 if not target_nodes: 

1510 raise RedisClusterException( 

1511 f"No targets were found to execute {args} command on" 

1512 ) 

1513 if ( 

1514 len(target_nodes) == 1 

1515 and target_nodes[0] == self.get_default_node() 

1516 ): 

1517 is_default_node = True 

1518 for node in target_nodes: 

1519 res[node.name] = self._execute_command(node, *args, **kwargs) 

1520 

1521 if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED: 

1522 break 

1523 

1524 # Return the processed result 

1525 return self._process_result( 

1526 args[0], 

1527 res, 

1528 response_policy=command_policies.response_policy, 

1529 **kwargs, 

1530 ) 

1531 except Exception as e: 

1532 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: 

1533 if is_default_node: 

1534 # Replace the default cluster node 

1535 self.replace_default_node() 

1536 # The nodes and slots cache were reinitialized. 

1537 # Try again with the new cluster setup. 

1538 retry_attempts -= 1 

1539 failure_count += 1 

1540 

1541 if hasattr(e, "connection"): 

1542 self._record_command_metric( 

1543 command_name=args[0], 

1544 duration_seconds=time.monotonic() - start_time, 

1545 connection=e.connection, 

1546 error=e, 

1547 ) 

1548 

1549 self._record_error_metric( 

1550 error=e, 

1551 connection=e.connection, 

1552 retry_attempts=failure_count, 

1553 ) 

1554 continue 

1555 else: 

1556 # raise the exception 

1557 if hasattr(e, "connection"): 

1558 self._record_error_metric( 

1559 error=e, 

1560 connection=e.connection, 

1561 retry_attempts=failure_count, 

1562 is_internal=False, 

1563 ) 

1564 raise e 

1565 
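A usage sketch of the target_nodes keyword described above, reusing the rc client from the earlier construction sketch:

    rc.execute_command("INFO", target_nodes=RedisCluster.PRIMARIES)  # node flag
    rc.execute_command("PING", target_nodes=rc.get_random_node())    # a single ClusterNode
    rc.execute_command("DBSIZE", target_nodes=rc.get_nodes())        # a list of nodes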

1566 def _execute_command(self, target_node, *args, **kwargs): 

1567 """ 

1568 Send a command to a node in the cluster 

1569 """ 

1570 command = args[0] 

1571 redis_node = None 

1572 connection = None 

1573 redirect_addr = None 

1574 asking = False 

1575 moved = False 

1576 ttl = int(self.RedisClusterRequestTTL) 

1577 

1578 # Start timing for observability 

1579 start_time = time.monotonic() 

1580 

1581 while ttl > 0: 

1582 ttl -= 1 

1583 try: 

1584 if asking: 

1585 target_node = self.get_node(node_name=redirect_addr) 

1586 elif moved: 

1587 # MOVED occurred and the slots cache was updated, 

1588 # refresh the target node 

1589 slot = self.determine_slot(*args) 

1590 target_node = self.nodes_manager.get_node_from_slot( 

1591 slot, 

1592 self.read_from_replicas and command in READ_COMMANDS, 

1593 self.load_balancing_strategy 

1594 if command in READ_COMMANDS 

1595 else None, 

1596 ) 

1597 moved = False 

1598 

1599 redis_node = self.get_redis_connection(target_node) 

1600 connection = get_connection(redis_node) 

1601 if asking: 

1602 connection.send_command("ASKING") 

1603 redis_node.parse_response(connection, "ASKING", **kwargs) 

1604 asking = False 

1605 connection.send_command(*args, **kwargs) 

1606 response = redis_node.parse_response(connection, command, **kwargs) 

1607 

1608 # Remove the 'keys' entry; it is only needed for client-side caching. 

1609 kwargs.pop("keys", None) 

1610 

1611 if command in self.cluster_response_callbacks: 

1612 response = self.cluster_response_callbacks[command]( 

1613 response, **kwargs 

1614 ) 

1615 

1616 self._record_command_metric( 

1617 command_name=command, 

1618 duration_seconds=time.monotonic() - start_time, 

1619 connection=connection, 

1620 ) 

1621 return response 

1622 except AuthenticationError as e: 

1623 e.connection = connection if connection is not None else target_node 

1624 self._record_command_metric( 

1625 command_name=command, 

1626 duration_seconds=time.monotonic() - start_time, 

1627 connection=e.connection, 

1628 error=e, 

1629 ) 

1630 raise 

1631 except MaxConnectionsError as e: 

1632 # MaxConnectionsError indicates client-side resource exhaustion 

1633 # (too many connections in the pool), not a node failure. 

1634 # Don't treat this as a node failure - just re-raise the error 

1635 # without reinitializing the cluster. 

1636 # The connection attached to the error is used to report metrics by 

1637 # host and port; since no connection was obtained yet, attach the 

1638 # target node, which carries that information. 

1639 e.connection = target_node 

1640 self._record_command_metric( 

1641 command_name=command, 

1642 duration_seconds=time.monotonic() - start_time, 

1643 connection=e.connection, 

1644 error=e, 

1645 ) 

1646 raise 

1647 except (ConnectionError, TimeoutError) as e: 

1648 if is_debug_log_enabled(): 

1649 socket_address = self._extracts_socket_address(connection) 

1650 args_log_str = truncate_text(" ".join(map(safe_str, args))) 

1651 logger.debug( 

1652 f"{type(e).__name__} received for command {args_log_str}, on node {target_node.name}, " 

1653 f"and connection: {connection} using local socket address: {socket_address}, error: {e}" 

1654 ) 

1655 # this is used to report the metrics based on host and port info 

1656 e.connection = connection if connection else target_node 

1657 

1658 # ConnectionError can also be raised if we couldn't get a 

1659 # connection from the pool before timing out, so check that 

1660 # this is an actual connection before attempting to disconnect. 

1661 if connection is not None: 

1662 connection.disconnect() 

1663 

1664 # Instead of setting to None, properly handle the pool 

1665 # Get the pool safely - redis_connection could be set to None 

1666 # by another thread between the check and access 

1667 redis_conn = target_node.redis_connection 

1668 if redis_conn is not None: 

1669 pool = redis_conn.connection_pool 

1670 if pool is not None: 

1671 with pool._lock: 

1672 # take care for the active connections in the pool 

1673 pool.update_active_connections_for_reconnect() 

1674 # disconnect all free connections 

1675 pool.disconnect_free_connections() 

1676 

1677 # Move the failed node to the end of the cached nodes list 

1678 self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name) 

1679 

1680 # DON'T set redis_connection = None - keep the pool for reuse 

1681 self.nodes_manager.initialize() 

1682 self._record_command_metric( 

1683 command_name=command, 

1684 duration_seconds=time.monotonic() - start_time, 

1685 connection=e.connection, 

1686 error=e, 

1687 ) 

1688 raise e 

1689 except MovedError as e: 

1690 if is_debug_log_enabled(): 

1691 socket_address = self._extracts_socket_address(connection) 

1692 args_log_str = truncate_text(" ".join(map(safe_str, args))) 

1693 logger.debug( 

1694 f"MOVED error received for command {args_log_str}, on node {target_node.name}, " 

1695 f"and connection: {connection} using local socket address: {socket_address}, error: {e}" 

1696 ) 

1697 # First, we will try to patch the slots/nodes cache with the 

1698 # redirected node output and try again. If MovedError exceeds 

1699 # 'reinitialize_steps' number of times, we will force 

1700 # reinitializing the tables, and then try again. 

1701 # 'reinitialize_steps' counter will increase faster when 

1702 # the same client object is shared between multiple threads. To 

1703 # reduce the frequency you can set this variable in the 

1704 # RedisCluster constructor. 

1705 self.reinitialize_counter += 1 

1706 if self._should_reinitialized(): 

1707 # during this call all connections are closed or marked for disconnect, 

1708 # so we don't need to disconnect the changed node's connections 

1709 self.nodes_manager.initialize( 

1710 additional_startup_nodes_info=[(e.host, e.port)] 

1711 ) 

1712 # Reset the counter 

1713 self.reinitialize_counter = 0 

1714 else: 

1715 self.nodes_manager.move_slot(e) 

1716 moved = True 

1717 self._record_command_metric( 

1718 command_name=command, 

1719 duration_seconds=time.monotonic() - start_time, 

1720 connection=connection, 

1721 error=e, 

1722 ) 

1723 self._record_error_metric( 

1724 error=e, 

1725 connection=connection, 

1726 ) 

1727 except TryAgainError as e: 

1728 if is_debug_log_enabled(): 

1729 socket_address = self._extracts_socket_address(connection) 

1730 args_log_str = truncate_text(" ".join(map(safe_str, args))) 

1731 logger.debug( 

1732 f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, " 

1733 f"and connection: {connection} using local socket address: {socket_address}" 

1734 ) 

1735 if ttl < self.RedisClusterRequestTTL / 2: 

1736 time.sleep(0.05) 

1737 

1738 self._record_command_metric( 

1739 command_name=command, 

1740 duration_seconds=time.monotonic() - start_time, 

1741 connection=connection, 

1742 error=e, 

1743 ) 

1744 self._record_error_metric( 

1745 error=e, 

1746 connection=connection, 

1747 ) 

1748 except AskError as e: 

1749 if is_debug_log_enabled(): 

1750 socket_address = self._extracts_socket_address(connection) 

1751 args_log_str = truncate_text(" ".join(map(safe_str, args))) 

1752 logger.debug( 

1753 f"ASK error received for command {args_log_str}, on node {target_node.name}, " 

1754 f"and connection: {connection} using local socket address: {socket_address}, error: {e}" 

1755 ) 

1756 redirect_addr = get_node_name(host=e.host, port=e.port) 

1757 asking = True 

1758 

1759 self._record_command_metric( 

1760 command_name=command, 

1761 duration_seconds=time.monotonic() - start_time, 

1762 connection=connection, 

1763 error=e, 

1764 ) 

1765 self._record_error_metric( 

1766 error=e, 

1767 connection=connection, 

1768 ) 

1769 except (ClusterDownError, SlotNotCoveredError) as e: 

1770 # ClusterDownError can occur during a failover and to get 

1771 # self-healed, we will try to reinitialize the cluster layout 

1772 # and retry executing the command 

1773 

1774 # SlotNotCoveredError can occur when the cluster is not fully 

1775 # initialized, or it can be a temporary issue. 

1776 # We will try to reinitialize the cluster topology 

1777 # and retry executing the command 

1778 

1779 time.sleep(0.25) 

1780 self.nodes_manager.initialize() 

1781 

1782 # if we have a connection, use it, otherwise use the target node 

1783 # object which contains the host and port info 

1784 # this is used to report the metrics based on host and port info 

1785 e.connection = connection if connection else target_node 

1786 self._record_command_metric( 

1787 command_name=command, 

1788 duration_seconds=time.monotonic() - start_time, 

1789 connection=e.connection, 

1790 error=e, 

1791 ) 

1792 raise 

1793 except ResponseError as e: 

1794 # this is used to report the metrics based on host and port info 

1795 # ResponseError typically happens after get_connection() succeeds, 

1796 # so connection should be available 

1797 e.connection = connection if connection else target_node 

1798 self._record_command_metric( 

1799 command_name=command, 

1800 duration_seconds=time.monotonic() - start_time, 

1801 connection=e.connection, 

1802 error=e, 

1803 ) 

1804 raise 

1805 except Exception as e: 

1806 if connection: 

1807 connection.disconnect() 

1808 

1809 # if we have a connection, use it, otherwise use the target node 

1810 # object which contains the host and port info 

1811 # this is used to report the metrics based on host and port info 

1812 e.connection = connection if connection else target_node 

1813 self._record_command_metric( 

1814 command_name=command, 

1815 duration_seconds=time.monotonic() - start_time, 

1816 connection=e.connection, 

1817 error=e, 

1818 ) 

1819 raise e 

1820 finally: 

1821 if connection is not None: 

1822 redis_node.connection_pool.release(connection) 

1823 

1824 e = ClusterError("TTL exhausted.") 

1825 # In this case we should have an active connection. 

1826 # If we are here, we have received many MOVED or ASK errors and finally exhausted the TTL. 

1827 # This means that we used an active connection to read from the socket. 

1828 # This is used to report metrics based on the host and port information. 

1829 e.connection = connection 

1830 self._record_command_metric( 

1831 command_name=command, 

1832 duration_seconds=time.monotonic() - start_time, 

1833 connection=connection, 

1834 error=e, 

1835 ) 

1836 raise e 

1837 
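Note: the following is an illustrative sketch, not part of the source above. It shows the redirection data the retry loop consumes; key_slot() is the helper imported at the top of this module, the key is made up, and the MOVED reply format is standard Redis Cluster behaviour.

from redis.crc import key_slot

slot = key_slot(b"user:1000")  # maps the key to one of the 16384 hash slots
# A MOVED reply looks like "MOVED 5474 10.0.0.5:7002"; redis-py parses it into
# a MovedError whose .slot_id, .host and .port drive move_slot() further below.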

1838 def _record_command_metric( 

1839 self, 

1840 command_name: str, 

1841 duration_seconds: float, 

1842 connection: Connection, 

1843 error=None, 

1844 ): 

1845 """ 

1846 Records operation duration metric directly. 

1847 """ 

1848 host = connection.host if connection else "unknown" 

1849 port = connection.port if connection else 0 

1850 db = str(connection.db) if connection and hasattr(connection, "db") else "0" 

1851 

1852 record_operation_duration( 

1853 command_name=command_name, 

1854 duration_seconds=duration_seconds, 

1855 server_address=host, 

1856 server_port=port, 

1857 db_namespace=db, 

1858 error=error, 

1859 ) 

1860 

1861 def _record_error_metric( 

1862 self, 

1863 error: Exception, 

1864 connection: Connection, 

1865 is_internal: bool = True, 

1866 retry_attempts: Optional[int] = None, 

1867 ): 

1868 """ 

1869 Records error count metric directly. 

1870 """ 

1871 record_error_count( 

1872 server_address=connection.host, 

1873 server_port=connection.port, 

1874 network_peer_address=connection.host, 

1875 network_peer_port=connection.port, 

1876 error_type=error, 

1877 retry_attempts=retry_attempts if retry_attempts is not None else 0, 

1878 is_internal=is_internal, 

1879 ) 

1880 

1881 def _extracts_socket_address( 

1882 self, connection: Optional[Connection] 

1883 ) -> Optional[int]: 

1884 if connection is None: 

1885 return None 

1886 try: 

1887 socket_address = ( 

1888 connection._sock.getsockname() if connection._sock else None 

1889 ) 

1890 socket_address = socket_address[1] if socket_address else None 

1891 except (AttributeError, OSError): 

1892 socket_address = None 

1893 return socket_address 

1894 

1895 def close(self) -> None: 

1896 try: 

1897 with self._lock: 

1898 if self.nodes_manager: 

1899 self.nodes_manager.close() 

1900 except AttributeError: 

1901 # RedisCluster's __init__ can fail before nodes_manager is set 

1902 pass 

1903 

1904 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs): 

1905 """ 

1906 Process the result of the executed command. 

1907 The function would return a dict or a single value. 

1908 

1909 :type command: str 

1910 :type res: dict 

1911 

1912 `res` should be in the following format: 

1913 Dict<node_name, command_result> 

1914 """ 

1915 if command in self.result_callbacks: 

1916 res = self.result_callbacks[command](command, res, **kwargs) 

1917 elif len(res) == 1: 

1918 # When we execute the command on a single node, we can 

1919 # remove the dictionary and return a single response 

1920 res = list(res.values())[0] 

1921 

1922 return self._policies_callback_mapping[response_policy](res) 

1923 
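Note: an illustrative sketch (node names are assumptions) of the `res` mapping that _process_result receives.

# res maps node name -> that node's reply for a command fanned out to
# several nodes. With exactly one entry the dict is unwrapped and the bare
# value is returned before the response-policy callback runs.
# res = {"10.0.0.1:7000": 12}                      -> 12
# res = {"10.0.0.1:7000": 12, "10.0.0.2:7000": 7}  -> left to the result
#       callback / response policy to combine.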

1924 def load_external_module(self, funcname, func): 

1925 """ 

1926 This function can be used to add externally defined redis modules, 

1927 and their namespaces to the redis client. 

1928 

1929 ``funcname`` - A string containing the name of the function to create 

1930 ``func`` - The function, being added to this class. 

1931 """ 

1932 setattr(self, funcname, func) 

1933 

1934 def transaction(self, func, *watches, **kwargs): 

1935 """ 

1936 Convenience method for executing the callable `func` as a transaction 

1937 while watching all keys specified in `watches`. The 'func' callable 

1938 should expect a single argument which is a Pipeline object. 

1939 """ 

1940 shard_hint = kwargs.pop("shard_hint", None) 

1941 value_from_callable = kwargs.pop("value_from_callable", False) 

1942 watch_delay = kwargs.pop("watch_delay", None) 

1943 with self.pipeline(True, shard_hint) as pipe: 

1944 while True: 

1945 try: 

1946 if watches: 

1947 pipe.watch(*watches) 

1948 func_value = func(pipe) 

1949 exec_value = pipe.execute() 

1950 return func_value if value_from_callable else exec_value 

1951 except WatchError: 

1952 if watch_delay is not None and watch_delay > 0: 

1953 time.sleep(watch_delay) 

1954 continue 

1955 
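Note: a minimal usage sketch for transaction() above; the key name and the client instance are assumptions.

def incr_balance(pipe):
    # runs with WATCH balance already issued; re-invoked on WatchError
    current = int(pipe.get("balance") or 0)
    pipe.multi()
    pipe.set("balance", current + 1)

# rc = RedisCluster(host="127.0.0.1", port=7000)
# rc.transaction(incr_balance, "balance", watch_delay=0.01)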

1956 

1957class ClusterNode: 

1958 def __init__(self, host, port, server_type=None, redis_connection=None): 

1959 if host == "localhost": 

1960 host = socket.gethostbyname(host) 

1961 

1962 self.host = host 

1963 self.port = port 

1964 self.name = get_node_name(host, port) 

1965 self.server_type = server_type 

1966 self.redis_connection = redis_connection 

1967 

1968 def __repr__(self): 

1969 return ( 

1970 f"[host={self.host}," 

1971 f"port={self.port}," 

1972 f"name={self.name}," 

1973 f"server_type={self.server_type}," 

1974 f"redis_connection={self.redis_connection}]" 

1975 ) 

1976 

1977 def __eq__(self, obj): 

1978 return isinstance(obj, ClusterNode) and obj.name == self.name 

1979 

1980 def __hash__(self): 

1981 return hash(self.name) 

1982 

1983 

1984class LoadBalancingStrategy(Enum): 

1985 ROUND_ROBIN = "round_robin" 

1986 ROUND_ROBIN_REPLICAS = "round_robin_replicas" 

1987 RANDOM = "random" 

1988 RANDOM_REPLICA = "random_replica" 

1989 

1990 

1991class LoadBalancer: 

1992 """ 

1993 Round-Robin Load Balancing 

1994 """ 

1995 

1996 def __init__(self, start_index: int = 0) -> None: 

1997 self.primary_to_idx: dict[str, int] = {} 

1998 self.start_index: int = start_index 

1999 self._lock: threading.Lock = threading.Lock() 

2000 

2001 def get_server_index( 

2002 self, 

2003 primary: str, 

2004 list_size: int, 

2005 load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN, 

2006 ) -> int: 

2007 if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA: 

2008 return self._get_random_server_index( 

2009 list_size, 

2010 replicas_only=True, 

2011 ) 

2012 elif load_balancing_strategy == LoadBalancingStrategy.RANDOM: 

2013 return self._get_random_server_index( 

2014 list_size, 

2015 replicas_only=False, 

2016 ) 

2017 else: 

2018 return self._get_round_robin_index( 

2019 primary, 

2020 list_size, 

2021 load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS, 

2022 ) 

2023 

2024 def reset(self) -> None: 

2025 with self._lock: 

2026 self.primary_to_idx.clear() 

2027 

2028 def _get_random_server_index(self, list_size: int, replicas_only: bool) -> int: 

2029 return random.randint(1 if replicas_only else 0, list_size - 1) 

2030 

2031 def _get_round_robin_index( 

2032 self, primary: str, list_size: int, replicas_only: bool 

2033 ) -> int: 

2034 with self._lock: 

2035 server_index = self.primary_to_idx.setdefault(primary, self.start_index) 

2036 if replicas_only and server_index == 0: 

2037 # skip the primary node index 

2038 server_index = 1 

2039 # Update the index for the next round 

2040 self.primary_to_idx[primary] = (server_index + 1) % list_size 

2041 return server_index 

2042 
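Note: a minimal sketch of the index cycling implemented above; the primary name is an assumption, and the results depend only on call order.

lb = LoadBalancer()
[lb.get_server_index("10.0.0.1:7000", list_size=3) for _ in range(4)]
# -> [0, 1, 2, 0]: index 0 is the primary, indices 1..2 are its replicas.
lb.get_server_index(
    "10.0.0.1:7000", 3, LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
)
# -> never returns 0, so only replica indices are produced by this strategy.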

2043 

2044class NodesManager: 

2045 def __init__( 

2046 self, 

2047 startup_nodes: list[ClusterNode], 

2048 from_url=False, 

2049 require_full_coverage=False, 

2050 lock: Optional[threading.RLock] = None, 

2051 dynamic_startup_nodes=True, 

2052 connection_pool_class=ConnectionPool, 

2053 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

2054 cache: Optional[CacheInterface] = None, 

2055 cache_config: Optional[CacheConfig] = None, 

2056 cache_factory: Optional[CacheFactoryInterface] = None, 

2057 event_dispatcher: Optional[EventDispatcher] = None, 

2058 maint_notifications_config: Optional[MaintNotificationsConfig] = None, 

2059 **kwargs, 

2060 ): 

2061 self.nodes_cache: dict[str, ClusterNode] = {} 

2062 self.slots_cache: dict[int, list[ClusterNode]] = {} 

2063 self.startup_nodes: dict[str, ClusterNode] = {n.name: n for n in startup_nodes} 

2064 self.default_node: Optional[ClusterNode] = None 

2065 self._epoch: int = 0 

2066 self.from_url = from_url 

2067 self._require_full_coverage = require_full_coverage 

2068 self._dynamic_startup_nodes = dynamic_startup_nodes 

2069 self.connection_pool_class = connection_pool_class 

2070 self.address_remap = address_remap 

2071 self._cache: Optional[CacheInterface] = None 

2072 if cache: 

2073 self._cache = cache 

2074 elif cache_factory is not None: 

2075 self._cache = cache_factory.get_cache() 

2076 elif cache_config is not None: 

2077 self._cache = CacheFactory(cache_config).get_cache() 

2078 self.connection_kwargs = kwargs 

2079 self.read_load_balancer = LoadBalancer() 

2080 

2081 # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock 

2082 if lock is None: 

2083 self._lock = threading.RLock() 

2084 else: 

2085 self._lock = lock 

2086 

2087 # initialize holds _initialization_lock to dedup multiple calls to reinitialize; 

2088 # note that if we hold both _lock and _initialization_lock, we _must_ acquire 

2089 # _initialization_lock first (ie: to have a consistent order) to avoid deadlock. 

2090 self._initialization_lock: threading.RLock = threading.RLock() 

2091 

2092 if event_dispatcher is None: 

2093 self._event_dispatcher = EventDispatcher() 

2094 else: 

2095 self._event_dispatcher = event_dispatcher 

2096 self._credential_provider = self.connection_kwargs.get( 

2097 "credential_provider", None 

2098 ) 

2099 self.maint_notifications_config = maint_notifications_config 

2100 

2101 self.initialize() 

2102 

2103 def get_node( 

2104 self, 

2105 host: Optional[str] = None, 

2106 port: Optional[int] = None, 

2107 node_name: Optional[str] = None, 

2108 ) -> Optional[ClusterNode]: 

2109 """ 

2110 Get the requested node from the cluster's nodes. 

2112 :return: ClusterNode if the node exists, else None 

2113 """ 

2114 if host and port: 

2115 # the user passed host and port 

2116 if host == "localhost": 

2117 host = socket.gethostbyname(host) 

2118 with self._lock: 

2119 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

2120 elif node_name: 

2121 with self._lock: 

2122 return self.nodes_cache.get(node_name) 

2123 else: 

2124 return None 

2125 

2126 def move_slot(self, e: Union[AskError, MovedError]): 

2127 """ 

2128 Update the slot's node with the redirected one 

2129 """ 

2130 node_changed = False 

2131 with self._lock: 

2132 redirected_node = self.get_node(host=e.host, port=e.port) 

2133 if redirected_node is not None: 

2134 # The node already exists 

2135 if redirected_node.server_type is not PRIMARY: 

2136 # Update the node's server type 

2137 redirected_node.server_type = PRIMARY 

2138 else: 

2139 # This is a new node, we will add it to the nodes cache 

2140 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

2141 self.nodes_cache[redirected_node.name] = redirected_node 

2142 

2143 slot_nodes = self.slots_cache[e.slot_id] 

2144 if redirected_node not in slot_nodes: 

2145 # The new slot owner is a new server, or a server from a different 

2146 # shard. We need to remove all current nodes from the slot's list 

2147 # (including replications) and add just the new node. 

2148 self.slots_cache[e.slot_id] = [redirected_node] 

2149 node_changed = True 

2150 elif redirected_node is not slot_nodes[0]: 

2151 # The MOVED error resulted from a failover, and the new slot owner 

2152 # had previously been a replica. 

2153 old_primary = slot_nodes[0] 

2154 # Update the old primary to be a replica and add it to the end of 

2155 # the slot's node list 

2156 old_primary.server_type = REPLICA 

2157 slot_nodes.append(old_primary) 

2158 # Remove the old replica, which is now a primary, from the slot's 

2159 # node list 

2160 slot_nodes.remove(redirected_node) 

2161 # Override the old primary with the new one 

2162 slot_nodes[0] = redirected_node 

2163 if self.default_node == old_primary: 

2164 # Update the default node with the new primary 

2165 self.default_node = redirected_node 

2166 node_changed = True 

2167 # else: circular MOVED to current primary -> no-op 

2168 # Dispatch outside the lock so listeners can acquire their own locks 

2169 # without risk of deadlock. Skipped on the no-op branch to avoid 

2170 # needless reconciliation walks under MOVED storms. A listener must 

2171 # not break slots-cache refresh; log and continue so a single buggy 

2172 # listener cannot starve the rest. 

2173 if node_changed: 

2174 try: 

2175 self._event_dispatcher.dispatch(AfterSlotsCacheRefreshEvent()) 

2176 except Exception as exc: 

2177 # Don't shadow the method parameter ``e``: ``except as`` binds 

2178 # the listener exception in the function scope and ``del``s 

2179 # the name on block exit (PEP 3134), which would also wipe 

2180 # out the original AskError/MovedError parameter. 

2181 logger.exception( 

2182 "listener raised during slots-cache refresh: %s: %s", 

2183 type(exc).__name__, 

2184 exc, 

2185 ) 

2186 

2187 @deprecated_args( 

2188 args_to_warn=["server_type"], 

2189 reason=( 

2190 "In case you need to select a load balancing strategy " 

2191 "that will use replicas, please set it through 'load_balancing_strategy'" 

2192 ), 

2193 version="5.3.0", 

2194 ) 

2195 def get_node_from_slot( 

2196 self, 

2197 slot: int, 

2198 read_from_replicas: bool = False, 

2199 load_balancing_strategy: Optional[LoadBalancingStrategy] = None, 

2200 server_type: Optional[Literal["primary", "replica"]] = None, 

2201 ) -> ClusterNode: 

2202 """ 

2203 Gets a node that serves this hash slot 

2204 """ 

2205 

2206 if read_from_replicas is True and load_balancing_strategy is None: 

2207 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN 

2208 

2209 with self._lock: 

2210 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

2211 raise SlotNotCoveredError( 

2212 f'Slot "{slot}" not covered by the cluster. ' 

2213 + f'"require_full_coverage={self._require_full_coverage}"' 

2214 ) 

2215 

2216 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy: 

2217 # get the server index using the strategy defined in load_balancing_strategy 

2218 primary_name = self.slots_cache[slot][0].name 

2219 node_idx = self.read_load_balancer.get_server_index( 

2220 primary_name, len(self.slots_cache[slot]), load_balancing_strategy 

2221 ) 

2222 elif ( 

2223 server_type is None 

2224 or server_type == PRIMARY 

2225 or len(self.slots_cache[slot]) == 1 

2226 ): 

2227 # return a primary 

2228 node_idx = 0 

2229 else: 

2230 # return a replica 

2231 # randomly choose one of the replicas 

2232 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

2233 

2234 return self.slots_cache[slot][node_idx] 

2235 
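Note: an illustrative routing sketch, assuming an already-initialized NodesManager; the key is made up.

# slot = key_slot(b"user:1000")
# node = nodes_manager.get_node_from_slot(
#     slot, load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
# )
# node.server_type is "primary" or "replica" depending on the strategy.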

2236 def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]): 

2237 """ 

2238 Get all nodes with the specified server type 

2239 :param server_type: 'primary' or 'replica' 

2240 :return: list of ClusterNode 

2241 """ 

2242 with self._lock: 

2243 return [ 

2244 node 

2245 for node in self.nodes_cache.values() 

2246 if node.server_type == server_type 

2247 ] 

2248 

2249 @deprecated_function( 

2250 reason="This method is not used anymore internally. The startup nodes are populated automatically.", 

2251 version="7.0.2", 

2252 ) 

2253 def populate_startup_nodes(self, nodes): 

2254 """ 

2255 Populate all startup nodes and filter out any duplicates 

2256 """ 

2257 with self._lock: 

2258 for n in nodes: 

2259 self.startup_nodes[n.name] = n 

2260 

2261 def move_node_to_end_of_cached_nodes(self, node_name: str) -> None: 

2262 """ 

2263 Move a failing node to the end of startup_nodes and nodes_cache so it's 

2264 tried last during reinitialization and when selecting the default node. 

2265 If the node is not in the respective list, nothing is done. 

2266 """ 

2267 # Move in startup_nodes 

2268 if node_name in self.startup_nodes and len(self.startup_nodes) > 1: 

2269 node = self.startup_nodes.pop(node_name) 

2270 self.startup_nodes[node_name] = node # Re-insert at end 

2271 

2272 # Move in nodes_cache - this affects get_nodes_by_server_type ordering 

2273 # which is used to select the default_node during initialize() 

2274 if node_name in self.nodes_cache and len(self.nodes_cache) > 1: 

2275 node = self.nodes_cache.pop(node_name) 

2276 self.nodes_cache[node_name] = node # Re-insert at end 

2277 

2278 def check_slots_coverage(self, slots_cache): 

2279 # Validate if all slots are covered or if we should try next 

2280 # startup node 

2281 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

2282 if i not in slots_cache: 

2283 return False 

2284 return True 

2285 

2286 def create_redis_connections(self, nodes): 

2287 """ 

2288 This function will create a redis connection to each node in :nodes: 

2289 """ 

2290 connection_pools = [] 

2291 for node in nodes: 

2292 if node.redis_connection is None: 

2293 node.redis_connection = self.create_redis_node( 

2294 host=node.host, 

2295 port=node.port, 

2296 maint_notifications_config=self.maint_notifications_config, 

2297 **self.connection_kwargs, 

2298 ) 

2299 connection_pools.append(node.redis_connection.connection_pool) 

2300 

2301 self._event_dispatcher.dispatch( 

2302 AfterPooledConnectionsInstantiationEvent( 

2303 connection_pools, ClientType.SYNC, self._credential_provider 

2304 ) 

2305 ) 

2306 

2307 def create_redis_node( 

2308 self, 

2309 host, 

2310 port, 

2311 **kwargs, 

2312 ): 

2313 # We are configuring the connection pool not to retry 

2314 # connections on lower level clients to avoid retrying 

2315 # connections to nodes that are not reachable 

2316 # and to avoid blocking the connection pool. 

2317 # The only error that will have some handling in the lower 

2318 # level clients is ConnectionError which will trigger disconnection 

2319 # of the socket. 

2320 # The retries will be handled on cluster client level 

2321 # where we will have proper handling of the cluster topology 

2322 node_retry_config = Retry( 

2323 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) 

2324 ) 

2325 

2326 if self.from_url: 

2327 # Create a redis node with a custom connection pool 

2328 kwargs.update({"host": host}) 

2329 kwargs.update({"port": port}) 

2330 kwargs.update({"cache": self._cache}) 

2331 kwargs.update({"retry": node_retry_config}) 

2332 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

2333 else: 

2334 r = Redis( 

2335 host=host, 

2336 port=port, 

2337 cache=self._cache, 

2338 retry=node_retry_config, 

2339 **kwargs, 

2340 ) 

2341 return r 

2342 

2343 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

2344 node_name = get_node_name(host, port) 

2345 # check if we already have this node in the tmp_nodes_cache 

2346 target_node = tmp_nodes_cache.get(node_name) 

2347 if target_node is None: 

2348 # before creating a new cluster node, check if the cluster node already 

2349 # exists in the current nodes cache and has a valid connection so we can 

2350 # reuse it 

2351 redis_connection: Optional[Redis] = None 

2352 with self._lock: 

2353 previous_node = self.nodes_cache.get(node_name) 

2354 if previous_node: 

2355 redis_connection = previous_node.redis_connection 

2356 # don't update the old ClusterNode, so we don't update its role 

2357 # outside of the lock 

2358 target_node = ClusterNode(host, port, role, redis_connection) 

2359 # add this node to the nodes cache 

2360 tmp_nodes_cache[target_node.name] = target_node 

2361 

2362 return target_node 

2363 

2364 def _get_epoch(self) -> int: 

2365 """ 

2366 Get the current epoch value. This method exists primarily to allow 

2367 tests to mock the epoch fetch and control race condition timing. 

2368 """ 

2369 with self._lock: 

2370 return self._epoch 

2371 

2372 def initialize( 

2373 self, 

2374 additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None, 

2375 disconnect_startup_nodes_pools: bool = True, 

2376 ): 

2377 """ 

2378 Initializes the nodes cache, slots cache and redis connections. 

2379 :startup_nodes: 

2380 Responsible for discovering other nodes in the cluster 

2381 :disconnect_startup_nodes_pools: 

2382 Whether to disconnect the connection pool of the startup nodes 

2383 after the initialization is complete. This is useful when the 

2384 startup nodes are not part of the cluster and we want to avoid 

2385 keeping the connection open. 

2386 :additional_startup_nodes_info: 

2387 Additional nodes to add temporarily to the startup nodes. 

2388 The additional nodes are used only while extracting the slots and 

2389 nodes information from the cluster. 

2390 This is useful when we want to add new nodes to the cluster and 

2391 initialize the client with them. 

2393 The format of the list is a list of tuples, where each tuple contains 

2394 the host and port of the node. 

2395 """ 

2396 self.reset() 

2397 tmp_nodes_cache = {} 

2398 tmp_slots = {} 

2399 disagreements = [] 

2400 startup_nodes_reachable = False 

2401 fully_covered = False 

2402 kwargs = self.connection_kwargs 

2403 exception = None 

2404 epoch = self._get_epoch() 

2405 if additional_startup_nodes_info is None: 

2406 additional_startup_nodes_info = [] 

2407 

2408 with self._initialization_lock: 

2409 with self._lock: 

2410 if epoch != self._epoch: 

2411 # another thread has already re-initialized the nodes; don't 

2412 # bother running again 

2413 return 

2414 

2415 with self._lock: 

2416 startup_nodes = tuple(self.startup_nodes.values()) 

2417 

2418 additional_startup_nodes = [ 

2419 ClusterNode(host, port) for host, port in additional_startup_nodes_info 

2420 ] 

2421 if is_debug_log_enabled(): 

2422 logger.debug( 

2423 f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; " 

2424 f"and startup nodes: {[node.name for node in startup_nodes]}" 

2425 ) 

2426 

2427 for startup_node in (*startup_nodes, *additional_startup_nodes): 

2428 try: 

2429 if startup_node.redis_connection: 

2430 r = startup_node.redis_connection 

2431 

2432 else: 

2433 # Create a new Redis connection 

2434 if is_debug_log_enabled(): 

2435 socket_timeout = kwargs.get("socket_timeout", "not set") 

2436 socket_connect_timeout = kwargs.get( 

2437 "socket_connect_timeout", "not set" 

2438 ) 

2439 maint_enabled = ( 

2440 self.maint_notifications_config.enabled 

2441 if self.maint_notifications_config 

2442 else False 

2443 ) 

2444 logger.debug( 

2445 "Topology refresh: Creating new Redis connection to " 

2446 f"{startup_node.host}:{startup_node.port}; " 

2447 f"with socket_timeout: {socket_timeout}, and " 

2448 f"socket_connect_timeout: {socket_connect_timeout}, " 

2449 "and maint_notifications enabled: " 

2450 f"{maint_enabled}" 

2451 ) 

2452 r = self.create_redis_node( 

2453 startup_node.host, 

2454 startup_node.port, 

2455 maint_notifications_config=self.maint_notifications_config, 

2456 **kwargs, 

2457 ) 

2458 if startup_node in self.startup_nodes.values(): 

2459 self.startup_nodes[startup_node.name].redis_connection = r 

2460 else: 

2461 startup_node.redis_connection = r 

2462 try: 

2463 # Make sure cluster mode is enabled on this node 

2464 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

2465 if disconnect_startup_nodes_pools: 

2466 with r.connection_pool._lock: 

2467 # take care to clear connections before we move on 

2468 # mark all active connections for reconnect - they will be 

2469 # reconnected on next use, but will allow current in flight commands to complete first 

2470 r.connection_pool.update_active_connections_for_reconnect() 

2471 # Needed to clear READONLY state when it is no longer applicable 

2472 r.connection_pool.disconnect_free_connections() 

2473 except ResponseError: 

2474 raise RedisClusterException( 

2475 "Cluster mode is not enabled on this node" 

2476 ) 

2477 startup_nodes_reachable = True 

2478 except Exception as e: 

2479 # Try the next startup node. 

2480 # The exception is saved and raised only if we have no more nodes. 

2481 exception = e 

2482 continue 

2483 

2484 # CLUSTER SLOTS command results in the following output: 

2485 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

2486 # where each node contains the following list: [IP, port, node_id] 

2487 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

2488 # primary node of the first slot section. 

2489 # If there's only one server in the cluster, its ``host`` is '' 

2490 # Fix it to the host in startup_nodes 

2491 if ( 

2492 len(cluster_slots) == 1 

2493 and len(cluster_slots[0][2][0]) == 0 

2494 and len(self.startup_nodes) == 1 

2495 ): 

2496 cluster_slots[0][2][0] = startup_node.host 

2497 

2498 for slot in cluster_slots: 

2499 primary_node = slot[2] 

2500 host = str_if_bytes(primary_node[0]) 

2501 if host == "": 

2502 host = startup_node.host 

2503 port = int(primary_node[1]) 

2504 host, port = self.remap_host_port(host, port) 

2505 

2506 nodes_for_slot = [] 

2507 

2508 target_node = self._get_or_create_cluster_node( 

2509 host, port, PRIMARY, tmp_nodes_cache 

2510 ) 

2511 nodes_for_slot.append(target_node) 

2512 

2513 replica_nodes = slot[3:] 

2514 for replica_node in replica_nodes: 

2515 host = str_if_bytes(replica_node[0]) 

2516 port = int(replica_node[1]) 

2517 host, port = self.remap_host_port(host, port) 

2518 target_replica_node = self._get_or_create_cluster_node( 

2519 host, port, REPLICA, tmp_nodes_cache 

2520 ) 

2521 nodes_for_slot.append(target_replica_node) 

2522 

2523 for i in range(int(slot[0]), int(slot[1]) + 1): 

2524 if i not in tmp_slots: 

2525 tmp_slots[i] = nodes_for_slot 

2526 else: 

2527 # Validate that 2 nodes want to use the same slot cache 

2528 # setup 

2529 tmp_slot = tmp_slots[i][0] 

2530 if tmp_slot.name != target_node.name: 

2531 disagreements.append( 

2532 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

2533 ) 

2534 

2535 if len(disagreements) > 5: 

2536 raise RedisClusterException( 

2537 f"startup_nodes could not agree on a valid " 

2538 f"slots cache: {', '.join(disagreements)}" 

2539 ) 

2540 

2541 fully_covered = self.check_slots_coverage(tmp_slots) 

2542 if fully_covered: 

2543 # Don't need to continue to the next startup node if all 

2544 # slots are covered 

2545 break 

2546 

2547 if not startup_nodes_reachable: 

2548 raise RedisClusterException( 

2549 f"Redis Cluster cannot be connected. Please provide at least " 

2550 f"one reachable node: {str(exception)}" 

2551 ) from exception 

2552 

2553 # Create Redis connections to all nodes 

2554 self.create_redis_connections(list(tmp_nodes_cache.values())) 

2555 

2556 # Check if the slots are not fully covered 

2557 if not fully_covered and self._require_full_coverage: 

2558 # Despite the requirement that the slots be covered, there 

2559 # isn't a full coverage 

2560 raise RedisClusterException( 

2561 f"Not all slots are covered after querying all startup_nodes. " 

2562 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

2563 f"covered..." 

2564 ) 

2565 

2566 # Set the tmp variables to the real variables 

2567 with self._lock: 

2568 self.nodes_cache = tmp_nodes_cache 

2569 self.slots_cache = tmp_slots 

2570 # Set the default node 

2571 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

2572 if self._dynamic_startup_nodes: 

2573 # Populate the startup nodes with all discovered nodes 

2574 self.startup_nodes = tmp_nodes_cache 

2575 # Increment the epoch to signal that initialization has completed 

2576 self._epoch += 1 

2577 # Dispatch so listeners (e.g. ClusterPubSub) can reconcile per-node 

2578 # state after slot ownership may have changed. A listener must not 

2579 # break slots-cache refresh; log and continue so a single buggy 

2580 # listener cannot starve the rest. 

2581 try: 

2582 self._event_dispatcher.dispatch(AfterSlotsCacheRefreshEvent()) 

2583 except Exception as e: 

2584 logger.exception( 

2585 "listener raised during slots-cache refresh: %s: %s", 

2586 type(e).__name__, 

2587 e, 

2588 ) 

2589 
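Note: a minimal sketch of how the discovery above is seeded; the addresses are assumptions, and a single reachable startup node is enough because the rest of the topology is learned from CLUSTER SLOTS during initialize().

# from redis.cluster import RedisCluster, ClusterNode
# rc = RedisCluster(startup_nodes=[
#     ClusterNode("10.0.0.1", 7000),
#     ClusterNode("10.0.0.2", 7000),
# ])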

2590 def close(self) -> None: 

2591 with self._lock: 

2592 self.default_node = None 

2593 nodes = tuple(self.nodes_cache.values()) 

2594 for node in nodes: 

2595 if node.redis_connection: 

2596 node.redis_connection.close() 

2597 

2598 def reset(self): 

2599 try: 

2600 self.read_load_balancer.reset() 

2601 except TypeError: 

2602 # The read_load_balancer is None, do nothing 

2603 pass 

2604 

2605 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

2606 """ 

2607 Remap the host and port returned from the cluster to a different 

2608 internal value. Useful if the client is not connecting directly 

2609 to the cluster. 

2610 """ 

2611 if self.address_remap: 

2612 return self.address_remap((host, port)) 

2613 return host, port 

2614 
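Note: a minimal sketch of address_remap for a cluster reached through a proxy or port-forward; the hostname and port mapping are assumptions.

PORT_MAP = {7000: 17000, 7001: 17001, 7002: 17002}

def remap(address):
    host, port = address  # address announced by the node via CLUSTER SLOTS
    return "cluster.example.test", PORT_MAP.get(port, port)

# rc = RedisCluster(host="cluster.example.test", port=17000, address_remap=remap)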

2615 def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]: 

2616 node_name = get_node_name(connection.host, connection.port) 

2617 with self._lock: 

2618 for node in tuple(self.nodes_cache.values()): 

2619 if node.redis_connection: 

2620 conn_args = node.redis_connection.connection_pool.connection_kwargs 

2621 if node_name == get_node_name( 

2622 conn_args.get("host"), conn_args.get("port") 

2623 ): 

2624 return node 

2625 return None 

2626 

2627 

2628def _unregister_slots_cache_listener( 

2629 dispatcher_ref: "weakref.ref[EventDispatcher]", 

2630 listener: EventListenerInterface, 

2631 event_type: Type[object], 

2632) -> None: 

2633 # Module-level finalizer callback. Kept free of strong references to the 

2634 # owning ClusterPubSub so attaching it via weakref.finalize does not 

2635 # extend the pubsub's lifetime. 

2636 dispatcher = dispatcher_ref() 

2637 if dispatcher is not None: 

2638 dispatcher.unregister_listeners({event_type: [listener]}) 

2639 

2640 

2641class ClusterPubSubSlotsCacheListener(EventListenerInterface): 

2642 """ 

2643 Listener that forwards AfterSlotsCacheRefreshEvent to a ClusterPubSub. 

2644 

2645 Holds a weak reference to the pubsub so it does not keep the instance 

2646 alive. Deterministic cleanup of the dispatcher's strong reference to this 

2647 listener is performed by a ``weakref.finalize`` attached to the owning 

2648 ClusterPubSub in ``ClusterPubSub.__init__``. 

2649 """ 

2650 

2651 def __init__(self, pubsub: "ClusterPubSub") -> None: 

2652 self._pubsub_ref: "weakref.ref[ClusterPubSub]" = weakref.ref(pubsub) 

2653 

2654 def listen(self, event: object) -> None: 

2655 pubsub = self._pubsub_ref() 

2656 if pubsub is None: 

2657 # Race window between pubsub GC and the finalizer running; safe 

2658 # no-op, finalizer will remove this listener shortly. 

2659 return 

2660 try: 

2661 pubsub.on_slots_changed() 

2662 except Exception as e: 

2663 # Listeners must not break slots-cache refresh; log and continue so 

2664 # a single buggy pubsub cannot starve the rest. 

2665 logger.exception( 

2666 "pubsub %r raised during slots-cache change: %s: %s", 

2667 pubsub, 

2668 type(e).__name__, 

2669 e, 

2670 ) 

2671 

2672 

2673class ClusterPubSub(PubSub): 

2674 """ 

2675 Wrapper for PubSub class. 

2676 

2677 IMPORTANT: before using ClusterPubSub, read about the known limitations 

2678 with pubsub in Cluster mode and learn how to workaround them: 

2679 https://redis.readthedocs.io/en/stable/clustering.html#known-pubsub-limitations 

2680 """ 

2681 

2682 def __init__( 

2683 self, 

2684 redis_cluster, 

2685 node=None, 

2686 host=None, 

2687 port=None, 

2688 push_handler_func=None, 

2689 event_dispatcher: Optional["EventDispatcher"] = None, 

2690 **kwargs, 

2691 ): 

2692 """ 

2693 When a pubsub instance is created without specifying a node, a single 

2694 node will be transparently chosen for the pubsub connection on the 

2695 first command execution. The node will be determined by: 

2696 1. Hashing the channel name in the request to find its keyslot 

2697 2. Selecting a node that handles the keyslot: If read_from_replicas is 

2698 set to true or load_balancing_strategy is set, a replica can be selected. 

2699 

2700 :type redis_cluster: RedisCluster 

2701 :type node: ClusterNode 

2702 :type host: str 

2703 :type port: int 

2704 """ 

2705 self.node = None 

2706 self.set_pubsub_node(redis_cluster, node, host, port) 

2707 connection_pool = ( 

2708 None 

2709 if self.node is None 

2710 else redis_cluster.get_redis_connection(self.node).connection_pool 

2711 ) 

2712 self.cluster = redis_cluster 

2713 self.node_pubsub_mapping = {} 

2714 # Reverse index: shard channel (normalized) -> owning node.name. Used to 

2715 # route sunsubscribe calls and reconcile subscriptions after slot 

2716 # migration / failover. 

2717 self._shard_channel_to_node: dict = {} 

2718 # Dedicated lock for shard-subscription bookkeeping. Distinct from 

2719 # PubSub.self._lock (which serializes wire I/O on the cluster-level 

2720 # connection used by aclose / send_command / regular subscribe) so 

2721 # that reconciliation cannot starve those unrelated paths during 

2722 # long per-channel migrations. 

2723 self._shard_state_lock: threading.RLock = threading.RLock() 

2724 # Worker executor for off-loading slot-migration reconciliation from 

2725 # the dispatch call site (mirrors async's asyncio.create_task model so 

2726 # the thread that triggered MovedError / topology refresh is not 

2727 # blocked on per-channel sunsubscribe / ssubscribe network I/O). 

2728 # Lazy-created on first on_slots_changed() to avoid a persistent 

2729 # worker thread for pubsubs that never see a slot migration. 

2730 # Initialized before super().__init__() because PubSub.__init__ calls 

2731 # self.reset(), which resolves to ClusterPubSub.reset() and reads 

2732 # these attributes. 

2733 self._reconcile_executor: Optional[ThreadPoolExecutor] = None 

2734 # In-flight reconciliation futures; tracked so reset() can cancel 

2735 # pending work and so exceptions surface via a done-callback. 

2736 self._reconcile_futures: Set[Future] = set() 

2737 self._pubsubs_generator = self._pubsubs_generator() 

2738 if event_dispatcher is None: 

2739 self._event_dispatcher = EventDispatcher() 

2740 else: 

2741 self._event_dispatcher = event_dispatcher 

2742 super().__init__( 

2743 connection_pool=connection_pool, 

2744 encoder=redis_cluster.encoder, 

2745 push_handler_func=push_handler_func, 

2746 event_dispatcher=self._event_dispatcher, 

2747 **kwargs, 

2748 ) 

2749 # Subscribe to slots-cache change notifications so shard subscriptions 

2750 # can be reconciled automatically after topology refreshes. 

2751 nm_dispatcher = redis_cluster.nodes_manager._event_dispatcher 

2752 self._slots_cache_listener = ClusterPubSubSlotsCacheListener(self) 

2753 nm_dispatcher.register_listeners( 

2754 {AfterSlotsCacheRefreshEvent: [self._slots_cache_listener]} 

2755 ) 

2756 # Deterministic GC-time cleanup so short-lived pubsubs do not leak 

2757 # listeners in the dispatcher when no slots-refresh event ever fires. 

2758 weakref.finalize( 

2759 self, 

2760 _unregister_slots_cache_listener, 

2761 weakref.ref(nm_dispatcher), 

2762 self._slots_cache_listener, 

2763 AfterSlotsCacheRefreshEvent, 

2764 ) 

2765 

2766 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

2767 """ 

2768 The pubsub node will be set according to the passed node, host and port 

2769 When none of the node, host, or port are specified - the node is set 

2770 to None and will be determined by the keyslot of the channel in the 

2771 first command to be executed. 

2772 RedisClusterException will be thrown if the passed node does not exist 

2773 in the cluster. 

2774 If host is passed without port, or vice versa, a DataError will be 

2775 thrown. 

2776 :type cluster: RedisCluster 

2777 :type node: ClusterNode 

2778 :type host: str 

2779 :type port: int 

2780 """ 

2781 if node is not None: 

2782 # node is passed by the user 

2783 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

2784 pubsub_node = node 

2785 elif host is not None and port is not None: 

2786 # host and port passed by the user 

2787 node = cluster.get_node(host=host, port=port) 

2788 self._raise_on_invalid_node(cluster, node, host, port) 

2789 pubsub_node = node 

2790 elif any([host, port]): 

2791 # only 'host' or 'port' passed 

2792 raise DataError("Passing a host requires passing a port, and vice versa") 

2793 else: 

2794 # nothing passed by the user. set node to None 

2795 pubsub_node = None 

2796 

2797 self.node = pubsub_node 

2798 

2799 def get_pubsub_node(self): 

2800 """ 

2801 Get the node that is being used as the pubsub connection 

2802 """ 

2803 return self.node 

2804 

2805 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

2806 """ 

2807 Raise a RedisClusterException if the node is None or doesn't exist in 

2808 the cluster. 

2809 """ 

2810 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

2811 raise RedisClusterException( 

2812 f"Node {host}:{port} doesn't exist in the cluster" 

2813 ) 

2814 

2815 def execute_command(self, *args): 

2816 """ 

2817 Execute a subscribe/unsubscribe command. 

2818 

2819 Code taken from redis-py and tweaked to make it work within a cluster. 

2820 """ 

2821 # NOTE: don't parse the response in this function -- it could pull a 

2822 # legitimate message off the stack if the connection is already 

2823 # subscribed to one or more channels 

2824 

2825 if self.connection is None: 

2826 if self.connection_pool is None: 

2827 if len(args) > 1: 

2828 # Hash the first channel and get one of the nodes holding 

2829 # this slot 

2830 channel = args[1] 

2831 slot = self.cluster.keyslot(channel) 

2832 node = self.cluster.nodes_manager.get_node_from_slot( 

2833 slot, 

2834 self.cluster.read_from_replicas, 

2835 self.cluster.load_balancing_strategy, 

2836 ) 

2837 else: 

2838 # Get a random node 

2839 node = self.cluster.get_random_node() 

2840 self.node = node 

2841 redis_connection = self.cluster.get_redis_connection(node) 

2842 self.connection_pool = redis_connection.connection_pool 

2843 self.connection = self.connection_pool.get_connection() 

2844 # register a callback that re-subscribes to any channels we 

2845 # were listening to when we were disconnected 

2846 self.connection.register_connect_callback(self.on_connect) 

2847 if self.push_handler_func is not None: 

2848 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

2849 self._event_dispatcher.dispatch( 

2850 AfterPubSubConnectionInstantiationEvent( 

2851 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

2852 ) 

2853 ) 

2854 connection = self.connection 

2855 self._execute(connection, connection.send_command, *args) 

2856 

2857 def _resubscribe_shard_channels(self) -> None: 

2858 # A single node can own multiple slot ranges, so a batched 

2859 # ``SSUBSCRIBE`` covering every tracked channel would be rejected by 

2860 # Redis with a ``CROSSSLOT`` error. Group by hash slot and emit one 

2861 # ``SSUBSCRIBE`` per slot. 

2862 by_slot: defaultdict[int, dict] = defaultdict(dict) 

2863 for k, v in self.shard_channels.items(): 

2864 by_slot[key_slot(self.encoder.encode(k))][k] = v 

2865 for subscriptions in by_slot.values(): 

2866 self._resubscribe(subscriptions, self.ssubscribe) 

2867 
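Note: a minimal sketch of the per-slot grouping used above; the channel names are assumptions and use hash tags so that related channels share a slot.

from collections import defaultdict
from redis.crc import key_slot

channels = [b"news.{eu}", b"scores.{eu}", b"news.{us}"]
by_slot = defaultdict(list)
for ch in channels:
    by_slot[key_slot(ch)].append(ch)
# The {eu} channels share a slot and can go in one SSUBSCRIBE call;
# the {us} channel hashes to a different slot and needs its own call.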

2868 def _get_node_pubsub(self, node): 

2869 try: 

2870 return self.node_pubsub_mapping[node.name] 

2871 except KeyError: 

2872 redis_connection = self.cluster.get_redis_connection(node) 

2873 pubsub = redis_connection.pubsub( 

2874 push_handler_func=self.push_handler_func, 

2875 ) 

2876 # Replay shard subscriptions on reconnect with slot-aware grouping 

2877 # so that channels spanning multiple slots owned by this node do 

2878 # not trigger a CROSSSLOT error. 

2879 pubsub._resubscribe_shard_channels = MethodType( 

2880 ClusterPubSub._resubscribe_shard_channels, pubsub 

2881 ) 

2882 self.node_pubsub_mapping[node.name] = pubsub 

2883 return pubsub 

2884 

2885 def _find_node_name_for_pubsub(self, pubsub): 

2886 for node_name, node_pubsub in self.node_pubsub_mapping.items(): 

2887 if node_pubsub is pubsub: 

2888 return node_name 

2889 return None 

2890 

2891 def _sharded_message_generator(self, timeout=0.0): 

2892 for _ in range(len(self.node_pubsub_mapping)): 

2893 pubsub = next(self._pubsubs_generator) 

2894 # Don't pass ignore_subscribe_messages here - let get_sharded_message 

2895 # handle the filtering after processing subscription state changes 

2896 message = pubsub.get_message( 

2897 ignore_subscribe_messages=False, timeout=timeout 

2898 ) 

2899 if message is not None: 

2900 return pubsub, message 

2901 return None, None 

2902 

2903 def _pubsubs_generator(self): 

2904 while True: 

2905 current_nodes = list(self.node_pubsub_mapping.values()) 

2906 if not current_nodes: 

2907 return # Avoid infinite loop when no subscriptions exist 

2908 yield from current_nodes 

2909 

2910 def get_sharded_message( 

2911 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None 

2912 ): 

2913 if target_node: 

2914 # Use .get(): migration-driven cleanup in the sunsubscribe branch 

2915 # below and reset() both remove entries from node_pubsub_mapping, 

2916 # so a caller polling with target_node may race the cleanup. Match 

2917 # the async counterpart's None-handling rather than raising 

2918 # KeyError. None pubsub falls through to "no message available". 

2919 pubsub = self.node_pubsub_mapping.get(target_node.name) 

2920 if pubsub is not None: 

2921 # Don't pass ignore_subscribe_messages here - let get_sharded_message 

2922 # handle the filtering after processing subscription state changes 

2923 message = pubsub.get_message( 

2924 ignore_subscribe_messages=False, timeout=timeout 

2925 ) 

2926 else: 

2927 message = None 

2928 else: 

2929 pubsub, message = self._sharded_message_generator(timeout=timeout) 

2930 if message is None: 

2931 return None 

2932 # Only sunsubscribe mutates cluster-level shard state; bypassing the 

2933 # lock on the data-message hot path keeps smessage delivery from 

2934 # competing with the reconciliation worker for _shard_state_lock. 

2935 if str_if_bytes(message["type"]) == "sunsubscribe": 

2936 # Serialize state mutation against reinitialize_shard_subscriptions 

2937 # (worker thread). The blocking get_message above intentionally 

2938 # runs outside the lock so reconciliation is not stalled by long 

2939 # polls. 

2940 with self._shard_state_lock: 

2941 if message["channel"] in self.pending_unsubscribe_shard_channels: 

2942 # User-initiated sunsubscribe: drop from cluster-level tracking. 

2943 self.pending_unsubscribe_shard_channels.remove(message["channel"]) 

2944 self.shard_channels.pop(message["channel"], None) 

2945 self._shard_channel_to_node.pop(message["channel"], None) 

2946 # Drop the per-node pubsub that delivered the confirmation once 

2947 # it no longer holds any shard subscriptions, regardless of 

2948 # whether the sunsubscribe was user-initiated or driven by 

2949 # slot-migration reconciliation (_migrate_shard_channel, which 

2950 # intentionally does not add the channel to 

2951 # pending_unsubscribe_shard_channels). This releases the 

2952 # dedicated connection that would otherwise linger. 

2953 # Identifying the receiving pubsub directly (rather than via 

2954 # the cluster's current slot map) is required after slot 

2955 # migration, where the channel's owner is no longer the node 

2956 # that received our original SSUBSCRIBE. 

2957 if pubsub is not None and not pubsub.subscribed: 

2958 name = self._find_node_name_for_pubsub(pubsub) 

2959 if name is not None: 

2960 try: 

2961 pubsub.reset() 

2962 except Exception: 

2963 pass 

2964 self.node_pubsub_mapping.pop(name, None) 

2965 # Mirror PubSub.handle_message: the empty-check belongs in the 

2966 # unsubscribe branch since that is the only path that can 

2967 # reduce shard_channels here. 

2968 if not self.channels and not self.patterns and not self.shard_channels: 

2969 self.subscribed_event.clear() 

2970 # Only suppress subscribe/unsubscribe messages, not data messages (smessage) 

2971 if str_if_bytes(message["type"]) in ("ssubscribe", "sunsubscribe"): 

2972 if self.ignore_subscribe_messages or ignore_subscribe_messages: 

2973 return None 

2974 return message 

2975 

2976 def ssubscribe(self, *args, **kwargs): 

2977 if args: 

2978 args = list_or_args(args[0], args[1:]) 

2979 s_channels = dict.fromkeys(args) 

2980 s_channels.update(kwargs) 

2981 # Serialize against reinitialize_shard_subscriptions (worker thread) 

2982 # so the reverse index, shard_channels, and node_pubsub_mapping are 

2983 # not mutated concurrently. 

2984 with self._shard_state_lock: 

2985 for s_channel, handler in s_channels.items(): 

2986 node = self.cluster.get_node_from_key(s_channel) 

2987 if not node: 

2988 continue 

2989 # Lazy re-route: if this channel is already tracked against a 

2990 # different node (e.g. after a slot migration), migrate it now 

2991 # so the caller's intent is applied on the current owner. 

2992 normalized_key = next(iter(self._normalize_keys({s_channel: None}))) 

2993 old_name = self._shard_channel_to_node.get(normalized_key) 

2994 if old_name and old_name != node.name: 

2995 # Match PubSub.ssubscribe() dict.update() semantics: the 

2996 # caller's newly supplied handler (including None) always 

2997 # overrides any previously registered handler. 

2998 self._migrate_shard_channel( 

2999 normalized_key, 

3000 handler, 

3001 old_name, 

3002 node, 

3003 ) 

3004 continue 

3005 pubsub = self._get_node_pubsub(node) 

3006 if handler: 

3007 pubsub.ssubscribe(**{s_channel: handler}) 

3008 else: 

3009 pubsub.ssubscribe(s_channel) 

3010 self.shard_channels.update(pubsub.shard_channels) 

3011 self._shard_channel_to_node[normalized_key] = node.name 

3012 self.pending_unsubscribe_shard_channels.difference_update( 

3013 self._normalize_keys({s_channel: None}) 

3014 ) 

3015 if pubsub.subscribed and not self.subscribed: 

3016 self.subscribed_event.set() 

3017 self.health_check_response_counter = 0 

3018 

3019 def sunsubscribe(self, *args): 

3020 if args: 

3021 args = list_or_args(args[0], args[1:]) 

3022 else: 

3023 args = list(self.shard_channels) 

3024 

3025 # Serialize against reinitialize_shard_subscriptions: the reverse 

3026 # index and node_pubsub_mapping must not change between the lookup 

3027 # and the per-node sunsubscribe call below. 

3028 with self._shard_state_lock: 

3029 for s_channel in args: 

3030 normalized_key = next(iter(self._normalize_keys({s_channel: None}))) 

3031 # Route via the reverse index so we unsubscribe on the node 

3032 # that actually holds the subscription. After a slot migration 

3033 # the cluster's current owner may no longer be that node. 

3034 name = self._shard_channel_to_node.get(normalized_key) 

3035 if name and name in self.node_pubsub_mapping: 

3036 p = self.node_pubsub_mapping[name] 

3037 else: 

3038 node = self.cluster.get_node_from_key(s_channel) 

3039 if not node or node.name not in self.node_pubsub_mapping: 

3040 continue 

3041 p = self.node_pubsub_mapping[node.name] 

3042 p.sunsubscribe(s_channel) 

3043 self.pending_unsubscribe_shard_channels.update( 

3044 p.pending_unsubscribe_shard_channels 

3045 ) 

3046 

3047 def reinitialize_shard_subscriptions(self): 

3048 """ 

3049 Reconcile per-node shard subscriptions against the cluster's current 

3050 slot ownership map. For each tracked shard channel whose owning node 

3051 has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on 

3052 the old node's pubsub and ssubscribe on the new owner's pubsub, 

3053 preserving any registered handler. 

3054 """ 

3055 uncovered: list = [] 

3056 made_progress = False 

3057 first_migrate_error: Optional[BaseException] = None 

3058 with self._shard_state_lock: 

3059 for channel, handler in list(self.shard_channels.items()): 

3060 try: 

3061 new_node = self.cluster.get_node_from_key(channel) 

3062 except SlotNotCoveredError: 

3063 # Slot is transiently uncovered (mid-migration / partial 

3064 # topology refresh). Defer this channel so coverable 

3065 # siblings still reconcile this pass; we surface the 

3066 # error below so the caller (and logs) know not every 

3067 # channel was reconciled. Retry happens on the next 

3068 # slots-cache change notification. 

3069 uncovered.append(channel) 

3070 continue 

3071 old_name = self._shard_channel_to_node.get(channel) 

3072 if old_name == new_node.name: 

3073 continue 

3074 try: 

3075 self._migrate_shard_channel(channel, handler, old_name, new_node) 

3076 made_progress = True 

3077 except (ConnectionError, TimeoutError, OSError) as e: 

3078 # Transient connectivity error while subscribing on the 

3079 # new owner (or unsubscribing on the old owner if its 

3080 # handler chose to re-raise). Do not abort reconciliation 

3081 # for sibling channels: _shard_channel_to_node was not 

3082 # advanced for this channel, so the next slots-cache 

3083 # change notification will retry it. 

3084 logger.warning( 

3085 "shard channel %r migration deferred: %s: %s", 

3086 channel, 

3087 type(e).__name__, 

3088 e, 

3089 ) 

3090 if first_migrate_error is None: 

3091 first_migrate_error = e 

3092 continue 

3093 # Garbage-collect per-node pubsubs that no longer hold any 

3094 # subscription so their connections are released. 

3095 for name, pubsub in list(self.node_pubsub_mapping.items()): 

3096 if not pubsub.subscribed: 

3097 try: 

3098 pubsub.reset() 

3099 except Exception: 

3100 pass 

3101 self.node_pubsub_mapping.pop(name, None) 

3102 if uncovered: 

3103 # Surface the uncovered channels so the caller (and observer 

3104 # notification path) knows reconciliation was incomplete. All 

3105 # coverable siblings have already been migrated above. 

3106 raise SlotNotCoveredError( 

3107 f"{len(uncovered)} shard channel(s) left unreconciled; " 

3108 f"slot(s) not covered by the cluster: {uncovered!r}" 

3109 ) 

3110 if first_migrate_error is not None and not made_progress: 

3111 # Every migration attempted in this pass failed transiently and 

3112 # nothing else made progress. Re-raise the first caught error 

3113 # (typically the root cause; later failures are often downstream 

3114 # symptoms of the same unreachable node) so the worker's done- 

3115 # callback surfaces a single representative failure through the 

3116 # same logger channel used for SlotNotCoveredError. Per-channel 

3117 # WARNINGs above preserve the full forensic detail. 

3118 raise first_migrate_error 

3119 

3120 def _migrate_shard_channel(self, channel, handler, old_name, new_node): 

3121 # Detach from the old per-node pubsub, best-effort: the old node may 

3122 # already be unreachable during migration / failover. 

3123 if old_name and old_name in self.node_pubsub_mapping: 

3124 old_pubsub = self.node_pubsub_mapping[old_name] 

3125 try: 

3126 old_pubsub.sunsubscribe(channel) 

3127 except (ConnectionError, TimeoutError, OSError): 

3128 # redis-py's Connection has already called ``disconnect()`` 

3129 # before raising (see Connection.read_response / 

3130 # send_packed_command with ``disconnect_on_error=True``), 

3131 # so ``old_pubsub``'s dedicated socket is gone. Two cases: 

3132 # 

3133 # 1. The old node is no longer in the cluster topology 

3134 # (e.g. removed by failover / topology refresh): no 

3135 # reconnect target exists, so ``old_pubsub.subscribed`` 

3136 # would stay True forever and the end-of-pass GC block 

3137 # would skip it. Drop it eagerly so the round-robin 

3138 # generator does not keep yielding a dead pubsub that 

3139 # produces periodic errors from ``get_sharded_message``. 

3140 # 2. The old node is still known (transiently slow / 

3141 # unreachable): ``PubSub._execute`` auto-reconnects and 

3142 # ``on_connect`` re-subscribes to remaining channels, 

3143 # so other subscriptions on the same pubsub recover 

3144 # naturally. Leave it alone. 

3145 if self.cluster.get_node(node_name=old_name) is None: 

3146 try: 

3147 old_pubsub.reset() 

3148 except Exception: 

3149 pass 

3150 self.node_pubsub_mapping.pop(old_name, None) 

3151 # Attach to the new per-node pubsub, preserving the handler. Decode to 

3152 # a text key only when we must pass it as a kwarg (handler present). 

3153 new_pubsub = self._get_node_pubsub(new_node) 

3154 if handler: 

3155 decoded = ( 

3156 self.encoder.decode(channel, force=True) 

3157 if isinstance(channel, (bytes, bytearray)) 

3158 else channel 

3159 ) 

3160 new_pubsub.ssubscribe(**{decoded: handler}) 

3161 else: 

3162 new_pubsub.ssubscribe(channel) 

3163 self.shard_channels.update(new_pubsub.shard_channels) 

3164 normalized_key = next(iter(self._normalize_keys({channel: None}))) 

3165 self._shard_channel_to_node[normalized_key] = new_node.name 

3166 self.pending_unsubscribe_shard_channels.difference_update( 

3167 self._normalize_keys({channel: None}) 

3168 ) 

3169 if new_pubsub.subscribed and not self.subscribed: 

3170 self.subscribed_event.set() 

3171 self.health_check_response_counter = 0 

3172 

3173 def on_slots_changed(self): 

3174 # Observer hook invoked by NodesManager after a slots-cache refresh. 

3175 # Schedule reconciliation on a dedicated worker thread so the caller 

3176 # (typically MovedError handling in _execute_command or the topology 

3177 # refresh thread in initialize()) is not blocked on the network I/O 

3178 # performed by reinitialize_shard_subscriptions. Mirrors the async 

3179 # path's asyncio.create_task model. No-op when there are no shard 

3180 # subscriptions to reconcile. 

3181 if not self.shard_channels: 

3182 return 

3183 # Serialize lazy executor creation and submission against concurrent 

3184 # on_slots_changed calls (EventDispatcher releases its lock before 

3185 # invoking listeners, so two MovedError-handling threads can land 

3186 # here at once) and against reset() which tears the executor down. 

3187 # Without this, two threads could each create a ThreadPoolExecutor 

3188 # and one would be orphaned (leaking its worker thread); a reset() 

3189 # interleaved between create and submit() could also raise 

3190 # RuntimeError("cannot schedule new futures after shutdown"). 

3191 with self._shard_state_lock: 

3192 if self._reconcile_executor is None: 

3193 self._reconcile_executor = ThreadPoolExecutor( 

3194 max_workers=1, 

3195 thread_name_prefix="redis-cluster-pubsub-reconcile", 

3196 ) 

3197 future = self._reconcile_executor.submit( 

3198 self.reinitialize_shard_subscriptions 

3199 ) 

3200 self._reconcile_futures.add(future) 

3201 future.add_done_callback(self._discard_reconcile_future) 

3202 # Consume the future's exception (if any) so it is not silently lost. 

3203 # reinitialize_shard_subscriptions surfaces SlotNotCoveredError when 

3204 # a slot is still transiently uncovered; route it through the same 

3205 # logger channel as the async path for consistent observability. 

3206 future.add_done_callback(self._log_reconcile_future_exception) 

3207 
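# [Editorial sketch] The scheduling pattern above in isolation: a lazily created
# single-worker ThreadPoolExecutor guarded by a lock, plus done-callbacks that
# drop the future from a tracking set and log its exception. Names here are
# illustrative only, not part of the redis-py API.
#
#   import logging, threading
#   from concurrent.futures import Future, ThreadPoolExecutor
#
#   class Reconciler:
#       def __init__(self):
#           self._lock = threading.Lock()
#           self._executor = None
#           self._futures = set()
#
#       def schedule(self, work):
#           # serialize lazy creation and submit against concurrent callers
#           with self._lock:
#               if self._executor is None:
#                   self._executor = ThreadPoolExecutor(max_workers=1)
#               fut = self._executor.submit(work)
#               self._futures.add(fut)
#               fut.add_done_callback(self._done)
#
#       def _done(self, fut: Future) -> None:
#           with self._lock:
#               self._futures.discard(fut)
#           if not fut.cancelled() and fut.exception() is not None:
#               logging.getLogger(__name__).error(
#                   "background work failed", exc_info=fut.exception())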

3208 def _discard_reconcile_future(self, future: "Future") -> None: 

3209 # Done-callback fires on the worker thread. Take _shard_state_lock so 

3210 # the discard observes the same mutual-exclusion discipline as the 

3211 # add() / clear() sites; without it the set mutation is correct only 

3212 # because of CPython's GIL and would race under free-threaded builds. 

3213 with self._shard_state_lock: 

3214 self._reconcile_futures.discard(future) 

3215 

3216 @staticmethod 

3217 def _log_reconcile_future_exception(future: "Future") -> None: 

3218 if future.cancelled(): 

3219 return 

3220 exc = future.exception() 

3221 if exc is not None: 

3222 logger.error( 

3223 "shard subscription reconciliation failed: %r", exc, exc_info=exc 

3224 ) 

3225 

3226 def reset(self) -> None: 

3227 # Hold _shard_state_lock across the entire teardown so it observes 

3228 # the same mutual-exclusion discipline as ssubscribe / sunsubscribe / 

3229 # get_sharded_message / reinitialize_shard_subscriptions, which all 

3230 # mutate shard_channels, _shard_channel_to_node, and 

3231 # node_pubsub_mapping under this lock. Without it, super().reset() 

3232 # rebinds shard_channels and pending_unsubscribe_shard_channels in 

3233 # parallel with a concurrent user-thread mutation, silently dropping 

3234 # subscription intent. cancel_futures drops queued reconciliation 

3235 # work; the currently-running task (if any) is already serialized 

3236 # against us by this same lock - shutdown(wait=False) avoids waiting 

3237 # on the worker thread's join, not on its critical section. 

3238 with self._shard_state_lock: 

3239 if self._reconcile_executor is not None: 

3240 self._reconcile_executor.shutdown(wait=False, cancel_futures=True) 

3241 self._reconcile_executor = None 

3242 self._reconcile_futures.clear() 

3243 # Tear down per-node pubsubs (parity with async aclose) so they 

3244 # don't leak their dedicated connections and don't replay stale 

3245 # shard_channels via PubSub.on_connect on a subsequent reconnect. 

3246 # Errors are swallowed because reset() is also a fallback path 

3247 # from __del__; we cannot let one buggy per-node pubsub mask the 

3248 # rest of the teardown. 

3249 for pubsub in self.node_pubsub_mapping.values(): 

3250 try: 

3251 pubsub.reset() 

3252 except Exception: 

3253 pass 

3254 # Drop the now-dead per-node pubsubs from the mapping so the 

3255 # round-robin in _pubsubs_generator / _sharded_message_generator 

3256 # cannot yield them between teardown and re-subscription. 

3257 self.node_pubsub_mapping.clear() 

3258 # _pubsubs_generator captures node_pubsub_mapping.values() into 

3259 # a local list inside ``yield from``; clearing the mapping does 

3260 # not reach references already held by that captured snapshot, 

3261 # so a generator suspended mid-yield-from would still surface 

3262 # the now-reset() per-node pubsubs after re-subscription. 

3263 # Recreate it to drop the captured list. type(self) bypasses 

3264 # the instance-level self-shadow established at __init__ 

3265 # (self._pubsubs_generator = self._pubsubs_generator()). 

3266 self._pubsubs_generator = type(self)._pubsubs_generator(self) 

3267 super().reset() 

3268 self._shard_channel_to_node = {} 

3269 

3270 def get_redis_connection(self): 

3271 """ 

3272 Get the Redis connection of the pubsub connected node. 

3273 """ 

3274 if self.node is not None: 

3275 return self.node.redis_connection 

3276 

3277 def disconnect(self): 

3278 """ 

3279 Disconnect the pubsub connection. 

3280 """ 

3281 if self.connection: 

3282 self.connection.disconnect() 

3283 for pubsub in self.node_pubsub_mapping.values(): 

3284 if pubsub.connection: 

3285 pubsub.connection.disconnect() 

3286 

3287 

3288class ClusterPipeline(RedisCluster): 

3289 """ 

3290 Support for Redis pipeline 

3291 in cluster mode 

3292 """ 

3293 

3294 ERRORS_ALLOW_RETRY = ( 

3295 ConnectionError, 

3296 TimeoutError, 

3297 MovedError, 

3298 AskError, 

3299 TryAgainError, 

3300 ) 

3301 

3302 NO_SLOTS_COMMANDS = {"UNWATCH"} 

3303 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

3304 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

3305 

3306 @deprecated_args( 

3307 args_to_warn=[ 

3308 "cluster_error_retry_attempts", 

3309 ], 

3310 reason="Please configure the 'retry' object instead", 

3311 version="6.0.0", 

3312 ) 

3313 def __init__( 

3314 self, 

3315 nodes_manager: "NodesManager", 

3316 commands_parser: "CommandsParser", 

3317 result_callbacks: Optional[Dict[str, Callable]] = None, 

3318 cluster_response_callbacks: Optional[Dict[str, Callable]] = None, 

3319 startup_nodes: Optional[List["ClusterNode"]] = None, 

3320 read_from_replicas: bool = False, 

3321 load_balancing_strategy: Optional[LoadBalancingStrategy] = None, 

3322 cluster_error_retry_attempts: int = 3, 

3323 reinitialize_steps: int = 5, 

3324 retry: Optional[Retry] = None, 

3325 lock=None, 

3326 transaction=False, 

3327 policy_resolver: PolicyResolver = StaticPolicyResolver(), 

3328 event_dispatcher: Optional["EventDispatcher"] = None, 

3329 **kwargs, 

3330 ): 

3331 """ """ 


3332 self.command_stack = [] 

3333 self.nodes_manager = nodes_manager 

3334 self.commands_parser = commands_parser 

3335 self.refresh_table_asap = False 

3336 self.result_callbacks = ( 

3337 result_callbacks or self.__class__.RESULT_CALLBACKS.copy() 

3338 ) 

3339 self.startup_nodes = startup_nodes if startup_nodes else [] 

3340 self.read_from_replicas = read_from_replicas 

3341 self.load_balancing_strategy = load_balancing_strategy 

3342 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

3343 self.cluster_response_callbacks = cluster_response_callbacks 

3344 self.reinitialize_counter = 0 

3345 self.reinitialize_steps = reinitialize_steps 

3346 if retry is not None: 

3347 self.retry = retry 

3348 else: 

3349 self.retry = Retry( 

3350 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

3351 retries=cluster_error_retry_attempts, 

3352 ) 

3353 

3354 self.encoder = Encoder( 

3355 kwargs.get("encoding", "utf-8"), 

3356 kwargs.get("encoding_errors", "strict"), 

3357 kwargs.get("decode_responses", False), 

3358 ) 

3359 if lock is None: 

3360 lock = threading.RLock() 

3361 self._lock = lock 

3362 self.parent_execute_command = super().execute_command 

3363 self._execution_strategy: ExecutionStrategy = ( 

3364 PipelineStrategy(self) if not transaction else TransactionStrategy(self) 

3365 ) 

3366 

3367 # For backward compatibility, mapping from existing policies to new one 

3368 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = { 

3369 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS, 

3370 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS, 

3371 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES, 

3372 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS, 

3373 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE, 

3374 SLOT_ID: RequestPolicy.DEFAULT_KEYED, 

3375 } 

3376 

3377 self._policies_callback_mapping: dict[ 

3378 Union[RequestPolicy, ResponsePolicy], Callable 

3379 ] = { 

3380 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [ 

3381 self.get_random_primary_or_all_nodes(command_name) 

3382 ], 

3383 RequestPolicy.DEFAULT_KEYED: lambda command, 

3384 *args: self.get_nodes_from_slot(command, *args), 

3385 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()], 

3386 RequestPolicy.ALL_SHARDS: self.get_primaries, 

3387 RequestPolicy.ALL_NODES: self.get_nodes, 

3388 RequestPolicy.ALL_REPLICAS: self.get_replicas, 

3389 RequestPolicy.MULTI_SHARD: lambda *args, 

3390 **kwargs: self._split_multi_shard_command(*args, **kwargs), 

3391 RequestPolicy.SPECIAL: self.get_special_nodes, 

3392 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res, 

3393 ResponsePolicy.DEFAULT_KEYED: lambda res: res, 

3394 } 

3395 

3396 self._policy_resolver = policy_resolver 

3397 

3398 if event_dispatcher is None: 

3399 self._event_dispatcher = EventDispatcher() 

3400 else: 

3401 self._event_dispatcher = event_dispatcher 

3402 

3403 def __repr__(self): 

3404 """ """ 

3405 return f"{type(self).__name__}" 

3406 

3407 def __enter__(self): 

3408 """ """ 

3409 return self 

3410 

3411 def __exit__(self, exc_type, exc_value, traceback): 

3412 """ """ 

3413 self.reset() 

3414 

3415 def __del__(self): 

3416 try: 

3417 self.reset() 

3418 except Exception: 

3419 pass 

3420 

3421 def __len__(self): 

3422 """ """ 

3423 return len(self._execution_strategy.command_queue) 

3424 

3425 def __bool__(self): 

3426 "Pipeline instances should always evaluate to True on Python 3+" 

3427 return True 

3428 

3429 def execute_command(self, *args, **kwargs): 

3430 """ 

3431 Wrapper function for pipeline_execute_command 

3432 """ 

3433 return self._execution_strategy.execute_command(*args, **kwargs) 

3434 

3435 def pipeline_execute_command(self, *args, **options): 

3436 """ 

3437 Stage a command to be executed when execute() is next called 

3438 

3439 Returns the current Pipeline object back so commands can be 

3440 chained together, such as: 

3441 

3442 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

3443 

3444 At some other point, you can then run: pipe.execute(), 

3445 which will execute all commands queued in the pipe. 

3446 """ 

3447 return self._execution_strategy.execute_command(*args, **options) 

3448 
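    # [Editorial sketch] Typical chained usage, assuming an existing RedisCluster
    # client `rc`. The {user:1} hash tag is only for readability here; plain
    # pipelines do not require same-slot keys (only transactions do).
    #
    #   with rc.pipeline() as pipe:
    #       pipe.set("{user:1}:name", "alice").incr("{user:1}:visits")
    #       name_ok, visits = pipe.execute()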

3449 def annotate_exception(self, exception, number, command): 

3450 """ 

3451 Provides extra context to the exception prior to it being handled 

3452 """ 

3453 self._execution_strategy.annotate_exception(exception, number, command) 

3454 

3455 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3456 """ 

3457 Execute all the commands in the current pipeline 

3458 """ 

3459 

3460 try: 

3461 return self._execution_strategy.execute(raise_on_error) 

3462 finally: 

3463 self.reset() 

3464 

3465 def reset(self): 

3466 """ 

3467 Reset back to empty pipeline. 

3468 """ 

3469 self._execution_strategy.reset() 

3470 

3471 def send_cluster_commands( 

3472 self, stack, raise_on_error=True, allow_redirections=True 

3473 ): 

3474 return self._execution_strategy.send_cluster_commands( 

3475 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections 

3476 ) 

3477 

3478 def exists(self, *keys): 

3479 return self._execution_strategy.exists(*keys) 

3480 

3481 def eval(self): 

3482 """ """ 

3483 return self._execution_strategy.eval() 

3484 

3485 def multi(self): 

3486 """ 

3487 Start a transactional block of the pipeline after WATCH commands 

3488 are issued. End the transactional block with `execute`. 

3489 """ 

3490 self._execution_strategy.multi() 

3491 

3492 def load_scripts(self): 

3493 """ """ 

3494 self._execution_strategy.load_scripts() 

3495 

3496 def discard(self): 

3497 """ """ 

3498 self._execution_strategy.discard() 

3499 

3500 def watch(self, *names): 

3501 """Watches the values at keys ``names``""" 

3502 self._execution_strategy.watch(*names) 

3503 

3504 def unwatch(self): 

3505 """Unwatches all previously specified keys""" 

3506 self._execution_strategy.unwatch() 

3507 

3508 def script_load_for_pipeline(self, *args, **kwargs): 

3509 self._execution_strategy.script_load_for_pipeline(*args, **kwargs) 

3510 

3511 def delete(self, *names): 

3512 self._execution_strategy.delete(*names) 

3513 

3514 def unlink(self, *names): 

3515 self._execution_strategy.unlink(*names) 

3516 

3517 

3518def block_pipeline_command(name: str) -> Callable[..., Any]: 

3519 """ 

3520 Raises RedisClusterException because some pipelined commands must 

3521 be blocked when running in cluster mode 

3522 """ 

3523 

3524 def inner(*args, **kwargs): 

3525 raise RedisClusterException( 

3526 f"ERROR: Calling pipelined function {name} is blocked " 

3527 f"when running redis in cluster mode..." 

3528 ) 

3529 

3530 return inner 

3531 

3532 

3533# Blocked pipeline commands 

3534PIPELINE_BLOCKED_COMMANDS = ( 

3535 "BGREWRITEAOF", 

3536 "BGSAVE", 

3537 "BITOP", 

3538 "BRPOPLPUSH", 

3539 "CLIENT GETNAME", 

3540 "CLIENT KILL", 

3541 "CLIENT LIST", 

3542 "CLIENT SETNAME", 

3543 "CLIENT", 

3544 "CONFIG GET", 

3545 "CONFIG RESETSTAT", 

3546 "CONFIG REWRITE", 

3547 "CONFIG SET", 

3548 "CONFIG", 

3549 "DBSIZE", 

3550 "ECHO", 

3551 "EVALSHA", 

3552 "FLUSHALL", 

3553 "FLUSHDB", 

3554 "INFO", 

3555 "KEYS", 

3556 "LASTSAVE", 

3557 "MGET", 

3558 "MGET NONATOMIC", 

3559 "MOVE", 

3560 "MSET", 

3561 "MSETEX", 

3562 "MSET NONATOMIC", 

3563 "MSETNX", 

3564 "PFCOUNT", 

3565 "PFMERGE", 

3566 "PING", 

3567 "PUBLISH", 

3568 "RANDOMKEY", 

3569 "READONLY", 

3570 "READWRITE", 

3571 "RENAME", 

3572 "RENAMENX", 

3573 "RPOPLPUSH", 

3574 "SAVE", 

3575 "SCAN", 

3576 "SCRIPT EXISTS", 

3577 "SCRIPT FLUSH", 

3578 "SCRIPT KILL", 

3579 "SCRIPT LOAD", 

3580 "SCRIPT", 

3581 "SDIFF", 

3582 "SDIFFSTORE", 

3583 "SENTINEL GET MASTER ADDR BY NAME", 

3584 "SENTINEL MASTER", 

3585 "SENTINEL MASTERS", 

3586 "SENTINEL MONITOR", 

3587 "SENTINEL REMOVE", 

3588 "SENTINEL SENTINELS", 

3589 "SENTINEL SET", 

3590 "SENTINEL SLAVES", 

3591 "SENTINEL", 

3592 "SHUTDOWN", 

3593 "SINTER", 

3594 "SINTERSTORE", 

3595 "SLAVEOF", 

3596 "SLOWLOG GET", 

3597 "SLOWLOG LEN", 

3598 "SLOWLOG RESET", 

3599 "SLOWLOG", 

3600 "SMOVE", 

3601 "SORT", 

3602 "SUNION", 

3603 "SUNIONSTORE", 

3604 "TIME", 

3605) 

3606for command in PIPELINE_BLOCKED_COMMANDS: 

3607 command = command.replace(" ", "_").lower() 

3608 

3609 setattr(ClusterPipeline, command, block_pipeline_command(command)) 

3610 
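# [Editorial sketch] Each command name above becomes a method on ClusterPipeline
# that raises immediately; for example (assuming a cluster client `rc`):
#
#   pipe = rc.pipeline()
#   pipe.keys()   # raises RedisClusterException: calling pipelined function keys is blocked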

3611 

3612class PipelineCommand: 

3613 """ """ 

3614 

3615 def __init__(self, args, options=None, position=None): 

3616 self.args = args 

3617 if options is None: 

3618 options = {} 

3619 self.options = options 

3620 self.position = position 

3621 self.result = None 

3622 self.node = None 

3623 self.asking = False 

3624 self.command_policies: Optional[CommandPolicies] = None 

3625 

3626 

3627class NodeCommands: 

3628 """ """ 

3629 

3630 def __init__( 

3631 self, parse_response, connection_pool: ConnectionPool, connection: Connection 

3632 ): 

3633 """ """ 

3634 self.parse_response = parse_response 

3635 self.connection_pool = connection_pool 

3636 self.connection = connection 

3637 self.commands = [] 

3638 

3639 def append(self, c): 

3640 """ """ 

3641 self.commands.append(c) 

3642 

3643 def write(self): 

3644 """ 

3645 Code borrowed from Redis so it can be fixed 

3646 """ 

3647 connection = self.connection 

3648 commands = self.commands 

3649 

3650 # We are going to clobber the commands with the write, so go ahead 

3651 # and ensure that nothing is sitting there from a previous run. 

3652 for c in commands: 

3653 c.result = None 

3654 

3655 # build up all commands into a single request to increase network perf 

3656 # send all the commands and catch connection and timeout errors. 

3657 try: 

3658 connection.send_packed_command( 

3659 connection.pack_commands([c.args for c in commands]) 

3660 ) 

3661 except (ConnectionError, TimeoutError) as e: 

3662 for c in commands: 

3663 c.result = e 

3664 

3665 def read(self): 

3666 """ """ 

3667 connection = self.connection 

3668 for c in self.commands: 

3669 # if there is a result on this command, 

3670 # it means we ran into an exception 

3671 # like a connection error. Trying to parse 

3672 # a response on a connection that 

3673 # is no longer open will result in a 

3674 # connection error raised by redis-py. 

3675 # but redis-py doesn't check in parse_response 

3676 # that the sock object is 

3677 # still set and if you try to 

3678 # read from a closed connection, it will 

3679 # result in an AttributeError because 

3680 # it will do a readline() call on None. 

3681 # This can have all kinds of nasty side-effects. 

3682 # Treating this case as a connection error 

3683 # is fine because it will dump 

3684 # the connection object back into the 

3685 # pool and on the next write, it will 

3686 # explicitly open the connection and all will be well. 

3687 if c.result is None: 

3688 try: 

3689 c.result = self.parse_response(connection, c.args[0], **c.options) 

3690 except (ConnectionError, TimeoutError) as e: 

3691 for c in self.commands: 

3692 c.result = e 

3693 return 

3694 except RedisError: 

3695 c.result = sys.exc_info()[1] 

3696 
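# [Editorial sketch] NodeCommands.write() leans on the low-level connection API
# to flush one packed request per node. The same calls on a standalone
# connection look like this (host/port are assumptions):
#
#   from redis.connection import Connection
#
#   conn = Connection(host="localhost", port=6379)
#   conn.connect()
#   conn.send_packed_command(conn.pack_commands([("SET", "k", "v"), ("GET", "k")]))
#   set_reply = conn.read_response()   # one read per queued command, in order
#   get_reply = conn.read_response()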

3697 

3698class ExecutionStrategy(ABC): 

3699 @property 

3700 @abstractmethod 

3701 def command_queue(self): 

3702 pass 

3703 

3704 @abstractmethod 

3705 def execute_command(self, *args, **kwargs): 

3706 """ 

3707 Execution flow for current execution strategy. 

3708 

3709 See: ClusterPipeline.execute_command() 

3710 """ 

3711 pass 

3712 

3713 @abstractmethod 

3714 def annotate_exception(self, exception, number, command): 

3715 """ 

3716 Annotate exception according to current execution strategy. 

3717 

3718 See: ClusterPipeline.annotate_exception() 

3719 """ 

3720 pass 

3721 

3722 @abstractmethod 

3723 def pipeline_execute_command(self, *args, **options): 

3724 """ 

3725 Pipeline execution flow for current execution strategy. 

3726 

3727 See: ClusterPipeline.pipeline_execute_command() 

3728 """ 

3729 pass 

3730 

3731 @abstractmethod 

3732 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3733 """ 

3734 Executes current execution strategy. 

3735 

3736 See: ClusterPipeline.execute() 

3737 """ 

3738 pass 

3739 

3740 @abstractmethod 

3741 def send_cluster_commands( 

3742 self, stack, raise_on_error=True, allow_redirections=True 

3743 ): 

3744 """ 

3745 Sends commands according to current execution strategy. 

3746 

3747 See: ClusterPipeline.send_cluster_commands() 

3748 """ 

3749 pass 

3750 

3751 @abstractmethod 

3752 def reset(self): 

3753 """ 

3754 Resets current execution strategy. 

3755 

3756 See: ClusterPipeline.reset() 

3757 """ 

3758 pass 

3759 

3760 @abstractmethod 

3761 def exists(self, *keys): 

3762 pass 

3763 

3764 @abstractmethod 

3765 def eval(self): 

3766 pass 

3767 

3768 @abstractmethod 

3769 def multi(self): 

3770 """ 

3771 Starts transactional context. 

3772 

3773 See: ClusterPipeline.multi() 

3774 """ 

3775 pass 

3776 

3777 @abstractmethod 

3778 def load_scripts(self): 

3779 pass 

3780 

3781 @abstractmethod 

3782 def watch(self, *names): 

3783 pass 

3784 

3785 @abstractmethod 

3786 def unwatch(self): 

3787 """ 

3788 Unwatches all previously specified keys 

3789 

3790 See: ClusterPipeline.unwatch() 

3791 """ 

3792 pass 

3793 

3794 @abstractmethod 

3795 def script_load_for_pipeline(self, *args, **kwargs): 

3796 pass 

3797 

3798 @abstractmethod 

3799 def delete(self, *names): 

3800 """ 

3801 "Delete a key specified by ``names``" 

3802 

3803 See: ClusterPipeline.delete() 

3804 """ 

3805 pass 

3806 

3807 @abstractmethod 

3808 def unlink(self, *names): 

3809 """ 

3810 "Unlink a key specified by ``names``" 

3811 

3812 See: ClusterPipeline.unlink() 

3813 """ 

3814 pass 

3815 

3816 @abstractmethod 

3817 def discard(self): 

3818 pass 

3819 

3820 

3821class AbstractStrategy(ExecutionStrategy): 

3822 def __init__( 

3823 self, 

3824 pipe: ClusterPipeline, 

3825 ): 

3826 self._command_queue: List[PipelineCommand] = [] 

3827 self._pipe = pipe 

3828 self._nodes_manager = self._pipe.nodes_manager 

3829 

3830 @property 

3831 def command_queue(self): 

3832 return self._command_queue 

3833 

3834 @command_queue.setter 

3835 def command_queue(self, queue: List[PipelineCommand]): 

3836 self._command_queue = queue 

3837 

3838 @abstractmethod 

3839 def execute_command(self, *args, **kwargs): 

3840 pass 

3841 

3842 def pipeline_execute_command(self, *args, **options): 

3843 self._command_queue.append( 

3844 PipelineCommand(args, options, len(self._command_queue)) 

3845 ) 

3846 return self._pipe 

3847 

3848 @abstractmethod 

3849 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3850 pass 

3851 

3852 @abstractmethod 

3853 def send_cluster_commands( 

3854 self, stack, raise_on_error=True, allow_redirections=True 

3855 ): 

3856 pass 

3857 

3858 @abstractmethod 

3859 def reset(self): 

3860 pass 

3861 

3862 def exists(self, *keys): 

3863 return self.execute_command("EXISTS", *keys) 

3864 

3865 def eval(self): 

3866 """ """ 

3867 raise RedisClusterException("method eval() is not implemented") 

3868 

3869 def load_scripts(self): 

3870 """ """ 

3871 raise RedisClusterException("method load_scripts() is not implemented") 

3872 

3873 def script_load_for_pipeline(self, *args, **kwargs): 

3874 """ """ 

3875 raise RedisClusterException( 

3876 "method script_load_for_pipeline() is not implemented" 

3877 ) 

3878 

3879 def annotate_exception(self, exception, number, command): 

3880 """ 

3881 Provides extra context to the exception prior to it being handled 

3882 """ 

3883 cmd = " ".join(map(safe_str, command)) 

3884 msg = ( 

3885 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

3886 f"caused error: {exception.args[0]}" 

3887 ) 

3888 exception.args = (msg,) + exception.args[1:] 

3889 

3890 

3891class PipelineStrategy(AbstractStrategy): 

3892 def __init__(self, pipe: ClusterPipeline): 

3893 super().__init__(pipe) 

3894 self.command_flags = pipe.command_flags 

3895 

3896 def execute_command(self, *args, **kwargs): 

3897 return self.pipeline_execute_command(*args, **kwargs) 

3898 

3899 def _raise_first_error(self, stack, start_time): 

3900 """ 

3901 Raise the first exception on the stack 

3902 """ 

3903 for c in stack: 

3904 r = c.result 

3905 if isinstance(r, Exception): 

3906 self.annotate_exception(r, c.position + 1, c.args) 

3907 

3908 record_operation_duration( 

3909 command_name="PIPELINE", 

3910 duration_seconds=time.monotonic() - start_time, 

3911 error=r, 

3912 ) 

3913 

3914 raise r 

3915 

3916 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3917 stack = self._command_queue 

3918 if not stack: 

3919 return [] 

3920 

3921 try: 

3922 return self.send_cluster_commands(stack, raise_on_error) 

3923 finally: 

3924 self.reset() 

3925 

3926 def reset(self): 

3927 """ 

3928 Reset back to empty pipeline. 

3929 """ 

3930 self._command_queue = [] 

3931 

3932 def send_cluster_commands( 

3933 self, stack, raise_on_error=True, allow_redirections=True 

3934 ): 

3935 """ 

3936 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY error handling. 

3937 

3938 If one of the retryable exceptions has been thrown we assume that: 

3939 - connection_pool was disconnected 

3940 - connection_pool was reset 

3941 - refresh_table_asap set to True 

3942 

3943 It retries the number of times configured on the 

3944 "self.retry" object, which defaults to 3 retries 

3945 unless configured explicitly. 

3946 

3947 Once those retries are exhausted, the last retryable 

3948 error is re-raised. 

3949 """ 

3950 if not stack: 

3951 return [] 

3952 retry_attempts = self._pipe.retry.get_retries() 

3953 while True: 

3954 try: 

3955 return self._send_cluster_commands( 

3956 stack, 

3957 raise_on_error=raise_on_error, 

3958 allow_redirections=allow_redirections, 

3959 ) 

3960 except RedisCluster.ERRORS_ALLOW_RETRY as e: 

3961 if retry_attempts > 0: 

3962 # Try again with the new cluster setup. All other errors 

3963 # should be raised. 

3964 retry_attempts -= 1 

3965 pass 

3966 else: 

3967 raise e 

3968 
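    # [Editorial sketch] The retry budget used above is normally configured on the
    # parent client; the pipeline created by rc.pipeline() is expected to reuse
    # the client's Retry object (the constructor above accepts one). Host/port
    # below are assumptions.
    #
    #   from redis.backoff import ExponentialWithJitterBackoff
    #   from redis.cluster import RedisCluster
    #   from redis.retry import Retry
    #
    #   rc = RedisCluster(
    #       host="localhost",
    #       port=16379,
    #       retry=Retry(ExponentialWithJitterBackoff(base=1, cap=10), retries=5),
    #   )
    #   pipe = rc.pipeline()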

3969 def _send_cluster_commands( 

3970 self, stack, raise_on_error=True, allow_redirections=True 

3971 ): 

3972 """ 

3973 Send a bunch of cluster commands to the redis cluster. 

3974 

3975 `allow_redirections`: whether the pipeline should follow 

3976 `ASK` & `MOVED` redirects automatically. If set 

3977 to False, it raises RedisClusterException instead. 

3978 """ 

3979 # the first time sending the commands we send all of 

3980 # the commands that were queued up. 

3981 # if we have to run through it again, we only retry 

3982 # the commands that failed. 

3983 attempt = sorted(stack, key=lambda x: x.position) 

3984 is_default_node = False 

3985 # build a list of node objects based on node names we need to 

3986 nodes: dict[str, NodeCommands] = {} 

3987 nodes_written = 0 

3988 nodes_read = 0 

3989 

3990 try: 

3991 # as we move through each command that still needs to be processed, 

3992 # we figure out the slot number that command maps to, then from 

3993 # the slot determine the node. 

3994 for c in attempt: 

3995 command_policies = self._pipe._policy_resolver.resolve( 

3996 c.args[0].lower() 

3997 ) 

3998 # refer to our internal node -> slot table that 

3999 # tells us where a given command should route to. 

4000 # (it might be possible we have a cached node that no longer 

4001 # exists in the cluster, which is why we do this in a loop) 

4002 passed_targets = c.options.pop("target_nodes", None) 

4003 if passed_targets and not self._is_nodes_flag(passed_targets): 

4004 target_nodes = self._parse_target_nodes(passed_targets) 

4005 

4006 if not command_policies: 

4007 command_policies = CommandPolicies() 

4008 else: 

4009 if not command_policies: 

4010 command = c.args[0].upper() 

4011 if ( 

4012 len(c.args) >= 2 

4013 and f"{c.args[0]} {c.args[1]}".upper() 

4014 in self._pipe.command_flags 

4015 ): 

4016 command = f"{c.args[0]} {c.args[1]}".upper() 

4017 

4018 # We can only resolve key properties if the command is not 

4019 # in the list of pre-defined request policies 

4020 command_flag = self.command_flags.get(command) 

4021 if not command_flag: 

4022 # Fallback to default policy 

4023 if not self._pipe.get_default_node(): 

4024 keys = None 

4025 else: 

4026 keys = self._pipe._get_command_keys(*c.args) 

4027 if not keys or len(keys) == 0: 

4028 command_policies = CommandPolicies() 

4029 else: 

4030 command_policies = CommandPolicies( 

4031 request_policy=RequestPolicy.DEFAULT_KEYED, 

4032 response_policy=ResponsePolicy.DEFAULT_KEYED, 

4033 ) 

4034 else: 

4035 if command_flag in self._pipe._command_flags_mapping: 

4036 command_policies = CommandPolicies( 

4037 request_policy=self._pipe._command_flags_mapping[ 

4038 command_flag 

4039 ] 

4040 ) 

4041 else: 

4042 command_policies = CommandPolicies() 

4043 

4044 target_nodes = self._determine_nodes( 

4045 *c.args, 

4046 request_policy=command_policies.request_policy, 

4047 node_flag=passed_targets, 

4048 ) 

4049 if not target_nodes: 

4050 raise RedisClusterException( 

4051 f"No targets were found to execute {c.args} command on" 

4052 ) 

4053 c.command_policies = command_policies 

4054 if len(target_nodes) > 1: 

4055 raise RedisClusterException( 

4056 f"Too many targets for command {c.args}" 

4057 ) 

4058 

4059 node = target_nodes[0] 

4060 if node == self._pipe.get_default_node(): 

4061 is_default_node = True 

4062 

4063 # now that we know the name of the node 

4064 # ( it's just a string in the form of host:port ) 

4065 # we can build a list of commands for each node. 

4066 node_name = node.name 

4067 if node_name not in nodes: 

4068 redis_node = self._pipe.get_redis_connection(node) 

4069 try: 

4070 connection = get_connection(redis_node) 

4071 except (ConnectionError, TimeoutError): 

4072 # Release any connections we've already acquired before clearing nodes 

4073 for n in nodes.values(): 

4074 n.connection_pool.release(n.connection) 

4075 # Connection retries are being handled in the node's 

4076 # Retry object. Reinitialize the node -> slot table. 

4077 self._nodes_manager.initialize() 

4078 if is_default_node: 

4079 self._pipe.replace_default_node() 

4080 nodes = {} 

4081 raise 

4082 nodes[node_name] = NodeCommands( 

4083 redis_node.parse_response, 

4084 redis_node.connection_pool, 

4085 connection, 

4086 ) 

4087 nodes[node_name].append(c) 

4088 

4089 # send the commands in sequence. 

4090 # we write to all the open sockets for each node first, 

4091 # before reading anything 

4092 # this allows us to flush all the requests out across the 

4093 # network 

4094 # so that we can read them from different sockets as they come back. 

4095 # we don't multiplex on the sockets as they come available, 

4096 # but that shouldn't make too much difference. 

4097 

4098 # Start timing for observability 

4099 start_time = time.monotonic() 

4100 

4101 node_commands = nodes.values() 

4102 for n in node_commands: 

4103 nodes_written += 1 

4104 n.write() 

4105 

4106 for n in node_commands: 

4107 n.read() 

4108 

4109 # Find the first error in this node's commands, if any 

4110 node_error = None 

4111 for cmd in n.commands: 

4112 if isinstance(cmd.result, Exception): 

4113 node_error = cmd.result 

4114 break 

4115 

4116 record_operation_duration( 

4117 command_name="PIPELINE", 

4118 duration_seconds=time.monotonic() - start_time, 

4119 server_address=n.connection.host, 

4120 server_port=n.connection.port, 

4121 db_namespace=str(n.connection.db), 

4122 error=node_error, 

4123 ) 

4124 nodes_read += 1 

4125 finally: 

4126 # release all the redis connections we allocated earlier 

4127 # back into the connection pool. 

4128 # if the connection is dirty (that is: we've written 

4129 # commands to it, but haven't read the responses), we need 

4130 # to close the connection before returning it to the pool. 

4131 # otherwise, the next caller to use this connection will 

4132 # read the response from _this_ request, not its own request. 

4133 # disconnecting discards the dirty state & forces the next 

4134 # caller to reconnect. 

4135 # NOTE: dicts have a consistent ordering; we're iterating 

4136 # through nodes.values() in the same order as we are when 

4137 # reading / writing to the connections above, which is critical 

4138 # for how we're using the nodes_written/nodes_read offsets. 

4139 for i, n in enumerate(nodes.values()): 

4140 if i < nodes_written and i >= nodes_read: 

4141 n.connection.disconnect() 

4142 n.connection_pool.release(n.connection) 

4143 

4144 # if the response isn't an exception it is a 

4145 # valid response from the node 

4146 # we're all done with that command, YAY! 

4147 # if we have more commands to attempt, we've run into problems. 

4148 # collect all the commands we are allowed to retry. 

4149 # (MOVED, ASK, or connection errors or timeout errors) 

4150 attempt = sorted( 

4151 ( 

4152 c 

4153 for c in attempt 

4154 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

4155 ), 

4156 key=lambda x: x.position, 

4157 ) 

4158 if attempt and allow_redirections: 

4159 # RETRY MAGIC HAPPENS HERE! 

4160 # send these remaining commands one at a time using `execute_command` 

4161 # in the main client. This keeps our retry logic 

4162 # in one place mostly, 

4163 # and allows us to be more confident in correctness of behavior. 

4164 # at this point any speed gains from pipelining have been lost 

4165 # anyway, so we might as well make the best 

4166 # attempt to get the correct behavior. 

4167 # 

4168 # The client command will handle retries for each 

4169 # individual command sequentially as we pass each 

4170 # one into `execute_command`. Any exceptions 

4171 # that bubble out should only appear once all 

4172 # retries have been exhausted. 

4173 # 

4174 # If a lot of commands have failed, we'll be setting the 

4175 # flag to rebuild the slots table from scratch. 

4176 # So MOVED errors should correct themselves fairly quickly. 

4177 self._pipe.reinitialize_counter += 1 

4178 if self._pipe._should_reinitialized(): 

4179 self._nodes_manager.initialize() 

4180 if is_default_node: 

4181 self._pipe.replace_default_node() 

4182 for c in attempt: 

4183 try: 

4184 # send each command individually like we 

4185 # do in the main client. 

4186 c.result = self._pipe.parent_execute_command(*c.args, **c.options) 

4187 except RedisError as e: 

4188 c.result = e 

4189 

4190 # turn the response back into a simple flat array that corresponds 

4191 # to the sequence of commands issued in the stack in pipeline.execute() 

4192 response = [] 

4193 for c in sorted(stack, key=lambda x: x.position): 

4194 if c.args[0] in self._pipe.cluster_response_callbacks: 

4195 # Remove the "keys" entry; it is needed only for the cache. 

4196 c.options.pop("keys", None) 

4197 c.result = self._pipe._policies_callback_mapping[ 

4198 c.command_policies.response_policy 

4199 ]( 

4200 self._pipe.cluster_response_callbacks[c.args[0]]( 

4201 c.result, **c.options 

4202 ) 

4203 ) 

4204 response.append(c.result) 

4205 

4206 if raise_on_error: 

4207 self._raise_first_error(stack, start_time) 

4208 

4209 return response 

4210 

4211 def _is_nodes_flag(self, target_nodes): 

4212 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags 

4213 

4214 def _parse_target_nodes(self, target_nodes): 

4215 if isinstance(target_nodes, list): 

4216 nodes = target_nodes 

4217 elif isinstance(target_nodes, ClusterNode): 

4218 # Supports passing a single ClusterNode as a variable 

4219 nodes = [target_nodes] 

4220 elif isinstance(target_nodes, dict): 

4221 # Supports dictionaries of the format {node_name: node}. 

4222 # It enables executing commands on multiple nodes, e.g.: 

4223 # rc.cluster_save_config(rc.get_primaries()) 

4224 nodes = target_nodes.values() 

4225 else: 

4226 raise TypeError( 

4227 "target_nodes type can be one of the following: " 

4228 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

4229 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

4230 f"The passed type is {type(target_nodes)}" 

4231 ) 

4232 return nodes 

4233 
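    # [Editorial sketch] The accepted target_nodes forms, shown on a plain
    # cluster client call (the client `rc` is assumed):
    #
    #   rc.ping(target_nodes=RedisCluster.ALL_NODES)    # node flag
    #   rc.ping(target_nodes=rc.get_random_node())      # single ClusterNode
    #   rc.ping(target_nodes=rc.get_primaries())        # list of ClusterNode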

4234 def _determine_nodes( 

4235 self, *args, request_policy: RequestPolicy, **kwargs 

4236 ) -> List["ClusterNode"]: 

4237 # Determine which nodes the command should be executed on. 

4238 # Returns a list of target nodes. 

4239 command = args[0].upper() 

4240 if ( 

4241 len(args) >= 2 

4242 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags 

4243 ): 

4244 command = f"{args[0]} {args[1]}".upper() 

4245 

4246 nodes_flag = kwargs.pop("nodes_flag", None) 

4247 if nodes_flag is not None: 

4248 # nodes flag passed by the user 

4249 command_flag = nodes_flag 

4250 else: 

4251 # get the nodes group for this command if it was predefined 

4252 command_flag = self._pipe.command_flags.get(command) 

4253 

4254 if command_flag in self._pipe._command_flags_mapping: 

4255 request_policy = self._pipe._command_flags_mapping[command_flag] 

4256 

4257 policy_callback = self._pipe._policies_callback_mapping[request_policy] 

4258 

4259 if request_policy == RequestPolicy.DEFAULT_KEYED: 

4260 nodes = policy_callback(command, *args) 

4261 elif request_policy == RequestPolicy.MULTI_SHARD: 

4262 nodes = policy_callback(*args, **kwargs) 

4263 elif request_policy == RequestPolicy.DEFAULT_KEYLESS: 

4264 nodes = policy_callback(args[0]) 

4265 else: 

4266 nodes = policy_callback() 

4267 

4268 if args[0].lower() == "ft.aggregate": 

4269 self._aggregate_nodes = nodes 

4270 

4271 return nodes 

4272 

4273 def multi(self): 

4274 raise RedisClusterException( 

4275 "method multi() is not supported outside of transactional context" 

4276 ) 

4277 

4278 def discard(self): 

4279 raise RedisClusterException( 

4280 "method discard() is not supported outside of transactional context" 

4281 ) 

4282 

4283 def watch(self, *names): 

4284 raise RedisClusterException( 

4285 "method watch() is not supported outside of transactional context" 

4286 ) 

4287 

4288 def unwatch(self, *names): 

4289 raise RedisClusterException( 

4290 "method unwatch() is not supported outside of transactional context" 

4291 ) 

4292 

4293 def delete(self, *names): 

4294 if len(names) != 1: 

4295 raise RedisClusterException( 

4296 "deleting multiple keys is not implemented in pipeline command" 

4297 ) 

4298 

4299 return self.execute_command("DEL", names[0]) 

4300 

4301 def unlink(self, *names): 

4302 if len(names) != 1: 

4303 raise RedisClusterException( 

4304 "unlinking multiple keys is not implemented in pipeline command" 

4305 ) 

4306 

4307 return self.execute_command("UNLINK", names[0]) 

4308 

4309 

4310class TransactionStrategy(AbstractStrategy): 

4311 NO_SLOTS_COMMANDS = {"UNWATCH"} 

4312 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

4313 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

4314 SLOT_REDIRECT_ERRORS = (AskError, MovedError) 

4315 CONNECTION_ERRORS = ( 

4316 ConnectionError, 

4317 OSError, 

4318 ClusterDownError, 

4319 SlotNotCoveredError, 

4320 ) 

4321 

4322 def __init__(self, pipe: ClusterPipeline): 

4323 super().__init__(pipe) 

4324 self._explicit_transaction = False 

4325 self._watching = False 

4326 self._pipeline_slots: Set[int] = set() 

4327 self._transaction_connection: Optional[Connection] = None 

4328 self._executing = False 

4329 self._retry = copy(self._pipe.retry) 

4330 self._retry.update_supported_errors( 

4331 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS 

4332 ) 

4333 

4334 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]: 

4335 """ 

4336 Find a connection for a pipeline transaction. 

4337 

4338 For running an atomic transaction, watch keys ensure that contents have not been 

4339 altered as long as the watch commands for those keys were sent over the same 

4340 connection. So once we start watching a key, we fetch a connection to the 

4341 node that owns that slot and reuse it. 

4342 """ 

4343 if not self._pipeline_slots: 

4344 raise RedisClusterException( 

4345 "At least a command with a key is needed to identify a node" 

4346 ) 

4347 

4348 node: ClusterNode = self._nodes_manager.get_node_from_slot( 

4349 list(self._pipeline_slots)[0], False 

4350 ) 

4351 redis_node: Redis = self._pipe.get_redis_connection(node) 

4352 if self._transaction_connection: 

4353 if not redis_node.connection_pool.owns_connection( 

4354 self._transaction_connection 

4355 ): 

4356 previous_node = self._nodes_manager.find_connection_owner( 

4357 self._transaction_connection 

4358 ) 

4359 previous_node.connection_pool.release(self._transaction_connection) 

4360 self._transaction_connection = None 

4361 

4362 if not self._transaction_connection: 

4363 self._transaction_connection = get_connection(redis_node) 

4364 

4365 return redis_node, self._transaction_connection 

4366 
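    # [Editorial sketch] WATCH/MULTI/EXEC usage that keeps every key on one slot
    # via a {user:1} hash tag. `rc` is an assumed RedisCluster client, and
    # rc.pipeline(transaction=True) is assumed to build this strategy, mirroring
    # the `transaction` flag accepted by the ClusterPipeline constructor above.
    #
    #   with rc.pipeline(transaction=True) as pipe:
    #       pipe.watch("{user:1}:balance")
    #       balance = int(pipe.get("{user:1}:balance") or 0)  # immediate while watching
    #       pipe.multi()
    #       pipe.set("{user:1}:balance", balance + 10)
    #       pipe.execute()   # raises WatchError if the watched key changed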

4367 def execute_command(self, *args, **kwargs): 

4368 slot_number: Optional[int] = None 

4369 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS: 

4370 slot_number = self._pipe.determine_slot(*args) 

4371 

4372 if ( 

4373 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS 

4374 ) and not self._explicit_transaction: 

4375 if args[0] == "WATCH": 

4376 self._validate_watch() 

4377 

4378 if slot_number is not None: 

4379 if self._pipeline_slots and slot_number not in self._pipeline_slots: 

4380 raise CrossSlotTransactionError( 

4381 "Cannot watch or send commands on different slots" 

4382 ) 

4383 

4384 self._pipeline_slots.add(slot_number) 

4385 elif args[0] not in self.NO_SLOTS_COMMANDS: 

4386 raise RedisClusterException( 

4387 f"Cannot identify slot number for command: {args[0]}," 

4388 "it cannot be triggered in a transaction" 

4389 ) 

4390 

4391 return self._immediate_execute_command(*args, **kwargs) 

4392 else: 

4393 if slot_number is not None: 

4394 self._pipeline_slots.add(slot_number) 

4395 

4396 return self.pipeline_execute_command(*args, **kwargs) 

4397 

4398 def _validate_watch(self): 

4399 if self._explicit_transaction: 

4400 raise RedisError("Cannot issue a WATCH after a MULTI") 

4401 

4402 self._watching = True 

4403 

4404 def _immediate_execute_command(self, *args, **options): 

4405 return self._retry.call_with_retry( 

4406 lambda: self._get_connection_and_send_command(*args, **options), 

4407 self._reinitialize_on_error, 

4408 with_failure_count=True, 

4409 ) 

4410 

4411 def _get_connection_and_send_command(self, *args, **options): 

4412 redis_node, connection = self._get_client_and_connection_for_transaction() 

4413 

4414 # Start timing for observability 

4415 start_time = time.monotonic() 

4416 

4417 try: 

4418 response = self._send_command_parse_response( 

4419 connection, redis_node, args[0], *args, **options 

4420 ) 

4421 

4422 record_operation_duration( 

4423 command_name=args[0], 

4424 duration_seconds=time.monotonic() - start_time, 

4425 server_address=connection.host, 

4426 server_port=connection.port, 

4427 db_namespace=str(connection.db), 

4428 ) 

4429 

4430 return response 

4431 except Exception as e: 

4432 if connection: 

4433 # this is used to report the metrics based on host and port info 

4434 e.connection = connection 

4435 record_operation_duration( 

4436 command_name=args[0], 

4437 duration_seconds=time.monotonic() - start_time, 

4438 server_address=connection.host, 

4439 server_port=connection.port, 

4440 db_namespace=str(connection.db), 

4441 error=e, 

4442 ) 

4443 raise 

4444 

4445 def _send_command_parse_response( 

4446 self, conn, redis_node: Redis, command_name, *args, **options 

4447 ): 

4448 """ 

4449 Send a command and parse the response 

4450 """ 

4451 

4452 conn.send_command(*args) 

4453 output = redis_node.parse_response(conn, command_name, **options) 

4454 

4455 if command_name in self.UNWATCH_COMMANDS: 

4456 self._watching = False 

4457 return output 

4458 

4459 def _reinitialize_on_error(self, error, failure_count): 

4460 if hasattr(error, "connection"): 

4461 record_error_count( 

4462 server_address=error.connection.host, 

4463 server_port=error.connection.port, 

4464 network_peer_address=error.connection.host, 

4465 network_peer_port=error.connection.port, 

4466 error_type=error, 

4467 retry_attempts=failure_count, 

4468 is_internal=True, 

4469 ) 

4470 

4471 if self._watching: 

4472 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: 

4473 raise WatchError("Slot rebalancing occurred while watching keys") 

4474 

4475 if ( 

4476 type(error) in self.SLOT_REDIRECT_ERRORS 

4477 or type(error) in self.CONNECTION_ERRORS 

4478 ): 

4479 if self._transaction_connection: 

4480 if is_debug_log_enabled(): 

4481 logger.debug( 

4482 f"Operation failed, " 

4483 f"with connection: {self._transaction_connection}, " 

4484 f"details: {self._transaction_connection.extract_connection_details()}", 

4485 ) 

4486 # Disconnect and release back to pool 

4487 self._transaction_connection.disconnect() 

4488 node = self._nodes_manager.find_connection_owner( 

4489 self._transaction_connection 

4490 ) 

4491 if node and node.redis_connection: 

4492 node.redis_connection.connection_pool.release( 

4493 self._transaction_connection 

4494 ) 

4495 self._transaction_connection = None 

4496 

4497 self._pipe.reinitialize_counter += 1 

4498 if self._pipe._should_reinitialized(): 

4499 self._nodes_manager.initialize() 

4500 self._pipe.reinitialize_counter = 0  # reset the shared counter on the pipeline 

4501 else: 

4502 if isinstance(error, AskError): 

4503 self._nodes_manager.move_slot(error) 

4504 

4505 self._executing = False 

4506 

4507 def _raise_first_error(self, responses, stack, start_time): 

4508 """ 

4509 Raise the first exception on the stack 

4510 """ 

4511 for r, cmd in zip(responses, stack): 

4512 if isinstance(r, Exception): 

4513 self.annotate_exception(r, cmd.position + 1, cmd.args) 

4514 

4515 record_operation_duration( 

4516 command_name="TRANSACTION", 

4517 duration_seconds=time.monotonic() - start_time, 

4518 server_address=self._transaction_connection.host, 

4519 server_port=self._transaction_connection.port, 

4520 db_namespace=str(self._transaction_connection.db), 

4521 ) 

4522 

4523 raise r 

4524 

4525 def execute(self, raise_on_error: bool = True) -> List[Any]: 

4526 stack = self._command_queue 

4527 if not stack and (not self._watching or not self._pipeline_slots): 

4528 return [] 

4529 

4530 return self._execute_transaction_with_retries(stack, raise_on_error) 

4531 

4532 def _execute_transaction_with_retries( 

4533 self, stack: List["PipelineCommand"], raise_on_error: bool 

4534 ): 

4535 return self._retry.call_with_retry( 

4536 lambda: self._execute_transaction(stack, raise_on_error), 

4537 lambda error, failure_count: self._reinitialize_on_error( 

4538 error, failure_count 

4539 ), 

4540 with_failure_count=True, 

4541 ) 

4542 

4543 def _execute_transaction( 

4544 self, stack: List["PipelineCommand"], raise_on_error: bool 

4545 ): 

4546 if len(self._pipeline_slots) > 1: 

4547 raise CrossSlotTransactionError( 

4548 "All keys involved in a cluster transaction must map to the same slot" 

4549 ) 

4550 

4551 self._executing = True 

4552 

4553 redis_node, connection = self._get_client_and_connection_for_transaction() 

4554 

4555 stack = chain( 

4556 [PipelineCommand(("MULTI",))], 

4557 stack, 

4558 [PipelineCommand(("EXEC",))], 

4559 ) 

4560 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options] 

4561 packed_commands = connection.pack_commands(commands) 

4562 

4563 # Start timing for observability 

4564 start_time = time.monotonic() 

4565 

4566 connection.send_packed_command(packed_commands) 

4567 errors = [] 

4568 

4569 # parse off the response for MULTI 

4570 # NOTE: we need to handle ResponseErrors here and continue 

4571 # so that we read all the additional command messages from 

4572 # the socket 

4573 try: 

4574 redis_node.parse_response(connection, "MULTI") 

4575 except ResponseError as e: 

4576 self.annotate_exception(e, 0, "MULTI") 

4577 errors.append(e) 

4578 except self.CONNECTION_ERRORS as cluster_error: 

4579 self.annotate_exception(cluster_error, 0, "MULTI") 

4580 raise 

4581 

4582 # and all the other commands 

4583 for i, command in enumerate(self._command_queue): 

4584 if EMPTY_RESPONSE in command.options: 

4585 errors.append((i, command.options[EMPTY_RESPONSE])) 

4586 else: 

4587 try: 

4588 _ = redis_node.parse_response(connection, "_") 

4589 except self.SLOT_REDIRECT_ERRORS as slot_error: 

4590 self.annotate_exception(slot_error, i + 1, command.args) 

4591 errors.append(slot_error) 

4592 except self.CONNECTION_ERRORS as cluster_error: 

4593 self.annotate_exception(cluster_error, i + 1, command.args) 

4594 raise 

4595 except ResponseError as e: 

4596 self.annotate_exception(e, i + 1, command.args) 

4597 errors.append(e) 

4598 

4599 response = None 

4600 # parse the EXEC. 

4601 try: 

4602 response = redis_node.parse_response(connection, "EXEC") 

4603 except ExecAbortError: 

4604 if errors: 

4605 raise errors[0] 

4606 raise 

4607 

4608 self._executing = False 

4609 

4610 record_operation_duration( 

4611 command_name="TRANSACTION", 

4612 duration_seconds=time.monotonic() - start_time, 

4613 server_address=connection.host, 

4614 server_port=connection.port, 

4615 db_namespace=str(connection.db), 

4616 ) 

4617 

4618 # EXEC clears any watched keys 

4619 self._watching = False 

4620 

4621 if response is None: 

4622 raise WatchError("Watched variable changed.") 

4623 

4624 # put any parse errors into the response 

4625 for i, e in errors: 

4626 response.insert(i, e) 

4627 

4628 if len(response) != len(self._command_queue): 

4629 raise InvalidPipelineStack( 

4630 "Unexpected response length for cluster pipeline EXEC." 

4631 " Command stack was {} but response had length {}".format( 

4632 [c.args[0] for c in self._command_queue], len(response) 

4633 ) 

4634 ) 

4635 

4636 # find any errors in the response and raise if necessary 

4637 if raise_on_error or len(errors) > 0: 

4638 self._raise_first_error( 

4639 response, 

4640 self._command_queue, 

4641 start_time, 

4642 ) 

4643 

4644 # We have to run response callbacks manually 

4645 data = [] 

4646 for r, cmd in zip(response, self._command_queue): 

4647 if not isinstance(r, Exception): 

4648 command_name = cmd.args[0] 

4649 if command_name in self._pipe.cluster_response_callbacks: 

4650 r = self._pipe.cluster_response_callbacks[command_name]( 

4651 r, **cmd.options 

4652 ) 

4653 data.append(r) 

4654 return data 

4655 

4656 def reset(self): 

4657 self._command_queue = [] 

4658 

4659 # make sure to reset the connection state in the event that we were 

4660 # watching something 

4661 if self._transaction_connection: 

4662 try: 

4663 if self._watching: 

4664 # call this manually since our unwatch or 

4665 # immediate_execute_command methods can call reset() 

4666 self._transaction_connection.send_command("UNWATCH") 

4667 self._transaction_connection.read_response() 

4668 # we can safely return the connection to the pool here since we're 

4669 # sure we're no longer WATCHing anything 

4670 node = self._nodes_manager.find_connection_owner( 

4671 self._transaction_connection 

4672 ) 

4673 if node and node.redis_connection: 

4674 node.redis_connection.connection_pool.release( 

4675 self._transaction_connection 

4676 ) 

4677 self._transaction_connection = None 

4678 except self.CONNECTION_ERRORS: 

4679 # disconnect will also remove any previous WATCHes 

4680 if self._transaction_connection: 

4681 self._transaction_connection.disconnect() 

4682 node = self._nodes_manager.find_connection_owner( 

4683 self._transaction_connection 

4684 ) 

4685 if node and node.redis_connection: 

4686 node.redis_connection.connection_pool.release( 

4687 self._transaction_connection 

4688 ) 

4689 self._transaction_connection = None 

4690 

4691 # clean up the other instance attributes 

4692 self._watching = False 

4693 self._explicit_transaction = False 

4694 self._pipeline_slots = set() 

4695 self._executing = False 

4696 

4697 def send_cluster_commands( 

4698 self, stack, raise_on_error=True, allow_redirections=True 

4699 ): 

4700 raise NotImplementedError( 

4701 "send_cluster_commands cannot be executed in transactional context." 

4702 ) 

4703 

4704 def multi(self): 

4705 if self._explicit_transaction: 

4706 raise RedisError("Cannot issue nested calls to MULTI") 

4707 if self._command_queue: 

4708 raise RedisError( 

4709 "Commands without an initial WATCH have already been issued" 

4710 ) 

4711 self._explicit_transaction = True 

4712 

4713 def watch(self, *names): 

4714 if self._explicit_transaction: 

4715 raise RedisError("Cannot issue a WATCH after a MULTI") 

4716 

4717 return self.execute_command("WATCH", *names) 

4718 

4719 def unwatch(self): 

4720 if self._watching: 

4721 return self.execute_command("UNWATCH") 

4722 

4723 return True 

4724 

4725 def discard(self): 

4726 self.reset() 

4727 

4728 def delete(self, *names): 

4729 return self.execute_command("DEL", *names) 

4730 

4731 def unlink(self, *names): 

4732 return self.execute_command("UNLINK", *names)