Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/cluster.py: 19%

842 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:16 +0000

1import random 

2import socket 

3import sys 

4import threading 

5import time 

6from collections import OrderedDict 

7from typing import Any, Callable, Dict, List, Optional, Tuple, Union 

8 

9from redis.backoff import default_backoff 

10from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan 

11from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands 

12from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url 

13from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

14from redis.exceptions import ( 

15 AskError, 

16 AuthenticationError, 

17 ClusterCrossSlotError, 

18 ClusterDownError, 

19 ClusterError, 

20 ConnectionError, 

21 DataError, 

22 MasterDownError, 

23 MovedError, 

24 RedisClusterException, 

25 RedisError, 

26 ResponseError, 

27 SlotNotCoveredError, 

28 TimeoutError, 

29 TryAgainError, 

30) 

31from redis.lock import Lock 

32from redis.retry import Retry 

33from redis.utils import ( 

34 dict_merge, 

35 list_keys_to_dict, 

36 merge_result, 

37 safe_str, 

38 str_if_bytes, 

39) 

40 

41 

def get_node_name(host: str, port: Union[str, int]) -> str:
    """Return the ``host:port`` string used as a node's name."""
    return "{}:{}".format(host, port)

44 

45 

def get_connection(redis_node, *args, **options):
    """Return the node's own connection, or take one from its pool.

    When ``redis_node.connection`` is falsy, the first positional argument
    and ``options`` are forwarded to the pool's ``get_connection``.
    """
    dedicated = redis_node.connection
    if dedicated:
        return dedicated
    return redis_node.connection_pool.get_connection(args[0], **options)

50 

51 

def parse_scan_result(command, res, **options):
    """Merge per-node SCAN replies into a single result.

    ``res`` maps node names to raw replies. Each reply is parsed with
    ``parse_scan``; the return value is ``(cursors, items)`` where
    ``cursors`` maps every node name to the cursor it reported and
    ``items`` concatenates the items of all nodes.
    """
    cursors = {}
    merged = []
    for node_name, raw_reply in res.items():
        node_cursor, node_items = parse_scan(raw_reply, **options)
        cursors[node_name] = node_cursor
        merged.extend(node_items)
    return cursors, merged

61 

62 

def parse_pubsub_numsub(command, res, **options):
    """Sum per-node PUBSUB NUMSUB replies channel by channel.

    ``res`` maps node names to sequences of ``(channel, count)`` pairs. The
    result is a list of ``(channel, total)`` tuples in the order in which
    the channels were first seen.
    """
    totals = OrderedDict()
    for node_reply in res.values():
        for channel, count in node_reply:
            if channel in totals:
                totals[channel] += count
            else:
                totals[channel] = count
    return list(totals.items())

74 

75 

def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """Turn a CLUSTER SLOTS reply into a dict keyed by ``(start, end)``.

    Every entry of ``resp`` is read as ``start, end, primary, *replicas``.
    The value stored for a range is a dict with a ``"primary"`` pair and a
    ``"replicas"`` list of pairs. A node whose first field is empty gets the
    ``current_host`` option (default ``""``) as its host instead.
    """
    fallback_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        # An empty host falls back to the host supplied via the options.
        return str_if_bytes(args[0]) or fallback_host, args[1]

    slots = {}
    for entry in resp:
        start, end, primary = entry[:3]
        replica_entries = entry[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*node) for node in replica_entries],
        }
    return slots

94 

95 

def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.

    Each entry of ``resp`` is read positionally: index 1 is a flat list of
    slot boundaries taken two at a time as ``(start, end)`` pairs, and
    index 3 is a list of nodes, each a flat list of alternating keys and
    values. Returns one ``{"slots": [...], "nodes": [...]}`` dict per entry.
    """
    shards = []
    for entry in resp:
        flat_ranges = entry[1]
        slot_ranges = [
            (flat_ranges[i], flat_ranges[i + 1])
            for i in range(0, len(flat_ranges), 2)
        ]
        node_dicts = [
            {fields[i]: fields[i + 1] for i in range(0, len(fields), 2)}
            for fields in entry[3]
        ]
        shards.append({"slots": slot_ranges, "nodes": node_dicts})
    return shards

114 

115 

def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.

    Returns the shard id as ``str``. A bytes-like reply is decoded as
    UTF-8. A reply that is already a ``str`` is returned unchanged rather
    than failing because ``str`` has no ``decode`` method.
    """
    if isinstance(resp, str):
        return resp
    return resp.decode("utf-8")

121 

122 

# Server-type names handed to the nodes manager when nodes of one role are
# requested (see get_primaries / get_replicas).
PRIMARY = "primary"
REPLICA = "replica"
# Command flag for commands whose second argument is itself the slot number
# (determine_slot returns that argument directly).
SLOT_ID = "slot-id"

# Keyword arguments that cleanup_kwargs lets through; any other key is
# dropped.
REDIS_ALLOWED_KEYS = (
    "charset",
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "errors",
    "host",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_keyfile",
    "ssl_password",
    "unix_socket_path",
    "username",
)
# Keys that cleanup_kwargs removes even though they appear in
# REDIS_ALLOWED_KEYS.
KWARGS_DISABLED_KEYS = ("host", "port")

163 

164 

def cleanup_kwargs(**kwargs):
    """
    Return a copy of ``kwargs`` without unsupported or disabled keys.

    A key is kept only when it is listed in REDIS_ALLOWED_KEYS and is not
    listed in KWARGS_DISABLED_KEYS.
    """
    connection_kwargs = {}
    for key, value in kwargs.items():
        if key not in REDIS_ALLOWED_KEYS:
            continue
        if key in KWARGS_DISABLED_KEYS:
            continue
        connection_kwargs[key] = value
    return connection_kwargs

176 

177 

class ClusterParser(DefaultParser):
    """DefaultParser variant whose error table includes cluster errors."""

    # The parent's table of error keyword -> exception class, extended with
    # the cluster-specific keywords below.
    EXCEPTION_CLASSES = dict_merge(
        DefaultParser.EXCEPTION_CLASSES,
        dict(
            ASK=AskError,
            TRYAGAIN=TryAgainError,
            MOVED=MovedError,
            CLUSTERDOWN=ClusterDownError,
            CROSSSLOT=ClusterCrossSlotError,
            MASTERDOWN=MasterDownError,
        ),
    )

190 

191 

class AbstractRedisCluster:
    """Routing tables and helpers shared by Redis Cluster clients.

    The class attributes describe which node(s) a command is sent to
    (COMMAND_FLAGS, SEARCH_COMMANDS), how single replies are parsed
    (CLUSTER_COMMANDS_RESPONSE_CALLBACKS) and how replies collected from
    several nodes are combined into one value (RESULT_CALLBACKS).
    """

    RedisClusterRequestTTL = 16

    # Node flags: symbolic names for the group of nodes a command targets.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # Command name -> node flag (or SLOT_ID) used to route the command.
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "READWRITE",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "GRAPH.CONFIG",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # Kept as a one-element tuple holding the list: existing code reads the
    # names through SEARCH_COMMANDS[0].
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    # Command name -> parser applied to a single node's reply.
    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Command name -> function(command, res) that folds the per-node
    # replies in ``res`` into one result.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(["KEYS", "PUBSUB CHANNELS"], merge_result),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError)

    def replace_default_node(
        self, target_node: Optional["ClusterNode"] = None
    ) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and
        primaries will be prioritized. The default node will not be changed if
        there are no other nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the
                default node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
            return

        curr_node = self.get_default_node()
        primaries = [node for node in self.get_primaries() if node != curr_node]
        if primaries:
            # Choose a primary if the cluster contains different primaries
            self.nodes_manager.default_node = random.choice(primaries)
            return

        # Otherwise, choose a replica if the cluster contains other replicas;
        # with no other node at all the default node stays as it is
        replicas = [node for node in self.get_replicas() if node != curr_node]
        if replicas:
            self.nodes_manager.default_node = random.choice(replicas)

418 

419 

420class RedisCluster(AbstractRedisCluster, RedisClusterCommands): 

@classmethod
def from_url(cls, url, **kwargs):
    """
    Return a Redis cluster client object configured from the given URL

    For example::

        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0

    Two URL schemes are supported:

    - `redis://` creates a TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/redis>
    - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/rediss>

    Unix Domain Socket connections are not supported: a URL that yields a
    socket path makes the constructor raise ``RedisClusterException``.

    The username, password, hostname, path and all querystring values
    are passed through urllib.parse.unquote in order to replace any
    percent-encoded values with their corresponding characters.

    Database selection is not available in cluster mode. A database taken
    from the URL (``db`` querystring option or path argument) must be 0,
    otherwise ``RedisClusterException`` is raised; a ``db`` keyword
    argument is always rejected with the same exception.

    All querystring options are cast to their appropriate Python types.
    Boolean arguments can be specified with string values "True"/"False"
    or "Yes"/"No". Values that cannot be properly cast cause a
    ``ValueError`` to be raised. Once parsed, the querystring arguments
    are merged into the keyword arguments given to the constructor. In the
    case of conflicting arguments, querystring arguments always win.

    """
    return cls(url=url, **kwargs)

464 

def __init__(
    self,
    host: Optional[str] = None,
    port: int = 6379,
    startup_nodes: Optional[List["ClusterNode"]] = None,
    cluster_error_retry_attempts: int = 3,
    retry: Optional["Retry"] = None,
    require_full_coverage: bool = False,
    reinitialize_steps: int = 5,
    read_from_replicas: bool = False,
    dynamic_startup_nodes: bool = True,
    url: Optional[str] = None,
    address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None,
    **kwargs,
):
    """
     Initialize a new RedisCluster client.

     :param startup_nodes:
         List of nodes from which initial bootstrapping can be done. The
         list passed by the caller is copied and never modified.
     :param host:
         Can be used to point to a startup node
     :param port:
         Can be used to point to a startup node
     :param require_full_coverage:
        When set to False (default value): the client will not require a
        full coverage of the slots. However, if not all slots are covered,
        and at least one node has 'cluster-require-full-coverage' set to
        'yes,' the server will throw a ClusterDownError for some key-based
        commands. See -
        https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
        When set to True: all slots must be covered to construct the
        cluster client. If not all slots are covered, RedisClusterException
        will be thrown.
    :param read_from_replicas:
         Enable read from replicas in READONLY mode. You can read possibly
         stale data.
         When set to true, read commands will be assigned between the
         primary and its replications in a Round-Robin manner.
     :param dynamic_startup_nodes:
         Set the RedisCluster's startup nodes to all of the discovered nodes.
         If true (default value), the cluster's discovered nodes will be used to
         determine the cluster nodes-slots mapping in the next topology refresh.
         It will remove the initial passed startup nodes if their endpoints aren't
         listed in the CLUSTER SLOTS output.
         If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
         specific IP addresses, it is best to set it to false.
    :param cluster_error_retry_attempts:
         Number of times to retry before raising an error when
         :class:`~.TimeoutError` or :class:`~.ConnectionError` or
         :class:`~.ClusterDownError` are encountered
    :param retry:
         Retry object handed to the node connections and stored on
         ``self.retry``. When omitted, a ``Retry(default_backoff(), 0)``
         is created, used for the connections and stored instead.
    :param reinitialize_steps:
        Specifies the number of MOVED errors that need to occur before
        reinitializing the whole cluster topology. If a MOVED error occurs
        and the cluster does not need to be reinitialized on this current
        error handling, only the MOVED slot will be patched with the
        redirected node.
        To reinitialize the cluster on every MOVED error, set
        reinitialize_steps to 1.
        To avoid reinitializing the cluster on moved errors, set
        reinitialize_steps to 0.
    :param url:
        Connection URL; when given, host, port and further options are
        taken from it and override the matching keyword arguments.
    :param address_remap:
        An optional callable which, when provided with an internal network
        address of a node, e.g. a `(host, port)` tuple, will return the address
        where the node is reachable.  This can be used to map the addresses at
        which the nodes _think_ they are, to addresses at which a client may
        reach them, such as when they sit behind a proxy.

     :**kwargs:
         Extra arguments that will be sent into Redis instance when created
         (See Official redis-py doc for supported kwargs
         [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
         Some kwargs are not supported and will raise a
         RedisClusterException:
             - db (Redis does not support database SELECT in cluster mode)

    :raises RedisClusterException:
        If ``db`` is passed, if the URL selects a Unix socket or a non-zero
        database, or if no startup node can be determined.
    """
    # Work on a private copy: the host/port node appended below must not
    # end up in the list object owned by the caller.
    startup_nodes = [] if startup_nodes is None else list(startup_nodes)

    if "db" in kwargs:
        # Argument 'db' is not possible to use in cluster mode
        raise RedisClusterException(
            "Argument 'db' is not possible to use in cluster mode"
        )

    # Get the startup node/s
    from_url = False
    if url is not None:
        from_url = True
        url_options = parse_url(url)
        if "path" in url_options:
            raise RedisClusterException(
                "RedisCluster does not currently support Unix Domain "
                "Socket connections"
            )
        if "db" in url_options and url_options["db"] != 0:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "A ``db`` querystring option can only be 0 in cluster mode"
            )
        kwargs.update(url_options)
        host = kwargs.get("host")
        port = kwargs.get("port", port)
        startup_nodes.append(ClusterNode(host, port))
    elif host is not None and port is not None:
        startup_nodes.append(ClusterNode(host, port))
    elif len(startup_nodes) == 0:
        # No startup node was provided
        raise RedisClusterException(
            "RedisCluster requires at least one node to discover the "
            "cluster. Please provide one of the followings:\n"
            "1. host and port, for example:\n"
            " RedisCluster(host='localhost', port=6379)\n"
            "2. list of startup nodes, for example:\n"
            " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
            " ClusterNode('localhost', 6378)])"
        )
    # Update the connection arguments
    # Whenever a new connection is established, RedisCluster's on_connect
    # method should be run
    # If the user passed on_connect function we'll save it and run it
    # inside the RedisCluster.on_connect() function
    self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
    kwargs.update({"redis_connect_func": self.on_connect})
    kwargs = cleanup_kwargs(**kwargs)
    # Always define self.retry so get_retry() works even when the caller
    # supplied no Retry object; the stored object is the one the
    # connections receive.
    self.retry = retry if retry else Retry(default_backoff(), 0)
    kwargs.update({"retry": self.retry})

    self.encoder = Encoder(
        kwargs.get("encoding", "utf-8"),
        kwargs.get("encoding_errors", "strict"),
        kwargs.get("decode_responses", False),
    )
    self.cluster_error_retry_attempts = cluster_error_retry_attempts
    # Per-instance copies so later changes do not leak into the class
    self.command_flags = self.__class__.COMMAND_FLAGS.copy()
    self.node_flags = self.__class__.NODE_FLAGS.copy()
    self.read_from_replicas = read_from_replicas
    self.reinitialize_counter = 0
    self.reinitialize_steps = reinitialize_steps
    self.nodes_manager = NodesManager(
        startup_nodes=startup_nodes,
        from_url=from_url,
        require_full_coverage=require_full_coverage,
        dynamic_startup_nodes=dynamic_startup_nodes,
        address_remap=address_remap,
        **kwargs,
    )

    self.cluster_response_callbacks = CaseInsensitiveDict(
        self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
    )
    self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
    self.commands_parser = CommandsParser(self)
    self._lock = threading.Lock()

622 

def __enter__(self):
    """Enter a ``with`` block; the client itself is the managed object."""
    return self

625 

def __exit__(self, exc_type, exc_value, traceback):
    """Leave a ``with`` block by closing the client.

    Nothing is returned, so an exception raised inside the block is not
    suppressed.
    """
    self.close()

628 

def __del__(self):
    """Close the client when the object is finalized."""
    self.close()

631 

def disconnect_connection_pools(self):
    """Disconnect the connection pool of every node that has a client.

    Nodes without a ``redis_connection`` are skipped, and an ``OSError``
    raised while disconnecting is ignored.
    """
    with_client = (node for node in self.get_nodes() if node.redis_connection)
    for node in with_client:
        try:
            node.redis_connection.connection_pool.disconnect()
        except OSError:
            # Client was already disconnected. do nothing
            pass

640 

def on_connect(self, connection):
    """
    Prepare a newly established connection for cluster use.

    Installs ClusterParser, runs the connection's own ``on_connect``,
    sends READONLY when ``read_from_replicas`` is enabled, and finally
    calls the user-supplied connect callback, if there is one.

    Raises ConnectionError when the READONLY reply is not "OK".
    """
    connection.set_parser(ClusterParser)
    connection.on_connect()

    if self.read_from_replicas:
        # Sending READONLY command to server to configure connection as
        # readonly. Since each cluster node may change its server type due
        # to a failover, we should establish a READONLY connection
        # regardless of the server type. If this is a primary connection,
        # READONLY would not affect executing write commands.
        connection.send_command("READONLY")
        reply = str_if_bytes(connection.read_response())
        if reply != "OK":
            raise ConnectionError("READONLY command failed")

    user_hook = self.user_on_connect_func
    if user_hook is not None:
        user_hook(connection)

661 

def get_redis_connection(self, node):
    """Return the node's client, asking the nodes manager to create it first if needed.

    Creation happens while holding ``self._lock``, and the node is checked
    again after the lock is acquired so that an existing client is reused.
    """
    if node.redis_connection:
        return node.redis_connection
    with self._lock:
        if not node.redis_connection:
            self.nodes_manager.create_redis_connections([node])
    return node.redis_connection

668 

def get_node(self, host=None, port=None, node_name=None):
    """Look a node up through the nodes manager by host/port or by name."""
    manager = self.nodes_manager
    return manager.get_node(host, port, node_name)

671 

def get_primaries(self):
    """Return the nodes the nodes manager reports for server type PRIMARY."""
    manager = self.nodes_manager
    return manager.get_nodes_by_server_type(PRIMARY)

674 

def get_replicas(self):
    """Return the nodes the nodes manager reports for server type REPLICA."""
    manager = self.nodes_manager
    return manager.get_nodes_by_server_type(REPLICA)

677 

def get_random_node(self):
    """Return one randomly chosen node from the nodes cache."""
    candidates = list(self.nodes_manager.nodes_cache.values())
    return random.choice(candidates)

680 

def get_nodes(self):
    """Return a new list holding every node in the nodes cache."""
    return [*self.nodes_manager.nodes_cache.values()]

683 

def get_node_from_key(self, key, replica=False):
    """
    Return the node serving the slot that ``key`` hashes to.

    Without ``replica`` the first node cached for the slot is returned.
    With ``replica=True`` the second cached node is returned, or None when
    fewer than two nodes are cached for the slot.

    Raises SlotNotCoveredError when no node is cached for the slot.
    """
    slot = self.keyslot(key)
    slot_nodes = self.nodes_manager.slots_cache.get(slot)
    if slot_nodes is None or not len(slot_nodes):
        raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')

    if not replica:
        # primary
        return slot_nodes[0]
    if len(self.nodes_manager.slots_cache[slot]) < 2:
        return None
    return slot_nodes[1]

703 

def get_default_node(self):
    """
    Return the node currently stored as the nodes manager's default node
    """
    manager = self.nodes_manager
    return manager.default_node

709 

def set_default_node(self, node):
    """
    Make ``node`` the default node of the cluster.

    The node is accepted only if it is not None and a node with the same
    name can be found through ``get_node``.

    :param node: 'ClusterNode'
    :return True if the default node was set, else False
    """
    is_known = node is not None and self.get_node(node_name=node.name) is not None
    if is_known:
        self.nodes_manager.default_node = node
    return is_known

720 

def get_retry(self) -> Optional["Retry"]:
    """Return the value stored in ``self.retry``."""
    current = self.retry
    return current

723 

def set_retry(self, retry: "Retry") -> None:
    """Store ``retry`` on the client and apply it to every node's client.

    Nodes that do not have a ``redis_connection`` yet are skipped instead
    of raising AttributeError, matching the guard used by the other
    methods that walk the nodes.
    """
    self.retry = retry
    for node in self.get_nodes():
        # NOTE(review): a client created later for a skipped node does not
        # receive this retry from here - confirm whether the nodes manager's
        # connection kwargs should be updated as well.
        if node.redis_connection:
            node.redis_connection.set_retry(retry)

728 

def monitor(self, target_node=None):
    """
    Return the Monitor object of the target node's client.

    The cluster's default node is used when no target node is given.
    Raises RedisClusterException when the chosen node has no
    ``redis_connection``.

    Monitor is useful for handling the MONITOR command to the redis server.
    next_command() method returns one command from monitor
    listen() method yields commands from monitor.
    """
    node = self.get_default_node() if target_node is None else target_node
    client = node.redis_connection
    if client is None:
        raise RedisClusterException(
            f"Cluster Node {node.name} has no redis_connection"
        )
    return client.monitor()

745 

def pubsub(self, node=None, host=None, port=None, **kwargs):
    """
    Create a ClusterPubSub for this client.

    A ClusterNode, or a host and port, may be passed to select the node;
    they are forwarded to ClusterPubSub together with any extra keyword
    arguments.
    """
    subscriber = ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
    return subscriber

752 

def pipeline(self, transaction=None, shard_hint=None):
    """
    Cluster impl:
        Return a ClusterPipeline that shares this client's nodes manager,
        commands parser, callbacks, retry/replica settings and lock.

        Pipelines do not work in cluster mode the same way they do in
        normal mode, so ``shard_hint`` and ``transaction`` are rejected
        with a RedisClusterException when they are truthy.
    """
    if shard_hint:
        raise RedisClusterException("shard_hint is deprecated in cluster mode")

    if transaction:
        raise RedisClusterException("transaction is deprecated in cluster mode")

    manager = self.nodes_manager
    pipeline_options = {
        "nodes_manager": manager,
        "commands_parser": self.commands_parser,
        "startup_nodes": manager.startup_nodes,
        "result_callbacks": self.result_callbacks,
        "cluster_response_callbacks": self.cluster_response_callbacks,
        "cluster_error_retry_attempts": self.cluster_error_retry_attempts,
        "read_from_replicas": self.read_from_replicas,
        "reinitialize_steps": self.reinitialize_steps,
        "lock": self._lock,
    }
    return ClusterPipeline(**pipeline_options)

779 

def lock(
    self,
    name,
    timeout=None,
    sleep=0.1,
    blocking=True,
    blocking_timeout=None,
    lock_class=None,
    thread_local=True,
):
    """
    Return a new Lock object using key ``name`` that mimics
    the behavior of threading.Lock.

    ``timeout`` is the maximum life of the lock. With the default of
    ``None`` it stays locked until release() is called.

    ``sleep`` is the time to sleep per loop iteration while the lock is in
    blocking mode and held by another client.

    ``blocking`` decides whether ``acquire`` waits for the lock or fails
    immediately by returning False. Defaults to True and can be overridden
    by passing ``blocking`` to ``acquire``.

    ``blocking_timeout`` is the maximum number of seconds (int or float)
    to spend trying to acquire the lock; ``None`` means keep trying
    forever.

    ``lock_class`` forces a specific lock implementation; when it is None
    the ``Lock`` class is used. It is only needed for a custom lock class.

    ``thread_local`` decides whether the lock token is kept in thread-local
    storage (the default), so that a thread only sees its own token.
    Consider the following timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release the thread-2's lock.

    Thread-local storage must be disabled when one thread acquires the
    lock and hands the instance to another thread that releases it;
    otherwise the releasing thread cannot see the token. Such cases are
    assumed to be uncommon, hence the default."""
    factory = Lock if lock_class is None else lock_class
    settings = {
        "timeout": timeout,
        "sleep": sleep,
        "blocking": blocking,
        "blocking_timeout": blocking_timeout,
        "thread_local": thread_local,
    }
    return factory(self, name, **settings)

853 

def set_response_callback(self, command, callback):
    """Register ``callback`` as the response callback for ``command``."""
    callbacks = self.cluster_response_callbacks
    callbacks[command] = callback

857 

def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
    """Return the list of nodes on which the command should be executed.

    The command name is ``args[0]`` upper-cased, or the first two arguments
    joined by a space when that pair is a known entry of
    ``self.command_flags``. A ``nodes_flag`` keyword argument takes
    precedence over the flag predefined for the command. Commands without
    a matching flag go to the default node if they are search commands,
    otherwise to the node that owns the slot of the command's keys.
    """
    cls = self.__class__
    command = args[0].upper()
    if len(args) >= 2:
        two_word_name = f"{args[0]} {args[1]}".upper()
        if two_word_name in self.command_flags:
            command = two_word_name

    nodes_flag = kwargs.pop("nodes_flag", None)
    if nodes_flag is None:
        # get the nodes group for this command if it was predefined
        command_flag = self.command_flags.get(command)
    else:
        # nodes flag passed by the user
        command_flag = nodes_flag

    if command_flag == cls.RANDOM:
        # return a random node
        return [self.get_random_node()]
    if command_flag == cls.PRIMARIES:
        # return all primaries
        return self.get_primaries()
    if command_flag == cls.REPLICAS:
        # return all replicas
        return self.get_replicas()
    if command_flag == cls.ALL_NODES:
        # return all nodes
        return self.get_nodes()
    if command_flag == cls.DEFAULT_NODE:
        # return the cluster's default node
        return [self.nodes_manager.default_node]
    if command in cls.SEARCH_COMMANDS[0]:
        return [self.nodes_manager.default_node]

    # get the node that holds the key's slot
    slot = self.determine_slot(*args)
    prefer_replica = self.read_from_replicas and command in READ_COMMANDS
    return [self.nodes_manager.get_node_from_slot(slot, prefer_replica)]

896 

def _should_reinitialized(self):
    """Tell whether the reinitialize counter has reached a multiple of ``reinitialize_steps``.

    With ``reinitialize_steps`` set to 0 the answer is always False; with 1
    it is always True.
    """
    # To reinitialize the cluster on every MOVED error,
    # set reinitialize_steps to 1.
    # To avoid reinitializing the cluster on moved errors, set
    # reinitialize_steps to 0.
    steps = self.reinitialize_steps
    return steps != 0 and self.reinitialize_counter % steps == 0

906 

def keyslot(self, key):
    """
    Calculate keyslot for a given key.

    The key is first encoded with the client's encoder and the encoded
    value is passed to ``key_slot``.
    See Keys distribution model in https://redis.io/topics/cluster-spec
    """
    return key_slot(self.encoder.encode(key))

914 

def _get_command_keys(self, *args):
    """
    Get the keys in the command, as reported by the commands parser using
    the default node's client. If the command has no keys in it, None is
    returned.

    NOTE: Due to a bug in redis<7.0, this function does not work properly
    for EVAL or EVALSHA when the `numkeys` arg is 0.
      - issue: https://github.com/redis/redis/issues/9493
      - fix: https://github.com/redis/redis/pull/9733

    So, don't use this function with EVAL or EVALSHA.
    """
    default_client = self.get_default_node().redis_connection
    parser = self.commands_parser
    return parser.get_keys(default_client, *args)

929 

def determine_slot(self, *args):
    """
    Figure out what slot to use based on args.

    The command name (``args[0]``) is matched case-insensitively, so
    ``"eval"`` and ``"EVAL"`` are handled alike.

    Returns the slot number: ``args[1]`` for commands flagged SLOT_ID, a
    random slot for EVAL/EVALSHA/FCALL/FCALL_RO calls without keys, and
    otherwise the slot shared by all keys of the command.

    Raises a RedisClusterException if there's a missing key and we can't
    determine what slots to map the command to; if the keys don't all map
    to the same key slot; or if an EVAL/EVALSHA call has no usable
    ``numkeys`` argument.
    """
    command = args[0]
    # Normalise only for matching; the original value is kept for messages.
    normalized = command.upper() if isinstance(command, str) else command
    if self.command_flags.get(normalized) == SLOT_ID:
        # The command contains the slot ID
        return args[1]

    # Get the keys in the command

    # EVAL and EVALSHA are common enough that it's wasteful to go to the
    # redis server to parse the keys. Besides, there is a bug in redis<7.0
    # where `self._get_command_keys()` fails anyway. So, we special case
    # EVAL/EVALSHA.
    if normalized in ("EVAL", "EVALSHA"):
        # command syntax: EVAL "script body" num_keys ...
        if len(args) <= 2:
            raise RedisClusterException(f"Invalid args in command: {args}")
        try:
            # numkeys may arrive as a str or bytes; the slice needs an int
            num_actual_keys = int(args[2])
        except (TypeError, ValueError) as err:
            raise RedisClusterException(
                f"Invalid args in command: {args}"
            ) from err
        eval_keys = args[3 : 3 + num_actual_keys]
        # if there are 0 keys, that means the script can be run on any node
        # so we can just return a random slot
        if len(eval_keys) == 0:
            return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
        keys = eval_keys
    else:
        keys = self._get_command_keys(*args)
        if keys is None or len(keys) == 0:
            # FCALL can call a function with 0 keys, that means the function
            #  can be run on any node so we can just return a random slot
            if normalized in ("FCALL", "FCALL_RO"):
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            raise RedisClusterException(
                "No way to dispatch this command to Redis Cluster. "
                "Missing key.\nYou can execute the command by specifying "
                f"target nodes.\nCommand: {args}"
            )

    # single key command
    if len(keys) == 1:
        return self.keyslot(keys[0])

    # multi-key command; we need to make sure all keys are mapped to
    # the same slot
    slots = {self.keyslot(key) for key in keys}
    if len(slots) != 1:
        raise RedisClusterException(
            f"{command} - all keys must map to the same key slot"
        )

    return slots.pop()

986 

def get_encoder(self):
    """
    Return the encoder object stored on this client (``self.encoder``).
    """
    encoder = self.encoder
    return encoder

992 

def get_connection_kwargs(self):
    """
    Return the connection keyword arguments held by the nodes manager
    (``self.nodes_manager.connection_kwargs``).
    """
    manager = self.nodes_manager
    return manager.connection_kwargs

998 

999 def _is_nodes_flag(self, target_nodes): 

1000 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1001 

1002 def _parse_target_nodes(self, target_nodes): 

1003 if isinstance(target_nodes, list): 

1004 nodes = target_nodes 

1005 elif isinstance(target_nodes, ClusterNode): 

1006 # Supports passing a single ClusterNode as a variable 

1007 nodes = [target_nodes] 

1008 elif isinstance(target_nodes, dict): 

1009 # Supports dictionaries of the format {node_name: node}. 

1010 # It enables to execute commands with multi nodes as follows: 

1011 # rc.cluster_save_config(rc.get_primaries()) 

1012 nodes = target_nodes.values() 

1013 else: 

1014 raise TypeError( 

1015 "target_nodes type can be one of the following: " 

1016 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1017 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1018 f"The passed type is {type(target_nodes)}" 

1019 ) 

1020 return nodes 

1021 

def execute_command(self, *args, **kwargs):
    """
    Execute a command, retrying on the errors listed in ERRORS_ALLOW_RETRY.

    The command is attempted once plus up to
    "self.cluster_error_retry_attempts" retries. When the attempts are
    used up, or the error is not one that allows a retry, the exception
    is raised to the caller.

    Key argument :target_nodes: can be passed with the following types:
        nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
        ClusterNode
        list<ClusterNode>
        dict<Any, ClusterNode>
    """
    is_default_node = False
    target_nodes = None
    passed_targets = kwargs.pop("target_nodes", None)
    target_nodes_specified = passed_targets is not None and not self._is_nodes_flag(
        passed_targets
    )
    if target_nodes_specified:
        target_nodes = self._parse_target_nodes(passed_targets)

    # A retry only makes sense when the target nodes can be recomputed from
    # the refreshed nodes/slots cache. Explicitly passed nodes may no longer
    # be valid after the cache was rebuilt, so in that case no retry is done.
    if target_nodes_specified:
        retry_attempts = 0
    else:
        retry_attempts = self.cluster_error_retry_attempts

    # One initial execution plus the allowed retries
    for _ in range(1 + retry_attempts):
        try:
            responses = {}
            if not target_nodes_specified:
                # Determine the nodes to execute the command on
                target_nodes = self._determine_nodes(
                    *args, **kwargs, nodes_flag=passed_targets
                )
                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )
            if len(target_nodes) == 1 and target_nodes[0] == self.get_default_node():
                is_default_node = True
            for node in target_nodes:
                responses[node.name] = self._execute_command(node, *args, **kwargs)
            # Return the processed result
            return self._process_result(args[0], responses, **kwargs)
        except Exception as e:
            # Exact type match on purpose: subclasses of the listed errors
            # are not retried.
            may_retry = (
                retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY
            )
            if not may_retry:
                raise e
            if is_default_node:
                # Replace the default cluster node
                self.replace_default_node()
            # The nodes and slots cache were reinitialized.
            # Try again with the new cluster setup.
            retry_attempts -= 1

1091 

def _execute_command(self, target_node, *args, **kwargs):
    """
    Send a command to a node in the cluster.

    The command is attempted up to ``RedisClusterRequestTTL`` times.
    MOVED, ASK and TRYAGAIN replies are handled inside the loop by picking
    a new target node (or waiting) and trying again; connection, timeout
    and CLUSTERDOWN errors reinitialize the nodes manager and are then
    re-raised to the caller.

    :param target_node: the node to send the first attempt to
    :param args: the command name followed by its arguments
    :return: the (optionally callback-processed) response
    :raises ClusterError: when all attempts were used without a result
    """
    command = args[0]
    redis_node = None
    connection = None
    redirect_addr = None
    asking = False
    moved = False
    ttl = int(self.RedisClusterRequestTTL)

    while ttl > 0:
        ttl -= 1
        # Forget the connection of the previous attempt. It was already
        # released by that attempt's ``finally``; keeping the reference
        # would release it a second time if this attempt fails before a
        # new connection is acquired.
        connection = None
        try:
            if asking:
                target_node = self.get_node(node_name=redirect_addr)
            elif moved:
                # MOVED occurred and the slots cache was updated,
                # refresh the target node
                slot = self.determine_slot(*args)
                target_node = self.nodes_manager.get_node_from_slot(
                    slot, self.read_from_replicas and command in READ_COMMANDS
                )
                moved = False

            redis_node = self.get_redis_connection(target_node)
            connection = get_connection(redis_node, *args, **kwargs)
            if asking:
                connection.send_command("ASKING")
                redis_node.parse_response(connection, "ASKING", **kwargs)
                asking = False

            connection.send_command(*args)
            response = redis_node.parse_response(connection, command, **kwargs)
            if command in self.cluster_response_callbacks:
                response = self.cluster_response_callbacks[command](
                    response, **kwargs
                )
            return response
        except AuthenticationError:
            raise
        except (ConnectionError, TimeoutError) as e:
            # Connection retries are being handled in the node's
            # Retry object.
            # ConnectionError can also be raised if we couldn't get a
            # connection from the pool before timing out, so check that
            # this is an actual connection before attempting to disconnect.
            if connection is not None:
                connection.disconnect()

            # Remove the failed node from the startup nodes before we try
            # to reinitialize the cluster
            self.nodes_manager.startup_nodes.pop(target_node.name, None)
            # Reset the cluster node's connection
            target_node.redis_connection = None
            self.nodes_manager.initialize()
            raise e
        except MovedError as e:
            # First, we will try to patch the slots/nodes cache with the
            # redirected node output and try again. If MovedError exceeds
            # 'reinitialize_steps' number of times, we will force
            # reinitializing the tables, and then try again.
            # 'reinitialize_steps' counter will increase faster when
            # the same client object is shared between multiple threads. To
            # reduce the frequency you can set this variable in the
            # RedisCluster constructor.
            self.reinitialize_counter += 1
            if self._should_reinitialized():
                self.nodes_manager.initialize()
                # Reset the counter
                self.reinitialize_counter = 0
            else:
                self.nodes_manager.update_moved_exception(e)
            moved = True
        except TryAgainError:
            # Back off briefly once half of the attempts were used
            if ttl < self.RedisClusterRequestTTL / 2:
                time.sleep(0.05)
        except AskError as e:
            redirect_addr = get_node_name(host=e.host, port=e.port)
            asking = True
        except ClusterDownError as e:
            # ClusterDownError can occur during a failover and to get
            # self-healed, we will try to reinitialize the cluster layout
            # and retry executing the command
            time.sleep(0.25)
            self.nodes_manager.initialize()
            raise e
        except ResponseError:
            raise
        except Exception as e:
            if connection:
                connection.disconnect()
            raise e
        finally:
            if connection is not None:
                redis_node.connection_pool.release(connection)

    raise ClusterError("TTL exhausted.")

1191 

def close(self):
    """
    Close the nodes manager, if there is one, while holding the client lock.

    An AttributeError is ignored on purpose: ``__init__`` can fail before
    ``nodes_manager`` is set, and closing such an object must not raise.
    """
    try:
        with self._lock:
            manager = self.nodes_manager
            if manager:
                manager.close()
    except AttributeError:
        # RedisCluster's __init__ can fail before nodes_manager is set
        return

1200 

1201 def _process_result(self, command, res, **kwargs): 

1202 """ 

1203 Process the result of the executed command. 

1204 The function would return a dict or a single value. 

1205 

1206 :type command: str 

1207 :type res: dict 

1208 

1209 `res` should be in the following format: 

1210 Dict<node_name, command_result> 

1211 """ 

1212 if command in self.result_callbacks: 

1213 return self.result_callbacks[command](command, res, **kwargs) 

1214 elif len(res) == 1: 

1215 # When we execute the command on a single node, we can 

1216 # remove the dictionary and return a single response 

1217 return list(res.values())[0] 

1218 else: 

1219 return res 

1220 

def load_external_module(self, funcname, func):
    """
    Attach an externally defined function (for example a redis module
    namespace) to this client instance.

    ``funcname`` - A string containing the name of the attribute to create
    ``func`` - The function that is stored under that name on this
               instance.
    """
    setattr(self, funcname, func)

1230 

1231 

class ClusterNode:
    """
    A single node of the cluster, identified by its ``host:port`` name.
    """

    def __init__(self, host, port, server_type=None, redis_connection=None):
        """
        :param host: node host; "localhost" is resolved to an IP address
        :param port: node port
        :param server_type: role of the node, or None when unknown
        :param redis_connection: client object used to talk to this node,
            or None when no connection was created yet
        """
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        # Two nodes are the same node when their names (host:port) match
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        # __init__ may have raised before ``redis_connection`` was assigned
        # (e.g. host resolution failed), so don't assume the attribute
        # exists when the half-built object is collected.
        redis_connection = getattr(self, "redis_connection", None)
        if redis_connection is not None:
            redis_connection.close()

1258 

1259 

class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        # Maps a primary's name to the index that will be handed out next
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(self, primary: str, list_size: int) -> int:
        """
        Return the next index in ``range(list_size)`` for ``primary`` and
        advance the stored position by one, wrapping around at the end.

        The stored (or start) index is reduced modulo ``list_size`` first,
        so a position remembered from a call with a longer list can never
        be returned as an out-of-range index for a shorter one.
        """
        server_index = self.primary_to_idx.get(primary, self.start_index) % list_size
        # Update the index
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index

    def reset(self) -> None:
        """Forget all stored positions."""
        self.primary_to_idx.clear()

1277 

1278 

class NodesManager:
    """
    Keeps the client's view of the cluster: the known nodes, the mapping
    of hash slots to nodes, the startup nodes and the default node.
    """

    def __init__(
        self,
        startup_nodes,
        from_url=False,
        require_full_coverage=False,
        lock=None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        **kwargs,
    ):
        """
        :param startup_nodes: iterable of nodes used to discover the cluster
        :param from_url: when True, node clients are built on top of a
            ``connection_pool_class`` instance
        :param require_full_coverage: when True, ``initialize`` raises if
            not all hash slots are covered
        :param lock: lock guarding MOVED-driven slot updates; a new
            ``threading.Lock`` is created when omitted
        :param dynamic_startup_nodes: when True, the startup nodes are
            replaced with the discovered nodes after each initialization
        :param connection_pool_class: pool class used when ``from_url``
        :param address_remap: optional callable that receives a single
            ``(host, port)`` tuple and returns the ``(host, port)`` to use
        :param kwargs: connection keyword arguments passed to every node
        """
        self.nodes_cache = {}
        self.slots_cache = {}
        self.startup_nodes = {}
        self.default_node = None
        self.populate_startup_nodes(startup_nodes)
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        self._moved_exception = None
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        if lock is None:
            lock = threading.Lock()
        self._lock = lock
        self.initialize()

    def get_node(self, host=None, port=None, node_name=None):
        """
        Get the requested node from the cluster's nodes.

        The node is looked up by ``host`` and ``port`` when both are given,
        otherwise by ``node_name``.

        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            return None

    def update_moved_exception(self, exception):
        """
        Remember a MOVED error; it is applied to the slots cache on the
        next ``get_node_from_slot`` call.
        """
        self._moved_exception = exception

    def _update_moved_slots(self):
        """
        Update the slot's node with the redirected one
        """
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            # (compare by value: PRIMARY is a plain constant, identity
            # is not guaranteed)
            if redirected_node.server_type != PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node
        if redirected_node in self.slots_cache[e.slot_id]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = self.slots_cache[e.slot_id][0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            self.slots_cache[e.slot_id].append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            self.slots_cache[e.slot_id].remove(redirected_node)
            # Override the old primary with the new one
            self.slots_cache[e.slot_id][0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        else:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replications) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        # Reset moved_exception
        self._moved_exception = None

    def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
        """
        Gets a node that serves this hash slot.

        With ``read_from_replicas`` set to True the slot's nodes are used
        in a round-robin manner. Otherwise the primary is returned, unless
        ``server_type`` asks for a replica and the slot has one, in which
        case a random replica is returned.

        Raises SlotNotCoveredError when no node is known for the slot.
        """
        if self._moved_exception:
            with self._lock:
                # Checked again under the lock: another thread may have
                # applied the pending MOVED error in the meantime.
                if self._moved_exception:
                    self._update_moved_slots()

        if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self._require_full_coverage}"'
            )

        if read_from_replicas is True:
            # get the server index in a Round-Robin manner
            primary_name = self.slots_cache[slot][0].name
            node_idx = self.read_load_balancer.get_server_index(
                primary_name, len(self.slots_cache[slot])
            )
        elif (
            server_type is None
            or server_type == PRIMARY
            or len(self.slots_cache[slot]) == 1
        ):
            # return a primary
            node_idx = 0
        else:
            # return a replica
            # randomly choose one of the replicas
            node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

        return self.slots_cache[slot][node_idx]

    def get_nodes_by_server_type(self, server_type):
        """
        Get all nodes with the specified server type
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode
        """
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    def populate_startup_nodes(self, nodes):
        """
        Populate all startup nodes and filters out any duplicates
        """
        for n in nodes:
            self.startup_nodes[n.name] = n

    def check_slots_coverage(self, slots_cache):
        """
        Return True when every hash slot has an entry in ``slots_cache``.
        """
        # Validate if all slots are covered or if we should try next
        # startup node
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in slots_cache:
                return False
        return True

    def create_redis_connections(self, nodes):
        """
        This function will create a redis connection to all nodes in :nodes:
        that don't have one yet.
        """
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host, port=node.port, **self.connection_kwargs
                )

    def create_redis_node(self, host, port, **kwargs):
        """
        Build the Redis client object used to talk to ``host:port``.
        """
        if self.from_url:
            # Create a redis node with a customized connection pool
            kwargs.update({"host": host})
            kwargs.update({"port": port})
            r = Redis(connection_pool=self.connection_pool_class(**kwargs))
        else:
            r = Redis(host=host, port=port, **kwargs)
        return r

    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
        """
        Return the ClusterNode for ``host:port`` with its server type set
        to ``role``, reusing an already known node when possible.
        """
        node_name = get_node_name(host, port)
        # check if we already have this node in the tmp_nodes_cache
        target_node = tmp_nodes_cache.get(node_name)
        if target_node is None:
            # before creating a new cluster node, check if the cluster node already
            # exists in the current nodes cache and has a valid connection so we can
            # reuse it
            target_node = self.nodes_cache.get(node_name)
            if target_node is None or target_node.redis_connection is None:
                # create new cluster node for this cluster
                target_node = ClusterNode(host, port, role)
            if target_node.server_type != role:
                target_node.server_type = role

        return target_node

    def initialize(self):
        """
        Initializes the nodes cache, slots cache and redis connections.
        :startup_nodes:
            Responsible for discovering other nodes in the cluster

        Raises RedisClusterException when no startup node is reachable,
        when the startup nodes disagree on the slots layout, or when full
        coverage is required but not all slots are covered.
        """
        self.reset()
        tmp_nodes_cache = {}
        tmp_slots = {}
        disagreements = []
        startup_nodes_reachable = False
        fully_covered = False
        kwargs = self.connection_kwargs
        exception = None
        for startup_node in self.startup_nodes.values():
            try:
                if startup_node.redis_connection:
                    r = startup_node.redis_connection
                else:
                    # Create a new Redis connection
                    r = self.create_redis_node(
                        startup_node.host, startup_node.port, **kwargs
                    )
                    self.startup_nodes[startup_node.name].redis_connection = r
                # Make sure cluster mode is enabled on this node
                if bool(r.info().get("cluster_enabled")) is False:
                    raise RedisClusterException(
                        "Cluster mode is not enabled on this node"
                    )
                cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
                startup_nodes_reachable = True
            except Exception as e:
                # Try the next startup node.
                # The exception is saved and raised only if we have no more nodes.
                exception = e
                continue

            # CLUSTER SLOTS command results in the following output:
            # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
            # where each node contains the following list: [IP, port, node_id]
            # Therefore, cluster_slots[0][2][0] will be the IP address of the
            # primary node of the first slot section.
            # If there's only one server in the cluster, its ``host`` is ''
            # Fix it to the host in startup_nodes
            if (
                len(cluster_slots) == 1
                and len(cluster_slots[0][2][0]) == 0
                and len(self.startup_nodes) == 1
            ):
                cluster_slots[0][2][0] = startup_node.host

            for slot in cluster_slots:
                primary_node = slot[2]
                host = str_if_bytes(primary_node[0])
                if host == "":
                    host = startup_node.host
                port = int(primary_node[1])
                host, port = self.remap_host_port(host, port)

                target_node = self._get_or_create_cluster_node(
                    host, port, PRIMARY, tmp_nodes_cache
                )
                # add this node to the nodes cache
                tmp_nodes_cache[target_node.name] = target_node

                for i in range(int(slot[0]), int(slot[1]) + 1):
                    if i not in tmp_slots:
                        tmp_slots[i] = []
                        tmp_slots[i].append(target_node)
                        replica_nodes = [slot[j] for j in range(3, len(slot))]

                        for replica_node in replica_nodes:
                            host = str_if_bytes(replica_node[0])
                            # normalise the port the same way as for the
                            # primary node above
                            port = int(replica_node[1])
                            host, port = self.remap_host_port(host, port)

                            target_replica_node = self._get_or_create_cluster_node(
                                host, port, REPLICA, tmp_nodes_cache
                            )
                            tmp_slots[i].append(target_replica_node)
                            # add this node to the nodes cache
                            tmp_nodes_cache[
                                target_replica_node.name
                            ] = target_replica_node
                    else:
                        # Validate that 2 nodes want to use the same slot cache
                        # setup
                        tmp_slot = tmp_slots[i][0]
                        if tmp_slot.name != target_node.name:
                            disagreements.append(
                                f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                            )

                            if len(disagreements) > 5:
                                raise RedisClusterException(
                                    f"startup_nodes could not agree on a valid "
                                    f'slots cache: {", ".join(disagreements)}'
                                )

            fully_covered = self.check_slots_coverage(tmp_slots)
            if fully_covered:
                # Don't need to continue to the next startup node if all
                # slots are covered
                break

        if not startup_nodes_reachable:
            raise RedisClusterException(
                f"Redis Cluster cannot be connected. Please provide at least "
                f"one reachable node: {str(exception)}"
            ) from exception

        # Create Redis connections to all nodes
        self.create_redis_connections(list(tmp_nodes_cache.values()))

        # Check if the slots are not fully covered
        if not fully_covered and self._require_full_coverage:
            # Despite the requirement that the slots be covered, there
            # isn't a full coverage
            raise RedisClusterException(
                f"All slots are not covered after query all startup_nodes. "
                f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                f"covered..."
            )

        # Set the tmp variables to the real variables
        self.nodes_cache = tmp_nodes_cache
        self.slots_cache = tmp_slots
        # Set the default node
        self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
        if self._dynamic_startup_nodes:
            # Populate the startup nodes with all discovered nodes
            self.startup_nodes = tmp_nodes_cache
        # If initialize was called after a MovedError, clear it
        self._moved_exception = None

    def close(self):
        """
        Drop the default node and close the redis connection of every
        cached node.
        """
        self.default_node = None
        for node in self.nodes_cache.values():
            if node.redis_connection:
                node.redis_connection.close()

    def reset(self):
        """
        Reset the read load balancer, if there is one.
        """
        try:
            self.read_load_balancer.reset()
        except (AttributeError, TypeError):
            # The read_load_balancer is missing or None (calling ``.reset``
            # on None raises AttributeError), do nothing
            pass

    def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
        """
        Remap the host and port returned from the cluster to a different
        internal value.  Useful if the client is not connecting directly
        to the cluster.

        ``address_remap`` is called with a single ``(host, port)`` tuple.
        """
        if self.address_remap:
            return self.address_remap((host, port))
        return host, port

1623 

1624 

class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
    """

    def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true, a replica can be selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        self.set_pubsub_node(redis_cluster, node, host, port)
        if self.node is None:
            # No node chosen yet; the pool is set on first command execution
            connection_pool = None
        else:
            connection_pool = redis_cluster.get_redis_connection(
                self.node
            ).connection_pool
        self.cluster = redis_cluster
        super().__init__(
            **kwargs, connection_pool=connection_pool, encoder=redis_cluster.encoder
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        Choose the pubsub node from the passed node, host and port.

        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.
        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            chosen_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            chosen_node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, chosen_node, host, port)
        elif any([host, port]) is True:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            chosen_node = None

        self.node = chosen_node

    def get_pubsub_node(self):
        """
        Return the node currently used for the pubsub connection (may be
        None when it was not chosen yet).
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        known = node is not None and (
            redis_cluster.get_node(node_name=node.name) is not None
        )
        if not known:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )

    def execute_command(self, *args, **kwargs):
        """
        Execute a publish/subscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            if self.connection_pool is None:
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    slot = self.cluster.keyslot(args[1])
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot, self.cluster.read_from_replicas
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                node_client = self.cluster.get_redis_connection(node)
                self.connection_pool = node_client.connection_pool
            self.connection = self.connection_pool.get_connection(
                "pubsub", self.shard_hint
            )
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
        conn = self.connection
        self._execute(conn, conn.send_command, *args)

    def get_redis_connection(self):
        """
        Get the Redis connection of the pubsub connected node, or None when
        no node is set.
        """
        if self.node is None:
            return None
        return self.node.redis_connection

1750 

1751 

1752class ClusterPipeline(RedisCluster): 

1753 """ 

1754 Support for Redis pipeline 

1755 in cluster mode 

1756 """ 

1757 

1758 ERRORS_ALLOW_RETRY = ( 

1759 ConnectionError, 

1760 TimeoutError, 

1761 MovedError, 

1762 AskError, 

1763 TryAgainError, 

1764 ) 

1765 

def __init__(
    self,
    nodes_manager: "NodesManager",
    commands_parser: "CommandsParser",
    result_callbacks: Optional[Dict[str, Callable]] = None,
    cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
    startup_nodes: Optional[List["ClusterNode"]] = None,
    read_from_replicas: bool = False,
    cluster_error_retry_attempts: int = 3,
    reinitialize_steps: int = 5,
    lock=None,
    **kwargs,
):
    """
    Set up an empty pipeline.

    The nodes manager and commands parser are stored as passed. When
    ``result_callbacks`` is empty a copy of the class level
    RESULT_CALLBACKS is used, and a new ``threading.Lock`` is created
    when no ``lock`` is given. The encoder is built from the ``encoding``,
    ``encoding_errors`` and ``decode_responses`` keyword arguments.
    """
    cls = self.__class__

    self.command_stack = []
    self.nodes_manager = nodes_manager
    self.commands_parser = commands_parser
    self.refresh_table_asap = False
    self.result_callbacks = result_callbacks or cls.RESULT_CALLBACKS.copy()
    self.startup_nodes = startup_nodes or []
    self.read_from_replicas = read_from_replicas
    self.command_flags = cls.COMMAND_FLAGS.copy()
    self.cluster_response_callbacks = cluster_response_callbacks
    self.cluster_error_retry_attempts = cluster_error_retry_attempts
    self.reinitialize_counter = 0
    self.reinitialize_steps = reinitialize_steps

    encoding = kwargs.get("encoding", "utf-8")
    encoding_errors = kwargs.get("encoding_errors", "strict")
    decode_responses = kwargs.get("decode_responses", False)
    self.encoder = Encoder(encoding, encoding_errors, decode_responses)

    self._lock = threading.Lock() if lock is None else lock

1802 

1803 def __repr__(self): 

1804 """ """ 

1805 return f"{type(self).__name__}" 

1806 

1807 def __enter__(self): 

1808 """ """ 

1809 return self 

1810 

1811 def __exit__(self, exc_type, exc_value, traceback): 

1812 """ """ 

1813 self.reset() 

1814 

1815 def __del__(self): 

1816 try: 

1817 self.reset() 

1818 except Exception: 

1819 pass 

1820 

1821 def __len__(self): 

1822 """ """ 

1823 return len(self.command_stack) 

1824 

1825 def __bool__(self): 

1826 "Pipeline instances should always evaluate to True on Python 3+" 

1827 return True 

1828 

def execute_command(self, *args, **kwargs):
    """
    Queue a command on the pipeline.

    Thin wrapper that forwards everything to pipeline_execute_command and
    returns its result.
    """
    queued = self.pipeline_execute_command(*args, **kwargs)
    return queued

1834 

def pipeline_execute_command(self, *args, **options):
    """
    Append a command to the pipeline's command stack.

    The command is wrapped in a ``PipelineCommand`` whose position is
    the current length of the stack. Returns the pipeline itself so
    that calls can be chained.
    """
    position = len(self.command_stack)
    entry = PipelineCommand(args, options, position)
    self.command_stack.append(entry)
    return self

1843 

def raise_first_error(self, stack):
    """
    Raise the first exception found among the results on the stack.

    The exception is annotated with the 1-based position and the
    arguments of the command that produced it before being raised.
    Returns ``None`` when no command result is an exception.
    """
    failed = next((c for c in stack if isinstance(c.result, Exception)), None)
    if failed is None:
        return
    error = failed.result
    self.annotate_exception(error, failed.position + 1, failed.args)
    raise error

1853 

def annotate_exception(self, exception, number, command):
    """
    Provide extra context to the exception prior to it being handled.

    Rewrites ``exception.args`` in place so that the first element
    names the command number and the command text that failed; any
    remaining elements of ``args`` are kept unchanged.

    :param exception: the exception instance to annotate
    :param number: position of the failing command, as shown to the user
    :param command: the command arguments, joined with spaces for display
    """
    cmd = " ".join(map(safe_str, command))
    # An exception created without arguments has an empty ``args`` tuple;
    # fall back to the exception type name instead of failing with an
    # IndexError that would hide the original error.
    detail = exception.args[0] if exception.args else type(exception).__name__
    msg = f"Command # {number} ({cmd}) of pipeline caused error: {detail}"
    exception.args = (msg,) + exception.args[1:]

1864 

def execute(self, raise_on_error=True):
    """
    Execute all the commands currently queued on the pipeline.

    The pipeline is reset afterwards whether or not sending the
    commands succeeded. Returns the result of
    ``send_cluster_commands``.
    """
    pending = self.command_stack
    try:
        outcome = self.send_cluster_commands(pending, raise_on_error)
    finally:
        self.reset()
    return outcome

1874 

def reset(self):
    """
    Reset back to an empty pipeline.

    Clears the queued commands and the registered scripts, and drops
    the watch / explicit-transaction flags.
    """
    self.command_stack, self.scripts = [], set()

    # TODO: Implement
    # Reset the connection state in case something was being watched:
    # send UNWATCH on the held connection and read the reply, or
    # disconnect on ConnectionError (disconnecting also removes any
    # previous WATCHes). This has to be done by hand because the
    # unwatch / immediate_execute_command methods can call reset().

    # clean up the other instance attributes
    self.watching = self.explicit_transaction = False

    # TODO: Implement
    # Once nothing is being WATCHed any more, the held connection can
    # safely be released back to the pool and forgotten.

1906 

def send_cluster_commands(
    self, stack, raise_on_error=True, allow_redirections=True
):
    """
    Wrapper for CLUSTERDOWN error handling.

    If the cluster reports it is down it is assumed that:
     - connection_pool was disconnected
     - connection_pool was reset
     - refresh_table_asap was set to True

    A failed send is retried up to the number of times given by
    "self.cluster_error_retry_attempts", which defaults to 3 unless
    manually configured.

    Once the retries are used up, the last ClusterDownError or
    ConnectionError is re-raised. An empty stack returns an empty
    list without contacting the cluster.
    """
    if not stack:
        return []

    remaining = self.cluster_error_retry_attempts
    while True:
        try:
            return self._send_cluster_commands(
                stack,
                raise_on_error=raise_on_error,
                allow_redirections=allow_redirections,
            )
        except (ClusterDownError, ConnectionError):
            # Only these two errors are retried (against the new
            # cluster setup); every other error propagates at once.
            if remaining <= 0:
                raise
            remaining -= 1

1943 

def _send_cluster_commands(
    self, stack, raise_on_error=True, allow_redirections=True
):
    """
    Send a bunch of cluster commands to the redis cluster.

    Commands are grouped per target node, written to every node first
    and only then read back. Commands whose result is one of
    ``ClusterPipeline.ERRORS_ALLOW_RETRY`` are afterwards re-sent one
    at a time through the parent class' ``execute_command``.

    `stack` the queued commands; results are stored on each command.

    `raise_on_error` If true, the first exception found among the
    results is raised (see ``raise_first_error``) instead of being
    returned in the response list.

    `allow_redirections` If the pipeline should retry commands that
    failed with a retryable error (such as `ASK` & `MOVED`). If set
    to false those commands are not retried and keep the error as
    their result.

    Returns the list of results ordered by command position.

    Raises RedisClusterException when a command resolves to no target
    node or to more than one.
    """
    # the first time sending the commands we send all of
    # the commands that were queued up.
    # if we have to run through it again, we only retry
    # the commands that failed.
    attempt = sorted(stack, key=lambda x: x.position)
    is_default_node = False
    # NodeCommands objects for the nodes we need to talk to,
    # keyed by node name
    nodes = {}

    def release_acquired():
        # Hand back every connection checked out so far. Only used while
        # routing, i.e. before anything was written to those connections.
        # Emptying ``nodes`` makes a second call a no-op, so no connection
        # is released twice.
        while nodes:
            _, acquired = nodes.popitem()
            acquired.connection_pool.release(acquired.connection)

    # as we move through each command that still needs to be processed,
    # we figure out the slot number that command maps to, then from
    # the slot determine the node.
    try:
        for c in attempt:
            # refer to our internal node -> slot table that
            # tells us where a given command should route to.
            passed_targets = c.options.pop("target_nodes", None)
            if passed_targets and not self._is_nodes_flag(passed_targets):
                target_nodes = self._parse_target_nodes(passed_targets)
            else:
                target_nodes = self._determine_nodes(
                    *c.args, node_flag=passed_targets
                )
                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {c.args} command on"
                    )
            if len(target_nodes) > 1:
                raise RedisClusterException(
                    f"Too many targets for command {c.args}"
                )

            node = target_nodes[0]
            if node == self.get_default_node():
                is_default_node = True

            # now that we know the name of the node
            # ( it's just a string in the form of host:port )
            # we can build a list of commands for each node.
            node_name = node.name
            if node_name not in nodes:
                redis_node = self.get_redis_connection(node)
                try:
                    connection = get_connection(redis_node, c.args)
                except ConnectionError:
                    # Give back the connections of the other nodes before
                    # rebuilding the table, otherwise they stay checked
                    # out of their pools.
                    release_acquired()
                    # Connection retries are being handled in the node's
                    # Retry object. Reinitialize the node -> slot table.
                    self.nodes_manager.initialize()
                    if is_default_node:
                        self.replace_default_node()
                    raise
                nodes[node_name] = NodeCommands(
                    redis_node.parse_response,
                    redis_node.connection_pool,
                    connection,
                )
            nodes[node_name].append(c)
    except Exception:
        # Routing failed before anything was sent; the connections we
        # hold are untouched and must go back to their pools.
        release_acquired()
        raise

    # send the commands in sequence.
    # we write to all the open sockets for each node first,
    # before reading anything
    # this allows us to flush all the requests out across the
    # network essentially in parallel
    # so that we can read them all in parallel as they come back.
    # we don't multiplex on the sockets as they come available,
    # but that shouldn't make too much difference.
    node_commands = nodes.values()
    for n in node_commands:
        n.write()

    for n in node_commands:
        n.read()

    # release all of the redis connections we allocated earlier
    # back into the connection pool.
    # we used to do this step as part of a try/finally block,
    # but it is really dangerous to
    # release connections back into the pool if for some
    # reason the socket has data still left in it
    # from a previous operation. The write and
    # read operations already have try/catch around them for
    # all known types of errors including connection
    # and socket level errors.
    # So if we hit an exception, something really bad
    # happened and putting any of
    # these connections back into the pool is a very bad idea.
    # the socket might have unread buffer still sitting in it,
    # and then the next time we read from it we pass the
    # buffered result back from a previous command and
    # every single request after to that connection will always get
    # a mismatched result.
    for n in nodes.values():
        n.connection_pool.release(n.connection)

    # if the response isn't an exception it is a
    # valid response from the node
    # we're all done with that command, YAY!
    # if we have more commands to attempt, we've run into problems.
    # collect all the commands we are allowed to retry.
    # (MOVED, ASK, or connection errors or timeout errors)
    attempt = sorted(
        (
            c
            for c in attempt
            if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
        ),
        key=lambda x: x.position,
    )
    if attempt and allow_redirections:
        # RETRY MAGIC HAPPENS HERE!
        # send these remaining commands one at a time using `execute_command`
        # in the main client. This keeps our retry logic
        # in one place mostly,
        # and allows us to be more confident in correctness of behavior.
        # at this point any speed gains from pipelining have been lost
        # anyway, so we might as well make the best
        # attempt to get the correct behavior.
        #
        # The client command will handle retries for each
        # individual command sequentially as we pass each
        # one into `execute_command`. Any exceptions
        # that bubble out should only appear once all
        # retries have been exhausted.
        #
        # If a lot of commands have failed, we'll be setting the
        # flag to rebuild the slots table from scratch.
        # So MOVED errors should correct themselves fairly quickly.
        self.reinitialize_counter += 1
        if self._should_reinitialized():
            self.nodes_manager.initialize()
            if is_default_node:
                self.replace_default_node()
        for c in attempt:
            try:
                # send each command individually like we
                # do in the main client.
                c.result = super().execute_command(*c.args, **c.options)
            except RedisError as e:
                c.result = e

    # turn the response back into a simple flat array that corresponds
    # to the sequence of commands issued in the stack in pipeline.execute()
    # (the constructor leaves the callbacks as None when none are given)
    callbacks = self.cluster_response_callbacks or {}
    response = []
    for c in sorted(stack, key=lambda x: x.position):
        if c.args[0] in callbacks:
            c.result = callbacks[c.args[0]](c.result, **c.options)
        response.append(c.result)

    if raise_on_error:
        self.raise_first_error(stack)

    return response

2111 

def _fail_on_redirect(self, allow_redirections):
    """
    Raise RedisClusterException unless redirections are allowed.
    """
    if allow_redirections:
        return
    raise RedisClusterException(
        "ASK & MOVED redirection not allowed in this pipeline"
    )

2118 

def exists(self, *keys):
    """
    Queue an ``EXISTS`` command for the given keys.
    """
    command = ("EXISTS",) + keys
    return self.execute_command(*command)

2121 

def eval(self, *args, **kwargs):
    """
    Not available on a cluster pipeline; always raises
    RedisClusterException.

    Arguments are accepted and ignored so that a call made with the
    usual ``eval`` arguments reports this error rather than a
    ``TypeError`` about the argument count.
    """
    raise RedisClusterException("method eval() is not implemented")

2125 

def multi(self):
    """
    Not available on a cluster pipeline; always raises
    RedisClusterException.
    """
    message = "method multi() is not implemented"
    raise RedisClusterException(message)

2129 

def immediate_execute_command(self, *args, **options):
    """
    Not available on a cluster pipeline; always raises
    RedisClusterException.
    """
    message = "method immediate_execute_command() is not implemented"
    raise RedisClusterException(message)

2135 

def _execute_transaction(self, *args, **kwargs):
    """
    Not available on a cluster pipeline; always raises
    RedisClusterException.
    """
    message = "method _execute_transaction() is not implemented"
    raise RedisClusterException(message)

2139 

def load_scripts(self):
    """
    Not available on a cluster pipeline; always raises
    RedisClusterException.
    """
    message = "method load_scripts() is not implemented"
    raise RedisClusterException(message)

2143 

def watch(self, *names):
    """
    Not available on a cluster pipeline; always raises
    RedisClusterException.
    """
    message = "method watch() is not implemented"
    raise RedisClusterException(message)

2147 

def unwatch(self):
    """
    Not available on a cluster pipeline; always raises
    RedisClusterException.
    """
    message = "method unwatch() is not implemented"
    raise RedisClusterException(message)

2151 

def script_load_for_pipeline(self, *args, **kwargs):
    """
    Not available on a cluster pipeline; always raises
    RedisClusterException.
    """
    message = "method script_load_for_pipeline() is not implemented"
    raise RedisClusterException(message)

2157 

def delete(self, *names):
    """
    Queue a ``DEL`` for the single key given in ``names``.

    Raises RedisClusterException unless exactly one name is passed.
    """
    if len(names) == 1:
        (key,) = names
        return self.execute_command("DEL", key)

    raise RedisClusterException(
        "deleting multiple keys is not implemented in pipeline command"
    )

2168 

def unlink(self, *names):
    """
    Queue an ``UNLINK`` for the single key given in ``names``.

    Raises RedisClusterException unless exactly one name is passed.
    """
    if len(names) == 1:
        (key,) = names
        return self.execute_command("UNLINK", key)

    raise RedisClusterException(
        "unlinking multiple keys is not implemented in pipeline command"
    )

2179 

2180 

def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Build a stand-in for a command that must not be pipelined
    when running in cluster-mode.

    The returned callable accepts any arguments and always raises
    RedisClusterException naming the blocked function.
    """

    def inner(*args, **kwargs):
        message = (
            f"ERROR: Calling pipelined function {name} is blocked "
            "when running redis in cluster mode..."
        )
        raise RedisClusterException(message)

    return inner

2194 

2195 

# Commands that are blocked on a cluster pipeline
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)
# Replace each blocked command on ClusterPipeline with a callable that
# raises; the attribute name is the command name lower-cased with its
# spaces turned into underscores.
for command in PIPELINE_BLOCKED_COMMANDS:
    command = command.lower().replace(" ", "_")
    setattr(ClusterPipeline, command, block_pipeline_command(command))

2272 

2273 

class PipelineCommand:
    """
    A single command queued on a pipeline, together with its options,
    its position in the queue and a slot for its result.
    """

    def __init__(self, args, options=None, position=None):
        self.args = args
        # every command gets its own dict when no options are passed
        self.options = {} if options is None else options
        self.position = position
        self.result = None
        self.node = None
        self.asking = False

2286 

2287 

class NodeCommands:
    """
    The commands of a pipeline that go to one node, together with the
    connection (and its pool) used to send them.
    """

    def __init__(self, parse_response, connection_pool, connection):
        """
        Store the response parser, the pool and the connection, and
        start with no commands.
        """
        self.parse_response = parse_response
        self.connection_pool = connection_pool
        self.connection = connection
        self.commands = []

    def append(self, c):
        """
        Add a command to the ones destined for this node.
        """
        self.commands.append(c)

    def write(self):
        """
        Send all commands of this node as one packed request.

        A ConnectionError or TimeoutError raised while sending is not
        propagated; it is stored as the result of every command.
        """
        connection = self.connection
        queued = self.commands

        # The results are about to be overwritten, so make sure
        # nothing is left over from a previous run.
        for cmd in queued:
            cmd.result = None

        # pack all commands into a single request to increase network perf
        # and catch connection and timeout errors while sending it.
        try:
            packed = connection.pack_commands([cmd.args for cmd in queued])
            connection.send_packed_command(packed)
        except (ConnectionError, TimeoutError) as send_error:
            for cmd in queued:
                cmd.result = send_error

    def read(self):
        """
        Read the response of every command that has no result yet.

        A ConnectionError or TimeoutError is stored as the result of
        all commands and stops the reading; any other RedisError is
        stored as the result of the command that caused it.
        """
        connection = self.connection
        for cmd in self.commands:
            # if there is a result on this command,
            # it means we ran into an exception
            # like a connection error. Trying to parse
            # a response on a connection that
            # is no longer open will result in a
            # connection error raised by redis-py.
            # but redis-py doesn't check in parse_response
            # that the sock object is
            # still set and if you try to
            # read from a closed connection, it will
            # result in an AttributeError because
            # it will do a readline() call on None.
            # This can have all kinds of nasty side-effects.
            # Treating this case as a connection error
            # is fine because it will dump
            # the connection object back into the
            # pool and on the next write, it will
            # explicitly open the connection and all will be well.
            if cmd.result is not None:
                continue
            try:
                cmd.result = self.parse_response(
                    connection, cmd.args[0], **cmd.options
                )
            except (ConnectionError, TimeoutError) as read_error:
                for pending in self.commands:
                    pending.result = read_error
                return
            except RedisError as command_error:
                cmd.result = command_error