Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1339 statements  

1import random 

2import socket 

3import sys 

4import threading 

5import time 

6from abc import ABC, abstractmethod 

7from collections import OrderedDict 

8from copy import copy 

9from enum import Enum 

10from itertools import chain 

11from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union 

12 

13from redis._parsers import CommandsParser, Encoder 

14from redis._parsers.helpers import parse_scan 

15from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

16from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface 

17from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis 

18from redis.commands import READ_COMMANDS, RedisClusterCommands 

19from redis.commands.helpers import list_or_args 

20from redis.connection import ( 

21 Connection, 

22 ConnectionPool, 

23 parse_url, 

24) 

25from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

26from redis.event import ( 

27 AfterPooledConnectionsInstantiationEvent, 

28 AfterPubSubConnectionInstantiationEvent, 

29 ClientType, 

30 EventDispatcher, 

31) 

32from redis.exceptions import ( 

33 AskError, 

34 AuthenticationError, 

35 ClusterDownError, 

36 ClusterError, 

37 ConnectionError, 

38 CrossSlotTransactionError, 

39 DataError, 

40 ExecAbortError, 

41 InvalidPipelineStack, 

42 MaxConnectionsError, 

43 MovedError, 

44 RedisClusterException, 

45 RedisError, 

46 ResponseError, 

47 SlotNotCoveredError, 

48 TimeoutError, 

49 TryAgainError, 

50 WatchError, 

51) 

52from redis.lock import Lock 

53from redis.maint_notifications import MaintNotificationsConfig 

54from redis.retry import Retry 

55from redis.utils import ( 

56 deprecated_args, 

57 dict_merge, 

58 list_keys_to_dict, 

59 merge_result, 

60 safe_str, 

61 str_if_bytes, 

62 truncate_text, 

63) 

64 

65 

66def get_node_name(host: str, port: Union[str, int]) -> str: 

67 return f"{host}:{port}" 

68 

69 

70@deprecated_args( 

71 allowed_args=["redis_node"], 

72 reason="Use get_connection(redis_node) instead", 

73 version="5.3.0", 

74) 

75def get_connection(redis_node: Redis, *args, **options) -> Connection: 

76 return redis_node.connection or redis_node.connection_pool.get_connection() 

77 

78 

79def parse_scan_result(command, res, **options): 

80 cursors = {} 

81 ret = [] 

82 for node_name, response in res.items(): 

83 cursor, r = parse_scan(response, **options) 

84 cursors[node_name] = cursor 

85 ret += r 

86 

87 return cursors, ret 

88 

89 

90def parse_pubsub_numsub(command, res, **options): 

91 numsub_d = OrderedDict() 

92 for numsub_tups in res.values(): 

93 for channel, numsubbed in numsub_tups: 

94 try: 

95 numsub_d[channel] += numsubbed 

96 except KeyError: 

97 numsub_d[channel] = numsubbed 

98 

99 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()] 

100 return ret_numsub 

101 

102 

103def parse_cluster_slots( 

104 resp: Any, **options: Any 

105) -> Dict[Tuple[int, int], Dict[str, Any]]: 

106 current_host = options.get("current_host", "") 

107 

108 def fix_server(*args: Any) -> Tuple[str, Any]: 

109 return str_if_bytes(args[0]) or current_host, args[1] 

110 

111 slots = {} 

112 for slot in resp: 

113 start, end, primary = slot[:3] 

114 replicas = slot[3:] 

115 slots[start, end] = { 

116 "primary": fix_server(*primary), 

117 "replicas": [fix_server(*replica) for replica in replicas], 

118 } 

119 

120 return slots 

121 

122 

123def parse_cluster_shards(resp, **options): 

124 """ 

125 Parse CLUSTER SHARDS response. 

126 """ 

127 if isinstance(resp[0], dict): 

128 return resp 

129 shards = [] 

130 for x in resp: 

131 shard = {"slots": [], "nodes": []} 

132 for i in range(0, len(x[1]), 2): 

133 shard["slots"].append((x[1][i], (x[1][i + 1]))) 

134 nodes = x[3] 

135 for node in nodes: 

136 dict_node = {} 

137 for i in range(0, len(node), 2): 

138 dict_node[node[i]] = node[i + 1] 

139 shard["nodes"].append(dict_node) 

140 shards.append(shard) 

141 

142 return shards 

143 

144 

145def parse_cluster_myshardid(resp, **options): 

146 """ 

147 Parse CLUSTER MYSHARDID response. 

148 """ 

149 return resp.decode("utf-8") 

150 

151 

152PRIMARY = "primary" 

153REPLICA = "replica" 

154SLOT_ID = "slot-id" 

155 

156REDIS_ALLOWED_KEYS = ( 

157 "connection_class", 

158 "connection_pool", 

159 "connection_pool_class", 

160 "client_name", 

161 "credential_provider", 

162 "db", 

163 "decode_responses", 

164 "encoding", 

165 "encoding_errors", 

166 "host", 

167 "lib_name", 

168 "lib_version", 

169 "max_connections", 

170 "nodes_flag", 

171 "redis_connect_func", 

172 "password", 

173 "port", 

174 "timeout", 

175 "queue_class", 

176 "retry", 

177 "retry_on_timeout", 

178 "protocol", 

179 "socket_connect_timeout", 

180 "socket_keepalive", 

181 "socket_keepalive_options", 

182 "socket_timeout", 

183 "ssl", 

184 "ssl_ca_certs", 

185 "ssl_ca_data", 

186 "ssl_certfile", 

187 "ssl_cert_reqs", 

188 "ssl_include_verify_flags", 

189 "ssl_exclude_verify_flags", 

190 "ssl_keyfile", 

191 "ssl_password", 

192 "ssl_check_hostname", 

193 "unix_socket_path", 

194 "username", 

195 "cache", 

196 "cache_config", 

197) 

198KWARGS_DISABLED_KEYS = ("host", "port", "retry") 

199 

200 

201def cleanup_kwargs(**kwargs): 

202 """ 

203 Remove unsupported or disabled keys from kwargs 

204 """ 

205 connection_kwargs = { 

206 k: v 

207 for k, v in kwargs.items() 

208 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS 

209 } 

210 

211 return connection_kwargs 

212 

213 

214class AbstractRedisCluster: 

215 RedisClusterRequestTTL = 16 

216 

217 PRIMARIES = "primaries" 

218 REPLICAS = "replicas" 

219 ALL_NODES = "all" 

220 RANDOM = "random" 

221 DEFAULT_NODE = "default-node" 

222 

223 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE} 

224 

225 COMMAND_FLAGS = dict_merge( 

226 list_keys_to_dict( 

227 [ 

228 "ACL CAT", 

229 "ACL DELUSER", 

230 "ACL DRYRUN", 

231 "ACL GENPASS", 

232 "ACL GETUSER", 

233 "ACL HELP", 

234 "ACL LIST", 

235 "ACL LOG", 

236 "ACL LOAD", 

237 "ACL SAVE", 

238 "ACL SETUSER", 

239 "ACL USERS", 

240 "ACL WHOAMI", 

241 "AUTH", 

242 "CLIENT LIST", 

243 "CLIENT SETINFO", 

244 "CLIENT SETNAME", 

245 "CLIENT GETNAME", 

246 "CONFIG SET", 

247 "CONFIG REWRITE", 

248 "CONFIG RESETSTAT", 

249 "TIME", 

250 "PUBSUB CHANNELS", 

251 "PUBSUB NUMPAT", 

252 "PUBSUB NUMSUB", 

253 "PUBSUB SHARDCHANNELS", 

254 "PUBSUB SHARDNUMSUB", 

255 "PING", 

256 "INFO", 

257 "SHUTDOWN", 

258 "KEYS", 

259 "DBSIZE", 

260 "BGSAVE", 

261 "SLOWLOG GET", 

262 "SLOWLOG LEN", 

263 "SLOWLOG RESET", 

264 "WAIT", 

265 "WAITAOF", 

266 "SAVE", 

267 "MEMORY PURGE", 

268 "MEMORY MALLOC-STATS", 

269 "MEMORY STATS", 

270 "LASTSAVE", 

271 "CLIENT TRACKINGINFO", 

272 "CLIENT PAUSE", 

273 "CLIENT UNPAUSE", 

274 "CLIENT UNBLOCK", 

275 "CLIENT ID", 

276 "CLIENT REPLY", 

277 "CLIENT GETREDIR", 

278 "CLIENT INFO", 

279 "CLIENT KILL", 

280 "READONLY", 

281 "CLUSTER INFO", 

282 "CLUSTER MEET", 

283 "CLUSTER MYSHARDID", 

284 "CLUSTER NODES", 

285 "CLUSTER REPLICAS", 

286 "CLUSTER RESET", 

287 "CLUSTER SET-CONFIG-EPOCH", 

288 "CLUSTER SLOTS", 

289 "CLUSTER SHARDS", 

290 "CLUSTER COUNT-FAILURE-REPORTS", 

291 "CLUSTER KEYSLOT", 

292 "COMMAND", 

293 "COMMAND COUNT", 

294 "COMMAND LIST", 

295 "COMMAND GETKEYS", 

296 "CONFIG GET", 

297 "DEBUG", 

298 "RANDOMKEY", 

299 "READONLY", 

300 "READWRITE", 

301 "TIME", 

302 "TFUNCTION LOAD", 

303 "TFUNCTION DELETE", 

304 "TFUNCTION LIST", 

305 "TFCALL", 

306 "TFCALLASYNC", 

307 "LATENCY HISTORY", 

308 "LATENCY LATEST", 

309 "LATENCY RESET", 

310 "MODULE LIST", 

311 "MODULE LOAD", 

312 "MODULE UNLOAD", 

313 "MODULE LOADEX", 

314 ], 

315 DEFAULT_NODE, 

316 ), 

317 list_keys_to_dict( 

318 [ 

319 "FLUSHALL", 

320 "FLUSHDB", 

321 "FUNCTION DELETE", 

322 "FUNCTION FLUSH", 

323 "FUNCTION LIST", 

324 "FUNCTION LOAD", 

325 "FUNCTION RESTORE", 

326 "SCAN", 

327 "SCRIPT EXISTS", 

328 "SCRIPT FLUSH", 

329 "SCRIPT LOAD", 

330 ], 

331 PRIMARIES, 

332 ), 

333 list_keys_to_dict(["FUNCTION DUMP"], RANDOM), 

334 list_keys_to_dict( 

335 [ 

336 "CLUSTER COUNTKEYSINSLOT", 

337 "CLUSTER DELSLOTS", 

338 "CLUSTER DELSLOTSRANGE", 

339 "CLUSTER GETKEYSINSLOT", 

340 "CLUSTER SETSLOT", 

341 ], 

342 SLOT_ID, 

343 ), 

344 ) 

345 

346 SEARCH_COMMANDS = ( 

347 [ 

348 "FT.CREATE", 

349 "FT.SEARCH", 

350 "FT.AGGREGATE", 

351 "FT.EXPLAIN", 

352 "FT.EXPLAINCLI", 

353 "FT,PROFILE", 

354 "FT.ALTER", 

355 "FT.DROPINDEX", 

356 "FT.ALIASADD", 

357 "FT.ALIASUPDATE", 

358 "FT.ALIASDEL", 

359 "FT.TAGVALS", 

360 "FT.SUGADD", 

361 "FT.SUGGET", 

362 "FT.SUGDEL", 

363 "FT.SUGLEN", 

364 "FT.SYNUPDATE", 

365 "FT.SYNDUMP", 

366 "FT.SPELLCHECK", 

367 "FT.DICTADD", 

368 "FT.DICTDEL", 

369 "FT.DICTDUMP", 

370 "FT.INFO", 

371 "FT._LIST", 

372 "FT.CONFIG", 

373 "FT.ADD", 

374 "FT.DEL", 

375 "FT.DROP", 

376 "FT.GET", 

377 "FT.MGET", 

378 "FT.SYNADD", 

379 ], 

380 ) 

381 

382 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { 

383 "CLUSTER SLOTS": parse_cluster_slots, 

384 "CLUSTER SHARDS": parse_cluster_shards, 

385 "CLUSTER MYSHARDID": parse_cluster_myshardid, 

386 } 

387 

388 RESULT_CALLBACKS = dict_merge( 

389 list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub), 

390 list_keys_to_dict( 

391 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values())) 

392 ), 

393 list_keys_to_dict( 

394 ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result 

395 ), 

396 list_keys_to_dict( 

397 [ 

398 "PING", 

399 "CONFIG SET", 

400 "CONFIG REWRITE", 

401 "CONFIG RESETSTAT", 

402 "CLIENT SETNAME", 

403 "BGSAVE", 

404 "SLOWLOG RESET", 

405 "SAVE", 

406 "MEMORY PURGE", 

407 "CLIENT PAUSE", 

408 "CLIENT UNPAUSE", 

409 ], 

410 lambda command, res: all(res.values()) if isinstance(res, dict) else res, 

411 ), 

412 list_keys_to_dict( 

413 ["DBSIZE", "WAIT"], 

414 lambda command, res: sum(res.values()) if isinstance(res, dict) else res, 

415 ), 

416 list_keys_to_dict( 

417 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0 

418 ), 

419 list_keys_to_dict(["SCAN"], parse_scan_result), 

420 list_keys_to_dict( 

421 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop() 

422 ), 

423 list_keys_to_dict( 

424 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())] 

425 ), 

426 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())), 

427 ) 

428 

429 ERRORS_ALLOW_RETRY = ( 

430 ConnectionError, 

431 TimeoutError, 

432 ClusterDownError, 

433 SlotNotCoveredError, 

434 ) 

435 

436 def replace_default_node(self, target_node: "ClusterNode" = None) -> None: 

437 """Replace the default cluster node. 

438 A random cluster node will be chosen if target_node isn't passed, and primaries 

439 will be prioritized. The default node will not be changed if there are no other 

440 nodes in the cluster. 

441 

442 Args: 

443 target_node (ClusterNode, optional): Target node to replace the default 

444 node. Defaults to None. 

445 """ 

446 if target_node: 

447 self.nodes_manager.default_node = target_node 

448 else: 

449 curr_node = self.get_default_node() 

450 primaries = [node for node in self.get_primaries() if node != curr_node] 

451 if primaries: 

452 # Choose a primary if the cluster contains different primaries 

453 self.nodes_manager.default_node = random.choice(primaries) 

454 else: 

455 # Otherwise, choose a primary if the cluster contains different primaries 

456 replicas = [node for node in self.get_replicas() if node != curr_node] 

457 if replicas: 

458 self.nodes_manager.default_node = random.choice(replicas) 

459 

460 

461class RedisCluster(AbstractRedisCluster, RedisClusterCommands): 

462 @classmethod 

463 def from_url(cls, url, **kwargs): 

464 """ 

465 Return a Redis client object configured from the given URL 

466 

467 For example:: 

468 

469 redis://[[username]:[password]]@localhost:6379/0 

470 rediss://[[username]:[password]]@localhost:6379/0 

471 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

472 

473 Three URL schemes are supported: 

474 

475 - `redis://` creates a TCP socket connection. See more at: 

476 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

477 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

478 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

479 - ``unix://``: creates a Unix Domain Socket connection. 

480 

481 The username, password, hostname, path and all querystring values 

482 are passed through urllib.parse.unquote in order to replace any 

483 percent-encoded values with their corresponding characters. 

484 

485 There are several ways to specify a database number. The first value 

486 found will be used: 

487 

488 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

489 2. If using the redis:// or rediss:// schemes, the path argument 

490 of the url, e.g. redis://localhost/0 

491 3. A ``db`` keyword argument to this function. 

492 

493 If none of these options are specified, the default db=0 is used. 

494 

495 All querystring options are cast to their appropriate Python types. 

496 Boolean arguments can be specified with string values "True"/"False" 

497 or "Yes"/"No". Values that cannot be properly cast cause a 

498 ``ValueError`` to be raised. Once parsed, the querystring arguments 

499 and keyword arguments are passed to the ``ConnectionPool``'s 

500 class initializer. In the case of conflicting arguments, querystring 

501 arguments always win. 

502 

503 """ 

504 return cls(url=url, **kwargs) 

505 

506 @deprecated_args( 

507 args_to_warn=["read_from_replicas"], 

508 reason="Please configure the 'load_balancing_strategy' instead", 

509 version="5.3.0", 

510 ) 

511 @deprecated_args( 

512 args_to_warn=[ 

513 "cluster_error_retry_attempts", 

514 ], 

515 reason="Please configure the 'retry' object instead", 

516 version="6.0.0", 

517 ) 

518 def __init__( 

519 self, 

520 host: Optional[str] = None, 

521 port: int = 6379, 

522 startup_nodes: Optional[List["ClusterNode"]] = None, 

523 cluster_error_retry_attempts: int = 3, 

524 retry: Optional["Retry"] = None, 

525 require_full_coverage: bool = True, 

526 reinitialize_steps: int = 5, 

527 read_from_replicas: bool = False, 

528 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None, 

529 dynamic_startup_nodes: bool = True, 

530 url: Optional[str] = None, 

531 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

532 cache: Optional[CacheInterface] = None, 

533 cache_config: Optional[CacheConfig] = None, 

534 event_dispatcher: Optional[EventDispatcher] = None, 

535 **kwargs, 

536 ): 

537 """ 

538 Initialize a new RedisCluster client. 

539 

540 :param startup_nodes: 

541 List of nodes from which initial bootstrapping can be done 

542 :param host: 

543 Can be used to point to a startup node 

544 :param port: 

545 Can be used to point to a startup node 

546 :param require_full_coverage: 

547 When set to False (default value): the client will not require a 

548 full coverage of the slots. However, if not all slots are covered, 

549 and at least one node has 'cluster-require-full-coverage' set to 

550 'yes,' the server will throw a ClusterDownError for some key-based 

551 commands. See - 

552 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters 

553 When set to True: all slots must be covered to construct the 

554 cluster client. If not all slots are covered, RedisClusterException 

555 will be thrown. 

556 :param read_from_replicas: 

557 @deprecated - please use load_balancing_strategy instead 

558 Enable read from replicas in READONLY mode. You can read possibly 

559 stale data. 

560 When set to true, read commands will be assigned between the 

561 primary and its replications in a Round-Robin manner. 

562 :param load_balancing_strategy: 

563 Enable read from replicas in READONLY mode and defines the load balancing 

564 strategy that will be used for cluster node selection. 

565 The data read from replicas is eventually consistent with the data in primary nodes. 

566 :param dynamic_startup_nodes: 

567 Set the RedisCluster's startup nodes to all of the discovered nodes. 

568 If true (default value), the cluster's discovered nodes will be used to 

569 determine the cluster nodes-slots mapping in the next topology refresh. 

570 It will remove the initial passed startup nodes if their endpoints aren't 

571 listed in the CLUSTER SLOTS output. 

572 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

573 specific IP addresses, it is best to set it to false. 

574 :param cluster_error_retry_attempts: 

575 @deprecated - Please configure the 'retry' object instead 

576 In case 'retry' object is set - this argument is ignored! 

577 

578 Number of times to retry before raising an error when 

579 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or 

580 :class:`~.ClusterDownError` are encountered 

581 :param retry: 

582 A retry object that defines the retry strategy and the number of 

583 retries for the cluster client. 

584 In current implementation for the cluster client (starting form redis-py version 6.0.0) 

585 the retry object is not yet fully utilized, instead it is used just to determine 

586 the number of retries for the cluster client. 

587 In the future releases the retry object will be used to handle the cluster client retries! 

588 :param reinitialize_steps: 

589 Specifies the number of MOVED errors that need to occur before 

590 reinitializing the whole cluster topology. If a MOVED error occurs 

591 and the cluster does not need to be reinitialized on this current 

592 error handling, only the MOVED slot will be patched with the 

593 redirected node. 

594 To reinitialize the cluster on every MOVED error, set 

595 reinitialize_steps to 1. 

596 To avoid reinitializing the cluster on moved errors, set 

597 reinitialize_steps to 0. 

598 :param address_remap: 

599 An optional callable which, when provided with an internal network 

600 address of a node, e.g. a `(host, port)` tuple, will return the address 

601 where the node is reachable. This can be used to map the addresses at 

602 which the nodes _think_ they are, to addresses at which a client may 

603 reach them, such as when they sit behind a proxy. 

604 

605 :**kwargs: 

606 Extra arguments that will be sent into Redis instance when created 

607 (See Official redis-py doc for supported kwargs - the only limitation 

608 is that you can't provide 'retry' object as part of kwargs. 

609 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) 

610 Some kwargs are not supported and will raise a 

611 RedisClusterException: 

612 - db (Redis do not support database SELECT in cluster mode) 

613 """ 

614 if startup_nodes is None: 

615 startup_nodes = [] 

616 

617 if "db" in kwargs: 

618 # Argument 'db' is not possible to use in cluster mode 

619 raise RedisClusterException( 

620 "Argument 'db' is not possible to use in cluster mode" 

621 ) 

622 

623 if "retry" in kwargs: 

624 # Argument 'retry' is not possible to be used in kwargs when in cluster mode 

625 # the kwargs are set to the lower level connections to the cluster nodes 

626 # and there we provide retry configuration without retries allowed. 

627 # The retries should be handled on cluster client level. 

628 raise RedisClusterException( 

629 "The 'retry' argument cannot be used in kwargs when running in cluster mode." 

630 ) 

631 

632 # Get the startup node/s 

633 from_url = False 

634 if url is not None: 

635 from_url = True 

636 url_options = parse_url(url) 

637 if "path" in url_options: 

638 raise RedisClusterException( 

639 "RedisCluster does not currently support Unix Domain " 

640 "Socket connections" 

641 ) 

642 if "db" in url_options and url_options["db"] != 0: 

643 # Argument 'db' is not possible to use in cluster mode 

644 raise RedisClusterException( 

645 "A ``db`` querystring option can only be 0 in cluster mode" 

646 ) 

647 kwargs.update(url_options) 

648 host = kwargs.get("host") 

649 port = kwargs.get("port", port) 

650 startup_nodes.append(ClusterNode(host, port)) 

651 elif host is not None and port is not None: 

652 startup_nodes.append(ClusterNode(host, port)) 

653 elif len(startup_nodes) == 0: 

654 # No startup node was provided 

655 raise RedisClusterException( 

656 "RedisCluster requires at least one node to discover the " 

657 "cluster. Please provide one of the followings:\n" 

658 "1. host and port, for example:\n" 

659 " RedisCluster(host='localhost', port=6379)\n" 

660 "2. list of startup nodes, for example:\n" 

661 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," 

662 " ClusterNode('localhost', 6378)])" 

663 ) 

664 # Update the connection arguments 

665 # Whenever a new connection is established, RedisCluster's on_connect 

666 # method should be run 

667 # If the user passed on_connect function we'll save it and run it 

668 # inside the RedisCluster.on_connect() function 

669 self.user_on_connect_func = kwargs.pop("redis_connect_func", None) 

670 kwargs.update({"redis_connect_func": self.on_connect}) 

671 kwargs = cleanup_kwargs(**kwargs) 

672 if retry: 

673 self.retry = retry 

674 else: 

675 self.retry = Retry( 

676 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

677 retries=cluster_error_retry_attempts, 

678 ) 

679 

680 self.encoder = Encoder( 

681 kwargs.get("encoding", "utf-8"), 

682 kwargs.get("encoding_errors", "strict"), 

683 kwargs.get("decode_responses", False), 

684 ) 

685 protocol = kwargs.get("protocol", None) 

686 if (cache_config or cache) and protocol not in [3, "3"]: 

687 raise RedisError("Client caching is only supported with RESP version 3") 

688 

689 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

690 self.node_flags = self.__class__.NODE_FLAGS.copy() 

691 self.read_from_replicas = read_from_replicas 

692 self.load_balancing_strategy = load_balancing_strategy 

693 self.reinitialize_counter = 0 

694 self.reinitialize_steps = reinitialize_steps 

695 if event_dispatcher is None: 

696 self._event_dispatcher = EventDispatcher() 

697 else: 

698 self._event_dispatcher = event_dispatcher 

699 self.startup_nodes = startup_nodes 

700 self.nodes_manager = NodesManager( 

701 startup_nodes=startup_nodes, 

702 from_url=from_url, 

703 require_full_coverage=require_full_coverage, 

704 dynamic_startup_nodes=dynamic_startup_nodes, 

705 address_remap=address_remap, 

706 cache=cache, 

707 cache_config=cache_config, 

708 event_dispatcher=self._event_dispatcher, 

709 **kwargs, 

710 ) 

711 

712 self.cluster_response_callbacks = CaseInsensitiveDict( 

713 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS 

714 ) 

715 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS) 

716 

717 self.commands_parser = CommandsParser(self) 

718 self._lock = threading.RLock() 

719 

720 def __enter__(self): 

721 return self 

722 

723 def __exit__(self, exc_type, exc_value, traceback): 

724 self.close() 

725 

726 def __del__(self): 

727 try: 

728 self.close() 

729 except Exception: 

730 pass 

731 

732 def disconnect_connection_pools(self): 

733 for node in self.get_nodes(): 

734 if node.redis_connection: 

735 try: 

736 node.redis_connection.connection_pool.disconnect() 

737 except OSError: 

738 # Client was already disconnected. do nothing 

739 pass 

740 

741 def on_connect(self, connection): 

742 """ 

743 Initialize the connection, authenticate and select a database and send 

744 READONLY if it is set during object initialization. 

745 """ 

746 connection.on_connect() 

747 

748 if self.read_from_replicas or self.load_balancing_strategy: 

749 # Sending READONLY command to server to configure connection as 

750 # readonly. Since each cluster node may change its server type due 

751 # to a failover, we should establish a READONLY connection 

752 # regardless of the server type. If this is a primary connection, 

753 # READONLY would not affect executing write commands. 

754 connection.send_command("READONLY") 

755 if str_if_bytes(connection.read_response()) != "OK": 

756 raise ConnectionError("READONLY command failed") 

757 

758 if self.user_on_connect_func is not None: 

759 self.user_on_connect_func(connection) 

760 

761 def get_redis_connection(self, node: "ClusterNode") -> Redis: 

762 if not node.redis_connection: 

763 with self._lock: 

764 if not node.redis_connection: 

765 self.nodes_manager.create_redis_connections([node]) 

766 return node.redis_connection 

767 

768 def get_node(self, host=None, port=None, node_name=None): 

769 return self.nodes_manager.get_node(host, port, node_name) 

770 

771 def get_primaries(self): 

772 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

773 

774 def get_replicas(self): 

775 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

776 

777 def get_random_node(self): 

778 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

779 

780 def get_nodes(self): 

781 return list(self.nodes_manager.nodes_cache.values()) 

782 

783 def get_node_from_key(self, key, replica=False): 

784 """ 

785 Get the node that holds the key's slot. 

786 If replica set to True but the slot doesn't have any replicas, None is 

787 returned. 

788 """ 

789 slot = self.keyslot(key) 

790 slot_cache = self.nodes_manager.slots_cache.get(slot) 

791 if slot_cache is None or len(slot_cache) == 0: 

792 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') 

793 if replica and len(self.nodes_manager.slots_cache[slot]) < 2: 

794 return None 

795 elif replica: 

796 node_idx = 1 

797 else: 

798 # primary 

799 node_idx = 0 

800 

801 return slot_cache[node_idx] 

802 

803 def get_default_node(self): 

804 """ 

805 Get the cluster's default node 

806 """ 

807 return self.nodes_manager.default_node 

808 

809 def set_default_node(self, node): 

810 """ 

811 Set the default node of the cluster. 

812 :param node: 'ClusterNode' 

813 :return True if the default node was set, else False 

814 """ 

815 if node is None or self.get_node(node_name=node.name) is None: 

816 return False 

817 self.nodes_manager.default_node = node 

818 return True 

819 

820 def set_retry(self, retry: Retry) -> None: 

821 self.retry = retry 

822 

823 def monitor(self, target_node=None): 

824 """ 

825 Returns a Monitor object for the specified target node. 

826 The default cluster node will be selected if no target node was 

827 specified. 

828 Monitor is useful for handling the MONITOR command to the redis server. 

829 next_command() method returns one command from monitor 

830 listen() method yields commands from monitor. 

831 """ 

832 if target_node is None: 

833 target_node = self.get_default_node() 

834 if target_node.redis_connection is None: 

835 raise RedisClusterException( 

836 f"Cluster Node {target_node.name} has no redis_connection" 

837 ) 

838 return target_node.redis_connection.monitor() 

839 

840 def pubsub(self, node=None, host=None, port=None, **kwargs): 

841 """ 

842 Allows passing a ClusterNode, or host&port, to get a pubsub instance 

843 connected to the specified node 

844 """ 

845 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs) 

846 

847 def pipeline(self, transaction=None, shard_hint=None): 

848 """ 

849 Cluster impl: 

850 Pipelines do not work in cluster mode the same way they 

851 do in normal mode. Create a clone of this object so 

852 that simulating pipelines will work correctly. Each 

853 command will be called directly when used and 

854 when calling execute() will only return the result stack. 

855 """ 

856 if shard_hint: 

857 raise RedisClusterException("shard_hint is deprecated in cluster mode") 

858 

859 return ClusterPipeline( 

860 nodes_manager=self.nodes_manager, 

861 commands_parser=self.commands_parser, 

862 startup_nodes=self.nodes_manager.startup_nodes, 

863 result_callbacks=self.result_callbacks, 

864 cluster_response_callbacks=self.cluster_response_callbacks, 

865 read_from_replicas=self.read_from_replicas, 

866 load_balancing_strategy=self.load_balancing_strategy, 

867 reinitialize_steps=self.reinitialize_steps, 

868 retry=self.retry, 

869 lock=self._lock, 

870 transaction=transaction, 

871 ) 

872 

873 def lock( 

874 self, 

875 name, 

876 timeout=None, 

877 sleep=0.1, 

878 blocking=True, 

879 blocking_timeout=None, 

880 lock_class=None, 

881 thread_local=True, 

882 raise_on_release_error: bool = True, 

883 ): 

884 """ 

885 Return a new Lock object using key ``name`` that mimics 

886 the behavior of threading.Lock. 

887 

888 If specified, ``timeout`` indicates a maximum life for the lock. 

889 By default, it will remain locked until release() is called. 

890 

891 ``sleep`` indicates the amount of time to sleep per loop iteration 

892 when the lock is in blocking mode and another client is currently 

893 holding the lock. 

894 

895 ``blocking`` indicates whether calling ``acquire`` should block until 

896 the lock has been acquired or to fail immediately, causing ``acquire`` 

897 to return False and the lock not being acquired. Defaults to True. 

898 Note this value can be overridden by passing a ``blocking`` 

899 argument to ``acquire``. 

900 

901 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

902 spend trying to acquire the lock. A value of ``None`` indicates 

903 continue trying forever. ``blocking_timeout`` can be specified as a 

904 float or integer, both representing the number of seconds to wait. 

905 

906 ``lock_class`` forces the specified lock implementation. Note that as 

907 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

908 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

909 you have created your own custom lock class. 

910 

911 ``thread_local`` indicates whether the lock token is placed in 

912 thread-local storage. By default, the token is placed in thread local 

913 storage so that a thread only sees its token, not a token set by 

914 another thread. Consider the following timeline: 

915 

916 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

917 thread-1 sets the token to "abc" 

918 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

919 Lock instance. 

920 time: 5, thread-1 has not yet completed. redis expires the lock 

921 key. 

922 time: 5, thread-2 acquired `my-lock` now that it's available. 

923 thread-2 sets the token to "xyz" 

924 time: 6, thread-1 finishes its work and calls release(). if the 

925 token is *not* stored in thread local storage, then 

926 thread-1 would see the token value as "xyz" and would be 

927 able to successfully release the thread-2's lock. 

928 

929 ``raise_on_release_error`` indicates whether to raise an exception when 

930 the lock is no longer owned when exiting the context manager. By default, 

931 this is True, meaning an exception will be raised. If False, the warning 

932 will be logged and the exception will be suppressed. 

933 

934 In some use cases it's necessary to disable thread local storage. For 

935 example, if you have code where one thread acquires a lock and passes 

936 that lock instance to a worker thread to release later. If thread 

937 local storage isn't disabled in this case, the worker thread won't see 

938 the token set by the thread that acquired the lock. Our assumption 

939 is that these cases aren't common and as such default to using 

940 thread local storage.""" 

941 if lock_class is None: 

942 lock_class = Lock 

943 return lock_class( 

944 self, 

945 name, 

946 timeout=timeout, 

947 sleep=sleep, 

948 blocking=blocking, 

949 blocking_timeout=blocking_timeout, 

950 thread_local=thread_local, 

951 raise_on_release_error=raise_on_release_error, 

952 ) 

953 

954 def set_response_callback(self, command, callback): 

955 """Set a custom Response Callback""" 

956 self.cluster_response_callbacks[command] = callback 

957 

958 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: 

959 # Determine which nodes should be executed the command on. 

960 # Returns a list of target nodes. 

961 command = args[0].upper() 

962 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: 

963 command = f"{args[0]} {args[1]}".upper() 

964 

965 nodes_flag = kwargs.pop("nodes_flag", None) 

966 if nodes_flag is not None: 

967 # nodes flag passed by the user 

968 command_flag = nodes_flag 

969 else: 

970 # get the nodes group for this command if it was predefined 

971 command_flag = self.command_flags.get(command) 

972 if command_flag == self.__class__.RANDOM: 

973 # return a random node 

974 return [self.get_random_node()] 

975 elif command_flag == self.__class__.PRIMARIES: 

976 # return all primaries 

977 return self.get_primaries() 

978 elif command_flag == self.__class__.REPLICAS: 

979 # return all replicas 

980 return self.get_replicas() 

981 elif command_flag == self.__class__.ALL_NODES: 

982 # return all nodes 

983 return self.get_nodes() 

984 elif command_flag == self.__class__.DEFAULT_NODE: 

985 # return the cluster's default node 

986 return [self.nodes_manager.default_node] 

987 elif command in self.__class__.SEARCH_COMMANDS[0]: 

988 return [self.nodes_manager.default_node] 

989 else: 

990 # get the node that holds the key's slot 

991 slot = self.determine_slot(*args) 

992 node = self.nodes_manager.get_node_from_slot( 

993 slot, 

994 self.read_from_replicas and command in READ_COMMANDS, 

995 self.load_balancing_strategy if command in READ_COMMANDS else None, 

996 ) 

997 return [node] 

998 

999 def _should_reinitialized(self): 

1000 # To reinitialize the cluster on every MOVED error, 

1001 # set reinitialize_steps to 1. 

1002 # To avoid reinitializing the cluster on moved errors, set 

1003 # reinitialize_steps to 0. 

1004 if self.reinitialize_steps == 0: 

1005 return False 

1006 else: 

1007 return self.reinitialize_counter % self.reinitialize_steps == 0 

1008 

1009 def keyslot(self, key): 

1010 """ 

1011 Calculate keyslot for a given key. 

1012 See Keys distribution model in https://redis.io/topics/cluster-spec 

1013 """ 

1014 k = self.encoder.encode(key) 

1015 return key_slot(k) 

1016 

1017 def _get_command_keys(self, *args): 

1018 """ 

1019 Get the keys in the command. If the command has no keys in in, None is 

1020 returned. 

1021 

1022 NOTE: Due to a bug in redis<7.0, this function does not work properly 

1023 for EVAL or EVALSHA when the `numkeys` arg is 0. 

1024 - issue: https://github.com/redis/redis/issues/9493 

1025 - fix: https://github.com/redis/redis/pull/9733 

1026 

1027 So, don't use this function with EVAL or EVALSHA. 

1028 """ 

1029 redis_conn = self.get_default_node().redis_connection 

1030 return self.commands_parser.get_keys(redis_conn, *args) 

1031 

1032 def determine_slot(self, *args) -> int: 

1033 """ 

1034 Figure out what slot to use based on args. 

1035 

1036 Raises a RedisClusterException if there's a missing key and we can't 

1037 determine what slots to map the command to; or, if the keys don't 

1038 all map to the same key slot. 

1039 """ 

1040 command = args[0] 

1041 if self.command_flags.get(command) == SLOT_ID: 

1042 # The command contains the slot ID 

1043 return args[1] 

1044 

1045 # Get the keys in the command 

1046 

1047 # EVAL and EVALSHA are common enough that it's wasteful to go to the 

1048 # redis server to parse the keys. Besides, there is a bug in redis<7.0 

1049 # where `self._get_command_keys()` fails anyway. So, we special case 

1050 # EVAL/EVALSHA. 

1051 if command.upper() in ("EVAL", "EVALSHA"): 

1052 # command syntax: EVAL "script body" num_keys ... 

1053 if len(args) <= 2: 

1054 raise RedisClusterException(f"Invalid args in command: {args}") 

1055 num_actual_keys = int(args[2]) 

1056 eval_keys = args[3 : 3 + num_actual_keys] 

1057 # if there are 0 keys, that means the script can be run on any node 

1058 # so we can just return a random slot 

1059 if len(eval_keys) == 0: 

1060 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1061 keys = eval_keys 

1062 else: 

1063 keys = self._get_command_keys(*args) 

1064 if keys is None or len(keys) == 0: 

1065 # FCALL can call a function with 0 keys, that means the function 

1066 # can be run on any node so we can just return a random slot 

1067 if command.upper() in ("FCALL", "FCALL_RO"): 

1068 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1069 raise RedisClusterException( 

1070 "No way to dispatch this command to Redis Cluster. " 

1071 "Missing key.\nYou can execute the command by specifying " 

1072 f"target nodes.\nCommand: {args}" 

1073 ) 

1074 

1075 # single key command 

1076 if len(keys) == 1: 

1077 return self.keyslot(keys[0]) 

1078 

1079 # multi-key command; we need to make sure all keys are mapped to 

1080 # the same slot 

1081 slots = {self.keyslot(key) for key in keys} 

1082 if len(slots) != 1: 

1083 raise RedisClusterException( 

1084 f"{command} - all keys must map to the same key slot" 

1085 ) 

1086 

1087 return slots.pop() 

1088 

1089 def get_encoder(self): 

1090 """ 

1091 Get the connections' encoder 

1092 """ 

1093 return self.encoder 

1094 

1095 def get_connection_kwargs(self): 

1096 """ 

1097 Get the connections' key-word arguments 

1098 """ 

1099 return self.nodes_manager.connection_kwargs 

1100 

1101 def _is_nodes_flag(self, target_nodes): 

1102 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1103 

1104 def _parse_target_nodes(self, target_nodes): 

1105 if isinstance(target_nodes, list): 

1106 nodes = target_nodes 

1107 elif isinstance(target_nodes, ClusterNode): 

1108 # Supports passing a single ClusterNode as a variable 

1109 nodes = [target_nodes] 

1110 elif isinstance(target_nodes, dict): 

1111 # Supports dictionaries of the format {node_name: node}. 

1112 # It enables to execute commands with multi nodes as follows: 

1113 # rc.cluster_save_config(rc.get_primaries()) 

1114 nodes = target_nodes.values() 

1115 else: 

1116 raise TypeError( 

1117 "target_nodes type can be one of the following: " 

1118 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1119 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1120 f"The passed type is {type(target_nodes)}" 

1121 ) 

1122 return nodes 

1123 

1124 def execute_command(self, *args, **kwargs): 

1125 return self._internal_execute_command(*args, **kwargs) 

1126 

1127 def _internal_execute_command(self, *args, **kwargs): 

1128 """ 

1129 Wrapper for ERRORS_ALLOW_RETRY error handling. 

1130 

1131 It will try the number of times specified by the retries property from 

1132 config option "self.retry" which defaults to 3 unless manually 

1133 configured. 

1134 

1135 If it reaches the number of times, the command will raise the exception 

1136 

1137 Key argument :target_nodes: can be passed with the following types: 

1138 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM 

1139 ClusterNode 

1140 list<ClusterNode> 

1141 dict<Any, ClusterNode> 

1142 """ 

1143 target_nodes_specified = False 

1144 is_default_node = False 

1145 target_nodes = None 

1146 passed_targets = kwargs.pop("target_nodes", None) 

1147 if passed_targets is not None and not self._is_nodes_flag(passed_targets): 

1148 target_nodes = self._parse_target_nodes(passed_targets) 

1149 target_nodes_specified = True 

1150 # If an error that allows retrying was thrown, the nodes and slots 

1151 # cache were reinitialized. We will retry executing the command with 

1152 # the updated cluster setup only when the target nodes can be 

1153 # determined again with the new cache tables. Therefore, when target 

1154 # nodes were passed to this function, we cannot retry the command 

1155 # execution since the nodes may not be valid anymore after the tables 

1156 # were reinitialized. So in case of passed target nodes, 

1157 # retry_attempts will be set to 0. 

1158 retry_attempts = 0 if target_nodes_specified else self.retry.get_retries() 

1159 # Add one for the first execution 

1160 execute_attempts = 1 + retry_attempts 

1161 for _ in range(execute_attempts): 

1162 try: 

1163 res = {} 

1164 if not target_nodes_specified: 

1165 # Determine the nodes to execute the command on 

1166 target_nodes = self._determine_nodes( 

1167 *args, **kwargs, nodes_flag=passed_targets 

1168 ) 

1169 if not target_nodes: 

1170 raise RedisClusterException( 

1171 f"No targets were found to execute {args} command on" 

1172 ) 

1173 if ( 

1174 len(target_nodes) == 1 

1175 and target_nodes[0] == self.get_default_node() 

1176 ): 

1177 is_default_node = True 

1178 for node in target_nodes: 

1179 res[node.name] = self._execute_command(node, *args, **kwargs) 

1180 # Return the processed result 

1181 return self._process_result(args[0], res, **kwargs) 

1182 except Exception as e: 

1183 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: 

1184 if is_default_node: 

1185 # Replace the default cluster node 

1186 self.replace_default_node() 

1187 # The nodes and slots cache were reinitialized. 

1188 # Try again with the new cluster setup. 

1189 retry_attempts -= 1 

1190 continue 

1191 else: 

1192 # raise the exception 

1193 raise e 

1194 

1195 def _execute_command(self, target_node, *args, **kwargs): 

1196 """ 

1197 Send a command to a node in the cluster 

1198 """ 

1199 command = args[0] 

1200 redis_node = None 

1201 connection = None 

1202 redirect_addr = None 

1203 asking = False 

1204 moved = False 

1205 ttl = int(self.RedisClusterRequestTTL) 

1206 

1207 while ttl > 0: 

1208 ttl -= 1 

1209 try: 

1210 if asking: 

1211 target_node = self.get_node(node_name=redirect_addr) 

1212 elif moved: 

1213 # MOVED occurred and the slots cache was updated, 

1214 # refresh the target node 

1215 slot = self.determine_slot(*args) 

1216 target_node = self.nodes_manager.get_node_from_slot( 

1217 slot, 

1218 self.read_from_replicas and command in READ_COMMANDS, 

1219 self.load_balancing_strategy 

1220 if command in READ_COMMANDS 

1221 else None, 

1222 ) 

1223 moved = False 

1224 

1225 redis_node = self.get_redis_connection(target_node) 

1226 connection = get_connection(redis_node) 

1227 if asking: 

1228 connection.send_command("ASKING") 

1229 redis_node.parse_response(connection, "ASKING", **kwargs) 

1230 asking = False 

1231 connection.send_command(*args, **kwargs) 

1232 response = redis_node.parse_response(connection, command, **kwargs) 

1233 

1234 # Remove keys entry, it needs only for cache. 

1235 kwargs.pop("keys", None) 

1236 

1237 if command in self.cluster_response_callbacks: 

1238 response = self.cluster_response_callbacks[command]( 

1239 response, **kwargs 

1240 ) 

1241 return response 

1242 except AuthenticationError: 

1243 raise 

1244 except MaxConnectionsError: 

1245 # MaxConnectionsError indicates client-side resource exhaustion 

1246 # (too many connections in the pool), not a node failure. 

1247 # Don't treat this as a node failure - just re-raise the error 

1248 # without reinitializing the cluster. 

1249 raise 

1250 except (ConnectionError, TimeoutError) as e: 

1251 # ConnectionError can also be raised if we couldn't get a 

1252 # connection from the pool before timing out, so check that 

1253 # this is an actual connection before attempting to disconnect. 

1254 if connection is not None: 

1255 connection.disconnect() 

1256 

1257 # Remove the failed node from the startup nodes before we try 

1258 # to reinitialize the cluster 

1259 self.nodes_manager.startup_nodes.pop(target_node.name, None) 

1260 # Reset the cluster node's connection 

1261 target_node.redis_connection = None 

1262 self.nodes_manager.initialize() 

1263 raise e 

1264 except MovedError as e: 

1265 # First, we will try to patch the slots/nodes cache with the 

1266 # redirected node output and try again. If MovedError exceeds 

1267 # 'reinitialize_steps' number of times, we will force 

1268 # reinitializing the tables, and then try again. 

1269 # 'reinitialize_steps' counter will increase faster when 

1270 # the same client object is shared between multiple threads. To 

1271 # reduce the frequency you can set this variable in the 

1272 # RedisCluster constructor. 

1273 self.reinitialize_counter += 1 

1274 if self._should_reinitialized(): 

1275 self.nodes_manager.initialize() 

1276 # Reset the counter 

1277 self.reinitialize_counter = 0 

1278 else: 

1279 self.nodes_manager.update_moved_exception(e) 

1280 moved = True 

1281 except TryAgainError: 

1282 if ttl < self.RedisClusterRequestTTL / 2: 

1283 time.sleep(0.05) 

1284 except AskError as e: 

1285 redirect_addr = get_node_name(host=e.host, port=e.port) 

1286 asking = True 

1287 except (ClusterDownError, SlotNotCoveredError): 

1288 # ClusterDownError can occur during a failover and to get 

1289 # self-healed, we will try to reinitialize the cluster layout 

1290 # and retry executing the command 

1291 

1292 # SlotNotCoveredError can occur when the cluster is not fully 

1293 # initialized or can be temporary issue. 

1294 # We will try to reinitialize the cluster topology 

1295 # and retry executing the command 

1296 

1297 time.sleep(0.25) 

1298 self.nodes_manager.initialize() 

1299 raise 

1300 except ResponseError: 

1301 raise 

1302 except Exception as e: 

1303 if connection: 

1304 connection.disconnect() 

1305 raise e 

1306 finally: 

1307 if connection is not None: 

1308 redis_node.connection_pool.release(connection) 

1309 

1310 raise ClusterError("TTL exhausted.") 

1311 

1312 def close(self) -> None: 

1313 try: 

1314 with self._lock: 

1315 if self.nodes_manager: 

1316 self.nodes_manager.close() 

1317 except AttributeError: 

1318 # RedisCluster's __init__ can fail before nodes_manager is set 

1319 pass 

1320 

1321 def _process_result(self, command, res, **kwargs): 

1322 """ 

1323 Process the result of the executed command. 

1324 The function would return a dict or a single value. 

1325 

1326 :type command: str 

1327 :type res: dict 

1328 

1329 `res` should be in the following format: 

1330 Dict<node_name, command_result> 

1331 """ 

1332 if command in self.result_callbacks: 

1333 return self.result_callbacks[command](command, res, **kwargs) 

1334 elif len(res) == 1: 

1335 # When we execute the command on a single node, we can 

1336 # remove the dictionary and return a single response 

1337 return list(res.values())[0] 

1338 else: 

1339 return res 

1340 

1341 def load_external_module(self, funcname, func): 

1342 """ 

1343 This function can be used to add externally defined redis modules, 

1344 and their namespaces to the redis client. 

1345 

1346 ``funcname`` - A string containing the name of the function to create 

1347 ``func`` - The function, being added to this class. 

1348 """ 

1349 setattr(self, funcname, func) 

1350 

1351 def transaction(self, func, *watches, **kwargs): 

1352 """ 

1353 Convenience method for executing the callable `func` as a transaction 

1354 while watching all keys specified in `watches`. The 'func' callable 

1355 should expect a single argument which is a Pipeline object. 

1356 """ 

1357 shard_hint = kwargs.pop("shard_hint", None) 

1358 value_from_callable = kwargs.pop("value_from_callable", False) 

1359 watch_delay = kwargs.pop("watch_delay", None) 

1360 with self.pipeline(True, shard_hint) as pipe: 

1361 while True: 

1362 try: 

1363 if watches: 

1364 pipe.watch(*watches) 

1365 func_value = func(pipe) 

1366 exec_value = pipe.execute() 

1367 return func_value if value_from_callable else exec_value 

1368 except WatchError: 

1369 if watch_delay is not None and watch_delay > 0: 

1370 time.sleep(watch_delay) 

1371 continue 

1372 

1373 

1374class ClusterNode: 

1375 def __init__(self, host, port, server_type=None, redis_connection=None): 

1376 if host == "localhost": 

1377 host = socket.gethostbyname(host) 

1378 

1379 self.host = host 

1380 self.port = port 

1381 self.name = get_node_name(host, port) 

1382 self.server_type = server_type 

1383 self.redis_connection = redis_connection 

1384 

1385 def __repr__(self): 

1386 return ( 

1387 f"[host={self.host}," 

1388 f"port={self.port}," 

1389 f"name={self.name}," 

1390 f"server_type={self.server_type}," 

1391 f"redis_connection={self.redis_connection}]" 

1392 ) 

1393 

1394 def __eq__(self, obj): 

1395 return isinstance(obj, ClusterNode) and obj.name == self.name 

1396 

1397 def __del__(self): 

1398 try: 

1399 if self.redis_connection is not None: 

1400 self.redis_connection.close() 

1401 except Exception: 

1402 # Ignore errors when closing the connection 

1403 pass 

1404 

1405 

1406class LoadBalancingStrategy(Enum): 

1407 ROUND_ROBIN = "round_robin" 

1408 ROUND_ROBIN_REPLICAS = "round_robin_replicas" 

1409 RANDOM_REPLICA = "random_replica" 

1410 

1411 

1412class LoadBalancer: 

1413 """ 

1414 Round-Robin Load Balancing 

1415 """ 

1416 

1417 def __init__(self, start_index: int = 0) -> None: 

1418 self.primary_to_idx = {} 

1419 self.start_index = start_index 

1420 

1421 def get_server_index( 

1422 self, 

1423 primary: str, 

1424 list_size: int, 

1425 load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN, 

1426 ) -> int: 

1427 if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA: 

1428 return self._get_random_replica_index(list_size) 

1429 else: 

1430 return self._get_round_robin_index( 

1431 primary, 

1432 list_size, 

1433 load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS, 

1434 ) 

1435 

1436 def reset(self) -> None: 

1437 self.primary_to_idx.clear() 

1438 

1439 def _get_random_replica_index(self, list_size: int) -> int: 

1440 return random.randint(1, list_size - 1) 

1441 

1442 def _get_round_robin_index( 

1443 self, primary: str, list_size: int, replicas_only: bool 

1444 ) -> int: 

1445 server_index = self.primary_to_idx.setdefault(primary, self.start_index) 

1446 if replicas_only and server_index == 0: 

1447 # skip the primary node index 

1448 server_index = 1 

1449 # Update the index for the next round 

1450 self.primary_to_idx[primary] = (server_index + 1) % list_size 

1451 return server_index 

1452 

1453 

1454class NodesManager: 

1455 def __init__( 

1456 self, 

1457 startup_nodes, 

1458 from_url=False, 

1459 require_full_coverage=False, 

1460 lock=None, 

1461 dynamic_startup_nodes=True, 

1462 connection_pool_class=ConnectionPool, 

1463 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

1464 cache: Optional[CacheInterface] = None, 

1465 cache_config: Optional[CacheConfig] = None, 

1466 cache_factory: Optional[CacheFactoryInterface] = None, 

1467 event_dispatcher: Optional[EventDispatcher] = None, 

1468 **kwargs, 

1469 ): 

1470 self.nodes_cache: Dict[str, Redis] = {} 

1471 self.slots_cache = {} 

1472 self.startup_nodes = {} 

1473 self.default_node = None 

1474 self.populate_startup_nodes(startup_nodes) 

1475 self.from_url = from_url 

1476 self._require_full_coverage = require_full_coverage 

1477 self._dynamic_startup_nodes = dynamic_startup_nodes 

1478 self.connection_pool_class = connection_pool_class 

1479 self.address_remap = address_remap 

1480 self._cache = cache 

1481 self._cache_config = cache_config 

1482 self._cache_factory = cache_factory 

1483 self._moved_exception = None 

1484 self.connection_kwargs = kwargs 

1485 self.read_load_balancer = LoadBalancer() 

1486 if lock is None: 

1487 lock = threading.RLock() 

1488 self._lock = lock 

1489 if event_dispatcher is None: 

1490 self._event_dispatcher = EventDispatcher() 

1491 else: 

1492 self._event_dispatcher = event_dispatcher 

1493 self._credential_provider = self.connection_kwargs.get( 

1494 "credential_provider", None 

1495 ) 

1496 self.initialize() 

1497 

1498 def get_node(self, host=None, port=None, node_name=None): 

1499 """ 

1500 Get the requested node from the cluster's nodes. 

1501 nodes. 

1502 :return: ClusterNode if the node exists, else None 

1503 """ 

1504 if host and port: 

1505 # the user passed host and port 

1506 if host == "localhost": 

1507 host = socket.gethostbyname(host) 

1508 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1509 elif node_name: 

1510 return self.nodes_cache.get(node_name) 

1511 else: 

1512 return None 

1513 

1514 def update_moved_exception(self, exception): 

1515 self._moved_exception = exception 

1516 

1517 def _update_moved_slots(self): 

1518 """ 

1519 Update the slot's node with the redirected one 

1520 """ 

1521 e = self._moved_exception 

1522 redirected_node = self.get_node(host=e.host, port=e.port) 

1523 if redirected_node is not None: 

1524 # The node already exists 

1525 if redirected_node.server_type is not PRIMARY: 

1526 # Update the node's server type 

1527 redirected_node.server_type = PRIMARY 

1528 else: 

1529 # This is a new node, we will add it to the nodes cache 

1530 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

1531 self.nodes_cache[redirected_node.name] = redirected_node 

1532 if redirected_node in self.slots_cache[e.slot_id]: 

1533 # The MOVED error resulted from a failover, and the new slot owner 

1534 # had previously been a replica. 

1535 old_primary = self.slots_cache[e.slot_id][0] 

1536 # Update the old primary to be a replica and add it to the end of 

1537 # the slot's node list 

1538 old_primary.server_type = REPLICA 

1539 self.slots_cache[e.slot_id].append(old_primary) 

1540 # Remove the old replica, which is now a primary, from the slot's 

1541 # node list 

1542 self.slots_cache[e.slot_id].remove(redirected_node) 

1543 # Override the old primary with the new one 

1544 self.slots_cache[e.slot_id][0] = redirected_node 

1545 if self.default_node == old_primary: 

1546 # Update the default node with the new primary 

1547 self.default_node = redirected_node 

1548 else: 

1549 # The new slot owner is a new server, or a server from a different 

1550 # shard. We need to remove all current nodes from the slot's list 

1551 # (including replications) and add just the new node. 

1552 self.slots_cache[e.slot_id] = [redirected_node] 

1553 # Reset moved_exception 

1554 self._moved_exception = None 

1555 

1556 @deprecated_args( 

1557 args_to_warn=["server_type"], 

1558 reason=( 

1559 "In case you need select some load balancing strategy " 

1560 "that will use replicas, please set it through 'load_balancing_strategy'" 

1561 ), 

1562 version="5.3.0", 

1563 ) 

1564 def get_node_from_slot( 

1565 self, 

1566 slot, 

1567 read_from_replicas=False, 

1568 load_balancing_strategy=None, 

1569 server_type=None, 

1570 ) -> ClusterNode: 

1571 """ 

1572 Gets a node that servers this hash slot 

1573 """ 

1574 if self._moved_exception: 

1575 with self._lock: 

1576 if self._moved_exception: 

1577 self._update_moved_slots() 

1578 

1579 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

1580 raise SlotNotCoveredError( 

1581 f'Slot "{slot}" not covered by the cluster. ' 

1582 f'"require_full_coverage={self._require_full_coverage}"' 

1583 ) 

1584 

1585 if read_from_replicas is True and load_balancing_strategy is None: 

1586 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN 

1587 

1588 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy: 

1589 # get the server index using the strategy defined in load_balancing_strategy 

1590 primary_name = self.slots_cache[slot][0].name 

1591 node_idx = self.read_load_balancer.get_server_index( 

1592 primary_name, len(self.slots_cache[slot]), load_balancing_strategy 

1593 ) 

1594 elif ( 

1595 server_type is None 

1596 or server_type == PRIMARY 

1597 or len(self.slots_cache[slot]) == 1 

1598 ): 

1599 # return a primary 

1600 node_idx = 0 

1601 else: 

1602 # return a replica 

1603 # randomly choose one of the replicas 

1604 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

1605 

1606 return self.slots_cache[slot][node_idx] 

1607 

1608 def get_nodes_by_server_type(self, server_type): 

1609 """ 

1610 Get all nodes with the specified server type 

1611 :param server_type: 'primary' or 'replica' 

1612 :return: list of ClusterNode 

1613 """ 

1614 return [ 

1615 node 

1616 for node in self.nodes_cache.values() 

1617 if node.server_type == server_type 

1618 ] 

1619 

1620 def populate_startup_nodes(self, nodes): 

1621 """ 

1622 Populate all startup nodes and filters out any duplicates 

1623 """ 

1624 for n in nodes: 

1625 self.startup_nodes[n.name] = n 

1626 

1627 def check_slots_coverage(self, slots_cache): 

1628 # Validate if all slots are covered or if we should try next 

1629 # startup node 

1630 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

1631 if i not in slots_cache: 

1632 return False 

1633 return True 

1634 

1635 def create_redis_connections(self, nodes): 

1636 """ 

1637 This function will create a redis connection to all nodes in :nodes: 

1638 """ 

1639 connection_pools = [] 

1640 for node in nodes: 

1641 if node.redis_connection is None: 

1642 node.redis_connection = self.create_redis_node( 

1643 host=node.host, port=node.port, **self.connection_kwargs 

1644 ) 

1645 connection_pools.append(node.redis_connection.connection_pool) 

1646 

1647 self._event_dispatcher.dispatch( 

1648 AfterPooledConnectionsInstantiationEvent( 

1649 connection_pools, ClientType.SYNC, self._credential_provider 

1650 ) 

1651 ) 

1652 

1653 def create_redis_node(self, host, port, **kwargs): 

1654 # We are configuring the connection pool not to retry 

1655 # connections on lower level clients to avoid retrying 

1656 # connections to nodes that are not reachable 

1657 # and to avoid blocking the connection pool. 

1658 # The only error that will have some handling in the lower 

1659 # level clients is ConnectionError which will trigger disconnection 

1660 # of the socket. 

1661 # The retries will be handled on cluster client level 

1662 # where we will have proper handling of the cluster topology 

1663 node_retry_config = Retry( 

1664 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) 

1665 ) 

1666 

1667 protocol = kwargs.get("protocol", None) 

1668 if protocol in [3, "3"]: 

1669 kwargs.update( 

1670 {"maint_notifications_config": MaintNotificationsConfig(enabled=False)} 

1671 ) 

1672 if self.from_url: 

1673 # Create a redis node with a costumed connection pool 

1674 kwargs.update({"host": host}) 

1675 kwargs.update({"port": port}) 

1676 kwargs.update({"cache": self._cache}) 

1677 kwargs.update({"retry": node_retry_config}) 

1678 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

1679 else: 

1680 r = Redis( 

1681 host=host, 

1682 port=port, 

1683 cache=self._cache, 

1684 retry=node_retry_config, 

1685 **kwargs, 

1686 ) 

1687 return r 

1688 

1689 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

1690 node_name = get_node_name(host, port) 

1691 # check if we already have this node in the tmp_nodes_cache 

1692 target_node = tmp_nodes_cache.get(node_name) 

1693 if target_node is None: 

1694 # before creating a new cluster node, check if the cluster node already 

1695 # exists in the current nodes cache and has a valid connection so we can 

1696 # reuse it 

1697 target_node = self.nodes_cache.get(node_name) 

1698 if target_node is None or target_node.redis_connection is None: 

1699 # create new cluster node for this cluster 

1700 target_node = ClusterNode(host, port, role) 

1701 if target_node.server_type != role: 

1702 target_node.server_type = role 

1703 # add this node to the nodes cache 

1704 tmp_nodes_cache[target_node.name] = target_node 

1705 

1706 return target_node 

1707 

1708 def initialize(self): 

1709 """ 

1710 Initializes the nodes cache, slots cache and redis connections. 

1711 :startup_nodes: 

1712 Responsible for discovering other nodes in the cluster 

1713 """ 

1714 self.reset() 

1715 tmp_nodes_cache = {} 

1716 tmp_slots = {} 

1717 disagreements = [] 

1718 startup_nodes_reachable = False 

1719 fully_covered = False 

1720 kwargs = self.connection_kwargs 

1721 exception = None 

1722 # Convert to tuple to prevent RuntimeError if self.startup_nodes 

1723 # is modified during iteration 

1724 for startup_node in tuple(self.startup_nodes.values()): 

1725 try: 

1726 if startup_node.redis_connection: 

1727 r = startup_node.redis_connection 

1728 else: 

1729 # Create a new Redis connection 

1730 r = self.create_redis_node( 

1731 startup_node.host, startup_node.port, **kwargs 

1732 ) 

1733 self.startup_nodes[startup_node.name].redis_connection = r 

1734 # Make sure cluster mode is enabled on this node 

1735 try: 

1736 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

1737 r.connection_pool.disconnect() 

1738 except ResponseError: 

1739 raise RedisClusterException( 

1740 "Cluster mode is not enabled on this node" 

1741 ) 

1742 startup_nodes_reachable = True 

1743 except Exception as e: 

1744 # Try the next startup node. 

1745 # The exception is saved and raised only if we have no more nodes. 

1746 exception = e 

1747 continue 

1748 

1749 # CLUSTER SLOTS command results in the following output: 

1750 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

1751 # where each node contains the following list: [IP, port, node_id] 

1752 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

1753 # primary node of the first slot section. 

1754 # If there's only one server in the cluster, its ``host`` is '' 

1755 # Fix it to the host in startup_nodes 

1756 if ( 

1757 len(cluster_slots) == 1 

1758 and len(cluster_slots[0][2][0]) == 0 

1759 and len(self.startup_nodes) == 1 

1760 ): 

1761 cluster_slots[0][2][0] = startup_node.host 

1762 

1763 for slot in cluster_slots: 

1764 primary_node = slot[2] 

1765 host = str_if_bytes(primary_node[0]) 

1766 if host == "": 

1767 host = startup_node.host 

1768 port = int(primary_node[1]) 

1769 host, port = self.remap_host_port(host, port) 

1770 

1771 nodes_for_slot = [] 

1772 

1773 target_node = self._get_or_create_cluster_node( 

1774 host, port, PRIMARY, tmp_nodes_cache 

1775 ) 

1776 nodes_for_slot.append(target_node) 

1777 

1778 replica_nodes = slot[3:] 

1779 for replica_node in replica_nodes: 

1780 host = str_if_bytes(replica_node[0]) 

1781 port = int(replica_node[1]) 

1782 host, port = self.remap_host_port(host, port) 

1783 target_replica_node = self._get_or_create_cluster_node( 

1784 host, port, REPLICA, tmp_nodes_cache 

1785 ) 

1786 nodes_for_slot.append(target_replica_node) 

1787 

1788 for i in range(int(slot[0]), int(slot[1]) + 1): 

1789 if i not in tmp_slots: 

1790 tmp_slots[i] = nodes_for_slot 

1791 else: 

1792 # Validate that 2 nodes want to use the same slot cache 

1793 # setup 

1794 tmp_slot = tmp_slots[i][0] 

1795 if tmp_slot.name != target_node.name: 

1796 disagreements.append( 

1797 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

1798 ) 

1799 

1800 if len(disagreements) > 5: 

1801 raise RedisClusterException( 

1802 f"startup_nodes could not agree on a valid " 

1803 f"slots cache: {', '.join(disagreements)}" 

1804 ) 

1805 

1806 fully_covered = self.check_slots_coverage(tmp_slots) 

1807 if fully_covered: 

1808 # Don't need to continue to the next startup node if all 

1809 # slots are covered 

1810 break 

1811 

1812 if not startup_nodes_reachable: 

1813 raise RedisClusterException( 

1814 f"Redis Cluster cannot be connected. Please provide at least " 

1815 f"one reachable node: {str(exception)}" 

1816 ) from exception 

1817 

1818 if self._cache is None and self._cache_config is not None: 

1819 if self._cache_factory is None: 

1820 self._cache = CacheFactory(self._cache_config).get_cache() 

1821 else: 

1822 self._cache = self._cache_factory.get_cache() 

1823 

1824 # Create Redis connections to all nodes 

1825 self.create_redis_connections(list(tmp_nodes_cache.values())) 

1826 

1827 # Check if the slots are not fully covered 

1828 if not fully_covered and self._require_full_coverage: 

1829 # Despite the requirement that the slots be covered, there 

1830 # isn't a full coverage 

1831 raise RedisClusterException( 

1832 f"All slots are not covered after query all startup_nodes. " 

1833 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

1834 f"covered..." 

1835 ) 

1836 

1837 # Set the tmp variables to the real variables 

1838 self.nodes_cache = tmp_nodes_cache 

1839 self.slots_cache = tmp_slots 

1840 # Set the default node 

1841 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

1842 if self._dynamic_startup_nodes: 

1843 # Populate the startup nodes with all discovered nodes 

1844 self.startup_nodes = tmp_nodes_cache 

1845 # If initialize was called after a MovedError, clear it 

1846 self._moved_exception = None 

1847 

1848 def close(self) -> None: 

1849 self.default_node = None 

1850 for node in self.nodes_cache.values(): 

1851 if node.redis_connection: 

1852 node.redis_connection.close() 

1853 

1854 def reset(self): 

1855 try: 

1856 self.read_load_balancer.reset() 

1857 except TypeError: 

1858 # The read_load_balancer is None, do nothing 

1859 pass 

1860 

1861 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

1862 """ 

1863 Remap the host and port returned from the cluster to a different 

1864 internal value. Useful if the client is not connecting directly 

1865 to the cluster. 

1866 """ 

1867 if self.address_remap: 

1868 return self.address_remap((host, port)) 

1869 return host, port 

1870 

1871 def find_connection_owner(self, connection: Connection) -> Optional[Redis]: 

1872 node_name = get_node_name(connection.host, connection.port) 

1873 for node in tuple(self.nodes_cache.values()): 

1874 if node.redis_connection: 

1875 conn_args = node.redis_connection.connection_pool.connection_kwargs 

1876 if node_name == get_node_name( 

1877 conn_args.get("host"), conn_args.get("port") 

1878 ): 

1879 return node 

1880 

1881 

1882class ClusterPubSub(PubSub): 

1883 """ 

1884 Wrapper for PubSub class. 

1885 

1886 IMPORTANT: before using ClusterPubSub, read about the known limitations 

1887 with pubsub in Cluster mode and learn how to workaround them: 

1888 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html 

1889 """ 

1890 

1891 def __init__( 

1892 self, 

1893 redis_cluster, 

1894 node=None, 

1895 host=None, 

1896 port=None, 

1897 push_handler_func=None, 

1898 event_dispatcher: Optional["EventDispatcher"] = None, 

1899 **kwargs, 

1900 ): 

1901 """ 

1902 When a pubsub instance is created without specifying a node, a single 

1903 node will be transparently chosen for the pubsub connection on the 

1904 first command execution. The node will be determined by: 

1905 1. Hashing the channel name in the request to find its keyslot 

1906 2. Selecting a node that handles the keyslot: If read_from_replicas is 

1907 set to true or load_balancing_strategy is set, a replica can be selected. 

1908 

1909 :type redis_cluster: RedisCluster 

1910 :type node: ClusterNode 

1911 :type host: str 

1912 :type port: int 

1913 """ 

1914 self.node = None 

1915 self.set_pubsub_node(redis_cluster, node, host, port) 

1916 connection_pool = ( 

1917 None 

1918 if self.node is None 

1919 else redis_cluster.get_redis_connection(self.node).connection_pool 

1920 ) 

1921 self.cluster = redis_cluster 

1922 self.node_pubsub_mapping = {} 

1923 self._pubsubs_generator = self._pubsubs_generator() 

1924 if event_dispatcher is None: 

1925 self._event_dispatcher = EventDispatcher() 

1926 else: 

1927 self._event_dispatcher = event_dispatcher 

1928 super().__init__( 

1929 connection_pool=connection_pool, 

1930 encoder=redis_cluster.encoder, 

1931 push_handler_func=push_handler_func, 

1932 event_dispatcher=self._event_dispatcher, 

1933 **kwargs, 

1934 ) 

1935 

1936 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

1937 """ 

1938 The pubsub node will be set according to the passed node, host and port 

1939 When none of the node, host, or port are specified - the node is set 

1940 to None and will be determined by the keyslot of the channel in the 

1941 first command to be executed. 

1942 RedisClusterException will be thrown if the passed node does not exist 

1943 in the cluster. 

1944 If host is passed without port, or vice versa, a DataError will be 

1945 thrown. 

1946 :type cluster: RedisCluster 

1947 :type node: ClusterNode 

1948 :type host: str 

1949 :type port: int 

1950 """ 

1951 if node is not None: 

1952 # node is passed by the user 

1953 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

1954 pubsub_node = node 

1955 elif host is not None and port is not None: 

1956 # host and port passed by the user 

1957 node = cluster.get_node(host=host, port=port) 

1958 self._raise_on_invalid_node(cluster, node, host, port) 

1959 pubsub_node = node 

1960 elif any([host, port]) is True: 

1961 # only 'host' or 'port' passed 

1962 raise DataError("Passing a host requires passing a port, and vice versa") 

1963 else: 

1964 # nothing passed by the user. set node to None 

1965 pubsub_node = None 

1966 

1967 self.node = pubsub_node 

1968 

1969 def get_pubsub_node(self): 

1970 """ 

1971 Get the node that is being used as the pubsub connection 

1972 """ 

1973 return self.node 

1974 

1975 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

1976 """ 

1977 Raise a RedisClusterException if the node is None or doesn't exist in 

1978 the cluster. 

1979 """ 

1980 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

1981 raise RedisClusterException( 

1982 f"Node {host}:{port} doesn't exist in the cluster" 

1983 ) 

1984 

1985 def execute_command(self, *args): 

1986 """ 

1987 Execute a subscribe/unsubscribe command. 

1988 

1989 Taken code from redis-py and tweak to make it work within a cluster. 

1990 """ 

1991 # NOTE: don't parse the response in this function -- it could pull a 

1992 # legitimate message off the stack if the connection is already 

1993 # subscribed to one or more channels 

1994 

1995 if self.connection is None: 

1996 if self.connection_pool is None: 

1997 if len(args) > 1: 

1998 # Hash the first channel and get one of the nodes holding 

1999 # this slot 

2000 channel = args[1] 

2001 slot = self.cluster.keyslot(channel) 

2002 node = self.cluster.nodes_manager.get_node_from_slot( 

2003 slot, 

2004 self.cluster.read_from_replicas, 

2005 self.cluster.load_balancing_strategy, 

2006 ) 

2007 else: 

2008 # Get a random node 

2009 node = self.cluster.get_random_node() 

2010 self.node = node 

2011 redis_connection = self.cluster.get_redis_connection(node) 

2012 self.connection_pool = redis_connection.connection_pool 

2013 self.connection = self.connection_pool.get_connection() 

2014 # register a callback that re-subscribes to any channels we 

2015 # were listening to when we were disconnected 

2016 self.connection.register_connect_callback(self.on_connect) 

2017 if self.push_handler_func is not None: 

2018 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

2019 self._event_dispatcher.dispatch( 

2020 AfterPubSubConnectionInstantiationEvent( 

2021 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

2022 ) 

2023 ) 

2024 connection = self.connection 

2025 self._execute(connection, connection.send_command, *args) 

2026 

2027 def _get_node_pubsub(self, node): 

2028 try: 

2029 return self.node_pubsub_mapping[node.name] 

2030 except KeyError: 

2031 pubsub = node.redis_connection.pubsub( 

2032 push_handler_func=self.push_handler_func 

2033 ) 

2034 self.node_pubsub_mapping[node.name] = pubsub 

2035 return pubsub 

2036 

2037 def _sharded_message_generator(self): 

2038 for _ in range(len(self.node_pubsub_mapping)): 

2039 pubsub = next(self._pubsubs_generator) 

2040 message = pubsub.get_message() 

2041 if message is not None: 

2042 return message 

2043 return None 

2044 

2045 def _pubsubs_generator(self): 

2046 while True: 

2047 yield from self.node_pubsub_mapping.values() 

2048 

2049 def get_sharded_message( 

2050 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None 

2051 ): 

2052 if target_node: 

2053 message = self.node_pubsub_mapping[target_node.name].get_message( 

2054 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout 

2055 ) 

2056 else: 

2057 message = self._sharded_message_generator() 

2058 if message is None: 

2059 return None 

2060 elif str_if_bytes(message["type"]) == "sunsubscribe": 

2061 if message["channel"] in self.pending_unsubscribe_shard_channels: 

2062 self.pending_unsubscribe_shard_channels.remove(message["channel"]) 

2063 self.shard_channels.pop(message["channel"], None) 

2064 node = self.cluster.get_node_from_key(message["channel"]) 

2065 if self.node_pubsub_mapping[node.name].subscribed is False: 

2066 self.node_pubsub_mapping.pop(node.name) 

2067 if not self.channels and not self.patterns and not self.shard_channels: 

2068 # There are no subscriptions anymore, set subscribed_event flag 

2069 # to false 

2070 self.subscribed_event.clear() 

2071 if self.ignore_subscribe_messages or ignore_subscribe_messages: 

2072 return None 

2073 return message 

2074 

2075 def ssubscribe(self, *args, **kwargs): 

2076 if args: 

2077 args = list_or_args(args[0], args[1:]) 

2078 s_channels = dict.fromkeys(args) 

2079 s_channels.update(kwargs) 

2080 for s_channel, handler in s_channels.items(): 

2081 node = self.cluster.get_node_from_key(s_channel) 

2082 pubsub = self._get_node_pubsub(node) 

2083 if handler: 

2084 pubsub.ssubscribe(**{s_channel: handler}) 

2085 else: 

2086 pubsub.ssubscribe(s_channel) 

2087 self.shard_channels.update(pubsub.shard_channels) 

2088 self.pending_unsubscribe_shard_channels.difference_update( 

2089 self._normalize_keys({s_channel: None}) 

2090 ) 

2091 if pubsub.subscribed and not self.subscribed: 

2092 self.subscribed_event.set() 

2093 self.health_check_response_counter = 0 

2094 

2095 def sunsubscribe(self, *args): 

2096 if args: 

2097 args = list_or_args(args[0], args[1:]) 

2098 else: 

2099 args = self.shard_channels 

2100 

2101 for s_channel in args: 

2102 node = self.cluster.get_node_from_key(s_channel) 

2103 p = self._get_node_pubsub(node) 

2104 p.sunsubscribe(s_channel) 

2105 self.pending_unsubscribe_shard_channels.update( 

2106 p.pending_unsubscribe_shard_channels 

2107 ) 

2108 

2109 def get_redis_connection(self): 

2110 """ 

2111 Get the Redis connection of the pubsub connected node. 

2112 """ 

2113 if self.node is not None: 

2114 return self.node.redis_connection 

2115 

2116 def disconnect(self): 

2117 """ 

2118 Disconnect the pubsub connection. 

2119 """ 

2120 if self.connection: 

2121 self.connection.disconnect() 

2122 for pubsub in self.node_pubsub_mapping.values(): 

2123 pubsub.connection.disconnect() 

2124 

2125 

2126class ClusterPipeline(RedisCluster): 

2127 """ 

2128 Support for Redis pipeline 

2129 in cluster mode 

2130 """ 

2131 

2132 ERRORS_ALLOW_RETRY = ( 

2133 ConnectionError, 

2134 TimeoutError, 

2135 MovedError, 

2136 AskError, 

2137 TryAgainError, 

2138 ) 

2139 

2140 NO_SLOTS_COMMANDS = {"UNWATCH"} 

2141 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

2142 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

2143 

2144 @deprecated_args( 

2145 args_to_warn=[ 

2146 "cluster_error_retry_attempts", 

2147 ], 

2148 reason="Please configure the 'retry' object instead", 

2149 version="6.0.0", 

2150 ) 

2151 def __init__( 

2152 self, 

2153 nodes_manager: "NodesManager", 

2154 commands_parser: "CommandsParser", 

2155 result_callbacks: Optional[Dict[str, Callable]] = None, 

2156 cluster_response_callbacks: Optional[Dict[str, Callable]] = None, 

2157 startup_nodes: Optional[List["ClusterNode"]] = None, 

2158 read_from_replicas: bool = False, 

2159 load_balancing_strategy: Optional[LoadBalancingStrategy] = None, 

2160 cluster_error_retry_attempts: int = 3, 

2161 reinitialize_steps: int = 5, 

2162 retry: Optional[Retry] = None, 

2163 lock=None, 

2164 transaction=False, 

2165 **kwargs, 

2166 ): 

2167 """ """ 

2168 self.command_stack = [] 

2169 self.nodes_manager = nodes_manager 

2170 self.commands_parser = commands_parser 

2171 self.refresh_table_asap = False 

2172 self.result_callbacks = ( 

2173 result_callbacks or self.__class__.RESULT_CALLBACKS.copy() 

2174 ) 

2175 self.startup_nodes = startup_nodes if startup_nodes else [] 

2176 self.read_from_replicas = read_from_replicas 

2177 self.load_balancing_strategy = load_balancing_strategy 

2178 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

2179 self.cluster_response_callbacks = cluster_response_callbacks 

2180 self.reinitialize_counter = 0 

2181 self.reinitialize_steps = reinitialize_steps 

2182 if retry is not None: 

2183 self.retry = retry 

2184 else: 

2185 self.retry = Retry( 

2186 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

2187 retries=cluster_error_retry_attempts, 

2188 ) 

2189 

2190 self.encoder = Encoder( 

2191 kwargs.get("encoding", "utf-8"), 

2192 kwargs.get("encoding_errors", "strict"), 

2193 kwargs.get("decode_responses", False), 

2194 ) 

2195 if lock is None: 

2196 lock = threading.RLock() 

2197 self._lock = lock 

2198 self.parent_execute_command = super().execute_command 

2199 self._execution_strategy: ExecutionStrategy = ( 

2200 PipelineStrategy(self) if not transaction else TransactionStrategy(self) 

2201 ) 

2202 

2203 def __repr__(self): 

2204 """ """ 

2205 return f"{type(self).__name__}" 

2206 

2207 def __enter__(self): 

2208 """ """ 

2209 return self 

2210 

2211 def __exit__(self, exc_type, exc_value, traceback): 

2212 """ """ 

2213 self.reset() 

2214 

2215 def __del__(self): 

2216 try: 

2217 self.reset() 

2218 except Exception: 

2219 pass 

2220 

2221 def __len__(self): 

2222 """ """ 

2223 return len(self._execution_strategy.command_queue) 

2224 

2225 def __bool__(self): 

2226 "Pipeline instances should always evaluate to True on Python 3+" 

2227 return True 

2228 

2229 def execute_command(self, *args, **kwargs): 

2230 """ 

2231 Wrapper function for pipeline_execute_command 

2232 """ 

2233 return self._execution_strategy.execute_command(*args, **kwargs) 

2234 

2235 def pipeline_execute_command(self, *args, **options): 

2236 """ 

2237 Stage a command to be executed when execute() is next called 

2238 

2239 Returns the current Pipeline object back so commands can be 

2240 chained together, such as: 

2241 

2242 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

2243 

2244 At some other point, you can then run: pipe.execute(), 

2245 which will execute all commands queued in the pipe. 

2246 """ 

2247 return self._execution_strategy.execute_command(*args, **options) 

2248 

2249 def annotate_exception(self, exception, number, command): 

2250 """ 

2251 Provides extra context to the exception prior to it being handled 

2252 """ 

2253 self._execution_strategy.annotate_exception(exception, number, command) 

2254 

2255 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2256 """ 

2257 Execute all the commands in the current pipeline 

2258 """ 

2259 

2260 try: 

2261 return self._execution_strategy.execute(raise_on_error) 

2262 finally: 

2263 self.reset() 

2264 

2265 def reset(self): 

2266 """ 

2267 Reset back to empty pipeline. 

2268 """ 

2269 self._execution_strategy.reset() 

2270 

2271 def send_cluster_commands( 

2272 self, stack, raise_on_error=True, allow_redirections=True 

2273 ): 

2274 return self._execution_strategy.send_cluster_commands( 

2275 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections 

2276 ) 

2277 

2278 def exists(self, *keys): 

2279 return self._execution_strategy.exists(*keys) 

2280 

2281 def eval(self): 

2282 """ """ 

2283 return self._execution_strategy.eval() 

2284 

2285 def multi(self): 

2286 """ 

2287 Start a transactional block of the pipeline after WATCH commands 

2288 are issued. End the transactional block with `execute`. 

2289 """ 

2290 self._execution_strategy.multi() 

2291 

2292 def load_scripts(self): 

2293 """ """ 

2294 self._execution_strategy.load_scripts() 

2295 

2296 def discard(self): 

2297 """ """ 

2298 self._execution_strategy.discard() 

2299 

2300 def watch(self, *names): 

2301 """Watches the values at keys ``names``""" 

2302 self._execution_strategy.watch(*names) 

2303 

2304 def unwatch(self): 

2305 """Unwatches all previously specified keys""" 

2306 self._execution_strategy.unwatch() 

2307 

2308 def script_load_for_pipeline(self, *args, **kwargs): 

2309 self._execution_strategy.script_load_for_pipeline(*args, **kwargs) 

2310 

2311 def delete(self, *names): 

2312 self._execution_strategy.delete(*names) 

2313 

2314 def unlink(self, *names): 

2315 self._execution_strategy.unlink(*names) 

2316 

2317 

2318def block_pipeline_command(name: str) -> Callable[..., Any]: 

2319 """ 

2320 Prints error because some pipelined commands should 

2321 be blocked when running in cluster-mode 

2322 """ 

2323 

2324 def inner(*args, **kwargs): 

2325 raise RedisClusterException( 

2326 f"ERROR: Calling pipelined function {name} is blocked " 

2327 f"when running redis in cluster mode..." 

2328 ) 

2329 

2330 return inner 

2331 

2332 

2333# Blocked pipeline commands 

2334PIPELINE_BLOCKED_COMMANDS = ( 

2335 "BGREWRITEAOF", 

2336 "BGSAVE", 

2337 "BITOP", 

2338 "BRPOPLPUSH", 

2339 "CLIENT GETNAME", 

2340 "CLIENT KILL", 

2341 "CLIENT LIST", 

2342 "CLIENT SETNAME", 

2343 "CLIENT", 

2344 "CONFIG GET", 

2345 "CONFIG RESETSTAT", 

2346 "CONFIG REWRITE", 

2347 "CONFIG SET", 

2348 "CONFIG", 

2349 "DBSIZE", 

2350 "ECHO", 

2351 "EVALSHA", 

2352 "FLUSHALL", 

2353 "FLUSHDB", 

2354 "INFO", 

2355 "KEYS", 

2356 "LASTSAVE", 

2357 "MGET", 

2358 "MGET NONATOMIC", 

2359 "MOVE", 

2360 "MSET", 

2361 "MSET NONATOMIC", 

2362 "MSETNX", 

2363 "PFCOUNT", 

2364 "PFMERGE", 

2365 "PING", 

2366 "PUBLISH", 

2367 "RANDOMKEY", 

2368 "READONLY", 

2369 "READWRITE", 

2370 "RENAME", 

2371 "RENAMENX", 

2372 "RPOPLPUSH", 

2373 "SAVE", 

2374 "SCAN", 

2375 "SCRIPT EXISTS", 

2376 "SCRIPT FLUSH", 

2377 "SCRIPT KILL", 

2378 "SCRIPT LOAD", 

2379 "SCRIPT", 

2380 "SDIFF", 

2381 "SDIFFSTORE", 

2382 "SENTINEL GET MASTER ADDR BY NAME", 

2383 "SENTINEL MASTER", 

2384 "SENTINEL MASTERS", 

2385 "SENTINEL MONITOR", 

2386 "SENTINEL REMOVE", 

2387 "SENTINEL SENTINELS", 

2388 "SENTINEL SET", 

2389 "SENTINEL SLAVES", 

2390 "SENTINEL", 

2391 "SHUTDOWN", 

2392 "SINTER", 

2393 "SINTERSTORE", 

2394 "SLAVEOF", 

2395 "SLOWLOG GET", 

2396 "SLOWLOG LEN", 

2397 "SLOWLOG RESET", 

2398 "SLOWLOG", 

2399 "SMOVE", 

2400 "SORT", 

2401 "SUNION", 

2402 "SUNIONSTORE", 

2403 "TIME", 

2404) 

2405for command in PIPELINE_BLOCKED_COMMANDS: 

2406 command = command.replace(" ", "_").lower() 

2407 

2408 setattr(ClusterPipeline, command, block_pipeline_command(command)) 

2409 

2410 

2411class PipelineCommand: 

2412 """ """ 

2413 

2414 def __init__(self, args, options=None, position=None): 

2415 self.args = args 

2416 if options is None: 

2417 options = {} 

2418 self.options = options 

2419 self.position = position 

2420 self.result = None 

2421 self.node = None 

2422 self.asking = False 

2423 

2424 

2425class NodeCommands: 

2426 """ """ 

2427 

2428 def __init__(self, parse_response, connection_pool, connection): 

2429 """ """ 

2430 self.parse_response = parse_response 

2431 self.connection_pool = connection_pool 

2432 self.connection = connection 

2433 self.commands = [] 

2434 

2435 def append(self, c): 

2436 """ """ 

2437 self.commands.append(c) 

2438 

2439 def write(self): 

2440 """ 

2441 Code borrowed from Redis so it can be fixed 

2442 """ 

2443 connection = self.connection 

2444 commands = self.commands 

2445 

2446 # We are going to clobber the commands with the write, so go ahead 

2447 # and ensure that nothing is sitting there from a previous run. 

2448 for c in commands: 

2449 c.result = None 

2450 

2451 # build up all commands into a single request to increase network perf 

2452 # send all the commands and catch connection and timeout errors. 

2453 try: 

2454 connection.send_packed_command( 

2455 connection.pack_commands([c.args for c in commands]) 

2456 ) 

2457 except (ConnectionError, TimeoutError) as e: 

2458 for c in commands: 

2459 c.result = e 

2460 

2461 def read(self): 

2462 """ """ 

2463 connection = self.connection 

2464 for c in self.commands: 

2465 # if there is a result on this command, 

2466 # it means we ran into an exception 

2467 # like a connection error. Trying to parse 

2468 # a response on a connection that 

2469 # is no longer open will result in a 

2470 # connection error raised by redis-py. 

2471 # but redis-py doesn't check in parse_response 

2472 # that the sock object is 

2473 # still set and if you try to 

2474 # read from a closed connection, it will 

2475 # result in an AttributeError because 

2476 # it will do a readline() call on None. 

2477 # This can have all kinds of nasty side-effects. 

2478 # Treating this case as a connection error 

2479 # is fine because it will dump 

2480 # the connection object back into the 

2481 # pool and on the next write, it will 

2482 # explicitly open the connection and all will be well. 

2483 if c.result is None: 

2484 try: 

2485 c.result = self.parse_response(connection, c.args[0], **c.options) 

2486 except (ConnectionError, TimeoutError) as e: 

2487 for c in self.commands: 

2488 c.result = e 

2489 return 

2490 except RedisError: 

2491 c.result = sys.exc_info()[1] 

2492 

2493 

2494class ExecutionStrategy(ABC): 

2495 @property 

2496 @abstractmethod 

2497 def command_queue(self): 

2498 pass 

2499 

2500 @abstractmethod 

2501 def execute_command(self, *args, **kwargs): 

2502 """ 

2503 Execution flow for current execution strategy. 

2504 

2505 See: ClusterPipeline.execute_command() 

2506 """ 

2507 pass 

2508 

2509 @abstractmethod 

2510 def annotate_exception(self, exception, number, command): 

2511 """ 

2512 Annotate exception according to current execution strategy. 

2513 

2514 See: ClusterPipeline.annotate_exception() 

2515 """ 

2516 pass 

2517 

2518 @abstractmethod 

2519 def pipeline_execute_command(self, *args, **options): 

2520 """ 

2521 Pipeline execution flow for current execution strategy. 

2522 

2523 See: ClusterPipeline.pipeline_execute_command() 

2524 """ 

2525 pass 

2526 

2527 @abstractmethod 

2528 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2529 """ 

2530 Executes current execution strategy. 

2531 

2532 See: ClusterPipeline.execute() 

2533 """ 

2534 pass 

2535 

2536 @abstractmethod 

2537 def send_cluster_commands( 

2538 self, stack, raise_on_error=True, allow_redirections=True 

2539 ): 

2540 """ 

2541 Sends commands according to current execution strategy. 

2542 

2543 See: ClusterPipeline.send_cluster_commands() 

2544 """ 

2545 pass 

2546 

2547 @abstractmethod 

2548 def reset(self): 

2549 """ 

2550 Resets current execution strategy. 

2551 

2552 See: ClusterPipeline.reset() 

2553 """ 

2554 pass 

2555 

2556 @abstractmethod 

2557 def exists(self, *keys): 

2558 pass 

2559 

2560 @abstractmethod 

2561 def eval(self): 

2562 pass 

2563 

2564 @abstractmethod 

2565 def multi(self): 

2566 """ 

2567 Starts transactional context. 

2568 

2569 See: ClusterPipeline.multi() 

2570 """ 

2571 pass 

2572 

2573 @abstractmethod 

2574 def load_scripts(self): 

2575 pass 

2576 

2577 @abstractmethod 

2578 def watch(self, *names): 

2579 pass 

2580 

2581 @abstractmethod 

2582 def unwatch(self): 

2583 """ 

2584 Unwatches all previously specified keys 

2585 

2586 See: ClusterPipeline.unwatch() 

2587 """ 

2588 pass 

2589 

2590 @abstractmethod 

2591 def script_load_for_pipeline(self, *args, **kwargs): 

2592 pass 

2593 

2594 @abstractmethod 

2595 def delete(self, *names): 

2596 """ 

2597 "Delete a key specified by ``names``" 

2598 

2599 See: ClusterPipeline.delete() 

2600 """ 

2601 pass 

2602 

2603 @abstractmethod 

2604 def unlink(self, *names): 

2605 """ 

2606 "Unlink a key specified by ``names``" 

2607 

2608 See: ClusterPipeline.unlink() 

2609 """ 

2610 pass 

2611 

2612 @abstractmethod 

2613 def discard(self): 

2614 pass 

2615 

2616 

2617class AbstractStrategy(ExecutionStrategy): 

2618 def __init__( 

2619 self, 

2620 pipe: ClusterPipeline, 

2621 ): 

2622 self._command_queue: List[PipelineCommand] = [] 

2623 self._pipe = pipe 

2624 self._nodes_manager = self._pipe.nodes_manager 

2625 

2626 @property 

2627 def command_queue(self): 

2628 return self._command_queue 

2629 

2630 @command_queue.setter 

2631 def command_queue(self, queue: List[PipelineCommand]): 

2632 self._command_queue = queue 

2633 

2634 @abstractmethod 

2635 def execute_command(self, *args, **kwargs): 

2636 pass 

2637 

2638 def pipeline_execute_command(self, *args, **options): 

2639 self._command_queue.append( 

2640 PipelineCommand(args, options, len(self._command_queue)) 

2641 ) 

2642 return self._pipe 

2643 

2644 @abstractmethod 

2645 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2646 pass 

2647 

2648 @abstractmethod 

2649 def send_cluster_commands( 

2650 self, stack, raise_on_error=True, allow_redirections=True 

2651 ): 

2652 pass 

2653 

2654 @abstractmethod 

2655 def reset(self): 

2656 pass 

2657 

2658 def exists(self, *keys): 

2659 return self.execute_command("EXISTS", *keys) 

2660 

2661 def eval(self): 

2662 """ """ 

2663 raise RedisClusterException("method eval() is not implemented") 

2664 

2665 def load_scripts(self): 

2666 """ """ 

2667 raise RedisClusterException("method load_scripts() is not implemented") 

2668 

2669 def script_load_for_pipeline(self, *args, **kwargs): 

2670 """ """ 

2671 raise RedisClusterException( 

2672 "method script_load_for_pipeline() is not implemented" 

2673 ) 

2674 

2675 def annotate_exception(self, exception, number, command): 

2676 """ 

2677 Provides extra context to the exception prior to it being handled 

2678 """ 

2679 cmd = " ".join(map(safe_str, command)) 

2680 msg = ( 

2681 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

2682 f"caused error: {exception.args[0]}" 

2683 ) 

2684 exception.args = (msg,) + exception.args[1:] 

2685 

2686 

2687class PipelineStrategy(AbstractStrategy): 

2688 def __init__(self, pipe: ClusterPipeline): 

2689 super().__init__(pipe) 

2690 self.command_flags = pipe.command_flags 

2691 

2692 def execute_command(self, *args, **kwargs): 

2693 return self.pipeline_execute_command(*args, **kwargs) 

2694 

2695 def _raise_first_error(self, stack): 

2696 """ 

2697 Raise the first exception on the stack 

2698 """ 

2699 for c in stack: 

2700 r = c.result 

2701 if isinstance(r, Exception): 

2702 self.annotate_exception(r, c.position + 1, c.args) 

2703 raise r 

2704 

2705 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2706 stack = self._command_queue 

2707 if not stack: 

2708 return [] 

2709 

2710 try: 

2711 return self.send_cluster_commands(stack, raise_on_error) 

2712 finally: 

2713 self.reset() 

2714 

2715 def reset(self): 

2716 """ 

2717 Reset back to empty pipeline. 

2718 """ 

2719 self._command_queue = [] 

2720 

2721 def send_cluster_commands( 

2722 self, stack, raise_on_error=True, allow_redirections=True 

2723 ): 

2724 """ 

2725 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling. 

2726 

2727 If one of the retryable exceptions has been thrown we assume that: 

2728 - connection_pool was disconnected 

2729 - connection_pool was reset 

2730 - refresh_table_asap set to True 

2731 

2732 It will try the number of times specified by 

2733 the retries in config option "self.retry" 

2734 which defaults to 3 unless manually configured. 

2735 

2736 If it reaches the number of times, the command will 

2737 raises ClusterDownException. 

2738 """ 

2739 if not stack: 

2740 return [] 

2741 retry_attempts = self._pipe.retry.get_retries() 

2742 while True: 

2743 try: 

2744 return self._send_cluster_commands( 

2745 stack, 

2746 raise_on_error=raise_on_error, 

2747 allow_redirections=allow_redirections, 

2748 ) 

2749 except RedisCluster.ERRORS_ALLOW_RETRY as e: 

2750 if retry_attempts > 0: 

2751 # Try again with the new cluster setup. All other errors 

2752 # should be raised. 

2753 retry_attempts -= 1 

2754 pass 

2755 else: 

2756 raise e 

2757 

2758 def _send_cluster_commands( 

2759 self, stack, raise_on_error=True, allow_redirections=True 

2760 ): 

2761 """ 

2762 Send a bunch of cluster commands to the redis cluster. 

2763 

2764 `allow_redirections` If the pipeline should follow 

2765 `ASK` & `MOVED` responses automatically. If set 

2766 to false it will raise RedisClusterException. 

2767 """ 

2768 # the first time sending the commands we send all of 

2769 # the commands that were queued up. 

2770 # if we have to run through it again, we only retry 

2771 # the commands that failed. 

2772 attempt = sorted(stack, key=lambda x: x.position) 

2773 is_default_node = False 

2774 # build a list of node objects based on node names we need to 

2775 nodes = {} 

2776 

2777 # as we move through each command that still needs to be processed, 

2778 # we figure out the slot number that command maps to, then from 

2779 # the slot determine the node. 

2780 for c in attempt: 

2781 while True: 

2782 # refer to our internal node -> slot table that 

2783 # tells us where a given command should route to. 

2784 # (it might be possible we have a cached node that no longer 

2785 # exists in the cluster, which is why we do this in a loop) 

2786 passed_targets = c.options.pop("target_nodes", None) 

2787 if passed_targets and not self._is_nodes_flag(passed_targets): 

2788 target_nodes = self._parse_target_nodes(passed_targets) 

2789 else: 

2790 target_nodes = self._determine_nodes( 

2791 *c.args, node_flag=passed_targets 

2792 ) 

2793 if not target_nodes: 

2794 raise RedisClusterException( 

2795 f"No targets were found to execute {c.args} command on" 

2796 ) 

2797 if len(target_nodes) > 1: 

2798 raise RedisClusterException( 

2799 f"Too many targets for command {c.args}" 

2800 ) 

2801 

2802 node = target_nodes[0] 

2803 if node == self._pipe.get_default_node(): 

2804 is_default_node = True 

2805 

2806 # now that we know the name of the node 

2807 # ( it's just a string in the form of host:port ) 

2808 # we can build a list of commands for each node. 

2809 node_name = node.name 

2810 if node_name not in nodes: 

2811 redis_node = self._pipe.get_redis_connection(node) 

2812 try: 

2813 connection = get_connection(redis_node) 

2814 except (ConnectionError, TimeoutError): 

2815 for n in nodes.values(): 

2816 n.connection_pool.release(n.connection) 

2817 # Connection retries are being handled in the node's 

2818 # Retry object. Reinitialize the node -> slot table. 

2819 self._nodes_manager.initialize() 

2820 if is_default_node: 

2821 self._pipe.replace_default_node() 

2822 raise 

2823 nodes[node_name] = NodeCommands( 

2824 redis_node.parse_response, 

2825 redis_node.connection_pool, 

2826 connection, 

2827 ) 

2828 nodes[node_name].append(c) 

2829 break 

2830 

2831 # send the commands in sequence. 

2832 # we write to all the open sockets for each node first, 

2833 # before reading anything 

2834 # this allows us to flush all the requests out across the 

2835 # network 

2836 # so that we can read them from different sockets as they come back. 

2837 # we dont' multiplex on the sockets as they come available, 

2838 # but that shouldn't make too much difference. 

2839 try: 

2840 node_commands = nodes.values() 

2841 for n in node_commands: 

2842 n.write() 

2843 

2844 for n in node_commands: 

2845 n.read() 

2846 finally: 

2847 # release all of the redis connections we allocated earlier 

2848 # back into the connection pool. 

2849 # we used to do this step as part of a try/finally block, 

2850 # but it is really dangerous to 

2851 # release connections back into the pool if for some 

2852 # reason the socket has data still left in it 

2853 # from a previous operation. The write and 

2854 # read operations already have try/catch around them for 

2855 # all known types of errors including connection 

2856 # and socket level errors. 

2857 # So if we hit an exception, something really bad 

2858 # happened and putting any oF 

2859 # these connections back into the pool is a very bad idea. 

2860 # the socket might have unread buffer still sitting in it, 

2861 # and then the next time we read from it we pass the 

2862 # buffered result back from a previous command and 

2863 # every single request after to that connection will always get 

2864 # a mismatched result. 

2865 for n in nodes.values(): 

2866 n.connection_pool.release(n.connection) 

2867 

2868 # if the response isn't an exception it is a 

2869 # valid response from the node 

2870 # we're all done with that command, YAY! 

2871 # if we have more commands to attempt, we've run into problems. 

2872 # collect all the commands we are allowed to retry. 

2873 # (MOVED, ASK, or connection errors or timeout errors) 

2874 attempt = sorted( 

2875 ( 

2876 c 

2877 for c in attempt 

2878 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

2879 ), 

2880 key=lambda x: x.position, 

2881 ) 

2882 if attempt and allow_redirections: 

2883 # RETRY MAGIC HAPPENS HERE! 

2884 # send these remaining commands one at a time using `execute_command` 

2885 # in the main client. This keeps our retry logic 

2886 # in one place mostly, 

2887 # and allows us to be more confident in correctness of behavior. 

2888 # at this point any speed gains from pipelining have been lost 

2889 # anyway, so we might as well make the best 

2890 # attempt to get the correct behavior. 

2891 # 

2892 # The client command will handle retries for each 

2893 # individual command sequentially as we pass each 

2894 # one into `execute_command`. Any exceptions 

2895 # that bubble out should only appear once all 

2896 # retries have been exhausted. 

2897 # 

2898 # If a lot of commands have failed, we'll be setting the 

2899 # flag to rebuild the slots table from scratch. 

2900 # So MOVED errors should correct themselves fairly quickly. 

2901 self._pipe.reinitialize_counter += 1 

2902 if self._pipe._should_reinitialized(): 

2903 self._nodes_manager.initialize() 

2904 if is_default_node: 

2905 self._pipe.replace_default_node() 

2906 for c in attempt: 

2907 try: 

2908 # send each command individually like we 

2909 # do in the main client. 

2910 c.result = self._pipe.parent_execute_command(*c.args, **c.options) 

2911 except RedisError as e: 

2912 c.result = e 

2913 

2914 # turn the response back into a simple flat array that corresponds 

2915 # to the sequence of commands issued in the stack in pipeline.execute() 

2916 response = [] 

2917 for c in sorted(stack, key=lambda x: x.position): 

2918 if c.args[0] in self._pipe.cluster_response_callbacks: 

2919 # Remove keys entry, it needs only for cache. 

2920 c.options.pop("keys", None) 

2921 c.result = self._pipe.cluster_response_callbacks[c.args[0]]( 

2922 c.result, **c.options 

2923 ) 

2924 response.append(c.result) 

2925 

2926 if raise_on_error: 

2927 self._raise_first_error(stack) 

2928 

2929 return response 

2930 

2931 def _is_nodes_flag(self, target_nodes): 

2932 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags 

2933 

2934 def _parse_target_nodes(self, target_nodes): 

2935 if isinstance(target_nodes, list): 

2936 nodes = target_nodes 

2937 elif isinstance(target_nodes, ClusterNode): 

2938 # Supports passing a single ClusterNode as a variable 

2939 nodes = [target_nodes] 

2940 elif isinstance(target_nodes, dict): 

2941 # Supports dictionaries of the format {node_name: node}. 

2942 # It enables to execute commands with multi nodes as follows: 

2943 # rc.cluster_save_config(rc.get_primaries()) 

2944 nodes = target_nodes.values() 

2945 else: 

2946 raise TypeError( 

2947 "target_nodes type can be one of the following: " 

2948 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

2949 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

2950 f"The passed type is {type(target_nodes)}" 

2951 ) 

2952 return nodes 

2953 

2954 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: 

2955 # Determine which nodes should be executed the command on. 

2956 # Returns a list of target nodes. 

2957 command = args[0].upper() 

2958 if ( 

2959 len(args) >= 2 

2960 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags 

2961 ): 

2962 command = f"{args[0]} {args[1]}".upper() 

2963 

2964 nodes_flag = kwargs.pop("nodes_flag", None) 

2965 if nodes_flag is not None: 

2966 # nodes flag passed by the user 

2967 command_flag = nodes_flag 

2968 else: 

2969 # get the nodes group for this command if it was predefined 

2970 command_flag = self._pipe.command_flags.get(command) 

2971 if command_flag == self._pipe.RANDOM: 

2972 # return a random node 

2973 return [self._pipe.get_random_node()] 

2974 elif command_flag == self._pipe.PRIMARIES: 

2975 # return all primaries 

2976 return self._pipe.get_primaries() 

2977 elif command_flag == self._pipe.REPLICAS: 

2978 # return all replicas 

2979 return self._pipe.get_replicas() 

2980 elif command_flag == self._pipe.ALL_NODES: 

2981 # return all nodes 

2982 return self._pipe.get_nodes() 

2983 elif command_flag == self._pipe.DEFAULT_NODE: 

2984 # return the cluster's default node 

2985 return [self._nodes_manager.default_node] 

2986 elif command in self._pipe.SEARCH_COMMANDS[0]: 

2987 return [self._nodes_manager.default_node] 

2988 else: 

2989 # get the node that holds the key's slot 

2990 slot = self._pipe.determine_slot(*args) 

2991 node = self._nodes_manager.get_node_from_slot( 

2992 slot, 

2993 self._pipe.read_from_replicas and command in READ_COMMANDS, 

2994 self._pipe.load_balancing_strategy 

2995 if command in READ_COMMANDS 

2996 else None, 

2997 ) 

2998 return [node] 

2999 

3000 def multi(self): 

3001 raise RedisClusterException( 

3002 "method multi() is not supported outside of transactional context" 

3003 ) 

3004 

3005 def discard(self): 

3006 raise RedisClusterException( 

3007 "method discard() is not supported outside of transactional context" 

3008 ) 

3009 

3010 def watch(self, *names): 

3011 raise RedisClusterException( 

3012 "method watch() is not supported outside of transactional context" 

3013 ) 

3014 

3015 def unwatch(self, *names): 

3016 raise RedisClusterException( 

3017 "method unwatch() is not supported outside of transactional context" 

3018 ) 

3019 

3020 def delete(self, *names): 

3021 if len(names) != 1: 

3022 raise RedisClusterException( 

3023 "deleting multiple keys is not implemented in pipeline command" 

3024 ) 

3025 

3026 return self.execute_command("DEL", names[0]) 

3027 

3028 def unlink(self, *names): 

3029 if len(names) != 1: 

3030 raise RedisClusterException( 

3031 "unlinking multiple keys is not implemented in pipeline command" 

3032 ) 

3033 

3034 return self.execute_command("UNLINK", names[0]) 

3035 

3036 

3037class TransactionStrategy(AbstractStrategy): 

3038 NO_SLOTS_COMMANDS = {"UNWATCH"} 

3039 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

3040 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

3041 SLOT_REDIRECT_ERRORS = (AskError, MovedError) 

3042 CONNECTION_ERRORS = ( 

3043 ConnectionError, 

3044 OSError, 

3045 ClusterDownError, 

3046 SlotNotCoveredError, 

3047 ) 

3048 

3049 def __init__(self, pipe: ClusterPipeline): 

3050 super().__init__(pipe) 

3051 self._explicit_transaction = False 

3052 self._watching = False 

3053 self._pipeline_slots: Set[int] = set() 

3054 self._transaction_connection: Optional[Connection] = None 

3055 self._executing = False 

3056 self._retry = copy(self._pipe.retry) 

3057 self._retry.update_supported_errors( 

3058 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS 

3059 ) 

3060 

3061 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]: 

3062 """ 

3063 Find a connection for a pipeline transaction. 

3064 

3065 For running an atomic transaction, watch keys ensure that contents have not been 

3066 altered as long as the watch commands for those keys were sent over the same 

3067 connection. So once we start watching a key, we fetch a connection to the 

3068 node that owns that slot and reuse it. 

3069 """ 

3070 if not self._pipeline_slots: 

3071 raise RedisClusterException( 

3072 "At least a command with a key is needed to identify a node" 

3073 ) 

3074 

3075 node: ClusterNode = self._nodes_manager.get_node_from_slot( 

3076 list(self._pipeline_slots)[0], False 

3077 ) 

3078 redis_node: Redis = self._pipe.get_redis_connection(node) 

3079 if self._transaction_connection: 

3080 if not redis_node.connection_pool.owns_connection( 

3081 self._transaction_connection 

3082 ): 

3083 previous_node = self._nodes_manager.find_connection_owner( 

3084 self._transaction_connection 

3085 ) 

3086 previous_node.connection_pool.release(self._transaction_connection) 

3087 self._transaction_connection = None 

3088 

3089 if not self._transaction_connection: 

3090 self._transaction_connection = get_connection(redis_node) 

3091 

3092 return redis_node, self._transaction_connection 

3093 

3094 def execute_command(self, *args, **kwargs): 

3095 slot_number: Optional[int] = None 

3096 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS: 

3097 slot_number = self._pipe.determine_slot(*args) 

3098 

3099 if ( 

3100 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS 

3101 ) and not self._explicit_transaction: 

3102 if args[0] == "WATCH": 

3103 self._validate_watch() 

3104 

3105 if slot_number is not None: 

3106 if self._pipeline_slots and slot_number not in self._pipeline_slots: 

3107 raise CrossSlotTransactionError( 

3108 "Cannot watch or send commands on different slots" 

3109 ) 

3110 

3111 self._pipeline_slots.add(slot_number) 

3112 elif args[0] not in self.NO_SLOTS_COMMANDS: 

3113 raise RedisClusterException( 

3114 f"Cannot identify slot number for command: {args[0]}," 

3115 "it cannot be triggered in a transaction" 

3116 ) 

3117 

3118 return self._immediate_execute_command(*args, **kwargs) 

3119 else: 

3120 if slot_number is not None: 

3121 self._pipeline_slots.add(slot_number) 

3122 

3123 return self.pipeline_execute_command(*args, **kwargs) 

3124 

3125 def _validate_watch(self): 

3126 if self._explicit_transaction: 

3127 raise RedisError("Cannot issue a WATCH after a MULTI") 

3128 

3129 self._watching = True 

3130 

3131 def _immediate_execute_command(self, *args, **options): 

3132 return self._retry.call_with_retry( 

3133 lambda: self._get_connection_and_send_command(*args, **options), 

3134 self._reinitialize_on_error, 

3135 ) 

3136 

3137 def _get_connection_and_send_command(self, *args, **options): 

3138 redis_node, connection = self._get_client_and_connection_for_transaction() 

3139 return self._send_command_parse_response( 

3140 connection, redis_node, args[0], *args, **options 

3141 ) 

3142 

3143 def _send_command_parse_response( 

3144 self, conn, redis_node: Redis, command_name, *args, **options 

3145 ): 

3146 """ 

3147 Send a command and parse the response 

3148 """ 

3149 

3150 conn.send_command(*args) 

3151 output = redis_node.parse_response(conn, command_name, **options) 

3152 

3153 if command_name in self.UNWATCH_COMMANDS: 

3154 self._watching = False 

3155 return output 

3156 

3157 def _reinitialize_on_error(self, error): 

3158 if self._watching: 

3159 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: 

3160 raise WatchError("Slot rebalancing occurred while watching keys") 

3161 

3162 if ( 

3163 type(error) in self.SLOT_REDIRECT_ERRORS 

3164 or type(error) in self.CONNECTION_ERRORS 

3165 ): 

3166 if self._transaction_connection: 

3167 self._transaction_connection = None 

3168 

3169 self._pipe.reinitialize_counter += 1 

3170 if self._pipe._should_reinitialized(): 

3171 self._nodes_manager.initialize() 

3172 self.reinitialize_counter = 0 

3173 else: 

3174 if isinstance(error, AskError): 

3175 self._nodes_manager.update_moved_exception(error) 

3176 

3177 self._executing = False 

3178 

3179 def _raise_first_error(self, responses, stack): 

3180 """ 

3181 Raise the first exception on the stack 

3182 """ 

3183 for r, cmd in zip(responses, stack): 

3184 if isinstance(r, Exception): 

3185 self.annotate_exception(r, cmd.position + 1, cmd.args) 

3186 raise r 

3187 

3188 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3189 stack = self._command_queue 

3190 if not stack and (not self._watching or not self._pipeline_slots): 

3191 return [] 

3192 

3193 return self._execute_transaction_with_retries(stack, raise_on_error) 

3194 

3195 def _execute_transaction_with_retries( 

3196 self, stack: List["PipelineCommand"], raise_on_error: bool 

3197 ): 

3198 return self._retry.call_with_retry( 

3199 lambda: self._execute_transaction(stack, raise_on_error), 

3200 self._reinitialize_on_error, 

3201 ) 

3202 

3203 def _execute_transaction( 

3204 self, stack: List["PipelineCommand"], raise_on_error: bool 

3205 ): 

3206 if len(self._pipeline_slots) > 1: 

3207 raise CrossSlotTransactionError( 

3208 "All keys involved in a cluster transaction must map to the same slot" 

3209 ) 

3210 

3211 self._executing = True 

3212 

3213 redis_node, connection = self._get_client_and_connection_for_transaction() 

3214 

3215 stack = chain( 

3216 [PipelineCommand(("MULTI",))], 

3217 stack, 

3218 [PipelineCommand(("EXEC",))], 

3219 ) 

3220 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options] 

3221 packed_commands = connection.pack_commands(commands) 

3222 connection.send_packed_command(packed_commands) 

3223 errors = [] 

3224 

3225 # parse off the response for MULTI 

3226 # NOTE: we need to handle ResponseErrors here and continue 

3227 # so that we read all the additional command messages from 

3228 # the socket 

3229 try: 

3230 redis_node.parse_response(connection, "MULTI") 

3231 except ResponseError as e: 

3232 self.annotate_exception(e, 0, "MULTI") 

3233 errors.append(e) 

3234 except self.CONNECTION_ERRORS as cluster_error: 

3235 self.annotate_exception(cluster_error, 0, "MULTI") 

3236 raise 

3237 

3238 # and all the other commands 

3239 for i, command in enumerate(self._command_queue): 

3240 if EMPTY_RESPONSE in command.options: 

3241 errors.append((i, command.options[EMPTY_RESPONSE])) 

3242 else: 

3243 try: 

3244 _ = redis_node.parse_response(connection, "_") 

3245 except self.SLOT_REDIRECT_ERRORS as slot_error: 

3246 self.annotate_exception(slot_error, i + 1, command.args) 

3247 errors.append(slot_error) 

3248 except self.CONNECTION_ERRORS as cluster_error: 

3249 self.annotate_exception(cluster_error, i + 1, command.args) 

3250 raise 

3251 except ResponseError as e: 

3252 self.annotate_exception(e, i + 1, command.args) 

3253 errors.append(e) 

3254 

3255 response = None 

3256 # parse the EXEC. 

3257 try: 

3258 response = redis_node.parse_response(connection, "EXEC") 

3259 except ExecAbortError: 

3260 if errors: 

3261 raise errors[0] 

3262 raise 

3263 

3264 self._executing = False 

3265 

3266 # EXEC clears any watched keys 

3267 self._watching = False 

3268 

3269 if response is None: 

3270 raise WatchError("Watched variable changed.") 

3271 

3272 # put any parse errors into the response 

3273 for i, e in errors: 

3274 response.insert(i, e) 

3275 

3276 if len(response) != len(self._command_queue): 

3277 raise InvalidPipelineStack( 

3278 "Unexpected response length for cluster pipeline EXEC." 

3279 " Command stack was {} but response had length {}".format( 

3280 [c.args[0] for c in self._command_queue], len(response) 

3281 ) 

3282 ) 

3283 

3284 # find any errors in the response and raise if necessary 

3285 if raise_on_error or len(errors) > 0: 

3286 self._raise_first_error( 

3287 response, 

3288 self._command_queue, 

3289 ) 

3290 

3291 # We have to run response callbacks manually 

3292 data = [] 

3293 for r, cmd in zip(response, self._command_queue): 

3294 if not isinstance(r, Exception): 

3295 command_name = cmd.args[0] 

3296 if command_name in self._pipe.cluster_response_callbacks: 

3297 r = self._pipe.cluster_response_callbacks[command_name]( 

3298 r, **cmd.options 

3299 ) 

3300 data.append(r) 

3301 return data 

3302 

3303 def reset(self): 

3304 self._command_queue = [] 

3305 

3306 # make sure to reset the connection state in the event that we were 

3307 # watching something 

3308 if self._transaction_connection: 

3309 try: 

3310 if self._watching: 

3311 # call this manually since our unwatch or 

3312 # immediate_execute_command methods can call reset() 

3313 self._transaction_connection.send_command("UNWATCH") 

3314 self._transaction_connection.read_response() 

3315 # we can safely return the connection to the pool here since we're 

3316 # sure we're no longer WATCHing anything 

3317 node = self._nodes_manager.find_connection_owner( 

3318 self._transaction_connection 

3319 ) 

3320 node.redis_connection.connection_pool.release( 

3321 self._transaction_connection 

3322 ) 

3323 self._transaction_connection = None 

3324 except self.CONNECTION_ERRORS: 

3325 # disconnect will also remove any previous WATCHes 

3326 if self._transaction_connection: 

3327 self._transaction_connection.disconnect() 

3328 

3329 # clean up the other instance attributes 

3330 self._watching = False 

3331 self._explicit_transaction = False 

3332 self._pipeline_slots = set() 

3333 self._executing = False 

3334 

3335 def send_cluster_commands( 

3336 self, stack, raise_on_error=True, allow_redirections=True 

3337 ): 

3338 raise NotImplementedError( 

3339 "send_cluster_commands cannot be executed in transactional context." 

3340 ) 

3341 

3342 def multi(self): 

3343 if self._explicit_transaction: 

3344 raise RedisError("Cannot issue nested calls to MULTI") 

3345 if self._command_queue: 

3346 raise RedisError( 

3347 "Commands without an initial WATCH have already been issued" 

3348 ) 

3349 self._explicit_transaction = True 

3350 

3351 def watch(self, *names): 

3352 if self._explicit_transaction: 

3353 raise RedisError("Cannot issue a WATCH after a MULTI") 

3354 

3355 return self.execute_command("WATCH", *names) 

3356 

3357 def unwatch(self): 

3358 if self._watching: 

3359 return self.execute_command("UNWATCH") 

3360 

3361 return True 

3362 

3363 def discard(self): 

3364 self.reset() 

3365 

3366 def delete(self, *names): 

3367 return self.execute_command("DEL", *names) 

3368 

3369 def unlink(self, *names): 

3370 return self.execute_command("UNLINK", *names)