Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%


1423 statements  

1import random 

2import socket 

3import sys 

4import threading 

5import time 

6from abc import ABC, abstractmethod 

7from collections import OrderedDict 

8from copy import copy 

9from enum import Enum 

10from itertools import chain 

11from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union 

12 

13from redis._parsers import CommandsParser, Encoder 

14from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy 

15from redis._parsers.helpers import parse_scan 

16from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

17from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface 

18from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis 

19from redis.commands import READ_COMMANDS, RedisClusterCommands 

20from redis.commands.helpers import list_or_args 

21from redis.commands.policies import PolicyResolver, StaticPolicyResolver 

22from redis.connection import ( 

23 Connection, 

24 ConnectionPool, 

25 parse_url, 

26) 

27from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

28from redis.event import ( 

29 AfterPooledConnectionsInstantiationEvent, 

30 AfterPubSubConnectionInstantiationEvent, 

31 ClientType, 

32 EventDispatcher, 

33) 

34from redis.exceptions import ( 

35 AskError, 

36 AuthenticationError, 

37 ClusterDownError, 

38 ClusterError, 

39 ConnectionError, 

40 CrossSlotTransactionError, 

41 DataError, 

42 ExecAbortError, 

43 InvalidPipelineStack, 

44 MaxConnectionsError, 

45 MovedError, 

46 RedisClusterException, 

47 RedisError, 

48 ResponseError, 

49 SlotNotCoveredError, 

50 TimeoutError, 

51 TryAgainError, 

52 WatchError, 

53) 

54from redis.lock import Lock 

55from redis.maint_notifications import MaintNotificationsConfig 

56from redis.retry import Retry 

57from redis.utils import ( 

58 deprecated_args, 

59 dict_merge, 

60 list_keys_to_dict, 

61 merge_result, 

62 safe_str, 

63 str_if_bytes, 

64 truncate_text, 

65) 

66 

67 

68def get_node_name(host: str, port: Union[str, int]) -> str: 

69 return f"{host}:{port}" 

70 

71 

72@deprecated_args( 

73 allowed_args=["redis_node"], 

74 reason="Use get_connection(redis_node) instead", 

75 version="5.3.0", 

76) 

77def get_connection(redis_node: Redis, *args, **options) -> Connection: 

78 return redis_node.connection or redis_node.connection_pool.get_connection() 

79 

80 

81def parse_scan_result(command, res, **options): 

82 cursors = {} 

83 ret = [] 

84 for node_name, response in res.items(): 

85 cursor, r = parse_scan(response, **options) 

86 cursors[node_name] = cursor 

87 ret += r 

88 

89 return cursors, ret 

90 

91 

92def parse_pubsub_numsub(command, res, **options): 

93 numsub_d = OrderedDict() 

94 for numsub_tups in res.values(): 

95 for channel, numsubbed in numsub_tups: 

96 try: 

97 numsub_d[channel] += numsubbed 

98 except KeyError: 

99 numsub_d[channel] = numsubbed 

100 

101 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()] 

102 return ret_numsub 

103 
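# Illustrative sketch (not part of the library source): parse_pubsub_numsub sums the
# per-node PUBSUB NUMSUB counts into a single list of (channel, total) pairs. The node
# names and counts below are made-up example values.
#
#     res = {
#         "10.0.0.1:6379": [(b"news", 2), (b"sport", 1)],
#         "10.0.0.2:6379": [(b"news", 3)],
#     }
#     parse_pubsub_numsub("PUBSUB NUMSUB", res)
#     # -> [(b"news", 5), (b"sport", 1)]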

104 

105def parse_cluster_slots( 

106 resp: Any, **options: Any 

107) -> Dict[Tuple[int, int], Dict[str, Any]]: 

108 current_host = options.get("current_host", "") 

109 

110 def fix_server(*args: Any) -> Tuple[str, Any]: 

111 return str_if_bytes(args[0]) or current_host, args[1] 

112 

113 slots = {} 

114 for slot in resp: 

115 start, end, primary = slot[:3] 

116 replicas = slot[3:] 

117 slots[start, end] = { 

118 "primary": fix_server(*primary), 

119 "replicas": [fix_server(*replica) for replica in replicas], 

120 } 

121 

122 return slots 

123 
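# Illustrative sketch (not part of the library source): parse_cluster_slots turns a raw
# CLUSTER SLOTS reply into a {(start, end): {"primary": ..., "replicas": [...]}} mapping.
# The addresses and node IDs below are made-up example values.
#
#     raw = [
#         [0, 5460, [b"10.0.0.1", 7000, b"node-id-1"], [b"10.0.0.2", 7003, b"node-id-2"]],
#     ]
#     parse_cluster_slots(raw)
#     # -> {(0, 5460): {"primary": ("10.0.0.1", 7000),
#     #                 "replicas": [("10.0.0.2", 7003)]}}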

124 

125def parse_cluster_shards(resp, **options): 

126 """ 

127 Parse CLUSTER SHARDS response. 

128 """ 

129 if isinstance(resp[0], dict): 

130 return resp 

131 shards = [] 

132 for x in resp: 

133 shard = {"slots": [], "nodes": []} 

134 for i in range(0, len(x[1]), 2): 

135 shard["slots"].append((x[1][i], (x[1][i + 1]))) 

136 nodes = x[3] 

137 for node in nodes: 

138 dict_node = {} 

139 for i in range(0, len(node), 2): 

140 dict_node[node[i]] = node[i + 1] 

141 shard["nodes"].append(dict_node) 

142 shards.append(shard) 

143 

144 return shards 

145 

146 

147def parse_cluster_myshardid(resp, **options): 

148 """ 

149 Parse CLUSTER MYSHARDID response. 

150 """ 

151 return resp.decode("utf-8") 

152 

153 

154PRIMARY = "primary" 

155REPLICA = "replica" 

156SLOT_ID = "slot-id" 

157 

158REDIS_ALLOWED_KEYS = ( 

159 "connection_class", 

160 "connection_pool", 

161 "connection_pool_class", 

162 "client_name", 

163 "credential_provider", 

164 "db", 

165 "decode_responses", 

166 "encoding", 

167 "encoding_errors", 

168 "host", 

169 "lib_name", 

170 "lib_version", 

171 "max_connections", 

172 "nodes_flag", 

173 "redis_connect_func", 

174 "password", 

175 "port", 

176 "timeout", 

177 "queue_class", 

178 "retry", 

179 "retry_on_timeout", 

180 "protocol", 

181 "socket_connect_timeout", 

182 "socket_keepalive", 

183 "socket_keepalive_options", 

184 "socket_timeout", 

185 "ssl", 

186 "ssl_ca_certs", 

187 "ssl_ca_data", 

188 "ssl_ca_path", 

189 "ssl_certfile", 

190 "ssl_cert_reqs", 

191 "ssl_include_verify_flags", 

192 "ssl_exclude_verify_flags", 

193 "ssl_keyfile", 

194 "ssl_password", 

195 "ssl_check_hostname", 

196 "unix_socket_path", 

197 "username", 

198 "cache", 

199 "cache_config", 

200) 

201KWARGS_DISABLED_KEYS = ("host", "port", "retry") 

202 

203 

204def cleanup_kwargs(**kwargs): 

205 """ 

206 Remove unsupported or disabled keys from kwargs 

207 """ 

208 connection_kwargs = { 

209 k: v 

210 for k, v in kwargs.items() 

211 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS 

212 } 

213 

214 return connection_kwargs 

215 

216 

217class AbstractRedisCluster: 

218 RedisClusterRequestTTL = 16 

219 

220 PRIMARIES = "primaries" 

221 REPLICAS = "replicas" 

222 ALL_NODES = "all" 

223 RANDOM = "random" 

224 DEFAULT_NODE = "default-node" 

225 

226 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE} 

227 

228 COMMAND_FLAGS = dict_merge( 

229 list_keys_to_dict( 

230 [ 

231 "ACL CAT", 

232 "ACL DELUSER", 

233 "ACL DRYRUN", 

234 "ACL GENPASS", 

235 "ACL GETUSER", 

236 "ACL HELP", 

237 "ACL LIST", 

238 "ACL LOG", 

239 "ACL LOAD", 

240 "ACL SAVE", 

241 "ACL SETUSER", 

242 "ACL USERS", 

243 "ACL WHOAMI", 

244 "AUTH", 

245 "CLIENT LIST", 

246 "CLIENT SETINFO", 

247 "CLIENT SETNAME", 

248 "CLIENT GETNAME", 

249 "CONFIG SET", 

250 "CONFIG REWRITE", 

251 "CONFIG RESETSTAT", 

252 "TIME", 

253 "PUBSUB CHANNELS", 

254 "PUBSUB NUMPAT", 

255 "PUBSUB NUMSUB", 

256 "PUBSUB SHARDCHANNELS", 

257 "PUBSUB SHARDNUMSUB", 

258 "PING", 

259 "INFO", 

260 "SHUTDOWN", 

261 "KEYS", 

262 "DBSIZE", 

263 "BGSAVE", 

264 "SLOWLOG GET", 

265 "SLOWLOG LEN", 

266 "SLOWLOG RESET", 

267 "WAIT", 

268 "WAITAOF", 

269 "SAVE", 

270 "MEMORY PURGE", 

271 "MEMORY MALLOC-STATS", 

272 "MEMORY STATS", 

273 "LASTSAVE", 

274 "CLIENT TRACKINGINFO", 

275 "CLIENT PAUSE", 

276 "CLIENT UNPAUSE", 

277 "CLIENT UNBLOCK", 

278 "CLIENT ID", 

279 "CLIENT REPLY", 

280 "CLIENT GETREDIR", 

281 "CLIENT INFO", 

282 "CLIENT KILL", 

283 "READONLY", 

284 "CLUSTER INFO", 

285 "CLUSTER MEET", 

286 "CLUSTER MYSHARDID", 

287 "CLUSTER NODES", 

288 "CLUSTER REPLICAS", 

289 "CLUSTER RESET", 

290 "CLUSTER SET-CONFIG-EPOCH", 

291 "CLUSTER SLOTS", 

292 "CLUSTER SHARDS", 

293 "CLUSTER COUNT-FAILURE-REPORTS", 

294 "CLUSTER KEYSLOT", 

295 "COMMAND", 

296 "COMMAND COUNT", 

297 "COMMAND LIST", 

298 "COMMAND GETKEYS", 

299 "CONFIG GET", 

300 "DEBUG", 

301 "RANDOMKEY", 

302 "READONLY", 

303 "READWRITE", 

304 "TIME", 

305 "TFUNCTION LOAD", 

306 "TFUNCTION DELETE", 

307 "TFUNCTION LIST", 

308 "TFCALL", 

309 "TFCALLASYNC", 

310 "LATENCY HISTORY", 

311 "LATENCY LATEST", 

312 "LATENCY RESET", 

313 "MODULE LIST", 

314 "MODULE LOAD", 

315 "MODULE UNLOAD", 

316 "MODULE LOADEX", 

317 ], 

318 DEFAULT_NODE, 

319 ), 

320 list_keys_to_dict( 

321 [ 

322 "FLUSHALL", 

323 "FLUSHDB", 

324 "FUNCTION DELETE", 

325 "FUNCTION FLUSH", 

326 "FUNCTION LIST", 

327 "FUNCTION LOAD", 

328 "FUNCTION RESTORE", 

329 "SCAN", 

330 "SCRIPT EXISTS", 

331 "SCRIPT FLUSH", 

332 "SCRIPT LOAD", 

333 ], 

334 PRIMARIES, 

335 ), 

336 list_keys_to_dict(["FUNCTION DUMP"], RANDOM), 

337 list_keys_to_dict( 

338 [ 

339 "CLUSTER COUNTKEYSINSLOT", 

340 "CLUSTER DELSLOTS", 

341 "CLUSTER DELSLOTSRANGE", 

342 "CLUSTER GETKEYSINSLOT", 

343 "CLUSTER SETSLOT", 

344 ], 

345 SLOT_ID, 

346 ), 

347 ) 

348 

349 SEARCH_COMMANDS = ( 

350 [ 

351 "FT.CREATE", 

352 "FT.SEARCH", 

353 "FT.AGGREGATE", 

354 "FT.EXPLAIN", 

355 "FT.EXPLAINCLI", 

356 "FT,PROFILE", 

357 "FT.ALTER", 

358 "FT.DROPINDEX", 

359 "FT.ALIASADD", 

360 "FT.ALIASUPDATE", 

361 "FT.ALIASDEL", 

362 "FT.TAGVALS", 

363 "FT.SUGADD", 

364 "FT.SUGGET", 

365 "FT.SUGDEL", 

366 "FT.SUGLEN", 

367 "FT.SYNUPDATE", 

368 "FT.SYNDUMP", 

369 "FT.SPELLCHECK", 

370 "FT.DICTADD", 

371 "FT.DICTDEL", 

372 "FT.DICTDUMP", 

373 "FT.INFO", 

374 "FT._LIST", 

375 "FT.CONFIG", 

376 "FT.ADD", 

377 "FT.DEL", 

378 "FT.DROP", 

379 "FT.GET", 

380 "FT.MGET", 

381 "FT.SYNADD", 

382 ], 

383 ) 

384 

385 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { 

386 "CLUSTER SLOTS": parse_cluster_slots, 

387 "CLUSTER SHARDS": parse_cluster_shards, 

388 "CLUSTER MYSHARDID": parse_cluster_myshardid, 

389 } 

390 

391 RESULT_CALLBACKS = dict_merge( 

392 list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub), 

393 list_keys_to_dict( 

394 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values())) 

395 ), 

396 list_keys_to_dict( 

397 ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result 

398 ), 

399 list_keys_to_dict( 

400 [ 

401 "PING", 

402 "CONFIG SET", 

403 "CONFIG REWRITE", 

404 "CONFIG RESETSTAT", 

405 "CLIENT SETNAME", 

406 "BGSAVE", 

407 "SLOWLOG RESET", 

408 "SAVE", 

409 "MEMORY PURGE", 

410 "CLIENT PAUSE", 

411 "CLIENT UNPAUSE", 

412 ], 

413 lambda command, res: all(res.values()) if isinstance(res, dict) else res, 

414 ), 

415 list_keys_to_dict( 

416 ["DBSIZE", "WAIT"], 

417 lambda command, res: sum(res.values()) if isinstance(res, dict) else res, 

418 ), 

419 list_keys_to_dict( 

420 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0 

421 ), 

422 list_keys_to_dict(["SCAN"], parse_scan_result), 

423 list_keys_to_dict( 

424 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop() 

425 ), 

426 list_keys_to_dict( 

427 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())] 

428 ), 

429 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())), 

430 ) 

431 

432 ERRORS_ALLOW_RETRY = ( 

433 ConnectionError, 

434 TimeoutError, 

435 ClusterDownError, 

436 SlotNotCoveredError, 

437 ) 

438 

439 def replace_default_node(self, target_node: "ClusterNode" = None) -> None: 

440 """Replace the default cluster node. 

441 A random cluster node will be chosen if target_node isn't passed, and primaries 

442 will be prioritized. The default node will not be changed if there are no other 

443 nodes in the cluster. 

444 

445 Args: 

446 target_node (ClusterNode, optional): Target node to replace the default 

447 node. Defaults to None. 

448 """ 

449 if target_node: 

450 self.nodes_manager.default_node = target_node 

451 else: 

452 curr_node = self.get_default_node() 

453 primaries = [node for node in self.get_primaries() if node != curr_node] 

454 if primaries: 

455 # Choose a primary if the cluster contains different primaries 

456 self.nodes_manager.default_node = random.choice(primaries) 

457 else: 

458 # Otherwise, choose a replica if the cluster contains other replicas 

459 replicas = [node for node in self.get_replicas() if node != curr_node] 

460 if replicas: 

461 self.nodes_manager.default_node = random.choice(replicas) 

462 

463 

464class RedisCluster(AbstractRedisCluster, RedisClusterCommands): 

465 @classmethod 

466 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster": 

467 """ 

468 Return a Redis client object configured from the given URL 

469 

470 For example:: 

471 

472 redis://[[username]:[password]]@localhost:6379/0 

473 rediss://[[username]:[password]]@localhost:6379/0 

474 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

475 

476 Three URL schemes are supported: 

477 

478 - `redis://` creates a TCP socket connection. See more at: 

479 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

480 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

481 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

482 - `unix://` creates a Unix Domain Socket connection. 

483 

484 The username, password, hostname, path and all querystring values 

485 are passed through urllib.parse.unquote in order to replace any 

486 percent-encoded values with their corresponding characters. 

487 

488 There are several ways to specify a database number. The first value 

489 found will be used: 

490 

491 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

492 2. If using the redis:// or rediss:// schemes, the path argument 

493 of the url, e.g. redis://localhost/0 

494 3. A ``db`` keyword argument to this function. 

495 

496 If none of these options are specified, the default db=0 is used. 

497 

498 All querystring options are cast to their appropriate Python types. 

499 Boolean arguments can be specified with string values "True"/"False" 

500 or "Yes"/"No". Values that cannot be properly cast cause a 

501 ``ValueError`` to be raised. Once parsed, the querystring arguments 

502 and keyword arguments are passed to the ``ConnectionPool``'s 

503 class initializer. In the case of conflicting arguments, querystring 

504 arguments always win. 

505 

506 """ 

507 return cls(url=url, **kwargs) 

508 
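# Illustrative usage sketch (not part of the library source), assuming a cluster node
# is reachable at localhost:6379 (made-up address):
#
#     from redis.cluster import RedisCluster
#     rc = RedisCluster.from_url("redis://localhost:6379/0")
#     rc.set("foo", "bar")
#     print(rc.get("foo"))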

509 @deprecated_args( 

510 args_to_warn=["read_from_replicas"], 

511 reason="Please configure the 'load_balancing_strategy' instead", 

512 version="5.3.0", 

513 ) 

514 @deprecated_args( 

515 args_to_warn=[ 

516 "cluster_error_retry_attempts", 

517 ], 

518 reason="Please configure the 'retry' object instead", 

519 version="6.0.0", 

520 ) 

521 def __init__( 

522 self, 

523 host: Optional[str] = None, 

524 port: int = 6379, 

525 startup_nodes: Optional[List["ClusterNode"]] = None, 

526 cluster_error_retry_attempts: int = 3, 

527 retry: Optional["Retry"] = None, 

528 require_full_coverage: bool = True, 

529 reinitialize_steps: int = 5, 

530 read_from_replicas: bool = False, 

531 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None, 

532 dynamic_startup_nodes: bool = True, 

533 url: Optional[str] = None, 

534 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

535 cache: Optional[CacheInterface] = None, 

536 cache_config: Optional[CacheConfig] = None, 

537 event_dispatcher: Optional[EventDispatcher] = None, 

538 policy_resolver: PolicyResolver = StaticPolicyResolver(), 

539 **kwargs, 

540 ): 

541 """ 

542 Initialize a new RedisCluster client. 

543 

544 :param startup_nodes: 

545 List of nodes from which initial bootstrapping can be done 

546 :param host: 

547 Can be used to point to a startup node 

548 :param port: 

549 Can be used to point to a startup node 

550 :param require_full_coverage: 

551 When set to False: the client will not require a 

552 full coverage of the slots. However, if not all slots are covered, 

553 and at least one node has 'cluster-require-full-coverage' set to 

554 'yes,' the server will throw a ClusterDownError for some key-based 

555 commands. See - 

556 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters 

557 When set to True (default value): all slots must be covered to construct the 

558 cluster client. If not all slots are covered, RedisClusterException 

559 will be thrown. 

560 :param read_from_replicas: 

561 @deprecated - please use load_balancing_strategy instead 

562 Enable read from replicas in READONLY mode. You can read possibly 

563 stale data. 

564 When set to true, read commands will be assigned between the 

565 primary and its replicas in a round-robin manner. 

566 :param load_balancing_strategy: 

567 Enable read from replicas in READONLY mode and defines the load balancing 

568 strategy that will be used for cluster node selection. 

569 The data read from replicas is eventually consistent with the data in primary nodes. 

570 :param dynamic_startup_nodes: 

571 Set the RedisCluster's startup nodes to all of the discovered nodes. 

572 If true (default value), the cluster's discovered nodes will be used to 

573 determine the cluster nodes-slots mapping in the next topology refresh. 

574 It will remove the initially passed startup nodes if their endpoints aren't 

575 listed in the CLUSTER SLOTS output. 

576 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

577 specific IP addresses, it is best to set it to false. 

578 :param cluster_error_retry_attempts: 

579 @deprecated - Please configure the 'retry' object instead 

580 In case 'retry' object is set - this argument is ignored! 

581 

582 Number of times to retry before raising an error when 

583 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or 

584 :class:`~.ClusterDownError` are encountered 

585 :param retry: 

586 A retry object that defines the retry strategy and the number of 

587 retries for the cluster client. 

588 In the current implementation of the cluster client (starting from redis-py version 6.0.0) 

589 the retry object is not yet fully utilized; instead, it is used just to determine 

590 the number of retries for the cluster client. 

591 In future releases, the retry object will be used to handle the cluster client retries. 

592 :param reinitialize_steps: 

593 Specifies the number of MOVED errors that need to occur before 

594 reinitializing the whole cluster topology. If a MOVED error occurs 

595 and the cluster is not yet due for reinitialization on the current 

596 error handling, only the MOVED slot will be patched with the 

597 redirected node. 

598 To reinitialize the cluster on every MOVED error, set 

599 reinitialize_steps to 1. 

600 To avoid reinitializing the cluster on moved errors, set 

601 reinitialize_steps to 0. 

602 :param address_remap: 

603 An optional callable which, when provided with an internal network 

604 address of a node, e.g. a `(host, port)` tuple, will return the address 

605 where the node is reachable. This can be used to map the addresses at 

606 which the nodes _think_ they are, to addresses at which a client may 

607 reach them, such as when they sit behind a proxy. 

608 

609 :**kwargs: 

610 Extra arguments that will be passed to the Redis instances when they are created 

611 (see the official redis-py docs for supported kwargs; the only limitation 

612 is that you can't provide a 'retry' object as part of kwargs. 

613 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) 

614 Some kwargs are not supported and will raise a 

615 RedisClusterException: 

616 - db (Redis does not support database SELECT in cluster mode) 

617 """ 

618 if startup_nodes is None: 

619 startup_nodes = [] 

620 

621 if "db" in kwargs: 

622 # Argument 'db' is not possible to use in cluster mode 

623 raise RedisClusterException( 

624 "Argument 'db' is not possible to use in cluster mode" 

625 ) 

626 

627 if "retry" in kwargs: 

628 # The 'retry' argument cannot be passed via kwargs in cluster mode because 

629 # the kwargs are forwarded to the lower-level connections to the cluster nodes, 

630 # and there we provide a retry configuration with no retries allowed. 

631 # Retries should be handled at the cluster client level. 

632 raise RedisClusterException( 

633 "The 'retry' argument cannot be used in kwargs when running in cluster mode." 

634 ) 

635 

636 # Get the startup node/s 

637 from_url = False 

638 if url is not None: 

639 from_url = True 

640 url_options = parse_url(url) 

641 if "path" in url_options: 

642 raise RedisClusterException( 

643 "RedisCluster does not currently support Unix Domain " 

644 "Socket connections" 

645 ) 

646 if "db" in url_options and url_options["db"] != 0: 

647 # Argument 'db' is not possible to use in cluster mode 

648 raise RedisClusterException( 

649 "A ``db`` querystring option can only be 0 in cluster mode" 

650 ) 

651 kwargs.update(url_options) 

652 host = kwargs.get("host") 

653 port = kwargs.get("port", port) 

654 startup_nodes.append(ClusterNode(host, port)) 

655 elif host is not None and port is not None: 

656 startup_nodes.append(ClusterNode(host, port)) 

657 elif len(startup_nodes) == 0: 

658 # No startup node was provided 

659 raise RedisClusterException( 

660 "RedisCluster requires at least one node to discover the " 

661 "cluster. Please provide one of the followings:\n" 

662 "1. host and port, for example:\n" 

663 " RedisCluster(host='localhost', port=6379)\n" 

664 "2. list of startup nodes, for example:\n" 

665 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," 

666 " ClusterNode('localhost', 6378)])" 

667 ) 

668 # Update the connection arguments 

669 # Whenever a new connection is established, RedisCluster's on_connect 

670 # method should be run 

671 # If the user passed on_connect function we'll save it and run it 

672 # inside the RedisCluster.on_connect() function 

673 self.user_on_connect_func = kwargs.pop("redis_connect_func", None) 

674 kwargs.update({"redis_connect_func": self.on_connect}) 

675 kwargs = cleanup_kwargs(**kwargs) 

676 if retry: 

677 self.retry = retry 

678 else: 

679 self.retry = Retry( 

680 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

681 retries=cluster_error_retry_attempts, 

682 ) 

683 

684 self.encoder = Encoder( 

685 kwargs.get("encoding", "utf-8"), 

686 kwargs.get("encoding_errors", "strict"), 

687 kwargs.get("decode_responses", False), 

688 ) 

689 protocol = kwargs.get("protocol", None) 

690 if (cache_config or cache) and protocol not in [3, "3"]: 

691 raise RedisError("Client caching is only supported with RESP version 3") 

692 

693 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

694 self.node_flags = self.__class__.NODE_FLAGS.copy() 

695 self.read_from_replicas = read_from_replicas 

696 self.load_balancing_strategy = load_balancing_strategy 

697 self.reinitialize_counter = 0 

698 self.reinitialize_steps = reinitialize_steps 

699 if event_dispatcher is None: 

700 self._event_dispatcher = EventDispatcher() 

701 else: 

702 self._event_dispatcher = event_dispatcher 

703 self.startup_nodes = startup_nodes 

704 self.nodes_manager = NodesManager( 

705 startup_nodes=startup_nodes, 

706 from_url=from_url, 

707 require_full_coverage=require_full_coverage, 

708 dynamic_startup_nodes=dynamic_startup_nodes, 

709 address_remap=address_remap, 

710 cache=cache, 

711 cache_config=cache_config, 

712 event_dispatcher=self._event_dispatcher, 

713 **kwargs, 

714 ) 

715 

716 self.cluster_response_callbacks = CaseInsensitiveDict( 

717 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS 

718 ) 

719 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS) 

720 

721 # For backward compatibility, map the existing node flags to the new policies 

722 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = { 

723 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS, 

724 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS, 

725 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES, 

726 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS, 

727 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE, 

728 SLOT_ID: RequestPolicy.DEFAULT_KEYED, 

729 } 

730 

731 self._policies_callback_mapping: dict[ 

732 Union[RequestPolicy, ResponsePolicy], Callable 

733 ] = { 

734 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [ 

735 self.get_random_primary_or_all_nodes(command_name) 

736 ], 

737 RequestPolicy.DEFAULT_KEYED: lambda command, 

738 *args: self.get_nodes_from_slot(command, *args), 

739 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()], 

740 RequestPolicy.ALL_SHARDS: self.get_primaries, 

741 RequestPolicy.ALL_NODES: self.get_nodes, 

742 RequestPolicy.ALL_REPLICAS: self.get_replicas, 

743 RequestPolicy.MULTI_SHARD: lambda *args, 

744 **kwargs: self._split_multi_shard_command(*args, **kwargs), 

745 RequestPolicy.SPECIAL: self.get_special_nodes, 

746 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res, 

747 ResponsePolicy.DEFAULT_KEYED: lambda res: res, 

748 } 

749 

750 self._policy_resolver = policy_resolver 

751 self.commands_parser = CommandsParser(self) 

752 

753 # Nodes where the FT.AGGREGATE command was executed. 

754 self._aggregate_nodes = None 

755 self._lock = threading.RLock() 

756 

757 def __enter__(self): 

758 return self 

759 

760 def __exit__(self, exc_type, exc_value, traceback): 

761 self.close() 

762 

763 def __del__(self): 

764 try: 

765 self.close() 

766 except Exception: 

767 pass 

768 

769 def disconnect_connection_pools(self): 

770 for node in self.get_nodes(): 

771 if node.redis_connection: 

772 try: 

773 node.redis_connection.connection_pool.disconnect() 

774 except OSError: 

775 # Client was already disconnected. do nothing 

776 pass 

777 

778 def on_connect(self, connection): 

779 """ 

780 Initialize the connection, authenticate and select a database and send 

781 READONLY if it is set during object initialization. 

782 """ 

783 connection.on_connect() 

784 

785 if self.read_from_replicas or self.load_balancing_strategy: 

786 # Sending READONLY command to server to configure connection as 

787 # readonly. Since each cluster node may change its server type due 

788 # to a failover, we should establish a READONLY connection 

789 # regardless of the server type. If this is a primary connection, 

790 # READONLY would not affect executing write commands. 

791 connection.send_command("READONLY") 

792 if str_if_bytes(connection.read_response()) != "OK": 

793 raise ConnectionError("READONLY command failed") 

794 

795 if self.user_on_connect_func is not None: 

796 self.user_on_connect_func(connection) 

797 

798 def get_redis_connection(self, node: "ClusterNode") -> Redis: 

799 if not node.redis_connection: 

800 with self._lock: 

801 if not node.redis_connection: 

802 self.nodes_manager.create_redis_connections([node]) 

803 return node.redis_connection 

804 

805 def get_node(self, host=None, port=None, node_name=None): 

806 return self.nodes_manager.get_node(host, port, node_name) 

807 

808 def get_primaries(self): 

809 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

810 

811 def get_replicas(self): 

812 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

813 

814 def get_random_node(self): 

815 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

816 

817 def get_random_primary_or_all_nodes(self, command_name): 

818 """ 

819 Return a random primary node, or a random node of any type when READONLY reads are enabled for a read command. 

820 """ 

821 if self.read_from_replicas and command_name in READ_COMMANDS: 

822 return self.get_random_node() 

823 

824 return self.get_random_primary_node() 

825 

826 def get_nodes(self): 

827 return list(self.nodes_manager.nodes_cache.values()) 

828 

829 def get_node_from_key(self, key, replica=False): 

830 """ 

831 Get the node that holds the key's slot. 

832 If replica set to True but the slot doesn't have any replicas, None is 

833 returned. 

834 """ 

835 slot = self.keyslot(key) 

836 slot_cache = self.nodes_manager.slots_cache.get(slot) 

837 if slot_cache is None or len(slot_cache) == 0: 

838 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') 

839 if replica and len(self.nodes_manager.slots_cache[slot]) < 2: 

840 return None 

841 elif replica: 

842 node_idx = 1 

843 else: 

844 # primary 

845 node_idx = 0 

846 

847 return slot_cache[node_idx] 

848 

849 def get_default_node(self): 

850 """ 

851 Get the cluster's default node 

852 """ 

853 return self.nodes_manager.default_node 

854 

855 def get_nodes_from_slot(self, command: str, *args): 

856 """ 

857 Return a single-element list with the node that holds the slot of the command's keys. 

858 """ 

859 # get the node that holds the key's slot 

860 slot = self.determine_slot(*args) 

861 node = self.nodes_manager.get_node_from_slot( 

862 slot, 

863 self.read_from_replicas and command in READ_COMMANDS, 

864 self.load_balancing_strategy if command in READ_COMMANDS else None, 

865 ) 

866 return [node] 

867 

868 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]: 

869 """ 

870 Split a command with the multi-shard policy into multiple per-key commands. 

871 """ 

872 keys = self._get_command_keys(*args) 

873 commands = [] 

874 

875 for key in keys: 

876 commands.append( 

877 { 

878 "args": (args[0], key), 

879 "kwargs": kwargs, 

880 } 

881 ) 

882 

883 return commands 

884 
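# Illustrative sketch (not part of the library source): for a multi-shard command the
# splitter produces one sub-command per key. With a live cluster connection resolving
# the keys of ("MGET", "k1", "k2"), the expected shape would be roughly:
#
#     [
#         {"args": ("MGET", "k1"), "kwargs": {}},
#         {"args": ("MGET", "k2"), "kwargs": {}},
#     ]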

885 def get_special_nodes(self) -> Optional[list["ClusterNode"]]: 

886 """ 

887 Returns a list of nodes for commands with a special policy. 

888 """ 

889 if not self._aggregate_nodes: 

890 raise RedisClusterException( 

891 "Cannot execute FT.CURSOR commands without FT.AGGREGATE" 

892 ) 

893 

894 return self._aggregate_nodes 

895 

896 def get_random_primary_node(self) -> "ClusterNode": 

897 """ 

898 Returns a random primary node 

899 """ 

900 return random.choice(self.get_primaries()) 

901 

902 def _evaluate_all_succeeded(self, res): 

903 """ 

904 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED 

905 """ 

906 first_successful_response = None 

907 

908 if isinstance(res, dict): 

909 for key, value in res.items(): 

910 if value: 

911 if first_successful_response is None: 

912 first_successful_response = {key: value} 

913 else: 

914 return {key: False} 

915 else: 

916 for response in res: 

917 if response: 

918 if first_successful_response is None: 

919 # Dynamically resolve type 

920 first_successful_response = type(response)(response) 

921 else: 

922 return type(response)(False) 

923 

924 return first_successful_response 

925 

926 def set_default_node(self, node): 

927 """ 

928 Set the default node of the cluster. 

929 :param node: 'ClusterNode' 

930 :return: True if the default node was set, else False 

931 """ 

932 if node is None or self.get_node(node_name=node.name) is None: 

933 return False 

934 self.nodes_manager.default_node = node 

935 return True 

936 

937 def set_retry(self, retry: Retry) -> None: 

938 self.retry = retry 

939 

940 def monitor(self, target_node=None): 

941 """ 

942 Returns a Monitor object for the specified target node. 

943 The default cluster node will be selected if no target node was 

944 specified. 

945 Monitor is useful for handling the MONITOR command issued to the Redis server. 

946 next_command() method returns one command from monitor 

947 listen() method yields commands from monitor. 

948 """ 

949 if target_node is None: 

950 target_node = self.get_default_node() 

951 if target_node.redis_connection is None: 

952 raise RedisClusterException( 

953 f"Cluster Node {target_node.name} has no redis_connection" 

954 ) 

955 return target_node.redis_connection.monitor() 

956 

957 def pubsub(self, node=None, host=None, port=None, **kwargs): 

958 """ 

959 Allows passing a ClusterNode, or host&port, to get a pubsub instance 

960 connected to the specified node 

961 """ 

962 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs) 

963 

964 def pipeline(self, transaction=None, shard_hint=None): 

965 """ 

966 Cluster impl: 

967 Pipelines do not work in cluster mode the same way they 

968 do in normal mode. Create a clone of this object so 

969 that simulating pipelines will work correctly. Each 

970 command will be called directly when used, and 

971 calling execute() will only return the result stack. 

972 """ 

973 if shard_hint: 

974 raise RedisClusterException("shard_hint is deprecated in cluster mode") 

975 

976 return ClusterPipeline( 

977 nodes_manager=self.nodes_manager, 

978 commands_parser=self.commands_parser, 

979 startup_nodes=self.nodes_manager.startup_nodes, 

980 result_callbacks=self.result_callbacks, 

981 cluster_response_callbacks=self.cluster_response_callbacks, 

982 read_from_replicas=self.read_from_replicas, 

983 load_balancing_strategy=self.load_balancing_strategy, 

984 reinitialize_steps=self.reinitialize_steps, 

985 retry=self.retry, 

986 lock=self._lock, 

987 transaction=transaction, 

988 ) 

989 
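# Illustrative usage sketch (not part of the library source), assuming "rc" is a
# connected RedisCluster instance; the keys share a hash tag so they land in one slot:
#
#     with rc.pipeline() as pipe:
#         pipe.set("{user:1}:name", "alice")
#         pipe.get("{user:1}:name")
#         results = pipe.execute()   # result stack, e.g. [True, b"alice"]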

990 def lock( 

991 self, 

992 name, 

993 timeout=None, 

994 sleep=0.1, 

995 blocking=True, 

996 blocking_timeout=None, 

997 lock_class=None, 

998 thread_local=True, 

999 raise_on_release_error: bool = True, 

1000 ): 

1001 """ 

1002 Return a new Lock object using key ``name`` that mimics 

1003 the behavior of threading.Lock. 

1004 

1005 If specified, ``timeout`` indicates a maximum life for the lock. 

1006 By default, it will remain locked until release() is called. 

1007 

1008 ``sleep`` indicates the amount of time to sleep per loop iteration 

1009 when the lock is in blocking mode and another client is currently 

1010 holding the lock. 

1011 

1012 ``blocking`` indicates whether calling ``acquire`` should block until 

1013 the lock has been acquired or to fail immediately, causing ``acquire`` 

1014 to return False and the lock not being acquired. Defaults to True. 

1015 Note this value can be overridden by passing a ``blocking`` 

1016 argument to ``acquire``. 

1017 

1018 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

1019 spend trying to acquire the lock. A value of ``None`` indicates 

1020 continue trying forever. ``blocking_timeout`` can be specified as a 

1021 float or integer, both representing the number of seconds to wait. 

1022 

1023 ``lock_class`` forces the specified lock implementation. Note that as 

1024 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

1025 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

1026 you have created your own custom lock class. 

1027 

1028 ``thread_local`` indicates whether the lock token is placed in 

1029 thread-local storage. By default, the token is placed in thread local 

1030 storage so that a thread only sees its token, not a token set by 

1031 another thread. Consider the following timeline: 

1032 

1033 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

1034 thread-1 sets the token to "abc" 

1035 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

1036 Lock instance. 

1037 time: 5, thread-1 has not yet completed. redis expires the lock 

1038 key. 

1039 time: 5, thread-2 acquired `my-lock` now that it's available. 

1040 thread-2 sets the token to "xyz" 

1041 time: 6, thread-1 finishes its work and calls release(). if the 

1042 token is *not* stored in thread local storage, then 

1043 thread-1 would see the token value as "xyz" and would be 

1044 able to successfully release thread-2's lock. 

1045 

1046 ``raise_on_release_error`` indicates whether to raise an exception when 

1047 the lock is no longer owned when exiting the context manager. By default, 

1048 this is True, meaning an exception will be raised. If False, a warning 

1049 will be logged and the exception will be suppressed. 

1050 

1051 In some use cases it's necessary to disable thread local storage. For 

1052 example, if you have code where one thread acquires a lock and passes 

1053 that lock instance to a worker thread to release later. If thread 

1054 local storage isn't disabled in this case, the worker thread won't see 

1055 the token set by the thread that acquired the lock. Our assumption 

1056 is that these cases aren't common and as such default to using 

1057 thread local storage.""" 

1058 if lock_class is None: 

1059 lock_class = Lock 

1060 return lock_class( 

1061 self, 

1062 name, 

1063 timeout=timeout, 

1064 sleep=sleep, 

1065 blocking=blocking, 

1066 blocking_timeout=blocking_timeout, 

1067 thread_local=thread_local, 

1068 raise_on_release_error=raise_on_release_error, 

1069 ) 

1070 
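# Illustrative usage sketch (not part of the library source), assuming "rc" is a
# connected RedisCluster instance; the lock name and timeouts are made-up values:
#
#     with rc.lock("my-lock", timeout=10, blocking_timeout=5):
#         do_critical_section_work()   # hypothetical helper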

1071 def set_response_callback(self, command, callback): 

1072 """Set a custom Response Callback""" 

1073 self.cluster_response_callbacks[command] = callback 

1074 

1075 def _determine_nodes( 

1076 self, *args, request_policy: RequestPolicy, **kwargs 

1077 ) -> List["ClusterNode"]: 

1078 """ 

1079 Determine the nodes the command should be executed on. 

1080 """ 

1081 command = args[0].upper() 

1082 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: 

1083 command = f"{args[0]} {args[1]}".upper() 

1084 

1085 nodes_flag = kwargs.pop("nodes_flag", None) 

1086 if nodes_flag is not None: 

1087 # nodes flag passed by the user 

1088 command_flag = nodes_flag 

1089 else: 

1090 # get the nodes group for this command if it was predefined 

1091 command_flag = self.command_flags.get(command) 

1092 

1093 if command_flag in self._command_flags_mapping: 

1094 request_policy = self._command_flags_mapping[command_flag] 

1095 

1096 policy_callback = self._policies_callback_mapping[request_policy] 

1097 

1098 if request_policy == RequestPolicy.DEFAULT_KEYED: 

1099 nodes = policy_callback(command, *args) 

1100 elif request_policy == RequestPolicy.MULTI_SHARD: 

1101 nodes = policy_callback(*args, **kwargs) 

1102 elif request_policy == RequestPolicy.DEFAULT_KEYLESS: 

1103 nodes = policy_callback(args[0]) 

1104 else: 

1105 nodes = policy_callback() 

1106 

1107 if args[0].lower() == "ft.aggregate": 

1108 self._aggregate_nodes = nodes 

1109 

1110 return nodes 

1111 

1112 def _should_reinitialized(self): 

1113 # To reinitialize the cluster on every MOVED error, 

1114 # set reinitialize_steps to 1. 

1115 # To avoid reinitializing the cluster on moved errors, set 

1116 # reinitialize_steps to 0. 

1117 if self.reinitialize_steps == 0: 

1118 return False 

1119 else: 

1120 return self.reinitialize_counter % self.reinitialize_steps == 0 

1121 

1122 def keyslot(self, key): 

1123 """ 

1124 Calculate keyslot for a given key. 

1125 See Keys distribution model in https://redis.io/topics/cluster-spec 

1126 """ 

1127 k = self.encoder.encode(key) 

1128 return key_slot(k) 

1129 
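# Illustrative sketch (not part of the library source): keyslot() maps a key to one of
# the 16384 cluster hash slots, and keys sharing a hash tag map to the same slot.
# "rc" is assumed to be a connected RedisCluster instance.
#
#     rc.keyslot("foo")                                              # an int in range(0, 16384)
#     rc.keyslot("{user:1}:name") == rc.keyslot("{user:1}:email")    # True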

1130 def _get_command_keys(self, *args): 

1131 """ 

1132 Get the keys in the command. If the command has no keys in it, None is 

1133 returned. 

1134 

1135 NOTE: Due to a bug in redis<7.0, this function does not work properly 

1136 for EVAL or EVALSHA when the `numkeys` arg is 0. 

1137 - issue: https://github.com/redis/redis/issues/9493 

1138 - fix: https://github.com/redis/redis/pull/9733 

1139 

1140 So, don't use this function with EVAL or EVALSHA. 

1141 """ 

1142 redis_conn = self.get_default_node().redis_connection 

1143 return self.commands_parser.get_keys(redis_conn, *args) 

1144 

1145 def determine_slot(self, *args) -> Optional[int]: 

1146 """ 

1147 Figure out what slot to use based on args. 

1148 

1149 Raises a RedisClusterException if there's a missing key and we can't 

1150 determine what slots to map the command to; or, if the keys don't 

1151 all map to the same key slot. 

1152 """ 

1153 command = args[0] 

1154 if self.command_flags.get(command) == SLOT_ID: 

1155 # The command contains the slot ID 

1156 return args[1] 

1157 

1158 # Get the keys in the command 

1159 

1160 # CLIENT TRACKING is a special case. 

1161 # It doesn't have any keys, it needs to be sent to the provided nodes 

1162 # By default it will be sent to all nodes. 

1163 if command.upper() == "CLIENT TRACKING": 

1164 return None 

1165 

1166 # EVAL and EVALSHA are common enough that it's wasteful to go to the 

1167 # redis server to parse the keys. Besides, there is a bug in redis<7.0 

1168 # where `self._get_command_keys()` fails anyway. So, we special case 

1169 # EVAL/EVALSHA. 

1170 if command.upper() in ("EVAL", "EVALSHA"): 

1171 # command syntax: EVAL "script body" num_keys ... 

1172 if len(args) <= 2: 

1173 raise RedisClusterException(f"Invalid args in command: {args}") 

1174 num_actual_keys = int(args[2]) 

1175 eval_keys = args[3 : 3 + num_actual_keys] 

1176 # if there are 0 keys, that means the script can be run on any node 

1177 # so we can just return a random slot 

1178 if len(eval_keys) == 0: 

1179 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1180 keys = eval_keys 

1181 else: 

1182 keys = self._get_command_keys(*args) 

1183 if keys is None or len(keys) == 0: 

1184 # FCALL can call a function with 0 keys, that means the function 

1185 # can be run on any node so we can just return a random slot 

1186 if command.upper() in ("FCALL", "FCALL_RO"): 

1187 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1188 raise RedisClusterException( 

1189 "No way to dispatch this command to Redis Cluster. " 

1190 "Missing key.\nYou can execute the command by specifying " 

1191 f"target nodes.\nCommand: {args}" 

1192 ) 

1193 

1194 # single key command 

1195 if len(keys) == 1: 

1196 return self.keyslot(keys[0]) 

1197 

1198 # multi-key command; we need to make sure all keys are mapped to 

1199 # the same slot 

1200 slots = {self.keyslot(key) for key in keys} 

1201 if len(slots) != 1: 

1202 raise RedisClusterException( 

1203 f"{command} - all keys must map to the same key slot" 

1204 ) 

1205 

1206 return slots.pop() 

1207 
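# Illustrative sketch (not part of the library source): multi-key commands must hash to
# a single slot, which hash tags make possible. "rc" is assumed to be a connected
# RedisCluster instance.
#
#     rc.mget("{user:1}:a", "{user:1}:b")   # same slot, dispatched normally
#     rc.mget("a", "b")                     # may raise RedisClusterException if the
#                                           # keys map to different slots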

1208 def get_encoder(self): 

1209 """ 

1210 Get the connections' encoder 

1211 """ 

1212 return self.encoder 

1213 

1214 def get_connection_kwargs(self): 

1215 """ 

1216 Get the connections' key-word arguments 

1217 """ 

1218 return self.nodes_manager.connection_kwargs 

1219 

1220 def _is_nodes_flag(self, target_nodes): 

1221 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1222 

1223 def _parse_target_nodes(self, target_nodes): 

1224 if isinstance(target_nodes, list): 

1225 nodes = target_nodes 

1226 elif isinstance(target_nodes, ClusterNode): 

1227 # Supports passing a single ClusterNode as a variable 

1228 nodes = [target_nodes] 

1229 elif isinstance(target_nodes, dict): 

1230 # Supports dictionaries of the format {node_name: node}. 

1231 # It enables to execute commands with multi nodes as follows: 

1232 # rc.cluster_save_config(rc.get_primaries()) 

1233 nodes = target_nodes.values() 

1234 else: 

1235 raise TypeError( 

1236 "target_nodes type can be one of the following: " 

1237 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1238 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1239 f"The passed type is {type(target_nodes)}" 

1240 ) 

1241 return nodes 

1242 

1243 def execute_command(self, *args, **kwargs): 

1244 return self._internal_execute_command(*args, **kwargs) 

1245 

1246 def _internal_execute_command(self, *args, **kwargs): 

1247 """ 

1248 Wrapper for ERRORS_ALLOW_RETRY error handling. 

1249 

1250 It will retry the number of times specified by the retries property of 

1251 the "self.retry" option, which defaults to 3 unless manually 

1252 configured. 

1253 

1254 If all attempts fail, the command will raise the exception. 

1255 

1256 Key argument :target_nodes: can be passed with the following types: 

1257 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM 

1258 ClusterNode 

1259 list<ClusterNode> 

1260 dict<Any, ClusterNode> 

1261 """ 

1262 target_nodes_specified = False 

1263 is_default_node = False 

1264 target_nodes = None 

1265 passed_targets = kwargs.pop("target_nodes", None) 

1266 command_policies = self._policy_resolver.resolve(args[0].lower()) 

1267 

1268 if passed_targets is not None and not self._is_nodes_flag(passed_targets): 

1269 target_nodes = self._parse_target_nodes(passed_targets) 

1270 target_nodes_specified = True 

1271 

1272 if not command_policies and not target_nodes_specified: 

1273 command = args[0].upper() 

1274 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: 

1275 command = f"{args[0]} {args[1]}".upper() 

1276 

1277 # We can only resolve key properties if the command is not 

1278 # in the list of pre-defined request policies 

1279 command_flag = self.command_flags.get(command) 

1280 if not command_flag: 

1281 # Fallback to default policy 

1282 if not self.get_default_node(): 

1283 slot = None 

1284 else: 

1285 slot = self.determine_slot(*args) 

1286 if not slot: 

1287 command_policies = CommandPolicies() 

1288 else: 

1289 command_policies = CommandPolicies( 

1290 request_policy=RequestPolicy.DEFAULT_KEYED, 

1291 response_policy=ResponsePolicy.DEFAULT_KEYED, 

1292 ) 

1293 else: 

1294 if command_flag in self._command_flags_mapping: 

1295 command_policies = CommandPolicies( 

1296 request_policy=self._command_flags_mapping[command_flag] 

1297 ) 

1298 else: 

1299 command_policies = CommandPolicies() 

1300 elif not command_policies and target_nodes_specified: 

1301 command_policies = CommandPolicies() 

1302 

1303 # If an error that allows retrying was thrown, the nodes and slots 

1304 # cache were reinitialized. We will retry executing the command with 

1305 # the updated cluster setup only when the target nodes can be 

1306 # determined again with the new cache tables. Therefore, when target 

1307 # nodes were passed to this function, we cannot retry the command 

1308 # execution since the nodes may not be valid anymore after the tables 

1309 # were reinitialized. So in case of passed target nodes, 

1310 # retry_attempts will be set to 0. 

1311 retry_attempts = 0 if target_nodes_specified else self.retry.get_retries() 

1312 # Add one for the first execution 

1313 execute_attempts = 1 + retry_attempts 

1314 for _ in range(execute_attempts): 

1315 try: 

1316 res = {} 

1317 if not target_nodes_specified: 

1318 # Determine the nodes to execute the command on 

1319 target_nodes = self._determine_nodes( 

1320 *args, 

1321 request_policy=command_policies.request_policy, 

1322 nodes_flag=passed_targets, 

1323 ) 

1324 if not target_nodes: 

1325 raise RedisClusterException( 

1326 f"No targets were found to execute {args} command on" 

1327 ) 

1328 if ( 

1329 len(target_nodes) == 1 

1330 and target_nodes[0] == self.get_default_node() 

1331 ): 

1332 is_default_node = True 

1333 for node in target_nodes: 

1334 res[node.name] = self._execute_command(node, *args, **kwargs) 

1335 

1336 if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED: 

1337 break 

1338 

1339 # Return the processed result 

1340 return self._process_result( 

1341 args[0], 

1342 res, 

1343 response_policy=command_policies.response_policy, 

1344 **kwargs, 

1345 ) 

1346 except Exception as e: 

1347 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: 

1348 if is_default_node: 

1349 # Replace the default cluster node 

1350 self.replace_default_node() 

1351 # The nodes and slots cache were reinitialized. 

1352 # Try again with the new cluster setup. 

1353 retry_attempts -= 1 

1354 continue 

1355 else: 

1356 # raise the exception 

1357 raise e 

1358 

1359 def _execute_command(self, target_node, *args, **kwargs): 

1360 """ 

1361 Send a command to a node in the cluster 

1362 """ 

1363 command = args[0] 

1364 redis_node = None 

1365 connection = None 

1366 redirect_addr = None 

1367 asking = False 

1368 moved = False 

1369 ttl = int(self.RedisClusterRequestTTL) 

1370 

1371 while ttl > 0: 

1372 ttl -= 1 

1373 try: 

1374 if asking: 

1375 target_node = self.get_node(node_name=redirect_addr) 

1376 elif moved: 

1377 # MOVED occurred and the slots cache was updated, 

1378 # refresh the target node 

1379 slot = self.determine_slot(*args) 

1380 target_node = self.nodes_manager.get_node_from_slot( 

1381 slot, 

1382 self.read_from_replicas and command in READ_COMMANDS, 

1383 self.load_balancing_strategy 

1384 if command in READ_COMMANDS 

1385 else None, 

1386 ) 

1387 moved = False 

1388 

1389 redis_node = self.get_redis_connection(target_node) 

1390 connection = get_connection(redis_node) 

1391 if asking: 

1392 connection.send_command("ASKING") 

1393 redis_node.parse_response(connection, "ASKING", **kwargs) 

1394 asking = False 

1395 connection.send_command(*args, **kwargs) 

1396 response = redis_node.parse_response(connection, command, **kwargs) 

1397 

1398 # Remove keys entry, it needs only for cache. 

1399 kwargs.pop("keys", None) 

1400 

1401 if command in self.cluster_response_callbacks: 

1402 response = self.cluster_response_callbacks[command]( 

1403 response, **kwargs 

1404 ) 

1405 return response 

1406 except AuthenticationError: 

1407 raise 

1408 except MaxConnectionsError: 

1409 # MaxConnectionsError indicates client-side resource exhaustion 

1410 # (too many connections in the pool), not a node failure. 

1411 # Don't treat this as a node failure - just re-raise the error 

1412 # without reinitializing the cluster. 

1413 raise 

1414 except (ConnectionError, TimeoutError) as e: 

1415 # ConnectionError can also be raised if we couldn't get a 

1416 # connection from the pool before timing out, so check that 

1417 # this is an actual connection before attempting to disconnect. 

1418 if connection is not None: 

1419 connection.disconnect() 

1420 

1421 # Remove the failed node from the startup nodes before we try 

1422 # to reinitialize the cluster 

1423 self.nodes_manager.startup_nodes.pop(target_node.name, None) 

1424 # Reset the cluster node's connection 

1425 target_node.redis_connection = None 

1426 self.nodes_manager.initialize() 

1427 raise e 

1428 except MovedError as e: 

1429 # First, we will try to patch the slots/nodes cache with the 

1430 # redirected node output and try again. If MovedError exceeds 

1431 # 'reinitialize_steps' number of times, we will force 

1432 # reinitializing the tables, and then try again. 

1433 # The 'reinitialize_counter' increases faster when 

1434 # the same client object is shared between multiple threads. To 

1435 # reduce the reinitialization frequency, you can set 'reinitialize_steps' in the 

1436 # RedisCluster constructor. 

1437 self.reinitialize_counter += 1 

1438 if self._should_reinitialized(): 

1439 self.nodes_manager.initialize() 

1440 # Reset the counter 

1441 self.reinitialize_counter = 0 

1442 else: 

1443 self.nodes_manager.update_moved_exception(e) 

1444 moved = True 

1445 except TryAgainError: 

1446 if ttl < self.RedisClusterRequestTTL / 2: 

1447 time.sleep(0.05) 

1448 except AskError as e: 

1449 redirect_addr = get_node_name(host=e.host, port=e.port) 

1450 asking = True 

1451 except (ClusterDownError, SlotNotCoveredError): 

1452 # ClusterDownError can occur during a failover and to get 

1453 # self-healed, we will try to reinitialize the cluster layout 

1454 # and retry executing the command 

1455 

1456 # SlotNotCoveredError can occur when the cluster is not fully 

1457 # initialized, or it can be a temporary issue. 

1458 # We will try to reinitialize the cluster topology 

1459 # and retry executing the command 

1460 

1461 time.sleep(0.25) 

1462 self.nodes_manager.initialize() 

1463 raise 

1464 except ResponseError: 

1465 raise 

1466 except Exception as e: 

1467 if connection: 

1468 connection.disconnect() 

1469 raise e 

1470 finally: 

1471 if connection is not None: 

1472 redis_node.connection_pool.release(connection) 

1473 

1474 raise ClusterError("TTL exhausted.") 

1475 

1476 def close(self) -> None: 

1477 try: 

1478 with self._lock: 

1479 if self.nodes_manager: 

1480 self.nodes_manager.close() 

1481 except AttributeError: 

1482 # RedisCluster's __init__ can fail before nodes_manager is set 

1483 pass 

1484 

1485 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs): 

1486 """ 

1487 Process the result of the executed command. 

1488 The function returns either a dict or a single value. 

1489 

1490 :type command: str 

1491 :type res: dict 

1492 

1493 `res` should be in the following format: 

1494 Dict<node_name, command_result> 

1495 """ 

1496 if command in self.result_callbacks: 

1497 res = self.result_callbacks[command](command, res, **kwargs) 

1498 elif len(res) == 1: 

1499 # When we execute the command on a single node, we can 

1500 # remove the dictionary and return a single response 

1501 res = list(res.values())[0] 

1502 

1503 return self._policies_callback_mapping[response_policy](res) 

1504 

1505 def load_external_module(self, funcname, func): 

1506 """ 

1507 This function can be used to add externally defined Redis modules 

1508 and their namespaces to the Redis client. 

1509 

1510 ``funcname`` - A string containing the name of the function to create 

1511 ``func`` - The function, being added to this class. 

1512 """ 

1513 setattr(self, funcname, func) 

1514 

1515 def transaction(self, func, *watches, **kwargs): 

1516 """ 

1517 Convenience method for executing the callable `func` as a transaction 

1518 while watching all keys specified in `watches`. The 'func' callable 

1519 should expect a single argument which is a Pipeline object. 

1520 """ 

1521 shard_hint = kwargs.pop("shard_hint", None) 

1522 value_from_callable = kwargs.pop("value_from_callable", False) 

1523 watch_delay = kwargs.pop("watch_delay", None) 

1524 with self.pipeline(True, shard_hint) as pipe: 

1525 while True: 

1526 try: 

1527 if watches: 

1528 pipe.watch(*watches) 

1529 func_value = func(pipe) 

1530 exec_value = pipe.execute() 

1531 return func_value if value_from_callable else exec_value 

1532 except WatchError: 

1533 if watch_delay is not None and watch_delay > 0: 

1534 time.sleep(watch_delay) 

1535 continue 

1536 
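# Illustrative usage sketch (not part of the library source), following the standard
# redis-py transaction() idiom (watch the key, then call pipe.multi() to start
# buffering); whether the cluster pipeline supports this exact idiom is an assumption.
#
#     def incr_counter(pipe):
#         current = pipe.get("counter")
#         pipe.multi()
#         pipe.set("counter", int(current or 0) + 1)
#
#     rc.transaction(incr_counter, "counter")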

1537 

1538class ClusterNode: 

1539 def __init__(self, host, port, server_type=None, redis_connection=None): 

1540 if host == "localhost": 

1541 host = socket.gethostbyname(host) 

1542 

1543 self.host = host 

1544 self.port = port 

1545 self.name = get_node_name(host, port) 

1546 self.server_type = server_type 

1547 self.redis_connection = redis_connection 

1548 

1549 def __repr__(self): 

1550 return ( 

1551 f"[host={self.host}," 

1552 f"port={self.port}," 

1553 f"name={self.name}," 

1554 f"server_type={self.server_type}," 

1555 f"redis_connection={self.redis_connection}]" 

1556 ) 

1557 

1558 def __eq__(self, obj): 

1559 return isinstance(obj, ClusterNode) and obj.name == self.name 

1560 

1561 def __del__(self): 

1562 try: 

1563 if self.redis_connection is not None: 

1564 self.redis_connection.close() 

1565 except Exception: 

1566 # Ignore errors when closing the connection 

1567 pass 

1568 

1569 

1570class LoadBalancingStrategy(Enum): 

1571 ROUND_ROBIN = "round_robin" 

1572 ROUND_ROBIN_REPLICAS = "round_robin_replicas" 

1573 RANDOM_REPLICA = "random_replica" 

1574 

1575 

1576class LoadBalancer: 

1577 """ 

1578 Round-Robin Load Balancing 

1579 """ 

1580 

1581 def __init__(self, start_index: int = 0) -> None: 

1582 self.primary_to_idx = {} 

1583 self.start_index = start_index 

1584 

1585 def get_server_index( 

1586 self, 

1587 primary: str, 

1588 list_size: int, 

1589 load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN, 

1590 ) -> int: 

1591 if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA: 

1592 return self._get_random_replica_index(list_size) 

1593 else: 

1594 return self._get_round_robin_index( 

1595 primary, 

1596 list_size, 

1597 load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS, 

1598 ) 

1599 

1600 def reset(self) -> None: 

1601 self.primary_to_idx.clear() 

1602 

1603 def _get_random_replica_index(self, list_size: int) -> int: 

1604 return random.randint(1, list_size - 1) 

1605 

1606 def _get_round_robin_index( 

1607 self, primary: str, list_size: int, replicas_only: bool 

1608 ) -> int: 

1609 server_index = self.primary_to_idx.setdefault(primary, self.start_index) 

1610 if replicas_only and server_index == 0: 

1611 # skip the primary node index 

1612 server_index = 1 

1613 # Update the index for the next round 

1614 self.primary_to_idx[primary] = (server_index + 1) % list_size 

1615 return server_index 

1616 
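A small illustrative sketch (not part of cluster.py) of how the round-robin index cycles for a primary with two replicas; the expected values follow directly from _get_round_robin_index above.

from redis.cluster import LoadBalancer, LoadBalancingStrategy

lb = LoadBalancer()
rr = [lb.get_server_index("p1", 3, LoadBalancingStrategy.ROUND_ROBIN) for _ in range(4)]
# rr == [0, 1, 2, 0]: index 0 is the primary, 1 and 2 are its replicas

lb.reset()
rr_replicas = [
    lb.get_server_index("p1", 3, LoadBalancingStrategy.ROUND_ROBIN_REPLICAS)
    for _ in range(4)
]
# rr_replicas == [1, 2, 1, 2]: the primary at index 0 is always skipped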

1617 

1618class NodesManager: 

1619 def __init__( 

1620 self, 

1621 startup_nodes, 

1622 from_url=False, 

1623 require_full_coverage=False, 

1624 lock=None, 

1625 dynamic_startup_nodes=True, 

1626 connection_pool_class=ConnectionPool, 

1627 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

1628 cache: Optional[CacheInterface] = None, 

1629 cache_config: Optional[CacheConfig] = None, 

1630 cache_factory: Optional[CacheFactoryInterface] = None, 

1631 event_dispatcher: Optional[EventDispatcher] = None, 

1632 **kwargs, 

1633 ): 

1634 self.nodes_cache: Dict[str, Redis] = {} 

1635 self.slots_cache = {} 

1636 self.startup_nodes = {} 

1637 self.default_node = None 

1638 self.populate_startup_nodes(startup_nodes) 

1639 self.from_url = from_url 

1640 self._require_full_coverage = require_full_coverage 

1641 self._dynamic_startup_nodes = dynamic_startup_nodes 

1642 self.connection_pool_class = connection_pool_class 

1643 self.address_remap = address_remap 

1644 self._cache = cache 

1645 self._cache_config = cache_config 

1646 self._cache_factory = cache_factory 

1647 self._moved_exception = None 

1648 self.connection_kwargs = kwargs 

1649 self.read_load_balancer = LoadBalancer() 

1650 if lock is None: 

1651 lock = threading.RLock() 

1652 self._lock = lock 

1653 if event_dispatcher is None: 

1654 self._event_dispatcher = EventDispatcher() 

1655 else: 

1656 self._event_dispatcher = event_dispatcher 

1657 self._credential_provider = self.connection_kwargs.get( 

1658 "credential_provider", None 

1659 ) 

1660 self.initialize() 

1661 

1662 def get_node(self, host=None, port=None, node_name=None): 

1663 """ 

1664 Get the requested node from the cluster's nodes. 

1665 

1666 :return: ClusterNode if the node exists, else None 

1667 """ 

1668 if host and port: 

1669 # the user passed host and port 

1670 if host == "localhost": 

1671 host = socket.gethostbyname(host) 

1672 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1673 elif node_name: 

1674 return self.nodes_cache.get(node_name) 

1675 else: 

1676 return None 

1677 

1678 def update_moved_exception(self, exception): 

1679 self._moved_exception = exception 

1680 

1681 def _update_moved_slots(self): 

1682 """ 

1683 Update the slot's node with the redirected one 

1684 """ 

1685 e = self._moved_exception 

1686 redirected_node = self.get_node(host=e.host, port=e.port) 

1687 if redirected_node is not None: 

1688 # The node already exists 

1689 if redirected_node.server_type is not PRIMARY: 

1690 # Update the node's server type 

1691 redirected_node.server_type = PRIMARY 

1692 else: 

1693 # This is a new node, we will add it to the nodes cache 

1694 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

1695 self.nodes_cache[redirected_node.name] = redirected_node 

1696 slot_nodes = self.slots_cache[e.slot_id] 

1697 if redirected_node not in slot_nodes: 

1698 # The new slot owner is a new server, or a server from a different 

1699 # shard. We need to remove all current nodes from the slot's list 

1700 # (including replicas) and add just the new node. 

1701 self.slots_cache[e.slot_id] = [redirected_node] 

1702 elif redirected_node is not slot_nodes[0]: 

1703 # The MOVED error resulted from a failover, and the new slot owner 

1704 # had previously been a replica. 

1705 old_primary = slot_nodes[0] 

1706 # Update the old primary to be a replica and add it to the end of 

1707 # the slot's node list 

1708 old_primary.server_type = REPLICA 

1709 slot_nodes.append(old_primary) 

1710 # Remove the old replica, which is now a primary, from the slot's 

1711 # node list 

1712 slot_nodes.remove(redirected_node) 

1713 # Override the old primary with the new one 

1714 slot_nodes[0] = redirected_node 

1715 if self.default_node == old_primary: 

1716 # Update the default node with the new primary 

1717 self.default_node = redirected_node 

1718 # else: circular MOVED to current primary -> no-op 

1719 

1720 # Reset moved_exception 

1721 self._moved_exception = None 

1722 

1723 @deprecated_args( 

1724 args_to_warn=["server_type"], 

1725 reason=( 

1726 "In case you need select some load balancing strategy " 

1727 "that will use replicas, please set it through 'load_balancing_strategy'" 

1728 ), 

1729 version="5.3.0", 

1730 ) 

1731 def get_node_from_slot( 

1732 self, 

1733 slot, 

1734 read_from_replicas=False, 

1735 load_balancing_strategy=None, 

1736 server_type=None, 

1737 ) -> ClusterNode: 

1738 """ 

1739 Gets a node that serves this hash slot 

1740 """ 

1741 if self._moved_exception: 

1742 with self._lock: 

1743 if self._moved_exception: 

1744 self._update_moved_slots() 

1745 

1746 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

1747 raise SlotNotCoveredError( 

1748 f'Slot "{slot}" not covered by the cluster. ' 

1749 f'"require_full_coverage={self._require_full_coverage}"' 

1750 ) 

1751 

1752 if read_from_replicas is True and load_balancing_strategy is None: 

1753 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN 

1754 

1755 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy: 

1756 # get the server index using the strategy defined in load_balancing_strategy 

1757 primary_name = self.slots_cache[slot][0].name 

1758 node_idx = self.read_load_balancer.get_server_index( 

1759 primary_name, len(self.slots_cache[slot]), load_balancing_strategy 

1760 ) 

1761 elif ( 

1762 server_type is None 

1763 or server_type == PRIMARY 

1764 or len(self.slots_cache[slot]) == 1 

1765 ): 

1766 # return a primary 

1767 node_idx = 0 

1768 else: 

1769 # return a replica 

1770 # randomly choose one of the replicas 

1771 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

1772 

1773 return self.slots_cache[slot][node_idx] 

1774 
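A hedged usage sketch of get_node_from_slot (not part of cluster.py), assuming nodes_manager is an already initialized NodesManager; key_slot comes from redis.crc, which this module already imports.

from redis.cluster import LoadBalancingStrategy
from redis.crc import key_slot

slot = key_slot(b"user:42")                       # hash slot that owns the key
primary = nodes_manager.get_node_from_slot(slot)  # default: the slot's primary
node = nodes_manager.get_node_from_slot(          # may return the primary or a replica
    slot, load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN
)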

1775 def get_nodes_by_server_type(self, server_type): 

1776 """ 

1777 Get all nodes with the specified server type 

1778 :param server_type: 'primary' or 'replica' 

1779 :return: list of ClusterNode 

1780 """ 

1781 return [ 

1782 node 

1783 for node in self.nodes_cache.values() 

1784 if node.server_type == server_type 

1785 ] 

1786 

1787 def populate_startup_nodes(self, nodes): 

1788 """ 

1789 Populates all startup nodes and filters out any duplicates 

1790 """ 

1791 for n in nodes: 

1792 self.startup_nodes[n.name] = n 

1793 

1794 def check_slots_coverage(self, slots_cache): 

1795 # Validate if all slots are covered or if we should try next 

1796 # startup node 

1797 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

1798 if i not in slots_cache: 

1799 return False 

1800 return True 

1801 

1802 def create_redis_connections(self, nodes): 

1803 """ 

1804 This function creates a Redis connection to every node in :nodes: 

1805 """ 

1806 connection_pools = [] 

1807 for node in nodes: 

1808 if node.redis_connection is None: 

1809 node.redis_connection = self.create_redis_node( 

1810 host=node.host, port=node.port, **self.connection_kwargs 

1811 ) 

1812 connection_pools.append(node.redis_connection.connection_pool) 

1813 

1814 self._event_dispatcher.dispatch( 

1815 AfterPooledConnectionsInstantiationEvent( 

1816 connection_pools, ClientType.SYNC, self._credential_provider 

1817 ) 

1818 ) 

1819 

1820 def create_redis_node(self, host, port, **kwargs): 

1821 # We are configuring the connection pool not to retry 

1822 # connections on lower level clients to avoid retrying 

1823 # connections to nodes that are not reachable 

1824 # and to avoid blocking the connection pool. 

1825 # The only error that will have some handling in the lower 

1826 # level clients is ConnectionError which will trigger disconnection 

1827 # of the socket. 

1828 # The retries will be handled on cluster client level 

1829 # where we will have proper handling of the cluster topology 

1830 node_retry_config = Retry( 

1831 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) 

1832 ) 

1833 

1834 protocol = kwargs.get("protocol", None) 

1835 if protocol in [3, "3"]: 

1836 kwargs.update( 

1837 {"maint_notifications_config": MaintNotificationsConfig(enabled=False)} 

1838 ) 

1839 if self.from_url: 

1840 # Create a redis node with a custom connection pool 

1841 kwargs.update({"host": host}) 

1842 kwargs.update({"port": port}) 

1843 kwargs.update({"cache": self._cache}) 

1844 kwargs.update({"retry": node_retry_config}) 

1845 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

1846 else: 

1847 r = Redis( 

1848 host=host, 

1849 port=port, 

1850 cache=self._cache, 

1851 retry=node_retry_config, 

1852 **kwargs, 

1853 ) 

1854 return r 

1855 

1856 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

1857 node_name = get_node_name(host, port) 

1858 # check if we already have this node in the tmp_nodes_cache 

1859 target_node = tmp_nodes_cache.get(node_name) 

1860 if target_node is None: 

1861 # before creating a new cluster node, check if the cluster node already 

1862 # exists in the current nodes cache and has a valid connection so we can 

1863 # reuse it 

1864 target_node = self.nodes_cache.get(node_name) 

1865 if target_node is None or target_node.redis_connection is None: 

1866 # create new cluster node for this cluster 

1867 target_node = ClusterNode(host, port, role) 

1868 if target_node.server_type != role: 

1869 target_node.server_type = role 

1870 # add this node to the nodes cache 

1871 tmp_nodes_cache[target_node.name] = target_node 

1872 

1873 return target_node 

1874 

1875 def initialize(self): 

1876 """ 

1877 Initializes the nodes cache, slots cache and redis connections. 

1878 :startup_nodes: 

1879 Responsible for discovering other nodes in the cluster 

1880 """ 

1881 self.reset() 

1882 tmp_nodes_cache = {} 

1883 tmp_slots = {} 

1884 disagreements = [] 

1885 startup_nodes_reachable = False 

1886 fully_covered = False 

1887 kwargs = self.connection_kwargs 

1888 exception = None 

1889 # Convert to tuple to prevent RuntimeError if self.startup_nodes 

1890 # is modified during iteration 

1891 for startup_node in tuple(self.startup_nodes.values()): 

1892 try: 

1893 if startup_node.redis_connection: 

1894 r = startup_node.redis_connection 

1895 else: 

1896 # Create a new Redis connection 

1897 r = self.create_redis_node( 

1898 startup_node.host, startup_node.port, **kwargs 

1899 ) 

1900 self.startup_nodes[startup_node.name].redis_connection = r 

1901 # Make sure cluster mode is enabled on this node 

1902 try: 

1903 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

1904 r.connection_pool.disconnect() 

1905 except ResponseError: 

1906 raise RedisClusterException( 

1907 "Cluster mode is not enabled on this node" 

1908 ) 

1909 startup_nodes_reachable = True 

1910 except Exception as e: 

1911 # Try the next startup node. 

1912 # The exception is saved and raised only if we have no more nodes. 

1913 exception = e 

1914 continue 

1915 

1916 # CLUSTER SLOTS command results in the following output: 

1917 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

1918 # where each node contains the following list: [IP, port, node_id] 

1919 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

1920 # primary node of the first slot section. 

1921 # If there's only one server in the cluster, its ``host`` is '' 

1922 # Fix it to the host in startup_nodes 

1923 if ( 

1924 len(cluster_slots) == 1 

1925 and len(cluster_slots[0][2][0]) == 0 

1926 and len(self.startup_nodes) == 1 

1927 ): 

1928 cluster_slots[0][2][0] = startup_node.host 

1929 

1930 for slot in cluster_slots: 

1931 primary_node = slot[2] 

1932 host = str_if_bytes(primary_node[0]) 

1933 if host == "": 

1934 host = startup_node.host 

1935 port = int(primary_node[1]) 

1936 host, port = self.remap_host_port(host, port) 

1937 

1938 nodes_for_slot = [] 

1939 

1940 target_node = self._get_or_create_cluster_node( 

1941 host, port, PRIMARY, tmp_nodes_cache 

1942 ) 

1943 nodes_for_slot.append(target_node) 

1944 

1945 replica_nodes = slot[3:] 

1946 for replica_node in replica_nodes: 

1947 host = str_if_bytes(replica_node[0]) 

1948 port = int(replica_node[1]) 

1949 host, port = self.remap_host_port(host, port) 

1950 target_replica_node = self._get_or_create_cluster_node( 

1951 host, port, REPLICA, tmp_nodes_cache 

1952 ) 

1953 nodes_for_slot.append(target_replica_node) 

1954 

1955 for i in range(int(slot[0]), int(slot[1]) + 1): 

1956 if i not in tmp_slots: 

1957 tmp_slots[i] = nodes_for_slot 

1958 else: 

1959 # Validate that no two startup nodes disagree about 

1960 # which primary owns this slot 

1961 tmp_slot = tmp_slots[i][0] 

1962 if tmp_slot.name != target_node.name: 

1963 disagreements.append( 

1964 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

1965 ) 

1966 

1967 if len(disagreements) > 5: 

1968 raise RedisClusterException( 

1969 f"startup_nodes could not agree on a valid " 

1970 f"slots cache: {', '.join(disagreements)}" 

1971 ) 

1972 

1973 fully_covered = self.check_slots_coverage(tmp_slots) 

1974 if fully_covered: 

1975 # Don't need to continue to the next startup node if all 

1976 # slots are covered 

1977 break 

1978 

1979 if not startup_nodes_reachable: 

1980 raise RedisClusterException( 

1981 f"Redis Cluster cannot be connected. Please provide at least " 

1982 f"one reachable node: {str(exception)}" 

1983 ) from exception 

1984 

1985 if self._cache is None and self._cache_config is not None: 

1986 if self._cache_factory is None: 

1987 self._cache = CacheFactory(self._cache_config).get_cache() 

1988 else: 

1989 self._cache = self._cache_factory.get_cache() 

1990 

1991 # Create Redis connections to all nodes 

1992 self.create_redis_connections(list(tmp_nodes_cache.values())) 

1993 

1994 # Check if the slots are not fully covered 

1995 if not fully_covered and self._require_full_coverage: 

1996 # Despite the requirement that the slots be covered, there 

1997 # isn't full coverage 

1998 raise RedisClusterException( 

1999 f"All slots are not covered after query all startup_nodes. " 

2000 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

2001 f"covered..." 

2002 ) 

2003 

2004 # Set the tmp variables to the real variables 

2005 self.nodes_cache = tmp_nodes_cache 

2006 self.slots_cache = tmp_slots 

2007 # Set the default node 

2008 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

2009 if self._dynamic_startup_nodes: 

2010 # Populate the startup nodes with all discovered nodes 

2011 self.startup_nodes = tmp_nodes_cache 

2012 # If initialize was called after a MovedError, clear it 

2013 self._moved_exception = None 

2014 
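A minimal construction sketch (not part of cluster.py) showing the options that initialize() consumes; the startup node addresses below are placeholders.

from redis.cluster import ClusterNode, RedisCluster

startup_nodes = [ClusterNode("10.0.0.1", 7000), ClusterNode("10.0.0.2", 7000)]
rc = RedisCluster(
    startup_nodes=startup_nodes,
    require_full_coverage=True,   # raise if any of the 16384 slots is left uncovered
    dynamic_startup_nodes=True,   # refresh startup nodes from the discovered topology
)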

2015 def close(self) -> None: 

2016 self.default_node = None 

2017 for node in self.nodes_cache.values(): 

2018 if node.redis_connection: 

2019 node.redis_connection.close() 

2020 

2021 def reset(self): 

2022 try: 

2023 self.read_load_balancer.reset() 

2024 except TypeError: 

2025 # The read_load_balancer is None, do nothing 

2026 pass 

2027 

2028 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

2029 """ 

2030 Remap the host and port returned from the cluster to a different 

2031 internal value. Useful if the client is not connecting directly 

2032 to the cluster. 

2033 """ 

2034 if self.address_remap: 

2035 return self.address_remap((host, port)) 

2036 return host, port 

2037 
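A minimal sketch of the address_remap hook described above (not part of cluster.py); the external hostname and port mapping below are hypothetical and setup specific.

from redis.cluster import RedisCluster

PORT_MAP = {7000: 17000, 7001: 17001, 7002: 17002}  # hypothetical NAT/port-forward table

def remap(address):
    host, port = address
    return "cluster.example.internal", PORT_MAP.get(port, port)

rc = RedisCluster(host="localhost", port=7000, address_remap=remap)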

2038 def find_connection_owner(self, connection: Connection) -> Optional[Redis]: 

2039 node_name = get_node_name(connection.host, connection.port) 

2040 for node in tuple(self.nodes_cache.values()): 

2041 if node.redis_connection: 

2042 conn_args = node.redis_connection.connection_pool.connection_kwargs 

2043 if node_name == get_node_name( 

2044 conn_args.get("host"), conn_args.get("port") 

2045 ): 

2046 return node 

2047 

2048 

2049class ClusterPubSub(PubSub): 

2050 """ 

2051 Wrapper for PubSub class. 

2052 

2053 IMPORTANT: before using ClusterPubSub, read about the known limitations 

2054 with pubsub in Cluster mode and learn how to work around them: 

2055 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html 

2056 """ 

2057 

2058 def __init__( 

2059 self, 

2060 redis_cluster, 

2061 node=None, 

2062 host=None, 

2063 port=None, 

2064 push_handler_func=None, 

2065 event_dispatcher: Optional["EventDispatcher"] = None, 

2066 **kwargs, 

2067 ): 

2068 """ 

2069 When a pubsub instance is created without specifying a node, a single 

2070 node will be transparently chosen for the pubsub connection on the 

2071 first command execution. The node will be determined by: 

2072 1. Hashing the channel name in the request to find its keyslot 

2073 2. Selecting a node that handles the keyslot: If read_from_replicas is 

2074 set to true or load_balancing_strategy is set, a replica can be selected. 

2075 

2076 :type redis_cluster: RedisCluster 

2077 :type node: ClusterNode 

2078 :type host: str 

2079 :type port: int 

2080 """ 

2081 self.node = None 

2082 self.set_pubsub_node(redis_cluster, node, host, port) 

2083 connection_pool = ( 

2084 None 

2085 if self.node is None 

2086 else redis_cluster.get_redis_connection(self.node).connection_pool 

2087 ) 

2088 self.cluster = redis_cluster 

2089 self.node_pubsub_mapping = {} 

2090 self._pubsubs_generator = self._pubsubs_generator() 

2091 if event_dispatcher is None: 

2092 self._event_dispatcher = EventDispatcher() 

2093 else: 

2094 self._event_dispatcher = event_dispatcher 

2095 super().__init__( 

2096 connection_pool=connection_pool, 

2097 encoder=redis_cluster.encoder, 

2098 push_handler_func=push_handler_func, 

2099 event_dispatcher=self._event_dispatcher, 

2100 **kwargs, 

2101 ) 

2102 
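A short usage sketch (not part of cluster.py) of the lazy node selection described in the docstring above; rc is assumed to be an existing RedisCluster client and "news" is a placeholder channel.

p = rc.pubsub()              # no node given: chosen lazily from the first channel's slot
p.subscribe("news")          # SUBSCRIBE is routed via the key slot of "news"
print(p.get_pubsub_node())   # the ClusterNode now holding the pubsub connection
message = p.get_message(timeout=1.0)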

2103 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

2104 """ 

2105 The pubsub node will be set according to the passed node, host and port. 

2106 When none of the node, host, or port is specified, the node is set 

2107 to None and will be determined by the keyslot of the channel in the 

2108 first command to be executed. 

2109 RedisClusterException will be thrown if the passed node does not exist 

2110 in the cluster. 

2111 If host is passed without port, or vice versa, a DataError will be 

2112 thrown. 

2113 :type cluster: RedisCluster 

2114 :type node: ClusterNode 

2115 :type host: str 

2116 :type port: int 

2117 """ 

2118 if node is not None: 

2119 # node is passed by the user 

2120 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

2121 pubsub_node = node 

2122 elif host is not None and port is not None: 

2123 # host and port passed by the user 

2124 node = cluster.get_node(host=host, port=port) 

2125 self._raise_on_invalid_node(cluster, node, host, port) 

2126 pubsub_node = node 

2127 elif any([host, port]) is True: 

2128 # only 'host' or 'port' passed 

2129 raise DataError("Passing a host requires passing a port, and vice versa") 

2130 else: 

2131 # nothing passed by the user. set node to None 

2132 pubsub_node = None 

2133 

2134 self.node = pubsub_node 

2135 

2136 def get_pubsub_node(self): 

2137 """ 

2138 Get the node that is being used as the pubsub connection 

2139 """ 

2140 return self.node 

2141 

2142 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

2143 """ 

2144 Raise a RedisClusterException if the node is None or doesn't exist in 

2145 the cluster. 

2146 """ 

2147 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

2148 raise RedisClusterException( 

2149 f"Node {host}:{port} doesn't exist in the cluster" 

2150 ) 

2151 

2152 def execute_command(self, *args): 

2153 """ 

2154 Execute a subscribe/unsubscribe command. 

2155 

2156 Code taken from redis-py and tweaked to make it work within a cluster. 

2157 """ 

2158 # NOTE: don't parse the response in this function -- it could pull a 

2159 # legitimate message off the stack if the connection is already 

2160 # subscribed to one or more channels 

2161 

2162 if self.connection is None: 

2163 if self.connection_pool is None: 

2164 if len(args) > 1: 

2165 # Hash the first channel and get one of the nodes holding 

2166 # this slot 

2167 channel = args[1] 

2168 slot = self.cluster.keyslot(channel) 

2169 node = self.cluster.nodes_manager.get_node_from_slot( 

2170 slot, 

2171 self.cluster.read_from_replicas, 

2172 self.cluster.load_balancing_strategy, 

2173 ) 

2174 else: 

2175 # Get a random node 

2176 node = self.cluster.get_random_node() 

2177 self.node = node 

2178 redis_connection = self.cluster.get_redis_connection(node) 

2179 self.connection_pool = redis_connection.connection_pool 

2180 self.connection = self.connection_pool.get_connection() 

2181 # register a callback that re-subscribes to any channels we 

2182 # were listening to when we were disconnected 

2183 self.connection.register_connect_callback(self.on_connect) 

2184 if self.push_handler_func is not None: 

2185 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

2186 self._event_dispatcher.dispatch( 

2187 AfterPubSubConnectionInstantiationEvent( 

2188 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

2189 ) 

2190 ) 

2191 connection = self.connection 

2192 self._execute(connection, connection.send_command, *args) 

2193 

2194 def _get_node_pubsub(self, node): 

2195 try: 

2196 return self.node_pubsub_mapping[node.name] 

2197 except KeyError: 

2198 pubsub = node.redis_connection.pubsub( 

2199 push_handler_func=self.push_handler_func 

2200 ) 

2201 self.node_pubsub_mapping[node.name] = pubsub 

2202 return pubsub 

2203 

2204 def _sharded_message_generator(self): 

2205 for _ in range(len(self.node_pubsub_mapping)): 

2206 pubsub = next(self._pubsubs_generator) 

2207 message = pubsub.get_message() 

2208 if message is not None: 

2209 return message 

2210 return None 

2211 

2212 def _pubsubs_generator(self): 

2213 while True: 

2214 current_nodes = list(self.node_pubsub_mapping.values()) 

2215 yield from current_nodes 

2216 

2217 def get_sharded_message( 

2218 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None 

2219 ): 

2220 if target_node: 

2221 message = self.node_pubsub_mapping[target_node.name].get_message( 

2222 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout 

2223 ) 

2224 else: 

2225 message = self._sharded_message_generator() 

2226 if message is None: 

2227 return None 

2228 elif str_if_bytes(message["type"]) == "sunsubscribe": 

2229 if message["channel"] in self.pending_unsubscribe_shard_channels: 

2230 self.pending_unsubscribe_shard_channels.remove(message["channel"]) 

2231 self.shard_channels.pop(message["channel"], None) 

2232 node = self.cluster.get_node_from_key(message["channel"]) 

2233 if self.node_pubsub_mapping[node.name].subscribed is False: 

2234 self.node_pubsub_mapping.pop(node.name) 

2235 if not self.channels and not self.patterns and not self.shard_channels: 

2236 # There are no subscriptions anymore, set subscribed_event flag 

2237 # to false 

2238 self.subscribed_event.clear() 

2239 if self.ignore_subscribe_messages or ignore_subscribe_messages: 

2240 return None 

2241 return message 

2242 

2243 def ssubscribe(self, *args, **kwargs): 

2244 if args: 

2245 args = list_or_args(args[0], args[1:]) 

2246 s_channels = dict.fromkeys(args) 

2247 s_channels.update(kwargs) 

2248 for s_channel, handler in s_channels.items(): 

2249 node = self.cluster.get_node_from_key(s_channel) 

2250 pubsub = self._get_node_pubsub(node) 

2251 if handler: 

2252 pubsub.ssubscribe(**{s_channel: handler}) 

2253 else: 

2254 pubsub.ssubscribe(s_channel) 

2255 self.shard_channels.update(pubsub.shard_channels) 

2256 self.pending_unsubscribe_shard_channels.difference_update( 

2257 self._normalize_keys({s_channel: None}) 

2258 ) 

2259 if pubsub.subscribed and not self.subscribed: 

2260 self.subscribed_event.set() 

2261 self.health_check_response_counter = 0 

2262 

2263 def sunsubscribe(self, *args): 

2264 if args: 

2265 args = list_or_args(args[0], args[1:]) 

2266 else: 

2267 args = self.shard_channels 

2268 

2269 for s_channel in args: 

2270 node = self.cluster.get_node_from_key(s_channel) 

2271 p = self._get_node_pubsub(node) 

2272 p.sunsubscribe(s_channel) 

2273 self.pending_unsubscribe_shard_channels.update( 

2274 p.pending_unsubscribe_shard_channels 

2275 ) 

2276 
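A hedged sketch (not part of cluster.py) of the sharded pub/sub helpers above; it assumes Redis 7+ nodes (SSUBSCRIBE), an existing RedisCluster client rc, and a placeholder channel name.

p = rc.pubsub()
p.ssubscribe("orders")           # routed to the node that owns the slot of "orders"
msg = p.get_sharded_message()    # polls the per-node pubsub objects in round-robin order
if msg is not None and msg["type"] == "smessage":
    print(msg["channel"], msg["data"])
p.sunsubscribe("orders")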

2277 def get_redis_connection(self): 

2278 """ 

2279 Get the Redis connection of the pubsub connected node. 

2280 """ 

2281 if self.node is not None: 

2282 return self.node.redis_connection 

2283 

2284 def disconnect(self): 

2285 """ 

2286 Disconnect the pubsub connection. 

2287 """ 

2288 if self.connection: 

2289 self.connection.disconnect() 

2290 for pubsub in self.node_pubsub_mapping.values(): 

2291 pubsub.connection.disconnect() 

2292 

2293 

2294class ClusterPipeline(RedisCluster): 

2295 """ 

2296 Support for Redis pipeline 

2297 in cluster mode 

2298 """ 

2299 

2300 ERRORS_ALLOW_RETRY = ( 

2301 ConnectionError, 

2302 TimeoutError, 

2303 MovedError, 

2304 AskError, 

2305 TryAgainError, 

2306 ) 

2307 

2308 NO_SLOTS_COMMANDS = {"UNWATCH"} 

2309 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

2310 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

2311 

2312 @deprecated_args( 

2313 args_to_warn=[ 

2314 "cluster_error_retry_attempts", 

2315 ], 

2316 reason="Please configure the 'retry' object instead", 

2317 version="6.0.0", 

2318 ) 

2319 def __init__( 

2320 self, 

2321 nodes_manager: "NodesManager", 

2322 commands_parser: "CommandsParser", 

2323 result_callbacks: Optional[Dict[str, Callable]] = None, 

2324 cluster_response_callbacks: Optional[Dict[str, Callable]] = None, 

2325 startup_nodes: Optional[List["ClusterNode"]] = None, 

2326 read_from_replicas: bool = False, 

2327 load_balancing_strategy: Optional[LoadBalancingStrategy] = None, 

2328 cluster_error_retry_attempts: int = 3, 

2329 reinitialize_steps: int = 5, 

2330 retry: Optional[Retry] = None, 

2331 lock=None, 

2332 transaction=False, 

2333 policy_resolver: PolicyResolver = StaticPolicyResolver(), 

2334 **kwargs, 

2335 ): 

2336 """ """ 

2337 self.command_stack = [] 

2338 self.nodes_manager = nodes_manager 

2339 self.commands_parser = commands_parser 

2340 self.refresh_table_asap = False 

2341 self.result_callbacks = ( 

2342 result_callbacks or self.__class__.RESULT_CALLBACKS.copy() 

2343 ) 

2344 self.startup_nodes = startup_nodes if startup_nodes else [] 

2345 self.read_from_replicas = read_from_replicas 

2346 self.load_balancing_strategy = load_balancing_strategy 

2347 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

2348 self.cluster_response_callbacks = cluster_response_callbacks 

2349 self.reinitialize_counter = 0 

2350 self.reinitialize_steps = reinitialize_steps 

2351 if retry is not None: 

2352 self.retry = retry 

2353 else: 

2354 self.retry = Retry( 

2355 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

2356 retries=cluster_error_retry_attempts, 

2357 ) 

2358 

2359 self.encoder = Encoder( 

2360 kwargs.get("encoding", "utf-8"), 

2361 kwargs.get("encoding_errors", "strict"), 

2362 kwargs.get("decode_responses", False), 

2363 ) 

2364 if lock is None: 

2365 lock = threading.RLock() 

2366 self._lock = lock 

2367 self.parent_execute_command = super().execute_command 

2368 self._execution_strategy: ExecutionStrategy = ( 

2369 PipelineStrategy(self) if not transaction else TransactionStrategy(self) 

2370 ) 

2371 

2372 # For backward compatibility, mapping from existing policies to new one 

2373 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = { 

2374 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS, 

2375 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS, 

2376 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES, 

2377 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS, 

2378 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE, 

2379 SLOT_ID: RequestPolicy.DEFAULT_KEYED, 

2380 } 

2381 

2382 self._policies_callback_mapping: dict[ 

2383 Union[RequestPolicy, ResponsePolicy], Callable 

2384 ] = { 

2385 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [ 

2386 self.get_random_primary_or_all_nodes(command_name) 

2387 ], 

2388 RequestPolicy.DEFAULT_KEYED: lambda command, 

2389 *args: self.get_nodes_from_slot(command, *args), 

2390 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()], 

2391 RequestPolicy.ALL_SHARDS: self.get_primaries, 

2392 RequestPolicy.ALL_NODES: self.get_nodes, 

2393 RequestPolicy.ALL_REPLICAS: self.get_replicas, 

2394 RequestPolicy.MULTI_SHARD: lambda *args, 

2395 **kwargs: self._split_multi_shard_command(*args, **kwargs), 

2396 RequestPolicy.SPECIAL: self.get_special_nodes, 

2397 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res, 

2398 ResponsePolicy.DEFAULT_KEYED: lambda res: res, 

2399 } 

2400 

2401 self._policy_resolver = policy_resolver 

2402 

2403 def __repr__(self): 

2404 """ """ 

2405 return f"{type(self).__name__}" 

2406 

2407 def __enter__(self): 

2408 """ """ 

2409 return self 

2410 

2411 def __exit__(self, exc_type, exc_value, traceback): 

2412 """ """ 

2413 self.reset() 

2414 

2415 def __del__(self): 

2416 try: 

2417 self.reset() 

2418 except Exception: 

2419 pass 

2420 

2421 def __len__(self): 

2422 """ """ 

2423 return len(self._execution_strategy.command_queue) 

2424 

2425 def __bool__(self): 

2426 "Pipeline instances should always evaluate to True on Python 3+" 

2427 return True 

2428 

2429 def execute_command(self, *args, **kwargs): 

2430 """ 

2431 Wrapper function for pipeline_execute_command 

2432 """ 

2433 return self._execution_strategy.execute_command(*args, **kwargs) 

2434 

2435 def pipeline_execute_command(self, *args, **options): 

2436 """ 

2437 Stage a command to be executed when execute() is next called 

2438 

2439 Returns the current Pipeline object back so commands can be 

2440 chained together, such as: 

2441 

2442 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

2443 

2444 At some other point, you can then run: pipe.execute(), 

2445 which will execute all commands queued in the pipe. 

2446 """ 

2447 return self._execution_strategy.execute_command(*args, **options) 

2448 

2449 def annotate_exception(self, exception, number, command): 

2450 """ 

2451 Provides extra context to the exception prior to it being handled 

2452 """ 

2453 self._execution_strategy.annotate_exception(exception, number, command) 

2454 

2455 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2456 """ 

2457 Execute all the commands in the current pipeline 

2458 """ 

2459 

2460 try: 

2461 return self._execution_strategy.execute(raise_on_error) 

2462 finally: 

2463 self.reset() 

2464 

2465 def reset(self): 

2466 """ 

2467 Reset back to empty pipeline. 

2468 """ 

2469 self._execution_strategy.reset() 

2470 

2471 def send_cluster_commands( 

2472 self, stack, raise_on_error=True, allow_redirections=True 

2473 ): 

2474 return self._execution_strategy.send_cluster_commands( 

2475 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections 

2476 ) 

2477 

2478 def exists(self, *keys): 

2479 return self._execution_strategy.exists(*keys) 

2480 

2481 def eval(self): 

2482 """ """ 

2483 return self._execution_strategy.eval() 

2484 

2485 def multi(self): 

2486 """ 

2487 Start a transactional block of the pipeline after WATCH commands 

2488 are issued. End the transactional block with `execute`. 

2489 """ 

2490 self._execution_strategy.multi() 

2491 

2492 def load_scripts(self): 

2493 """ """ 

2494 self._execution_strategy.load_scripts() 

2495 

2496 def discard(self): 

2497 """ """ 

2498 self._execution_strategy.discard() 

2499 

2500 def watch(self, *names): 

2501 """Watches the values at keys ``names``""" 

2502 self._execution_strategy.watch(*names) 

2503 

2504 def unwatch(self): 

2505 """Unwatches all previously specified keys""" 

2506 self._execution_strategy.unwatch() 

2507 

2508 def script_load_for_pipeline(self, *args, **kwargs): 

2509 self._execution_strategy.script_load_for_pipeline(*args, **kwargs) 

2510 

2511 def delete(self, *names): 

2512 self._execution_strategy.delete(*names) 

2513 

2514 def unlink(self, *names): 

2515 self._execution_strategy.unlink(*names) 

2516 

2517 

2518def block_pipeline_command(name: str) -> Callable[..., Any]: 

2519 """ 

2520 Raises an error because some pipelined commands should 

2521 be blocked when running in cluster mode 

2522 """ 

2523 

2524 def inner(*args, **kwargs): 

2525 raise RedisClusterException( 

2526 f"ERROR: Calling pipelined function {name} is blocked " 

2527 f"when running redis in cluster mode..." 

2528 ) 

2529 

2530 return inner 

2531 

2532 

2533# Blocked pipeline commands 

2534PIPELINE_BLOCKED_COMMANDS = ( 

2535 "BGREWRITEAOF", 

2536 "BGSAVE", 

2537 "BITOP", 

2538 "BRPOPLPUSH", 

2539 "CLIENT GETNAME", 

2540 "CLIENT KILL", 

2541 "CLIENT LIST", 

2542 "CLIENT SETNAME", 

2543 "CLIENT", 

2544 "CONFIG GET", 

2545 "CONFIG RESETSTAT", 

2546 "CONFIG REWRITE", 

2547 "CONFIG SET", 

2548 "CONFIG", 

2549 "DBSIZE", 

2550 "ECHO", 

2551 "EVALSHA", 

2552 "FLUSHALL", 

2553 "FLUSHDB", 

2554 "INFO", 

2555 "KEYS", 

2556 "LASTSAVE", 

2557 "MGET", 

2558 "MGET NONATOMIC", 

2559 "MOVE", 

2560 "MSET", 

2561 "MSETEX", 

2562 "MSET NONATOMIC", 

2563 "MSETNX", 

2564 "PFCOUNT", 

2565 "PFMERGE", 

2566 "PING", 

2567 "PUBLISH", 

2568 "RANDOMKEY", 

2569 "READONLY", 

2570 "READWRITE", 

2571 "RENAME", 

2572 "RENAMENX", 

2573 "RPOPLPUSH", 

2574 "SAVE", 

2575 "SCAN", 

2576 "SCRIPT EXISTS", 

2577 "SCRIPT FLUSH", 

2578 "SCRIPT KILL", 

2579 "SCRIPT LOAD", 

2580 "SCRIPT", 

2581 "SDIFF", 

2582 "SDIFFSTORE", 

2583 "SENTINEL GET MASTER ADDR BY NAME", 

2584 "SENTINEL MASTER", 

2585 "SENTINEL MASTERS", 

2586 "SENTINEL MONITOR", 

2587 "SENTINEL REMOVE", 

2588 "SENTINEL SENTINELS", 

2589 "SENTINEL SET", 

2590 "SENTINEL SLAVES", 

2591 "SENTINEL", 

2592 "SHUTDOWN", 

2593 "SINTER", 

2594 "SINTERSTORE", 

2595 "SLAVEOF", 

2596 "SLOWLOG GET", 

2597 "SLOWLOG LEN", 

2598 "SLOWLOG RESET", 

2599 "SLOWLOG", 

2600 "SMOVE", 

2601 "SORT", 

2602 "SUNION", 

2603 "SUNIONSTORE", 

2604 "TIME", 

2605) 

2606for command in PIPELINE_BLOCKED_COMMANDS: 

2607 command = command.replace(" ", "_").lower() 

2608 

2609 setattr(ClusterPipeline, command, block_pipeline_command(command)) 

2610 
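A brief sketch (not part of cluster.py) of the effect of the loop above: calling a blocked command on a cluster pipeline raises immediately instead of queueing; rc is assumed to be an existing RedisCluster client.

from redis.exceptions import RedisClusterException

pipe = rc.pipeline()
try:
    pipe.keys()   # KEYS appears in PIPELINE_BLOCKED_COMMANDS above
except RedisClusterException as exc:
    print(exc)    # "ERROR: Calling pipelined function keys is blocked ..."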

2611 

2612class PipelineCommand: 

2613 """ """ 

2614 

2615 def __init__(self, args, options=None, position=None): 

2616 self.args = args 

2617 if options is None: 

2618 options = {} 

2619 self.options = options 

2620 self.position = position 

2621 self.result = None 

2622 self.node = None 

2623 self.asking = False 

2624 self.command_policies: Optional[CommandPolicies] = None 

2625 

2626 

2627class NodeCommands: 

2628 """ """ 

2629 

2630 def __init__(self, parse_response, connection_pool, connection): 

2631 """ """ 

2632 self.parse_response = parse_response 

2633 self.connection_pool = connection_pool 

2634 self.connection = connection 

2635 self.commands = [] 

2636 

2637 def append(self, c): 

2638 """ """ 

2639 self.commands.append(c) 

2640 

2641 def write(self): 

2642 """ 

2643 Code borrowed from Redis so it can be fixed 

2644 """ 

2645 connection = self.connection 

2646 commands = self.commands 

2647 

2648 # We are going to clobber the commands with the write, so go ahead 

2649 # and ensure that nothing is sitting there from a previous run. 

2650 for c in commands: 

2651 c.result = None 

2652 

2653 # build up all commands into a single request to increase network perf 

2654 # send all the commands and catch connection and timeout errors. 

2655 try: 

2656 connection.send_packed_command( 

2657 connection.pack_commands([c.args for c in commands]) 

2658 ) 

2659 except (ConnectionError, TimeoutError) as e: 

2660 for c in commands: 

2661 c.result = e 

2662 

2663 def read(self): 

2664 """ """ 

2665 connection = self.connection 

2666 for c in self.commands: 

2667 # if there is a result on this command, 

2668 # it means we ran into an exception 

2669 # like a connection error. Trying to parse 

2670 # a response on a connection that 

2671 # is no longer open will result in a 

2672 # connection error raised by redis-py. 

2673 # but redis-py doesn't check in parse_response 

2674 # that the sock object is 

2675 # still set and if you try to 

2676 # read from a closed connection, it will 

2677 # result in an AttributeError because 

2678 # it will do a readline() call on None. 

2679 # This can have all kinds of nasty side-effects. 

2680 # Treating this case as a connection error 

2681 # is fine because it will dump 

2682 # the connection object back into the 

2683 # pool and on the next write, it will 

2684 # explicitly open the connection and all will be well. 

2685 if c.result is None: 

2686 try: 

2687 c.result = self.parse_response(connection, c.args[0], **c.options) 

2688 except (ConnectionError, TimeoutError) as e: 

2689 for c in self.commands: 

2690 c.result = e 

2691 return 

2692 except RedisError: 

2693 c.result = sys.exc_info()[1] 

2694 

2695 

2696class ExecutionStrategy(ABC): 

2697 @property 

2698 @abstractmethod 

2699 def command_queue(self): 

2700 pass 

2701 

2702 @abstractmethod 

2703 def execute_command(self, *args, **kwargs): 

2704 """ 

2705 Execution flow for current execution strategy. 

2706 

2707 See: ClusterPipeline.execute_command() 

2708 """ 

2709 pass 

2710 

2711 @abstractmethod 

2712 def annotate_exception(self, exception, number, command): 

2713 """ 

2714 Annotate exception according to current execution strategy. 

2715 

2716 See: ClusterPipeline.annotate_exception() 

2717 """ 

2718 pass 

2719 

2720 @abstractmethod 

2721 def pipeline_execute_command(self, *args, **options): 

2722 """ 

2723 Pipeline execution flow for current execution strategy. 

2724 

2725 See: ClusterPipeline.pipeline_execute_command() 

2726 """ 

2727 pass 

2728 

2729 @abstractmethod 

2730 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2731 """ 

2732 Executes current execution strategy. 

2733 

2734 See: ClusterPipeline.execute() 

2735 """ 

2736 pass 

2737 

2738 @abstractmethod 

2739 def send_cluster_commands( 

2740 self, stack, raise_on_error=True, allow_redirections=True 

2741 ): 

2742 """ 

2743 Sends commands according to current execution strategy. 

2744 

2745 See: ClusterPipeline.send_cluster_commands() 

2746 """ 

2747 pass 

2748 

2749 @abstractmethod 

2750 def reset(self): 

2751 """ 

2752 Resets current execution strategy. 

2753 

2754 See: ClusterPipeline.reset() 

2755 """ 

2756 pass 

2757 

2758 @abstractmethod 

2759 def exists(self, *keys): 

2760 pass 

2761 

2762 @abstractmethod 

2763 def eval(self): 

2764 pass 

2765 

2766 @abstractmethod 

2767 def multi(self): 

2768 """ 

2769 Starts transactional context. 

2770 

2771 See: ClusterPipeline.multi() 

2772 """ 

2773 pass 

2774 

2775 @abstractmethod 

2776 def load_scripts(self): 

2777 pass 

2778 

2779 @abstractmethod 

2780 def watch(self, *names): 

2781 pass 

2782 

2783 @abstractmethod 

2784 def unwatch(self): 

2785 """ 

2786 Unwatches all previously specified keys 

2787 

2788 See: ClusterPipeline.unwatch() 

2789 """ 

2790 pass 

2791 

2792 @abstractmethod 

2793 def script_load_for_pipeline(self, *args, **kwargs): 

2794 pass 

2795 

2796 @abstractmethod 

2797 def delete(self, *names): 

2798 """ 

2799 "Delete a key specified by ``names``" 

2800 

2801 See: ClusterPipeline.delete() 

2802 """ 

2803 pass 

2804 

2805 @abstractmethod 

2806 def unlink(self, *names): 

2807 """ 

2808 "Unlink a key specified by ``names``" 

2809 

2810 See: ClusterPipeline.unlink() 

2811 """ 

2812 pass 

2813 

2814 @abstractmethod 

2815 def discard(self): 

2816 pass 

2817 

2818 

2819class AbstractStrategy(ExecutionStrategy): 

2820 def __init__( 

2821 self, 

2822 pipe: ClusterPipeline, 

2823 ): 

2824 self._command_queue: List[PipelineCommand] = [] 

2825 self._pipe = pipe 

2826 self._nodes_manager = self._pipe.nodes_manager 

2827 

2828 @property 

2829 def command_queue(self): 

2830 return self._command_queue 

2831 

2832 @command_queue.setter 

2833 def command_queue(self, queue: List[PipelineCommand]): 

2834 self._command_queue = queue 

2835 

2836 @abstractmethod 

2837 def execute_command(self, *args, **kwargs): 

2838 pass 

2839 

2840 def pipeline_execute_command(self, *args, **options): 

2841 self._command_queue.append( 

2842 PipelineCommand(args, options, len(self._command_queue)) 

2843 ) 

2844 return self._pipe 

2845 

2846 @abstractmethod 

2847 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2848 pass 

2849 

2850 @abstractmethod 

2851 def send_cluster_commands( 

2852 self, stack, raise_on_error=True, allow_redirections=True 

2853 ): 

2854 pass 

2855 

2856 @abstractmethod 

2857 def reset(self): 

2858 pass 

2859 

2860 def exists(self, *keys): 

2861 return self.execute_command("EXISTS", *keys) 

2862 

2863 def eval(self): 

2864 """ """ 

2865 raise RedisClusterException("method eval() is not implemented") 

2866 

2867 def load_scripts(self): 

2868 """ """ 

2869 raise RedisClusterException("method load_scripts() is not implemented") 

2870 

2871 def script_load_for_pipeline(self, *args, **kwargs): 

2872 """ """ 

2873 raise RedisClusterException( 

2874 "method script_load_for_pipeline() is not implemented" 

2875 ) 

2876 

2877 def annotate_exception(self, exception, number, command): 

2878 """ 

2879 Provides extra context to the exception prior to it being handled 

2880 """ 

2881 cmd = " ".join(map(safe_str, command)) 

2882 msg = ( 

2883 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

2884 f"caused error: {exception.args[0]}" 

2885 ) 

2886 exception.args = (msg,) + exception.args[1:] 

2887 

2888 

2889class PipelineStrategy(AbstractStrategy): 

2890 def __init__(self, pipe: ClusterPipeline): 

2891 super().__init__(pipe) 

2892 self.command_flags = pipe.command_flags 

2893 

2894 def execute_command(self, *args, **kwargs): 

2895 return self.pipeline_execute_command(*args, **kwargs) 

2896 

2897 def _raise_first_error(self, stack): 

2898 """ 

2899 Raise the first exception on the stack 

2900 """ 

2901 for c in stack: 

2902 r = c.result 

2903 if isinstance(r, Exception): 

2904 self.annotate_exception(r, c.position + 1, c.args) 

2905 raise r 

2906 

2907 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2908 stack = self._command_queue 

2909 if not stack: 

2910 return [] 

2911 

2912 try: 

2913 return self.send_cluster_commands(stack, raise_on_error) 

2914 finally: 

2915 self.reset() 

2916 

2917 def reset(self): 

2918 """ 

2919 Reset back to empty pipeline. 

2920 """ 

2921 self._command_queue = [] 

2922 

2923 def send_cluster_commands( 

2924 self, stack, raise_on_error=True, allow_redirections=True 

2925 ): 

2926 """ 

2927 Wrapper that handles RedisCluster.ERRORS_ALLOW_RETRY errors. 

2928 

2929 If one of the retryable exceptions has been thrown we assume that: 

2930 - connection_pool was disconnected 

2931 - connection_pool was reset 

2932 - refresh_table_asap set to True 

2933 

2934 It will retry the number of times specified by 

2935 the retries attribute of "self.retry", 

2936 which defaults to 3 unless manually configured. 

2937 

2938 If the retries are exhausted, the last error will 

2939 be raised. 

2940 """ 

2941 if not stack: 

2942 return [] 

2943 retry_attempts = self._pipe.retry.get_retries() 

2944 while True: 

2945 try: 

2946 return self._send_cluster_commands( 

2947 stack, 

2948 raise_on_error=raise_on_error, 

2949 allow_redirections=allow_redirections, 

2950 ) 

2951 except RedisCluster.ERRORS_ALLOW_RETRY as e: 

2952 if retry_attempts > 0: 

2953 # Try again with the new cluster setup. All other errors 

2954 # should be raised. 

2955 retry_attempts -= 1 

2956 pass 

2957 else: 

2958 raise e 

2959 
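A hedged sketch (not part of cluster.py) of overriding the retry budget consumed by the wrapper above; assigning pipe.retry directly is one way to replace the default Retry built in ClusterPipeline.__init__, and the values here are arbitrary.

from redis.backoff import ExponentialWithJitterBackoff
from redis.retry import Retry

pipe = rc.pipeline()  # rc is assumed to be an existing RedisCluster client
pipe.retry = Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=5)
pipe.set("k1", "v1")
pipe.get("k1")
results = pipe.execute()  # retried up to 5 times on ERRORS_ALLOW_RETRY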

2960 def _send_cluster_commands( 

2961 self, stack, raise_on_error=True, allow_redirections=True 

2962 ): 

2963 """ 

2964 Send a bunch of cluster commands to the redis cluster. 

2965 

2966 `allow_redirections` If the pipeline should follow 

2967 `ASK` & `MOVED` responses automatically. If set 

2968 to false it will raise RedisClusterException. 

2969 """ 

2970 # the first time sending the commands we send all of 

2971 # the commands that were queued up. 

2972 # if we have to run through it again, we only retry 

2973 # the commands that failed. 

2974 attempt = sorted(stack, key=lambda x: x.position) 

2975 is_default_node = False 

2976 # build a mapping from node name to the commands we need to send to it 

2977 nodes = {} 

2978 

2979 # as we move through each command that still needs to be processed, 

2980 # we figure out the slot number that command maps to, then from 

2981 # the slot determine the node. 

2982 for c in attempt: 

2983 command_policies = self._pipe._policy_resolver.resolve(c.args[0].lower()) 

2984 

2985 while True: 

2986 # refer to our internal node -> slot table that 

2987 # tells us where a given command should route to. 

2988 # (it might be possible we have a cached node that no longer 

2989 # exists in the cluster, which is why we do this in a loop) 

2990 passed_targets = c.options.pop("target_nodes", None) 

2991 if passed_targets and not self._is_nodes_flag(passed_targets): 

2992 target_nodes = self._parse_target_nodes(passed_targets) 

2993 

2994 if not command_policies: 

2995 command_policies = CommandPolicies() 

2996 else: 

2997 if not command_policies: 

2998 command = c.args[0].upper() 

2999 if ( 

3000 len(c.args) >= 2 

3001 and f"{c.args[0]} {c.args[1]}".upper() 

3002 in self._pipe.command_flags 

3003 ): 

3004 command = f"{c.args[0]} {c.args[1]}".upper() 

3005 

3006 # We only could resolve key properties if command is not 

3007 # in a list of pre-defined request policies 

3008 command_flag = self.command_flags.get(command) 

3009 if not command_flag: 

3010 # Fallback to default policy 

3011 if not self._pipe.get_default_node(): 

3012 keys = None 

3013 else: 

3014 keys = self._pipe._get_command_keys(*c.args) 

3015 if not keys or len(keys) == 0: 

3016 command_policies = CommandPolicies() 

3017 else: 

3018 command_policies = CommandPolicies( 

3019 request_policy=RequestPolicy.DEFAULT_KEYED, 

3020 response_policy=ResponsePolicy.DEFAULT_KEYED, 

3021 ) 

3022 else: 

3023 if command_flag in self._pipe._command_flags_mapping: 

3024 command_policies = CommandPolicies( 

3025 request_policy=self._pipe._command_flags_mapping[ 

3026 command_flag 

3027 ] 

3028 ) 

3029 else: 

3030 command_policies = CommandPolicies() 

3031 

3032 target_nodes = self._determine_nodes( 

3033 *c.args, 

3034 request_policy=command_policies.request_policy, 

3035 node_flag=passed_targets, 

3036 ) 

3037 if not target_nodes: 

3038 raise RedisClusterException( 

3039 f"No targets were found to execute {c.args} command on" 

3040 ) 

3041 c.command_policies = command_policies 

3042 if len(target_nodes) > 1: 

3043 raise RedisClusterException( 

3044 f"Too many targets for command {c.args}" 

3045 ) 

3046 

3047 node = target_nodes[0] 

3048 if node == self._pipe.get_default_node(): 

3049 is_default_node = True 

3050 

3051 # now that we know the name of the node 

3052 # ( it's just a string in the form of host:port ) 

3053 # we can build a list of commands for each node. 

3054 node_name = node.name 

3055 if node_name not in nodes: 

3056 redis_node = self._pipe.get_redis_connection(node) 

3057 try: 

3058 connection = get_connection(redis_node) 

3059 except (ConnectionError, TimeoutError): 

3060 for n in nodes.values(): 

3061 n.connection_pool.release(n.connection) 

3062 # Connection retries are being handled in the node's 

3063 # Retry object. Reinitialize the node -> slot table. 

3064 self._nodes_manager.initialize() 

3065 if is_default_node: 

3066 self._pipe.replace_default_node() 

3067 raise 

3068 nodes[node_name] = NodeCommands( 

3069 redis_node.parse_response, 

3070 redis_node.connection_pool, 

3071 connection, 

3072 ) 

3073 nodes[node_name].append(c) 

3074 break 

3075 

3076 # send the commands in sequence. 

3077 # we write to all the open sockets for each node first, 

3078 # before reading anything 

3079 # this allows us to flush all the requests out across the 

3080 # network 

3081 # so that we can read them from different sockets as they come back. 

3082 # we don't multiplex on the sockets as they become available, 

3083 # but that shouldn't make too much difference. 

3084 try: 

3085 node_commands = nodes.values() 

3086 for n in node_commands: 

3087 n.write() 

3088 

3089 for n in node_commands: 

3090 n.read() 

3091 finally: 

3092 # release all of the redis connections we allocated earlier 

3093 # back into the connection pool. 

3094 # we used to do this step as part of a try/finally block, 

3095 # but it is really dangerous to 

3096 # release connections back into the pool if for some 

3097 # reason the socket has data still left in it 

3098 # from a previous operation. The write and 

3099 # read operations already have try/catch around them for 

3100 # all known types of errors including connection 

3101 # and socket level errors. 

3102 # So if we hit an exception, something really bad 

3103 # happened and putting any of 

3104 # these connections back into the pool is a very bad idea. 

3105 # the socket might have unread buffer still sitting in it, 

3106 # and then the next time we read from it we pass the 

3107 # buffered result back from a previous command and 

3108 # every subsequent request on that connection will always get 

3109 # a mismatched result. 

3110 for n in nodes.values(): 

3111 n.connection_pool.release(n.connection) 

3112 

3113 # if the response isn't an exception it is a 

3114 # valid response from the node 

3115 # we're all done with that command, YAY! 

3116 # if we have more commands to attempt, we've run into problems. 

3117 # collect all the commands we are allowed to retry. 

3118 # (MOVED, ASK, or connection errors or timeout errors) 

3119 attempt = sorted( 

3120 ( 

3121 c 

3122 for c in attempt 

3123 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

3124 ), 

3125 key=lambda x: x.position, 

3126 ) 

3127 if attempt and allow_redirections: 

3128 # RETRY MAGIC HAPPENS HERE! 

3129 # send these remaining commands one at a time using `execute_command` 

3130 # in the main client. This keeps our retry logic 

3131 # in one place mostly, 

3132 # and allows us to be more confident in correctness of behavior. 

3133 # at this point any speed gains from pipelining have been lost 

3134 # anyway, so we might as well make the best 

3135 # attempt to get the correct behavior. 

3136 # 

3137 # The client command will handle retries for each 

3138 # individual command sequentially as we pass each 

3139 # one into `execute_command`. Any exceptions 

3140 # that bubble out should only appear once all 

3141 # retries have been exhausted. 

3142 # 

3143 # If a lot of commands have failed, we'll be setting the 

3144 # flag to rebuild the slots table from scratch. 

3145 # So MOVED errors should correct themselves fairly quickly. 

3146 self._pipe.reinitialize_counter += 1 

3147 if self._pipe._should_reinitialized(): 

3148 self._nodes_manager.initialize() 

3149 if is_default_node: 

3150 self._pipe.replace_default_node() 

3151 for c in attempt: 

3152 try: 

3153 # send each command individually like we 

3154 # do in the main client. 

3155 c.result = self._pipe.parent_execute_command(*c.args, **c.options) 

3156 except RedisError as e: 

3157 c.result = e 

3158 

3159 # turn the response back into a simple flat array that corresponds 

3160 # to the sequence of commands issued in the stack in pipeline.execute() 

3161 response = [] 

3162 for c in sorted(stack, key=lambda x: x.position): 

3163 if c.args[0] in self._pipe.cluster_response_callbacks: 

3164 # Remove the keys entry; it is only needed for caching. 

3165 c.options.pop("keys", None) 

3166 c.result = self._pipe._policies_callback_mapping[ 

3167 c.command_policies.response_policy 

3168 ]( 

3169 self._pipe.cluster_response_callbacks[c.args[0]]( 

3170 c.result, **c.options 

3171 ) 

3172 ) 

3173 response.append(c.result) 

3174 

3175 if raise_on_error: 

3176 self._raise_first_error(stack) 

3177 

3178 return response 

3179 

3180 def _is_nodes_flag(self, target_nodes): 

3181 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags 

3182 

3183 def _parse_target_nodes(self, target_nodes): 

3184 if isinstance(target_nodes, list): 

3185 nodes = target_nodes 

3186 elif isinstance(target_nodes, ClusterNode): 

3187 # Supports passing a single ClusterNode as a variable 

3188 nodes = [target_nodes] 

3189 elif isinstance(target_nodes, dict): 

3190 # Supports dictionaries of the format {node_name: node}. 

3191 # It enables executing commands on multiple nodes as follows: 

3192 # rc.cluster_save_config(rc.get_primaries()) 

3193 nodes = target_nodes.values() 

3194 else: 

3195 raise TypeError( 

3196 "target_nodes type can be one of the following: " 

3197 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

3198 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

3199 f"The passed type is {type(target_nodes)}" 

3200 ) 

3201 return nodes 

3202 

3203 def _determine_nodes( 

3204 self, *args, request_policy: RequestPolicy, **kwargs 

3205 ) -> List["ClusterNode"]: 

3206 # Determine which nodes the command should be executed on. 

3207 # Returns a list of target nodes. 

3208 command = args[0].upper() 

3209 if ( 

3210 len(args) >= 2 

3211 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags 

3212 ): 

3213 command = f"{args[0]} {args[1]}".upper() 

3214 

3215 nodes_flag = kwargs.pop("nodes_flag", None) 

3216 if nodes_flag is not None: 

3217 # nodes flag passed by the user 

3218 command_flag = nodes_flag 

3219 else: 

3220 # get the nodes group for this command if it was predefined 

3221 command_flag = self._pipe.command_flags.get(command) 

3222 

3223 if command_flag in self._pipe._command_flags_mapping: 

3224 request_policy = self._pipe._command_flags_mapping[command_flag] 

3225 

3226 policy_callback = self._pipe._policies_callback_mapping[request_policy] 

3227 

3228 if request_policy == RequestPolicy.DEFAULT_KEYED: 

3229 nodes = policy_callback(command, *args) 

3230 elif request_policy == RequestPolicy.MULTI_SHARD: 

3231 nodes = policy_callback(*args, **kwargs) 

3232 elif request_policy == RequestPolicy.DEFAULT_KEYLESS: 

3233 nodes = policy_callback(args[0]) 

3234 else: 

3235 nodes = policy_callback() 

3236 

3237 if args[0].lower() == "ft.aggregate": 

3238 self._aggregate_nodes = nodes 

3239 

3240 return nodes 

3241 

3242 def multi(self): 

3243 raise RedisClusterException( 

3244 "method multi() is not supported outside of transactional context" 

3245 ) 

3246 

3247 def discard(self): 

3248 raise RedisClusterException( 

3249 "method discard() is not supported outside of transactional context" 

3250 ) 

3251 

3252 def watch(self, *names): 

3253 raise RedisClusterException( 

3254 "method watch() is not supported outside of transactional context" 

3255 ) 

3256 

3257 def unwatch(self, *names): 

3258 raise RedisClusterException( 

3259 "method unwatch() is not supported outside of transactional context" 

3260 ) 

3261 

3262 def delete(self, *names): 

3263 if len(names) != 1: 

3264 raise RedisClusterException( 

3265 "deleting multiple keys is not implemented in pipeline command" 

3266 ) 

3267 

3268 return self.execute_command("DEL", names[0]) 

3269 

3270 def unlink(self, *names): 

3271 if len(names) != 1: 

3272 raise RedisClusterException( 

3273 "unlinking multiple keys is not implemented in pipeline command" 

3274 ) 

3275 

3276 return self.execute_command("UNLINK", names[0]) 

3277 

3278 

3279class TransactionStrategy(AbstractStrategy): 

3280 NO_SLOTS_COMMANDS = {"UNWATCH"} 

3281 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

3282 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

3283 SLOT_REDIRECT_ERRORS = (AskError, MovedError) 

3284 CONNECTION_ERRORS = ( 

3285 ConnectionError, 

3286 OSError, 

3287 ClusterDownError, 

3288 SlotNotCoveredError, 

3289 ) 

3290 

3291 def __init__(self, pipe: ClusterPipeline): 

3292 super().__init__(pipe) 

3293 self._explicit_transaction = False 

3294 self._watching = False 

3295 self._pipeline_slots: Set[int] = set() 

3296 self._transaction_connection: Optional[Connection] = None 

3297 self._executing = False 

3298 self._retry = copy(self._pipe.retry) 

3299 self._retry.update_supported_errors( 

3300 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS 

3301 ) 

3302 

3303 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]: 

3304 """ 

3305 Find a connection for a pipeline transaction. 

3306 

3307 To run an atomic transaction, WATCHed keys guarantee that their contents have 

3308 not been altered, provided the WATCH commands for those keys were sent over the 

3309 same connection. So once we start watching a key, we fetch a connection to the 

3310 node that owns that slot and reuse it for the rest of the transaction. 

3311 """ 

3312 if not self._pipeline_slots: 

3313 raise RedisClusterException( 

3314 "At least a command with a key is needed to identify a node" 

3315 ) 

3316 

3317 node: ClusterNode = self._nodes_manager.get_node_from_slot( 

3318 list(self._pipeline_slots)[0], False 

3319 ) 

3320 redis_node: Redis = self._pipe.get_redis_connection(node) 

3321 if self._transaction_connection: 

3322 if not redis_node.connection_pool.owns_connection( 

3323 self._transaction_connection 

3324 ): 

3325 previous_node = self._nodes_manager.find_connection_owner( 

3326 self._transaction_connection 

3327 ) 

3328 previous_node.connection_pool.release(self._transaction_connection) 

3329 self._transaction_connection = None 

3330 

3331 if not self._transaction_connection: 

3332 self._transaction_connection = get_connection(redis_node) 

3333 

3334 return redis_node, self._transaction_connection 

3335 
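# --- Illustrative aside (not part of cluster.py) ---------------------------
# Hedged usage sketch of the connection pinning described above: once a key is
# WATCHed, the transaction keeps using the connection to the node that owns
# that key's slot. Assumes a cluster reachable at localhost:16379 and a
# redis-py build with cluster transaction support; key names use a hash tag so
# every key maps to the same slot.
from redis.cluster import RedisCluster

rc = RedisCluster(host="localhost", port=16379)
with rc.pipeline(transaction=True) as pipe:
    pipe.watch("{user:1}:balance")             # pins the connection to the slot owner
    balance = int(pipe.get("{user:1}:balance") or 0)
    pipe.multi()                               # start queuing commands
    pipe.set("{user:1}:balance", balance + 10)
    pipe.execute()                             # MULTI/EXEC runs atomically on that node
# ---------------------------------------------------------------------------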

3336 def execute_command(self, *args, **kwargs): 

3337 slot_number: Optional[int] = None 

3338 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS: 

3339 slot_number = self._pipe.determine_slot(*args) 

3340 

3341 if ( 

3342 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS 

3343 ) and not self._explicit_transaction: 

3344 if args[0] == "WATCH": 

3345 self._validate_watch() 

3346 

3347 if slot_number is not None: 

3348 if self._pipeline_slots and slot_number not in self._pipeline_slots: 

3349 raise CrossSlotTransactionError( 

3350 "Cannot watch or send commands on different slots" 

3351 ) 

3352 

3353 self._pipeline_slots.add(slot_number) 

3354 elif args[0] not in self.NO_SLOTS_COMMANDS: 

3355 raise RedisClusterException( 

3356 f"Cannot identify slot number for command: {args[0]}," 

3357 "it cannot be triggered in a transaction" 

3358 ) 

3359 

3360 return self._immediate_execute_command(*args, **kwargs) 

3361 else: 

3362 if slot_number is not None: 

3363 self._pipeline_slots.add(slot_number) 

3364 

3365 return self.pipeline_execute_command(*args, **kwargs) 

3366 
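# --- Illustrative aside (not part of cluster.py) ---------------------------
# Hedged sketch of the cross-slot guard in execute_command above: WATCHing (or
# queuing) keys that hash to different slots raises CrossSlotTransactionError.
# Host/port and key names are assumptions; the two keys below will almost
# certainly map to different slots.
from redis.cluster import RedisCluster
from redis.exceptions import CrossSlotTransactionError

rc = RedisCluster(host="localhost", port=16379)
try:
    with rc.pipeline(transaction=True) as pipe:
        pipe.watch("{user:1}:balance")   # first key fixes the transaction's slot
        pipe.watch("{user:2}:balance")   # different hash tag -> different slot
except CrossSlotTransactionError as exc:
    print("rejected:", exc)
# ---------------------------------------------------------------------------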

3367 def _validate_watch(self): 

3368 if self._explicit_transaction: 

3369 raise RedisError("Cannot issue a WATCH after a MULTI") 

3370 

3371 self._watching = True 

3372 

3373 def _immediate_execute_command(self, *args, **options): 

3374 return self._retry.call_with_retry( 

3375 lambda: self._get_connection_and_send_command(*args, **options), 

3376 self._reinitialize_on_error, 

3377 ) 

3378 
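# --- Illustrative aside (not part of cluster.py) ---------------------------
# Hedged sketch of the retry pattern used by _immediate_execute_command above:
# Retry.call_with_retry(do, fail) re-invokes `do` and calls `fail(error)`
# between attempts. The backoff, retry count, and flaky() helper are
# illustrative assumptions, not what the pipeline actually configures.
from redis.backoff import NoBackoff
from redis.exceptions import ConnectionError
from redis.retry import Retry

attempts = {"n": 0}


def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


retry = Retry(NoBackoff(), retries=5, supported_errors=(ConnectionError,))
print(retry.call_with_retry(flaky, lambda error: None))  # "ok" on the third try
# ---------------------------------------------------------------------------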

3379 def _get_connection_and_send_command(self, *args, **options): 

3380 redis_node, connection = self._get_client_and_connection_for_transaction() 

3381 return self._send_command_parse_response( 

3382 connection, redis_node, args[0], *args, **options 

3383 ) 

3384 

3385 def _send_command_parse_response( 

3386 self, conn, redis_node: Redis, command_name, *args, **options 

3387 ): 

3388 """ 

3389 Send a command and parse the response 

3390 """ 

3391 

3392 conn.send_command(*args) 

3393 output = redis_node.parse_response(conn, command_name, **options) 

3394 

3395 if command_name in self.UNWATCH_COMMANDS: 

3396 self._watching = False 

3397 return output 

3398 

3399 def _reinitialize_on_error(self, error): 

3400 if self._watching: 

3401 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: 

3402 raise WatchError("Slot rebalancing occurred while watching keys") 

3403 

3404 if ( 

3405 type(error) in self.SLOT_REDIRECT_ERRORS 

3406 or type(error) in self.CONNECTION_ERRORS 

3407 ): 

3408 if self._transaction_connection: 

3409 self._transaction_connection = None 

3410 

3411 self._pipe.reinitialize_counter += 1 

3412 if self._pipe._should_reinitialized(): 

3413 self._nodes_manager.initialize() 

3414 self._pipe.reinitialize_counter = 0 

3415 else: 

3416 if isinstance(error, AskError): 

3417 self._nodes_manager.update_moved_exception(error) 

3418 

3419 self._executing = False 

3420 

3421 def _raise_first_error(self, responses, stack): 

3422 """ 

3423 Raise the first exception on the stack 

3424 """ 

3425 for r, cmd in zip(responses, stack): 

3426 if isinstance(r, Exception): 

3427 self.annotate_exception(r, cmd.position + 1, cmd.args) 

3428 raise r 

3429 

3430 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3431 stack = self._command_queue 

3432 if not stack and (not self._watching or not self._pipeline_slots): 

3433 return [] 

3434 

3435 return self._execute_transaction_with_retries(stack, raise_on_error) 

3436 

3437 def _execute_transaction_with_retries( 

3438 self, stack: List["PipelineCommand"], raise_on_error: bool 

3439 ): 

3440 return self._retry.call_with_retry( 

3441 lambda: self._execute_transaction(stack, raise_on_error), 

3442 self._reinitialize_on_error, 

3443 ) 

3444 

3445 def _execute_transaction( 

3446 self, stack: List["PipelineCommand"], raise_on_error: bool 

3447 ): 

3448 if len(self._pipeline_slots) > 1: 

3449 raise CrossSlotTransactionError( 

3450 "All keys involved in a cluster transaction must map to the same slot" 

3451 ) 

3452 

3453 self._executing = True 

3454 

3455 redis_node, connection = self._get_client_and_connection_for_transaction() 

3456 

3457 stack = chain( 

3458 [PipelineCommand(("MULTI",))], 

3459 stack, 

3460 [PipelineCommand(("EXEC",))], 

3461 ) 

3462 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options] 

3463 packed_commands = connection.pack_commands(commands) 

3464 connection.send_packed_command(packed_commands) 

3465 errors = [] 

3466 

3467 # parse off the response for MULTI 

3468 # NOTE: we need to handle ResponseErrors here and continue 

3469 # so that we read all the additional command messages from 

3470 # the socket 

3471 try: 

3472 redis_node.parse_response(connection, "MULTI") 

3473 except ResponseError as e: 

3474 self.annotate_exception(e, 0, "MULTI") 

3475 errors.append(e) 

3476 except self.CONNECTION_ERRORS as cluster_error: 

3477 self.annotate_exception(cluster_error, 0, "MULTI") 

3478 raise 

3479 

3480 # and all the other commands 

3481 for i, command in enumerate(self._command_queue): 

3482 if EMPTY_RESPONSE in command.options: 

3483 errors.append((i, command.options[EMPTY_RESPONSE])) 

3484 else: 

3485 try: 

3486 _ = redis_node.parse_response(connection, "_") 

3487 except self.SLOT_REDIRECT_ERRORS as slot_error: 

3488 self.annotate_exception(slot_error, i + 1, command.args) 

3489 errors.append(slot_error) 

3490 except self.CONNECTION_ERRORS as cluster_error: 

3491 self.annotate_exception(cluster_error, i + 1, command.args) 

3492 raise 

3493 except ResponseError as e: 

3494 self.annotate_exception(e, i + 1, command.args) 

3495 errors.append(e) 

3496 

3497 response = None 

3498 # parse the EXEC. 

3499 try: 

3500 response = redis_node.parse_response(connection, "EXEC") 

3501 except ExecAbortError: 

3502 if errors: 

3503 raise errors[0] 

3504 raise 

3505 

3506 self._executing = False 

3507 

3508 # EXEC clears any watched keys 

3509 self._watching = False 

3510 

3511 if response is None: 

3512 raise WatchError("Watched variable changed.") 

3513 

3514 # put any parse errors into the response 

3515 for i, e in errors: 

3516 response.insert(i, e) 

3517 

3518 if len(response) != len(self._command_queue): 

3519 raise InvalidPipelineStack( 

3520 "Unexpected response length for cluster pipeline EXEC." 

3521 " Command stack was {} but response had length {}".format( 

3522 [c.args[0] for c in self._command_queue], len(response) 

3523 ) 

3524 ) 

3525 

3526 # find any errors in the response and raise if necessary 

3527 if raise_on_error or len(errors) > 0: 

3528 self._raise_first_error( 

3529 response, 

3530 self._command_queue, 

3531 ) 

3532 

3533 # We have to run response callbacks manually 

3534 data = [] 

3535 for r, cmd in zip(response, self._command_queue): 

3536 if not isinstance(r, Exception): 

3537 command_name = cmd.args[0] 

3538 if command_name in self._pipe.cluster_response_callbacks: 

3539 r = self._pipe.cluster_response_callbacks[command_name]( 

3540 r, **cmd.options 

3541 ) 

3542 data.append(r) 

3543 return data 

3544 
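# --- Illustrative aside (not part of cluster.py) ---------------------------
# Hedged sketch of the framing performed by _execute_transaction above: the
# queued commands are wrapped between MULTI and EXEC before being packed and
# sent on one connection; each queued command answers "QUEUED" and the real
# results arrive as a single array in the EXEC reply. The command tuples below
# are illustrative.
from itertools import chain


def frame_transaction(commands):
    # Mirror the chain([MULTI], stack, [EXEC]) construction used above.
    return list(chain([("MULTI",)], commands, [("EXEC",)]))


framed = frame_transaction([("SET", "{user:1}:a", "1"), ("INCR", "{user:1}:a")])
print(framed)
# [('MULTI',), ('SET', '{user:1}:a', '1'), ('INCR', '{user:1}:a'), ('EXEC',)]
# ---------------------------------------------------------------------------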

3545 def reset(self): 

3546 self._command_queue = [] 

3547 

3548 # make sure to reset the connection state in the event that we were 

3549 # watching something 

3550 if self._transaction_connection: 

3551 try: 

3552 if self._watching: 

3553 # call this manually since our unwatch or 

3554 # immediate_execute_command methods can call reset() 

3555 self._transaction_connection.send_command("UNWATCH") 

3556 self._transaction_connection.read_response() 

3557 # we can safely return the connection to the pool here since we're 

3558 # sure we're no longer WATCHing anything 

3559 node = self._nodes_manager.find_connection_owner( 

3560 self._transaction_connection 

3561 ) 

3562 node.redis_connection.connection_pool.release( 

3563 self._transaction_connection 

3564 ) 

3565 self._transaction_connection = None 

3566 except self.CONNECTION_ERRORS: 

3567 # disconnect will also remove any previous WATCHes 

3568 if self._transaction_connection: 

3569 self._transaction_connection.disconnect() 

3570 

3571 # clean up the other instance attributes 

3572 self._watching = False 

3573 self._explicit_transaction = False 

3574 self._pipeline_slots = set() 

3575 self._executing = False 

3576 

3577 def send_cluster_commands( 

3578 self, stack, raise_on_error=True, allow_redirections=True 

3579 ): 

3580 raise NotImplementedError( 

3581 "send_cluster_commands cannot be executed in transactional context." 

3582 ) 

3583 

3584 def multi(self): 

3585 if self._explicit_transaction: 

3586 raise RedisError("Cannot issue nested calls to MULTI") 

3587 if self._command_queue: 

3588 raise RedisError( 

3589 "Commands without an initial WATCH have already been issued" 

3590 ) 

3591 self._explicit_transaction = True 

3592 

3593 def watch(self, *names): 

3594 if self._explicit_transaction: 

3595 raise RedisError("Cannot issue a WATCH after a MULTI") 

3596 

3597 return self.execute_command("WATCH", *names) 

3598 

3599 def unwatch(self): 

3600 if self._watching: 

3601 return self.execute_command("UNWATCH") 

3602 

3603 return True 

3604 

3605 def discard(self): 

3606 self.reset() 

3607 

3608 def delete(self, *names): 

3609 return self.execute_command("DEL", *names) 

3610 

3611 def unlink(self, *names): 

3612 return self.execute_command("UNLINK", *names)