Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/cluster.py: 18%

927 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import random 

2import socket 

3import sys 

4import threading 

5import time 

6from collections import OrderedDict 

7from typing import Any, Callable, Dict, List, Optional, Tuple, Union 

8 

9from redis._parsers import CommandsParser, Encoder 

10from redis._parsers.helpers import parse_scan 

11from redis.backoff import default_backoff 

12from redis.client import CaseInsensitiveDict, PubSub, Redis 

13from redis.commands import READ_COMMANDS, RedisClusterCommands 

14from redis.commands.helpers import list_or_args 

15from redis.connection import ConnectionPool, DefaultParser, parse_url 

16from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

17from redis.exceptions import ( 

18 AskError, 

19 AuthenticationError, 

20 ClusterCrossSlotError, 

21 ClusterDownError, 

22 ClusterError, 

23 ConnectionError, 

24 DataError, 

25 MasterDownError, 

26 MovedError, 

27 RedisClusterException, 

28 RedisError, 

29 ResponseError, 

30 SlotNotCoveredError, 

31 TimeoutError, 

32 TryAgainError, 

33) 

34from redis.lock import Lock 

35from redis.retry import Retry 

36from redis.utils import ( 

37 HIREDIS_AVAILABLE, 

38 dict_merge, 

39 list_keys_to_dict, 

40 merge_result, 

41 safe_str, 

42 str_if_bytes, 

43) 

44 

45 

46def get_node_name(host: str, port: Union[str, int]) -> str: 

47 return f"{host}:{port}" 

48 

49 

50def get_connection(redis_node, *args, **options): 

51 return redis_node.connection or redis_node.connection_pool.get_connection( 

52 args[0], **options 

53 ) 

54 

55 

56def parse_scan_result(command, res, **options): 

57 cursors = {} 

58 ret = [] 

59 for node_name, response in res.items(): 

60 cursor, r = parse_scan(response, **options) 

61 cursors[node_name] = cursor 

62 ret += r 

63 

64 return cursors, ret 

65 

66 

67def parse_pubsub_numsub(command, res, **options): 

68 numsub_d = OrderedDict() 

69 for numsub_tups in res.values(): 

70 for channel, numsubbed in numsub_tups: 

71 try: 

72 numsub_d[channel] += numsubbed 

73 except KeyError: 

74 numsub_d[channel] = numsubbed 

75 

76 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()] 

77 return ret_numsub 

78 

79 

80def parse_cluster_slots( 

81 resp: Any, **options: Any 

82) -> Dict[Tuple[int, int], Dict[str, Any]]: 

83 current_host = options.get("current_host", "") 

84 

85 def fix_server(*args: Any) -> Tuple[str, Any]: 

86 return str_if_bytes(args[0]) or current_host, args[1] 

87 

88 slots = {} 

89 for slot in resp: 

90 start, end, primary = slot[:3] 

91 replicas = slot[3:] 

92 slots[start, end] = { 

93 "primary": fix_server(*primary), 

94 "replicas": [fix_server(*replica) for replica in replicas], 

95 } 

96 

97 return slots 

98 

99 

100def parse_cluster_shards(resp, **options): 

101 """ 

102 Parse CLUSTER SHARDS response. 

103 """ 

104 if isinstance(resp[0], dict): 

105 return resp 

106 shards = [] 

107 for x in resp: 

108 shard = {"slots": [], "nodes": []} 

109 for i in range(0, len(x[1]), 2): 

110 shard["slots"].append((x[1][i], (x[1][i + 1]))) 

111 nodes = x[3] 

112 for node in nodes: 

113 dict_node = {} 

114 for i in range(0, len(node), 2): 

115 dict_node[node[i]] = node[i + 1] 

116 shard["nodes"].append(dict_node) 

117 shards.append(shard) 

118 

119 return shards 

120 

121 

122def parse_cluster_myshardid(resp, **options): 

123 """ 

124 Parse CLUSTER MYSHARDID response. 

125 """ 

126 return resp.decode("utf-8") 

127 

128 

129PRIMARY = "primary" 

130REPLICA = "replica" 

131SLOT_ID = "slot-id" 

132 

133REDIS_ALLOWED_KEYS = ( 

134 "charset", 

135 "connection_class", 

136 "connection_pool", 

137 "connection_pool_class", 

138 "client_name", 

139 "credential_provider", 

140 "db", 

141 "decode_responses", 

142 "encoding", 

143 "encoding_errors", 

144 "errors", 

145 "host", 

146 "lib_name", 

147 "lib_version", 

148 "max_connections", 

149 "nodes_flag", 

150 "redis_connect_func", 

151 "password", 

152 "port", 

153 "queue_class", 

154 "retry", 

155 "retry_on_timeout", 

156 "protocol", 

157 "socket_connect_timeout", 

158 "socket_keepalive", 

159 "socket_keepalive_options", 

160 "socket_timeout", 

161 "ssl", 

162 "ssl_ca_certs", 

163 "ssl_ca_data", 

164 "ssl_certfile", 

165 "ssl_cert_reqs", 

166 "ssl_keyfile", 

167 "ssl_password", 

168 "unix_socket_path", 

169 "username", 

170 "cache_enabled", 

171 "client_cache", 

172 "cache_max_size", 

173 "cache_ttl", 

174 "cache_policy", 

175 "cache_blacklist", 

176 "cache_whitelist", 

177) 

178KWARGS_DISABLED_KEYS = ("host", "port") 

179 

180 

181def cleanup_kwargs(**kwargs): 

182 """ 

183 Remove unsupported or disabled keys from kwargs 

184 """ 

185 connection_kwargs = { 

186 k: v 

187 for k, v in kwargs.items() 

188 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS 

189 } 

190 

191 return connection_kwargs 

192 

193 

194class ClusterParser(DefaultParser): 

195 EXCEPTION_CLASSES = dict_merge( 

196 DefaultParser.EXCEPTION_CLASSES, 

197 { 

198 "ASK": AskError, 

199 "TRYAGAIN": TryAgainError, 

200 "MOVED": MovedError, 

201 "CLUSTERDOWN": ClusterDownError, 

202 "CROSSSLOT": ClusterCrossSlotError, 

203 "MASTERDOWN": MasterDownError, 

204 }, 

205 ) 

206 

207 

208class AbstractRedisCluster: 

209 RedisClusterRequestTTL = 16 

210 

211 PRIMARIES = "primaries" 

212 REPLICAS = "replicas" 

213 ALL_NODES = "all" 

214 RANDOM = "random" 

215 DEFAULT_NODE = "default-node" 

216 

217 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE} 

218 

219 COMMAND_FLAGS = dict_merge( 

220 list_keys_to_dict( 

221 [ 

222 "ACL CAT", 

223 "ACL DELUSER", 

224 "ACL DRYRUN", 

225 "ACL GENPASS", 

226 "ACL GETUSER", 

227 "ACL HELP", 

228 "ACL LIST", 

229 "ACL LOG", 

230 "ACL LOAD", 

231 "ACL SAVE", 

232 "ACL SETUSER", 

233 "ACL USERS", 

234 "ACL WHOAMI", 

235 "AUTH", 

236 "CLIENT LIST", 

237 "CLIENT SETINFO", 

238 "CLIENT SETNAME", 

239 "CLIENT GETNAME", 

240 "CONFIG SET", 

241 "CONFIG REWRITE", 

242 "CONFIG RESETSTAT", 

243 "TIME", 

244 "PUBSUB CHANNELS", 

245 "PUBSUB NUMPAT", 

246 "PUBSUB NUMSUB", 

247 "PUBSUB SHARDCHANNELS", 

248 "PUBSUB SHARDNUMSUB", 

249 "PING", 

250 "INFO", 

251 "SHUTDOWN", 

252 "KEYS", 

253 "DBSIZE", 

254 "BGSAVE", 

255 "SLOWLOG GET", 

256 "SLOWLOG LEN", 

257 "SLOWLOG RESET", 

258 "WAIT", 

259 "WAITAOF", 

260 "SAVE", 

261 "MEMORY PURGE", 

262 "MEMORY MALLOC-STATS", 

263 "MEMORY STATS", 

264 "LASTSAVE", 

265 "CLIENT TRACKINGINFO", 

266 "CLIENT PAUSE", 

267 "CLIENT UNPAUSE", 

268 "CLIENT UNBLOCK", 

269 "CLIENT ID", 

270 "CLIENT REPLY", 

271 "CLIENT GETREDIR", 

272 "CLIENT INFO", 

273 "CLIENT KILL", 

274 "READONLY", 

275 "CLUSTER INFO", 

276 "CLUSTER MEET", 

277 "CLUSTER MYSHARDID", 

278 "CLUSTER NODES", 

279 "CLUSTER REPLICAS", 

280 "CLUSTER RESET", 

281 "CLUSTER SET-CONFIG-EPOCH", 

282 "CLUSTER SLOTS", 

283 "CLUSTER SHARDS", 

284 "CLUSTER COUNT-FAILURE-REPORTS", 

285 "CLUSTER KEYSLOT", 

286 "COMMAND", 

287 "COMMAND COUNT", 

288 "COMMAND LIST", 

289 "COMMAND GETKEYS", 

290 "CONFIG GET", 

291 "DEBUG", 

292 "RANDOMKEY", 

293 "READONLY", 

294 "READWRITE", 

295 "TIME", 

296 "TFUNCTION LOAD", 

297 "TFUNCTION DELETE", 

298 "TFUNCTION LIST", 

299 "TFCALL", 

300 "TFCALLASYNC", 

301 "GRAPH.CONFIG", 

302 "LATENCY HISTORY", 

303 "LATENCY LATEST", 

304 "LATENCY RESET", 

305 "MODULE LIST", 

306 "MODULE LOAD", 

307 "MODULE UNLOAD", 

308 "MODULE LOADEX", 

309 ], 

310 DEFAULT_NODE, 

311 ), 

312 list_keys_to_dict( 

313 [ 

314 "FLUSHALL", 

315 "FLUSHDB", 

316 "FUNCTION DELETE", 

317 "FUNCTION FLUSH", 

318 "FUNCTION LIST", 

319 "FUNCTION LOAD", 

320 "FUNCTION RESTORE", 

321 "REDISGEARS_2.REFRESHCLUSTER", 

322 "SCAN", 

323 "SCRIPT EXISTS", 

324 "SCRIPT FLUSH", 

325 "SCRIPT LOAD", 

326 ], 

327 PRIMARIES, 

328 ), 

329 list_keys_to_dict(["FUNCTION DUMP"], RANDOM), 

330 list_keys_to_dict( 

331 [ 

332 "CLUSTER COUNTKEYSINSLOT", 

333 "CLUSTER DELSLOTS", 

334 "CLUSTER DELSLOTSRANGE", 

335 "CLUSTER GETKEYSINSLOT", 

336 "CLUSTER SETSLOT", 

337 ], 

338 SLOT_ID, 

339 ), 

340 ) 

341 

342 SEARCH_COMMANDS = ( 

343 [ 

344 "FT.CREATE", 

345 "FT.SEARCH", 

346 "FT.AGGREGATE", 

347 "FT.EXPLAIN", 

348 "FT.EXPLAINCLI", 

349 "FT,PROFILE", 

350 "FT.ALTER", 

351 "FT.DROPINDEX", 

352 "FT.ALIASADD", 

353 "FT.ALIASUPDATE", 

354 "FT.ALIASDEL", 

355 "FT.TAGVALS", 

356 "FT.SUGADD", 

357 "FT.SUGGET", 

358 "FT.SUGDEL", 

359 "FT.SUGLEN", 

360 "FT.SYNUPDATE", 

361 "FT.SYNDUMP", 

362 "FT.SPELLCHECK", 

363 "FT.DICTADD", 

364 "FT.DICTDEL", 

365 "FT.DICTDUMP", 

366 "FT.INFO", 

367 "FT._LIST", 

368 "FT.CONFIG", 

369 "FT.ADD", 

370 "FT.DEL", 

371 "FT.DROP", 

372 "FT.GET", 

373 "FT.MGET", 

374 "FT.SYNADD", 

375 ], 

376 ) 

377 

378 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { 

379 "CLUSTER SLOTS": parse_cluster_slots, 

380 "CLUSTER SHARDS": parse_cluster_shards, 

381 "CLUSTER MYSHARDID": parse_cluster_myshardid, 

382 } 

383 

384 RESULT_CALLBACKS = dict_merge( 

385 list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub), 

386 list_keys_to_dict( 

387 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values())) 

388 ), 

389 list_keys_to_dict( 

390 ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result 

391 ), 

392 list_keys_to_dict( 

393 [ 

394 "PING", 

395 "CONFIG SET", 

396 "CONFIG REWRITE", 

397 "CONFIG RESETSTAT", 

398 "CLIENT SETNAME", 

399 "BGSAVE", 

400 "SLOWLOG RESET", 

401 "SAVE", 

402 "MEMORY PURGE", 

403 "CLIENT PAUSE", 

404 "CLIENT UNPAUSE", 

405 ], 

406 lambda command, res: all(res.values()) if isinstance(res, dict) else res, 

407 ), 

408 list_keys_to_dict( 

409 ["DBSIZE", "WAIT"], 

410 lambda command, res: sum(res.values()) if isinstance(res, dict) else res, 

411 ), 

412 list_keys_to_dict( 

413 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0 

414 ), 

415 list_keys_to_dict(["SCAN"], parse_scan_result), 

416 list_keys_to_dict( 

417 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop() 

418 ), 

419 list_keys_to_dict( 

420 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())] 

421 ), 

422 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())), 

423 ) 

424 

425 ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError) 

426 

427 def replace_default_node(self, target_node: "ClusterNode" = None) -> None: 

428 """Replace the default cluster node. 

429 A random cluster node will be chosen if target_node isn't passed, and primaries 

430 will be prioritized. The default node will not be changed if there are no other 

431 nodes in the cluster. 

432 

433 Args: 

434 target_node (ClusterNode, optional): Target node to replace the default 

435 node. Defaults to None. 

436 """ 

437 if target_node: 

438 self.nodes_manager.default_node = target_node 

439 else: 

440 curr_node = self.get_default_node() 

441 primaries = [node for node in self.get_primaries() if node != curr_node] 

442 if primaries: 

443 # Choose a primary if the cluster contains different primaries 

444 self.nodes_manager.default_node = random.choice(primaries) 

445 else: 

446 # Otherwise, hoose a primary if the cluster contains different primaries 

447 replicas = [node for node in self.get_replicas() if node != curr_node] 

448 if replicas: 

449 self.nodes_manager.default_node = random.choice(replicas) 

450 

451 

452class RedisCluster(AbstractRedisCluster, RedisClusterCommands): 

453 @classmethod 

454 def from_url(cls, url, **kwargs): 

455 """ 

456 Return a Redis client object configured from the given URL 

457 

458 For example:: 

459 

460 redis://[[username]:[password]]@localhost:6379/0 

461 rediss://[[username]:[password]]@localhost:6379/0 

462 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

463 

464 Three URL schemes are supported: 

465 

466 - `redis://` creates a TCP socket connection. See more at: 

467 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

468 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

469 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

470 - ``unix://``: creates a Unix Domain Socket connection. 

471 

472 The username, password, hostname, path and all querystring values 

473 are passed through urllib.parse.unquote in order to replace any 

474 percent-encoded values with their corresponding characters. 

475 

476 There are several ways to specify a database number. The first value 

477 found will be used: 

478 

479 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

480 2. If using the redis:// or rediss:// schemes, the path argument 

481 of the url, e.g. redis://localhost/0 

482 3. A ``db`` keyword argument to this function. 

483 

484 If none of these options are specified, the default db=0 is used. 

485 

486 All querystring options are cast to their appropriate Python types. 

487 Boolean arguments can be specified with string values "True"/"False" 

488 or "Yes"/"No". Values that cannot be properly cast cause a 

489 ``ValueError`` to be raised. Once parsed, the querystring arguments 

490 and keyword arguments are passed to the ``ConnectionPool``'s 

491 class initializer. In the case of conflicting arguments, querystring 

492 arguments always win. 

493 

494 """ 

495 return cls(url=url, **kwargs) 

496 

497 def __init__( 

498 self, 

499 host: Optional[str] = None, 

500 port: int = 6379, 

501 startup_nodes: Optional[List["ClusterNode"]] = None, 

502 cluster_error_retry_attempts: int = 3, 

503 retry: Optional["Retry"] = None, 

504 require_full_coverage: bool = False, 

505 reinitialize_steps: int = 5, 

506 read_from_replicas: bool = False, 

507 dynamic_startup_nodes: bool = True, 

508 url: Optional[str] = None, 

509 address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, 

510 **kwargs, 

511 ): 

512 """ 

513 Initialize a new RedisCluster client. 

514 

515 :param startup_nodes: 

516 List of nodes from which initial bootstrapping can be done 

517 :param host: 

518 Can be used to point to a startup node 

519 :param port: 

520 Can be used to point to a startup node 

521 :param require_full_coverage: 

522 When set to False (default value): the client will not require a 

523 full coverage of the slots. However, if not all slots are covered, 

524 and at least one node has 'cluster-require-full-coverage' set to 

525 'yes,' the server will throw a ClusterDownError for some key-based 

526 commands. See - 

527 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters 

528 When set to True: all slots must be covered to construct the 

529 cluster client. If not all slots are covered, RedisClusterException 

530 will be thrown. 

531 :param read_from_replicas: 

532 Enable read from replicas in READONLY mode. You can read possibly 

533 stale data. 

534 When set to true, read commands will be assigned between the 

535 primary and its replications in a Round-Robin manner. 

536 :param dynamic_startup_nodes: 

537 Set the RedisCluster's startup nodes to all of the discovered nodes. 

538 If true (default value), the cluster's discovered nodes will be used to 

539 determine the cluster nodes-slots mapping in the next topology refresh. 

540 It will remove the initial passed startup nodes if their endpoints aren't 

541 listed in the CLUSTER SLOTS output. 

542 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

543 specific IP addresses, it is best to set it to false. 

544 :param cluster_error_retry_attempts: 

545 Number of times to retry before raising an error when 

546 :class:`~.TimeoutError` or :class:`~.ConnectionError` or 

547 :class:`~.ClusterDownError` are encountered 

548 :param reinitialize_steps: 

549 Specifies the number of MOVED errors that need to occur before 

550 reinitializing the whole cluster topology. If a MOVED error occurs 

551 and the cluster does not need to be reinitialized on this current 

552 error handling, only the MOVED slot will be patched with the 

553 redirected node. 

554 To reinitialize the cluster on every MOVED error, set 

555 reinitialize_steps to 1. 

556 To avoid reinitializing the cluster on moved errors, set 

557 reinitialize_steps to 0. 

558 :param address_remap: 

559 An optional callable which, when provided with an internal network 

560 address of a node, e.g. a `(host, port)` tuple, will return the address 

561 where the node is reachable. This can be used to map the addresses at 

562 which the nodes _think_ they are, to addresses at which a client may 

563 reach them, such as when they sit behind a proxy. 

564 

565 :**kwargs: 

566 Extra arguments that will be sent into Redis instance when created 

567 (See Official redis-py doc for supported kwargs 

568 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) 

569 Some kwargs are not supported and will raise a 

570 RedisClusterException: 

571 - db (Redis do not support database SELECT in cluster mode) 

572 """ 

573 if startup_nodes is None: 

574 startup_nodes = [] 

575 

576 if "db" in kwargs: 

577 # Argument 'db' is not possible to use in cluster mode 

578 raise RedisClusterException( 

579 "Argument 'db' is not possible to use in cluster mode" 

580 ) 

581 

582 # Get the startup node/s 

583 from_url = False 

584 if url is not None: 

585 from_url = True 

586 url_options = parse_url(url) 

587 if "path" in url_options: 

588 raise RedisClusterException( 

589 "RedisCluster does not currently support Unix Domain " 

590 "Socket connections" 

591 ) 

592 if "db" in url_options and url_options["db"] != 0: 

593 # Argument 'db' is not possible to use in cluster mode 

594 raise RedisClusterException( 

595 "A ``db`` querystring option can only be 0 in cluster mode" 

596 ) 

597 kwargs.update(url_options) 

598 host = kwargs.get("host") 

599 port = kwargs.get("port", port) 

600 startup_nodes.append(ClusterNode(host, port)) 

601 elif host is not None and port is not None: 

602 startup_nodes.append(ClusterNode(host, port)) 

603 elif len(startup_nodes) == 0: 

604 # No startup node was provided 

605 raise RedisClusterException( 

606 "RedisCluster requires at least one node to discover the " 

607 "cluster. Please provide one of the followings:\n" 

608 "1. host and port, for example:\n" 

609 " RedisCluster(host='localhost', port=6379)\n" 

610 "2. list of startup nodes, for example:\n" 

611 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," 

612 " ClusterNode('localhost', 6378)])" 

613 ) 

614 # Update the connection arguments 

615 # Whenever a new connection is established, RedisCluster's on_connect 

616 # method should be run 

617 # If the user passed on_connect function we'll save it and run it 

618 # inside the RedisCluster.on_connect() function 

619 self.user_on_connect_func = kwargs.pop("redis_connect_func", None) 

620 kwargs.update({"redis_connect_func": self.on_connect}) 

621 kwargs = cleanup_kwargs(**kwargs) 

622 if retry: 

623 self.retry = retry 

624 kwargs.update({"retry": self.retry}) 

625 else: 

626 kwargs.update({"retry": Retry(default_backoff(), 0)}) 

627 

628 self.encoder = Encoder( 

629 kwargs.get("encoding", "utf-8"), 

630 kwargs.get("encoding_errors", "strict"), 

631 kwargs.get("decode_responses", False), 

632 ) 

633 self.cluster_error_retry_attempts = cluster_error_retry_attempts 

634 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

635 self.node_flags = self.__class__.NODE_FLAGS.copy() 

636 self.read_from_replicas = read_from_replicas 

637 self.reinitialize_counter = 0 

638 self.reinitialize_steps = reinitialize_steps 

639 self.nodes_manager = NodesManager( 

640 startup_nodes=startup_nodes, 

641 from_url=from_url, 

642 require_full_coverage=require_full_coverage, 

643 dynamic_startup_nodes=dynamic_startup_nodes, 

644 address_remap=address_remap, 

645 **kwargs, 

646 ) 

647 

648 self.cluster_response_callbacks = CaseInsensitiveDict( 

649 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS 

650 ) 

651 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS) 

652 self.commands_parser = CommandsParser(self) 

653 self._lock = threading.Lock() 

654 

655 def __enter__(self): 

656 return self 

657 

658 def __exit__(self, exc_type, exc_value, traceback): 

659 self.close() 

660 

661 def __del__(self): 

662 self.close() 

663 

664 def disconnect_connection_pools(self): 

665 for node in self.get_nodes(): 

666 if node.redis_connection: 

667 try: 

668 node.redis_connection.connection_pool.disconnect() 

669 except OSError: 

670 # Client was already disconnected. do nothing 

671 pass 

672 

673 def on_connect(self, connection): 

674 """ 

675 Initialize the connection, authenticate and select a database and send 

676 READONLY if it is set during object initialization. 

677 """ 

678 connection.set_parser(ClusterParser) 

679 connection.on_connect() 

680 

681 if self.read_from_replicas: 

682 # Sending READONLY command to server to configure connection as 

683 # readonly. Since each cluster node may change its server type due 

684 # to a failover, we should establish a READONLY connection 

685 # regardless of the server type. If this is a primary connection, 

686 # READONLY would not affect executing write commands. 

687 connection.send_command("READONLY") 

688 if str_if_bytes(connection.read_response()) != "OK": 

689 raise ConnectionError("READONLY command failed") 

690 

691 if self.user_on_connect_func is not None: 

692 self.user_on_connect_func(connection) 

693 

694 def get_redis_connection(self, node): 

695 if not node.redis_connection: 

696 with self._lock: 

697 if not node.redis_connection: 

698 self.nodes_manager.create_redis_connections([node]) 

699 return node.redis_connection 

700 

701 def get_node(self, host=None, port=None, node_name=None): 

702 return self.nodes_manager.get_node(host, port, node_name) 

703 

704 def get_primaries(self): 

705 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

706 

707 def get_replicas(self): 

708 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

709 

710 def get_random_node(self): 

711 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

712 

713 def get_nodes(self): 

714 return list(self.nodes_manager.nodes_cache.values()) 

715 

716 def get_node_from_key(self, key, replica=False): 

717 """ 

718 Get the node that holds the key's slot. 

719 If replica set to True but the slot doesn't have any replicas, None is 

720 returned. 

721 """ 

722 slot = self.keyslot(key) 

723 slot_cache = self.nodes_manager.slots_cache.get(slot) 

724 if slot_cache is None or len(slot_cache) == 0: 

725 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') 

726 if replica and len(self.nodes_manager.slots_cache[slot]) < 2: 

727 return None 

728 elif replica: 

729 node_idx = 1 

730 else: 

731 # primary 

732 node_idx = 0 

733 

734 return slot_cache[node_idx] 

735 

736 def get_default_node(self): 

737 """ 

738 Get the cluster's default node 

739 """ 

740 return self.nodes_manager.default_node 

741 

742 def set_default_node(self, node): 

743 """ 

744 Set the default node of the cluster. 

745 :param node: 'ClusterNode' 

746 :return True if the default node was set, else False 

747 """ 

748 if node is None or self.get_node(node_name=node.name) is None: 

749 return False 

750 self.nodes_manager.default_node = node 

751 return True 

752 

753 def get_retry(self) -> Optional["Retry"]: 

754 return self.retry 

755 

756 def set_retry(self, retry: "Retry") -> None: 

757 self.retry = retry 

758 for node in self.get_nodes(): 

759 node.redis_connection.set_retry(retry) 

760 

761 def monitor(self, target_node=None): 

762 """ 

763 Returns a Monitor object for the specified target node. 

764 The default cluster node will be selected if no target node was 

765 specified. 

766 Monitor is useful for handling the MONITOR command to the redis server. 

767 next_command() method returns one command from monitor 

768 listen() method yields commands from monitor. 

769 """ 

770 if target_node is None: 

771 target_node = self.get_default_node() 

772 if target_node.redis_connection is None: 

773 raise RedisClusterException( 

774 f"Cluster Node {target_node.name} has no redis_connection" 

775 ) 

776 return target_node.redis_connection.monitor() 

777 

778 def pubsub(self, node=None, host=None, port=None, **kwargs): 

779 """ 

780 Allows passing a ClusterNode, or host&port, to get a pubsub instance 

781 connected to the specified node 

782 """ 

783 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs) 

784 

785 def pipeline(self, transaction=None, shard_hint=None): 

786 """ 

787 Cluster impl: 

788 Pipelines do not work in cluster mode the same way they 

789 do in normal mode. Create a clone of this object so 

790 that simulating pipelines will work correctly. Each 

791 command will be called directly when used and 

792 when calling execute() will only return the result stack. 

793 """ 

794 if shard_hint: 

795 raise RedisClusterException("shard_hint is deprecated in cluster mode") 

796 

797 if transaction: 

798 raise RedisClusterException("transaction is deprecated in cluster mode") 

799 

800 return ClusterPipeline( 

801 nodes_manager=self.nodes_manager, 

802 commands_parser=self.commands_parser, 

803 startup_nodes=self.nodes_manager.startup_nodes, 

804 result_callbacks=self.result_callbacks, 

805 cluster_response_callbacks=self.cluster_response_callbacks, 

806 cluster_error_retry_attempts=self.cluster_error_retry_attempts, 

807 read_from_replicas=self.read_from_replicas, 

808 reinitialize_steps=self.reinitialize_steps, 

809 lock=self._lock, 

810 ) 

811 

812 def lock( 

813 self, 

814 name, 

815 timeout=None, 

816 sleep=0.1, 

817 blocking=True, 

818 blocking_timeout=None, 

819 lock_class=None, 

820 thread_local=True, 

821 ): 

822 """ 

823 Return a new Lock object using key ``name`` that mimics 

824 the behavior of threading.Lock. 

825 

826 If specified, ``timeout`` indicates a maximum life for the lock. 

827 By default, it will remain locked until release() is called. 

828 

829 ``sleep`` indicates the amount of time to sleep per loop iteration 

830 when the lock is in blocking mode and another client is currently 

831 holding the lock. 

832 

833 ``blocking`` indicates whether calling ``acquire`` should block until 

834 the lock has been acquired or to fail immediately, causing ``acquire`` 

835 to return False and the lock not being acquired. Defaults to True. 

836 Note this value can be overridden by passing a ``blocking`` 

837 argument to ``acquire``. 

838 

839 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

840 spend trying to acquire the lock. A value of ``None`` indicates 

841 continue trying forever. ``blocking_timeout`` can be specified as a 

842 float or integer, both representing the number of seconds to wait. 

843 

844 ``lock_class`` forces the specified lock implementation. Note that as 

845 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

846 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

847 you have created your own custom lock class. 

848 

849 ``thread_local`` indicates whether the lock token is placed in 

850 thread-local storage. By default, the token is placed in thread local 

851 storage so that a thread only sees its token, not a token set by 

852 another thread. Consider the following timeline: 

853 

854 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

855 thread-1 sets the token to "abc" 

856 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

857 Lock instance. 

858 time: 5, thread-1 has not yet completed. redis expires the lock 

859 key. 

860 time: 5, thread-2 acquired `my-lock` now that it's available. 

861 thread-2 sets the token to "xyz" 

862 time: 6, thread-1 finishes its work and calls release(). if the 

863 token is *not* stored in thread local storage, then 

864 thread-1 would see the token value as "xyz" and would be 

865 able to successfully release the thread-2's lock. 

866 

867 In some use cases it's necessary to disable thread local storage. For 

868 example, if you have code where one thread acquires a lock and passes 

869 that lock instance to a worker thread to release later. If thread 

870 local storage isn't disabled in this case, the worker thread won't see 

871 the token set by the thread that acquired the lock. Our assumption 

872 is that these cases aren't common and as such default to using 

873 thread local storage.""" 

874 if lock_class is None: 

875 lock_class = Lock 

876 return lock_class( 

877 self, 

878 name, 

879 timeout=timeout, 

880 sleep=sleep, 

881 blocking=blocking, 

882 blocking_timeout=blocking_timeout, 

883 thread_local=thread_local, 

884 ) 

885 

886 def set_response_callback(self, command, callback): 

887 """Set a custom Response Callback""" 

888 self.cluster_response_callbacks[command] = callback 

889 

890 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: 

891 # Determine which nodes should be executed the command on. 

892 # Returns a list of target nodes. 

893 command = args[0].upper() 

894 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: 

895 command = f"{args[0]} {args[1]}".upper() 

896 

897 nodes_flag = kwargs.pop("nodes_flag", None) 

898 if nodes_flag is not None: 

899 # nodes flag passed by the user 

900 command_flag = nodes_flag 

901 else: 

902 # get the nodes group for this command if it was predefined 

903 command_flag = self.command_flags.get(command) 

904 if command_flag == self.__class__.RANDOM: 

905 # return a random node 

906 return [self.get_random_node()] 

907 elif command_flag == self.__class__.PRIMARIES: 

908 # return all primaries 

909 return self.get_primaries() 

910 elif command_flag == self.__class__.REPLICAS: 

911 # return all replicas 

912 return self.get_replicas() 

913 elif command_flag == self.__class__.ALL_NODES: 

914 # return all nodes 

915 return self.get_nodes() 

916 elif command_flag == self.__class__.DEFAULT_NODE: 

917 # return the cluster's default node 

918 return [self.nodes_manager.default_node] 

919 elif command in self.__class__.SEARCH_COMMANDS[0]: 

920 return [self.nodes_manager.default_node] 

921 else: 

922 # get the node that holds the key's slot 

923 slot = self.determine_slot(*args) 

924 node = self.nodes_manager.get_node_from_slot( 

925 slot, self.read_from_replicas and command in READ_COMMANDS 

926 ) 

927 return [node] 

928 

929 def _should_reinitialized(self): 

930 # To reinitialize the cluster on every MOVED error, 

931 # set reinitialize_steps to 1. 

932 # To avoid reinitializing the cluster on moved errors, set 

933 # reinitialize_steps to 0. 

934 if self.reinitialize_steps == 0: 

935 return False 

936 else: 

937 return self.reinitialize_counter % self.reinitialize_steps == 0 

938 

939 def keyslot(self, key): 

940 """ 

941 Calculate keyslot for a given key. 

942 See Keys distribution model in https://redis.io/topics/cluster-spec 

943 """ 

944 k = self.encoder.encode(key) 

945 return key_slot(k) 

946 

947 def _get_command_keys(self, *args): 

948 """ 

949 Get the keys in the command. If the command has no keys in in, None is 

950 returned. 

951 

952 NOTE: Due to a bug in redis<7.0, this function does not work properly 

953 for EVAL or EVALSHA when the `numkeys` arg is 0. 

954 - issue: https://github.com/redis/redis/issues/9493 

955 - fix: https://github.com/redis/redis/pull/9733 

956 

957 So, don't use this function with EVAL or EVALSHA. 

958 """ 

959 redis_conn = self.get_default_node().redis_connection 

960 return self.commands_parser.get_keys(redis_conn, *args) 

961 

962 def determine_slot(self, *args): 

963 """ 

964 Figure out what slot to use based on args. 

965 

966 Raises a RedisClusterException if there's a missing key and we can't 

967 determine what slots to map the command to; or, if the keys don't 

968 all map to the same key slot. 

969 """ 

970 command = args[0] 

971 if self.command_flags.get(command) == SLOT_ID: 

972 # The command contains the slot ID 

973 return args[1] 

974 

975 # Get the keys in the command 

976 

977 # EVAL and EVALSHA are common enough that it's wasteful to go to the 

978 # redis server to parse the keys. Besides, there is a bug in redis<7.0 

979 # where `self._get_command_keys()` fails anyway. So, we special case 

980 # EVAL/EVALSHA. 

981 if command.upper() in ("EVAL", "EVALSHA"): 

982 # command syntax: EVAL "script body" num_keys ... 

983 if len(args) <= 2: 

984 raise RedisClusterException(f"Invalid args in command: {args}") 

985 num_actual_keys = int(args[2]) 

986 eval_keys = args[3 : 3 + num_actual_keys] 

987 # if there are 0 keys, that means the script can be run on any node 

988 # so we can just return a random slot 

989 if len(eval_keys) == 0: 

990 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

991 keys = eval_keys 

992 else: 

993 keys = self._get_command_keys(*args) 

994 if keys is None or len(keys) == 0: 

995 # FCALL can call a function with 0 keys, that means the function 

996 # can be run on any node so we can just return a random slot 

997 if command.upper() in ("FCALL", "FCALL_RO"): 

998 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

999 raise RedisClusterException( 

1000 "No way to dispatch this command to Redis Cluster. " 

1001 "Missing key.\nYou can execute the command by specifying " 

1002 f"target nodes.\nCommand: {args}" 

1003 ) 

1004 

1005 # single key command 

1006 if len(keys) == 1: 

1007 return self.keyslot(keys[0]) 

1008 

1009 # multi-key command; we need to make sure all keys are mapped to 

1010 # the same slot 

1011 slots = {self.keyslot(key) for key in keys} 

1012 if len(slots) != 1: 

1013 raise RedisClusterException( 

1014 f"{command} - all keys must map to the same key slot" 

1015 ) 

1016 

1017 return slots.pop() 

1018 

1019 def get_encoder(self): 

1020 """ 

1021 Get the connections' encoder 

1022 """ 

1023 return self.encoder 

1024 

1025 def get_connection_kwargs(self): 

1026 """ 

1027 Get the connections' key-word arguments 

1028 """ 

1029 return self.nodes_manager.connection_kwargs 

1030 

1031 def _is_nodes_flag(self, target_nodes): 

1032 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1033 

1034 def _parse_target_nodes(self, target_nodes): 

1035 if isinstance(target_nodes, list): 

1036 nodes = target_nodes 

1037 elif isinstance(target_nodes, ClusterNode): 

1038 # Supports passing a single ClusterNode as a variable 

1039 nodes = [target_nodes] 

1040 elif isinstance(target_nodes, dict): 

1041 # Supports dictionaries of the format {node_name: node}. 

1042 # It enables to execute commands with multi nodes as follows: 

1043 # rc.cluster_save_config(rc.get_primaries()) 

1044 nodes = target_nodes.values() 

1045 else: 

1046 raise TypeError( 

1047 "target_nodes type can be one of the following: " 

1048 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1049 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1050 f"The passed type is {type(target_nodes)}" 

1051 ) 

1052 return nodes 

1053 

1054 def execute_command(self, *args, **kwargs): 

1055 """ 

1056 Wrapper for ERRORS_ALLOW_RETRY error handling. 

1057 

1058 It will try the number of times specified by the config option 

1059 "self.cluster_error_retry_attempts" which defaults to 3 unless manually 

1060 configured. 

1061 

1062 If it reaches the number of times, the command will raise the exception 

1063 

1064 Key argument :target_nodes: can be passed with the following types: 

1065 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM 

1066 ClusterNode 

1067 list<ClusterNode> 

1068 dict<Any, ClusterNode> 

1069 """ 

1070 target_nodes_specified = False 

1071 is_default_node = False 

1072 target_nodes = None 

1073 passed_targets = kwargs.pop("target_nodes", None) 

1074 if passed_targets is not None and not self._is_nodes_flag(passed_targets): 

1075 target_nodes = self._parse_target_nodes(passed_targets) 

1076 target_nodes_specified = True 

1077 # If an error that allows retrying was thrown, the nodes and slots 

1078 # cache were reinitialized. We will retry executing the command with 

1079 # the updated cluster setup only when the target nodes can be 

1080 # determined again with the new cache tables. Therefore, when target 

1081 # nodes were passed to this function, we cannot retry the command 

1082 # execution since the nodes may not be valid anymore after the tables 

1083 # were reinitialized. So in case of passed target nodes, 

1084 # retry_attempts will be set to 0. 

1085 retry_attempts = ( 

1086 0 if target_nodes_specified else self.cluster_error_retry_attempts 

1087 ) 

1088 # Add one for the first execution 

1089 execute_attempts = 1 + retry_attempts 

1090 for _ in range(execute_attempts): 

1091 try: 

1092 res = {} 

1093 if not target_nodes_specified: 

1094 # Determine the nodes to execute the command on 

1095 target_nodes = self._determine_nodes( 

1096 *args, **kwargs, nodes_flag=passed_targets 

1097 ) 

1098 if not target_nodes: 

1099 raise RedisClusterException( 

1100 f"No targets were found to execute {args} command on" 

1101 ) 

1102 if ( 

1103 len(target_nodes) == 1 

1104 and target_nodes[0] == self.get_default_node() 

1105 ): 

1106 is_default_node = True 

1107 for node in target_nodes: 

1108 res[node.name] = self._execute_command(node, *args, **kwargs) 

1109 # Return the processed result 

1110 return self._process_result(args[0], res, **kwargs) 

1111 except Exception as e: 

1112 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: 

1113 if is_default_node: 

1114 # Replace the default cluster node 

1115 self.replace_default_node() 

1116 # The nodes and slots cache were reinitialized. 

1117 # Try again with the new cluster setup. 

1118 retry_attempts -= 1 

1119 continue 

1120 else: 

1121 # raise the exception 

1122 raise e 

1123 

1124 def _execute_command(self, target_node, *args, **kwargs): 

1125 """ 

1126 Send a command to a node in the cluster 

1127 """ 

1128 keys = kwargs.pop("keys", None) 

1129 command = args[0] 

1130 redis_node = None 

1131 connection = None 

1132 redirect_addr = None 

1133 asking = False 

1134 moved = False 

1135 ttl = int(self.RedisClusterRequestTTL) 

1136 

1137 while ttl > 0: 

1138 ttl -= 1 

1139 try: 

1140 if asking: 

1141 target_node = self.get_node(node_name=redirect_addr) 

1142 elif moved: 

1143 # MOVED occurred and the slots cache was updated, 

1144 # refresh the target node 

1145 slot = self.determine_slot(*args) 

1146 target_node = self.nodes_manager.get_node_from_slot( 

1147 slot, self.read_from_replicas and command in READ_COMMANDS 

1148 ) 

1149 moved = False 

1150 

1151 redis_node = self.get_redis_connection(target_node) 

1152 connection = get_connection(redis_node, *args, **kwargs) 

1153 if asking: 

1154 connection.send_command("ASKING") 

1155 redis_node.parse_response(connection, "ASKING", **kwargs) 

1156 asking = False 

1157 response_from_cache = connection._get_from_local_cache(args) 

1158 if response_from_cache is not None: 

1159 return response_from_cache 

1160 else: 

1161 connection.send_command(*args) 

1162 response = redis_node.parse_response(connection, command, **kwargs) 

1163 if command in self.cluster_response_callbacks: 

1164 response = self.cluster_response_callbacks[command]( 

1165 response, **kwargs 

1166 ) 

1167 connection._add_to_local_cache(args, response, keys) 

1168 return response 

1169 except AuthenticationError: 

1170 raise 

1171 except (ConnectionError, TimeoutError) as e: 

1172 # Connection retries are being handled in the node's 

1173 # Retry object. 

1174 # ConnectionError can also be raised if we couldn't get a 

1175 # connection from the pool before timing out, so check that 

1176 # this is an actual connection before attempting to disconnect. 

1177 if connection is not None: 

1178 connection.disconnect() 

1179 

1180 # Remove the failed node from the startup nodes before we try 

1181 # to reinitialize the cluster 

1182 self.nodes_manager.startup_nodes.pop(target_node.name, None) 

1183 # Reset the cluster node's connection 

1184 target_node.redis_connection = None 

1185 self.nodes_manager.initialize() 

1186 raise e 

1187 except MovedError as e: 

1188 # First, we will try to patch the slots/nodes cache with the 

1189 # redirected node output and try again. If MovedError exceeds 

1190 # 'reinitialize_steps' number of times, we will force 

1191 # reinitializing the tables, and then try again. 

1192 # 'reinitialize_steps' counter will increase faster when 

1193 # the same client object is shared between multiple threads. To 

1194 # reduce the frequency you can set this variable in the 

1195 # RedisCluster constructor. 

1196 self.reinitialize_counter += 1 

1197 if self._should_reinitialized(): 

1198 self.nodes_manager.initialize() 

1199 # Reset the counter 

1200 self.reinitialize_counter = 0 

1201 else: 

1202 self.nodes_manager.update_moved_exception(e) 

1203 moved = True 

1204 except TryAgainError: 

1205 if ttl < self.RedisClusterRequestTTL / 2: 

1206 time.sleep(0.05) 

1207 except AskError as e: 

1208 redirect_addr = get_node_name(host=e.host, port=e.port) 

1209 asking = True 

1210 except ClusterDownError as e: 

1211 # ClusterDownError can occur during a failover and to get 

1212 # self-healed, we will try to reinitialize the cluster layout 

1213 # and retry executing the command 

1214 time.sleep(0.25) 

1215 self.nodes_manager.initialize() 

1216 raise e 

1217 except ResponseError: 

1218 raise 

1219 except Exception as e: 

1220 if connection: 

1221 connection.disconnect() 

1222 raise e 

1223 finally: 

1224 if connection is not None: 

1225 redis_node.connection_pool.release(connection) 

1226 

1227 raise ClusterError("TTL exhausted.") 

1228 

1229 def close(self): 

1230 try: 

1231 with self._lock: 

1232 if self.nodes_manager: 

1233 self.nodes_manager.close() 

1234 except AttributeError: 

1235 # RedisCluster's __init__ can fail before nodes_manager is set 

1236 pass 

1237 

1238 def _process_result(self, command, res, **kwargs): 

1239 """ 

1240 Process the result of the executed command. 

1241 The function would return a dict or a single value. 

1242 

1243 :type command: str 

1244 :type res: dict 

1245 

1246 `res` should be in the following format: 

1247 Dict<node_name, command_result> 

1248 """ 

1249 if command in self.result_callbacks: 

1250 return self.result_callbacks[command](command, res, **kwargs) 

1251 elif len(res) == 1: 

1252 # When we execute the command on a single node, we can 

1253 # remove the dictionary and return a single response 

1254 return list(res.values())[0] 

1255 else: 

1256 return res 

1257 

1258 def load_external_module(self, funcname, func): 

1259 """ 

1260 This function can be used to add externally defined redis modules, 

1261 and their namespaces to the redis client. 

1262 

1263 ``funcname`` - A string containing the name of the function to create 

1264 ``func`` - The function, being added to this class. 

1265 """ 

1266 setattr(self, funcname, func) 

1267 

1268 

1269class ClusterNode: 

1270 def __init__(self, host, port, server_type=None, redis_connection=None): 

1271 if host == "localhost": 

1272 host = socket.gethostbyname(host) 

1273 

1274 self.host = host 

1275 self.port = port 

1276 self.name = get_node_name(host, port) 

1277 self.server_type = server_type 

1278 self.redis_connection = redis_connection 

1279 

1280 def __repr__(self): 

1281 return ( 

1282 f"[host={self.host}," 

1283 f"port={self.port}," 

1284 f"name={self.name}," 

1285 f"server_type={self.server_type}," 

1286 f"redis_connection={self.redis_connection}]" 

1287 ) 

1288 

1289 def __eq__(self, obj): 

1290 return isinstance(obj, ClusterNode) and obj.name == self.name 

1291 

1292 def __del__(self): 

1293 if self.redis_connection is not None: 

1294 self.redis_connection.close() 

1295 

1296 

1297class LoadBalancer: 

1298 """ 

1299 Round-Robin Load Balancing 

1300 """ 

1301 

1302 def __init__(self, start_index: int = 0) -> None: 

1303 self.primary_to_idx = {} 

1304 self.start_index = start_index 

1305 

1306 def get_server_index(self, primary: str, list_size: int) -> int: 

1307 server_index = self.primary_to_idx.setdefault(primary, self.start_index) 

1308 # Update the index 

1309 self.primary_to_idx[primary] = (server_index + 1) % list_size 

1310 return server_index 

1311 

1312 def reset(self) -> None: 

1313 self.primary_to_idx.clear() 

1314 

1315 

1316class NodesManager: 

1317 def __init__( 

1318 self, 

1319 startup_nodes, 

1320 from_url=False, 

1321 require_full_coverage=False, 

1322 lock=None, 

1323 dynamic_startup_nodes=True, 

1324 connection_pool_class=ConnectionPool, 

1325 address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, 

1326 **kwargs, 

1327 ): 

1328 self.nodes_cache = {} 

1329 self.slots_cache = {} 

1330 self.startup_nodes = {} 

1331 self.default_node = None 

1332 self.populate_startup_nodes(startup_nodes) 

1333 self.from_url = from_url 

1334 self._require_full_coverage = require_full_coverage 

1335 self._dynamic_startup_nodes = dynamic_startup_nodes 

1336 self.connection_pool_class = connection_pool_class 

1337 self.address_remap = address_remap 

1338 self._moved_exception = None 

1339 self.connection_kwargs = kwargs 

1340 self.read_load_balancer = LoadBalancer() 

1341 if lock is None: 

1342 lock = threading.Lock() 

1343 self._lock = lock 

1344 self.initialize() 

1345 

1346 def get_node(self, host=None, port=None, node_name=None): 

1347 """ 

1348 Get the requested node from the cluster's nodes. 

1349 nodes. 

1350 :return: ClusterNode if the node exists, else None 

1351 """ 

1352 if host and port: 

1353 # the user passed host and port 

1354 if host == "localhost": 

1355 host = socket.gethostbyname(host) 

1356 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1357 elif node_name: 

1358 return self.nodes_cache.get(node_name) 

1359 else: 

1360 return None 

1361 

1362 def update_moved_exception(self, exception): 

1363 self._moved_exception = exception 

1364 

1365 def _update_moved_slots(self): 

1366 """ 

1367 Update the slot's node with the redirected one 

1368 """ 

1369 e = self._moved_exception 

1370 redirected_node = self.get_node(host=e.host, port=e.port) 

1371 if redirected_node is not None: 

1372 # The node already exists 

1373 if redirected_node.server_type is not PRIMARY: 

1374 # Update the node's server type 

1375 redirected_node.server_type = PRIMARY 

1376 else: 

1377 # This is a new node, we will add it to the nodes cache 

1378 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

1379 self.nodes_cache[redirected_node.name] = redirected_node 

1380 if redirected_node in self.slots_cache[e.slot_id]: 

1381 # The MOVED error resulted from a failover, and the new slot owner 

1382 # had previously been a replica. 

1383 old_primary = self.slots_cache[e.slot_id][0] 

1384 # Update the old primary to be a replica and add it to the end of 

1385 # the slot's node list 

1386 old_primary.server_type = REPLICA 

1387 self.slots_cache[e.slot_id].append(old_primary) 

1388 # Remove the old replica, which is now a primary, from the slot's 

1389 # node list 

1390 self.slots_cache[e.slot_id].remove(redirected_node) 

1391 # Override the old primary with the new one 

1392 self.slots_cache[e.slot_id][0] = redirected_node 

1393 if self.default_node == old_primary: 

1394 # Update the default node with the new primary 

1395 self.default_node = redirected_node 

1396 else: 

1397 # The new slot owner is a new server, or a server from a different 

1398 # shard. We need to remove all current nodes from the slot's list 

1399 # (including replications) and add just the new node. 

1400 self.slots_cache[e.slot_id] = [redirected_node] 

1401 # Reset moved_exception 

1402 self._moved_exception = None 

1403 

1404 def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None): 

1405 """ 

1406 Gets a node that servers this hash slot 

1407 """ 

1408 if self._moved_exception: 

1409 with self._lock: 

1410 if self._moved_exception: 

1411 self._update_moved_slots() 

1412 

1413 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

1414 raise SlotNotCoveredError( 

1415 f'Slot "{slot}" not covered by the cluster. ' 

1416 f'"require_full_coverage={self._require_full_coverage}"' 

1417 ) 

1418 

1419 if read_from_replicas is True: 

1420 # get the server index in a Round-Robin manner 

1421 primary_name = self.slots_cache[slot][0].name 

1422 node_idx = self.read_load_balancer.get_server_index( 

1423 primary_name, len(self.slots_cache[slot]) 

1424 ) 

1425 elif ( 

1426 server_type is None 

1427 or server_type == PRIMARY 

1428 or len(self.slots_cache[slot]) == 1 

1429 ): 

1430 # return a primary 

1431 node_idx = 0 

1432 else: 

1433 # return a replica 

1434 # randomly choose one of the replicas 

1435 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

1436 

1437 return self.slots_cache[slot][node_idx] 

1438 

1439 def get_nodes_by_server_type(self, server_type): 

1440 """ 

1441 Get all nodes with the specified server type 

1442 :param server_type: 'primary' or 'replica' 

1443 :return: list of ClusterNode 

1444 """ 

1445 return [ 

1446 node 

1447 for node in self.nodes_cache.values() 

1448 if node.server_type == server_type 

1449 ] 

1450 

1451 def populate_startup_nodes(self, nodes): 

1452 """ 

1453 Populate all startup nodes and filters out any duplicates 

1454 """ 

1455 for n in nodes: 

1456 self.startup_nodes[n.name] = n 

1457 

1458 def check_slots_coverage(self, slots_cache): 

1459 # Validate if all slots are covered or if we should try next 

1460 # startup node 

1461 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

1462 if i not in slots_cache: 

1463 return False 

1464 return True 

1465 

1466 def create_redis_connections(self, nodes): 

1467 """ 

1468 This function will create a redis connection to all nodes in :nodes: 

1469 """ 

1470 for node in nodes: 

1471 if node.redis_connection is None: 

1472 node.redis_connection = self.create_redis_node( 

1473 host=node.host, port=node.port, **self.connection_kwargs 

1474 ) 

1475 

1476 def create_redis_node(self, host, port, **kwargs): 

1477 if self.from_url: 

1478 # Create a redis node with a costumed connection pool 

1479 kwargs.update({"host": host}) 

1480 kwargs.update({"port": port}) 

1481 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

1482 else: 

1483 r = Redis(host=host, port=port, **kwargs) 

1484 return r 

1485 

1486 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

1487 node_name = get_node_name(host, port) 

1488 # check if we already have this node in the tmp_nodes_cache 

1489 target_node = tmp_nodes_cache.get(node_name) 

1490 if target_node is None: 

1491 # before creating a new cluster node, check if the cluster node already 

1492 # exists in the current nodes cache and has a valid connection so we can 

1493 # reuse it 

1494 target_node = self.nodes_cache.get(node_name) 

1495 if target_node is None or target_node.redis_connection is None: 

1496 # create new cluster node for this cluster 

1497 target_node = ClusterNode(host, port, role) 

1498 if target_node.server_type != role: 

1499 target_node.server_type = role 

1500 

1501 return target_node 

1502 

1503 def initialize(self): 

1504 """ 

1505 Initializes the nodes cache, slots cache and redis connections. 

1506 :startup_nodes: 

1507 Responsible for discovering other nodes in the cluster 

1508 """ 

1509 self.reset() 

1510 tmp_nodes_cache = {} 

1511 tmp_slots = {} 

1512 disagreements = [] 

1513 startup_nodes_reachable = False 

1514 fully_covered = False 

1515 kwargs = self.connection_kwargs 

1516 exception = None 

1517 for startup_node in self.startup_nodes.values(): 

1518 try: 

1519 if startup_node.redis_connection: 

1520 r = startup_node.redis_connection 

1521 else: 

1522 # Create a new Redis connection 

1523 r = self.create_redis_node( 

1524 startup_node.host, startup_node.port, **kwargs 

1525 ) 

1526 self.startup_nodes[startup_node.name].redis_connection = r 

1527 # Make sure cluster mode is enabled on this node 

1528 try: 

1529 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

1530 except ResponseError: 

1531 raise RedisClusterException( 

1532 "Cluster mode is not enabled on this node" 

1533 ) 

1534 startup_nodes_reachable = True 

1535 except Exception as e: 

1536 # Try the next startup node. 

1537 # The exception is saved and raised only if we have no more nodes. 

1538 exception = e 

1539 continue 

1540 

1541 # CLUSTER SLOTS command results in the following output: 

1542 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

1543 # where each node contains the following list: [IP, port, node_id] 

1544 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

1545 # primary node of the first slot section. 

1546 # If there's only one server in the cluster, its ``host`` is '' 

1547 # Fix it to the host in startup_nodes 

1548 if ( 

1549 len(cluster_slots) == 1 

1550 and len(cluster_slots[0][2][0]) == 0 

1551 and len(self.startup_nodes) == 1 

1552 ): 

1553 cluster_slots[0][2][0] = startup_node.host 

1554 

1555 for slot in cluster_slots: 

1556 primary_node = slot[2] 

1557 host = str_if_bytes(primary_node[0]) 

1558 if host == "": 

1559 host = startup_node.host 

1560 port = int(primary_node[1]) 

1561 host, port = self.remap_host_port(host, port) 

1562 

1563 target_node = self._get_or_create_cluster_node( 

1564 host, port, PRIMARY, tmp_nodes_cache 

1565 ) 

1566 # add this node to the nodes cache 

1567 tmp_nodes_cache[target_node.name] = target_node 

1568 

1569 for i in range(int(slot[0]), int(slot[1]) + 1): 

1570 if i not in tmp_slots: 

1571 tmp_slots[i] = [] 

1572 tmp_slots[i].append(target_node) 

1573 replica_nodes = [slot[j] for j in range(3, len(slot))] 

1574 

1575 for replica_node in replica_nodes: 

1576 host = str_if_bytes(replica_node[0]) 

1577 port = replica_node[1] 

1578 host, port = self.remap_host_port(host, port) 

1579 

1580 target_replica_node = self._get_or_create_cluster_node( 

1581 host, port, REPLICA, tmp_nodes_cache 

1582 ) 

1583 tmp_slots[i].append(target_replica_node) 

1584 # add this node to the nodes cache 

1585 tmp_nodes_cache[target_replica_node.name] = ( 

1586 target_replica_node 

1587 ) 

1588 else: 

1589 # Validate that 2 nodes want to use the same slot cache 

1590 # setup 

1591 tmp_slot = tmp_slots[i][0] 

1592 if tmp_slot.name != target_node.name: 

1593 disagreements.append( 

1594 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

1595 ) 

1596 

1597 if len(disagreements) > 5: 

1598 raise RedisClusterException( 

1599 f"startup_nodes could not agree on a valid " 

1600 f'slots cache: {", ".join(disagreements)}' 

1601 ) 

1602 

1603 fully_covered = self.check_slots_coverage(tmp_slots) 

1604 if fully_covered: 

1605 # Don't need to continue to the next startup node if all 

1606 # slots are covered 

1607 break 

1608 

1609 if not startup_nodes_reachable: 

1610 raise RedisClusterException( 

1611 f"Redis Cluster cannot be connected. Please provide at least " 

1612 f"one reachable node: {str(exception)}" 

1613 ) from exception 

1614 

1615 # Create Redis connections to all nodes 

1616 self.create_redis_connections(list(tmp_nodes_cache.values())) 

1617 

1618 # Check if the slots are not fully covered 

1619 if not fully_covered and self._require_full_coverage: 

1620 # Despite the requirement that the slots be covered, there 

1621 # isn't a full coverage 

1622 raise RedisClusterException( 

1623 f"All slots are not covered after query all startup_nodes. " 

1624 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

1625 f"covered..." 

1626 ) 

1627 

1628 # Set the tmp variables to the real variables 

1629 self.nodes_cache = tmp_nodes_cache 

1630 self.slots_cache = tmp_slots 

1631 # Set the default node 

1632 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

1633 if self._dynamic_startup_nodes: 

1634 # Populate the startup nodes with all discovered nodes 

1635 self.startup_nodes = tmp_nodes_cache 

1636 # If initialize was called after a MovedError, clear it 

1637 self._moved_exception = None 

1638 

1639 def close(self): 

1640 self.default_node = None 

1641 for node in self.nodes_cache.values(): 

1642 if node.redis_connection: 

1643 node.redis_connection.close() 

1644 

1645 def reset(self): 

1646 try: 

1647 self.read_load_balancer.reset() 

1648 except TypeError: 

1649 # The read_load_balancer is None, do nothing 

1650 pass 

1651 

1652 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

1653 """ 

1654 Remap the host and port returned from the cluster to a different 

1655 internal value. Useful if the client is not connecting directly 

1656 to the cluster. 

1657 """ 

1658 if self.address_remap: 

1659 return self.address_remap((host, port)) 

1660 return host, port 

1661 

1662 

1663class ClusterPubSub(PubSub): 

1664 """ 

1665 Wrapper for PubSub class. 

1666 

1667 IMPORTANT: before using ClusterPubSub, read about the known limitations 

1668 with pubsub in Cluster mode and learn how to workaround them: 

1669 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html 

1670 """ 

1671 

1672 def __init__( 

1673 self, 

1674 redis_cluster, 

1675 node=None, 

1676 host=None, 

1677 port=None, 

1678 push_handler_func=None, 

1679 **kwargs, 

1680 ): 

1681 """ 

1682 When a pubsub instance is created without specifying a node, a single 

1683 node will be transparently chosen for the pubsub connection on the 

1684 first command execution. The node will be determined by: 

1685 1. Hashing the channel name in the request to find its keyslot 

1686 2. Selecting a node that handles the keyslot: If read_from_replicas is 

1687 set to true, a replica can be selected. 

1688 

1689 :type redis_cluster: RedisCluster 

1690 :type node: ClusterNode 

1691 :type host: str 

1692 :type port: int 

1693 """ 

1694 self.node = None 

1695 self.set_pubsub_node(redis_cluster, node, host, port) 

1696 connection_pool = ( 

1697 None 

1698 if self.node is None 

1699 else redis_cluster.get_redis_connection(self.node).connection_pool 

1700 ) 

1701 self.cluster = redis_cluster 

1702 self.node_pubsub_mapping = {} 

1703 self._pubsubs_generator = self._pubsubs_generator() 

1704 super().__init__( 

1705 connection_pool=connection_pool, 

1706 encoder=redis_cluster.encoder, 

1707 push_handler_func=push_handler_func, 

1708 **kwargs, 

1709 ) 

1710 

1711 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

1712 """ 

1713 The pubsub node will be set according to the passed node, host and port 

1714 When none of the node, host, or port are specified - the node is set 

1715 to None and will be determined by the keyslot of the channel in the 

1716 first command to be executed. 

1717 RedisClusterException will be thrown if the passed node does not exist 

1718 in the cluster. 

1719 If host is passed without port, or vice versa, a DataError will be 

1720 thrown. 

1721 :type cluster: RedisCluster 

1722 :type node: ClusterNode 

1723 :type host: str 

1724 :type port: int 

1725 """ 

1726 if node is not None: 

1727 # node is passed by the user 

1728 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

1729 pubsub_node = node 

1730 elif host is not None and port is not None: 

1731 # host and port passed by the user 

1732 node = cluster.get_node(host=host, port=port) 

1733 self._raise_on_invalid_node(cluster, node, host, port) 

1734 pubsub_node = node 

1735 elif any([host, port]) is True: 

1736 # only 'host' or 'port' passed 

1737 raise DataError("Passing a host requires passing a port, and vice versa") 

1738 else: 

1739 # nothing passed by the user. set node to None 

1740 pubsub_node = None 

1741 

1742 self.node = pubsub_node 

1743 

1744 def get_pubsub_node(self): 

1745 """ 

1746 Get the node that is being used as the pubsub connection 

1747 """ 

1748 return self.node 

1749 

1750 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

1751 """ 

1752 Raise a RedisClusterException if the node is None or doesn't exist in 

1753 the cluster. 

1754 """ 

1755 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

1756 raise RedisClusterException( 

1757 f"Node {host}:{port} doesn't exist in the cluster" 

1758 ) 

1759 

1760 def execute_command(self, *args): 

1761 """ 

1762 Execute a subscribe/unsubscribe command. 

1763 

1764 Taken code from redis-py and tweak to make it work within a cluster. 

1765 """ 

1766 # NOTE: don't parse the response in this function -- it could pull a 

1767 # legitimate message off the stack if the connection is already 

1768 # subscribed to one or more channels 

1769 

1770 if self.connection is None: 

1771 if self.connection_pool is None: 

1772 if len(args) > 1: 

1773 # Hash the first channel and get one of the nodes holding 

1774 # this slot 

1775 channel = args[1] 

1776 slot = self.cluster.keyslot(channel) 

1777 node = self.cluster.nodes_manager.get_node_from_slot( 

1778 slot, self.cluster.read_from_replicas 

1779 ) 

1780 else: 

1781 # Get a random node 

1782 node = self.cluster.get_random_node() 

1783 self.node = node 

1784 redis_connection = self.cluster.get_redis_connection(node) 

1785 self.connection_pool = redis_connection.connection_pool 

1786 self.connection = self.connection_pool.get_connection( 

1787 "pubsub", self.shard_hint 

1788 ) 

1789 # register a callback that re-subscribes to any channels we 

1790 # were listening to when we were disconnected 

1791 self.connection.register_connect_callback(self.on_connect) 

1792 if self.push_handler_func is not None and not HIREDIS_AVAILABLE: 

1793 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

1794 connection = self.connection 

1795 self._execute(connection, connection.send_command, *args) 

1796 

1797 def _get_node_pubsub(self, node): 

1798 try: 

1799 return self.node_pubsub_mapping[node.name] 

1800 except KeyError: 

1801 pubsub = node.redis_connection.pubsub( 

1802 push_handler_func=self.push_handler_func 

1803 ) 

1804 self.node_pubsub_mapping[node.name] = pubsub 

1805 return pubsub 

1806 

1807 def _sharded_message_generator(self): 

1808 for _ in range(len(self.node_pubsub_mapping)): 

1809 pubsub = next(self._pubsubs_generator) 

1810 message = pubsub.get_message() 

1811 if message is not None: 

1812 return message 

1813 return None 

1814 

1815 def _pubsubs_generator(self): 

1816 while True: 

1817 for pubsub in self.node_pubsub_mapping.values(): 

1818 yield pubsub 

1819 

1820 def get_sharded_message( 

1821 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None 

1822 ): 

1823 if target_node: 

1824 message = self.node_pubsub_mapping[target_node.name].get_message( 

1825 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout 

1826 ) 

1827 else: 

1828 message = self._sharded_message_generator() 

1829 if message is None: 

1830 return None 

1831 elif str_if_bytes(message["type"]) == "sunsubscribe": 

1832 if message["channel"] in self.pending_unsubscribe_shard_channels: 

1833 self.pending_unsubscribe_shard_channels.remove(message["channel"]) 

1834 self.shard_channels.pop(message["channel"], None) 

1835 node = self.cluster.get_node_from_key(message["channel"]) 

1836 if self.node_pubsub_mapping[node.name].subscribed is False: 

1837 self.node_pubsub_mapping.pop(node.name) 

1838 if not self.channels and not self.patterns and not self.shard_channels: 

1839 # There are no subscriptions anymore, set subscribed_event flag 

1840 # to false 

1841 self.subscribed_event.clear() 

1842 if self.ignore_subscribe_messages or ignore_subscribe_messages: 

1843 return None 

1844 return message 

1845 

1846 def ssubscribe(self, *args, **kwargs): 

1847 if args: 

1848 args = list_or_args(args[0], args[1:]) 

1849 s_channels = dict.fromkeys(args) 

1850 s_channels.update(kwargs) 

1851 for s_channel, handler in s_channels.items(): 

1852 node = self.cluster.get_node_from_key(s_channel) 

1853 pubsub = self._get_node_pubsub(node) 

1854 if handler: 

1855 pubsub.ssubscribe(**{s_channel: handler}) 

1856 else: 

1857 pubsub.ssubscribe(s_channel) 

1858 self.shard_channels.update(pubsub.shard_channels) 

1859 self.pending_unsubscribe_shard_channels.difference_update( 

1860 self._normalize_keys({s_channel: None}) 

1861 ) 

1862 if pubsub.subscribed and not self.subscribed: 

1863 self.subscribed_event.set() 

1864 self.health_check_response_counter = 0 

1865 

1866 def sunsubscribe(self, *args): 

1867 if args: 

1868 args = list_or_args(args[0], args[1:]) 

1869 else: 

1870 args = self.shard_channels 

1871 

1872 for s_channel in args: 

1873 node = self.cluster.get_node_from_key(s_channel) 

1874 p = self._get_node_pubsub(node) 

1875 p.sunsubscribe(s_channel) 

1876 self.pending_unsubscribe_shard_channels.update( 

1877 p.pending_unsubscribe_shard_channels 

1878 ) 

1879 

1880 def get_redis_connection(self): 

1881 """ 

1882 Get the Redis connection of the pubsub connected node. 

1883 """ 

1884 if self.node is not None: 

1885 return self.node.redis_connection 

1886 

1887 def disconnect(self): 

1888 """ 

1889 Disconnect the pubsub connection. 

1890 """ 

1891 if self.connection: 

1892 self.connection.disconnect() 

1893 for pubsub in self.node_pubsub_mapping.values(): 

1894 pubsub.connection.disconnect() 

1895 

1896 

1897class ClusterPipeline(RedisCluster): 

1898 """ 

1899 Support for Redis pipeline 

1900 in cluster mode 

1901 """ 

1902 

1903 ERRORS_ALLOW_RETRY = ( 

1904 ConnectionError, 

1905 TimeoutError, 

1906 MovedError, 

1907 AskError, 

1908 TryAgainError, 

1909 ) 

1910 

1911 def __init__( 

1912 self, 

1913 nodes_manager: "NodesManager", 

1914 commands_parser: "CommandsParser", 

1915 result_callbacks: Optional[Dict[str, Callable]] = None, 

1916 cluster_response_callbacks: Optional[Dict[str, Callable]] = None, 

1917 startup_nodes: Optional[List["ClusterNode"]] = None, 

1918 read_from_replicas: bool = False, 

1919 cluster_error_retry_attempts: int = 3, 

1920 reinitialize_steps: int = 5, 

1921 lock=None, 

1922 **kwargs, 

1923 ): 

1924 """ """ 

1925 self.command_stack = [] 

1926 self.nodes_manager = nodes_manager 

1927 self.commands_parser = commands_parser 

1928 self.refresh_table_asap = False 

1929 self.result_callbacks = ( 

1930 result_callbacks or self.__class__.RESULT_CALLBACKS.copy() 

1931 ) 

1932 self.startup_nodes = startup_nodes if startup_nodes else [] 

1933 self.read_from_replicas = read_from_replicas 

1934 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

1935 self.cluster_response_callbacks = cluster_response_callbacks 

1936 self.cluster_error_retry_attempts = cluster_error_retry_attempts 

1937 self.reinitialize_counter = 0 

1938 self.reinitialize_steps = reinitialize_steps 

1939 self.encoder = Encoder( 

1940 kwargs.get("encoding", "utf-8"), 

1941 kwargs.get("encoding_errors", "strict"), 

1942 kwargs.get("decode_responses", False), 

1943 ) 

1944 if lock is None: 

1945 lock = threading.Lock() 

1946 self._lock = lock 

1947 

1948 def __repr__(self): 

1949 """ """ 

1950 return f"{type(self).__name__}" 

1951 

1952 def __enter__(self): 

1953 """ """ 

1954 return self 

1955 

1956 def __exit__(self, exc_type, exc_value, traceback): 

1957 """ """ 

1958 self.reset() 

1959 

1960 def __del__(self): 

1961 try: 

1962 self.reset() 

1963 except Exception: 

1964 pass 

1965 

1966 def __len__(self): 

1967 """ """ 

1968 return len(self.command_stack) 

1969 

1970 def __bool__(self): 

1971 "Pipeline instances should always evaluate to True on Python 3+" 

1972 return True 

1973 

1974 def execute_command(self, *args, **kwargs): 

1975 """ 

1976 Wrapper function for pipeline_execute_command 

1977 """ 

1978 kwargs.pop("keys", None) # the keys are used only for client side caching 

1979 return self.pipeline_execute_command(*args, **kwargs) 

1980 

1981 def pipeline_execute_command(self, *args, **options): 

1982 """ 

1983 Appends the executed command to the pipeline's command stack 

1984 """ 

1985 self.command_stack.append( 

1986 PipelineCommand(args, options, len(self.command_stack)) 

1987 ) 

1988 return self 

1989 

1990 def raise_first_error(self, stack): 

1991 """ 

1992 Raise the first exception on the stack 

1993 """ 

1994 for c in stack: 

1995 r = c.result 

1996 if isinstance(r, Exception): 

1997 self.annotate_exception(r, c.position + 1, c.args) 

1998 raise r 

1999 

2000 def annotate_exception(self, exception, number, command): 

2001 """ 

2002 Provides extra context to the exception prior to it being handled 

2003 """ 

2004 cmd = " ".join(map(safe_str, command)) 

2005 msg = ( 

2006 f"Command # {number} ({cmd}) of pipeline " 

2007 f"caused error: {exception.args[0]}" 

2008 ) 

2009 exception.args = (msg,) + exception.args[1:] 

2010 

2011 def execute(self, raise_on_error=True): 

2012 """ 

2013 Execute all the commands in the current pipeline 

2014 """ 

2015 stack = self.command_stack 

2016 try: 

2017 return self.send_cluster_commands(stack, raise_on_error) 

2018 finally: 

2019 self.reset() 

2020 

2021 def reset(self): 

2022 """ 

2023 Reset back to empty pipeline. 

2024 """ 

2025 self.command_stack = [] 

2026 

2027 self.scripts = set() 

2028 

2029 # TODO: Implement 

2030 # make sure to reset the connection state in the event that we were 

2031 # watching something 

2032 # if self.watching and self.connection: 

2033 # try: 

2034 # # call this manually since our unwatch or 

2035 # # immediate_execute_command methods can call reset() 

2036 # self.connection.send_command('UNWATCH') 

2037 # self.connection.read_response() 

2038 # except ConnectionError: 

2039 # # disconnect will also remove any previous WATCHes 

2040 # self.connection.disconnect() 

2041 

2042 # clean up the other instance attributes 

2043 self.watching = False 

2044 self.explicit_transaction = False 

2045 

2046 # TODO: Implement 

2047 # we can safely return the connection to the pool here since we're 

2048 # sure we're no longer WATCHing anything 

2049 # if self.connection: 

2050 # self.connection_pool.release(self.connection) 

2051 # self.connection = None 

2052 

2053 def send_cluster_commands( 

2054 self, stack, raise_on_error=True, allow_redirections=True 

2055 ): 

2056 """ 

2057 Wrapper for CLUSTERDOWN error handling. 

2058 

2059 If the cluster reports it is down it is assumed that: 

2060 - connection_pool was disconnected 

2061 - connection_pool was reseted 

2062 - refereh_table_asap set to True 

2063 

2064 It will try the number of times specified by 

2065 the config option "self.cluster_error_retry_attempts" 

2066 which defaults to 3 unless manually configured. 

2067 

2068 If it reaches the number of times, the command will 

2069 raises ClusterDownException. 

2070 """ 

2071 if not stack: 

2072 return [] 

2073 retry_attempts = self.cluster_error_retry_attempts 

2074 while True: 

2075 try: 

2076 return self._send_cluster_commands( 

2077 stack, 

2078 raise_on_error=raise_on_error, 

2079 allow_redirections=allow_redirections, 

2080 ) 

2081 except (ClusterDownError, ConnectionError) as e: 

2082 if retry_attempts > 0: 

2083 # Try again with the new cluster setup. All other errors 

2084 # should be raised. 

2085 retry_attempts -= 1 

2086 pass 

2087 else: 

2088 raise e 

2089 

2090 def _send_cluster_commands( 

2091 self, stack, raise_on_error=True, allow_redirections=True 

2092 ): 

2093 """ 

2094 Send a bunch of cluster commands to the redis cluster. 

2095 

2096 `allow_redirections` If the pipeline should follow 

2097 `ASK` & `MOVED` responses automatically. If set 

2098 to false it will raise RedisClusterException. 

2099 """ 

2100 # the first time sending the commands we send all of 

2101 # the commands that were queued up. 

2102 # if we have to run through it again, we only retry 

2103 # the commands that failed. 

2104 attempt = sorted(stack, key=lambda x: x.position) 

2105 is_default_node = False 

2106 # build a list of node objects based on node names we need to 

2107 nodes = {} 

2108 

2109 # as we move through each command that still needs to be processed, 

2110 # we figure out the slot number that command maps to, then from 

2111 # the slot determine the node. 

2112 for c in attempt: 

2113 while True: 

2114 # refer to our internal node -> slot table that 

2115 # tells us where a given command should route to. 

2116 # (it might be possible we have a cached node that no longer 

2117 # exists in the cluster, which is why we do this in a loop) 

2118 passed_targets = c.options.pop("target_nodes", None) 

2119 if passed_targets and not self._is_nodes_flag(passed_targets): 

2120 target_nodes = self._parse_target_nodes(passed_targets) 

2121 else: 

2122 target_nodes = self._determine_nodes( 

2123 *c.args, node_flag=passed_targets 

2124 ) 

2125 if not target_nodes: 

2126 raise RedisClusterException( 

2127 f"No targets were found to execute {c.args} command on" 

2128 ) 

2129 if len(target_nodes) > 1: 

2130 raise RedisClusterException( 

2131 f"Too many targets for command {c.args}" 

2132 ) 

2133 

2134 node = target_nodes[0] 

2135 if node == self.get_default_node(): 

2136 is_default_node = True 

2137 

2138 # now that we know the name of the node 

2139 # ( it's just a string in the form of host:port ) 

2140 # we can build a list of commands for each node. 

2141 node_name = node.name 

2142 if node_name not in nodes: 

2143 redis_node = self.get_redis_connection(node) 

2144 try: 

2145 connection = get_connection(redis_node, c.args) 

2146 except ConnectionError: 

2147 for n in nodes.values(): 

2148 n.connection_pool.release(n.connection) 

2149 # Connection retries are being handled in the node's 

2150 # Retry object. Reinitialize the node -> slot table. 

2151 self.nodes_manager.initialize() 

2152 if is_default_node: 

2153 self.replace_default_node() 

2154 raise 

2155 nodes[node_name] = NodeCommands( 

2156 redis_node.parse_response, 

2157 redis_node.connection_pool, 

2158 connection, 

2159 ) 

2160 nodes[node_name].append(c) 

2161 break 

2162 

2163 # send the commands in sequence. 

2164 # we write to all the open sockets for each node first, 

2165 # before reading anything 

2166 # this allows us to flush all the requests out across the 

2167 # network essentially in parallel 

2168 # so that we can read them all in parallel as they come back. 

2169 # we dont' multiplex on the sockets as they come available, 

2170 # but that shouldn't make too much difference. 

2171 node_commands = nodes.values() 

2172 try: 

2173 node_commands = nodes.values() 

2174 for n in node_commands: 

2175 n.write() 

2176 

2177 for n in node_commands: 

2178 n.read() 

2179 finally: 

2180 # release all of the redis connections we allocated earlier 

2181 # back into the connection pool. 

2182 # we used to do this step as part of a try/finally block, 

2183 # but it is really dangerous to 

2184 # release connections back into the pool if for some 

2185 # reason the socket has data still left in it 

2186 # from a previous operation. The write and 

2187 # read operations already have try/catch around them for 

2188 # all known types of errors including connection 

2189 # and socket level errors. 

2190 # So if we hit an exception, something really bad 

2191 # happened and putting any oF 

2192 # these connections back into the pool is a very bad idea. 

2193 # the socket might have unread buffer still sitting in it, 

2194 # and then the next time we read from it we pass the 

2195 # buffered result back from a previous command and 

2196 # every single request after to that connection will always get 

2197 # a mismatched result. 

2198 for n in nodes.values(): 

2199 n.connection_pool.release(n.connection) 

2200 

2201 # if the response isn't an exception it is a 

2202 # valid response from the node 

2203 # we're all done with that command, YAY! 

2204 # if we have more commands to attempt, we've run into problems. 

2205 # collect all the commands we are allowed to retry. 

2206 # (MOVED, ASK, or connection errors or timeout errors) 

2207 attempt = sorted( 

2208 ( 

2209 c 

2210 for c in attempt 

2211 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

2212 ), 

2213 key=lambda x: x.position, 

2214 ) 

2215 if attempt and allow_redirections: 

2216 # RETRY MAGIC HAPPENS HERE! 

2217 # send these remaining commands one at a time using `execute_command` 

2218 # in the main client. This keeps our retry logic 

2219 # in one place mostly, 

2220 # and allows us to be more confident in correctness of behavior. 

2221 # at this point any speed gains from pipelining have been lost 

2222 # anyway, so we might as well make the best 

2223 # attempt to get the correct behavior. 

2224 # 

2225 # The client command will handle retries for each 

2226 # individual command sequentially as we pass each 

2227 # one into `execute_command`. Any exceptions 

2228 # that bubble out should only appear once all 

2229 # retries have been exhausted. 

2230 # 

2231 # If a lot of commands have failed, we'll be setting the 

2232 # flag to rebuild the slots table from scratch. 

2233 # So MOVED errors should correct themselves fairly quickly. 

2234 self.reinitialize_counter += 1 

2235 if self._should_reinitialized(): 

2236 self.nodes_manager.initialize() 

2237 if is_default_node: 

2238 self.replace_default_node() 

2239 for c in attempt: 

2240 try: 

2241 # send each command individually like we 

2242 # do in the main client. 

2243 c.result = super().execute_command(*c.args, **c.options) 

2244 except RedisError as e: 

2245 c.result = e 

2246 

2247 # turn the response back into a simple flat array that corresponds 

2248 # to the sequence of commands issued in the stack in pipeline.execute() 

2249 response = [] 

2250 for c in sorted(stack, key=lambda x: x.position): 

2251 if c.args[0] in self.cluster_response_callbacks: 

2252 c.result = self.cluster_response_callbacks[c.args[0]]( 

2253 c.result, **c.options 

2254 ) 

2255 response.append(c.result) 

2256 

2257 if raise_on_error: 

2258 self.raise_first_error(stack) 

2259 

2260 return response 

2261 

2262 def _fail_on_redirect(self, allow_redirections): 

2263 """ """ 

2264 if not allow_redirections: 

2265 raise RedisClusterException( 

2266 "ASK & MOVED redirection not allowed in this pipeline" 

2267 ) 

2268 

2269 def exists(self, *keys): 

2270 return self.execute_command("EXISTS", *keys) 

2271 

2272 def eval(self): 

2273 """ """ 

2274 raise RedisClusterException("method eval() is not implemented") 

2275 

2276 def multi(self): 

2277 """ """ 

2278 raise RedisClusterException("method multi() is not implemented") 

2279 

2280 def immediate_execute_command(self, *args, **options): 

2281 """ """ 

2282 raise RedisClusterException( 

2283 "method immediate_execute_command() is not implemented" 

2284 ) 

2285 

2286 def _execute_transaction(self, *args, **kwargs): 

2287 """ """ 

2288 raise RedisClusterException("method _execute_transaction() is not implemented") 

2289 

2290 def load_scripts(self): 

2291 """ """ 

2292 raise RedisClusterException("method load_scripts() is not implemented") 

2293 

2294 def watch(self, *names): 

2295 """ """ 

2296 raise RedisClusterException("method watch() is not implemented") 

2297 

2298 def unwatch(self): 

2299 """ """ 

2300 raise RedisClusterException("method unwatch() is not implemented") 

2301 

2302 def script_load_for_pipeline(self, *args, **kwargs): 

2303 """ """ 

2304 raise RedisClusterException( 

2305 "method script_load_for_pipeline() is not implemented" 

2306 ) 

2307 

2308 def delete(self, *names): 

2309 """ 

2310 "Delete a key specified by ``names``" 

2311 """ 

2312 if len(names) != 1: 

2313 raise RedisClusterException( 

2314 "deleting multiple keys is not implemented in pipeline command" 

2315 ) 

2316 

2317 return self.execute_command("DEL", names[0]) 

2318 

2319 def unlink(self, *names): 

2320 """ 

2321 "Unlink a key specified by ``names``" 

2322 """ 

2323 if len(names) != 1: 

2324 raise RedisClusterException( 

2325 "unlinking multiple keys is not implemented in pipeline command" 

2326 ) 

2327 

2328 return self.execute_command("UNLINK", names[0]) 

2329 

2330 

2331def block_pipeline_command(name: str) -> Callable[..., Any]: 

2332 """ 

2333 Prints error because some pipelined commands should 

2334 be blocked when running in cluster-mode 

2335 """ 

2336 

2337 def inner(*args, **kwargs): 

2338 raise RedisClusterException( 

2339 f"ERROR: Calling pipelined function {name} is blocked " 

2340 f"when running redis in cluster mode..." 

2341 ) 

2342 

2343 return inner 

2344 

2345 

2346# Blocked pipeline commands 

2347PIPELINE_BLOCKED_COMMANDS = ( 

2348 "BGREWRITEAOF", 

2349 "BGSAVE", 

2350 "BITOP", 

2351 "BRPOPLPUSH", 

2352 "CLIENT GETNAME", 

2353 "CLIENT KILL", 

2354 "CLIENT LIST", 

2355 "CLIENT SETNAME", 

2356 "CLIENT", 

2357 "CONFIG GET", 

2358 "CONFIG RESETSTAT", 

2359 "CONFIG REWRITE", 

2360 "CONFIG SET", 

2361 "CONFIG", 

2362 "DBSIZE", 

2363 "ECHO", 

2364 "EVALSHA", 

2365 "FLUSHALL", 

2366 "FLUSHDB", 

2367 "INFO", 

2368 "KEYS", 

2369 "LASTSAVE", 

2370 "MGET", 

2371 "MGET NONATOMIC", 

2372 "MOVE", 

2373 "MSET", 

2374 "MSET NONATOMIC", 

2375 "MSETNX", 

2376 "PFCOUNT", 

2377 "PFMERGE", 

2378 "PING", 

2379 "PUBLISH", 

2380 "RANDOMKEY", 

2381 "READONLY", 

2382 "READWRITE", 

2383 "RENAME", 

2384 "RENAMENX", 

2385 "RPOPLPUSH", 

2386 "SAVE", 

2387 "SCAN", 

2388 "SCRIPT EXISTS", 

2389 "SCRIPT FLUSH", 

2390 "SCRIPT KILL", 

2391 "SCRIPT LOAD", 

2392 "SCRIPT", 

2393 "SDIFF", 

2394 "SDIFFSTORE", 

2395 "SENTINEL GET MASTER ADDR BY NAME", 

2396 "SENTINEL MASTER", 

2397 "SENTINEL MASTERS", 

2398 "SENTINEL MONITOR", 

2399 "SENTINEL REMOVE", 

2400 "SENTINEL SENTINELS", 

2401 "SENTINEL SET", 

2402 "SENTINEL SLAVES", 

2403 "SENTINEL", 

2404 "SHUTDOWN", 

2405 "SINTER", 

2406 "SINTERSTORE", 

2407 "SLAVEOF", 

2408 "SLOWLOG GET", 

2409 "SLOWLOG LEN", 

2410 "SLOWLOG RESET", 

2411 "SLOWLOG", 

2412 "SMOVE", 

2413 "SORT", 

2414 "SUNION", 

2415 "SUNIONSTORE", 

2416 "TIME", 

2417) 

2418for command in PIPELINE_BLOCKED_COMMANDS: 

2419 command = command.replace(" ", "_").lower() 

2420 

2421 setattr(ClusterPipeline, command, block_pipeline_command(command)) 

2422 

2423 

2424class PipelineCommand: 

2425 """ """ 

2426 

2427 def __init__(self, args, options=None, position=None): 

2428 self.args = args 

2429 if options is None: 

2430 options = {} 

2431 self.options = options 

2432 self.position = position 

2433 self.result = None 

2434 self.node = None 

2435 self.asking = False 

2436 

2437 

2438class NodeCommands: 

2439 """ """ 

2440 

2441 def __init__(self, parse_response, connection_pool, connection): 

2442 """ """ 

2443 self.parse_response = parse_response 

2444 self.connection_pool = connection_pool 

2445 self.connection = connection 

2446 self.commands = [] 

2447 

2448 def append(self, c): 

2449 """ """ 

2450 self.commands.append(c) 

2451 

2452 def write(self): 

2453 """ 

2454 Code borrowed from Redis so it can be fixed 

2455 """ 

2456 connection = self.connection 

2457 commands = self.commands 

2458 

2459 # We are going to clobber the commands with the write, so go ahead 

2460 # and ensure that nothing is sitting there from a previous run. 

2461 for c in commands: 

2462 c.result = None 

2463 

2464 # build up all commands into a single request to increase network perf 

2465 # send all the commands and catch connection and timeout errors. 

2466 try: 

2467 connection.send_packed_command( 

2468 connection.pack_commands([c.args for c in commands]) 

2469 ) 

2470 except (ConnectionError, TimeoutError) as e: 

2471 for c in commands: 

2472 c.result = e 

2473 

2474 def read(self): 

2475 """ """ 

2476 connection = self.connection 

2477 for c in self.commands: 

2478 # if there is a result on this command, 

2479 # it means we ran into an exception 

2480 # like a connection error. Trying to parse 

2481 # a response on a connection that 

2482 # is no longer open will result in a 

2483 # connection error raised by redis-py. 

2484 # but redis-py doesn't check in parse_response 

2485 # that the sock object is 

2486 # still set and if you try to 

2487 # read from a closed connection, it will 

2488 # result in an AttributeError because 

2489 # it will do a readline() call on None. 

2490 # This can have all kinds of nasty side-effects. 

2491 # Treating this case as a connection error 

2492 # is fine because it will dump 

2493 # the connection object back into the 

2494 # pool and on the next write, it will 

2495 # explicitly open the connection and all will be well. 

2496 if c.result is None: 

2497 try: 

2498 c.result = self.parse_response(connection, c.args[0], **c.options) 

2499 except (ConnectionError, TimeoutError) as e: 

2500 for c in self.commands: 

2501 c.result = e 

2502 return 

2503 except RedisError: 

2504 c.result = sys.exc_info()[1]