Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%


1420 statements  

import random
import socket
import sys
import threading
import time
from abc import ABC, abstractmethod
from collections import OrderedDict
from copy import copy
from enum import Enum
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from redis._parsers import CommandsParser, Encoder
from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
from redis._parsers.helpers import parse_scan
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
from redis.commands import READ_COMMANDS, RedisClusterCommands
from redis.commands.helpers import list_or_args
from redis.commands.policies import PolicyResolver, StaticPolicyResolver
from redis.connection import (
    Connection,
    ConnectionPool,
    parse_url,
)
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    AskError,
    AuthenticationError,
    ClusterDownError,
    ClusterError,
    ConnectionError,
    CrossSlotTransactionError,
    DataError,
    ExecAbortError,
    InvalidPipelineStack,
    MaxConnectionsError,
    MovedError,
    RedisClusterException,
    RedisError,
    ResponseError,
    SlotNotCoveredError,
    TimeoutError,
    TryAgainError,
    WatchError,
)
from redis.lock import Lock
from redis.maint_notifications import MaintNotificationsConfig
from redis.retry import Retry
from redis.utils import (
    deprecated_args,
    dict_merge,
    list_keys_to_dict,
    merge_result,
    safe_str,
    str_if_bytes,
    truncate_text,
)


def get_node_name(host: str, port: Union[str, int]) -> str:
    return f"{host}:{port}"


@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    return redis_node.connection or redis_node.connection_pool.get_connection()


def parse_scan_result(command, res, **options):
    cursors = {}
    ret = []
    for node_name, response in res.items():
        cursor, r = parse_scan(response, **options)
        cursors[node_name] = cursor
        ret += r

    return cursors, ret
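

# --- Illustrative sketch (editor's addition, not part of the original file) ---
# parse_scan_result keeps one cursor per node so a cluster-wide SCAN can be
# resumed independently on every shard. The node names and payloads below are
# assumed example data, and this helper is hypothetical:
def _example_parse_scan_result():
    res = {
        "127.0.0.1:7000": (17, [b"k1", b"k2"]),
        "127.0.0.1:7001": (0, [b"k3"]),
    }
    # One cursor per node; key lists are concatenated across nodes.
    cursors, keys = parse_scan_result("SCAN", res)
    assert cursors == {"127.0.0.1:7000": 17, "127.0.0.1:7001": 0}
    assert keys == [b"k1", b"k2", b"k3"]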


def parse_pubsub_numsub(command, res, **options):
    numsub_d = OrderedDict()
    for numsub_tups in res.values():
        for channel, numsubbed in numsub_tups:
            try:
                numsub_d[channel] += numsubbed
            except KeyError:
                numsub_d[channel] = numsubbed

    ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
    return ret_numsub


def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        return str_if_bytes(args[0]) or current_host, args[1]

    slots = {}
    for slot in resp:
        start, end, primary = slot[:3]
        replicas = slot[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }

    return slots
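

# --- Illustrative sketch (editor's addition, not part of the original file) ---
# parse_cluster_slots turns a raw CLUSTER SLOTS reply into a mapping from
# (start, end) slot ranges to primary/replica endpoints; an empty host in the
# reply falls back to current_host. The reply below is assumed example data,
# and this helper is hypothetical:
def _example_parse_cluster_slots():
    resp = [
        [0, 5460, [b"127.0.0.1", 7000, b"node-0"], [b"", 7003, b"node-3"]],
    ]
    slots = parse_cluster_slots(resp, current_host="10.0.0.5")
    assert slots[(0, 5460)]["primary"] == ("127.0.0.1", 7000)
    # The replica's empty host was replaced by current_host.
    assert slots[(0, 5460)]["replicas"] == [("10.0.0.5", 7003)]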


def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.
    """
    if isinstance(resp[0], dict):
        return resp
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], (x[1][i + 1])))
        nodes = x[3]
        for node in nodes:
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards


def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.
    """
    return resp.decode("utf-8")


PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"

REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
)
KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    connection_kwargs = {
        k: v
        for k, v in kwargs.items()
        if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
    }

    return connection_kwargs


class AbstractRedisCluster:
    RedisClusterRequestTTL = 16

    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )


    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )


    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )


    def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains other primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, choose a replica if the cluster contains other replicas
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)



class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
    @classmethod
    def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        return cls(url=url, **kwargs)
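
    # --- Illustrative sketch (editor's addition, not part of the original
    # file). Typical bootstrap from a URL; the endpoint and key names are
    # assumptions, and this helper is hypothetical:
    @staticmethod
    def _example_from_url():
        rc = RedisCluster.from_url("redis://localhost:6379/0")
        rc.set("foo", "bar")
        return rc.get("foo")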


    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        **kwargs,
    ):
540 """ 

541 Initialize a new RedisCluster client. 

542 

543 :param startup_nodes: 

544 List of nodes from which initial bootstrapping can be done 

545 :param host: 

546 Can be used to point to a startup node 

547 :param port: 

548 Can be used to point to a startup node 

549 :param require_full_coverage: 

550 When set to False (default value): the client will not require a 

551 full coverage of the slots. However, if not all slots are covered, 

552 and at least one node has 'cluster-require-full-coverage' set to 

553 'yes,' the server will throw a ClusterDownError for some key-based 

554 commands. See - 

555 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters 

556 When set to True: all slots must be covered to construct the 

557 cluster client. If not all slots are covered, RedisClusterException 

558 will be thrown. 

559 :param read_from_replicas: 

560 @deprecated - please use load_balancing_strategy instead 

561 Enable read from replicas in READONLY mode. You can read possibly 

562 stale data. 

563 When set to true, read commands will be assigned between the 

564 primary and its replications in a Round-Robin manner. 

565 :param load_balancing_strategy: 

566 Enable read from replicas in READONLY mode and defines the load balancing 

567 strategy that will be used for cluster node selection. 

568 The data read from replicas is eventually consistent with the data in primary nodes. 

569 :param dynamic_startup_nodes: 

570 Set the RedisCluster's startup nodes to all of the discovered nodes. 

571 If true (default value), the cluster's discovered nodes will be used to 

572 determine the cluster nodes-slots mapping in the next topology refresh. 

573 It will remove the initial passed startup nodes if their endpoints aren't 

574 listed in the CLUSTER SLOTS output. 

575 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

576 specific IP addresses, it is best to set it to false. 

577 :param cluster_error_retry_attempts: 

578 @deprecated - Please configure the 'retry' object instead 

579 In case 'retry' object is set - this argument is ignored! 

580 

581 Number of times to retry before raising an error when 

582 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or 

583 :class:`~.ClusterDownError` are encountered 

584 :param retry: 

585 A retry object that defines the retry strategy and the number of 

586 retries for the cluster client. 

587 In current implementation for the cluster client (starting form redis-py version 6.0.0) 

588 the retry object is not yet fully utilized, instead it is used just to determine 

589 the number of retries for the cluster client. 

590 In the future releases the retry object will be used to handle the cluster client retries! 

591 :param reinitialize_steps: 

592 Specifies the number of MOVED errors that need to occur before 

593 reinitializing the whole cluster topology. If a MOVED error occurs 

594 and the cluster does not need to be reinitialized on this current 

595 error handling, only the MOVED slot will be patched with the 

596 redirected node. 

597 To reinitialize the cluster on every MOVED error, set 

598 reinitialize_steps to 1. 

599 To avoid reinitializing the cluster on moved errors, set 

600 reinitialize_steps to 0. 

601 :param address_remap: 

602 An optional callable which, when provided with an internal network 

603 address of a node, e.g. a `(host, port)` tuple, will return the address 

604 where the node is reachable. This can be used to map the addresses at 

605 which the nodes _think_ they are, to addresses at which a client may 

606 reach them, such as when they sit behind a proxy. 

607 

608 :**kwargs: 

609 Extra arguments that will be sent into Redis instance when created 

610 (See Official redis-py doc for supported kwargs - the only limitation 

611 is that you can't provide 'retry' object as part of kwargs. 

612 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) 

613 Some kwargs are not supported and will raise a 

614 RedisClusterException: 

615 - db (Redis do not support database SELECT in cluster mode) 

616 """ 

        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # Argument 'retry' cannot be used in kwargs when in cluster mode:
            # the kwargs are set on the lower-level connections to the cluster
            # nodes, and there we provide a retry configuration without retries
            # allowed. Retries should be handled at the cluster client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node(s)
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the following:\n"
                "1. host and port, for example:\n"
                "   RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "   RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )

        # Update the connection arguments.
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run.
        # If the user passed an on_connect function, we'll save it and run it
        # inside the RedisCluster.on_connect() function.
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and protocol not in [3, "3"]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes
        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)


        # For backward compatibility: mapping from the existing node-flag
        # policies to the new request policies
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }


        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver
        self.commands_parser = CommandsParser(self)

        # Nodes where the FT.AGGREGATE command was executed.
        self._aggregate_nodes = None
        self._lock = threading.RLock()


    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def disconnect_connection_pools(self):
        for node in self.get_nodes():
            if node.redis_connection:
                try:
                    node.redis_connection.connection_pool.disconnect()
                except OSError:
                    # Client was already disconnected. Do nothing.
                    pass

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate and select a database, and send
        READONLY if it is set during object initialization.
        """
        connection.on_connect()

        if self.read_from_replicas or self.load_balancing_strategy:
            # Sending READONLY command to server to configure connection as
            # readonly. Since each cluster node may change its server type due
            # to a failover, we should establish a READONLY connection
            # regardless of the server type. If this is a primary connection,
            # READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

    def get_node(self, host=None, port=None, node_name=None):
        return self.nodes_manager.get_node(host, port, node_name)

    def get_primaries(self):
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

    def get_replicas(self):
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)

    def get_random_node(self):
        return random.choice(list(self.nodes_manager.nodes_cache.values()))

    def get_random_primary_or_all_nodes(self, command_name):
        """
        Return a random primary node, or a random node among all nodes,
        depending on READONLY mode.
        """
        if self.read_from_replicas and command_name in READ_COMMANDS:
            return self.get_random_node()

        return self.get_random_primary_node()


    def get_nodes(self):
        return list(self.nodes_manager.nodes_cache.values())

    def get_node_from_key(self, key, replica=False):
        """
        Get the node that holds the key's slot.
        If replica is set to True but the slot doesn't have any replicas,
        None is returned.
        """
        slot = self.keyslot(key)
        slot_cache = self.nodes_manager.slots_cache.get(slot)
        if slot_cache is None or len(slot_cache) == 0:
            raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
        if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        elif replica:
            node_idx = 1
        else:
            # primary
            node_idx = 0

        return slot_cache[node_idx]

    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node

    def get_nodes_from_slot(self, command: str, *args):
        """
        Returns a list of nodes that hold the specified keys' slots.
        """
        # get the node that holds the key's slot
        slot = self.determine_slot(*args)
        node = self.nodes_manager.get_node_from_slot(
            slot,
            self.read_from_replicas and command in READ_COMMANDS,
            self.load_balancing_strategy if command in READ_COMMANDS else None,
        )
        return [node]


    def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
        """
        Split a command with the MULTI_SHARD policy into multiple
        single-key commands.
        """
        keys = self._get_command_keys(*args)
        commands = []

        for key in keys:
            commands.append(
                {
                    "args": (args[0], key),
                    "kwargs": kwargs,
                }
            )

        return commands
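
    # --- Illustrative sketch (editor's addition, not part of the original
    # file). A multi-shard command such as MGET is split into one single-key
    # command per key, each routed independently. The key names are
    # assumptions, this helper is hypothetical, and _get_command_keys() needs
    # a live connection to parse the keys:
    def _example_multi_shard_split(self):
        # ("MGET", "k1", "k2") ->
        # [{"args": ("MGET", "k1"), "kwargs": {}},
        #  {"args": ("MGET", "k2"), "kwargs": {}}]
        return self._split_multi_shard_command("MGET", "k1", "k2")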


    def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
        """
        Returns a list of nodes for commands with a special policy.
        """
        if not self._aggregate_nodes:
            raise RedisClusterException(
                "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
            )

        return self._aggregate_nodes

    def get_random_primary_node(self) -> "ClusterNode":
        """
        Returns a random primary node
        """
        return random.choice(self.get_primaries())

    def _evaluate_all_succeeded(self, res):
        """
        Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
        """
        first_successful_response = None

        if isinstance(res, dict):
            for key, value in res.items():
                if value:
                    if first_successful_response is None:
                        first_successful_response = {key: value}
                else:
                    return {key: False}
        else:
            for response in res:
                if response:
                    if first_successful_response is None:
                        # Dynamically resolve type
                        first_successful_response = type(response)(response)
                else:
                    return type(response)(False)

        return first_successful_response


    def set_default_node(self, node):
        """
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return: True if the default node was set, else False
        """
        if node is None or self.get_node(node_name=node.name) is None:
            return False
        self.nodes_manager.default_node = node
        return True

    def set_retry(self, retry: Retry) -> None:
        self.retry = retry

    def monitor(self, target_node=None):
        """
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node was
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        The next_command() method returns one command from monitor;
        the listen() method yields commands from monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()

    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host and port, to get a pubsub
        instance connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)


    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
            Pipelines do not work in cluster mode the same way they
            do in normal mode. Create a clone of this object so
            that simulating pipelines will work correctly. Each
            command will be called directly when used and
            when calling execute() will only return the result stack.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            read_from_replicas=self.read_from_replicas,
            load_balancing_strategy=self.load_balancing_strategy,
            reinitialize_steps=self.reinitialize_steps,
            retry=self.retry,
            lock=self._lock,
            transaction=transaction,
        )
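
    # --- Illustrative sketch (editor's addition, not part of the original
    # file). Commands are buffered and routed per slot when execute() runs;
    # the hash tag "{tag}" keeps both keys in one slot. Key names are
    # assumptions and this helper is hypothetical:
    def _example_pipeline_usage(self):
        with self.pipeline() as pipe:
            pipe.set("{tag}key1", 1)
            pipe.get("{tag}key1")
            return pipe.execute()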


    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). If the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.
        """
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
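
    # --- Illustrative sketch (editor's addition, not part of the original
    # file). The lock is used as a context manager: it is released on exit
    # and expires server-side after ``timeout`` seconds. The lock name is an
    # assumption and this helper is hypothetical:
    def _example_lock_usage(self):
        with self.lock("my-lock", timeout=5, blocking_timeout=2):
            # critical section; at most one client at a time holds "my-lock"
            pass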


    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.cluster_response_callbacks[command] = callback


    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        """
        Determine the nodes the command should be executed on.
        """
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)

        if command_flag in self._command_flags_mapping:
            request_policy = self._command_flags_mapping[command_flag]

        policy_callback = self._policies_callback_mapping[request_policy]

        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()

        if args[0].lower() == "ft.aggregate":
            self._aggregate_nodes = nodes

        return nodes


    def _should_reinitialized(self):
        # To reinitialize the cluster on every MOVED error,
        # set reinitialize_steps to 1.
        # To avoid reinitializing the cluster on moved errors, set
        # reinitialize_steps to 0.
        if self.reinitialize_steps == 0:
            return False
        else:
            return self.reinitialize_counter % self.reinitialize_steps == 0

    def keyslot(self, key):
        """
        Calculate the keyslot for a given key.
        See the keys distribution model in https://redis.io/topics/cluster-spec
        """
        k = self.encoder.encode(key)
        return key_slot(k)
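
    # --- Illustrative sketch (editor's addition, not part of the original
    # file). Hash tags pin keys to the same slot: when a key contains "{...}",
    # only the substring between the first "{" and the next "}" is hashed.
    # Key names are assumptions and this helper is hypothetical:
    def _example_hash_tag_slots(self):
        # Both keys hash only "user:1000", so they land in the same slot.
        a = self.keyslot("{user:1000}:followers")
        b = self.keyslot("{user:1000}:following")
        assert a == b
        return a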


    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in it, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
         - issue: https://github.com/redis/redis/issues/9493
         - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)


    def determine_slot(self, *args) -> Optional[int]:
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # CLIENT TRACKING is a special case.
        # It doesn't have any keys, it needs to be sent to the provided nodes
        # By default it will be sent to all nodes.
        if command.upper() == "CLIENT TRACKING":
            return None

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, that means the function
                # can be run on any node so we can just return a random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
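
    # --- Illustrative sketch (editor's addition, not part of the original
    # file). Multi-key commands must map to one slot; keys that hash to
    # different slots raise RedisClusterException, and hash tags are the
    # usual way to co-locate them. Key names are assumptions, this helper is
    # hypothetical, and key parsing needs a live connection:
    def _example_cross_slot_check(self):
        try:
            # "k1" and "k2" will usually hash to different slots
            return self.determine_slot("MSET", "k1", "v1", "k2", "v2")
        except RedisClusterException:
            # "{t}k1" and "{t}k2" both hash only "t", so they share a slot
            return self.determine_slot("MSET", "{t}k1", "v1", "{t}k2", "v2")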


    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder

    def get_connection_kwargs(self):
        """
        Get the connections' keyword arguments
        """
        return self.nodes_manager.connection_kwargs

    def _is_nodes_flag(self, target_nodes):
        return isinstance(target_nodes, str) and target_nodes in self.node_flags


    def _parse_target_nodes(self, target_nodes):
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables executing commands on multiple nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes


    def execute_command(self, *args, **kwargs):
        return self._internal_execute_command(*args, **kwargs)

    def _internal_execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the retries property of
        "self.retry", which defaults to 3 unless manually configured.

        If the retries are exhausted, the command will raise the exception.

        Key argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        command_policies = self._policy_resolver.resolve(args[0].lower())

        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True


        if not command_policies and not target_nodes_specified:
            command = args[0].upper()
            if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
                command = f"{args[0]} {args[1]}".upper()

            # We can only resolve key properties if the command is not in the
            # list of pre-defined request policies
            command_flag = self.command_flags.get(command)
            if not command_flag:
                # Fall back to the default policy
                if not self.get_default_node():
                    slot = None
                else:
                    slot = self.determine_slot(*args)
                if not slot:
                    command_policies = CommandPolicies()
                else:
                    command_policies = CommandPolicies(
                        request_policy=RequestPolicy.DEFAULT_KEYED,
                        response_policy=ResponsePolicy.DEFAULT_KEYED,
                    )
            else:
                if command_flag in self._command_flags_mapping:
                    command_policies = CommandPolicies(
                        request_policy=self._command_flags_mapping[command_flag]
                    )
                else:
                    command_policies = CommandPolicies()
        elif not command_policies and target_nodes_specified:
            command_policies = CommandPolicies()


        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args,
                        request_policy=command_policies.request_policy,
                        nodes_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)

                    if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                        break

                # Return the processed result
                return self._process_result(
                    args[0],
                    res,
                    response_policy=command_policies.response_policy,
                    **kwargs,
                )
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    continue
                else:
                    # raise the exception
                    raise e


    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove the keys entry; it is only needed for caching.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )
                return response
            except AuthenticationError:
                raise
            except MaxConnectionsError:
                # MaxConnectionsError indicates client-side resource exhaustion
                # (too many connections in the pool), not a node failure.
                # Don't treat this as a node failure - just re-raise the error
                # without reinitializing the cluster.
                raise
            except (ConnectionError, TimeoutError) as e:
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # The 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                    moved = True
            except TryAgainError:
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except (ClusterDownError, SlotNotCoveredError):
                # ClusterDownError can occur during a failover, and to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command.

                # SlotNotCoveredError can occur when the cluster is not fully
                # initialized or can be a temporary issue.
                # We will try to reinitialize the cluster topology
                # and retry executing the command.

                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise
            except ResponseError:
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")


    def close(self) -> None:
        try:
            with self._lock:
                if self.nodes_manager:
                    self.nodes_manager.close()
        except AttributeError:
            # RedisCluster's __init__ can fail before nodes_manager is set
            pass

    def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
        """
        Process the result of the executed command.
        The function would return a dict or a single value.

        :type command: str
        :type res: dict

        `res` should be in the following format:
            Dict<node_name, command_result>
        """
        if command in self.result_callbacks:
            res = self.result_callbacks[command](command, res, **kwargs)
        elif len(res) == 1:
            # When we execute the command on a single node, we can
            # remove the dictionary and return a single response
            res = list(res.values())[0]

        return self._policies_callback_mapping[response_policy](res)


    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules
        and their namespaces to the redis client.

        ``funcname`` - A string containing the name of the function to create
        ``func`` - The function being added to this class.
        """
        setattr(self, funcname, func)

    def transaction(self, func, *watches, **kwargs):
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
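
    # --- Illustrative sketch (editor's addition, not part of the original
    # file). The callable receives the pipeline with WATCHes already applied;
    # after inspecting values it calls multi() to start buffering, and the
    # loop above retries on WatchError. The key name is an assumption and
    # this helper is hypothetical:
    def _example_transaction_usage(self):
        def incr_counter(pipe):
            current = int(pipe.get("counter") or 0)
            pipe.multi()
            pipe.set("counter", current + 1)

        return self.transaction(incr_counter, "counter")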



class ClusterNode:
    def __init__(self, host, port, server_type=None, redis_connection=None):
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        try:
            if self.redis_connection is not None:
                self.redis_connection.close()
        except Exception:
            # Ignore errors when closing the connection
            pass


class LoadBalancingStrategy(Enum):
    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"



class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        server_index = self.primary_to_idx.setdefault(primary, self.start_index)
        if replicas_only and server_index == 0:
            # skip the primary node index
            server_index = 1
        # Update the index for the next round
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index

1615 
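
A short sketch of how the balancer cycles through a slot's node list, where index 0 is the primary and the rest are replicas; the server names are stand-ins for ClusterNode objects:

    from redis.cluster import LoadBalancer, LoadBalancingStrategy

    lb = LoadBalancer()
    servers = ["primary", "replica-1", "replica-2"]  # stand-ins

    for _ in range(4):
        idx = lb.get_server_index("node-a:6379", len(servers))
        print(servers[idx])  # primary, replica-1, replica-2, primary

    # ROUND_ROBIN_REPLICAS never returns index 0 (the primary):
    idx = lb.get_server_index(
        "node-b:6379", len(servers), LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
    )
    assert idx != 0
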

1616 

1617class NodesManager: 

1618 def __init__( 

1619 self, 

1620 startup_nodes, 

1621 from_url=False, 

1622 require_full_coverage=False, 

1623 lock=None, 

1624 dynamic_startup_nodes=True, 

1625 connection_pool_class=ConnectionPool, 

1626 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

1627 cache: Optional[CacheInterface] = None, 

1628 cache_config: Optional[CacheConfig] = None, 

1629 cache_factory: Optional[CacheFactoryInterface] = None, 

1630 event_dispatcher: Optional[EventDispatcher] = None, 

1631 **kwargs, 

1632 ): 

1633 self.nodes_cache: Dict[str, Redis] = {} 

1634 self.slots_cache = {} 

1635 self.startup_nodes = {} 

1636 self.default_node = None 

1637 self.populate_startup_nodes(startup_nodes) 

1638 self.from_url = from_url 

1639 self._require_full_coverage = require_full_coverage 

1640 self._dynamic_startup_nodes = dynamic_startup_nodes 

1641 self.connection_pool_class = connection_pool_class 

1642 self.address_remap = address_remap 

1643 self._cache = cache 

1644 self._cache_config = cache_config 

1645 self._cache_factory = cache_factory 

1646 self._moved_exception = None 

1647 self.connection_kwargs = kwargs 

1648 self.read_load_balancer = LoadBalancer() 

1649 if lock is None: 

1650 lock = threading.RLock() 

1651 self._lock = lock 

1652 if event_dispatcher is None: 

1653 self._event_dispatcher = EventDispatcher() 

1654 else: 

1655 self._event_dispatcher = event_dispatcher 

1656 self._credential_provider = self.connection_kwargs.get( 

1657 "credential_provider", None 

1658 ) 

1659 self.initialize() 

1660 

1661 def get_node(self, host=None, port=None, node_name=None): 

1662 """ 

1663 Get the requested node from the cluster's nodes. 

1664 

1665 :return: ClusterNode if the node exists, else None 

1666 """ 

1667 if host and port: 

1668 # the user passed host and port 

1669 if host == "localhost": 

1670 host = socket.gethostbyname(host) 

1671 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1672 elif node_name: 

1673 return self.nodes_cache.get(node_name) 

1674 else: 

1675 return None 

1676 

1677 def update_moved_exception(self, exception): 

1678 self._moved_exception = exception 

1679 

1680 def _update_moved_slots(self): 

1681 """ 

1682 Update the slot's node with the redirected one 

1683 """ 

1684 e = self._moved_exception 

1685 redirected_node = self.get_node(host=e.host, port=e.port) 

1686 if redirected_node is not None: 

1687 # The node already exists 

1688 if redirected_node.server_type is not PRIMARY: 

1689 # Update the node's server type 

1690 redirected_node.server_type = PRIMARY 

1691 else: 

1692 # This is a new node, we will add it to the nodes cache 

1693 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

1694 self.nodes_cache[redirected_node.name] = redirected_node 

1695 if redirected_node in self.slots_cache[e.slot_id]: 

1696 # The MOVED error resulted from a failover, and the new slot owner 

1697 # had previously been a replica. 

1698 old_primary = self.slots_cache[e.slot_id][0] 

1699 # Update the old primary to be a replica and add it to the end of 

1700 # the slot's node list 

1701 old_primary.server_type = REPLICA 

1702 self.slots_cache[e.slot_id].append(old_primary) 

1703 # Remove the old replica, which is now a primary, from the slot's 

1704 # node list 

1705 self.slots_cache[e.slot_id].remove(redirected_node) 

1706 # Override the old primary with the new one 

1707 self.slots_cache[e.slot_id][0] = redirected_node 

1708 if self.default_node == old_primary: 

1709 # Update the default node with the new primary 

1710 self.default_node = redirected_node 

1711 else: 

1712 # The new slot owner is a new server, or a server from a different 

1713 # shard. We need to remove all current nodes from the slot's list 

1714 # (including replications) and add just the new node. 

1715 self.slots_cache[e.slot_id] = [redirected_node] 

1716 # Reset moved_exception 

1717 self._moved_exception = None 

1718 

1719 @deprecated_args( 

1720 args_to_warn=["server_type"], 

1721 reason=( 

1722 "In case you need to select a load balancing strategy " 

1723 "that uses replicas, please set it through 'load_balancing_strategy'" 

1724 ), 

1725 version="5.3.0", 

1726 ) 

1727 def get_node_from_slot( 

1728 self, 

1729 slot, 

1730 read_from_replicas=False, 

1731 load_balancing_strategy=None, 

1732 server_type=None, 

1733 ) -> ClusterNode: 

1734 """ 

1735 Gets a node that serves this hash slot 

1736 """ 

1737 if self._moved_exception: 

1738 with self._lock: 

1739 if self._moved_exception: 

1740 self._update_moved_slots() 

1741 

1742 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

1743 raise SlotNotCoveredError( 

1744 f'Slot "{slot}" not covered by the cluster. ' 

1745 f'"require_full_coverage={self._require_full_coverage}"' 

1746 ) 

1747 

1748 if read_from_replicas is True and load_balancing_strategy is None: 

1749 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN 

1750 

1751 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy: 

1752 # get the server index using the strategy defined in load_balancing_strategy 

1753 primary_name = self.slots_cache[slot][0].name 

1754 node_idx = self.read_load_balancer.get_server_index( 

1755 primary_name, len(self.slots_cache[slot]), load_balancing_strategy 

1756 ) 

1757 elif ( 

1758 server_type is None 

1759 or server_type == PRIMARY 

1760 or len(self.slots_cache[slot]) == 1 

1761 ): 

1762 # return a primary 

1763 node_idx = 0 

1764 else: 

1765 # return a replica 

1766 # randomly choose one of the replicas 

1767 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

1768 

1769 return self.slots_cache[slot][node_idx] 

1770 
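
A sketch of resolving the serving node for a key, assuming an already initialized client (the connection details are hypothetical):

    from redis.cluster import RedisCluster, LoadBalancingStrategy

    rc = RedisCluster(host="localhost", port=7000)  # hypothetical address
    slot = rc.keyslot("user:1000")  # CRC16-based slot in [0, 16383]
    node = rc.nodes_manager.get_node_from_slot(
        slot, load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN
    )
    print(node.name, node.server_type)
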

1771 def get_nodes_by_server_type(self, server_type): 

1772 """ 

1773 Get all nodes with the specified server type 

1774 :param server_type: 'primary' or 'replica' 

1775 :return: list of ClusterNode 

1776 """ 

1777 return [ 

1778 node 

1779 for node in self.nodes_cache.values() 

1780 if node.server_type == server_type 

1781 ] 

1782 

1783 def populate_startup_nodes(self, nodes): 

1784 """ 

1785 Populate all startup nodes and filter out any duplicates 

1786 """ 

1787 for n in nodes: 

1788 self.startup_nodes[n.name] = n 

1789 

1790 def check_slots_coverage(self, slots_cache): 

1791 # Validate if all slots are covered or if we should try next 

1792 # startup node 

1793 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

1794 if i not in slots_cache: 

1795 return False 

1796 return True 

1797 

1798 def create_redis_connections(self, nodes): 

1799 """ 

1800 This function will create a redis connection to all nodes in :nodes: 

1801 """ 

1802 connection_pools = [] 

1803 for node in nodes: 

1804 if node.redis_connection is None: 

1805 node.redis_connection = self.create_redis_node( 

1806 host=node.host, port=node.port, **self.connection_kwargs 

1807 ) 

1808 connection_pools.append(node.redis_connection.connection_pool) 

1809 

1810 self._event_dispatcher.dispatch( 

1811 AfterPooledConnectionsInstantiationEvent( 

1812 connection_pools, ClientType.SYNC, self._credential_provider 

1813 ) 

1814 ) 

1815 

1816 def create_redis_node(self, host, port, **kwargs): 

1817 # We are configuring the connection pool not to retry 

1818 # connections on lower level clients to avoid retrying 

1819 # connections to nodes that are not reachable 

1820 # and to avoid blocking the connection pool. 

1821 # The only error that will have some handling in the lower 

1822 # level clients is ConnectionError which will trigger disconnection 

1823 # of the socket. 

1824 # The retries will be handled on cluster client level 

1825 # where we will have proper handling of the cluster topology 

1826 node_retry_config = Retry( 

1827 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) 

1828 ) 

1829 

1830 protocol = kwargs.get("protocol", None) 

1831 if protocol in [3, "3"]: 

1832 kwargs.update( 

1833 {"maint_notifications_config": MaintNotificationsConfig(enabled=False)} 

1834 ) 

1835 if self.from_url: 

1836 # Create a redis node with a customized connection pool 

1837 kwargs.update({"host": host}) 

1838 kwargs.update({"port": port}) 

1839 kwargs.update({"cache": self._cache}) 

1840 kwargs.update({"retry": node_retry_config}) 

1841 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

1842 else: 

1843 r = Redis( 

1844 host=host, 

1845 port=port, 

1846 cache=self._cache, 

1847 retry=node_retry_config, 

1848 **kwargs, 

1849 ) 

1850 return r 

1851 

1852 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

1853 node_name = get_node_name(host, port) 

1854 # check if we already have this node in the tmp_nodes_cache 

1855 target_node = tmp_nodes_cache.get(node_name) 

1856 if target_node is None: 

1857 # before creating a new cluster node, check if the cluster node already 

1858 # exists in the current nodes cache and has a valid connection so we can 

1859 # reuse it 

1860 target_node = self.nodes_cache.get(node_name) 

1861 if target_node is None or target_node.redis_connection is None: 

1862 # create new cluster node for this cluster 

1863 target_node = ClusterNode(host, port, role) 

1864 if target_node.server_type != role: 

1865 target_node.server_type = role 

1866 # add this node to the nodes cache 

1867 tmp_nodes_cache[target_node.name] = target_node 

1868 

1869 return target_node 

1870 

1871 def initialize(self): 

1872 """ 

1873 Initializes the nodes cache, slots cache and redis connections. 

1874 :startup_nodes: 

1875 Responsible for discovering other nodes in the cluster 

1876 """ 

1877 self.reset() 

1878 tmp_nodes_cache = {} 

1879 tmp_slots = {} 

1880 disagreements = [] 

1881 startup_nodes_reachable = False 

1882 fully_covered = False 

1883 kwargs = self.connection_kwargs 

1884 exception = None 

1885 # Convert to tuple to prevent RuntimeError if self.startup_nodes 

1886 # is modified during iteration 

1887 for startup_node in tuple(self.startup_nodes.values()): 

1888 try: 

1889 if startup_node.redis_connection: 

1890 r = startup_node.redis_connection 

1891 else: 

1892 # Create a new Redis connection 

1893 r = self.create_redis_node( 

1894 startup_node.host, startup_node.port, **kwargs 

1895 ) 

1896 self.startup_nodes[startup_node.name].redis_connection = r 

1897 # Make sure cluster mode is enabled on this node 

1898 try: 

1899 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

1900 r.connection_pool.disconnect() 

1901 except ResponseError: 

1902 raise RedisClusterException( 

1903 "Cluster mode is not enabled on this node" 

1904 ) 

1905 startup_nodes_reachable = True 

1906 except Exception as e: 

1907 # Try the next startup node. 

1908 # The exception is saved and raised only if we have no more nodes. 

1909 exception = e 

1910 continue 

1911 

1912 # CLUSTER SLOTS command results in the following output: 

1913 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

1914 # where each node contains the following list: [IP, port, node_id] 

1915 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

1916 # primary node of the first slot section. 

1917 # If there's only one server in the cluster, its ``host`` is '' 

1918 # Fix it to the host in startup_nodes 

1919 if ( 

1920 len(cluster_slots) == 1 

1921 and len(cluster_slots[0][2][0]) == 0 

1922 and len(self.startup_nodes) == 1 

1923 ): 

1924 cluster_slots[0][2][0] = startup_node.host 

1925 

1926 for slot in cluster_slots: 

1927 primary_node = slot[2] 

1928 host = str_if_bytes(primary_node[0]) 

1929 if host == "": 

1930 host = startup_node.host 

1931 port = int(primary_node[1]) 

1932 host, port = self.remap_host_port(host, port) 

1933 

1934 nodes_for_slot = [] 

1935 

1936 target_node = self._get_or_create_cluster_node( 

1937 host, port, PRIMARY, tmp_nodes_cache 

1938 ) 

1939 nodes_for_slot.append(target_node) 

1940 

1941 replica_nodes = slot[3:] 

1942 for replica_node in replica_nodes: 

1943 host = str_if_bytes(replica_node[0]) 

1944 port = int(replica_node[1]) 

1945 host, port = self.remap_host_port(host, port) 

1946 target_replica_node = self._get_or_create_cluster_node( 

1947 host, port, REPLICA, tmp_nodes_cache 

1948 ) 

1949 nodes_for_slot.append(target_replica_node) 

1950 

1951 for i in range(int(slot[0]), int(slot[1]) + 1): 

1952 if i not in tmp_slots: 

1953 tmp_slots[i] = nodes_for_slot 

1954 else: 

1955 # Detect when two different nodes claim ownership of the same 

1956 # slot 

1957 tmp_slot = tmp_slots[i][0] 

1958 if tmp_slot.name != target_node.name: 

1959 disagreements.append( 

1960 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

1961 ) 

1962 

1963 if len(disagreements) > 5: 

1964 raise RedisClusterException( 

1965 f"startup_nodes could not agree on a valid " 

1966 f"slots cache: {', '.join(disagreements)}" 

1967 ) 

1968 

1969 fully_covered = self.check_slots_coverage(tmp_slots) 

1970 if fully_covered: 

1971 # Don't need to continue to the next startup node if all 

1972 # slots are covered 

1973 break 

1974 

1975 if not startup_nodes_reachable: 

1976 raise RedisClusterException( 

1977 f"Cannot connect to the Redis Cluster. Please provide at least " 

1978 f"one reachable node: {str(exception)}" 

1979 ) from exception 

1980 

1981 if self._cache is None and self._cache_config is not None: 

1982 if self._cache_factory is None: 

1983 self._cache = CacheFactory(self._cache_config).get_cache() 

1984 else: 

1985 self._cache = self._cache_factory.get_cache() 

1986 

1987 # Create Redis connections to all nodes 

1988 self.create_redis_connections(list(tmp_nodes_cache.values())) 

1989 

1990 # Check if the slots are not fully covered 

1991 if not fully_covered and self._require_full_coverage: 

1992 # Despite the requirement that the slots be covered, there 

1993 # isn't full coverage 

1994 raise RedisClusterException( 

1995 f"Not all slots are covered after querying all startup_nodes. " 

1996 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

1997 f"covered..." 

1998 ) 

1999 

2000 # Set the tmp variables to the real variables 

2001 self.nodes_cache = tmp_nodes_cache 

2002 self.slots_cache = tmp_slots 

2003 # Set the default node 

2004 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

2005 if self._dynamic_startup_nodes: 

2006 # Populate the startup nodes with all discovered nodes 

2007 self.startup_nodes = tmp_nodes_cache 

2008 # If initialize was called after a MovedError, clear it 

2009 self._moved_exception = None 

2010 

2011 def close(self) -> None: 

2012 self.default_node = None 

2013 for node in self.nodes_cache.values(): 

2014 if node.redis_connection: 

2015 node.redis_connection.close() 

2016 

2017 def reset(self): 

2018 try: 

2019 self.read_load_balancer.reset() 

2020 except TypeError: 

2021 # The read_load_balancer is None, do nothing 

2022 pass 

2023 

2024 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

2025 """ 

2026 Remap the host and port returned from the cluster to a different 

2027 internal value. Useful if the client is not connecting directly 

2028 to the cluster. 

2029 """ 

2030 if self.address_remap: 

2031 return self.address_remap((host, port)) 

2032 return host, port 

2033 
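
A sketch of an ``address_remap`` callable for a cluster reachable only through forwarded ports (e.g. behind Docker); the port mapping is hypothetical:

    from redis.cluster import RedisCluster

    FORWARDED = {6379: 16379, 6380: 16380, 6381: 16381}  # hypothetical mapping

    def remap(address):
        host, port = address
        # Nodes announce their in-network addresses; rewrite them to the
        # locally forwarded ones.
        return "127.0.0.1", FORWARDED.get(port, port)

    rc = RedisCluster(host="127.0.0.1", port=16379, address_remap=remap)
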

2034 def find_connection_owner(self, connection: Connection) -> Optional[Redis]: 

2035 node_name = get_node_name(connection.host, connection.port) 

2036 for node in tuple(self.nodes_cache.values()): 

2037 if node.redis_connection: 

2038 conn_args = node.redis_connection.connection_pool.connection_kwargs 

2039 if node_name == get_node_name( 

2040 conn_args.get("host"), conn_args.get("port") 

2041 ): 

2042 return node 

2043 

2044 

2045class ClusterPubSub(PubSub): 

2046 """ 

2047 Wrapper for PubSub class. 

2048 

2049 IMPORTANT: before using ClusterPubSub, read about the known limitations 

2050 with pubsub in Cluster mode and learn how to work around them: 

2051 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html 

2052 """ 

2053 

2054 def __init__( 

2055 self, 

2056 redis_cluster, 

2057 node=None, 

2058 host=None, 

2059 port=None, 

2060 push_handler_func=None, 

2061 event_dispatcher: Optional["EventDispatcher"] = None, 

2062 **kwargs, 

2063 ): 

2064 """ 

2065 When a pubsub instance is created without specifying a node, a single 

2066 node will be transparently chosen for the pubsub connection on the 

2067 first command execution. The node will be determined by: 

2068 1. Hashing the channel name in the request to find its keyslot 

2069 2. Selecting a node that handles the keyslot: If read_from_replicas is 

2070 set to true or load_balancing_strategy is set, a replica can be selected. 

2071 

2072 :type redis_cluster: RedisCluster 

2073 :type node: ClusterNode 

2074 :type host: str 

2075 :type port: int 

2076 """ 

2077 self.node = None 

2078 self.set_pubsub_node(redis_cluster, node, host, port) 

2079 connection_pool = ( 

2080 None 

2081 if self.node is None 

2082 else redis_cluster.get_redis_connection(self.node).connection_pool 

2083 ) 

2084 self.cluster = redis_cluster 

2085 self.node_pubsub_mapping = {} 

2086 self._pubsubs_generator = self._pubsubs_generator() 

2087 if event_dispatcher is None: 

2088 self._event_dispatcher = EventDispatcher() 

2089 else: 

2090 self._event_dispatcher = event_dispatcher 

2091 super().__init__( 

2092 connection_pool=connection_pool, 

2093 encoder=redis_cluster.encoder, 

2094 push_handler_func=push_handler_func, 

2095 event_dispatcher=self._event_dispatcher, 

2096 **kwargs, 

2097 ) 

2098 

2099 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

2100 """ 

2101 The pubsub node will be set according to the passed node, host and port. 

2102 When none of the node, host, or port is specified - the node is set 

2103 to None and will be determined by the keyslot of the channel in the 

2104 first command to be executed. 

2105 RedisClusterException will be thrown if the passed node does not exist 

2106 in the cluster. 

2107 If host is passed without port, or vice versa, a DataError will be 

2108 thrown. 

2109 :type cluster: RedisCluster 

2110 :type node: ClusterNode 

2111 :type host: str 

2112 :type port: int 

2113 """ 

2114 if node is not None: 

2115 # node is passed by the user 

2116 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

2117 pubsub_node = node 

2118 elif host is not None and port is not None: 

2119 # host and port passed by the user 

2120 node = cluster.get_node(host=host, port=port) 

2121 self._raise_on_invalid_node(cluster, node, host, port) 

2122 pubsub_node = node 

2123 elif any([host, port]): 

2124 # only 'host' or 'port' passed 

2125 raise DataError("Passing a host requires passing a port, and vice versa") 

2126 else: 

2127 # nothing passed by the user. set node to None 

2128 pubsub_node = None 

2129 

2130 self.node = pubsub_node 

2131 

2132 def get_pubsub_node(self): 

2133 """ 

2134 Get the node that is being used as the pubsub connection 

2135 """ 

2136 return self.node 

2137 

2138 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

2139 """ 

2140 Raise a RedisClusterException if the node is None or doesn't exist in 

2141 the cluster. 

2142 """ 

2143 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

2144 raise RedisClusterException( 

2145 f"Node {host}:{port} doesn't exist in the cluster" 

2146 ) 

2147 

2148 def execute_command(self, *args): 

2149 """ 

2150 Execute a subscribe/unsubscribe command. 

2151 

2152 Code taken from redis-py and tweaked to make it work within a cluster. 

2153 """ 

2154 # NOTE: don't parse the response in this function -- it could pull a 

2155 # legitimate message off the stack if the connection is already 

2156 # subscribed to one or more channels 

2157 

2158 if self.connection is None: 

2159 if self.connection_pool is None: 

2160 if len(args) > 1: 

2161 # Hash the first channel and get one of the nodes holding 

2162 # this slot 

2163 channel = args[1] 

2164 slot = self.cluster.keyslot(channel) 

2165 node = self.cluster.nodes_manager.get_node_from_slot( 

2166 slot, 

2167 self.cluster.read_from_replicas, 

2168 self.cluster.load_balancing_strategy, 

2169 ) 

2170 else: 

2171 # Get a random node 

2172 node = self.cluster.get_random_node() 

2173 self.node = node 

2174 redis_connection = self.cluster.get_redis_connection(node) 

2175 self.connection_pool = redis_connection.connection_pool 

2176 self.connection = self.connection_pool.get_connection() 

2177 # register a callback that re-subscribes to any channels we 

2178 # were listening to when we were disconnected 

2179 self.connection.register_connect_callback(self.on_connect) 

2180 if self.push_handler_func is not None: 

2181 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

2182 self._event_dispatcher.dispatch( 

2183 AfterPubSubConnectionInstantiationEvent( 

2184 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

2185 ) 

2186 ) 

2187 connection = self.connection 

2188 self._execute(connection, connection.send_command, *args) 

2189 

2190 def _get_node_pubsub(self, node): 

2191 try: 

2192 return self.node_pubsub_mapping[node.name] 

2193 except KeyError: 

2194 pubsub = node.redis_connection.pubsub( 

2195 push_handler_func=self.push_handler_func 

2196 ) 

2197 self.node_pubsub_mapping[node.name] = pubsub 

2198 return pubsub 

2199 

2200 def _sharded_message_generator(self): 

2201 for _ in range(len(self.node_pubsub_mapping)): 

2202 pubsub = next(self._pubsubs_generator) 

2203 message = pubsub.get_message() 

2204 if message is not None: 

2205 return message 

2206 return None 

2207 

2208 def _pubsubs_generator(self): 

2209 while True: 

2210 yield from self.node_pubsub_mapping.values() 

2211 

2212 def get_sharded_message( 

2213 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None 

2214 ): 

2215 if target_node: 

2216 message = self.node_pubsub_mapping[target_node.name].get_message( 

2217 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout 

2218 ) 

2219 else: 

2220 message = self._sharded_message_generator() 

2221 if message is None: 

2222 return None 

2223 elif str_if_bytes(message["type"]) == "sunsubscribe": 

2224 if message["channel"] in self.pending_unsubscribe_shard_channels: 

2225 self.pending_unsubscribe_shard_channels.remove(message["channel"]) 

2226 self.shard_channels.pop(message["channel"], None) 

2227 node = self.cluster.get_node_from_key(message["channel"]) 

2228 if self.node_pubsub_mapping[node.name].subscribed is False: 

2229 self.node_pubsub_mapping.pop(node.name) 

2230 if not self.channels and not self.patterns and not self.shard_channels: 

2231 # There are no subscriptions anymore, set subscribed_event flag 

2232 # to false 

2233 self.subscribed_event.clear() 

2234 if self.ignore_subscribe_messages or ignore_subscribe_messages: 

2235 return None 

2236 return message 

2237 

2238 def ssubscribe(self, *args, **kwargs): 

2239 if args: 

2240 args = list_or_args(args[0], args[1:]) 

2241 s_channels = dict.fromkeys(args) 

2242 s_channels.update(kwargs) 

2243 for s_channel, handler in s_channels.items(): 

2244 node = self.cluster.get_node_from_key(s_channel) 

2245 pubsub = self._get_node_pubsub(node) 

2246 if handler: 

2247 pubsub.ssubscribe(**{s_channel: handler}) 

2248 else: 

2249 pubsub.ssubscribe(s_channel) 

2250 self.shard_channels.update(pubsub.shard_channels) 

2251 self.pending_unsubscribe_shard_channels.difference_update( 

2252 self._normalize_keys({s_channel: None}) 

2253 ) 

2254 if pubsub.subscribed and not self.subscribed: 

2255 self.subscribed_event.set() 

2256 self.health_check_response_counter = 0 

2257 
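
A sketch of sharded pub/sub: each shard channel is routed to the node owning its slot, and messages are polled round-robin across the per-node pubsub objects (the address and channel names are hypothetical):

    from redis.cluster import RedisCluster

    rc = RedisCluster(host="localhost", port=7000)  # hypothetical address
    p = rc.pubsub()
    p.ssubscribe("orders", "invoices")

    for _ in range(10):  # poll a few times; loop indefinitely in real code
        message = p.get_sharded_message(ignore_subscribe_messages=True)
        if message is not None:
            print(message["channel"], message["data"])
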

2258 def sunsubscribe(self, *args): 

2259 if args: 

2260 args = list_or_args(args[0], args[1:]) 

2261 else: 

2262 args = self.shard_channels 

2263 

2264 for s_channel in args: 

2265 node = self.cluster.get_node_from_key(s_channel) 

2266 p = self._get_node_pubsub(node) 

2267 p.sunsubscribe(s_channel) 

2268 self.pending_unsubscribe_shard_channels.update( 

2269 p.pending_unsubscribe_shard_channels 

2270 ) 

2271 

2272 def get_redis_connection(self): 

2273 """ 

2274 Get the Redis connection of the pubsub connected node. 

2275 """ 

2276 if self.node is not None: 

2277 return self.node.redis_connection 

2278 

2279 def disconnect(self): 

2280 """ 

2281 Disconnect the pubsub connection. 

2282 """ 

2283 if self.connection: 

2284 self.connection.disconnect() 

2285 for pubsub in self.node_pubsub_mapping.values(): 

2286 pubsub.connection.disconnect() 

2287 

2288 

2289class ClusterPipeline(RedisCluster): 

2290 """ 

2291 Support for Redis pipeline 

2292 in cluster mode 

2293 """ 

2294 

2295 ERRORS_ALLOW_RETRY = ( 

2296 ConnectionError, 

2297 TimeoutError, 

2298 MovedError, 

2299 AskError, 

2300 TryAgainError, 

2301 ) 

2302 

2303 NO_SLOTS_COMMANDS = {"UNWATCH"} 

2304 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

2305 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

2306 

2307 @deprecated_args( 

2308 args_to_warn=[ 

2309 "cluster_error_retry_attempts", 

2310 ], 

2311 reason="Please configure the 'retry' object instead", 

2312 version="6.0.0", 

2313 ) 

2314 def __init__( 

2315 self, 

2316 nodes_manager: "NodesManager", 

2317 commands_parser: "CommandsParser", 

2318 result_callbacks: Optional[Dict[str, Callable]] = None, 

2319 cluster_response_callbacks: Optional[Dict[str, Callable]] = None, 

2320 startup_nodes: Optional[List["ClusterNode"]] = None, 

2321 read_from_replicas: bool = False, 

2322 load_balancing_strategy: Optional[LoadBalancingStrategy] = None, 

2323 cluster_error_retry_attempts: int = 3, 

2324 reinitialize_steps: int = 5, 

2325 retry: Optional[Retry] = None, 

2326 lock=None, 

2327 transaction=False, 

2328 policy_resolver: PolicyResolver = StaticPolicyResolver(), 

2329 **kwargs, 

2330 ): 

2331 """ """ 

2332 self.command_stack = [] 

2333 self.nodes_manager = nodes_manager 

2334 self.commands_parser = commands_parser 

2335 self.refresh_table_asap = False 

2336 self.result_callbacks = ( 

2337 result_callbacks or self.__class__.RESULT_CALLBACKS.copy() 

2338 ) 

2339 self.startup_nodes = startup_nodes if startup_nodes else [] 

2340 self.read_from_replicas = read_from_replicas 

2341 self.load_balancing_strategy = load_balancing_strategy 

2342 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

2343 self.cluster_response_callbacks = cluster_response_callbacks 

2344 self.reinitialize_counter = 0 

2345 self.reinitialize_steps = reinitialize_steps 

2346 if retry is not None: 

2347 self.retry = retry 

2348 else: 

2349 self.retry = Retry( 

2350 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

2351 retries=cluster_error_retry_attempts, 

2352 ) 

2353 

2354 self.encoder = Encoder( 

2355 kwargs.get("encoding", "utf-8"), 

2356 kwargs.get("encoding_errors", "strict"), 

2357 kwargs.get("decode_responses", False), 

2358 ) 

2359 if lock is None: 

2360 lock = threading.RLock() 

2361 self._lock = lock 

2362 self.parent_execute_command = super().execute_command 

2363 self._execution_strategy: ExecutionStrategy = ( 

2364 PipelineStrategy(self) if not transaction else TransactionStrategy(self) 

2365 ) 

2366 

2367 # For backward compatibility, mapping from existing policies to new one 

2368 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = { 

2369 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS, 

2370 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS, 

2371 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES, 

2372 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS, 

2373 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE, 

2374 SLOT_ID: RequestPolicy.DEFAULT_KEYED, 

2375 } 

2376 

2377 self._policies_callback_mapping: dict[ 

2378 Union[RequestPolicy, ResponsePolicy], Callable 

2379 ] = { 

2380 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [ 

2381 self.get_random_primary_or_all_nodes(command_name) 

2382 ], 

2383 RequestPolicy.DEFAULT_KEYED: lambda command, 

2384 *args: self.get_nodes_from_slot(command, *args), 

2385 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()], 

2386 RequestPolicy.ALL_SHARDS: self.get_primaries, 

2387 RequestPolicy.ALL_NODES: self.get_nodes, 

2388 RequestPolicy.ALL_REPLICAS: self.get_replicas, 

2389 RequestPolicy.MULTI_SHARD: lambda *args, 

2390 **kwargs: self._split_multi_shard_command(*args, **kwargs), 

2391 RequestPolicy.SPECIAL: self.get_special_nodes, 

2392 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res, 

2393 ResponsePolicy.DEFAULT_KEYED: lambda res: res, 

2394 } 

2395 

2396 self._policy_resolver = policy_resolver 

2397 

2398 def __repr__(self): 

2399 """ """ 

2400 return f"{type(self).__name__}" 

2401 

2402 def __enter__(self): 

2403 """ """ 

2404 return self 

2405 

2406 def __exit__(self, exc_type, exc_value, traceback): 

2407 """ """ 

2408 self.reset() 

2409 

2410 def __del__(self): 

2411 try: 

2412 self.reset() 

2413 except Exception: 

2414 pass 

2415 

2416 def __len__(self): 

2417 """ """ 

2418 return len(self._execution_strategy.command_queue) 

2419 

2420 def __bool__(self): 

2421 "Pipeline instances should always evaluate to True on Python 3+" 

2422 return True 

2423 

2424 def execute_command(self, *args, **kwargs): 

2425 """ 

2426 Wrapper function for pipeline_execute_command 

2427 """ 

2428 return self._execution_strategy.execute_command(*args, **kwargs) 

2429 

2430 def pipeline_execute_command(self, *args, **options): 

2431 """ 

2432 Stage a command to be executed when execute() is next called 

2433 

2434 Returns the current Pipeline object back so commands can be 

2435 chained together, such as: 

2436 

2437 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

2438 

2439 At some other point, you can then run: pipe.execute(), 

2440 which will execute all commands queued in the pipe. 

2441 """ 

2442 return self._execution_strategy.execute_command(*args, **options) 

2443 
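
The chaining described in the docstring above, as a runnable sketch (the address and keys are hypothetical):

    from redis.cluster import RedisCluster

    rc = RedisCluster(host="localhost", port=7000)  # hypothetical address
    pipe = rc.pipeline()
    pipe.set("foo", "bar").incr("baz").decr("bang")
    results = pipe.execute()  # one result per queued command, in order
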

2444 def annotate_exception(self, exception, number, command): 

2445 """ 

2446 Provides extra context to the exception prior to it being handled 

2447 """ 

2448 self._execution_strategy.annotate_exception(exception, number, command) 

2449 

2450 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2451 """ 

2452 Execute all the commands in the current pipeline 

2453 """ 

2454 

2455 try: 

2456 return self._execution_strategy.execute(raise_on_error) 

2457 finally: 

2458 self.reset() 

2459 

2460 def reset(self): 

2461 """ 

2462 Reset back to empty pipeline. 

2463 """ 

2464 self._execution_strategy.reset() 

2465 

2466 def send_cluster_commands( 

2467 self, stack, raise_on_error=True, allow_redirections=True 

2468 ): 

2469 return self._execution_strategy.send_cluster_commands( 

2470 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections 

2471 ) 

2472 

2473 def exists(self, *keys): 

2474 return self._execution_strategy.exists(*keys) 

2475 

2476 def eval(self): 

2477 """ """ 

2478 return self._execution_strategy.eval() 

2479 

2480 def multi(self): 

2481 """ 

2482 Start a transactional block of the pipeline after WATCH commands 

2483 are issued. End the transactional block with `execute`. 

2484 """ 

2485 self._execution_strategy.multi() 

2486 

2487 def load_scripts(self): 

2488 """ """ 

2489 self._execution_strategy.load_scripts() 

2490 

2491 def discard(self): 

2492 """ """ 

2493 self._execution_strategy.discard() 

2494 

2495 def watch(self, *names): 

2496 """Watches the values at keys ``names``""" 

2497 self._execution_strategy.watch(*names) 

2498 

2499 def unwatch(self): 

2500 """Unwatches all previously specified keys""" 

2501 self._execution_strategy.unwatch() 

2502 

2503 def script_load_for_pipeline(self, *args, **kwargs): 

2504 self._execution_strategy.script_load_for_pipeline(*args, **kwargs) 

2505 

2506 def delete(self, *names): 

2507 self._execution_strategy.delete(*names) 

2508 

2509 def unlink(self, *names): 

2510 self._execution_strategy.unlink(*names) 

2511 

2512 

2513def block_pipeline_command(name: str) -> Callable[..., Any]: 

2514 """ 

2515 Raises an error because some pipelined commands should 

2516 be blocked when running in cluster mode 

2517 """ 

2518 

2519 def inner(*args, **kwargs): 

2520 raise RedisClusterException( 

2521 f"ERROR: Calling pipelined function {name} is blocked " 

2522 f"when running redis in cluster mode..." 

2523 ) 

2524 

2525 return inner 

2526 

2527 

2528# Blocked pipeline commands 

2529PIPELINE_BLOCKED_COMMANDS = ( 

2530 "BGREWRITEAOF", 

2531 "BGSAVE", 

2532 "BITOP", 

2533 "BRPOPLPUSH", 

2534 "CLIENT GETNAME", 

2535 "CLIENT KILL", 

2536 "CLIENT LIST", 

2537 "CLIENT SETNAME", 

2538 "CLIENT", 

2539 "CONFIG GET", 

2540 "CONFIG RESETSTAT", 

2541 "CONFIG REWRITE", 

2542 "CONFIG SET", 

2543 "CONFIG", 

2544 "DBSIZE", 

2545 "ECHO", 

2546 "EVALSHA", 

2547 "FLUSHALL", 

2548 "FLUSHDB", 

2549 "INFO", 

2550 "KEYS", 

2551 "LASTSAVE", 

2552 "MGET", 

2553 "MGET NONATOMIC", 

2554 "MOVE", 

2555 "MSET", 

2556 "MSETEX", 

2557 "MSET NONATOMIC", 

2558 "MSETNX", 

2559 "PFCOUNT", 

2560 "PFMERGE", 

2561 "PING", 

2562 "PUBLISH", 

2563 "RANDOMKEY", 

2564 "READONLY", 

2565 "READWRITE", 

2566 "RENAME", 

2567 "RENAMENX", 

2568 "RPOPLPUSH", 

2569 "SAVE", 

2570 "SCAN", 

2571 "SCRIPT EXISTS", 

2572 "SCRIPT FLUSH", 

2573 "SCRIPT KILL", 

2574 "SCRIPT LOAD", 

2575 "SCRIPT", 

2576 "SDIFF", 

2577 "SDIFFSTORE", 

2578 "SENTINEL GET MASTER ADDR BY NAME", 

2579 "SENTINEL MASTER", 

2580 "SENTINEL MASTERS", 

2581 "SENTINEL MONITOR", 

2582 "SENTINEL REMOVE", 

2583 "SENTINEL SENTINELS", 

2584 "SENTINEL SET", 

2585 "SENTINEL SLAVES", 

2586 "SENTINEL", 

2587 "SHUTDOWN", 

2588 "SINTER", 

2589 "SINTERSTORE", 

2590 "SLAVEOF", 

2591 "SLOWLOG GET", 

2592 "SLOWLOG LEN", 

2593 "SLOWLOG RESET", 

2594 "SLOWLOG", 

2595 "SMOVE", 

2596 "SORT", 

2597 "SUNION", 

2598 "SUNIONSTORE", 

2599 "TIME", 

2600) 

2601for command in PIPELINE_BLOCKED_COMMANDS: 

2602 command = command.replace(" ", "_").lower() 

2603 

2604 setattr(ClusterPipeline, command, block_pipeline_command(command)) 

2605 
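
A sketch of the effect of the loop above: calling a blocked command on a cluster pipeline raises immediately instead of being queued (the address is hypothetical):

    from redis.cluster import RedisCluster
    from redis.exceptions import RedisClusterException

    rc = RedisCluster(host="localhost", port=7000)  # hypothetical address
    pipe = rc.pipeline()
    try:
        pipe.keys("*")  # KEYS is in PIPELINE_BLOCKED_COMMANDS
    except RedisClusterException as exc:
        print(exc)
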

2606 

2607class PipelineCommand: 

2608 """ """ 

2609 

2610 def __init__(self, args, options=None, position=None): 

2611 self.args = args 

2612 if options is None: 

2613 options = {} 

2614 self.options = options 

2615 self.position = position 

2616 self.result = None 

2617 self.node = None 

2618 self.asking = False 

2619 self.command_policies: Optional[CommandPolicies] = None 

2620 

2621 

2622class NodeCommands: 

2623 """ """ 

2624 

2625 def __init__(self, parse_response, connection_pool, connection): 

2626 """ """ 

2627 self.parse_response = parse_response 

2628 self.connection_pool = connection_pool 

2629 self.connection = connection 

2630 self.commands = [] 

2631 

2632 def append(self, c): 

2633 """ """ 

2634 self.commands.append(c) 

2635 

2636 def write(self): 

2637 """ 

2638 Code borrowed from redis-py so it can be fixed 

2639 """ 

2640 connection = self.connection 

2641 commands = self.commands 

2642 

2643 # We are going to clobber the commands with the write, so go ahead 

2644 # and ensure that nothing is sitting there from a previous run. 

2645 for c in commands: 

2646 c.result = None 

2647 

2648 # build up all commands into a single request to increase network perf 

2649 # send all the commands and catch connection and timeout errors. 

2650 try: 

2651 connection.send_packed_command( 

2652 connection.pack_commands([c.args for c in commands]) 

2653 ) 

2654 except (ConnectionError, TimeoutError) as e: 

2655 for c in commands: 

2656 c.result = e 

2657 

2658 def read(self): 

2659 """ """ 

2660 connection = self.connection 

2661 for c in self.commands: 

2662 # if there is a result on this command, 

2663 # it means we ran into an exception 

2664 # like a connection error. Trying to parse 

2665 # a response on a connection that 

2666 # is no longer open will result in a 

2667 # connection error raised by redis-py. 

2668 # but redis-py doesn't check in parse_response 

2669 # that the sock object is 

2670 # still set and if you try to 

2671 # read from a closed connection, it will 

2672 # result in an AttributeError because 

2673 # it will do a readline() call on None. 

2674 # This can have all kinds of nasty side-effects. 

2675 # Treating this case as a connection error 

2676 # is fine because it will dump 

2677 # the connection object back into the 

2678 # pool and on the next write, it will 

2679 # explicitly open the connection and all will be well. 

2680 if c.result is None: 

2681 try: 

2682 c.result = self.parse_response(connection, c.args[0], **c.options) 

2683 except (ConnectionError, TimeoutError) as e: 

2684 for c in self.commands: 

2685 c.result = e 

2686 return 

2687 except RedisError: 

2688 c.result = sys.exc_info()[1] 

2689 

2690 

2691class ExecutionStrategy(ABC): 

2692 @property 

2693 @abstractmethod 

2694 def command_queue(self): 

2695 pass 

2696 

2697 @abstractmethod 

2698 def execute_command(self, *args, **kwargs): 

2699 """ 

2700 Execution flow for current execution strategy. 

2701 

2702 See: ClusterPipeline.execute_command() 

2703 """ 

2704 pass 

2705 

2706 @abstractmethod 

2707 def annotate_exception(self, exception, number, command): 

2708 """ 

2709 Annotate exception according to current execution strategy. 

2710 

2711 See: ClusterPipeline.annotate_exception() 

2712 """ 

2713 pass 

2714 

2715 @abstractmethod 

2716 def pipeline_execute_command(self, *args, **options): 

2717 """ 

2718 Pipeline execution flow for current execution strategy. 

2719 

2720 See: ClusterPipeline.pipeline_execute_command() 

2721 """ 

2722 pass 

2723 

2724 @abstractmethod 

2725 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2726 """ 

2727 Executes current execution strategy. 

2728 

2729 See: ClusterPipeline.execute() 

2730 """ 

2731 pass 

2732 

2733 @abstractmethod 

2734 def send_cluster_commands( 

2735 self, stack, raise_on_error=True, allow_redirections=True 

2736 ): 

2737 """ 

2738 Sends commands according to current execution strategy. 

2739 

2740 See: ClusterPipeline.send_cluster_commands() 

2741 """ 

2742 pass 

2743 

2744 @abstractmethod 

2745 def reset(self): 

2746 """ 

2747 Resets current execution strategy. 

2748 

2749 See: ClusterPipeline.reset() 

2750 """ 

2751 pass 

2752 

2753 @abstractmethod 

2754 def exists(self, *keys): 

2755 pass 

2756 

2757 @abstractmethod 

2758 def eval(self): 

2759 pass 

2760 

2761 @abstractmethod 

2762 def multi(self): 

2763 """ 

2764 Starts transactional context. 

2765 

2766 See: ClusterPipeline.multi() 

2767 """ 

2768 pass 

2769 

2770 @abstractmethod 

2771 def load_scripts(self): 

2772 pass 

2773 

2774 @abstractmethod 

2775 def watch(self, *names): 

2776 pass 

2777 

2778 @abstractmethod 

2779 def unwatch(self): 

2780 """ 

2781 Unwatches all previously specified keys 

2782 

2783 See: ClusterPipeline.unwatch() 

2784 """ 

2785 pass 

2786 

2787 @abstractmethod 

2788 def script_load_for_pipeline(self, *args, **kwargs): 

2789 pass 

2790 

2791 @abstractmethod 

2792 def delete(self, *names): 

2793 """ 

2794 Delete a key specified by ``names`` 

2795 

2796 See: ClusterPipeline.delete() 

2797 """ 

2798 pass 

2799 

2800 @abstractmethod 

2801 def unlink(self, *names): 

2802 """ 

2803 "Unlink a key specified by ``names``" 

2804 

2805 See: ClusterPipeline.unlink() 

2806 """ 

2807 pass 

2808 

2809 @abstractmethod 

2810 def discard(self): 

2811 pass 

2812 

2813 

2814class AbstractStrategy(ExecutionStrategy): 

2815 def __init__( 

2816 self, 

2817 pipe: ClusterPipeline, 

2818 ): 

2819 self._command_queue: List[PipelineCommand] = [] 

2820 self._pipe = pipe 

2821 self._nodes_manager = self._pipe.nodes_manager 

2822 

2823 @property 

2824 def command_queue(self): 

2825 return self._command_queue 

2826 

2827 @command_queue.setter 

2828 def command_queue(self, queue: List[PipelineCommand]): 

2829 self._command_queue = queue 

2830 

2831 @abstractmethod 

2832 def execute_command(self, *args, **kwargs): 

2833 pass 

2834 

2835 def pipeline_execute_command(self, *args, **options): 

2836 self._command_queue.append( 

2837 PipelineCommand(args, options, len(self._command_queue)) 

2838 ) 

2839 return self._pipe 

2840 

2841 @abstractmethod 

2842 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2843 pass 

2844 

2845 @abstractmethod 

2846 def send_cluster_commands( 

2847 self, stack, raise_on_error=True, allow_redirections=True 

2848 ): 

2849 pass 

2850 

2851 @abstractmethod 

2852 def reset(self): 

2853 pass 

2854 

2855 def exists(self, *keys): 

2856 return self.execute_command("EXISTS", *keys) 

2857 

2858 def eval(self): 

2859 """ """ 

2860 raise RedisClusterException("method eval() is not implemented") 

2861 

2862 def load_scripts(self): 

2863 """ """ 

2864 raise RedisClusterException("method load_scripts() is not implemented") 

2865 

2866 def script_load_for_pipeline(self, *args, **kwargs): 

2867 """ """ 

2868 raise RedisClusterException( 

2869 "method script_load_for_pipeline() is not implemented" 

2870 ) 

2871 

2872 def annotate_exception(self, exception, number, command): 

2873 """ 

2874 Provides extra context to the exception prior to it being handled 

2875 """ 

2876 cmd = " ".join(map(safe_str, command)) 

2877 msg = ( 

2878 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

2879 f"caused error: {exception.args[0]}" 

2880 ) 

2881 exception.args = (msg,) + exception.args[1:] 

2882 

2883 

2884class PipelineStrategy(AbstractStrategy): 

2885 def __init__(self, pipe: ClusterPipeline): 

2886 super().__init__(pipe) 

2887 self.command_flags = pipe.command_flags 

2888 

2889 def execute_command(self, *args, **kwargs): 

2890 return self.pipeline_execute_command(*args, **kwargs) 

2891 

2892 def _raise_first_error(self, stack): 

2893 """ 

2894 Raise the first exception on the stack 

2895 """ 

2896 for c in stack: 

2897 r = c.result 

2898 if isinstance(r, Exception): 

2899 self.annotate_exception(r, c.position + 1, c.args) 

2900 raise r 

2901 

2902 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2903 stack = self._command_queue 

2904 if not stack: 

2905 return [] 

2906 

2907 try: 

2908 return self.send_cluster_commands(stack, raise_on_error) 

2909 finally: 

2910 self.reset() 

2911 

2912 def reset(self): 

2913 """ 

2914 Reset back to empty pipeline. 

2915 """ 

2916 self._command_queue = [] 

2917 

2918 def send_cluster_commands( 

2919 self, stack, raise_on_error=True, allow_redirections=True 

2920 ): 

2921 """ 

2922 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling. 

2923 

2924 If one of the retryable exceptions has been thrown we assume that: 

2925 - connection_pool was disconnected 

2926 - connection_pool was reset 

2927 - refresh_table_asap set to True 

2928 

2929 It will retry the number of times specified by 

2930 the retries setting of the "self.retry" config option, 

2931 which defaults to 3 unless manually configured. 

2932 

2933 If that limit is reached, the command will 

2934 raise a ClusterDownError. 

2935 """ 

2936 if not stack: 

2937 return [] 

2938 retry_attempts = self._pipe.retry.get_retries() 

2939 while True: 

2940 try: 

2941 return self._send_cluster_commands( 

2942 stack, 

2943 raise_on_error=raise_on_error, 

2944 allow_redirections=allow_redirections, 

2945 ) 

2946 except RedisCluster.ERRORS_ALLOW_RETRY as e: 

2947 if retry_attempts > 0: 

2948 # Try again with the new cluster setup. All other errors 

2949 # should be raised. 

2950 retry_attempts -= 1 

2951 pass 

2952 else: 

2953 raise e 

2954 
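
A sketch of tuning the retry budget consumed by this loop; the pipeline created from the client shares the client's Retry object (the backoff values and address are hypothetical):

    from redis.backoff import ExponentialWithJitterBackoff
    from redis.cluster import RedisCluster
    from redis.retry import Retry

    retry = Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=5)
    rc = RedisCluster(host="localhost", port=7000, retry=retry)  # hypothetical
    pipe = rc.pipeline()  # retryable errors get up to 5 extra attempts
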

2955 def _send_cluster_commands( 

2956 self, stack, raise_on_error=True, allow_redirections=True 

2957 ): 

2958 """ 

2959 Send a bunch of cluster commands to the redis cluster. 

2960 

2961 `allow_redirections` If the pipeline should follow 

2962 `ASK` & `MOVED` responses automatically. If set 

2963 to false it will raise RedisClusterException. 

2964 """ 

2965 # the first time sending the commands we send all of 

2966 # the commands that were queued up. 

2967 # if we have to run through it again, we only retry 

2968 # the commands that failed. 

2969 attempt = sorted(stack, key=lambda x: x.position) 

2970 is_default_node = False 

2971 # build a mapping of node name -> NodeCommands for the nodes we need 

2972 nodes = {} 

2973 

2974 # as we move through each command that still needs to be processed, 

2975 # we figure out the slot number that command maps to, then from 

2976 # the slot determine the node. 

2977 for c in attempt: 

2978 command_policies = self._pipe._policy_resolver.resolve(c.args[0].lower()) 

2979 

2980 while True: 

2981 # refer to our internal node -> slot table that 

2982 # tells us where a given command should route to. 

2983 # (it might be possible we have a cached node that no longer 

2984 # exists in the cluster, which is why we do this in a loop) 

2985 passed_targets = c.options.pop("target_nodes", None) 

2986 if passed_targets and not self._is_nodes_flag(passed_targets): 

2987 target_nodes = self._parse_target_nodes(passed_targets) 

2988 

2989 if not command_policies: 

2990 command_policies = CommandPolicies() 

2991 else: 

2992 if not command_policies: 

2993 command = c.args[0].upper() 

2994 if ( 

2995 len(c.args) >= 2 

2996 and f"{c.args[0]} {c.args[1]}".upper() 

2997 in self._pipe.command_flags 

2998 ): 

2999 command = f"{c.args[0]} {c.args[1]}".upper() 

3000 

3001 # We can only resolve key properties if the command is not 

3002 # in the list of pre-defined request policies 

3003 command_flag = self.command_flags.get(command) 

3004 if not command_flag: 

3005 # Fallback to default policy 

3006 if not self._pipe.get_default_node(): 

3007 keys = None 

3008 else: 

3009 keys = self._pipe._get_command_keys(*c.args) 

3010 if not keys or len(keys) == 0: 

3011 command_policies = CommandPolicies() 

3012 else: 

3013 command_policies = CommandPolicies( 

3014 request_policy=RequestPolicy.DEFAULT_KEYED, 

3015 response_policy=ResponsePolicy.DEFAULT_KEYED, 

3016 ) 

3017 else: 

3018 if command_flag in self._pipe._command_flags_mapping: 

3019 command_policies = CommandPolicies( 

3020 request_policy=self._pipe._command_flags_mapping[ 

3021 command_flag 

3022 ] 

3023 ) 

3024 else: 

3025 command_policies = CommandPolicies() 

3026 

3027 target_nodes = self._determine_nodes( 

3028 *c.args, 

3029 request_policy=command_policies.request_policy, 

3030 node_flag=passed_targets, 

3031 ) 

3032 if not target_nodes: 

3033 raise RedisClusterException( 

3034 f"No targets were found to execute {c.args} command on" 

3035 ) 

3036 c.command_policies = command_policies 

3037 if len(target_nodes) > 1: 

3038 raise RedisClusterException( 

3039 f"Too many targets for command {c.args}" 

3040 ) 

3041 

3042 node = target_nodes[0] 

3043 if node == self._pipe.get_default_node(): 

3044 is_default_node = True 

3045 

3046 # now that we know the name of the node 

3047 # ( it's just a string in the form of host:port ) 

3048 # we can build a list of commands for each node. 

3049 node_name = node.name 

3050 if node_name not in nodes: 

3051 redis_node = self._pipe.get_redis_connection(node) 

3052 try: 

3053 connection = get_connection(redis_node) 

3054 except (ConnectionError, TimeoutError): 

3055 for n in nodes.values(): 

3056 n.connection_pool.release(n.connection) 

3057 # Connection retries are being handled in the node's 

3058 # Retry object. Reinitialize the node -> slot table. 

3059 self._nodes_manager.initialize() 

3060 if is_default_node: 

3061 self._pipe.replace_default_node() 

3062 raise 

3063 nodes[node_name] = NodeCommands( 

3064 redis_node.parse_response, 

3065 redis_node.connection_pool, 

3066 connection, 

3067 ) 

3068 nodes[node_name].append(c) 

3069 break 

3070 

3071 # send the commands in sequence. 

3072 # we write to all the open sockets for each node first, 

3073 # before reading anything 

3074 # this allows us to flush all the requests out across the 

3075 # network 

3076 # so that we can read them from different sockets as they come back. 

3077 # we don't multiplex on the sockets as they become available, 

3078 # but that shouldn't make too much difference. 

3079 try: 

3080 node_commands = nodes.values() 

3081 for n in node_commands: 

3082 n.write() 

3083 

3084 for n in node_commands: 

3085 n.read() 

3086 finally: 

3087 # release all of the redis connections we allocated earlier 

3088 # back into the connection pool. 

3089 # we used to do this step as part of a try/finally block, 

3090 # but it is really dangerous to 

3091 # release connections back into the pool if for some 

3092 # reason the socket has data still left in it 

3093 # from a previous operation. The write and 

3094 # read operations already have try/catch around them for 

3095 # all known types of errors including connection 

3096 # and socket level errors. 

3097 # So if we hit an exception, something really bad 

3098 # happened and putting any of 

3099 # these connections back into the pool is a very bad idea. 

3100 # the socket might have unread buffer still sitting in it, 

3101 # and then the next time we read from it we pass the 

3102 # buffered result back from a previous command and 

3103 # every single request after to that connection will always get 

3104 # a mismatched result. 

3105 for n in nodes.values(): 

3106 n.connection_pool.release(n.connection) 

3107 

3108 # if the response isn't an exception it is a 

3109 # valid response from the node 

3110 # we're all done with that command, YAY! 

3111 # if we have more commands to attempt, we've run into problems. 

3112 # collect all the commands we are allowed to retry. 

3113 # (MOVED, ASK, or connection errors or timeout errors) 

3114 attempt = sorted( 

3115 ( 

3116 c 

3117 for c in attempt 

3118 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

3119 ), 

3120 key=lambda x: x.position, 

3121 ) 

3122 if attempt and allow_redirections: 

3123 # RETRY MAGIC HAPPENS HERE! 

3124 # send these remaining commands one at a time using `execute_command` 

3125 # in the main client. This keeps our retry logic 

3126 # in one place mostly, 

3127 # and allows us to be more confident in correctness of behavior. 

3128 # at this point any speed gains from pipelining have been lost 

3129 # anyway, so we might as well make the best 

3130 # attempt to get the correct behavior. 

3131 # 

3132 # The client command will handle retries for each 

3133 # individual command sequentially as we pass each 

3134 # one into `execute_command`. Any exceptions 

3135 # that bubble out should only appear once all 

3136 # retries have been exhausted. 

3137 # 

3138 # If a lot of commands have failed, we'll be setting the 

3139 # flag to rebuild the slots table from scratch. 

3140 # So MOVED errors should correct themselves fairly quickly. 

3141 self._pipe.reinitialize_counter += 1 

3142 if self._pipe._should_reinitialized(): 

3143 self._nodes_manager.initialize() 

3144 if is_default_node: 

3145 self._pipe.replace_default_node() 

3146 for c in attempt: 

3147 try: 

3148 # send each command individually like we 

3149 # do in the main client. 

3150 c.result = self._pipe.parent_execute_command(*c.args, **c.options) 

3151 except RedisError as e: 

3152 c.result = e 

3153 

3154 # turn the response back into a simple flat array that corresponds 

3155 # to the sequence of commands issued in the stack in pipeline.execute() 

3156 response = [] 

3157 for c in sorted(stack, key=lambda x: x.position): 

3158 if c.args[0] in self._pipe.cluster_response_callbacks: 

3159 # Remove the keys entry; it is needed only for caching. 

3160 c.options.pop("keys", None) 

3161 c.result = self._pipe._policies_callback_mapping[ 

3162 c.command_policies.response_policy 

3163 ]( 

3164 self._pipe.cluster_response_callbacks[c.args[0]]( 

3165 c.result, **c.options 

3166 ) 

3167 ) 

3168 response.append(c.result) 

3169 

3170 if raise_on_error: 

3171 self._raise_first_error(stack) 

3172 

3173 return response 

3174 

3175 def _is_nodes_flag(self, target_nodes): 

3176 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags 

3177 

3178 def _parse_target_nodes(self, target_nodes): 

3179 if isinstance(target_nodes, list): 

3180 nodes = target_nodes 

3181 elif isinstance(target_nodes, ClusterNode): 

3182 # Supports passing a single ClusterNode as a variable 

3183 nodes = [target_nodes] 

3184 elif isinstance(target_nodes, dict): 

3185 # Supports dictionaries of the format {node_name: node}. 

3186 # It enables executing commands on multiple nodes as follows: 

3187 # rc.cluster_save_config(rc.get_primaries()) 

3188 nodes = target_nodes.values() 

3189 else: 

3190 raise TypeError( 

3191 "target_nodes type can be one of the following: " 

3192 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

3193 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

3194 f"The passed type is {type(target_nodes)}" 

3195 ) 

3196 return nodes 

3197 
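
The accepted ``target_nodes`` shapes, sketched against the branches above (the address is hypothetical):

    from redis.cluster import RedisCluster

    rc = RedisCluster(host="localhost", port=7000)  # hypothetical address

    rc.ping(target_nodes=RedisCluster.RANDOM)    # node flag (str)
    rc.ping(target_nodes=rc.get_default_node())  # a single ClusterNode
    rc.ping(target_nodes=rc.get_primaries())     # a list of ClusterNode
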

3198 def _determine_nodes( 

3199 self, *args, request_policy: RequestPolicy, **kwargs 

3200 ) -> List["ClusterNode"]: 

3201 # Determine which nodes the command should be executed on. 

3202 # Returns a list of target nodes. 

3203 command = args[0].upper() 

3204 if ( 

3205 len(args) >= 2 

3206 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags 

3207 ): 

3208 command = f"{args[0]} {args[1]}".upper() 

3209 

3210 nodes_flag = kwargs.pop("nodes_flag", None) 

3211 if nodes_flag is not None: 

3212 # nodes flag passed by the user 

3213 command_flag = nodes_flag 

3214 else: 

3215 # get the nodes group for this command if it was predefined 

3216 command_flag = self._pipe.command_flags.get(command) 

3217 

3218 if command_flag in self._pipe._command_flags_mapping: 

3219 request_policy = self._pipe._command_flags_mapping[command_flag] 

3220 

3221 policy_callback = self._pipe._policies_callback_mapping[request_policy] 

3222 

3223 if request_policy == RequestPolicy.DEFAULT_KEYED: 

3224 nodes = policy_callback(command, *args) 

3225 elif request_policy == RequestPolicy.MULTI_SHARD: 

3226 nodes = policy_callback(*args, **kwargs) 

3227 elif request_policy == RequestPolicy.DEFAULT_KEYLESS: 

3228 nodes = policy_callback(args[0]) 

3229 else: 

3230 nodes = policy_callback() 

3231 

3232 if args[0].lower() == "ft.aggregate": 

3233 self._aggregate_nodes = nodes 

3234 

3235 return nodes 

3236 

3237 def multi(self): 

3238 raise RedisClusterException( 

3239 "method multi() is not supported outside of transactional context" 

3240 ) 

3241 

3242 def discard(self): 

3243 raise RedisClusterException( 

3244 "method discard() is not supported outside of transactional context" 

3245 ) 

3246 

3247 def watch(self, *names): 

3248 raise RedisClusterException( 

3249 "method watch() is not supported outside of transactional context" 

3250 ) 

3251 

3252 def unwatch(self, *names): 

3253 raise RedisClusterException( 

3254 "method unwatch() is not supported outside of transactional context" 

3255 ) 

3256 

3257 def delete(self, *names): 

3258 if len(names) != 1: 

3259 raise RedisClusterException( 

3260 "deleting multiple keys is not implemented in pipeline command" 

3261 ) 

3262 

3263 return self.execute_command("DEL", names[0]) 

3264 

3265 def unlink(self, *names): 

3266 if len(names) != 1: 

3267 raise RedisClusterException( 

3268 "unlinking multiple keys is not implemented in pipeline command" 

3269 ) 

3270 

3271 return self.execute_command("UNLINK", names[0]) 

3272 

3273 

3274class TransactionStrategy(AbstractStrategy): 

3275 NO_SLOTS_COMMANDS = {"UNWATCH"} 

3276 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

3277 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

3278 SLOT_REDIRECT_ERRORS = (AskError, MovedError) 

3279 CONNECTION_ERRORS = ( 

3280 ConnectionError, 

3281 OSError, 

3282 ClusterDownError, 

3283 SlotNotCoveredError, 

3284 ) 

3285 

3286 def __init__(self, pipe: ClusterPipeline): 

3287 super().__init__(pipe) 

3288 self._explicit_transaction = False 

3289 self._watching = False 

3290 self._pipeline_slots: Set[int] = set() 

3291 self._transaction_connection: Optional[Connection] = None 

3292 self._executing = False 

3293 self._retry = copy(self._pipe.retry) 

3294 self._retry.update_supported_errors( 

3295 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS 

3296 ) 

3297 

3298 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]: 

3299 """ 

3300 Find a connection for a pipeline transaction. 

3301 

3302 For an atomic transaction, WATCH only guarantees that the watched keys were 

3303 not altered if the WATCH commands and the transaction itself are sent over 

3304 the same connection. So once we start watching a key, we fetch a connection 

3305 to the node that owns that slot and reuse it. 

3306 """ 

3307 if not self._pipeline_slots: 

3308 raise RedisClusterException( 

3309 "At least a command with a key is needed to identify a node" 

3310 ) 

3311 

3312 node: ClusterNode = self._nodes_manager.get_node_from_slot( 

3313 list(self._pipeline_slots)[0], False 

3314 ) 

3315 redis_node: Redis = self._pipe.get_redis_connection(node) 

3316 if self._transaction_connection: 

3317 if not redis_node.connection_pool.owns_connection( 

3318 self._transaction_connection 

3319 ): 

3320 previous_node = self._nodes_manager.find_connection_owner( 

3321 self._transaction_connection 

3322 ) 

3323 previous_node.connection_pool.release(self._transaction_connection) 

3324 self._transaction_connection = None 

3325 

3326 if not self._transaction_connection: 

3327 self._transaction_connection = get_connection(redis_node) 

3328 

3329 return redis_node, self._transaction_connection 
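# --- Example (editorial sketch, not part of cluster.py) ----------------------
# The WATCH flow this helper enables: with a shared hash tag the keys map to
# one slot, so the optimistic transaction stays on a single node and a single
# pinned connection. Endpoint hypothetical.
from redis.cluster import RedisCluster
from redis.exceptions import WatchError

rc = RedisCluster(host="localhost", port=16379)
pipe = rc.pipeline(transaction=True)
try:
    pipe.watch("{user:1}:balance")                  # pins slot + connection
    balance = int(pipe.get("{user:1}:balance") or 0)
    pipe.multi()
    pipe.set("{user:1}:balance", balance + 10)
    pipe.execute()
except WatchError:
    pass  # a watched key changed under us; a real caller would retry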

3330 

3331 def execute_command(self, *args, **kwargs): 

3332 slot_number: Optional[int] = None 

3333 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS: 

3334 slot_number = self._pipe.determine_slot(*args) 

3335 

3336 if ( 

3337 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS 

3338 ) and not self._explicit_transaction: 

3339 if args[0] == "WATCH": 

3340 self._validate_watch() 

3341 

3342 if slot_number is not None: 

3343 if self._pipeline_slots and slot_number not in self._pipeline_slots: 

3344 raise CrossSlotTransactionError( 

3345 "Cannot watch or send commands on different slots" 

3346 ) 

3347 

3348 self._pipeline_slots.add(slot_number) 

3349 elif args[0] not in self.NO_SLOTS_COMMANDS: 

3350 raise RedisClusterException( 

3351 f"Cannot identify slot number for command: {args[0]}," 

3352 "it cannot be triggered in a transaction" 

3353 ) 

3354 

3355 return self._immediate_execute_command(*args, **kwargs) 

3356 else: 

3357 if slot_number is not None: 

3358 self._pipeline_slots.add(slot_number) 

3359 

3360 return self.pipeline_execute_command(*args, **kwargs) 

3361 

3362 def _validate_watch(self): 

3363 if self._explicit_transaction: 

3364 raise RedisError("Cannot issue a WATCH after a MULTI") 

3365 

3366 self._watching = True 

3367 

3368 def _immediate_execute_command(self, *args, **options): 

3369 return self._retry.call_with_retry( 

3370 lambda: self._get_connection_and_send_command(*args, **options), 

3371 self._reinitialize_on_error, 

3372 ) 
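# --- Example (editorial sketch, not part of cluster.py) ----------------------
# The retry shape used above, in isolation: Retry.call_with_retry takes a
# zero-argument callable and a failure hook that receives the error. The
# function bodies below are illustrative stand-ins.
from redis.backoff import NoBackoff
from redis.retry import Retry

retry = Retry(NoBackoff(), retries=2)

def _send():
    return "OK"  # stand-in for sending over the pinned connection

def _on_failure(error):
    pass  # stand-in for _reinitialize_on_error: drop conn, refresh slots

result = retry.call_with_retry(_send, _on_failure)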

3373 

3374 def _get_connection_and_send_command(self, *args, **options): 

3375 redis_node, connection = self._get_client_and_connection_for_transaction() 

3376 return self._send_command_parse_response( 

3377 connection, redis_node, args[0], *args, **options 

3378 ) 

3379 

3380 def _send_command_parse_response( 

3381 self, conn, redis_node: Redis, command_name, *args, **options 

3382 ): 

3383 """ 

3384 Send a command and parse the response 

3385 """ 

3386 

3387 conn.send_command(*args) 

3388 output = redis_node.parse_response(conn, command_name, **options) 

3389 

3390 if command_name in self.UNWATCH_COMMANDS: 

3391 self._watching = False 

3392 return output 

3393 

3394 def _reinitialize_on_error(self, error): 

3395 if self._watching: 

3396 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: 

3397 raise WatchError("Slot rebalancing occurred while watching keys") 

3398 

3399 if ( 

3400 type(error) in self.SLOT_REDIRECT_ERRORS 

3401 or type(error) in self.CONNECTION_ERRORS 

3402 ): 

3403 if self._transaction_connection: 

3404 self._transaction_connection = None 

3405 

3406 self._pipe.reinitialize_counter += 1 

3407 if self._pipe._should_reinitialized(): 

3408 self._nodes_manager.initialize() 

3409 self._pipe.reinitialize_counter = 0 # reset the pipeline's counter, not the strategy's 

3410 else: 

3411 if isinstance(error, AskError): 

3412 self._nodes_manager.update_moved_exception(error) 

3413 

3414 self._executing = False 

3415 

3416 def _raise_first_error(self, responses, stack): 

3417 """ 

3418 Raise the first exception on the stack 

3419 """ 

3420 for r, cmd in zip(responses, stack): 

3421 if isinstance(r, Exception): 

3422 self.annotate_exception(r, cmd.position + 1, cmd.args) 

3423 raise r 

3424 

3425 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3426 stack = self._command_queue 

3427 if not stack and (not self._watching or not self._pipeline_slots): 

3428 return [] 

3429 

3430 return self._execute_transaction_with_retries(stack, raise_on_error) 

3431 

3432 def _execute_transaction_with_retries( 

3433 self, stack: List["PipelineCommand"], raise_on_error: bool 

3434 ): 

3435 return self._retry.call_with_retry( 

3436 lambda: self._execute_transaction(stack, raise_on_error), 

3437 self._reinitialize_on_error, 

3438 ) 

3439 

3440 def _execute_transaction( 

3441 self, stack: List["PipelineCommand"], raise_on_error: bool 

3442 ): 

3443 if len(self._pipeline_slots) > 1: 

3444 raise CrossSlotTransactionError( 

3445 "All keys involved in a cluster transaction must map to the same slot" 

3446 ) 

3447 

3448 self._executing = True 

3449 

3450 redis_node, connection = self._get_client_and_connection_for_transaction() 

3451 

3452 stack = chain( 

3453 [PipelineCommand(("MULTI",))], 

3454 stack, 

3455 [PipelineCommand(("EXEC",))], 

3456 ) 

3457 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options] 

3458 packed_commands = connection.pack_commands(commands) 

3459 connection.send_packed_command(packed_commands) 
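# Everything above goes out in one socket write: MULTI, every queued command,
# and EXEC. The code below then reads the per-command replies back one by one
# before parsing the final EXEC reply.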

3460 errors = [] 

3461 

3462 # parse off the response for MULTI 

3463 # NOTE: we need to handle ResponseErrors here and continue 

3464 # so that we read all the additional command messages from 

3465 # the socket 

3466 try: 

3467 redis_node.parse_response(connection, "MULTI") 

3468 except ResponseError as e: 

3469 self.annotate_exception(e, 0, ("MULTI",)) 

3470 errors.append(e) 

3471 except self.CONNECTION_ERRORS as cluster_error: 

3472 self.annotate_exception(cluster_error, 0, ("MULTI",)) 

3473 raise 

3474 

3475 # and all the other commands 

3476 for i, command in enumerate(self._command_queue): 

3477 if EMPTY_RESPONSE in command.options: 

3478 errors.append((i, command.options[EMPTY_RESPONSE])) 

3479 else: 

3480 try: 

3481 _ = redis_node.parse_response(connection, "_") 

3482 except self.SLOT_REDIRECT_ERRORS as slot_error: 

3483 self.annotate_exception(slot_error, i + 1, command.args) 

3484 errors.append(slot_error) 

3485 except self.CONNECTION_ERRORS as cluster_error: 

3486 self.annotate_exception(cluster_error, i + 1, command.args) 

3487 raise 

3488 except ResponseError as e: 

3489 self.annotate_exception(e, i + 1, command.args) 

3490 errors.append(e) 

3491 

3492 response = None 

3493 # parse the EXEC. 

3494 try: 

3495 response = redis_node.parse_response(connection, "EXEC") 

3496 except ExecAbortError: 

3497 if errors: 

3498 raise errors[0] 

3499 raise 

3500 

3501 self._executing = False 

3502 

3503 # EXEC clears any watched keys 

3504 self._watching = False 

3505 

3506 if response is None: 

3507 raise WatchError("Watched variable changed.") 

3508 

3509 # put any parse errors into the response 

3510 for i, e in errors: 

3511 response.insert(i, e) 
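# Note: bare exceptions appended while queuing normally never reach this
# loop; a queue-time error makes the server abort EXEC, which is re-raised
# through the ExecAbortError branch above. Only the (index, value) pairs
# recorded for EMPTY_RESPONSE options are expected here.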

3512 

3513 if len(response) != len(self._command_queue): 

3514 raise InvalidPipelineStack( 

3515 "Unexpected response length for cluster pipeline EXEC." 

3516 " Command stack was {} but response had length {}".format( 

3517 [c.args[0] for c in self._command_queue], len(response) 

3518 ) 

3519 ) 

3520 

3521 # find any errors in the response and raise if necessary 

3522 if raise_on_error or len(errors) > 0: 

3523 self._raise_first_error( 

3524 response, 

3525 self._command_queue, 

3526 ) 

3527 

3528 # We have to run response callbacks manually 

3529 data = [] 

3530 for r, cmd in zip(response, self._command_queue): 

3531 if not isinstance(r, Exception): 

3532 command_name = cmd.args[0] 

3533 if command_name in self._pipe.cluster_response_callbacks: 

3534 r = self._pipe.cluster_response_callbacks[command_name]( 

3535 r, **cmd.options 

3536 ) 

3537 data.append(r) 

3538 return data 
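# --- Example (editorial sketch, not part of cluster.py) ----------------------
# The manual callback pass above, in miniature: EXEC returns raw replies, and
# each one is run through the response callback registered for its command.
raw_replies = [b"OK", [b"1700000000", b"123456"]]
queued = [("SET", {}), ("TIME", {})]
callbacks = {"TIME": lambda r, **kw: (int(r[0]), int(r[1]))}

parsed = []
for reply, (name, options) in zip(raw_replies, queued):
    cb = callbacks.get(name)
    parsed.append(cb(reply, **options) if cb else reply)
# parsed == [b"OK", (1700000000, 123456)]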

3539 

3540 def reset(self): 

3541 self._command_queue = [] 

3542 

3543 # make sure to reset the connection state in the event that we were 

3544 # watching something 

3545 if self._transaction_connection: 

3546 try: 

3547 if self._watching: 

3548 # call this manually since our unwatch or 

3549 # immediate_execute_command methods can call reset() 

3550 self._transaction_connection.send_command("UNWATCH") 

3551 self._transaction_connection.read_response() 

3552 # we can safely return the connection to the pool here since we're 

3553 # sure we're no longer WATCHing anything 

3554 node = self._nodes_manager.find_connection_owner( 

3555 self._transaction_connection 

3556 ) 

3557 node.redis_connection.connection_pool.release( 

3558 self._transaction_connection 

3559 ) 

3560 self._transaction_connection = None 

3561 except self.CONNECTION_ERRORS: 

3562 # disconnect will also remove any previous WATCHes 

3563 if self._transaction_connection: 

3564 self._transaction_connection.disconnect() 

3565 

3566 # clean up the other instance attributes 

3567 self._watching = False 

3568 self._explicit_transaction = False 

3569 self._pipeline_slots = set() 

3570 self._executing = False 
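# Usage note (editorial): reset() also backs the context-manager exit path,
# so leaving a `with rc.pipeline(transaction=True) as pipe:` block releases
# the pinned transaction connection and clears WATCH state even when an
# exception escaped the block.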

3571 

3572 def send_cluster_commands( 

3573 self, stack, raise_on_error=True, allow_redirections=True 

3574 ): 

3575 raise NotImplementedError( 

3576 "send_cluster_commands cannot be executed in transactional context." 

3577 ) 

3578 

3579 def multi(self): 

3580 if self._explicit_transaction: 

3581 raise RedisError("Cannot issue nested calls to MULTI") 

3582 if self._command_queue: 

3583 raise RedisError( 

3584 "Commands without an initial WATCH have already been issued" 

3585 ) 

3586 self._explicit_transaction = True 

3587 

3588 def watch(self, *names): 

3589 if self._explicit_transaction: 

3590 raise RedisError("Cannot issue a WATCH after a MULTI") 

3591 

3592 return self.execute_command("WATCH", *names) 

3593 

3594 def unwatch(self): 

3595 if self._watching: 

3596 return self.execute_command("UNWATCH") 

3597 

3598 return True 

3599 

3600 def discard(self): 

3601 self.reset() 

3602 

3603 def delete(self, *names): 

3604 return self.execute_command("DEL", *names) 

3605 

3606 def unlink(self, *names): 

3607 return self.execute_command("UNLINK", *names)
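# --- Example (editorial sketch, not part of cluster.py) ----------------------
# Unlike the non-transactional strategy earlier, these DEL/UNLINK variants
# accept several keys, but all of them must hash to the same slot, e.g. via a
# shared hash tag. Endpoint hypothetical.
from redis.cluster import RedisCluster

rc = RedisCluster(host="localhost", port=16379)
pipe = rc.pipeline(transaction=True)
pipe.multi()
pipe.delete("{user:1}:a", "{user:1}:b")   # same slot thanks to the hash tag
pipe.unlink("{user:1}:c")
pipe.execute()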