Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/cluster.py: 19%

833 statements  


1import random 

2import socket 

3import sys 

4import threading 

5import time 

6from collections import OrderedDict 

7from typing import Any, Callable, Dict, List, Optional, Tuple, Union 

8 

9from redis.backoff import default_backoff 

10from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan 

11from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands 

12from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url 

13from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

14from redis.exceptions import ( 

15 AskError, 

16 AuthenticationError, 

17 ClusterCrossSlotError, 

18 ClusterDownError, 

19 ClusterError, 

20 ConnectionError, 

21 DataError, 

22 MasterDownError, 

23 MovedError, 

24 RedisClusterException, 

25 RedisError, 

26 ResponseError, 

27 SlotNotCoveredError, 

28 TimeoutError, 

29 TryAgainError, 

30) 

31from redis.lock import Lock 

32from redis.retry import Retry 

33from redis.utils import ( 

34 dict_merge, 

35 list_keys_to_dict, 

36 merge_result, 

37 safe_str, 

38 str_if_bytes, 

39) 

40 

41 

42def get_node_name(host: str, port: Union[str, int]) -> str: 

43 return f"{host}:{port}" 

44 

45 

46def get_connection(redis_node, *args, **options): 

47 return redis_node.connection or redis_node.connection_pool.get_connection( 

48 args[0], **options 

49 ) 

50 

51 

52def parse_scan_result(command, res, **options): 

53 cursors = {} 

54 ret = [] 

55 for node_name, response in res.items(): 

56 cursor, r = parse_scan(response, **options) 

57 cursors[node_name] = cursor 

58 ret += r 

59 

60 return cursors, ret 

61 

62 

63def parse_pubsub_numsub(command, res, **options): 

64 numsub_d = OrderedDict() 

65 for numsub_tups in res.values(): 

66 for channel, numsubbed in numsub_tups: 

67 try: 

68 numsub_d[channel] += numsubbed 

69 except KeyError: 

70 numsub_d[channel] = numsubbed 

71 

72 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()] 

73 return ret_numsub 

74 

75 

76def parse_cluster_slots( 

77 resp: Any, **options: Any 

78) -> Dict[Tuple[int, int], Dict[str, Any]]: 

79 current_host = options.get("current_host", "") 

80 

81 def fix_server(*args: Any) -> Tuple[str, Any]: 

82 return str_if_bytes(args[0]) or current_host, args[1] 

83 

84 slots = {} 

85 for slot in resp: 

86 start, end, primary = slot[:3] 

87 replicas = slot[3:] 

88 slots[start, end] = { 

89 "primary": fix_server(*primary), 

90 "replicas": [fix_server(*replica) for replica in replicas], 

91 } 

92 

93 return slots 
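
# A minimal sketch of the mapping built above, assuming a hypothetical

# one-shard CLUSTER SLOTS reply (each server entry is [host, port, node_id]):

#

#   resp = [[0, 16383, [b"10.0.0.1", 6379, b"id1"], [b"10.0.0.2", 6379, b"id2"]]]

#   parse_cluster_slots(resp)

#   # -> {(0, 16383): {"primary": ("10.0.0.1", 6379),

#   #                  "replicas": [("10.0.0.2", 6379)]}}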

94 

95 

96def parse_cluster_shards(resp, **options): 

97 """ 

98 Parse CLUSTER SHARDS response. 

99 """ 

100 shards = [] 

101 for x in resp: 

102 shard = {"slots": [], "nodes": []} 

103 for i in range(0, len(x[1]), 2): 

104 shard["slots"].append((x[1][i], (x[1][i + 1]))) 

105 nodes = x[3] 

106 for node in nodes: 

107 dict_node = {} 

108 for i in range(0, len(node), 2): 

109 dict_node[node[i]] = node[i + 1] 

110 shard["nodes"].append(dict_node) 

111 shards.append(shard) 

112 

113 return shards 

114 

115 

116PRIMARY = "primary" 

117REPLICA = "replica" 

118SLOT_ID = "slot-id" 

119 

120REDIS_ALLOWED_KEYS = ( 

121 "charset", 

122 "connection_class", 

123 "connection_pool", 

124 "connection_pool_class", 

125 "client_name", 

126 "credential_provider", 

127 "db", 

128 "decode_responses", 

129 "encoding", 

130 "encoding_errors", 

131 "errors", 

132 "host", 

133 "max_connections", 

134 "nodes_flag", 

135 "redis_connect_func", 

136 "password", 

137 "port", 

138 "queue_class", 

139 "retry", 

140 "retry_on_timeout", 

141 "socket_connect_timeout", 

142 "socket_keepalive", 

143 "socket_keepalive_options", 

144 "socket_timeout", 

145 "ssl", 

146 "ssl_ca_certs", 

147 "ssl_ca_data", 

148 "ssl_certfile", 

149 "ssl_cert_reqs", 

150 "ssl_keyfile", 

151 "ssl_password", 

152 "unix_socket_path", 

153 "username", 

154) 

155KWARGS_DISABLED_KEYS = ("host", "port") 

156 

157 

158def cleanup_kwargs(**kwargs): 

159 """ 

160 Remove unsupported or disabled keys from kwargs 

161 """ 

162 connection_kwargs = { 

163 k: v 

164 for k, v in kwargs.items() 

165 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS 

166 } 

167 

168 return connection_kwargs 

169 

170 

171class ClusterParser(DefaultParser): 

172 EXCEPTION_CLASSES = dict_merge( 

173 DefaultParser.EXCEPTION_CLASSES, 

174 { 

175 "ASK": AskError, 

176 "TRYAGAIN": TryAgainError, 

177 "MOVED": MovedError, 

178 "CLUSTERDOWN": ClusterDownError, 

179 "CROSSSLOT": ClusterCrossSlotError, 

180 "MASTERDOWN": MasterDownError, 

181 }, 

182 ) 

183 

184 

185class AbstractRedisCluster: 

186 RedisClusterRequestTTL = 16 

187 

188 PRIMARIES = "primaries" 

189 REPLICAS = "replicas" 

190 ALL_NODES = "all" 

191 RANDOM = "random" 

192 DEFAULT_NODE = "default-node" 

193 

194 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE} 

195 

196 COMMAND_FLAGS = dict_merge( 

197 list_keys_to_dict( 

198 [ 

199 "ACL CAT", 

200 "ACL DELUSER", 

201 "ACL DRYRUN", 

202 "ACL GENPASS", 

203 "ACL GETUSER", 

204 "ACL HELP", 

205 "ACL LIST", 

206 "ACL LOG", 

207 "ACL LOAD", 

208 "ACL SAVE", 

209 "ACL SETUSER", 

210 "ACL USERS", 

211 "ACL WHOAMI", 

212 "AUTH", 

213 "CLIENT LIST", 

214 "CLIENT SETNAME", 

215 "CLIENT GETNAME", 

216 "CONFIG SET", 

217 "CONFIG REWRITE", 

218 "CONFIG RESETSTAT", 

219 "TIME", 

220 "PUBSUB CHANNELS", 

221 "PUBSUB NUMPAT", 

222 "PUBSUB NUMSUB", 

223 "PING", 

224 "INFO", 

225 "SHUTDOWN", 

226 "KEYS", 

227 "DBSIZE", 

228 "BGSAVE", 

229 "SLOWLOG GET", 

230 "SLOWLOG LEN", 

231 "SLOWLOG RESET", 

232 "WAIT", 

233 "SAVE", 

234 "MEMORY PURGE", 

235 "MEMORY MALLOC-STATS", 

236 "MEMORY STATS", 

237 "LASTSAVE", 

238 "CLIENT TRACKINGINFO", 

239 "CLIENT PAUSE", 

240 "CLIENT UNPAUSE", 

241 "CLIENT UNBLOCK", 

242 "CLIENT ID", 

243 "CLIENT REPLY", 

244 "CLIENT GETREDIR", 

245 "CLIENT INFO", 

246 "CLIENT KILL", 

247 "READONLY", 

248 "READWRITE", 

249 "CLUSTER INFO", 

250 "CLUSTER MEET", 

251 "CLUSTER NODES", 

252 "CLUSTER REPLICAS", 

253 "CLUSTER RESET", 

254 "CLUSTER SET-CONFIG-EPOCH", 

255 "CLUSTER SLOTS", 

256 "CLUSTER SHARDS", 

257 "CLUSTER COUNT-FAILURE-REPORTS", 

258 "CLUSTER KEYSLOT", 

259 "COMMAND", 

260 "COMMAND COUNT", 

261 "COMMAND LIST", 

262 "COMMAND GETKEYS", 

263 "CONFIG GET", 

264 "DEBUG", 

265 "RANDOMKEY", 

266 "READONLY", 

267 "READWRITE", 

268 "TIME", 

269 "GRAPH.CONFIG", 

270 "LATENCY HISTORY", 

271 "LATENCY LATEST", 

272 "LATENCY RESET", 

273 ], 

274 DEFAULT_NODE, 

275 ), 

276 list_keys_to_dict( 

277 [ 

278 "FLUSHALL", 

279 "FLUSHDB", 

280 "FUNCTION DELETE", 

281 "FUNCTION FLUSH", 

282 "FUNCTION LIST", 

283 "FUNCTION LOAD", 

284 "FUNCTION RESTORE", 

285 "SCAN", 

286 "SCRIPT EXISTS", 

287 "SCRIPT FLUSH", 

288 "SCRIPT LOAD", 

289 ], 

290 PRIMARIES, 

291 ), 

292 list_keys_to_dict(["FUNCTION DUMP"], RANDOM), 

293 list_keys_to_dict( 

294 [ 

295 "CLUSTER COUNTKEYSINSLOT", 

296 "CLUSTER DELSLOTS", 

297 "CLUSTER DELSLOTSRANGE", 

298 "CLUSTER GETKEYSINSLOT", 

299 "CLUSTER SETSLOT", 

300 ], 

301 SLOT_ID, 

302 ), 

303 ) 

304 

305 SEARCH_COMMANDS = ( 

306 [ 

307 "FT.CREATE", 

308 "FT.SEARCH", 

309 "FT.AGGREGATE", 

310 "FT.EXPLAIN", 

311 "FT.EXPLAINCLI", 

312 "FT,PROFILE", 

313 "FT.ALTER", 

314 "FT.DROPINDEX", 

315 "FT.ALIASADD", 

316 "FT.ALIASUPDATE", 

317 "FT.ALIASDEL", 

318 "FT.TAGVALS", 

319 "FT.SUGADD", 

320 "FT.SUGGET", 

321 "FT.SUGDEL", 

322 "FT.SUGLEN", 

323 "FT.SYNUPDATE", 

324 "FT.SYNDUMP", 

325 "FT.SPELLCHECK", 

326 "FT.DICTADD", 

327 "FT.DICTDEL", 

328 "FT.DICTDUMP", 

329 "FT.INFO", 

330 "FT._LIST", 

331 "FT.CONFIG", 

332 "FT.ADD", 

333 "FT.DEL", 

334 "FT.DROP", 

335 "FT.GET", 

336 "FT.MGET", 

337 "FT.SYNADD", 

338 ], 

339 ) 

340 

341 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { 

342 "CLUSTER SLOTS": parse_cluster_slots, 

343 "CLUSTER SHARDS": parse_cluster_shards, 

344 } 

345 

346 RESULT_CALLBACKS = dict_merge( 

347 list_keys_to_dict(["PUBSUB NUMSUB"], parse_pubsub_numsub), 

348 list_keys_to_dict( 

349 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values())) 

350 ), 

351 list_keys_to_dict(["KEYS", "PUBSUB CHANNELS"], merge_result), 

352 list_keys_to_dict( 

353 [ 

354 "PING", 

355 "CONFIG SET", 

356 "CONFIG REWRITE", 

357 "CONFIG RESETSTAT", 

358 "CLIENT SETNAME", 

359 "BGSAVE", 

360 "SLOWLOG RESET", 

361 "SAVE", 

362 "MEMORY PURGE", 

363 "CLIENT PAUSE", 

364 "CLIENT UNPAUSE", 

365 ], 

366 lambda command, res: all(res.values()) if isinstance(res, dict) else res, 

367 ), 

368 list_keys_to_dict( 

369 ["DBSIZE", "WAIT"], 

370 lambda command, res: sum(res.values()) if isinstance(res, dict) else res, 

371 ), 

372 list_keys_to_dict( 

373 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0 

374 ), 

375 list_keys_to_dict(["SCAN"], parse_scan_result), 

376 list_keys_to_dict( 

377 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop() 

378 ), 

379 list_keys_to_dict( 

380 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())] 

381 ), 

382 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())), 

383 ) 

384 

385 ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError) 

386 

387 def replace_default_node(self, target_node: "ClusterNode" = None) -> None: 

388 """Replace the default cluster node. 

389 A random cluster node will be chosen if target_node isn't passed, and primaries 

390 will be prioritized. The default node will not be changed if there are no other 

391 nodes in the cluster. 

392 

393 Args: 

394 target_node (ClusterNode, optional): Target node to replace the default 

395 node. Defaults to None. 

396 """ 

397 if target_node: 

398 self.nodes_manager.default_node = target_node 

399 else: 

400 curr_node = self.get_default_node() 

401 primaries = [node for node in self.get_primaries() if node != curr_node] 

402 if primaries: 

403 # Choose a primary if the cluster contains different primaries 

404 self.nodes_manager.default_node = random.choice(primaries) 

405 else: 

406 # Otherwise, choose a replica if the cluster contains replicas

407 replicas = [node for node in self.get_replicas() if node != curr_node] 

408 if replicas: 

409 self.nodes_manager.default_node = random.choice(replicas) 
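
# A minimal usage sketch (rc is assumed to be a connected RedisCluster, and

# some_node stands for any existing ClusterNode):

#

#   rc.replace_default_node()            # pick a random other primary, if any

#   rc.replace_default_node(some_node)   # or pin a specific ClusterNode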

410 

411 

412class RedisCluster(AbstractRedisCluster, RedisClusterCommands): 

413 @classmethod 

414 def from_url(cls, url, **kwargs): 

415 """ 

416 Return a Redis client object configured from the given URL 

417 

418 For example:: 

419 

420 redis://[[username]:[password]]@localhost:6379/0 

421 rediss://[[username]:[password]]@localhost:6379/0 

422 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

423 

424 Three URL schemes are supported: 

425 

426 - ``redis://`` creates a TCP socket connection. See more at:

427 <https://www.iana.org/assignments/uri-schemes/prov/redis>

428 - ``rediss://`` creates an SSL-wrapped TCP socket connection. See more at:

429 <https://www.iana.org/assignments/uri-schemes/prov/rediss>

430 - ``unix://`` creates a Unix Domain Socket connection.

431 

432 The username, password, hostname, path and all querystring values 

433 are passed through urllib.parse.unquote in order to replace any 

434 percent-encoded values with their corresponding characters. 

435 

436 There are several ways to specify a database number. The first value 

437 found will be used: 

438 

439 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

440 2. If using the redis:// or rediss:// schemes, the path argument 

441 of the url, e.g. redis://localhost/0 

442 3. A ``db`` keyword argument to this function. 

443 

444 If none of these options are specified, the default db=0 is used. 

445 

446 All querystring options are cast to their appropriate Python types. 

447 Boolean arguments can be specified with string values "True"/"False" 

448 or "Yes"/"No". Values that cannot be properly cast cause a 

449 ``ValueError`` to be raised. Once parsed, the querystring arguments 

450 and keyword arguments are passed to the ``ConnectionPool``'s 

451 class initializer. In the case of conflicting arguments, querystring 

452 arguments always win. 

453 

454 """ 

455 return cls(url=url, **kwargs) 
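
# A minimal usage sketch, assuming a cluster node at this hypothetical address:

#

#   rc = RedisCluster.from_url("redis://10.0.0.1:6379/0")

#   rc.ping()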

456 

457 def __init__( 

458 self, 

459 host: Optional[str] = None, 

460 port: int = 6379, 

461 startup_nodes: Optional[List["ClusterNode"]] = None, 

462 cluster_error_retry_attempts: int = 3, 

463 retry: Optional["Retry"] = None, 

464 require_full_coverage: bool = False, 

465 reinitialize_steps: int = 5, 

466 read_from_replicas: bool = False, 

467 dynamic_startup_nodes: bool = True, 

468 url: Optional[str] = None, 

469 **kwargs, 

470 ): 

471 """ 

472 Initialize a new RedisCluster client. 

473 

474 :param startup_nodes: 

475 List of nodes from which initial bootstrapping can be done 

476 :param host: 

477 Can be used to point to a startup node 

478 :param port: 

479 Can be used to point to a startup node 

480 :param require_full_coverage: 

481 When set to False (default value): the client will not require a 

482 full coverage of the slots. However, if not all slots are covered, 

483 and at least one node has 'cluster-require-full-coverage' set to 

484 'yes', the server will throw a ClusterDownError for some key-based

485 commands. See - 

486 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters 

487 When set to True: all slots must be covered to construct the 

488 cluster client. If not all slots are covered, RedisClusterException 

489 will be thrown. 

490 :param read_from_replicas: 

491 Enable read from replicas in READONLY mode. You can read possibly 

492 stale data. 

493 When set to true, read commands will be distributed between the

494 primary and its replicas in a round-robin manner.

495 :param dynamic_startup_nodes: 

496 Set the RedisCluster's startup nodes to all of the discovered nodes. 

497 If true (default value), the cluster's discovered nodes will be used to 

498 determine the cluster nodes-slots mapping in the next topology refresh. 

499 It will remove the initially passed startup nodes if their endpoints aren't

500 listed in the CLUSTER SLOTS output. 

501 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

502 specific IP addresses, it is best to set it to false. 

503 :param cluster_error_retry_attempts: 

504 Number of times to retry before raising an error when 

505 :class:`~.TimeoutError` or :class:`~.ConnectionError` or 

506 :class:`~.ClusterDownError` are encountered 

507 :param reinitialize_steps: 

508 Specifies the number of MOVED errors that need to occur before 

509 reinitializing the whole cluster topology. If a MOVED error occurs 

510 and the cluster does not need to be reinitialized on this current 

511 error handling, only the MOVED slot will be patched with the 

512 redirected node. 

513 To reinitialize the cluster on every MOVED error, set 

514 reinitialize_steps to 1. 

515 To avoid reinitializing the cluster on MOVED errors, set

516 reinitialize_steps to 0. 

517 

518 :**kwargs: 

519 Extra arguments that will be passed to the Redis instance when created

520 (See Official redis-py doc for supported kwargs 

521 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) 

522 Some kwargs are not supported and will raise a 

523 RedisClusterException: 

524 - db (Redis does not support database SELECT in cluster mode)

525 """ 

526 if startup_nodes is None: 

527 startup_nodes = [] 

528 

529 if "db" in kwargs: 

530 # Argument 'db' is not possible to use in cluster mode 

531 raise RedisClusterException( 

532 "Argument 'db' is not possible to use in cluster mode" 

533 ) 

534 

535 # Get the startup node/s 

536 from_url = False 

537 if url is not None: 

538 from_url = True 

539 url_options = parse_url(url) 

540 if "path" in url_options: 

541 raise RedisClusterException( 

542 "RedisCluster does not currently support Unix Domain " 

543 "Socket connections" 

544 ) 

545 if "db" in url_options and url_options["db"] != 0: 

546 # Argument 'db' is not possible to use in cluster mode 

547 raise RedisClusterException( 

548 "A ``db`` querystring option can only be 0 in cluster mode" 

549 ) 

550 kwargs.update(url_options) 

551 host = kwargs.get("host") 

552 port = kwargs.get("port", port) 

553 startup_nodes.append(ClusterNode(host, port)) 

554 elif host is not None and port is not None: 

555 startup_nodes.append(ClusterNode(host, port)) 

556 elif len(startup_nodes) == 0: 

557 # No startup node was provided 

558 raise RedisClusterException( 

559 "RedisCluster requires at least one node to discover the " 

560 "cluster. Please provide one of the followings:\n" 

561 "1. host and port, for example:\n" 

562 " RedisCluster(host='localhost', port=6379)\n" 

563 "2. list of startup nodes, for example:\n" 

564 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," 

565 " ClusterNode('localhost', 6378)])" 

566 ) 

567 # Update the connection arguments 

568 # Whenever a new connection is established, RedisCluster's on_connect 

569 # method should be run 

570 # If the user passed an on_connect function, we'll save it and run it

571 # inside the RedisCluster.on_connect() function 

572 self.user_on_connect_func = kwargs.pop("redis_connect_func", None) 

573 kwargs.update({"redis_connect_func": self.on_connect}) 

574 kwargs = cleanup_kwargs(**kwargs) 

575 if retry: 

576 self.retry = retry 

577 kwargs.update({"retry": self.retry}) 

578 else: 

579 kwargs.update({"retry": Retry(default_backoff(), 0)}) 

580 

581 self.encoder = Encoder( 

582 kwargs.get("encoding", "utf-8"), 

583 kwargs.get("encoding_errors", "strict"), 

584 kwargs.get("decode_responses", False), 

585 ) 

586 self.cluster_error_retry_attempts = cluster_error_retry_attempts 

587 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

588 self.node_flags = self.__class__.NODE_FLAGS.copy() 

589 self.read_from_replicas = read_from_replicas 

590 self.reinitialize_counter = 0 

591 self.reinitialize_steps = reinitialize_steps 

592 self.nodes_manager = NodesManager( 

593 startup_nodes=startup_nodes, 

594 from_url=from_url, 

595 require_full_coverage=require_full_coverage, 

596 dynamic_startup_nodes=dynamic_startup_nodes, 

597 **kwargs, 

598 ) 

599 

600 self.cluster_response_callbacks = CaseInsensitiveDict( 

601 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS 

602 ) 

603 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS) 

604 self.commands_parser = CommandsParser(self) 

605 self._lock = threading.Lock() 

606 

607 def __enter__(self): 

608 return self 

609 

610 def __exit__(self, exc_type, exc_value, traceback): 

611 self.close() 

612 

613 def __del__(self): 

614 self.close() 

615 

616 def disconnect_connection_pools(self): 

617 for node in self.get_nodes(): 

618 if node.redis_connection: 

619 try: 

620 node.redis_connection.connection_pool.disconnect() 

621 except OSError: 

622 # Client was already disconnected. Do nothing.

623 pass 

624 

625 def on_connect(self, connection): 

626 """ 

627 Initialize the connection, authenticate and select a database and send 

628 READONLY if it is set during object initialization. 

629 """ 

630 connection.set_parser(ClusterParser) 

631 connection.on_connect() 

632 

633 if self.read_from_replicas: 

634 # Sending READONLY command to server to configure connection as 

635 # readonly. Since each cluster node may change its server type due 

636 # to a failover, we should establish a READONLY connection 

637 # regardless of the server type. If this is a primary connection, 

638 # READONLY would not affect executing write commands. 

639 connection.send_command("READONLY") 

640 if str_if_bytes(connection.read_response()) != "OK": 

641 raise ConnectionError("READONLY command failed") 

642 

643 if self.user_on_connect_func is not None: 

644 self.user_on_connect_func(connection) 

645 

646 def get_redis_connection(self, node): 

647 if not node.redis_connection: 

648 with self._lock: 

649 if not node.redis_connection: 

650 self.nodes_manager.create_redis_connections([node]) 

651 return node.redis_connection 

652 

653 def get_node(self, host=None, port=None, node_name=None): 

654 return self.nodes_manager.get_node(host, port, node_name) 

655 

656 def get_primaries(self): 

657 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

658 

659 def get_replicas(self): 

660 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

661 

662 def get_random_node(self): 

663 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

664 

665 def get_nodes(self): 

666 return list(self.nodes_manager.nodes_cache.values()) 

667 

668 def get_node_from_key(self, key, replica=False): 

669 """ 

670 Get the node that holds the key's slot. 

671 If replica is set to True but the slot doesn't have any replicas, None is

672 returned. 

673 """ 

674 slot = self.keyslot(key) 

675 slot_cache = self.nodes_manager.slots_cache.get(slot) 

676 if slot_cache is None or len(slot_cache) == 0: 

677 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') 

678 if replica and len(self.nodes_manager.slots_cache[slot]) < 2: 

679 return None 

680 elif replica: 

681 node_idx = 1 

682 else: 

683 # primary 

684 node_idx = 0 

685 

686 return slot_cache[node_idx] 
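
# A minimal usage sketch (rc assumed connected; key hypothetical):

#

#   primary = rc.get_node_from_key("user:1")                # slot owner

#   replica = rc.get_node_from_key("user:1", replica=True)  # None if no replica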

687 

688 def get_default_node(self): 

689 """ 

690 Get the cluster's default node 

691 """ 

692 return self.nodes_manager.default_node 

693 

694 def set_default_node(self, node): 

695 """ 

696 Set the default node of the cluster. 

697 :param node: 'ClusterNode' 

698 :return True if the default node was set, else False 

699 """ 

700 if node is None or self.get_node(node_name=node.name) is None: 

701 return False 

702 self.nodes_manager.default_node = node 

703 return True 

704 

705 def get_retry(self) -> Optional["Retry"]: 

706 return self.retry 

707 

708 def set_retry(self, retry: "Retry") -> None: 

709 self.retry = retry 

710 for node in self.get_nodes(): 

711 node.redis_connection.set_retry(retry) 

712 

713 def monitor(self, target_node=None): 

714 """ 

715 Returns a Monitor object for the specified target node. 

716 The default cluster node will be selected if no target node was 

717 specified. 

718 Monitor is useful for handling the MONITOR command to the redis server. 

719 next_command() method returns one command from monitor 

720 listen() method yields commands from monitor. 

721 """ 

722 if target_node is None: 

723 target_node = self.get_default_node() 

724 if target_node.redis_connection is None: 

725 raise RedisClusterException( 

726 f"Cluster Node {target_node.name} has no redis_connection" 

727 ) 

728 return target_node.redis_connection.monitor() 

729 

730 def pubsub(self, node=None, host=None, port=None, **kwargs): 

731 """ 

732 Allows passing a ClusterNode, or host&port, to get a pubsub instance 

733 connected to the specified node 

734 """ 

735 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs) 

736 

737 def pipeline(self, transaction=None, shard_hint=None): 

738 """ 

739 Cluster impl: 

740 Pipelines do not work in cluster mode the same way they 

741 do in normal mode. Create a clone of this object so 

742 that simulating pipelines will work correctly. Each 

743 command will be called directly when used, and

744 calling execute() will only return the result stack.

745 """ 

746 if shard_hint: 

747 raise RedisClusterException("shard_hint is deprecated in cluster mode") 

748 

749 if transaction: 

750 raise RedisClusterException("transaction is deprecated in cluster mode") 

751 

752 return ClusterPipeline( 

753 nodes_manager=self.nodes_manager, 

754 commands_parser=self.commands_parser, 

755 startup_nodes=self.nodes_manager.startup_nodes, 

756 result_callbacks=self.result_callbacks, 

757 cluster_response_callbacks=self.cluster_response_callbacks, 

758 cluster_error_retry_attempts=self.cluster_error_retry_attempts, 

759 read_from_replicas=self.read_from_replicas, 

760 reinitialize_steps=self.reinitialize_steps, 

761 lock=self._lock, 

762 ) 

763 

764 def lock( 

765 self, 

766 name, 

767 timeout=None, 

768 sleep=0.1, 

769 blocking=True, 

770 blocking_timeout=None, 

771 lock_class=None, 

772 thread_local=True, 

773 ): 

774 """ 

775 Return a new Lock object using key ``name`` that mimics 

776 the behavior of threading.Lock. 

777 

778 If specified, ``timeout`` indicates a maximum life for the lock. 

779 By default, it will remain locked until release() is called. 

780 

781 ``sleep`` indicates the amount of time to sleep per loop iteration 

782 when the lock is in blocking mode and another client is currently 

783 holding the lock. 

784 

785 ``blocking`` indicates whether calling ``acquire`` should block until 

786 the lock has been acquired or to fail immediately, causing ``acquire`` 

787 to return False and the lock not being acquired. Defaults to True. 

788 Note this value can be overridden by passing a ``blocking`` 

789 argument to ``acquire``. 

790 

791 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

792 spend trying to acquire the lock. A value of ``None`` indicates 

793 continue trying forever. ``blocking_timeout`` can be specified as a 

794 float or integer, both representing the number of seconds to wait. 

795 

796 ``lock_class`` forces the specified lock implementation. Note that as 

797 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

798 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

799 you have created your own custom lock class. 

800 

801 ``thread_local`` indicates whether the lock token is placed in 

802 thread-local storage. By default, the token is placed in thread local 

803 storage so that a thread only sees its token, not a token set by 

804 another thread. Consider the following timeline: 

805 

806 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

807 thread-1 sets the token to "abc" 

808 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

809 Lock instance. 

810 time: 5, thread-1 has not yet completed. redis expires the lock 

811 key. 

812 time: 5, thread-2 acquired `my-lock` now that it's available. 

813 thread-2 sets the token to "xyz" 

814 time: 6, thread-1 finishes its work and calls release(). if the 

815 token is *not* stored in thread local storage, then 

816 thread-1 would see the token value as "xyz" and would be 

817 able to successfully release thread-2's lock.

818 

819 In some use cases it's necessary to disable thread local storage. For 

820 example, if you have code where one thread acquires a lock and passes 

821 that lock instance to a worker thread to release later. If thread 

822 local storage isn't disabled in this case, the worker thread won't see 

823 the token set by the thread that acquired the lock. Our assumption 

824 is that these cases aren't common and as such default to using 

825 thread local storage.""" 

826 if lock_class is None: 

827 lock_class = Lock 

828 return lock_class( 

829 self, 

830 name, 

831 timeout=timeout, 

832 sleep=sleep, 

833 blocking=blocking, 

834 blocking_timeout=blocking_timeout, 

835 thread_local=thread_local, 

836 ) 
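
# A minimal usage sketch: the lock key hashes to a single slot, so the same

# Lua-based Lock works unchanged in cluster mode (rc assumed connected):

#

#   with rc.lock("my-lock", timeout=5, blocking_timeout=3):

#       ...  # critical section; LockError is raised if not acquired in time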

837 

838 def set_response_callback(self, command, callback): 

839 """Set a custom Response Callback""" 

840 self.cluster_response_callbacks[command] = callback 

841 

842 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: 

843 # Determine which nodes the command should be executed on.

844 # Returns a list of target nodes. 

845 command = args[0].upper() 

846 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: 

847 command = f"{args[0]} {args[1]}".upper() 

848 

849 nodes_flag = kwargs.pop("nodes_flag", None) 

850 if nodes_flag is not None: 

851 # nodes flag passed by the user 

852 command_flag = nodes_flag 

853 else: 

854 # get the nodes group for this command if it was predefined 

855 command_flag = self.command_flags.get(command) 

856 if command_flag == self.__class__.RANDOM: 

857 # return a random node 

858 return [self.get_random_node()] 

859 elif command_flag == self.__class__.PRIMARIES: 

860 # return all primaries 

861 return self.get_primaries() 

862 elif command_flag == self.__class__.REPLICAS: 

863 # return all replicas 

864 return self.get_replicas() 

865 elif command_flag == self.__class__.ALL_NODES: 

866 # return all nodes 

867 return self.get_nodes() 

868 elif command_flag == self.__class__.DEFAULT_NODE: 

869 # return the cluster's default node 

870 return [self.nodes_manager.default_node] 

871 elif command in self.__class__.SEARCH_COMMANDS[0]: 

872 return [self.nodes_manager.default_node] 

873 else: 

874 # get the node that holds the key's slot 

875 slot = self.determine_slot(*args) 

876 node = self.nodes_manager.get_node_from_slot( 

877 slot, self.read_from_replicas and command in READ_COMMANDS 

878 ) 

879 return [node] 

880 

881 def _should_reinitialized(self): 

882 # To reinitialize the cluster on every MOVED error, 

883 # set reinitialize_steps to 1. 

884 # To avoid reinitializing the cluster on moved errors, set 

885 # reinitialize_steps to 0. 

886 if self.reinitialize_steps == 0: 

887 return False 

888 else: 

889 return self.reinitialize_counter % self.reinitialize_steps == 0 
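
# A worked sketch: the counter is incremented before this check, so with

# reinitialize_steps=5 a full re-initialization happens on every 5th

# consecutive MOVED error (5 % 5 == 0, 10 % 5 == 0, ...), and never when

# reinitialize_steps == 0.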

890 

891 def keyslot(self, key): 

892 """ 

893 Calculate keyslot for a given key. 

894 See Keys distribution model in https://redis.io/topics/cluster-spec 

895 """ 

896 k = self.encoder.encode(key) 

897 return key_slot(k) 
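
# Hash tags: only the substring between the first '{' and the following '}'

# is hashed, so keys sharing a tag land in the same slot (rc assumed to be

# a RedisCluster):

#

#   rc.keyslot("{user1}:name") == rc.keyslot("{user1}:age")  # True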

898 

899 def _get_command_keys(self, *args): 

900 """ 

901 Get the keys in the command. If the command has no keys in it, None is

902 returned. 

903 

904 NOTE: Due to a bug in redis<7.0, this function does not work properly 

905 for EVAL or EVALSHA when the `numkeys` arg is 0. 

906 - issue: https://github.com/redis/redis/issues/9493 

907 - fix: https://github.com/redis/redis/pull/9733 

908 

909 So, don't use this function with EVAL or EVALSHA. 

910 """ 

911 redis_conn = self.get_default_node().redis_connection 

912 return self.commands_parser.get_keys(redis_conn, *args) 

913 

914 def determine_slot(self, *args): 

915 """ 

916 Figure out what slot to use based on args. 

917 

918 Raises a RedisClusterException if there's a missing key and we can't 

919 determine what slots to map the command to; or, if the keys don't 

920 all map to the same key slot. 

921 """ 

922 command = args[0] 

923 if self.command_flags.get(command) == SLOT_ID: 

924 # The command contains the slot ID 

925 return args[1] 

926 

927 # Get the keys in the command 

928 

929 # EVAL and EVALSHA are common enough that it's wasteful to go to the 

930 # redis server to parse the keys. Besides, there is a bug in redis<7.0 

931 # where `self._get_command_keys()` fails anyway. So, we special case 

932 # EVAL/EVALSHA. 

933 if command in ("EVAL", "EVALSHA"): 

934 # command syntax: EVAL "script body" num_keys ... 

935 if len(args) <= 2: 

936 raise RedisClusterException(f"Invalid args in command: {args}") 

937 num_actual_keys = int(args[2])

938 eval_keys = args[3 : 3 + num_actual_keys] 

939 # if there are 0 keys, that means the script can be run on any node 

940 # so we can just return a random slot 

941 if len(eval_keys) == 0: 

942 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

943 keys = eval_keys 

944 else: 

945 keys = self._get_command_keys(*args) 

946 if keys is None or len(keys) == 0: 

947 # FCALL can call a function with 0 keys, that means the function 

948 # can be run on any node so we can just return a random slot 

949 if command in ("FCALL", "FCALL_RO"): 

950 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

951 raise RedisClusterException( 

952 "No way to dispatch this command to Redis Cluster. " 

953 "Missing key.\nYou can execute the command by specifying " 

954 f"target nodes.\nCommand: {args}" 

955 ) 

956 

957 # single key command 

958 if len(keys) == 1: 

959 return self.keyslot(keys[0]) 

960 

961 # multi-key command; we need to make sure all keys are mapped to 

962 # the same slot 

963 slots = {self.keyslot(key) for key in keys} 

964 if len(slots) != 1: 

965 raise RedisClusterException( 

966 f"{command} - all keys must map to the same key slot" 

967 ) 

968 

969 return slots.pop() 
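
# A minimal sketch of the rules above (rc assumed connected; keys hypothetical):

#

#   rc.determine_slot("GET", "{user1}:name")   # slot of the "user1" hash tag

#   rc.determine_slot("EVAL", "return 1", 0)   # zero keys -> random slot

#   # Multi-key commands whose keys hash to different slots raise

#   # RedisClusterException, e.g. rc.determine_slot("MGET", "foo", "bar").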

970 

971 def get_encoder(self): 

972 """ 

973 Get the connections' encoder 

974 """ 

975 return self.encoder 

976 

977 def get_connection_kwargs(self): 

978 """ 

979 Get the connections' keyword arguments

980 """ 

981 return self.nodes_manager.connection_kwargs 

982 

983 def _is_nodes_flag(self, target_nodes): 

984 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

985 

986 def _parse_target_nodes(self, target_nodes): 

987 if isinstance(target_nodes, list): 

988 nodes = target_nodes 

989 elif isinstance(target_nodes, ClusterNode): 

990 # Supports passing a single ClusterNode as a variable 

991 nodes = [target_nodes] 

992 elif isinstance(target_nodes, dict): 

993 # Supports dictionaries of the format {node_name: node}. 

994 # It enables executing commands on multiple nodes as follows:

995 # rc.cluster_save_config(rc.get_primaries()) 

996 nodes = target_nodes.values() 

997 else: 

998 raise TypeError( 

999 "target_nodes type can be one of the following: " 

1000 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1001 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1002 f"The passed type is {type(target_nodes)}" 

1003 ) 

1004 return nodes 

1005 

1006 def execute_command(self, *args, **kwargs): 

1007 """ 

1008 Wrapper for ERRORS_ALLOW_RETRY error handling. 

1009 

1010 It will try the number of times specified by the config option 

1011 "self.cluster_error_retry_attempts" which defaults to 3 unless manually 

1012 configured. 

1013 

1014 If all retries fail, the command will raise the last exception.

1015 

1016 Key argument :target_nodes: can be passed with the following types: 

1017 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM 

1018 ClusterNode 

1019 list<ClusterNode> 

1020 dict<Any, ClusterNode> 

1021 """ 

1022 target_nodes_specified = False 

1023 is_default_node = False 

1024 target_nodes = None 

1025 passed_targets = kwargs.pop("target_nodes", None) 

1026 if passed_targets is not None and not self._is_nodes_flag(passed_targets): 

1027 target_nodes = self._parse_target_nodes(passed_targets) 

1028 target_nodes_specified = True 

1029 # If an error that allows retrying was thrown, the nodes and slots 

1030 # cache were reinitialized. We will retry executing the command with 

1031 # the updated cluster setup only when the target nodes can be 

1032 # determined again with the new cache tables. Therefore, when target 

1033 # nodes were passed to this function, we cannot retry the command 

1034 # execution since the nodes may not be valid anymore after the tables 

1035 # were reinitialized. So in case of passed target nodes, 

1036 # retry_attempts will be set to 0. 

1037 retry_attempts = ( 

1038 0 if target_nodes_specified else self.cluster_error_retry_attempts 

1039 ) 

1040 # Add one for the first execution 

1041 execute_attempts = 1 + retry_attempts 

1042 for _ in range(execute_attempts): 

1043 try: 

1044 res = {} 

1045 if not target_nodes_specified: 

1046 # Determine the nodes to execute the command on 

1047 target_nodes = self._determine_nodes( 

1048 *args, **kwargs, nodes_flag=passed_targets 

1049 ) 

1050 if not target_nodes: 

1051 raise RedisClusterException( 

1052 f"No targets were found to execute {args} command on" 

1053 ) 

1054 if ( 

1055 len(target_nodes) == 1 

1056 and target_nodes[0] == self.get_default_node() 

1057 ): 

1058 is_default_node = True 

1059 for node in target_nodes: 

1060 res[node.name] = self._execute_command(node, *args, **kwargs) 

1061 # Return the processed result 

1062 return self._process_result(args[0], res, **kwargs) 

1063 except Exception as e: 

1064 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: 

1065 if is_default_node: 

1066 # Replace the default cluster node 

1067 self.replace_default_node() 

1068 # The nodes and slots cache were reinitialized. 

1069 # Try again with the new cluster setup. 

1070 retry_attempts -= 1 

1071 continue 

1072 else: 

1073 # raise the exception 

1074 raise e 
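
# A minimal sketch of targeted execution (rc assumed connected):

#

#   rc.execute_command("FLUSHALL", target_nodes=RedisCluster.PRIMARIES)

#   rc.execute_command("INFO", target_nodes=rc.get_random_node())

#   # With several responding nodes, a {node_name: result} dict is returned

#   # unless a RESULT_CALLBACKS entry merges it first.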

1075 

1076 def _execute_command(self, target_node, *args, **kwargs): 

1077 """ 

1078 Send a command to a node in the cluster 

1079 """ 

1080 command = args[0] 

1081 redis_node = None 

1082 connection = None 

1083 redirect_addr = None 

1084 asking = False 

1085 moved = False 

1086 ttl = int(self.RedisClusterRequestTTL) 

1087 

1088 while ttl > 0: 

1089 ttl -= 1 

1090 try: 

1091 if asking: 

1092 target_node = self.get_node(node_name=redirect_addr) 

1093 elif moved: 

1094 # MOVED occurred and the slots cache was updated, 

1095 # refresh the target node 

1096 slot = self.determine_slot(*args) 

1097 target_node = self.nodes_manager.get_node_from_slot( 

1098 slot, self.read_from_replicas and command in READ_COMMANDS 

1099 ) 

1100 moved = False 

1101 

1102 redis_node = self.get_redis_connection(target_node) 

1103 connection = get_connection(redis_node, *args, **kwargs) 

1104 if asking: 

1105 connection.send_command("ASKING") 

1106 redis_node.parse_response(connection, "ASKING", **kwargs) 

1107 asking = False 

1108 

1109 connection.send_command(*args) 

1110 response = redis_node.parse_response(connection, command, **kwargs) 

1111 if command in self.cluster_response_callbacks: 

1112 response = self.cluster_response_callbacks[command]( 

1113 response, **kwargs 

1114 ) 

1115 return response 

1116 except AuthenticationError: 

1117 raise 

1118 except (ConnectionError, TimeoutError) as e: 

1119 # Connection retries are being handled in the node's 

1120 # Retry object. 

1121 # ConnectionError can also be raised if we couldn't get a 

1122 # connection from the pool before timing out, so check that 

1123 # this is an actual connection before attempting to disconnect. 

1124 if connection is not None: 

1125 connection.disconnect() 

1126 

1127 # Remove the failed node from the startup nodes before we try 

1128 # to reinitialize the cluster 

1129 self.nodes_manager.startup_nodes.pop(target_node.name, None) 

1130 # Reset the cluster node's connection 

1131 target_node.redis_connection = None 

1132 self.nodes_manager.initialize() 

1133 raise e 

1134 except MovedError as e: 

1135 # First, we will try to patch the slots/nodes cache with the 

1136 # redirected node output and try again. If MovedError exceeds 

1137 # 'reinitialize_steps' number of times, we will force 

1138 # reinitializing the tables, and then try again. 

1139 # 'reinitialize_steps' counter will increase faster when 

1140 # the same client object is shared between multiple threads. To 

1141 # reduce the frequency, you can set this variable in the

1142 # RedisCluster constructor. 

1143 self.reinitialize_counter += 1 

1144 if self._should_reinitialized(): 

1145 self.nodes_manager.initialize() 

1146 # Reset the counter 

1147 self.reinitialize_counter = 0 

1148 else: 

1149 self.nodes_manager.update_moved_exception(e) 

1150 moved = True 

1151 except TryAgainError: 

1152 if ttl < self.RedisClusterRequestTTL / 2: 

1153 time.sleep(0.05) 

1154 except AskError as e: 

1155 redirect_addr = get_node_name(host=e.host, port=e.port) 

1156 asking = True 

1157 except ClusterDownError as e: 

1158 # ClusterDownError can occur during a failover and to get 

1159 # self-healed, we will try to reinitialize the cluster layout 

1160 # and retry executing the command 

1161 time.sleep(0.25) 

1162 self.nodes_manager.initialize() 

1163 raise e 

1164 except ResponseError: 

1165 raise 

1166 except Exception as e: 

1167 if connection: 

1168 connection.disconnect() 

1169 raise e 

1170 finally: 

1171 if connection is not None: 

1172 redis_node.connection_pool.release(connection) 

1173 

1174 raise ClusterError("TTL exhausted.") 

1175 

1176 def close(self): 

1177 try: 

1178 with self._lock: 

1179 if self.nodes_manager: 

1180 self.nodes_manager.close() 

1181 except AttributeError: 

1182 # RedisCluster's __init__ can fail before nodes_manager is set 

1183 pass 

1184 

1185 def _process_result(self, command, res, **kwargs): 

1186 """ 

1187 Process the result of the executed command. 

1188 The function would return a dict or a single value. 

1189 

1190 :type command: str 

1191 :type res: dict 

1192 

1193 `res` should be in the following format: 

1194 Dict<node_name, command_result> 

1195 """ 

1196 if command in self.result_callbacks: 

1197 return self.result_callbacks[command](command, res, **kwargs) 

1198 elif len(res) == 1: 

1199 # When we execute the command on a single node, we can 

1200 # remove the dictionary and return a single response 

1201 return list(res.values())[0] 

1202 else: 

1203 return res 
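
# A sketch of the unwrapping above, assuming hypothetical node names:

#

#   rc._process_result("GET", {"10.0.0.1:6379": b"v"})    # -> b"v" (one node)

#   rc._process_result("PING", {"n1": True, "n2": True})  # -> True (callback)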

1204 

1205 def load_external_module(self, funcname, func): 

1206 """ 

1207 This function can be used to add externally defined redis modules

1208 and their namespaces to the redis client.

1209 

1210 ``funcname`` - A string containing the name of the function to create 

1211 ``func`` - The function, being added to this class. 

1212 """ 

1213 setattr(self, funcname, func) 

1214 

1215 

1216class ClusterNode: 

1217 def __init__(self, host, port, server_type=None, redis_connection=None): 

1218 if host == "localhost": 

1219 host = socket.gethostbyname(host) 

1220 

1221 self.host = host 

1222 self.port = port 

1223 self.name = get_node_name(host, port) 

1224 self.server_type = server_type 

1225 self.redis_connection = redis_connection 

1226 

1227 def __repr__(self): 

1228 return ( 

1229 f"[host={self.host}," 

1230 f"port={self.port}," 

1231 f"name={self.name}," 

1232 f"server_type={self.server_type}," 

1233 f"redis_connection={self.redis_connection}]" 

1234 ) 

1235 

1236 def __eq__(self, obj): 

1237 return isinstance(obj, ClusterNode) and obj.name == self.name 

1238 

1239 def __del__(self): 

1240 if self.redis_connection is not None: 

1241 self.redis_connection.close() 

1242 

1243 

1244class LoadBalancer: 

1245 """ 

1246 Round-Robin Load Balancing 

1247 """ 

1248 

1249 def __init__(self, start_index: int = 0) -> None: 

1250 self.primary_to_idx = {} 

1251 self.start_index = start_index 

1252 

1253 def get_server_index(self, primary: str, list_size: int) -> int: 

1254 server_index = self.primary_to_idx.setdefault(primary, self.start_index) 

1255 # Update the index 

1256 self.primary_to_idx[primary] = (server_index + 1) % list_size 

1257 return server_index 
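
# A minimal sketch of the rotation, for a primary plus two replicas

# (list_size=3); the index cycles 0, 1, 2, 0, ...:

#

#   lb = LoadBalancer()

#   [lb.get_server_index("10.0.0.1:6379", 3) for _ in range(4)]  # [0, 1, 2, 0]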

1258 

1259 def reset(self) -> None: 

1260 self.primary_to_idx.clear() 

1261 

1262 

1263class NodesManager: 

1264 def __init__( 

1265 self, 

1266 startup_nodes, 

1267 from_url=False, 

1268 require_full_coverage=False, 

1269 lock=None, 

1270 dynamic_startup_nodes=True, 

1271 connection_pool_class=ConnectionPool, 

1272 **kwargs, 

1273 ): 

1274 self.nodes_cache = {} 

1275 self.slots_cache = {} 

1276 self.startup_nodes = {} 

1277 self.default_node = None 

1278 self.populate_startup_nodes(startup_nodes) 

1279 self.from_url = from_url 

1280 self._require_full_coverage = require_full_coverage 

1281 self._dynamic_startup_nodes = dynamic_startup_nodes 

1282 self.connection_pool_class = connection_pool_class 

1283 self._moved_exception = None 

1284 self.connection_kwargs = kwargs 

1285 self.read_load_balancer = LoadBalancer() 

1286 if lock is None: 

1287 lock = threading.Lock() 

1288 self._lock = lock 

1289 self.initialize() 

1290 

1291 def get_node(self, host=None, port=None, node_name=None): 

1292 """ 

1293 Get the requested node from the cluster's

1294 nodes cache.

1295 :return: ClusterNode if the node exists, else None 

1296 """ 

1297 if host and port: 

1298 # the user passed host and port 

1299 if host == "localhost": 

1300 host = socket.gethostbyname(host) 

1301 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1302 elif node_name: 

1303 return self.nodes_cache.get(node_name) 

1304 else: 

1305 return None 

1306 

1307 def update_moved_exception(self, exception): 

1308 self._moved_exception = exception 

1309 

1310 def _update_moved_slots(self): 

1311 """ 

1312 Update the slot's node with the redirected one 

1313 """ 

1314 e = self._moved_exception 

1315 redirected_node = self.get_node(host=e.host, port=e.port) 

1316 if redirected_node is not None: 

1317 # The node already exists 

1318 if redirected_node.server_type != PRIMARY:

1319 # Update the node's server type 

1320 redirected_node.server_type = PRIMARY 

1321 else: 

1322 # This is a new node, we will add it to the nodes cache 

1323 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

1324 self.nodes_cache[redirected_node.name] = redirected_node 

1325 if redirected_node in self.slots_cache[e.slot_id]: 

1326 # The MOVED error resulted from a failover, and the new slot owner 

1327 # had previously been a replica. 

1328 old_primary = self.slots_cache[e.slot_id][0] 

1329 # Update the old primary to be a replica and add it to the end of 

1330 # the slot's node list 

1331 old_primary.server_type = REPLICA 

1332 self.slots_cache[e.slot_id].append(old_primary) 

1333 # Remove the old replica, which is now a primary, from the slot's 

1334 # node list 

1335 self.slots_cache[e.slot_id].remove(redirected_node) 

1336 # Override the old primary with the new one 

1337 self.slots_cache[e.slot_id][0] = redirected_node 

1338 if self.default_node == old_primary: 

1339 # Update the default node with the new primary 

1340 self.default_node = redirected_node 

1341 else: 

1342 # The new slot owner is a new server, or a server from a different 

1343 # shard. We need to remove all current nodes from the slot's list 

1344 # (including replicas) and add just the new node.

1345 self.slots_cache[e.slot_id] = [redirected_node] 

1346 # Reset moved_exception 

1347 self._moved_exception = None 

1348 

1349 def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None): 

1350 """ 

1351 Get a node that serves this hash slot.

1352 """ 

1353 if self._moved_exception: 

1354 with self._lock: 

1355 if self._moved_exception: 

1356 self._update_moved_slots() 

1357 

1358 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

1359 raise SlotNotCoveredError( 

1360 f'Slot "{slot}" not covered by the cluster. ' 

1361 f'"require_full_coverage={self._require_full_coverage}"' 

1362 ) 

1363 

1364 if read_from_replicas is True: 

1365 # get the server index in a Round-Robin manner 

1366 primary_name = self.slots_cache[slot][0].name 

1367 node_idx = self.read_load_balancer.get_server_index( 

1368 primary_name, len(self.slots_cache[slot]) 

1369 ) 

1370 elif ( 

1371 server_type is None 

1372 or server_type == PRIMARY 

1373 or len(self.slots_cache[slot]) == 1 

1374 ): 

1375 # return a primary 

1376 node_idx = 0 

1377 else: 

1378 # return a replica 

1379 # randomly choose one of the replicas 

1380 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

1381 

1382 return self.slots_cache[slot][node_idx] 

1383 

1384 def get_nodes_by_server_type(self, server_type): 

1385 """ 

1386 Get all nodes with the specified server type 

1387 :param server_type: 'primary' or 'replica' 

1388 :return: list of ClusterNode 

1389 """ 

1390 return [ 

1391 node 

1392 for node in self.nodes_cache.values() 

1393 if node.server_type == server_type 

1394 ] 

1395 

1396 def populate_startup_nodes(self, nodes): 

1397 """ 

1398 Populate all startup nodes and filter out any duplicates.

1399 """ 

1400 for n in nodes: 

1401 self.startup_nodes[n.name] = n 

1402 

1403 def check_slots_coverage(self, slots_cache): 

1404 # Validate if all slots are covered or if we should try next 

1405 # startup node 

1406 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

1407 if i not in slots_cache: 

1408 return False 

1409 return True 

1410 

1411 def create_redis_connections(self, nodes): 

1412 """ 

1413 This function will create a redis connection to all nodes in :nodes: 

1414 """ 

1415 for node in nodes: 

1416 if node.redis_connection is None: 

1417 node.redis_connection = self.create_redis_node( 

1418 host=node.host, port=node.port, **self.connection_kwargs 

1419 ) 

1420 

1421 def create_redis_node(self, host, port, **kwargs): 

1422 if self.from_url: 

1423 # Create a redis node with a custom connection pool

1424 kwargs.update({"host": host}) 

1425 kwargs.update({"port": port}) 

1426 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

1427 else: 

1428 r = Redis(host=host, port=port, **kwargs) 

1429 return r 

1430 

1431 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

1432 node_name = get_node_name(host, port) 

1433 # check if we already have this node in the tmp_nodes_cache 

1434 target_node = tmp_nodes_cache.get(node_name) 

1435 if target_node is None: 

1436 # before creating a new cluster node, check if the cluster node already 

1437 # exists in the current nodes cache and has a valid connection so we can 

1438 # reuse it 

1439 target_node = self.nodes_cache.get(node_name) 

1440 if target_node is None or target_node.redis_connection is None: 

1441 # create new cluster node for this cluster 

1442 target_node = ClusterNode(host, port, role) 

1443 if target_node.server_type != role: 

1444 target_node.server_type = role 

1445 

1446 return target_node 

1447 

1448 def initialize(self): 

1449 """ 

1450 Initializes the nodes cache, slots cache and redis connections. 

1451 :startup_nodes: 

1452 Responsible for discovering other nodes in the cluster 

1453 """ 

1454 self.reset() 

1455 tmp_nodes_cache = {} 

1456 tmp_slots = {} 

1457 disagreements = [] 

1458 startup_nodes_reachable = False 

1459 fully_covered = False 

1460 kwargs = self.connection_kwargs 

1461 exception = None 

1462 for startup_node in self.startup_nodes.values(): 

1463 try: 

1464 if startup_node.redis_connection: 

1465 r = startup_node.redis_connection 

1466 else: 

1467 # Create a new Redis connection 

1468 r = self.create_redis_node( 

1469 startup_node.host, startup_node.port, **kwargs 

1470 ) 

1471 self.startup_nodes[startup_node.name].redis_connection = r 

1472 # Make sure cluster mode is enabled on this node 

1473 if not r.info().get("cluster_enabled"):

1474 raise RedisClusterException( 

1475 "Cluster mode is not enabled on this node" 

1476 ) 

1477 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

1478 startup_nodes_reachable = True 

1479 except Exception as e: 

1480 # Try the next startup node. 

1481 # The exception is saved and raised only if we have no more nodes. 

1482 exception = e 

1483 continue 

1484 

1485 # CLUSTER SLOTS command results in the following output: 

1486 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

1487 # where each node contains the following list: [IP, port, node_id] 

1488 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

1489 # primary node of the first slot section. 

1490 # If there's only one server in the cluster, its ``host`` is '' 

1491 # Fix it to the host in startup_nodes 

1492 if ( 

1493 len(cluster_slots) == 1 

1494 and len(cluster_slots[0][2][0]) == 0 

1495 and len(self.startup_nodes) == 1 

1496 ): 

1497 cluster_slots[0][2][0] = startup_node.host 

1498 

1499 for slot in cluster_slots: 

1500 primary_node = slot[2] 

1501 host = str_if_bytes(primary_node[0]) 

1502 if host == "": 

1503 host = startup_node.host 

1504 port = int(primary_node[1]) 

1505 

1506 target_node = self._get_or_create_cluster_node( 

1507 host, port, PRIMARY, tmp_nodes_cache 

1508 ) 

1509 # add this node to the nodes cache 

1510 tmp_nodes_cache[target_node.name] = target_node 

1511 

1512 for i in range(int(slot[0]), int(slot[1]) + 1): 

1513 if i not in tmp_slots: 

1514 tmp_slots[i] = [] 

1515 tmp_slots[i].append(target_node) 

1516 replica_nodes = [slot[j] for j in range(3, len(slot))] 

1517 

1518 for replica_node in replica_nodes: 

1519 host = str_if_bytes(replica_node[0]) 

1520 port = replica_node[1] 

1521 

1522 target_replica_node = self._get_or_create_cluster_node( 

1523 host, port, REPLICA, tmp_nodes_cache 

1524 ) 

1525 tmp_slots[i].append(target_replica_node) 

1526 # add this node to the nodes cache 

1527 tmp_nodes_cache[ 

1528 target_replica_node.name 

1529 ] = target_replica_node 

1530 else: 

1531 # Validate that the node already mapped to this slot agrees

1532 # with the node we just parsed

1533 tmp_slot = tmp_slots[i][0] 

1534 if tmp_slot.name != target_node.name: 

1535 disagreements.append( 

1536 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

1537 ) 

1538 

1539 if len(disagreements) > 5: 

1540 raise RedisClusterException( 

1541 f"startup_nodes could not agree on a valid " 

1542 f'slots cache: {", ".join(disagreements)}' 

1543 ) 

1544 

1545 fully_covered = self.check_slots_coverage(tmp_slots) 

1546 if fully_covered: 

1547 # Don't need to continue to the next startup node if all 

1548 # slots are covered 

1549 break 

1550 

1551 if not startup_nodes_reachable: 

1552 raise RedisClusterException( 

1553 f"Redis Cluster cannot be connected. Please provide at least " 

1554 f"one reachable node: {str(exception)}" 

1555 ) from exception 

1556 

1557 # Create Redis connections to all nodes 

1558 self.create_redis_connections(list(tmp_nodes_cache.values())) 

1559 

1560 # Check if the slots are not fully covered 

1561 if not fully_covered and self._require_full_coverage: 

1562 # Despite the requirement that the slots be covered, there 

1563 # isn't a full coverage 

1564 raise RedisClusterException( 

1565 f"All slots are not covered after query all startup_nodes. " 

1566 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

1567 f"covered..." 

1568 ) 

1569 

1570 # Set the tmp variables to the real variables 

1571 self.nodes_cache = tmp_nodes_cache 

1572 self.slots_cache = tmp_slots 

1573 # Set the default node 

1574 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

1575 if self._dynamic_startup_nodes: 

1576 # Populate the startup nodes with all discovered nodes 

1577 self.startup_nodes = tmp_nodes_cache 

1578 # If initialize was called after a MovedError, clear it 

1579 self._moved_exception = None 

1580 

1581 def close(self): 

1582 self.default_node = None 

1583 for node in self.nodes_cache.values(): 

1584 if node.redis_connection: 

1585 node.redis_connection.close() 

1586 

1587 def reset(self): 

1588 try: 

1589 self.read_load_balancer.reset() 

1590 except TypeError: 

1591 # The read_load_balancer is None, do nothing 

1592 pass 

1593 

1594 

1595class ClusterPubSub(PubSub): 

1596 """ 

1597 Wrapper for PubSub class. 

1598 

1599 IMPORTANT: before using ClusterPubSub, read about the known limitations 

1600 with pubsub in Cluster mode and learn how to work around them:

1601 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html 

1602 """ 

1603 

1604 def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs): 

1605 """ 

1606 When a pubsub instance is created without specifying a node, a single 

1607 node will be transparently chosen for the pubsub connection on the 

1608 first command execution. The node will be determined by: 

1609 1. Hashing the channel name in the request to find its keyslot 

1610 2. Selecting a node that handles the keyslot: If read_from_replicas is 

1611 set to true, a replica can be selected. 

1612 

1613 :type redis_cluster: RedisCluster 

1614 :type node: ClusterNode 

1615 :type host: str 

1616 :type port: int 

1617 """ 

1618 self.node = None 

1619 self.set_pubsub_node(redis_cluster, node, host, port) 

1620 connection_pool = ( 

1621 None 

1622 if self.node is None 

1623 else redis_cluster.get_redis_connection(self.node).connection_pool 

1624 ) 

1625 self.cluster = redis_cluster 

1626 super().__init__( 

1627 **kwargs, connection_pool=connection_pool, encoder=redis_cluster.encoder 

1628 ) 
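
# A minimal usage sketch (host/port are assumptions, not part of this
# module):
#
#     from redis.cluster import RedisCluster
#     rc = RedisCluster(host="localhost", port=7000)
#     p = rc.pubsub()                              # node chosen lazily
#     p2 = rc.pubsub(node=rc.get_primaries()[0])   # node pinned up front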

1629 

1630 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

1631 """ 

1632 The pubsub node will be set according to the passed node, host and port. 

1633 When none of node, host, or port is specified, the node is set 

1634 to None and will be determined by the keyslot of the channel in the 

1635 first command to be executed. 

1636 RedisClusterException will be thrown if the passed node does not exist 

1637 in the cluster. 

1638 If host is passed without port, or vice versa, a DataError will be 

1639 thrown. 

1640 :type cluster: RedisCluster 

1641 :type node: ClusterNode 

1642 :type host: str 

1643 :type port: int 

1644 """ 

1645 if node is not None: 

1646 # node is passed by the user 

1647 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

1648 pubsub_node = node 

1649 elif host is not None and port is not None: 

1650 # host and port passed by the user 

1651 node = cluster.get_node(host=host, port=port) 

1652 self._raise_on_invalid_node(cluster, node, host, port) 

1653 pubsub_node = node 

1654 elif any([host, port]): 

1655 # only 'host' or 'port' passed 

1656 raise DataError("Passing a host requires passing a port, and vice versa") 

1657 else: 

1658 # nothing passed by the user. set node to None 

1659 pubsub_node = None 

1660 

1661 self.node = pubsub_node 
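
# For illustration (hypothetical host/port), the rules above mean:
#
#     pubsub.set_pubsub_node(rc)                               # node -> None
#     pubsub.set_pubsub_node(rc, host="10.0.0.1", port=7000)   # node looked up
#     pubsub.set_pubsub_node(rc, host="10.0.0.1")              # raises DataError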

1662 

1663 def get_pubsub_node(self): 

1664 """ 

1665 Get the node that is being used as the pubsub connection 

1666 """ 

1667 return self.node 

1668 

1669 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

1670 """ 

1671 Raise a RedisClusterException if the node is None or doesn't exist in 

1672 the cluster. 

1673 """ 

1674 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

1675 raise RedisClusterException( 

1676 f"Node {host}:{port} doesn't exist in the cluster" 

1677 ) 

1678 

1679 def execute_command(self, *args, **kwargs): 

1680 """ 

1681 Execute a publish/subscribe command. 

1682 

1683 Code taken from redis-py and tweaked to work within a cluster. 

1684 """ 

1685 # NOTE: don't parse the response in this function -- it could pull a 

1686 # legitimate message off the stack if the connection is already 

1687 # subscribed to one or more channels 

1688 

1689 if self.connection is None: 

1690 if self.connection_pool is None: 

1691 if len(args) > 1: 

1692 # Hash the first channel and get one of the nodes holding 

1693 # this slot 

1694 channel = args[1] 

1695 slot = self.cluster.keyslot(channel) 

1696 node = self.cluster.nodes_manager.get_node_from_slot( 

1697 slot, self.cluster.read_from_replicas 

1698 ) 

1699 else: 

1700 # Get a random node 

1701 node = self.cluster.get_random_node() 

1702 self.node = node 

1703 redis_connection = self.cluster.get_redis_connection(node) 

1704 self.connection_pool = redis_connection.connection_pool 

1705 self.connection = self.connection_pool.get_connection( 

1706 "pubsub", self.shard_hint 

1707 ) 

1708 # register a callback that re-subscribes to any channels we 

1709 # were listening to when we were disconnected 

1710 self.connection.register_connect_callback(self.on_connect) 

1711 connection = self.connection 

1712 self._execute(connection, connection.send_command, *args) 
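
# Sketch of the lazy binding described above (the channel name is an
# assumption): the first command picks the node by the channel's keyslot.
#
#     p = rc.pubsub()
#     p.subscribe("news")     # node selected via keyslot("news")
#     p.get_pubsub_node()     # -> the ClusterNode now serving this pubsub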

1713 

1714 def get_redis_connection(self): 

1715 """ 

1716 Get the Redis connection of the pubsub connected node. 

1717 """ 

1718 if self.node is not None: 

1719 return self.node.redis_connection 

1720 

1721 

1722class ClusterPipeline(RedisCluster): 

1723 """ 

1724 Support for Redis pipeline 

1725 in cluster mode 

1726 """ 

1727 

1728 ERRORS_ALLOW_RETRY = ( 

1729 ConnectionError, 

1730 TimeoutError, 

1731 MovedError, 

1732 AskError, 

1733 TryAgainError, 

1734 ) 

1735 

1736 def __init__( 

1737 self, 

1738 nodes_manager: "NodesManager", 

1739 commands_parser: "CommandsParser", 

1740 result_callbacks: Optional[Dict[str, Callable]] = None, 

1741 cluster_response_callbacks: Optional[Dict[str, Callable]] = None, 

1742 startup_nodes: Optional[List["ClusterNode"]] = None, 

1743 read_from_replicas: bool = False, 

1744 cluster_error_retry_attempts: int = 3, 

1745 reinitialize_steps: int = 5, 

1746 lock=None, 

1747 **kwargs, 

1748 ): 

1749 """ """ 

1750 self.command_stack = [] 

1751 self.nodes_manager = nodes_manager 

1752 self.commands_parser = commands_parser 

1753 self.refresh_table_asap = False 

1754 self.result_callbacks = ( 

1755 result_callbacks or self.__class__.RESULT_CALLBACKS.copy() 

1756 ) 

1757 self.startup_nodes = startup_nodes if startup_nodes else [] 

1758 self.read_from_replicas = read_from_replicas 

1759 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

1760 self.cluster_response_callbacks = cluster_response_callbacks 

1761 self.cluster_error_retry_attempts = cluster_error_retry_attempts 

1762 self.reinitialize_counter = 0 

1763 self.reinitialize_steps = reinitialize_steps 

1764 self.encoder = Encoder( 

1765 kwargs.get("encoding", "utf-8"), 

1766 kwargs.get("encoding_errors", "strict"), 

1767 kwargs.get("decode_responses", False), 

1768 ) 

1769 if lock is None: 

1770 lock = threading.Lock() 

1771 self._lock = lock 

1772 

1773 def __repr__(self): 

1774 """ """ 

1775 return f"{type(self).__name__}" 

1776 

1777 def __enter__(self): 

1778 """ """ 

1779 return self 

1780 

1781 def __exit__(self, exc_type, exc_value, traceback): 

1782 """ """ 

1783 self.reset() 

1784 

1785 def __del__(self): 

1786 try: 

1787 self.reset() 

1788 except Exception: 

1789 pass 

1790 

1791 def __len__(self): 

1792 """ """ 

1793 return len(self.command_stack) 

1794 

1795 def __bool__(self): 

1796 "Pipeline instances should always evaluate to True on Python 3+" 

1797 return True 

1798 

1799 def execute_command(self, *args, **kwargs): 

1800 """ 

1801 Wrapper function for pipeline_execute_command 

1802 """ 

1803 return self.pipeline_execute_command(*args, **kwargs) 

1804 

1805 def pipeline_execute_command(self, *args, **options): 

1806 """ 

1807 Appends the executed command to the pipeline's command stack 

1808 """ 

1809 self.command_stack.append( 

1810 PipelineCommand(args, options, len(self.command_stack)) 

1811 ) 

1812 return self 

1813 

1814 def raise_first_error(self, stack): 

1815 """ 

1816 Raise the first exception on the stack 

1817 """ 

1818 for c in stack: 

1819 r = c.result 

1820 if isinstance(r, Exception): 

1821 self.annotate_exception(r, c.position + 1, c.args) 

1822 raise r 

1823 

1824 def annotate_exception(self, exception, number, command): 

1825 """ 

1826 Provides extra context to the exception prior to it being handled 

1827 """ 

1828 cmd = " ".join(map(safe_str, command)) 

1829 msg = ( 

1830 f"Command # {number} ({cmd}) of pipeline " 

1831 f"caused error: {exception.args[0]}" 

1832 ) 

1833 exception.args = (msg,) + exception.args[1:] 
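
# For example (illustrative values), a failing third command would
# surface as:
#
#     redis.exceptions.ResponseError: Command # 3 (GET k1 k2) of pipeline
#     caused error: wrong number of arguments for 'get' command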

1834 

1835 def execute(self, raise_on_error=True): 

1836 """ 

1837 Execute all the commands in the current pipeline 

1838 """ 

1839 stack = self.command_stack 

1840 try: 

1841 return self.send_cluster_commands(stack, raise_on_error) 

1842 finally: 

1843 self.reset() 

1844 

1845 def reset(self): 

1846 """ 

1847 Reset back to empty pipeline. 

1848 """ 

1849 self.command_stack = [] 

1850 

1851 self.scripts = set() 

1852 

1853 # TODO: Implement 

1854 # make sure to reset the connection state in the event that we were 

1855 # watching something 

1856 # if self.watching and self.connection: 

1857 # try: 

1858 # # call this manually since our unwatch or 

1859 # # immediate_execute_command methods can call reset() 

1860 # self.connection.send_command('UNWATCH') 

1861 # self.connection.read_response() 

1862 # except ConnectionError: 

1863 # # disconnect will also remove any previous WATCHes 

1864 # self.connection.disconnect() 

1865 

1866 # clean up the other instance attributes 

1867 self.watching = False 

1868 self.explicit_transaction = False 

1869 

1870 # TODO: Implement 

1871 # we can safely return the connection to the pool here since we're 

1872 # sure we're no longer WATCHing anything 

1873 # if self.connection: 

1874 # self.connection_pool.release(self.connection) 

1875 # self.connection = None 

1876 

1877 def send_cluster_commands( 

1878 self, stack, raise_on_error=True, allow_redirections=True 

1879 ): 

1880 """ 

1881 Wrapper for CLUSTERDOWN error handling. 

1882 

1883 If the cluster reports it is down, it is assumed that: 

1884 - connection_pool was disconnected 

1885 - connection_pool was reset 

1886 - refresh_table_asap was set to True 

1887 

1888 It will try the number of times specified by 

1889 the config option "self.cluster_error_retry_attempts" 

1890 which defaults to 3 unless manually configured. 

1891 

1892 If all attempts are exhausted, the last ClusterDownError or 

1893 ConnectionError is raised. 

1894 """ 

1895 if not stack: 

1896 return [] 

1897 retry_attempts = self.cluster_error_retry_attempts 

1898 while True: 

1899 try: 

1900 return self._send_cluster_commands( 

1901 stack, 

1902 raise_on_error=raise_on_error, 

1903 allow_redirections=allow_redirections, 

1904 ) 

1905 except (ClusterDownError, ConnectionError) as e: 

1906 if retry_attempts > 0: 

1907 # Try again with the new cluster setup. All other errors 

1908 # should be raised. 

1909 retry_attempts -= 1 

1910 pass 

1911 else: 

1912 raise e 
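
# The retry budget is configured on the client, e.g. (hypothetical
# values):
#
#     rc = RedisCluster(host="localhost", port=7000,
#                       cluster_error_retry_attempts=5)
#     rc.pipeline().set("k", "v").execute()   # retried up to 5 times on
#                                             # ClusterDownError/ConnectionError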

1913 

1914 def _send_cluster_commands( 

1915 self, stack, raise_on_error=True, allow_redirections=True 

1916 ): 

1917 """ 

1918 Send a bunch of cluster commands to the redis cluster. 

1919 

1920 `allow_redirections`: whether the pipeline should follow 

1921 `ASK` & `MOVED` redirections automatically. If set 

1922 to false, a RedisClusterException is raised on redirection. 

1923 """ 

1924 # The first time we send the commands, we send everything 

1925 # that was queued up. 

1926 # If we have to run through again, we retry only 

1927 # the commands that failed. 

1928 attempt = sorted(stack, key=lambda x: x.position) 

1929 is_default_node = False 

1930 # build a mapping of node name -> commands to send to that node 

1931 nodes = {} 

1932 

1933 # as we move through each command that still needs to be processed, 

1934 # we figure out the slot number that command maps to, then from 

1935 # the slot determine the node. 
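
# For example (sketch), the slot for a key can be computed with the
# key_slot helper imported at the top of this module:
#
#     from redis.crc import key_slot
#     key_slot(b"foo")    # -> 12182, so the command routes to the node
#                         # in slots_cache[12182]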

1936 for c in attempt: 

1937 while True: 

1938 # refer to our internal node -> slot table that 

1939 # tells us where a given command should route to. 

1940 # (it might be possible we have a cached node that no longer 

1941 # exists in the cluster, which is why we do this in a loop) 

1942 passed_targets = c.options.pop("target_nodes", None) 

1943 if passed_targets and not self._is_nodes_flag(passed_targets): 

1944 target_nodes = self._parse_target_nodes(passed_targets) 

1945 else: 

1946 target_nodes = self._determine_nodes( 

1947 *c.args, node_flag=passed_targets 

1948 ) 

1949 if not target_nodes: 

1950 raise RedisClusterException( 

1951 f"No targets were found to execute {c.args} command on" 

1952 ) 

1953 if len(target_nodes) > 1: 

1954 raise RedisClusterException( 

1955 f"Too many targets for command {c.args}" 

1956 ) 

1957 

1958 node = target_nodes[0] 

1959 if node == self.get_default_node(): 

1960 is_default_node = True 

1961 

1962 # now that we know the name of the node 

1963 # (it's just a string in the form of host:port) 

1964 # we can build a list of commands for each node. 

1965 node_name = node.name 

1966 if node_name not in nodes: 

1967 redis_node = self.get_redis_connection(node) 

1968 try: 

1969 connection = get_connection(redis_node, c.args) 

1970 except ConnectionError: 

1971 # Connection retries are being handled in the node's 

1972 # Retry object. Reinitialize the node -> slot table. 

1973 self.nodes_manager.initialize() 

1974 if is_default_node: 

1975 self.replace_default_node() 

1976 raise 

1977 nodes[node_name] = NodeCommands( 

1978 redis_node.parse_response, 

1979 redis_node.connection_pool, 

1980 connection, 

1981 ) 

1982 nodes[node_name].append(c) 

1983 break 

1984 

1985 # send the commands in sequence. 

1986 # we write to all the open sockets for each node first, 

1987 # before reading anything 

1988 # this allows us to flush all the requests out across the 

1989 # network essentially in parallel 

1990 # so that we can read them all in parallel as they come back. 

1991 # we don't multiplex on the sockets as they become available, 

1992 # but that shouldn't make too much difference. 

1993 node_commands = nodes.values() 

1994 for n in node_commands: 

1995 n.write() 

1996 

1997 for n in node_commands: 

1998 n.read() 

1999 

2000 # release all of the redis connections we allocated earlier 

2001 # back into the connection pool. 

2002 # we used to do this step as part of a try/finally block, 

2003 # but it is really dangerous to 

2004 # release connections back into the pool if for some 

2005 # reason the socket has data still left in it 

2006 # from a previous operation. The write and 

2007 # read operations already have try/catch around them for 

2008 # all known types of errors including connection 

2009 # and socket level errors. 

2010 # So if we hit an exception, something really bad 

2011 # happened and putting any of 

2012 # these connections back into the pool is a very bad idea. 

2013 # the socket might have unread buffer still sitting in it, 

2014 # and then the next time we read from it we pass the 

2015 # buffered result back from a previous command and 

2016 # every single request after to that connection will always get 

2017 # a mismatched result. 

2018 for n in nodes.values(): 

2019 n.connection_pool.release(n.connection) 

2020 

2021 # if the response isn't an exception it is a 

2022 # valid response from the node 

2023 # we're all done with that command, YAY! 

2024 # if we have more commands to attempt, we've run into problems. 

2025 # collect all the commands we are allowed to retry. 

2026 # (MOVED, ASK, or connection errors or timeout errors) 

2027 attempt = sorted( 

2028 ( 

2029 c 

2030 for c in attempt 

2031 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

2032 ), 

2033 key=lambda x: x.position, 

2034 ) 

2035 if attempt and allow_redirections: 

2036 # RETRY MAGIC HAPPENS HERE! 

2037 # send these remaining commands one at a time using `execute_command` 

2038 # in the main client. This keeps our retry logic 

2039 # in one place mostly, 

2040 # and allows us to be more confident in correctness of behavior. 

2041 # at this point any speed gains from pipelining have been lost 

2042 # anyway, so we might as well make the best 

2043 # attempt to get the correct behavior. 

2044 # 

2045 # The client command will handle retries for each 

2046 # individual command sequentially as we pass each 

2047 # one into `execute_command`. Any exceptions 

2048 # that bubble out should only appear once all 

2049 # retries have been exhausted. 

2050 # 

2051 # If a lot of commands have failed, we'll be setting the 

2052 # flag to rebuild the slots table from scratch. 

2053 # So MOVED errors should correct themselves fairly quickly. 

2054 self.reinitialize_counter += 1 

2055 if self._should_reinitialized(): 

2056 self.nodes_manager.initialize() 

2057 if is_default_node: 

2058 self.replace_default_node() 

2059 for c in attempt: 

2060 try: 

2061 # send each command individually like we 

2062 # do in the main client. 

2063 c.result = super().execute_command(*c.args, **c.options) 

2064 except RedisError as e: 

2065 c.result = e 

2066 

2067 # turn the response back into a simple flat array that corresponds 

2068 # to the sequence of commands issued in the stack in pipeline.execute() 

2069 response = [] 

2070 for c in sorted(stack, key=lambda x: x.position): 

2071 if c.args[0] in self.cluster_response_callbacks: 

2072 c.result = self.cluster_response_callbacks[c.args[0]]( 

2073 c.result, **c.options 

2074 ) 

2075 response.append(c.result) 

2076 

2077 if raise_on_error: 

2078 self.raise_first_error(stack) 

2079 

2080 return response 

2081 

2082 def _fail_on_redirect(self, allow_redirections): 

2083 """ """ 

2084 if not allow_redirections: 

2085 raise RedisClusterException( 

2086 "ASK & MOVED redirection not allowed in this pipeline" 

2087 ) 

2088 

2089 def exists(self, *keys): 

2090 return self.execute_command("EXISTS", *keys) 

2091 

2092 def eval(self): 

2093 """ """ 

2094 raise RedisClusterException("method eval() is not implemented") 

2095 

2096 def multi(self): 

2097 """ """ 

2098 raise RedisClusterException("method multi() is not implemented") 

2099 

2100 def immediate_execute_command(self, *args, **options): 

2101 """ """ 

2102 raise RedisClusterException( 

2103 "method immediate_execute_command() is not implemented" 

2104 ) 

2105 

2106 def _execute_transaction(self, *args, **kwargs): 

2107 """ """ 

2108 raise RedisClusterException("method _execute_transaction() is not implemented") 

2109 

2110 def load_scripts(self): 

2111 """ """ 

2112 raise RedisClusterException("method load_scripts() is not implemented") 

2113 

2114 def watch(self, *names): 

2115 """ """ 

2116 raise RedisClusterException("method watch() is not implemented") 

2117 

2118 def unwatch(self): 

2119 """ """ 

2120 raise RedisClusterException("method unwatch() is not implemented") 

2121 

2122 def script_load_for_pipeline(self, *args, **kwargs): 

2123 """ """ 

2124 raise RedisClusterException( 

2125 "method script_load_for_pipeline() is not implemented" 

2126 ) 

2127 

2128 def delete(self, *names): 

2129 """ 

2130 "Delete a key specified by ``names``" 

2131 """ 

2132 if len(names) != 1: 

2133 raise RedisClusterException( 

2134 "deleting multiple keys is not implemented in pipeline command" 

2135 ) 

2136 

2137 return self.execute_command("DEL", names[0]) 

2138 

2139 def unlink(self, *names): 

2140 """ 

2141 "Unlink a key specified by ``names``" 

2142 """ 

2143 if len(names) != 1: 

2144 raise RedisClusterException( 

2145 "unlinking multiple keys is not implemented in pipeline command" 

2146 ) 

2147 

2148 return self.execute_command("UNLINK", names[0]) 
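
# In practice (sketch): single-key calls queue normally, multi-key calls
# are rejected inside a cluster pipeline.
#
#     pipe.delete("k1")           # OK, queued as DEL k1
#     pipe.delete("k1", "k2")     # raises RedisClusterException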

2149 

2150 

2151def block_pipeline_command(name: str) -> Callable[..., Any]: 

2152 """ 

2153 Returns a function that raises an error, because some pipelined 

2154 commands must be blocked when running in cluster mode 

2155 """ 

2156 

2157 def inner(*args, **kwargs): 

2158 raise RedisClusterException( 

2159 f"ERROR: Calling pipelined function {name} is blocked " 

2160 f"when running redis in cluster mode..." 

2161 ) 

2162 

2163 return inner 

2164 

2165 

2166# Blocked pipeline commands 

2167PIPELINE_BLOCKED_COMMANDS = ( 

2168 "BGREWRITEAOF", 

2169 "BGSAVE", 

2170 "BITOP", 

2171 "BRPOPLPUSH", 

2172 "CLIENT GETNAME", 

2173 "CLIENT KILL", 

2174 "CLIENT LIST", 

2175 "CLIENT SETNAME", 

2176 "CLIENT", 

2177 "CONFIG GET", 

2178 "CONFIG RESETSTAT", 

2179 "CONFIG REWRITE", 

2180 "CONFIG SET", 

2181 "CONFIG", 

2182 "DBSIZE", 

2183 "ECHO", 

2184 "EVALSHA", 

2185 "FLUSHALL", 

2186 "FLUSHDB", 

2187 "INFO", 

2188 "KEYS", 

2189 "LASTSAVE", 

2190 "MGET", 

2191 "MGET NONATOMIC", 

2192 "MOVE", 

2193 "MSET", 

2194 "MSET NONATOMIC", 

2195 "MSETNX", 

2196 "PFCOUNT", 

2197 "PFMERGE", 

2198 "PING", 

2199 "PUBLISH", 

2200 "RANDOMKEY", 

2201 "READONLY", 

2202 "READWRITE", 

2203 "RENAME", 

2204 "RENAMENX", 

2205 "RPOPLPUSH", 

2206 "SAVE", 

2207 "SCAN", 

2208 "SCRIPT EXISTS", 

2209 "SCRIPT FLUSH", 

2210 "SCRIPT KILL", 

2211 "SCRIPT LOAD", 

2212 "SCRIPT", 

2213 "SDIFF", 

2214 "SDIFFSTORE", 

2215 "SENTINEL GET MASTER ADDR BY NAME", 

2216 "SENTINEL MASTER", 

2217 "SENTINEL MASTERS", 

2218 "SENTINEL MONITOR", 

2219 "SENTINEL REMOVE", 

2220 "SENTINEL SENTINELS", 

2221 "SENTINEL SET", 

2222 "SENTINEL SLAVES", 

2223 "SENTINEL", 

2224 "SHUTDOWN", 

2225 "SINTER", 

2226 "SINTERSTORE", 

2227 "SLAVEOF", 

2228 "SLOWLOG GET", 

2229 "SLOWLOG LEN", 

2230 "SLOWLOG RESET", 

2231 "SLOWLOG", 

2232 "SMOVE", 

2233 "SORT", 

2234 "SUNION", 

2235 "SUNIONSTORE", 

2236 "TIME", 

2237) 

2238for command in PIPELINE_BLOCKED_COMMANDS: 

2239 command = command.replace(" ", "_").lower() 

2240 

2241 setattr(ClusterPipeline, command, block_pipeline_command(command)) 
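
# For example (sketch): the generated attributes reject the call up front.
#
#     rc.pipeline().keys()    # raises RedisClusterException: "ERROR:
#                             # Calling pipelined function keys is blocked
#                             # when running redis in cluster mode..."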

2242 

2243 

2244class PipelineCommand: 

2245 """ """ 

2246 

2247 def __init__(self, args, options=None, position=None): 

2248 self.args = args 

2249 if options is None: 

2250 options = {} 

2251 self.options = options 

2252 self.position = position 

2253 self.result = None 

2254 self.node = None 

2255 self.asking = False 

2256 

2257 

2258class NodeCommands: 

2259 """ """ 

2260 

2261 def __init__(self, parse_response, connection_pool, connection): 

2262 """ """ 

2263 self.parse_response = parse_response 

2264 self.connection_pool = connection_pool 

2265 self.connection = connection 

2266 self.commands = [] 

2267 

2268 def append(self, c): 

2269 """ """ 

2270 self.commands.append(c) 

2271 

2272 def write(self): 

2273 """ 

2274 Code borrowed from redis-py and adjusted to work in a cluster pipeline 

2275 """ 

2276 connection = self.connection 

2277 commands = self.commands 

2278 

2279 # We are going to clobber the commands with the write, so go ahead 

2280 # and ensure that nothing is sitting there from a previous run. 

2281 for c in commands: 

2282 c.result = None 

2283 

2284 # build up all commands into a single request to increase network perf 

2285 # send all the commands and catch connection and timeout errors. 

2286 try: 

2287 connection.send_packed_command( 

2288 connection.pack_commands([c.args for c in commands]) 

2289 ) 

2290 except (ConnectionError, TimeoutError) as e: 

2291 for c in commands: 

2292 c.result = e 
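
# For illustration (sketch): pack_commands() encodes the queued argument
# tuples into RESP buffers that send_packed_command() then writes out
# as a single batched request.
#
#     connection.pack_commands([("SET", "k", "v"), ("GET", "k")])
#     # -> [b"*3\r\n$3\r\nSET\r\n..."]  (a list of pre-packed byte buffers)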

2293 

2294 def read(self): 

2295 """ """ 

2296 connection = self.connection 

2297 for c in self.commands: 

2298 

2299 # if there is a result on this command, 

2300 # it means we ran into an exception 

2301 # like a connection error. Trying to parse 

2302 # a response on a connection that 

2303 # is no longer open will result in a 

2304 # connection error raised by redis-py. 

2305 # but redis-py doesn't check in parse_response 

2306 # that the sock object is 

2307 # still set and if you try to 

2308 # read from a closed connection, it will 

2309 # result in an AttributeError because 

2310 # it will do a readline() call on None. 

2311 # This can have all kinds of nasty side-effects. 

2312 # Treating this case as a connection error 

2313 # is fine because it will dump 

2314 # the connection object back into the 

2315 # pool and on the next write, it will 

2316 # explicitly open the connection and all will be well. 

2317 if c.result is None: 

2318 try: 

2319 c.result = self.parse_response(connection, c.args[0], **c.options) 

2320 except (ConnectionError, TimeoutError) as e: 

2321 for c in self.commands: 

2322 c.result = e 

2323 return 

2324 except RedisError: 

2325 c.result = sys.exc_info()[1]