Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1520 statements  

1import logging 

2import random 

3import socket 

4import sys 

5import threading 

6import time 

7from abc import ABC, abstractmethod 

8from collections import OrderedDict 

9from copy import copy 

10from enum import Enum 

11from itertools import chain 

12from typing import ( 

13 Any, 

14 Callable, 

15 Dict, 

16 List, 

17 Literal, 

18 Optional, 

19 Set, 

20 Tuple, 

21 Union, 

22) 

23 

24from redis._parsers import CommandsParser, Encoder 

25from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy 

26from redis._parsers.helpers import parse_scan 

27from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

28from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface 

29from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis 

30from redis.commands import READ_COMMANDS, RedisClusterCommands 

31from redis.commands.helpers import list_or_args 

32from redis.commands.policies import PolicyResolver, StaticPolicyResolver 

33from redis.connection import ( 

34 Connection, 

35 ConnectionPool, 

36 parse_url, 

37) 

38from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

39from redis.event import ( 

40 AfterPooledConnectionsInstantiationEvent, 

41 AfterPubSubConnectionInstantiationEvent, 

42 ClientType, 

43 EventDispatcher, 

44) 

45from redis.exceptions import ( 

46 AskError, 

47 AuthenticationError, 

48 ClusterDownError, 

49 ClusterError, 

50 ConnectionError, 

51 CrossSlotTransactionError, 

52 DataError, 

53 ExecAbortError, 

54 InvalidPipelineStack, 

55 MaxConnectionsError, 

56 MovedError, 

57 RedisClusterException, 

58 RedisError, 

59 ResponseError, 

60 SlotNotCoveredError, 

61 TimeoutError, 

62 TryAgainError, 

63 WatchError, 

64) 

65from redis.lock import Lock 

66from redis.maint_notifications import ( 

67 MaintNotificationsConfig, 

68 OSSMaintNotificationsHandler, 

69) 

70from redis.retry import Retry 

71from redis.utils import ( 

72 check_protocol_version, 

73 deprecated_args, 

74 deprecated_function, 

75 dict_merge, 

76 list_keys_to_dict, 

77 merge_result, 

78 safe_str, 

79 str_if_bytes, 

80 truncate_text, 

81) 

82 

# Module-level logger; DEBUG gating is exposed via is_debug_log_enabled().
logger = logging.getLogger(__name__)

84 

85 

def is_debug_log_enabled():
    """Return True when this module's logger will emit DEBUG records."""
    return logger.isEnabledFor(logging.DEBUG)

88 

89 

def get_node_name(host: str, port: Union[str, int]) -> str:
    """Build the canonical "host:port" identifier for a cluster node."""
    return "{}:{}".format(host, port)

92 

93 

@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    """Return the node's pinned connection, or lease one from its pool."""
    conn = redis_node.connection
    if conn:
        return conn
    return redis_node.connection_pool.get_connection()

101 

102 

def parse_scan_result(command, res, **options):
    """Merge per-node SCAN replies.

    ``res`` maps node name -> raw SCAN reply. Returns a tuple of
    (cursors-by-node dict, concatenated key list).
    """
    cursors = {}
    merged_keys = []
    for node_name, node_reply in res.items():
        node_cursor, node_keys = parse_scan(node_reply, **options)
        cursors[node_name] = node_cursor
        merged_keys.extend(node_keys)

    return cursors, merged_keys

112 

113 

def parse_pubsub_numsub(command, res, **options):
    """Aggregate PUBSUB NUMSUB replies from every node.

    ``res`` maps node name -> list of (channel, count) pairs; counts for
    the same channel are summed across nodes. Returns (channel, total)
    tuples in first-seen channel order.
    """
    totals = OrderedDict()
    for channel_counts in res.values():
        for channel, count in channel_counts:
            totals[channel] = totals.get(channel, 0) + count

    return list(totals.items())

125 

126 

def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """Parse a CLUSTER SLOTS reply.

    Returns a mapping of (start_slot, end_slot) -> {"primary": (host, port),
    "replicas": [(host, port), ...]}.
    """
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        # An empty host in the reply means "the node you asked"; substitute
        # the address we actually connected to.
        host = str_if_bytes(args[0]) or current_host
        return host, args[1]

    slots = {}
    for slot_entry in resp:
        start, end, primary = slot_entry[0], slot_entry[1], slot_entry[2]
        replica_entries = slot_entry[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*entry) for entry in replica_entries],
        }

    return slots

145 

146 

def parse_cluster_shards(resp, **options):
    """
    Parse a CLUSTER SHARDS response.

    RESP3 servers already return a list of dicts, which is passed through
    unchanged. RESP2 responses are flat key/value arrays and are converted
    into ``{"slots": [(start, end), ...], "nodes": [dict, ...]}``.

    :param resp: raw CLUSTER SHARDS reply (possibly empty).
    :return: list of shard dicts; an empty reply yields an empty list.
    """
    # Guard the empty reply; indexing resp[0] unconditionally raised
    # IndexError on an empty cluster response.
    if not resp:
        return resp
    if isinstance(resp[0], dict):
        # RESP3 reply is already parsed into dicts.
        return resp
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        # x[1] is a flat [start1, end1, start2, end2, ...] slot-range list.
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], (x[1][i + 1])))
        nodes = x[3]
        for node in nodes:
            # Each node is a flat [key, value, key, value, ...] array.
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards

167 

168 

def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID: decode the raw bytes reply to a str.
    """
    return str(resp, "utf-8")

174 

175 

# Server-role labels used throughout the cluster client.
PRIMARY = "primary"
REPLICA = "replica"
# Routing flag for commands addressed by an explicit slot-number argument.
SLOT_ID = "slot-id"

# Keyword arguments that may be forwarded to the per-node Redis clients.
# Anything not listed here is stripped by cleanup_kwargs().
REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_ca_path",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
    "maint_notifications_config",
)
# Keys never forwarded to node connections even though they are allowed
# above: the cluster client manages these itself.
KWARGS_DISABLED_KEYS = ("host", "port", "retry")

225 

226 

def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs before they are
    forwarded to per-node Redis clients.
    """
    forwardable = set(REDIS_ALLOWED_KEYS) - set(KWARGS_DISABLED_KEYS)
    return {name: value for name, value in kwargs.items() if name in forwardable}

238 

239 

class MaintNotificationsAbstractRedisCluster:
    """
    Abstract class for handling maintenance notifications logic.
    This class is expected to be used as base class together with RedisCluster.

    This class is intended to be used with multiple inheritance!

    All logic related to maintenance notifications is encapsulated in this class.
    """

    def __init__(
        self,
        maint_notifications_config: Optional[MaintNotificationsConfig],
        **kwargs,
    ):
        # Initialize maintenance notifications.
        # Maintenance (push) notifications require RESP3.
        is_protocol_supported = check_protocol_version(kwargs.get("protocol"), 3)

        if (
            maint_notifications_config
            and maint_notifications_config.enabled
            and not is_protocol_supported
        ):
            raise RedisError(
                "Maintenance notifications handlers on connection are only supported with RESP version 3"
            )
        # No explicit config on RESP3: default to an enabled configuration.
        if maint_notifications_config is None and is_protocol_supported:
            maint_notifications_config = MaintNotificationsConfig()

        self.maint_notifications_config = maint_notifications_config

        if self.maint_notifications_config and self.maint_notifications_config.enabled:
            self._oss_cluster_maint_notifications_handler = (
                OSSMaintNotificationsHandler(self, self.maint_notifications_config)
            )
            # Update connection kwargs for all future nodes connections
            self._update_connection_kwargs_for_maint_notifications(
                self._oss_cluster_maint_notifications_handler
            )
            # Update existing nodes connections - they are created as part of the RedisCluster constructor
            for node in self.get_nodes():
                if node.redis_connection is None:
                    continue
                node.redis_connection.connection_pool.update_maint_notifications_config(
                    self.maint_notifications_config,
                    oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
                )
        else:
            self._oss_cluster_maint_notifications_handler = None

    def _update_connection_kwargs_for_maint_notifications(
        self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
    ):
        """
        Update the connection kwargs for all future connections.
        """
        # NOTE: relies on self.nodes_manager and self.get_nodes() supplied by
        # the RedisCluster side of the multiple-inheritance hierarchy.
        self.nodes_manager.connection_kwargs.update(
            {
                "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
            }
        )

301 

302 

class AbstractRedisCluster:
    """
    Shared constants and routing tables for the cluster client.

    Declares node-flag routing for keyless commands, callbacks that parse
    or merge multi-node replies, and the set of errors that allow a retry.
    Mixed into RedisCluster via multiple inheritance.
    """

    # Maximum number of internal attempts (e.g. redirect follow-ups) per request.
    RedisClusterRequestTTL = 16

    # Node-flag values: which node set a command should be routed to.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # Per-command routing overrides for commands that carry no routable key.
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # RediSearch commands that need special routing handling.
    # NOTE(review): this is a one-element tuple wrapping the list; callers
    # appear to index SEARCH_COMMANDS[0] - kept as-is to preserve the shape.
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            # Fixed typo: was "FT,PROFILE" (comma), which could never match
            # the real RediSearch command name.
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    # Parsers applied to single-node replies of cluster admin commands.
    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Reducers that merge a {node_name: reply} dict into a single result.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    # Exceptions after which the cluster client may retry the request.
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: Optional["ClusterNode"] = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise fall back to a different replica, if one exists
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)

548 

549 

550class RedisCluster( 

551 AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands 

552): 

    @classmethod
    def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://``: creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.

        """
        # All URL parsing and validation is delegated to __init__ via the
        # ``url`` keyword argument.
        return cls(url=url, **kwargs)

596 

    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        # NOTE(review): default instance is created once at import time and
        # shared by every client constructed without an explicit resolver -
        # confirm StaticPolicyResolver is stateless.
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        **kwargs,
    ):
        """
        Initialize a new RedisCluster client.

        :param startup_nodes:
            List of nodes from which initial bootstrapping can be done
        :param host:
            Can be used to point to a startup node
        :param port:
            Can be used to point to a startup node
        :param require_full_coverage:
            When set to False (default value): the client will not require a
            full coverage of the slots. However, if not all slots are covered,
            and at least one node has 'cluster-require-full-coverage' set to
            'yes,' the server will throw a ClusterDownError for some key-based
            commands. See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True: all slots must be covered to construct the
            cluster client. If not all slots are covered, RedisClusterException
            will be thrown.
        :param read_from_replicas:
            @deprecated - please use load_balancing_strategy instead
            Enable read from replicas in READONLY mode. You can read possibly
            stale data.
            When set to true, read commands will be assigned between the
            primary and its replications in a Round-Robin manner.
        :param load_balancing_strategy:
            Enable read from replicas in READONLY mode and defines the load balancing
            strategy that will be used for cluster node selection.
            The data read from replicas is eventually consistent with the data in primary nodes.
        :param dynamic_startup_nodes:
            Set the RedisCluster's startup nodes to all of the discovered nodes.
            If true (default value), the cluster's discovered nodes will be used to
            determine the cluster nodes-slots mapping in the next topology refresh.
            It will remove the initial passed startup nodes if their endpoints aren't
            listed in the CLUSTER SLOTS output.
            If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
            specific IP addresses, it is best to set it to false.
        :param cluster_error_retry_attempts:
            @deprecated - Please configure the 'retry' object instead
            In case 'retry' object is set - this argument is ignored!

            Number of times to retry before raising an error when
            :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
            :class:`~.ClusterDownError` are encountered
        :param retry:
            A retry object that defines the retry strategy and the number of
            retries for the cluster client.
            In current implementation for the cluster client (starting form redis-py version 6.0.0)
            the retry object is not yet fully utilized, instead it is used just to determine
            the number of retries for the cluster client.
            In the future releases the retry object will be used to handle the cluster client retries!
        :param reinitialize_steps:
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.
        :param address_remap:
            An optional callable which, when provided with an internal network
            address of a node, e.g. a `(host, port)` tuple, will return the address
            where the node is reachable. This can be used to map the addresses at
            which the nodes _think_ they are, to addresses at which a client may
            reach them, such as when they sit behind a proxy.

        :param maint_notifications_config:
            Configures the nodes connections to support maintenance notifications - see
            `redis.maint_notifications.MaintNotificationsConfig` for details.
            Only supported with RESP3.
            If not provided and protocol is RESP3, the maintenance notifications
            will be enabled by default (logic is included in the NodesManager
            initialization).
        :**kwargs:
            Extra arguments that will be sent into Redis instance when created
            (See Official redis-py doc for supported kwargs - the only limitation
            is that you can't provide 'retry' object as part of kwargs.
            [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
            Some kwargs are not supported and will raise a
            RedisClusterException:
            - db (Redis do not support database SELECT in cluster mode)

        """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # Argument 'retry' is not possible to be used in kwargs when in cluster mode
            # the kwargs are set to the lower level connections to the cluster nodes
            # and there we provide retry configuration without retries allowed.
            # The retries should be handled on cluster client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node/s
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the followings:\n"
                "1. host and port, for example:\n"
                " RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run
        # If the user passed on_connect function we'll save it and run it
        # inside the RedisCluster.on_connect() function
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            # No retry supplied: build one from the (deprecated) attempt count.
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        # Client-side caching and maintenance notifications both require RESP3.
        if (cache_config or cache) and not check_protocol_version(protocol, 3):
            raise RedisError("Client caching is only supported with RESP version 3")

        if maint_notifications_config and not check_protocol_version(protocol, 3):
            raise RedisError(
                "Maintenance notifications are only supported with RESP version 3"
            )
        if check_protocol_version(protocol, 3) and maint_notifications_config is None:
            maint_notifications_config = MaintNotificationsConfig()

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes

        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            maint_notifications_config=maint_notifications_config,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        # Maps each request/response policy to the callable that selects the
        # target node(s) (or post-processes the response) for it.
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver
        self.commands_parser = CommandsParser(self)

        # Node where FT.AGGREGATE command is executed.
        self._aggregate_nodes = None
        self._lock = threading.RLock()

        # Must run after nodes_manager exists: the maintenance-notification
        # mixin updates its connection kwargs and existing node connections.
        MaintNotificationsAbstractRedisCluster.__init__(
            self, maint_notifications_config, **kwargs
        )

866 

867 def __enter__(self): 

868 return self 

869 

870 def __exit__(self, exc_type, exc_value, traceback): 

871 self.close() 

872 

873 def __del__(self): 

874 try: 

875 self.close() 

876 except Exception: 

877 pass 

878 

879 def disconnect_connection_pools(self): 

880 for node in self.get_nodes(): 

881 if node.redis_connection: 

882 try: 

883 node.redis_connection.connection_pool.disconnect() 

884 except OSError: 

885 # Client was already disconnected. do nothing 

886 pass 

887 

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate and select a database and send
        READONLY if it is set during object initialization.

        :param connection: the freshly established node connection.
        :raises ConnectionError: if the server rejects the READONLY command.
        """
        # Run the standard connection handshake (auth, protocol, etc.) first.
        connection.on_connect()

        if self.read_from_replicas or self.load_balancing_strategy:
            # Sending READONLY command to server to configure connection as
            # readonly. Since each cluster node may change its server type due
            # to a failover, we should establish a READONLY connection
            # regardless of the server type. If this is a primary connection,
            # READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        # User-supplied hook runs last, after the connection is fully set up.
        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

907 

    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        """Return the node's Redis client, creating it lazily on first use."""
        # Double-checked locking: cheap unlocked check first, then re-check
        # under the lock so concurrent callers don't create the client twice.
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

914 

915 def get_node(self, host=None, port=None, node_name=None): 

916 return self.nodes_manager.get_node(host, port, node_name) 

917 

918 def get_primaries(self): 

919 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

920 

921 def get_replicas(self): 

922 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

923 

924 def get_random_node(self): 

925 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

926 

927 def get_random_primary_or_all_nodes(self, command_name): 

928 """ 

929 Returns random primary or all nodes depends on READONLY mode. 

930 """ 

931 if self.read_from_replicas and command_name in READ_COMMANDS: 

932 return self.get_random_node() 

933 

934 return self.get_random_primary_node() 

935 

936 def get_nodes(self): 

937 return list(self.nodes_manager.nodes_cache.values()) 

938 

def get_node_from_key(self, key, replica=False):
    """Return the node serving *key*'s hash slot.

    With ``replica=True`` the first replica is returned instead of the
    primary; ``None`` is returned when the slot has no replicas.

    Raises:
        SlotNotCoveredError: if no node covers the key's slot.
    """
    slot = self.keyslot(key)
    holders = self.nodes_manager.slots_cache.get(slot)
    if not holders:
        raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
    if not replica:
        # Index 0 of a slot's holder list is the primary.
        return holders[0]
    if len(self.nodes_manager.slots_cache[slot]) < 2:
        # A primary without replicas cannot satisfy a replica request.
        return None
    return holders[1]

958 

def get_default_node(self):
    """
    Get the cluster's default node
    """
    manager = self.nodes_manager
    return manager.default_node

964 

def get_nodes_from_slot(self, command: str, *args):
    """Return (as a one-element list) the node holding the slot of
    *command*'s keys, honoring replica reads for read commands."""
    slot = self.determine_slot(*args)
    is_read = command in READ_COMMANDS
    node = self.nodes_manager.get_node_from_slot(
        slot,
        self.read_from_replicas and is_read,
        self.load_balancing_strategy if is_read else None,
    )
    return [node]

977 

978 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]: 

979 """ 

980 Splits the command with Multi-Shard policy, to the multiple commands 

981 """ 

982 keys = self._get_command_keys(*args) 

983 commands = [] 

984 

985 for key in keys: 

986 commands.append( 

987 { 

988 "args": (args[0], key), 

989 "kwargs": kwargs, 

990 } 

991 ) 

992 

993 return commands 

994 

def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
    """Return the nodes recorded by a previous FT.AGGREGATE call.

    FT.CURSOR commands must be routed to the same nodes the aggregation
    ran on, so this fails when no aggregation has happened yet.

    Raises:
        RedisClusterException: when no FT.AGGREGATE has been executed.
    """
    nodes = self._aggregate_nodes
    if not nodes:
        raise RedisClusterException(
            "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
        )
    return nodes

1005 

def get_random_primary_node(self) -> "ClusterNode":
    """Pick one primary node uniformly at random."""
    primaries = self.get_primaries()
    return random.choice(primaries)

1011 

1012 def _evaluate_all_succeeded(self, res): 

1013 """ 

1014 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED 

1015 """ 

1016 first_successful_response = None 

1017 

1018 if isinstance(res, dict): 

1019 for key, value in res.items(): 

1020 if value: 

1021 if first_successful_response is None: 

1022 first_successful_response = {key: value} 

1023 else: 

1024 return {key: False} 

1025 else: 

1026 for response in res: 

1027 if response: 

1028 if first_successful_response is None: 

1029 # Dynamically resolve type 

1030 first_successful_response = type(response)(response) 

1031 else: 

1032 return type(response)(False) 

1033 

1034 return first_successful_response 

1035 

def set_default_node(self, node):
    """
    Set the default node of the cluster.
    :param node: 'ClusterNode'
    :return True if the default node was set, else False
    """
    if node is None:
        return False
    if self.get_node(node_name=node.name) is None:
        # Refuse nodes that are not part of the current topology.
        return False
    self.nodes_manager.default_node = node
    return True

1046 

def set_retry(self, retry: Retry) -> None:
    """Replace the retry policy used for subsequent command execution."""
    self.retry = retry

1049 

def monitor(self, target_node=None):
    """Return a Monitor object for *target_node* (default node if omitted).

    Monitor drives the MONITOR command on that server:
    ``next_command()`` returns a single observed command and ``listen()``
    yields them continuously.

    Raises:
        RedisClusterException: if the node has no underlying connection.
    """
    node = target_node if target_node is not None else self.get_default_node()
    redis_client = node.redis_connection
    if redis_client is None:
        raise RedisClusterException(
            f"Cluster Node {node.name} has no redis_connection"
        )
    return redis_client.monitor()

1066 

def pubsub(self, node=None, host=None, port=None, **kwargs):
    """
    Allows passing a ClusterNode, or host&port, to get a pubsub instance
    connected to the specified node.

    Extra keyword arguments are forwarded to ClusterPubSub unchanged.
    """
    return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)

1073 

def pipeline(self, transaction=None, shard_hint=None):
    """
    Cluster impl:
    Pipelines do not work in cluster mode the same way they
    do in normal mode. Create a clone of this object so
    that simulating pipelines will work correctly. Each
    command will be called directly when used and
    when calling execute() will only return the result stack.

    ``transaction`` is forwarded to ClusterPipeline; ``shard_hint`` is
    rejected because sharding is decided per-key in cluster mode.
    """
    if shard_hint:
        raise RedisClusterException("shard_hint is deprecated in cluster mode")

    # The pipeline shares this client's topology manager, callbacks,
    # retry policy, and lock so both views stay consistent.
    return ClusterPipeline(
        nodes_manager=self.nodes_manager,
        commands_parser=self.commands_parser,
        startup_nodes=self.nodes_manager.startup_nodes,
        result_callbacks=self.result_callbacks,
        cluster_response_callbacks=self.cluster_response_callbacks,
        read_from_replicas=self.read_from_replicas,
        load_balancing_strategy=self.load_balancing_strategy,
        reinitialize_steps=self.reinitialize_steps,
        retry=self.retry,
        lock=self._lock,
        transaction=transaction,
    )

1099 

def lock(
    self,
    name,
    timeout=None,
    sleep=0.1,
    blocking=True,
    blocking_timeout=None,
    lock_class=None,
    thread_local=True,
    raise_on_release_error: bool = True,
):
    """Return a new Lock object using key ``name`` that mimics the
    behavior of threading.Lock.

    ``timeout``          -- maximum life of the lock in seconds; by default
                            it is held until release() is called.
    ``sleep``            -- seconds slept per loop iteration while blocking
                            and another client holds the lock.
    ``blocking``         -- whether acquire() waits for the lock (True,
                            default) or fails immediately; can be overridden
                            per acquire() call.
    ``blocking_timeout`` -- maximum seconds to spend trying to acquire;
                            ``None`` means keep trying forever.
    ``lock_class``       -- lock implementation to use; defaults to the
                            Lua-based ``Lock`` (the only one shipped since
                            redis-py 3.0).
    ``thread_local``     -- store the lock token in thread-local storage
                            (default) so a thread only sees its own token
                            and cannot accidentally release a lock that
                            expired and was re-acquired by another thread.
                            Disable only for hand-off patterns where one
                            thread acquires and another releases.
    ``raise_on_release_error`` -- raise (True, default) or merely log
                            (False) when the lock is no longer owned on
                            context-manager exit.
    """
    chosen_class = Lock if lock_class is None else lock_class
    return chosen_class(
        self,
        name,
        timeout=timeout,
        sleep=sleep,
        blocking=blocking,
        blocking_timeout=blocking_timeout,
        thread_local=thread_local,
        raise_on_release_error=raise_on_release_error,
    )

1180 

def set_response_callback(self, command, callback):
    """Register *callback* to post-process responses of *command*."""
    self.cluster_response_callbacks.update({command: callback})

1184 

def _determine_nodes(
    self, *args, request_policy: RequestPolicy, **kwargs
) -> List["ClusterNode"]:
    """
    Determines a nodes the command should be executed on.

    Resolution order: an explicit ``nodes_flag`` kwarg wins over the
    predefined per-command flag; either may override *request_policy*
    via the flag->policy mapping before dispatching to the matching
    policy callback.
    """
    # Container commands (e.g. "CLUSTER INFO") are keyed by the first
    # two args joined; fall back to the single command name otherwise.
    command = args[0].upper()
    if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
        command = f"{args[0]} {args[1]}".upper()

    nodes_flag = kwargs.pop("nodes_flag", None)
    if nodes_flag is not None:
        # nodes flag passed by the user
        command_flag = nodes_flag
    else:
        # get the nodes group for this command if it was predefined
        command_flag = self.command_flags.get(command)

    # A recognized flag overrides the request policy supplied by the caller.
    if command_flag in self._command_flags_mapping:
        request_policy = self._command_flags_mapping[command_flag]

    policy_callback = self._policies_callback_mapping[request_policy]

    # Each policy expects a different argument shape.
    if request_policy == RequestPolicy.DEFAULT_KEYED:
        nodes = policy_callback(command, *args)
    elif request_policy == RequestPolicy.MULTI_SHARD:
        nodes = policy_callback(*args, **kwargs)
    elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
        nodes = policy_callback(args[0])
    else:
        nodes = policy_callback()

    # Remember FT.AGGREGATE's targets so FT.CURSOR can reuse them
    # (see get_special_nodes).
    if args[0].lower() == "ft.aggregate":
        self._aggregate_nodes = nodes

    return nodes

1221 

1222 def _should_reinitialized(self): 

1223 # To reinitialize the cluster on every MOVED error, 

1224 # set reinitialize_steps to 1. 

1225 # To avoid reinitializing the cluster on moved errors, set 

1226 # reinitialize_steps to 0. 

1227 if self.reinitialize_steps == 0: 

1228 return False 

1229 else: 

1230 return self.reinitialize_counter % self.reinitialize_steps == 0 

1231 

def keyslot(self, key):
    """
    Calculate keyslot for a given key.
    See Keys distribution model in https://redis.io/topics/cluster-spec
    """
    encoded = self.encoder.encode(key)
    return key_slot(encoded)

1239 

1240 def _get_command_keys(self, *args): 

1241 """ 

1242 Get the keys in the command. If the command has no keys in in, None is 

1243 returned. 

1244 

1245 NOTE: Due to a bug in redis<7.0, this function does not work properly 

1246 for EVAL or EVALSHA when the `numkeys` arg is 0. 

1247 - issue: https://github.com/redis/redis/issues/9493 

1248 - fix: https://github.com/redis/redis/pull/9733 

1249 

1250 So, don't use this function with EVAL or EVALSHA. 

1251 """ 

1252 redis_conn = self.get_default_node().redis_connection 

1253 return self.commands_parser.get_keys(redis_conn, *args) 

1254 

def determine_slot(self, *args) -> Optional[int]:
    """
    Figure out what slot to use based on args.

    Returns ``None`` when the command is not tied to a slot (e.g.
    CLIENT TRACKING) and a random slot when a script/function call
    declares zero keys.

    Raises a RedisClusterException if there's a missing key and we can't
    determine what slots to map the command to; or, if the keys don't
    all map to the same key slot.
    """
    command = args[0]
    if self.command_flags.get(command) == SLOT_ID:
        # The command contains the slot ID
        return args[1]

    # Get the keys in the command

    # CLIENT TRACKING is a special case.
    # It doesn't have any keys, it needs to be sent to the provided nodes
    # By default it will be sent to all nodes.
    if command.upper() == "CLIENT TRACKING":
        return None

    # EVAL and EVALSHA are common enough that it's wasteful to go to the
    # redis server to parse the keys. Besides, there is a bug in redis<7.0
    # where `self._get_command_keys()` fails anyway. So, we special case
    # EVAL/EVALSHA.
    if command.upper() in ("EVAL", "EVALSHA"):
        # command syntax: EVAL "script body" num_keys ...
        if len(args) <= 2:
            raise RedisClusterException(f"Invalid args in command: {args}")
        num_actual_keys = int(args[2])
        eval_keys = args[3 : 3 + num_actual_keys]
        # if there are 0 keys, that means the script can be run on any node
        # so we can just return a random slot
        if len(eval_keys) == 0:
            return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
        keys = eval_keys
    else:
        keys = self._get_command_keys(*args)
        if keys is None or len(keys) == 0:
            # FCALL can call a function with 0 keys, that means the function
            # can be run on any node so we can just return a random slot
            if command.upper() in ("FCALL", "FCALL_RO"):
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            raise RedisClusterException(
                "No way to dispatch this command to Redis Cluster. "
                "Missing key.\nYou can execute the command by specifying "
                f"target nodes.\nCommand: {args}"
            )

    # single key command
    if len(keys) == 1:
        return self.keyslot(keys[0])

    # multi-key command; we need to make sure all keys are mapped to
    # the same slot
    slots = {self.keyslot(key) for key in keys}
    if len(slots) != 1:
        raise RedisClusterException(
            f"{command} - all keys must map to the same key slot"
        )

    return slots.pop()

1317 

def get_encoder(self):
    """
    Get the connections' encoder
    """
    encoder = self.encoder
    return encoder

1323 

def get_connection_kwargs(self):
    """
    Get the connections' key-word arguments
    """
    kwargs = self.nodes_manager.connection_kwargs
    return kwargs

1329 

1330 def _is_nodes_flag(self, target_nodes): 

1331 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1332 

1333 def _parse_target_nodes(self, target_nodes): 

1334 if isinstance(target_nodes, list): 

1335 nodes = target_nodes 

1336 elif isinstance(target_nodes, ClusterNode): 

1337 # Supports passing a single ClusterNode as a variable 

1338 nodes = [target_nodes] 

1339 elif isinstance(target_nodes, dict): 

1340 # Supports dictionaries of the format {node_name: node}. 

1341 # It enables to execute commands with multi nodes as follows: 

1342 # rc.cluster_save_config(rc.get_primaries()) 

1343 nodes = target_nodes.values() 

1344 else: 

1345 raise TypeError( 

1346 "target_nodes type can be one of the following: " 

1347 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1348 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1349 f"The passed type is {type(target_nodes)}" 

1350 ) 

1351 return nodes 

1352 

def execute_command(self, *args, **kwargs):
    """Execute a command; thin public wrapper over the retry-aware
    internal executor."""
    return self._internal_execute_command(*args, **kwargs)

1355 

def _internal_execute_command(self, *args, **kwargs):
    """
    Wrapper for ERRORS_ALLOW_RETRY error handling.

    It will try the number of times specified by the retries property from
    config option "self.retry" which defaults to 3 unless manually
    configured.

    If it reaches the number of times, the command will raise the exception

    Key argument :target_nodes: can be passed with the following types:
        nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
        ClusterNode
        list<ClusterNode>
        dict<Any, ClusterNode>
    """
    target_nodes_specified = False
    is_default_node = False
    target_nodes = None
    passed_targets = kwargs.pop("target_nodes", None)
    # Ask the resolver for predefined request/response policies first.
    command_policies = self._policy_resolver.resolve(args[0].lower())

    # Explicit node objects (not a nodes flag) pin the targets exactly.
    if passed_targets is not None and not self._is_nodes_flag(passed_targets):
        target_nodes = self._parse_target_nodes(passed_targets)
        target_nodes_specified = True

    if not command_policies and not target_nodes_specified:
        # No predefined policy: derive one from command flags or key slots.
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        # We only could resolve key properties if command is not
        # in a list of pre-defined request policies
        command_flag = self.command_flags.get(command)
        if not command_flag:
            # Fallback to default policy
            if not self.get_default_node():
                slot = None
            else:
                slot = self.determine_slot(*args)
            if slot is None:
                command_policies = CommandPolicies()
            else:
                command_policies = CommandPolicies(
                    request_policy=RequestPolicy.DEFAULT_KEYED,
                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                )
        else:
            if command_flag in self._command_flags_mapping:
                command_policies = CommandPolicies(
                    request_policy=self._command_flags_mapping[command_flag]
                )
            else:
                command_policies = CommandPolicies()
    elif not command_policies and target_nodes_specified:
        command_policies = CommandPolicies()

    # If an error that allows retrying was thrown, the nodes and slots
    # cache were reinitialized. We will retry executing the command with
    # the updated cluster setup only when the target nodes can be
    # determined again with the new cache tables. Therefore, when target
    # nodes were passed to this function, we cannot retry the command
    # execution since the nodes may not be valid anymore after the tables
    # were reinitialized. So in case of passed target nodes,
    # retry_attempts will be set to 0.
    retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
    # Add one for the first execution
    execute_attempts = 1 + retry_attempts
    for _ in range(execute_attempts):
        try:
            res = {}
            if not target_nodes_specified:
                # Determine the nodes to execute the command on
                target_nodes = self._determine_nodes(
                    *args,
                    request_policy=command_policies.request_policy,
                    nodes_flag=passed_targets,
                )

                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )
            if (
                len(target_nodes) == 1
                and target_nodes[0] == self.get_default_node()
            ):
                is_default_node = True
            for node in target_nodes:
                res[node.name] = self._execute_command(node, *args, **kwargs)

                # ONE_SUCCEEDED: the first successful node response is enough.
                if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                    break

            # Return the processed result
            return self._process_result(
                args[0],
                res,
                response_policy=command_policies.response_policy,
                **kwargs,
            )
        except Exception as e:
            if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                if is_default_node:
                    # Replace the default cluster node
                    self.replace_default_node()
                # The nodes and slots cache were reinitialized.
                # Try again with the new cluster setup.
                retry_attempts -= 1
                continue
            else:
                # raise the exception
                raise e

1469 

def _execute_command(self, target_node, *args, **kwargs):
    """
    Send a command to a node in the cluster.

    Handles cluster redirections (MOVED/ASK), transient errors
    (TRYAGAIN, CLUSTERDOWN) and connection failures, retrying up to
    ``RedisClusterRequestTTL`` times before giving up with ClusterError.
    """
    command = args[0]
    redis_node = None
    connection = None
    redirect_addr = None
    asking = False
    moved = False
    # Each redirection/retry consumes one unit of TTL.
    ttl = int(self.RedisClusterRequestTTL)

    while ttl > 0:
        ttl -= 1
        try:
            if asking:
                # Follow the ASK redirect to the importing node.
                target_node = self.get_node(node_name=redirect_addr)
            elif moved:
                # MOVED occurred and the slots cache was updated,
                # refresh the target node
                slot = self.determine_slot(*args)
                target_node = self.nodes_manager.get_node_from_slot(
                    slot,
                    self.read_from_replicas and command in READ_COMMANDS,
                    self.load_balancing_strategy
                    if command in READ_COMMANDS
                    else None,
                )
                moved = False

            redis_node = self.get_redis_connection(target_node)
            connection = get_connection(redis_node)
            if asking:
                # ASKING must precede the redirected command on this
                # connection, per the cluster spec.
                connection.send_command("ASKING")
                redis_node.parse_response(connection, "ASKING", **kwargs)
                asking = False
            connection.send_command(*args, **kwargs)
            response = redis_node.parse_response(connection, command, **kwargs)

            # Remove keys entry, it needs only for cache.
            kwargs.pop("keys", None)

            if command in self.cluster_response_callbacks:
                response = self.cluster_response_callbacks[command](
                    response, **kwargs
                )
            return response
        except AuthenticationError:
            raise
        except MaxConnectionsError:
            # MaxConnectionsError indicates client-side resource exhaustion
            # (too many connections in the pool), not a node failure.
            # Don't treat this as a node failure - just re-raise the error
            # without reinitializing the cluster.
            raise
        except (ConnectionError, TimeoutError) as e:
            # ConnectionError can also be raised if we couldn't get a
            # connection from the pool before timing out, so check that
            # this is an actual connection before attempting to disconnect.
            if connection is not None:
                connection.disconnect()

            # Instead of setting to None, properly handle the pool
            # Get the pool safely - redis_connection could be set to None
            # by another thread between the check and access
            redis_conn = target_node.redis_connection
            if redis_conn is not None:
                pool = redis_conn.connection_pool
                if pool is not None:
                    with pool._lock:
                        # take care for the active connections in the pool
                        pool.update_active_connections_for_reconnect()
                        # disconnect all free connections
                        pool.disconnect_free_connections()

            # Move the failed node to the end of the cached nodes list
            self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name)

            # DON'T set redis_connection = None - keep the pool for reuse
            self.nodes_manager.initialize()
            raise e
        except MovedError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"MOVED error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            # First, we will try to patch the slots/nodes cache with the
            # redirected node output and try again. If MovedError exceeds
            # 'reinitialize_steps' number of times, we will force
            # reinitializing the tables, and then try again.
            # 'reinitialize_steps' counter will increase faster when
            # the same client object is shared between multiple threads. To
            # reduce the frequency you can set this variable in the
            # RedisCluster constructor.
            self.reinitialize_counter += 1
            if self._should_reinitialized():
                # during this call all connections are closed or marked for disconnect,
                # so we don't need to disconnect the changed node's connections
                self.nodes_manager.initialize(
                    additional_startup_nodes_info=[(e.host, e.port)]
                )
                # Reset the counter
                self.reinitialize_counter = 0
            else:
                self.nodes_manager.move_slot(e)
            moved = True
        except TryAgainError:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}"
                )
            # Back off once half of the TTL budget has been consumed.
            if ttl < self.RedisClusterRequestTTL / 2:
                time.sleep(0.05)
        except AskError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"ASK error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            redirect_addr = get_node_name(host=e.host, port=e.port)
            asking = True
        except (ClusterDownError, SlotNotCoveredError):
            # ClusterDownError can occur during a failover and to get
            # self-healed, we will try to reinitialize the cluster layout
            # and retry executing the command

            # SlotNotCoveredError can occur when the cluster is not fully
            # initialized or can be temporary issue.
            # We will try to reinitialize the cluster topology
            # and retry executing the command

            time.sleep(0.25)
            self.nodes_manager.initialize()
            raise
        except ResponseError:
            raise
        except Exception as e:
            if connection:
                connection.disconnect()
            raise e
        finally:
            # Always return the connection to its pool, even on errors.
            if connection is not None:
                redis_node.connection_pool.release(connection)

    raise ClusterError("TTL exhausted.")

1623 

1624 def _extracts_socket_address( 

1625 self, connection: Optional[Connection] 

1626 ) -> Optional[int]: 

1627 if connection is None: 

1628 return None 

1629 try: 

1630 socket_address = ( 

1631 connection._sock.getsockname() if connection._sock else None 

1632 ) 

1633 socket_address = socket_address[1] if socket_address else None 

1634 except (AttributeError, OSError): 

1635 pass 

1636 return socket_address 

1637 

def close(self) -> None:
    """Shut down the nodes manager and all of its connections.

    Safe to call even when ``__init__`` failed part-way through.
    """
    try:
        with self._lock:
            manager = self.nodes_manager
            if manager:
                manager.close()
    except AttributeError:
        # RedisCluster's __init__ can fail before nodes_manager is set
        pass

1646 

def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
    """
    Process the result of the executed command.
    The function would return a dict or a single value.

    :type command: str
    :type res: dict

    `res` should be in the following format:
        Dict<node_name, command_result>

    A per-command result callback (if registered) runs first; otherwise a
    single-node result is unwrapped from its dict. The aggregated value is
    then folded through the callback registered for *response_policy*.
    """
    if command in self.result_callbacks:
        res = self.result_callbacks[command](command, res, **kwargs)
    elif len(res) == 1:
        # When we execute the command on a single node, we can
        # remove the dictionary and return a single response
        res = list(res.values())[0]

    return self._policies_callback_mapping[response_policy](res)

1666 

def load_external_module(self, funcname, func):
    """
    This function can be used to add externally defined redis modules,
    and their namespaces to the redis client.

    ``funcname`` - A string containing the name of the function to create
    ``func`` - The function, being added to this class.
    """
    # Attached on the instance, not the class, so it only affects this client.
    setattr(self, funcname, func)

1676 

def transaction(self, func, *watches, **kwargs):
    """Run *func* inside a WATCH-based transaction, retrying on conflict.

    *func* receives a Pipeline and is re-invoked whenever a watched key
    changes (WatchError). Keyword options:
        shard_hint          -- forwarded to pipeline()
        value_from_callable -- return func's result instead of execute()'s
        watch_delay         -- seconds to sleep between retries
    """
    shard_hint = kwargs.pop("shard_hint", None)
    value_from_callable = kwargs.pop("value_from_callable", False)
    watch_delay = kwargs.pop("watch_delay", None)
    with self.pipeline(True, shard_hint) as pipe:
        while True:
            try:
                if watches:
                    pipe.watch(*watches)
                result = func(pipe)
                exec_result = pipe.execute()
                if value_from_callable:
                    return result
                return exec_result
            except WatchError:
                # A watched key changed; optionally back off, then retry.
                if watch_delay is not None and watch_delay > 0:
                    time.sleep(watch_delay)
                continue

1698 

1699 

class ClusterNode:
    """A single member of the cluster topology: address, role, and the
    lazily-created Redis client bound to it."""

    def __init__(self, host, port, server_type=None, redis_connection=None):
        # Normalize "localhost" so node names match what the cluster reports.
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        fields = (
            f"host={self.host}",
            f"port={self.port}",
            f"name={self.name}",
            f"server_type={self.server_type}",
            f"redis_connection={self.redis_connection}",
        )
        return "[" + ",".join(fields) + "]"

    def __eq__(self, obj):
        # Identity of a node is its "host:port" name.
        if not isinstance(obj, ClusterNode):
            return False
        return obj.name == self.name

    def __hash__(self):
        return hash(self.name)

1725 

1726 

class LoadBalancingStrategy(Enum):
    # Rotate across primary + replicas for a slot group.
    ROUND_ROBIN = "round_robin"
    # Rotate across replicas only (never index 0, the primary).
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    # Pick a replica uniformly at random.
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        # Per-primary cursor: next index to hand out for that slot group.
        self.primary_to_idx: dict[str, int] = {}
        self.start_index: int = start_index
        self._lock: threading.Lock = threading.Lock()

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        """Pick the index of the server to use within *primary*'s
        [primary, replica, ...] list of length *list_size*."""
        if load_balancing_strategy is LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        replicas_only = (
            load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
        )
        return self._get_round_robin_index(primary, list_size, replicas_only)

    def reset(self) -> None:
        """Forget all per-primary round-robin positions."""
        with self._lock:
            self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        # Index 0 is the primary; replicas occupy 1..list_size-1.
        # NOTE(review): assumes list_size >= 2 - with no replicas this
        # raises ValueError; confirm callers guard against that.
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        with self._lock:
            index = self.primary_to_idx.setdefault(primary, self.start_index)
            if replicas_only and index == 0:
                # skip the primary node index
                index = 1
            # Advance the cursor for the next call, wrapping at list_size.
            self.primary_to_idx[primary] = (index + 1) % list_size
        return index

1776 

1777 

1778class NodesManager: 

def __init__(
    self,
    startup_nodes: list[ClusterNode],
    from_url=False,
    require_full_coverage=False,
    lock: Optional[threading.RLock] = None,
    dynamic_startup_nodes=True,
    connection_pool_class=ConnectionPool,
    address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
    cache: Optional[CacheInterface] = None,
    cache_config: Optional[CacheConfig] = None,
    cache_factory: Optional[CacheFactoryInterface] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
    maint_notifications_config: Optional[MaintNotificationsConfig] = None,
    **kwargs,
):
    """Build the cluster topology manager and run the initial scan.

    Extra keyword arguments are stored as ``connection_kwargs`` and
    forwarded when node connections are created.
    """
    # Topology caches: node name -> node, and slot -> [primary, replicas...].
    self.nodes_cache: dict[str, ClusterNode] = {}
    self.slots_cache: dict[int, list[ClusterNode]] = {}
    self.startup_nodes: dict[str, ClusterNode] = {n.name: n for n in startup_nodes}
    self.default_node: Optional[ClusterNode] = None
    self._epoch: int = 0
    self.from_url = from_url
    self._require_full_coverage = require_full_coverage
    self._dynamic_startup_nodes = dynamic_startup_nodes
    self.connection_pool_class = connection_pool_class
    self.address_remap = address_remap
    self._cache: Optional[CacheInterface] = None
    # Cache source precedence: explicit cache > factory > config.
    if cache:
        self._cache = cache
    elif cache_factory is not None:
        self._cache = cache_factory.get_cache()
    elif cache_config is not None:
        self._cache = CacheFactory(cache_config).get_cache()
    self.connection_kwargs = kwargs
    self.read_load_balancer = LoadBalancer()

    # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock
    if lock is None:
        self._lock = threading.RLock()
    else:
        self._lock = lock

    # initialize holds _initialization_lock to dedup multiple calls to reinitialize;
    # note that if we hold both _lock and _initialization_lock, we _must_ acquire
    # _initialization_lock first (ie: to have a consistent order) to avoid deadlock.
    self._initialization_lock: threading.RLock = threading.RLock()

    if event_dispatcher is None:
        self._event_dispatcher = EventDispatcher()
    else:
        self._event_dispatcher = event_dispatcher
    self._credential_provider = self.connection_kwargs.get(
        "credential_provider", None
    )
    self.maint_notifications_config = maint_notifications_config

    # Populate the nodes/slots caches by querying the cluster now.
    self.initialize()

1836 

1837 def get_node( 

1838 self, 

1839 host: Optional[str] = None, 

1840 port: Optional[int] = None, 

1841 node_name: Optional[str] = None, 

1842 ) -> Optional[ClusterNode]: 

1843 """ 

1844 Get the requested node from the cluster's nodes. 

1845 nodes. 

1846 :return: ClusterNode if the node exists, else None 

1847 """ 

1848 if host and port: 

1849 # the user passed host and port 

1850 if host == "localhost": 

1851 host = socket.gethostbyname(host) 

1852 with self._lock: 

1853 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1854 elif node_name: 

1855 with self._lock: 

1856 return self.nodes_cache.get(node_name) 

1857 else: 

1858 return None 

1859 

1860 def move_slot(self, e: Union[AskError, MovedError]): 

1861 """ 

1862 Update the slot's node with the redirected one 

1863 """ 

1864 with self._lock: 

1865 redirected_node = self.get_node(host=e.host, port=e.port) 

1866 if redirected_node is not None: 

1867 # The node already exists 

1868 if redirected_node.server_type is not PRIMARY: 

1869 # Update the node's server type 

1870 redirected_node.server_type = PRIMARY 

1871 else: 

1872 # This is a new node, we will add it to the nodes cache 

1873 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

1874 self.nodes_cache[redirected_node.name] = redirected_node 

1875 

1876 slot_nodes = self.slots_cache[e.slot_id] 

1877 if redirected_node not in slot_nodes: 

1878 # The new slot owner is a new server, or a server from a different 

1879 # shard. We need to remove all current nodes from the slot's list 

1880 # (including replications) and add just the new node. 

1881 self.slots_cache[e.slot_id] = [redirected_node] 

1882 elif redirected_node is not slot_nodes[0]: 

1883 # The MOVED error resulted from a failover, and the new slot owner 

1884 # had previously been a replica. 

1885 old_primary = slot_nodes[0] 

1886 # Update the old primary to be a replica and add it to the end of 

1887 # the slot's node list 

1888 old_primary.server_type = REPLICA 

1889 slot_nodes.append(old_primary) 

1890 # Remove the old replica, which is now a primary, from the slot's 

1891 # node list 

1892 slot_nodes.remove(redirected_node) 

1893 # Override the old primary with the new one 

1894 slot_nodes[0] = redirected_node 

1895 if self.default_node == old_primary: 

1896 # Update the default node with the new primary 

1897 self.default_node = redirected_node 

1898 # else: circular MOVED to current primary -> no-op 

1899 

1900 @deprecated_args( 

1901 args_to_warn=["server_type"], 

1902 reason=( 

1903 "In case you need select some load balancing strategy " 

1904 "that will use replicas, please set it through 'load_balancing_strategy'" 

1905 ), 

1906 version="5.3.0", 

1907 ) 

1908 def get_node_from_slot( 

1909 self, 

1910 slot: int, 

1911 read_from_replicas: bool = False, 

1912 load_balancing_strategy: Optional[LoadBalancingStrategy] = None, 

1913 server_type: Optional[Literal["primary", "replica"]] = None, 

1914 ) -> ClusterNode: 

1915 """ 

1916 Gets a node that servers this hash slot 

1917 """ 

1918 

1919 if read_from_replicas is True and load_balancing_strategy is None: 

1920 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN 

1921 

1922 with self._lock: 

1923 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

1924 raise SlotNotCoveredError( 

1925 f'Slot "{slot}" not covered by the cluster. ' 

1926 + f'"require_full_coverage={self._require_full_coverage}"' 

1927 ) 

1928 

1929 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy: 

1930 # get the server index using the strategy defined in load_balancing_strategy 

1931 primary_name = self.slots_cache[slot][0].name 

1932 node_idx = self.read_load_balancer.get_server_index( 

1933 primary_name, len(self.slots_cache[slot]), load_balancing_strategy 

1934 ) 

1935 elif ( 

1936 server_type is None 

1937 or server_type == PRIMARY 

1938 or len(self.slots_cache[slot]) == 1 

1939 ): 

1940 # return a primary 

1941 node_idx = 0 

1942 else: 

1943 # return a replica 

1944 # randomly choose one of the replicas 

1945 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

1946 

1947 return self.slots_cache[slot][node_idx] 

1948 

1949 def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]): 

1950 """ 

1951 Get all nodes with the specified server type 

1952 :param server_type: 'primary' or 'replica' 

1953 :return: list of ClusterNode 

1954 """ 

1955 with self._lock: 

1956 return [ 

1957 node 

1958 for node in self.nodes_cache.values() 

1959 if node.server_type == server_type 

1960 ] 

1961 

1962 @deprecated_function( 

1963 reason="This method is not used anymore internally. The startup nodes are populated automatically.", 

1964 version="7.0.2", 

1965 ) 

1966 def populate_startup_nodes(self, nodes): 

1967 """ 

1968 Populate all startup nodes and filters out any duplicates 

1969 """ 

1970 with self._lock: 

1971 for n in nodes: 

1972 self.startup_nodes[n.name] = n 

1973 

1974 def move_node_to_end_of_cached_nodes(self, node_name: str) -> None: 

1975 """ 

1976 Move a failing node to the end of startup_nodes and nodes_cache so it's 

1977 tried last during reinitialization and when selecting the default node. 

1978 If the node is not in the respective list, nothing is done. 

1979 """ 

1980 # Move in startup_nodes 

1981 if node_name in self.startup_nodes and len(self.startup_nodes) > 1: 

1982 node = self.startup_nodes.pop(node_name) 

1983 self.startup_nodes[node_name] = node # Re-insert at end 

1984 

1985 # Move in nodes_cache - this affects get_nodes_by_server_type ordering 

1986 # which is used to select the default_node during initialize() 

1987 if node_name in self.nodes_cache and len(self.nodes_cache) > 1: 

1988 node = self.nodes_cache.pop(node_name) 

1989 self.nodes_cache[node_name] = node # Re-insert at end 

1990 

1991 def check_slots_coverage(self, slots_cache): 

1992 # Validate if all slots are covered or if we should try next 

1993 # startup node 

1994 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

1995 if i not in slots_cache: 

1996 return False 

1997 return True 

1998 

1999 def create_redis_connections(self, nodes): 

2000 """ 

2001 This function will create a redis connection to all nodes in :nodes: 

2002 """ 

2003 connection_pools = [] 

2004 for node in nodes: 

2005 if node.redis_connection is None: 

2006 node.redis_connection = self.create_redis_node( 

2007 host=node.host, 

2008 port=node.port, 

2009 maint_notifications_config=self.maint_notifications_config, 

2010 **self.connection_kwargs, 

2011 ) 

2012 connection_pools.append(node.redis_connection.connection_pool) 

2013 

2014 self._event_dispatcher.dispatch( 

2015 AfterPooledConnectionsInstantiationEvent( 

2016 connection_pools, ClientType.SYNC, self._credential_provider 

2017 ) 

2018 ) 

2019 

2020 def create_redis_node( 

2021 self, 

2022 host, 

2023 port, 

2024 **kwargs, 

2025 ): 

2026 # We are configuring the connection pool not to retry 

2027 # connections on lower level clients to avoid retrying 

2028 # connections to nodes that are not reachable 

2029 # and to avoid blocking the connection pool. 

2030 # The only error that will have some handling in the lower 

2031 # level clients is ConnectionError which will trigger disconnection 

2032 # of the socket. 

2033 # The retries will be handled on cluster client level 

2034 # where we will have proper handling of the cluster topology 

2035 node_retry_config = Retry( 

2036 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) 

2037 ) 

2038 

2039 if self.from_url: 

2040 # Create a redis node with a custom connection pool 

2041 kwargs.update({"host": host}) 

2042 kwargs.update({"port": port}) 

2043 kwargs.update({"cache": self._cache}) 

2044 kwargs.update({"retry": node_retry_config}) 

2045 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

2046 else: 

2047 r = Redis( 

2048 host=host, 

2049 port=port, 

2050 cache=self._cache, 

2051 retry=node_retry_config, 

2052 **kwargs, 

2053 ) 

2054 return r 

2055 

2056 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

2057 node_name = get_node_name(host, port) 

2058 # check if we already have this node in the tmp_nodes_cache 

2059 target_node = tmp_nodes_cache.get(node_name) 

2060 if target_node is None: 

2061 # before creating a new cluster node, check if the cluster node already 

2062 # exists in the current nodes cache and has a valid connection so we can 

2063 # reuse it 

2064 redis_connection: Optional[Redis] = None 

2065 with self._lock: 

2066 previous_node = self.nodes_cache.get(node_name) 

2067 if previous_node: 

2068 redis_connection = previous_node.redis_connection 

2069 # don't update the old ClusterNode, so we don't update its role 

2070 # outside of the lock 

2071 target_node = ClusterNode(host, port, role, redis_connection) 

2072 # add this node to the nodes cache 

2073 tmp_nodes_cache[target_node.name] = target_node 

2074 

2075 return target_node 

2076 

2077 def _get_epoch(self) -> int: 

2078 """ 

2079 Get the current epoch value. This method exists primarily to allow 

2080 tests to mock the epoch fetch and control race condition timing. 

2081 """ 

2082 with self._lock: 

2083 return self._epoch 

2084 

2085 def initialize( 

2086 self, 

2087 additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None, 

2088 disconnect_startup_nodes_pools: bool = True, 

2089 ): 

2090 """ 

2091 Initializes the nodes cache, slots cache and redis connections. 

2092 :startup_nodes: 

2093 Responsible for discovering other nodes in the cluster 

2094 :disconnect_startup_nodes_pools: 

2095 Whether to disconnect the connection pool of the startup nodes 

2096 after the initialization is complete. This is useful when the 

2097 startup nodes are not part of the cluster and we want to avoid 

2098 keeping the connection open. 

2099 :additional_startup_nodes_info: 

2100 Additional nodes to add temporarily to the startup nodes. 

2101 The additional nodes will be used just in the process of extraction of the slots 

2102 and nodes information from the cluster. 

2103 This is useful when we want to add new nodes to the cluster 

2104 and initialize the client 

2105 with them. 

2106 The format of the list is a list of tuples, where each tuple contains 

2107 the host and port of the node. 

2108 """ 

2109 self.reset() 

2110 tmp_nodes_cache = {} 

2111 tmp_slots = {} 

2112 disagreements = [] 

2113 startup_nodes_reachable = False 

2114 fully_covered = False 

2115 kwargs = self.connection_kwargs 

2116 exception = None 

2117 epoch = self._get_epoch() 

2118 if additional_startup_nodes_info is None: 

2119 additional_startup_nodes_info = [] 

2120 

2121 with self._initialization_lock: 

2122 with self._lock: 

2123 if epoch != self._epoch: 

2124 # another thread has already re-initialized the nodes; don't 

2125 # bother running again 

2126 return 

2127 

2128 with self._lock: 

2129 startup_nodes = tuple(self.startup_nodes.values()) 

2130 

2131 additional_startup_nodes = [ 

2132 ClusterNode(host, port) for host, port in additional_startup_nodes_info 

2133 ] 

2134 if is_debug_log_enabled(): 

2135 logger.debug( 

2136 f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; " 

2137 f"and startup nodes: {[node.name for node in startup_nodes]}" 

2138 ) 

2139 

2140 for startup_node in (*startup_nodes, *additional_startup_nodes): 

2141 try: 

2142 if startup_node.redis_connection: 

2143 r = startup_node.redis_connection 

2144 

2145 else: 

2146 # Create a new Redis connection 

2147 if is_debug_log_enabled(): 

2148 socket_timeout = kwargs.get("socket_timeout", "not set") 

2149 socket_connect_timeout = kwargs.get( 

2150 "socket_connect_timeout", "not set" 

2151 ) 

2152 maint_enabled = ( 

2153 self.maint_notifications_config.enabled 

2154 if self.maint_notifications_config 

2155 else False 

2156 ) 

2157 logger.debug( 

2158 "Topology refresh: Creating new Redis connection to " 

2159 f"{startup_node.host}:{startup_node.port}; " 

2160 f"with socket_timeout: {socket_timeout}, and " 

2161 f"socket_connect_timeout: {socket_connect_timeout}, " 

2162 "and maint_notifications enabled: " 

2163 f"{maint_enabled}" 

2164 ) 

2165 r = self.create_redis_node( 

2166 startup_node.host, 

2167 startup_node.port, 

2168 maint_notifications_config=self.maint_notifications_config, 

2169 **kwargs, 

2170 ) 

2171 if startup_node in self.startup_nodes.values(): 

2172 self.startup_nodes[startup_node.name].redis_connection = r 

2173 else: 

2174 startup_node.redis_connection = r 

2175 try: 

2176 # Make sure cluster mode is enabled on this node 

2177 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

2178 if disconnect_startup_nodes_pools: 

2179 with r.connection_pool._lock: 

2180 # take care to clear connections before we move on 

2181 # mark all active connections for reconnect - they will be 

2182 # reconnected on next use, but will allow current in flight commands to complete first 

2183 r.connection_pool.update_active_connections_for_reconnect() 

2184 # Needed to clear READONLY state when it is no longer applicable 

2185 r.connection_pool.disconnect_free_connections() 

2186 except ResponseError: 

2187 raise RedisClusterException( 

2188 "Cluster mode is not enabled on this node" 

2189 ) 

2190 startup_nodes_reachable = True 

2191 except Exception as e: 

2192 # Try the next startup node. 

2193 # The exception is saved and raised only if we have no more nodes. 

2194 exception = e 

2195 continue 

2196 

2197 # CLUSTER SLOTS command results in the following output: 

2198 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

2199 # where each node contains the following list: [IP, port, node_id] 

2200 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

2201 # primary node of the first slot section. 

2202 # If there's only one server in the cluster, its ``host`` is '' 

2203 # Fix it to the host in startup_nodes 

2204 if ( 

2205 len(cluster_slots) == 1 

2206 and len(cluster_slots[0][2][0]) == 0 

2207 and len(self.startup_nodes) == 1 

2208 ): 

2209 cluster_slots[0][2][0] = startup_node.host 

2210 

2211 for slot in cluster_slots: 

2212 primary_node = slot[2] 

2213 host = str_if_bytes(primary_node[0]) 

2214 if host == "": 

2215 host = startup_node.host 

2216 port = int(primary_node[1]) 

2217 host, port = self.remap_host_port(host, port) 

2218 

2219 nodes_for_slot = [] 

2220 

2221 target_node = self._get_or_create_cluster_node( 

2222 host, port, PRIMARY, tmp_nodes_cache 

2223 ) 

2224 nodes_for_slot.append(target_node) 

2225 

2226 replica_nodes = slot[3:] 

2227 for replica_node in replica_nodes: 

2228 host = str_if_bytes(replica_node[0]) 

2229 port = int(replica_node[1]) 

2230 host, port = self.remap_host_port(host, port) 

2231 target_replica_node = self._get_or_create_cluster_node( 

2232 host, port, REPLICA, tmp_nodes_cache 

2233 ) 

2234 nodes_for_slot.append(target_replica_node) 

2235 

2236 for i in range(int(slot[0]), int(slot[1]) + 1): 

2237 if i not in tmp_slots: 

2238 tmp_slots[i] = nodes_for_slot 

2239 else: 

2240 # Validate that 2 nodes want to use the same slot cache 

2241 # setup 

2242 tmp_slot = tmp_slots[i][0] 

2243 if tmp_slot.name != target_node.name: 

2244 disagreements.append( 

2245 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

2246 ) 

2247 

2248 if len(disagreements) > 5: 

2249 raise RedisClusterException( 

2250 f"startup_nodes could not agree on a valid " 

2251 f"slots cache: {', '.join(disagreements)}" 

2252 ) 

2253 

2254 fully_covered = self.check_slots_coverage(tmp_slots) 

2255 if fully_covered: 

2256 # Don't need to continue to the next startup node if all 

2257 # slots are covered 

2258 break 

2259 

2260 if not startup_nodes_reachable: 

2261 raise RedisClusterException( 

2262 f"Redis Cluster cannot be connected. Please provide at least " 

2263 f"one reachable node: {str(exception)}" 

2264 ) from exception 

2265 

2266 # Create Redis connections to all nodes 

2267 self.create_redis_connections(list(tmp_nodes_cache.values())) 

2268 

2269 # Check if the slots are not fully covered 

2270 if not fully_covered and self._require_full_coverage: 

2271 # Despite the requirement that the slots be covered, there 

2272 # isn't a full coverage 

2273 raise RedisClusterException( 

2274 f"All slots are not covered after query all startup_nodes. " 

2275 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

2276 f"covered..." 

2277 ) 

2278 

2279 # Set the tmp variables to the real variables 

2280 with self._lock: 

2281 self.nodes_cache = tmp_nodes_cache 

2282 self.slots_cache = tmp_slots 

2283 # Set the default node 

2284 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

2285 if self._dynamic_startup_nodes: 

2286 # Populate the startup nodes with all discovered nodes 

2287 self.startup_nodes = tmp_nodes_cache 

2288 # Increment the epoch to signal that initialization has completed 

2289 self._epoch += 1 

2290 

2291 def close(self) -> None: 

2292 with self._lock: 

2293 self.default_node = None 

2294 nodes = tuple(self.nodes_cache.values()) 

2295 for node in nodes: 

2296 if node.redis_connection: 

2297 node.redis_connection.close() 

2298 

2299 def reset(self): 

2300 try: 

2301 self.read_load_balancer.reset() 

2302 except TypeError: 

2303 # The read_load_balancer is None, do nothing 

2304 pass 

2305 

2306 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

2307 """ 

2308 Remap the host and port returned from the cluster to a different 

2309 internal value. Useful if the client is not connecting directly 

2310 to the cluster. 

2311 """ 

2312 if self.address_remap: 

2313 return self.address_remap((host, port)) 

2314 return host, port 

2315 

2316 def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]: 

2317 node_name = get_node_name(connection.host, connection.port) 

2318 with self._lock: 

2319 for node in tuple(self.nodes_cache.values()): 

2320 if node.redis_connection: 

2321 conn_args = node.redis_connection.connection_pool.connection_kwargs 

2322 if node_name == get_node_name( 

2323 conn_args.get("host"), conn_args.get("port") 

2324 ): 

2325 return node 

2326 return None 

2327 

2328 

class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
    """

    def __init__(
        self,
        redis_cluster,
        node=None,
        host=None,
        port=None,
        push_handler_func=None,
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
        1. Hashing the channel name in the request to find its keyslot
        2. Selecting a node that handles the keyslot: If read_from_replicas is
        set to true or load_balancing_strategy is set, a replica can be selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        self.set_pubsub_node(redis_cluster, node, host, port)
        # Only resolve a pool up-front when the user pinned a node; otherwise
        # the pool is chosen lazily in execute_command().
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        # Per-node PubSub objects used for sharded (SSUBSCRIBE) channels,
        # keyed by node name.
        self.node_pubsub_mapping = {}
        # NOTE: this rebinds the method name on the instance to the generator
        # it returns; the generator reads node_pubsub_mapping lazily.
        self._pubsubs_generator = self._pubsubs_generator()
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        super().__init__(
            connection_pool=connection_pool,
            encoder=redis_cluster.encoder,
            push_handler_func=push_handler_func,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        The pubsub node will be set according to the passed node, host and port
        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.
        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            pubsub_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, node, host, port)
            pubsub_node = node
        elif any([host, port]) is True:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            pubsub_node = None

        self.node = pubsub_node

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        if node is None or redis_cluster.get_node(node_name=node.name) is None:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )

    def execute_command(self, *args):
        """
        Execute a subscribe/unsubscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            if self.connection_pool is None:
                # No node was pinned at construction time -- pick one now.
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    channel = args[1]
                    slot = self.cluster.keyslot(channel)
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot,
                        self.cluster.read_from_replicas,
                        self.cluster.load_balancing_strategy,
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                redis_connection = self.cluster.get_redis_connection(node)
                self.connection_pool = redis_connection.connection_pool
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _get_node_pubsub(self, node):
        """Return (creating and caching on first use) the per-node PubSub
        object used for sharded channels on ``node``."""
        try:
            return self.node_pubsub_mapping[node.name]
        except KeyError:
            pubsub = node.redis_connection.pubsub(
                push_handler_func=self.push_handler_func
            )
            self.node_pubsub_mapping[node.name] = pubsub
            return pubsub

    def _sharded_message_generator(self):
        """Poll each node's pubsub once (round-robin) and return the first
        message found, or None when no node had one."""
        for _ in range(len(self.node_pubsub_mapping)):
            pubsub = next(self._pubsubs_generator)
            message = pubsub.get_message()
            if message is not None:
                return message
        return None

    def _pubsubs_generator(self):
        """Yield the per-node pubsubs in an endless cycle, re-reading the
        mapping on every pass so added/removed nodes are picked up."""
        while True:
            current_nodes = list(self.node_pubsub_mapping.values())
            yield from current_nodes

    def get_sharded_message(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    ):
        """Fetch the next sharded-channel message, either from a specific
        node or round-robin across all subscribed nodes. Also performs
        bookkeeping when an ``sunsubscribe`` confirmation arrives."""
        if target_node:
            message = self.node_pubsub_mapping[target_node.name].get_message(
                ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
            )
        else:
            message = self._sharded_message_generator()
        if message is None:
            return None
        elif str_if_bytes(message["type"]) == "sunsubscribe":
            if message["channel"] in self.pending_unsubscribe_shard_channels:
                self.pending_unsubscribe_shard_channels.remove(message["channel"])
                self.shard_channels.pop(message["channel"], None)
                node = self.cluster.get_node_from_key(message["channel"])
                if self.node_pubsub_mapping[node.name].subscribed is False:
                    # Drop the per-node pubsub once it has no subscriptions left.
                    self.node_pubsub_mapping.pop(node.name)
        if not self.channels and not self.patterns and not self.shard_channels:
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()
        if self.ignore_subscribe_messages or ignore_subscribe_messages:
            return None
        return message

    def ssubscribe(self, *args, **kwargs):
        """Subscribe to sharded channels; each channel is routed to the node
        owning its keyslot. ``kwargs`` map channel names to handlers."""
        if args:
            args = list_or_args(args[0], args[1:])
        s_channels = dict.fromkeys(args)
        s_channels.update(kwargs)
        for s_channel, handler in s_channels.items():
            node = self.cluster.get_node_from_key(s_channel)
            pubsub = self._get_node_pubsub(node)
            if handler:
                pubsub.ssubscribe(**{s_channel: handler})
            else:
                pubsub.ssubscribe(s_channel)
            self.shard_channels.update(pubsub.shard_channels)
            self.pending_unsubscribe_shard_channels.difference_update(
                self._normalize_keys({s_channel: None})
            )
            if pubsub.subscribed and not self.subscribed:
                self.subscribed_event.set()
                self.health_check_response_counter = 0

    def sunsubscribe(self, *args):
        """Unsubscribe from the given sharded channels (or all of them when
        none are specified), routing each to its owning node."""
        if args:
            args = list_or_args(args[0], args[1:])
        else:
            args = self.shard_channels

        for s_channel in args:
            node = self.cluster.get_node_from_key(s_channel)
            p = self._get_node_pubsub(node)
            p.sunsubscribe(s_channel)
            self.pending_unsubscribe_shard_channels.update(
                p.pending_unsubscribe_shard_channels
            )

    def get_redis_connection(self):
        """
        Get the Redis connection of the pubsub connected node.
        """
        if self.node is not None:
            return self.node.redis_connection

    def disconnect(self):
        """
        Disconnect the pubsub connection.
        """
        if self.connection:
            self.connection.disconnect()
        for pubsub in self.node_pubsub_mapping.values():
            # NOTE(review): assumes each per-node pubsub has an established
            # connection; a never-connected pubsub would have connection=None
            # and raise AttributeError here -- confirm.
            pubsub.connection.disconnect()

2572 

2573 

class ClusterPipeline(RedisCluster):
    """
    Support for Redis pipeline
    in cluster mode
    """

    # Errors after which a pipeline execution may be retried.
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        MovedError,
        AskError,
        TryAgainError,
    )

    # Commands that are not routed by key slot.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands executed immediately instead of being buffered in the stack.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands that clear any previously WATCHed keys.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

2591 

    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        cluster_error_retry_attempts: int = 3,
        reinitialize_steps: int = 5,
        retry: Optional[Retry] = None,
        lock=None,
        transaction=False,
        # NOTE(review): a single StaticPolicyResolver instance is shared by
        # every pipeline that relies on this default -- confirm it is
        # stateless/thread-safe.
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        **kwargs,
    ):
        """Create a cluster pipeline.

        :param nodes_manager: shared topology manager of the parent client
        :param commands_parser: parser used to extract keys from commands
        :param read_from_replicas: route read commands to replicas
        :param load_balancing_strategy: replica-selection strategy
        :param cluster_error_retry_attempts: deprecated; use ``retry``
        :param reinitialize_steps: MOVED errors tolerated before a full
            topology refresh
        :param transaction: True to execute as a MULTI/EXEC transaction
            instead of a plain pipeline
        :param policy_resolver: resolves per-command request/response policies
        """
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.cluster_response_callbacks = cluster_response_callbacks
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if retry is not None:
            self.retry = retry
        else:
            # Default retry mirrors the deprecated attempts argument.
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        self.parent_execute_command = super().execute_command
        # Transaction and plain-pipeline behavior are delegated to a strategy.
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self) if not transaction else TransactionStrategy(self)
        )

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        # Maps each policy to a callable producing the target node list
        # (request policies) or post-processing the result (response policies).
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver

2682 

2683 def __repr__(self): 

2684 """ """ 

2685 return f"{type(self).__name__}" 

2686 

2687 def __enter__(self): 

2688 """ """ 

2689 return self 

2690 

2691 def __exit__(self, exc_type, exc_value, traceback): 

2692 """ """ 

2693 self.reset() 

2694 

    def __del__(self):
        # Best-effort cleanup when the pipeline is garbage collected; a
        # finalizer must never let an exception propagate.
        try:
            self.reset()
        except Exception:
            pass

2700 

2701 def __len__(self): 

2702 """ """ 

2703 return len(self._execution_strategy.command_queue) 

2704 

2705 def __bool__(self): 

2706 "Pipeline instances should always evaluate to True on Python 3+" 

2707 return True 

2708 

2709 def execute_command(self, *args, **kwargs): 

2710 """ 

2711 Wrapper function for pipeline_execute_command 

2712 """ 

2713 return self._execution_strategy.execute_command(*args, **kwargs) 

2714 

2715 def pipeline_execute_command(self, *args, **options): 

2716 """ 

2717 Stage a command to be executed when execute() is next called 

2718 

2719 Returns the current Pipeline object back so commands can be 

2720 chained together, such as: 

2721 

2722 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

2723 

2724 At some other point, you can then run: pipe.execute(), 

2725 which will execute all commands queued in the pipe. 

2726 """ 

2727 return self._execution_strategy.execute_command(*args, **options) 

2728 

2729 def annotate_exception(self, exception, number, command): 

2730 """ 

2731 Provides extra context to the exception prior to it being handled 

2732 """ 

2733 self._execution_strategy.annotate_exception(exception, number, command) 

2734 

2735 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2736 """ 

2737 Execute all the commands in the current pipeline 

2738 """ 

2739 

2740 try: 

2741 return self._execution_strategy.execute(raise_on_error) 

2742 finally: 

2743 self.reset() 

2744 

2745 def reset(self): 

2746 """ 

2747 Reset back to empty pipeline. 

2748 """ 

2749 self._execution_strategy.reset() 

2750 

2751 def send_cluster_commands( 

2752 self, stack, raise_on_error=True, allow_redirections=True 

2753 ): 

2754 return self._execution_strategy.send_cluster_commands( 

2755 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections 

2756 ) 

2757 

2758 def exists(self, *keys): 

2759 return self._execution_strategy.exists(*keys) 

2760 

2761 def eval(self): 

2762 """ """ 

2763 return self._execution_strategy.eval() 

2764 

2765 def multi(self): 

2766 """ 

2767 Start a transactional block of the pipeline after WATCH commands 

2768 are issued. End the transactional block with `execute`. 

2769 """ 

2770 self._execution_strategy.multi() 

2771 

2772 def load_scripts(self): 

2773 """ """ 

2774 self._execution_strategy.load_scripts() 

2775 

2776 def discard(self): 

2777 """ """ 

2778 self._execution_strategy.discard() 

2779 

2780 def watch(self, *names): 

2781 """Watches the values at keys ``names``""" 

2782 self._execution_strategy.watch(*names) 

2783 

2784 def unwatch(self): 

2785 """Unwatches all previously specified keys""" 

2786 self._execution_strategy.unwatch() 

2787 

2788 def script_load_for_pipeline(self, *args, **kwargs): 

2789 self._execution_strategy.script_load_for_pipeline(*args, **kwargs) 

2790 

2791 def delete(self, *names): 

2792 self._execution_strategy.delete(*names) 

2793 

2794 def unlink(self, *names): 

2795 self._execution_strategy.unlink(*names) 

2796 

2797 

def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Build a stub that rejects command ``name`` because it cannot be
    executed through a pipeline in cluster mode.
    """

    def blocked(*args, **kwargs):
        raise RedisClusterException(
            f"ERROR: Calling pipelined function {name} is blocked "
            f"when running redis in cluster mode..."
        )

    return blocked

2811 

2812 

# Commands that may not be issued through a cluster pipeline. Each one is
# patched onto ClusterPipeline below so calling it raises immediately.
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSETEX",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)
# Normalize each blocked command to its method name ("CLIENT KILL" ->
# "client_kill") and replace the method on ClusterPipeline with a stub that
# raises RedisClusterException when called.
for command in PIPELINE_BLOCKED_COMMANDS:
    command = command.replace(" ", "_").lower()

    setattr(ClusterPipeline, command, block_pipeline_command(command))

2890 

2891 

class PipelineCommand:
    """One queued pipeline command plus its routing and response state."""

    def __init__(self, args, options=None, position=None):
        # Raw command tuple, e.g. ("SET", "k", "v").
        self.args = args
        self.options = {} if options is None else options
        # Index in the original pipeline so responses keep their order.
        self.position = position
        # Filled in by the write/read phases (value or Exception).
        self.result = None
        self.node = None
        # True when the command must be preceded by ASKING after an ASK
        # redirect.
        self.asking = False
        self.command_policies: Optional[CommandPolicies] = None

2905 

2906 

class NodeCommands:
    """Batches the pipeline commands destined for a single cluster node."""

    def __init__(self, parse_response, connection_pool, connection):
        """Remember the parse callback, owning pool and raw connection."""
        self.parse_response = parse_response
        self.connection_pool = connection_pool
        self.connection = connection
        self.commands = []

    def append(self, c):
        """Queue one PipelineCommand for this node."""
        self.commands.append(c)

    def write(self):
        """Pack every queued command into a single request and send it."""
        conn = self.connection
        queued = self.commands

        # Clear stale results from any previous attempt before this write
        # clobbers them.
        for cmd in queued:
            cmd.result = None

        # One packed request for all commands maximizes network efficiency.
        # Connection-level failures are recorded on every command so the read
        # phase can skip parsing.
        try:
            conn.send_packed_command(
                conn.pack_commands([cmd.args for cmd in queued])
            )
        except (ConnectionError, TimeoutError) as e:
            for cmd in queued:
                cmd.result = e

    def read(self):
        """Read one response per queued command off the shared connection."""
        conn = self.connection
        for cmd in self.commands:
            # A pre-set result means the write phase already failed (e.g. a
            # connection error). Parsing a response from a dead socket would
            # only produce confusing follow-up errors (redis-py would try to
            # readline() on a closed connection), so skip it; the connection
            # goes back to the pool and reconnects on the next write.
            if cmd.result is not None:
                continue
            try:
                cmd.result = self.parse_response(conn, cmd.args[0], **cmd.options)
            except (ConnectionError, TimeoutError) as e:
                # The socket is gone: mark every command on this node with
                # the error (matching the original behavior, including any
                # already-parsed ones) and stop reading.
                for pending in self.commands:
                    pending.result = e
                return
            except RedisError:
                cmd.result = sys.exc_info()[1]

2974 

2975 

class ExecutionStrategy(ABC):
    """Interface for the two ways a ClusterPipeline can run commands:
    batched pipelining (PipelineStrategy) and MULTI/EXEC transactions
    (TransactionStrategy). ClusterPipeline delegates every public
    operation to one of these.
    """

    @property
    @abstractmethod
    def command_queue(self):
        """The list of PipelineCommand objects queued so far."""
        pass

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        """
        Execution flow for current execution strategy.

        See: ClusterPipeline.execute_command()
        """
        pass

    @abstractmethod
    def annotate_exception(self, exception, number, command):
        """
        Annotate exception according to current execution strategy.

        See: ClusterPipeline.annotate_exception()
        """
        pass

    @abstractmethod
    def pipeline_execute_command(self, *args, **options):
        """
        Pipeline execution flow for current execution strategy.

        See: ClusterPipeline.pipeline_execute_command()
        """
        pass

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Executes current execution strategy.

        See: ClusterPipeline.execute()
        """
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Sends commands according to current execution strategy.

        See: ClusterPipeline.send_cluster_commands()
        """
        pass

    @abstractmethod
    def reset(self):
        """
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        """
        pass

    @abstractmethod
    def exists(self, *keys):
        """Queue or run an EXISTS check for ``keys``."""
        pass

    @abstractmethod
    def eval(self):
        """Script evaluation hook; cluster strategies may not support it."""
        pass

    @abstractmethod
    def multi(self):
        """
        Starts transactional context.

        See: ClusterPipeline.multi()
        """
        pass

    @abstractmethod
    def load_scripts(self):
        """Script preloading hook; cluster strategies may not support it."""
        pass

    @abstractmethod
    def watch(self, *names):
        """Watch keys ``names``; only meaningful in a transactional context."""
        pass

    @abstractmethod
    def unwatch(self):
        """
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        """
        pass

    @abstractmethod
    def script_load_for_pipeline(self, *args, **kwargs):
        """SCRIPT LOAD hook; cluster strategies may not support it."""
        pass

    @abstractmethod
    def delete(self, *names):
        """
        "Delete a key specified by ``names``"

        See: ClusterPipeline.delete()
        """
        pass

    @abstractmethod
    def unlink(self, *names):
        """
        "Unlink a key specified by ``names``"

        See: ClusterPipeline.unlink()
        """
        pass

    @abstractmethod
    def discard(self):
        """Abort the current transactional block, where supported."""
        pass

3097 

3098 

class AbstractStrategy(ExecutionStrategy):
    """Shared plumbing for the concrete pipeline/transaction strategies:
    owns the command queue and implements the operations whose behavior
    does not depend on the execution mode.
    """

    def __init__(
        self,
        pipe: ClusterPipeline,
    ):
        self._command_queue: List[PipelineCommand] = []
        self._pipe = pipe
        self._nodes_manager = self._pipe.nodes_manager

    @property
    def command_queue(self):
        """Commands staged so far, in submission order."""
        return self._command_queue

    @command_queue.setter
    def command_queue(self, queue: List[PipelineCommand]):
        self._command_queue = queue

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        pass

    def pipeline_execute_command(self, *args, **options):
        """Append one command to the queue; return the pipeline for chaining."""
        position = len(self._command_queue)
        self._command_queue.append(PipelineCommand(args, options, position))
        return self._pipe

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        pass

    @abstractmethod
    def reset(self):
        pass

    def exists(self, *keys):
        """Stage an EXISTS command for ``keys``."""
        return self.execute_command("EXISTS", *keys)

    def eval(self):
        """Unsupported in cluster pipelines."""
        raise RedisClusterException("method eval() is not implemented")

    def load_scripts(self):
        """Unsupported in cluster pipelines."""
        raise RedisClusterException("method load_scripts() is not implemented")

    def script_load_for_pipeline(self, *args, **kwargs):
        """Unsupported in cluster pipelines."""
        raise RedisClusterException(
            "method script_load_for_pipeline() is not implemented"
        )

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        rendered = truncate_text(" ".join(map(safe_str, command)))
        context = (
            f"Command # {number} ({rendered}) of pipeline "
            f"caused error: {exception.args[0]}"
        )
        exception.args = (context,) + exception.args[1:]

3168 

class PipelineStrategy(AbstractStrategy):
    """Batched (non-transactional) execution: queue everything, then route
    each command to its node, write all requests, and read all replies.
    """

    def __init__(self, pipe: ClusterPipeline):
        super().__init__(pipe)
        # Predefined node-routing flags per command name (copied from pipe).
        self.command_flags = pipe.command_flags

    def execute_command(self, *args, **kwargs):
        """Queue the command; nothing runs until execute()."""
        return self.pipeline_execute_command(*args, **kwargs)

    def _raise_first_error(self, stack):
        """
        Raise the first exception on the stack
        """
        for c in stack:
            r = c.result
            if isinstance(r, Exception):
                self.annotate_exception(r, c.position + 1, c.args)
                raise r

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """Send every queued command; always clear the queue afterwards."""
        stack = self._command_queue
        if not stack:
            return []

        try:
            return self.send_cluster_commands(stack, raise_on_error)
        finally:
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._command_queue = []

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.

        If one of the retryable exceptions has been thrown we assume that:
         - connection_pool was disconnected
         - connection_pool was reset
         - refresh_table_asap set to True

        It will try the number of times specified by
        the retries in config option "self.retry"
        which defaults to 3 unless manually configured.

        If it reaches the number of times, the command will
        raises ClusterDownException.
        """
        if not stack:
            return []
        retry_attempts = self._pipe.retry.get_retries()
        while True:
            try:
                return self._send_cluster_commands(
                    stack,
                    raise_on_error=raise_on_error,
                    allow_redirections=allow_redirections,
                )
            except RedisCluster.ERRORS_ALLOW_RETRY as e:
                if retry_attempts > 0:
                    # Try again with the new cluster setup. All other errors
                    # should be raised.
                    retry_attempts -= 1
                    pass
                else:
                    raise e

    def _send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Send a bunch of cluster commands to the redis cluster.

        `allow_redirections` If the pipeline should follow
        `ASK` & `MOVED` responses automatically. If set
        to false it will raise RedisClusterException.
        """
        # the first time sending the commands we send all of
        # the commands that were queued up.
        # if we have to run through it again, we only retry
        # the commands that failed.
        attempt = sorted(stack, key=lambda x: x.position)
        is_default_node = False
        # build a list of node objects based on node names we need to
        nodes = {}

        # as we move through each command that still needs to be processed,
        # we figure out the slot number that command maps to, then from
        # the slot determine the node.
        for c in attempt:
            command_policies = self._pipe._policy_resolver.resolve(c.args[0].lower())

            while True:
                # refer to our internal node -> slot table that
                # tells us where a given command should route to.
                # (it might be possible we have a cached node that no longer
                # exists in the cluster, which is why we do this in a loop)
                passed_targets = c.options.pop("target_nodes", None)
                if passed_targets and not self._is_nodes_flag(passed_targets):
                    target_nodes = self._parse_target_nodes(passed_targets)

                    if not command_policies:
                        command_policies = CommandPolicies()
                else:
                    if not command_policies:
                        command = c.args[0].upper()
                        if (
                            len(c.args) >= 2
                            and f"{c.args[0]} {c.args[1]}".upper()
                            in self._pipe.command_flags
                        ):
                            command = f"{c.args[0]} {c.args[1]}".upper()

                        # We only could resolve key properties if command is not
                        # in a list of pre-defined request policies
                        command_flag = self.command_flags.get(command)
                        if not command_flag:
                            # Fallback to default policy
                            if not self._pipe.get_default_node():
                                keys = None
                            else:
                                keys = self._pipe._get_command_keys(*c.args)
                            if not keys or len(keys) == 0:
                                command_policies = CommandPolicies()
                            else:
                                command_policies = CommandPolicies(
                                    request_policy=RequestPolicy.DEFAULT_KEYED,
                                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                                )
                        else:
                            if command_flag in self._pipe._command_flags_mapping:
                                command_policies = CommandPolicies(
                                    request_policy=self._pipe._command_flags_mapping[
                                        command_flag
                                    ]
                                )
                            else:
                                command_policies = CommandPolicies()

                    # NOTE(review): this passes ``node_flag=`` but
                    # _determine_nodes() pops ``"nodes_flag"`` — the flag may
                    # never be picked up there; verify intended keyword.
                    target_nodes = self._determine_nodes(
                        *c.args,
                        request_policy=command_policies.request_policy,
                        node_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {c.args} command on"
                        )
                c.command_policies = command_policies
                if len(target_nodes) > 1:
                    raise RedisClusterException(
                        f"Too many targets for command {c.args}"
                    )

                node = target_nodes[0]
                if node == self._pipe.get_default_node():
                    is_default_node = True

                # now that we know the name of the node
                # ( it's just a string in the form of host:port )
                # we can build a list of commands for each node.
                node_name = node.name
                if node_name not in nodes:
                    redis_node = self._pipe.get_redis_connection(node)
                    try:
                        connection = get_connection(redis_node)
                    except (ConnectionError, TimeoutError):
                        for n in nodes.values():
                            n.connection_pool.release(n.connection)
                        # Connection retries are being handled in the node's
                        # Retry object. Reinitialize the node -> slot table.
                        self._nodes_manager.initialize()
                        if is_default_node:
                            self._pipe.replace_default_node()
                        raise
                    nodes[node_name] = NodeCommands(
                        redis_node.parse_response,
                        redis_node.connection_pool,
                        connection,
                    )
                nodes[node_name].append(c)
                break

        # send the commands in sequence.
        # we write to all the open sockets for each node first,
        # before reading anything
        # this allows us to flush all the requests out across the
        # network
        # so that we can read them from different sockets as they come back.
        # we dont' multiplex on the sockets as they come available,
        # but that shouldn't make too much difference.
        try:
            node_commands = nodes.values()
            for n in node_commands:
                n.write()

            for n in node_commands:
                n.read()
        finally:
            # release all of the redis connections we allocated earlier
            # back into the connection pool.
            # we used to do this step as part of a try/finally block,
            # but it is really dangerous to
            # release connections back into the pool if for some
            # reason the socket has data still left in it
            # from a previous operation. The write and
            # read operations already have try/catch around them for
            # all known types of errors including connection
            # and socket level errors.
            # So if we hit an exception, something really bad
            # happened and putting any oF
            # these connections back into the pool is a very bad idea.
            # the socket might have unread buffer still sitting in it,
            # and then the next time we read from it we pass the
            # buffered result back from a previous command and
            # every single request after to that connection will always get
            # a mismatched result.
            for n in nodes.values():
                n.connection_pool.release(n.connection)

        # if the response isn't an exception it is a
        # valid response from the node
        # we're all done with that command, YAY!
        # if we have more commands to attempt, we've run into problems.
        # collect all the commands we are allowed to retry.
        # (MOVED, ASK, or connection errors or timeout errors)
        attempt = sorted(
            (
                c
                for c in attempt
                if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
            ),
            key=lambda x: x.position,
        )
        if attempt and allow_redirections:
            # RETRY MAGIC HAPPENS HERE!
            # send these remaining commands one at a time using `execute_command`
            # in the main client. This keeps our retry logic
            # in one place mostly,
            # and allows us to be more confident in correctness of behavior.
            # at this point any speed gains from pipelining have been lost
            # anyway, so we might as well make the best
            # attempt to get the correct behavior.
            #
            # The client command will handle retries for each
            # individual command sequentially as we pass each
            # one into `execute_command`. Any exceptions
            # that bubble out should only appear once all
            # retries have been exhausted.
            #
            # If a lot of commands have failed, we'll be setting the
            # flag to rebuild the slots table from scratch.
            # So MOVED errors should correct themselves fairly quickly.
            self._pipe.reinitialize_counter += 1
            if self._pipe._should_reinitialized():
                self._nodes_manager.initialize()
                if is_default_node:
                    self._pipe.replace_default_node()
            for c in attempt:
                try:
                    # send each command individually like we
                    # do in the main client.
                    c.result = self._pipe.parent_execute_command(*c.args, **c.options)
                except RedisError as e:
                    c.result = e

        # turn the response back into a simple flat array that corresponds
        # to the sequence of commands issued in the stack in pipeline.execute()
        # NOTE(review): cluster_response_callbacks comes straight from the
        # constructor argument and may be None — confirm callers always pass
        # a mapping.
        response = []
        for c in sorted(stack, key=lambda x: x.position):
            if c.args[0] in self._pipe.cluster_response_callbacks:
                # Remove keys entry, it needs only for cache.
                c.options.pop("keys", None)
                c.result = self._pipe._policies_callback_mapping[
                    c.command_policies.response_policy
                ](
                    self._pipe.cluster_response_callbacks[c.args[0]](
                        c.result, **c.options
                    )
                )
            response.append(c.result)

        if raise_on_error:
            self._raise_first_error(stack)

        return response

    def _is_nodes_flag(self, target_nodes):
        # True when the caller passed a routing flag string such as
        # "primaries" rather than concrete node objects.
        return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags

    def _parse_target_nodes(self, target_nodes):
        # Normalize the caller-supplied target into a list of ClusterNode.
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables to execute commands with multi nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        # Determine which nodes should be executed the command on.
        # Returns a list of target nodes.
        command = args[0].upper()
        if (
            len(args) >= 2
            and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
        ):
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self._pipe.command_flags.get(command)

        if command_flag in self._pipe._command_flags_mapping:
            request_policy = self._pipe._command_flags_mapping[command_flag]

        policy_callback = self._pipe._policies_callback_mapping[request_policy]

        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()

        # NOTE(review): _aggregate_nodes is never initialized in __init__;
        # reading it before an ft.aggregate call would raise AttributeError.
        if args[0].lower() == "ft.aggregate":
            self._aggregate_nodes = nodes

        return nodes

    def multi(self):
        """Not available outside a transactional pipeline."""
        raise RedisClusterException(
            "method multi() is not supported outside of transactional context"
        )

    def discard(self):
        """Not available outside a transactional pipeline."""
        raise RedisClusterException(
            "method discard() is not supported outside of transactional context"
        )

    def watch(self, *names):
        """Not available outside a transactional pipeline."""
        raise RedisClusterException(
            "method watch() is not supported outside of transactional context"
        )

    def unwatch(self, *names):
        """Not available outside a transactional pipeline."""
        raise RedisClusterException(
            "method unwatch() is not supported outside of transactional context"
        )

    def delete(self, *names):
        """Queue a DEL for exactly one key; multi-key delete is unsupported."""
        if len(names) != 1:
            raise RedisClusterException(
                "deleting multiple keys is not implemented in pipeline command"
            )

        return self.execute_command("DEL", names[0])

    def unlink(self, *names):
        """Queue an UNLINK for exactly one key; multi-key unlink is unsupported."""
        if len(names) != 1:
            raise RedisClusterException(
                "unlinking multiple keys is not implemented in pipeline command"
            )

        return self.execute_command("UNLINK", names[0])

3558 

3559class TransactionStrategy(AbstractStrategy): 

    # Commands that carry no key and therefore have no slot to resolve.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands executed immediately on the pinned connection rather than
    # being queued for EXEC.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands that implicitly clear the WATCH state on the server.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    # Redirections that invalidate slot assumptions mid-transaction.
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    # Errors after which the pinned transaction connection must be dropped.
    CONNECTION_ERRORS = (
        ConnectionError,
        OSError,
        ClusterDownError,
        SlotNotCoveredError,
    )

3570 

    def __init__(self, pipe: ClusterPipeline):
        """Initialize transactional state on top of the shared strategy base."""
        super().__init__(pipe)
        # True once MULTI has been issued explicitly.
        self._explicit_transaction = False
        # True while WATCH is active on the pinned connection.
        self._watching = False
        # Slots touched so far; a cluster transaction must stay on one slot.
        self._pipeline_slots: Set[int] = set()
        # Connection reused for WATCH/MULTI/EXEC so watches remain valid.
        self._transaction_connection: Optional[Connection] = None
        self._executing = False
        # Private copy: widening the supported-error set must not mutate the
        # pipeline's shared Retry instance.
        self._retry = copy(self._pipe.retry)
        self._retry.update_supported_errors(
            RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
        )

3582 

    def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
        """
        Find a connection for a pipeline transaction.

        For running an atomic transaction, watch keys ensure that contents have not been
        altered as long as the watch commands for those keys were sent over the same
        connection. So once we start watching a key, we fetch a connection to the
        node that owns that slot and reuse it.

        Raises RedisClusterException when no keyed command has yet fixed a slot.
        """
        if not self._pipeline_slots:
            raise RedisClusterException(
                "At least a command with a key is needed to identify a node"
            )

        # All slots in the set map to the same node in a valid transaction,
        # so any element identifies the target.
        node: ClusterNode = self._nodes_manager.get_node_from_slot(
            list(self._pipeline_slots)[0], False
        )
        redis_node: Redis = self._pipe.get_redis_connection(node)
        if self._transaction_connection:
            # A topology change may have moved the slot: if the cached
            # connection no longer belongs to the target node's pool, return
            # it to its real owner and fetch a fresh one below.
            if not redis_node.connection_pool.owns_connection(
                self._transaction_connection
            ):
                previous_node = self._nodes_manager.find_connection_owner(
                    self._transaction_connection
                )
                previous_node.connection_pool.release(self._transaction_connection)
                self._transaction_connection = None

        if not self._transaction_connection:
            self._transaction_connection = get_connection(redis_node)

        return redis_node, self._transaction_connection

3615 

3616 def execute_command(self, *args, **kwargs): 

3617 slot_number: Optional[int] = None 

3618 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS: 

3619 slot_number = self._pipe.determine_slot(*args) 

3620 

3621 if ( 

3622 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS 

3623 ) and not self._explicit_transaction: 

3624 if args[0] == "WATCH": 

3625 self._validate_watch() 

3626 

3627 if slot_number is not None: 

3628 if self._pipeline_slots and slot_number not in self._pipeline_slots: 

3629 raise CrossSlotTransactionError( 

3630 "Cannot watch or send commands on different slots" 

3631 ) 

3632 

3633 self._pipeline_slots.add(slot_number) 

3634 elif args[0] not in self.NO_SLOTS_COMMANDS: 

3635 raise RedisClusterException( 

3636 f"Cannot identify slot number for command: {args[0]}," 

3637 "it cannot be triggered in a transaction" 

3638 ) 

3639 

3640 return self._immediate_execute_command(*args, **kwargs) 

3641 else: 

3642 if slot_number is not None: 

3643 self._pipeline_slots.add(slot_number) 

3644 

3645 return self.pipeline_execute_command(*args, **kwargs) 

3646 

3647 def _validate_watch(self): 

3648 if self._explicit_transaction: 

3649 raise RedisError("Cannot issue a WATCH after a MULTI") 

3650 

3651 self._watching = True 

3652 

3653 def _immediate_execute_command(self, *args, **options): 

3654 return self._retry.call_with_retry( 

3655 lambda: self._get_connection_and_send_command(*args, **options), 

3656 self._reinitialize_on_error, 

3657 ) 

3658 

3659 def _get_connection_and_send_command(self, *args, **options): 

3660 redis_node, connection = self._get_client_and_connection_for_transaction() 

3661 return self._send_command_parse_response( 

3662 connection, redis_node, args[0], *args, **options 

3663 ) 

3664 

3665 def _send_command_parse_response( 

3666 self, conn, redis_node: Redis, command_name, *args, **options 

3667 ): 

3668 """ 

3669 Send a command and parse the response 

3670 """ 

3671 

3672 conn.send_command(*args) 

3673 output = redis_node.parse_response(conn, command_name, **options) 

3674 

3675 if command_name in self.UNWATCH_COMMANDS: 

3676 self._watching = False 

3677 return output 

3678 

3679 def _reinitialize_on_error(self, error): 

3680 if self._watching: 

3681 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: 

3682 raise WatchError("Slot rebalancing occurred while watching keys") 

3683 

3684 if ( 

3685 type(error) in self.SLOT_REDIRECT_ERRORS 

3686 or type(error) in self.CONNECTION_ERRORS 

3687 ): 

3688 if self._transaction_connection: 

3689 # Disconnect and release back to pool 

3690 self._transaction_connection.disconnect() 

3691 node = self._nodes_manager.find_connection_owner( 

3692 self._transaction_connection 

3693 ) 

3694 if node and node.redis_connection: 

3695 node.redis_connection.connection_pool.release( 

3696 self._transaction_connection 

3697 ) 

3698 self._transaction_connection = None 

3699 

3700 self._pipe.reinitialize_counter += 1 

3701 if self._pipe._should_reinitialized(): 

3702 self._nodes_manager.initialize() 

3703 self.reinitialize_counter = 0 

3704 else: 

3705 if isinstance(error, AskError): 

3706 self._nodes_manager.move_slot(error) 

3707 

3708 self._executing = False 

3709 

3710 def _raise_first_error(self, responses, stack): 

3711 """ 

3712 Raise the first exception on the stack 

3713 """ 

3714 for r, cmd in zip(responses, stack): 

3715 if isinstance(r, Exception): 

3716 self.annotate_exception(r, cmd.position + 1, cmd.args) 

3717 raise r 

3718 

3719 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3720 stack = self._command_queue 

3721 if not stack and (not self._watching or not self._pipeline_slots): 

3722 return [] 

3723 

3724 return self._execute_transaction_with_retries(stack, raise_on_error) 

3725 

3726 def _execute_transaction_with_retries( 

3727 self, stack: List["PipelineCommand"], raise_on_error: bool 

3728 ): 

3729 return self._retry.call_with_retry( 

3730 lambda: self._execute_transaction(stack, raise_on_error), 

3731 self._reinitialize_on_error, 

3732 ) 

3733 

    def _execute_transaction(
        self, stack: List["PipelineCommand"], raise_on_error: bool
    ):
        """Send the queued commands as one MULTI ... EXEC block to a node.

        Frames ``stack`` with MULTI/EXEC, ships it over the pinned
        transaction connection in a single packed write, then parses one
        reply per command.  Returns the list of results (run through the
        pipeline's response callbacks) in queue order.

        Raises:
            CrossSlotTransactionError: keys span more than one hash slot.
            WatchError: EXEC returned nil because a WATCHed key changed.
            InvalidPipelineStack: reply count does not match the queue.
        """
        if len(self._pipeline_slots) > 1:
            raise CrossSlotTransactionError(
                "All keys involved in a cluster transaction must map to the same slot"
            )

        self._executing = True

        redis_node, connection = self._get_client_and_connection_for_transaction()

        # Wrap the user's commands with MULTI/EXEC markers before packing.
        stack = chain(
            [PipelineCommand(("MULTI",))],
            stack,
            [PipelineCommand(("EXEC",))],
        )
        commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
        packed_commands = connection.pack_commands(commands)
        connection.send_packed_command(packed_commands)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            redis_node.parse_response(connection, "MULTI")
        except ResponseError as e:
            self.annotate_exception(e, 0, "MULTI")
            errors.append(e)
        except self.CONNECTION_ERRORS as cluster_error:
            # Connection-level failures are not recoverable mid-transaction.
            self.annotate_exception(cluster_error, 0, "MULTI")
            raise

        # and all the other commands
        for i, command in enumerate(self._command_queue):
            if EMPTY_RESPONSE in command.options:
                # Command was never sent; record its placeholder result.
                errors.append((i, command.options[EMPTY_RESPONSE]))
            else:
                try:
                    _ = redis_node.parse_response(connection, "_")
                except self.SLOT_REDIRECT_ERRORS as slot_error:
                    self.annotate_exception(slot_error, i + 1, command.args)
                    errors.append(slot_error)
                except self.CONNECTION_ERRORS as cluster_error:
                    self.annotate_exception(cluster_error, i + 1, command.args)
                    raise
                except ResponseError as e:
                    self.annotate_exception(e, i + 1, command.args)
                    errors.append(e)

        response = None
        # parse the EXEC.
        try:
            response = redis_node.parse_response(connection, "EXEC")
        except ExecAbortError:
            # The server aborted the transaction; surface the queuing error
            # that caused the abort if we captured one.
            if errors:
                raise errors[0]
            raise

        self._executing = False

        # EXEC clears any watched keys
        self._watching = False

        if response is None:
            # A nil EXEC reply means a WATCHed key was modified.
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        # NOTE(review): ``errors`` mixes (index, value) tuples (EMPTY_RESPONSE
        # case) with bare exceptions appended above; this unpack assumes the
        # tuple shape — presumably the bare-exception case always aborts via
        # ExecAbortError before reaching here. TODO confirm.
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(self._command_queue):
            raise InvalidPipelineStack(
                "Unexpected response length for cluster pipeline EXEC."
                " Command stack was {} but response had length {}".format(
                    [c.args[0] for c in self._command_queue], len(response)
                )
            )

        # find any errors in the response and raise if necessary
        if raise_on_error or len(errors) > 0:
            self._raise_first_error(
                response,
                self._command_queue,
            )

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, self._command_queue):
            if not isinstance(r, Exception):
                command_name = cmd.args[0]
                if command_name in self._pipe.cluster_response_callbacks:
                    r = self._pipe.cluster_response_callbacks[command_name](
                        r, **cmd.options
                    )
            data.append(r)
        return data

3833 

3834 def reset(self): 

3835 self._command_queue = [] 

3836 

3837 # make sure to reset the connection state in the event that we were 

3838 # watching something 

3839 if self._transaction_connection: 

3840 try: 

3841 if self._watching: 

3842 # call this manually since our unwatch or 

3843 # immediate_execute_command methods can call reset() 

3844 self._transaction_connection.send_command("UNWATCH") 

3845 self._transaction_connection.read_response() 

3846 # we can safely return the connection to the pool here since we're 

3847 # sure we're no longer WATCHing anything 

3848 node = self._nodes_manager.find_connection_owner( 

3849 self._transaction_connection 

3850 ) 

3851 if node and node.redis_connection: 

3852 node.redis_connection.connection_pool.release( 

3853 self._transaction_connection 

3854 ) 

3855 self._transaction_connection = None 

3856 except self.CONNECTION_ERRORS: 

3857 # disconnect will also remove any previous WATCHes 

3858 if self._transaction_connection: 

3859 self._transaction_connection.disconnect() 

3860 node = self._nodes_manager.find_connection_owner( 

3861 self._transaction_connection 

3862 ) 

3863 if node and node.redis_connection: 

3864 node.redis_connection.connection_pool.release( 

3865 self._transaction_connection 

3866 ) 

3867 self._transaction_connection = None 

3868 

3869 # clean up the other instance attributes 

3870 self._watching = False 

3871 self._explicit_transaction = False 

3872 self._pipeline_slots = set() 

3873 self._executing = False 

3874 

3875 def send_cluster_commands( 

3876 self, stack, raise_on_error=True, allow_redirections=True 

3877 ): 

3878 raise NotImplementedError( 

3879 "send_cluster_commands cannot be executed in transactional context." 

3880 ) 

3881 

3882 def multi(self): 

3883 if self._explicit_transaction: 

3884 raise RedisError("Cannot issue nested calls to MULTI") 

3885 if self._command_queue: 

3886 raise RedisError( 

3887 "Commands without an initial WATCH have already been issued" 

3888 ) 

3889 self._explicit_transaction = True 

3890 

3891 def watch(self, *names): 

3892 if self._explicit_transaction: 

3893 raise RedisError("Cannot issue a WATCH after a MULTI") 

3894 

3895 return self.execute_command("WATCH", *names) 

3896 

3897 def unwatch(self): 

3898 if self._watching: 

3899 return self.execute_command("UNWATCH") 

3900 

3901 return True 

3902 

3903 def discard(self): 

3904 self.reset() 

3905 

3906 def delete(self, *names): 

3907 return self.execute_command("DEL", *names) 

3908 

3909 def unlink(self, *names): 

3910 return self.execute_command("UNLINK", *names)