Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1333 statements  

1import random 

2import socket 

3import sys 

4import threading 

5import time 

6from abc import ABC, abstractmethod 

7from collections import OrderedDict 

8from copy import copy 

9from enum import Enum 

10from itertools import chain 

11from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union 

12 

13from redis._parsers import CommandsParser, Encoder 

14from redis._parsers.helpers import parse_scan 

15from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

16from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface 

17from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis 

18from redis.commands import READ_COMMANDS, RedisClusterCommands 

19from redis.commands.helpers import list_or_args 

20from redis.connection import ( 

21 Connection, 

22 ConnectionPool, 

23 parse_url, 

24) 

25from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

26from redis.event import ( 

27 AfterPooledConnectionsInstantiationEvent, 

28 AfterPubSubConnectionInstantiationEvent, 

29 ClientType, 

30 EventDispatcher, 

31) 

32from redis.exceptions import ( 

33 AskError, 

34 AuthenticationError, 

35 ClusterDownError, 

36 ClusterError, 

37 ConnectionError, 

38 CrossSlotTransactionError, 

39 DataError, 

40 ExecAbortError, 

41 InvalidPipelineStack, 

42 MaxConnectionsError, 

43 MovedError, 

44 RedisClusterException, 

45 RedisError, 

46 ResponseError, 

47 SlotNotCoveredError, 

48 TimeoutError, 

49 TryAgainError, 

50 WatchError, 

51) 

52from redis.lock import Lock 

53from redis.retry import Retry 

54from redis.utils import ( 

55 deprecated_args, 

56 dict_merge, 

57 list_keys_to_dict, 

58 merge_result, 

59 safe_str, 

60 str_if_bytes, 

61 truncate_text, 

62) 

63 

64 

65def get_node_name(host: str, port: Union[str, int]) -> str: 

66 return f"{host}:{port}" 

67 

68 

69@deprecated_args( 

70 allowed_args=["redis_node"], 

71 reason="Use get_connection(redis_node) instead", 

72 version="5.3.0", 

73) 

74def get_connection(redis_node: Redis, *args, **options) -> Connection: 

75 return redis_node.connection or redis_node.connection_pool.get_connection() 

76 

77 

78def parse_scan_result(command, res, **options): 

79 cursors = {} 

80 ret = [] 

81 for node_name, response in res.items(): 

82 cursor, r = parse_scan(response, **options) 

83 cursors[node_name] = cursor 

84 ret += r 

85 

86 return cursors, ret 

87 

88 

89def parse_pubsub_numsub(command, res, **options): 

90 numsub_d = OrderedDict() 

91 for numsub_tups in res.values(): 

92 for channel, numsubbed in numsub_tups: 

93 try: 

94 numsub_d[channel] += numsubbed 

95 except KeyError: 

96 numsub_d[channel] = numsubbed 

97 

98 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()] 

99 return ret_numsub 

100 

101 

102def parse_cluster_slots( 

103 resp: Any, **options: Any 

104) -> Dict[Tuple[int, int], Dict[str, Any]]: 

105 current_host = options.get("current_host", "") 

106 

107 def fix_server(*args: Any) -> Tuple[str, Any]: 

108 return str_if_bytes(args[0]) or current_host, args[1] 

109 

110 slots = {} 

111 for slot in resp: 

112 start, end, primary = slot[:3] 

113 replicas = slot[3:] 

114 slots[start, end] = { 

115 "primary": fix_server(*primary), 

116 "replicas": [fix_server(*replica) for replica in replicas], 

117 } 

118 

119 return slots 

120 

121 

122def parse_cluster_shards(resp, **options): 

123 """ 

124 Parse CLUSTER SHARDS response. 

125 """ 

126 if isinstance(resp[0], dict): 

127 return resp 

128 shards = [] 

129 for x in resp: 

130 shard = {"slots": [], "nodes": []} 

131 for i in range(0, len(x[1]), 2): 

132 shard["slots"].append((x[1][i], (x[1][i + 1]))) 

133 nodes = x[3] 

134 for node in nodes: 

135 dict_node = {} 

136 for i in range(0, len(node), 2): 

137 dict_node[node[i]] = node[i + 1] 

138 shard["nodes"].append(dict_node) 

139 shards.append(shard) 

140 

141 return shards 

142 

143 

144def parse_cluster_myshardid(resp, **options): 

145 """ 

146 Parse CLUSTER MYSHARDID response. 

147 """ 

148 return resp.decode("utf-8") 

149 

150 

151PRIMARY = "primary" 

152REPLICA = "replica" 

153SLOT_ID = "slot-id" 

154 

155REDIS_ALLOWED_KEYS = ( 

156 "connection_class", 

157 "connection_pool", 

158 "connection_pool_class", 

159 "client_name", 

160 "credential_provider", 

161 "db", 

162 "decode_responses", 

163 "encoding", 

164 "encoding_errors", 

165 "host", 

166 "lib_name", 

167 "lib_version", 

168 "max_connections", 

169 "nodes_flag", 

170 "redis_connect_func", 

171 "password", 

172 "port", 

173 "timeout", 

174 "queue_class", 

175 "retry", 

176 "retry_on_timeout", 

177 "protocol", 

178 "socket_connect_timeout", 

179 "socket_keepalive", 

180 "socket_keepalive_options", 

181 "socket_timeout", 

182 "ssl", 

183 "ssl_ca_certs", 

184 "ssl_ca_data", 

185 "ssl_certfile", 

186 "ssl_cert_reqs", 

187 "ssl_keyfile", 

188 "ssl_password", 

189 "ssl_check_hostname", 

190 "unix_socket_path", 

191 "username", 

192 "cache", 

193 "cache_config", 

194) 

195KWARGS_DISABLED_KEYS = ("host", "port", "retry") 

196 

197 

198def cleanup_kwargs(**kwargs): 

199 """ 

200 Remove unsupported or disabled keys from kwargs 

201 """ 

202 connection_kwargs = { 

203 k: v 

204 for k, v in kwargs.items() 

205 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS 

206 } 

207 

208 return connection_kwargs 

209 

210 

211class AbstractRedisCluster: 

212 RedisClusterRequestTTL = 16 

213 

214 PRIMARIES = "primaries" 

215 REPLICAS = "replicas" 

216 ALL_NODES = "all" 

217 RANDOM = "random" 

218 DEFAULT_NODE = "default-node" 

219 

220 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE} 

221 

222 COMMAND_FLAGS = dict_merge( 

223 list_keys_to_dict( 

224 [ 

225 "ACL CAT", 

226 "ACL DELUSER", 

227 "ACL DRYRUN", 

228 "ACL GENPASS", 

229 "ACL GETUSER", 

230 "ACL HELP", 

231 "ACL LIST", 

232 "ACL LOG", 

233 "ACL LOAD", 

234 "ACL SAVE", 

235 "ACL SETUSER", 

236 "ACL USERS", 

237 "ACL WHOAMI", 

238 "AUTH", 

239 "CLIENT LIST", 

240 "CLIENT SETINFO", 

241 "CLIENT SETNAME", 

242 "CLIENT GETNAME", 

243 "CONFIG SET", 

244 "CONFIG REWRITE", 

245 "CONFIG RESETSTAT", 

246 "TIME", 

247 "PUBSUB CHANNELS", 

248 "PUBSUB NUMPAT", 

249 "PUBSUB NUMSUB", 

250 "PUBSUB SHARDCHANNELS", 

251 "PUBSUB SHARDNUMSUB", 

252 "PING", 

253 "INFO", 

254 "SHUTDOWN", 

255 "KEYS", 

256 "DBSIZE", 

257 "BGSAVE", 

258 "SLOWLOG GET", 

259 "SLOWLOG LEN", 

260 "SLOWLOG RESET", 

261 "WAIT", 

262 "WAITAOF", 

263 "SAVE", 

264 "MEMORY PURGE", 

265 "MEMORY MALLOC-STATS", 

266 "MEMORY STATS", 

267 "LASTSAVE", 

268 "CLIENT TRACKINGINFO", 

269 "CLIENT PAUSE", 

270 "CLIENT UNPAUSE", 

271 "CLIENT UNBLOCK", 

272 "CLIENT ID", 

273 "CLIENT REPLY", 

274 "CLIENT GETREDIR", 

275 "CLIENT INFO", 

276 "CLIENT KILL", 

277 "READONLY", 

278 "CLUSTER INFO", 

279 "CLUSTER MEET", 

280 "CLUSTER MYSHARDID", 

281 "CLUSTER NODES", 

282 "CLUSTER REPLICAS", 

283 "CLUSTER RESET", 

284 "CLUSTER SET-CONFIG-EPOCH", 

285 "CLUSTER SLOTS", 

286 "CLUSTER SHARDS", 

287 "CLUSTER COUNT-FAILURE-REPORTS", 

288 "CLUSTER KEYSLOT", 

289 "COMMAND", 

290 "COMMAND COUNT", 

291 "COMMAND LIST", 

292 "COMMAND GETKEYS", 

293 "CONFIG GET", 

294 "DEBUG", 

295 "RANDOMKEY", 

296 "READONLY", 

297 "READWRITE", 

298 "TIME", 

299 "TFUNCTION LOAD", 

300 "TFUNCTION DELETE", 

301 "TFUNCTION LIST", 

302 "TFCALL", 

303 "TFCALLASYNC", 

304 "LATENCY HISTORY", 

305 "LATENCY LATEST", 

306 "LATENCY RESET", 

307 "MODULE LIST", 

308 "MODULE LOAD", 

309 "MODULE UNLOAD", 

310 "MODULE LOADEX", 

311 ], 

312 DEFAULT_NODE, 

313 ), 

314 list_keys_to_dict( 

315 [ 

316 "FLUSHALL", 

317 "FLUSHDB", 

318 "FUNCTION DELETE", 

319 "FUNCTION FLUSH", 

320 "FUNCTION LIST", 

321 "FUNCTION LOAD", 

322 "FUNCTION RESTORE", 

323 "SCAN", 

324 "SCRIPT EXISTS", 

325 "SCRIPT FLUSH", 

326 "SCRIPT LOAD", 

327 ], 

328 PRIMARIES, 

329 ), 

330 list_keys_to_dict(["FUNCTION DUMP"], RANDOM), 

331 list_keys_to_dict( 

332 [ 

333 "CLUSTER COUNTKEYSINSLOT", 

334 "CLUSTER DELSLOTS", 

335 "CLUSTER DELSLOTSRANGE", 

336 "CLUSTER GETKEYSINSLOT", 

337 "CLUSTER SETSLOT", 

338 ], 

339 SLOT_ID, 

340 ), 

341 ) 

342 

343 SEARCH_COMMANDS = ( 

344 [ 

345 "FT.CREATE", 

346 "FT.SEARCH", 

347 "FT.AGGREGATE", 

348 "FT.EXPLAIN", 

349 "FT.EXPLAINCLI", 

350 "FT,PROFILE", 

351 "FT.ALTER", 

352 "FT.DROPINDEX", 

353 "FT.ALIASADD", 

354 "FT.ALIASUPDATE", 

355 "FT.ALIASDEL", 

356 "FT.TAGVALS", 

357 "FT.SUGADD", 

358 "FT.SUGGET", 

359 "FT.SUGDEL", 

360 "FT.SUGLEN", 

361 "FT.SYNUPDATE", 

362 "FT.SYNDUMP", 

363 "FT.SPELLCHECK", 

364 "FT.DICTADD", 

365 "FT.DICTDEL", 

366 "FT.DICTDUMP", 

367 "FT.INFO", 

368 "FT._LIST", 

369 "FT.CONFIG", 

370 "FT.ADD", 

371 "FT.DEL", 

372 "FT.DROP", 

373 "FT.GET", 

374 "FT.MGET", 

375 "FT.SYNADD", 

376 ], 

377 ) 

378 

379 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { 

380 "CLUSTER SLOTS": parse_cluster_slots, 

381 "CLUSTER SHARDS": parse_cluster_shards, 

382 "CLUSTER MYSHARDID": parse_cluster_myshardid, 

383 } 

384 

385 RESULT_CALLBACKS = dict_merge( 

386 list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub), 

387 list_keys_to_dict( 

388 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values())) 

389 ), 

390 list_keys_to_dict( 

391 ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result 

392 ), 

393 list_keys_to_dict( 

394 [ 

395 "PING", 

396 "CONFIG SET", 

397 "CONFIG REWRITE", 

398 "CONFIG RESETSTAT", 

399 "CLIENT SETNAME", 

400 "BGSAVE", 

401 "SLOWLOG RESET", 

402 "SAVE", 

403 "MEMORY PURGE", 

404 "CLIENT PAUSE", 

405 "CLIENT UNPAUSE", 

406 ], 

407 lambda command, res: all(res.values()) if isinstance(res, dict) else res, 

408 ), 

409 list_keys_to_dict( 

410 ["DBSIZE", "WAIT"], 

411 lambda command, res: sum(res.values()) if isinstance(res, dict) else res, 

412 ), 

413 list_keys_to_dict( 

414 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0 

415 ), 

416 list_keys_to_dict(["SCAN"], parse_scan_result), 

417 list_keys_to_dict( 

418 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop() 

419 ), 

420 list_keys_to_dict( 

421 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())] 

422 ), 

423 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())), 

424 ) 

425 

426 ERRORS_ALLOW_RETRY = ( 

427 ConnectionError, 

428 TimeoutError, 

429 ClusterDownError, 

430 SlotNotCoveredError, 

431 ) 

432 

433 def replace_default_node(self, target_node: "ClusterNode" = None) -> None: 

434 """Replace the default cluster node. 

435 A random cluster node will be chosen if target_node isn't passed, and primaries 

436 will be prioritized. The default node will not be changed if there are no other 

437 nodes in the cluster. 

438 

439 Args: 

440 target_node (ClusterNode, optional): Target node to replace the default 

441 node. Defaults to None. 

442 """ 

443 if target_node: 

444 self.nodes_manager.default_node = target_node 

445 else: 

446 curr_node = self.get_default_node() 

447 primaries = [node for node in self.get_primaries() if node != curr_node] 

448 if primaries: 

449 # Choose a primary if the cluster contains different primaries 

450 self.nodes_manager.default_node = random.choice(primaries) 

451 else: 

452 # Otherwise, choose a primary if the cluster contains different primaries 

453 replicas = [node for node in self.get_replicas() if node != curr_node] 

454 if replicas: 

455 self.nodes_manager.default_node = random.choice(replicas) 

456 

457 

458class RedisCluster(AbstractRedisCluster, RedisClusterCommands): 

459 @classmethod 

460 def from_url(cls, url, **kwargs): 

461 """ 

462 Return a Redis client object configured from the given URL 

463 

464 For example:: 

465 

466 redis://[[username]:[password]]@localhost:6379/0 

467 rediss://[[username]:[password]]@localhost:6379/0 

468 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

469 

470 Three URL schemes are supported: 

471 

472 - `redis://` creates a TCP socket connection. See more at: 

473 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

474 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

475 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

476 - ``unix://``: creates a Unix Domain Socket connection. 

477 

478 The username, password, hostname, path and all querystring values 

479 are passed through urllib.parse.unquote in order to replace any 

480 percent-encoded values with their corresponding characters. 

481 

482 There are several ways to specify a database number. The first value 

483 found will be used: 

484 

485 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

486 2. If using the redis:// or rediss:// schemes, the path argument 

487 of the url, e.g. redis://localhost/0 

488 3. A ``db`` keyword argument to this function. 

489 

490 If none of these options are specified, the default db=0 is used. 

491 

492 All querystring options are cast to their appropriate Python types. 

493 Boolean arguments can be specified with string values "True"/"False" 

494 or "Yes"/"No". Values that cannot be properly cast cause a 

495 ``ValueError`` to be raised. Once parsed, the querystring arguments 

496 and keyword arguments are passed to the ``ConnectionPool``'s 

497 class initializer. In the case of conflicting arguments, querystring 

498 arguments always win. 

499 

500 """ 

501 return cls(url=url, **kwargs) 

502 

503 @deprecated_args( 

504 args_to_warn=["read_from_replicas"], 

505 reason="Please configure the 'load_balancing_strategy' instead", 

506 version="5.3.0", 

507 ) 

508 @deprecated_args( 

509 args_to_warn=[ 

510 "cluster_error_retry_attempts", 

511 ], 

512 reason="Please configure the 'retry' object instead", 

513 version="6.0.0", 

514 ) 

515 def __init__( 

516 self, 

517 host: Optional[str] = None, 

518 port: int = 6379, 

519 startup_nodes: Optional[List["ClusterNode"]] = None, 

520 cluster_error_retry_attempts: int = 3, 

521 retry: Optional["Retry"] = None, 

522 require_full_coverage: bool = True, 

523 reinitialize_steps: int = 5, 

524 read_from_replicas: bool = False, 

525 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None, 

526 dynamic_startup_nodes: bool = True, 

527 url: Optional[str] = None, 

528 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

529 cache: Optional[CacheInterface] = None, 

530 cache_config: Optional[CacheConfig] = None, 

531 event_dispatcher: Optional[EventDispatcher] = None, 

532 **kwargs, 

533 ): 

534 """ 

535 Initialize a new RedisCluster client. 

536 

537 :param startup_nodes: 

538 List of nodes from which initial bootstrapping can be done 

539 :param host: 

540 Can be used to point to a startup node 

541 :param port: 

542 Can be used to point to a startup node 

543 :param require_full_coverage: 

544 When set to False (default value): the client will not require a 

545 full coverage of the slots. However, if not all slots are covered, 

546 and at least one node has 'cluster-require-full-coverage' set to 

547 'yes,' the server will throw a ClusterDownError for some key-based 

548 commands. See - 

549 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters 

550 When set to True: all slots must be covered to construct the 

551 cluster client. If not all slots are covered, RedisClusterException 

552 will be thrown. 

553 :param read_from_replicas: 

554 @deprecated - please use load_balancing_strategy instead 

555 Enable read from replicas in READONLY mode. You can read possibly 

556 stale data. 

557 When set to true, read commands will be assigned between the 

558 primary and its replications in a Round-Robin manner. 

559 :param load_balancing_strategy: 

560 Enable read from replicas in READONLY mode and defines the load balancing 

561 strategy that will be used for cluster node selection. 

562 The data read from replicas is eventually consistent with the data in primary nodes. 

563 :param dynamic_startup_nodes: 

564 Set the RedisCluster's startup nodes to all of the discovered nodes. 

565 If true (default value), the cluster's discovered nodes will be used to 

566 determine the cluster nodes-slots mapping in the next topology refresh. 

567 It will remove the initial passed startup nodes if their endpoints aren't 

568 listed in the CLUSTER SLOTS output. 

569 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

570 specific IP addresses, it is best to set it to false. 

571 :param cluster_error_retry_attempts: 

572 @deprecated - Please configure the 'retry' object instead 

573 In case 'retry' object is set - this argument is ignored! 

574 

575 Number of times to retry before raising an error when 

576 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or 

577 :class:`~.ClusterDownError` are encountered 

578 :param retry: 

579 A retry object that defines the retry strategy and the number of 

580 retries for the cluster client. 

581 In current implementation for the cluster client (starting form redis-py version 6.0.0) 

582 the retry object is not yet fully utilized, instead it is used just to determine 

583 the number of retries for the cluster client. 

584 In the future releases the retry object will be used to handle the cluster client retries! 

585 :param reinitialize_steps: 

586 Specifies the number of MOVED errors that need to occur before 

587 reinitializing the whole cluster topology. If a MOVED error occurs 

588 and the cluster does not need to be reinitialized on this current 

589 error handling, only the MOVED slot will be patched with the 

590 redirected node. 

591 To reinitialize the cluster on every MOVED error, set 

592 reinitialize_steps to 1. 

593 To avoid reinitializing the cluster on moved errors, set 

594 reinitialize_steps to 0. 

595 :param address_remap: 

596 An optional callable which, when provided with an internal network 

597 address of a node, e.g. a `(host, port)` tuple, will return the address 

598 where the node is reachable. This can be used to map the addresses at 

599 which the nodes _think_ they are, to addresses at which a client may 

600 reach them, such as when they sit behind a proxy. 

601 

602 :**kwargs: 

603 Extra arguments that will be sent into Redis instance when created 

604 (See Official redis-py doc for supported kwargs - the only limitation 

605 is that you can't provide 'retry' object as part of kwargs. 

606 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) 

607 Some kwargs are not supported and will raise a 

608 RedisClusterException: 

609 - db (Redis do not support database SELECT in cluster mode) 

610 """ 

611 if startup_nodes is None: 

612 startup_nodes = [] 

613 

614 if "db" in kwargs: 

615 # Argument 'db' is not possible to use in cluster mode 

616 raise RedisClusterException( 

617 "Argument 'db' is not possible to use in cluster mode" 

618 ) 

619 

620 if "retry" in kwargs: 

621 # Argument 'retry' is not possible to be used in kwargs when in cluster mode 

622 # the kwargs are set to the lower level connections to the cluster nodes 

623 # and there we provide retry configuration without retries allowed. 

624 # The retries should be handled on cluster client level. 

625 raise RedisClusterException( 

626 "The 'retry' argument cannot be used in kwargs when running in cluster mode." 

627 ) 

628 

629 # Get the startup node/s 

630 from_url = False 

631 if url is not None: 

632 from_url = True 

633 url_options = parse_url(url) 

634 if "path" in url_options: 

635 raise RedisClusterException( 

636 "RedisCluster does not currently support Unix Domain " 

637 "Socket connections" 

638 ) 

639 if "db" in url_options and url_options["db"] != 0: 

640 # Argument 'db' is not possible to use in cluster mode 

641 raise RedisClusterException( 

642 "A ``db`` querystring option can only be 0 in cluster mode" 

643 ) 

644 kwargs.update(url_options) 

645 host = kwargs.get("host") 

646 port = kwargs.get("port", port) 

647 startup_nodes.append(ClusterNode(host, port)) 

648 elif host is not None and port is not None: 

649 startup_nodes.append(ClusterNode(host, port)) 

650 elif len(startup_nodes) == 0: 

651 # No startup node was provided 

652 raise RedisClusterException( 

653 "RedisCluster requires at least one node to discover the " 

654 "cluster. Please provide one of the followings:\n" 

655 "1. host and port, for example:\n" 

656 " RedisCluster(host='localhost', port=6379)\n" 

657 "2. list of startup nodes, for example:\n" 

658 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," 

659 " ClusterNode('localhost', 6378)])" 

660 ) 

661 # Update the connection arguments 

662 # Whenever a new connection is established, RedisCluster's on_connect 

663 # method should be run 

664 # If the user passed on_connect function we'll save it and run it 

665 # inside the RedisCluster.on_connect() function 

666 self.user_on_connect_func = kwargs.pop("redis_connect_func", None) 

667 kwargs.update({"redis_connect_func": self.on_connect}) 

668 kwargs = cleanup_kwargs(**kwargs) 

669 if retry: 

670 self.retry = retry 

671 else: 

672 self.retry = Retry( 

673 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

674 retries=cluster_error_retry_attempts, 

675 ) 

676 

677 self.encoder = Encoder( 

678 kwargs.get("encoding", "utf-8"), 

679 kwargs.get("encoding_errors", "strict"), 

680 kwargs.get("decode_responses", False), 

681 ) 

682 protocol = kwargs.get("protocol", None) 

683 if (cache_config or cache) and protocol not in [3, "3"]: 

684 raise RedisError("Client caching is only supported with RESP version 3") 

685 

686 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

687 self.node_flags = self.__class__.NODE_FLAGS.copy() 

688 self.read_from_replicas = read_from_replicas 

689 self.load_balancing_strategy = load_balancing_strategy 

690 self.reinitialize_counter = 0 

691 self.reinitialize_steps = reinitialize_steps 

692 if event_dispatcher is None: 

693 self._event_dispatcher = EventDispatcher() 

694 else: 

695 self._event_dispatcher = event_dispatcher 

696 self.nodes_manager = NodesManager( 

697 startup_nodes=startup_nodes, 

698 from_url=from_url, 

699 require_full_coverage=require_full_coverage, 

700 dynamic_startup_nodes=dynamic_startup_nodes, 

701 address_remap=address_remap, 

702 cache=cache, 

703 cache_config=cache_config, 

704 event_dispatcher=self._event_dispatcher, 

705 **kwargs, 

706 ) 

707 

708 self.cluster_response_callbacks = CaseInsensitiveDict( 

709 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS 

710 ) 

711 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS) 

712 

713 self.commands_parser = CommandsParser(self) 

714 self._lock = threading.RLock() 

715 

716 def __enter__(self): 

717 return self 

718 

719 def __exit__(self, exc_type, exc_value, traceback): 

720 self.close() 

721 

722 def __del__(self): 

723 try: 

724 self.close() 

725 except Exception: 

726 pass 

727 

728 def disconnect_connection_pools(self): 

729 for node in self.get_nodes(): 

730 if node.redis_connection: 

731 try: 

732 node.redis_connection.connection_pool.disconnect() 

733 except OSError: 

734 # Client was already disconnected. do nothing 

735 pass 

736 

737 def on_connect(self, connection): 

738 """ 

739 Initialize the connection, authenticate and select a database and send 

740 READONLY if it is set during object initialization. 

741 """ 

742 connection.on_connect() 

743 

744 if self.read_from_replicas or self.load_balancing_strategy: 

745 # Sending READONLY command to server to configure connection as 

746 # readonly. Since each cluster node may change its server type due 

747 # to a failover, we should establish a READONLY connection 

748 # regardless of the server type. If this is a primary connection, 

749 # READONLY would not affect executing write commands. 

750 connection.send_command("READONLY") 

751 if str_if_bytes(connection.read_response()) != "OK": 

752 raise ConnectionError("READONLY command failed") 

753 

754 if self.user_on_connect_func is not None: 

755 self.user_on_connect_func(connection) 

756 

757 def get_redis_connection(self, node: "ClusterNode") -> Redis: 

758 if not node.redis_connection: 

759 with self._lock: 

760 if not node.redis_connection: 

761 self.nodes_manager.create_redis_connections([node]) 

762 return node.redis_connection 

763 

764 def get_node(self, host=None, port=None, node_name=None): 

765 return self.nodes_manager.get_node(host, port, node_name) 

766 

767 def get_primaries(self): 

768 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

769 

770 def get_replicas(self): 

771 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

772 

773 def get_random_node(self): 

774 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

775 

776 def get_nodes(self): 

777 return list(self.nodes_manager.nodes_cache.values()) 

778 

779 def get_node_from_key(self, key, replica=False): 

780 """ 

781 Get the node that holds the key's slot. 

782 If replica set to True but the slot doesn't have any replicas, None is 

783 returned. 

784 """ 

785 slot = self.keyslot(key) 

786 slot_cache = self.nodes_manager.slots_cache.get(slot) 

787 if slot_cache is None or len(slot_cache) == 0: 

788 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') 

789 if replica and len(self.nodes_manager.slots_cache[slot]) < 2: 

790 return None 

791 elif replica: 

792 node_idx = 1 

793 else: 

794 # primary 

795 node_idx = 0 

796 

797 return slot_cache[node_idx] 

798 

799 def get_default_node(self): 

800 """ 

801 Get the cluster's default node 

802 """ 

803 return self.nodes_manager.default_node 

804 

805 def set_default_node(self, node): 

806 """ 

807 Set the default node of the cluster. 

808 :param node: 'ClusterNode' 

809 :return True if the default node was set, else False 

810 """ 

811 if node is None or self.get_node(node_name=node.name) is None: 

812 return False 

813 self.nodes_manager.default_node = node 

814 return True 

815 

816 def set_retry(self, retry: Retry) -> None: 

817 self.retry = retry 

818 

819 def monitor(self, target_node=None): 

820 """ 

821 Returns a Monitor object for the specified target node. 

822 The default cluster node will be selected if no target node was 

823 specified. 

824 Monitor is useful for handling the MONITOR command to the redis server. 

825 next_command() method returns one command from monitor 

826 listen() method yields commands from monitor. 

827 """ 

828 if target_node is None: 

829 target_node = self.get_default_node() 

830 if target_node.redis_connection is None: 

831 raise RedisClusterException( 

832 f"Cluster Node {target_node.name} has no redis_connection" 

833 ) 

834 return target_node.redis_connection.monitor() 

835 

836 def pubsub(self, node=None, host=None, port=None, **kwargs): 

837 """ 

838 Allows passing a ClusterNode, or host&port, to get a pubsub instance 

839 connected to the specified node 

840 """ 

841 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs) 

842 

843 def pipeline(self, transaction=None, shard_hint=None): 

844 """ 

845 Cluster impl: 

846 Pipelines do not work in cluster mode the same way they 

847 do in normal mode. Create a clone of this object so 

848 that simulating pipelines will work correctly. Each 

849 command will be called directly when used and 

850 when calling execute() will only return the result stack. 

851 """ 

852 if shard_hint: 

853 raise RedisClusterException("shard_hint is deprecated in cluster mode") 

854 

855 return ClusterPipeline( 

856 nodes_manager=self.nodes_manager, 

857 commands_parser=self.commands_parser, 

858 startup_nodes=self.nodes_manager.startup_nodes, 

859 result_callbacks=self.result_callbacks, 

860 cluster_response_callbacks=self.cluster_response_callbacks, 

861 read_from_replicas=self.read_from_replicas, 

862 load_balancing_strategy=self.load_balancing_strategy, 

863 reinitialize_steps=self.reinitialize_steps, 

864 retry=self.retry, 

865 lock=self._lock, 

866 transaction=transaction, 

867 ) 

868 

869 def lock( 

870 self, 

871 name, 

872 timeout=None, 

873 sleep=0.1, 

874 blocking=True, 

875 blocking_timeout=None, 

876 lock_class=None, 

877 thread_local=True, 

878 raise_on_release_error: bool = True, 

879 ): 

880 """ 

881 Return a new Lock object using key ``name`` that mimics 

882 the behavior of threading.Lock. 

883 

884 If specified, ``timeout`` indicates a maximum life for the lock. 

885 By default, it will remain locked until release() is called. 

886 

887 ``sleep`` indicates the amount of time to sleep per loop iteration 

888 when the lock is in blocking mode and another client is currently 

889 holding the lock. 

890 

891 ``blocking`` indicates whether calling ``acquire`` should block until 

892 the lock has been acquired or to fail immediately, causing ``acquire`` 

893 to return False and the lock not being acquired. Defaults to True. 

894 Note this value can be overridden by passing a ``blocking`` 

895 argument to ``acquire``. 

896 

897 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

898 spend trying to acquire the lock. A value of ``None`` indicates 

899 continue trying forever. ``blocking_timeout`` can be specified as a 

900 float or integer, both representing the number of seconds to wait. 

901 

902 ``lock_class`` forces the specified lock implementation. Note that as 

903 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

904 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

905 you have created your own custom lock class. 

906 

907 ``thread_local`` indicates whether the lock token is placed in 

908 thread-local storage. By default, the token is placed in thread local 

909 storage so that a thread only sees its token, not a token set by 

910 another thread. Consider the following timeline: 

911 

912 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

913 thread-1 sets the token to "abc" 

914 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

915 Lock instance. 

916 time: 5, thread-1 has not yet completed. redis expires the lock 

917 key. 

918 time: 5, thread-2 acquired `my-lock` now that it's available. 

919 thread-2 sets the token to "xyz" 

920 time: 6, thread-1 finishes its work and calls release(). if the 

921 token is *not* stored in thread local storage, then 

922 thread-1 would see the token value as "xyz" and would be 

923 able to successfully release the thread-2's lock. 

924 

925 ``raise_on_release_error`` indicates whether to raise an exception when 

926 the lock is no longer owned when exiting the context manager. By default, 

927 this is True, meaning an exception will be raised. If False, the warning 

928 will be logged and the exception will be suppressed. 

929 

930 In some use cases it's necessary to disable thread local storage. For 

931 example, if you have code where one thread acquires a lock and passes 

932 that lock instance to a worker thread to release later. If thread 

933 local storage isn't disabled in this case, the worker thread won't see 

934 the token set by the thread that acquired the lock. Our assumption 

935 is that these cases aren't common and as such default to using 

936 thread local storage.""" 

937 if lock_class is None: 

938 lock_class = Lock 

939 return lock_class( 

940 self, 

941 name, 

942 timeout=timeout, 

943 sleep=sleep, 

944 blocking=blocking, 

945 blocking_timeout=blocking_timeout, 

946 thread_local=thread_local, 

947 raise_on_release_error=raise_on_release_error, 

948 ) 

949 

950 def set_response_callback(self, command, callback): 

951 """Set a custom Response Callback""" 

952 self.cluster_response_callbacks[command] = callback 

953 

954 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: 

955 # Determine which nodes should be executed the command on. 

956 # Returns a list of target nodes. 

957 command = args[0].upper() 

958 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: 

959 command = f"{args[0]} {args[1]}".upper() 

960 

961 nodes_flag = kwargs.pop("nodes_flag", None) 

962 if nodes_flag is not None: 

963 # nodes flag passed by the user 

964 command_flag = nodes_flag 

965 else: 

966 # get the nodes group for this command if it was predefined 

967 command_flag = self.command_flags.get(command) 

968 if command_flag == self.__class__.RANDOM: 

969 # return a random node 

970 return [self.get_random_node()] 

971 elif command_flag == self.__class__.PRIMARIES: 

972 # return all primaries 

973 return self.get_primaries() 

974 elif command_flag == self.__class__.REPLICAS: 

975 # return all replicas 

976 return self.get_replicas() 

977 elif command_flag == self.__class__.ALL_NODES: 

978 # return all nodes 

979 return self.get_nodes() 

980 elif command_flag == self.__class__.DEFAULT_NODE: 

981 # return the cluster's default node 

982 return [self.nodes_manager.default_node] 

983 elif command in self.__class__.SEARCH_COMMANDS[0]: 

984 return [self.nodes_manager.default_node] 

985 else: 

986 # get the node that holds the key's slot 

987 slot = self.determine_slot(*args) 

988 node = self.nodes_manager.get_node_from_slot( 

989 slot, 

990 self.read_from_replicas and command in READ_COMMANDS, 

991 self.load_balancing_strategy if command in READ_COMMANDS else None, 

992 ) 

993 return [node] 

994 

995 def _should_reinitialized(self): 

996 # To reinitialize the cluster on every MOVED error, 

997 # set reinitialize_steps to 1. 

998 # To avoid reinitializing the cluster on moved errors, set 

999 # reinitialize_steps to 0. 

1000 if self.reinitialize_steps == 0: 

1001 return False 

1002 else: 

1003 return self.reinitialize_counter % self.reinitialize_steps == 0 

1004 

1005 def keyslot(self, key): 

1006 """ 

1007 Calculate keyslot for a given key. 

1008 See Keys distribution model in https://redis.io/topics/cluster-spec 

1009 """ 

1010 k = self.encoder.encode(key) 

1011 return key_slot(k) 

1012 

1013 def _get_command_keys(self, *args): 

1014 """ 

1015 Get the keys in the command. If the command has no keys in in, None is 

1016 returned. 

1017 

1018 NOTE: Due to a bug in redis<7.0, this function does not work properly 

1019 for EVAL or EVALSHA when the `numkeys` arg is 0. 

1020 - issue: https://github.com/redis/redis/issues/9493 

1021 - fix: https://github.com/redis/redis/pull/9733 

1022 

1023 So, don't use this function with EVAL or EVALSHA. 

1024 """ 

1025 redis_conn = self.get_default_node().redis_connection 

1026 return self.commands_parser.get_keys(redis_conn, *args) 

1027 

1028 def determine_slot(self, *args) -> int: 

1029 """ 

1030 Figure out what slot to use based on args. 

1031 

1032 Raises a RedisClusterException if there's a missing key and we can't 

1033 determine what slots to map the command to; or, if the keys don't 

1034 all map to the same key slot. 

1035 """ 

1036 command = args[0] 

1037 if self.command_flags.get(command) == SLOT_ID: 

1038 # The command contains the slot ID 

1039 return args[1] 

1040 

1041 # Get the keys in the command 

1042 

1043 # EVAL and EVALSHA are common enough that it's wasteful to go to the 

1044 # redis server to parse the keys. Besides, there is a bug in redis<7.0 

1045 # where `self._get_command_keys()` fails anyway. So, we special case 

1046 # EVAL/EVALSHA. 

1047 if command.upper() in ("EVAL", "EVALSHA"): 

1048 # command syntax: EVAL "script body" num_keys ... 

1049 if len(args) <= 2: 

1050 raise RedisClusterException(f"Invalid args in command: {args}") 

1051 num_actual_keys = int(args[2]) 

1052 eval_keys = args[3 : 3 + num_actual_keys] 

1053 # if there are 0 keys, that means the script can be run on any node 

1054 # so we can just return a random slot 

1055 if len(eval_keys) == 0: 

1056 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1057 keys = eval_keys 

1058 else: 

1059 keys = self._get_command_keys(*args) 

1060 if keys is None or len(keys) == 0: 

1061 # FCALL can call a function with 0 keys, that means the function 

1062 # can be run on any node so we can just return a random slot 

1063 if command.upper() in ("FCALL", "FCALL_RO"): 

1064 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

1065 raise RedisClusterException( 

1066 "No way to dispatch this command to Redis Cluster. " 

1067 "Missing key.\nYou can execute the command by specifying " 

1068 f"target nodes.\nCommand: {args}" 

1069 ) 

1070 

1071 # single key command 

1072 if len(keys) == 1: 

1073 return self.keyslot(keys[0]) 

1074 

1075 # multi-key command; we need to make sure all keys are mapped to 

1076 # the same slot 

1077 slots = {self.keyslot(key) for key in keys} 

1078 if len(slots) != 1: 

1079 raise RedisClusterException( 

1080 f"{command} - all keys must map to the same key slot" 

1081 ) 

1082 

1083 return slots.pop() 

1084 

1085 def get_encoder(self): 

1086 """ 

1087 Get the connections' encoder 

1088 """ 

1089 return self.encoder 

1090 

1091 def get_connection_kwargs(self): 

1092 """ 

1093 Get the connections' key-word arguments 

1094 """ 

1095 return self.nodes_manager.connection_kwargs 

1096 

1097 def _is_nodes_flag(self, target_nodes): 

1098 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1099 

1100 def _parse_target_nodes(self, target_nodes): 

1101 if isinstance(target_nodes, list): 

1102 nodes = target_nodes 

1103 elif isinstance(target_nodes, ClusterNode): 

1104 # Supports passing a single ClusterNode as a variable 

1105 nodes = [target_nodes] 

1106 elif isinstance(target_nodes, dict): 

1107 # Supports dictionaries of the format {node_name: node}. 

1108 # It enables to execute commands with multi nodes as follows: 

1109 # rc.cluster_save_config(rc.get_primaries()) 

1110 nodes = target_nodes.values() 

1111 else: 

1112 raise TypeError( 

1113 "target_nodes type can be one of the following: " 

1114 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1115 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1116 f"The passed type is {type(target_nodes)}" 

1117 ) 

1118 return nodes 

1119 

1120 def execute_command(self, *args, **kwargs): 

1121 return self._internal_execute_command(*args, **kwargs) 

1122 

1123 def _internal_execute_command(self, *args, **kwargs): 

1124 """ 

1125 Wrapper for ERRORS_ALLOW_RETRY error handling. 

1126 

1127 It will try the number of times specified by the retries property from 

1128 config option "self.retry" which defaults to 3 unless manually 

1129 configured. 

1130 

1131 If it reaches the number of times, the command will raise the exception 

1132 

1133 Key argument :target_nodes: can be passed with the following types: 

1134 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM 

1135 ClusterNode 

1136 list<ClusterNode> 

1137 dict<Any, ClusterNode> 

1138 """ 

1139 target_nodes_specified = False 

1140 is_default_node = False 

1141 target_nodes = None 

1142 passed_targets = kwargs.pop("target_nodes", None) 

1143 if passed_targets is not None and not self._is_nodes_flag(passed_targets): 

1144 target_nodes = self._parse_target_nodes(passed_targets) 

1145 target_nodes_specified = True 

1146 # If an error that allows retrying was thrown, the nodes and slots 

1147 # cache were reinitialized. We will retry executing the command with 

1148 # the updated cluster setup only when the target nodes can be 

1149 # determined again with the new cache tables. Therefore, when target 

1150 # nodes were passed to this function, we cannot retry the command 

1151 # execution since the nodes may not be valid anymore after the tables 

1152 # were reinitialized. So in case of passed target nodes, 

1153 # retry_attempts will be set to 0. 

1154 retry_attempts = 0 if target_nodes_specified else self.retry.get_retries() 

1155 # Add one for the first execution 

1156 execute_attempts = 1 + retry_attempts 

1157 for _ in range(execute_attempts): 

1158 try: 

1159 res = {} 

1160 if not target_nodes_specified: 

1161 # Determine the nodes to execute the command on 

1162 target_nodes = self._determine_nodes( 

1163 *args, **kwargs, nodes_flag=passed_targets 

1164 ) 

1165 if not target_nodes: 

1166 raise RedisClusterException( 

1167 f"No targets were found to execute {args} command on" 

1168 ) 

1169 if ( 

1170 len(target_nodes) == 1 

1171 and target_nodes[0] == self.get_default_node() 

1172 ): 

1173 is_default_node = True 

1174 for node in target_nodes: 

1175 res[node.name] = self._execute_command(node, *args, **kwargs) 

1176 # Return the processed result 

1177 return self._process_result(args[0], res, **kwargs) 

1178 except Exception as e: 

1179 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: 

1180 if is_default_node: 

1181 # Replace the default cluster node 

1182 self.replace_default_node() 

1183 # The nodes and slots cache were reinitialized. 

1184 # Try again with the new cluster setup. 

1185 retry_attempts -= 1 

1186 continue 

1187 else: 

1188 # raise the exception 

1189 raise e 

1190 

1191 def _execute_command(self, target_node, *args, **kwargs): 

1192 """ 

1193 Send a command to a node in the cluster 

1194 """ 

1195 command = args[0] 

1196 redis_node = None 

1197 connection = None 

1198 redirect_addr = None 

1199 asking = False 

1200 moved = False 

1201 ttl = int(self.RedisClusterRequestTTL) 

1202 

1203 while ttl > 0: 

1204 ttl -= 1 

1205 try: 

1206 if asking: 

1207 target_node = self.get_node(node_name=redirect_addr) 

1208 elif moved: 

1209 # MOVED occurred and the slots cache was updated, 

1210 # refresh the target node 

1211 slot = self.determine_slot(*args) 

1212 target_node = self.nodes_manager.get_node_from_slot( 

1213 slot, 

1214 self.read_from_replicas and command in READ_COMMANDS, 

1215 self.load_balancing_strategy 

1216 if command in READ_COMMANDS 

1217 else None, 

1218 ) 

1219 moved = False 

1220 

1221 redis_node = self.get_redis_connection(target_node) 

1222 connection = get_connection(redis_node) 

1223 if asking: 

1224 connection.send_command("ASKING") 

1225 redis_node.parse_response(connection, "ASKING", **kwargs) 

1226 asking = False 

1227 connection.send_command(*args, **kwargs) 

1228 response = redis_node.parse_response(connection, command, **kwargs) 

1229 

1230 # Remove keys entry, it needs only for cache. 

1231 kwargs.pop("keys", None) 

1232 

1233 if command in self.cluster_response_callbacks: 

1234 response = self.cluster_response_callbacks[command]( 

1235 response, **kwargs 

1236 ) 

1237 return response 

1238 except AuthenticationError: 

1239 raise 

1240 except MaxConnectionsError: 

1241 # MaxConnectionsError indicates client-side resource exhaustion 

1242 # (too many connections in the pool), not a node failure. 

1243 # Don't treat this as a node failure - just re-raise the error 

1244 # without reinitializing the cluster. 

1245 raise 

1246 except (ConnectionError, TimeoutError) as e: 

1247 # ConnectionError can also be raised if we couldn't get a 

1248 # connection from the pool before timing out, so check that 

1249 # this is an actual connection before attempting to disconnect. 

1250 if connection is not None: 

1251 connection.disconnect() 

1252 

1253 # Remove the failed node from the startup nodes before we try 

1254 # to reinitialize the cluster 

1255 self.nodes_manager.startup_nodes.pop(target_node.name, None) 

1256 # Reset the cluster node's connection 

1257 target_node.redis_connection = None 

1258 self.nodes_manager.initialize() 

1259 raise e 

1260 except MovedError as e: 

1261 # First, we will try to patch the slots/nodes cache with the 

1262 # redirected node output and try again. If MovedError exceeds 

1263 # 'reinitialize_steps' number of times, we will force 

1264 # reinitializing the tables, and then try again. 

1265 # 'reinitialize_steps' counter will increase faster when 

1266 # the same client object is shared between multiple threads. To 

1267 # reduce the frequency you can set this variable in the 

1268 # RedisCluster constructor. 

1269 self.reinitialize_counter += 1 

1270 if self._should_reinitialized(): 

1271 self.nodes_manager.initialize() 

1272 # Reset the counter 

1273 self.reinitialize_counter = 0 

1274 else: 

1275 self.nodes_manager.update_moved_exception(e) 

1276 moved = True 

1277 except TryAgainError: 

1278 if ttl < self.RedisClusterRequestTTL / 2: 

1279 time.sleep(0.05) 

1280 except AskError as e: 

1281 redirect_addr = get_node_name(host=e.host, port=e.port) 

1282 asking = True 

1283 except (ClusterDownError, SlotNotCoveredError): 

1284 # ClusterDownError can occur during a failover and to get 

1285 # self-healed, we will try to reinitialize the cluster layout 

1286 # and retry executing the command 

1287 

1288 # SlotNotCoveredError can occur when the cluster is not fully 

1289 # initialized or can be temporary issue. 

1290 # We will try to reinitialize the cluster topology 

1291 # and retry executing the command 

1292 

1293 time.sleep(0.25) 

1294 self.nodes_manager.initialize() 

1295 raise 

1296 except ResponseError: 

1297 raise 

1298 except Exception as e: 

1299 if connection: 

1300 connection.disconnect() 

1301 raise e 

1302 finally: 

1303 if connection is not None: 

1304 redis_node.connection_pool.release(connection) 

1305 

1306 raise ClusterError("TTL exhausted.") 

1307 

1308 def close(self) -> None: 

1309 try: 

1310 with self._lock: 

1311 if self.nodes_manager: 

1312 self.nodes_manager.close() 

1313 except AttributeError: 

1314 # RedisCluster's __init__ can fail before nodes_manager is set 

1315 pass 

1316 

1317 def _process_result(self, command, res, **kwargs): 

1318 """ 

1319 Process the result of the executed command. 

1320 The function would return a dict or a single value. 

1321 

1322 :type command: str 

1323 :type res: dict 

1324 

1325 `res` should be in the following format: 

1326 Dict<node_name, command_result> 

1327 """ 

1328 if command in self.result_callbacks: 

1329 return self.result_callbacks[command](command, res, **kwargs) 

1330 elif len(res) == 1: 

1331 # When we execute the command on a single node, we can 

1332 # remove the dictionary and return a single response 

1333 return list(res.values())[0] 

1334 else: 

1335 return res 

1336 

1337 def load_external_module(self, funcname, func): 

1338 """ 

1339 This function can be used to add externally defined redis modules, 

1340 and their namespaces to the redis client. 

1341 

1342 ``funcname`` - A string containing the name of the function to create 

1343 ``func`` - The function, being added to this class. 

1344 """ 

1345 setattr(self, funcname, func) 

1346 

1347 def transaction(self, func, *watches, **kwargs): 

1348 """ 

1349 Convenience method for executing the callable `func` as a transaction 

1350 while watching all keys specified in `watches`. The 'func' callable 

1351 should expect a single argument which is a Pipeline object. 

1352 """ 

1353 shard_hint = kwargs.pop("shard_hint", None) 

1354 value_from_callable = kwargs.pop("value_from_callable", False) 

1355 watch_delay = kwargs.pop("watch_delay", None) 

1356 with self.pipeline(True, shard_hint) as pipe: 

1357 while True: 

1358 try: 

1359 if watches: 

1360 pipe.watch(*watches) 

1361 func_value = func(pipe) 

1362 exec_value = pipe.execute() 

1363 return func_value if value_from_callable else exec_value 

1364 except WatchError: 

1365 if watch_delay is not None and watch_delay > 0: 

1366 time.sleep(watch_delay) 

1367 continue 

1368 

1369 

1370class ClusterNode: 

1371 def __init__(self, host, port, server_type=None, redis_connection=None): 

1372 if host == "localhost": 

1373 host = socket.gethostbyname(host) 

1374 

1375 self.host = host 

1376 self.port = port 

1377 self.name = get_node_name(host, port) 

1378 self.server_type = server_type 

1379 self.redis_connection = redis_connection 

1380 

1381 def __repr__(self): 

1382 return ( 

1383 f"[host={self.host}," 

1384 f"port={self.port}," 

1385 f"name={self.name}," 

1386 f"server_type={self.server_type}," 

1387 f"redis_connection={self.redis_connection}]" 

1388 ) 

1389 

1390 def __eq__(self, obj): 

1391 return isinstance(obj, ClusterNode) and obj.name == self.name 

1392 

1393 def __del__(self): 

1394 try: 

1395 if self.redis_connection is not None: 

1396 self.redis_connection.close() 

1397 except Exception: 

1398 # Ignore errors when closing the connection 

1399 pass 

1400 

1401 

1402class LoadBalancingStrategy(Enum): 

1403 ROUND_ROBIN = "round_robin" 

1404 ROUND_ROBIN_REPLICAS = "round_robin_replicas" 

1405 RANDOM_REPLICA = "random_replica" 

1406 

1407 

1408class LoadBalancer: 

1409 """ 

1410 Round-Robin Load Balancing 

1411 """ 

1412 

1413 def __init__(self, start_index: int = 0) -> None: 

1414 self.primary_to_idx = {} 

1415 self.start_index = start_index 

1416 

1417 def get_server_index( 

1418 self, 

1419 primary: str, 

1420 list_size: int, 

1421 load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN, 

1422 ) -> int: 

1423 if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA: 

1424 return self._get_random_replica_index(list_size) 

1425 else: 

1426 return self._get_round_robin_index( 

1427 primary, 

1428 list_size, 

1429 load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS, 

1430 ) 

1431 

1432 def reset(self) -> None: 

1433 self.primary_to_idx.clear() 

1434 

1435 def _get_random_replica_index(self, list_size: int) -> int: 

1436 return random.randint(1, list_size - 1) 

1437 

1438 def _get_round_robin_index( 

1439 self, primary: str, list_size: int, replicas_only: bool 

1440 ) -> int: 

1441 server_index = self.primary_to_idx.setdefault(primary, self.start_index) 

1442 if replicas_only and server_index == 0: 

1443 # skip the primary node index 

1444 server_index = 1 

1445 # Update the index for the next round 

1446 self.primary_to_idx[primary] = (server_index + 1) % list_size 

1447 return server_index 

1448 

1449 

1450class NodesManager: 

1451 def __init__( 

1452 self, 

1453 startup_nodes, 

1454 from_url=False, 

1455 require_full_coverage=False, 

1456 lock=None, 

1457 dynamic_startup_nodes=True, 

1458 connection_pool_class=ConnectionPool, 

1459 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

1460 cache: Optional[CacheInterface] = None, 

1461 cache_config: Optional[CacheConfig] = None, 

1462 cache_factory: Optional[CacheFactoryInterface] = None, 

1463 event_dispatcher: Optional[EventDispatcher] = None, 

1464 **kwargs, 

1465 ): 

1466 self.nodes_cache: Dict[str, Redis] = {} 

1467 self.slots_cache = {} 

1468 self.startup_nodes = {} 

1469 self.default_node = None 

1470 self.populate_startup_nodes(startup_nodes) 

1471 self.from_url = from_url 

1472 self._require_full_coverage = require_full_coverage 

1473 self._dynamic_startup_nodes = dynamic_startup_nodes 

1474 self.connection_pool_class = connection_pool_class 

1475 self.address_remap = address_remap 

1476 self._cache = cache 

1477 self._cache_config = cache_config 

1478 self._cache_factory = cache_factory 

1479 self._moved_exception = None 

1480 self.connection_kwargs = kwargs 

1481 self.read_load_balancer = LoadBalancer() 

1482 if lock is None: 

1483 lock = threading.RLock() 

1484 self._lock = lock 

1485 if event_dispatcher is None: 

1486 self._event_dispatcher = EventDispatcher() 

1487 else: 

1488 self._event_dispatcher = event_dispatcher 

1489 self._credential_provider = self.connection_kwargs.get( 

1490 "credential_provider", None 

1491 ) 

1492 self.initialize() 

1493 

1494 def get_node(self, host=None, port=None, node_name=None): 

1495 """ 

1496 Get the requested node from the cluster's nodes. 

1497 nodes. 

1498 :return: ClusterNode if the node exists, else None 

1499 """ 

1500 if host and port: 

1501 # the user passed host and port 

1502 if host == "localhost": 

1503 host = socket.gethostbyname(host) 

1504 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1505 elif node_name: 

1506 return self.nodes_cache.get(node_name) 

1507 else: 

1508 return None 

1509 

1510 def update_moved_exception(self, exception): 

1511 self._moved_exception = exception 

1512 

1513 def _update_moved_slots(self): 

1514 """ 

1515 Update the slot's node with the redirected one 

1516 """ 

1517 e = self._moved_exception 

1518 redirected_node = self.get_node(host=e.host, port=e.port) 

1519 if redirected_node is not None: 

1520 # The node already exists 

1521 if redirected_node.server_type is not PRIMARY: 

1522 # Update the node's server type 

1523 redirected_node.server_type = PRIMARY 

1524 else: 

1525 # This is a new node, we will add it to the nodes cache 

1526 redirected_node = ClusterNode(e.host, e.port, PRIMARY) 

1527 self.nodes_cache[redirected_node.name] = redirected_node 

1528 if redirected_node in self.slots_cache[e.slot_id]: 

1529 # The MOVED error resulted from a failover, and the new slot owner 

1530 # had previously been a replica. 

1531 old_primary = self.slots_cache[e.slot_id][0] 

1532 # Update the old primary to be a replica and add it to the end of 

1533 # the slot's node list 

1534 old_primary.server_type = REPLICA 

1535 self.slots_cache[e.slot_id].append(old_primary) 

1536 # Remove the old replica, which is now a primary, from the slot's 

1537 # node list 

1538 self.slots_cache[e.slot_id].remove(redirected_node) 

1539 # Override the old primary with the new one 

1540 self.slots_cache[e.slot_id][0] = redirected_node 

1541 if self.default_node == old_primary: 

1542 # Update the default node with the new primary 

1543 self.default_node = redirected_node 

1544 else: 

1545 # The new slot owner is a new server, or a server from a different 

1546 # shard. We need to remove all current nodes from the slot's list 

1547 # (including replications) and add just the new node. 

1548 self.slots_cache[e.slot_id] = [redirected_node] 

1549 # Reset moved_exception 

1550 self._moved_exception = None 

1551 

1552 @deprecated_args( 

1553 args_to_warn=["server_type"], 

1554 reason=( 

1555 "In case you need select some load balancing strategy " 

1556 "that will use replicas, please set it through 'load_balancing_strategy'" 

1557 ), 

1558 version="5.3.0", 

1559 ) 

1560 def get_node_from_slot( 

1561 self, 

1562 slot, 

1563 read_from_replicas=False, 

1564 load_balancing_strategy=None, 

1565 server_type=None, 

1566 ) -> ClusterNode: 

1567 """ 

1568 Gets a node that servers this hash slot 

1569 """ 

1570 if self._moved_exception: 

1571 with self._lock: 

1572 if self._moved_exception: 

1573 self._update_moved_slots() 

1574 

1575 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: 

1576 raise SlotNotCoveredError( 

1577 f'Slot "{slot}" not covered by the cluster. ' 

1578 f'"require_full_coverage={self._require_full_coverage}"' 

1579 ) 

1580 

1581 if read_from_replicas is True and load_balancing_strategy is None: 

1582 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN 

1583 

1584 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy: 

1585 # get the server index using the strategy defined in load_balancing_strategy 

1586 primary_name = self.slots_cache[slot][0].name 

1587 node_idx = self.read_load_balancer.get_server_index( 

1588 primary_name, len(self.slots_cache[slot]), load_balancing_strategy 

1589 ) 

1590 elif ( 

1591 server_type is None 

1592 or server_type == PRIMARY 

1593 or len(self.slots_cache[slot]) == 1 

1594 ): 

1595 # return a primary 

1596 node_idx = 0 

1597 else: 

1598 # return a replica 

1599 # randomly choose one of the replicas 

1600 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) 

1601 

1602 return self.slots_cache[slot][node_idx] 

1603 

1604 def get_nodes_by_server_type(self, server_type): 

1605 """ 

1606 Get all nodes with the specified server type 

1607 :param server_type: 'primary' or 'replica' 

1608 :return: list of ClusterNode 

1609 """ 

1610 return [ 

1611 node 

1612 for node in self.nodes_cache.values() 

1613 if node.server_type == server_type 

1614 ] 

1615 

1616 def populate_startup_nodes(self, nodes): 

1617 """ 

1618 Populate all startup nodes and filters out any duplicates 

1619 """ 

1620 for n in nodes: 

1621 self.startup_nodes[n.name] = n 

1622 

1623 def check_slots_coverage(self, slots_cache): 

1624 # Validate if all slots are covered or if we should try next 

1625 # startup node 

1626 for i in range(0, REDIS_CLUSTER_HASH_SLOTS): 

1627 if i not in slots_cache: 

1628 return False 

1629 return True 

1630 

1631 def create_redis_connections(self, nodes): 

1632 """ 

1633 This function will create a redis connection to all nodes in :nodes: 

1634 """ 

1635 connection_pools = [] 

1636 for node in nodes: 

1637 if node.redis_connection is None: 

1638 node.redis_connection = self.create_redis_node( 

1639 host=node.host, port=node.port, **self.connection_kwargs 

1640 ) 

1641 connection_pools.append(node.redis_connection.connection_pool) 

1642 

1643 self._event_dispatcher.dispatch( 

1644 AfterPooledConnectionsInstantiationEvent( 

1645 connection_pools, ClientType.SYNC, self._credential_provider 

1646 ) 

1647 ) 

1648 

1649 def create_redis_node(self, host, port, **kwargs): 

1650 # We are configuring the connection pool not to retry 

1651 # connections on lower level clients to avoid retrying 

1652 # connections to nodes that are not reachable 

1653 # and to avoid blocking the connection pool. 

1654 # The only error that will have some handling in the lower 

1655 # level clients is ConnectionError which will trigger disconnection 

1656 # of the socket. 

1657 # The retries will be handled on cluster client level 

1658 # where we will have proper handling of the cluster topology 

1659 node_retry_config = Retry( 

1660 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) 

1661 ) 

1662 

1663 if self.from_url: 

1664 # Create a redis node with a costumed connection pool 

1665 kwargs.update({"host": host}) 

1666 kwargs.update({"port": port}) 

1667 kwargs.update({"cache": self._cache}) 

1668 kwargs.update({"retry": node_retry_config}) 

1669 r = Redis(connection_pool=self.connection_pool_class(**kwargs)) 

1670 else: 

1671 r = Redis( 

1672 host=host, 

1673 port=port, 

1674 cache=self._cache, 

1675 retry=node_retry_config, 

1676 **kwargs, 

1677 ) 

1678 return r 

1679 

1680 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): 

1681 node_name = get_node_name(host, port) 

1682 # check if we already have this node in the tmp_nodes_cache 

1683 target_node = tmp_nodes_cache.get(node_name) 

1684 if target_node is None: 

1685 # before creating a new cluster node, check if the cluster node already 

1686 # exists in the current nodes cache and has a valid connection so we can 

1687 # reuse it 

1688 target_node = self.nodes_cache.get(node_name) 

1689 if target_node is None or target_node.redis_connection is None: 

1690 # create new cluster node for this cluster 

1691 target_node = ClusterNode(host, port, role) 

1692 if target_node.server_type != role: 

1693 target_node.server_type = role 

1694 # add this node to the nodes cache 

1695 tmp_nodes_cache[target_node.name] = target_node 

1696 

1697 return target_node 

1698 

1699 def initialize(self): 

1700 """ 

1701 Initializes the nodes cache, slots cache and redis connections. 

1702 :startup_nodes: 

1703 Responsible for discovering other nodes in the cluster 

1704 """ 

1705 self.reset() 

1706 tmp_nodes_cache = {} 

1707 tmp_slots = {} 

1708 disagreements = [] 

1709 startup_nodes_reachable = False 

1710 fully_covered = False 

1711 kwargs = self.connection_kwargs 

1712 exception = None 

1713 # Convert to tuple to prevent RuntimeError if self.startup_nodes 

1714 # is modified during iteration 

1715 for startup_node in tuple(self.startup_nodes.values()): 

1716 try: 

1717 if startup_node.redis_connection: 

1718 r = startup_node.redis_connection 

1719 else: 

1720 # Create a new Redis connection 

1721 r = self.create_redis_node( 

1722 startup_node.host, startup_node.port, **kwargs 

1723 ) 

1724 self.startup_nodes[startup_node.name].redis_connection = r 

1725 # Make sure cluster mode is enabled on this node 

1726 try: 

1727 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) 

1728 r.connection_pool.disconnect() 

1729 except ResponseError: 

1730 raise RedisClusterException( 

1731 "Cluster mode is not enabled on this node" 

1732 ) 

1733 startup_nodes_reachable = True 

1734 except Exception as e: 

1735 # Try the next startup node. 

1736 # The exception is saved and raised only if we have no more nodes. 

1737 exception = e 

1738 continue 

1739 

1740 # CLUSTER SLOTS command results in the following output: 

1741 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] 

1742 # where each node contains the following list: [IP, port, node_id] 

1743 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

1744 # primary node of the first slot section. 

1745 # If there's only one server in the cluster, its ``host`` is '' 

1746 # Fix it to the host in startup_nodes 

1747 if ( 

1748 len(cluster_slots) == 1 

1749 and len(cluster_slots[0][2][0]) == 0 

1750 and len(self.startup_nodes) == 1 

1751 ): 

1752 cluster_slots[0][2][0] = startup_node.host 

1753 

1754 for slot in cluster_slots: 

1755 primary_node = slot[2] 

1756 host = str_if_bytes(primary_node[0]) 

1757 if host == "": 

1758 host = startup_node.host 

1759 port = int(primary_node[1]) 

1760 host, port = self.remap_host_port(host, port) 

1761 

1762 nodes_for_slot = [] 

1763 

1764 target_node = self._get_or_create_cluster_node( 

1765 host, port, PRIMARY, tmp_nodes_cache 

1766 ) 

1767 nodes_for_slot.append(target_node) 

1768 

1769 replica_nodes = slot[3:] 

1770 for replica_node in replica_nodes: 

1771 host = str_if_bytes(replica_node[0]) 

1772 port = int(replica_node[1]) 

1773 host, port = self.remap_host_port(host, port) 

1774 target_replica_node = self._get_or_create_cluster_node( 

1775 host, port, REPLICA, tmp_nodes_cache 

1776 ) 

1777 nodes_for_slot.append(target_replica_node) 

1778 

1779 for i in range(int(slot[0]), int(slot[1]) + 1): 

1780 if i not in tmp_slots: 

1781 tmp_slots[i] = nodes_for_slot 

1782 else: 

1783 # Validate that 2 nodes want to use the same slot cache 

1784 # setup 

1785 tmp_slot = tmp_slots[i][0] 

1786 if tmp_slot.name != target_node.name: 

1787 disagreements.append( 

1788 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

1789 ) 

1790 

1791 if len(disagreements) > 5: 

1792 raise RedisClusterException( 

1793 f"startup_nodes could not agree on a valid " 

1794 f"slots cache: {', '.join(disagreements)}" 

1795 ) 

1796 

1797 fully_covered = self.check_slots_coverage(tmp_slots) 

1798 if fully_covered: 

1799 # Don't need to continue to the next startup node if all 

1800 # slots are covered 

1801 break 

1802 

1803 if not startup_nodes_reachable: 

1804 raise RedisClusterException( 

1805 f"Redis Cluster cannot be connected. Please provide at least " 

1806 f"one reachable node: {str(exception)}" 

1807 ) from exception 

1808 

1809 if self._cache is None and self._cache_config is not None: 

1810 if self._cache_factory is None: 

1811 self._cache = CacheFactory(self._cache_config).get_cache() 

1812 else: 

1813 self._cache = self._cache_factory.get_cache() 

1814 

1815 # Create Redis connections to all nodes 

1816 self.create_redis_connections(list(tmp_nodes_cache.values())) 

1817 

1818 # Check if the slots are not fully covered 

1819 if not fully_covered and self._require_full_coverage: 

1820 # Despite the requirement that the slots be covered, there 

1821 # isn't a full coverage 

1822 raise RedisClusterException( 

1823 f"All slots are not covered after query all startup_nodes. " 

1824 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

1825 f"covered..." 

1826 ) 

1827 

1828 # Set the tmp variables to the real variables 

1829 self.nodes_cache = tmp_nodes_cache 

1830 self.slots_cache = tmp_slots 

1831 # Set the default node 

1832 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

1833 if self._dynamic_startup_nodes: 

1834 # Populate the startup nodes with all discovered nodes 

1835 self.startup_nodes = tmp_nodes_cache 

1836 # If initialize was called after a MovedError, clear it 

1837 self._moved_exception = None 

1838 

1839 def close(self) -> None: 

1840 self.default_node = None 

1841 for node in self.nodes_cache.values(): 

1842 if node.redis_connection: 

1843 node.redis_connection.close() 

1844 

1845 def reset(self): 

1846 try: 

1847 self.read_load_balancer.reset() 

1848 except TypeError: 

1849 # The read_load_balancer is None, do nothing 

1850 pass 

1851 

1852 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

1853 """ 

1854 Remap the host and port returned from the cluster to a different 

1855 internal value. Useful if the client is not connecting directly 

1856 to the cluster. 

1857 """ 

1858 if self.address_remap: 

1859 return self.address_remap((host, port)) 

1860 return host, port 

1861 

1862 def find_connection_owner(self, connection: Connection) -> Optional[Redis]: 

1863 node_name = get_node_name(connection.host, connection.port) 

1864 for node in tuple(self.nodes_cache.values()): 

1865 if node.redis_connection: 

1866 conn_args = node.redis_connection.connection_pool.connection_kwargs 

1867 if node_name == get_node_name( 

1868 conn_args.get("host"), conn_args.get("port") 

1869 ): 

1870 return node 

1871 

1872 

1873class ClusterPubSub(PubSub): 

1874 """ 

1875 Wrapper for PubSub class. 

1876 

1877 IMPORTANT: before using ClusterPubSub, read about the known limitations 

1878 with pubsub in Cluster mode and learn how to workaround them: 

1879 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html 

1880 """ 

1881 

1882 def __init__( 

1883 self, 

1884 redis_cluster, 

1885 node=None, 

1886 host=None, 

1887 port=None, 

1888 push_handler_func=None, 

1889 event_dispatcher: Optional["EventDispatcher"] = None, 

1890 **kwargs, 

1891 ): 

1892 """ 

1893 When a pubsub instance is created without specifying a node, a single 

1894 node will be transparently chosen for the pubsub connection on the 

1895 first command execution. The node will be determined by: 

1896 1. Hashing the channel name in the request to find its keyslot 

1897 2. Selecting a node that handles the keyslot: If read_from_replicas is 

1898 set to true or load_balancing_strategy is set, a replica can be selected. 

1899 

1900 :type redis_cluster: RedisCluster 

1901 :type node: ClusterNode 

1902 :type host: str 

1903 :type port: int 

1904 """ 

1905 self.node = None 

1906 self.set_pubsub_node(redis_cluster, node, host, port) 

1907 connection_pool = ( 

1908 None 

1909 if self.node is None 

1910 else redis_cluster.get_redis_connection(self.node).connection_pool 

1911 ) 

1912 self.cluster = redis_cluster 

1913 self.node_pubsub_mapping = {} 

1914 self._pubsubs_generator = self._pubsubs_generator() 

1915 if event_dispatcher is None: 

1916 self._event_dispatcher = EventDispatcher() 

1917 else: 

1918 self._event_dispatcher = event_dispatcher 

1919 super().__init__( 

1920 connection_pool=connection_pool, 

1921 encoder=redis_cluster.encoder, 

1922 push_handler_func=push_handler_func, 

1923 event_dispatcher=self._event_dispatcher, 

1924 **kwargs, 

1925 ) 

1926 

1927 def set_pubsub_node(self, cluster, node=None, host=None, port=None): 

1928 """ 

1929 The pubsub node will be set according to the passed node, host and port 

1930 When none of the node, host, or port are specified - the node is set 

1931 to None and will be determined by the keyslot of the channel in the 

1932 first command to be executed. 

1933 RedisClusterException will be thrown if the passed node does not exist 

1934 in the cluster. 

1935 If host is passed without port, or vice versa, a DataError will be 

1936 thrown. 

1937 :type cluster: RedisCluster 

1938 :type node: ClusterNode 

1939 :type host: str 

1940 :type port: int 

1941 """ 

1942 if node is not None: 

1943 # node is passed by the user 

1944 self._raise_on_invalid_node(cluster, node, node.host, node.port) 

1945 pubsub_node = node 

1946 elif host is not None and port is not None: 

1947 # host and port passed by the user 

1948 node = cluster.get_node(host=host, port=port) 

1949 self._raise_on_invalid_node(cluster, node, host, port) 

1950 pubsub_node = node 

1951 elif any([host, port]) is True: 

1952 # only 'host' or 'port' passed 

1953 raise DataError("Passing a host requires passing a port, and vice versa") 

1954 else: 

1955 # nothing passed by the user. set node to None 

1956 pubsub_node = None 

1957 

1958 self.node = pubsub_node 

1959 

1960 def get_pubsub_node(self): 

1961 """ 

1962 Get the node that is being used as the pubsub connection 

1963 """ 

1964 return self.node 

1965 

1966 def _raise_on_invalid_node(self, redis_cluster, node, host, port): 

1967 """ 

1968 Raise a RedisClusterException if the node is None or doesn't exist in 

1969 the cluster. 

1970 """ 

1971 if node is None or redis_cluster.get_node(node_name=node.name) is None: 

1972 raise RedisClusterException( 

1973 f"Node {host}:{port} doesn't exist in the cluster" 

1974 ) 

1975 

1976 def execute_command(self, *args): 

1977 """ 

1978 Execute a subscribe/unsubscribe command. 

1979 

1980 Taken code from redis-py and tweak to make it work within a cluster. 

1981 """ 

1982 # NOTE: don't parse the response in this function -- it could pull a 

1983 # legitimate message off the stack if the connection is already 

1984 # subscribed to one or more channels 

1985 

1986 if self.connection is None: 

1987 if self.connection_pool is None: 

1988 if len(args) > 1: 

1989 # Hash the first channel and get one of the nodes holding 

1990 # this slot 

1991 channel = args[1] 

1992 slot = self.cluster.keyslot(channel) 

1993 node = self.cluster.nodes_manager.get_node_from_slot( 

1994 slot, 

1995 self.cluster.read_from_replicas, 

1996 self.cluster.load_balancing_strategy, 

1997 ) 

1998 else: 

1999 # Get a random node 

2000 node = self.cluster.get_random_node() 

2001 self.node = node 

2002 redis_connection = self.cluster.get_redis_connection(node) 

2003 self.connection_pool = redis_connection.connection_pool 

2004 self.connection = self.connection_pool.get_connection() 

2005 # register a callback that re-subscribes to any channels we 

2006 # were listening to when we were disconnected 

2007 self.connection.register_connect_callback(self.on_connect) 

2008 if self.push_handler_func is not None: 

2009 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

2010 self._event_dispatcher.dispatch( 

2011 AfterPubSubConnectionInstantiationEvent( 

2012 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

2013 ) 

2014 ) 

2015 connection = self.connection 

2016 self._execute(connection, connection.send_command, *args) 

2017 

2018 def _get_node_pubsub(self, node): 

2019 try: 

2020 return self.node_pubsub_mapping[node.name] 

2021 except KeyError: 

2022 pubsub = node.redis_connection.pubsub( 

2023 push_handler_func=self.push_handler_func 

2024 ) 

2025 self.node_pubsub_mapping[node.name] = pubsub 

2026 return pubsub 

2027 

2028 def _sharded_message_generator(self): 

2029 for _ in range(len(self.node_pubsub_mapping)): 

2030 pubsub = next(self._pubsubs_generator) 

2031 message = pubsub.get_message() 

2032 if message is not None: 

2033 return message 

2034 return None 

2035 

2036 def _pubsubs_generator(self): 

2037 while True: 

2038 yield from self.node_pubsub_mapping.values() 

2039 

2040 def get_sharded_message( 

2041 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None 

2042 ): 

2043 if target_node: 

2044 message = self.node_pubsub_mapping[target_node.name].get_message( 

2045 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout 

2046 ) 

2047 else: 

2048 message = self._sharded_message_generator() 

2049 if message is None: 

2050 return None 

2051 elif str_if_bytes(message["type"]) == "sunsubscribe": 

2052 if message["channel"] in self.pending_unsubscribe_shard_channels: 

2053 self.pending_unsubscribe_shard_channels.remove(message["channel"]) 

2054 self.shard_channels.pop(message["channel"], None) 

2055 node = self.cluster.get_node_from_key(message["channel"]) 

2056 if self.node_pubsub_mapping[node.name].subscribed is False: 

2057 self.node_pubsub_mapping.pop(node.name) 

2058 if not self.channels and not self.patterns and not self.shard_channels: 

2059 # There are no subscriptions anymore, set subscribed_event flag 

2060 # to false 

2061 self.subscribed_event.clear() 

2062 if self.ignore_subscribe_messages or ignore_subscribe_messages: 

2063 return None 

2064 return message 

2065 

2066 def ssubscribe(self, *args, **kwargs): 

2067 if args: 

2068 args = list_or_args(args[0], args[1:]) 

2069 s_channels = dict.fromkeys(args) 

2070 s_channels.update(kwargs) 

2071 for s_channel, handler in s_channels.items(): 

2072 node = self.cluster.get_node_from_key(s_channel) 

2073 pubsub = self._get_node_pubsub(node) 

2074 if handler: 

2075 pubsub.ssubscribe(**{s_channel: handler}) 

2076 else: 

2077 pubsub.ssubscribe(s_channel) 

2078 self.shard_channels.update(pubsub.shard_channels) 

2079 self.pending_unsubscribe_shard_channels.difference_update( 

2080 self._normalize_keys({s_channel: None}) 

2081 ) 

2082 if pubsub.subscribed and not self.subscribed: 

2083 self.subscribed_event.set() 

2084 self.health_check_response_counter = 0 

2085 

2086 def sunsubscribe(self, *args): 

2087 if args: 

2088 args = list_or_args(args[0], args[1:]) 

2089 else: 

2090 args = self.shard_channels 

2091 

2092 for s_channel in args: 

2093 node = self.cluster.get_node_from_key(s_channel) 

2094 p = self._get_node_pubsub(node) 

2095 p.sunsubscribe(s_channel) 

2096 self.pending_unsubscribe_shard_channels.update( 

2097 p.pending_unsubscribe_shard_channels 

2098 ) 

2099 

2100 def get_redis_connection(self): 

2101 """ 

2102 Get the Redis connection of the pubsub connected node. 

2103 """ 

2104 if self.node is not None: 

2105 return self.node.redis_connection 

2106 

2107 def disconnect(self): 

2108 """ 

2109 Disconnect the pubsub connection. 

2110 """ 

2111 if self.connection: 

2112 self.connection.disconnect() 

2113 for pubsub in self.node_pubsub_mapping.values(): 

2114 pubsub.connection.disconnect() 

2115 

2116 

2117class ClusterPipeline(RedisCluster): 

2118 """ 

2119 Support for Redis pipeline 

2120 in cluster mode 

2121 """ 

2122 

2123 ERRORS_ALLOW_RETRY = ( 

2124 ConnectionError, 

2125 TimeoutError, 

2126 MovedError, 

2127 AskError, 

2128 TryAgainError, 

2129 ) 

2130 

2131 NO_SLOTS_COMMANDS = {"UNWATCH"} 

2132 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

2133 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

2134 

2135 @deprecated_args( 

2136 args_to_warn=[ 

2137 "cluster_error_retry_attempts", 

2138 ], 

2139 reason="Please configure the 'retry' object instead", 

2140 version="6.0.0", 

2141 ) 

2142 def __init__( 

2143 self, 

2144 nodes_manager: "NodesManager", 

2145 commands_parser: "CommandsParser", 

2146 result_callbacks: Optional[Dict[str, Callable]] = None, 

2147 cluster_response_callbacks: Optional[Dict[str, Callable]] = None, 

2148 startup_nodes: Optional[List["ClusterNode"]] = None, 

2149 read_from_replicas: bool = False, 

2150 load_balancing_strategy: Optional[LoadBalancingStrategy] = None, 

2151 cluster_error_retry_attempts: int = 3, 

2152 reinitialize_steps: int = 5, 

2153 retry: Optional[Retry] = None, 

2154 lock=None, 

2155 transaction=False, 

2156 **kwargs, 

2157 ): 

2158 """ """ 

2159 self.command_stack = [] 

2160 self.nodes_manager = nodes_manager 

2161 self.commands_parser = commands_parser 

2162 self.refresh_table_asap = False 

2163 self.result_callbacks = ( 

2164 result_callbacks or self.__class__.RESULT_CALLBACKS.copy() 

2165 ) 

2166 self.startup_nodes = startup_nodes if startup_nodes else [] 

2167 self.read_from_replicas = read_from_replicas 

2168 self.load_balancing_strategy = load_balancing_strategy 

2169 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

2170 self.cluster_response_callbacks = cluster_response_callbacks 

2171 self.reinitialize_counter = 0 

2172 self.reinitialize_steps = reinitialize_steps 

2173 if retry is not None: 

2174 self.retry = retry 

2175 else: 

2176 self.retry = Retry( 

2177 backoff=ExponentialWithJitterBackoff(base=1, cap=10), 

2178 retries=cluster_error_retry_attempts, 

2179 ) 

2180 

2181 self.encoder = Encoder( 

2182 kwargs.get("encoding", "utf-8"), 

2183 kwargs.get("encoding_errors", "strict"), 

2184 kwargs.get("decode_responses", False), 

2185 ) 

2186 if lock is None: 

2187 lock = threading.RLock() 

2188 self._lock = lock 

2189 self.parent_execute_command = super().execute_command 

2190 self._execution_strategy: ExecutionStrategy = ( 

2191 PipelineStrategy(self) if not transaction else TransactionStrategy(self) 

2192 ) 

2193 

2194 def __repr__(self): 

2195 """ """ 

2196 return f"{type(self).__name__}" 

2197 

2198 def __enter__(self): 

2199 """ """ 

2200 return self 

2201 

2202 def __exit__(self, exc_type, exc_value, traceback): 

2203 """ """ 

2204 self.reset() 

2205 

2206 def __del__(self): 

2207 try: 

2208 self.reset() 

2209 except Exception: 

2210 pass 

2211 

2212 def __len__(self): 

2213 """ """ 

2214 return len(self._execution_strategy.command_queue) 

2215 

2216 def __bool__(self): 

2217 "Pipeline instances should always evaluate to True on Python 3+" 

2218 return True 

2219 

2220 def execute_command(self, *args, **kwargs): 

2221 """ 

2222 Wrapper function for pipeline_execute_command 

2223 """ 

2224 return self._execution_strategy.execute_command(*args, **kwargs) 

2225 

2226 def pipeline_execute_command(self, *args, **options): 

2227 """ 

2228 Stage a command to be executed when execute() is next called 

2229 

2230 Returns the current Pipeline object back so commands can be 

2231 chained together, such as: 

2232 

2233 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

2234 

2235 At some other point, you can then run: pipe.execute(), 

2236 which will execute all commands queued in the pipe. 

2237 """ 

2238 return self._execution_strategy.execute_command(*args, **options) 

2239 

2240 def annotate_exception(self, exception, number, command): 

2241 """ 

2242 Provides extra context to the exception prior to it being handled 

2243 """ 

2244 self._execution_strategy.annotate_exception(exception, number, command) 

2245 

2246 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2247 """ 

2248 Execute all the commands in the current pipeline 

2249 """ 

2250 

2251 try: 

2252 return self._execution_strategy.execute(raise_on_error) 

2253 finally: 

2254 self.reset() 

2255 

2256 def reset(self): 

2257 """ 

2258 Reset back to empty pipeline. 

2259 """ 

2260 self._execution_strategy.reset() 

2261 

2262 def send_cluster_commands( 

2263 self, stack, raise_on_error=True, allow_redirections=True 

2264 ): 

2265 return self._execution_strategy.send_cluster_commands( 

2266 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections 

2267 ) 

2268 

2269 def exists(self, *keys): 

2270 return self._execution_strategy.exists(*keys) 

2271 

2272 def eval(self): 

2273 """ """ 

2274 return self._execution_strategy.eval() 

2275 

2276 def multi(self): 

2277 """ 

2278 Start a transactional block of the pipeline after WATCH commands 

2279 are issued. End the transactional block with `execute`. 

2280 """ 

2281 self._execution_strategy.multi() 

2282 

2283 def load_scripts(self): 

2284 """ """ 

2285 self._execution_strategy.load_scripts() 

2286 

2287 def discard(self): 

2288 """ """ 

2289 self._execution_strategy.discard() 

2290 

2291 def watch(self, *names): 

2292 """Watches the values at keys ``names``""" 

2293 self._execution_strategy.watch(*names) 

2294 

2295 def unwatch(self): 

2296 """Unwatches all previously specified keys""" 

2297 self._execution_strategy.unwatch() 

2298 

2299 def script_load_for_pipeline(self, *args, **kwargs): 

2300 self._execution_strategy.script_load_for_pipeline(*args, **kwargs) 

2301 

2302 def delete(self, *names): 

2303 self._execution_strategy.delete(*names) 

2304 

2305 def unlink(self, *names): 

2306 self._execution_strategy.unlink(*names) 

2307 

2308 

2309def block_pipeline_command(name: str) -> Callable[..., Any]: 

2310 """ 

2311 Prints error because some pipelined commands should 

2312 be blocked when running in cluster-mode 

2313 """ 

2314 

2315 def inner(*args, **kwargs): 

2316 raise RedisClusterException( 

2317 f"ERROR: Calling pipelined function {name} is blocked " 

2318 f"when running redis in cluster mode..." 

2319 ) 

2320 

2321 return inner 

2322 

2323 

2324# Blocked pipeline commands 

2325PIPELINE_BLOCKED_COMMANDS = ( 

2326 "BGREWRITEAOF", 

2327 "BGSAVE", 

2328 "BITOP", 

2329 "BRPOPLPUSH", 

2330 "CLIENT GETNAME", 

2331 "CLIENT KILL", 

2332 "CLIENT LIST", 

2333 "CLIENT SETNAME", 

2334 "CLIENT", 

2335 "CONFIG GET", 

2336 "CONFIG RESETSTAT", 

2337 "CONFIG REWRITE", 

2338 "CONFIG SET", 

2339 "CONFIG", 

2340 "DBSIZE", 

2341 "ECHO", 

2342 "EVALSHA", 

2343 "FLUSHALL", 

2344 "FLUSHDB", 

2345 "INFO", 

2346 "KEYS", 

2347 "LASTSAVE", 

2348 "MGET", 

2349 "MGET NONATOMIC", 

2350 "MOVE", 

2351 "MSET", 

2352 "MSET NONATOMIC", 

2353 "MSETNX", 

2354 "PFCOUNT", 

2355 "PFMERGE", 

2356 "PING", 

2357 "PUBLISH", 

2358 "RANDOMKEY", 

2359 "READONLY", 

2360 "READWRITE", 

2361 "RENAME", 

2362 "RENAMENX", 

2363 "RPOPLPUSH", 

2364 "SAVE", 

2365 "SCAN", 

2366 "SCRIPT EXISTS", 

2367 "SCRIPT FLUSH", 

2368 "SCRIPT KILL", 

2369 "SCRIPT LOAD", 

2370 "SCRIPT", 

2371 "SDIFF", 

2372 "SDIFFSTORE", 

2373 "SENTINEL GET MASTER ADDR BY NAME", 

2374 "SENTINEL MASTER", 

2375 "SENTINEL MASTERS", 

2376 "SENTINEL MONITOR", 

2377 "SENTINEL REMOVE", 

2378 "SENTINEL SENTINELS", 

2379 "SENTINEL SET", 

2380 "SENTINEL SLAVES", 

2381 "SENTINEL", 

2382 "SHUTDOWN", 

2383 "SINTER", 

2384 "SINTERSTORE", 

2385 "SLAVEOF", 

2386 "SLOWLOG GET", 

2387 "SLOWLOG LEN", 

2388 "SLOWLOG RESET", 

2389 "SLOWLOG", 

2390 "SMOVE", 

2391 "SORT", 

2392 "SUNION", 

2393 "SUNIONSTORE", 

2394 "TIME", 

2395) 

2396for command in PIPELINE_BLOCKED_COMMANDS: 

2397 command = command.replace(" ", "_").lower() 

2398 

2399 setattr(ClusterPipeline, command, block_pipeline_command(command)) 

2400 

2401 

2402class PipelineCommand: 

2403 """ """ 

2404 

2405 def __init__(self, args, options=None, position=None): 

2406 self.args = args 

2407 if options is None: 

2408 options = {} 

2409 self.options = options 

2410 self.position = position 

2411 self.result = None 

2412 self.node = None 

2413 self.asking = False 

2414 

2415 

2416class NodeCommands: 

2417 """ """ 

2418 

2419 def __init__(self, parse_response, connection_pool, connection): 

2420 """ """ 

2421 self.parse_response = parse_response 

2422 self.connection_pool = connection_pool 

2423 self.connection = connection 

2424 self.commands = [] 

2425 

2426 def append(self, c): 

2427 """ """ 

2428 self.commands.append(c) 

2429 

2430 def write(self): 

2431 """ 

2432 Code borrowed from Redis so it can be fixed 

2433 """ 

2434 connection = self.connection 

2435 commands = self.commands 

2436 

2437 # We are going to clobber the commands with the write, so go ahead 

2438 # and ensure that nothing is sitting there from a previous run. 

2439 for c in commands: 

2440 c.result = None 

2441 

2442 # build up all commands into a single request to increase network perf 

2443 # send all the commands and catch connection and timeout errors. 

2444 try: 

2445 connection.send_packed_command( 

2446 connection.pack_commands([c.args for c in commands]) 

2447 ) 

2448 except (ConnectionError, TimeoutError) as e: 

2449 for c in commands: 

2450 c.result = e 

2451 

2452 def read(self): 

2453 """ """ 

2454 connection = self.connection 

2455 for c in self.commands: 

2456 # if there is a result on this command, 

2457 # it means we ran into an exception 

2458 # like a connection error. Trying to parse 

2459 # a response on a connection that 

2460 # is no longer open will result in a 

2461 # connection error raised by redis-py. 

2462 # but redis-py doesn't check in parse_response 

2463 # that the sock object is 

2464 # still set and if you try to 

2465 # read from a closed connection, it will 

2466 # result in an AttributeError because 

2467 # it will do a readline() call on None. 

2468 # This can have all kinds of nasty side-effects. 

2469 # Treating this case as a connection error 

2470 # is fine because it will dump 

2471 # the connection object back into the 

2472 # pool and on the next write, it will 

2473 # explicitly open the connection and all will be well. 

2474 if c.result is None: 

2475 try: 

2476 c.result = self.parse_response(connection, c.args[0], **c.options) 

2477 except (ConnectionError, TimeoutError) as e: 

2478 for c in self.commands: 

2479 c.result = e 

2480 return 

2481 except RedisError: 

2482 c.result = sys.exc_info()[1] 

2483 

2484 

2485class ExecutionStrategy(ABC): 

2486 @property 

2487 @abstractmethod 

2488 def command_queue(self): 

2489 pass 

2490 

2491 @abstractmethod 

2492 def execute_command(self, *args, **kwargs): 

2493 """ 

2494 Execution flow for current execution strategy. 

2495 

2496 See: ClusterPipeline.execute_command() 

2497 """ 

2498 pass 

2499 

2500 @abstractmethod 

2501 def annotate_exception(self, exception, number, command): 

2502 """ 

2503 Annotate exception according to current execution strategy. 

2504 

2505 See: ClusterPipeline.annotate_exception() 

2506 """ 

2507 pass 

2508 

2509 @abstractmethod 

2510 def pipeline_execute_command(self, *args, **options): 

2511 """ 

2512 Pipeline execution flow for current execution strategy. 

2513 

2514 See: ClusterPipeline.pipeline_execute_command() 

2515 """ 

2516 pass 

2517 

2518 @abstractmethod 

2519 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2520 """ 

2521 Executes current execution strategy. 

2522 

2523 See: ClusterPipeline.execute() 

2524 """ 

2525 pass 

2526 

2527 @abstractmethod 

2528 def send_cluster_commands( 

2529 self, stack, raise_on_error=True, allow_redirections=True 

2530 ): 

2531 """ 

2532 Sends commands according to current execution strategy. 

2533 

2534 See: ClusterPipeline.send_cluster_commands() 

2535 """ 

2536 pass 

2537 

2538 @abstractmethod 

2539 def reset(self): 

2540 """ 

2541 Resets current execution strategy. 

2542 

2543 See: ClusterPipeline.reset() 

2544 """ 

2545 pass 

2546 

2547 @abstractmethod 

2548 def exists(self, *keys): 

2549 pass 

2550 

2551 @abstractmethod 

2552 def eval(self): 

2553 pass 

2554 

2555 @abstractmethod 

2556 def multi(self): 

2557 """ 

2558 Starts transactional context. 

2559 

2560 See: ClusterPipeline.multi() 

2561 """ 

2562 pass 

2563 

2564 @abstractmethod 

2565 def load_scripts(self): 

2566 pass 

2567 

2568 @abstractmethod 

2569 def watch(self, *names): 

2570 pass 

2571 

2572 @abstractmethod 

2573 def unwatch(self): 

2574 """ 

2575 Unwatches all previously specified keys 

2576 

2577 See: ClusterPipeline.unwatch() 

2578 """ 

2579 pass 

2580 

2581 @abstractmethod 

2582 def script_load_for_pipeline(self, *args, **kwargs): 

2583 pass 

2584 

2585 @abstractmethod 

2586 def delete(self, *names): 

2587 """ 

2588 "Delete a key specified by ``names``" 

2589 

2590 See: ClusterPipeline.delete() 

2591 """ 

2592 pass 

2593 

2594 @abstractmethod 

2595 def unlink(self, *names): 

2596 """ 

2597 "Unlink a key specified by ``names``" 

2598 

2599 See: ClusterPipeline.unlink() 

2600 """ 

2601 pass 

2602 

2603 @abstractmethod 

2604 def discard(self): 

2605 pass 

2606 

2607 

2608class AbstractStrategy(ExecutionStrategy): 

2609 def __init__( 

2610 self, 

2611 pipe: ClusterPipeline, 

2612 ): 

2613 self._command_queue: List[PipelineCommand] = [] 

2614 self._pipe = pipe 

2615 self._nodes_manager = self._pipe.nodes_manager 

2616 

2617 @property 

2618 def command_queue(self): 

2619 return self._command_queue 

2620 

2621 @command_queue.setter 

2622 def command_queue(self, queue: List[PipelineCommand]): 

2623 self._command_queue = queue 

2624 

2625 @abstractmethod 

2626 def execute_command(self, *args, **kwargs): 

2627 pass 

2628 

2629 def pipeline_execute_command(self, *args, **options): 

2630 self._command_queue.append( 

2631 PipelineCommand(args, options, len(self._command_queue)) 

2632 ) 

2633 return self._pipe 

2634 

2635 @abstractmethod 

2636 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2637 pass 

2638 

2639 @abstractmethod 

2640 def send_cluster_commands( 

2641 self, stack, raise_on_error=True, allow_redirections=True 

2642 ): 

2643 pass 

2644 

2645 @abstractmethod 

2646 def reset(self): 

2647 pass 

2648 

2649 def exists(self, *keys): 

2650 return self.execute_command("EXISTS", *keys) 

2651 

2652 def eval(self): 

2653 """ """ 

2654 raise RedisClusterException("method eval() is not implemented") 

2655 

2656 def load_scripts(self): 

2657 """ """ 

2658 raise RedisClusterException("method load_scripts() is not implemented") 

2659 

2660 def script_load_for_pipeline(self, *args, **kwargs): 

2661 """ """ 

2662 raise RedisClusterException( 

2663 "method script_load_for_pipeline() is not implemented" 

2664 ) 

2665 

2666 def annotate_exception(self, exception, number, command): 

2667 """ 

2668 Provides extra context to the exception prior to it being handled 

2669 """ 

2670 cmd = " ".join(map(safe_str, command)) 

2671 msg = ( 

2672 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

2673 f"caused error: {exception.args[0]}" 

2674 ) 

2675 exception.args = (msg,) + exception.args[1:] 

2676 

2677 

2678class PipelineStrategy(AbstractStrategy): 

2679 def __init__(self, pipe: ClusterPipeline): 

2680 super().__init__(pipe) 

2681 self.command_flags = pipe.command_flags 

2682 

2683 def execute_command(self, *args, **kwargs): 

2684 return self.pipeline_execute_command(*args, **kwargs) 

2685 

2686 def _raise_first_error(self, stack): 

2687 """ 

2688 Raise the first exception on the stack 

2689 """ 

2690 for c in stack: 

2691 r = c.result 

2692 if isinstance(r, Exception): 

2693 self.annotate_exception(r, c.position + 1, c.args) 

2694 raise r 

2695 

2696 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2697 stack = self._command_queue 

2698 if not stack: 

2699 return [] 

2700 

2701 try: 

2702 return self.send_cluster_commands(stack, raise_on_error) 

2703 finally: 

2704 self.reset() 

2705 

2706 def reset(self): 

2707 """ 

2708 Reset back to empty pipeline. 

2709 """ 

2710 self._command_queue = [] 

2711 

2712 def send_cluster_commands( 

2713 self, stack, raise_on_error=True, allow_redirections=True 

2714 ): 

2715 """ 

2716 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling. 

2717 

2718 If one of the retryable exceptions has been thrown we assume that: 

2719 - connection_pool was disconnected 

2720 - connection_pool was reset 

2721 - refresh_table_asap set to True 

2722 

2723 It will try the number of times specified by 

2724 the retries in config option "self.retry" 

2725 which defaults to 3 unless manually configured. 

2726 

2727 If it reaches the number of times, the command will 

2728 raises ClusterDownException. 

2729 """ 

2730 if not stack: 

2731 return [] 

2732 retry_attempts = self._pipe.retry.get_retries() 

2733 while True: 

2734 try: 

2735 return self._send_cluster_commands( 

2736 stack, 

2737 raise_on_error=raise_on_error, 

2738 allow_redirections=allow_redirections, 

2739 ) 

2740 except RedisCluster.ERRORS_ALLOW_RETRY as e: 

2741 if retry_attempts > 0: 

2742 # Try again with the new cluster setup. All other errors 

2743 # should be raised. 

2744 retry_attempts -= 1 

2745 pass 

2746 else: 

2747 raise e 

2748 

2749 def _send_cluster_commands( 

2750 self, stack, raise_on_error=True, allow_redirections=True 

2751 ): 

2752 """ 

2753 Send a bunch of cluster commands to the redis cluster. 

2754 

2755 `allow_redirections` If the pipeline should follow 

2756 `ASK` & `MOVED` responses automatically. If set 

2757 to false it will raise RedisClusterException. 

2758 """ 

2759 # the first time sending the commands we send all of 

2760 # the commands that were queued up. 

2761 # if we have to run through it again, we only retry 

2762 # the commands that failed. 

2763 attempt = sorted(stack, key=lambda x: x.position) 

2764 is_default_node = False 

2765 # build a list of node objects based on node names we need to 

2766 nodes = {} 

2767 

2768 # as we move through each command that still needs to be processed, 

2769 # we figure out the slot number that command maps to, then from 

2770 # the slot determine the node. 

2771 for c in attempt: 

2772 while True: 

2773 # refer to our internal node -> slot table that 

2774 # tells us where a given command should route to. 

2775 # (it might be possible we have a cached node that no longer 

2776 # exists in the cluster, which is why we do this in a loop) 

2777 passed_targets = c.options.pop("target_nodes", None) 

2778 if passed_targets and not self._is_nodes_flag(passed_targets): 

2779 target_nodes = self._parse_target_nodes(passed_targets) 

2780 else: 

2781 target_nodes = self._determine_nodes( 

2782 *c.args, node_flag=passed_targets 

2783 ) 

2784 if not target_nodes: 

2785 raise RedisClusterException( 

2786 f"No targets were found to execute {c.args} command on" 

2787 ) 

2788 if len(target_nodes) > 1: 

2789 raise RedisClusterException( 

2790 f"Too many targets for command {c.args}" 

2791 ) 

2792 

2793 node = target_nodes[0] 

2794 if node == self._pipe.get_default_node(): 

2795 is_default_node = True 

2796 

2797 # now that we know the name of the node 

2798 # ( it's just a string in the form of host:port ) 

2799 # we can build a list of commands for each node. 

2800 node_name = node.name 

2801 if node_name not in nodes: 

2802 redis_node = self._pipe.get_redis_connection(node) 

2803 try: 

2804 connection = get_connection(redis_node) 

2805 except (ConnectionError, TimeoutError): 

2806 for n in nodes.values(): 

2807 n.connection_pool.release(n.connection) 

2808 # Connection retries are being handled in the node's 

2809 # Retry object. Reinitialize the node -> slot table. 

2810 self._nodes_manager.initialize() 

2811 if is_default_node: 

2812 self._pipe.replace_default_node() 

2813 raise 

2814 nodes[node_name] = NodeCommands( 

2815 redis_node.parse_response, 

2816 redis_node.connection_pool, 

2817 connection, 

2818 ) 

2819 nodes[node_name].append(c) 

2820 break 

2821 

2822 # send the commands in sequence. 

2823 # we write to all the open sockets for each node first, 

2824 # before reading anything 

2825 # this allows us to flush all the requests out across the 

2826 # network 

2827 # so that we can read them from different sockets as they come back. 

2828 # we dont' multiplex on the sockets as they come available, 

2829 # but that shouldn't make too much difference. 

2830 try: 

2831 node_commands = nodes.values() 

2832 for n in node_commands: 

2833 n.write() 

2834 

2835 for n in node_commands: 

2836 n.read() 

2837 finally: 

2838 # release all of the redis connections we allocated earlier 

2839 # back into the connection pool. 

2840 # we used to do this step as part of a try/finally block, 

2841 # but it is really dangerous to 

2842 # release connections back into the pool if for some 

2843 # reason the socket has data still left in it 

2844 # from a previous operation. The write and 

2845 # read operations already have try/catch around them for 

2846 # all known types of errors including connection 

2847 # and socket level errors. 

2848 # So if we hit an exception, something really bad 

2849 # happened and putting any oF 

2850 # these connections back into the pool is a very bad idea. 

2851 # the socket might have unread buffer still sitting in it, 

2852 # and then the next time we read from it we pass the 

2853 # buffered result back from a previous command and 

2854 # every single request after to that connection will always get 

2855 # a mismatched result. 

2856 for n in nodes.values(): 

2857 n.connection_pool.release(n.connection) 

2858 

2859 # if the response isn't an exception it is a 

2860 # valid response from the node 

2861 # we're all done with that command, YAY! 

2862 # if we have more commands to attempt, we've run into problems. 

2863 # collect all the commands we are allowed to retry. 

2864 # (MOVED, ASK, or connection errors or timeout errors) 

2865 attempt = sorted( 

2866 ( 

2867 c 

2868 for c in attempt 

2869 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

2870 ), 

2871 key=lambda x: x.position, 

2872 ) 

2873 if attempt and allow_redirections: 

2874 # RETRY MAGIC HAPPENS HERE! 

2875 # send these remaining commands one at a time using `execute_command` 

2876 # in the main client. This keeps our retry logic 

2877 # in one place mostly, 

2878 # and allows us to be more confident in correctness of behavior. 

2879 # at this point any speed gains from pipelining have been lost 

2880 # anyway, so we might as well make the best 

2881 # attempt to get the correct behavior. 

2882 # 

2883 # The client command will handle retries for each 

2884 # individual command sequentially as we pass each 

2885 # one into `execute_command`. Any exceptions 

2886 # that bubble out should only appear once all 

2887 # retries have been exhausted. 

2888 # 

2889 # If a lot of commands have failed, we'll be setting the 

2890 # flag to rebuild the slots table from scratch. 

2891 # So MOVED errors should correct themselves fairly quickly. 

2892 self._pipe.reinitialize_counter += 1 

2893 if self._pipe._should_reinitialized(): 

2894 self._nodes_manager.initialize() 

2895 if is_default_node: 

2896 self._pipe.replace_default_node() 

2897 for c in attempt: 

2898 try: 

2899 # send each command individually like we 

2900 # do in the main client. 

2901 c.result = self._pipe.parent_execute_command(*c.args, **c.options) 

2902 except RedisError as e: 

2903 c.result = e 

2904 

2905 # turn the response back into a simple flat array that corresponds 

2906 # to the sequence of commands issued in the stack in pipeline.execute() 

2907 response = [] 

2908 for c in sorted(stack, key=lambda x: x.position): 

2909 if c.args[0] in self._pipe.cluster_response_callbacks: 

2910 # Remove keys entry, it needs only for cache. 

2911 c.options.pop("keys", None) 

2912 c.result = self._pipe.cluster_response_callbacks[c.args[0]]( 

2913 c.result, **c.options 

2914 ) 

2915 response.append(c.result) 

2916 

2917 if raise_on_error: 

2918 self._raise_first_error(stack) 

2919 

2920 return response 

2921 

2922 def _is_nodes_flag(self, target_nodes): 

2923 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags 

2924 

2925 def _parse_target_nodes(self, target_nodes): 

2926 if isinstance(target_nodes, list): 

2927 nodes = target_nodes 

2928 elif isinstance(target_nodes, ClusterNode): 

2929 # Supports passing a single ClusterNode as a variable 

2930 nodes = [target_nodes] 

2931 elif isinstance(target_nodes, dict): 

2932 # Supports dictionaries of the format {node_name: node}. 

2933 # It enables to execute commands with multi nodes as follows: 

2934 # rc.cluster_save_config(rc.get_primaries()) 

2935 nodes = target_nodes.values() 

2936 else: 

2937 raise TypeError( 

2938 "target_nodes type can be one of the following: " 

2939 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

2940 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

2941 f"The passed type is {type(target_nodes)}" 

2942 ) 

2943 return nodes 

2944 

2945 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: 

2946 # Determine which nodes should be executed the command on. 

2947 # Returns a list of target nodes. 

2948 command = args[0].upper() 

2949 if ( 

2950 len(args) >= 2 

2951 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags 

2952 ): 

2953 command = f"{args[0]} {args[1]}".upper() 

2954 

2955 nodes_flag = kwargs.pop("nodes_flag", None) 

2956 if nodes_flag is not None: 

2957 # nodes flag passed by the user 

2958 command_flag = nodes_flag 

2959 else: 

2960 # get the nodes group for this command if it was predefined 

2961 command_flag = self._pipe.command_flags.get(command) 

2962 if command_flag == self._pipe.RANDOM: 

2963 # return a random node 

2964 return [self._pipe.get_random_node()] 

2965 elif command_flag == self._pipe.PRIMARIES: 

2966 # return all primaries 

2967 return self._pipe.get_primaries() 

2968 elif command_flag == self._pipe.REPLICAS: 

2969 # return all replicas 

2970 return self._pipe.get_replicas() 

2971 elif command_flag == self._pipe.ALL_NODES: 

2972 # return all nodes 

2973 return self._pipe.get_nodes() 

2974 elif command_flag == self._pipe.DEFAULT_NODE: 

2975 # return the cluster's default node 

2976 return [self._nodes_manager.default_node] 

2977 elif command in self._pipe.SEARCH_COMMANDS[0]: 

2978 return [self._nodes_manager.default_node] 

2979 else: 

2980 # get the node that holds the key's slot 

2981 slot = self._pipe.determine_slot(*args) 

2982 node = self._nodes_manager.get_node_from_slot( 

2983 slot, 

2984 self._pipe.read_from_replicas and command in READ_COMMANDS, 

2985 self._pipe.load_balancing_strategy 

2986 if command in READ_COMMANDS 

2987 else None, 

2988 ) 

2989 return [node] 

2990 

2991 def multi(self): 

2992 raise RedisClusterException( 

2993 "method multi() is not supported outside of transactional context" 

2994 ) 

2995 

2996 def discard(self): 

2997 raise RedisClusterException( 

2998 "method discard() is not supported outside of transactional context" 

2999 ) 

3000 

3001 def watch(self, *names): 

3002 raise RedisClusterException( 

3003 "method watch() is not supported outside of transactional context" 

3004 ) 

3005 

3006 def unwatch(self, *names): 

3007 raise RedisClusterException( 

3008 "method unwatch() is not supported outside of transactional context" 

3009 ) 

3010 

3011 def delete(self, *names): 

3012 if len(names) != 1: 

3013 raise RedisClusterException( 

3014 "deleting multiple keys is not implemented in pipeline command" 

3015 ) 

3016 

3017 return self.execute_command("DEL", names[0]) 

3018 

3019 def unlink(self, *names): 

3020 if len(names) != 1: 

3021 raise RedisClusterException( 

3022 "unlinking multiple keys is not implemented in pipeline command" 

3023 ) 

3024 

3025 return self.execute_command("UNLINK", names[0]) 

3026 

3027 

3028class TransactionStrategy(AbstractStrategy): 

3029 NO_SLOTS_COMMANDS = {"UNWATCH"} 

3030 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} 

3031 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

3032 SLOT_REDIRECT_ERRORS = (AskError, MovedError) 

3033 CONNECTION_ERRORS = ( 

3034 ConnectionError, 

3035 OSError, 

3036 ClusterDownError, 

3037 SlotNotCoveredError, 

3038 ) 

3039 

3040 def __init__(self, pipe: ClusterPipeline): 

3041 super().__init__(pipe) 

3042 self._explicit_transaction = False 

3043 self._watching = False 

3044 self._pipeline_slots: Set[int] = set() 

3045 self._transaction_connection: Optional[Connection] = None 

3046 self._executing = False 

3047 self._retry = copy(self._pipe.retry) 

3048 self._retry.update_supported_errors( 

3049 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS 

3050 ) 

3051 

3052 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]: 

3053 """ 

3054 Find a connection for a pipeline transaction. 

3055 

3056 For running an atomic transaction, watch keys ensure that contents have not been 

3057 altered as long as the watch commands for those keys were sent over the same 

3058 connection. So once we start watching a key, we fetch a connection to the 

3059 node that owns that slot and reuse it. 

3060 """ 

3061 if not self._pipeline_slots: 

3062 raise RedisClusterException( 

3063 "At least a command with a key is needed to identify a node" 

3064 ) 

3065 

3066 node: ClusterNode = self._nodes_manager.get_node_from_slot( 

3067 list(self._pipeline_slots)[0], False 

3068 ) 

3069 redis_node: Redis = self._pipe.get_redis_connection(node) 

3070 if self._transaction_connection: 

3071 if not redis_node.connection_pool.owns_connection( 

3072 self._transaction_connection 

3073 ): 

3074 previous_node = self._nodes_manager.find_connection_owner( 

3075 self._transaction_connection 

3076 ) 

3077 previous_node.connection_pool.release(self._transaction_connection) 

3078 self._transaction_connection = None 

3079 

3080 if not self._transaction_connection: 

3081 self._transaction_connection = get_connection(redis_node) 

3082 

3083 return redis_node, self._transaction_connection 

3084 

3085 def execute_command(self, *args, **kwargs): 

3086 slot_number: Optional[int] = None 

3087 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS: 

3088 slot_number = self._pipe.determine_slot(*args) 

3089 

3090 if ( 

3091 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS 

3092 ) and not self._explicit_transaction: 

3093 if args[0] == "WATCH": 

3094 self._validate_watch() 

3095 

3096 if slot_number is not None: 

3097 if self._pipeline_slots and slot_number not in self._pipeline_slots: 

3098 raise CrossSlotTransactionError( 

3099 "Cannot watch or send commands on different slots" 

3100 ) 

3101 

3102 self._pipeline_slots.add(slot_number) 

3103 elif args[0] not in self.NO_SLOTS_COMMANDS: 

3104 raise RedisClusterException( 

3105 f"Cannot identify slot number for command: {args[0]}," 

3106 "it cannot be triggered in a transaction" 

3107 ) 

3108 

3109 return self._immediate_execute_command(*args, **kwargs) 

3110 else: 

3111 if slot_number is not None: 

3112 self._pipeline_slots.add(slot_number) 

3113 

3114 return self.pipeline_execute_command(*args, **kwargs) 

3115 

3116 def _validate_watch(self): 

3117 if self._explicit_transaction: 

3118 raise RedisError("Cannot issue a WATCH after a MULTI") 

3119 

3120 self._watching = True 

3121 

3122 def _immediate_execute_command(self, *args, **options): 

3123 return self._retry.call_with_retry( 

3124 lambda: self._get_connection_and_send_command(*args, **options), 

3125 self._reinitialize_on_error, 

3126 ) 

3127 

3128 def _get_connection_and_send_command(self, *args, **options): 

3129 redis_node, connection = self._get_client_and_connection_for_transaction() 

3130 return self._send_command_parse_response( 

3131 connection, redis_node, args[0], *args, **options 

3132 ) 

3133 

3134 def _send_command_parse_response( 

3135 self, conn, redis_node: Redis, command_name, *args, **options 

3136 ): 

3137 """ 

3138 Send a command and parse the response 

3139 """ 

3140 

3141 conn.send_command(*args) 

3142 output = redis_node.parse_response(conn, command_name, **options) 

3143 

3144 if command_name in self.UNWATCH_COMMANDS: 

3145 self._watching = False 

3146 return output 

3147 

3148 def _reinitialize_on_error(self, error): 

3149 if self._watching: 

3150 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: 

3151 raise WatchError("Slot rebalancing occurred while watching keys") 

3152 

3153 if ( 

3154 type(error) in self.SLOT_REDIRECT_ERRORS 

3155 or type(error) in self.CONNECTION_ERRORS 

3156 ): 

3157 if self._transaction_connection: 

3158 self._transaction_connection = None 

3159 

3160 self._pipe.reinitialize_counter += 1 

3161 if self._pipe._should_reinitialized(): 

3162 self._nodes_manager.initialize() 

3163 self.reinitialize_counter = 0 

3164 else: 

3165 self._nodes_manager.update_moved_exception(error) 

3166 

3167 self._executing = False 

3168 

3169 def _raise_first_error(self, responses, stack): 

3170 """ 

3171 Raise the first exception on the stack 

3172 """ 

3173 for r, cmd in zip(responses, stack): 

3174 if isinstance(r, Exception): 

3175 self.annotate_exception(r, cmd.position + 1, cmd.args) 

3176 raise r 

3177 

3178 def execute(self, raise_on_error: bool = True) -> List[Any]: 

3179 stack = self._command_queue 

3180 if not stack and (not self._watching or not self._pipeline_slots): 

3181 return [] 

3182 

3183 return self._execute_transaction_with_retries(stack, raise_on_error) 

3184 

3185 def _execute_transaction_with_retries( 

3186 self, stack: List["PipelineCommand"], raise_on_error: bool 

3187 ): 

3188 return self._retry.call_with_retry( 

3189 lambda: self._execute_transaction(stack, raise_on_error), 

3190 self._reinitialize_on_error, 

3191 ) 

3192 

3193 def _execute_transaction( 

3194 self, stack: List["PipelineCommand"], raise_on_error: bool 

3195 ): 

3196 if len(self._pipeline_slots) > 1: 

3197 raise CrossSlotTransactionError( 

3198 "All keys involved in a cluster transaction must map to the same slot" 

3199 ) 

3200 

3201 self._executing = True 

3202 

3203 redis_node, connection = self._get_client_and_connection_for_transaction() 

3204 

3205 stack = chain( 

3206 [PipelineCommand(("MULTI",))], 

3207 stack, 

3208 [PipelineCommand(("EXEC",))], 

3209 ) 

3210 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options] 

3211 packed_commands = connection.pack_commands(commands) 

3212 connection.send_packed_command(packed_commands) 

3213 errors = [] 

3214 

3215 # parse off the response for MULTI 

3216 # NOTE: we need to handle ResponseErrors here and continue 

3217 # so that we read all the additional command messages from 

3218 # the socket 

3219 try: 

3220 redis_node.parse_response(connection, "MULTI") 

3221 except ResponseError as e: 

3222 self.annotate_exception(e, 0, "MULTI") 

3223 errors.append(e) 

3224 except self.CONNECTION_ERRORS as cluster_error: 

3225 self.annotate_exception(cluster_error, 0, "MULTI") 

3226 raise 

3227 

3228 # and all the other commands 

3229 for i, command in enumerate(self._command_queue): 

3230 if EMPTY_RESPONSE in command.options: 

3231 errors.append((i, command.options[EMPTY_RESPONSE])) 

3232 else: 

3233 try: 

3234 _ = redis_node.parse_response(connection, "_") 

3235 except self.SLOT_REDIRECT_ERRORS as slot_error: 

3236 self.annotate_exception(slot_error, i + 1, command.args) 

3237 errors.append(slot_error) 

3238 except self.CONNECTION_ERRORS as cluster_error: 

3239 self.annotate_exception(cluster_error, i + 1, command.args) 

3240 raise 

3241 except ResponseError as e: 

3242 self.annotate_exception(e, i + 1, command.args) 

3243 errors.append(e) 

3244 

3245 response = None 

3246 # parse the EXEC. 

3247 try: 

3248 response = redis_node.parse_response(connection, "EXEC") 

3249 except ExecAbortError: 

3250 if errors: 

3251 raise errors[0] 

3252 raise 

3253 

3254 self._executing = False 

3255 

3256 # EXEC clears any watched keys 

3257 self._watching = False 

3258 

3259 if response is None: 

3260 raise WatchError("Watched variable changed.") 

3261 

3262 # put any parse errors into the response 

3263 for i, e in errors: 

3264 response.insert(i, e) 

3265 

3266 if len(response) != len(self._command_queue): 

3267 raise InvalidPipelineStack( 

3268 "Unexpected response length for cluster pipeline EXEC." 

3269 " Command stack was {} but response had length {}".format( 

3270 [c.args[0] for c in self._command_queue], len(response) 

3271 ) 

3272 ) 

3273 

3274 # find any errors in the response and raise if necessary 

3275 if raise_on_error or len(errors) > 0: 

3276 self._raise_first_error( 

3277 response, 

3278 self._command_queue, 

3279 ) 

3280 

3281 # We have to run response callbacks manually 

3282 data = [] 

3283 for r, cmd in zip(response, self._command_queue): 

3284 if not isinstance(r, Exception): 

3285 command_name = cmd.args[0] 

3286 if command_name in self._pipe.cluster_response_callbacks: 

3287 r = self._pipe.cluster_response_callbacks[command_name]( 

3288 r, **cmd.options 

3289 ) 

3290 data.append(r) 

3291 return data 

3292 

3293 def reset(self): 

3294 self._command_queue = [] 

3295 

3296 # make sure to reset the connection state in the event that we were 

3297 # watching something 

3298 if self._transaction_connection: 

3299 try: 

3300 if self._watching: 

3301 # call this manually since our unwatch or 

3302 # immediate_execute_command methods can call reset() 

3303 self._transaction_connection.send_command("UNWATCH") 

3304 self._transaction_connection.read_response() 

3305 # we can safely return the connection to the pool here since we're 

3306 # sure we're no longer WATCHing anything 

3307 node = self._nodes_manager.find_connection_owner( 

3308 self._transaction_connection 

3309 ) 

3310 node.redis_connection.connection_pool.release( 

3311 self._transaction_connection 

3312 ) 

3313 self._transaction_connection = None 

3314 except self.CONNECTION_ERRORS: 

3315 # disconnect will also remove any previous WATCHes 

3316 if self._transaction_connection: 

3317 self._transaction_connection.disconnect() 

3318 

3319 # clean up the other instance attributes 

3320 self._watching = False 

3321 self._explicit_transaction = False 

3322 self._pipeline_slots = set() 

3323 self._executing = False 

3324 

3325 def send_cluster_commands( 

3326 self, stack, raise_on_error=True, allow_redirections=True 

3327 ): 

3328 raise NotImplementedError( 

3329 "send_cluster_commands cannot be executed in transactional context." 

3330 ) 

3331 

3332 def multi(self): 

3333 if self._explicit_transaction: 

3334 raise RedisError("Cannot issue nested calls to MULTI") 

3335 if self._command_queue: 

3336 raise RedisError( 

3337 "Commands without an initial WATCH have already been issued" 

3338 ) 

3339 self._explicit_transaction = True 

3340 

3341 def watch(self, *names): 

3342 if self._explicit_transaction: 

3343 raise RedisError("Cannot issue a WATCH after a MULTI") 

3344 

3345 return self.execute_command("WATCH", *names) 

3346 

3347 def unwatch(self): 

3348 if self._watching: 

3349 return self.execute_command("UNWATCH") 

3350 

3351 return True 

3352 

3353 def discard(self): 

3354 self.reset() 

3355 

3356 def delete(self, *names): 

3357 return self.execute_command("DEL", *names) 

3358 

3359 def unlink(self, *names): 

3360 return self.execute_command("UNLINK", *names)