Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1331 statements  

1import random 

2import socket 

3import sys 

4import threading 

5import time 

6from abc import ABC, abstractmethod 

7from collections import OrderedDict 

8from copy import copy 

9from enum import Enum 

10from itertools import chain 

11from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union 

12 

13from redis._parsers import CommandsParser, Encoder 

14from redis._parsers.helpers import parse_scan 

15from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

16from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface 

17from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis 

18from redis.commands import READ_COMMANDS, RedisClusterCommands 

19from redis.commands.helpers import list_or_args 

20from redis.connection import ( 

21 Connection, 

22 ConnectionPool, 

23 parse_url, 

24) 

25from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

26from redis.event import ( 

27 AfterPooledConnectionsInstantiationEvent, 

28 AfterPubSubConnectionInstantiationEvent, 

29 ClientType, 

30 EventDispatcher, 

31) 

32from redis.exceptions import ( 

33 AskError, 

34 AuthenticationError, 

35 ClusterDownError, 

36 ClusterError, 

37 ConnectionError, 

38 CrossSlotTransactionError, 

39 DataError, 

40 ExecAbortError, 

41 InvalidPipelineStack, 

42 MovedError, 

43 RedisClusterException, 

44 RedisError, 

45 ResponseError, 

46 SlotNotCoveredError, 

47 TimeoutError, 

48 TryAgainError, 

49 WatchError, 

50) 

51from redis.lock import Lock 

52from redis.retry import Retry 

53from redis.utils import ( 

54 deprecated_args, 

55 dict_merge, 

56 list_keys_to_dict, 

57 merge_result, 

58 safe_str, 

59 str_if_bytes, 

60 truncate_text, 

61) 

62 

63 

def get_node_name(host: str, port: Union[str, int]) -> str:
    """Return the ``host:port`` string used as a cluster node's name."""
    return "{}:{}".format(host, port)

66 

67 

@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    """Return a connection for ``redis_node``.

    The client's own ``connection`` attribute is used when it is truthy;
    otherwise a connection is taken from the client's connection pool.
    Extra positional and keyword arguments are accepted but ignored.
    """
    dedicated = redis_node.connection
    if dedicated:
        return dedicated
    return redis_node.connection_pool.get_connection()

75 

76 

def parse_scan_result(command, res, **options):
    """Combine per-node SCAN replies into one result.

    :param command: the command name (unused here).
    :param res: mapping of node name to that node's raw SCAN response.
    :param options: forwarded unchanged to ``parse_scan``.
    :return: ``(cursors, keys)`` where ``cursors`` maps each node name to
        its cursor and ``keys`` is the concatenation of every node's items
        in ``res`` iteration order.
    """
    cursor_by_node = {}
    collected = []
    for name, raw_reply in res.items():
        node_cursor, batch = parse_scan(raw_reply, **options)
        cursor_by_node[name] = node_cursor
        collected.extend(batch)

    return cursor_by_node, collected

86 

87 

def parse_pubsub_numsub(command, res, **options):
    """Sum per-channel subscriber counts reported by several nodes.

    :param command: the command name (unused here).
    :param res: mapping of node name to an iterable of
        ``(channel, count)`` pairs.
    :return: list of ``(channel, total)`` tuples, ordered by the first
        time each channel was seen.
    """
    totals = OrderedDict()
    for per_node_counts in res.values():
        for channel, count in per_node_counts:
            if channel in totals:
                totals[channel] += count
            else:
                totals[channel] = count

    return list(totals.items())

99 

100 

def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """Turn a CLUSTER SLOTS reply into a ``{(start, end): {...}}`` mapping.

    Each value holds a ``"primary"`` entry and a ``"replicas"`` list, both
    expressed as ``(host, second_field)`` tuples.  When a server reports an
    empty host, the ``current_host`` option (default ``""``) is used instead.
    """
    fallback_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        # Substitute the fallback only when the reported host is falsy.
        host = str_if_bytes(args[0])
        if not host:
            host = fallback_host
        return host, args[1]

    slot_map = {}
    for entry in resp:
        start, end, primary = entry[:3]
        slot_map[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in entry[3:]],
        }

    return slot_map

119 

120 

def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.

    :param resp: the raw reply.  If it is empty, or its first element is
        already a ``dict``, it is returned unchanged.  Otherwise every
        shard is read as a flat sequence whose element 1 holds the slot
        boundaries (flattened ``start, end`` pairs) and whose element 3
        holds the nodes (each a flattened ``key, value`` sequence).
    :return: list of ``{"slots": [(start, end), ...], "nodes": [dict, ...]}``.
    """
    # An empty reply has no first element to inspect; return it as-is
    # instead of failing with IndexError.
    if not resp or isinstance(resp[0], dict):
        return resp

    shards = []
    for raw_shard in resp:
        slot_bounds = raw_shard[1]
        shards.append(
            {
                "slots": [
                    (slot_bounds[i], slot_bounds[i + 1])
                    for i in range(0, len(slot_bounds), 2)
                ],
                "nodes": [
                    {node[i]: node[i + 1] for i in range(0, len(node), 2)}
                    for node in raw_shard[3]
                ],
            }
        )

    return shards

141 

142 

def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.

    :param resp: the shard id, as raw bytes or as an already decoded string.
    :return: the shard id as ``str``.
    """
    # A reply that is already text has no ``decode`` method, so only
    # decode byte-like replies.
    if isinstance(resp, (bytes, bytearray)):
        return resp.decode("utf-8")
    return resp

148 

149 

# Server-type labels; get_primaries()/get_replicas() pass them to
# NodesManager.get_nodes_by_server_type().
PRIMARY = "primary"
REPLICA = "replica"
# Node flag value assigned to the slot-addressed CLUSTER commands in
# AbstractRedisCluster.COMMAND_FLAGS.
SLOT_ID = "slot-id"

# Keyword arguments that cleanup_kwargs() lets through to the per-node
# connection setup; anything not listed here is dropped.
REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
)
# Keys that cleanup_kwargs() removes even though they are listed above.
KWARGS_DISABLED_KEYS = ("host", "port", "retry")

194 

195 

def cleanup_kwargs(**kwargs):
    """
    Return a copy of ``kwargs`` without unsupported or disabled keys.

    A key is kept only when it appears in ``REDIS_ALLOWED_KEYS`` and does
    not appear in ``KWARGS_DISABLED_KEYS``.
    """
    connection_kwargs = {}
    for key, value in kwargs.items():
        if key not in REDIS_ALLOWED_KEYS or key in KWARGS_DISABLED_KEYS:
            continue
        connection_kwargs[key] = value

    return connection_kwargs

207 

208 

class AbstractRedisCluster:
    """Routing tables and helpers shared by Redis Cluster clients.

    The class attributes describe which node group each command is sent to
    and how per-node replies are folded into a single result.
    ``replace_default_node`` relies on ``nodes_manager``,
    ``get_default_node()``, ``get_primaries()`` and ``get_replicas()``,
    which are not defined here and must be provided by the concrete class.
    """

    # NOTE(review): presumably an upper bound on attempts/redirections per
    # request; it is not referenced inside this class -- confirm at the
    # call site.
    RedisClusterRequestTTL = 16

    # Node flags: symbolic names for the group of nodes a command targets.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # Command name -> node flag (or SLOT_ID).  Commands that are not listed
    # have no predefined target group.
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READWRITE",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # A one-element tuple wrapping the list of search-module commands;
    # callers read the list as ``SEARCH_COMMANDS[0]``, so the shape is kept.
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    # Command name -> parser applied to a single node's raw reply.
    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Command name -> reducer taking ``(command, res)``; for most entries
    # ``res`` is expected to be a mapping keyed per node.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    # Exception types grouped as eligible for a retry.
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: Optional["ClusterNode"] = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
            return

        curr_node = self.get_default_node()
        primaries = [node for node in self.get_primaries() if node != curr_node]
        if primaries:
            # Choose a primary if the cluster contains different primaries
            self.nodes_manager.default_node = random.choice(primaries)
            return

        # Otherwise, fall back to a replica if the cluster contains any
        # replica other than the current default node
        replicas = [node for node in self.get_replicas() if node != curr_node]
        if replicas:
            self.nodes_manager.default_node = random.choice(replicas)

454 

455 

456class RedisCluster(AbstractRedisCluster, RedisClusterCommands): 

@classmethod
def from_url(cls, url, **kwargs):
    """
    Return a cluster client object configured from the given URL

    For example::

        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0

    The URL is handed to the class initializer as its ``url`` argument
    together with every extra keyword argument; all parsing and
    validation happens there.  As implemented by ``__init__``:

    - a URL that parses to a Unix Domain Socket path is rejected with
      ``RedisClusterException``;
    - a ``db`` value taken from the URL must be 0, otherwise
      ``RedisClusterException`` is raised;
    - options parsed from the URL are merged over ``kwargs``, so in the
      case of conflicting arguments the URL values win.
    """
    client = cls(url=url, **kwargs)
    return client

500 

@deprecated_args(
    args_to_warn=["read_from_replicas"],
    reason="Please configure the 'load_balancing_strategy' instead",
    version="5.3.0",
)
@deprecated_args(
    args_to_warn=[
        "cluster_error_retry_attempts",
    ],
    reason="Please configure the 'retry' object instead",
    version="6.0.0",
)
def __init__(
    self,
    host: Optional[str] = None,
    port: int = 6379,
    startup_nodes: Optional[List["ClusterNode"]] = None,
    cluster_error_retry_attempts: int = 3,
    retry: Optional["Retry"] = None,
    require_full_coverage: bool = True,
    reinitialize_steps: int = 5,
    read_from_replicas: bool = False,
    load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
    dynamic_startup_nodes: bool = True,
    url: Optional[str] = None,
    address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
    cache: Optional[CacheInterface] = None,
    cache_config: Optional[CacheConfig] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
    **kwargs,
):
    """
    Initialize a new RedisCluster client.

    :param startup_nodes:
        List of nodes from which initial bootstrapping can be done.
        The list is copied; the caller's list is never modified.
    :param host:
        Can be used to point to a startup node
    :param port:
        Can be used to point to a startup node
    :param require_full_coverage:
        When set to True (default value): all slots must be covered to
        construct the cluster client. If not all slots are covered,
        RedisClusterException will be thrown.
        When set to False: the client will not require a
        full coverage of the slots. However, if not all slots are covered,
        and at least one node has 'cluster-require-full-coverage' set to
        'yes,' the server will throw a ClusterDownError for some key-based
        commands. See -
        https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
    :param read_from_replicas:
        @deprecated - please use load_balancing_strategy instead
        Enable read from replicas in READONLY mode. You can read possibly
        stale data.
        When set to true, read commands will be assigned between the
        primary and its replications in a Round-Robin manner.
    :param load_balancing_strategy:
        Enable read from replicas in READONLY mode and defines the load balancing
        strategy that will be used for cluster node selection.
        The data read from replicas is eventually consistent with the data in primary nodes.
    :param dynamic_startup_nodes:
        Set the RedisCluster's startup nodes to all of the discovered nodes.
        If true (default value), the cluster's discovered nodes will be used to
        determine the cluster nodes-slots mapping in the next topology refresh.
        It will remove the initial passed startup nodes if their endpoints aren't
        listed in the CLUSTER SLOTS output.
        If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
        specific IP addresses, it is best to set it to false.
    :param cluster_error_retry_attempts:
        @deprecated - Please configure the 'retry' object instead
        In case 'retry' object is set - this argument is ignored!

        Number of times to retry before raising an error when
        :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
        :class:`~.ClusterDownError` are encountered
    :param retry:
        A retry object that defines the retry strategy and the number of
        retries for the cluster client.
        In current implementation for the cluster client (starting from redis-py version 6.0.0)
        the retry object is not yet fully utilized, instead it is used just to determine
        the number of retries for the cluster client.
        In the future releases the retry object will be used to handle the cluster client retries!
    :param reinitialize_steps:
        Specifies the number of MOVED errors that need to occur before
        reinitializing the whole cluster topology. If a MOVED error occurs
        and the cluster does not need to be reinitialized on this current
        error handling, only the MOVED slot will be patched with the
        redirected node.
        To reinitialize the cluster on every MOVED error, set
        reinitialize_steps to 1.
        To avoid reinitializing the cluster on moved errors, set
        reinitialize_steps to 0.
    :param url:
        Optional connection URL. Options parsed from it are merged over
        ``kwargs`` and its host/port become a startup node.
    :param address_remap:
        An optional callable which, when provided with an internal network
        address of a node, e.g. a `(host, port)` tuple, will return the address
        where the node is reachable. This can be used to map the addresses at
        which the nodes _think_ they are, to addresses at which a client may
        reach them, such as when they sit behind a proxy.
    :param cache:
        Optional cache instance, passed on to the nodes manager.
        Requires ``protocol`` 3.
    :param cache_config:
        Optional cache configuration, passed on to the nodes manager.
        Requires ``protocol`` 3.
    :param event_dispatcher:
        Optional event dispatcher; a new ``EventDispatcher`` is created
        when omitted.

    :**kwargs:
         Extra arguments that will be sent into Redis instance when created
         (See Official redis-py doc for supported kwargs - the only limitation
          is that you can't provide 'retry' object as part of kwargs.
         [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
         Some kwargs are not supported and will raise a
         RedisClusterException:
             - db (Redis do not support database SELECT in cluster mode)

    :raises RedisClusterException:
        if ``db`` or ``retry`` is passed in kwargs, if the URL points to a
        Unix Domain Socket or carries a non-zero ``db``, or if no startup
        node can be determined.
    :raises RedisError:
        if ``cache`` or ``cache_config`` is given without ``protocol`` 3.
    """
    # Work on a private copy so that appending the host/port node below
    # never grows a list the caller still owns (and may reuse).
    startup_nodes = [] if startup_nodes is None else list(startup_nodes)

    if "db" in kwargs:
        # Argument 'db' is not possible to use in cluster mode
        raise RedisClusterException(
            "Argument 'db' is not possible to use in cluster mode"
        )

    if "retry" in kwargs:
        # Argument 'retry' is not possible to be used in kwargs when in cluster mode
        # the kwargs are set to the lower level connections to the cluster nodes
        # and there we provide retry configuration without retries allowed.
        # The retries should be handled on cluster client level.
        raise RedisClusterException(
            "The 'retry' argument cannot be used in kwargs when running in cluster mode."
        )

    # Get the startup node/s
    from_url = False
    if url is not None:
        from_url = True
        url_options = parse_url(url)
        if "path" in url_options:
            raise RedisClusterException(
                "RedisCluster does not currently support Unix Domain "
                "Socket connections"
            )
        if "db" in url_options and url_options["db"] != 0:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "A ``db`` querystring option can only be 0 in cluster mode"
            )
        kwargs.update(url_options)
        host = kwargs.get("host")
        port = kwargs.get("port", port)
        startup_nodes.append(ClusterNode(host, port))
    elif host is not None and port is not None:
        startup_nodes.append(ClusterNode(host, port))
    elif not startup_nodes:
        # No startup node was provided
        raise RedisClusterException(
            "RedisCluster requires at least one node to discover the "
            "cluster. Please provide one of the followings:\n"
            "1. host and port, for example:\n"
            " RedisCluster(host='localhost', port=6379)\n"
            "2. list of startup nodes, for example:\n"
            " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
            " ClusterNode('localhost', 6378)])"
        )
    # Update the connection arguments
    # Whenever a new connection is established, RedisCluster's on_connect
    # method should be run
    # If the user passed on_connect function we'll save it and run it
    # inside the RedisCluster.on_connect() function
    self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
    kwargs.update({"redis_connect_func": self.on_connect})
    kwargs = cleanup_kwargs(**kwargs)
    if retry:
        self.retry = retry
    else:
        self.retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10),
            retries=cluster_error_retry_attempts,
        )

    self.encoder = Encoder(
        kwargs.get("encoding", "utf-8"),
        kwargs.get("encoding_errors", "strict"),
        kwargs.get("decode_responses", False),
    )
    protocol = kwargs.get("protocol", None)
    if (cache_config or cache) and protocol not in [3, "3"]:
        raise RedisError("Client caching is only supported with RESP version 3")

    # Per-instance copies, so changes on one client do not leak into the
    # class-level tables.
    self.command_flags = self.__class__.COMMAND_FLAGS.copy()
    self.node_flags = self.__class__.NODE_FLAGS.copy()
    self.read_from_replicas = read_from_replicas
    self.load_balancing_strategy = load_balancing_strategy
    self.reinitialize_counter = 0
    self.reinitialize_steps = reinitialize_steps
    if event_dispatcher is None:
        self._event_dispatcher = EventDispatcher()
    else:
        self._event_dispatcher = event_dispatcher
    self.nodes_manager = NodesManager(
        startup_nodes=startup_nodes,
        from_url=from_url,
        require_full_coverage=require_full_coverage,
        dynamic_startup_nodes=dynamic_startup_nodes,
        address_remap=address_remap,
        cache=cache,
        cache_config=cache_config,
        event_dispatcher=self._event_dispatcher,
        **kwargs,
    )

    self.cluster_response_callbacks = CaseInsensitiveDict(
        self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
    )
    self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

    self.commands_parser = CommandsParser(self)
    self._lock = threading.RLock()

713 

def __enter__(self):
    """Enter a ``with`` block; the client itself is the context value."""
    return self

716 

def __exit__(self, exc_type, exc_value, traceback):
    """Leave a ``with`` block by calling ``close()``.

    The exception details are ignored and nothing is returned, so an
    exception raised inside the block continues to propagate.
    """
    self.close()

719 

def __del__(self):
    """Best-effort ``close()`` when the object is garbage-collected."""
    try:
        self.close()
    except Exception:
        # Deliberately swallowed: any failure raised by close() during
        # finalization is ignored rather than propagated.
        pass

725 

def disconnect_connection_pools(self):
    """Disconnect the connection pool of every node that has a client.

    Nodes whose ``redis_connection`` is falsy are skipped, and an
    ``OSError`` raised while disconnecting a pool is ignored.
    """
    for node in self.get_nodes():
        client = node.redis_connection
        if not client:
            continue
        try:
            client.connection_pool.disconnect()
        except OSError:
            # Client was already disconnected. do nothing
            pass

734 

def on_connect(self, connection):
    """
    Run the connection's own ``on_connect()``, then send READONLY when
    replica reads were configured at initialization, and finally invoke
    the user supplied connect callback, if there is one.

    :raises ConnectionError: if the READONLY reply is not ``OK``.
    """
    connection.on_connect()

    wants_readonly = self.read_from_replicas or self.load_balancing_strategy
    if wants_readonly:
        # Sending READONLY command to server to configure connection as
        # readonly. Since each cluster node may change its server type due
        # to a failover, we should establish a READONLY connection
        # regardless of the server type. If this is a primary connection,
        # READONLY would not affect executing write commands.
        connection.send_command("READONLY")
        reply = str_if_bytes(connection.read_response())
        if reply != "OK":
            raise ConnectionError("READONLY command failed")

    user_hook = self.user_on_connect_func
    if user_hook is not None:
        user_hook(connection)

754 

def get_redis_connection(self, node: "ClusterNode") -> Redis:
    """Return ``node.redis_connection``, creating it first when it is falsy.

    Creation is delegated to ``nodes_manager.create_redis_connections``.
    """
    if not node.redis_connection:
        with self._lock:
            # Checked again after taking the lock: the connection may
            # have been created while this call was waiting for it.
            if not node.redis_connection:
                self.nodes_manager.create_redis_connections([node])
    return node.redis_connection

761 

def get_node(self, host=None, port=None, node_name=None):
    """Look up a node via ``nodes_manager.get_node(host, port, node_name)``."""
    return self.nodes_manager.get_node(host, port, node_name)

764 

def get_primaries(self):
    """Return the nodes the nodes manager reports for the PRIMARY server type."""
    return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

767 

def get_replicas(self):
    """Return the nodes the nodes manager reports for the REPLICA server type."""
    return self.nodes_manager.get_nodes_by_server_type(REPLICA)

770 

def get_random_node(self):
    """Return one node picked at random from the nodes cache."""
    candidates = list(self.nodes_manager.nodes_cache.values())
    return random.choice(candidates)

773 

def get_nodes(self):
    """Return a new list holding every node in the nodes cache."""
    return list(self.nodes_manager.nodes_cache.values())

776 

def get_node_from_key(self, key, replica=False):
    """
    Get the node that holds the key's slot.

    :param key: key whose slot is computed with ``self.keyslot``.
    :param replica: when True, return the second node cached for the slot
        instead of the first one (the primary).
    :return: the selected node, or None if ``replica`` is True but the
        slot has fewer than two cached nodes.
    :raises SlotNotCoveredError: if the slot has no cached nodes.
    """
    slot = self.keyslot(key)
    slot_cache = self.nodes_manager.slots_cache.get(slot)
    if slot_cache is None or len(slot_cache) == 0:
        raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')

    if not replica:
        # primary
        return slot_cache[0]

    # Use the entry fetched above rather than looking the slot up again,
    # so the length check and the index apply to the same list.
    if len(slot_cache) < 2:
        return None
    return slot_cache[1]

796 

def get_default_node(self):
    """
    Get the cluster's default node, as stored on the nodes manager
    """
    return self.nodes_manager.default_node

802 

def set_default_node(self, node):
    """
    Make ``node`` the cluster's default node.

    :param node: 'ClusterNode'
    :return: True if the default node was set; False if ``node`` is None
        or no node with the same name is found by ``get_node``
    """
    if node is None:
        return False
    if self.get_node(node_name=node.name) is None:
        return False
    self.nodes_manager.default_node = node
    return True

813 

def set_retry(self, retry: Retry) -> None:
    """Replace the retry object stored on this client."""
    self.retry = retry

816 

def monitor(self, target_node=None):
    """
    Return a Monitor object for ``target_node``, or for the cluster's
    default node when no target node is given.

    Monitor is useful for handling the MONITOR command to the redis server.
    next_command() method returns one command from monitor
    listen() method yields commands from monitor.

    :raises RedisClusterException: if the chosen node has no
        ``redis_connection``.
    """
    node = self.get_default_node() if target_node is None else target_node
    client = node.redis_connection
    if client is None:
        raise RedisClusterException(
            f"Cluster Node {node.name} has no redis_connection"
        )
    return client.monitor()

833 

def pubsub(self, node=None, host=None, port=None, **kwargs):
    """
    Allows passing a ClusterNode, or host&port, to get a pubsub instance
    connected to the specified node

    All arguments, including extra keyword arguments, are forwarded
    unchanged to ``ClusterPubSub`` together with this client.
    """
    return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)

840 

def pipeline(self, transaction=None, shard_hint=None):
    """
    Cluster impl:
        Pipelines do not work in cluster mode the same way they
        do in normal mode. Create a clone of this object so
        that simulating pipelines will work correctly. Each
        command will be called directly when used and
        when calling execute() will only return the result stack.

    :param transaction: forwarded unchanged to ``ClusterPipeline``.
    :param shard_hint: not supported; any truthy value raises
        ``RedisClusterException``.
    :return: a ``ClusterPipeline`` that shares this client's nodes
        manager, commands parser, callbacks, retry object and lock.
    """
    if shard_hint:
        raise RedisClusterException("shard_hint is deprecated in cluster mode")

    return ClusterPipeline(
        nodes_manager=self.nodes_manager,
        commands_parser=self.commands_parser,
        startup_nodes=self.nodes_manager.startup_nodes,
        result_callbacks=self.result_callbacks,
        cluster_response_callbacks=self.cluster_response_callbacks,
        read_from_replicas=self.read_from_replicas,
        load_balancing_strategy=self.load_balancing_strategy,
        reinitialize_steps=self.reinitialize_steps,
        retry=self.retry,
        lock=self._lock,
        transaction=transaction,
    )

866 

def lock(
    self,
    name,
    timeout=None,
    sleep=0.1,
    blocking=True,
    blocking_timeout=None,
    lock_class=None,
    thread_local=True,
    raise_on_release_error: bool = True,
):
    """
    Return a new Lock object using key ``name`` that mimics
    the behavior of threading.Lock.

    ``timeout`` is the maximum life of the lock. When it is not given the
    lock stays held until release() is called.

    ``sleep`` is the amount of time to sleep per loop iteration while the
    lock is in blocking mode and another client is holding it.

    ``blocking`` selects whether ``acquire`` blocks until the lock has
    been acquired, or fails immediately by returning False without
    acquiring it. Defaults to True. A ``blocking`` argument passed to
    ``acquire`` overrides this value.

    ``blocking_timeout`` is the maximum number of seconds (float or
    integer) to spend trying to acquire the lock. ``None`` means keep
    trying forever.

    ``lock_class`` forces the specified lock implementation. As of
    redis-py 3.0 the only lock class implemented is ``Lock`` (a Lua-based
    lock), which is used when this argument is None, so you are unlikely
    to need it unless you have written your own lock class.

    ``thread_local`` selects whether the lock token is kept in
    thread-local storage. It is by default, so that a thread only sees
    its own token and not one set by another thread. Consider this
    timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release the thread-2's lock.

    Some use cases need thread local storage disabled, for example when
    one thread acquires a lock and hands the lock instance to a worker
    thread that releases it later; with thread local storage enabled the
    worker would not see the token set by the acquiring thread. Such
    cases are assumed to be uncommon, hence the default.

    ``raise_on_release_error`` selects whether an exception is raised
    when the lock is no longer owned on leaving the context manager.
    True (the default) raises; False logs a warning and suppresses the
    exception.
    """
    factory = Lock if lock_class is None else lock_class
    return factory(
        self,
        name,
        timeout=timeout,
        sleep=sleep,
        blocking=blocking,
        blocking_timeout=blocking_timeout,
        thread_local=thread_local,
        raise_on_release_error=raise_on_release_error,
    )

947 

def set_response_callback(self, command, callback):
    """
    Register ``callback`` as the cluster response callback of ``command``.

    A callback already stored for the same command is replaced.
    """
    callbacks = self.cluster_response_callbacks
    callbacks[command] = callback

951 

def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
    """
    Resolve the list of nodes a command should be executed on.

    The command name is ``args[0]`` upper-cased, or the upper-cased
    ``"<args[0]> <args[1]>"`` pair when that pair is a known entry of
    ``self.command_flags``. A ``nodes_flag`` keyword (popped from
    ``kwargs``) takes precedence over the flag predefined for the command.
    Commands without a matching flag are routed by the slot of their keys.
    """
    cls = self.__class__
    command = args[0].upper()
    if len(args) >= 2:
        two_word_command = f"{args[0]} {args[1]}".upper()
        if two_word_command in self.command_flags:
            command = two_word_command

    # A flag passed by the caller wins over the predefined one
    flag = kwargs.pop("nodes_flag", None)
    if flag is None:
        flag = self.command_flags.get(command)

    if flag == cls.RANDOM:
        return [self.get_random_node()]
    if flag == cls.PRIMARIES:
        return self.get_primaries()
    if flag == cls.REPLICAS:
        return self.get_replicas()
    if flag == cls.ALL_NODES:
        return self.get_nodes()
    if flag == cls.DEFAULT_NODE or command in cls.SEARCH_COMMANDS[0]:
        # both cases are served by the cluster's default node
        return [self.nodes_manager.default_node]

    # No flag applies: pick the node that holds the key's slot
    slot = self.determine_slot(*args)
    is_read_command = command in READ_COMMANDS
    node = self.nodes_manager.get_node_from_slot(
        slot,
        self.read_from_replicas and is_read_command,
        self.load_balancing_strategy if is_read_command else None,
    )
    return [node]

992 

def _should_reinitialized(self):
    """
    Tell whether the cluster layout should be reinitialized now.

    ``reinitialize_steps == 0`` disables reinitialization on MOVED errors,
    ``reinitialize_steps == 1`` triggers it on every MOVED error; otherwise
    it is triggered whenever the counter is a multiple of the step count.
    """
    steps = self.reinitialize_steps
    if steps == 0:
        return False
    return self.reinitialize_counter % steps == 0

1002 

def keyslot(self, key):
    """
    Return the hash slot of ``key``.

    The key is first encoded with the client's encoder and then hashed.
    See Keys distribution model in https://redis.io/topics/cluster-spec
    """
    return key_slot(self.encoder.encode(key))

1010 

def _get_command_keys(self, *args):
    """
    Return the keys of the command described by ``args``.

    The lookup is delegated to ``self.commands_parser`` using the redis
    connection of the default node. If the command has no keys in it, None
    is returned.

    NOTE: Due to a bug in redis<7.0, this function does not work properly
    for EVAL or EVALSHA when the `numkeys` arg is 0.
      - issue: https://github.com/redis/redis/issues/9493
      - fix: https://github.com/redis/redis/pull/9733

    So, don't use this function with EVAL or EVALSHA.
    """
    default_node = self.get_default_node()
    return self.commands_parser.get_keys(default_node.redis_connection, *args)

1025 

def determine_slot(self, *args) -> int:
    """
    Work out which hash slot the command in ``args`` must be sent to.

    Raises a RedisClusterException if there's a missing key and we can't
    determine what slots to map the command to; or, if the keys don't
    all map to the same key slot.
    """
    command = args[0]
    if self.command_flags.get(command) == SLOT_ID:
        # The slot ID is given explicitly as the first argument
        return args[1]

    upper_command = command.upper()
    if upper_command in ("EVAL", "EVALSHA"):
        # EVAL and EVALSHA are common enough that asking the server to
        # parse the keys would be wasteful, and `_get_command_keys()` fails
        # for them on redis<7.0 anyway, so the keys are sliced out locally.
        # command syntax: EVAL "script body" num_keys ...
        if len(args) <= 2:
            raise RedisClusterException(f"Invalid args in command: {args}")
        key_count = int(args[2])
        keys = args[3 : 3 + key_count]
        if len(keys) == 0:
            # A script without keys can run on any node: use a random slot
            return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
    else:
        keys = self._get_command_keys(*args)
        if keys is None or len(keys) == 0:
            if upper_command in ("FCALL", "FCALL_RO"):
                # A function called with 0 keys can run on any node
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            raise RedisClusterException(
                "No way to dispatch this command to Redis Cluster. "
                "Missing key.\nYou can execute the command by specifying "
                f"target nodes.\nCommand: {args}"
            )

    if len(keys) == 1:
        # single key command
        return self.keyslot(keys[0])

    # multi-key command: every key has to hash to the same slot
    distinct_slots = set(map(self.keyslot, keys))
    if len(distinct_slots) != 1:
        raise RedisClusterException(
            f"{command} - all keys must map to the same key slot"
        )
    return distinct_slots.pop()

1082 

def get_encoder(self):
    """
    Return the encoder object stored on this client (``self.encoder``).
    """
    encoder = self.encoder
    return encoder

1088 

def get_connection_kwargs(self):
    """
    Return the connection keyword arguments held by the nodes manager.
    """
    manager = self.nodes_manager
    return manager.connection_kwargs

1094 

def _is_nodes_flag(self, target_nodes):
    """
    Return True when ``target_nodes`` is a string listed in
    ``self.node_flags``; any non-string value yields False.
    """
    if not isinstance(target_nodes, str):
        return False
    return target_nodes in self.node_flags

1097 

def _parse_target_nodes(self, target_nodes):
    """
    Normalize the ``target_nodes`` argument into a list of nodes.

    Accepted inputs:
      - a list of nodes, returned as is
      - a dict of the format {node_name: node}, whose values are returned
        as a new list. This enables executing commands on several nodes,
        e.g. rc.cluster_save_config(rc.get_primaries())
      - a single ClusterNode, returned wrapped in a list

    Raises TypeError for any other type.
    """
    if isinstance(target_nodes, list):
        return target_nodes
    if isinstance(target_nodes, dict):
        # Materialize the values: callers take len() of the result and
        # index it (``target_nodes[0]``), which a dict view does not support
        return list(target_nodes.values())
    if isinstance(target_nodes, ClusterNode):
        # Supports passing a single ClusterNode as a variable
        return [target_nodes]
    raise TypeError(
        "target_nodes type can be one of the following: "
        "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
        "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
        f"The passed type is {type(target_nodes)}"
    )

1117 

def execute_command(self, *args, **kwargs):
    """
    Execute a command on the cluster.

    All positional and keyword arguments are forwarded unchanged to
    ``_internal_execute_command`` and its result is returned.
    """
    result = self._internal_execute_command(*args, **kwargs)
    return result

1120 

def _internal_execute_command(self, *args, **kwargs):
    """
    Execute a command, retrying on ERRORS_ALLOW_RETRY errors.

    The number of retries comes from ``self.retry.get_retries()``. Once
    the attempts are used up, the last exception is raised.

    Key argument :target_nodes: can be passed with the following types:
        nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
        ClusterNode
        list<ClusterNode>
        dict<Any, ClusterNode>

    When explicit nodes (anything but a nodes flag) are passed, no retry
    is made: after a retryable error the nodes and slots cache were
    reinitialized, and a retry is only meaningful when the target nodes
    can be determined again from the new tables. Nodes handed in by the
    caller may not be valid anymore at that point.
    """
    requested_targets = kwargs.pop("target_nodes", None)
    explicit_nodes = requested_targets is not None and not self._is_nodes_flag(
        requested_targets
    )
    targets = self._parse_target_nodes(requested_targets) if explicit_nodes else None
    retries_left = 0 if explicit_nodes else self.retry.get_retries()
    used_default_node = False

    # One initial execution plus the allowed retries
    for _ in range(1 + retries_left):
        try:
            if not explicit_nodes:
                # Determine the nodes to execute the command on
                targets = self._determine_nodes(
                    *args, **kwargs, nodes_flag=requested_targets
                )
                if not targets:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )
            if len(targets) == 1 and targets[0] == self.get_default_node():
                used_default_node = True
            responses = {}
            for node in targets:
                responses[node.name] = self._execute_command(node, *args, **kwargs)
            # Return the processed result
            return self._process_result(args[0], responses, **kwargs)
        except Exception as e:
            can_retry = (
                retries_left > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY
            )
            if not can_retry:
                raise e
            if used_default_node:
                # Replace the default cluster node
                self.replace_default_node()
            # The nodes and slots cache were reinitialized.
            # Try again with the new cluster setup.
            retries_left -= 1

1188 

def _execute_command(self, target_node, *args, **kwargs):
    """
    Send a command to a node in the cluster and return its response.

    The command is attempted up to ``RedisClusterRequestTTL`` times. MOVED,
    ASK and TRYAGAIN replies lead to another attempt (against the refreshed
    or redirected node where applicable); connection, timeout, cluster-down
    and slot-not-covered errors reinitialize the nodes manager and are
    re-raised to the caller. When a callback is registered for the command
    in ``self.cluster_response_callbacks`` it post-processes the response.

    Raises ClusterError when all attempts are used up.
    """
    command = args[0]
    redis_node = None
    connection = None
    redirect_addr = None
    asking = False
    moved = False
    ttl = int(self.RedisClusterRequestTTL)

    while ttl > 0:
        ttl -= 1
        # Forget the connection of the previous attempt. It was already
        # released in that attempt's ``finally``; if this attempt fails
        # before acquiring a new one, the handlers below must not
        # disconnect or release a connection that is back in the pool.
        connection = None
        try:
            if asking:
                target_node = self.get_node(node_name=redirect_addr)
            elif moved:
                # MOVED occurred and the slots cache was updated,
                # refresh the target node
                slot = self.determine_slot(*args)
                target_node = self.nodes_manager.get_node_from_slot(
                    slot,
                    self.read_from_replicas and command in READ_COMMANDS,
                    self.load_balancing_strategy
                    if command in READ_COMMANDS
                    else None,
                )
                moved = False

            redis_node = self.get_redis_connection(target_node)
            connection = get_connection(redis_node)
            if asking:
                # An ASK redirect requires ASKING to precede the command
                connection.send_command("ASKING")
                redis_node.parse_response(connection, "ASKING", **kwargs)
                asking = False
            connection.send_command(*args, **kwargs)
            response = redis_node.parse_response(connection, command, **kwargs)

            # Remove keys entry, it needs only for cache.
            kwargs.pop("keys", None)

            if command in self.cluster_response_callbacks:
                response = self.cluster_response_callbacks[command](
                    response, **kwargs
                )
            return response
        except AuthenticationError:
            raise
        except (ConnectionError, TimeoutError) as e:
            # ConnectionError can also be raised if we couldn't get a
            # connection from the pool before timing out, so check that
            # this is an actual connection before attempting to disconnect.
            if connection is not None:
                connection.disconnect()

            # Remove the failed node from the startup nodes before we try
            # to reinitialize the cluster
            self.nodes_manager.startup_nodes.pop(target_node.name, None)
            # Reset the cluster node's connection
            target_node.redis_connection = None
            self.nodes_manager.initialize()
            raise e
        except MovedError as e:
            # First, we will try to patch the slots/nodes cache with the
            # redirected node output and try again. If MovedError exceeds
            # 'reinitialize_steps' number of times, we will force
            # reinitializing the tables, and then try again.
            # 'reinitialize_steps' counter will increase faster when
            # the same client object is shared between multiple threads. To
            # reduce the frequency you can set this variable in the
            # RedisCluster constructor.
            self.reinitialize_counter += 1
            if self._should_reinitialized():
                self.nodes_manager.initialize()
                # Reset the counter
                self.reinitialize_counter = 0
            else:
                self.nodes_manager.update_moved_exception(e)
            moved = True
        except TryAgainError:
            # Back off only once half of the attempts have been used
            if ttl < self.RedisClusterRequestTTL / 2:
                time.sleep(0.05)
        except AskError as e:
            redirect_addr = get_node_name(host=e.host, port=e.port)
            asking = True
        except (ClusterDownError, SlotNotCoveredError):
            # ClusterDownError can occur during a failover and to get
            # self-healed, we will try to reinitialize the cluster layout
            # and retry executing the command

            # SlotNotCoveredError can occur when the cluster is not fully
            # initialized or can be temporary issue.
            # We will try to reinitialize the cluster topology
            # and retry executing the command

            time.sleep(0.25)
            self.nodes_manager.initialize()
            raise
        except ResponseError:
            raise
        except Exception as e:
            if connection:
                connection.disconnect()
            raise e
        finally:
            if connection is not None:
                redis_node.connection_pool.release(connection)

    raise ClusterError("TTL exhausted.")

1299 

def close(self) -> None:
    """
    Close the nodes manager while holding the client's lock.

    An AttributeError is ignored on purpose: RedisCluster's __init__ can
    fail before ``nodes_manager`` is set.
    """
    try:
        with self._lock:
            manager = self.nodes_manager
            if manager:
                manager.close()
    except AttributeError:
        pass

1308 

def _process_result(self, command, res, **kwargs):
    """
    Turn the per-node results of an executed command into its final value.

    :type command: str
    :type res: dict

    `res` should be in the following format:
        Dict<node_name, command_result>

    A result callback registered for ``command`` decides the return value.
    Otherwise a single-entry dict is unwrapped to its only value, and any
    other dict is returned unchanged.
    """
    if command in self.result_callbacks:
        handler = self.result_callbacks[command]
        return handler(command, res, **kwargs)
    if len(res) != 1:
        return res
    # The command ran on a single node: drop the dictionary wrapper
    return next(iter(res.values()))

1328 

def load_external_module(self, funcname, func):
    """
    Attach an externally defined redis module function to this client.

    ``funcname`` - A string containing the name of the attribute to create
    ``func`` - The function stored under that name on this instance
    """
    setattr(self, funcname, func)

1338 

def transaction(self, func, *watches, **kwargs):
    """
    Run the callable `func` as a transaction while watching all keys
    given in `watches`. `func` receives a single argument, the Pipeline
    object.

    Supported keyword arguments: ``shard_hint`` (forwarded to
    ``self.pipeline``), ``value_from_callable`` (return the value of
    `func` instead of the result of ``execute()``) and ``watch_delay``
    (seconds to sleep before retrying after a WatchError). The transaction
    is retried until it completes without a WatchError.
    """
    shard_hint = kwargs.pop("shard_hint", None)
    return_callable_value = kwargs.pop("value_from_callable", False)
    retry_delay = kwargs.pop("watch_delay", None)
    with self.pipeline(True, shard_hint) as pipe:
        while True:
            try:
                if watches:
                    pipe.watch(*watches)
                callable_result = func(pipe)
                execute_result = pipe.execute()
                if return_callable_value:
                    return callable_result
                return execute_result
            except WatchError:
                # A watched key changed: optionally wait, then start over
                if retry_delay is not None and retry_delay > 0:
                    time.sleep(retry_delay)

1360 

1361 

class ClusterNode:
    """
    A node of the cluster, identified by its name.

    The name is built from host and port by ``get_node_name``. Equality
    and hashing are both based on that name only.
    """

    def __init__(self, host, port, server_type=None, redis_connection=None):
        # "localhost" is resolved so the node name is built from an address
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None and makes instances
        # unhashable; hash by the same attribute that __eq__ compares so
        # nodes can be used in sets and as dict keys.
        return hash(self.name)

    def __del__(self):
        try:
            if self.redis_connection is not None:
                self.redis_connection.close()
        except Exception:
            # Ignore errors when closing the connection
            pass

1392 

1393 

class LoadBalancingStrategy(Enum):
    """
    Strategies for choosing which node of a slot serves a command.
    """

    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"

1398 

1399 

class LoadBalancer:
    """
    Picks the index of the node to use within a slot's node list.

    Round-robin state is kept per primary name in ``primary_to_idx``.
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        """
        Return the node index selected by ``load_balancing_strategy``.
        """
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        replicas_only = (
            load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
        )
        return self._get_round_robin_index(primary, list_size, replicas_only)

    def reset(self) -> None:
        """
        Forget the round-robin position of every primary.
        """
        self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        # Index 0 is excluded from the random choice
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        current = self.primary_to_idx.setdefault(primary, self.start_index)
        if replicas_only and current == 0:
            # skip the primary node index
            current = 1
        # Store the position to hand out on the next call
        self.primary_to_idx[primary] = (current + 1) % list_size
        return current

1440 

1441 

1442class NodesManager: 

def __init__(
    self,
    startup_nodes,
    from_url=False,
    require_full_coverage=False,
    lock=None,
    dynamic_startup_nodes=True,
    connection_pool_class=ConnectionPool,
    address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
    cache: Optional[CacheInterface] = None,
    cache_config: Optional[CacheConfig] = None,
    cache_factory: Optional[CacheFactoryInterface] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
    **kwargs,
):
    """
    Store the configuration, register the startup nodes and run
    ``initialize()``.

    Extra keyword arguments are kept as ``connection_kwargs``. A new
    ``threading.RLock`` is created when no ``lock`` is given, and a new
    ``EventDispatcher`` when no ``event_dispatcher`` is given.
    """
    # Cache tables, filled in by initialize()
    self.nodes_cache: Dict[str, Redis] = {}
    self.slots_cache = {}
    self.startup_nodes = {}
    self.default_node = None
    self.populate_startup_nodes(startup_nodes)

    # Configuration
    self.from_url = from_url
    self._require_full_coverage = require_full_coverage
    self._dynamic_startup_nodes = dynamic_startup_nodes
    self.connection_pool_class = connection_pool_class
    self.address_remap = address_remap
    self._cache = cache
    self._cache_config = cache_config
    self._cache_factory = cache_factory
    self._moved_exception = None
    self.connection_kwargs = kwargs
    self.read_load_balancer = LoadBalancer()

    self._lock = threading.RLock() if lock is None else lock
    self._event_dispatcher = (
        EventDispatcher() if event_dispatcher is None else event_dispatcher
    )
    self._credential_provider = self.connection_kwargs.get(
        "credential_provider", None
    )
    self.initialize()

1485 

def get_node(self, host=None, port=None, node_name=None):
    """
    Look up a node in the nodes cache.

    The node is identified either by ``host`` and ``port`` (a host of
    "localhost" is resolved first) or, when those are not both given, by
    ``node_name``.
    :return: ClusterNode if the node exists, else None
    """
    if host and port:
        # the user passed host and port
        if host == "localhost":
            host = socket.gethostbyname(host)
        lookup_name = get_node_name(host=host, port=port)
    elif node_name:
        lookup_name = node_name
    else:
        return None
    return self.nodes_cache.get(lookup_name)

1501 

def update_moved_exception(self, exception):
    """
    Remember ``exception`` as the pending MOVED error.

    It is consumed on the next ``get_node_from_slot`` call, which patches
    the slots cache from it.
    """
    self._moved_exception = exception

1504 

def _update_moved_slots(self):
    """
    Patch the slots cache so the moved slot points at the redirected node.

    The node named by the stored MOVED exception becomes the primary of
    the slot, after which the stored exception is cleared.
    """
    moved = self._moved_exception
    new_owner = self.get_node(host=moved.host, port=moved.port)
    if new_owner is None:
        # This is a new node, we will add it to the nodes cache
        new_owner = ClusterNode(moved.host, moved.port, PRIMARY)
        self.nodes_cache[new_owner.name] = new_owner
    elif new_owner.server_type is not PRIMARY:
        # The node already exists: update its server type
        new_owner.server_type = PRIMARY

    slot_nodes = self.slots_cache[moved.slot_id]
    if new_owner in slot_nodes:
        # The MOVED error resulted from a failover, and the new slot owner
        # had previously been a replica.
        previous_primary = slot_nodes[0]
        # Demote the old primary and move it to the end of the slot's
        # node list
        previous_primary.server_type = REPLICA
        slot_nodes.append(previous_primary)
        # Drop the former replica entry of the new owner ...
        slot_nodes.remove(new_owner)
        # ... and put the new owner in the primary position
        slot_nodes[0] = new_owner
        if self.default_node == previous_primary:
            # Update the default node with the new primary
            self.default_node = new_owner
    else:
        # The new slot owner is a new server, or a server from a different
        # shard. We need to remove all current nodes from the slot's list
        # (including replications) and add just the new node.
        self.slots_cache[moved.slot_id] = [new_owner]
    # Reset moved_exception
    self._moved_exception = None

1543 

@deprecated_args(
    args_to_warn=["server_type"],
    reason=(
        "In case you need select some load balancing strategy "
        "that will use replicas, please set it through 'load_balancing_strategy'"
    ),
    version="5.3.0",
)
def get_node_from_slot(
    self,
    slot,
    read_from_replicas=False,
    load_balancing_strategy=None,
    server_type=None,
) -> ClusterNode:
    """
    Return a node that serves the given hash slot.

    A pending MOVED exception is applied to the slots cache first. With a
    load balancing strategy (``read_from_replicas=True`` implies
    ROUND_ROBIN) and more than one node for the slot, the read load
    balancer picks the node. Otherwise the primary is returned, unless the
    deprecated ``server_type`` asks for a replica, in which case a random
    replica is chosen.

    Raises SlotNotCoveredError when no node is known for the slot.
    """
    if self._moved_exception:
        with self._lock:
            # Check again under the lock: another thread may have applied it
            if self._moved_exception:
                self._update_moved_slots()

    slot_nodes = self.slots_cache.get(slot)
    if slot_nodes is None or len(slot_nodes) == 0:
        raise SlotNotCoveredError(
            f'Slot "{slot}" not covered by the cluster. '
            f'"require_full_coverage={self._require_full_coverage}"'
        )

    if read_from_replicas is True and load_balancing_strategy is None:
        load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

    node_count = len(slot_nodes)
    if node_count > 1 and load_balancing_strategy:
        # Let the configured strategy choose the index
        chosen_idx = self.read_load_balancer.get_server_index(
            slot_nodes[0].name, node_count, load_balancing_strategy
        )
    elif server_type is None or server_type == PRIMARY or node_count == 1:
        # return a primary
        chosen_idx = 0
    else:
        # return a randomly chosen replica
        chosen_idx = random.randint(1, node_count - 1)

    return slot_nodes[chosen_idx]

1595 

def get_nodes_by_server_type(self, server_type):
    """
    Collect the cached nodes whose ``server_type`` equals the given one.

    :param server_type: 'primary' or 'replica'
    :return: list of ClusterNode
    """
    matching = []
    for node in self.nodes_cache.values():
        if node.server_type == server_type:
            matching.append(node)
    return matching

1607 

def populate_startup_nodes(self, nodes):
    """
    Add ``nodes`` to the startup nodes, keyed by node name.

    Nodes sharing a name collapse into one entry, the last one winning.
    """
    self.startup_nodes.update((node.name, node) for node in nodes)

1614 

def check_slots_coverage(self, slots_cache):
    """
    Return True when every hash slot is present in ``slots_cache``.

    Used to decide whether the next startup node has to be queried.
    """
    return all(
        slot in slots_cache for slot in range(0, REDIS_CLUSTER_HASH_SLOTS)
    )

1622 

def create_redis_connections(self, nodes):
    """
    Make sure every node in :nodes: has a redis connection.

    Nodes without one get a new redis node built from the stored
    connection kwargs. Afterwards the connection pools of all given nodes
    are announced via an AfterPooledConnectionsInstantiationEvent.
    """
    pools = []
    for node in nodes:
        if node.redis_connection is None:
            node.redis_connection = self.create_redis_node(
                host=node.host, port=node.port, **self.connection_kwargs
            )
        pools.append(node.redis_connection.connection_pool)

    event = AfterPooledConnectionsInstantiationEvent(
        pools, ClientType.SYNC, self._credential_provider
    )
    self._event_dispatcher.dispatch(event)

1640 

def create_redis_node(self, host, port, **kwargs):
    """
    Build the Redis client used to talk to the node at ``host``:``port``.

    The client is configured not to retry on its own: retrying
    unreachable nodes at this level would block the connection pool.
    Retries are handled by the cluster client, which knows the topology;
    the only error handled at the lower level is ConnectionError, which
    triggers disconnection of the socket.
    """
    no_retry = Retry(
        backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
    )

    if not self.from_url:
        return Redis(
            host=host,
            port=port,
            cache=self._cache,
            retry=no_retry,
            **kwargs,
        )

    # Created from a URL: build the node on a customized connection pool
    kwargs.update(host=host, port=port, cache=self._cache, retry=no_retry)
    return Redis(connection_pool=self.connection_pool_class(**kwargs))

1671 

def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
    """
    Return the node for ``host``:``port``, registering it in
    ``tmp_nodes_cache``.

    A node already present in ``tmp_nodes_cache`` is returned untouched.
    Otherwise a node from the current nodes cache is reused when it still
    has a redis connection, else a new ClusterNode is created. In both
    cases its server type is set to ``role``.
    """
    name = get_node_name(host, port)
    already_collected = tmp_nodes_cache.get(name)
    if already_collected is not None:
        return already_collected

    # Prefer an existing node with a valid connection over a new one
    node = self.nodes_cache.get(name)
    if node is None or node.redis_connection is None:
        node = ClusterNode(host, port, role)
    if node.server_type != role:
        node.server_type = role
    tmp_nodes_cache[node.name] = node
    return node

1690 

def initialize(self):
    """
    Initializes the nodes cache, slots cache and redis connections.
    :startup_nodes:
        Responsible for discovering other nodes in the cluster

    Each startup node is asked for CLUSTER SLOTS until one reply set covers
    all hash slots or the startup nodes are exhausted. The new tables are
    built in temporary structures and only swapped in at the end.

    Raises RedisClusterException when no startup node could be queried,
    when the startup nodes disagree on more than 5 slot owners, or when
    full coverage is required but not reached.
    """
    self.reset()
    tmp_nodes_cache = {}
    tmp_slots = {}
    disagreements = []
    startup_nodes_reachable = False
    fully_covered = False
    kwargs = self.connection_kwargs
    exception = None
    # Convert to tuple to prevent RuntimeError if self.startup_nodes
    # is modified during iteration
    for startup_node in tuple(self.startup_nodes.values()):
        try:
            if startup_node.redis_connection:
                r = startup_node.redis_connection
            else:
                # Create a new Redis connection
                r = self.create_redis_node(
                    startup_node.host, startup_node.port, **kwargs
                )
                self.startup_nodes[startup_node.name].redis_connection = r
            # Make sure cluster mode is enabled on this node
            try:
                cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
                r.connection_pool.disconnect()
            except ResponseError:
                # Caught by the outer handler below, so the next startup
                # node is tried
                raise RedisClusterException(
                    "Cluster mode is not enabled on this node"
                )
            startup_nodes_reachable = True
        except Exception as e:
            # Try the next startup node.
            # The exception is saved and raised only if we have no more nodes.
            exception = e
            continue

        # CLUSTER SLOTS command results in the following output:
        # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
        # where each node contains the following list: [IP, port, node_id]
        # Therefore, cluster_slots[0][2][0] will be the IP address of the
        # primary node of the first slot section.
        # If there's only one server in the cluster, its ``host`` is ''
        # Fix it to the host in startup_nodes
        if (
            len(cluster_slots) == 1
            and len(cluster_slots[0][2][0]) == 0
            and len(self.startup_nodes) == 1
        ):
            cluster_slots[0][2][0] = startup_node.host

        for slot in cluster_slots:
            primary_node = slot[2]
            host = str_if_bytes(primary_node[0])
            if host == "":
                # An empty host is replaced by the host of the node that
                # answered
                host = startup_node.host
            port = int(primary_node[1])
            host, port = self.remap_host_port(host, port)

            # Primary first, then the replicas, in the order reported
            nodes_for_slot = []

            target_node = self._get_or_create_cluster_node(
                host, port, PRIMARY, tmp_nodes_cache
            )
            nodes_for_slot.append(target_node)

            replica_nodes = slot[3:]
            for replica_node in replica_nodes:
                host = str_if_bytes(replica_node[0])
                port = int(replica_node[1])
                host, port = self.remap_host_port(host, port)
                target_replica_node = self._get_or_create_cluster_node(
                    host, port, REPLICA, tmp_nodes_cache
                )
                nodes_for_slot.append(target_replica_node)

            # Every slot of the section refers to the same list object
            for i in range(int(slot[0]), int(slot[1]) + 1):
                if i not in tmp_slots:
                    tmp_slots[i] = nodes_for_slot
                else:
                    # Validate that 2 nodes want to use the same slot cache
                    # setup
                    tmp_slot = tmp_slots[i][0]
                    if tmp_slot.name != target_node.name:
                        disagreements.append(
                            f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                        )

                        if len(disagreements) > 5:
                            raise RedisClusterException(
                                f"startup_nodes could not agree on a valid "
                                f"slots cache: {', '.join(disagreements)}"
                            )

        fully_covered = self.check_slots_coverage(tmp_slots)
        if fully_covered:
            # Don't need to continue to the next startup node if all
            # slots are covered
            break

    if not startup_nodes_reachable:
        raise RedisClusterException(
            f"Redis Cluster cannot be connected. Please provide at least "
            f"one reachable node: {str(exception)}"
        ) from exception

    # Build the cache lazily: only when a config is given and no cache
    # instance was passed in
    if self._cache is None and self._cache_config is not None:
        if self._cache_factory is None:
            self._cache = CacheFactory(self._cache_config).get_cache()
        else:
            self._cache = self._cache_factory.get_cache()

    # Create Redis connections to all nodes
    self.create_redis_connections(list(tmp_nodes_cache.values()))

    # Check if the slots are not fully covered
    if not fully_covered and self._require_full_coverage:
        # Despite the requirement that the slots be covered, there
        # isn't a full coverage
        raise RedisClusterException(
            f"All slots are not covered after query all startup_nodes. "
            f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
            f"covered..."
        )

    # Set the tmp variables to the real variables
    self.nodes_cache = tmp_nodes_cache
    self.slots_cache = tmp_slots
    # Set the default node: the first primary found in the nodes cache
    self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
    if self._dynamic_startup_nodes:
        # Populate the startup nodes with all discovered nodes
        self.startup_nodes = tmp_nodes_cache
    # If initialize was called after a MovedError, clear it
    self._moved_exception = None

1830 

def close(self) -> None:
    """
    Drop the default node and close the Redis client of every cached node.

    Nodes whose ``redis_connection`` is falsy are skipped.
    """
    self.default_node = None
    for cached_node in self.nodes_cache.values():
        client = cached_node.redis_connection
        if not client:
            continue
        client.close()

1836 

def reset(self):
    """
    Reset the read load balancer, if one is configured.

    A ``read_load_balancer`` of ``None`` means there is nothing to reset
    and the call is a no-op.
    """
    balancer = self.read_load_balancer
    if balancer is None:
        # ``None.reset()`` raises AttributeError, not TypeError, so the
        # missing-balancer case has to be checked explicitly.
        return
    try:
        balancer.reset()
    except TypeError:
        # Kept for backward compatibility: a TypeError escaping the
        # balancer's reset() has always been ignored here.
        pass

1843 

def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
    """
    Translate an address reported by the cluster into the address the
    client should actually use.

    When ``self.address_remap`` is set it is called with the
    ``(host, port)`` tuple and its result is returned; otherwise the
    address is returned unchanged. Useful if the client is not connecting
    directly to the cluster.
    """
    remapper = self.address_remap
    if not remapper:
        return host, port
    return remapper((host, port))

1853 

def find_connection_owner(self, connection: Connection) -> Optional[Redis]:
    """
    Return the cached node whose connection pool is configured with the
    same host and port as ``connection``, or ``None`` if there is no match.

    Nodes without a ``redis_connection`` are ignored.
    """
    wanted_name = get_node_name(connection.host, connection.port)
    # Walk a snapshot of the cache values rather than the live view.
    for candidate in tuple(self.nodes_cache.values()):
        client = candidate.redis_connection
        if not client:
            continue
        pool_kwargs = client.connection_pool.connection_kwargs
        candidate_name = get_node_name(
            pool_kwargs.get("host"), pool_kwargs.get("port")
        )
        if candidate_name == wanted_name:
            # NOTE(review): the cached node object is returned, although the
            # annotation says Optional[Redis] - confirm which is intended.
            return candidate
    return None

1863 

1864 

class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
    """

    def __init__(
        self,
        redis_cluster,
        node=None,
        host=None,
        port=None,
        push_handler_func=None,
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true or load_balancing_strategy is set, a replica can be
            selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        self.set_pubsub_node(redis_cluster, node, host, port)
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        # node name -> per-node PubSub object used for sharded channels
        self.node_pubsub_mapping = {}
        # The instance attribute deliberately shadows the method of the same
        # name with one long-lived generator object.
        self._pubsubs_generator = self._pubsubs_generator()
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        super().__init__(
            connection_pool=connection_pool,
            encoder=redis_cluster.encoder,
            push_handler_func=push_handler_func,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        The pubsub node will be set according to the passed node, host and port
        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.
        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            pubsub_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, node, host, port)
            pubsub_node = node
        elif any([host, port]) is True:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            pubsub_node = None

        self.node = pubsub_node

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        if node is None or redis_cluster.get_node(node_name=node.name) is None:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )

    def execute_command(self, *args):
        """
        Execute a subscribe/unsubscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            if self.connection_pool is None:
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    channel = args[1]
                    slot = self.cluster.keyslot(channel)
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot,
                        self.cluster.read_from_replicas,
                        self.cluster.load_balancing_strategy,
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                redis_connection = self.cluster.get_redis_connection(node)
                self.connection_pool = redis_connection.connection_pool
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _get_node_pubsub(self, node):
        """
        Return the PubSub object mapped to ``node``, creating and caching one
        from the node's Redis client on first use.
        """
        try:
            return self.node_pubsub_mapping[node.name]
        except KeyError:
            pubsub = node.redis_connection.pubsub(
                push_handler_func=self.push_handler_func
            )
            self.node_pubsub_mapping[node.name] = pubsub
            return pubsub

    def _sharded_message_generator(self):
        """
        Poll the per-node pubsubs, at most one pass over the current mapping,
        and return the first message found (or None).
        """
        for _ in range(len(self.node_pubsub_mapping)):
            pubsub = next(self._pubsubs_generator)
            message = pubsub.get_message()
            if message is not None:
                return message
        return None

    def _pubsubs_generator(self):
        """
        Endlessly cycle over the per-node pubsubs.

        The generator stays suspended between ``next()`` calls while
        ``ssubscribe()`` and ``get_sharded_message()`` add and remove mapping
        entries. Iterating the live dict view across such a change raises
        ``RuntimeError`` and leaves the generator exhausted for good, so each
        pass walks a snapshot and skips entries that were removed or replaced
        in the meantime.
        """
        while True:
            for name, pubsub in list(self.node_pubsub_mapping.items()):
                if self.node_pubsub_mapping.get(name) is pubsub:
                    yield pubsub

    def get_sharded_message(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    ):
        """
        Fetch the next sharded pubsub message.

        With ``target_node`` the message is read from that node's pubsub
        (honouring ``timeout``); otherwise the per-node pubsubs are polled in
        turn. Returns None when nothing is available.
        """
        if target_node:
            message = self.node_pubsub_mapping[target_node.name].get_message(
                ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
            )
        else:
            message = self._sharded_message_generator()
        if message is None:
            return None
        elif str_if_bytes(message["type"]) == "sunsubscribe":
            if message["channel"] in self.pending_unsubscribe_shard_channels:
                self.pending_unsubscribe_shard_channels.remove(message["channel"])
                self.shard_channels.pop(message["channel"], None)
                node = self.cluster.get_node_from_key(message["channel"])
                if self.node_pubsub_mapping[node.name].subscribed is False:
                    self.node_pubsub_mapping.pop(node.name)
        if not self.channels and not self.patterns and not self.shard_channels:
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()
        # NOTE(review): this check sits at function level, so it drops every
        # message type, not only (un)subscribe notifications - confirm that
        # this nesting matches the intended behaviour.
        if self.ignore_subscribe_messages or ignore_subscribe_messages:
            return None
        return message

    def ssubscribe(self, *args, **kwargs):
        """
        Subscribe to shard channels, routing each channel to the pubsub of the
        node returned by ``cluster.get_node_from_key``. Keyword arguments map
        a channel name to its message handler.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        s_channels = dict.fromkeys(args)
        s_channels.update(kwargs)
        for s_channel, handler in s_channels.items():
            node = self.cluster.get_node_from_key(s_channel)
            pubsub = self._get_node_pubsub(node)
            if handler:
                pubsub.ssubscribe(**{s_channel: handler})
            else:
                pubsub.ssubscribe(s_channel)
            self.shard_channels.update(pubsub.shard_channels)
            self.pending_unsubscribe_shard_channels.difference_update(
                self._normalize_keys({s_channel: None})
            )
            if pubsub.subscribed and not self.subscribed:
                self.subscribed_event.set()
                self.health_check_response_counter = 0

    def sunsubscribe(self, *args):
        """
        Unsubscribe from the given shard channels, or from all currently
        tracked shard channels when called without arguments.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        else:
            args = self.shard_channels

        for s_channel in args:
            node = self.cluster.get_node_from_key(s_channel)
            p = self._get_node_pubsub(node)
            p.sunsubscribe(s_channel)
            self.pending_unsubscribe_shard_channels.update(
                p.pending_unsubscribe_shard_channels
            )

    def get_redis_connection(self):
        """
        Get the Redis connection of the pubsub connected node.
        """
        if self.node is not None:
            return self.node.redis_connection

    def disconnect(self):
        """
        Disconnect the pubsub connection.
        """
        if self.connection:
            self.connection.disconnect()
        for pubsub in self.node_pubsub_mapping.values():
            # A mapped pubsub may not hold a connection (for example when it
            # was registered but never obtained one); skip it instead of
            # failing on None.
            if pubsub.connection:
                pubsub.connection.disconnect()

2107 

2108 

class ClusterPipeline(RedisCluster):
    """
    Support for Redis pipeline
    in cluster mode

    The class is a thin facade: almost every operation is forwarded to an
    execution strategy object chosen in ``__init__``.
    """

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        MovedError,
        AskError,
        TryAgainError,
    )

    NO_SLOTS_COMMANDS = {"UNWATCH"}
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        cluster_error_retry_attempts: int = 3,
        reinitialize_steps: int = 5,
        retry: Optional[Retry] = None,
        lock=None,
        transaction=False,
        **kwargs,
    ):
        """
        Set up pipeline state and pick the execution strategy.

        ``retry`` is used as given; when it is None a ``Retry`` with
        exponential-with-jitter backoff and ``cluster_error_retry_attempts``
        retries is created. ``lock`` defaults to a new ``threading.RLock``.
        ``transaction`` selects ``TransactionStrategy`` instead of
        ``PipelineStrategy``. ``encoding``, ``encoding_errors`` and
        ``decode_responses`` are read from ``kwargs`` to build the encoder.
        """
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.cluster_response_callbacks = cluster_response_callbacks
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if retry is not None:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        # Keep a handle on the non-pipelined execute_command of the parent
        # class, since this class overrides execute_command.
        self.parent_execute_command = super().execute_command
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self) if not transaction else TransactionStrategy(self)
        )

    def __repr__(self):
        """Return the class name."""
        return f"{type(self).__name__}"

    def __enter__(self):
        """Enter the context manager; returns the pipeline itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Reset the pipeline when the ``with`` block ends."""
        self.reset()

    def __del__(self):
        # Best-effort cleanup: errors during finalization are ignored.
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self):
        """Return the number of commands queued in the execution strategy."""
        return len(self._execution_strategy.command_queue)

    def __bool__(self):
        "Pipeline instances should always evaluate to True on Python 3+"
        return True

    def execute_command(self, *args, **kwargs):
        """
        Forward the command to the execution strategy's ``execute_command``
        and return its result.
        """
        return self._execution_strategy.execute_command(*args, **kwargs)

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        return self._execution_strategy.execute_command(*args, **options)

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        self._execution_strategy.annotate_exception(exception, number, command)

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Execute all the commands in the current pipeline

        The pipeline is reset afterwards, whether or not execution raised.
        """

        try:
            return self._execution_strategy.execute(raise_on_error)
        finally:
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._execution_strategy.reset()

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """Forward ``stack`` to the execution strategy and return its result."""
        return self._execution_strategy.send_cluster_commands(
            stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
        )

    def exists(self, *keys):
        """Delegate ``exists`` to the execution strategy."""
        return self._execution_strategy.exists(*keys)

    def eval(self):
        """Delegate ``eval`` to the execution strategy."""
        return self._execution_strategy.eval()

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        self._execution_strategy.multi()

    def load_scripts(self):
        """Delegate ``load_scripts`` to the execution strategy."""
        self._execution_strategy.load_scripts()

    def discard(self):
        """Delegate ``discard`` to the execution strategy."""
        self._execution_strategy.discard()

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        self._execution_strategy.watch(*names)

    def unwatch(self):
        """Unwatches all previously specified keys"""
        self._execution_strategy.unwatch()

    def script_load_for_pipeline(self, *args, **kwargs):
        """Delegate ``script_load_for_pipeline`` to the execution strategy."""
        self._execution_strategy.script_load_for_pipeline(*args, **kwargs)

    def delete(self, *names):
        """
        Delegate ``delete`` to the execution strategy.

        The strategy's result is returned (like ``exists`` does) instead of
        being dropped, so whatever it hands back is available to the caller.
        """
        return self._execution_strategy.delete(*names)

    def unlink(self, *names):
        """
        Delegate ``unlink`` to the execution strategy.

        The strategy's result is returned (like ``exists`` does) instead of
        being dropped, so whatever it hands back is available to the caller.
        """
        return self._execution_strategy.unlink(*names)

2299 

2300 

def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Build a stand-in for a pipelined command that must not be used in
    cluster mode.

    The returned callable accepts any arguments and always raises
    ``RedisClusterException`` naming the blocked command.
    """

    def inner(*args, **kwargs):
        message = (
            f"ERROR: Calling pipelined function {name} is blocked "
            f"when running redis in cluster mode..."
        )
        raise RedisClusterException(message)

    return inner

2314 

2315 

# Commands that may not be issued through a cluster pipeline. Each entry is
# turned into a method name (lower-cased, spaces replaced by underscores) by
# the registration loop that follows this tuple.
PIPELINE_BLOCKED_COMMANDS = (
    # persistence / bit operations / blocking list moves
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    # CLIENT family
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    # CONFIG family
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    # server-wide and multi-key commands
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    # SCRIPT family
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    # SENTINEL family
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    # SLOWLOG family
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)

# Replace each blocked command's method on ClusterPipeline with a stub that
# raises. "CLIENT LIST" becomes the attribute name "client_list".
for command in PIPELINE_BLOCKED_COMMANDS:
    command = command.lower().replace(" ", "_")
    setattr(ClusterPipeline, command, block_pipeline_command(command))

2392 

2393 

class PipelineCommand:
    """
    A single command queued in a cluster pipeline, together with the
    bookkeeping fields filled in while it is executed.
    """

    def __init__(self, args, options=None, position=None):
        # The command arguments as queued by the caller.
        self.args = args
        # A fresh dict per instance when no options were supplied.
        self.options = {} if options is None else options
        # Index of the command within the pipeline queue.
        self.position = position
        # Execution state; all start out unset.
        self.result = None
        self.node = None
        self.asking = False

2406 

2407 

class NodeCommands:
    """
    The batch of pipeline commands destined for one node, plus the
    connection (and its pool) used to send them and read the replies.
    """

    def __init__(self, parse_response, connection_pool, connection):
        """Store the response parser, pool and connection; start empty."""
        self.parse_response = parse_response
        self.connection_pool = connection_pool
        self.connection = connection
        self.commands = []

    def append(self, c):
        """Queue command ``c`` for this node."""
        self.commands.append(c)

    def write(self):
        """
        Send every queued command to the node in one packed request.

        Code borrowed from Redis so it can be fixed
        """
        conn = self.connection
        queued = self.commands

        # Results are about to be overwritten by this run, so clear anything
        # left over from a previous attempt first.
        for cmd in queued:
            cmd.result = None

        # Pack all commands into a single request to cut network round trips.
        # A connection or timeout error is recorded on every command rather
        # than raised.
        try:
            packed = conn.pack_commands([cmd.args for cmd in queued])
            conn.send_packed_command(packed)
        except (ConnectionError, TimeoutError) as err:
            for cmd in queued:
                cmd.result = err

    def read(self):
        """
        Read one reply per queued command and store it in ``result``.

        Commands that already carry a result are skipped: write() puts the
        connection error there, and reading from a connection that is no
        longer open would fail in less obvious ways (e.g. an AttributeError
        from a readline() on None). Treating that case as a connection error
        is fine - the connection goes back to the pool and is reopened
        explicitly on the next write.
        """
        conn = self.connection
        for cmd in self.commands:
            if cmd.result is not None:
                continue
            try:
                cmd.result = self.parse_response(conn, cmd.args[0], **cmd.options)
            except (ConnectionError, TimeoutError) as err:
                # The connection is unusable: mark every command in the batch
                # with the error and stop reading.
                for queued in self.commands:
                    queued.result = err
                return
            except RedisError as err:
                # A command-level error only affects this command.
                cmd.result = err

2475 

2476 

class ExecutionStrategy(ABC):
    """
    Interface every cluster pipeline execution strategy has to implement.

    ``ClusterPipeline`` forwards its public methods to an object of this
    type; all members are abstract.
    """

    @property
    @abstractmethod
    def command_queue(self):
        """The commands currently queued by this strategy."""

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        """
        Execution flow for current execution strategy.

        See: ClusterPipeline.execute_command()
        """

    @abstractmethod
    def annotate_exception(self, exception, number, command):
        """
        Annotate exception according to current execution strategy.

        See: ClusterPipeline.annotate_exception()
        """

    @abstractmethod
    def pipeline_execute_command(self, *args, **options):
        """
        Pipeline execution flow for current execution strategy.

        See: ClusterPipeline.pipeline_execute_command()
        """

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Executes current execution strategy.

        See: ClusterPipeline.execute()
        """

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Sends commands according to current execution strategy.

        See: ClusterPipeline.send_cluster_commands()
        """

    @abstractmethod
    def reset(self):
        """
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        """

    @abstractmethod
    def exists(self, *keys):
        """See: ClusterPipeline.exists()"""

    @abstractmethod
    def eval(self):
        """See: ClusterPipeline.eval()"""

    @abstractmethod
    def multi(self):
        """
        Starts transactional context.

        See: ClusterPipeline.multi()
        """

    @abstractmethod
    def load_scripts(self):
        """See: ClusterPipeline.load_scripts()"""

    @abstractmethod
    def watch(self, *names):
        """See: ClusterPipeline.watch()"""

    @abstractmethod
    def unwatch(self):
        """
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        """

    @abstractmethod
    def script_load_for_pipeline(self, *args, **kwargs):
        """See: ClusterPipeline.script_load_for_pipeline()"""

    @abstractmethod
    def delete(self, *names):
        """
        Delete a key specified by ``names``

        See: ClusterPipeline.delete()
        """

    @abstractmethod
    def unlink(self, *names):
        """
        Unlink a key specified by ``names``

        See: ClusterPipeline.unlink()
        """

    @abstractmethod
    def discard(self):
        """See: ClusterPipeline.discard()"""

2598 

2599 

class AbstractStrategy(ExecutionStrategy):
    """
    Shared base for the concrete execution strategies.

    Holds the command queue, the owning pipeline and its nodes manager, and
    implements the operations that do not depend on the concrete strategy.
    """

    def __init__(
        self,
        pipe: ClusterPipeline,
    ):
        """Start with an empty queue bound to ``pipe``."""
        self._command_queue: List[PipelineCommand] = []
        self._pipe = pipe
        self._nodes_manager = self._pipe.nodes_manager

    @property
    def command_queue(self):
        """The list of queued ``PipelineCommand`` objects."""
        return self._command_queue

    @command_queue.setter
    def command_queue(self, queue: List[PipelineCommand]):
        self._command_queue = queue

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        pass

    def pipeline_execute_command(self, *args, **options):
        """
        Append the command to the queue, recording its position, and return
        the owning pipeline so calls can be chained.
        """
        self._command_queue.append(
            PipelineCommand(args, options, len(self._command_queue))
        )
        return self._pipe

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        pass

    @abstractmethod
    def reset(self):
        pass

    def exists(self, *keys):
        """Issue ``EXISTS`` for ``keys`` through ``execute_command``."""
        return self.execute_command("EXISTS", *keys)

    def eval(self):
        """Not supported; always raises RedisClusterException."""
        raise RedisClusterException("method eval() is not implemented")

    def load_scripts(self):
        """Not supported; always raises RedisClusterException."""
        raise RedisClusterException("method load_scripts() is not implemented")

    def script_load_for_pipeline(self, *args, **kwargs):
        """Not supported; always raises RedisClusterException."""
        raise RedisClusterException(
            "method script_load_for_pipeline() is not implemented"
        )

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled

        The first element of ``exception.args`` is replaced by a message that
        names the command number and the (truncated) command text; any further
        args are kept.
        """
        cmd = " ".join(map(safe_str, command))
        # An exception created without arguments has an empty ``args`` tuple;
        # indexing it would raise IndexError and hide the real error, so fall
        # back to the exception's class name in that case.
        if exception.args:
            detail = exception.args[0]
        else:
            detail = type(exception).__name__
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {detail}"
        )
        exception.args = (msg,) + exception.args[1:]

2668 

2669 

2670class PipelineStrategy(AbstractStrategy): 

def __init__(self, pipe: ClusterPipeline):
    """
    Initialise the base strategy for ``pipe`` and keep a reference to the
    pipeline's ``command_flags``.
    """
    super().__init__(pipe)
    self.command_flags = pipe.command_flags

2674 

def execute_command(self, *args, **kwargs):
    """
    Queue the command via ``pipeline_execute_command`` and return whatever
    that call returns.
    """
    queued = self.pipeline_execute_command(*args, **kwargs)
    return queued

2677 

2678 def _raise_first_error(self, stack): 

2679 """ 

2680 Raise the first exception on the stack 

2681 """ 

2682 for c in stack: 

2683 r = c.result 

2684 if isinstance(r, Exception): 

2685 self.annotate_exception(r, c.position + 1, c.args) 

2686 raise r 

2687 

def execute(self, raise_on_error: bool = True) -> List[Any]:
    """
    Send the queued commands and return their results.

    An empty queue yields an empty list without touching the cluster. When
    commands were queued, the strategy is reset afterwards whether or not
    sending raised.
    """
    queued = self._command_queue
    if queued:
        try:
            return self.send_cluster_commands(queued, raise_on_error)
        finally:
            self.reset()
    return []

2697 

def reset(self):
    """
    Reset back to empty pipeline.

    The queue is replaced by a new empty list; the previous list object is
    left untouched.
    """
    empty_queue = []
    self._command_queue = empty_queue

2703 

def send_cluster_commands(
    self, stack, raise_on_error=True, allow_redirections=True
):
    """
    Retry wrapper around ``_send_cluster_commands``.

    An empty ``stack`` returns ``[]`` immediately. Otherwise the stack is
    sent and the result returned. If one of the errors listed in
    ``RedisCluster.ERRORS_ALLOW_RETRY`` is raised, the whole send is
    attempted again, up to the number of retries reported by the
    pipeline's ``retry`` object; once those are used up the last error is
    re-raised. Any other exception propagates immediately.
    """
    if not stack:
        return []
    retries_left = self._pipe.retry.get_retries()
    while True:
        try:
            return self._send_cluster_commands(
                stack,
                raise_on_error=raise_on_error,
                allow_redirections=allow_redirections,
            )
        except RedisCluster.ERRORS_ALLOW_RETRY as e:
            if not retries_left > 0:
                # Retry budget exhausted: surface the last error.
                raise e
            # Go round again with the (possibly refreshed) cluster setup.
            retries_left -= 1

2740 

2741 def _send_cluster_commands( 

2742 self, stack, raise_on_error=True, allow_redirections=True 

2743 ): 

2744 """ 

2745 Send a bunch of cluster commands to the redis cluster. 

2746 

2747 `allow_redirections` If the pipeline should follow 

2748 `ASK` & `MOVED` responses automatically. If set 

2749 to false it will raise RedisClusterException. 

2750 """ 

2751 # the first time sending the commands we send all of 

2752 # the commands that were queued up. 

2753 # if we have to run through it again, we only retry 

2754 # the commands that failed. 

2755 attempt = sorted(stack, key=lambda x: x.position) 

2756 is_default_node = False 

2757 # build a list of node objects based on node names we need to 

2758 nodes = {} 

2759 

2760 # as we move through each command that still needs to be processed, 

2761 # we figure out the slot number that command maps to, then from 

2762 # the slot determine the node. 

2763 for c in attempt: 

2764 while True: 

2765 # refer to our internal node -> slot table that 

2766 # tells us where a given command should route to. 

2767 # (it might be possible we have a cached node that no longer 

2768 # exists in the cluster, which is why we do this in a loop) 

2769 passed_targets = c.options.pop("target_nodes", None) 

2770 if passed_targets and not self._is_nodes_flag(passed_targets): 

2771 target_nodes = self._parse_target_nodes(passed_targets) 

2772 else: 

2773 target_nodes = self._determine_nodes( 

2774 *c.args, node_flag=passed_targets 

2775 ) 

2776 if not target_nodes: 

2777 raise RedisClusterException( 

2778 f"No targets were found to execute {c.args} command on" 

2779 ) 

2780 if len(target_nodes) > 1: 

2781 raise RedisClusterException( 

2782 f"Too many targets for command {c.args}" 

2783 ) 

2784 

2785 node = target_nodes[0] 

2786 if node == self._pipe.get_default_node(): 

2787 is_default_node = True 

2788 

2789 # now that we know the name of the node 

2790 # ( it's just a string in the form of host:port ) 

2791 # we can build a list of commands for each node. 

2792 node_name = node.name 

2793 if node_name not in nodes: 

2794 redis_node = self._pipe.get_redis_connection(node) 

2795 try: 

2796 connection = get_connection(redis_node) 

2797 except (ConnectionError, TimeoutError): 

2798 for n in nodes.values(): 

2799 n.connection_pool.release(n.connection) 

2800 # Connection retries are being handled in the node's 

2801 # Retry object. Reinitialize the node -> slot table. 

2802 self._nodes_manager.initialize() 

2803 if is_default_node: 

2804 self._pipe.replace_default_node() 

2805 raise 

2806 nodes[node_name] = NodeCommands( 

2807 redis_node.parse_response, 

2808 redis_node.connection_pool, 

2809 connection, 

2810 ) 

2811 nodes[node_name].append(c) 

2812 break 

2813 

2814 # send the commands in sequence. 

2815 # we write to all the open sockets for each node first, 

2816 # before reading anything 

2817 # this allows us to flush all the requests out across the 

2818 # network 

2819 # so that we can read them from different sockets as they come back. 

2820 # we don't multiplex on the sockets as they come available, 

2821 # but that shouldn't make too much difference. 

2822 try: 

2823 node_commands = nodes.values() 

2824 for n in node_commands: 

2825 n.write() 

2826 

2827 for n in node_commands: 

2828 n.read() 

2829 finally: 

2830 # release all of the redis connections we allocated earlier 

2831 # back into the connection pool. 

2832 # we used to do this step as part of a try/finally block, 

2833 # but it is really dangerous to 

2834 # release connections back into the pool if for some 

2835 # reason the socket has data still left in it 

2836 # from a previous operation. The write and 

2837 # read operations already have try/catch around them for 

2838 # all known types of errors including connection 

2839 # and socket level errors. 

2840 # So if we hit an exception, something really bad 

2841 # happened and putting any of 

2842 # these connections back into the pool is a very bad idea. 

2843 # the socket might have unread buffer still sitting in it, 

2844 # and then the next time we read from it we pass the 

2845 # buffered result back from a previous command and 

2846 # every single request after to that connection will always get 

2847 # a mismatched result. 

2848 for n in nodes.values(): 

2849 n.connection_pool.release(n.connection) 

2850 

2851 # if the response isn't an exception it is a 

2852 # valid response from the node 

2853 # we're all done with that command, YAY! 

2854 # if we have more commands to attempt, we've run into problems. 

2855 # collect all the commands we are allowed to retry. 

2856 # (MOVED, ASK, or connection errors or timeout errors) 

2857 attempt = sorted( 

2858 ( 

2859 c 

2860 for c in attempt 

2861 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

2862 ), 

2863 key=lambda x: x.position, 

2864 ) 

2865 if attempt and allow_redirections: 

2866 # RETRY MAGIC HAPPENS HERE! 

2867 # send these remaining commands one at a time using `execute_command` 

2868 # in the main client. This keeps our retry logic 

2869 # in one place mostly, 

2870 # and allows us to be more confident in correctness of behavior. 

2871 # at this point any speed gains from pipelining have been lost 

2872 # anyway, so we might as well make the best 

2873 # attempt to get the correct behavior. 

2874 # 

2875 # The client command will handle retries for each 

2876 # individual command sequentially as we pass each 

2877 # one into `execute_command`. Any exceptions 

2878 # that bubble out should only appear once all 

2879 # retries have been exhausted. 

2880 # 

2881 # If a lot of commands have failed, we'll be setting the 

2882 # flag to rebuild the slots table from scratch. 

2883 # So MOVED errors should correct themselves fairly quickly. 

2884 self._pipe.reinitialize_counter += 1 

2885 if self._pipe._should_reinitialized(): 

2886 self._nodes_manager.initialize() 

2887 if is_default_node: 

2888 self._pipe.replace_default_node() 

2889 for c in attempt: 

2890 try: 

2891 # send each command individually like we 

2892 # do in the main client. 

2893 c.result = self._pipe.parent_execute_command(*c.args, **c.options) 

2894 except RedisError as e: 

2895 c.result = e 

2896 

2897 # turn the response back into a simple flat array that corresponds 

2898 # to the sequence of commands issued in the stack in pipeline.execute() 

2899 response = [] 

2900 for c in sorted(stack, key=lambda x: x.position): 

2901 if c.args[0] in self._pipe.cluster_response_callbacks: 

2902 # Remove keys entry, it needs only for cache. 

2903 c.options.pop("keys", None) 

2904 c.result = self._pipe.cluster_response_callbacks[c.args[0]]( 

2905 c.result, **c.options 

2906 ) 

2907 response.append(c.result) 

2908 

2909 if raise_on_error: 

2910 self._raise_first_error(stack) 

2911 

2912 return response 

2913 

def _is_nodes_flag(self, target_nodes):
    """Tell whether ``target_nodes`` is one of the pipeline's node flags.

    A value that is not a ``str`` is never a flag; a string counts as a
    flag only when it is contained in ``self._pipe.node_flags``.
    """
    if not isinstance(target_nodes, str):
        return False
    return target_nodes in self._pipe.node_flags

2916 

def _parse_target_nodes(self, target_nodes):
    """Normalize an explicit ``target_nodes`` argument into a list of nodes.

    Accepted inputs:
      * a ``list`` of nodes - returned unchanged;
      * a ``dict`` of the format ``{node_name: node}`` - its values are
        returned as a new list;
      * a single ``ClusterNode`` - wrapped in a one-element list.

    The result is always a real ``list``. The caller checks ``len()`` and
    then reads element ``[0]``; a ``dict`` values view supports the former
    but not the latter, so the view is materialized here.

    Raises:
        TypeError: if ``target_nodes`` is none of the types above.
    """
    if isinstance(target_nodes, list):
        return target_nodes
    if isinstance(target_nodes, dict):
        # Supports dictionaries of the format {node_name: node}.
        return list(target_nodes.values())
    if isinstance(target_nodes, ClusterNode):
        # Supports passing a single ClusterNode as a variable
        return [target_nodes]
    raise TypeError(
        "target_nodes type can be one of the following: "
        "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
        "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
        f"The passed type is {type(target_nodes)}"
    )

2936 

def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
    """Determine which nodes a command should be executed on.

    ``args`` is the full command (name first). The command name is
    upper-cased; when the first two words together are a key of
    ``self._pipe.command_flags`` that two-word form is used instead.

    Keyword arguments:
        nodes_flag / node_flag: optional node flag chosen by the user.
            Both spellings are accepted because the pipeline caller in this
            file passes ``node_flag=...``; ``nodes_flag`` wins when both are
            given. When neither is given, the flag predefined for the
            command in ``self._pipe.command_flags`` (if any) is used.

    Returns:
        A list of target nodes: a random node, all primaries, all replicas,
        all nodes, the default node, or - when no flag applies - the single
        node that owns the slot of the command's key.
    """
    command = args[0].upper()
    if len(args) >= 2:
        two_word_command = f"{args[0]} {args[1]}".upper()
        if two_word_command in self._pipe.command_flags:
            command = two_word_command

    # Pop both spellings so that neither is silently ignored.
    nodes_flag = kwargs.pop("nodes_flag", None)
    node_flag = kwargs.pop("node_flag", None)
    if nodes_flag is None:
        nodes_flag = node_flag

    if nodes_flag is not None:
        # nodes flag passed by the user
        command_flag = nodes_flag
    else:
        # get the nodes group for this command if it was predefined
        command_flag = self._pipe.command_flags.get(command)

    if command_flag == self._pipe.RANDOM:
        # return a random node
        return [self._pipe.get_random_node()]
    elif command_flag == self._pipe.PRIMARIES:
        # return all primaries
        return self._pipe.get_primaries()
    elif command_flag == self._pipe.REPLICAS:
        # return all replicas
        return self._pipe.get_replicas()
    elif command_flag == self._pipe.ALL_NODES:
        # return all nodes
        return self._pipe.get_nodes()
    elif command_flag == self._pipe.DEFAULT_NODE:
        # return the cluster's default node
        return [self._nodes_manager.default_node]
    elif command in self._pipe.SEARCH_COMMANDS[0]:
        return [self._nodes_manager.default_node]
    else:
        # get the node that holds the key's slot; replicas and the load
        # balancing strategy are only considered for read commands
        slot = self._pipe.determine_slot(*args)
        node = self._nodes_manager.get_node_from_slot(
            slot,
            self._pipe.read_from_replicas and command in READ_COMMANDS,
            self._pipe.load_balancing_strategy
            if command in READ_COMMANDS
            else None,
        )
        return [node]

2982 

def multi(self):
    """Reject ``multi()``: this strategy is not transactional.

    Raises:
        RedisClusterException: always.
    """
    message = "method multi() is not supported outside of transactional context"
    raise RedisClusterException(message)

2987 

def discard(self):
    """Reject ``discard()``: this strategy is not transactional.

    Raises:
        RedisClusterException: always.
    """
    message = "method discard() is not supported outside of transactional context"
    raise RedisClusterException(message)

2992 

def watch(self, *names):
    """Reject ``watch()``: this strategy is not transactional.

    ``names`` is accepted for signature compatibility and is not used.

    Raises:
        RedisClusterException: always.
    """
    message = "method watch() is not supported outside of transactional context"
    raise RedisClusterException(message)

2997 

def unwatch(self, *names):
    """Reject ``unwatch()``: this strategy is not transactional.

    ``names`` is accepted for signature compatibility and is not used.

    Raises:
        RedisClusterException: always.
    """
    message = "method unwatch() is not supported outside of transactional context"
    raise RedisClusterException(message)

3002 

def delete(self, *names):
    """Issue ``DEL`` for exactly one key through ``execute_command``.

    Raises:
        RedisClusterException: if zero or more than one key is given.
    """
    if len(names) == 1:
        (key,) = names
        return self.execute_command("DEL", key)

    raise RedisClusterException(
        "deleting multiple keys is not implemented in pipeline command"
    )

3010 

def unlink(self, *names):
    """Issue ``UNLINK`` for exactly one key through ``execute_command``.

    Raises:
        RedisClusterException: if zero or more than one key is given.
    """
    if len(names) == 1:
        (key,) = names
        return self.execute_command("UNLINK", key)

    raise RedisClusterException(
        "unlinking multiple keys is not implemented in pipeline command"
    )

3018 

3019 

class TransactionStrategy(AbstractStrategy):
    # Strategy that sends the queued commands wrapped in MULTI ... EXEC over a
    # single connection (see _execute_transaction).

    # Commands that execute_command() accepts even though no slot number was
    # determined for them.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands that execute_command() sends right away instead of queueing,
    # as long as multi() has not been called.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands after which _send_command_parse_response() clears the
    # "watching" state.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    # Redirect errors: added to the retry object's supported errors in
    # __init__ and inspected by _reinitialize_on_error().
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    # Errors treated as connection/cluster level failures: re-raised while
    # reading transaction replies and handled during reset().
    CONNECTION_ERRORS = (
        ConnectionError,
        OSError,
        ClusterDownError,
        SlotNotCoveredError,
    )

3031 

def __init__(self, pipe: ClusterPipeline):
    """Create the transactional strategy for ``pipe``.

    Starts with no explicit MULTI, nothing watched, no slots recorded and
    no dedicated connection. The retry object is a copy of the pipeline's
    retry, extended to also retry on the cluster's retryable errors and on
    slot redirect errors.
    """
    super().__init__(pipe)

    # transaction bookkeeping
    self._watching = False
    self._executing = False
    self._explicit_transaction = False
    self._pipeline_slots: Set[int] = set()
    self._transaction_connection: Optional[Connection] = None

    # private retry policy, so the pipeline's own retry object is untouched
    retryable_errors = RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
    self._retry = copy(self._pipe.retry)
    self._retry.update_supported_errors(retryable_errors)

3043 

def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
    """
    Find a connection for a pipeline transaction.

    For running an atomic transaction, watch keys ensure that contents have not been
    altered as long as the watch commands for those keys were sent over the same
    connection. So once we start watching a key, we fetch a connection to the
    node that owns that slot and reuse it.

    If a connection is already held but the pool of the node that now owns
    the slot does not own it, the held connection is released to its owner
    and a new one is fetched from the current node.

    Returns:
        ``(redis_node, connection)``: the client of the node owning the
        slot and the connection dedicated to the transaction.

    Raises:
        RedisClusterException: if no slot has been recorded yet.
    """
    if not self._pipeline_slots:
        raise RedisClusterException(
            "At least a command with a key is needed to identify a node"
        )

    slot = next(iter(self._pipeline_slots))
    node: ClusterNode = self._nodes_manager.get_node_from_slot(slot, False)
    redis_node: Redis = self._pipe.get_redis_connection(node)

    held_connection = self._transaction_connection
    if held_connection and not redis_node.connection_pool.owns_connection(
        held_connection
    ):
        previous_node = self._nodes_manager.find_connection_owner(held_connection)
        if previous_node is not None:
            # reset() reaches the owner's pool through ``redis_connection``;
            # do the same here, falling back to the owner itself in case it
            # already is a client object exposing ``connection_pool``.
            # NOTE(review): confirm what find_connection_owner() returns.
            owner_client = (
                getattr(previous_node, "redis_connection", None) or previous_node
            )
            owner_client.connection_pool.release(held_connection)
        self._transaction_connection = None

    if not self._transaction_connection:
        self._transaction_connection = get_connection(redis_node)

    return redis_node, self._transaction_connection

3076 

def execute_command(self, *args, **kwargs):
    """Run a command immediately or queue it for the transaction.

    The slot of the command is determined unless the command is listed in
    ``ClusterPipeline.NO_SLOTS_COMMANDS``. While watching (or for
    WATCH/UNWATCH themselves) and before ``multi()`` was called, the command
    is sent right away on the transaction connection; otherwise it is
    queued through ``pipeline_execute_command``.

    Raises:
        CrossSlotTransactionError: an immediate command targets a slot
            different from the one(s) already recorded.
        RedisClusterException: an immediate command has no slot and is not
            allowed to run without one.
    """
    command_name = args[0]

    slot_number: Optional[int] = None
    if command_name not in ClusterPipeline.NO_SLOTS_COMMANDS:
        slot_number = self._pipe.determine_slot(*args)

    run_immediately = (
        self._watching or command_name in self.IMMEDIATE_EXECUTE_COMMANDS
    ) and not self._explicit_transaction

    if not run_immediately:
        # queued path: just remember the slot and buffer the command
        if slot_number is not None:
            self._pipeline_slots.add(slot_number)
        return self.pipeline_execute_command(*args, **kwargs)

    if command_name == "WATCH":
        self._validate_watch()

    if slot_number is None:
        if command_name not in self.NO_SLOTS_COMMANDS:
            raise RedisClusterException(
                f"Cannot identify slot number for command: {command_name},"
                "it cannot be triggered in a transaction"
            )
    else:
        if self._pipeline_slots and slot_number not in self._pipeline_slots:
            raise CrossSlotTransactionError(
                "Cannot watch or send commands on different slots"
            )
        self._pipeline_slots.add(slot_number)

    return self._immediate_execute_command(*args, **kwargs)

3107 

def _validate_watch(self):
    """Mark the strategy as watching, unless ``multi()`` was already called.

    Raises:
        RedisError: if an explicit transaction has been started.
    """
    if not self._explicit_transaction:
        self._watching = True
        return

    raise RedisError("Cannot issue a WATCH after a MULTI")

3113 

def _immediate_execute_command(self, *args, **options):
    """Send one command now, going through the strategy's retry object.

    Each attempt calls ``_get_connection_and_send_command``; failures are
    reported to ``_reinitialize_on_error``.
    """

    def send_once():
        return self._get_connection_and_send_command(*args, **options)

    return self._retry.call_with_retry(send_once, self._reinitialize_on_error)

3119 

def _get_connection_and_send_command(self, *args, **options):
    """Fetch the transaction connection and send ``args`` over it.

    The first element of ``args`` is used as the command name when the
    response is parsed.
    """
    client_and_connection = self._get_client_and_connection_for_transaction()
    redis_node, connection = client_and_connection
    command_name = args[0]
    return self._send_command_parse_response(
        connection, redis_node, command_name, *args, **options
    )

3125 

def _send_command_parse_response(
    self, conn, redis_node: Redis, command_name, *args, **options
):
    """
    Send a command over ``conn`` and return the reply parsed by ``redis_node``.

    After a command listed in ``UNWATCH_COMMANDS`` has been answered, the
    strategy no longer considers itself to be watching.
    """
    conn.send_command(*args)
    reply = redis_node.parse_response(conn, command_name, **options)

    ends_watch = command_name in self.UNWATCH_COMMANDS
    if ends_watch:
        self._watching = False

    return reply

3139 

def _reinitialize_on_error(self, error):
    """Failure callback handed to the retry object.

    * While watching and executing, a slot redirect error is turned into a
      ``WatchError``.
    * For slot redirect and connection errors (matched on the exact
      exception type) the transaction connection is dropped and the
      pipeline's reinitialize counter is incremented. When the pipeline
      says it is time to reinitialize, the nodes manager is reinitialized
      and the pipeline's counter is reset to 0; otherwise the error is
      handed to ``update_moved_exception``.
    * In every non-raising case the "executing" flag is cleared.

    Raises:
        WatchError: a redirect happened while executing with watched keys.
    """
    error_type = type(error)
    is_redirect = error_type in self.SLOT_REDIRECT_ERRORS

    if self._watching and is_redirect and self._executing:
        raise WatchError("Slot rebalancing occurred while watching keys")

    if is_redirect or error_type in self.CONNECTION_ERRORS:
        # forget the connection so the next attempt fetches a fresh one
        self._transaction_connection = None

        self._pipe.reinitialize_counter += 1
        if self._pipe._should_reinitialized():
            self._nodes_manager.initialize()
            # reset the counter that was incremented above; it lives on the
            # pipeline, not on the strategy
            self._pipe.reinitialize_counter = 0
        else:
            self._nodes_manager.update_moved_exception(error)

    self._executing = False

3160 

def _raise_first_error(self, responses, stack):
    """
    Raise the first exception found among ``responses``.

    ``responses`` and ``stack`` are walked in lockstep; the first response
    that is an ``Exception`` is annotated with its command's 1-based
    position and arguments and then raised. Returns ``None`` when no
    response is an exception.
    """
    first_failure = next(
        (
            (reply, command)
            for reply, command in zip(responses, stack)
            if isinstance(reply, Exception)
        ),
        None,
    )
    if first_failure is None:
        return

    failure, command = first_failure
    self.annotate_exception(failure, command.position + 1, command.args)
    raise failure

3169 

def execute(self, raise_on_error: bool = True) -> List[Any]:
    """Execute the queued commands as a transaction.

    Returns an empty list without contacting the server when nothing is
    queued and there is no watched slot to clean up; otherwise delegates
    to ``_execute_transaction_with_retries``.
    """
    queued = self._command_queue
    if queued or (self._watching and self._pipeline_slots):
        return self._execute_transaction_with_retries(queued, raise_on_error)

    return []

3176 

def _execute_transaction_with_retries(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    """Run ``_execute_transaction`` through the strategy's retry object.

    Failures between attempts are reported to ``_reinitialize_on_error``.
    """

    def run_transaction():
        return self._execute_transaction(stack, raise_on_error)

    return self._retry.call_with_retry(run_transaction, self._reinitialize_on_error)

3184 

def _execute_transaction(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    """Send ``stack`` wrapped in MULTI/EXEC on one connection and collect replies.

    All commands (except those carrying an ``EMPTY_RESPONSE`` option) are
    packed and written in a single call. The replies to MULTI and to every
    queued command are then read, followed by the reply to EXEC.

    Problems seen before EXEC are recorded in ``errors`` as uniform
    ``(position, value)`` pairs, so they can both be raised when EXEC
    aborts and be inserted into the EXEC reply afterwards.

    Returns:
        The EXEC results in command order, passed through the matching
        ``cluster_response_callbacks`` (exceptions are left as they are).

    Raises:
        CrossSlotTransactionError: more than one slot is involved.
        WatchError: EXEC returned ``None``.
        InvalidPipelineStack: the reply count does not match the queue.
        ExecAbortError: EXEC aborted and no earlier exception was recorded;
            otherwise the first recorded exception is raised instead.
        Connection-level errors are annotated and re-raised as they occur.
    """
    if len(self._pipeline_slots) > 1:
        raise CrossSlotTransactionError(
            "All keys involved in a cluster transaction must map to the same slot"
        )

    self._executing = True

    redis_node, connection = self._get_client_and_connection_for_transaction()

    stack = chain(
        [PipelineCommand(("MULTI",))],
        stack,
        [PipelineCommand(("EXEC",))],
    )
    commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
    packed_commands = connection.pack_commands(commands)
    connection.send_packed_command(packed_commands)

    # (position, value) pairs; every entry has the same shape
    errors = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        redis_node.parse_response(connection, "MULTI")
    except ResponseError as e:
        self.annotate_exception(e, 0, "MULTI")
        errors.append((0, e))
    except self.CONNECTION_ERRORS as cluster_error:
        self.annotate_exception(cluster_error, 0, "MULTI")
        raise

    # and all the other commands
    for i, command in enumerate(self._command_queue):
        if EMPTY_RESPONSE in command.options:
            errors.append((i, command.options[EMPTY_RESPONSE]))
        else:
            try:
                _ = redis_node.parse_response(connection, "_")
            except self.SLOT_REDIRECT_ERRORS as slot_error:
                self.annotate_exception(slot_error, i + 1, command.args)
                errors.append((i, slot_error))
            except self.CONNECTION_ERRORS as cluster_error:
                self.annotate_exception(cluster_error, i + 1, command.args)
                raise
            except ResponseError as e:
                self.annotate_exception(e, i + 1, command.args)
                errors.append((i, e))

    response = None
    # parse the EXEC.
    try:
        response = redis_node.parse_response(connection, "EXEC")
    except ExecAbortError:
        # Prefer the first exception recorded while queueing: it explains
        # the abort. EMPTY_RESPONSE placeholders are not exceptions and are
        # skipped.
        for _position, recorded in errors:
            if isinstance(recorded, BaseException):
                raise recorded
        raise

    self._executing = False

    # EXEC clears any watched keys
    self._watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response
    for i, e in errors:
        response.insert(i, e)

    if len(response) != len(self._command_queue):
        raise InvalidPipelineStack(
            "Unexpected response length for cluster pipeline EXEC."
            " Command stack was {} but response had length {}".format(
                [c.args[0] for c in self._command_queue], len(response)
            )
        )

    # find any errors in the response and raise if necessary
    if raise_on_error or len(errors) > 0:
        self._raise_first_error(
            response,
            self._command_queue,
        )

    # We have to run response callbacks manually
    data = []
    for r, cmd in zip(response, self._command_queue):
        if not isinstance(r, Exception):
            command_name = cmd.args[0]
            if command_name in self._pipe.cluster_response_callbacks:
                r = self._pipe.cluster_response_callbacks[command_name](
                    r, **cmd.options
                )
        data.append(r)
    return data

3284 

def reset(self):
    """Clear the queued commands and all transaction state.

    If a transaction connection is held, UNWATCH is sent first when keys
    are being watched, then the connection is released to the pool of the
    node reported by ``find_connection_owner`` and forgotten. If one of
    ``CONNECTION_ERRORS`` is raised along the way, the connection is
    disconnected instead.
    """
    self._command_queue = []

    # make sure to reset the connection state in the event that we were
    # watching something
    if self._transaction_connection:
        try:
            if self._watching:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self._transaction_connection.send_command("UNWATCH")
                self._transaction_connection.read_response()
            # we can safely return the connection to the pool here since we're
            # sure we're no longer WATCHing anything
            node = self._nodes_manager.find_connection_owner(
                self._transaction_connection
            )
            node.redis_connection.connection_pool.release(
                self._transaction_connection
            )
            self._transaction_connection = None
        except self.CONNECTION_ERRORS:
            # disconnect will also remove any previous WATCHes
            # NOTE(review): on this path the connection is neither released
            # nor cleared, so the strategy keeps referencing it.
            if self._transaction_connection:
                self._transaction_connection.disconnect()

    # clean up the other instance attributes
    self._watching = False
    self._explicit_transaction = False
    self._pipeline_slots = set()
    self._executing = False

3316 

def send_cluster_commands(
    self, stack, raise_on_error=True, allow_redirections=True
):
    """Reject non-transactional sending: not available on this strategy.

    All arguments are ignored.

    Raises:
        NotImplementedError: always.
    """
    reason = "send_cluster_commands cannot be executed in transactional context."
    raise NotImplementedError(reason)

3323 

def multi(self):
    """Start an explicit transaction block.

    Raises:
        RedisError: if ``multi()`` was already called, or if commands have
            already been queued.
    """
    problem = None
    if self._explicit_transaction:
        problem = "Cannot issue nested calls to MULTI"
    elif self._command_queue:
        problem = "Commands without an initial WATCH have already been issued"

    if problem is not None:
        raise RedisError(problem)

    self._explicit_transaction = True

3332 

def watch(self, *names):
    """Issue ``WATCH`` for ``names`` through ``execute_command``.

    Raises:
        RedisError: if ``multi()`` has already been called.
    """
    if not self._explicit_transaction:
        return self.execute_command("WATCH", *names)

    raise RedisError("Cannot issue a WATCH after a MULTI")

3338 

def unwatch(self):
    """Issue ``UNWATCH`` when keys are being watched.

    Returns ``True`` without sending anything when nothing is watched;
    otherwise returns the result of ``execute_command("UNWATCH")``.
    """
    if not self._watching:
        return True

    return self.execute_command("UNWATCH")

3344 

def discard(self):
    """Abandon the transaction by delegating to ``reset()``.

    Returns ``None``.
    """
    self.reset()
    return None

3347 

def delete(self, *names):
    """Issue ``DEL`` for all ``names`` through ``execute_command``."""
    command = ("DEL", *names)
    return self.execute_command(*command)

3350 

def unlink(self, *names):
    """Issue ``UNLINK`` for all ``names`` through ``execute_command``."""
    command = ("UNLINK", *names)
    return self.execute_command(*command)