Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1593 statements  

1import logging 

2import random 

3import socket 

4import sys 

5import threading 

6import time 

7from abc import ABC, abstractmethod 

8from collections import OrderedDict 

9from copy import copy 

10from enum import Enum 

11from itertools import chain 

12from typing import ( 

13 Any, 

14 Callable, 

15 Dict, 

16 List, 

17 Literal, 

18 Optional, 

19 Set, 

20 Tuple, 

21 Union, 

22) 

23 

24from redis._parsers import CommandsParser, Encoder 

25from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy 

26from redis._parsers.helpers import parse_scan 

27from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

28from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface 

29from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis 

30from redis.commands import READ_COMMANDS, RedisClusterCommands 

31from redis.commands.helpers import list_or_args 

32from redis.commands.policies import PolicyResolver, StaticPolicyResolver 

33from redis.connection import ( 

34 Connection, 

35 ConnectionPool, 

36 parse_url, 

37) 

38from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

39from redis.event import ( 

40 AfterPooledConnectionsInstantiationEvent, 

41 AfterPubSubConnectionInstantiationEvent, 

42 ClientType, 

43 EventDispatcher, 

44) 

45from redis.exceptions import ( 

46 AskError, 

47 AuthenticationError, 

48 ClusterDownError, 

49 ClusterError, 

50 ConnectionError, 

51 CrossSlotTransactionError, 

52 DataError, 

53 ExecAbortError, 

54 InvalidPipelineStack, 

55 MaxConnectionsError, 

56 MovedError, 

57 RedisClusterException, 

58 RedisError, 

59 ResponseError, 

60 SlotNotCoveredError, 

61 TimeoutError, 

62 TryAgainError, 

63 WatchError, 

64) 

65from redis.lock import Lock 

66from redis.maint_notifications import ( 

67 MaintNotificationsConfig, 

68 OSSMaintNotificationsHandler, 

69) 

70from redis.observability.recorder import ( 

71 record_error_count, 

72 record_operation_duration, 

73) 

74from redis.retry import Retry 

75from redis.utils import ( 

76 check_protocol_version, 

77 deprecated_args, 

78 deprecated_function, 

79 dict_merge, 

80 list_keys_to_dict, 

81 merge_result, 

82 safe_str, 

83 str_if_bytes, 

84 truncate_text, 

85) 

86 

# Module-level logger for cluster routing/diagnostic output.
logger = logging.getLogger(__name__)

88 

89 

def is_debug_log_enabled():
    """Return True when the module logger will emit DEBUG-level records."""
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    return debug_enabled

92 

93 

def get_node_name(host: str, port: Union[str, int]) -> str:
    """Build the canonical ``"<host>:<port>"`` identifier for a cluster node."""
    return "{}:{}".format(host, port)

96 

97 

@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    """Return the node's pinned connection, or lease one from its pool."""
    pinned = redis_node.connection
    return pinned or redis_node.connection_pool.get_connection()

105 

106 

def parse_scan_result(command, res, **options):
    """Merge per-node SCAN replies into ``({node_name: cursor}, combined_keys)``."""
    node_cursors = {}
    combined_keys = []
    for name, node_response in res.items():
        node_cursor, keys = parse_scan(node_response, **options)
        node_cursors[name] = node_cursor
        combined_keys.extend(keys)
    return node_cursors, combined_keys

116 

117 

def parse_pubsub_numsub(command, res, **options):
    """
    Aggregate per-node PUBSUB NUMSUB replies into a single list of
    ``(channel, total_subscribers)`` tuples, preserving first-seen order.
    """
    totals = OrderedDict()
    for node_pairs in res.values():
        for channel, subscriber_count in node_pairs:
            totals[channel] = totals.get(channel, 0) + subscriber_count
    return list(totals.items())

129 

130 

def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """
    Turn a CLUSTER SLOTS reply into ``{(start, end): {"primary", "replicas"}}``.

    An empty host in the reply means "the node that answered", so it is
    replaced with the ``current_host`` option when given.
    """
    fallback_host = options.get("current_host", "")

    def normalize(*server: Any) -> Tuple[str, Any]:
        return str_if_bytes(server[0]) or fallback_host, server[1]

    mapping = {}
    for entry in resp:
        start_slot, end_slot, primary = entry[:3]
        mapping[start_slot, end_slot] = {
            "primary": normalize(*primary),
            "replicas": [normalize(*rep) for rep in entry[3:]],
        }

    return mapping

149 

150 

def parse_cluster_shards(resp, **options):
    """
    Parse a CLUSTER SHARDS reply.

    RESP3 replies arrive already structured as dicts and pass through
    untouched; RESP2 replies are flat key/value arrays that get folded into
    ``{"slots": [(start, end), ...], "nodes": [{...}, ...]}`` entries here.
    """
    if isinstance(resp[0], dict):
        # RESP3: already in the desired shape.
        return resp
    parsed = []
    for raw_shard in resp:
        slot_values = raw_shard[1]
        slot_ranges = [
            (slot_values[idx], slot_values[idx + 1])
            for idx in range(0, len(slot_values), 2)
        ]
        node_dicts = [
            {raw_node[idx]: raw_node[idx + 1] for idx in range(0, len(raw_node), 2)}
            for raw_node in raw_shard[3]
        ]
        parsed.append({"slots": slot_ranges, "nodes": node_dicts})

    return parsed

171 

172 

def parse_cluster_myshardid(resp, **options):
    """Decode the raw-bytes CLUSTER MYSHARDID reply into a ``str``."""
    return resp.decode(encoding="utf-8")

178 

179 

# Server-role markers used when routing commands to cluster nodes.
PRIMARY = "primary"
REPLICA = "replica"
# Marker for commands whose target is derived from an explicit slot id
# (e.g. CLUSTER GETKEYSINSLOT) rather than from a key.
SLOT_ID = "slot-id"

183 

# Keyword arguments that may be forwarded to the per-node Redis clients.
REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_ca_path",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
    "maint_notifications_config",
)
# Keys the cluster client manages itself and therefore must NOT be forwarded
# to the node-level connections, even though they appear in the allowed set.
KWARGS_DISABLED_KEYS = ("host", "port", "retry")

229 

230 

def cleanup_kwargs(**kwargs):
    """
    Drop kwargs that node-level Redis connections do not accept.

    Keeps only keys listed in ``REDIS_ALLOWED_KEYS``, minus the keys the
    cluster client manages itself (``KWARGS_DISABLED_KEYS``).
    """
    forwardable = set(REDIS_ALLOWED_KEYS) - set(KWARGS_DISABLED_KEYS)
    return {key: value for key, value in kwargs.items() if key in forwardable}

242 

243 

class MaintNotificationsAbstractRedisCluster:
    """
    Abstract class for handling maintenance notifications logic.
    This class is expected to be used as base class together with RedisCluster.

    This class is intended to be used with multiple inheritance! It relies on
    ``get_nodes()`` and ``nodes_manager`` being provided by the sibling
    RedisCluster base.

    All logic related to maintenance notifications is encapsulated in this class.
    """

    def __init__(
        self,
        maint_notifications_config: Optional[MaintNotificationsConfig],
        **kwargs,
    ):
        # Maintenance notifications require server push messages, which only
        # exist in RESP3.
        is_protocol_supported = check_protocol_version(kwargs.get("protocol"), 3)

        if (
            maint_notifications_config
            and maint_notifications_config.enabled
            and not is_protocol_supported
        ):
            raise RedisError(
                "Maintenance notifications handlers on connection are only supported with RESP version 3"
            )
        # RESP3 with no explicit config: fall back to the default configuration.
        if maint_notifications_config is None and is_protocol_supported:
            maint_notifications_config = MaintNotificationsConfig()

        self.maint_notifications_config = maint_notifications_config

        if self.maint_notifications_config and self.maint_notifications_config.enabled:
            self._oss_cluster_maint_notifications_handler = (
                OSSMaintNotificationsHandler(self, self.maint_notifications_config)
            )
            # Update connection kwargs for all future nodes connections
            self._update_connection_kwargs_for_maint_notifications(
                self._oss_cluster_maint_notifications_handler
            )
            # Update existing nodes connections - they are created as part of
            # the RedisCluster constructor, so this __init__ must run after it.
            for node in self.get_nodes():
                if node.redis_connection is None:
                    continue
                node.redis_connection.connection_pool.update_maint_notifications_config(
                    self.maint_notifications_config,
                    oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
                )
        else:
            # Notifications disabled (or unsupported protocol with no config).
            self._oss_cluster_maint_notifications_handler = None

    def _update_connection_kwargs_for_maint_notifications(
        self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
    ):
        """
        Update the connection kwargs for all future connections so every new
        node connection is wired to the shared notifications handler.
        """
        self.nodes_manager.connection_kwargs.update(
            {
                "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
            }
        )

305 

306 

class AbstractRedisCluster:
    """
    Shared cluster-client machinery: command-to-node routing flags, response
    parsing callbacks, multi-node result aggregation callbacks, the retryable
    error set, and default-node management.
    """

    # Maximum number of redirections (MOVED/ASK) followed for one request.
    RedisClusterRequestTTL = 16

    # Node-flag names used to route commands that are not key-based.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # command name -> node flag, for commands whose target node cannot be
    # derived from a key.
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # NOTE: this is a one-element tuple wrapping the command list; consumers
    # access it as SEARCH_COMMANDS[0].
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",  # fixed: was "FT,PROFILE" (comma typo)
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    # Parsers applied to the raw reply of a single node.
    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Aggregators applied to the {node_name: response} dict of commands that
    # were fanned out to multiple nodes.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            # sum() consumes the values view directly; no list() needed.
            ["PUBSUB NUMPAT"], lambda command, res: sum(res.values())
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    # Errors that allow the cluster client to retry the request (possibly
    # after refreshing the topology).
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: Optional["ClusterNode"] = None) -> None:
        """Replace the default cluster node.

        A random cluster node will be chosen if target_node isn't passed, and
        primaries will be prioritized. The default node will not be changed if
        there are no other nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the
                default node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
            return
        curr_node = self.get_default_node()
        primaries = [node for node in self.get_primaries() if node != curr_node]
        if primaries:
            # Prefer a different primary when one exists.
            self.nodes_manager.default_node = random.choice(primaries)
        else:
            # No other primary: fall back to a different replica, if any.
            replicas = [node for node in self.get_replicas() if node != curr_node]
            if replicas:
                self.nodes_manager.default_node = random.choice(replicas)

552 

553 

554class RedisCluster( 

555 AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands 

556): 

557 @classmethod 

558 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster": 

559 """ 

560 Return a Redis client object configured from the given URL 

561 

562 For example:: 

563 

564 redis://[[username]:[password]]@localhost:6379/0 

565 rediss://[[username]:[password]]@localhost:6379/0 

566 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

567 

568 Three URL schemes are supported: 

569 

570 - `redis://` creates a TCP socket connection. See more at: 

571 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

572 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

573 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

574 - ``unix://``: creates a Unix Domain Socket connection. 

575 

576 The username, password, hostname, path and all querystring values 

577 are passed through urllib.parse.unquote in order to replace any 

578 percent-encoded values with their corresponding characters. 

579 

580 There are several ways to specify a database number. The first value 

581 found will be used: 

582 

583 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

584 2. If using the redis:// or rediss:// schemes, the path argument 

585 of the url, e.g. redis://localhost/0 

586 3. A ``db`` keyword argument to this function. 

587 

588 If none of these options are specified, the default db=0 is used. 

589 

590 All querystring options are cast to their appropriate Python types. 

591 Boolean arguments can be specified with string values "True"/"False" 

592 or "Yes"/"No". Values that cannot be properly cast cause a 

593 ``ValueError`` to be raised. Once parsed, the querystring arguments 

594 and keyword arguments are passed to the ``ConnectionPool``'s 

595 class initializer. In the case of conflicting arguments, querystring 

596 arguments always win. 

597 

598 """ 

599 return cls(url=url, **kwargs) 

600 

    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        **kwargs,
    ):
        """
        Initialize a new RedisCluster client.

        :param startup_nodes:
            List of nodes from which initial bootstrapping can be done
        :param host:
            Can be used to point to a startup node
        :param port:
            Can be used to point to a startup node
        :param require_full_coverage:
            When set to False: the client will not require a
            full coverage of the slots. However, if not all slots are covered,
            and at least one node has 'cluster-require-full-coverage' set to
            'yes,' the server will throw a ClusterDownError for some key-based
            commands. See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True (default value): all slots must be covered to
            construct the cluster client. If not all slots are covered,
            RedisClusterException will be thrown.
        :param read_from_replicas:
            @deprecated - please use load_balancing_strategy instead
            Enable read from replicas in READONLY mode. You can read possibly
            stale data.
            When set to true, read commands will be assigned between the
            primary and its replications in a Round-Robin manner.
        :param load_balancing_strategy:
            Enable read from replicas in READONLY mode and defines the load balancing
            strategy that will be used for cluster node selection.
            The data read from replicas is eventually consistent with the data in primary nodes.
        :param dynamic_startup_nodes:
            Set the RedisCluster's startup nodes to all of the discovered nodes.
            If true (default value), the cluster's discovered nodes will be used to
            determine the cluster nodes-slots mapping in the next topology refresh.
            It will remove the initial passed startup nodes if their endpoints aren't
            listed in the CLUSTER SLOTS output.
            If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
            specific IP addresses, it is best to set it to false.
        :param cluster_error_retry_attempts:
            @deprecated - Please configure the 'retry' object instead
            In case 'retry' object is set - this argument is ignored!

            Number of times to retry before raising an error when
            :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
            :class:`~.ClusterDownError` are encountered
        :param retry:
            A retry object that defines the retry strategy and the number of
            retries for the cluster client.
            In current implementation for the cluster client (starting form redis-py version 6.0.0)
            the retry object is not yet fully utilized, instead it is used just to determine
            the number of retries for the cluster client.
            In the future releases the retry object will be used to handle the cluster client retries!
        :param reinitialize_steps:
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.
        :param address_remap:
            An optional callable which, when provided with an internal network
            address of a node, e.g. a `(host, port)` tuple, will return the address
            where the node is reachable. This can be used to map the addresses at
            which the nodes _think_ they are, to addresses at which a client may
            reach them, such as when they sit behind a proxy.

        :param maint_notifications_config:
            Configures the nodes connections to support maintenance notifications - see
            `redis.maint_notifications.MaintNotificationsConfig` for details.
            Only supported with RESP3.
            If not provided and protocol is RESP3, the maintenance notifications
            will be enabled by default (logic is included in the NodesManager
            initialization).
        :**kwargs:
            Extra arguments that will be sent into Redis instance when created
            (See Official redis-py doc for supported kwargs - the only limitation
            is that you can't provide 'retry' object as part of kwargs.
            [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
            Some kwargs are not supported and will raise a
            RedisClusterException:
            - db (Redis do not support database SELECT in cluster mode)

        """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # Argument 'retry' is not possible to be used in kwargs when in cluster mode
            # the kwargs are set to the lower level connections to the cluster nodes
            # and there we provide retry configuration without retries allowed.
            # The retries should be handled on cluster client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node/s
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the followings:\n"
                "1. host and port, for example:\n"
                "   RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "   RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run
        # If the user passed on_connect function we'll save it and run it
        # inside the RedisCluster.on_connect() function
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            # No retry supplied: derive one from the (deprecated) attempt count.
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and not check_protocol_version(protocol, 3):
            raise RedisError("Client caching is only supported with RESP version 3")

        if maint_notifications_config and not check_protocol_version(protocol, 3):
            raise RedisError(
                "Maintenance notifications are only supported with RESP version 3"
            )
        # RESP3 without an explicit config: enable the defaults.
        if check_protocol_version(protocol, 3) and maint_notifications_config is None:
            maint_notifications_config = MaintNotificationsConfig()

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes

        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            maint_notifications_config=maint_notifications_config,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        # Maps each policy to the callable that yields its target node list
        # (request policies) or post-processes the reply (response policies).
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver
        self.commands_parser = CommandsParser(self)

        # Node where FT.AGGREGATE command is executed.
        self._aggregate_nodes = None
        self._lock = threading.RLock()

        # Must run last: the maintenance-notifications mixin inspects the
        # nodes that were just created above.
        MaintNotificationsAbstractRedisCluster.__init__(
            self, maint_notifications_config, **kwargs
        )

870 

871 def __enter__(self): 

872 return self 

873 

874 def __exit__(self, exc_type, exc_value, traceback): 

875 self.close() 

876 

877 def __del__(self): 

878 try: 

879 self.close() 

880 except Exception: 

881 pass 

882 

883 def disconnect_connection_pools(self): 

884 for node in self.get_nodes(): 

885 if node.redis_connection: 

886 try: 

887 node.redis_connection.connection_pool.disconnect() 

888 except OSError: 

889 # Client was already disconnected. do nothing 

890 pass 

891 

892 def on_connect(self, connection): 

893 """ 

894 Initialize the connection, authenticate and select a database and send 

895 READONLY if it is set during object initialization. 

896 """ 

897 connection.on_connect() 

898 

899 if self.read_from_replicas or self.load_balancing_strategy: 

900 # Sending READONLY command to server to configure connection as 

901 # readonly. Since each cluster node may change its server type due 

902 # to a failover, we should establish a READONLY connection 

903 # regardless of the server type. If this is a primary connection, 

904 # READONLY would not affect executing write commands. 

905 connection.send_command("READONLY") 

906 if str_if_bytes(connection.read_response()) != "OK": 

907 raise ConnectionError("READONLY command failed") 

908 

909 if self.user_on_connect_func is not None: 

910 self.user_on_connect_func(connection) 

911 

912 def get_redis_connection(self, node: "ClusterNode") -> Redis: 

913 if not node.redis_connection: 

914 with self._lock: 

915 if not node.redis_connection: 

916 self.nodes_manager.create_redis_connections([node]) 

917 return node.redis_connection 

918 

919 def get_node(self, host=None, port=None, node_name=None): 

920 return self.nodes_manager.get_node(host, port, node_name) 

921 

922 def get_primaries(self): 

923 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

924 

925 def get_replicas(self): 

926 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

927 

928 def get_random_node(self): 

929 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

930 

931 def get_random_primary_or_all_nodes(self, command_name): 

932 """ 

933 Returns random primary or all nodes depends on READONLY mode. 

934 """ 

935 if self.read_from_replicas and command_name in READ_COMMANDS: 

936 return self.get_random_node() 

937 

938 return self.get_random_primary_node() 

939 

940 def get_nodes(self): 

941 return list(self.nodes_manager.nodes_cache.values()) 

942 

def get_node_from_key(self, key, replica=False):
    """Return the node that serves ``key``'s hash slot.

    :param key: key whose slot owner is wanted.
    :param replica: when True, return the slot's first replica instead
        of its primary; ``None`` if the slot has no replica.
    :raises SlotNotCoveredError: when no node covers the key's slot.
    """
    slot = self.keyslot(key)
    holders = self.nodes_manager.slots_cache.get(slot)
    if not holders:
        raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
    if replica:
        # Index 0 is the primary; replicas start at index 1.
        if len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        return holders[1]
    return holders[0]

962 

def get_default_node(self):
    """Return the node currently designated as the cluster default."""
    return self.nodes_manager.default_node

968 

def get_nodes_from_slot(self, command: str, *args):
    """Return, as a one-element list, the node serving the keys' slot.

    Read commands may be routed to a replica when READONLY mode or a
    load-balancing strategy is configured.
    """
    slot = self.determine_slot(*args)
    is_read = command in READ_COMMANDS
    node = self.nodes_manager.get_node_from_slot(
        slot,
        self.read_from_replicas and is_read,
        self.load_balancing_strategy if is_read else None,
    )
    return [node]

981 

982 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]: 

983 """ 

984 Splits the command with Multi-Shard policy, to the multiple commands 

985 """ 

986 keys = self._get_command_keys(*args) 

987 commands = [] 

988 

989 for key in keys: 

990 commands.append( 

991 { 

992 "args": (args[0], key), 

993 "kwargs": kwargs, 

994 } 

995 ) 

996 

997 return commands 

998 

def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
    """Return the nodes recorded by the last FT.AGGREGATE call.

    :raises RedisClusterException: when no FT.AGGREGATE has run yet,
        since FT.CURSOR commands must target the same nodes.
    """
    if self._aggregate_nodes:
        return self._aggregate_nodes
    raise RedisClusterException(
        "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
    )

1009 

def get_random_primary_node(self) -> "ClusterNode":
    """Pick one primary node uniformly at random."""
    primaries = self.get_primaries()
    return random.choice(primaries)

1015 

def _evaluate_all_succeeded(self, res):
    """
    Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED.

    :param res: either a Dict<node_name, response> or an iterable of
        per-node responses.
    :return: the first truthy response (shaped like the input: a
        one-entry dict or a value of the response's own type) when every
        node succeeded; a falsy value of the same shape as soon as any
        node's response is falsy; ``None`` when there were no truthy
        responses at all.
    """
    first_successful_response = None

    if isinstance(res, dict):
        for key, value in res.items():
            if value:
                # Remember the first success to return if all pass.
                if first_successful_response is None:
                    first_successful_response = {key: value}
            else:
                # One node failed -> the aggregate result is a failure.
                return {key: False}
    else:
        for response in res:
            if response:
                if first_successful_response is None:
                    # Dynamically resolve type
                    first_successful_response = type(response)(response)
            else:
                return type(response)(False)

    return first_successful_response

1039 

def set_default_node(self, node):
    """
    Set the cluster's default node.

    :param node: 'ClusterNode'
    :return: True when the default node was set; False when ``node`` is
        None or is not present in the nodes cache.
    """
    if node is None:
        return False
    if self.get_node(node_name=node.name) is None:
        # Refuse to point the default at a node we don't know about.
        return False
    self.nodes_manager.default_node = node
    return True

1050 

def set_retry(self, retry: Retry) -> None:
    """Replace the client's retry policy object."""
    self.retry = retry

1053 

def monitor(self, target_node=None):
    """
    Return a Monitor object for ``target_node`` (the default cluster
    node when omitted).

    Monitor wraps the MONITOR command: next_command() returns one
    command from the server; listen() yields commands as they arrive.

    :raises RedisClusterException: when the chosen node has no
        redis_connection yet.
    """
    node = self.get_default_node() if target_node is None else target_node
    redis_connection = node.redis_connection
    if redis_connection is None:
        raise RedisClusterException(
            f"Cluster Node {node.name} has no redis_connection"
        )
    return redis_connection.monitor()

1070 

def pubsub(self, node=None, host=None, port=None, **kwargs):
    """
    Build a ClusterPubSub instance connected to a specific node.

    The node may be passed directly as a ClusterNode, or addressed by
    host and port; with neither, node selection is left to
    ClusterPubSub itself.
    """
    return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)

1077 

def pipeline(self, transaction=None, shard_hint=None):
    """
    Create a cluster-aware pipeline.

    Cluster pipelines do not work like standalone-mode pipelines: a
    clone of this client buffers each command and dispatches them
    node-by-node when execute() is called, which returns only the
    result stack.

    :param transaction: forwarded to the ClusterPipeline.
    :param shard_hint: unsupported in cluster mode; any truthy value
        raises RedisClusterException.
    """
    if shard_hint:
        raise RedisClusterException("shard_hint is deprecated in cluster mode")

    return ClusterPipeline(
        nodes_manager=self.nodes_manager,
        commands_parser=self.commands_parser,
        startup_nodes=self.nodes_manager.startup_nodes,
        result_callbacks=self.result_callbacks,
        cluster_response_callbacks=self.cluster_response_callbacks,
        read_from_replicas=self.read_from_replicas,
        load_balancing_strategy=self.load_balancing_strategy,
        reinitialize_steps=self.reinitialize_steps,
        retry=self.retry,
        lock=self._lock,
        transaction=transaction,
        event_dispatcher=self._event_dispatcher,
    )

1104 

def lock(
    self,
    name,
    timeout=None,
    sleep=0.1,
    blocking=True,
    blocking_timeout=None,
    lock_class=None,
    thread_local=True,
    raise_on_release_error: bool = True,
):
    """
    Return a new Lock object on key ``name`` that mimics
    threading.Lock.

    :param name: key the lock is stored under.
    :param timeout: maximum lifetime of the lock in seconds; ``None``
        (default) keeps it held until release() is called.
    :param sleep: seconds slept per loop iteration while blocking on a
        lock held by another client.
    :param blocking: whether acquire() blocks until the lock is taken
        (True, the default) or fails immediately, returning False; can
        be overridden by passing ``blocking`` to acquire().
    :param blocking_timeout: maximum seconds (int or float) to spend
        trying to acquire the lock; ``None`` means keep trying forever.
    :param lock_class: lock implementation to use. As of redis-py 3.0
        the only built-in is the Lua-based ``Lock``, so this is rarely
        needed unless you wrote a custom lock class.
    :param thread_local: store the lock token in thread-local storage
        (default True). With a shared token, a thread whose lock
        expired and was re-acquired by another thread could release the
        other thread's lock; per-thread tokens prevent that. Disable
        only for patterns such as acquiring in one thread and releasing
        in a worker thread — with thread-local storage the worker would
        not see the acquirer's token. Such cases are assumed rare,
        hence the default.
    :param raise_on_release_error: raise (True, default) when the lock
        is no longer owned at context-manager exit; when False the
        condition is logged as a warning and suppressed.
    """
    lock_implementation = Lock if lock_class is None else lock_class
    return lock_implementation(
        self,
        name,
        timeout=timeout,
        sleep=sleep,
        blocking=blocking,
        blocking_timeout=blocking_timeout,
        thread_local=thread_local,
        raise_on_release_error=raise_on_release_error,
    )

1185 

def set_response_callback(self, command, callback):
    """Register a custom response callback for ``command``."""
    self.cluster_response_callbacks[command] = callback

1189 

def _determine_nodes(
    self, *args, request_policy: RequestPolicy, **kwargs
) -> List["ClusterNode"]:
    """
    Determine the node(s) a command should be executed on.

    :param args: full command args; ``args[0]`` is the command name.
    :param request_policy: routing policy to apply when no overriding
        node flag is found for the command.
    :param kwargs: may contain ``nodes_flag`` — a caller-supplied node
        flag that overrides the predefined ``command_flags`` entry.
    :return: list of target nodes. As a side effect, FT.AGGREGATE
        targets are cached in ``self._aggregate_nodes`` (read back by
        get_special_nodes for FT.CURSOR routing).
    """
    command = args[0].upper()
    # Two-token commands (e.g. "CLUSTER INFO") are looked up by their
    # combined name when it appears in the predefined flags table.
    if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
        command = f"{args[0]} {args[1]}".upper()

    nodes_flag = kwargs.pop("nodes_flag", None)
    if nodes_flag is not None:
        # nodes flag passed by the user
        command_flag = nodes_flag
    else:
        # get the nodes group for this command if it was predefined
        command_flag = self.command_flags.get(command)

    # A recognized flag overrides the resolved request policy.
    if command_flag in self._command_flags_mapping:
        request_policy = self._command_flags_mapping[command_flag]

    policy_callback = self._policies_callback_mapping[request_policy]

    # Each policy callback has its own calling convention.
    if request_policy == RequestPolicy.DEFAULT_KEYED:
        nodes = policy_callback(command, *args)
    elif request_policy == RequestPolicy.MULTI_SHARD:
        nodes = policy_callback(*args, **kwargs)
    elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
        nodes = policy_callback(args[0])
    else:
        nodes = policy_callback()

    # Remember FT.AGGREGATE targets for follow-up FT.CURSOR commands.
    if args[0].lower() == "ft.aggregate":
        self._aggregate_nodes = nodes

    return nodes

1226 

1227 def _should_reinitialized(self): 

1228 # To reinitialize the cluster on every MOVED error, 

1229 # set reinitialize_steps to 1. 

1230 # To avoid reinitializing the cluster on moved errors, set 

1231 # reinitialize_steps to 0. 

1232 if self.reinitialize_steps == 0: 

1233 return False 

1234 else: 

1235 return self.reinitialize_counter % self.reinitialize_steps == 0 

1236 

def keyslot(self, key):
    """
    Calculate the hash slot for ``key``.

    The key is first encoded with the client's encoder, then hashed
    per the keys-distribution model of the Redis cluster spec
    (https://redis.io/topics/cluster-spec).
    """
    encoded_key = self.encoder.encode(key)
    return key_slot(encoded_key)

1244 

1245 def _get_command_keys(self, *args): 

1246 """ 

1247 Get the keys in the command. If the command has no keys in in, None is 

1248 returned. 

1249 

1250 NOTE: Due to a bug in redis<7.0, this function does not work properly 

1251 for EVAL or EVALSHA when the `numkeys` arg is 0. 

1252 - issue: https://github.com/redis/redis/issues/9493 

1253 - fix: https://github.com/redis/redis/pull/9733 

1254 

1255 So, don't use this function with EVAL or EVALSHA. 

1256 """ 

1257 redis_conn = self.get_default_node().redis_connection 

1258 return self.commands_parser.get_keys(redis_conn, *args) 

1259 

def determine_slot(self, *args) -> Optional[int]:
    """
    Figure out what slot to use based on args.

    :param args: full command args; ``args[0]`` is the command name.
    :return: the hash slot; a random slot for keyless EVAL/EVALSHA/
        FCALL/FCALL_RO (runnable on any node); or ``None`` for
        CLIENT TRACKING, which is routed by node rather than by key.

    Raises a RedisClusterException if there's a missing key and we can't
    determine what slots to map the command to; or, if the keys don't
    all map to the same key slot.
    """
    command = args[0]
    if self.command_flags.get(command) == SLOT_ID:
        # The command contains the slot ID
        return args[1]

    # Get the keys in the command

    # CLIENT TRACKING is a special case.
    # It doesn't have any keys, it needs to be sent to the provided nodes
    # By default it will be sent to all nodes.
    if command.upper() == "CLIENT TRACKING":
        return None

    # EVAL and EVALSHA are common enough that it's wasteful to go to the
    # redis server to parse the keys. Besides, there is a bug in redis<7.0
    # where `self._get_command_keys()` fails anyway. So, we special case
    # EVAL/EVALSHA.
    if command.upper() in ("EVAL", "EVALSHA"):
        # command syntax: EVAL "script body" num_keys ...
        if len(args) <= 2:
            raise RedisClusterException(f"Invalid args in command: {args}")
        num_actual_keys = int(args[2])
        eval_keys = args[3 : 3 + num_actual_keys]
        # if there are 0 keys, that means the script can be run on any node
        # so we can just return a random slot
        if len(eval_keys) == 0:
            return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
        keys = eval_keys
    else:
        keys = self._get_command_keys(*args)
        if keys is None or len(keys) == 0:
            # FCALL can call a function with 0 keys, that means the function
            # can be run on any node so we can just return a random slot
            if command.upper() in ("FCALL", "FCALL_RO"):
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            raise RedisClusterException(
                "No way to dispatch this command to Redis Cluster. "
                "Missing key.\nYou can execute the command by specifying "
                f"target nodes.\nCommand: {args}"
            )

    # single key command
    if len(keys) == 1:
        return self.keyslot(keys[0])

    # multi-key command; we need to make sure all keys are mapped to
    # the same slot
    slots = {self.keyslot(key) for key in keys}
    if len(slots) != 1:
        raise RedisClusterException(
            f"{command} - all keys must map to the same key slot"
        )

    return slots.pop()

1322 

def get_encoder(self):
    """Return the encoder shared by the cluster's connections."""
    return self.encoder

1328 

def get_connection_kwargs(self):
    """Return the keyword arguments used to build node connections."""
    return self.nodes_manager.connection_kwargs

1334 

1335 def _is_nodes_flag(self, target_nodes): 

1336 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1337 

1338 def _parse_target_nodes(self, target_nodes): 

1339 if isinstance(target_nodes, list): 

1340 nodes = target_nodes 

1341 elif isinstance(target_nodes, ClusterNode): 

1342 # Supports passing a single ClusterNode as a variable 

1343 nodes = [target_nodes] 

1344 elif isinstance(target_nodes, dict): 

1345 # Supports dictionaries of the format {node_name: node}. 

1346 # It enables to execute commands with multi nodes as follows: 

1347 # rc.cluster_save_config(rc.get_primaries()) 

1348 nodes = target_nodes.values() 

1349 else: 

1350 raise TypeError( 

1351 "target_nodes type can be one of the following: " 

1352 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1353 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1354 f"The passed type is {type(target_nodes)}" 

1355 ) 

1356 return nodes 

1357 

def execute_command(self, *args, **kwargs):
    """Public entry point: execute a command against the cluster."""
    return self._internal_execute_command(*args, **kwargs)

1360 

def _internal_execute_command(self, *args, **kwargs):
    """
    Wrapper for ERRORS_ALLOW_RETRY error handling.

    It will try the number of times specified by the retries property from
    config option "self.retry" which defaults to 3 unless manually
    configured.

    If it reaches the number of times, the command will raise the exception

    Key argument :target_nodes: can be passed with the following types:
        nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
        ClusterNode
        list<ClusterNode>
        dict<Any, ClusterNode>
    """
    target_nodes_specified = False
    is_default_node = False
    target_nodes = None
    passed_targets = kwargs.pop("target_nodes", None)
    # Resolve server-advertised request/response policies for this
    # command (may be falsy for commands without declared policies).
    command_policies = self._policy_resolver.resolve(args[0].lower())

    if passed_targets is not None and not self._is_nodes_flag(passed_targets):
        target_nodes = self._parse_target_nodes(passed_targets)
        target_nodes_specified = True

    if not command_policies and not target_nodes_specified:
        # Two-token commands (e.g. "CLUSTER INFO") are looked up by
        # their combined name when present in the flags table.
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        # We only could resolve key properties if command is not
        # in a list of pre-defined request policies
        command_flag = self.command_flags.get(command)
        if not command_flag:
            # Fallback to default policy
            if not self.get_default_node():
                slot = None
            else:
                slot = self.determine_slot(*args)
            if slot is None:
                command_policies = CommandPolicies()
            else:
                command_policies = CommandPolicies(
                    request_policy=RequestPolicy.DEFAULT_KEYED,
                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                )
        else:
            if command_flag in self._command_flags_mapping:
                command_policies = CommandPolicies(
                    request_policy=self._command_flags_mapping[command_flag]
                )
            else:
                command_policies = CommandPolicies()
    elif not command_policies and target_nodes_specified:
        # Explicit targets were given: fall back to default policies.
        command_policies = CommandPolicies()

    # If an error that allows retrying was thrown, the nodes and slots
    # cache were reinitialized. We will retry executing the command with
    # the updated cluster setup only when the target nodes can be
    # determined again with the new cache tables. Therefore, when target
    # nodes were passed to this function, we cannot retry the command
    # execution since the nodes may not be valid anymore after the tables
    # were reinitialized. So in case of passed target nodes,
    # retry_attempts will be set to 0.
    retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
    # Add one for the first execution
    execute_attempts = 1 + retry_attempts
    failure_count = 0

    # Start timing for observability
    start_time = time.monotonic()

    for _ in range(execute_attempts):
        try:
            res = {}
            if not target_nodes_specified:
                # Determine the nodes to execute the command on
                target_nodes = self._determine_nodes(
                    *args,
                    request_policy=command_policies.request_policy,
                    nodes_flag=passed_targets,
                )

                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
            for node in target_nodes:
                res[node.name] = self._execute_command(node, *args, **kwargs)

                # ONE_SUCCEEDED: the first node that returns without
                # raising is enough; skip the remaining targets.
                if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                    break

            # Return the processed result
            return self._process_result(
                args[0],
                res,
                response_policy=command_policies.response_policy,
                **kwargs,
            )
        except Exception as e:
            if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                if is_default_node:
                    # Replace the default cluster node
                    self.replace_default_node()
                # The nodes and slots cache were reinitialized.
                # Try again with the new cluster setup.
                retry_attempts -= 1
                failure_count += 1

                # Only errors tagged with the connection they occurred
                # on can be attributed to a host/port in the metrics.
                if hasattr(e, "connection"):
                    self._record_command_metric(
                        command_name=args[0],
                        duration_seconds=time.monotonic() - start_time,
                        connection=e.connection,
                        error=e,
                    )

                    self._record_error_metric(
                        error=e,
                        connection=e.connection,
                        retry_attempts=failure_count,
                    )
                continue
            else:
                # raise the exception
                if hasattr(e, "connection"):
                    self._record_error_metric(
                        error=e,
                        connection=e.connection,
                        retry_attempts=failure_count,
                        is_internal=False,
                    )
                raise e

1501 

def _execute_command(self, target_node, *args, **kwargs):
    """
    Send a command to a node in the cluster.

    Retries internally up to ``RedisClusterRequestTTL`` times while
    following MOVED/ASK redirections and TRYAGAIN responses;
    connection-level and cluster-level failures trigger a topology
    refresh before the error is re-raised to the caller.

    :param target_node: node to send the command to first; may be
        replaced when the server redirects us.
    :param args: full command args; ``args[0]`` is the command name.
    :return: the (callback-processed) server response.
    """
    command = args[0]
    redis_node = None
    connection = None
    redirect_addr = None
    asking = False  # set after an ASK redirect: next try must send ASKING
    moved = False  # set after a MOVED redirect: re-resolve the slot owner
    ttl = int(self.RedisClusterRequestTTL)

    # Start timing for observability
    start_time = time.monotonic()

    while ttl > 0:
        ttl -= 1
        try:
            if asking:
                target_node = self.get_node(node_name=redirect_addr)
            elif moved:
                # MOVED occurred and the slots cache was updated,
                # refresh the target node
                slot = self.determine_slot(*args)
                target_node = self.nodes_manager.get_node_from_slot(
                    slot,
                    self.read_from_replicas and command in READ_COMMANDS,
                    self.load_balancing_strategy
                    if command in READ_COMMANDS
                    else None,
                )
                moved = False

            redis_node = self.get_redis_connection(target_node)
            connection = get_connection(redis_node)
            if asking:
                connection.send_command("ASKING")
                redis_node.parse_response(connection, "ASKING", **kwargs)
                asking = False
            connection.send_command(*args, **kwargs)
            response = redis_node.parse_response(connection, command, **kwargs)

            # Remove keys entry, it needs only for cache.
            kwargs.pop("keys", None)

            if command in self.cluster_response_callbacks:
                response = self.cluster_response_callbacks[command](
                    response, **kwargs
                )

            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
            )
            return response
        except AuthenticationError as e:
            # Attach host/port info for the metrics layer.
            e.connection = connection if connection is not None else target_node
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            raise
        except MaxConnectionsError as e:
            # MaxConnectionsError indicates client-side resource exhaustion
            # (too many connections in the pool), not a node failure.
            # Don't treat this as a node failure - just re-raise the error
            # without reinitializing the cluster.
            # The connection in the error is used to report the metrics based on host and port info
            # so we use the target node object which contains the host and port info
            # because we did not get the connection yet
            e.connection = target_node
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            raise
        except (ConnectionError, TimeoutError) as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"{type(e).__name__} received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            # this is used to report the metrics based on host and port info
            e.connection = connection if connection else target_node

            # ConnectionError can also be raised if we couldn't get a
            # connection from the pool before timing out, so check that
            # this is an actual connection before attempting to disconnect.
            if connection is not None:
                connection.disconnect()

            # Instead of setting to None, properly handle the pool
            # Get the pool safely - redis_connection could be set to None
            # by another thread between the check and access
            redis_conn = target_node.redis_connection
            if redis_conn is not None:
                pool = redis_conn.connection_pool
                if pool is not None:
                    with pool._lock:
                        # take care for the active connections in the pool
                        pool.update_active_connections_for_reconnect()
                        # disconnect all free connections
                        pool.disconnect_free_connections()

            # Move the failed node to the end of the cached nodes list
            self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name)

            # DON'T set redis_connection = None - keep the pool for reuse
            self.nodes_manager.initialize()
            # NOTE(review): this overwrites the connection-or-target_node
            # fallback assigned above; when connection is None,
            # e.connection ends up None - confirm this is intended.
            e.connection = connection
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            raise e
        except MovedError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"MOVED error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            # First, we will try to patch the slots/nodes cache with the
            # redirected node output and try again. If MovedError exceeds
            # 'reinitialize_steps' number of times, we will force
            # reinitializing the tables, and then try again.
            # 'reinitialize_steps' counter will increase faster when
            # the same client object is shared between multiple threads. To
            # reduce the frequency you can set this variable in the
            # RedisCluster constructor.
            self.reinitialize_counter += 1
            if self._should_reinitialized():
                # during this call all connections are closed or marked for disconnect,
                # so we don't need to disconnect the changed node's connections
                self.nodes_manager.initialize(
                    additional_startup_nodes_info=[(e.host, e.port)]
                )
                # Reset the counter
                self.reinitialize_counter = 0
            else:
                self.nodes_manager.move_slot(e)
            moved = True
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            self._record_error_metric(
                error=e,
                connection=connection,
            )
        except TryAgainError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}"
                )
            # Back off briefly once half of the TTL budget is spent.
            if ttl < self.RedisClusterRequestTTL / 2:
                time.sleep(0.05)

            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            self._record_error_metric(
                error=e,
                connection=connection,
            )
        except AskError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"ASK error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            # Next iteration targets the redirected node and sends ASKING.
            redirect_addr = get_node_name(host=e.host, port=e.port)
            asking = True

            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            self._record_error_metric(
                error=e,
                connection=connection,
            )
        except (ClusterDownError, SlotNotCoveredError) as e:
            # ClusterDownError can occur during a failover and to get
            # self-healed, we will try to reinitialize the cluster layout
            # and retry executing the command

            # SlotNotCoveredError can occur when the cluster is not fully
            # initialized or can be temporary issue.
            # We will try to reinitialize the cluster topology
            # and retry executing the command

            time.sleep(0.25)
            self.nodes_manager.initialize()

            # if we have a connection, use it, otherwise use the target node
            # object which contains the host and port info
            # this is used to report the metrics based on host and port info
            e.connection = connection if connection else target_node
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            raise
        except ResponseError as e:
            # this is used to report the metrics based on host and port info
            e.connection = connection
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            raise
        except Exception as e:
            if connection:
                connection.disconnect()

            # if we have a connection, use it, otherwise use the target node
            # object which contains the host and port info
            # this is used to report the metrics based on host and port info
            e.connection = connection if connection else target_node
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            raise e
        finally:
            # Always return the connection to the pool for this attempt.
            if connection is not None:
                redis_node.connection_pool.release(connection)

    e = ClusterError("TTL exhausted.")
    # In this case we should have an active connection.
    # If we are here, we have received many MOVED or ASK errors and finally exhausted the TTL.
    # This means that we used an active connection to read from the socket.
    # This is used to report metrics based on the host and port information.
    e.connection = connection
    self._record_command_metric(
        command_name=command,
        duration_seconds=time.monotonic() - start_time,
        connection=connection,
        error=e,
    )
    raise e

1772 

def _record_command_metric(
    self,
    command_name: str,
    duration_seconds: float,
    connection: Connection,
    error=None,
):
    """Emit an operation-duration metric for one command execution.

    NOTE(review): some callers pass a ClusterNode instead of a
    Connection here (see the error paths in _execute_command); confirm
    that object also exposes ``host``/``port``/``db``.
    """
    record_operation_duration(
        command_name=command_name,
        duration_seconds=duration_seconds,
        server_address=connection.host,
        server_port=connection.port,
        db_namespace=str(connection.db),
        error=error,
    )

1791 

def _record_error_metric(
    self,
    error: Exception,
    connection: Connection,
    is_internal: bool = True,
    retry_attempts: Optional[int] = None,
):
    """Emit an error-count metric attributed to ``connection``'s
    host and port; ``retry_attempts`` defaults to 0 when omitted."""
    attempts = 0 if retry_attempts is None else retry_attempts
    record_error_count(
        server_address=connection.host,
        server_port=connection.port,
        network_peer_address=connection.host,
        network_peer_port=connection.port,
        error_type=error,
        retry_attempts=attempts,
        is_internal=is_internal,
    )

1811 

1812 def _extracts_socket_address( 

1813 self, connection: Optional[Connection] 

1814 ) -> Optional[int]: 

1815 if connection is None: 

1816 return None 

1817 try: 

1818 socket_address = ( 

1819 connection._sock.getsockname() if connection._sock else None 

1820 ) 

1821 socket_address = socket_address[1] if socket_address else None 

1822 except (AttributeError, OSError): 

1823 pass 

1824 return socket_address 

1825 

1826 def close(self) -> None: 

1827 try: 

1828 with self._lock: 

1829 if self.nodes_manager: 

1830 self.nodes_manager.close() 

1831 except AttributeError: 

1832 # RedisCluster's __init__ can fail before nodes_manager is set 

1833 pass 

1834 

1835 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs): 

1836 """ 

1837 Process the result of the executed command. 

1838 The function would return a dict or a single value. 

1839 

1840 :type command: str 

1841 :type res: dict 

1842 

1843 `res` should be in the following format: 

1844 Dict<node_name, command_result> 

1845 """ 

1846 if command in self.result_callbacks: 

1847 res = self.result_callbacks[command](command, res, **kwargs) 

1848 elif len(res) == 1: 

1849 # When we execute the command on a single node, we can 

1850 # remove the dictionary and return a single response 

1851 res = list(res.values())[0] 

1852 

1853 return self._policies_callback_mapping[response_policy](res) 

1854 

1855 def load_external_module(self, funcname, func): 

1856 """ 

1857 This function can be used to add externally defined redis modules, 

1858 and their namespaces to the redis client. 

1859 

1860 ``funcname`` - A string containing the name of the function to create 

1861 ``func`` - The function, being added to this class. 

1862 """ 

1863 setattr(self, funcname, func) 

1864 

1865 def transaction(self, func, *watches, **kwargs): 

1866 """ 

1867 Convenience method for executing the callable `func` as a transaction 

1868 while watching all keys specified in `watches`. The 'func' callable 

1869 should expect a single argument which is a Pipeline object. 

1870 """ 

1871 shard_hint = kwargs.pop("shard_hint", None) 

1872 value_from_callable = kwargs.pop("value_from_callable", False) 

1873 watch_delay = kwargs.pop("watch_delay", None) 

1874 with self.pipeline(True, shard_hint) as pipe: 

1875 while True: 

1876 try: 

1877 if watches: 

1878 pipe.watch(*watches) 

1879 func_value = func(pipe) 

1880 exec_value = pipe.execute() 

1881 return func_value if value_from_callable else exec_value 

1882 except WatchError: 

1883 if watch_delay is not None and watch_delay > 0: 

1884 time.sleep(watch_delay) 

1885 continue 

1886 

1887 

class ClusterNode:
    """
    One cluster member, identified by its ``host:port`` derived name.
    Equality and hashing are based solely on that name.
    """

    def __init__(self, host, port, server_type=None, redis_connection=None):
        # Resolve "localhost" to a concrete IP so node names compare
        # consistently with the addresses the cluster itself reports.
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __hash__(self):
        return hash(self.name)

1913 

1914 

class LoadBalancingStrategy(Enum):
    # How reads are spread across the nodes serving one slot.
    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Distributes reads across the nodes that serve a slot.

    One rotating cursor is kept per primary, so round-robin selection in
    one shard does not affect another.
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx: dict[str, int] = {}
        self.start_index: int = start_index
        self._lock: threading.Lock = threading.Lock()

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        """Return an index into the slot's node list (index 0 = primary)."""
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        replicas_only = (
            load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
        )
        return self._get_round_robin_index(primary, list_size, replicas_only)

    def reset(self) -> None:
        """Drop all per-primary rotation state."""
        with self._lock:
            self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        # Replicas occupy indexes 1..list_size-1.
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        with self._lock:
            idx = self.primary_to_idx.setdefault(primary, self.start_index)
            if replicas_only and idx == 0:
                # never hand back the primary when only replicas are wanted
                idx = 1
            # advance the cursor for the next call
            self.primary_to_idx[primary] = (idx + 1) % list_size
            return idx

1964 

1965 

1966class NodesManager: 

    def __init__(
        self,
        startup_nodes: list[ClusterNode],
        from_url=False,
        require_full_coverage=False,
        lock: Optional[threading.RLock] = None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        **kwargs,
    ):
        """
        Track the cluster topology: the node cache, the slot->nodes
        mapping, and the startup nodes used for (re)discovery. Any extra
        ``kwargs`` are stored and forwarded to every per-node connection.
        An initial topology discovery runs at the end of construction.
        """
        self.nodes_cache: dict[str, ClusterNode] = {}
        self.slots_cache: dict[int, list[ClusterNode]] = {}
        self.startup_nodes: dict[str, ClusterNode] = {n.name: n for n in startup_nodes}
        self.default_node: Optional[ClusterNode] = None
        # Bumped after each successful initialize(); used to detect that
        # another thread already re-initialized and skip redundant work.
        self._epoch: int = 0
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        # Client-side cache selection: an explicit cache instance wins,
        # then a user-supplied factory, then a bare config.
        self._cache: Optional[CacheInterface] = None
        if cache:
            self._cache = cache
        elif cache_factory is not None:
            self._cache = cache_factory.get_cache()
        elif cache_config is not None:
            self._cache = CacheFactory(cache_config).get_cache()
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()

        # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock
        if lock is None:
            self._lock = threading.RLock()
        else:
            self._lock = lock

        # initialize holds _initialization_lock to dedup multiple calls to reinitialize;
        # note that if we hold both _lock and _initialization_lock, we _must_ acquire
        # _initialization_lock first (ie: to have a consistent order) to avoid deadlock.
        self._initialization_lock: threading.RLock = threading.RLock()

        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.maint_notifications_config = maint_notifications_config

        # Eager first topology discovery.
        self.initialize()

2024 

2025 def get_node( 

2026 self, 

2027 host: Optional[str] = None, 

2028 port: Optional[int] = None, 

2029 node_name: Optional[str] = None, 

2030 ) -> Optional[ClusterNode]: 

2031 """ 

2032 Get the requested node from the cluster's nodes. 

2033 nodes. 

2034 :return: ClusterNode if the node exists, else None 

2035 """ 

2036 if host and port: 

2037 # the user passed host and port 

2038 if host == "localhost": 

2039 host = socket.gethostbyname(host) 

2040 with self._lock: 

2041 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

2042 elif node_name: 

2043 with self._lock: 

2044 return self.nodes_cache.get(node_name) 

2045 else: 

2046 return None 

2047 

    def move_slot(self, e: Union[AskError, MovedError]):
        """
        Update the slot's node with the redirected one.

        Applies the topology change implied by an ASK/MOVED redirection:
        the node at ``e.host:e.port`` becomes the primary for slot
        ``e.slot_id``, creating/promoting cache entries as needed.
        """
        with self._lock:
            redirected_node = self.get_node(host=e.host, port=e.port)
            if redirected_node is not None:
                # The node already exists
                if redirected_node.server_type is not PRIMARY:
                    # Update the node's server type
                    redirected_node.server_type = PRIMARY
            else:
                # This is a new node, we will add it to the nodes cache
                redirected_node = ClusterNode(e.host, e.port, PRIMARY)
                self.nodes_cache[redirected_node.name] = redirected_node

            slot_nodes = self.slots_cache[e.slot_id]
            if redirected_node not in slot_nodes:
                # The new slot owner is a new server, or a server from a different
                # shard. We need to remove all current nodes from the slot's list
                # (including replications) and add just the new node.
                self.slots_cache[e.slot_id] = [redirected_node]
            elif redirected_node is not slot_nodes[0]:
                # The MOVED error resulted from a failover, and the new slot owner
                # had previously been a replica.
                old_primary = slot_nodes[0]
                # Update the old primary to be a replica and add it to the end of
                # the slot's node list
                old_primary.server_type = REPLICA
                slot_nodes.append(old_primary)
                # Remove the old replica, which is now a primary, from the slot's
                # node list
                slot_nodes.remove(redirected_node)
                # Override the old primary with the new one
                slot_nodes[0] = redirected_node
                if self.default_node == old_primary:
                    # Update the default node with the new primary
                    self.default_node = redirected_node
            # else: circular MOVED to current primary -> no-op

2087 

    @deprecated_args(
        args_to_warn=["server_type"],
        reason=(
            "In case you need select some load balancing strategy "
            "that will use replicas, please set it through 'load_balancing_strategy'"
        ),
        version="5.3.0",
    )
    def get_node_from_slot(
        self,
        slot: int,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        server_type: Optional[Literal["primary", "replica"]] = None,
    ) -> ClusterNode:
        """
        Get a node that serves this hash slot.

        Raises SlotNotCoveredError when no cached node serves ``slot``.
        """

        # Legacy flag: map read_from_replicas onto the default round-robin
        # strategy when no explicit strategy was supplied.
        if read_from_replicas is True and load_balancing_strategy is None:
            load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

        with self._lock:
            if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
                raise SlotNotCoveredError(
                    f'Slot "{slot}" not covered by the cluster. '
                    + f'"require_full_coverage={self._require_full_coverage}"'
                )

            if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
                # get the server index using the strategy defined in load_balancing_strategy
                primary_name = self.slots_cache[slot][0].name
                node_idx = self.read_load_balancer.get_server_index(
                    primary_name, len(self.slots_cache[slot]), load_balancing_strategy
                )
            elif (
                server_type is None
                or server_type == PRIMARY
                or len(self.slots_cache[slot]) == 1
            ):
                # return a primary
                node_idx = 0
            else:
                # return a replica
                # randomly choose one of the replicas
                node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

            return self.slots_cache[slot][node_idx]

2136 

2137 def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]): 

2138 """ 

2139 Get all nodes with the specified server type 

2140 :param server_type: 'primary' or 'replica' 

2141 :return: list of ClusterNode 

2142 """ 

2143 with self._lock: 

2144 return [ 

2145 node 

2146 for node in self.nodes_cache.values() 

2147 if node.server_type == server_type 

2148 ] 

2149 

2150 @deprecated_function( 

2151 reason="This method is not used anymore internally. The startup nodes are populated automatically.", 

2152 version="7.0.2", 

2153 ) 

2154 def populate_startup_nodes(self, nodes): 

2155 """ 

2156 Populate all startup nodes and filters out any duplicates 

2157 """ 

2158 with self._lock: 

2159 for n in nodes: 

2160 self.startup_nodes[n.name] = n 

2161 

2162 def move_node_to_end_of_cached_nodes(self, node_name: str) -> None: 

2163 """ 

2164 Move a failing node to the end of startup_nodes and nodes_cache so it's 

2165 tried last during reinitialization and when selecting the default node. 

2166 If the node is not in the respective list, nothing is done. 

2167 """ 

2168 # Move in startup_nodes 

2169 if node_name in self.startup_nodes and len(self.startup_nodes) > 1: 

2170 node = self.startup_nodes.pop(node_name) 

2171 self.startup_nodes[node_name] = node # Re-insert at end 

2172 

2173 # Move in nodes_cache - this affects get_nodes_by_server_type ordering 

2174 # which is used to select the default_node during initialize() 

2175 if node_name in self.nodes_cache and len(self.nodes_cache) > 1: 

2176 node = self.nodes_cache.pop(node_name) 

2177 self.nodes_cache[node_name] = node # Re-insert at end 

2178 

2179 def check_slots_coverage(self, slots_cache): 

2180 # Validate if all slots are covered or if we should try next 

2181 # startup node 

2182 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

2183 if i not in slots_cache: 

2184 return False 

2185 return True 

2186 

    def create_redis_connections(self, nodes):
        """
        This function will create a redis connection to all nodes in :nodes:

        Only nodes lacking a client get one created; the pools of those
        newly created clients are then announced via the event dispatcher.
        """
        connection_pools = []
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host,
                    port=node.port,
                    maint_notifications_config=self.maint_notifications_config,
                    **self.connection_kwargs,
                )
                connection_pools.append(node.redis_connection.connection_pool)

        # Let listeners (e.g. credential re-auth) see the new pools.
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                connection_pools, ClientType.SYNC, self._credential_provider
            )
        )

2207 

2208 def create_redis_node( 

2209 self, 

2210 host, 

2211 port, 

2212 **kwargs, 

2213 ): 

2214 # We are configuring the connection pool not to retry 

2215 # connections on lower level clients to avoid retrying 

2216 # connections to nodes that are not reachable 

2217 # and to avoid blocking the connection pool. 

2218 # The only error that will have some handling in the lower 

2219 # level clients is ConnectionError which will trigger disconnection 

2220 # of the socket. 

2221 # The retries will be handled on cluster client level 

2222 # where we will have proper handling of the cluster topology 

2223 node_retry_config = Retry( 

2224 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) 

2225 ) 

2226 

2227 if self.from_url: 

2228 # Create a redis node with a custom connection pool 

2229 kwargs.update({"host": host}) 

2230 kwargs.update({"port": port}) 

2231 kwargs.update({"cache": self._cache}) 

2232 kwargs.update({"retry": node_retry_config}) 

2233 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

2234 else: 

2235 r = Redis( 

2236 host=host, 

2237 port=port, 

2238 cache=self._cache, 

2239 retry=node_retry_config, 

2240 **kwargs, 

2241 ) 

2242 return r 

2243 

2244 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

2245 node_name = get_node_name(host, port) 

2246 # check if we already have this node in the tmp_nodes_cache 

2247 target_node = tmp_nodes_cache.get(node_name) 

2248 if target_node is None: 

2249 # before creating a new cluster node, check if the cluster node already 

2250 # exists in the current nodes cache and has a valid connection so we can 

2251 # reuse it 

2252 redis_connection: Optional[Redis] = None 

2253 with self._lock: 

2254 previous_node = self.nodes_cache.get(node_name) 

2255 if previous_node: 

2256 redis_connection = previous_node.redis_connection 

2257 # don't update the old ClusterNode, so we don't update its role 

2258 # outside of the lock 

2259 target_node = ClusterNode(host, port, role, redis_connection) 

2260 # add this node to the nodes cache 

2261 tmp_nodes_cache[target_node.name] = target_node 

2262 

2263 return target_node 

2264 

    def _get_epoch(self) -> int:
        """
        Get the current epoch value. This method exists primarily to allow
        tests to mock the epoch fetch and control race condition timing.
        """
        # Read under the lock so a concurrent initialize() bump is seen
        # consistently.
        with self._lock:
            return self._epoch

2272 

    def initialize(
        self,
        additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None,
        disconnect_startup_nodes_pools: bool = True,
    ):
        """
        Initializes the nodes cache, slots cache and redis connections.
        :startup_nodes:
            Responsible for discovering other nodes in the cluster
        :disconnect_startup_nodes_pools:
            Whether to disconnect the connection pool of the startup nodes
            after the initialization is complete. This is useful when the
            startup nodes are not part of the cluster and we want to avoid
            keeping the connection open.
        :additional_startup_nodes_info:
            Additional nodes to add temporarily to the startup nodes.
            The additional nodes will be used just in the process of extraction of the slots
            and nodes information from the cluster.
            This is useful when we want to add new nodes to the cluster
            and initialize the client
            with them.
            The format of the list is a list of tuples, where each tuple contains
            the host and port of the node.
        """
        self.reset()
        tmp_nodes_cache = {}
        tmp_slots = {}
        disagreements = []
        startup_nodes_reachable = False
        fully_covered = False
        kwargs = self.connection_kwargs
        exception = None
        # Remember the epoch before taking the initialization lock; if it
        # changed by the time we hold the lock, someone else already did
        # the refresh.
        epoch = self._get_epoch()
        if additional_startup_nodes_info is None:
            additional_startup_nodes_info = []

        with self._initialization_lock:
            with self._lock:
                if epoch != self._epoch:
                    # another thread has already re-initialized the nodes; don't
                    # bother running again
                    return

            with self._lock:
                startup_nodes = tuple(self.startup_nodes.values())

            additional_startup_nodes = [
                ClusterNode(host, port) for host, port in additional_startup_nodes_info
            ]
            if is_debug_log_enabled():
                logger.debug(
                    f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; "
                    f"and startup nodes: {[node.name for node in startup_nodes]}"
                )

            # Query each candidate node until one yields a (fully covered)
            # slot map.
            for startup_node in (*startup_nodes, *additional_startup_nodes):
                try:
                    if startup_node.redis_connection:
                        r = startup_node.redis_connection

                    else:
                        # Create a new Redis connection
                        if is_debug_log_enabled():
                            socket_timeout = kwargs.get("socket_timeout", "not set")
                            socket_connect_timeout = kwargs.get(
                                "socket_connect_timeout", "not set"
                            )
                            maint_enabled = (
                                self.maint_notifications_config.enabled
                                if self.maint_notifications_config
                                else False
                            )
                            logger.debug(
                                "Topology refresh: Creating new Redis connection to "
                                f"{startup_node.host}:{startup_node.port}; "
                                f"with socket_timeout: {socket_timeout}, and "
                                f"socket_connect_timeout: {socket_connect_timeout}, "
                                "and maint_notifications enabled: "
                                f"{maint_enabled}"
                            )
                        r = self.create_redis_node(
                            startup_node.host,
                            startup_node.port,
                            maint_notifications_config=self.maint_notifications_config,
                            **kwargs,
                        )
                        if startup_node in self.startup_nodes.values():
                            self.startup_nodes[startup_node.name].redis_connection = r
                        else:
                            startup_node.redis_connection = r
                    try:
                        # Make sure cluster mode is enabled on this node
                        cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
                        if disconnect_startup_nodes_pools:
                            with r.connection_pool._lock:
                                # take care to clear connections before we move on
                                # mark all active connections for reconnect - they will be
                                # reconnected on next use, but will allow current in flight commands to complete first
                                r.connection_pool.update_active_connections_for_reconnect()
                                # Needed to clear READONLY state when it is no longer applicable
                                r.connection_pool.disconnect_free_connections()
                    except ResponseError:
                        raise RedisClusterException(
                            "Cluster mode is not enabled on this node"
                        )
                    startup_nodes_reachable = True
                except Exception as e:
                    # Try the next startup node.
                    # The exception is saved and raised only if we have no more nodes.
                    exception = e
                    continue

                # CLUSTER SLOTS command results in the following output:
                # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
                # where each node contains the following list: [IP, port, node_id]
                # Therefore, cluster_slots[0][2][0] will be the IP address of the
                # primary node of the first slot section.
                # If there's only one server in the cluster, its ``host`` is ''
                # Fix it to the host in startup_nodes
                if (
                    len(cluster_slots) == 1
                    and len(cluster_slots[0][2][0]) == 0
                    and len(self.startup_nodes) == 1
                ):
                    cluster_slots[0][2][0] = startup_node.host

                for slot in cluster_slots:
                    primary_node = slot[2]
                    host = str_if_bytes(primary_node[0])
                    if host == "":
                        host = startup_node.host
                    port = int(primary_node[1])
                    host, port = self.remap_host_port(host, port)

                    nodes_for_slot = []

                    target_node = self._get_or_create_cluster_node(
                        host, port, PRIMARY, tmp_nodes_cache
                    )
                    nodes_for_slot.append(target_node)

                    replica_nodes = slot[3:]
                    for replica_node in replica_nodes:
                        host = str_if_bytes(replica_node[0])
                        port = int(replica_node[1])
                        host, port = self.remap_host_port(host, port)
                        target_replica_node = self._get_or_create_cluster_node(
                            host, port, REPLICA, tmp_nodes_cache
                        )
                        nodes_for_slot.append(target_replica_node)

                    for i in range(int(slot[0]), int(slot[1]) + 1):
                        if i not in tmp_slots:
                            tmp_slots[i] = nodes_for_slot
                        else:
                            # Validate that 2 nodes want to use the same slot cache
                            # setup
                            tmp_slot = tmp_slots[i][0]
                            if tmp_slot.name != target_node.name:
                                disagreements.append(
                                    f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                                )

                                if len(disagreements) > 5:
                                    raise RedisClusterException(
                                        f"startup_nodes could not agree on a valid "
                                        f"slots cache: {', '.join(disagreements)}"
                                    )

                fully_covered = self.check_slots_coverage(tmp_slots)
                if fully_covered:
                    # Don't need to continue to the next startup node if all
                    # slots are covered
                    break

            if not startup_nodes_reachable:
                raise RedisClusterException(
                    f"Redis Cluster cannot be connected. Please provide at least "
                    f"one reachable node: {str(exception)}"
                ) from exception

            # Create Redis connections to all nodes
            self.create_redis_connections(list(tmp_nodes_cache.values()))

            # Check if the slots are not fully covered
            if not fully_covered and self._require_full_coverage:
                # Despite the requirement that the slots be covered, there
                # isn't a full coverage
                raise RedisClusterException(
                    f"All slots are not covered after query all startup_nodes. "
                    f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                    f"covered..."
                )

            # Set the tmp variables to the real variables
            with self._lock:
                self.nodes_cache = tmp_nodes_cache
                self.slots_cache = tmp_slots
                # Set the default node
                self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
                if self._dynamic_startup_nodes:
                    # Populate the startup nodes with all discovered nodes
                    self.startup_nodes = tmp_nodes_cache
                # Increment the epoch to signal that initialization has completed
                self._epoch += 1

2478 

2479 def close(self) -> None: 

2480 with self._lock: 

2481 self.default_node = None 

2482 nodes = tuple(self.nodes_cache.values()) 

2483 for node in nodes: 

2484 if node.redis_connection: 

2485 node.redis_connection.close() 

2486 

2487 def reset(self): 

2488 try: 

2489 self.read_load_balancer.reset() 

2490 except TypeError: 

2491 # The read_load_balancer is None, do nothing 

2492 pass 

2493 

2494 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

2495 """ 

2496 Remap the host and port returned from the cluster to a different 

2497 internal value. Useful if the client is not connecting directly 

2498 to the cluster. 

2499 """ 

2500 if self.address_remap: 

2501 return self.address_remap((host, port)) 

2502 return host, port 

2503 

2504 def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]: 

2505 node_name = get_node_name(connection.host, connection.port) 

2506 with self._lock: 

2507 for node in tuple(self.nodes_cache.values()): 

2508 if node.redis_connection: 

2509 conn_args = node.redis_connection.connection_pool.connection_kwargs 

2510 if node_name == get_node_name( 

2511 conn_args.get("host"), conn_args.get("port") 

2512 ): 

2513 return node 

2514 return None 

2515 

2516 

2517class ClusterPubSub(PubSub): 

2518 """ 

2519 Wrapper for PubSub class. 

2520 

2521 IMPORTANT: before using ClusterPubSub, read about the known limitations 

2522 with pubsub in Cluster mode and learn how to workaround them: 

2523 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html 

2524 """ 

2525 

    def __init__(
        self,
        redis_cluster,
        node=None,
        host=None,
        port=None,
        push_handler_func=None,
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
        1. Hashing the channel name in the request to find its keyslot
        2. Selecting a node that handles the keyslot: If read_from_replicas is
           set to true or load_balancing_strategy is set, a replica can be selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        # Resolves/validates node|host|port and assigns self.node (may
        # stay None for lazy selection on first command).
        self.set_pubsub_node(redis_cluster, node, host, port)
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        # Per-node PubSub instances used for sharded pubsub.
        self.node_pubsub_mapping = {}
        # NOTE: rebinds the method name to a generator *instance* that
        # round-robins over node_pubsub_mapping forever.
        self._pubsubs_generator = self._pubsubs_generator()
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        super().__init__(
            connection_pool=connection_pool,
            encoder=redis_cluster.encoder,
            push_handler_func=push_handler_func,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

2570 

2571 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

2572 """ 

2573 The pubsub node will be set according to the passed node, host and port 

2574 When none of the node, host, or port are specified - the node is set 

2575 to None and will be determined by the keyslot of the channel in the 

2576 first command to be executed. 

2577 RedisClusterException will be thrown if the passed node does not exist 

2578 in the cluster. 

2579 If host is passed without port, or vice versa, a DataError will be 

2580 thrown. 

2581 :type cluster: RedisCluster 

2582 :type node: ClusterNode 

2583 :type host: str 

2584 :type port: int 

2585 """ 

2586 if node is not None: 

2587 # node is passed by the user 

2588 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

2589 pubsub_node = node 

2590 elif host is not None and port is not None: 

2591 # host and port passed by the user 

2592 node = cluster.get_node(host=host, port=port) 

2593 self._raise_on_invalid_node(cluster, node, host, port) 

2594 pubsub_node = node 

2595 elif any([host, port]) is True: 

2596 # only 'host' or 'port' passed 

2597 raise DataError("Passing a host requires passing a port, and vice versa") 

2598 else: 

2599 # nothing passed by the user. set node to None 

2600 pubsub_node = None 

2601 

2602 self.node = pubsub_node 

2603 

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection.

        May be None when no node was chosen yet (lazy selection happens on
        the first command).
        """
        return self.node

2609 

2610 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

2611 """ 

2612 Raise a RedisClusterException if the node is None or doesn't exist in 

2613 the cluster. 

2614 """ 

2615 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

2616 raise RedisClusterException( 

2617 f"Node {host}:{port} doesn't exist in the cluster" 

2618 ) 

2619 

    def execute_command(self, *args):
        """
        Execute a subscribe/unsubscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.

        On first use (no connection yet) this lazily binds the pubsub to a
        node: the keyslot of the first channel argument when present,
        otherwise a random node.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            if self.connection_pool is None:
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    channel = args[1]
                    slot = self.cluster.keyslot(channel)
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot,
                        self.cluster.read_from_replicas,
                        self.cluster.load_balancing_strategy,
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                redis_connection = self.cluster.get_redis_connection(node)
                self.connection_pool = redis_connection.connection_pool
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

2661 

2662 def _get_node_pubsub(self, node): 

2663 try: 

2664 return self.node_pubsub_mapping[node.name] 

2665 except KeyError: 

2666 pubsub = node.redis_connection.pubsub( 

2667 push_handler_func=self.push_handler_func 

2668 ) 

2669 self.node_pubsub_mapping[node.name] = pubsub 

2670 return pubsub 

2671 

2672 def _sharded_message_generator(self): 

2673 for _ in range(len(self.node_pubsub_mapping)): 

2674 pubsub = next(self._pubsubs_generator) 

2675 message = pubsub.get_message() 

2676 if message is not None: 

2677 return message 

2678 return None 

2679 

2680 def _pubsubs_generator(self): 

2681 while True: 

2682 current_nodes = list(self.node_pubsub_mapping.values()) 

2683 yield from current_nodes 

2684 

    def get_sharded_message(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    ):
        """
        Return the next sharded pub/sub message, or None if none is pending.

        When ``target_node`` is given, only that node's pubsub is polled;
        otherwise all mapped node pubsubs are polled round-robin.  On an
        ``sunsubscribe`` confirmation, local channel bookkeeping is cleaned
        up and the per-node pubsub is dropped once it has no subscriptions.
        """
        if target_node:
            message = self.node_pubsub_mapping[target_node.name].get_message(
                ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
            )
        else:
            message = self._sharded_message_generator()
        if message is None:
            return None
        elif str_if_bytes(message["type"]) == "sunsubscribe":
            # The server confirmed an unsubscribe we initiated: retire the
            # channel and, if the owning node pubsub is now idle, drop it.
            if message["channel"] in self.pending_unsubscribe_shard_channels:
                self.pending_unsubscribe_shard_channels.remove(message["channel"])
                self.shard_channels.pop(message["channel"], None)
                node = self.cluster.get_node_from_key(message["channel"])
                if self.node_pubsub_mapping[node.name].subscribed is False:
                    self.node_pubsub_mapping.pop(node.name)
        if not self.channels and not self.patterns and not self.shard_channels:
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()
        if self.ignore_subscribe_messages or ignore_subscribe_messages:
            return None
        return message

2710 

    def ssubscribe(self, *args, **kwargs):
        """
        Subscribe to the given shard channels.

        Positional args are channel names; keyword args map channel name to
        a message handler.  Each channel is routed to the node owning its
        slot via a per-node pubsub object.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        s_channels = dict.fromkeys(args)
        s_channels.update(kwargs)
        for s_channel, handler in s_channels.items():
            node = self.cluster.get_node_from_key(s_channel)
            pubsub = self._get_node_pubsub(node)
            if handler:
                pubsub.ssubscribe(**{s_channel: handler})
            else:
                pubsub.ssubscribe(s_channel)
            # Mirror the per-node pubsub's channel state into this object.
            self.shard_channels.update(pubsub.shard_channels)
            self.pending_unsubscribe_shard_channels.difference_update(
                self._normalize_keys({s_channel: None})
            )
            if pubsub.subscribed and not self.subscribed:
                # First live subscription: unblock get_message() waiters and
                # restart health-check accounting.
                self.subscribed_event.set()
                self.health_check_response_counter = 0

2730 

2731 def sunsubscribe(self, *args): 

2732 if args: 

2733 args = list_or_args(args[0], args[1:]) 

2734 else: 

2735 args = self.shard_channels 

2736 

2737 for s_channel in args: 

2738 node = self.cluster.get_node_from_key(s_channel) 

2739 p = self._get_node_pubsub(node) 

2740 p.sunsubscribe(s_channel) 

2741 self.pending_unsubscribe_shard_channels.update( 

2742 p.pending_unsubscribe_shard_channels 

2743 ) 

2744 

2745 def get_redis_connection(self): 

2746 """ 

2747 Get the Redis connection of the pubsub connected node. 

2748 """ 

2749 if self.node is not None: 

2750 return self.node.redis_connection 

2751 

2752 def disconnect(self): 

2753 """ 

2754 Disconnect the pubsub connection. 

2755 """ 

2756 if self.connection: 

2757 self.connection.disconnect() 

2758 for pubsub in self.node_pubsub_mapping.values(): 

2759 pubsub.connection.disconnect() 

2760 

2761 

class ClusterPipeline(RedisCluster):
    """
    Support for Redis pipeline
    in cluster mode

    Commands are buffered locally and dispatched to the owning cluster
    nodes when ``execute()`` is called.  The buffering/dispatch logic is
    delegated to an ``ExecutionStrategy``: a plain pipeline strategy, or a
    transaction strategy when ``transaction=True``.
    """

    # Errors after which the whole batch may be retried (redirections and
    # connectivity problems that a topology refresh can fix).
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        MovedError,
        AskError,
        TryAgainError,
    )

    # Commands that are not routed by key slot.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands executed immediately instead of being queued.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands that implicitly clear WATCHed keys.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        cluster_error_retry_attempts: int = 3,
        reinitialize_steps: int = 5,
        retry: Optional[Retry] = None,
        lock=None,
        transaction=False,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        Create a cluster pipeline bound to an existing cluster topology.

        ``transaction`` selects the execution strategy (pipeline vs. MULTI/
        EXEC transaction).  ``cluster_error_retry_attempts`` is deprecated;
        pass a configured ``retry`` object instead.
        """
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        # NOTE(review): presumably a CaseInsensitiveDict is supplied by the
        # caller (RedisCluster.pipeline) -- a None here would break response
        # parsing in the strategies; confirm against callers.
        self.cluster_response_callbacks = cluster_response_callbacks
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if retry is not None:
            self.retry = retry
        else:
            # Default retry policy mirrors the deprecated attempts count.
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        # Bound super().execute_command, used by strategies to retry single
        # commands through the full client routing logic.
        self.parent_execute_command = super().execute_command
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self) if not transaction else TransactionStrategy(self)
        )

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        # Maps each policy to the callable that resolves target nodes
        # (request policies) or post-processes responses (response policies).
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver

        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

    def __repr__(self):
        """Return the class name only; the queue contents are not shown."""
        return f"{type(self).__name__}"

    def __enter__(self):
        """Support use as a context manager; entering is a no-op."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Reset the pipeline on context exit, even after an exception."""
        self.reset()

    def __del__(self):
        # Best-effort cleanup; never raise from a finalizer.
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self):
        """Number of commands currently queued."""
        return len(self._execution_strategy.command_queue)

    def __bool__(self):
        "Pipeline instances should always evaluate to True on Python 3+"
        return True

    def execute_command(self, *args, **kwargs):
        """
        Wrapper function for pipeline_execute_command
        """
        return self._execution_strategy.execute_command(*args, **kwargs)

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        return self._execution_strategy.execute_command(*args, **options)

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        self._execution_strategy.annotate_exception(exception, number, command)

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Execute all the commands in the current pipeline
        """

        try:
            return self._execution_strategy.execute(raise_on_error)
        finally:
            # The queue is always cleared, even when execution raised.
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._execution_strategy.reset()

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """Delegate sending of a prepared command stack to the strategy."""
        return self._execution_strategy.send_cluster_commands(
            stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
        )

    def exists(self, *keys):
        """Queue an EXISTS for ``keys`` via the execution strategy."""
        return self._execution_strategy.exists(*keys)

    def eval(self):
        """Not supported in cluster pipelines; the strategy raises."""
        return self._execution_strategy.eval()

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        self._execution_strategy.multi()

    def load_scripts(self):
        """Not supported in cluster pipelines; the strategy raises."""
        self._execution_strategy.load_scripts()

    def discard(self):
        """Discard a queued transactional block (strategy-dependent)."""
        self._execution_strategy.discard()

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        self._execution_strategy.watch(*names)

    def unwatch(self):
        """Unwatches all previously specified keys"""
        self._execution_strategy.unwatch()

    def script_load_for_pipeline(self, *args, **kwargs):
        """Not supported in cluster pipelines; the strategy raises."""
        self._execution_strategy.script_load_for_pipeline(*args, **kwargs)

    def delete(self, *names):
        """Queue deletion of the keys ``names`` via the execution strategy."""
        self._execution_strategy.delete(*names)

    def unlink(self, *names):
        """Queue an UNLINK of the keys ``names`` via the execution strategy."""
        self._execution_strategy.unlink(*names)

2990 

2991 

2992def block_pipeline_command(name: str) -> Callable[..., Any]: 

2993 """ 

2994 Prints error because some pipelined commands should 

2995 be blocked when running in cluster-mode 

2996 """ 

2997 

2998 def inner(*args, **kwargs): 

2999 raise RedisClusterException( 

3000 f"ERROR: Calling pipelined function {name} is blocked " 

3001 f"when running redis in cluster mode..." 

3002 ) 

3003 

3004 return inner 

3005 

3006 

3007# Blocked pipeline commands 

# Blocked pipeline commands
# Each of these is patched onto ClusterPipeline below as a stub that raises
# RedisClusterException, because the command cannot be meaningfully pipelined
# across a cluster.
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSETEX",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)
# Map "CLIENT KILL" -> "client_kill" etc. and install a raising stub for
# each blocked command on ClusterPipeline.
for command in PIPELINE_BLOCKED_COMMANDS:
    command = command.replace(" ", "_").lower()

    setattr(ClusterPipeline, command, block_pipeline_command(command))

3084 

3085 

class PipelineCommand:
    """
    Container for a single queued pipeline command: its arguments, options,
    queue position, and mutable routing/result state filled in during send.
    """

    def __init__(self, args, options=None, position=None):
        self.args = args
        self.options = {} if options is None else options
        self.position = position
        # Populated while the command is dispatched.
        self.result = None
        self.node = None
        self.asking = False
        self.command_policies: Optional[CommandPolicies] = None

3099 

3100 

class NodeCommands:
    """
    Batch of pipeline commands destined for a single cluster node, together
    with the connection used to write them and read their replies.
    """

    def __init__(
        self, parse_response, connection_pool: ConnectionPool, connection: Connection
    ):
        """Bind a response parser, the owning pool, and a checked-out connection."""
        self.parse_response = parse_response
        self.connection_pool = connection_pool
        self.connection = connection
        self.commands = []

    def append(self, c):
        """Add a PipelineCommand to this node's batch."""
        self.commands.append(c)

    def write(self):
        """
        Code borrowed from Redis so it can be fixed

        Packs all queued commands into one request and sends it.  On a
        connection/timeout error, the exception is stored as every
        command's result instead of being raised.
        """
        connection = self.connection
        commands = self.commands

        # We are going to clobber the commands with the write, so go ahead
        # and ensure that nothing is sitting there from a previous run.
        for c in commands:
            c.result = None

        # build up all commands into a single request to increase network perf
        # send all the commands and catch connection and timeout errors.
        try:
            connection.send_packed_command(
                connection.pack_commands([c.args for c in commands])
            )
        except (ConnectionError, TimeoutError) as e:
            for c in commands:
                c.result = e

    def read(self):
        """Read one response per command; store errors as results rather than raising."""
        connection = self.connection
        for c in self.commands:
            # if there is a result on this command,
            # it means we ran into an exception
            # like a connection error. Trying to parse
            # a response on a connection that
            # is no longer open will result in a
            # connection error raised by redis-py.
            # but redis-py doesn't check in parse_response
            # that the sock object is
            # still set and if you try to
            # read from a closed connection, it will
            # result in an AttributeError because
            # it will do a readline() call on None.
            # This can have all kinds of nasty side-effects.
            # Treating this case as a connection error
            # is fine because it will dump
            # the connection object back into the
            # pool and on the next write, it will
            # explicitly open the connection and all will be well.
            if c.result is None:
                try:
                    c.result = self.parse_response(connection, c.args[0], **c.options)
                except (ConnectionError, TimeoutError) as e:
                    # The connection is dead: fail every remaining command
                    # with the same error and stop reading.
                    for c in self.commands:
                        c.result = e
                    return
                except RedisError:
                    # Server-side error for this command only; keep reading
                    # the rest of the batch.
                    c.result = sys.exc_info()[1]

3170 

3171 

class ExecutionStrategy(ABC):
    """
    Interface implemented by pipeline execution strategies (plain pipeline
    vs. MULTI/EXEC transaction).  ClusterPipeline delegates to one of these.
    """

    @property
    @abstractmethod
    def command_queue(self):
        """The list of currently queued PipelineCommand objects."""
        pass

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        """
        Execution flow for current execution strategy.

        See: ClusterPipeline.execute_command()
        """
        pass

    @abstractmethod
    def annotate_exception(self, exception, number, command):
        """
        Annotate exception according to current execution strategy.

        See: ClusterPipeline.annotate_exception()
        """
        pass

    @abstractmethod
    def pipeline_execute_command(self, *args, **options):
        """
        Pipeline execution flow for current execution strategy.

        See: ClusterPipeline.pipeline_execute_command()
        """
        pass

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Executes current execution strategy.

        See: ClusterPipeline.execute()
        """
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Sends commands according to current execution strategy.

        See: ClusterPipeline.send_cluster_commands()
        """
        pass

    @abstractmethod
    def reset(self):
        """
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        """
        pass

    @abstractmethod
    def exists(self, *keys):
        """Queue an EXISTS check for ``keys``."""
        pass

    @abstractmethod
    def eval(self):
        """Script evaluation hook; strategies may reject it."""
        pass

    @abstractmethod
    def multi(self):
        """
        Starts transactional context.

        See: ClusterPipeline.multi()
        """
        pass

    @abstractmethod
    def load_scripts(self):
        """Script loading hook; strategies may reject it."""
        pass

    @abstractmethod
    def watch(self, *names):
        """Watch the keys ``names`` (transactional strategies only)."""
        pass

    @abstractmethod
    def unwatch(self):
        """
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        """
        pass

    @abstractmethod
    def script_load_for_pipeline(self, *args, **kwargs):
        """Script load hook for pipelines; strategies may reject it."""
        pass

    @abstractmethod
    def delete(self, *names):
        """
        "Delete a key specified by ``names``"

        See: ClusterPipeline.delete()
        """
        pass

    @abstractmethod
    def unlink(self, *names):
        """
        "Unlink a key specified by ``names``"

        See: ClusterPipeline.unlink()
        """
        pass

    @abstractmethod
    def discard(self):
        """Discard a queued transactional block (transactional strategies only)."""
        pass

3293 

3294 

class AbstractStrategy(ExecutionStrategy):
    """
    Shared base for execution strategies: owns the command queue and the
    behavior that is identical between pipeline and transaction modes.
    """

    def __init__(
        self,
        pipe: ClusterPipeline,
    ):
        self._command_queue: List[PipelineCommand] = []
        self._pipe = pipe
        self._nodes_manager = self._pipe.nodes_manager

    @property
    def command_queue(self):
        """The list of currently queued PipelineCommand objects."""
        return self._command_queue

    @command_queue.setter
    def command_queue(self, queue: List[PipelineCommand]):
        self._command_queue = queue

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        pass

    def pipeline_execute_command(self, *args, **options):
        """Queue a command (its position is its queue index) and return the pipeline."""
        self._command_queue.append(
            PipelineCommand(args, options, len(self._command_queue))
        )
        return self._pipe

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        pass

    @abstractmethod
    def reset(self):
        pass

    def exists(self, *keys):
        """Queue an EXISTS for ``keys``."""
        return self.execute_command("EXISTS", *keys)

    def eval(self):
        """Unsupported in cluster pipelines."""
        raise RedisClusterException("method eval() is not implemented")

    def load_scripts(self):
        """Unsupported in cluster pipelines."""
        raise RedisClusterException("method load_scripts() is not implemented")

    def script_load_for_pipeline(self, *args, **kwargs):
        """Unsupported in cluster pipelines."""
        raise RedisClusterException(
            "method script_load_for_pipeline() is not implemented"
        )

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        cmd = " ".join(map(safe_str, command))
        # Prefix the original message with the failing command's position
        # and (truncated) text so users can locate it in the pipeline.
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {exception.args[0]}"
        )
        exception.args = (msg,) + exception.args[1:]

3363 

3364 

3365class PipelineStrategy(AbstractStrategy): 

    def __init__(self, pipe: ClusterPipeline):
        """Bind to the pipeline and snapshot its command routing flags."""
        super().__init__(pipe)
        self.command_flags = pipe.command_flags

3369 

    def execute_command(self, *args, **kwargs):
        """In pipeline mode every command is simply queued for later execution."""
        return self.pipeline_execute_command(*args, **kwargs)

3372 

    def _raise_first_error(self, stack, start_time):
        """
        Raise the first exception on the stack

        Before raising, the exception is annotated with its position in the
        pipeline and the pipeline's duration is recorded for observability.
        """
        for c in stack:
            r = c.result
            if isinstance(r, Exception):
                self.annotate_exception(r, c.position + 1, c.args)

                record_operation_duration(
                    command_name="PIPELINE",
                    duration_seconds=time.monotonic() - start_time,
                    error=r,
                )

                raise r

3389 

3390 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3391 stack = self._command_queue 

3392 if not stack: 

3393 return [] 

3394 

3395 try: 

3396 return self.send_cluster_commands(stack, raise_on_error) 

3397 finally: 

3398 self.reset() 

3399 

3400 def reset(self): 

3401 """ 

3402 Reset back to empty pipeline. 

3403 """ 

3404 self._command_queue = [] 

3405 

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.

        If one of the retryable exceptions has been thrown we assume that:
         - connection_pool was disconnected
         - connection_pool was reset
         - refresh_table_asap set to True

        It will try the number of times specified by
        the retries in config option "self.retry"
        which defaults to 3 unless manually configured.

        If it reaches the number of times, the command will
        raises ClusterDownException.
        """
        if not stack:
            return []
        retry_attempts = self._pipe.retry.get_retries()
        while True:
            try:
                return self._send_cluster_commands(
                    stack,
                    raise_on_error=raise_on_error,
                    allow_redirections=allow_redirections,
                )
            except RedisCluster.ERRORS_ALLOW_RETRY as e:
                if retry_attempts > 0:
                    # Try again with the new cluster setup. All other errors
                    # should be raised.
                    retry_attempts -= 1
                    pass
                else:
                    # Retry budget exhausted: surface the last error.
                    raise e

3442 

    def _send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Send a bunch of cluster commands to the redis cluster.

        `allow_redirections` If the pipeline should follow
        `ASK` & `MOVED` responses automatically. If set
        to false it will raise RedisClusterException.
        """
        # the first time sending the commands we send all of
        # the commands that were queued up.
        # if we have to run through it again, we only retry
        # the commands that failed.
        attempt = sorted(stack, key=lambda x: x.position)
        is_default_node = False
        # build a list of node objects based on node names we need to
        nodes: dict[str, NodeCommands] = {}
        # Counters used by the finally-block to decide which connections are
        # "dirty" (written to but not fully read) and must be disconnected.
        nodes_written = 0
        nodes_read = 0

        try:
            # as we move through each command that still needs to be processed,
            # we figure out the slot number that command maps to, then from
            # the slot determine the node.
            for c in attempt:
                command_policies = self._pipe._policy_resolver.resolve(
                    c.args[0].lower()
                )
                # refer to our internal node -> slot table that
                # tells us where a given command should route to.
                # (it might be possible we have a cached node that no longer
                # exists in the cluster, which is why we do this in a loop)
                passed_targets = c.options.pop("target_nodes", None)
                if passed_targets and not self._is_nodes_flag(passed_targets):
                    # Caller pinned explicit target nodes; no routing needed.
                    target_nodes = self._parse_target_nodes(passed_targets)

                    if not command_policies:
                        command_policies = CommandPolicies()
                else:
                    if not command_policies:
                        # Derive a policy from legacy COMMAND_FLAGS, handling
                        # two-word commands such as "CLIENT KILL".
                        command = c.args[0].upper()
                        if (
                            len(c.args) >= 2
                            and f"{c.args[0]} {c.args[1]}".upper()
                            in self._pipe.command_flags
                        ):
                            command = f"{c.args[0]} {c.args[1]}".upper()

                        # We only could resolve key properties if command is not
                        # in a list of pre-defined request policies
                        command_flag = self.command_flags.get(command)
                        if not command_flag:
                            # Fallback to default policy
                            if not self._pipe.get_default_node():
                                keys = None
                            else:
                                keys = self._pipe._get_command_keys(*c.args)
                            if not keys or len(keys) == 0:
                                command_policies = CommandPolicies()
                            else:
                                command_policies = CommandPolicies(
                                    request_policy=RequestPolicy.DEFAULT_KEYED,
                                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                                )
                        else:
                            if command_flag in self._pipe._command_flags_mapping:
                                command_policies = CommandPolicies(
                                    request_policy=self._pipe._command_flags_mapping[
                                        command_flag
                                    ]
                                )
                            else:
                                command_policies = CommandPolicies()

                    target_nodes = self._determine_nodes(
                        *c.args,
                        request_policy=command_policies.request_policy,
                        node_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {c.args} command on"
                        )
                c.command_policies = command_policies
                # Pipelined commands must resolve to exactly one node.
                if len(target_nodes) > 1:
                    raise RedisClusterException(
                        f"Too many targets for command {c.args}"
                    )

                node = target_nodes[0]
                if node == self._pipe.get_default_node():
                    is_default_node = True

                # now that we know the name of the node
                # ( it's just a string in the form of host:port )
                # we can build a list of commands for each node.
                node_name = node.name
                if node_name not in nodes:
                    redis_node = self._pipe.get_redis_connection(node)
                    try:
                        connection = get_connection(redis_node)
                    except (ConnectionError, TimeoutError):
                        # Release any connections we've already acquired before clearing nodes
                        for n in nodes.values():
                            n.connection_pool.release(n.connection)
                        # Connection retries are being handled in the node's
                        # Retry object. Reinitialize the node -> slot table.
                        self._nodes_manager.initialize()
                        if is_default_node:
                            self._pipe.replace_default_node()
                        nodes = {}
                        raise
                    nodes[node_name] = NodeCommands(
                        redis_node.parse_response,
                        redis_node.connection_pool,
                        connection,
                    )
                nodes[node_name].append(c)

            # send the commands in sequence.
            # we write to all the open sockets for each node first,
            # before reading anything
            # this allows us to flush all the requests out across the
            # network
            # so that we can read them from different sockets as they come back.
            # we don't multiplex on the sockets as they come available,
            # but that shouldn't make too much difference.

            # Start timing for observability
            start_time = time.monotonic()

            node_commands = nodes.values()
            for n in node_commands:
                nodes_written += 1
                n.write()

            for n in node_commands:
                n.read()

                # Find the first error in this node's commands, if any
                node_error = None
                for cmd in n.commands:
                    if isinstance(cmd.result, Exception):
                        node_error = cmd.result
                        break

                record_operation_duration(
                    command_name="PIPELINE",
                    duration_seconds=time.monotonic() - start_time,
                    server_address=n.connection.host,
                    server_port=n.connection.port,
                    db_namespace=str(n.connection.db),
                    error=node_error,
                )
                nodes_read += 1
        finally:
            # release all the redis connections we allocated earlier
            # back into the connection pool.
            # if the connection is dirty (that is: we've written
            # commands to it, but haven't read the responses), we need
            # to close the connection before returning it to the pool.
            # otherwise, the next caller to use this connection will
            # read the response from _this_ request, not its own request.
            # disconnecting discards the dirty state & forces the next
            # caller to reconnect.
            # NOTE: dicts have a consistent ordering; we're iterating
            # through nodes.values() in the same order as we are when
            # reading / writing to the connections above, which is critical
            # for how we're using the nodes_written/nodes_read offsets.
            for i, n in enumerate(nodes.values()):
                if i < nodes_written and i >= nodes_read:
                    n.connection.disconnect()
                n.connection_pool.release(n.connection)

        # if the response isn't an exception it is a
        # valid response from the node
        # we're all done with that command, YAY!
        # if we have more commands to attempt, we've run into problems.
        # collect all the commands we are allowed to retry.
        # (MOVED, ASK, or connection errors or timeout errors)
        attempt = sorted(
            (
                c
                for c in attempt
                if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
            ),
            key=lambda x: x.position,
        )
        if attempt and allow_redirections:
            # RETRY MAGIC HAPPENS HERE!
            # send these remaining commands one at a time using `execute_command`
            # in the main client. This keeps our retry logic
            # in one place mostly,
            # and allows us to be more confident in correctness of behavior.
            # at this point any speed gains from pipelining have been lost
            # anyway, so we might as well make the best
            # attempt to get the correct behavior.
            #
            # The client command will handle retries for each
            # individual command sequentially as we pass each
            # one into `execute_command`. Any exceptions
            # that bubble out should only appear once all
            # retries have been exhausted.
            #
            # If a lot of commands have failed, we'll be setting the
            # flag to rebuild the slots table from scratch.
            # So MOVED errors should correct themselves fairly quickly.
            self._pipe.reinitialize_counter += 1
            if self._pipe._should_reinitialized():
                self._nodes_manager.initialize()
                if is_default_node:
                    self._pipe.replace_default_node()
            for c in attempt:
                try:
                    # send each command individually like we
                    # do in the main client.
                    c.result = self._pipe.parent_execute_command(*c.args, **c.options)
                except RedisError as e:
                    c.result = e

        # turn the response back into a simple flat array that corresponds
        # to the sequence of commands issued in the stack in pipeline.execute()
        response = []
        for c in sorted(stack, key=lambda x: x.position):
            if c.args[0] in self._pipe.cluster_response_callbacks:
                # Remove keys entry, it needs only for cache.
                c.options.pop("keys", None)
                c.result = self._pipe._policies_callback_mapping[
                    c.command_policies.response_policy
                ](
                    self._pipe.cluster_response_callbacks[c.args[0]](
                        c.result, **c.options
                    )
                )
            response.append(c.result)

        if raise_on_error:
            self._raise_first_error(stack, start_time)

        return response

3684 

3685 def _is_nodes_flag(self, target_nodes): 

3686 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags 

3687 

3688 def _parse_target_nodes(self, target_nodes): 

3689 if isinstance(target_nodes, list): 

3690 nodes = target_nodes 

3691 elif isinstance(target_nodes, ClusterNode): 

3692 # Supports passing a single ClusterNode as a variable 

3693 nodes = [target_nodes] 

3694 elif isinstance(target_nodes, dict): 

3695 # Supports dictionaries of the format {node_name: node}. 

3696 # It enables to execute commands with multi nodes as follows: 

3697 # rc.cluster_save_config(rc.get_primaries()) 

3698 nodes = target_nodes.values() 

3699 else: 

3700 raise TypeError( 

3701 "target_nodes type can be one of the following: " 

3702 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

3703 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

3704 f"The passed type is {type(target_nodes)}" 

3705 ) 

3706 return nodes 

3707 

3708 def _determine_nodes( 

3709 self, *args, request_policy: RequestPolicy, **kwargs 

3710 ) -> List["ClusterNode"]: 

3711 # Determine which nodes should be executed the command on. 

3712 # Returns a list of target nodes. 

3713 command = args[0].upper() 

3714 if ( 

3715 len(args) >= 2 

3716 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags 

3717 ): 

3718 command = f"{args[0]} {args[1]}".upper() 

3719 

3720 nodes_flag = kwargs.pop("nodes_flag", None) 

3721 if nodes_flag is not None: 

3722 # nodes flag passed by the user 

3723 command_flag = nodes_flag 

3724 else: 

3725 # get the nodes group for this command if it was predefined 

3726 command_flag = self._pipe.command_flags.get(command) 

3727 

3728 if command_flag in self._pipe._command_flags_mapping: 

3729 request_policy = self._pipe._command_flags_mapping[command_flag] 

3730 

3731 policy_callback = self._pipe._policies_callback_mapping[request_policy] 

3732 

3733 if request_policy == RequestPolicy.DEFAULT_KEYED: 

3734 nodes = policy_callback(command, *args) 

3735 elif request_policy == RequestPolicy.MULTI_SHARD: 

3736 nodes = policy_callback(*args, **kwargs) 

3737 elif request_policy == RequestPolicy.DEFAULT_KEYLESS: 

3738 nodes = policy_callback(args[0]) 

3739 else: 

3740 nodes = policy_callback() 

3741 

3742 if args[0].lower() == "ft.aggregate": 

3743 self._aggregate_nodes = nodes 

3744 

3745 return nodes 

3746 

3747 def multi(self): 

3748 raise RedisClusterException( 

3749 "method multi() is not supported outside of transactional context" 

3750 ) 

3751 

3752 def discard(self): 

3753 raise RedisClusterException( 

3754 "method discard() is not supported outside of transactional context" 

3755 ) 

3756 

3757 def watch(self, *names): 

3758 raise RedisClusterException( 

3759 "method watch() is not supported outside of transactional context" 

3760 ) 

3761 

3762 def unwatch(self, *names): 

3763 raise RedisClusterException( 

3764 "method unwatch() is not supported outside of transactional context" 

3765 ) 

3766 

3767 def delete(self, *names): 

3768 if len(names) != 1: 

3769 raise RedisClusterException( 

3770 "deleting multiple keys is not implemented in pipeline command" 

3771 ) 

3772 

3773 return self.execute_command("DEL", names[0]) 

3774 

3775 def unlink(self, *names): 

3776 if len(names) != 1: 

3777 raise RedisClusterException( 

3778 "unlinking multiple keys is not implemented in pipeline command" 

3779 ) 

3780 

3781 return self.execute_command("UNLINK", names[0]) 

3782 

3783 

class TransactionStrategy(AbstractStrategy):
    """Pipeline execution strategy that runs queued commands as a
    MULTI/EXEC transaction on a single cluster node.

    All keys used in one transaction must hash to the same slot
    (cross-slot use raises CrossSlotTransactionError); WATCH/UNWATCH are
    executed immediately on the connection pinned to that slot's owner.
    """

    # Commands that do not need (and cannot provide) a key slot.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands executed immediately instead of being queued.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands whose completion implicitly clears the WATCH state.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    # Redirect errors raised when a slot has moved between nodes.
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    # Errors treated as connection-level failures (drop pinned connection).
    CONNECTION_ERRORS = (
        ConnectionError,
        OSError,
        ClusterDownError,
        SlotNotCoveredError,
    )

3795 

    def __init__(self, pipe: ClusterPipeline):
        """Initialize transaction state on top of the generic strategy."""
        super().__init__(pipe)
        # True once MULTI has been issued explicitly via multi().
        self._explicit_transaction = False
        # True while one or more keys are being WATCHed.
        self._watching = False
        # Slots touched by queued/watched commands; must stay a single slot.
        self._pipeline_slots: Set[int] = set()
        # Dedicated connection pinned for the whole WATCH/MULTI/EXEC exchange.
        self._transaction_connection: Optional[Connection] = None
        # True while EXEC is in flight; turns slot redirects into WatchError.
        self._executing = False
        # Copy the pipeline's retry object so extending its supported error
        # list does not mutate the shared instance.
        self._retry = copy(self._pipe.retry)
        self._retry.update_supported_errors(
            RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
        )

3807 

    def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
        """
        Find a connection for a pipeline transaction.

        For running an atomic transaction, watch keys ensure that contents have not been
        altered as long as the watch commands for those keys were sent over the same
        connection. So once we start watching a key, we fetch a connection to the
        node that owns that slot and reuse it.
        """
        if not self._pipeline_slots:
            raise RedisClusterException(
                "At least a command with a key is needed to identify a node"
            )

        # Cross-slot use is rejected elsewhere, so any entry of the (single)
        # slot set identifies the target node.
        node: ClusterNode = self._nodes_manager.get_node_from_slot(
            list(self._pipeline_slots)[0], False
        )
        redis_node: Redis = self._pipe.get_redis_connection(node)
        if self._transaction_connection:
            # Topology may have changed: if the pinned connection no longer
            # belongs to the slot owner's pool, return it to its own pool
            # and pin a fresh one below.
            if not redis_node.connection_pool.owns_connection(
                self._transaction_connection
            ):
                previous_node = self._nodes_manager.find_connection_owner(
                    self._transaction_connection
                )
                previous_node.connection_pool.release(self._transaction_connection)
                self._transaction_connection = None

        if not self._transaction_connection:
            self._transaction_connection = get_connection(redis_node)

        return redis_node, self._transaction_connection

3840 

    def execute_command(self, *args, **kwargs):
        """Queue a command, or execute it immediately while WATCHing.

        Commands issued while watching (or WATCH/UNWATCH themselves) before
        an explicit MULTI are sent straight to the pinned connection;
        everything else is buffered until EXEC. The key slot of every
        command is tracked so cross-slot usage can be rejected.
        """
        slot_number: Optional[int] = None
        # NOTE(review): this consults ClusterPipeline.NO_SLOTS_COMMANDS while
        # the class defines its own NO_SLOTS_COMMANDS — confirm the two sets
        # are meant to stay in sync.
        if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
            slot_number = self._pipe.determine_slot(*args)

        if (
            self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
        ) and not self._explicit_transaction:
            if args[0] == "WATCH":
                self._validate_watch()

            if slot_number is not None:
                # Watched/immediate commands share one pinned connection,
                # so they must all stay on a single slot.
                if self._pipeline_slots and slot_number not in self._pipeline_slots:
                    raise CrossSlotTransactionError(
                        "Cannot watch or send commands on different slots"
                    )

                self._pipeline_slots.add(slot_number)
            elif args[0] not in self.NO_SLOTS_COMMANDS:
                raise RedisClusterException(
                    f"Cannot identify slot number for command: {args[0]},"
                    "it cannot be triggered in a transaction"
                )

            return self._immediate_execute_command(*args, **kwargs)
        else:
            if slot_number is not None:
                self._pipeline_slots.add(slot_number)

            return self.pipeline_execute_command(*args, **kwargs)

3871 

3872 def _validate_watch(self): 

3873 if self._explicit_transaction: 

3874 raise RedisError("Cannot issue a WATCH after a MULTI") 

3875 

3876 self._watching = True 

3877 

3878 def _immediate_execute_command(self, *args, **options): 

3879 return self._retry.call_with_retry( 

3880 lambda: self._get_connection_and_send_command(*args, **options), 

3881 self._reinitialize_on_error, 

3882 with_failure_count=True, 

3883 ) 

3884 

    def _get_connection_and_send_command(self, *args, **options):
        """Send one command on the pinned transaction connection and parse it.

        An operation-duration metric is recorded on both success and failure;
        on failure the connection is attached to the exception so the retry
        hook can report host/port in its error metric.
        """
        redis_node, connection = self._get_client_and_connection_for_transaction()

        # Start timing for observability
        start_time = time.monotonic()

        try:
            response = self._send_command_parse_response(
                connection, redis_node, args[0], *args, **options
            )

            record_operation_duration(
                command_name=args[0],
                duration_seconds=time.monotonic() - start_time,
                server_address=connection.host,
                server_port=connection.port,
                db_namespace=str(connection.db),
            )

            return response
        except Exception as e:
            if connection:
                # this is used to report the metrics based on host and port info
                e.connection = connection
                record_operation_duration(
                    command_name=args[0],
                    duration_seconds=time.monotonic() - start_time,
                    server_address=connection.host,
                    server_port=connection.port,
                    db_namespace=str(connection.db),
                    error=e,
                )
            raise

3918 

3919 def _send_command_parse_response( 

3920 self, conn, redis_node: Redis, command_name, *args, **options 

3921 ): 

3922 """ 

3923 Send a command and parse the response 

3924 """ 

3925 

3926 conn.send_command(*args) 

3927 output = redis_node.parse_response(conn, command_name, **options) 

3928 

3929 if command_name in self.UNWATCH_COMMANDS: 

3930 self._watching = False 

3931 return output 

3932 

3933 def _reinitialize_on_error(self, error, failure_count): 

3934 if hasattr(error, "connection"): 

3935 record_error_count( 

3936 server_address=error.connection.host, 

3937 server_port=error.connection.port, 

3938 network_peer_address=error.connection.host, 

3939 network_peer_port=error.connection.port, 

3940 error_type=error, 

3941 retry_attempts=failure_count, 

3942 is_internal=True, 

3943 ) 

3944 

3945 if self._watching: 

3946 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: 

3947 raise WatchError("Slot rebalancing occurred while watching keys") 

3948 

3949 if ( 

3950 type(error) in self.SLOT_REDIRECT_ERRORS 

3951 or type(error) in self.CONNECTION_ERRORS 

3952 ): 

3953 if self._transaction_connection: 

3954 if is_debug_log_enabled(): 

3955 logger.debug( 

3956 f"Operation failed, " 

3957 f"with connection: {self._transaction_connection}, " 

3958 f"details: {self._transaction_connection.extract_connection_details()}", 

3959 ) 

3960 # Disconnect and release back to pool 

3961 self._transaction_connection.disconnect() 

3962 node = self._nodes_manager.find_connection_owner( 

3963 self._transaction_connection 

3964 ) 

3965 if node and node.redis_connection: 

3966 node.redis_connection.connection_pool.release( 

3967 self._transaction_connection 

3968 ) 

3969 self._transaction_connection = None 

3970 

3971 self._pipe.reinitialize_counter += 1 

3972 if self._pipe._should_reinitialized(): 

3973 self._nodes_manager.initialize() 

3974 self.reinitialize_counter = 0 

3975 else: 

3976 if isinstance(error, AskError): 

3977 self._nodes_manager.move_slot(error) 

3978 

3979 self._executing = False 

3980 

3981 def _raise_first_error(self, responses, stack, start_time): 

3982 """ 

3983 Raise the first exception on the stack 

3984 """ 

3985 for r, cmd in zip(responses, stack): 

3986 if isinstance(r, Exception): 

3987 self.annotate_exception(r, cmd.position + 1, cmd.args) 

3988 

3989 record_operation_duration( 

3990 command_name="TRANSACTION", 

3991 duration_seconds=time.monotonic() - start_time, 

3992 server_address=self._transaction_connection.host, 

3993 server_port=self._transaction_connection.port, 

3994 db_namespace=str(self._transaction_connection.db), 

3995 ) 

3996 

3997 raise r 

3998 

3999 def execute(self, raise_on_error: bool = True) -> List[Any]: 

4000 stack = self._command_queue 

4001 if not stack and (not self._watching or not self._pipeline_slots): 

4002 return [] 

4003 

4004 return self._execute_transaction_with_retries(stack, raise_on_error) 

4005 

4006 def _execute_transaction_with_retries( 

4007 self, stack: List["PipelineCommand"], raise_on_error: bool 

4008 ): 

4009 return self._retry.call_with_retry( 

4010 lambda: self._execute_transaction(stack, raise_on_error), 

4011 lambda error, failure_count: self._reinitialize_on_error( 

4012 error, failure_count 

4013 ), 

4014 with_failure_count=True, 

4015 ) 

4016 

    def _execute_transaction(
        self, stack: List["PipelineCommand"], raise_on_error: bool
    ):
        """Send the queued commands wrapped in MULTI/EXEC on one connection.

        Raises CrossSlotTransactionError when the queued commands span more
        than one slot, WatchError when a watched key changed (EXEC returned
        nil), and InvalidPipelineStack when the reply count does not match
        the command queue. Returns the per-command results with response
        callbacks applied.
        """
        if len(self._pipeline_slots) > 1:
            raise CrossSlotTransactionError(
                "All keys involved in a cluster transaction must map to the same slot"
            )

        self._executing = True

        redis_node, connection = self._get_client_and_connection_for_transaction()

        # Bracket the queued commands with MULTI ... EXEC.
        stack = chain(
            [PipelineCommand(("MULTI",))],
            stack,
            [PipelineCommand(("EXEC",))],
        )
        # Commands carrying an EMPTY_RESPONSE option are not sent at all;
        # their canned result is spliced back into the response below.
        commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
        packed_commands = connection.pack_commands(commands)

        # Start timing for observability
        start_time = time.monotonic()

        connection.send_packed_command(packed_commands)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            redis_node.parse_response(connection, "MULTI")
        except ResponseError as e:
            self.annotate_exception(e, 0, "MULTI")
            errors.append(e)
        except self.CONNECTION_ERRORS as cluster_error:
            self.annotate_exception(cluster_error, 0, "MULTI")
            raise

        # and all the other commands
        for i, command in enumerate(self._command_queue):
            if EMPTY_RESPONSE in command.options:
                errors.append((i, command.options[EMPTY_RESPONSE]))
            else:
                try:
                    _ = redis_node.parse_response(connection, "_")
                except self.SLOT_REDIRECT_ERRORS as slot_error:
                    self.annotate_exception(slot_error, i + 1, command.args)
                    errors.append(slot_error)
                except self.CONNECTION_ERRORS as cluster_error:
                    self.annotate_exception(cluster_error, i + 1, command.args)
                    raise
                except ResponseError as e:
                    self.annotate_exception(e, i + 1, command.args)
                    errors.append(e)

        response = None
        # parse the EXEC.
        try:
            response = redis_node.parse_response(connection, "EXEC")
        except ExecAbortError:
            # A queued command failed; surface the first queued-time error.
            if errors:
                raise errors[0]
            raise

        self._executing = False

        record_operation_duration(
            command_name="TRANSACTION",
            duration_seconds=time.monotonic() - start_time,
            server_address=connection.host,
            server_port=connection.port,
            db_namespace=str(connection.db),
        )

        # EXEC clears any watched keys
        self._watching = False

        if response is None:
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        # NOTE(review): `errors` mixes (index, value) tuples (EMPTY_RESPONSE
        # entries) with bare exceptions from parse failures; this unpacking
        # assumes tuples. Bare exceptions appear to be reachable here only if
        # EXEC succeeded despite queue errors — confirm that cannot happen.
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(self._command_queue):
            raise InvalidPipelineStack(
                "Unexpected response length for cluster pipeline EXEC."
                " Command stack was {} but response had length {}".format(
                    [c.args[0] for c in self._command_queue], len(response)
                )
            )

        # find any errors in the response and raise if necessary
        if raise_on_error or len(errors) > 0:
            self._raise_first_error(
                response,
                self._command_queue,
                start_time,
            )

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, self._command_queue):
            if not isinstance(r, Exception):
                command_name = cmd.args[0]
                if command_name in self._pipe.cluster_response_callbacks:
                    r = self._pipe.cluster_response_callbacks[command_name](
                        r, **cmd.options
                    )
            data.append(r)
        return data

4129 

    def reset(self):
        """Clear all transaction state and return the pinned connection.

        If keys are still WATCHed, an UNWATCH is issued directly on the
        connection before releasing it; on connection errors the connection
        is disconnected instead (which also drops server-side WATCHes).
        """
        self._command_queue = []

        # make sure to reset the connection state in the event that we were
        # watching something
        if self._transaction_connection:
            try:
                if self._watching:
                    # call this manually since our unwatch or
                    # immediate_execute_command methods can call reset()
                    self._transaction_connection.send_command("UNWATCH")
                    self._transaction_connection.read_response()
                # we can safely return the connection to the pool here since we're
                # sure we're no longer WATCHing anything
                node = self._nodes_manager.find_connection_owner(
                    self._transaction_connection
                )
                if node and node.redis_connection:
                    node.redis_connection.connection_pool.release(
                        self._transaction_connection
                    )
                self._transaction_connection = None
            except self.CONNECTION_ERRORS:
                # disconnect will also remove any previous WATCHes
                if self._transaction_connection:
                    self._transaction_connection.disconnect()
                    node = self._nodes_manager.find_connection_owner(
                        self._transaction_connection
                    )
                    if node and node.redis_connection:
                        node.redis_connection.connection_pool.release(
                            self._transaction_connection
                        )
                    self._transaction_connection = None

        # clean up the other instance attributes
        self._watching = False
        self._explicit_transaction = False
        self._pipeline_slots = set()
        self._executing = False

4170 

4171 def send_cluster_commands( 

4172 self, stack, raise_on_error=True, allow_redirections=True 

4173 ): 

4174 raise NotImplementedError( 

4175 "send_cluster_commands cannot be executed in transactional context." 

4176 ) 

4177 

4178 def multi(self): 

4179 if self._explicit_transaction: 

4180 raise RedisError("Cannot issue nested calls to MULTI") 

4181 if self._command_queue: 

4182 raise RedisError( 

4183 "Commands without an initial WATCH have already been issued" 

4184 ) 

4185 self._explicit_transaction = True 

4186 

4187 def watch(self, *names): 

4188 if self._explicit_transaction: 

4189 raise RedisError("Cannot issue a WATCH after a MULTI") 

4190 

4191 return self.execute_command("WATCH", *names) 

4192 

4193 def unwatch(self): 

4194 if self._watching: 

4195 return self.execute_command("UNWATCH") 

4196 

4197 return True 

4198 

4199 def discard(self): 

4200 self.reset() 

4201 

4202 def delete(self, *names): 

4203 return self.execute_command("DEL", *names) 

4204 

4205 def unlink(self, *names): 

4206 return self.execute_command("UNLINK", *names)