Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/asyncio/cluster.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

999 statements  

1import asyncio 

2import collections 

3import random 

4import socket 

5import threading 

6import time 

7import warnings 

8from abc import ABC, abstractmethod 

9from copy import copy 

10from itertools import chain 

11from typing import ( 

12 Any, 

13 Callable, 

14 Coroutine, 

15 Deque, 

16 Dict, 

17 Generator, 

18 List, 

19 Mapping, 

20 Optional, 

21 Set, 

22 Tuple, 

23 Type, 

24 TypeVar, 

25 Union, 

26) 

27 

28from redis._parsers import AsyncCommandsParser, Encoder 

29from redis._parsers.helpers import ( 

30 _RedisCallbacks, 

31 _RedisCallbacksRESP2, 

32 _RedisCallbacksRESP3, 

33) 

34from redis.asyncio.client import ResponseCallbackT 

35from redis.asyncio.connection import Connection, SSLConnection, parse_url 

36from redis.asyncio.lock import Lock 

37from redis.asyncio.retry import Retry 

38from redis.auth.token import TokenInterface 

39from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

40from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis 

41from redis.cluster import ( 

42 PIPELINE_BLOCKED_COMMANDS, 

43 PRIMARY, 

44 REPLICA, 

45 SLOT_ID, 

46 AbstractRedisCluster, 

47 LoadBalancer, 

48 LoadBalancingStrategy, 

49 block_pipeline_command, 

50 get_node_name, 

51 parse_cluster_slots, 

52) 

53from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands 

54from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

55from redis.credentials import CredentialProvider 

56from redis.event import AfterAsyncClusterInstantiationEvent, EventDispatcher 

57from redis.exceptions import ( 

58 AskError, 

59 BusyLoadingError, 

60 ClusterDownError, 

61 ClusterError, 

62 ConnectionError, 

63 CrossSlotTransactionError, 

64 DataError, 

65 ExecAbortError, 

66 InvalidPipelineStack, 

67 MaxConnectionsError, 

68 MovedError, 

69 RedisClusterException, 

70 RedisError, 

71 ResponseError, 

72 SlotNotCoveredError, 

73 TimeoutError, 

74 TryAgainError, 

75 WatchError, 

76) 

77from redis.typing import AnyKeyT, EncodableT, KeyT 

78from redis.utils import ( 

79 SSL_AVAILABLE, 

80 deprecated_args, 

81 deprecated_function, 

82 get_lib_version, 

83 safe_str, 

84 str_if_bytes, 

85 truncate_text, 

86) 

87 

# The ssl enums below are only used in annotations further down. When ssl is
# not available the names are still bound (to None) so those annotations
# keep evaluating.
if not SSL_AVAILABLE:
    TLSVersion = VerifyMode = VerifyFlags = None
else:
    from ssl import TLSVersion, VerifyFlags, VerifyMode

# Constrained type variable: a string, a single node, a list of nodes, or a
# mapping whose values are nodes.
# NOTE(review): presumably describes the accepted ``target_nodes`` values —
# confirm against the callers.
TargetNodesT = TypeVar(
    "TargetNodesT",
    str,
    "ClusterNode",
    List["ClusterNode"],
    Dict[Any, "ClusterNode"],
)

98 

99 

100class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands): 

101 """ 

102 Create a new RedisCluster client. 

103 

104 Pass one of parameters: 

105 

106 - `host` & `port` 

107 - `startup_nodes` 

108 

109 | Use ``await`` :meth:`initialize` to find cluster nodes & create connections. 

110 | Use ``await`` :meth:`close` to disconnect connections & close client. 

111 

112 Many commands support the target_nodes kwarg. It can be one of the 

113 :attr:`NODE_FLAGS`: 

114 

115 - :attr:`PRIMARIES` 

116 - :attr:`REPLICAS` 

117 - :attr:`ALL_NODES` 

118 - :attr:`RANDOM` 

119 - :attr:`DEFAULT_NODE` 

120 

121 Note: This client is not thread/process/fork safe. 

122 

123 :param host: 

124 | Can be used to point to a startup node 

125 :param port: 

126 | Port used if **host** is provided 

127 :param startup_nodes: 

128 | :class:`~.ClusterNode` to use as a startup node 

129 :param require_full_coverage: 

130 | When set to ``False``: the client will not require a full coverage of 

131 the slots. However, if not all slots are covered, and at least one node 

132 has ``cluster-require-full-coverage`` set to ``yes``, the server will throw 

133 a :class:`~.ClusterDownError` for some key-based commands. 

134 | When set to ``True``: all slots must be covered to construct the cluster 

135 client. If not all slots are covered, :class:`~.RedisClusterException` will be 

136 thrown. 

137 | See: 

138 https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters 

139 :param read_from_replicas: 

140 | @deprecated - please use load_balancing_strategy instead 

141 | Enable read from replicas in READONLY mode. 

142 When set to true, read commands will be assigned between the primary and 

143 its replications in a Round-Robin manner. 

144 The data read from replicas is eventually consistent with the data in primary nodes. 

145 :param load_balancing_strategy: 

146 | Enable read from replicas in READONLY mode and defines the load balancing 

147 strategy that will be used for cluster node selection. 

148 The data read from replicas is eventually consistent with the data in primary nodes. 

149 :param dynamic_startup_nodes: 

150 | Set the RedisCluster's startup nodes to all the discovered nodes. 

151 If true (default value), the cluster's discovered nodes will be used to 

152 determine the cluster nodes-slots mapping in the next topology refresh. 

153 It will remove the initial passed startup nodes if their endpoints aren't 

154 listed in the CLUSTER SLOTS output. 

155 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

156 specific IP addresses, it is best to set it to false. 

157 :param reinitialize_steps: 

158 | Specifies the number of MOVED errors that need to occur before reinitializing 

159 the whole cluster topology. If a MOVED error occurs and the cluster does not 

160 need to be reinitialized on this current error handling, only the MOVED slot 

161 will be patched with the redirected node. 

162 To reinitialize the cluster on every MOVED error, set reinitialize_steps to 1. 

163 To avoid reinitializing the cluster on moved errors, set reinitialize_steps to 

164 0. 

165 :param cluster_error_retry_attempts: 

166 | @deprecated - Please configure the 'retry' object instead 

167 In case 'retry' object is set - this argument is ignored! 

168 

169 Number of times to retry before raising an error when :class:`~.TimeoutError`, 

170 :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` 

171 or :class:`~.ClusterDownError` are encountered 

172 :param retry: 

173 | A retry object that defines the retry strategy and the number of 

174 retries for the cluster client. 

175 In current implementation for the cluster client (starting from redis-py version 6.0.0) 

176 the retry object is not yet fully utilized, instead it is used just to determine 

177 the number of retries for the cluster client. 

178 In the future releases the retry object will be used to handle the cluster client retries! 

179 :param max_connections: 

180 | Maximum number of connections per node. If there are no free connections & the 

181 maximum number of connections are already created, a 

182 :class:`~.MaxConnectionsError` is raised. 

183 :param address_remap: 

184 | An optional callable which, when provided with an internal network 

185 address of a node, e.g. a `(host, port)` tuple, will return the address 

186 where the node is reachable. This can be used to map the addresses at 

187 which the nodes _think_ they are, to addresses at which a client may 

188 reach them, such as when they sit behind a proxy. 

189 

190 | Rest of the arguments will be passed to the 

191 :class:`~redis.asyncio.connection.Connection` instances when created 

192 

193 :raises RedisClusterException: 

194 if any arguments are invalid or unknown. Eg: 

195 

196 - `db` != 0 or None 

197 - `path` argument for unix socket connection 

198 - none of the `host`/`port` & `startup_nodes` were provided 

199 

200 """ 

201 

@classmethod
def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
    """
    Build a cluster client from a connection URL.

    Example URLs::

        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0

    Supported schemes:

    - `redis://` creates a TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/redis>
    - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/rediss>

    The username, password, hostname, path and all querystring values are
    passed through ``urllib.parse.unquote`` so that percent-encoded values
    are replaced with their corresponding characters.

    Querystring options are cast to their appropriate Python types. Boolean
    arguments can be given as "True"/"False" or "Yes"/"No". A value that
    cannot be cast raises ``ValueError``.

    :param url:
        | Connection URL to parse
    :param kwargs:
        | Extra constructor arguments. The parsed URL values are merged on
          top of them, so on conflict the URL value wins.
    """
    url_options = parse_url(url)
    kwargs.update(url_options)

    # The constructor takes an ``ssl`` flag rather than a connection class,
    # so translate the parsed class into that flag.
    parsed_connection_class = kwargs.pop("connection_class", None)
    if parsed_connection_class is SSLConnection:
        kwargs["ssl"] = True

    return cls(**kwargs)

234 

# Instance attribute names declared as slots.
# NOTE(review): __init__ also assigns ``startup_nodes``,
# ``load_balancing_strategy``, ``_event_dispatcher``, ``_usage_counter`` and
# ``_usage_lock``, none of which are listed here. That only works if
# instances still have a ``__dict__`` (presumably inherited from a base
# class) — confirm before relying on this tuple being exhaustive.
__slots__ = (
    "_initialize",
    "_lock",
    "retry",
    "command_flags",
    "commands_parser",
    "connection_kwargs",
    "encoder",
    "node_flags",
    "nodes_manager",
    "read_from_replicas",
    "reinitialize_counter",
    "reinitialize_steps",
    "response_callbacks",
    "result_callbacks",
)

251 

@deprecated_args(
    args_to_warn=["read_from_replicas"],
    reason="Please configure the 'load_balancing_strategy' instead",
    version="5.3.0",
)
@deprecated_args(
    args_to_warn=[
        "cluster_error_retry_attempts",
    ],
    reason="Please configure the 'retry' object instead",
    version="6.0.0",
)
def __init__(
    self,
    host: Optional[str] = None,
    port: Union[str, int] = 6379,
    # Cluster related kwargs
    startup_nodes: Optional[List["ClusterNode"]] = None,
    require_full_coverage: bool = True,
    read_from_replicas: bool = False,
    load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
    dynamic_startup_nodes: bool = True,
    reinitialize_steps: int = 5,
    cluster_error_retry_attempts: int = 3,
    max_connections: int = 2**31,
    retry: Optional["Retry"] = None,
    retry_on_error: Optional[List[Type[Exception]]] = None,
    # Client related kwargs
    db: Union[str, int] = 0,
    path: Optional[str] = None,
    credential_provider: Optional[CredentialProvider] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    client_name: Optional[str] = None,
    lib_name: Optional[str] = "redis-py",
    lib_version: Optional[str] = get_lib_version(),
    # Encoding related kwargs
    encoding: str = "utf-8",
    encoding_errors: str = "strict",
    decode_responses: bool = False,
    # Connection related kwargs
    health_check_interval: float = 0,
    socket_connect_timeout: Optional[float] = None,
    socket_keepalive: bool = False,
    socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
    socket_timeout: Optional[float] = None,
    # SSL related kwargs
    ssl: bool = False,
    ssl_ca_certs: Optional[str] = None,
    ssl_ca_data: Optional[str] = None,
    ssl_cert_reqs: Union[str, VerifyMode] = "required",
    ssl_include_verify_flags: Optional[List[VerifyFlags]] = None,
    ssl_exclude_verify_flags: Optional[List[VerifyFlags]] = None,
    ssl_certfile: Optional[str] = None,
    ssl_check_hostname: bool = True,
    ssl_keyfile: Optional[str] = None,
    ssl_min_version: Optional[TLSVersion] = None,
    ssl_ciphers: Optional[str] = None,
    protocol: Optional[int] = 2,
    address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
) -> None:
    """
    Validate the arguments, assemble the keyword arguments shared by every
    node connection, and create the startup nodes and the nodes manager.

    :raises RedisClusterException:
        if ``db`` is truthy, ``path`` is given, or neither a usable
        ``host``/``port`` pair nor ``startup_nodes`` were provided.
    """
    # --- argument validation -------------------------------------------
    if db:
        raise RedisClusterException(
            "Argument 'db' must be 0 or None in cluster mode"
        )
    if path:
        raise RedisClusterException(
            "Unix domain socket is not supported in cluster mode"
        )

    has_address = bool(host) and bool(port)
    if not has_address and not startup_nodes:
        raise RedisClusterException(
            "RedisCluster requires at least one node to discover the cluster.\n"
            "Please provide one of the following or use RedisCluster.from_url:\n"
            ' - host and port: RedisCluster(host="localhost", port=6379)\n'
            " - startup_nodes: RedisCluster(startup_nodes=["
            'ClusterNode("localhost", 6379), ClusterNode("localhost", 6380)])'
        )

    # --- keyword arguments handed to every node connection ---------------
    conn_kwargs: Dict[str, Any] = dict(
        max_connections=max_connections,
        connection_class=Connection,
        # Client related kwargs
        credential_provider=credential_provider,
        username=username,
        password=password,
        client_name=client_name,
        lib_name=lib_name,
        lib_version=lib_version,
        # Encoding related kwargs
        encoding=encoding,
        encoding_errors=encoding_errors,
        decode_responses=decode_responses,
        # Connection related kwargs
        health_check_interval=health_check_interval,
        socket_connect_timeout=socket_connect_timeout,
        socket_keepalive=socket_keepalive,
        socket_keepalive_options=socket_keepalive_options,
        socket_timeout=socket_timeout,
        protocol=protocol,
    )

    if ssl:
        # Switch to the SSL connection class and forward the SSL options.
        conn_kwargs["connection_class"] = SSLConnection
        conn_kwargs.update(
            ssl_ca_certs=ssl_ca_certs,
            ssl_ca_data=ssl_ca_data,
            ssl_cert_reqs=ssl_cert_reqs,
            ssl_include_verify_flags=ssl_include_verify_flags,
            ssl_exclude_verify_flags=ssl_exclude_verify_flags,
            ssl_certfile=ssl_certfile,
            ssl_check_hostname=ssl_check_hostname,
            ssl_keyfile=ssl_keyfile,
            ssl_min_version=ssl_min_version,
            ssl_ciphers=ssl_ciphers,
        )

    if read_from_replicas or load_balancing_strategy:
        # Route connection setup through on_connect so READONLY gets sent.
        conn_kwargs["redis_connect_func"] = self.on_connect

    # --- retry policy ------------------------------------------------------
    # A caller-supplied retry object wins; cluster_error_retry_attempts is
    # only consulted when building the default one.
    self.retry = retry or Retry(
        backoff=ExponentialWithJitterBackoff(base=1, cap=10),
        retries=cluster_error_retry_attempts,
    )
    if retry_on_error:
        self.retry.update_supported_errors(retry_on_error)

    # --- response callbacks (protocol dependent) --------------------------
    callbacks = _RedisCallbacks.copy()
    uses_resp3 = conn_kwargs.get("protocol") in ["3", 3]
    callbacks.update(_RedisCallbacksRESP3 if uses_resp3 else _RedisCallbacksRESP2)
    conn_kwargs["response_callbacks"] = callbacks
    self.connection_kwargs = conn_kwargs

    # --- startup nodes -----------------------------------------------------
    # Passed nodes are rebuilt so they carry this client's connection kwargs.
    seed_nodes = [
        ClusterNode(given.host, given.port, **self.connection_kwargs)
        for given in (startup_nodes or [])
    ]
    if has_address:
        seed_nodes.append(ClusterNode(host, port, **self.connection_kwargs))

    self._event_dispatcher = (
        EventDispatcher() if event_dispatcher is None else event_dispatcher
    )

    # --- client state ------------------------------------------------------
    self.startup_nodes = seed_nodes
    self.nodes_manager = NodesManager(
        seed_nodes,
        require_full_coverage,
        conn_kwargs,
        dynamic_startup_nodes=dynamic_startup_nodes,
        address_remap=address_remap,
        event_dispatcher=self._event_dispatcher,
    )
    self.encoder = Encoder(encoding, encoding_errors, decode_responses)
    self.read_from_replicas = read_from_replicas
    self.load_balancing_strategy = load_balancing_strategy
    self.reinitialize_steps = reinitialize_steps
    self.reinitialize_counter = 0
    self.commands_parser = AsyncCommandsParser()

    owner = self.__class__
    self.node_flags = owner.NODE_FLAGS.copy()
    self.command_flags = owner.COMMAND_FLAGS.copy()
    self.response_callbacks = callbacks
    self.result_callbacks = owner.RESULT_CALLBACKS.copy()
    # The result arrives as a {node_name: reply} mapping; parse the first reply.
    self.result_callbacks["CLUSTER SLOTS"] = (
        lambda cmd, res, **kwargs: parse_cluster_slots(
            list(res.values())[0], **kwargs
        )
    )

    # True means "initialize() still has to run"; the lock is created lazily.
    self._initialize = True
    self._lock: Optional[asyncio.Lock] = None

    # Bookkeeping for ``async with``: the client is closed only when the
    # last active context exits.
    self._usage_counter = 0
    self._usage_lock = asyncio.Lock()

445 

async def initialize(self) -> "RedisCluster":
    """
    Initialize the nodes manager and the commands parser if still pending.

    Does nothing when the client is already initialized. If anything fails
    part-way, the nodes manager is closed (including its startup nodes)
    and the error is re-raised.

    :returns: the client itself
    """
    if not self._initialize:
        return self

    if not self._lock:
        self._lock = asyncio.Lock()

    async with self._lock:
        # Re-check under the lock: another caller may have finished first.
        if not self._initialize:
            return self
        try:
            await self.nodes_manager.initialize()
            await self.commands_parser.initialize(self.nodes_manager.default_node)
            self._initialize = False
        except BaseException:
            await self.nodes_manager.aclose()
            await self.nodes_manager.aclose("startup_nodes")
            raise

    return self

464 

async def aclose(self) -> None:
    """
    Close the nodes manager (and its startup nodes) if initialized.

    Does nothing when the client has not been initialized. Afterwards the
    client is marked as needing initialization again.
    """
    if self._initialize:
        return

    if not self._lock:
        self._lock = asyncio.Lock()

    async with self._lock:
        # Re-check under the lock: another caller may have closed already.
        if self._initialize:
            return
        self._initialize = True
        await self.nodes_manager.aclose()
        await self.nodes_manager.aclose("startup_nodes")

475 

@deprecated_function(version="5.0.0", reason="Use aclose() instead", name="close")
async def close(self) -> None:
    """Deprecated alias kept for backwards compatibility; delegates to aclose()."""
    await self.aclose()

480 

481 async def __aenter__(self) -> "RedisCluster": 

482 """ 

483 Async context manager entry. Increments a usage counter so that the 

484 connection pool is only closed (via aclose()) when no context is using 

485 the client. 

486 """ 

487 await self._increment_usage() 

488 try: 

489 # Initialize the client (i.e. establish connection, etc.) 

490 return await self.initialize() 

491 except Exception: 

492 # If initialization fails, decrement the counter to keep it in sync 

493 await self._decrement_usage() 

494 raise 

495 

496 async def _increment_usage(self) -> int: 

497 """ 

498 Helper coroutine to increment the usage counter while holding the lock. 

499 Returns the new value of the usage counter. 

500 """ 

501 async with self._usage_lock: 

502 self._usage_counter += 1 

503 return self._usage_counter 

504 

505 async def _decrement_usage(self) -> int: 

506 """ 

507 Helper coroutine to decrement the usage counter while holding the lock. 

508 Returns the new value of the usage counter. 

509 """ 

510 async with self._usage_lock: 

511 self._usage_counter -= 1 

512 return self._usage_counter 

513 

514 async def __aexit__(self, exc_type, exc_value, traceback): 

515 """ 

516 Async context manager exit. Decrements a usage counter. If this is the 

517 last exit (counter becomes zero), the client closes its connection pool. 

518 """ 

519 current_usage = await asyncio.shield(self._decrement_usage()) 

520 if current_usage == 0: 

521 # This was the last active context, so disconnect the pool. 

522 await asyncio.shield(self.aclose()) 

523 

524 def __await__(self) -> Generator[Any, None, "RedisCluster"]: 

525 return self.initialize().__await__() 

526 

527 _DEL_MESSAGE = "Unclosed RedisCluster client" 

528 

529 def __del__( 

530 self, 

531 _warn: Any = warnings.warn, 

532 _grl: Any = asyncio.get_running_loop, 

533 ) -> None: 

534 if hasattr(self, "_initialize") and not self._initialize: 

535 _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self) 

536 try: 

537 context = {"client": self, "message": self._DEL_MESSAGE} 

538 _grl().call_exception_handler(context) 

539 except RuntimeError: 

540 pass 

541 

async def on_connect(self, connection: Connection) -> None:
    """
    Run the connection's own on_connect step, then send READONLY.

    :raises ConnectionError: if the reply to READONLY is not "OK"
    """
    await connection.on_connect()

    # READONLY is sent regardless of the node's role: a failover can change
    # the role later, and on a primary READONLY does not affect executing
    # write commands.
    await connection.send_command("READONLY")
    reply = await connection.read_response()
    if str_if_bytes(reply) != "OK":
        raise ConnectionError("READONLY command failed")

553 

def get_nodes(self) -> List["ClusterNode"]:
    """Return every node in the nodes manager's cache as a new list."""
    cached = self.nodes_manager.nodes_cache
    return [*cached.values()]

557 

def get_primaries(self) -> List["ClusterNode"]:
    """Return the nodes the nodes manager reports for the PRIMARY server type."""
    manager = self.nodes_manager
    return manager.get_nodes_by_server_type(PRIMARY)

561 

def get_replicas(self) -> List["ClusterNode"]:
    """Return the nodes the nodes manager reports for the REPLICA server type."""
    manager = self.nodes_manager
    return manager.get_nodes_by_server_type(REPLICA)

565 

def get_random_node(self) -> "ClusterNode":
    """Pick one node at random from the nodes manager's cache."""
    candidates = list(self.nodes_manager.nodes_cache.values())
    return random.choice(candidates)

569 

def get_default_node(self) -> "ClusterNode":
    """Return the nodes manager's current default node."""
    manager = self.nodes_manager
    return manager.default_node

573 

def set_default_node(self, node: "ClusterNode") -> None:
    """
    Make ``node`` the default node of the client.

    :raises DataError: if ``node`` is falsy or no node with its name is
        found by get_node().
    """
    if node and self.get_node(node_name=node.name):
        self.nodes_manager.default_node = node
        return

    raise DataError("The requested node does not exist in the cluster.")

584 

def get_node(
    self,
    host: Optional[str] = None,
    port: Optional[int] = None,
    node_name: Optional[str] = None,
) -> Optional["ClusterNode"]:
    """Look a node up through the nodes manager by (host, port) or node_name."""
    lookup = self.nodes_manager.get_node
    return lookup(host, port, node_name)

593 

def get_node_from_key(
    self, key: str, replica: bool = False
) -> Optional["ClusterNode"]:
    """
    Return the cluster node that serves the slot of ``key``.

    :param key:
        | Key whose slot is looked up
    :param replica:
        | When true, return the second entry cached for the slot instead
          of the first one; ``None`` is returned if the slot has fewer
          than two entries.

    :raises SlotNotCoveredError: if the slot cache has no entry for the key's slot.
    """
    slot = self.keyslot(key)
    slot_nodes = self.nodes_manager.slots_cache.get(slot)
    if not slot_nodes:
        raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')

    if not replica:
        return slot_nodes[0]

    if len(self.nodes_manager.slots_cache[slot]) < 2:
        return None
    return slot_nodes[1]

621 

def keyslot(self, key: EncodableT) -> int:
    """
    Return the key slot of ``key``, computed on its encoded form.

    See: https://redis.io/docs/manual/scaling/#redis-cluster-data-sharding
    """
    encoded_key = self.encoder.encode(key)
    return key_slot(encoded_key)

629 

def get_encoder(self) -> Encoder:
    """Return the encoder object this client was created with."""
    encoder = self.encoder
    return encoder

633 

def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
    """
    Return the kwargs passed to
    :class:`~redis.asyncio.connection.Connection` (the stored dict itself,
    not a copy).
    """
    connection_kwargs = self.connection_kwargs
    return connection_kwargs

637 

def set_retry(self, retry: Retry) -> None:
    """Replace the client's retry object with ``retry``."""
    setattr(self, "retry", retry)

640 

def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
    """Register ``callback`` as the response callback for ``command``."""
    registry = self.response_callbacks
    registry[command] = callback

644 

645 async def _determine_nodes( 

646 self, command: str, *args: Any, node_flag: Optional[str] = None 

647 ) -> List["ClusterNode"]: 

648 # Determine which nodes should be executed the command on. 

649 # Returns a list of target nodes. 

650 if not node_flag: 

651 # get the nodes group for this command if it was predefined 

652 node_flag = self.command_flags.get(command) 

653 

654 if node_flag in self.node_flags: 

655 if node_flag == self.__class__.DEFAULT_NODE: 

656 # return the cluster's default node 

657 return [self.nodes_manager.default_node] 

658 if node_flag == self.__class__.PRIMARIES: 

659 # return all primaries 

660 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

661 if node_flag == self.__class__.REPLICAS: 

662 # return all replicas 

663 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

664 if node_flag == self.__class__.ALL_NODES: 

665 # return all nodes 

666 return list(self.nodes_manager.nodes_cache.values()) 

667 if node_flag == self.__class__.RANDOM: 

668 # return a random node 

669 return [random.choice(list(self.nodes_manager.nodes_cache.values()))] 

670 

671 # get the node that holds the key's slot 

672 return [ 

673 self.nodes_manager.get_node_from_slot( 

674 await self._determine_slot(command, *args), 

675 self.read_from_replicas and command in READ_COMMANDS, 

676 self.load_balancing_strategy if command in READ_COMMANDS else None, 

677 ) 

678 ] 

679 

async def _determine_slot(self, command: str, *args: Any) -> int:
    """
    Return the hash slot that ``command`` with ``args`` must be sent to.

    :raises RedisClusterException: if EVAL/EVALSHA has fewer than two
        arguments, if no key can be found for a command other than
        FCALL/FCALL_RO, or if the keys do not all map to the same slot.
    """
    if self.command_flags.get(command) == SLOT_ID:
        # The command contains the slot ID
        return int(args[0])

    command_upper = command.upper()

    # EVAL and EVALSHA are common enough that it's wasteful to go to the
    # redis server to parse the keys. Besides, there is a bug in redis<7.0
    # where `self._get_command_keys()` fails anyway. So, we special case
    # EVAL/EVALSHA.
    # - issue: https://github.com/redis/redis/issues/9493
    # - fix: https://github.com/redis/redis/pull/9733
    if command_upper in ("EVAL", "EVALSHA"):
        # command syntax: EVAL "script body" num_keys ...
        if len(args) < 2:
            raise RedisClusterException(
                f"Invalid args in command: {command, *args}"
            )
        key_count = int(args[1])
        keys = args[2 : 2 + key_count]
        if not keys:
            # A script without keys can run on any node: pick a random slot.
            return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
    else:
        keys = await self.commands_parser.get_keys(command, *args)
        if not keys:
            # FCALL can call a function with 0 keys, so it can run on any
            # node: pick a random slot.
            if command_upper in ("FCALL", "FCALL_RO"):
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            raise RedisClusterException(
                "No way to dispatch this command to Redis Cluster. "
                "Missing key.\nYou can execute the command by specifying "
                f"target nodes.\nCommand: {args}"
            )

    if len(keys) == 1:
        # single key command
        (only_key,) = keys
        return self.keyslot(only_key)

    # multi-key command: every key has to hash to the same slot
    distinct_slots = {self.keyslot(each) for each in keys}
    if len(distinct_slots) != 1:
        raise RedisClusterException(
            f"{command} - all keys must map to the same key slot"
        )
    return distinct_slots.pop()

730 

731 def _is_node_flag(self, target_nodes: Any) -> bool: 

732 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

733 

734 def _parse_target_nodes(self, target_nodes: Any) -> List["ClusterNode"]: 

735 if isinstance(target_nodes, list): 

736 nodes = target_nodes 

737 elif isinstance(target_nodes, ClusterNode): 

738 # Supports passing a single ClusterNode as a variable 

739 nodes = [target_nodes] 

740 elif isinstance(target_nodes, dict): 

741 # Supports dictionaries of the format {node_name: node}. 

742 # It enables to execute commands with multi nodes as follows: 

743 # rc.cluster_save_config(rc.get_primaries()) 

744 nodes = list(target_nodes.values()) 

745 else: 

746 raise TypeError( 

747 "target_nodes type can be one of the following: " 

748 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

749 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

750 f"The passed type is {type(target_nodes)}" 

751 ) 

752 return nodes 

753 

async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
    """
    Execute a raw command on the appropriate cluster node or target_nodes.

    The command is retried up to the number of retries reported by
    :attr:`retry`, then the last exception is raised. When explicit nodes
    are passed as target_nodes, no retries are made.

    :param args:
        | Raw command args
    :param kwargs:

        - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
          or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
        - Rest of the kwargs are passed to the Redis connection

    :returns: the reply for a single target node, otherwise a
        ``{node_name: reply}`` dict; if a result callback is registered for
        the command, whatever that callback returns.

    :raises RedisClusterException: if target_nodes is not provided & the command
        can't be mapped to a slot
    """
    command = args[0]
    retries_left = self.retry.get_retries()

    requested = kwargs.pop("target_nodes", None)
    explicit_targets = bool(requested) and not self._is_node_flag(requested)
    if explicit_targets:
        target_nodes = self._parse_target_nodes(requested)
        retries_left = 0
    else:
        target_nodes = []

    # One initial attempt plus the allowed retries.
    for _ in range(1 + retries_left):
        if self._initialize:
            await self.initialize()
            targets_only_default = (
                len(target_nodes) == 1
                and target_nodes[0] == self.get_default_node()
            )
            if targets_only_default:
                # Replace the default cluster node
                self.replace_default_node()

        try:
            if not explicit_targets:
                # Determine the nodes to execute the command on
                target_nodes = await self._determine_nodes(
                    *args, node_flag=requested
                )
                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )

            if len(target_nodes) == 1:
                node = target_nodes[0]
                reply = await self._execute_command(node, *args, **kwargs)
                if command not in self.result_callbacks:
                    return reply
                return self.result_callbacks[command](
                    command, {node.name: reply}, **kwargs
                )

            # Several targets: run the command on all of them concurrently.
            node_names = [node.name for node in target_nodes]
            pending = [
                asyncio.create_task(self._execute_command(node, *args, **kwargs))
                for node in target_nodes
            ]
            replies = await asyncio.gather(*pending)
            replies_by_node = dict(zip(node_names, replies))
            if command in self.result_callbacks:
                return self.result_callbacks[command](
                    command, replies_by_node, **kwargs
                )
            return replies_by_node
        except Exception as e:
            # Only exact types listed in ERRORS_ALLOW_RETRY are retried.
            retryable = (
                retries_left > 0
                and type(e) in self.__class__.ERRORS_ALLOW_RETRY
            )
            if not retryable:
                raise e
            # Try again; a handled failure may have flagged the client for
            # re-initialization, which the top of the loop takes care of.
            retries_left -= 1

837 

async def _execute_command(
    self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any
) -> Any:
    """
    Run one command on ``target_node``, following cluster redirections.

    The command is attempted at most ``self.RedisClusterRequestTTL`` times.
    Between attempts the loop reacts to the error raised by the node:

    - ``AskError``: remember the redirect address; the next attempt looks
      that node up, sends ``ASKING`` to it and then sends the command.
    - ``MovedError``: bump ``self.reinitialize_counter``; either call
      ``self.aclose()`` (every ``self.reinitialize_steps`` occurrences) or
      hand the error to the nodes manager, then re-resolve the target node
      from the slot on the next attempt.
    - ``TryAgainError``: retry, sleeping 0.05s once fewer than half of the
      attempts remain.

    :param target_node: node the first attempt is sent to
    :param args: command name followed by its arguments
    :param kwargs: forwarded to ``ClusterNode.execute_command``
    :returns: whatever ``target_node.execute_command`` returns
    :raises BusyLoadingError: re-raised unchanged
    :raises MaxConnectionsError: re-raised unchanged
    :raises ConnectionError, TimeoutError: re-raised after the node is
        removed from the startup nodes and ``self.aclose()`` is awaited
    :raises ClusterDownError, SlotNotCoveredError: re-raised after
        ``self.aclose()`` and a 0.25s sleep
    :raises ClusterError: when all attempts are used up
    """
    # Redirection state carried from one attempt to the next.
    asking = moved = False
    redirect_addr = None
    ttl = self.RedisClusterRequestTTL

    while ttl > 0:
        ttl -= 1
        try:
            if asking:
                target_node = self.get_node(node_name=redirect_addr)
                await target_node.execute_command("ASKING")
                asking = False
            elif moved:
                # MOVED occurred and the slots cache was updated,
                # refresh the target node
                slot = await self._determine_slot(*args)
                target_node = self.nodes_manager.get_node_from_slot(
                    slot,
                    self.read_from_replicas and args[0] in READ_COMMANDS,
                    self.load_balancing_strategy
                    if args[0] in READ_COMMANDS
                    else None,
                )
                moved = False

            return await target_node.execute_command(*args, **kwargs)
        except BusyLoadingError:
            raise
        except MaxConnectionsError:
            # MaxConnectionsError indicates client-side resource exhaustion
            # (too many connections in the pool), not a node failure.
            # Don't treat this as a node failure - just re-raise the error
            # without reinitializing the cluster.
            raise
        except (ConnectionError, TimeoutError):
            # Connection retries are being handled in the node's
            # Retry object.
            # Remove the failed node from the startup nodes before we try
            # to reinitialize the cluster
            self.nodes_manager.startup_nodes.pop(target_node.name, None)
            # Hard force of reinitialize of the node/slots setup
            # and try again with the new setup
            await self.aclose()
            raise
        except (ClusterDownError, SlotNotCoveredError):
            # ClusterDownError can occur during a failover and to get
            # self-healed, we will try to reinitialize the cluster layout
            # and retry executing the command

            # SlotNotCoveredError can occur when the cluster is not fully
            # initialized or can be temporary issue.
            # We will try to reinitialize the cluster topology
            # and retry executing the command

            await self.aclose()
            await asyncio.sleep(0.25)
            raise
        except MovedError as e:
            # First, we will try to patch the slots/nodes cache with the
            # redirected node output and try again. If MovedError exceeds
            # 'reinitialize_steps' number of times, we will force
            # reinitializing the tables, and then try again.
            # 'reinitialize_steps' counter will increase faster when
            # the same client object is shared between multiple threads. To
            # reduce the frequency you can set this variable in the
            # RedisCluster constructor.
            self.reinitialize_counter += 1
            if (
                self.reinitialize_steps
                and self.reinitialize_counter % self.reinitialize_steps == 0
            ):
                await self.aclose()
                # Reset the counter
                self.reinitialize_counter = 0
            else:
                self.nodes_manager._moved_exception = e
            moved = True
        except AskError as e:
            redirect_addr = get_node_name(host=e.host, port=e.port)
            asking = True
        except TryAgainError:
            # Back off only during the second half of the attempt budget.
            if ttl < self.RedisClusterRequestTTL / 2:
                await asyncio.sleep(0.05)

    raise ClusterError("TTL exhausted.")

925 

def pipeline(
    self, transaction: Optional[Any] = None, shard_hint: Optional[Any] = None
) -> "ClusterPipeline":
    """
    Build a :class:`~.ClusterPipeline` bound to this client.

    ``transaction`` is handed to the pipeline unchanged. ``shard_hint`` is
    not supported in cluster mode.

    :raises RedisClusterException: if ``shard_hint`` is a truthy value
    """
    if not shard_hint:
        return ClusterPipeline(self, transaction)

    raise RedisClusterException("shard_hint is deprecated in cluster mode")

940 

def lock(
    self,
    name: KeyT,
    timeout: Optional[float] = None,
    sleep: float = 0.1,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
    lock_class: Optional[Type[Lock]] = None,
    thread_local: bool = True,
    raise_on_release_error: bool = True,
) -> Lock:
    """
    Create a lock on key ``name`` that behaves like ``threading.Lock``.

    ``timeout`` is the maximum lifetime of the lock. When it is ``None``
    (the default) the lock stays held until ``release()`` is called.

    ``sleep`` is how long to wait between attempts while the lock is in
    blocking mode and another client is holding it.

    ``blocking`` selects whether ``acquire`` waits for the lock (``True``,
    the default) or gives up immediately and returns ``False``. A
    ``blocking`` argument passed to ``acquire`` overrides this value.

    ``blocking_timeout`` is the maximum number of seconds (float or int) to
    spend trying to acquire the lock; ``None`` means keep trying forever.

    ``lock_class`` forces a specific lock implementation. ``Lock`` (a
    Lua-based lock) is used when it is ``None``; you normally only need
    this for a custom lock class.

    ``thread_local`` selects whether the lock token is kept in thread-local
    storage, so that a thread only ever sees the token it set itself.
    Without that isolation the following can go wrong:

        time: 0, thread-1 acquires `my-lock` with a timeout of 5 seconds
                 and sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 same Lock instance
        time: 5, thread-1 is still working; redis expires the lock key
        time: 5, thread-2 acquires `my-lock` and sets the token to "xyz"
        time: 6, thread-1 finishes and calls release(); with a shared
                 token it would see "xyz" and release thread-2's lock

    Disable thread-local storage when one thread acquires the lock and a
    different thread is expected to release it; otherwise the releasing
    thread cannot see the token. This is assumed to be uncommon, hence the
    default of ``True``.

    ``raise_on_release_error`` selects what happens when the lock is no
    longer owned on leaving the context manager: raise (``True``, the
    default) or log a warning and suppress the exception (``False``).
    """
    factory = Lock if lock_class is None else lock_class
    options = {
        "timeout": timeout,
        "sleep": sleep,
        "blocking": blocking,
        "blocking_timeout": blocking_timeout,
        "thread_local": thread_local,
        "raise_on_release_error": raise_on_release_error,
    }
    return factory(self, name, **options)

1021 

async def transaction(
    self,
    func: Callable[["ClusterPipeline"], Coroutine[Any, Any, Any]],
    *watches,
    **kwargs,
):
    """
    Convenience method for executing the callable `func` as a transaction
    while watching all keys specified in `watches`.

    :param func: coroutine function called as ``await func(pipe)`` with the
        transactional pipeline as its single argument
    :param watches: keys to WATCH before every attempt
    :param kwargs: optional settings, all removed from ``kwargs``:

        - ``shard_hint``: forwarded to :meth:`pipeline`
        - ``value_from_callable``: when truthy, return the value produced
          by ``func`` instead of the result of ``pipe.execute()``
        - ``watch_delay``: seconds to wait before retrying after a
          ``WatchError``; no wait when ``None`` or not positive

    :returns: the result of ``pipe.execute()`` or, with
        ``value_from_callable``, the value returned by ``func``
    """
    shard_hint = kwargs.pop("shard_hint", None)
    value_from_callable = kwargs.pop("value_from_callable", False)
    watch_delay = kwargs.pop("watch_delay", None)
    async with self.pipeline(True, shard_hint) as pipe:
        # Retry the whole watch/func/execute cycle until it completes
        # without a WatchError.
        while True:
            try:
                if watches:
                    await pipe.watch(*watches)
                func_value = await func(pipe)
                exec_value = await pipe.execute()
                return func_value if value_from_callable else exec_value
            except WatchError:
                if watch_delay is not None and watch_delay > 0:
                    # Must be a non-blocking sleep: time.sleep() here would
                    # stall every other task on the event loop.
                    await asyncio.sleep(watch_delay)
                continue

1045 

1046 

class ClusterNode:
    """
    Create a new ClusterNode.

    Each ClusterNode manages multiple :class:`~redis.asyncio.connection.Connection`
    objects for the (host, port).

    Connections are created lazily, up to ``max_connections``. Idle ones are
    kept in a free queue and reused before a new one is created.
    """

    __slots__ = (
        "_connections",
        "_free",
        "_lock",
        "_event_dispatcher",
        "connection_class",
        "connection_kwargs",
        "host",
        "max_connections",
        "name",
        "port",
        "response_callbacks",
        "server_type",
    )

    def __init__(
        self,
        host: str,
        port: Union[str, int],
        server_type: Optional[str] = None,
        *,
        max_connections: int = 2**31,
        connection_class: Type[Connection] = Connection,
        **connection_kwargs: Any,
    ) -> None:
        """
        :param host: node host; ``"localhost"`` is resolved to an address
        :param port: node port
        :param server_type: role of the node, if known
        :param max_connections: upper bound on connections to this node
        :param connection_class: class used to create new connections
        :param connection_kwargs: passed to ``connection_class``; the
            ``response_callbacks`` entry is removed and kept on the node
        """
        if host == "localhost":
            host = socket.gethostbyname(host)

        connection_kwargs["host"] = host
        connection_kwargs["port"] = port
        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type

        self.max_connections = max_connections
        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        self.response_callbacks = connection_kwargs.pop("response_callbacks", {})

        # Every connection ever created vs. the ones currently idle.
        self._connections: List[Connection] = []
        self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
        self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None)
        if self._event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()

    def __repr__(self) -> str:
        return (
            f"[host={self.host}, port={self.port}, "
            f"name={self.name}, server_type={self.server_type}]"
        )

    def __eq__(self, obj: Any) -> bool:
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __hash__(self) -> int:
        # Defining __eq__ alone would make nodes unhashable; hash on the
        # same key that equality compares.
        return hash(self.name)

    _DEL_MESSAGE = "Unclosed ClusterNode object"

    def __del__(
        self,
        _warn: Any = warnings.warn,
        _grl: Any = asyncio.get_running_loop,
    ) -> None:
        # Warn once if the node is collected while a connection is still open.
        for connection in self._connections:
            if connection.is_connected:
                _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self)

                try:
                    context = {"client": self, "message": self._DEL_MESSAGE}
                    _grl().call_exception_handler(context)
                except RuntimeError:
                    # No running event loop to report to.
                    pass
                break

    async def disconnect(self) -> None:
        """
        Disconnect all connections concurrently.

        :raises Exception: the first error raised by any of the disconnects,
            after all of them have finished
        """
        ret = await asyncio.gather(
            *(
                asyncio.create_task(connection.disconnect())
                for connection in self._connections
            ),
            return_exceptions=True,
        )
        exc = next((res for res in ret if isinstance(res, Exception)), None)
        if exc:
            raise exc

    def acquire_connection(self) -> Connection:
        """
        Return an idle connection, creating a new one if none is free.

        :raises MaxConnectionsError: when no connection is free and
            ``max_connections`` connections already exist
        """
        try:
            return self._free.popleft()
        except IndexError:
            if len(self._connections) < self.max_connections:
                # We are configuring the connection pool not to retry
                # connections on lower level clients to avoid retrying
                # connections to nodes that are not reachable
                # and to avoid blocking the connection pool.
                # The only error that will have some handling in the lower
                # level clients is ConnectionError which will trigger disconnection
                # of the socket.
                # The retries will be handled on cluster client level
                # where we will have proper handling of the cluster topology
                retry = Retry(
                    backoff=NoBackoff(),
                    retries=0,
                    supported_errors=(ConnectionError,),
                )
                connection_kwargs = self.connection_kwargs.copy()
                connection_kwargs["retry"] = retry
                connection = self.connection_class(**connection_kwargs)
                self._connections.append(connection)
                return connection

            raise MaxConnectionsError()

    def release(self, connection: Connection) -> None:
        """
        Release connection back to free queue.
        """
        self._free.append(connection)

    async def parse_response(
        self, connection: Connection, command: str, **kwargs: Any
    ) -> Any:
        """
        Read one response from ``connection`` and apply the callback
        registered for ``command`` in ``response_callbacks``, if any.

        The ``NEVER_DECODE``, ``EMPTY_RESPONSE`` and ``keys`` entries are
        consumed here and not passed on to the callback. When a
        ``ResponseError`` is raised and ``EMPTY_RESPONSE`` is given, its
        value is returned instead of raising.
        """
        try:
            if NEVER_DECODE in kwargs:
                response = await connection.read_response(disable_decoding=True)
                kwargs.pop(NEVER_DECODE)
            else:
                response = await connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in kwargs:
                return kwargs[EMPTY_RESPONSE]
            raise

        if EMPTY_RESPONSE in kwargs:
            kwargs.pop(EMPTY_RESPONSE)

        # Remove keys entry, it needs only for cache.
        kwargs.pop("keys", None)

        # Return response
        if command in self.response_callbacks:
            return self.response_callbacks[command](response, **kwargs)

        return response

    async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
        """
        Send one command on a pooled connection and return its parsed response.

        The connection goes back to the free queue even when packing or
        sending the command fails, so a failed command cannot shrink the pool.
        """
        connection = self.acquire_connection()
        try:
            await connection.send_packed_command(
                connection.pack_command(*args), False
            )
            return await self.parse_response(connection, args[0], **kwargs)
        finally:
            self.release(connection)

    async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
        """
        Send all ``commands`` in one batch and store each response, or the
        exception it raised, in the command's ``result``.

        :returns: ``True`` if at least one command resulted in an exception
        """
        connection = self.acquire_connection()
        try:
            await connection.send_packed_command(
                connection.pack_commands(cmd.args for cmd in commands), False
            )

            # Read responses
            ret = False
            for cmd in commands:
                try:
                    cmd.result = await self.parse_response(
                        connection, cmd.args[0], **cmd.kwargs
                    )
                except Exception as e:
                    cmd.result = e
                    ret = True

            return ret
        finally:
            # Also reached when sending fails or the task is cancelled.
            self.release(connection)

    async def re_auth_callback(self, token: TokenInterface):
        """
        Send ``AUTH`` with the new ``token`` on every idle connection.

        Connections are taken out of the free queue while they are being
        re-authenticated and are put back afterwards, including when one
        of them fails.
        """
        tmp_queue = collections.deque()
        try:
            while self._free:
                conn = self._free.popleft()
                # Track the connection before awaiting so that an error
                # below cannot drop it from the pool.
                tmp_queue.append(conn)
                await conn.retry.call_with_retry(
                    lambda: conn.send_command(
                        "AUTH", token.try_get("oid"), token.get_value()
                    ),
                    lambda error: self._mock(error),
                )
                await conn.retry.call_with_retry(
                    lambda: conn.read_response(), lambda error: self._mock(error)
                )
        finally:
            while tmp_queue:
                self._free.append(tmp_queue.popleft())

    async def _mock(self, error: RedisError):
        """
        Dummy functions, needs to be passed as error callback to retry object.
        :param error:
        :return:
        """
        pass

1264 

1265 

class NodesManager:
    """
    Keep the client's view of the cluster: the known nodes, the
    slot -> nodes table and the default node.
    """

    __slots__ = (
        "_dynamic_startup_nodes",
        "_moved_exception",
        "_event_dispatcher",
        "connection_kwargs",
        "default_node",
        "nodes_cache",
        "read_load_balancer",
        "require_full_coverage",
        "slots_cache",
        "startup_nodes",
        "address_remap",
    )

    def __init__(
        self,
        startup_nodes: List["ClusterNode"],
        require_full_coverage: bool,
        connection_kwargs: Dict[str, Any],
        dynamic_startup_nodes: bool = True,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
    ) -> None:
        """
        :param startup_nodes: nodes queried for the cluster layout
        :param require_full_coverage: fail :meth:`initialize` when not all
            hash slots are covered
        :param connection_kwargs: passed to every ``ClusterNode`` created here
        :param dynamic_startup_nodes: replace the startup nodes with the
            discovered nodes after :meth:`initialize`
        :param address_remap: optional ``(host, port) -> (host, port)`` mapping
            applied to addresses reported by the cluster
        :param event_dispatcher: dispatcher to use; a new one is created
            when omitted
        """
        self.startup_nodes = {node.name: node for node in startup_nodes}
        self.require_full_coverage = require_full_coverage
        self.connection_kwargs = connection_kwargs
        self.address_remap = address_remap

        self.default_node: "ClusterNode" = None
        self.nodes_cache: Dict[str, "ClusterNode"] = {}
        # slot -> [primary, replica, ...]
        self.slots_cache: Dict[int, List["ClusterNode"]] = {}
        self.read_load_balancer = LoadBalancer()

        self._dynamic_startup_nodes: bool = dynamic_startup_nodes
        # Last MOVED error not yet applied to the slots cache.
        self._moved_exception: MovedError = None
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

    def get_node(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        node_name: Optional[str] = None,
    ) -> Optional["ClusterNode"]:
        """
        Look a node up in the nodes cache by ``host`` and ``port`` or by
        ``node_name``.

        :returns: the cached node, or ``None`` if it is unknown
        :raises DataError: if neither host and port nor a node name is given
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            raise DataError(
                "get_node requires one of the following: 1. node name 2. host and port"
            )

    def set_nodes(
        self,
        old: Dict[str, "ClusterNode"],
        new: Dict[str, "ClusterNode"],
        remove_old: bool = False,
    ) -> None:
        """
        Merge ``new`` into ``old`` in place.

        Nodes that are replaced by a different object, and with
        ``remove_old`` also nodes that are missing from ``new``, get a
        disconnect scheduled as a background task.
        """
        if remove_old:
            for name in list(old.keys()):
                if name not in new:
                    # Fire-and-forget disconnect of the dropped node.
                    asyncio.create_task(old.pop(name).disconnect())

        for name, node in new.items():
            if name in old:
                if old[name] is node:
                    continue
                asyncio.create_task(old[name].disconnect())
            old[name] = node

    def update_moved_exception(self, exception):
        """Store a MOVED error to be applied on the next slot lookup."""
        self._moved_exception = exception

    def _update_moved_slots(self) -> None:
        """Patch the nodes and slots caches according to the stored MOVED error."""
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node:
            # The node already exists
            if redirected_node.server_type != PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(
                e.host, e.port, PRIMARY, **self.connection_kwargs
            )
            self.set_nodes(self.nodes_cache, {redirected_node.name: redirected_node})

        # The slot may be missing from the cache when coverage is partial.
        slot_nodes = self.slots_cache.get(e.slot_id)
        if slot_nodes and slot_nodes[0] == redirected_node:
            # The redirect points at the node already cached as primary
            # (e.g. the command was sent to a replica). Reordering here
            # would demote the primary and drop a replica from the list.
            pass
        elif slot_nodes and redirected_node in slot_nodes:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = slot_nodes[0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            slot_nodes.append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            slot_nodes.remove(redirected_node)
            # Override the old primary with the new one
            slot_nodes[0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        else:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replications) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        # Reset moved_exception
        self._moved_exception = None

    def get_node_from_slot(
        self,
        slot: int,
        read_from_replicas: bool = False,
        load_balancing_strategy=None,
    ) -> "ClusterNode":
        """
        Return the node that should serve ``slot``.

        A pending MOVED error is applied first. The primary is returned
        unless a load balancing strategy is in effect and the slot has more
        than one node; ``read_from_replicas=True`` without a strategy means
        round robin.

        :raises SlotNotCoveredError: if no node is cached for ``slot``
        """
        if self._moved_exception:
            self._update_moved_slots()

        if read_from_replicas is True and load_balancing_strategy is None:
            load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

        try:
            if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
                # get the server index using the strategy defined in load_balancing_strategy
                primary_name = self.slots_cache[slot][0].name
                node_idx = self.read_load_balancer.get_server_index(
                    primary_name, len(self.slots_cache[slot]), load_balancing_strategy
                )
                return self.slots_cache[slot][node_idx]
            return self.slots_cache[slot][0]
        except (KeyError, IndexError, TypeError):
            # slots_cache is a dict, so a missing slot surfaces as KeyError.
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self.require_full_coverage}"'
            )

    def get_nodes_by_server_type(self, server_type: str) -> List["ClusterNode"]:
        """Return all cached nodes whose ``server_type`` matches."""
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    async def initialize(self) -> None:
        """
        Rebuild the nodes and slots caches from ``CLUSTER SLOTS``.

        Startup nodes are queried one after another until all hash slots
        are covered or no startup node is left.

        :raises RedisClusterException: if no startup node could be queried,
            if the nodes disagree on the owner of more than five slots, or
            if coverage is incomplete while ``require_full_coverage`` is set
        """
        self.read_load_balancer.reset()
        tmp_nodes_cache: Dict[str, "ClusterNode"] = {}
        tmp_slots: Dict[int, List["ClusterNode"]] = {}
        disagreements = []
        startup_nodes_reachable = False
        fully_covered = False
        exception = None
        # Convert to tuple to prevent RuntimeError if self.startup_nodes
        # is modified during iteration
        for startup_node in tuple(self.startup_nodes.values()):
            try:
                # Make sure cluster mode is enabled on this node
                try:
                    self._event_dispatcher.dispatch(
                        AfterAsyncClusterInstantiationEvent(
                            self.nodes_cache,
                            self.connection_kwargs.get("credential_provider", None),
                        )
                    )
                    cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
                except ResponseError:
                    raise RedisClusterException(
                        "Cluster mode is not enabled on this node"
                    )
                startup_nodes_reachable = True
            except Exception as e:
                # Try the next startup node.
                # The exception is saved and raised only if we have no more nodes.
                exception = e
                continue

            # CLUSTER SLOTS command results in the following output:
            # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
            # where each node contains the following list: [IP, port, node_id]
            # Therefore, cluster_slots[0][2][0] will be the IP address of the
            # primary node of the first slot section.
            # If there's only one server in the cluster, its ``host`` is ''
            # Fix it to the host in startup_nodes
            if (
                len(cluster_slots) == 1
                and not cluster_slots[0][2][0]
                and len(self.startup_nodes) == 1
            ):
                cluster_slots[0][2][0] = startup_node.host

            for slot in cluster_slots:
                for i in range(2, len(slot)):
                    slot[i] = [str_if_bytes(val) for val in slot[i]]
                primary_node = slot[2]
                host = primary_node[0]
                if host == "":
                    host = startup_node.host
                port = int(primary_node[1])
                host, port = self.remap_host_port(host, port)

                nodes_for_slot = []

                target_node = tmp_nodes_cache.get(get_node_name(host, port))
                if not target_node:
                    target_node = ClusterNode(
                        host, port, PRIMARY, **self.connection_kwargs
                    )
                # add this node to the nodes cache
                tmp_nodes_cache[target_node.name] = target_node
                nodes_for_slot.append(target_node)

                replica_nodes = slot[3:]
                for replica_node in replica_nodes:
                    host = replica_node[0]
                    # Normalise the port the same way as for the primary.
                    port = int(replica_node[1])
                    host, port = self.remap_host_port(host, port)

                    target_replica_node = tmp_nodes_cache.get(get_node_name(host, port))
                    if not target_replica_node:
                        target_replica_node = ClusterNode(
                            host, port, REPLICA, **self.connection_kwargs
                        )
                    # add this node to the nodes cache
                    tmp_nodes_cache[target_replica_node.name] = target_replica_node
                    nodes_for_slot.append(target_replica_node)

                for i in range(int(slot[0]), int(slot[1]) + 1):
                    if i not in tmp_slots:
                        tmp_slots[i] = nodes_for_slot
                    else:
                        # Validate that 2 nodes want to use the same slot cache
                        # setup
                        tmp_slot = tmp_slots[i][0]
                        if tmp_slot.name != target_node.name:
                            disagreements.append(
                                f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                            )

                            if len(disagreements) > 5:
                                raise RedisClusterException(
                                    f"startup_nodes could not agree on a valid "
                                    f"slots cache: {', '.join(disagreements)}"
                                )

            # Validate if all slots are covered or if we should try next startup node
            fully_covered = True
            for i in range(REDIS_CLUSTER_HASH_SLOTS):
                if i not in tmp_slots:
                    fully_covered = False
                    break
            if fully_covered:
                break

        if not startup_nodes_reachable:
            raise RedisClusterException(
                f"Redis Cluster cannot be connected. Please provide at least "
                f"one reachable node: {str(exception)}"
            ) from exception

        # Check if the slots are not fully covered
        if not fully_covered and self.require_full_coverage:
            # Despite the requirement that the slots be covered, there
            # isn't a full coverage
            raise RedisClusterException(
                f"All slots are not covered after query all startup_nodes. "
                f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                f"covered..."
            )

        # Set the tmp variables to the real variables
        self.slots_cache = tmp_slots
        self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)

        if self._dynamic_startup_nodes:
            # Populate the startup nodes with all discovered nodes
            self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)

        # Set the default node
        self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
        # If initialize was called after a MovedError, clear it
        self._moved_exception = None

    async def aclose(self, attr: str = "nodes_cache") -> None:
        """
        Clear the default node and disconnect every node stored in the
        mapping attribute named ``attr``.
        """
        self.default_node = None
        await asyncio.gather(
            *(
                asyncio.create_task(node.disconnect())
                for node in getattr(self, attr).values()
            )
        )

    def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
        """
        Remap the host and port returned from the cluster to a different
        internal value.  Useful if the client is not connecting directly
        to the cluster.
        """
        if self.address_remap:
            return self.address_remap((host, port))
        return host, port

1574 

1575 

class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands):
    """
    Queue commands for a :class:`~.RedisCluster` and run them in one go.

    Usage::

        result = await (
            rc.pipeline()
            .set("A", 1)
            .get("A")
            .hset("K", "F", "V")
            .hgetall("K")
            .mset_nonatomic({"A": 2, "B": 3})
            .get("A")
            .get("B")
            .delete("A", "B", "K")
            .execute()
        )
        # result = [True, "1", 1, {"F": "V"}, True, True, "2", "3", 1, 1, 1]

    Note: `DELETE`, `EXISTS`, `TOUCH`, `UNLINK` and `mset_nonatomic` are split
    across multiple nodes, so each of them contributes several entries to the
    result array.

    Retryable errors:
        - :class:`~.ClusterDownError`
        - :class:`~.ConnectionError`
        - :class:`~.TimeoutError`

    Redirection errors:
        - :class:`~.TryAgainError`
        - :class:`~.MovedError`
        - :class:`~.AskError`

    :param client:
        | Existing :class:`~.RedisCluster` client
    """

    __slots__ = ("cluster_client", "_transaction", "_execution_strategy")

    def __init__(
        self, client: RedisCluster, transaction: Optional[bool] = None
    ) -> None:
        self.cluster_client = client
        self._transaction = transaction
        # A truthy ``transaction`` selects the transactional strategy; every
        # other method of this class delegates to the chosen strategy.
        strategy: ExecutionStrategy
        if self._transaction:
            strategy = TransactionStrategy(self)
        else:
            strategy = PipelineStrategy(self)
        self._execution_strategy = strategy

    async def initialize(self) -> "ClusterPipeline":
        """Initialize the execution strategy and return this pipeline."""
        await self._execution_strategy.initialize()
        return self

    async def __aenter__(self) -> "ClusterPipeline":
        pipeline = await self.initialize()
        return pipeline

    async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
        await self.reset()

    def __await__(self) -> Generator[Any, None, "ClusterPipeline"]:
        # ``await pipeline`` is the same as ``await pipeline.initialize()``.
        return self.initialize().__await__()

    def __bool__(self) -> bool:
        "Pipeline instances should always evaluate to True on Python 3+"
        return True

    def __len__(self) -> int:
        strategy = self._execution_strategy
        return len(strategy)

    def execute_command(
        self, *args: Union[KeyT, EncodableT], **kwargs: Any
    ) -> "ClusterPipeline":
        """
        Queue a raw command on the pipeline.

        :param args:
            | Raw command args
        :param kwargs:

            - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
              or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
            - Rest of the kwargs are passed to the Redis connection
        """
        strategy = self._execution_strategy
        return strategy.execute_command(*args, **kwargs)

    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        """
        Run the queued commands, then reset the pipeline.

        Commands are retried as configured by :attr:`retry` before an
        exception is raised. The pipeline is reset whether or not the
        execution succeeds.

        :param raise_on_error:
            | Raise the first error if there are any errors
        :param allow_redirections:
            | Whether to retry each failed command individually in case of redirection
              errors

        :raises RedisClusterException: if target_nodes is not provided & the command
            can't be mapped to a slot
        """
        strategy = self._execution_strategy
        try:
            results = await strategy.execute(raise_on_error, allow_redirections)
            return results
        finally:
            await self.reset()

    def _split_command_across_slots(
        self, command: str, *keys: KeyT
    ) -> "ClusterPipeline":
        # Queue one ``command`` per group of keys sharing a slot.
        keys_by_slot = self.cluster_client._partition_keys_by_slot(keys)
        for same_slot_keys in keys_by_slot.values():
            self.execute_command(command, *same_slot_keys)

        return self

    async def reset(self):
        """
        Reset back to empty pipeline.
        """
        await self._execution_strategy.reset()

    def multi(self):
        """
        Open the transactional block of the pipeline. Call it after the
        WATCH commands were issued and close the block with `execute`.
        """
        self._execution_strategy.multi()

    async def discard(self):
        """Delegate ``discard`` to the execution strategy."""
        await self._execution_strategy.discard()

    async def watch(self, *names):
        """Watches the values at keys ``names``"""
        await self._execution_strategy.watch(*names)

    async def unwatch(self):
        """Unwatches all previously specified keys"""
        await self._execution_strategy.unwatch()

    async def unlink(self, *names):
        """Delegate ``unlink`` of ``names`` to the execution strategy."""
        await self._execution_strategy.unlink(*names)

    def mset_nonatomic(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> "ClusterPipeline":
        """Delegate ``mset_nonatomic`` to the execution strategy."""
        strategy = self._execution_strategy
        return strategy.mset_nonatomic(mapping)

1727 

1728 

# Overwrite the ClusterPipeline attribute of every pipeline-blocked command
# with the object built by block_pipeline_command. ``mset_nonatomic`` keeps
# the implementation defined on the class.
for command in PIPELINE_BLOCKED_COMMANDS:
    command = command.lower().replace(" ", "_")
    if command != "mset_nonatomic":
        setattr(ClusterPipeline, command, block_pipeline_command(command))

1735 

1736 

class PipelineCommand:
    """
    One queued pipeline command: its position in the pipeline, its
    arguments and, once executed, its result or the exception it raised.
    """

    def __init__(self, position: int, *args: Any, **kwargs: Any) -> None:
        self.position = position
        self.args = args
        self.kwargs = kwargs
        # Filled in when the command is executed.
        self.result: Union[Any, Exception] = None

    def __repr__(self) -> str:
        return "[{}] {} ({})".format(self.position, self.args, self.kwargs)

1746 

1747 

class ExecutionStrategy(ABC):
    """Interface :class:`ClusterPipeline` delegates its operations to."""

    @abstractmethod
    async def initialize(self) -> "ClusterPipeline":
        """
        Prepare the strategy for use.

        See ClusterPipeline.initialize()
        """

    @abstractmethod
    def execute_command(
        self, *args: Union[KeyT, EncodableT], **kwargs: Any
    ) -> "ClusterPipeline":
        """
        Queue a raw command on the pipeline.

        See ClusterPipeline.execute_command()
        """

    @abstractmethod
    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        """
        Run the queued commands.

        Commands are retried as configured by :attr:`retry` before an
        exception is raised.

        See ClusterPipeline.execute()
        """

    @abstractmethod
    def mset_nonatomic(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> "ClusterPipeline":
        """
        Queue several MSET commands according to the given slot/pairs mapping.

        See ClusterPipeline.mset_nonatomic()
        """

    @abstractmethod
    async def reset(self):
        """
        Reset the strategy.

        See: ClusterPipeline.reset()
        """

    @abstractmethod
    def multi(self):
        """
        Start the transactional context.

        See: ClusterPipeline.multi()
        """

    @abstractmethod
    async def watch(self, *names):
        """
        Watch the given keys.

        See: ClusterPipeline.watch()
        """

    @abstractmethod
    async def unwatch(self):
        """
        Unwatch all previously specified keys.

        See: ClusterPipeline.unwatch()
        """

    @abstractmethod
    async def discard(self):
        """See: ClusterPipeline.discard()"""

    @abstractmethod
    async def unlink(self, *names):
        """
        Unlink the keys specified by ``names``.

        See: ClusterPipeline.unlink()
        """

    @abstractmethod
    def __len__(self) -> int:
        """See: ClusterPipeline.__len__()"""

1846 

1847 

class AbstractStrategy(ExecutionStrategy):
    """
    Common base for the concrete pipeline execution strategies.

    Keeps a reference to the owning pipeline and the list of queued
    commands, and implements the parts that do not depend on how the
    queue is executed: initialization, queuing, error annotation and
    ``__len__``. Everything else stays abstract.
    """

    def __init__(self, pipe: ClusterPipeline) -> None:
        self._pipe: ClusterPipeline = pipe
        self._command_queue: List["PipelineCommand"] = []

    async def initialize(self) -> "ClusterPipeline":
        """
        Initialize the cluster client if it is still flagged as needing it,
        empty the command queue and return the owning pipeline.
        """
        if self._pipe.cluster_client._initialize:
            await self._pipe.cluster_client.initialize()
        self._command_queue = []
        return self._pipe

    def execute_command(
        self, *args: Union[KeyT, EncodableT], **kwargs: Any
    ) -> "ClusterPipeline":
        """
        Queue a command and return the owning pipeline.

        The command's position is its index in the queue at the time it
        is appended.
        """
        self._command_queue.append(
            PipelineCommand(len(self._command_queue), *args, **kwargs)
        )
        return self._pipe

    def _annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled

        The first element of ``exception.args`` is replaced with a message
        naming the command number and the (truncated) command text; any
        further elements are kept.

        :param exception: exception instance to annotate in place
        :param number: command number to show in the message
        :param command: the command's arguments; a bare ``str``/``bytes``
            is treated as a single-token command
        """
        # A plain string would otherwise be iterated character by
        # character and rendered as e.g. "M U L T I".
        if isinstance(command, (str, bytes)):
            command = (command,)
        cmd = " ".join(map(safe_str, command))
        # Exceptions raised without arguments have an empty ``args`` tuple;
        # indexing it unconditionally would raise IndexError and hide the
        # original error.
        detail = exception.args[0] if exception.args else ""
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {detail}"
        )
        exception.args = (msg,) + exception.args[1:]

    @abstractmethod
    def mset_nonatomic(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> "ClusterPipeline":
        pass

    @abstractmethod
    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        pass

    @abstractmethod
    async def reset(self):
        pass

    @abstractmethod
    def multi(self):
        pass

    @abstractmethod
    async def watch(self, *names):
        pass

    @abstractmethod
    async def unwatch(self):
        pass

    @abstractmethod
    async def discard(self):
        pass

    @abstractmethod
    async def unlink(self, *names):
        pass

    def __len__(self) -> int:
        """Return the number of commands currently queued."""
        return len(self._command_queue)

1916 

1917 

class PipelineStrategy(AbstractStrategy):
    """
    Non-transactional execution strategy.

    Queued commands are grouped by target node and sent to the nodes
    concurrently. The transactional operations (multi/watch/unwatch/discard)
    are rejected with a RedisClusterException.
    """

    def __init__(self, pipe: ClusterPipeline) -> None:
        super().__init__(pipe)

    def mset_nonatomic(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> "ClusterPipeline":
        """
        Queue one MSET command per hash slot touched by ``mapping``.

        Keys are encoded with the cluster client's encoder to compute their
        slot; the key/value pairs of each slot are flattened into the
        arguments of a single MSET. Returns the owning pipeline.
        """
        encoder = self._pipe.cluster_client.encoder

        # slot -> flat [key1, value1, key2, value2, ...] list
        slots_pairs = {}
        for pair in mapping.items():
            slot = key_slot(encoder.encode(pair[0]))
            slots_pairs.setdefault(slot, []).extend(pair)

        for pairs in slots_pairs.values():
            self.execute_command("MSET", *pairs)

        return self._pipe

    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        """
        Run the queued commands and return their results.

        Returns an empty list when nothing is queued. When ``_execute``
        raises one of ``RedisCluster.ERRORS_ALLOW_RETRY`` the cluster client
        is closed, the call sleeps 0.25 seconds and the whole queue is tried
        again, up to the retry count taken from the cluster client's retry
        object; after that the error is re-raised. The strategy is always
        reset once a non-empty queue has been processed, whether it
        succeeded or failed.
        """
        if not self._command_queue:
            return []

        try:
            retry_attempts = self._pipe.cluster_client.retry.get_retries()
            while True:
                try:
                    # The client may have been closed by a previous failed
                    # attempt below, so (re)initialize it when flagged.
                    if self._pipe.cluster_client._initialize:
                        await self._pipe.cluster_client.initialize()
                    return await self._execute(
                        self._pipe.cluster_client,
                        self._command_queue,
                        raise_on_error=raise_on_error,
                        allow_redirections=allow_redirections,
                    )

                except RedisCluster.ERRORS_ALLOW_RETRY as e:
                    if retry_attempts > 0:
                        # Try again with the new cluster setup. All other errors
                        # should be raised.
                        retry_attempts -= 1
                        await self._pipe.cluster_client.aclose()
                        await asyncio.sleep(0.25)
                    else:
                        # All other errors should be raised.
                        raise e
        finally:
            await self.reset()

    async def _execute(
        self,
        client: "RedisCluster",
        stack: List["PipelineCommand"],
        raise_on_error: bool = True,
        allow_redirections: bool = True,
    ) -> List[Any]:
        """
        Execute ``stack`` once and return the result of every command in it.

        :param client: cluster client used to resolve target nodes and to
            re-send redirected commands
        :param stack: the commands to run
        :param raise_on_error: raise the first failed command's exception
            (with an annotated message) instead of returning it
        :param allow_redirections: re-send commands that failed with
            TryAgainError, MovedError or AskError individually through
            ``client.execute_command``
        :raises RedisClusterException: when a command resolves to no target
            node or to more than one
        """
        # Only commands with a falsy result or an exception result are
        # (re)sent; the rest keep the result they already have.
        todo = [
            cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception)
        ]

        # node name -> (node, [commands routed to that node])
        nodes = {}
        for cmd in todo:
            # NOTE(review): "target_nodes" is removed from the command's
            # kwargs here, so a later attempt on the same command no longer
            # sees it - confirm this is intended for retries.
            passed_targets = cmd.kwargs.pop("target_nodes", None)
            if passed_targets and not client._is_node_flag(passed_targets):
                target_nodes = client._parse_target_nodes(passed_targets)
            else:
                target_nodes = await client._determine_nodes(
                    *cmd.args, node_flag=passed_targets
                )
                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {cmd.args} command on"
                    )
            if len(target_nodes) > 1:
                raise RedisClusterException(f"Too many targets for command {cmd.args}")
            node = target_nodes[0]
            if node.name not in nodes:
                nodes[node.name] = (node, [])
            nodes[node.name][1].append(cmd)

        # Run every node's batch concurrently. Each entry of ``errors`` is
        # that node's execute_pipeline() return value (presumably truthy
        # when at least one of its commands failed - confirm in ClusterNode).
        errors = await asyncio.gather(
            *(
                asyncio.create_task(node[0].execute_pipeline(node[1]))
                for node in nodes.values()
            )
        )

        if any(errors):
            if allow_redirections:
                # send each errored command individually
                for cmd in todo:
                    if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
                        try:
                            cmd.result = await client.execute_command(
                                *cmd.args, **cmd.kwargs
                            )
                        except Exception as e:
                            cmd.result = e

            if raise_on_error:
                for cmd in todo:
                    result = cmd.result
                    if isinstance(result, Exception):
                        command = " ".join(map(safe_str, cmd.args))
                        msg = (
                            f"Command # {cmd.position + 1} "
                            f"({truncate_text(command)}) "
                            f"of pipeline caused error: {result.args}"
                        )
                        result.args = (msg,) + result.args[1:]
                        raise result

            default_cluster_node = client.get_default_node()

            # Check whether the default node was used. In some cases,
            # 'client.get_default_node()' may return None. The check below
            # prevents a potential AttributeError.
            if default_cluster_node is not None:
                default_node = nodes.get(default_cluster_node.name)
                if default_node is not None:
                    # This pipeline execution used the default node, check if we need
                    # to replace it.
                    # Note: when the error is raised we'll reset the default node in the
                    # caller function.
                    for cmd in default_node[1]:
                        # Check if it has a command that failed with a relevant
                        # exception
                        if type(cmd.result) in RedisCluster.ERRORS_ALLOW_RETRY:
                            client.replace_default_node()
                            break

        return [cmd.result for cmd in stack]

    async def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._command_queue = []

    def multi(self):
        """Always raises: MULTI needs a transactional pipeline."""
        raise RedisClusterException(
            "method multi() is not supported outside of transactional context"
        )

    async def watch(self, *names):
        """Always raises: WATCH needs a transactional pipeline."""
        raise RedisClusterException(
            "method watch() is not supported outside of transactional context"
        )

    async def unwatch(self):
        """Always raises: UNWATCH needs a transactional pipeline."""
        raise RedisClusterException(
            "method unwatch() is not supported outside of transactional context"
        )

    async def discard(self):
        """Always raises: DISCARD needs a transactional pipeline."""
        raise RedisClusterException(
            "method discard() is not supported outside of transactional context"
        )

    async def unlink(self, *names):
        """
        Queue an UNLINK for exactly one key and return the owning pipeline.

        :raises RedisClusterException: when ``names`` does not contain
            exactly one key
        """
        if len(names) != 1:
            raise RedisClusterException(
                "unlinking multiple keys is not implemented in pipeline command"
            )

        return self.execute_command("UNLINK", names[0])

2086 

2087 

class TransactionStrategy(AbstractStrategy):
    """
    Transactional execution strategy.

    Queued commands are wrapped in MULTI/EXEC and sent over a single
    connection of the node that owns the slot of the keys involved.
    WATCH/UNWATCH, and every command issued while watching before
    ``multi()``, are sent immediately over that same connection instead of
    being queued.
    """

    # Commands that carry no key and therefore have no slot.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands that are sent right away instead of being queued.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands after which the connection is no longer watching keys.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    CONNECTION_ERRORS = (
        ConnectionError,
        OSError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def __init__(self, pipe: ClusterPipeline) -> None:
        super().__init__(pipe)
        self._explicit_transaction = False
        self._watching = False
        self._pipeline_slots: Set[int] = set()
        self._transaction_node: Optional[ClusterNode] = None
        self._transaction_connection: Optional[Connection] = None
        self._executing = False
        # Private copy of the client's retry object, extended so that slot
        # redirects are retried too.
        self._retry = copy(self._pipe.cluster_client.retry)
        self._retry.update_supported_errors(
            RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
        )

    def _get_client_and_connection_for_transaction(
        self,
    ) -> Tuple[ClusterNode, Connection]:
        """
        Find a connection for a pipeline transaction.

        For running an atomic transaction, watch keys ensure that contents have not been
        altered as long as the watch commands for those keys were sent over the same
        connection. So once we start watching a key, we fetch a connection to the
        node that owns that slot and reuse it.

        :raises RedisClusterException: when no slot has been recorded yet
        """
        if not self._pipeline_slots:
            raise RedisClusterException(
                "At least a command with a key is needed to identify a node"
            )

        node: ClusterNode = self._pipe.cluster_client.nodes_manager.get_node_from_slot(
            list(self._pipeline_slots)[0], False
        )
        self._transaction_node = node

        # Reuse the connection already held by this transaction, if any.
        if not self._transaction_connection:
            connection: Connection = self._transaction_node.acquire_connection()
            self._transaction_connection = connection

        return self._transaction_node, self._transaction_connection

    def execute_command(self, *args: Union[KeyT, EncodableT], **kwargs: Any) -> "Any":
        """
        Synchronous entry point: run ``_execute_command`` to completion with
        ``asyncio.run`` on a helper thread, then return its result or
        re-raise the exception it raised.
        """
        # Given the limitation of ClusterPipeline sync API, we have to run it in thread.
        response = None
        error = None

        def runner():
            nonlocal response
            nonlocal error
            try:
                response = asyncio.run(self._execute_command(*args, **kwargs))
            except Exception as e:
                error = e

        thread = threading.Thread(target=runner)
        thread.start()
        thread.join()

        if error:
            raise error

        return response

    async def _execute_command(
        self, *args: Union[KeyT, EncodableT], **kwargs: Any
    ) -> Any:
        """
        Either queue the command or prepare it for immediate execution.

        While watching (or for WATCH/UNWATCH themselves) and before
        ``multi()`` has been called, the command's slot is validated and the
        not-yet-awaited ``_immediate_execute_command`` coroutine is returned.
        Otherwise the slot is recorded and the command is queued.

        :raises CrossSlotTransactionError: when the immediate command's slot
            differs from the slots already recorded
        :raises RedisClusterException: when no slot can be determined for an
            immediate command that needs one
        """
        if self._pipe.cluster_client._initialize:
            await self._pipe.cluster_client.initialize()

        slot_number: Optional[int] = None
        if args[0] not in self.NO_SLOTS_COMMANDS:
            slot_number = await self._pipe.cluster_client._determine_slot(*args)

        if (
            self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
        ) and not self._explicit_transaction:
            if args[0] == "WATCH":
                self._validate_watch()

            if slot_number is not None:
                if self._pipeline_slots and slot_number not in self._pipeline_slots:
                    raise CrossSlotTransactionError(
                        "Cannot watch or send commands on different slots"
                    )

                self._pipeline_slots.add(slot_number)
            elif args[0] not in self.NO_SLOTS_COMMANDS:
                raise RedisClusterException(
                    f"Cannot identify slot number for command: {args[0]},"
                    "it cannot be triggered in a transaction"
                )

            # Deliberately not awaited here: the coroutine is handed back to
            # the caller (see watch()/unwatch(), which await it).
            return self._immediate_execute_command(*args, **kwargs)
        else:
            if slot_number is not None:
                self._pipeline_slots.add(slot_number)

            return super().execute_command(*args, **kwargs)

    def _validate_watch(self):
        """Mark the strategy as watching; refuse WATCH after MULTI."""
        if self._explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")

        self._watching = True

    async def _immediate_execute_command(self, *args, **options):
        """
        Send one command right away through the strategy's retry object,
        using ``_reinitialize_on_error`` as the failure callback.
        """
        return await self._retry.call_with_retry(
            lambda: self._get_connection_and_send_command(*args, **options),
            self._reinitialize_on_error,
        )

    async def _get_connection_and_send_command(self, *args, **options):
        """Resolve the transaction connection and send the command on it."""
        redis_node, connection = self._get_client_and_connection_for_transaction()
        return await self._send_command_parse_response(
            connection, redis_node, args[0], *args, **options
        )

    async def _send_command_parse_response(
        self,
        connection: Connection,
        redis_node: ClusterNode,
        command_name,
        *args,
        **options,
    ):
        """
        Send a command and parse the response
        """

        await connection.send_command(*args)
        output = await redis_node.parse_response(connection, command_name, **options)

        if command_name in self.UNWATCH_COMMANDS:
            self._watching = False
        return output

    async def _reinitialize_on_error(self, error):
        """
        Failure callback for the retry object.

        :raises WatchError: when a slot redirect happens while watching keys
            and a transaction is being executed
        """
        if self._watching:
            if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
                raise WatchError("Slot rebalancing occurred while watching keys")

        if (
            type(error) in self.SLOT_REDIRECT_ERRORS
            or type(error) in self.CONNECTION_ERRORS
        ):
            # Forget the connection so the next attempt acquires a new one.
            # NOTE(review): the connection is dropped without being released
            # back to its node - confirm this cannot exhaust the node's pool.
            if self._transaction_connection:
                self._transaction_connection = None

            cluster_client = self._pipe.cluster_client
            cluster_client.reinitialize_counter += 1
            if (
                cluster_client.reinitialize_steps
                and cluster_client.reinitialize_counter
                % cluster_client.reinitialize_steps
                == 0
            ):
                await cluster_client.nodes_manager.initialize()
                # Reset the counter that was incremented above; it lives on
                # the cluster client, not on this strategy.
                cluster_client.reinitialize_counter = 0
            else:
                if isinstance(error, AskError):
                    cluster_client.nodes_manager.update_moved_exception(error)

        self._executing = False

    def _raise_first_error(self, responses, stack):
        """
        Raise the first exception on the stack
        """
        for r, cmd in zip(responses, stack):
            if isinstance(r, Exception):
                self._annotate_exception(r, cmd.position + 1, cmd.args)
                raise r

    def mset_nonatomic(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> "ClusterPipeline":
        """Always raises: not available in a transaction."""
        raise NotImplementedError("Method is not supported in transactional context.")

    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        """
        Execute the queued commands as one transaction.

        Returns an empty list when nothing is queued and the strategy is
        either not watching or has no slot recorded. ``allow_redirections``
        is accepted for interface compatibility and not used here.
        """
        stack = self._command_queue
        if not stack and (not self._watching or not self._pipeline_slots):
            return []

        return await self._execute_transaction_with_retries(stack, raise_on_error)

    async def _execute_transaction_with_retries(
        self, stack: List["PipelineCommand"], raise_on_error: bool
    ):
        """Run ``_execute_transaction`` through the strategy's retry object."""
        return await self._retry.call_with_retry(
            lambda: self._execute_transaction(stack, raise_on_error),
            self._reinitialize_on_error,
        )

    async def _execute_transaction(
        self, stack: List["PipelineCommand"], raise_on_error: bool
    ):
        """
        Send ``MULTI``, the queued commands and ``EXEC`` in one packed write
        and return the post-processed EXEC reply.

        :raises CrossSlotTransactionError: when more than one slot was recorded
        :raises WatchError: when EXEC returns ``None``
        :raises InvalidPipelineStack: when the reply length does not match
            the number of queued commands
        """
        if len(self._pipeline_slots) > 1:
            raise CrossSlotTransactionError(
                "All keys involved in a cluster transaction must map to the same slot"
            )

        self._executing = True

        redis_node, connection = self._get_client_and_connection_for_transaction()

        stack = chain(
            [PipelineCommand(0, "MULTI")],
            stack,
            [PipelineCommand(0, "EXEC")],
        )
        commands = [c.args for c in stack if EMPTY_RESPONSE not in c.kwargs]
        packed_commands = connection.pack_commands(commands)
        await connection.send_packed_command(packed_commands)

        # Exceptions seen while the commands were being queued; the first one
        # is raised if EXEC aborts.
        queue_errors: List[Exception] = []
        # (index, value) pairs to splice into the EXEC reply so that it lines
        # up with the command queue. Kept separate from ``queue_errors``:
        # mixing bare exceptions and tuples in one list made both
        # ``raise errors[0]`` and ``for i, e in errors`` fail with TypeError.
        insertions: List[Tuple[int, Any]] = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            await redis_node.parse_response(connection, "MULTI")
        except ResponseError as e:
            # Pass the command as a one-element tuple so the annotation reads
            # "MULTI" rather than "M U L T I".
            self._annotate_exception(e, 0, ("MULTI",))
            queue_errors.append(e)
        except self.CONNECTION_ERRORS as cluster_error:
            self._annotate_exception(cluster_error, 0, ("MULTI",))
            raise

        # and all the other commands
        for i, command in enumerate(self._command_queue):
            if EMPTY_RESPONSE in command.kwargs:
                # Command was not sent; its reply is the configured value.
                insertions.append((i, command.kwargs[EMPTY_RESPONSE]))
            else:
                try:
                    _ = await redis_node.parse_response(connection, "_")
                except self.SLOT_REDIRECT_ERRORS as slot_error:
                    self._annotate_exception(slot_error, i + 1, command.args)
                    queue_errors.append(slot_error)
                    insertions.append((i, slot_error))
                except self.CONNECTION_ERRORS as cluster_error:
                    self._annotate_exception(cluster_error, i + 1, command.args)
                    raise
                except ResponseError as e:
                    self._annotate_exception(e, i + 1, command.args)
                    queue_errors.append(e)
                    insertions.append((i, e))

        response = None
        # parse the EXEC.
        try:
            response = await redis_node.parse_response(connection, "EXEC")
        except ExecAbortError:
            # Prefer the error that caused the abort over the generic one.
            if queue_errors:
                raise queue_errors[0]
            raise

        self._executing = False

        # EXEC clears any watched keys
        self._watching = False

        if response is None:
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        for i, e in insertions:
            response.insert(i, e)

        if len(response) != len(self._command_queue):
            raise InvalidPipelineStack(
                "Unexpected response length for cluster pipeline EXEC."
                " Command stack was {} but response had length {}".format(
                    [c.args[0] for c in self._command_queue], len(response)
                )
            )

        # find any errors in the response and raise if necessary
        if raise_on_error or queue_errors or insertions:
            self._raise_first_error(
                response,
                self._command_queue,
            )

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, self._command_queue):
            if not isinstance(r, Exception):
                command_name = cmd.args[0]
                if command_name in self._pipe.cluster_client.response_callbacks:
                    r = self._pipe.cluster_client.response_callbacks[command_name](
                        r, **cmd.kwargs
                    )
            data.append(r)
        return data

    async def reset(self):
        """
        Empty the queue, give the transaction connection back to its node
        and clear all transaction state.
        """
        self._command_queue = []

        # make sure to reset the connection state in the event that we were
        # watching something
        connection = self._transaction_connection
        if connection:
            try:
                if self._watching:
                    # call this manually since our unwatch or
                    # immediate_execute_command methods can call reset()
                    await connection.send_command("UNWATCH")
                    await connection.read_response()
            except self.CONNECTION_ERRORS:
                # disconnect will also remove any previous WATCHes
                await connection.disconnect()

            # Either UNWATCH succeeded or the connection was disconnected, so
            # nothing is being watched on it any more and it can go back to
            # its node. Clearing the reference also prevents a connection
            # bound to the old node from being reused after
            # ``_transaction_node`` is reset below.
            if self._transaction_node is not None:
                self._transaction_node.release(connection)
            self._transaction_connection = None

        # clean up the other instance attributes
        self._transaction_node = None
        self._watching = False
        self._explicit_transaction = False
        self._pipeline_slots = set()
        self._executing = False

    def multi(self):
        """
        Mark the start of the explicit transaction block.

        :raises RedisError: on a nested call, or when commands were already
            queued
        """
        if self._explicit_transaction:
            raise RedisError("Cannot issue nested calls to MULTI")
        if self._command_queue:
            raise RedisError(
                "Commands without an initial WATCH have already been issued"
            )
        self._explicit_transaction = True

    async def watch(self, *names):
        """
        Send WATCH for ``names`` immediately.

        :raises RedisError: when called after ``multi()``
        """
        if self._explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")

        # execute_command() hands back the pending immediate-execution
        # coroutine, which is awaited here.
        return await self.execute_command("WATCH", *names)

    async def unwatch(self):
        """Send UNWATCH when watching; otherwise just return ``True``."""
        if self._watching:
            return await self.execute_command("UNWATCH")

        return True

    async def discard(self):
        """Discard the transaction by resetting the strategy."""
        await self.reset()

    async def unlink(self, *names):
        """Pass an UNLINK of ``names`` to ``execute_command`` and return its result."""
        return self.execute_command("UNLINK", *names)