Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1335 statements  

1import random 

2import socket 

3import sys 

4import threading 

5import time 

6from abc import ABC, abstractmethod 

7from collections import OrderedDict 

8from copy import copy 

9from enum import Enum 

10from itertools import chain 

11from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union 

12 

13from redis._parsers import CommandsParser, Encoder 

14from redis._parsers.helpers import parse_scan 

15from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

16from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface 

17from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis 

18from redis.commands import READ_COMMANDS, RedisClusterCommands 

19from redis.commands.helpers import list_or_args 

20from redis.connection import ( 

21 Connection, 

22 ConnectionPool, 

23 parse_url, 

24) 

25from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

26from redis.event import ( 

27 AfterPooledConnectionsInstantiationEvent, 

28 AfterPubSubConnectionInstantiationEvent, 

29 ClientType, 

30 EventDispatcher, 

31) 

32from redis.exceptions import ( 

33 AskError, 

34 AuthenticationError, 

35 ClusterDownError, 

36 ClusterError, 

37 ConnectionError, 

38 CrossSlotTransactionError, 

39 DataError, 

40 ExecAbortError, 

41 InvalidPipelineStack, 

42 MaxConnectionsError, 

43 MovedError, 

44 RedisClusterException, 

45 RedisError, 

46 ResponseError, 

47 SlotNotCoveredError, 

48 TimeoutError, 

49 TryAgainError, 

50 WatchError, 

51) 

52from redis.lock import Lock 

53from redis.retry import Retry 

54from redis.utils import ( 

55 deprecated_args, 

56 dict_merge, 

57 list_keys_to_dict, 

58 merge_result, 

59 safe_str, 

60 str_if_bytes, 

61 truncate_text, 

62) 

63 

64 

65def get_node_name(host: str, port: Union[str, int]) -> str: 

66 return f"{host}:{port}" 

67 

68 

69@deprecated_args( 

70 allowed_args=["redis_node"], 

71 reason="Use get_connection(redis_node) instead", 

72 version="5.3.0", 

73) 

74def get_connection(redis_node: Redis, *args, **options) -> Connection: 

75 return redis_node.connection or redis_node.connection_pool.get_connection() 

76 

77 

78def parse_scan_result(command, res, **options): 

79 cursors = {} 

80 ret = [] 

81 for node_name, response in res.items(): 

82 cursor, r = parse_scan(response, **options) 

83 cursors[node_name] = cursor 

84 ret += r 

85 

86 return cursors, ret 

87 

88 

89def parse_pubsub_numsub(command, res, **options): 

90 numsub_d = OrderedDict() 

91 for numsub_tups in res.values(): 

92 for channel, numsubbed in numsub_tups: 

93 try: 

94 numsub_d[channel] += numsubbed 

95 except KeyError: 

96 numsub_d[channel] = numsubbed 

97 

98 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()] 

99 return ret_numsub 

100 

101 

102def parse_cluster_slots( 

103 resp: Any, **options: Any 

104) -> Dict[Tuple[int, int], Dict[str, Any]]: 

105 current_host = options.get("current_host", "") 

106 

107 def fix_server(*args: Any) -> Tuple[str, Any]: 

108 return str_if_bytes(args[0]) or current_host, args[1] 

109 

110 slots = {} 

111 for slot in resp: 

112 start, end, primary = slot[:3] 

113 replicas = slot[3:] 

114 slots[start, end] = { 

115 "primary": fix_server(*primary), 

116 "replicas": [fix_server(*replica) for replica in replicas], 

117 } 

118 

119 return slots 

120 

121 

122def parse_cluster_shards(resp, **options): 

123 """ 

124 Parse CLUSTER SHARDS response. 

125 """ 

126 if isinstance(resp[0], dict): 

127 return resp 

128 shards = [] 

129 for x in resp: 

130 shard = {"slots": [], "nodes": []} 

131 for i in range(0, len(x[1]), 2): 

132 shard["slots"].append((x[1][i], (x[1][i + 1]))) 

133 nodes = x[3] 

134 for node in nodes: 

135 dict_node = {} 

136 for i in range(0, len(node), 2): 

137 dict_node[node[i]] = node[i + 1] 

138 shard["nodes"].append(dict_node) 

139 shards.append(shard) 

140 

141 return shards 

142 

143 

144def parse_cluster_myshardid(resp, **options): 

145 """ 

146 Parse CLUSTER MYSHARDID response. 

147 """ 

148 return resp.decode("utf-8") 

149 

150 

151PRIMARY = "primary" 

152REPLICA = "replica" 

153SLOT_ID = "slot-id" 

154 

155REDIS_ALLOWED_KEYS = ( 

156 "connection_class", 

157 "connection_pool", 

158 "connection_pool_class", 

159 "client_name", 

160 "credential_provider", 

161 "db", 

162 "decode_responses", 

163 "encoding", 

164 "encoding_errors", 

165 "host", 

166 "lib_name", 

167 "lib_version", 

168 "max_connections", 

169 "nodes_flag", 

170 "redis_connect_func", 

171 "password", 

172 "port", 

173 "timeout", 

174 "queue_class", 

175 "retry", 

176 "retry_on_timeout", 

177 "protocol", 

178 "socket_connect_timeout", 

179 "socket_keepalive", 

180 "socket_keepalive_options", 

181 "socket_timeout", 

182 "ssl", 

183 "ssl_ca_certs", 

184 "ssl_ca_data", 

185 "ssl_certfile", 

186 "ssl_cert_reqs", 

187 "ssl_include_verify_flags", 

188 "ssl_exclude_verify_flags", 

189 "ssl_keyfile", 

190 "ssl_password", 

191 "ssl_check_hostname", 

192 "unix_socket_path", 

193 "username", 

194 "cache", 

195 "cache_config", 

196) 

197KWARGS_DISABLED_KEYS = ("host", "port", "retry") 

198 

199 

200def cleanup_kwargs(**kwargs): 

201 """ 

202 Remove unsupported or disabled keys from kwargs 

203 """ 

204 connection_kwargs = { 

205 k: v 

206 for k, v in kwargs.items() 

207 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS 

208 } 

209 

210 return connection_kwargs 

211 

212 

213class AbstractRedisCluster: 

214 RedisClusterRequestTTL = 16 

215 

216 PRIMARIES = "primaries" 

217 REPLICAS = "replicas" 

218 ALL_NODES = "all" 

219 RANDOM = "random" 

220 DEFAULT_NODE = "default-node" 

221 

222 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE} 

223 

224 COMMAND_FLAGS = dict_merge( 

225 list_keys_to_dict( 

226 [ 

227 "ACL CAT", 

228 "ACL DELUSER", 

229 "ACL DRYRUN", 

230 "ACL GENPASS", 

231 "ACL GETUSER", 

232 "ACL HELP", 

233 "ACL LIST", 

234 "ACL LOG", 

235 "ACL LOAD", 

236 "ACL SAVE", 

237 "ACL SETUSER", 

238 "ACL USERS", 

239 "ACL WHOAMI", 

240 "AUTH", 

241 "CLIENT LIST", 

242 "CLIENT SETINFO", 

243 "CLIENT SETNAME", 

244 "CLIENT GETNAME", 

245 "CONFIG SET", 

246 "CONFIG REWRITE", 

247 "CONFIG RESETSTAT", 

248 "TIME", 

249 "PUBSUB CHANNELS", 

250 "PUBSUB NUMPAT", 

251 "PUBSUB NUMSUB", 

252 "PUBSUB SHARDCHANNELS", 

253 "PUBSUB SHARDNUMSUB", 

254 "PING", 

255 "INFO", 

256 "SHUTDOWN", 

257 "KEYS", 

258 "DBSIZE", 

259 "BGSAVE", 

260 "SLOWLOG GET", 

261 "SLOWLOG LEN", 

262 "SLOWLOG RESET", 

263 "WAIT", 

264 "WAITAOF", 

265 "SAVE", 

266 "MEMORY PURGE", 

267 "MEMORY MALLOC-STATS", 

268 "MEMORY STATS", 

269 "LASTSAVE", 

270 "CLIENT TRACKINGINFO", 

271 "CLIENT PAUSE", 

272 "CLIENT UNPAUSE", 

273 "CLIENT UNBLOCK", 

274 "CLIENT ID", 

275 "CLIENT REPLY", 

276 "CLIENT GETREDIR", 

277 "CLIENT INFO", 

278 "CLIENT KILL", 

279 "READONLY", 

280 "CLUSTER INFO", 

281 "CLUSTER MEET", 

282 "CLUSTER MYSHARDID", 

283 "CLUSTER NODES", 

284 "CLUSTER REPLICAS", 

285 "CLUSTER RESET", 

286 "CLUSTER SET-CONFIG-EPOCH", 

287 "CLUSTER SLOTS", 

288 "CLUSTER SHARDS", 

289 "CLUSTER COUNT-FAILURE-REPORTS", 

290 "CLUSTER KEYSLOT", 

291 "COMMAND", 

292 "COMMAND COUNT", 

293 "COMMAND LIST", 

294 "COMMAND GETKEYS", 

295 "CONFIG GET", 

296 "DEBUG", 

297 "RANDOMKEY", 

298 "READONLY", 

299 "READWRITE", 

300 "TIME", 

301 "TFUNCTION LOAD", 

302 "TFUNCTION DELETE", 

303 "TFUNCTION LIST", 

304 "TFCALL", 

305 "TFCALLASYNC", 

306 "LATENCY HISTORY", 

307 "LATENCY LATEST", 

308 "LATENCY RESET", 

309 "MODULE LIST", 

310 "MODULE LOAD", 

311 "MODULE UNLOAD", 

312 "MODULE LOADEX", 

313 ], 

314 DEFAULT_NODE, 

315 ), 

316 list_keys_to_dict( 

317 [ 

318 "FLUSHALL", 

319 "FLUSHDB", 

320 "FUNCTION DELETE", 

321 "FUNCTION FLUSH", 

322 "FUNCTION LIST", 

323 "FUNCTION LOAD", 

324 "FUNCTION RESTORE", 

325 "SCAN", 

326 "SCRIPT EXISTS", 

327 "SCRIPT FLUSH", 

328 "SCRIPT LOAD", 

329 ], 

330 PRIMARIES, 

331 ), 

332 list_keys_to_dict(["FUNCTION DUMP"], RANDOM), 

333 list_keys_to_dict( 

334 [ 

335 "CLUSTER COUNTKEYSINSLOT", 

336 "CLUSTER DELSLOTS", 

337 "CLUSTER DELSLOTSRANGE", 

338 "CLUSTER GETKEYSINSLOT", 

339 "CLUSTER SETSLOT", 

340 ], 

341 SLOT_ID, 

342 ), 

343 ) 

344 

345 SEARCH_COMMANDS = ( 

346 [ 

347 "FT.CREATE", 

348 "FT.SEARCH", 

349 "FT.AGGREGATE", 

350 "FT.EXPLAIN", 

351 "FT.EXPLAINCLI", 

352 "FT,PROFILE", 

353 "FT.ALTER", 

354 "FT.DROPINDEX", 

355 "FT.ALIASADD", 

356 "FT.ALIASUPDATE", 

357 "FT.ALIASDEL", 

358 "FT.TAGVALS", 

359 "FT.SUGADD", 

360 "FT.SUGGET", 

361 "FT.SUGDEL", 

362 "FT.SUGLEN", 

363 "FT.SYNUPDATE", 

364 "FT.SYNDUMP", 

365 "FT.SPELLCHECK", 

366 "FT.DICTADD", 

367 "FT.DICTDEL", 

368 "FT.DICTDUMP", 

369 "FT.INFO", 

370 "FT._LIST", 

371 "FT.CONFIG", 

372 "FT.ADD", 

373 "FT.DEL", 

374 "FT.DROP", 

375 "FT.GET", 

376 "FT.MGET", 

377 "FT.SYNADD", 

378 ], 

379 ) 

380 

381 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { 

382 "CLUSTER SLOTS": parse_cluster_slots, 

383 "CLUSTER SHARDS": parse_cluster_shards, 

384 "CLUSTER MYSHARDID": parse_cluster_myshardid, 

385 } 

386 

387 RESULT_CALLBACKS = dict_merge( 

388 list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub), 

389 list_keys_to_dict( 

390 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values())) 

391 ), 

392 list_keys_to_dict( 

393 ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result 

394 ), 

395 list_keys_to_dict( 

396 [ 

397 "PING", 

398 "CONFIG SET", 

399 "CONFIG REWRITE", 

400 "CONFIG RESETSTAT", 

401 "CLIENT SETNAME", 

402 "BGSAVE", 

403 "SLOWLOG RESET", 

404 "SAVE", 

405 "MEMORY PURGE", 

406 "CLIENT PAUSE", 

407 "CLIENT UNPAUSE", 

408 ], 

409 lambda command, res: all(res.values()) if isinstance(res, dict) else res, 

410 ), 

411 list_keys_to_dict( 

412 ["DBSIZE", "WAIT"], 

413 lambda command, res: sum(res.values()) if isinstance(res, dict) else res, 

414 ), 

415 list_keys_to_dict( 

416 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0 

417 ), 

418 list_keys_to_dict(["SCAN"], parse_scan_result), 

419 list_keys_to_dict( 

420 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop() 

421 ), 

422 list_keys_to_dict( 

423 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())] 

424 ), 

425 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())), 

426 ) 

427 

428 ERRORS_ALLOW_RETRY = ( 

429 ConnectionError, 

430 TimeoutError, 

431 ClusterDownError, 

432 SlotNotCoveredError, 

433 ) 

434 

435 def replace_default_node(self, target_node: "ClusterNode" = None) -> None: 

436 """Replace the default cluster node. 

437 A random cluster node will be chosen if target_node isn't passed, and primaries 

438 will be prioritized. The default node will not be changed if there are no other 

439 nodes in the cluster. 

440 

441 Args: 

442 target_node (ClusterNode, optional): Target node to replace the default 

443 node. Defaults to None. 

444 """ 

445 if target_node: 

446 self.nodes_manager.default_node = target_node 

447 else: 

448 curr_node = self.get_default_node() 

449 primaries = [node for node in self.get_primaries() if node != curr_node] 

450 if primaries: 

451 # Choose a primary if the cluster contains different primaries 

452 self.nodes_manager.default_node = random.choice(primaries) 

453 else: 

454 # Otherwise, choose a primary if the cluster contains different primaries 

455 replicas = [node for node in self.get_replicas() if node != curr_node] 

456 if replicas: 

457 self.nodes_manager.default_node = random.choice(replicas) 

458 

459 

460class RedisCluster(AbstractRedisCluster, RedisClusterCommands): 

461 @classmethod 

462 def from_url(cls, url, **kwargs): 

463 """ 

464 Return a Redis client object configured from the given URL 

465 

466 For example:: 

467 

468 redis://[[username]:[password]]@localhost:6379/0 

469 rediss://[[username]:[password]]@localhost:6379/0 

470 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

471 

472 Three URL schemes are supported: 

473 

474 - `redis://` creates a TCP socket connection. See more at: 

475 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

476 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

477 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

478 - ``unix://``: creates a Unix Domain Socket connection. 

479 

480 The username, password, hostname, path and all querystring values 

481 are passed through urllib.parse.unquote in order to replace any 

482 percent-encoded values with their corresponding characters. 

483 

484 There are several ways to specify a database number. The first value 

485 found will be used: 

486 

487 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

488 2. If using the redis:// or rediss:// schemes, the path argument 

489 of the url, e.g. redis://localhost/0 

490 3. A ``db`` keyword argument to this function. 

491 

492 If none of these options are specified, the default db=0 is used. 

493 

494 All querystring options are cast to their appropriate Python types. 

495 Boolean arguments can be specified with string values "True"/"False" 

496 or "Yes"/"No". Values that cannot be properly cast cause a 

497 ``ValueError`` to be raised. Once parsed, the querystring arguments 

498 and keyword arguments are passed to the ``ConnectionPool``'s 

499 class initializer. In the case of conflicting arguments, querystring 

500 arguments always win. 

501 

502 """ 

503 return cls(url=url, **kwargs) 

504 

505 @deprecated_args( 

506 args_to_warn=["read_from_replicas"], 

507 reason="Please configure the 'load_balancing_strategy' instead", 

508 version="5.3.0", 

509 ) 

510 @deprecated_args( 

511 args_to_warn=[ 

512 "cluster_error_retry_attempts", 

513 ], 

514 reason="Please configure the 'retry' object instead", 

515 version="6.0.0", 

516 ) 

517 def __init__( 

518 self, 

519 host: Optional[str] = None, 

520 port: int = 6379, 

521 startup_nodes: Optional[List["ClusterNode"]] = None, 

522 cluster_error_retry_attempts: int = 3, 

523 retry: Optional["Retry"] = None, 

524 require_full_coverage: bool = True, 

525 reinitialize_steps: int = 5, 

526 read_from_replicas: bool = False, 

527 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None, 

528 dynamic_startup_nodes: bool = True, 

529 url: Optional[str] = None, 

530 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

531 cache: Optional[CacheInterface] = None, 

532 cache_config: Optional[CacheConfig] = None, 

533 event_dispatcher: Optional[EventDispatcher] = None, 

534 **kwargs, 

535 ): 

536 """ 

537 Initialize a new RedisCluster client. 

538 

539 :param startup_nodes: 

540 List of nodes from which initial bootstrapping can be done 

541 :param host: 

542 Can be used to point to a startup node 

543 :param port: 

544 Can be used to point to a startup node 

545 :param require_full_coverage: 

546 When set to False (default value): the client will not require a 

547 full coverage of the slots. However, if not all slots are covered, 

548 and at least one node has 'cluster-require-full-coverage' set to 

549 'yes,' the server will throw a ClusterDownError for some key-based 

550 commands. See - 

551 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters 

552 When set to True: all slots must be covered to construct the 

553 cluster client. If not all slots are covered, RedisClusterException 

554 will be thrown. 

555 :param read_from_replicas: 

556 @deprecated - please use load_balancing_strategy instead 

557 Enable read from replicas in READONLY mode. You can read possibly 

558 stale data. 

559 When set to true, read commands will be assigned between the 

560 primary and its replications in a Round-Robin manner. 

561 :param load_balancing_strategy: 

562 Enable read from replicas in READONLY mode and defines the load balancing 

563 strategy that will be used for cluster node selection. 

564 The data read from replicas is eventually consistent with the data in primary nodes. 

565 :param dynamic_startup_nodes: 

566 Set the RedisCluster's startup nodes to all of the discovered nodes. 

567 If true (default value), the cluster's discovered nodes will be used to 

568 determine the cluster nodes-slots mapping in the next topology refresh. 

569 It will remove the initial passed startup nodes if their endpoints aren't 

570 listed in the CLUSTER SLOTS output. 

571 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

572 specific IP addresses, it is best to set it to false. 

573 :param cluster_error_retry_attempts: 

574 @deprecated - Please configure the 'retry' object instead 

575 In case 'retry' object is set - this argument is ignored! 

576 

577 Number of times to retry before raising an error when 

578 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or 

579 :class:`~.ClusterDownError` are encountered 

580 :param retry: 

581 A retry object that defines the retry strategy and the number of 

582 retries for the cluster client. 

583 In current implementation for the cluster client (starting form redis-py version 6.0.0) 

584 the retry object is not yet fully utilized, instead it is used just to determine 

585 the number of retries for the cluster client. 

586 In the future releases the retry object will be used to handle the cluster client retries! 

587 :param reinitialize_steps: 

588 Specifies the number of MOVED errors that need to occur before 

589 reinitializing the whole cluster topology. If a MOVED error occurs 

590 and the cluster does not need to be reinitialized on this current 

591 error handling, only the MOVED slot will be patched with the 

592 redirected node. 

593 To reinitialize the cluster on every MOVED error, set 

594 reinitialize_steps to 1. 

595 To avoid reinitializing the cluster on moved errors, set 

596 reinitialize_steps to 0. 

597 :param address_remap: 

598 An optional callable which, when provided with an internal network 

599 address of a node, e.g. a `(host, port)` tuple, will return the address 

600 where the node is reachable. This can be used to map the addresses at 

601 which the nodes _think_ they are, to addresses at which a client may 

602 reach them, such as when they sit behind a proxy. 

603 

604 :**kwargs: 

605 Extra arguments that will be sent into Redis instance when created 

606 (See Official redis-py doc for supported kwargs - the only limitation 

607 is that you can't provide 'retry' object as part of kwargs. 

608 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) 

609 Some kwargs are not supported and will raise a 

610 RedisClusterException: 

611 - db (Redis do not support database SELECT in cluster mode) 

612 """ 

613 if startup_nodes is None: 

614 startup_nodes = [] 

615 

616 if "db" in kwargs: 

617 # Argument 'db' is not possible to use in cluster mode 

618 raise RedisClusterException( 

619 "Argument 'db' is not possible to use in cluster mode" 

620 ) 

621 

622 if "retry" in kwargs: 

623 # Argument 'retry' is not possible to be used in kwargs when in cluster mode 

624 # the kwargs are set to the lower level connections to the cluster nodes 

625 # and there we provide retry configuration without retries allowed. 

626 # The retries should be handled on cluster client level. 

627 raise RedisClusterException( 

628 "The 'retry' argument cannot be used in kwargs when running in cluster mode." 

629 ) 

630 

631 # Get the startup node/s 

632 from_url = False 

633 if url is not None: 

634 from_url = True 

635 url_options = parse_url(url) 

636 if "path" in url_options: 

637 raise RedisClusterException( 

638 "RedisCluster does not currently support Unix Domain " 

639 "Socket connections" 

640 ) 

641 if "db" in url_options and url_options["db"] != 0: 

642 # Argument 'db' is not possible to use in cluster mode 

643 raise RedisClusterException( 

644 "A ``db`` querystring option can only be 0 in cluster mode" 

645 ) 

646 kwargs.update(url_options) 

647 host = kwargs.get("host") 

648 port = kwargs.get("port", port) 

649 startup_nodes.append(ClusterNode(host, port)) 

650 elif host is not None and port is not None: 

651 startup_nodes.append(ClusterNode(host, port)) 

652 elif len(startup_nodes) == 0: 

653 # No startup node was provided 

654 raise RedisClusterException( 

655 "RedisCluster requires at least one node to discover the " 

656 "cluster. Please provide one of the followings:\n" 

657 "1. host and port, for example:\n" 

658 " RedisCluster(host='localhost', port=6379)\n" 

659 "2. list of startup nodes, for example:\n" 

660 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," 

661 " ClusterNode('localhost', 6378)])" 

662 ) 

663 # Update the connection arguments 

664 # Whenever a new connection is established, RedisCluster's on_connect 

665 # method should be run 

666 # If the user passed on_connect function we'll save it and run it 

667 # inside the RedisCluster.on_connect() function 

668 self.user_on_connect_func = kwargs.pop("redis_connect_func", None) 

669 kwargs.update({"redis_connect_func": self.on_connect}) 

670 kwargs = cleanup_kwargs(**kwargs) 

671 if retry: 

672 self.retry = retry 

673 else: 

674 self.retry = Retry( 

675 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

676 retries=cluster_error_retry_attempts, 

677 ) 

678 

679 self.encoder = Encoder( 

680 kwargs.get("encoding", "utf-8"), 

681 kwargs.get("encoding_errors", "strict"), 

682 kwargs.get("decode_responses", False), 

683 ) 

684 protocol = kwargs.get("protocol", None) 

685 if (cache_config or cache) and protocol not in [3, "3"]: 

686 raise RedisError("Client caching is only supported with RESP version 3") 

687 

688 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

689 self.node_flags = self.__class__.NODE_FLAGS.copy() 

690 self.read_from_replicas = read_from_replicas 

691 self.load_balancing_strategy = load_balancing_strategy 

692 self.reinitialize_counter = 0 

693 self.reinitialize_steps = reinitialize_steps 

694 if event_dispatcher is None: 

695 self._event_dispatcher = EventDispatcher() 

696 else: 

697 self._event_dispatcher = event_dispatcher 

698 self.startup_nodes = startup_nodes 

699 self.nodes_manager = NodesManager( 

700 startup_nodes=startup_nodes, 

701 from_url=from_url, 

702 require_full_coverage=require_full_coverage, 

703 dynamic_startup_nodes=dynamic_startup_nodes, 

704 address_remap=address_remap, 

705 cache=cache, 

706 cache_config=cache_config, 

707 event_dispatcher=self._event_dispatcher, 

708 **kwargs, 

709 ) 

710 

711 self.cluster_response_callbacks = CaseInsensitiveDict( 

712 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS 

713 ) 

714 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS) 

715 

716 self.commands_parser = CommandsParser(self) 

717 self._lock = threading.RLock() 

718 

719 def __enter__(self): 

720 return self 

721 

722 def __exit__(self, exc_type, exc_value, traceback): 

723 self.close() 

724 

725 def __del__(self): 

726 try: 

727 self.close() 

728 except Exception: 

729 pass 

730 

731 def disconnect_connection_pools(self): 

732 for node in self.get_nodes(): 

733 if node.redis_connection: 

734 try: 

735 node.redis_connection.connection_pool.disconnect() 

736 except OSError: 

737 # Client was already disconnected. do nothing 

738 pass 

739 

740 def on_connect(self, connection): 

741 """ 

742 Initialize the connection, authenticate and select a database and send 

743 READONLY if it is set during object initialization. 

744 """ 

745 connection.on_connect() 

746 

747 if self.read_from_replicas or self.load_balancing_strategy: 

748 # Sending READONLY command to server to configure connection as 

749 # readonly. Since each cluster node may change its server type due 

750 # to a failover, we should establish a READONLY connection 

751 # regardless of the server type. If this is a primary connection, 

752 # READONLY would not affect executing write commands. 

753 connection.send_command("READONLY") 

754 if str_if_bytes(connection.read_response()) != "OK": 

755 raise ConnectionError("READONLY command failed") 

756 

757 if self.user_on_connect_func is not None: 

758 self.user_on_connect_func(connection) 

759 

760 def get_redis_connection(self, node: "ClusterNode") -> Redis: 

761 if not node.redis_connection: 

762 with self._lock: 

763 if not node.redis_connection: 

764 self.nodes_manager.create_redis_connections([node]) 

765 return node.redis_connection 

766 

767 def get_node(self, host=None, port=None, node_name=None): 

768 return self.nodes_manager.get_node(host, port, node_name) 

769 

770 def get_primaries(self): 

771 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

772 

773 def get_replicas(self): 

774 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

775 

776 def get_random_node(self): 

777 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

778 

779 def get_nodes(self): 

780 return list(self.nodes_manager.nodes_cache.values()) 

781 

782 def get_node_from_key(self, key, replica=False): 

783 """ 

784 Get the node that holds the key's slot. 

785 If replica set to True but the slot doesn't have any replicas, None is 

786 returned. 

787 """ 

788 slot = self.keyslot(key) 

789 slot_cache = self.nodes_manager.slots_cache.get(slot) 

790 if slot_cache is None or len(slot_cache) == 0: 

791 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') 

792 if replica and len(self.nodes_manager.slots_cache[slot]) < 2: 

793 return None 

794 elif replica: 

795 node_idx = 1 

796 else: 

797 # primary 

798 node_idx = 0 

799 

800 return slot_cache[node_idx] 

801 

802 def get_default_node(self): 

803 """ 

804 Get the cluster's default node 

805 """ 

806 return self.nodes_manager.default_node 

807 

808 def set_default_node(self, node): 

809 """ 

810 Set the default node of the cluster. 

811 :param node: 'ClusterNode' 

812 :return True if the default node was set, else False 

813 """ 

814 if node is None or self.get_node(node_name=node.name) is None: 

815 return False 

816 self.nodes_manager.default_node = node 

817 return True 

818 

819 def set_retry(self, retry: Retry) -> None: 

820 self.retry = retry 

821 

822 def monitor(self, target_node=None): 

823 """ 

824 Returns a Monitor object for the specified target node. 

825 The default cluster node will be selected if no target node was 

826 specified. 

827 Monitor is useful for handling the MONITOR command to the redis server. 

828 next_command() method returns one command from monitor 

829 listen() method yields commands from monitor. 

830 """ 

831 if target_node is None: 

832 target_node = self.get_default_node() 

833 if target_node.redis_connection is None: 

834 raise RedisClusterException( 

835 f"Cluster Node {target_node.name} has no redis_connection" 

836 ) 

837 return target_node.redis_connection.monitor() 

838 

839 def pubsub(self, node=None, host=None, port=None, **kwargs): 

840 """ 

841 Allows passing a ClusterNode, or host&port, to get a pubsub instance 

842 connected to the specified node 

843 """ 

844 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs) 

845 

846 def pipeline(self, transaction=None, shard_hint=None): 

847 """ 

848 Cluster impl: 

849 Pipelines do not work in cluster mode the same way they 

850 do in normal mode. Create a clone of this object so 

851 that simulating pipelines will work correctly. Each 

852 command will be called directly when used and 

853 when calling execute() will only return the result stack. 

854 """ 

855 if shard_hint: 

856 raise RedisClusterException("shard_hint is deprecated in cluster mode") 

857 

858 return ClusterPipeline( 

859 nodes_manager=self.nodes_manager, 

860 commands_parser=self.commands_parser, 

861 startup_nodes=self.nodes_manager.startup_nodes, 

862 result_callbacks=self.result_callbacks, 

863 cluster_response_callbacks=self.cluster_response_callbacks, 

864 read_from_replicas=self.read_from_replicas, 

865 load_balancing_strategy=self.load_balancing_strategy, 

866 reinitialize_steps=self.reinitialize_steps, 

867 retry=self.retry, 

868 lock=self._lock, 

869 transaction=transaction, 

870 ) 

871 

872 def lock( 

873 self, 

874 name, 

875 timeout=None, 

876 sleep=0.1, 

877 blocking=True, 

878 blocking_timeout=None, 

879 lock_class=None, 

880 thread_local=True, 

881 raise_on_release_error: bool = True, 

882 ): 

883 """ 

884 Return a new Lock object using key ``name`` that mimics 

885 the behavior of threading.Lock. 

886 

887 If specified, ``timeout`` indicates a maximum life for the lock. 

888 By default, it will remain locked until release() is called. 

889 

890 ``sleep`` indicates the amount of time to sleep per loop iteration 

891 when the lock is in blocking mode and another client is currently 

892 holding the lock. 

893 

894 ``blocking`` indicates whether calling ``acquire`` should block until 

895 the lock has been acquired or to fail immediately, causing ``acquire`` 

896 to return False and the lock not being acquired. Defaults to True. 

897 Note this value can be overridden by passing a ``blocking`` 

898 argument to ``acquire``. 

899 

900 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

901 spend trying to acquire the lock. A value of ``None`` indicates 

902 continue trying forever. ``blocking_timeout`` can be specified as a 

903 float or integer, both representing the number of seconds to wait. 

904 

905 ``lock_class`` forces the specified lock implementation. Note that as 

906 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

907 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

908 you have created your own custom lock class. 

909 

910 ``thread_local`` indicates whether the lock token is placed in 

911 thread-local storage. By default, the token is placed in thread local 

912 storage so that a thread only sees its token, not a token set by 

913 another thread. Consider the following timeline: 

914 

915 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

916 thread-1 sets the token to "abc" 

917 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

918 Lock instance. 

919 time: 5, thread-1 has not yet completed. redis expires the lock 

920 key. 

921 time: 5, thread-2 acquired `my-lock` now that it's available. 

922 thread-2 sets the token to "xyz" 

923 time: 6, thread-1 finishes its work and calls release(). if the 

924 token is *not* stored in thread local storage, then 

925 thread-1 would see the token value as "xyz" and would be 

926 able to successfully release the thread-2's lock. 

927 

928 ``raise_on_release_error`` indicates whether to raise an exception when 

929 the lock is no longer owned when exiting the context manager. By default, 

930 this is True, meaning an exception will be raised. If False, the warning 

931 will be logged and the exception will be suppressed. 

932 

933 In some use cases it's necessary to disable thread local storage. For 

934 example, if you have code where one thread acquires a lock and passes 

935 that lock instance to a worker thread to release later. If thread 

936 local storage isn't disabled in this case, the worker thread won't see 

937 the token set by the thread that acquired the lock. Our assumption 

938 is that these cases aren't common and as such default to using 

939 thread local storage.""" 

940 if lock_class is None: 

941 lock_class = Lock 

942 return lock_class( 

943 self, 

944 name, 

945 timeout=timeout, 

946 sleep=sleep, 

947 blocking=blocking, 

948 blocking_timeout=blocking_timeout, 

949 thread_local=thread_local, 

950 raise_on_release_error=raise_on_release_error, 

951 ) 

952 

953 def set_response_callback(self, command, callback): 

954 """Set a custom Response Callback""" 

955 self.cluster_response_callbacks[command] = callback 

956 

957 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: 

958 # Determine which nodes should be executed the command on. 

959 # Returns a list of target nodes. 

960 command = args[0].upper() 

961 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: 

962 command = f"{args[0]} {args[1]}".upper() 

963 

964 nodes_flag = kwargs.pop("nodes_flag", None) 

965 if nodes_flag is not None: 

966 # nodes flag passed by the user 

967 command_flag = nodes_flag 

968 else: 

969 # get the nodes group for this command if it was predefined 

970 command_flag = self.command_flags.get(command) 

971 if command_flag == self.__class__.RANDOM: 

972 # return a random node 

973 return [self.get_random_node()] 

974 elif command_flag == self.__class__.PRIMARIES: 

975 # return all primaries 

976 return self.get_primaries() 

977 elif command_flag == self.__class__.REPLICAS: 

978 # return all replicas 

979 return self.get_replicas() 

980 elif command_flag == self.__class__.ALL_NODES: 

981 # return all nodes 

982 return self.get_nodes() 

983 elif command_flag == self.__class__.DEFAULT_NODE: 

984 # return the cluster's default node 

985 return [self.nodes_manager.default_node] 

986 elif command in self.__class__.SEARCH_COMMANDS[0]: 

987 return [self.nodes_manager.default_node] 

988 else: 

989 # get the node that holds the key's slot 

990 slot = self.determine_slot(*args) 

991 node = self.nodes_manager.get_node_from_slot( 

992 slot, 

993 self.read_from_replicas and command in READ_COMMANDS, 

994 self.load_balancing_strategy if command in READ_COMMANDS else None, 

995 ) 

996 return [node] 

997 

998 def _should_reinitialized(self): 

999 # To reinitialize the cluster on every MOVED error, 

1000 # set reinitialize_steps to 1. 

1001 # To avoid reinitializing the cluster on moved errors, set 

1002 # reinitialize_steps to 0. 

1003 if self.reinitialize_steps == 0: 

1004 return False 

1005 else: 

1006 return self.reinitialize_counter % self.reinitialize_steps == 0 

1007 

1008 def keyslot(self, key): 

1009 """ 

1010 Calculate keyslot for a given key. 

1011 See Keys distribution model in https://redis.io/topics/cluster-spec 

1012 """ 

1013 k = self.encoder.encode(key) 

1014 return key_slot(k) 

1015 

1016 def _get_command_keys(self, *args): 

1017 """ 

1018 Get the keys in the command. If the command has no keys in in, None is 

1019 returned. 

1020 

1021 NOTE: Due to a bug in redis<7.0, this function does not work properly 

1022 for EVAL or EVALSHA when the `numkeys` arg is 0. 

1023 - issue: https://github.com/redis/redis/issues/9493 

1024 - fix: https://github.com/redis/redis/pull/9733 

1025 

1026 So, don't use this function with EVAL or EVALSHA. 

1027 """ 

1028 redis_conn = self.get_default_node().redis_connection 

1029 return self.commands_parser.get_keys(redis_conn, *args) 

1030 

1031 def determine_slot(self, *args) -> int: 

1032 """ 

1033 Figure out what slot to use based on args. 

1034 

1035 Raises a RedisClusterException if there's a missing key and we can't 

1036 determine what slots to map the command to; or, if the keys don't 

1037 all map to the same key slot. 

1038 """ 

1039 command = args[0] 

1040 if self.command_flags.get(command) == SLOT_ID: 

1041 # The command contains the slot ID 

1042 return args[1] 

1043 

1044 # Get the keys in the command 

1045 

1046 # EVAL and EVALSHA are common enough that it's wasteful to go to the 

1047 # redis server to parse the keys. Besides, there is a bug in redis<7.0 

1048 # where `self._get_command_keys()` fails anyway. So, we special case 

1049 # EVAL/EVALSHA. 

1050 if command.upper() in ("EVAL", "EVALSHA"): 

1051 # command syntax: EVAL "script body" num_keys ... 

1052 if len(args) <= 2: 

1053 raise RedisClusterException(f"Invalid args in command: {args}") 

1054 num_actual_keys = int(args[2]) 

1055 eval_keys = args[3 : 3 + num_actual_keys] 

1056 # if there are 0 keys, that means the script can be run on any node 

1057 # so we can just return a random slot 

1058 if len(eval_keys) == 0: 

1059 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1060 keys = eval_keys 

1061 else: 

1062 keys = self._get_command_keys(*args) 

1063 if keys is None or len(keys) == 0: 

1064 # FCALL can call a function with 0 keys, that means the function 

1065 # can be run on any node so we can just return a random slot 

1066 if command.upper() in ("FCALL", "FCALL_RO"): 

1067 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1068 raise RedisClusterException( 

1069 "No way to dispatch this command to Redis Cluster. " 

1070 "Missing key.\nYou can execute the command by specifying " 

1071 f"target nodes.\nCommand: {args}" 

1072 ) 

1073 

1074 # single key command 

1075 if len(keys) == 1: 

1076 return self.keyslot(keys[0]) 

1077 

1078 # multi-key command; we need to make sure all keys are mapped to 

1079 # the same slot 

1080 slots = {self.keyslot(key) for key in keys} 

1081 if len(slots) != 1: 

1082 raise RedisClusterException( 

1083 f"{command} - all keys must map to the same key slot" 

1084 ) 

1085 

1086 return slots.pop() 

1087 

1088 def get_encoder(self): 

1089 """ 

1090 Get the connections' encoder 

1091 """ 

1092 return self.encoder 

1093 

1094 def get_connection_kwargs(self): 

1095 """ 

1096 Get the connections' key-word arguments 

1097 """ 

1098 return self.nodes_manager.connection_kwargs 

1099 

1100 def _is_nodes_flag(self, target_nodes): 

1101 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1102 

1103 def _parse_target_nodes(self, target_nodes): 

1104 if isinstance(target_nodes, list): 

1105 nodes = target_nodes 

1106 elif isinstance(target_nodes, ClusterNode): 

1107 # Supports passing a single ClusterNode as a variable 

1108 nodes = [target_nodes] 

1109 elif isinstance(target_nodes, dict): 

1110 # Supports dictionaries of the format {node_name: node}. 

1111 # It enables to execute commands with multi nodes as follows: 

1112 # rc.cluster_save_config(rc.get_primaries()) 

1113 nodes = target_nodes.values() 

1114 else: 

1115 raise TypeError( 

1116 "target_nodes type can be one of the following: " 

1117 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1118 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1119 f"The passed type is {type(target_nodes)}" 

1120 ) 

1121 return nodes 

1122 

1123 def execute_command(self, *args, **kwargs): 

1124 return self._internal_execute_command(*args, **kwargs) 

1125 

1126 def _internal_execute_command(self, *args, **kwargs): 

1127 """ 

1128 Wrapper for ERRORS_ALLOW_RETRY error handling. 

1129 

1130 It will try the number of times specified by the retries property from 

1131 config option "self.retry" which defaults to 3 unless manually 

1132 configured. 

1133 

1134 If it reaches the number of times, the command will raise the exception 

1135 

1136 Key argument :target_nodes: can be passed with the following types: 

1137 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM 

1138 ClusterNode 

1139 list<ClusterNode> 

1140 dict<Any, ClusterNode> 

1141 """ 

1142 target_nodes_specified = False 

1143 is_default_node = False 

1144 target_nodes = None 

1145 passed_targets = kwargs.pop("target_nodes", None) 

1146 if passed_targets is not None and not self._is_nodes_flag(passed_targets): 

1147 target_nodes = self._parse_target_nodes(passed_targets) 

1148 target_nodes_specified = True 

1149 # If an error that allows retrying was thrown, the nodes and slots 

1150 # cache were reinitialized. We will retry executing the command with 

1151 # the updated cluster setup only when the target nodes can be 

1152 # determined again with the new cache tables. Therefore, when target 

1153 # nodes were passed to this function, we cannot retry the command 

1154 # execution since the nodes may not be valid anymore after the tables 

1155 # were reinitialized. So in case of passed target nodes, 

1156 # retry_attempts will be set to 0. 

1157 retry_attempts = 0 if target_nodes_specified else self.retry.get_retries() 

1158 # Add one for the first execution 

1159 execute_attempts = 1 + retry_attempts 

1160 for _ in range(execute_attempts): 

1161 try: 

1162 res = {} 

1163 if not target_nodes_specified: 

1164 # Determine the nodes to execute the command on 

1165 target_nodes = self._determine_nodes( 

1166 *args, **kwargs, nodes_flag=passed_targets 

1167 ) 

1168 if not target_nodes: 

1169 raise RedisClusterException( 

1170 f"No targets were found to execute {args} command on" 

1171 ) 

1172 if ( 

1173 len(target_nodes) == 1 

1174 and target_nodes[0] == self.get_default_node() 

1175 ): 

1176 is_default_node = True 

1177 for node in target_nodes: 

1178 res[node.name] = self._execute_command(node, *args, **kwargs) 

1179 # Return the processed result 

1180 return self._process_result(args[0], res, **kwargs) 

1181 except Exception as e: 

1182 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: 

1183 if is_default_node: 

1184 # Replace the default cluster node 

1185 self.replace_default_node() 

1186 # The nodes and slots cache were reinitialized. 

1187 # Try again with the new cluster setup. 

1188 retry_attempts -= 1 

1189 continue 

1190 else: 

1191 # raise the exception 

1192 raise e 

1193 

1194 def _execute_command(self, target_node, *args, **kwargs): 

1195 """ 

1196 Send a command to a node in the cluster 

1197 """ 

1198 command = args[0] 

1199 redis_node = None 

1200 connection = None 

1201 redirect_addr = None 

1202 asking = False 

1203 moved = False 

1204 ttl = int(self.RedisClusterRequestTTL) 

1205 

1206 while ttl > 0: 

1207 ttl -= 1 

1208 try: 

1209 if asking: 

1210 target_node = self.get_node(node_name=redirect_addr) 

1211 elif moved: 

1212 # MOVED occurred and the slots cache was updated, 

1213 # refresh the target node 

1214 slot = self.determine_slot(*args) 

1215 target_node = self.nodes_manager.get_node_from_slot( 

1216 slot, 

1217 self.read_from_replicas and command in READ_COMMANDS, 

1218 self.load_balancing_strategy 

1219 if command in READ_COMMANDS 

1220 else None, 

1221 ) 

1222 moved = False 

1223 

1224 redis_node = self.get_redis_connection(target_node) 

1225 connection = get_connection(redis_node) 

1226 if asking: 

1227 connection.send_command("ASKING") 

1228 redis_node.parse_response(connection, "ASKING", **kwargs) 

1229 asking = False 

1230 connection.send_command(*args, **kwargs) 

1231 response = redis_node.parse_response(connection, command, **kwargs) 

1232 

1233 # Remove keys entry, it needs only for cache. 

1234 kwargs.pop("keys", None) 

1235 

1236 if command in self.cluster_response_callbacks: 

1237 response = self.cluster_response_callbacks[command]( 

1238 response, **kwargs 

1239 ) 

1240 return response 

1241 except AuthenticationError: 

1242 raise 

1243 except MaxConnectionsError: 

1244 # MaxConnectionsError indicates client-side resource exhaustion 

1245 # (too many connections in the pool), not a node failure. 

1246 # Don't treat this as a node failure - just re-raise the error 

1247 # without reinitializing the cluster. 

1248 raise 

1249 except (ConnectionError, TimeoutError) as e: 

1250 # ConnectionError can also be raised if we couldn't get a 

1251 # connection from the pool before timing out, so check that 

1252 # this is an actual connection before attempting to disconnect. 

1253 if connection is not None: 

1254 connection.disconnect() 

1255 

1256 # Remove the failed node from the startup nodes before we try 

1257 # to reinitialize the cluster 

1258 self.nodes_manager.startup_nodes.pop(target_node.name, None) 

1259 # Reset the cluster node's connection 

1260 target_node.redis_connection = None 

1261 self.nodes_manager.initialize() 

1262 raise e 

1263 except MovedError as e: 

1264 # First, we will try to patch the slots/nodes cache with the 

1265 # redirected node output and try again. If MovedError exceeds 

1266 # 'reinitialize_steps' number of times, we will force 

1267 # reinitializing the tables, and then try again. 

1268 # 'reinitialize_steps' counter will increase faster when 

1269 # the same client object is shared between multiple threads. To 

1270 # reduce the frequency you can set this variable in the 

1271 # RedisCluster constructor. 

1272 self.reinitialize_counter += 1 

1273 if self._should_reinitialized(): 

1274 self.nodes_manager.initialize() 

1275 # Reset the counter 

1276 self.reinitialize_counter = 0 

1277 else: 

1278 self.nodes_manager.update_moved_exception(e) 

1279 moved = True 

1280 except TryAgainError: 

1281 if ttl < self.RedisClusterRequestTTL / 2: 

1282 time.sleep(0.05) 

1283 except AskError as e: 

1284 redirect_addr = get_node_name(host=e.host, port=e.port) 

1285 asking = True 

1286 except (ClusterDownError, SlotNotCoveredError): 

1287 # ClusterDownError can occur during a failover and to get 

1288 # self-healed, we will try to reinitialize the cluster layout 

1289 # and retry executing the command 

1290 

1291 # SlotNotCoveredError can occur when the cluster is not fully 

1292 # initialized or can be temporary issue. 

1293 # We will try to reinitialize the cluster topology 

1294 # and retry executing the command 

1295 

1296 time.sleep(0.25) 

1297 self.nodes_manager.initialize() 

1298 raise 

1299 except ResponseError: 

1300 raise 

1301 except Exception as e: 

1302 if connection: 

1303 connection.disconnect() 

1304 raise e 

1305 finally: 

1306 if connection is not None: 

1307 redis_node.connection_pool.release(connection) 

1308 

1309 raise ClusterError("TTL exhausted.") 

1310 

1311 def close(self) -> None: 

1312 try: 

1313 with self._lock: 

1314 if self.nodes_manager: 

1315 self.nodes_manager.close() 

1316 except AttributeError: 

1317 # RedisCluster's __init__ can fail before nodes_manager is set 

1318 pass 

1319 

1320 def _process_result(self, command, res, **kwargs): 

1321 """ 

1322 Process the result of the executed command. 

1323 The function would return a dict or a single value. 

1324 

1325 :type command: str 

1326 :type res: dict 

1327 

1328 `res` should be in the following format: 

1329 Dict<node_name, command_result> 

1330 """ 

1331 if command in self.result_callbacks: 

1332 return self.result_callbacks[command](command, res, **kwargs) 

1333 elif len(res) == 1: 

1334 # When we execute the command on a single node, we can 

1335 # remove the dictionary and return a single response 

1336 return list(res.values())[0] 

1337 else: 

1338 return res 

1339 

1340 def load_external_module(self, funcname, func): 

1341 """ 

1342 This function can be used to add externally defined redis modules, 

1343 and their namespaces to the redis client. 

1344 

1345 ``funcname`` - A string containing the name of the function to create 

1346 ``func`` - The function, being added to this class. 

1347 """ 

1348 setattr(self, funcname, func) 

1349 

1350 def transaction(self, func, *watches, **kwargs): 

1351 """ 

1352 Convenience method for executing the callable `func` as a transaction 

1353 while watching all keys specified in `watches`. The 'func' callable 

1354 should expect a single argument which is a Pipeline object. 

1355 """ 

1356 shard_hint = kwargs.pop("shard_hint", None) 

1357 value_from_callable = kwargs.pop("value_from_callable", False) 

1358 watch_delay = kwargs.pop("watch_delay", None) 

1359 with self.pipeline(True, shard_hint) as pipe: 

1360 while True: 

1361 try: 

1362 if watches: 

1363 pipe.watch(*watches) 

1364 func_value = func(pipe) 

1365 exec_value = pipe.execute() 

1366 return func_value if value_from_callable else exec_value 

1367 except WatchError: 

1368 if watch_delay is not None and watch_delay > 0: 

1369 time.sleep(watch_delay) 

1370 continue 

1371 

1372 

1373class ClusterNode: 

1374 def __init__(self, host, port, server_type=None, redis_connection=None): 

1375 if host == "localhost": 

1376 host = socket.gethostbyname(host) 

1377 

1378 self.host = host 

1379 self.port = port 

1380 self.name = get_node_name(host, port) 

1381 self.server_type = server_type 

1382 self.redis_connection = redis_connection 

1383 

1384 def __repr__(self): 

1385 return ( 

1386 f"[host={self.host}," 

1387 f"port={self.port}," 

1388 f"name={self.name}," 

1389 f"server_type={self.server_type}," 

1390 f"redis_connection={self.redis_connection}]" 

1391 ) 

1392 

1393 def __eq__(self, obj): 

1394 return isinstance(obj, ClusterNode) and obj.name == self.name 

1395 

1396 def __del__(self): 

1397 try: 

1398 if self.redis_connection is not None: 

1399 self.redis_connection.close() 

1400 except Exception: 

1401 # Ignore errors when closing the connection 

1402 pass 

1403 

1404 

1405class LoadBalancingStrategy(Enum): 

1406 ROUND_ROBIN = "round_robin" 

1407 ROUND_ROBIN_REPLICAS = "round_robin_replicas" 

1408 RANDOM_REPLICA = "random_replica" 

1409 

1410 

1411class LoadBalancer: 

1412 """ 

1413 Round-Robin Load Balancing 

1414 """ 

1415 

1416 def __init__(self, start_index: int = 0) -> None: 

1417 self.primary_to_idx = {} 

1418 self.start_index = start_index 

1419 

1420 def get_server_index( 

1421 self, 

1422 primary: str, 

1423 list_size: int, 

1424 load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN, 

1425 ) -> int: 

1426 if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA: 

1427 return self._get_random_replica_index(list_size) 

1428 else: 

1429 return self._get_round_robin_index( 

1430 primary, 

1431 list_size, 

1432 load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS, 

1433 ) 

1434 

1435 def reset(self) -> None: 

1436 self.primary_to_idx.clear() 

1437 

1438 def _get_random_replica_index(self, list_size: int) -> int: 

1439 return random.randint(1, list_size - 1) 

1440 

1441 def _get_round_robin_index( 

1442 self, primary: str, list_size: int, replicas_only: bool 

1443 ) -> int: 

1444 server_index = self.primary_to_idx.setdefault(primary, self.start_index) 

1445 if replicas_only and server_index == 0: 

1446 # skip the primary node index 

1447 server_index = 1 

1448 # Update the index for the next round 

1449 self.primary_to_idx[primary] = (server_index + 1) % list_size 

1450 return server_index 

1451 

1452 

1453class NodesManager: 

1454 def __init__( 

1455 self, 

1456 startup_nodes, 

1457 from_url=False, 

1458 require_full_coverage=False, 

1459 lock=None, 

1460 dynamic_startup_nodes=True, 

1461 connection_pool_class=ConnectionPool, 

1462 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

1463 cache: Optional[CacheInterface] = None, 

1464 cache_config: Optional[CacheConfig] = None, 

1465 cache_factory: Optional[CacheFactoryInterface] = None, 

1466 event_dispatcher: Optional[EventDispatcher] = None, 

1467 **kwargs, 

1468 ): 

1469 self.nodes_cache: Dict[str, Redis] = {} 

1470 self.slots_cache = {} 

1471 self.startup_nodes = {} 

1472 self.default_node = None 

1473 self.populate_startup_nodes(startup_nodes) 

1474 self.from_url = from_url 

1475 self._require_full_coverage = require_full_coverage 

1476 self._dynamic_startup_nodes = dynamic_startup_nodes 

1477 self.connection_pool_class = connection_pool_class 

1478 self.address_remap = address_remap 

1479 self._cache = cache 

1480 self._cache_config = cache_config 

1481 self._cache_factory = cache_factory 

1482 self._moved_exception = None 

1483 self.connection_kwargs = kwargs 

1484 self.read_load_balancer = LoadBalancer() 

1485 if lock is None: 

1486 lock = threading.RLock() 

1487 self._lock = lock 

1488 if event_dispatcher is None: 

1489 self._event_dispatcher = EventDispatcher() 

1490 else: 

1491 self._event_dispatcher = event_dispatcher 

1492 self._credential_provider = self.connection_kwargs.get( 

1493 "credential_provider", None 

1494 ) 

1495 self.initialize() 

1496 

1497 def get_node(self, host=None, port=None, node_name=None): 

1498 """ 

1499 Get the requested node from the cluster's nodes. 

1500 nodes. 

1501 :return: ClusterNode if the node exists, else None 

1502 """ 

1503 if host and port: 

1504 # the user passed host and port 

1505 if host == "localhost": 

1506 host = socket.gethostbyname(host) 

1507 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1508 elif node_name: 

1509 return self.nodes_cache.get(node_name) 

1510 else: 

1511 return None 

1512 

1513 def update_moved_exception(self, exception): 

1514 self._moved_exception = exception 

1515 

1516 def _update_moved_slots(self): 

1517 """ 

1518 Update the slot's node with the redirected one 

1519 """ 

1520 e = self._moved_exception 

1521 redirected_node = self.get_node(host=e.host, port=e.port) 

1522 if redirected_node is not None: 

1523 # The node already exists 

1524 if redirected_node.server_type is not PRIMARY: 

1525 # Update the node's server type 

1526 redirected_node.server_type = PRIMARY 

1527 else: 

1528 # This is a new node, we will add it to the nodes cache 

1529 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

1530 self.nodes_cache[redirected_node.name] = redirected_node 

1531 if redirected_node in self.slots_cache[e.slot_id]: 

1532 # The MOVED error resulted from a failover, and the new slot owner 

1533 # had previously been a replica. 

1534 old_primary = self.slots_cache[e.slot_id][0] 

1535 # Update the old primary to be a replica and add it to the end of 

1536 # the slot's node list 

1537 old_primary.server_type = REPLICA 

1538 self.slots_cache[e.slot_id].append(old_primary) 

1539 # Remove the old replica, which is now a primary, from the slot's 

1540 # node list 

1541 self.slots_cache[e.slot_id].remove(redirected_node) 

1542 # Override the old primary with the new one 

1543 self.slots_cache[e.slot_id][0] = redirected_node 

1544 if self.default_node == old_primary: 

1545 # Update the default node with the new primary 

1546 self.default_node = redirected_node 

1547 else: 

1548 # The new slot owner is a new server, or a server from a different 

1549 # shard. We need to remove all current nodes from the slot's list 

1550 # (including replications) and add just the new node. 

1551 self.slots_cache[e.slot_id] = [redirected_node] 

1552 # Reset moved_exception 

1553 self._moved_exception = None 

1554 

1555 @deprecated_args( 

1556 args_to_warn=["server_type"], 

1557 reason=( 

1558 "In case you need select some load balancing strategy " 

1559 "that will use replicas, please set it through 'load_balancing_strategy'" 

1560 ), 

1561 version="5.3.0", 

1562 ) 

1563 def get_node_from_slot( 

1564 self, 

1565 slot, 

1566 read_from_replicas=False, 

1567 load_balancing_strategy=None, 

1568 server_type=None, 

1569 ) -> ClusterNode: 

1570 """ 

1571 Gets a node that servers this hash slot 

1572 """ 

1573 if self._moved_exception: 

1574 with self._lock: 

1575 if self._moved_exception: 

1576 self._update_moved_slots() 

1577 

1578 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

1579 raise SlotNotCoveredError( 

1580 f'Slot "{slot}" not covered by the cluster. ' 

1581 f'"require_full_coverage={self._require_full_coverage}"' 

1582 ) 

1583 

1584 if read_from_replicas is True and load_balancing_strategy is None: 

1585 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN 

1586 

1587 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy: 

1588 # get the server index using the strategy defined in load_balancing_strategy 

1589 primary_name = self.slots_cache[slot][0].name 

1590 node_idx = self.read_load_balancer.get_server_index( 

1591 primary_name, len(self.slots_cache[slot]), load_balancing_strategy 

1592 ) 

1593 elif ( 

1594 server_type is None 

1595 or server_type == PRIMARY 

1596 or len(self.slots_cache[slot]) == 1 

1597 ): 

1598 # return a primary 

1599 node_idx = 0 

1600 else: 

1601 # return a replica 

1602 # randomly choose one of the replicas 

1603 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

1604 

1605 return self.slots_cache[slot][node_idx] 

1606 

1607 def get_nodes_by_server_type(self, server_type): 

1608 """ 

1609 Get all nodes with the specified server type 

1610 :param server_type: 'primary' or 'replica' 

1611 :return: list of ClusterNode 

1612 """ 

1613 return [ 

1614 node 

1615 for node in self.nodes_cache.values() 

1616 if node.server_type == server_type 

1617 ] 

1618 

1619 def populate_startup_nodes(self, nodes): 

1620 """ 

1621 Populate all startup nodes and filters out any duplicates 

1622 """ 

1623 for n in nodes: 

1624 self.startup_nodes[n.name] = n 

1625 

1626 def check_slots_coverage(self, slots_cache): 

1627 # Validate if all slots are covered or if we should try next 

1628 # startup node 

1629 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

1630 if i not in slots_cache: 

1631 return False 

1632 return True 

1633 

1634 def create_redis_connections(self, nodes): 

1635 """ 

1636 This function will create a redis connection to all nodes in :nodes: 

1637 """ 

1638 connection_pools = [] 

1639 for node in nodes: 

1640 if node.redis_connection is None: 

1641 node.redis_connection = self.create_redis_node( 

1642 host=node.host, port=node.port, **self.connection_kwargs 

1643 ) 

1644 connection_pools.append(node.redis_connection.connection_pool) 

1645 

1646 self._event_dispatcher.dispatch( 

1647 AfterPooledConnectionsInstantiationEvent( 

1648 connection_pools, ClientType.SYNC, self._credential_provider 

1649 ) 

1650 ) 

1651 

1652 def create_redis_node(self, host, port, **kwargs): 

1653 # We are configuring the connection pool not to retry 

1654 # connections on lower level clients to avoid retrying 

1655 # connections to nodes that are not reachable 

1656 # and to avoid blocking the connection pool. 

1657 # The only error that will have some handling in the lower 

1658 # level clients is ConnectionError which will trigger disconnection 

1659 # of the socket. 

1660 # The retries will be handled on cluster client level 

1661 # where we will have proper handling of the cluster topology 

1662 node_retry_config = Retry( 

1663 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) 

1664 ) 

1665 

1666 if self.from_url: 

1667 # Create a redis node with a costumed connection pool 

1668 kwargs.update({"host": host}) 

1669 kwargs.update({"port": port}) 

1670 kwargs.update({"cache": self._cache}) 

1671 kwargs.update({"retry": node_retry_config}) 

1672 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

1673 else: 

1674 r = Redis( 

1675 host=host, 

1676 port=port, 

1677 cache=self._cache, 

1678 retry=node_retry_config, 

1679 **kwargs, 

1680 ) 

1681 return r 

1682 

1683 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

1684 node_name = get_node_name(host, port) 

1685 # check if we already have this node in the tmp_nodes_cache 

1686 target_node = tmp_nodes_cache.get(node_name) 

1687 if target_node is None: 

1688 # before creating a new cluster node, check if the cluster node already 

1689 # exists in the current nodes cache and has a valid connection so we can 

1690 # reuse it 

1691 target_node = self.nodes_cache.get(node_name) 

1692 if target_node is None or target_node.redis_connection is None: 

1693 # create new cluster node for this cluster 

1694 target_node = ClusterNode(host, port, role) 

1695 if target_node.server_type != role: 

1696 target_node.server_type = role 

1697 # add this node to the nodes cache 

1698 tmp_nodes_cache[target_node.name] = target_node 

1699 

1700 return target_node 

1701 

1702 def initialize(self): 

1703 """ 

1704 Initializes the nodes cache, slots cache and redis connections. 

1705 :startup_nodes: 

1706 Responsible for discovering other nodes in the cluster 

1707 """ 

1708 self.reset() 

1709 tmp_nodes_cache = {} 

1710 tmp_slots = {} 

1711 disagreements = [] 

1712 startup_nodes_reachable = False 

1713 fully_covered = False 

1714 kwargs = self.connection_kwargs 

1715 exception = None 

1716 # Convert to tuple to prevent RuntimeError if self.startup_nodes 

1717 # is modified during iteration 

1718 for startup_node in tuple(self.startup_nodes.values()): 

1719 try: 

1720 if startup_node.redis_connection: 

1721 r = startup_node.redis_connection 

1722 else: 

1723 # Create a new Redis connection 

1724 r = self.create_redis_node( 

1725 startup_node.host, startup_node.port, **kwargs 

1726 ) 

1727 self.startup_nodes[startup_node.name].redis_connection = r 

1728 # Make sure cluster mode is enabled on this node 

1729 try: 

1730 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

1731 r.connection_pool.disconnect() 

1732 except ResponseError: 

1733 raise RedisClusterException( 

1734 "Cluster mode is not enabled on this node" 

1735 ) 

1736 startup_nodes_reachable = True 

1737 except Exception as e: 

1738 # Try the next startup node. 

1739 # The exception is saved and raised only if we have no more nodes. 

1740 exception = e 

1741 continue 

1742 

1743 # CLUSTER SLOTS command results in the following output: 

1744 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

1745 # where each node contains the following list: [IP, port, node_id] 

1746 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

1747 # primary node of the first slot section. 

1748 # If there's only one server in the cluster, its ``host`` is '' 

1749 # Fix it to the host in startup_nodes 

1750 if ( 

1751 len(cluster_slots) == 1 

1752 and len(cluster_slots[0][2][0]) == 0 

1753 and len(self.startup_nodes) == 1 

1754 ): 

1755 cluster_slots[0][2][0] = startup_node.host 

1756 

1757 for slot in cluster_slots: 

1758 primary_node = slot[2] 

1759 host = str_if_bytes(primary_node[0]) 

1760 if host == "": 

1761 host = startup_node.host 

1762 port = int(primary_node[1]) 

1763 host, port = self.remap_host_port(host, port) 

1764 

1765 nodes_for_slot = [] 

1766 

1767 target_node = self._get_or_create_cluster_node( 

1768 host, port, PRIMARY, tmp_nodes_cache 

1769 ) 

1770 nodes_for_slot.append(target_node) 

1771 

1772 replica_nodes = slot[3:] 

1773 for replica_node in replica_nodes: 

1774 host = str_if_bytes(replica_node[0]) 

1775 port = int(replica_node[1]) 

1776 host, port = self.remap_host_port(host, port) 

1777 target_replica_node = self._get_or_create_cluster_node( 

1778 host, port, REPLICA, tmp_nodes_cache 

1779 ) 

1780 nodes_for_slot.append(target_replica_node) 

1781 

1782 for i in range(int(slot[0]), int(slot[1]) + 1): 

1783 if i not in tmp_slots: 

1784 tmp_slots[i] = nodes_for_slot 

1785 else: 

1786 # Validate that 2 nodes want to use the same slot cache 

1787 # setup 

1788 tmp_slot = tmp_slots[i][0] 

1789 if tmp_slot.name != target_node.name: 

1790 disagreements.append( 

1791 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

1792 ) 

1793 

1794 if len(disagreements) > 5: 

1795 raise RedisClusterException( 

1796 f"startup_nodes could not agree on a valid " 

1797 f"slots cache: {', '.join(disagreements)}" 

1798 ) 

1799 

1800 fully_covered = self.check_slots_coverage(tmp_slots) 

1801 if fully_covered: 

1802 # Don't need to continue to the next startup node if all 

1803 # slots are covered 

1804 break 

1805 

1806 if not startup_nodes_reachable: 

1807 raise RedisClusterException( 

1808 f"Redis Cluster cannot be connected. Please provide at least " 

1809 f"one reachable node: {str(exception)}" 

1810 ) from exception 

1811 

1812 if self._cache is None and self._cache_config is not None: 

1813 if self._cache_factory is None: 

1814 self._cache = CacheFactory(self._cache_config).get_cache() 

1815 else: 

1816 self._cache = self._cache_factory.get_cache() 

1817 

1818 # Create Redis connections to all nodes 

1819 self.create_redis_connections(list(tmp_nodes_cache.values())) 

1820 

1821 # Check if the slots are not fully covered 

1822 if not fully_covered and self._require_full_coverage: 

1823 # Despite the requirement that the slots be covered, there 

1824 # isn't a full coverage 

1825 raise RedisClusterException( 

1826 f"All slots are not covered after query all startup_nodes. " 

1827 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

1828 f"covered..." 

1829 ) 

1830 

1831 # Set the tmp variables to the real variables 

1832 self.nodes_cache = tmp_nodes_cache 

1833 self.slots_cache = tmp_slots 

1834 # Set the default node 

1835 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

1836 if self._dynamic_startup_nodes: 

1837 # Populate the startup nodes with all discovered nodes 

1838 self.startup_nodes = tmp_nodes_cache 

1839 # If initialize was called after a MovedError, clear it 

1840 self._moved_exception = None 

1841 

1842 def close(self) -> None: 

1843 self.default_node = None 

1844 for node in self.nodes_cache.values(): 

1845 if node.redis_connection: 

1846 node.redis_connection.close() 

1847 

1848 def reset(self): 

1849 try: 

1850 self.read_load_balancer.reset() 

1851 except TypeError: 

1852 # The read_load_balancer is None, do nothing 

1853 pass 

1854 

1855 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

1856 """ 

1857 Remap the host and port returned from the cluster to a different 

1858 internal value. Useful if the client is not connecting directly 

1859 to the cluster. 

1860 """ 

1861 if self.address_remap: 

1862 return self.address_remap((host, port)) 

1863 return host, port 

1864 

1865 def find_connection_owner(self, connection: Connection) -> Optional[Redis]: 

1866 node_name = get_node_name(connection.host, connection.port) 

1867 for node in tuple(self.nodes_cache.values()): 

1868 if node.redis_connection: 

1869 conn_args = node.redis_connection.connection_pool.connection_kwargs 

1870 if node_name == get_node_name( 

1871 conn_args.get("host"), conn_args.get("port") 

1872 ): 

1873 return node 

1874 

1875 

1876class ClusterPubSub(PubSub): 

1877 """ 

1878 Wrapper for PubSub class. 

1879 

1880 IMPORTANT: before using ClusterPubSub, read about the known limitations 

1881 with pubsub in Cluster mode and learn how to workaround them: 

1882 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html 

1883 """ 

1884 

1885 def __init__( 

1886 self, 

1887 redis_cluster, 

1888 node=None, 

1889 host=None, 

1890 port=None, 

1891 push_handler_func=None, 

1892 event_dispatcher: Optional["EventDispatcher"] = None, 

1893 **kwargs, 

1894 ): 

1895 """ 

1896 When a pubsub instance is created without specifying a node, a single 

1897 node will be transparently chosen for the pubsub connection on the 

1898 first command execution. The node will be determined by: 

1899 1. Hashing the channel name in the request to find its keyslot 

1900 2. Selecting a node that handles the keyslot: If read_from_replicas is 

1901 set to true or load_balancing_strategy is set, a replica can be selected. 

1902 

1903 :type redis_cluster: RedisCluster 

1904 :type node: ClusterNode 

1905 :type host: str 

1906 :type port: int 

1907 """ 

1908 self.node = None 

1909 self.set_pubsub_node(redis_cluster, node, host, port) 

1910 connection_pool = ( 

1911 None 

1912 if self.node is None 

1913 else redis_cluster.get_redis_connection(self.node).connection_pool 

1914 ) 

1915 self.cluster = redis_cluster 

1916 self.node_pubsub_mapping = {} 

1917 self._pubsubs_generator = self._pubsubs_generator() 

1918 if event_dispatcher is None: 

1919 self._event_dispatcher = EventDispatcher() 

1920 else: 

1921 self._event_dispatcher = event_dispatcher 

1922 super().__init__( 

1923 connection_pool=connection_pool, 

1924 encoder=redis_cluster.encoder, 

1925 push_handler_func=push_handler_func, 

1926 event_dispatcher=self._event_dispatcher, 

1927 **kwargs, 

1928 ) 

1929 

1930 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

1931 """ 

1932 The pubsub node will be set according to the passed node, host and port 

1933 When none of the node, host, or port are specified - the node is set 

1934 to None and will be determined by the keyslot of the channel in the 

1935 first command to be executed. 

1936 RedisClusterException will be thrown if the passed node does not exist 

1937 in the cluster. 

1938 If host is passed without port, or vice versa, a DataError will be 

1939 thrown. 

1940 :type cluster: RedisCluster 

1941 :type node: ClusterNode 

1942 :type host: str 

1943 :type port: int 

1944 """ 

1945 if node is not None: 

1946 # node is passed by the user 

1947 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

1948 pubsub_node = node 

1949 elif host is not None and port is not None: 

1950 # host and port passed by the user 

1951 node = cluster.get_node(host=host, port=port) 

1952 self._raise_on_invalid_node(cluster, node, host, port) 

1953 pubsub_node = node 

1954 elif any([host, port]) is True: 

1955 # only 'host' or 'port' passed 

1956 raise DataError("Passing a host requires passing a port, and vice versa") 

1957 else: 

1958 # nothing passed by the user. set node to None 

1959 pubsub_node = None 

1960 

1961 self.node = pubsub_node 

1962 

1963 def get_pubsub_node(self): 

1964 """ 

1965 Get the node that is being used as the pubsub connection 

1966 """ 

1967 return self.node 

1968 

1969 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

1970 """ 

1971 Raise a RedisClusterException if the node is None or doesn't exist in 

1972 the cluster. 

1973 """ 

1974 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

1975 raise RedisClusterException( 

1976 f"Node {host}:{port} doesn't exist in the cluster" 

1977 ) 

1978 

1979 def execute_command(self, *args): 

1980 """ 

1981 Execute a subscribe/unsubscribe command. 

1982 

1983 Taken code from redis-py and tweak to make it work within a cluster. 

1984 """ 

1985 # NOTE: don't parse the response in this function -- it could pull a 

1986 # legitimate message off the stack if the connection is already 

1987 # subscribed to one or more channels 

1988 

1989 if self.connection is None: 

1990 if self.connection_pool is None: 

1991 if len(args) > 1: 

1992 # Hash the first channel and get one of the nodes holding 

1993 # this slot 

1994 channel = args[1] 

1995 slot = self.cluster.keyslot(channel) 

1996 node = self.cluster.nodes_manager.get_node_from_slot( 

1997 slot, 

1998 self.cluster.read_from_replicas, 

1999 self.cluster.load_balancing_strategy, 

2000 ) 

2001 else: 

2002 # Get a random node 

2003 node = self.cluster.get_random_node() 

2004 self.node = node 

2005 redis_connection = self.cluster.get_redis_connection(node) 

2006 self.connection_pool = redis_connection.connection_pool 

2007 self.connection = self.connection_pool.get_connection() 

2008 # register a callback that re-subscribes to any channels we 

2009 # were listening to when we were disconnected 

2010 self.connection.register_connect_callback(self.on_connect) 

2011 if self.push_handler_func is not None: 

2012 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

2013 self._event_dispatcher.dispatch( 

2014 AfterPubSubConnectionInstantiationEvent( 

2015 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

2016 ) 

2017 ) 

2018 connection = self.connection 

2019 self._execute(connection, connection.send_command, *args) 

2020 

2021 def _get_node_pubsub(self, node): 

2022 try: 

2023 return self.node_pubsub_mapping[node.name] 

2024 except KeyError: 

2025 pubsub = node.redis_connection.pubsub( 

2026 push_handler_func=self.push_handler_func 

2027 ) 

2028 self.node_pubsub_mapping[node.name] = pubsub 

2029 return pubsub 

2030 

2031 def _sharded_message_generator(self): 

2032 for _ in range(len(self.node_pubsub_mapping)): 

2033 pubsub = next(self._pubsubs_generator) 

2034 message = pubsub.get_message() 

2035 if message is not None: 

2036 return message 

2037 return None 

2038 

2039 def _pubsubs_generator(self): 

2040 while True: 

2041 yield from self.node_pubsub_mapping.values() 

2042 

2043 def get_sharded_message( 

2044 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None 

2045 ): 

2046 if target_node: 

2047 message = self.node_pubsub_mapping[target_node.name].get_message( 

2048 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout 

2049 ) 

2050 else: 

2051 message = self._sharded_message_generator() 

2052 if message is None: 

2053 return None 

2054 elif str_if_bytes(message["type"]) == "sunsubscribe": 

2055 if message["channel"] in self.pending_unsubscribe_shard_channels: 

2056 self.pending_unsubscribe_shard_channels.remove(message["channel"]) 

2057 self.shard_channels.pop(message["channel"], None) 

2058 node = self.cluster.get_node_from_key(message["channel"]) 

2059 if self.node_pubsub_mapping[node.name].subscribed is False: 

2060 self.node_pubsub_mapping.pop(node.name) 

2061 if not self.channels and not self.patterns and not self.shard_channels: 

2062 # There are no subscriptions anymore, set subscribed_event flag 

2063 # to false 

2064 self.subscribed_event.clear() 

2065 if self.ignore_subscribe_messages or ignore_subscribe_messages: 

2066 return None 

2067 return message 

2068 

2069 def ssubscribe(self, *args, **kwargs): 

2070 if args: 

2071 args = list_or_args(args[0], args[1:]) 

2072 s_channels = dict.fromkeys(args) 

2073 s_channels.update(kwargs) 

2074 for s_channel, handler in s_channels.items(): 

2075 node = self.cluster.get_node_from_key(s_channel) 

2076 pubsub = self._get_node_pubsub(node) 

2077 if handler: 

2078 pubsub.ssubscribe(**{s_channel: handler}) 

2079 else: 

2080 pubsub.ssubscribe(s_channel) 

2081 self.shard_channels.update(pubsub.shard_channels) 

2082 self.pending_unsubscribe_shard_channels.difference_update( 

2083 self._normalize_keys({s_channel: None}) 

2084 ) 

2085 if pubsub.subscribed and not self.subscribed: 

2086 self.subscribed_event.set() 

2087 self.health_check_response_counter = 0 

2088 

2089 def sunsubscribe(self, *args): 

2090 if args: 

2091 args = list_or_args(args[0], args[1:]) 

2092 else: 

2093 args = self.shard_channels 

2094 

2095 for s_channel in args: 

2096 node = self.cluster.get_node_from_key(s_channel) 

2097 p = self._get_node_pubsub(node) 

2098 p.sunsubscribe(s_channel) 

2099 self.pending_unsubscribe_shard_channels.update( 

2100 p.pending_unsubscribe_shard_channels 

2101 ) 

2102 

2103 def get_redis_connection(self): 

2104 """ 

2105 Get the Redis connection of the pubsub connected node. 

2106 """ 

2107 if self.node is not None: 

2108 return self.node.redis_connection 

2109 

2110 def disconnect(self): 

2111 """ 

2112 Disconnect the pubsub connection. 

2113 """ 

2114 if self.connection: 

2115 self.connection.disconnect() 

2116 for pubsub in self.node_pubsub_mapping.values(): 

2117 pubsub.connection.disconnect() 

2118 

2119 

2120class ClusterPipeline(RedisCluster): 

2121 """ 

2122 Support for Redis pipeline 

2123 in cluster mode 

2124 """ 

2125 

2126 ERRORS_ALLOW_RETRY = ( 

2127 ConnectionError, 

2128 TimeoutError, 

2129 MovedError, 

2130 AskError, 

2131 TryAgainError, 

2132 ) 

2133 

2134 NO_SLOTS_COMMANDS = {"UNWATCH"} 

2135 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

2136 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

2137 

2138 @deprecated_args( 

2139 args_to_warn=[ 

2140 "cluster_error_retry_attempts", 

2141 ], 

2142 reason="Please configure the 'retry' object instead", 

2143 version="6.0.0", 

2144 ) 

2145 def __init__( 

2146 self, 

2147 nodes_manager: "NodesManager", 

2148 commands_parser: "CommandsParser", 

2149 result_callbacks: Optional[Dict[str, Callable]] = None, 

2150 cluster_response_callbacks: Optional[Dict[str, Callable]] = None, 

2151 startup_nodes: Optional[List["ClusterNode"]] = None, 

2152 read_from_replicas: bool = False, 

2153 load_balancing_strategy: Optional[LoadBalancingStrategy] = None, 

2154 cluster_error_retry_attempts: int = 3, 

2155 reinitialize_steps: int = 5, 

2156 retry: Optional[Retry] = None, 

2157 lock=None, 

2158 transaction=False, 

2159 **kwargs, 

2160 ): 

2161 """ """ 

2162 self.command_stack = [] 

2163 self.nodes_manager = nodes_manager 

2164 self.commands_parser = commands_parser 

2165 self.refresh_table_asap = False 

2166 self.result_callbacks = ( 

2167 result_callbacks or self.__class__.RESULT_CALLBACKS.copy() 

2168 ) 

2169 self.startup_nodes = startup_nodes if startup_nodes else [] 

2170 self.read_from_replicas = read_from_replicas 

2171 self.load_balancing_strategy = load_balancing_strategy 

2172 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

2173 self.cluster_response_callbacks = cluster_response_callbacks 

2174 self.reinitialize_counter = 0 

2175 self.reinitialize_steps = reinitialize_steps 

2176 if retry is not None: 

2177 self.retry = retry 

2178 else: 

2179 self.retry = Retry( 

2180 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

2181 retries=cluster_error_retry_attempts, 

2182 ) 

2183 

2184 self.encoder = Encoder( 

2185 kwargs.get("encoding", "utf-8"), 

2186 kwargs.get("encoding_errors", "strict"), 

2187 kwargs.get("decode_responses", False), 

2188 ) 

2189 if lock is None: 

2190 lock = threading.RLock() 

2191 self._lock = lock 

2192 self.parent_execute_command = super().execute_command 

2193 self._execution_strategy: ExecutionStrategy = ( 

2194 PipelineStrategy(self) if not transaction else TransactionStrategy(self) 

2195 ) 

2196 

2197 def __repr__(self): 

2198 """ """ 

2199 return f"{type(self).__name__}" 

2200 

2201 def __enter__(self): 

2202 """ """ 

2203 return self 

2204 

2205 def __exit__(self, exc_type, exc_value, traceback): 

2206 """ """ 

2207 self.reset() 

2208 

2209 def __del__(self): 

2210 try: 

2211 self.reset() 

2212 except Exception: 

2213 pass 

2214 

2215 def __len__(self): 

2216 """ """ 

2217 return len(self._execution_strategy.command_queue) 

2218 

2219 def __bool__(self): 

2220 "Pipeline instances should always evaluate to True on Python 3+" 

2221 return True 

2222 

2223 def execute_command(self, *args, **kwargs): 

2224 """ 

2225 Wrapper function for pipeline_execute_command 

2226 """ 

2227 return self._execution_strategy.execute_command(*args, **kwargs) 

2228 

2229 def pipeline_execute_command(self, *args, **options): 

2230 """ 

2231 Stage a command to be executed when execute() is next called 

2232 

2233 Returns the current Pipeline object back so commands can be 

2234 chained together, such as: 

2235 

2236 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

2237 

2238 At some other point, you can then run: pipe.execute(), 

2239 which will execute all commands queued in the pipe. 

2240 """ 

2241 return self._execution_strategy.execute_command(*args, **options) 

2242 

2243 def annotate_exception(self, exception, number, command): 

2244 """ 

2245 Provides extra context to the exception prior to it being handled 

2246 """ 

2247 self._execution_strategy.annotate_exception(exception, number, command) 

2248 

2249 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2250 """ 

2251 Execute all the commands in the current pipeline 

2252 """ 

2253 

2254 try: 

2255 return self._execution_strategy.execute(raise_on_error) 

2256 finally: 

2257 self.reset() 

2258 

2259 def reset(self): 

2260 """ 

2261 Reset back to empty pipeline. 

2262 """ 

2263 self._execution_strategy.reset() 

2264 

2265 def send_cluster_commands( 

2266 self, stack, raise_on_error=True, allow_redirections=True 

2267 ): 

2268 return self._execution_strategy.send_cluster_commands( 

2269 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections 

2270 ) 

2271 

2272 def exists(self, *keys): 

2273 return self._execution_strategy.exists(*keys) 

2274 

2275 def eval(self): 

2276 """ """ 

2277 return self._execution_strategy.eval() 

2278 

2279 def multi(self): 

2280 """ 

2281 Start a transactional block of the pipeline after WATCH commands 

2282 are issued. End the transactional block with `execute`. 

2283 """ 

2284 self._execution_strategy.multi() 

2285 

2286 def load_scripts(self): 

2287 """ """ 

2288 self._execution_strategy.load_scripts() 

2289 

2290 def discard(self): 

2291 """ """ 

2292 self._execution_strategy.discard() 

2293 

2294 def watch(self, *names): 

2295 """Watches the values at keys ``names``""" 

2296 self._execution_strategy.watch(*names) 

2297 

2298 def unwatch(self): 

2299 """Unwatches all previously specified keys""" 

2300 self._execution_strategy.unwatch() 

2301 

2302 def script_load_for_pipeline(self, *args, **kwargs): 

2303 self._execution_strategy.script_load_for_pipeline(*args, **kwargs) 

2304 

2305 def delete(self, *names): 

2306 self._execution_strategy.delete(*names) 

2307 

2308 def unlink(self, *names): 

2309 self._execution_strategy.unlink(*names) 

2310 

2311 

2312def block_pipeline_command(name: str) -> Callable[..., Any]: 

2313 """ 

2314 Prints error because some pipelined commands should 

2315 be blocked when running in cluster-mode 

2316 """ 

2317 

2318 def inner(*args, **kwargs): 

2319 raise RedisClusterException( 

2320 f"ERROR: Calling pipelined function {name} is blocked " 

2321 f"when running redis in cluster mode..." 

2322 ) 

2323 

2324 return inner 

2325 

2326 

2327# Blocked pipeline commands 

2328PIPELINE_BLOCKED_COMMANDS = ( 

2329 "BGREWRITEAOF", 

2330 "BGSAVE", 

2331 "BITOP", 

2332 "BRPOPLPUSH", 

2333 "CLIENT GETNAME", 

2334 "CLIENT KILL", 

2335 "CLIENT LIST", 

2336 "CLIENT SETNAME", 

2337 "CLIENT", 

2338 "CONFIG GET", 

2339 "CONFIG RESETSTAT", 

2340 "CONFIG REWRITE", 

2341 "CONFIG SET", 

2342 "CONFIG", 

2343 "DBSIZE", 

2344 "ECHO", 

2345 "EVALSHA", 

2346 "FLUSHALL", 

2347 "FLUSHDB", 

2348 "INFO", 

2349 "KEYS", 

2350 "LASTSAVE", 

2351 "MGET", 

2352 "MGET NONATOMIC", 

2353 "MOVE", 

2354 "MSET", 

2355 "MSET NONATOMIC", 

2356 "MSETNX", 

2357 "PFCOUNT", 

2358 "PFMERGE", 

2359 "PING", 

2360 "PUBLISH", 

2361 "RANDOMKEY", 

2362 "READONLY", 

2363 "READWRITE", 

2364 "RENAME", 

2365 "RENAMENX", 

2366 "RPOPLPUSH", 

2367 "SAVE", 

2368 "SCAN", 

2369 "SCRIPT EXISTS", 

2370 "SCRIPT FLUSH", 

2371 "SCRIPT KILL", 

2372 "SCRIPT LOAD", 

2373 "SCRIPT", 

2374 "SDIFF", 

2375 "SDIFFSTORE", 

2376 "SENTINEL GET MASTER ADDR BY NAME", 

2377 "SENTINEL MASTER", 

2378 "SENTINEL MASTERS", 

2379 "SENTINEL MONITOR", 

2380 "SENTINEL REMOVE", 

2381 "SENTINEL SENTINELS", 

2382 "SENTINEL SET", 

2383 "SENTINEL SLAVES", 

2384 "SENTINEL", 

2385 "SHUTDOWN", 

2386 "SINTER", 

2387 "SINTERSTORE", 

2388 "SLAVEOF", 

2389 "SLOWLOG GET", 

2390 "SLOWLOG LEN", 

2391 "SLOWLOG RESET", 

2392 "SLOWLOG", 

2393 "SMOVE", 

2394 "SORT", 

2395 "SUNION", 

2396 "SUNIONSTORE", 

2397 "TIME", 

2398) 

2399for command in PIPELINE_BLOCKED_COMMANDS: 

2400 command = command.replace(" ", "_").lower() 

2401 

2402 setattr(ClusterPipeline, command, block_pipeline_command(command)) 

2403 

2404 

2405class PipelineCommand: 

2406 """ """ 

2407 

2408 def __init__(self, args, options=None, position=None): 

2409 self.args = args 

2410 if options is None: 

2411 options = {} 

2412 self.options = options 

2413 self.position = position 

2414 self.result = None 

2415 self.node = None 

2416 self.asking = False 

2417 

2418 

2419class NodeCommands: 

2420 """ """ 

2421 

2422 def __init__(self, parse_response, connection_pool, connection): 

2423 """ """ 

2424 self.parse_response = parse_response 

2425 self.connection_pool = connection_pool 

2426 self.connection = connection 

2427 self.commands = [] 

2428 

2429 def append(self, c): 

2430 """ """ 

2431 self.commands.append(c) 

2432 

2433 def write(self): 

2434 """ 

2435 Code borrowed from Redis so it can be fixed 

2436 """ 

2437 connection = self.connection 

2438 commands = self.commands 

2439 

2440 # We are going to clobber the commands with the write, so go ahead 

2441 # and ensure that nothing is sitting there from a previous run. 

2442 for c in commands: 

2443 c.result = None 

2444 

2445 # build up all commands into a single request to increase network perf 

2446 # send all the commands and catch connection and timeout errors. 

2447 try: 

2448 connection.send_packed_command( 

2449 connection.pack_commands([c.args for c in commands]) 

2450 ) 

2451 except (ConnectionError, TimeoutError) as e: 

2452 for c in commands: 

2453 c.result = e 

2454 

2455 def read(self): 

2456 """ """ 

2457 connection = self.connection 

2458 for c in self.commands: 

2459 # if there is a result on this command, 

2460 # it means we ran into an exception 

2461 # like a connection error. Trying to parse 

2462 # a response on a connection that 

2463 # is no longer open will result in a 

2464 # connection error raised by redis-py. 

2465 # but redis-py doesn't check in parse_response 

2466 # that the sock object is 

2467 # still set and if you try to 

2468 # read from a closed connection, it will 

2469 # result in an AttributeError because 

2470 # it will do a readline() call on None. 

2471 # This can have all kinds of nasty side-effects. 

2472 # Treating this case as a connection error 

2473 # is fine because it will dump 

2474 # the connection object back into the 

2475 # pool and on the next write, it will 

2476 # explicitly open the connection and all will be well. 

2477 if c.result is None: 

2478 try: 

2479 c.result = self.parse_response(connection, c.args[0], **c.options) 

2480 except (ConnectionError, TimeoutError) as e: 

2481 for c in self.commands: 

2482 c.result = e 

2483 return 

2484 except RedisError: 

2485 c.result = sys.exc_info()[1] 

2486 

2487 

2488class ExecutionStrategy(ABC): 

2489 @property 

2490 @abstractmethod 

2491 def command_queue(self): 

2492 pass 

2493 

2494 @abstractmethod 

2495 def execute_command(self, *args, **kwargs): 

2496 """ 

2497 Execution flow for current execution strategy. 

2498 

2499 See: ClusterPipeline.execute_command() 

2500 """ 

2501 pass 

2502 

2503 @abstractmethod 

2504 def annotate_exception(self, exception, number, command): 

2505 """ 

2506 Annotate exception according to current execution strategy. 

2507 

2508 See: ClusterPipeline.annotate_exception() 

2509 """ 

2510 pass 

2511 

2512 @abstractmethod 

2513 def pipeline_execute_command(self, *args, **options): 

2514 """ 

2515 Pipeline execution flow for current execution strategy. 

2516 

2517 See: ClusterPipeline.pipeline_execute_command() 

2518 """ 

2519 pass 

2520 

2521 @abstractmethod 

2522 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2523 """ 

2524 Executes current execution strategy. 

2525 

2526 See: ClusterPipeline.execute() 

2527 """ 

2528 pass 

2529 

2530 @abstractmethod 

2531 def send_cluster_commands( 

2532 self, stack, raise_on_error=True, allow_redirections=True 

2533 ): 

2534 """ 

2535 Sends commands according to current execution strategy. 

2536 

2537 See: ClusterPipeline.send_cluster_commands() 

2538 """ 

2539 pass 

2540 

2541 @abstractmethod 

2542 def reset(self): 

2543 """ 

2544 Resets current execution strategy. 

2545 

2546 See: ClusterPipeline.reset() 

2547 """ 

2548 pass 

2549 

2550 @abstractmethod 

2551 def exists(self, *keys): 

2552 pass 

2553 

2554 @abstractmethod 

2555 def eval(self): 

2556 pass 

2557 

2558 @abstractmethod 

2559 def multi(self): 

2560 """ 

2561 Starts transactional context. 

2562 

2563 See: ClusterPipeline.multi() 

2564 """ 

2565 pass 

2566 

2567 @abstractmethod 

2568 def load_scripts(self): 

2569 pass 

2570 

2571 @abstractmethod 

2572 def watch(self, *names): 

2573 pass 

2574 

2575 @abstractmethod 

2576 def unwatch(self): 

2577 """ 

2578 Unwatches all previously specified keys 

2579 

2580 See: ClusterPipeline.unwatch() 

2581 """ 

2582 pass 

2583 

2584 @abstractmethod 

2585 def script_load_for_pipeline(self, *args, **kwargs): 

2586 pass 

2587 

2588 @abstractmethod 

2589 def delete(self, *names): 

2590 """ 

2591 "Delete a key specified by ``names``" 

2592 

2593 See: ClusterPipeline.delete() 

2594 """ 

2595 pass 

2596 

2597 @abstractmethod 

2598 def unlink(self, *names): 

2599 """ 

2600 "Unlink a key specified by ``names``" 

2601 

2602 See: ClusterPipeline.unlink() 

2603 """ 

2604 pass 

2605 

2606 @abstractmethod 

2607 def discard(self): 

2608 pass 

2609 

2610 

2611class AbstractStrategy(ExecutionStrategy): 

2612 def __init__( 

2613 self, 

2614 pipe: ClusterPipeline, 

2615 ): 

2616 self._command_queue: List[PipelineCommand] = [] 

2617 self._pipe = pipe 

2618 self._nodes_manager = self._pipe.nodes_manager 

2619 

2620 @property 

2621 def command_queue(self): 

2622 return self._command_queue 

2623 

2624 @command_queue.setter 

2625 def command_queue(self, queue: List[PipelineCommand]): 

2626 self._command_queue = queue 

2627 

2628 @abstractmethod 

2629 def execute_command(self, *args, **kwargs): 

2630 pass 

2631 

2632 def pipeline_execute_command(self, *args, **options): 

2633 self._command_queue.append( 

2634 PipelineCommand(args, options, len(self._command_queue)) 

2635 ) 

2636 return self._pipe 

2637 

2638 @abstractmethod 

2639 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2640 pass 

2641 

2642 @abstractmethod 

2643 def send_cluster_commands( 

2644 self, stack, raise_on_error=True, allow_redirections=True 

2645 ): 

2646 pass 

2647 

2648 @abstractmethod 

2649 def reset(self): 

2650 pass 

2651 

2652 def exists(self, *keys): 

2653 return self.execute_command("EXISTS", *keys) 

2654 

2655 def eval(self): 

2656 """ """ 

2657 raise RedisClusterException("method eval() is not implemented") 

2658 

2659 def load_scripts(self): 

2660 """ """ 

2661 raise RedisClusterException("method load_scripts() is not implemented") 

2662 

2663 def script_load_for_pipeline(self, *args, **kwargs): 

2664 """ """ 

2665 raise RedisClusterException( 

2666 "method script_load_for_pipeline() is not implemented" 

2667 ) 

2668 

2669 def annotate_exception(self, exception, number, command): 

2670 """ 

2671 Provides extra context to the exception prior to it being handled 

2672 """ 

2673 cmd = " ".join(map(safe_str, command)) 

2674 msg = ( 

2675 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

2676 f"caused error: {exception.args[0]}" 

2677 ) 

2678 exception.args = (msg,) + exception.args[1:] 

2679 

2680 

2681class PipelineStrategy(AbstractStrategy): 

2682 def __init__(self, pipe: ClusterPipeline): 

2683 super().__init__(pipe) 

2684 self.command_flags = pipe.command_flags 

2685 

2686 def execute_command(self, *args, **kwargs): 

2687 return self.pipeline_execute_command(*args, **kwargs) 

2688 

2689 def _raise_first_error(self, stack): 

2690 """ 

2691 Raise the first exception on the stack 

2692 """ 

2693 for c in stack: 

2694 r = c.result 

2695 if isinstance(r, Exception): 

2696 self.annotate_exception(r, c.position + 1, c.args) 

2697 raise r 

2698 

2699 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2700 stack = self._command_queue 

2701 if not stack: 

2702 return [] 

2703 

2704 try: 

2705 return self.send_cluster_commands(stack, raise_on_error) 

2706 finally: 

2707 self.reset() 

2708 

2709 def reset(self): 

2710 """ 

2711 Reset back to empty pipeline. 

2712 """ 

2713 self._command_queue = [] 

2714 

2715 def send_cluster_commands( 

2716 self, stack, raise_on_error=True, allow_redirections=True 

2717 ): 

2718 """ 

2719 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling. 

2720 

2721 If one of the retryable exceptions has been thrown we assume that: 

2722 - connection_pool was disconnected 

2723 - connection_pool was reset 

2724 - refresh_table_asap set to True 

2725 

2726 It will try the number of times specified by 

2727 the retries in config option "self.retry" 

2728 which defaults to 3 unless manually configured. 

2729 

2730 If it reaches the number of times, the command will 

2731 raises ClusterDownException. 

2732 """ 

2733 if not stack: 

2734 return [] 

2735 retry_attempts = self._pipe.retry.get_retries() 

2736 while True: 

2737 try: 

2738 return self._send_cluster_commands( 

2739 stack, 

2740 raise_on_error=raise_on_error, 

2741 allow_redirections=allow_redirections, 

2742 ) 

2743 except RedisCluster.ERRORS_ALLOW_RETRY as e: 

2744 if retry_attempts > 0: 

2745 # Try again with the new cluster setup. All other errors 

2746 # should be raised. 

2747 retry_attempts -= 1 

2748 pass 

2749 else: 

2750 raise e 

2751 

2752 def _send_cluster_commands( 

2753 self, stack, raise_on_error=True, allow_redirections=True 

2754 ): 

2755 """ 

2756 Send a bunch of cluster commands to the redis cluster. 

2757 

2758 `allow_redirections` If the pipeline should follow 

2759 `ASK` & `MOVED` responses automatically. If set 

2760 to false it will raise RedisClusterException. 

2761 """ 

2762 # the first time sending the commands we send all of 

2763 # the commands that were queued up. 

2764 # if we have to run through it again, we only retry 

2765 # the commands that failed. 

2766 attempt = sorted(stack, key=lambda x: x.position) 

2767 is_default_node = False 

2768 # build a list of node objects based on node names we need to 

2769 nodes = {} 

2770 

2771 # as we move through each command that still needs to be processed, 

2772 # we figure out the slot number that command maps to, then from 

2773 # the slot determine the node. 

2774 for c in attempt: 

2775 while True: 

2776 # refer to our internal node -> slot table that 

2777 # tells us where a given command should route to. 

2778 # (it might be possible we have a cached node that no longer 

2779 # exists in the cluster, which is why we do this in a loop) 

2780 passed_targets = c.options.pop("target_nodes", None) 

2781 if passed_targets and not self._is_nodes_flag(passed_targets): 

2782 target_nodes = self._parse_target_nodes(passed_targets) 

2783 else: 

2784 target_nodes = self._determine_nodes( 

2785 *c.args, node_flag=passed_targets 

2786 ) 

2787 if not target_nodes: 

2788 raise RedisClusterException( 

2789 f"No targets were found to execute {c.args} command on" 

2790 ) 

2791 if len(target_nodes) > 1: 

2792 raise RedisClusterException( 

2793 f"Too many targets for command {c.args}" 

2794 ) 

2795 

2796 node = target_nodes[0] 

2797 if node == self._pipe.get_default_node(): 

2798 is_default_node = True 

2799 

2800 # now that we know the name of the node 

2801 # ( it's just a string in the form of host:port ) 

2802 # we can build a list of commands for each node. 

2803 node_name = node.name 

2804 if node_name not in nodes: 

2805 redis_node = self._pipe.get_redis_connection(node) 

2806 try: 

2807 connection = get_connection(redis_node) 

2808 except (ConnectionError, TimeoutError): 

2809 for n in nodes.values(): 

2810 n.connection_pool.release(n.connection) 

2811 # Connection retries are being handled in the node's 

2812 # Retry object. Reinitialize the node -> slot table. 

2813 self._nodes_manager.initialize() 

2814 if is_default_node: 

2815 self._pipe.replace_default_node() 

2816 raise 

2817 nodes[node_name] = NodeCommands( 

2818 redis_node.parse_response, 

2819 redis_node.connection_pool, 

2820 connection, 

2821 ) 

2822 nodes[node_name].append(c) 

2823 break 

2824 

2825 # send the commands in sequence. 

2826 # we write to all the open sockets for each node first, 

2827 # before reading anything 

2828 # this allows us to flush all the requests out across the 

2829 # network 

2830 # so that we can read them from different sockets as they come back. 

2831 # we dont' multiplex on the sockets as they come available, 

2832 # but that shouldn't make too much difference. 

2833 try: 

2834 node_commands = nodes.values() 

2835 for n in node_commands: 

2836 n.write() 

2837 

2838 for n in node_commands: 

2839 n.read() 

2840 finally: 

2841 # release all of the redis connections we allocated earlier 

2842 # back into the connection pool. 

2843 # we used to do this step as part of a try/finally block, 

2844 # but it is really dangerous to 

2845 # release connections back into the pool if for some 

2846 # reason the socket has data still left in it 

2847 # from a previous operation. The write and 

2848 # read operations already have try/catch around them for 

2849 # all known types of errors including connection 

2850 # and socket level errors. 

2851 # So if we hit an exception, something really bad 

2852 # happened and putting any oF 

2853 # these connections back into the pool is a very bad idea. 

2854 # the socket might have unread buffer still sitting in it, 

2855 # and then the next time we read from it we pass the 

2856 # buffered result back from a previous command and 

2857 # every single request after to that connection will always get 

2858 # a mismatched result. 

2859 for n in nodes.values(): 

2860 n.connection_pool.release(n.connection) 

2861 

2862 # if the response isn't an exception it is a 

2863 # valid response from the node 

2864 # we're all done with that command, YAY! 

2865 # if we have more commands to attempt, we've run into problems. 

2866 # collect all the commands we are allowed to retry. 

2867 # (MOVED, ASK, or connection errors or timeout errors) 

2868 attempt = sorted( 

2869 ( 

2870 c 

2871 for c in attempt 

2872 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

2873 ), 

2874 key=lambda x: x.position, 

2875 ) 

2876 if attempt and allow_redirections: 

2877 # RETRY MAGIC HAPPENS HERE! 

2878 # send these remaining commands one at a time using `execute_command` 

2879 # in the main client. This keeps our retry logic 

2880 # in one place mostly, 

2881 # and allows us to be more confident in correctness of behavior. 

2882 # at this point any speed gains from pipelining have been lost 

2883 # anyway, so we might as well make the best 

2884 # attempt to get the correct behavior. 

2885 # 

2886 # The client command will handle retries for each 

2887 # individual command sequentially as we pass each 

2888 # one into `execute_command`. Any exceptions 

2889 # that bubble out should only appear once all 

2890 # retries have been exhausted. 

2891 # 

2892 # If a lot of commands have failed, we'll be setting the 

2893 # flag to rebuild the slots table from scratch. 

2894 # So MOVED errors should correct themselves fairly quickly. 

2895 self._pipe.reinitialize_counter += 1 

2896 if self._pipe._should_reinitialized(): 

2897 self._nodes_manager.initialize() 

2898 if is_default_node: 

2899 self._pipe.replace_default_node() 

2900 for c in attempt: 

2901 try: 

2902 # send each command individually like we 

2903 # do in the main client. 

2904 c.result = self._pipe.parent_execute_command(*c.args, **c.options) 

2905 except RedisError as e: 

2906 c.result = e 

2907 

2908 # turn the response back into a simple flat array that corresponds 

2909 # to the sequence of commands issued in the stack in pipeline.execute() 

2910 response = [] 

2911 for c in sorted(stack, key=lambda x: x.position): 

2912 if c.args[0] in self._pipe.cluster_response_callbacks: 

2913 # Remove keys entry, it needs only for cache. 

2914 c.options.pop("keys", None) 

2915 c.result = self._pipe.cluster_response_callbacks[c.args[0]]( 

2916 c.result, **c.options 

2917 ) 

2918 response.append(c.result) 

2919 

2920 if raise_on_error: 

2921 self._raise_first_error(stack) 

2922 

2923 return response 

2924 

2925 def _is_nodes_flag(self, target_nodes): 

2926 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags 

2927 

2928 def _parse_target_nodes(self, target_nodes): 

2929 if isinstance(target_nodes, list): 

2930 nodes = target_nodes 

2931 elif isinstance(target_nodes, ClusterNode): 

2932 # Supports passing a single ClusterNode as a variable 

2933 nodes = [target_nodes] 

2934 elif isinstance(target_nodes, dict): 

2935 # Supports dictionaries of the format {node_name: node}. 

2936 # It enables to execute commands with multi nodes as follows: 

2937 # rc.cluster_save_config(rc.get_primaries()) 

2938 nodes = target_nodes.values() 

2939 else: 

2940 raise TypeError( 

2941 "target_nodes type can be one of the following: " 

2942 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

2943 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

2944 f"The passed type is {type(target_nodes)}" 

2945 ) 

2946 return nodes 

2947 

2948 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: 

2949 # Determine which nodes should be executed the command on. 

2950 # Returns a list of target nodes. 

2951 command = args[0].upper() 

2952 if ( 

2953 len(args) >= 2 

2954 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags 

2955 ): 

2956 command = f"{args[0]} {args[1]}".upper() 

2957 

2958 nodes_flag = kwargs.pop("nodes_flag", None) 

2959 if nodes_flag is not None: 

2960 # nodes flag passed by the user 

2961 command_flag = nodes_flag 

2962 else: 

2963 # get the nodes group for this command if it was predefined 

2964 command_flag = self._pipe.command_flags.get(command) 

2965 if command_flag == self._pipe.RANDOM: 

2966 # return a random node 

2967 return [self._pipe.get_random_node()] 

2968 elif command_flag == self._pipe.PRIMARIES: 

2969 # return all primaries 

2970 return self._pipe.get_primaries() 

2971 elif command_flag == self._pipe.REPLICAS: 

2972 # return all replicas 

2973 return self._pipe.get_replicas() 

2974 elif command_flag == self._pipe.ALL_NODES: 

2975 # return all nodes 

2976 return self._pipe.get_nodes() 

2977 elif command_flag == self._pipe.DEFAULT_NODE: 

2978 # return the cluster's default node 

2979 return [self._nodes_manager.default_node] 

2980 elif command in self._pipe.SEARCH_COMMANDS[0]: 

2981 return [self._nodes_manager.default_node] 

2982 else: 

2983 # get the node that holds the key's slot 

2984 slot = self._pipe.determine_slot(*args) 

2985 node = self._nodes_manager.get_node_from_slot( 

2986 slot, 

2987 self._pipe.read_from_replicas and command in READ_COMMANDS, 

2988 self._pipe.load_balancing_strategy 

2989 if command in READ_COMMANDS 

2990 else None, 

2991 ) 

2992 return [node] 

2993 

2994 def multi(self): 

2995 raise RedisClusterException( 

2996 "method multi() is not supported outside of transactional context" 

2997 ) 

2998 

2999 def discard(self): 

3000 raise RedisClusterException( 

3001 "method discard() is not supported outside of transactional context" 

3002 ) 

3003 

3004 def watch(self, *names): 

3005 raise RedisClusterException( 

3006 "method watch() is not supported outside of transactional context" 

3007 ) 

3008 

3009 def unwatch(self, *names): 

3010 raise RedisClusterException( 

3011 "method unwatch() is not supported outside of transactional context" 

3012 ) 

3013 

3014 def delete(self, *names): 

3015 if len(names) != 1: 

3016 raise RedisClusterException( 

3017 "deleting multiple keys is not implemented in pipeline command" 

3018 ) 

3019 

3020 return self.execute_command("DEL", names[0]) 

3021 

3022 def unlink(self, *names): 

3023 if len(names) != 1: 

3024 raise RedisClusterException( 

3025 "unlinking multiple keys is not implemented in pipeline command" 

3026 ) 

3027 

3028 return self.execute_command("UNLINK", names[0]) 

3029 

3030 

3031class TransactionStrategy(AbstractStrategy): 

3032 NO_SLOTS_COMMANDS = {"UNWATCH"} 

3033 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

3034 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

3035 SLOT_REDIRECT_ERRORS = (AskError, MovedError) 

3036 CONNECTION_ERRORS = ( 

3037 ConnectionError, 

3038 OSError, 

3039 ClusterDownError, 

3040 SlotNotCoveredError, 

3041 ) 

3042 

3043 def __init__(self, pipe: ClusterPipeline): 

3044 super().__init__(pipe) 

3045 self._explicit_transaction = False 

3046 self._watching = False 

3047 self._pipeline_slots: Set[int] = set() 

3048 self._transaction_connection: Optional[Connection] = None 

3049 self._executing = False 

3050 self._retry = copy(self._pipe.retry) 

3051 self._retry.update_supported_errors( 

3052 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS 

3053 ) 

3054 

3055 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]: 

3056 """ 

3057 Find a connection for a pipeline transaction. 

3058 

3059 For running an atomic transaction, watch keys ensure that contents have not been 

3060 altered as long as the watch commands for those keys were sent over the same 

3061 connection. So once we start watching a key, we fetch a connection to the 

3062 node that owns that slot and reuse it. 

3063 """ 

3064 if not self._pipeline_slots: 

3065 raise RedisClusterException( 

3066 "At least a command with a key is needed to identify a node" 

3067 ) 

3068 

3069 node: ClusterNode = self._nodes_manager.get_node_from_slot( 

3070 list(self._pipeline_slots)[0], False 

3071 ) 

3072 redis_node: Redis = self._pipe.get_redis_connection(node) 

3073 if self._transaction_connection: 

3074 if not redis_node.connection_pool.owns_connection( 

3075 self._transaction_connection 

3076 ): 

3077 previous_node = self._nodes_manager.find_connection_owner( 

3078 self._transaction_connection 

3079 ) 

3080 previous_node.connection_pool.release(self._transaction_connection) 

3081 self._transaction_connection = None 

3082 

3083 if not self._transaction_connection: 

3084 self._transaction_connection = get_connection(redis_node) 

3085 

3086 return redis_node, self._transaction_connection 

3087 

3088 def execute_command(self, *args, **kwargs): 

3089 slot_number: Optional[int] = None 

3090 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS: 

3091 slot_number = self._pipe.determine_slot(*args) 

3092 

3093 if ( 

3094 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS 

3095 ) and not self._explicit_transaction: 

3096 if args[0] == "WATCH": 

3097 self._validate_watch() 

3098 

3099 if slot_number is not None: 

3100 if self._pipeline_slots and slot_number not in self._pipeline_slots: 

3101 raise CrossSlotTransactionError( 

3102 "Cannot watch or send commands on different slots" 

3103 ) 

3104 

3105 self._pipeline_slots.add(slot_number) 

3106 elif args[0] not in self.NO_SLOTS_COMMANDS: 

3107 raise RedisClusterException( 

3108 f"Cannot identify slot number for command: {args[0]}," 

3109 "it cannot be triggered in a transaction" 

3110 ) 

3111 

3112 return self._immediate_execute_command(*args, **kwargs) 

3113 else: 

3114 if slot_number is not None: 

3115 self._pipeline_slots.add(slot_number) 

3116 

3117 return self.pipeline_execute_command(*args, **kwargs) 

3118 

3119 def _validate_watch(self): 

3120 if self._explicit_transaction: 

3121 raise RedisError("Cannot issue a WATCH after a MULTI") 

3122 

3123 self._watching = True 

3124 

3125 def _immediate_execute_command(self, *args, **options): 

3126 return self._retry.call_with_retry( 

3127 lambda: self._get_connection_and_send_command(*args, **options), 

3128 self._reinitialize_on_error, 

3129 ) 

3130 

3131 def _get_connection_and_send_command(self, *args, **options): 

3132 redis_node, connection = self._get_client_and_connection_for_transaction() 

3133 return self._send_command_parse_response( 

3134 connection, redis_node, args[0], *args, **options 

3135 ) 

3136 

3137 def _send_command_parse_response( 

3138 self, conn, redis_node: Redis, command_name, *args, **options 

3139 ): 

3140 """ 

3141 Send a command and parse the response 

3142 """ 

3143 

3144 conn.send_command(*args) 

3145 output = redis_node.parse_response(conn, command_name, **options) 

3146 

3147 if command_name in self.UNWATCH_COMMANDS: 

3148 self._watching = False 

3149 return output 

3150 

3151 def _reinitialize_on_error(self, error): 

3152 if self._watching: 

3153 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: 

3154 raise WatchError("Slot rebalancing occurred while watching keys") 

3155 

3156 if ( 

3157 type(error) in self.SLOT_REDIRECT_ERRORS 

3158 or type(error) in self.CONNECTION_ERRORS 

3159 ): 

3160 if self._transaction_connection: 

3161 self._transaction_connection = None 

3162 

3163 self._pipe.reinitialize_counter += 1 

3164 if self._pipe._should_reinitialized(): 

3165 self._nodes_manager.initialize() 

3166 self.reinitialize_counter = 0 

3167 else: 

3168 if isinstance(error, AskError): 

3169 self._nodes_manager.update_moved_exception(error) 

3170 

3171 self._executing = False 

3172 

3173 def _raise_first_error(self, responses, stack): 

3174 """ 

3175 Raise the first exception on the stack 

3176 """ 

3177 for r, cmd in zip(responses, stack): 

3178 if isinstance(r, Exception): 

3179 self.annotate_exception(r, cmd.position + 1, cmd.args) 

3180 raise r 

3181 

3182 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3183 stack = self._command_queue 

3184 if not stack and (not self._watching or not self._pipeline_slots): 

3185 return [] 

3186 

3187 return self._execute_transaction_with_retries(stack, raise_on_error) 

3188 

3189 def _execute_transaction_with_retries( 

3190 self, stack: List["PipelineCommand"], raise_on_error: bool 

3191 ): 

3192 return self._retry.call_with_retry( 

3193 lambda: self._execute_transaction(stack, raise_on_error), 

3194 self._reinitialize_on_error, 

3195 ) 

3196 

3197 def _execute_transaction( 

3198 self, stack: List["PipelineCommand"], raise_on_error: bool 

3199 ): 

3200 if len(self._pipeline_slots) > 1: 

3201 raise CrossSlotTransactionError( 

3202 "All keys involved in a cluster transaction must map to the same slot" 

3203 ) 

3204 

3205 self._executing = True 

3206 

3207 redis_node, connection = self._get_client_and_connection_for_transaction() 

3208 

3209 stack = chain( 

3210 [PipelineCommand(("MULTI",))], 

3211 stack, 

3212 [PipelineCommand(("EXEC",))], 

3213 ) 

3214 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options] 

3215 packed_commands = connection.pack_commands(commands) 

3216 connection.send_packed_command(packed_commands) 

3217 errors = [] 

3218 

3219 # parse off the response for MULTI 

3220 # NOTE: we need to handle ResponseErrors here and continue 

3221 # so that we read all the additional command messages from 

3222 # the socket 

3223 try: 

3224 redis_node.parse_response(connection, "MULTI") 

3225 except ResponseError as e: 

3226 self.annotate_exception(e, 0, "MULTI") 

3227 errors.append(e) 

3228 except self.CONNECTION_ERRORS as cluster_error: 

3229 self.annotate_exception(cluster_error, 0, "MULTI") 

3230 raise 

3231 

3232 # and all the other commands 

3233 for i, command in enumerate(self._command_queue): 

3234 if EMPTY_RESPONSE in command.options: 

3235 errors.append((i, command.options[EMPTY_RESPONSE])) 

3236 else: 

3237 try: 

3238 _ = redis_node.parse_response(connection, "_") 

3239 except self.SLOT_REDIRECT_ERRORS as slot_error: 

3240 self.annotate_exception(slot_error, i + 1, command.args) 

3241 errors.append(slot_error) 

3242 except self.CONNECTION_ERRORS as cluster_error: 

3243 self.annotate_exception(cluster_error, i + 1, command.args) 

3244 raise 

3245 except ResponseError as e: 

3246 self.annotate_exception(e, i + 1, command.args) 

3247 errors.append(e) 

3248 

3249 response = None 

3250 # parse the EXEC. 

3251 try: 

3252 response = redis_node.parse_response(connection, "EXEC") 

3253 except ExecAbortError: 

3254 if errors: 

3255 raise errors[0] 

3256 raise 

3257 

3258 self._executing = False 

3259 

3260 # EXEC clears any watched keys 

3261 self._watching = False 

3262 

3263 if response is None: 

3264 raise WatchError("Watched variable changed.") 

3265 

3266 # put any parse errors into the response 

3267 for i, e in errors: 

3268 response.insert(i, e) 

3269 

3270 if len(response) != len(self._command_queue): 

3271 raise InvalidPipelineStack( 

3272 "Unexpected response length for cluster pipeline EXEC." 

3273 " Command stack was {} but response had length {}".format( 

3274 [c.args[0] for c in self._command_queue], len(response) 

3275 ) 

3276 ) 

3277 

3278 # find any errors in the response and raise if necessary 

3279 if raise_on_error or len(errors) > 0: 

3280 self._raise_first_error( 

3281 response, 

3282 self._command_queue, 

3283 ) 

3284 

3285 # We have to run response callbacks manually 

3286 data = [] 

3287 for r, cmd in zip(response, self._command_queue): 

3288 if not isinstance(r, Exception): 

3289 command_name = cmd.args[0] 

3290 if command_name in self._pipe.cluster_response_callbacks: 

3291 r = self._pipe.cluster_response_callbacks[command_name]( 

3292 r, **cmd.options 

3293 ) 

3294 data.append(r) 

3295 return data 

3296 

3297 def reset(self): 

3298 self._command_queue = [] 

3299 

3300 # make sure to reset the connection state in the event that we were 

3301 # watching something 

3302 if self._transaction_connection: 

3303 try: 

3304 if self._watching: 

3305 # call this manually since our unwatch or 

3306 # immediate_execute_command methods can call reset() 

3307 self._transaction_connection.send_command("UNWATCH") 

3308 self._transaction_connection.read_response() 

3309 # we can safely return the connection to the pool here since we're 

3310 # sure we're no longer WATCHing anything 

3311 node = self._nodes_manager.find_connection_owner( 

3312 self._transaction_connection 

3313 ) 

3314 node.redis_connection.connection_pool.release( 

3315 self._transaction_connection 

3316 ) 

3317 self._transaction_connection = None 

3318 except self.CONNECTION_ERRORS: 

3319 # disconnect will also remove any previous WATCHes 

3320 if self._transaction_connection: 

3321 self._transaction_connection.disconnect() 

3322 

3323 # clean up the other instance attributes 

3324 self._watching = False 

3325 self._explicit_transaction = False 

3326 self._pipeline_slots = set() 

3327 self._executing = False 

3328 

3329 def send_cluster_commands( 

3330 self, stack, raise_on_error=True, allow_redirections=True 

3331 ): 

3332 raise NotImplementedError( 

3333 "send_cluster_commands cannot be executed in transactional context." 

3334 ) 

3335 

3336 def multi(self): 

3337 if self._explicit_transaction: 

3338 raise RedisError("Cannot issue nested calls to MULTI") 

3339 if self._command_queue: 

3340 raise RedisError( 

3341 "Commands without an initial WATCH have already been issued" 

3342 ) 

3343 self._explicit_transaction = True 

3344 

3345 def watch(self, *names): 

3346 if self._explicit_transaction: 

3347 raise RedisError("Cannot issue a WATCH after a MULTI") 

3348 

3349 return self.execute_command("WATCH", *names) 

3350 

3351 def unwatch(self): 

3352 if self._watching: 

3353 return self.execute_command("UNWATCH") 

3354 

3355 return True 

3356 

3357 def discard(self): 

3358 self.reset() 

3359 

3360 def delete(self, *names): 

3361 return self.execute_command("DEL", *names) 

3362 

3363 def unlink(self, *names): 

3364 return self.execute_command("UNLINK", *names)