import random
import socket
import sys
import threading
import time
from abc import ABC, abstractmethod
from collections import OrderedDict
from copy import copy
from enum import Enum
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from redis._parsers import CommandsParser, Encoder
from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
from redis._parsers.helpers import parse_scan
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
from redis.commands import READ_COMMANDS, RedisClusterCommands
from redis.commands.helpers import list_or_args
from redis.commands.policies import PolicyResolver, StaticPolicyResolver
from redis.connection import (
    Connection,
    ConnectionPool,
    parse_url,
)
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    AskError,
    AuthenticationError,
    ClusterDownError,
    ClusterError,
    ConnectionError,
    CrossSlotTransactionError,
    DataError,
    ExecAbortError,
    InvalidPipelineStack,
    MaxConnectionsError,
    MovedError,
    RedisClusterException,
    RedisError,
    ResponseError,
    SlotNotCoveredError,
    TimeoutError,
    TryAgainError,
    WatchError,
)
from redis.lock import Lock
from redis.maint_notifications import MaintNotificationsConfig
from redis.retry import Retry
from redis.utils import (
    deprecated_args,
    dict_merge,
    list_keys_to_dict,
    merge_result,
    safe_str,
    str_if_bytes,
    truncate_text,
)


def get_node_name(host: str, port: Union[str, int]) -> str:
    return f"{host}:{port}"


@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    return redis_node.connection or redis_node.connection_pool.get_connection()


def parse_scan_result(command, res, **options):
    cursors = {}
    ret = []
    for node_name, response in res.items():
        cursor, r = parse_scan(response, **options)
        cursors[node_name] = cursor
        ret += r

    return cursors, ret
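
# Illustrative sketch (hypothetical node names and assumed raw reply shape):
# each node answers SCAN with [cursor, keys]; parse_scan_result keeps one
# cursor per node and concatenates the returned keys, e.g.:
#
#     res = {
#         "127.0.0.1:7000": [b"0", [b"k1", b"k2"]],
#         "127.0.0.1:7001": [b"42", [b"k3"]],
#     }
#     cursors, keys = parse_scan_result("SCAN", res)
#     # cursors == {"127.0.0.1:7000": 0, "127.0.0.1:7001": 42}
#     # keys == [b"k1", b"k2", b"k3"]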

def parse_pubsub_numsub(command, res, **options):
    numsub_d = OrderedDict()
    for numsub_tups in res.values():
        for channel, numsubbed in numsub_tups:
            try:
                numsub_d[channel] += numsubbed
            except KeyError:
                numsub_d[channel] = numsubbed

    ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
    return ret_numsub
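
# Illustrative sketch (assumed per-node reply shape): subscriber counts for the
# same channel reported by different nodes are summed, e.g.:
#
#     res = {"node1": [(b"ch1", 2), (b"ch2", 1)], "node2": [(b"ch1", 3)]}
#     parse_pubsub_numsub("PUBSUB NUMSUB", res)
#     # -> [(b"ch1", 5), (b"ch2", 1)]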

def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        return str_if_bytes(args[0]) or current_host, args[1]

    slots = {}
    for slot in resp:
        start, end, primary = slot[:3]
        replicas = slot[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }

    return slots
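
# Illustrative sketch (hypothetical addresses): each raw CLUSTER SLOTS entry is
# [start, end, primary, *replicas], where a server is [host, port, id, ...];
# only (host, port) is kept, e.g.:
#
#     resp = [[0, 5460, [b"10.0.0.1", 7000, b"id1"], [b"10.0.0.2", 7001, b"id2"]]]
#     parse_cluster_slots(resp)
#     # -> {(0, 5460): {"primary": ("10.0.0.1", 7000),
#     #                 "replicas": [("10.0.0.2", 7001)]}}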

def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.
    """
    if isinstance(resp[0], dict):
        return resp
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], (x[1][i + 1])))
        nodes = x[3]
        for node in nodes:
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards


def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.
    """
    return resp.decode("utf-8")


PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"

REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
)
KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    connection_kwargs = {
        k: v
        for k, v in kwargs.items()
        if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
    }

    return connection_kwargs
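
# Illustrative sketch: keys outside REDIS_ALLOWED_KEYS and keys listed in
# KWARGS_DISABLED_KEYS are both dropped before the kwargs reach the per-node
# connections, e.g.:
#
#     cleanup_kwargs(encoding="utf-8", host="10.0.0.1", foo="bar")
#     # -> {"encoding": "utf-8"}   ("host" is disabled, "foo" is not allowed)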

class AbstractRedisCluster:
    RedisClusterRequestTTL = 16

    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",

            "FT.PROFILE",

            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:

                # Otherwise, fall back to a replica if any other replicas exist

                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)


class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates an SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - `unix://` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.

        """
        return cls(url=url, **kwargs)
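
    # Illustrative usage (hypothetical address; a sketch, not part of the API
    # surface): a single URL is enough to bootstrap, and the rest of the
    # topology is discovered via CLUSTER SLOTS:
    #
    #     rc = RedisCluster.from_url("redis://10.0.0.1:7000/0")
    #     rc.set("foo", "bar")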

    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        **kwargs,
    ):
        """
        Initialize a new RedisCluster client.

        :param startup_nodes:
            List of nodes from which initial bootstrapping can be done
        :param host:
            Can be used to point to a startup node
        :param port:
            Can be used to point to a startup node

        :param require_full_coverage:
            When set to False: the client will not require full coverage of
            the slots. However, if not all slots are covered, and at least
            one node has 'cluster-require-full-coverage' set to 'yes', the
            server will throw a ClusterDownError for some key-based
            commands. See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True (default value): all slots must be covered to
            construct the cluster client. If not all slots are covered,
            RedisClusterException will be thrown.

        :param read_from_replicas:
            @deprecated - please use load_balancing_strategy instead
            Enable read from replicas in READONLY mode. You can read possibly
            stale data.
            When set to true, read commands will be distributed between the
            primary and its replicas in a round-robin manner.

        :param load_balancing_strategy:
            Enables reads from replicas in READONLY mode and defines the load
            balancing strategy that will be used for cluster node selection.
            The data read from replicas is eventually consistent with the data in primary nodes.
        :param dynamic_startup_nodes:
            Set the RedisCluster's startup nodes to all of the discovered nodes.
            If true (default value), the cluster's discovered nodes will be used to
            determine the cluster nodes-slots mapping in the next topology refresh.
            It will remove the initial passed startup nodes if their endpoints aren't
            listed in the CLUSTER SLOTS output.
            If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
            specific IP addresses, it is best to set it to false.

        :param cluster_error_retry_attempts:
            @deprecated - Please configure the 'retry' object instead
            In case a 'retry' object is set - this argument is ignored!

            Number of times to retry before raising an error when
            :class:`~.TimeoutError`, :class:`~.ConnectionError`,
            :class:`~.SlotNotCoveredError` or :class:`~.ClusterDownError`
            are encountered
        :param retry:
            A retry object that defines the retry strategy and the number of
            retries for the cluster client.
            In the current implementation of the cluster client (starting from
            redis-py version 6.0.0), the retry object is not yet fully utilized;
            it is used only to determine the number of retries for the cluster
            client. In future releases, the retry object will be used to handle
            the cluster client retries.

        :param reinitialize_steps:
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.
        :param address_remap:
            An optional callable which, when provided with an internal network
            address of a node, e.g. a `(host, port)` tuple, will return the address
            where the node is reachable. This can be used to map the addresses at
            which the nodes _think_ they are, to addresses at which a client may
            reach them, such as when they sit behind a proxy.


        :**kwargs:
            Extra arguments that will be sent to the Redis instance when created
            (See the official redis-py docs for supported kwargs - the only
            limitation is that you can't provide a 'retry' object as part of kwargs.
            [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
            Some kwargs are not supported and will raise a
            RedisClusterException:
            - db (Redis does not support database SELECT in cluster mode)
        """

        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # Argument 'retry' cannot be used in kwargs when in cluster mode:
            # the kwargs are passed to the lower-level connections to the
            # cluster nodes, and there we provide a retry configuration with
            # no retries allowed. Retries should be handled at the cluster
            # client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node(s)
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the following:\n"
                "1. host and port, for example:\n"
                "   RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "   RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )

        # Update the connection arguments
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run
        # If the user passed an on_connect function, we'll save it and run it
        # inside the RedisCluster.on_connect() function
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and protocol not in [3, "3"]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes
        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        # For backward compatibility, map the existing node flags to the new
        # request policies
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver
        self.commands_parser = CommandsParser(self)

        # Node where the FT.AGGREGATE command is executed.
        self._aggregate_nodes = None
        self._lock = threading.RLock()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def disconnect_connection_pools(self):
        for node in self.get_nodes():
            if node.redis_connection:
                try:
                    node.redis_connection.connection_pool.disconnect()
                except OSError:
                    # Client was already disconnected. Do nothing.
                    pass

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate, select a database, and send
        READONLY if it is set during object initialization.
        """
        connection.on_connect()

        if self.read_from_replicas or self.load_balancing_strategy:
            # Sending READONLY command to server to configure connection as
            # readonly. Since each cluster node may change its server type due
            # to a failover, we should establish a READONLY connection
            # regardless of the server type. If this is a primary connection,
            # READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

    def get_node(self, host=None, port=None, node_name=None):
        return self.nodes_manager.get_node(host, port, node_name)

    def get_primaries(self):
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

    def get_replicas(self):
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)

    def get_random_node(self):
        return random.choice(list(self.nodes_manager.nodes_cache.values()))

    def get_random_primary_or_all_nodes(self, command_name):
        """
        Return a random node (possibly a replica) if READONLY mode is enabled
        and the command is a read command; otherwise return a random primary.
        """
        if self.read_from_replicas and command_name in READ_COMMANDS:
            return self.get_random_node()

        return self.get_random_primary_node()


    def get_nodes(self):
        return list(self.nodes_manager.nodes_cache.values())

    def get_node_from_key(self, key, replica=False):
        """
        Get the node that holds the key's slot.
        If replica is set to True but the slot doesn't have any replicas, None
        is returned.
        """
        slot = self.keyslot(key)
        slot_cache = self.nodes_manager.slots_cache.get(slot)
        if slot_cache is None or len(slot_cache) == 0:
            raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
        if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        elif replica:
            node_idx = 1
        else:
            # primary
            node_idx = 0

        return slot_cache[node_idx]

    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node

    def get_nodes_from_slot(self, command: str, *args):
        """
        Returns a list of nodes that hold the specified keys' slot.
        """
        # get the node that holds the key's slot
        slot = self.determine_slot(*args)
        node = self.nodes_manager.get_node_from_slot(
            slot,
            self.read_from_replicas and command in READ_COMMANDS,
            self.load_balancing_strategy if command in READ_COMMANDS else None,
        )
        return [node]

    def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
        """
        Split a command with the Multi-Shard policy into multiple
        single-key commands.
        """
        keys = self._get_command_keys(*args)
        commands = []

        for key in keys:
            commands.append(
                {
                    "args": (args[0], key),
                    "kwargs": kwargs,
                }
            )

        return commands

    def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
        """
        Returns a list of nodes for commands with a special policy.
        """
        if not self._aggregate_nodes:
            raise RedisClusterException(
                "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
            )

        return self._aggregate_nodes

    def get_random_primary_node(self) -> "ClusterNode":
        """
        Returns a random primary node
        """
        return random.choice(self.get_primaries())

    def _evaluate_all_succeeded(self, res):
        """
        Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
        """
        first_successful_response = None

        if isinstance(res, dict):
            for key, value in res.items():
                if value:
                    if first_successful_response is None:
                        first_successful_response = {key: value}
                else:
                    return {key: False}
        else:
            for response in res:
                if response:
                    if first_successful_response is None:
                        # Dynamically resolve type
                        first_successful_response = type(response)(response)
                else:
                    return type(response)(False)

        return first_successful_response

    def set_default_node(self, node):
        """
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return True if the default node was set, else False
        """
        if node is None or self.get_node(node_name=node.name) is None:
            return False
        self.nodes_manager.default_node = node
        return True

    def set_retry(self, retry: Retry) -> None:
        self.retry = retry

    def monitor(self, target_node=None):
        """
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node was
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        next_command() method returns one command from monitor
        listen() method yields commands from monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()

    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host&port, to get a pubsub instance
        connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)

    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
            Pipelines do not work in cluster mode the same way they
            do in normal mode. Create a clone of this object so
            that simulating pipelines will work correctly. Each
            command will be called directly when used and
            when calling execute() will only return the result stack.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            read_from_replicas=self.read_from_replicas,
            load_balancing_strategy=self.load_balancing_strategy,
            reinitialize_steps=self.reinitialize_steps,
            retry=self.retry,
            lock=self._lock,
            transaction=transaction,
        )
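
    # Illustrative usage (a sketch; assumes rc is a connected RedisCluster):
    # commands are buffered on the ClusterPipeline clone and routed per slot
    # when execute() is called:
    #
    #     with rc.pipeline() as pipe:
    #         pipe.set("foo", 1)
    #         pipe.get("foo")
    #         results = pipe.execute()  # e.g. [True, b"1"]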


    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
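
    # Illustrative usage (a sketch; assumes rc is a connected RedisCluster):
    # the token lives at key "my-lock", so the lock is owned by that key's slot:
    #
    #     with rc.lock("my-lock", timeout=5, blocking_timeout=3):
    #         ...  # critical section, auto-released after 5 seconds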


    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.cluster_response_callbacks[command] = callback

    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        """
        Determine the nodes the command should be executed on.
        """
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)

        if command_flag in self._command_flags_mapping:
            request_policy = self._command_flags_mapping[command_flag]

        policy_callback = self._policies_callback_mapping[request_policy]

        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()

        if args[0].lower() == "ft.aggregate":
            self._aggregate_nodes = nodes

        return nodes

    def _should_reinitialized(self):
        # To reinitialize the cluster on every MOVED error,
        # set reinitialize_steps to 1.
        # To avoid reinitializing the cluster on moved errors, set
        # reinitialize_steps to 0.
        if self.reinitialize_steps == 0:
            return False
        else:
            return self.reinitialize_counter % self.reinitialize_steps == 0

    def keyslot(self, key):
        """
        Calculate keyslot for a given key.
        See Keys distribution model in https://redis.io/topics/cluster-spec
        """
        k = self.encoder.encode(key)
        return key_slot(k)
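
    # Illustrative sketch: hash tags pin keys to the same slot, which is what
    # makes multi-key commands routable (slot values assume the standard
    # CRC16 key-slot mapping):
    #
    #     rc.keyslot("{user:1000}.following") == rc.keyslot("{user:1000}.followers")
    #     # True: only "user:1000", the first {...} group, is hashed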


    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in it, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
         - issue: https://github.com/redis/redis/issues/9493
         - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)

    def determine_slot(self, *args) -> int:
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, that means the function
                # can be run on any node so we can just return a random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
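
    # Illustrative sketch (assumes rc is a connected RedisCluster): multi-key
    # commands are accepted only when every key hashes to one slot, e.g. via a
    # shared hash tag:
    #
    #     rc.determine_slot("MSET", "{u}:a", "1", "{u}:b", "2")  # one slot: ok
    #     rc.determine_slot("MSET", "a", "1", "b", "2")
    #     # raises RedisClusterException: keys map to different slots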


    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder

    def get_connection_kwargs(self):
        """
        Get the connections' key-word arguments
        """
        return self.nodes_manager.connection_kwargs

    def _is_nodes_flag(self, target_nodes):
        return isinstance(target_nodes, str) and target_nodes in self.node_flags

    def _parse_target_nodes(self, target_nodes):
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables executing commands on multiple nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def execute_command(self, *args, **kwargs):
        return self._internal_execute_command(*args, **kwargs)

    def _internal_execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the retries property of
        "self.retry", which defaults to 3 unless manually configured.

        If it reaches that number of times, the command will raise the
        exception.

        Key argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        command_policies = self._policy_resolver.resolve(args[0].lower())

        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True

        if not command_policies and not target_nodes_specified:
            command = args[0].upper()
            if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
                command = f"{args[0]} {args[1]}".upper()

            # We can only resolve key properties if the command is not in the
            # list of pre-defined request policies
            command_flag = self.command_flags.get(command)
            if not command_flag:
                # Fall back to the default policy
                if not self.get_default_node():
                    slot = None
                else:
                    slot = self.determine_slot(*args)
                if not slot:
                    command_policies = CommandPolicies()
                else:
                    command_policies = CommandPolicies(
                        request_policy=RequestPolicy.DEFAULT_KEYED,
                        response_policy=ResponsePolicy.DEFAULT_KEYED,
                    )
            else:
                if command_flag in self._command_flags_mapping:
                    command_policies = CommandPolicies(
                        request_policy=self._command_flags_mapping[command_flag]
                    )
                else:
                    command_policies = CommandPolicies()
        elif not command_policies and target_nodes_specified:
            command_policies = CommandPolicies()


        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args,
                        request_policy=command_policies.request_policy,
                        nodes_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)

                    if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                        break

                # Return the processed result
                return self._process_result(
                    args[0],
                    res,
                    response_policy=command_policies.response_policy,
                    **kwargs,
                )
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    continue
                else:
                    # raise the exception
                    raise e

    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove the 'keys' entry; it is only needed for caching.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )
                return response
            except AuthenticationError:
                raise
            except MaxConnectionsError:
                # MaxConnectionsError indicates client-side resource exhaustion
                # (too many connections in the pool), not a node failure.
                # Don't treat this as a node failure - just re-raise the error
                # without reinitializing the cluster.
                raise
            except (ConnectionError, TimeoutError) as e:
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # The 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                moved = True
            except TryAgainError:
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except (ClusterDownError, SlotNotCoveredError):
                # ClusterDownError can occur during a failover and to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command

                # SlotNotCoveredError can occur when the cluster is not fully
                # initialized or can be a temporary issue.
                # We will try to reinitialize the cluster topology
                # and retry executing the command

                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise
            except ResponseError:
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")


    def close(self) -> None:
        try:
            with self._lock:
                if self.nodes_manager:
                    self.nodes_manager.close()
        except AttributeError:
            # RedisCluster's __init__ can fail before nodes_manager is set
            pass

    def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
        """
        Process the result of the executed command.
        The function returns either a dict or a single value.

        :type command: str
        :type res: dict

        `res` should be in the following format:
            Dict<node_name, command_result>
        """
        if command in self.result_callbacks:
            res = self.result_callbacks[command](command, res, **kwargs)
        elif len(res) == 1:
            # When we execute the command on a single node, we can
            # remove the dictionary and return a single response
            res = list(res.values())[0]

        return self._policies_callback_mapping[response_policy](res)

    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        ``funcname`` - A string containing the name of the function to create
        ``func`` - The function being added to this class.
        """
        setattr(self, funcname, func)

    def transaction(self, func, *watches, **kwargs):
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue

class ClusterNode:
    def __init__(self, host, port, server_type=None, redis_connection=None):
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        try:
            if self.redis_connection is not None:
                self.redis_connection.close()
        except Exception:
            # Ignore errors when closing the connection
            pass

class LoadBalancingStrategy(Enum):
    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        server_index = self.primary_to_idx.setdefault(primary, self.start_index)
        if replicas_only and server_index == 0:
            # skip the primary node index
            server_index = 1
        # Update the index for the next round
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index
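
    # Illustrative sketch (hypothetical node name): for a slot list
    # [primary, replica1, replica2] (list_size=3), successive ROUND_ROBIN calls
    # cycle 0, 1, 2, 0, ... while ROUND_ROBIN_REPLICAS never returns index 0
    # (the primary):
    #
    #     lb = LoadBalancer()
    #     [lb.get_server_index("node-a:7000", 3) for _ in range(4)]
    #     # -> [0, 1, 2, 0]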



class NodesManager:
    def __init__(
        self,
        startup_nodes,
        from_url=False,
        require_full_coverage=False,
        lock=None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
        self.nodes_cache: Dict[str, Redis] = {}
        self.slots_cache = {}
        self.startup_nodes = {}
        self.default_node = None
        self.populate_startup_nodes(startup_nodes)
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        self._cache = cache
        self._cache_config = cache_config
        self._cache_factory = cache_factory
        self._moved_exception = None
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.initialize()

1655 def get_node(self, host=None, port=None, node_name=None): 

1656 """ 

1657 Get the requested node from the cluster's 

1658 nodes. 

1659 :return: ClusterNode if the node exists, else None 

1660 """ 

1661 if host and port: 

1662 # the user passed host and port 

1663 if host == "localhost": 

1664 host = socket.gethostbyname(host) 

1665 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1666 elif node_name: 

1667 return self.nodes_cache.get(node_name) 

1668 else: 

1669 return None 

1670 

1671 def update_moved_exception(self, exception): 

1672 self._moved_exception = exception 

1673 

1674 def _update_moved_slots(self): 

1675 """ 

1676 Update the slot's node with the redirected one 

1677 """ 

1678 e = self._moved_exception 

1679 redirected_node = self.get_node(host=e.host, port=e.port) 

1680 if redirected_node is not None: 

1681 # The node already exists 

1682 if redirected_node.server_type is not PRIMARY: 

1683 # Update the node's server type 

1684 redirected_node.server_type = PRIMARY 

1685 else: 

1686 # This is a new node, we will add it to the nodes cache 

1687 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

1688 self.nodes_cache[redirected_node.name] = redirected_node 

1689 if redirected_node in self.slots_cache[e.slot_id]: 

1690 # The MOVED error resulted from a failover, and the new slot owner 

1691 # had previously been a replica. 

1692 old_primary = self.slots_cache[e.slot_id][0] 

1693 # Update the old primary to be a replica and add it to the end of 

1694 # the slot's node list 

1695 old_primary.server_type = REPLICA 

1696 self.slots_cache[e.slot_id].append(old_primary) 

1697 # Remove the old replica, which is now a primary, from the slot's 

1698 # node list 

1699 self.slots_cache[e.slot_id].remove(redirected_node) 

1700 # Override the old primary with the new one 

1701 self.slots_cache[e.slot_id][0] = redirected_node 

1702 if self.default_node == old_primary: 

1703 # Update the default node with the new primary 

1704 self.default_node = redirected_node 

1705 else: 

1706 # The new slot owner is a new server, or a server from a different 

1707 # shard. We need to remove all current nodes from the slot's list 

1708 # (including replicas) and add just the new node. 

1709 self.slots_cache[e.slot_id] = [redirected_node] 

1710 # Reset moved_exception 

1711 self._moved_exception = None 

1712 

1713 @deprecated_args( 

1714 args_to_warn=["server_type"], 

1715 reason=( 

1716 "In case you need select some load balancing strategy " 

1717 "that will use replicas, please set it through 'load_balancing_strategy'" 

1718 ), 

1719 version="5.3.0", 

1720 ) 

1721 def get_node_from_slot( 

1722 self, 

1723 slot, 

1724 read_from_replicas=False, 

1725 load_balancing_strategy=None, 

1726 server_type=None, 

1727 ) -> ClusterNode: 

1728 """ 

1729 Gets a node that serves this hash slot 

1730 """ 

1731 if self._moved_exception: 

1732 with self._lock: 

1733 if self._moved_exception: 

1734 self._update_moved_slots() 

1735 

1736 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

1737 raise SlotNotCoveredError( 

1738 f'Slot "{slot}" not covered by the cluster. ' 

1739 f'"require_full_coverage={self._require_full_coverage}"' 

1740 ) 

1741 

1742 if read_from_replicas is True and load_balancing_strategy is None: 

1743 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN 

1744 

1745 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy: 

1746 # get the server index using the strategy defined in load_balancing_strategy 

1747 primary_name = self.slots_cache[slot][0].name 

1748 node_idx = self.read_load_balancer.get_server_index( 

1749 primary_name, len(self.slots_cache[slot]), load_balancing_strategy 

1750 ) 

1751 elif ( 

1752 server_type is None 

1753 or server_type == PRIMARY 

1754 or len(self.slots_cache[slot]) == 1 

1755 ): 

1756 # return a primary 

1757 node_idx = 0 

1758 else: 

1759 # return a replica 

1760 # randomly choose one of the replicas 

1761 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

1762 

1763 return self.slots_cache[slot][node_idx] 

1764 
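# Example sketch (not part of cluster.py; the address is an assumption):
# get_node_from_slot() is what routes read commands to replicas when a
# load balancing strategy is configured on the client.

from redis.cluster import RedisCluster, LoadBalancingStrategy

rc = RedisCluster(
    host="localhost",
    port=16379,
    load_balancing_strategy=LoadBalancingStrategy.RANDOM_REPLICA,
)
rc.set("foo", "bar")  # writes always go to the slot's primary
rc.get("foo")         # reads may be served by a random replica of the slot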

1765 def get_nodes_by_server_type(self, server_type): 

1766 """ 

1767 Get all nodes with the specified server type 

1768 :param server_type: 'primary' or 'replica' 

1769 :return: list of ClusterNode 

1770 """ 

1771 return [ 

1772 node 

1773 for node in self.nodes_cache.values() 

1774 if node.server_type == server_type 

1775 ] 

1776 
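# Example sketch (not part of cluster.py), reusing the rc client from the
# sketch above: the public node-listing helpers on RedisCluster
# (get_primaries(), get_replicas(), get_nodes()) build on this method and
# the nodes cache.

for node in rc.get_primaries():
    print(node.name, node.server_type)  # e.g. "127.0.0.1:16379 primary"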

1777 def populate_startup_nodes(self, nodes): 

1778 """ 

1779 Populate all startup nodes and filter out any duplicates 

1780 """ 

1781 for n in nodes: 

1782 self.startup_nodes[n.name] = n 

1783 

1784 def check_slots_coverage(self, slots_cache): 

1785 # Validate if all slots are covered or if we should try next 

1786 # startup node 

1787 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

1788 if i not in slots_cache: 

1789 return False 

1790 return True 

1791 

1792 def create_redis_connections(self, nodes): 

1793 """ 

1794 Create a Redis connection to each node in :nodes: 

1795 """ 

1796 connection_pools = [] 

1797 for node in nodes: 

1798 if node.redis_connection is None: 

1799 node.redis_connection = self.create_redis_node( 

1800 host=node.host, port=node.port, **self.connection_kwargs 

1801 ) 

1802 connection_pools.append(node.redis_connection.connection_pool) 

1803 

1804 self._event_dispatcher.dispatch( 

1805 AfterPooledConnectionsInstantiationEvent( 

1806 connection_pools, ClientType.SYNC, self._credential_provider 

1807 ) 

1808 ) 

1809 

1810 def create_redis_node(self, host, port, **kwargs): 

1811 # We are configuring the connection pool not to retry 

1812 # connections on lower level clients to avoid retrying 

1813 # connections to nodes that are not reachable 

1814 # and to avoid blocking the connection pool. 

1815 # The only error that will have some handling in the lower 

1816 # level clients is ConnectionError which will trigger disconnection 

1817 # of the socket. 

1818 # The retries will be handled on cluster client level 

1819 # where we will have proper handling of the cluster topology 

1820 node_retry_config = Retry( 

1821 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) 

1822 ) 

1823 

1824 protocol = kwargs.get("protocol", None) 

1825 if protocol in [3, "3"]: 

1826 kwargs.update( 

1827 {"maint_notifications_config": MaintNotificationsConfig(enabled=False)} 

1828 ) 

1829 if self.from_url: 

1830 # Create a redis node with a customized connection pool 

1831 kwargs.update({"host": host}) 

1832 kwargs.update({"port": port}) 

1833 kwargs.update({"cache": self._cache}) 

1834 kwargs.update({"retry": node_retry_config}) 

1835 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

1836 else: 

1837 r = Redis( 

1838 host=host, 

1839 port=port, 

1840 cache=self._cache, 

1841 retry=node_retry_config, 

1842 **kwargs, 

1843 ) 

1844 return r 

1845 

1846 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

1847 node_name = get_node_name(host, port) 

1848 # check if we already have this node in the tmp_nodes_cache 

1849 target_node = tmp_nodes_cache.get(node_name) 

1850 if target_node is None: 

1851 # before creating a new cluster node, check if the cluster node already 

1852 # exists in the current nodes cache and has a valid connection so we can 

1853 # reuse it 

1854 target_node = self.nodes_cache.get(node_name) 

1855 if target_node is None or target_node.redis_connection is None: 

1856 # create new cluster node for this cluster 

1857 target_node = ClusterNode(host, port, role) 

1858 if target_node.server_type != role: 

1859 target_node.server_type = role 

1860 # add this node to the nodes cache 

1861 tmp_nodes_cache[target_node.name] = target_node 

1862 

1863 return target_node 

1864 

1865 def initialize(self): 

1866 """ 

1867 Initializes the nodes cache, slots cache and redis connections. 

1868 :startup_nodes: 

1869 Responsible for discovering other nodes in the cluster 

1870 """ 

1871 self.reset() 

1872 tmp_nodes_cache = {} 

1873 tmp_slots = {} 

1874 disagreements = [] 

1875 startup_nodes_reachable = False 

1876 fully_covered = False 

1877 kwargs = self.connection_kwargs 

1878 exception = None 

1879 # Convert to tuple to prevent RuntimeError if self.startup_nodes 

1880 # is modified during iteration 

1881 for startup_node in tuple(self.startup_nodes.values()): 

1882 try: 

1883 if startup_node.redis_connection: 

1884 r = startup_node.redis_connection 

1885 else: 

1886 # Create a new Redis connection 

1887 r = self.create_redis_node( 

1888 startup_node.host, startup_node.port, **kwargs 

1889 ) 

1890 self.startup_nodes[startup_node.name].redis_connection = r 

1891 # Make sure cluster mode is enabled on this node 

1892 try: 

1893 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

1894 r.connection_pool.disconnect() 

1895 except ResponseError: 

1896 raise RedisClusterException( 

1897 "Cluster mode is not enabled on this node" 

1898 ) 

1899 startup_nodes_reachable = True 

1900 except Exception as e: 

1901 # Try the next startup node. 

1902 # The exception is saved and raised only if we have no more nodes. 

1903 exception = e 

1904 continue 

1905 

1906 # CLUSTER SLOTS command results in the following output: 

1907 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

1908 # where each node contains the following list: [IP, port, node_id] 

1909 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

1910 # primary node of the first slot section. 

1911 # If there's only one server in the cluster, its ``host`` is '' 

1912 # Fix it to the host in startup_nodes 

1913 if ( 

1914 len(cluster_slots) == 1 

1915 and len(cluster_slots[0][2][0]) == 0 

1916 and len(self.startup_nodes) == 1 

1917 ): 

1918 cluster_slots[0][2][0] = startup_node.host 

1919 

1920 for slot in cluster_slots: 

1921 primary_node = slot[2] 

1922 host = str_if_bytes(primary_node[0]) 

1923 if host == "": 

1924 host = startup_node.host 

1925 port = int(primary_node[1]) 

1926 host, port = self.remap_host_port(host, port) 

1927 

1928 nodes_for_slot = [] 

1929 

1930 target_node = self._get_or_create_cluster_node( 

1931 host, port, PRIMARY, tmp_nodes_cache 

1932 ) 

1933 nodes_for_slot.append(target_node) 

1934 

1935 replica_nodes = slot[3:] 

1936 for replica_node in replica_nodes: 

1937 host = str_if_bytes(replica_node[0]) 

1938 port = int(replica_node[1]) 

1939 host, port = self.remap_host_port(host, port) 

1940 target_replica_node = self._get_or_create_cluster_node( 

1941 host, port, REPLICA, tmp_nodes_cache 

1942 ) 

1943 nodes_for_slot.append(target_replica_node) 

1944 

1945 for i in range(int(slot[0]), int(slot[1]) + 1): 

1946 if i not in tmp_slots: 

1947 tmp_slots[i] = nodes_for_slot 

1948 else: 

1949 # Check whether two different nodes claim ownership of the 

1950 # same slot 

1951 tmp_slot = tmp_slots[i][0] 

1952 if tmp_slot.name != target_node.name: 

1953 disagreements.append( 

1954 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

1955 ) 

1956 

1957 if len(disagreements) > 5: 

1958 raise RedisClusterException( 

1959 f"startup_nodes could not agree on a valid " 

1960 f"slots cache: {', '.join(disagreements)}" 

1961 ) 

1962 

1963 fully_covered = self.check_slots_coverage(tmp_slots) 

1964 if fully_covered: 

1965 # Don't need to continue to the next startup node if all 

1966 # slots are covered 

1967 break 

1968 

1969 if not startup_nodes_reachable: 

1970 raise RedisClusterException( 

1971 f"Redis Cluster cannot be connected. Please provide at least " 

1972 f"one reachable node: {str(exception)}" 

1973 ) from exception 

1974 

1975 if self._cache is None and self._cache_config is not None: 

1976 if self._cache_factory is None: 

1977 self._cache = CacheFactory(self._cache_config).get_cache() 

1978 else: 

1979 self._cache = self._cache_factory.get_cache() 

1980 

1981 # Create Redis connections to all nodes 

1982 self.create_redis_connections(list(tmp_nodes_cache.values())) 

1983 

1984 # Check if the slots are not fully covered 

1985 if not fully_covered and self._require_full_coverage: 

1986 # Despite the requirement that the slots be covered, there 

1987 # isn't full coverage 

1988 raise RedisClusterException( 

1989 f"All slots are not covered after query all startup_nodes. " 

1990 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

1991 f"covered..." 

1992 ) 

1993 

1994 # Set the tmp variables to the real variables 

1995 self.nodes_cache = tmp_nodes_cache 

1996 self.slots_cache = tmp_slots 

1997 # Set the default node 

1998 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

1999 if self._dynamic_startup_nodes: 

2000 # Populate the startup nodes with all discovered nodes 

2001 self.startup_nodes = tmp_nodes_cache 

2002 # If initialize was called after a MovedError, clear it 

2003 self._moved_exception = None 

2004 

2005 def close(self) -> None: 

2006 self.default_node = None 

2007 for node in self.nodes_cache.values(): 

2008 if node.redis_connection: 

2009 node.redis_connection.close() 

2010 

2011 def reset(self): 

2012 try: 

2013 self.read_load_balancer.reset() 

2014 except TypeError: 

2015 # The read_load_balancer is None, do nothing 

2016 pass 

2017 

2018 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

2019 """ 

2020 Remap the host and port returned from the cluster to a different 

2021 internal value. Useful if the client is not connecting directly 

2022 to the cluster. 

2023 """ 

2024 if self.address_remap: 

2025 return self.address_remap((host, port)) 

2026 return host, port 

2027 
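# Example sketch (not part of cluster.py): address_remap lets the client
# reach a cluster through forwarded ports (e.g. Docker or SSH tunnels).
# The port mapping below is an assumption for illustration.

PORT_FORWARDS = {16379: 7000, 16380: 7001, 16381: 7002}

def remap(address):
    host, port = address
    return "127.0.0.1", PORT_FORWARDS.get(port, port)

rc = RedisCluster(host="127.0.0.1", port=7000, address_remap=remap)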

2028 def find_connection_owner(self, connection: Connection) -> Optional[Redis]: 

2029 node_name = get_node_name(connection.host, connection.port) 

2030 for node in tuple(self.nodes_cache.values()): 

2031 if node.redis_connection: 

2032 conn_args = node.redis_connection.connection_pool.connection_kwargs 

2033 if node_name == get_node_name( 

2034 conn_args.get("host"), conn_args.get("port") 

2035 ): 

2036 return node 

2037 

2038 

2039class ClusterPubSub(PubSub): 

2040 """ 

2041 Wrapper for PubSub class. 

2042 

2043 IMPORTANT: before using ClusterPubSub, read about the known limitations 

2044 with pubsub in Cluster mode and learn how to work around them: 

2045 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html 

2046 """ 

2047 

2048 def __init__( 

2049 self, 

2050 redis_cluster, 

2051 node=None, 

2052 host=None, 

2053 port=None, 

2054 push_handler_func=None, 

2055 event_dispatcher: Optional["EventDispatcher"] = None, 

2056 **kwargs, 

2057 ): 

2058 """ 

2059 When a pubsub instance is created without specifying a node, a single 

2060 node will be transparently chosen for the pubsub connection on the 

2061 first command execution. The node will be determined by: 

2062 1. Hashing the channel name in the request to find its keyslot 

2063 2. Selecting a node that handles the keyslot: If read_from_replicas is 

2064 set to true or load_balancing_strategy is set, a replica can be selected. 

2065 

2066 :type redis_cluster: RedisCluster 

2067 :type node: ClusterNode 

2068 :type host: str 

2069 :type port: int 

2070 """ 

2071 self.node = None 

2072 self.set_pubsub_node(redis_cluster, node, host, port) 

2073 connection_pool = ( 

2074 None 

2075 if self.node is None 

2076 else redis_cluster.get_redis_connection(self.node).connection_pool 

2077 ) 

2078 self.cluster = redis_cluster 

2079 self.node_pubsub_mapping = {} 

2080 self._pubsubs_generator = self._pubsubs_generator() 

2081 if event_dispatcher is None: 

2082 self._event_dispatcher = EventDispatcher() 

2083 else: 

2084 self._event_dispatcher = event_dispatcher 

2085 super().__init__( 

2086 connection_pool=connection_pool, 

2087 encoder=redis_cluster.encoder, 

2088 push_handler_func=push_handler_func, 

2089 event_dispatcher=self._event_dispatcher, 

2090 **kwargs, 

2091 ) 

2092 

2093 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

2094 """ 

2095 The pubsub node will be set according to the passed node, host and port 

2096 When none of the node, host, or port are specified - the node is set 

2097 to None and will be determined by the keyslot of the channel in the 

2098 first command to be executed. 

2099 RedisClusterException will be thrown if the passed node does not exist 

2100 in the cluster. 

2101 If host is passed without port, or vice versa, a DataError will be 

2102 thrown. 

2103 :type cluster: RedisCluster 

2104 :type node: ClusterNode 

2105 :type host: str 

2106 :type port: int 

2107 """ 

2108 if node is not None: 

2109 # node is passed by the user 

2110 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

2111 pubsub_node = node 

2112 elif host is not None and port is not None: 

2113 # host and port passed by the user 

2114 node = cluster.get_node(host=host, port=port) 

2115 self._raise_on_invalid_node(cluster, node, host, port) 

2116 pubsub_node = node 

2117 elif any([host, port]) is True: 

2118 # only 'host' or 'port' passed 

2119 raise DataError("Passing a host requires passing a port, and vice versa") 

2120 else: 

2121 # nothing passed by the user. set node to None 

2122 pubsub_node = None 

2123 

2124 self.node = pubsub_node 

2125 
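# Example sketch (not part of cluster.py; the address is an assumption),
# reusing an rc client as in the earlier sketches: pinning the pubsub
# connection to a known node instead of letting the first command's
# channel keyslot pick one.

ps = rc.pubsub(host="127.0.0.1", port=7000)  # or rc.pubsub(node=some_node)
ps.subscribe("news")
message = ps.get_message(timeout=1.0)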

2126 def get_pubsub_node(self): 

2127 """ 

2128 Get the node that is being used as the pubsub connection 

2129 """ 

2130 return self.node 

2131 

2132 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

2133 """ 

2134 Raise a RedisClusterException if the node is None or doesn't exist in 

2135 the cluster. 

2136 """ 

2137 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

2138 raise RedisClusterException( 

2139 f"Node {host}:{port} doesn't exist in the cluster" 

2140 ) 

2141 

2142 def execute_command(self, *args): 

2143 """ 

2144 Execute a subscribe/unsubscribe command. 

2145 

2146 Code taken from redis-py and tweaked to work within a cluster. 

2147 """ 

2148 # NOTE: don't parse the response in this function -- it could pull a 

2149 # legitimate message off the stack if the connection is already 

2150 # subscribed to one or more channels 

2151 

2152 if self.connection is None: 

2153 if self.connection_pool is None: 

2154 if len(args) > 1: 

2155 # Hash the first channel and get one of the nodes holding 

2156 # this slot 

2157 channel = args[1] 

2158 slot = self.cluster.keyslot(channel) 

2159 node = self.cluster.nodes_manager.get_node_from_slot( 

2160 slot, 

2161 self.cluster.read_from_replicas, 

2162 self.cluster.load_balancing_strategy, 

2163 ) 

2164 else: 

2165 # Get a random node 

2166 node = self.cluster.get_random_node() 

2167 self.node = node 

2168 redis_connection = self.cluster.get_redis_connection(node) 

2169 self.connection_pool = redis_connection.connection_pool 

2170 self.connection = self.connection_pool.get_connection() 

2171 # register a callback that re-subscribes to any channels we 

2172 # were listening to when we were disconnected 

2173 self.connection.register_connect_callback(self.on_connect) 

2174 if self.push_handler_func is not None: 

2175 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

2176 self._event_dispatcher.dispatch( 

2177 AfterPubSubConnectionInstantiationEvent( 

2178 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

2179 ) 

2180 ) 

2181 connection = self.connection 

2182 self._execute(connection, connection.send_command, *args) 

2183 

2184 def _get_node_pubsub(self, node): 

2185 try: 

2186 return self.node_pubsub_mapping[node.name] 

2187 except KeyError: 

2188 pubsub = node.redis_connection.pubsub( 

2189 push_handler_func=self.push_handler_func 

2190 ) 

2191 self.node_pubsub_mapping[node.name] = pubsub 

2192 return pubsub 

2193 

2194 def _sharded_message_generator(self): 

2195 for _ in range(len(self.node_pubsub_mapping)): 

2196 pubsub = next(self._pubsubs_generator) 

2197 message = pubsub.get_message() 

2198 if message is not None: 

2199 return message 

2200 return None 

2201 

2202 def _pubsubs_generator(self): 

2203 while True: 

2204 yield from self.node_pubsub_mapping.values() 

2205 

2206 def get_sharded_message( 

2207 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None 

2208 ): 

2209 if target_node: 

2210 message = self.node_pubsub_mapping[target_node.name].get_message( 

2211 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout 

2212 ) 

2213 else: 

2214 message = self._sharded_message_generator() 

2215 if message is None: 

2216 return None 

2217 elif str_if_bytes(message["type"]) == "sunsubscribe": 

2218 if message["channel"] in self.pending_unsubscribe_shard_channels: 

2219 self.pending_unsubscribe_shard_channels.remove(message["channel"]) 

2220 self.shard_channels.pop(message["channel"], None) 

2221 node = self.cluster.get_node_from_key(message["channel"]) 

2222 if self.node_pubsub_mapping[node.name].subscribed is False: 

2223 self.node_pubsub_mapping.pop(node.name) 

2224 if not self.channels and not self.patterns and not self.shard_channels: 

2225 # There are no subscriptions anymore, set subscribed_event flag 

2226 # to false 

2227 self.subscribed_event.clear() 

2228 if self.ignore_subscribe_messages or ignore_subscribe_messages: 

2229 return None 

2230 return message 

2231 

2232 def ssubscribe(self, *args, **kwargs): 

2233 if args: 

2234 args = list_or_args(args[0], args[1:]) 

2235 s_channels = dict.fromkeys(args) 

2236 s_channels.update(kwargs) 

2237 for s_channel, handler in s_channels.items(): 

2238 node = self.cluster.get_node_from_key(s_channel) 

2239 pubsub = self._get_node_pubsub(node) 

2240 if handler: 

2241 pubsub.ssubscribe(**{s_channel: handler}) 

2242 else: 

2243 pubsub.ssubscribe(s_channel) 

2244 self.shard_channels.update(pubsub.shard_channels) 

2245 self.pending_unsubscribe_shard_channels.difference_update( 

2246 self._normalize_keys({s_channel: None}) 

2247 ) 

2248 if pubsub.subscribed and not self.subscribed: 

2249 self.subscribed_event.set() 

2250 self.health_check_response_counter = 0 

2251 

2252 def sunsubscribe(self, *args): 

2253 if args: 

2254 args = list_or_args(args[0], args[1:]) 

2255 else: 

2256 args = self.shard_channels 

2257 

2258 for s_channel in args: 

2259 node = self.cluster.get_node_from_key(s_channel) 

2260 p = self._get_node_pubsub(node) 

2261 p.sunsubscribe(s_channel) 

2262 self.pending_unsubscribe_shard_channels.update( 

2263 p.pending_unsubscribe_shard_channels 

2264 ) 

2265 
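# Example sketch (not part of cluster.py), reusing rc from earlier:
# sharded pub/sub. ssubscribe() opens one PubSub per node that owns a
# channel's slot, and get_sharded_message() round-robins over them.

ps = rc.pubsub()
ps.ssubscribe("orders")
rc.spublish("orders", "hello")  # SPUBLISH routes by the channel's keyslot
msg = None
while msg is None:  # poll until the published message arrives
    msg = ps.get_sharded_message(ignore_subscribe_messages=True)
print(msg["data"])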

2266 def get_redis_connection(self): 

2267 """ 

2268 Get the Redis connection of the pubsub connected node. 

2269 """ 

2270 if self.node is not None: 

2271 return self.node.redis_connection 

2272 

2273 def disconnect(self): 

2274 """ 

2275 Disconnect the pubsub connection. 

2276 """ 

2277 if self.connection: 

2278 self.connection.disconnect() 

2279 for pubsub in self.node_pubsub_mapping.values(): 

2280 pubsub.connection.disconnect() 

2281 

2282 

2283class ClusterPipeline(RedisCluster): 

2284 """ 

2285 Support for Redis pipeline 

2286 in cluster mode 

2287 """ 

2288 

2289 ERRORS_ALLOW_RETRY = ( 

2290 ConnectionError, 

2291 TimeoutError, 

2292 MovedError, 

2293 AskError, 

2294 TryAgainError, 

2295 ) 

2296 

2297 NO_SLOTS_COMMANDS = {"UNWATCH"} 

2298 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

2299 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

2300 

2301 @deprecated_args( 

2302 args_to_warn=[ 

2303 "cluster_error_retry_attempts", 

2304 ], 

2305 reason="Please configure the 'retry' object instead", 

2306 version="6.0.0", 

2307 ) 

2308 def __init__( 

2309 self, 

2310 nodes_manager: "NodesManager", 

2311 commands_parser: "CommandsParser", 

2312 result_callbacks: Optional[Dict[str, Callable]] = None, 

2313 cluster_response_callbacks: Optional[Dict[str, Callable]] = None, 

2314 startup_nodes: Optional[List["ClusterNode"]] = None, 

2315 read_from_replicas: bool = False, 

2316 load_balancing_strategy: Optional[LoadBalancingStrategy] = None, 

2317 cluster_error_retry_attempts: int = 3, 

2318 reinitialize_steps: int = 5, 

2319 retry: Optional[Retry] = None, 

2320 lock=None, 

2321 transaction=False, 

2322 policy_resolver: PolicyResolver = StaticPolicyResolver(), 

2323 **kwargs, 

2324 ): 

2325 """ """ 

2326 self.command_stack = [] 

2327 self.nodes_manager = nodes_manager 

2328 self.commands_parser = commands_parser 

2329 self.refresh_table_asap = False 

2330 self.result_callbacks = ( 

2331 result_callbacks or self.__class__.RESULT_CALLBACKS.copy() 

2332 ) 

2333 self.startup_nodes = startup_nodes if startup_nodes else [] 

2334 self.read_from_replicas = read_from_replicas 

2335 self.load_balancing_strategy = load_balancing_strategy 

2336 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

2337 self.cluster_response_callbacks = cluster_response_callbacks 

2338 self.reinitialize_counter = 0 

2339 self.reinitialize_steps = reinitialize_steps 

2340 if retry is not None: 

2341 self.retry = retry 

2342 else: 

2343 self.retry = Retry( 

2344 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

2345 retries=cluster_error_retry_attempts, 

2346 ) 

2347 

2348 self.encoder = Encoder( 

2349 kwargs.get("encoding", "utf-8"), 

2350 kwargs.get("encoding_errors", "strict"), 

2351 kwargs.get("decode_responses", False), 

2352 ) 

2353 if lock is None: 

2354 lock = threading.RLock() 

2355 self._lock = lock 

2356 self.parent_execute_command = super().execute_command 

2357 self._execution_strategy: ExecutionStrategy = ( 

2358 PipelineStrategy(self) if not transaction else TransactionStrategy(self) 

2359 ) 

2360 

2361 # For backward compatibility, mapping from existing policies to new one 

2362 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = { 

2363 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS, 

2364 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS, 

2365 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES, 

2366 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS, 

2367 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE, 

2368 SLOT_ID: RequestPolicy.DEFAULT_KEYED, 

2369 } 

2370 

2371 self._policies_callback_mapping: dict[ 

2372 Union[RequestPolicy, ResponsePolicy], Callable 

2373 ] = { 

2374 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [ 

2375 self.get_random_primary_or_all_nodes(command_name) 

2376 ], 

2377 RequestPolicy.DEFAULT_KEYED: lambda command, 

2378 *args: self.get_nodes_from_slot(command, *args), 

2379 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()], 

2380 RequestPolicy.ALL_SHARDS: self.get_primaries, 

2381 RequestPolicy.ALL_NODES: self.get_nodes, 

2382 RequestPolicy.ALL_REPLICAS: self.get_replicas, 

2383 RequestPolicy.MULTI_SHARD: lambda *args, 

2384 **kwargs: self._split_multi_shard_command(*args, **kwargs), 

2385 RequestPolicy.SPECIAL: self.get_special_nodes, 

2386 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res, 

2387 ResponsePolicy.DEFAULT_KEYED: lambda res: res, 

2388 } 

2389 

2390 self._policy_resolver = policy_resolver 

2391 

2392 def __repr__(self): 

2393 """ """ 

2394 return f"{type(self).__name__}" 

2395 

2396 def __enter__(self): 

2397 """ """ 

2398 return self 

2399 

2400 def __exit__(self, exc_type, exc_value, traceback): 

2401 """ """ 

2402 self.reset() 

2403 

2404 def __del__(self): 

2405 try: 

2406 self.reset() 

2407 except Exception: 

2408 pass 

2409 

2410 def __len__(self): 

2411 """ """ 

2412 return len(self._execution_strategy.command_queue) 

2413 

2414 def __bool__(self): 

2415 "Pipeline instances should always evaluate to True on Python 3+" 

2416 return True 

2417 

2418 def execute_command(self, *args, **kwargs): 

2419 """ 

2420 Wrapper function for pipeline_execute_command 

2421 """ 

2422 return self._execution_strategy.execute_command(*args, **kwargs) 

2423 

2424 def pipeline_execute_command(self, *args, **options): 

2425 """ 

2426 Stage a command to be executed when execute() is next called 

2427 

2428 Returns the current Pipeline object so commands can be 

2429 chained together, such as: 

2430 

2431 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

2432 

2433 At some other point, you can then run: pipe.execute(), 

2434 which will execute all commands queued in the pipe. 

2435 """ 

2436 return self._execution_strategy.execute_command(*args, **options) 

2437 

2438 def annotate_exception(self, exception, number, command): 

2439 """ 

2440 Provides extra context to the exception prior to it being handled 

2441 """ 

2442 self._execution_strategy.annotate_exception(exception, number, command) 

2443 

2444 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2445 """ 

2446 Execute all the commands in the current pipeline 

2447 """ 

2448 

2449 try: 

2450 return self._execution_strategy.execute(raise_on_error) 

2451 finally: 

2452 self.reset() 

2453 

2454 def reset(self): 

2455 """ 

2456 Reset back to empty pipeline. 

2457 """ 

2458 self._execution_strategy.reset() 

2459 

2460 def send_cluster_commands( 

2461 self, stack, raise_on_error=True, allow_redirections=True 

2462 ): 

2463 return self._execution_strategy.send_cluster_commands( 

2464 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections 

2465 ) 

2466 

2467 def exists(self, *keys): 

2468 return self._execution_strategy.exists(*keys) 

2469 

2470 def eval(self): 

2471 """ """ 

2472 return self._execution_strategy.eval() 

2473 

2474 def multi(self): 

2475 """ 

2476 Start a transactional block of the pipeline after WATCH commands 

2477 are issued. End the transactional block with `execute`. 

2478 """ 

2479 self._execution_strategy.multi() 

2480 

2481 def load_scripts(self): 

2482 """ """ 

2483 self._execution_strategy.load_scripts() 

2484 

2485 def discard(self): 

2486 """ """ 

2487 self._execution_strategy.discard() 

2488 

2489 def watch(self, *names): 

2490 """Watches the values at keys ``names``""" 

2491 self._execution_strategy.watch(*names) 

2492 

2493 def unwatch(self): 

2494 """Unwatches all previously specified keys""" 

2495 self._execution_strategy.unwatch() 

2496 

2497 def script_load_for_pipeline(self, *args, **kwargs): 

2498 self._execution_strategy.script_load_for_pipeline(*args, **kwargs) 

2499 

2500 def delete(self, *names): 

2501 self._execution_strategy.delete(*names) 

2502 

2503 def unlink(self, *names): 

2504 self._execution_strategy.unlink(*names) 

2505 

2506 

2507def block_pipeline_command(name: str) -> Callable[..., Any]: 

2508 """ 

2509 Raises an error because some pipelined commands must 

2510 be blocked when running in cluster mode 

2511 """ 

2512 

2513 def inner(*args, **kwargs): 

2514 raise RedisClusterException( 

2515 f"ERROR: Calling pipelined function {name} is blocked " 

2516 f"when running redis in cluster mode..." 

2517 ) 

2518 

2519 return inner 

2520 

2521 

2522# Blocked pipeline commands 

2523PIPELINE_BLOCKED_COMMANDS = ( 

2524 "BGREWRITEAOF", 

2525 "BGSAVE", 

2526 "BITOP", 

2527 "BRPOPLPUSH", 

2528 "CLIENT GETNAME", 

2529 "CLIENT KILL", 

2530 "CLIENT LIST", 

2531 "CLIENT SETNAME", 

2532 "CLIENT", 

2533 "CONFIG GET", 

2534 "CONFIG RESETSTAT", 

2535 "CONFIG REWRITE", 

2536 "CONFIG SET", 

2537 "CONFIG", 

2538 "DBSIZE", 

2539 "ECHO", 

2540 "EVALSHA", 

2541 "FLUSHALL", 

2542 "FLUSHDB", 

2543 "INFO", 

2544 "KEYS", 

2545 "LASTSAVE", 

2546 "MGET", 

2547 "MGET NONATOMIC", 

2548 "MOVE", 

2549 "MSET", 

2550 "MSETEX", 

2551 "MSET NONATOMIC", 

2552 "MSETNX", 

2553 "PFCOUNT", 

2554 "PFMERGE", 

2555 "PING", 

2556 "PUBLISH", 

2557 "RANDOMKEY", 

2558 "READONLY", 

2559 "READWRITE", 

2560 "RENAME", 

2561 "RENAMENX", 

2562 "RPOPLPUSH", 

2563 "SAVE", 

2564 "SCAN", 

2565 "SCRIPT EXISTS", 

2566 "SCRIPT FLUSH", 

2567 "SCRIPT KILL", 

2568 "SCRIPT LOAD", 

2569 "SCRIPT", 

2570 "SDIFF", 

2571 "SDIFFSTORE", 

2572 "SENTINEL GET MASTER ADDR BY NAME", 

2573 "SENTINEL MASTER", 

2574 "SENTINEL MASTERS", 

2575 "SENTINEL MONITOR", 

2576 "SENTINEL REMOVE", 

2577 "SENTINEL SENTINELS", 

2578 "SENTINEL SET", 

2579 "SENTINEL SLAVES", 

2580 "SENTINEL", 

2581 "SHUTDOWN", 

2582 "SINTER", 

2583 "SINTERSTORE", 

2584 "SLAVEOF", 

2585 "SLOWLOG GET", 

2586 "SLOWLOG LEN", 

2587 "SLOWLOG RESET", 

2588 "SLOWLOG", 

2589 "SMOVE", 

2590 "SORT", 

2591 "SUNION", 

2592 "SUNIONSTORE", 

2593 "TIME", 

2594) 

2595for command in PIPELINE_BLOCKED_COMMANDS: 

2596 command = command.replace(" ", "_").lower() 

2597 

2598 setattr(ClusterPipeline, command, block_pipeline_command(command)) 

2599 

2600 
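# Example sketch (not part of cluster.py), reusing rc from earlier: any
# command in PIPELINE_BLOCKED_COMMANDS raises immediately when called on
# a cluster pipeline, before anything is queued.

from redis.exceptions import RedisClusterException

pipe = rc.pipeline()
try:
    pipe.keys("*")  # KEYS is in PIPELINE_BLOCKED_COMMANDS
except RedisClusterException as exc:
    print(exc)  # ERROR: Calling pipelined function keys is blocked ...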

2601class PipelineCommand: 

2602 """ """ 

2603 

2604 def __init__(self, args, options=None, position=None): 

2605 self.args = args 

2606 if options is None: 

2607 options = {} 

2608 self.options = options 

2609 self.position = position 

2610 self.result = None 

2611 self.node = None 

2612 self.asking = False 

2613 self.command_policies: Optional[CommandPolicies] = None 

2614 

2615 

2616class NodeCommands: 

2617 """ """ 

2618 

2619 def __init__(self, parse_response, connection_pool, connection): 

2620 """ """ 

2621 self.parse_response = parse_response 

2622 self.connection_pool = connection_pool 

2623 self.connection = connection 

2624 self.commands = [] 

2625 

2626 def append(self, c): 

2627 """ """ 

2628 self.commands.append(c) 

2629 

2630 def write(self): 

2631 """ 

2632 Code borrowed from redis-py so it can be fixed for cluster use 

2633 """ 

2634 connection = self.connection 

2635 commands = self.commands 

2636 

2637 # We are going to clobber the commands with the write, so go ahead 

2638 # and ensure that nothing is sitting there from a previous run. 

2639 for c in commands: 

2640 c.result = None 

2641 

2642 # build up all commands into a single request to increase network perf 

2643 # send all the commands and catch connection and timeout errors. 

2644 try: 

2645 connection.send_packed_command( 

2646 connection.pack_commands([c.args for c in commands]) 

2647 ) 

2648 except (ConnectionError, TimeoutError) as e: 

2649 for c in commands: 

2650 c.result = e 

2651 

2652 def read(self): 

2653 """ """ 

2654 connection = self.connection 

2655 for c in self.commands: 

2656 # if there is a result on this command, 

2657 # it means we ran into an exception 

2658 # like a connection error. Trying to parse 

2659 # a response on a connection that 

2660 # is no longer open will result in a 

2661 # connection error raised by redis-py. 

2662 # but redis-py doesn't check in parse_response 

2663 # that the sock object is 

2664 # still set and if you try to 

2665 # read from a closed connection, it will 

2666 # result in an AttributeError because 

2667 # it will do a readline() call on None. 

2668 # This can have all kinds of nasty side-effects. 

2669 # Treating this case as a connection error 

2670 # is fine because it will dump 

2671 # the connection object back into the 

2672 # pool and on the next write, it will 

2673 # explicitly open the connection and all will be well. 

2674 if c.result is None: 

2675 try: 

2676 c.result = self.parse_response(connection, c.args[0], **c.options) 

2677 except (ConnectionError, TimeoutError) as e: 

2678 for c in self.commands: 

2679 c.result = e 

2680 return 

2681 except RedisError: 

2682 c.result = sys.exc_info()[1] 

2683 

2684 
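# The batching pattern above, in miniature (editor's sketch, not part of
# cluster.py): pack every queued command for a node into one socket write,
# then parse the replies in order. run_node_batch, conn, and parse are
# hypothetical names standing in for a node's Connection and its
# Redis.parse_response.

def run_node_batch(conn, parse, commands):
    conn.send_packed_command(conn.pack_commands([c.args for c in commands]))
    for c in commands:
        c.result = parse(conn, c.args[0], **c.options)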

2685class ExecutionStrategy(ABC): 

2686 @property 

2687 @abstractmethod 

2688 def command_queue(self): 

2689 pass 

2690 

2691 @abstractmethod 

2692 def execute_command(self, *args, **kwargs): 

2693 """ 

2694 Execution flow for current execution strategy. 

2695 

2696 See: ClusterPipeline.execute_command() 

2697 """ 

2698 pass 

2699 

2700 @abstractmethod 

2701 def annotate_exception(self, exception, number, command): 

2702 """ 

2703 Annotate exception according to current execution strategy. 

2704 

2705 See: ClusterPipeline.annotate_exception() 

2706 """ 

2707 pass 

2708 

2709 @abstractmethod 

2710 def pipeline_execute_command(self, *args, **options): 

2711 """ 

2712 Pipeline execution flow for current execution strategy. 

2713 

2714 See: ClusterPipeline.pipeline_execute_command() 

2715 """ 

2716 pass 

2717 

2718 @abstractmethod 

2719 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2720 """ 

2721 Executes current execution strategy. 

2722 

2723 See: ClusterPipeline.execute() 

2724 """ 

2725 pass 

2726 

2727 @abstractmethod 

2728 def send_cluster_commands( 

2729 self, stack, raise_on_error=True, allow_redirections=True 

2730 ): 

2731 """ 

2732 Sends commands according to current execution strategy. 

2733 

2734 See: ClusterPipeline.send_cluster_commands() 

2735 """ 

2736 pass 

2737 

2738 @abstractmethod 

2739 def reset(self): 

2740 """ 

2741 Resets current execution strategy. 

2742 

2743 See: ClusterPipeline.reset() 

2744 """ 

2745 pass 

2746 

2747 @abstractmethod 

2748 def exists(self, *keys): 

2749 pass 

2750 

2751 @abstractmethod 

2752 def eval(self): 

2753 pass 

2754 

2755 @abstractmethod 

2756 def multi(self): 

2757 """ 

2758 Starts transactional context. 

2759 

2760 See: ClusterPipeline.multi() 

2761 """ 

2762 pass 

2763 

2764 @abstractmethod 

2765 def load_scripts(self): 

2766 pass 

2767 

2768 @abstractmethod 

2769 def watch(self, *names): 

2770 pass 

2771 

2772 @abstractmethod 

2773 def unwatch(self): 

2774 """ 

2775 Unwatches all previously specified keys 

2776 

2777 See: ClusterPipeline.unwatch() 

2778 """ 

2779 pass 

2780 

2781 @abstractmethod 

2782 def script_load_for_pipeline(self, *args, **kwargs): 

2783 pass 

2784 

2785 @abstractmethod 

2786 def delete(self, *names): 

2787 """ 

2788 "Delete a key specified by ``names``" 

2789 

2790 See: ClusterPipeline.delete() 

2791 """ 

2792 pass 

2793 

2794 @abstractmethod 

2795 def unlink(self, *names): 

2796 """ 

2797 "Unlink a key specified by ``names``" 

2798 

2799 See: ClusterPipeline.unlink() 

2800 """ 

2801 pass 

2802 

2803 @abstractmethod 

2804 def discard(self): 

2805 pass 

2806 

2807 

2808class AbstractStrategy(ExecutionStrategy): 

2809 def __init__( 

2810 self, 

2811 pipe: ClusterPipeline, 

2812 ): 

2813 self._command_queue: List[PipelineCommand] = [] 

2814 self._pipe = pipe 

2815 self._nodes_manager = self._pipe.nodes_manager 

2816 

2817 @property 

2818 def command_queue(self): 

2819 return self._command_queue 

2820 

2821 @command_queue.setter 

2822 def command_queue(self, queue: List[PipelineCommand]): 

2823 self._command_queue = queue 

2824 

2825 @abstractmethod 

2826 def execute_command(self, *args, **kwargs): 

2827 pass 

2828 

2829 def pipeline_execute_command(self, *args, **options): 

2830 self._command_queue.append( 

2831 PipelineCommand(args, options, len(self._command_queue)) 

2832 ) 

2833 return self._pipe 

2834 

2835 @abstractmethod 

2836 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2837 pass 

2838 

2839 @abstractmethod 

2840 def send_cluster_commands( 

2841 self, stack, raise_on_error=True, allow_redirections=True 

2842 ): 

2843 pass 

2844 

2845 @abstractmethod 

2846 def reset(self): 

2847 pass 

2848 

2849 def exists(self, *keys): 

2850 return self.execute_command("EXISTS", *keys) 

2851 

2852 def eval(self): 

2853 """ """ 

2854 raise RedisClusterException("method eval() is not implemented") 

2855 

2856 def load_scripts(self): 

2857 """ """ 

2858 raise RedisClusterException("method load_scripts() is not implemented") 

2859 

2860 def script_load_for_pipeline(self, *args, **kwargs): 

2861 """ """ 

2862 raise RedisClusterException( 

2863 "method script_load_for_pipeline() is not implemented" 

2864 ) 

2865 

2866 def annotate_exception(self, exception, number, command): 

2867 """ 

2868 Provides extra context to the exception prior to it being handled 

2869 """ 

2870 cmd = " ".join(map(safe_str, command)) 

2871 msg = ( 

2872 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

2873 f"caused error: {exception.args[0]}" 

2874 ) 

2875 exception.args = (msg,) + exception.args[1:] 

2876 

2877 

2878class PipelineStrategy(AbstractStrategy): 

2879 def __init__(self, pipe: ClusterPipeline): 

2880 super().__init__(pipe) 

2881 self.command_flags = pipe.command_flags 

2882 

2883 def execute_command(self, *args, **kwargs): 

2884 return self.pipeline_execute_command(*args, **kwargs) 

2885 

2886 def _raise_first_error(self, stack): 

2887 """ 

2888 Raise the first exception on the stack 

2889 """ 

2890 for c in stack: 

2891 r = c.result 

2892 if isinstance(r, Exception): 

2893 self.annotate_exception(r, c.position + 1, c.args) 

2894 raise r 

2895 

2896 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2897 stack = self._command_queue 

2898 if not stack: 

2899 return [] 

2900 

2901 try: 

2902 return self.send_cluster_commands(stack, raise_on_error) 

2903 finally: 

2904 self.reset() 

2905 

2906 def reset(self): 

2907 """ 

2908 Reset back to empty pipeline. 

2909 """ 

2910 self._command_queue = [] 

2911 

2912 def send_cluster_commands( 

2913 self, stack, raise_on_error=True, allow_redirections=True 

2914 ): 

2915 """ 

2916 Wrapper for handling RedisCluster.ERRORS_ALLOW_RETRY errors. 

2917 

2918 If one of the retryable exceptions has been thrown we assume that: 

2919 - connection_pool was disconnected 

2920 - connection_pool was reset 

2921 - refresh_table_asap set to True 

2922 

2923 It will retry the number of times specified by 

2924 the retries attribute of the "self.retry" config option, 

2925 which defaults to 3 unless manually configured. 

2926 

2927 Once the retries are exhausted, the last 

2928 error encountered is raised. 

2929 """ 

2930 if not stack: 

2931 return [] 

2932 retry_attempts = self._pipe.retry.get_retries() 

2933 while True: 

2934 try: 

2935 return self._send_cluster_commands( 

2936 stack, 

2937 raise_on_error=raise_on_error, 

2938 allow_redirections=allow_redirections, 

2939 ) 

2940 except RedisCluster.ERRORS_ALLOW_RETRY as e: 

2941 if retry_attempts > 0: 

2942 # Try again with the new cluster setup. All other errors 

2943 # should be raised. 

2944 retry_attempts -= 1 

2945 pass 

2946 else: 

2947 raise e 

2948 
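# Example sketch (not part of cluster.py): ClusterPipeline takes its Retry
# from the constructor, and when created via RedisCluster.pipeline() it
# typically inherits the client's retry settings (an assumption here), so
# the number of topology-refresh retries used by send_cluster_commands()
# is configured on the client.

from redis.backoff import ExponentialWithJitterBackoff
from redis.retry import Retry

rc = RedisCluster(
    host="localhost",
    port=16379,
    retry=Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=5),
)
with rc.pipeline() as pipe:
    pipe.set("{k}:a", 1).get("{k}:a")  # commands chain; execute() sends them
    results = pipe.execute()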

2949 def _send_cluster_commands( 

2950 self, stack, raise_on_error=True, allow_redirections=True 

2951 ): 

2952 """ 

2953 Send a bunch of cluster commands to the redis cluster. 

2954 

2955 `allow_redirections` If the pipeline should follow 

2956 `ASK` & `MOVED` responses automatically. If set 

2957 to false it will raise RedisClusterException. 

2958 """ 

2959 # the first time sending the commands we send all of 

2960 # the commands that were queued up. 

2961 # if we have to run through it again, we only retry 

2962 # the commands that failed. 

2963 attempt = sorted(stack, key=lambda x: x.position) 

2964 is_default_node = False 

2965 # build a list of node objects based on the node names we need to route to 

2966 nodes = {} 

2967 

2968 # as we move through each command that still needs to be processed, 

2969 # we figure out the slot number that command maps to, then from 

2970 # the slot determine the node. 

2971 for c in attempt: 

2972 command_policies = self._pipe._policy_resolver.resolve(c.args[0].lower()) 

2973 

2974 while True: 

2975 # refer to our internal node -> slot table that 

2976 # tells us where a given command should route to. 

2977 # (it might be possible we have a cached node that no longer 

2978 # exists in the cluster, which is why we do this in a loop) 

2979 passed_targets = c.options.pop("target_nodes", None) 

2980 if passed_targets and not self._is_nodes_flag(passed_targets): 

2981 target_nodes = self._parse_target_nodes(passed_targets) 

2982 

2983 if not command_policies: 

2984 command_policies = CommandPolicies() 

2985 else: 

2986 if not command_policies: 

2987 command = c.args[0].upper() 

2988 if ( 

2989 len(c.args) >= 2 

2990 and f"{c.args[0]} {c.args[1]}".upper() 

2991 in self._pipe.command_flags 

2992 ): 

2993 command = f"{c.args[0]} {c.args[1]}".upper() 

2994 

2995 # We can only resolve key properties if the command is not 

2996 # in a list of pre-defined request policies 

2997 command_flag = self.command_flags.get(command) 

2998 if not command_flag: 

2999 # Fallback to default policy 

3000 if not self._pipe.get_default_node(): 

3001 keys = None 

3002 else: 

3003 keys = self._pipe._get_command_keys(*c.args) 

3004 if not keys or len(keys) == 0: 

3005 command_policies = CommandPolicies() 

3006 else: 

3007 command_policies = CommandPolicies( 

3008 request_policy=RequestPolicy.DEFAULT_KEYED, 

3009 response_policy=ResponsePolicy.DEFAULT_KEYED, 

3010 ) 

3011 else: 

3012 if command_flag in self._pipe._command_flags_mapping: 

3013 command_policies = CommandPolicies( 

3014 request_policy=self._pipe._command_flags_mapping[ 

3015 command_flag 

3016 ] 

3017 ) 

3018 else: 

3019 command_policies = CommandPolicies() 

3020 

3021 target_nodes = self._determine_nodes( 

3022 *c.args, 

3023 request_policy=command_policies.request_policy, 

3024 node_flag=passed_targets, 

3025 ) 

3026 if not target_nodes: 

3027 raise RedisClusterException( 

3028 f"No targets were found to execute {c.args} command on" 

3029 ) 

3030 c.command_policies = command_policies 

3031 if len(target_nodes) > 1: 

3032 raise RedisClusterException( 

3033 f"Too many targets for command {c.args}" 

3034 ) 

3035 

3036 node = target_nodes[0] 

3037 if node == self._pipe.get_default_node(): 

3038 is_default_node = True 

3039 

3040 # now that we know the name of the node 

3041 # ( it's just a string in the form of host:port ) 

3042 # we can build a list of commands for each node. 

3043 node_name = node.name 

3044 if node_name not in nodes: 

3045 redis_node = self._pipe.get_redis_connection(node) 

3046 try: 

3047 connection = get_connection(redis_node) 

3048 except (ConnectionError, TimeoutError): 

3049 for n in nodes.values(): 

3050 n.connection_pool.release(n.connection) 

3051 # Connection retries are being handled in the node's 

3052 # Retry object. Reinitialize the node -> slot table. 

3053 self._nodes_manager.initialize() 

3054 if is_default_node: 

3055 self._pipe.replace_default_node() 

3056 raise 

3057 nodes[node_name] = NodeCommands( 

3058 redis_node.parse_response, 

3059 redis_node.connection_pool, 

3060 connection, 

3061 ) 

3062 nodes[node_name].append(c) 

3063 break 

3064 

3065 # send the commands in sequence. 

3066 # we write to all the open sockets for each node first, 

3067 # before reading anything 

3068 # this allows us to flush all the requests out across the 

3069 # network 

3070 # so that we can read them from different sockets as they come back. 

3071 # we don't multiplex on the sockets as they become available, 

3072 # but that shouldn't make too much difference. 

3073 try: 

3074 node_commands = nodes.values() 

3075 for n in node_commands: 

3076 n.write() 

3077 

3078 for n in node_commands: 

3079 n.read() 

3080 finally: 

3081 # release all of the redis connections we allocated earlier 

3082 # back into the connection pool. 

3083 # we used to do this step as part of a try/finally block, 

3084 # but it is really dangerous to 

3085 # release connections back into the pool if for some 

3086 # reason the socket has data still left in it 

3087 # from a previous operation. The write and 

3088 # read operations already have try/catch around them for 

3089 # all known types of errors including connection 

3090 # and socket level errors. 

3091 # So if we hit an exception, something really bad 

3092 # happened and putting any of 

3093 # these connections back into the pool is a very bad idea. 

3094 # the socket might have unread buffer still sitting in it, 

3095 # and then the next time we read from it we pass the 

3096 # buffered result back from a previous command and 

3097 # every single request after to that connection will always get 

3098 # a mismatched result. 

3099 for n in nodes.values(): 

3100 n.connection_pool.release(n.connection) 

3101 

3102 # if the response isn't an exception it is a 

3103 # valid response from the node 

3104 # we're all done with that command, YAY! 

3105 # if we have more commands to attempt, we've run into problems. 

3106 # collect all the commands we are allowed to retry. 

3107 # (MOVED, ASK, or connection errors or timeout errors) 

3108 attempt = sorted( 

3109 ( 

3110 c 

3111 for c in attempt 

3112 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

3113 ), 

3114 key=lambda x: x.position, 

3115 ) 

3116 if attempt and allow_redirections: 

3117 # RETRY MAGIC HAPPENS HERE! 

3118 # send these remaining commands one at a time using `execute_command` 

3119 # in the main client. This keeps our retry logic 

3120 # in one place mostly, 

3121 # and allows us to be more confident in correctness of behavior. 

3122 # at this point any speed gains from pipelining have been lost 

3123 # anyway, so we might as well make the best 

3124 # attempt to get the correct behavior. 

3125 # 

3126 # The client command will handle retries for each 

3127 # individual command sequentially as we pass each 

3128 # one into `execute_command`. Any exceptions 

3129 # that bubble out should only appear once all 

3130 # retries have been exhausted. 

3131 # 

3132 # If a lot of commands have failed, we'll be setting the 

3133 # flag to rebuild the slots table from scratch. 

3134 # So MOVED errors should correct themselves fairly quickly. 

3135 self._pipe.reinitialize_counter += 1 

3136 if self._pipe._should_reinitialized(): 

3137 self._nodes_manager.initialize() 

3138 if is_default_node: 

3139 self._pipe.replace_default_node() 

3140 for c in attempt: 

3141 try: 

3142 # send each command individually like we 

3143 # do in the main client. 

3144 c.result = self._pipe.parent_execute_command(*c.args, **c.options) 

3145 except RedisError as e: 

3146 c.result = e 

3147 

3148 # turn the response back into a simple flat array that corresponds 

3149 # to the sequence of commands issued in the stack in pipeline.execute() 

3150 response = [] 

3151 for c in sorted(stack, key=lambda x: x.position): 

3152 if c.args[0] in self._pipe.cluster_response_callbacks: 

3153 # Remove keys entry, it needs only for cache. 

3154 c.options.pop("keys", None) 

3155 c.result = self._pipe._policies_callback_mapping[ 

3156 c.command_policies.response_policy 

3157 ]( 

3158 self._pipe.cluster_response_callbacks[c.args[0]]( 

3159 c.result, **c.options 

3160 ) 

3161 ) 

3162 response.append(c.result) 

3163 

3164 if raise_on_error: 

3165 self._raise_first_error(stack) 

3166 

3167 return response 

3168 

3169 def _is_nodes_flag(self, target_nodes): 

3170 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags 

3171 

3172 def _parse_target_nodes(self, target_nodes): 

3173 if isinstance(target_nodes, list): 

3174 nodes = target_nodes 

3175 elif isinstance(target_nodes, ClusterNode): 

3176 # Supports passing a single ClusterNode as a variable 

3177 nodes = [target_nodes] 

3178 elif isinstance(target_nodes, dict): 

3179 # Supports dictionaries of the format {node_name: node}. 

3180 # It enables executing commands on multiple nodes as follows: 

3181 # rc.cluster_save_config(rc.get_primaries()) 

3182 nodes = target_nodes.values() 

3183 else: 

3184 raise TypeError( 

3185 "target_nodes type can be one of the following: " 

3186 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

3187 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

3188 f"The passed type is {type(target_nodes)}" 

3189 ) 

3190 return nodes 

3191 

3192 def _determine_nodes( 

3193 self, *args, request_policy: RequestPolicy, **kwargs 

3194 ) -> List["ClusterNode"]: 

3195 # Determine which nodes the command should be executed on. 

3196 # Returns a list of target nodes. 

3197 command = args[0].upper() 

3198 if ( 

3199 len(args) >= 2 

3200 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags 

3201 ): 

3202 command = f"{args[0]} {args[1]}".upper() 

3203 

3204 nodes_flag = kwargs.pop("nodes_flag", None) 

3205 if nodes_flag is not None: 

3206 # nodes flag passed by the user 

3207 command_flag = nodes_flag 

3208 else: 

3209 # get the nodes group for this command if it was predefined 

3210 command_flag = self._pipe.command_flags.get(command) 

3211 

3212 if command_flag in self._pipe._command_flags_mapping: 

3213 request_policy = self._pipe._command_flags_mapping[command_flag] 

3214 

3215 policy_callback = self._pipe._policies_callback_mapping[request_policy] 

3216 

3217 if request_policy == RequestPolicy.DEFAULT_KEYED: 

3218 nodes = policy_callback(command, *args) 

3219 elif request_policy == RequestPolicy.MULTI_SHARD: 

3220 nodes = policy_callback(*args, **kwargs) 

3221 elif request_policy == RequestPolicy.DEFAULT_KEYLESS: 

3222 nodes = policy_callback(args[0]) 

3223 else: 

3224 nodes = policy_callback() 

3225 

3226 if args[0].lower() == "ft.aggregate": 

3227 self._aggregate_nodes = nodes 

3228 

3229 return nodes 

3230 

3231 def multi(self): 

3232 raise RedisClusterException( 

3233 "method multi() is not supported outside of transactional context" 

3234 ) 

3235 

3236 def discard(self): 

3237 raise RedisClusterException( 

3238 "method discard() is not supported outside of transactional context" 

3239 ) 

3240 

3241 def watch(self, *names): 

3242 raise RedisClusterException( 

3243 "method watch() is not supported outside of transactional context" 

3244 ) 

3245 

3246 def unwatch(self, *names): 

3247 raise RedisClusterException( 

3248 "method unwatch() is not supported outside of transactional context" 

3249 ) 

3250 

3251 def delete(self, *names): 

3252 if len(names) != 1: 

3253 raise RedisClusterException( 

3254 "deleting multiple keys is not implemented in pipeline command" 

3255 ) 

3256 

3257 return self.execute_command("DEL", names[0]) 

3258 

3259 def unlink(self, *names): 

3260 if len(names) != 1: 

3261 raise RedisClusterException( 

3262 "unlinking multiple keys is not implemented in pipeline command" 

3263 ) 

3264 

3265 return self.execute_command("UNLINK", names[0]) 

3266 

3267 

3268class TransactionStrategy(AbstractStrategy): 

3269 NO_SLOTS_COMMANDS = {"UNWATCH"} 

3270 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

3271 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

3272 SLOT_REDIRECT_ERRORS = (AskError, MovedError) 

3273 CONNECTION_ERRORS = ( 

3274 ConnectionError, 

3275 OSError, 

3276 ClusterDownError, 

3277 SlotNotCoveredError, 

3278 ) 

3279 

3280 def __init__(self, pipe: ClusterPipeline): 

3281 super().__init__(pipe) 

3282 self._explicit_transaction = False 

3283 self._watching = False 

3284 self._pipeline_slots: Set[int] = set() 

3285 self._transaction_connection: Optional[Connection] = None 

3286 self._executing = False 

3287 self._retry = copy(self._pipe.retry) 

3288 self._retry.update_supported_errors( 

3289 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS 

3290 ) 

3291 

3292 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]: 

3293 """ 

3294 Find a connection for a pipeline transaction. 

3295 

3296 When running an atomic transaction, watched keys guarantee that their contents

3297 have not been altered, as long as the WATCH commands for those keys were sent

3298 over the same connection. So once we start watching a key, we fetch a

3299 connection to the node that owns that slot and reuse it.

3300 """ 

3301 if not self._pipeline_slots: 

3302 raise RedisClusterException( 

3303 "At least a command with a key is needed to identify a node" 

3304 ) 

3305 

3306 node: ClusterNode = self._nodes_manager.get_node_from_slot( 

3307 list(self._pipeline_slots)[0], False 

3308 ) 

3309 redis_node: Redis = self._pipe.get_redis_connection(node) 

3310 if self._transaction_connection: 

3311 if not redis_node.connection_pool.owns_connection( 

3312 self._transaction_connection 

3313 ): 

3314 previous_node = self._nodes_manager.find_connection_owner( 

3315 self._transaction_connection 

3316 ) 

3317 previous_node.connection_pool.release(self._transaction_connection) 

3318 self._transaction_connection = None 

3319 

3320 if not self._transaction_connection: 

3321 self._transaction_connection = get_connection(redis_node) 

3322 

3323 return redis_node, self._transaction_connection 

3324 
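
    # A minimal usage sketch of the behavior documented above (the cluster
    # address and key names are placeholders; assumes this build accepts
    # pipeline(transaction=True), as the TransactionStrategy implies): the
    # first WATCH pins the transaction to one connection on the node owning
    # the watched slot, and that connection is reused through EXEC.
    def _example_watch_pins_connection():  # illustrative only
        from redis.cluster import RedisCluster

        rc = RedisCluster(host="localhost", port=7000)
        pipe = rc.pipeline(transaction=True)
        pipe.watch("{user:1}:balance")  # immediate; pins a connection
        balance = int(pipe.get("{user:1}:balance") or 0)  # still immediate
        pipe.multi()  # start buffering
        pipe.set("{user:1}:balance", balance + 1)
        pipe.execute()  # MULTI ... EXEC on the pinned connection
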

3325 def execute_command(self, *args, **kwargs): 

3326 slot_number: Optional[int] = None 

3327 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS: 

3328 slot_number = self._pipe.determine_slot(*args) 

3329 

3330 if ( 

3331 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS 

3332 ) and not self._explicit_transaction: 

3333 if args[0] == "WATCH": 

3334 self._validate_watch() 

3335 

3336 if slot_number is not None: 

3337 if self._pipeline_slots and slot_number not in self._pipeline_slots: 

3338 raise CrossSlotTransactionError( 

3339 "Cannot watch or send commands on different slots" 

3340 ) 

3341 

3342 self._pipeline_slots.add(slot_number) 

3343 elif args[0] not in self.NO_SLOTS_COMMANDS: 

3344 raise RedisClusterException( 

3345 f"Cannot identify slot number for command: {args[0]}," 

3346 "it cannot be triggered in a transaction" 

3347 ) 

3348 

3349 return self._immediate_execute_command(*args, **kwargs) 

3350 else: 

3351 if slot_number is not None: 

3352 self._pipeline_slots.add(slot_number) 

3353 

3354 return self.pipeline_execute_command(*args, **kwargs) 

3355 
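
    # A minimal sketch of the slot bookkeeping above (key names are
    # placeholders): every key touched in one cluster transaction must hash
    # to the same slot, which a shared hash tag such as {order:42} guarantees.
    def _example_same_slot_keys(pipe):  # illustrative only; pipe as in the sketch above
        pipe.multi()
        pipe.set("{order:42}:status", "paid")  # {order:42} fixes the slot
        pipe.incr("{order:42}:version")  # same tag -> same slot: allowed
        # pipe.set("{order:43}:status", "x")  # a different slot would raise
        #                                     # CrossSlotTransactionError
        pipe.execute()
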

3356 def _validate_watch(self): 

3357 if self._explicit_transaction: 

3358 raise RedisError("Cannot issue a WATCH after a MULTI") 

3359 

3360 self._watching = True 

3361 

3362 def _immediate_execute_command(self, *args, **options): 

3363 return self._retry.call_with_retry( 

3364 lambda: self._get_connection_and_send_command(*args, **options), 

3365 self._reinitialize_on_error, 

3366 ) 

3367 

3368 def _get_connection_and_send_command(self, *args, **options): 

3369 redis_node, connection = self._get_client_and_connection_for_transaction() 

3370 return self._send_command_parse_response( 

3371 connection, redis_node, args[0], *args, **options 

3372 ) 

3373 

3374 def _send_command_parse_response( 

3375 self, conn, redis_node: Redis, command_name, *args, **options 

3376 ): 

3377 """ 

3378 Send a command and parse the response 

3379 """ 

3380 

3381 conn.send_command(*args) 

3382 output = redis_node.parse_response(conn, command_name, **options) 

3383 

3384 if command_name in self.UNWATCH_COMMANDS: 

3385 self._watching = False 

3386 return output 

3387 

3388 def _reinitialize_on_error(self, error): 

3389 if self._watching: 

3390 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: 

3391 raise WatchError("Slot rebalancing occurred while watching keys") 

3392 

3393 if ( 

3394 type(error) in self.SLOT_REDIRECT_ERRORS 

3395 or type(error) in self.CONNECTION_ERRORS 

3396 ): 

3397 if self._transaction_connection: 

3398 self._transaction_connection = None 

3399 

3400 self._pipe.reinitialize_counter += 1 

3401 if self._pipe._should_reinitialized(): 

3402 self._nodes_manager.initialize() 

3403 self._pipe.reinitialize_counter = 0  # reset the shared counter incremented above

3404 else: 

3405 if isinstance(error, AskError): 

3406 self._nodes_manager.update_moved_exception(error) 

3407 

3408 self._executing = False 

3409 
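
    # A minimal sketch of the caller-side pattern this method enables (names
    # are placeholders; "rc" is a RedisCluster client as in the sketches
    # above): a slot rebalance while watching surfaces as WatchError, and the
    # standard optimistic-locking response is simply to retry.
    def _example_retry_on_watch_error(rc):  # illustrative only
        from redis.exceptions import WatchError

        while True:
            pipe = rc.pipeline(transaction=True)
            try:
                pipe.watch("{cart:7}:total")
                total = int(pipe.get("{cart:7}:total") or 0)
                pipe.multi()
                pipe.set("{cart:7}:total", total + 1)
                pipe.execute()
                return total + 1  # success
            except WatchError:
                continue  # watched key or its slot changed; retry
            finally:
                pipe.reset()  # always release the pinned connection
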

3410 def _raise_first_error(self, responses, stack): 

3411 """ 

3412 Raise the first exception on the stack 

3413 """ 

3414 for r, cmd in zip(responses, stack): 

3415 if isinstance(r, Exception): 

3416 self.annotate_exception(r, cmd.position + 1, cmd.args) 

3417 raise r 

3418 

3419 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3420 stack = self._command_queue 

3421 if not stack and (not self._watching or not self._pipeline_slots): 

3422 return [] 

3423 

3424 return self._execute_transaction_with_retries(stack, raise_on_error) 

3425 

3426 def _execute_transaction_with_retries( 

3427 self, stack: List["PipelineCommand"], raise_on_error: bool 

3428 ): 

3429 return self._retry.call_with_retry( 

3430 lambda: self._execute_transaction(stack, raise_on_error), 

3431 self._reinitialize_on_error, 

3432 ) 

3433 

3434 def _execute_transaction( 

3435 self, stack: List["PipelineCommand"], raise_on_error: bool 

3436 ): 

3437 if len(self._pipeline_slots) > 1: 

3438 raise CrossSlotTransactionError( 

3439 "All keys involved in a cluster transaction must map to the same slot" 

3440 ) 

3441 

3442 self._executing = True 

3443 

3444 redis_node, connection = self._get_client_and_connection_for_transaction() 

3445 

3446 stack = chain( 

3447 [PipelineCommand(("MULTI",))], 

3448 stack, 

3449 [PipelineCommand(("EXEC",))], 

3450 ) 

3451 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options] 

3452 packed_commands = connection.pack_commands(commands) 

3453 connection.send_packed_command(packed_commands) 

3454 errors = [] 

3455 

3456 # parse off the response for MULTI 

3457 # NOTE: we need to handle ResponseErrors here and continue 

3458 # so that we read all the additional command messages from 

3459 # the socket 

3460 try: 

3461 redis_node.parse_response(connection, "MULTI") 

3462 except ResponseError as e: 

3463 self.annotate_exception(e, 0, "MULTI") 

3464 errors.append(e) 

3465 except self.CONNECTION_ERRORS as cluster_error: 

3466 self.annotate_exception(cluster_error, 0, "MULTI") 

3467 raise 

3468 

3469 # and all the other commands 

3470 for i, command in enumerate(self._command_queue): 

3471 if EMPTY_RESPONSE in command.options: 

3472 errors.append((i, command.options[EMPTY_RESPONSE])) 

3473 else: 

3474 try: 

3475 _ = redis_node.parse_response(connection, "_") 

3476 except self.SLOT_REDIRECT_ERRORS as slot_error: 

3477 self.annotate_exception(slot_error, i + 1, command.args) 

3478 errors.append(slot_error) 

3479 except self.CONNECTION_ERRORS as cluster_error: 

3480 self.annotate_exception(cluster_error, i + 1, command.args) 

3481 raise 

3482 except ResponseError as e: 

3483 self.annotate_exception(e, i + 1, command.args) 

3484 errors.append(e) 

3485 

3486 response = None 

3487 # parse the EXEC. 

3488 try: 

3489 response = redis_node.parse_response(connection, "EXEC") 

3490 except ExecAbortError: 

3491 if errors: 

3492 raise errors[0] 

3493 raise 

3494 

3495 self._executing = False 

3496 

3497 # EXEC clears any watched keys 

3498 self._watching = False 

3499 

3500 if response is None: 

3501 raise WatchError("Watched variable changed.") 

3502 

3503 # put the EMPTY_RESPONSE placeholders back into the response; only the (index, value) tuples reach this point, since queuing errors abort EXEC and are raised above

3504 for i, e in errors: 

3505 response.insert(i, e) 

3506 

3507 if len(response) != len(self._command_queue): 

3508 raise InvalidPipelineStack( 

3509 "Unexpected response length for cluster pipeline EXEC." 

3510 " Command stack was {} but response had length {}".format( 

3511 [c.args[0] for c in self._command_queue], len(response) 

3512 ) 

3513 ) 

3514 

3515 # find any errors in the response and raise if necessary 

3516 if raise_on_error or len(errors) > 0: 

3517 self._raise_first_error( 

3518 response, 

3519 self._command_queue, 

3520 ) 

3521 

3522 # We have to run response callbacks manually 

3523 data = [] 

3524 for r, cmd in zip(response, self._command_queue): 

3525 if not isinstance(r, Exception): 

3526 command_name = cmd.args[0] 

3527 if command_name in self._pipe.cluster_response_callbacks: 

3528 r = self._pipe.cluster_response_callbacks[command_name]( 

3529 r, **cmd.options 

3530 ) 

3531 data.append(r) 

3532 return data 

3533 
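
    # The wire exchange produced by the method above, sketched with
    # illustrative commands and replies:
    #
    #   client: MULTI              server: +OK
    #   client: SET {k}:a 1        server: +QUEUED
    #   client: INCR {k}:b         server: +QUEUED
    #   client: EXEC               server: *2 -> [OK, 1]
    #
    # A nil reply to EXEC means a watched key changed, which the code above
    # surfaces as WatchError.
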

3534 def reset(self): 

3535 self._command_queue = [] 

3536 

3537 # make sure to reset the connection state in the event that we were 

3538 # watching something 

3539 if self._transaction_connection: 

3540 try: 

3541 if self._watching: 

3542 # call this manually since our unwatch or 

3543 # immediate_execute_command methods can call reset() 

3544 self._transaction_connection.send_command("UNWATCH") 

3545 self._transaction_connection.read_response() 

3546 # we can safely return the connection to the pool here since we're 

3547 # sure we're no longer WATCHing anything 

3548 node = self._nodes_manager.find_connection_owner( 

3549 self._transaction_connection 

3550 ) 

3551 node.redis_connection.connection_pool.release( 

3552 self._transaction_connection 

3553 ) 

3554 self._transaction_connection = None 

3555 except self.CONNECTION_ERRORS: 

3556 # disconnect will also remove any previous WATCHes 

3557 if self._transaction_connection: 

3558 self._transaction_connection.disconnect() 

3559 

3560 # clean up the other instance attributes 

3561 self._watching = False 

3562 self._explicit_transaction = False 

3563 self._pipeline_slots = set() 

3564 self._executing = False 

3565 
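
    # A minimal sketch of the intended caller contract for reset() (assumes
    # the cluster pipeline supports the context-manager protocol like the
    # single-node pipeline; names are placeholders): on exit the pipeline is
    # reset, so any WATCH is cleared and the pinned connection goes back to
    # its pool even if the block raises.
    def _example_context_manager(rc):  # illustrative only
        with rc.pipeline(transaction=True) as pipe:
            pipe.watch("{job:9}:state")
            pipe.multi()
            pipe.set("{job:9}:state", "done")
            pipe.execute()
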

3566 def send_cluster_commands( 

3567 self, stack, raise_on_error=True, allow_redirections=True 

3568 ): 

3569 raise NotImplementedError( 

3570 "send_cluster_commands cannot be executed in transactional context." 

3571 ) 

3572 

3573 def multi(self): 

3574 if self._explicit_transaction: 

3575 raise RedisError("Cannot issue nested calls to MULTI") 

3576 if self._command_queue: 

3577 raise RedisError( 

3578 "Commands without an initial WATCH have already been issued" 

3579 ) 

3580 self._explicit_transaction = True 

3581 

3582 def watch(self, *names): 

3583 if self._explicit_transaction: 

3584 raise RedisError("Cannot issue a WATCH after a MULTI") 

3585 

3586 return self.execute_command("WATCH", *names) 

3587 

3588 def unwatch(self): 

3589 if self._watching: 

3590 return self.execute_command("UNWATCH") 

3591 

3592 return True 

3593 

3594 def discard(self): 

3595 self.reset() 

3596 

3597 def delete(self, *names): 

3598 return self.execute_command("DEL", *names) 

3599 

3600 def unlink(self, *names): 

3601 return self.execute_command("UNLINK", *names)