Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/asyncio/cluster.py: 17%

638 statements  

coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import asyncio 

2import collections 

3import random 

4import socket 

5import ssl 

6import warnings 

7from typing import ( 

8 Any, 

9 Callable, 

10 Deque, 

11 Dict, 

12 Generator, 

13 List, 

14 Mapping, 

15 Optional, 

16 Tuple, 

17 Type, 

18 TypeVar, 

19 Union, 

20) 

21 

22from redis._cache import ( 

23 DEFAULT_BLACKLIST, 

24 DEFAULT_EVICTION_POLICY, 

25 DEFAULT_WHITELIST, 

26 AbstractCache, 

27) 

28from redis._parsers import AsyncCommandsParser, Encoder 

29from redis._parsers.helpers import ( 

30 _RedisCallbacks, 

31 _RedisCallbacksRESP2, 

32 _RedisCallbacksRESP3, 

33) 

34from redis.asyncio.client import ResponseCallbackT 

35from redis.asyncio.connection import Connection, DefaultParser, SSLConnection, parse_url 

36from redis.asyncio.lock import Lock 

37from redis.asyncio.retry import Retry 

38from redis.backoff import default_backoff 

39from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis 

40from redis.cluster import ( 

41 PIPELINE_BLOCKED_COMMANDS, 

42 PRIMARY, 

43 REPLICA, 

44 SLOT_ID, 

45 AbstractRedisCluster, 

46 LoadBalancer, 

47 block_pipeline_command, 

48 get_node_name, 

49 parse_cluster_slots, 

50) 

51from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands 

52from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

53from redis.credentials import CredentialProvider 

54from redis.exceptions import ( 

55 AskError, 

56 BusyLoadingError, 

57 ClusterCrossSlotError, 

58 ClusterDownError, 

59 ClusterError, 

60 ConnectionError, 

61 DataError, 

62 MasterDownError, 

63 MaxConnectionsError, 

64 MovedError, 

65 RedisClusterException, 

66 ResponseError, 

67 SlotNotCoveredError, 

68 TimeoutError, 

69 TryAgainError, 

70) 

71from redis.typing import AnyKeyT, EncodableT, KeyT 

72from redis.utils import ( 

73 deprecated_function, 

74 dict_merge, 

75 get_lib_version, 

76 safe_str, 

77 str_if_bytes, 

78) 

79 

80TargetNodesT = TypeVar( 

81 "TargetNodesT", str, "ClusterNode", List["ClusterNode"], Dict[Any, "ClusterNode"] 

82) 

83 

84 

85class ClusterParser(DefaultParser): 

86 EXCEPTION_CLASSES = dict_merge( 

87 DefaultParser.EXCEPTION_CLASSES, 

88 { 

89 "ASK": AskError, 

90 "CLUSTERDOWN": ClusterDownError, 

91 "CROSSSLOT": ClusterCrossSlotError, 

92 "MASTERDOWN": MasterDownError, 

93 "MOVED": MovedError, 

94 "TRYAGAIN": TryAgainError, 

95 }, 

96 ) 

97 

98 
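# Editor's note: the sketch below is illustrative only and not part of
# redis-py. It shows how a parser using EXCEPTION_CLASSES maps the first
# token of an error reply to an exception class; _demo_classify_error is a
# hypothetical helper, and the real parsers do more per-class processing.
def _demo_classify_error(error_line: str) -> Exception:
    # e.g. "MOVED 3999 127.0.0.1:6381" -> MovedError("3999 127.0.0.1:6381"),
    # whose constructor parses out slot_id, host, and port for the redirect
    # handling in RedisCluster._execute_command below.
    prefix, _, rest = error_line.partition(" ")
    exc_class = ClusterParser.EXCEPTION_CLASSES.get(prefix, ResponseError)
    if isinstance(exc_class, dict):
        # Some inherited entries map the remaining message text to a class.
        exc_class = exc_class.get(rest, ResponseError)
    return exc_class(rest)
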

99class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands): 

100 """ 

101 Create a new RedisCluster client. 

102 

103 Pass one of parameters: 

104 

105 - `host` & `port` 

106 - `startup_nodes` 

107 

108 | Use ``await`` :meth:`initialize` to find cluster nodes & create connections. 

109 | Use ``await`` :meth:`close` to disconnect connections & close client. 

110 

111 Many commands support the target_nodes kwarg. It can be one of the 

112 :attr:`NODE_FLAGS`: 

113 

114 - :attr:`PRIMARIES` 

115 - :attr:`REPLICAS` 

116 - :attr:`ALL_NODES` 

117 - :attr:`RANDOM` 

118 - :attr:`DEFAULT_NODE` 

119 

120 Note: This client is not thread/process/fork safe. 

121 

122 :param host: 

123 | Can be used to point to a startup node 

124 :param port: 

125 | Port used if **host** is provided 

126 :param startup_nodes: 

127 | :class:`~.ClusterNode` to be used as a startup node 

128 :param require_full_coverage: 

129 | When set to ``False``: the client will not require a full coverage of 

130 the slots. However, if not all slots are covered, and at least one node 

131 has ``cluster-require-full-coverage`` set to ``yes``, the server will throw 

132 a :class:`~.ClusterDownError` for some key-based commands. 

133 | When set to ``True``: all slots must be covered to construct the cluster 

134 client. If not all slots are covered, :class:`~.RedisClusterException` will be 

135 thrown. 

136 | See: 

137 https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters 

138 :param read_from_replicas: 

139 | Enable read from replicas in READONLY mode. You can read possibly stale data. 

140 When set to ``True``, read commands will be distributed between the primary and 

141 its replicas in a Round-Robin manner. 

142 :param reinitialize_steps: 

143 | Specifies the number of MOVED errors that need to occur before reinitializing 

144 the whole cluster topology. If a MOVED error occurs and the cluster does not 

145 need to be reinitialized on this current error handling, only the MOVED slot 

146 will be patched with the redirected node. 

147 To reinitialize the cluster on every MOVED error, set reinitialize_steps to 1. 

148 To avoid reinitializing the cluster on moved errors, set reinitialize_steps to 

149 0. 

150 :param cluster_error_retry_attempts: 

151 | Number of times to retry before raising an error when :class:`~.TimeoutError` 

152 or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered 

153 :param connection_error_retry_attempts: 

154 | Number of times to retry before reinitializing when :class:`~.TimeoutError` 

155 or :class:`~.ConnectionError` are encountered. 

156 The default backoff strategy will be set if Retry object is not passed (see 

157 default_backoff in backoff.py). To change it, pass a custom Retry object 

158 using the "retry" keyword. 

159 :param max_connections: 

160 | Maximum number of connections per node. If there are no free connections & the 

161 maximum number of connections are already created, a 

162 :class:`~.MaxConnectionsError` is raised. This error may be retried as defined 

163 by :attr:`connection_error_retry_attempts` 

164 :param address_remap: 

165 | An optional callable which, when provided with an internal network 

166 address of a node, e.g. a `(host, port)` tuple, will return the address 

167 where the node is reachable. This can be used to map the addresses at 

168 which the nodes _think_ they are, to addresses at which a client may 

169 reach them, such as when they sit behind a proxy. 

170 

171 | Rest of the arguments will be passed to the 

172 :class:`~redis.asyncio.connection.Connection` instances when created 

173 

174 :raises RedisClusterException: 

175 if any arguments are invalid or unknown, e.g.: 

176 

177 - `db` != 0 or None 

178 - `path` argument for unix socket connection 

179 - none of the `host`/`port` & `startup_nodes` were provided 

180 

181 """ 

182 

183 @classmethod 

184 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster": 

185 """ 

186 Return a Redis client object configured from the given URL. 

187 

188 For example:: 

189 

190 redis://[[username]:[password]]@localhost:6379/0 

191 rediss://[[username]:[password]]@localhost:6379/0 

192 

193 Two URL schemes are supported: 

194 

195 - `redis://` creates a TCP socket connection. See more at: 

196 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

197 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

198 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

199 

200 The username, password, hostname, path and all querystring values are passed 

201 through ``urllib.parse.unquote`` in order to replace any percent-encoded values 

202 with their corresponding characters. 

203 

204 All querystring options are cast to their appropriate Python types. Boolean 

205 arguments can be specified with string values "True"/"False" or "Yes"/"No". 

206 Values that cannot be properly cast cause a ``ValueError`` to be raised. Once 

207 parsed, the querystring arguments and keyword arguments are passed to 

208 :class:`~redis.asyncio.connection.Connection` when created. 

209 In the case of conflicting arguments, querystring arguments are used. 

210 """ 

211 kwargs.update(parse_url(url)) 

212 if kwargs.pop("connection_class", None) is SSLConnection: 

213 kwargs["ssl"] = True 

214 return cls(**kwargs) 

215 

216 __slots__ = ( 

217 "_initialize", 

218 "_lock", 

219 "cluster_error_retry_attempts", 

220 "command_flags", 

221 "commands_parser", 

222 "connection_error_retry_attempts", 

223 "connection_kwargs", 

224 "encoder", 

225 "node_flags", 

226 "nodes_manager", 

227 "read_from_replicas", 

228 "reinitialize_counter", 

229 "reinitialize_steps", 

230 "response_callbacks", 

231 "result_callbacks", 

232 ) 

233 

234 def __init__( 

235 self, 

236 host: Optional[str] = None, 

237 port: Union[str, int] = 6379, 

238 # Cluster related kwargs 

239 startup_nodes: Optional[List["ClusterNode"]] = None, 

240 require_full_coverage: bool = True, 

241 read_from_replicas: bool = False, 

242 reinitialize_steps: int = 5, 

243 cluster_error_retry_attempts: int = 3, 

244 connection_error_retry_attempts: int = 3, 

245 max_connections: int = 2**31, 

246 # Client related kwargs 

247 db: Union[str, int] = 0, 

248 path: Optional[str] = None, 

249 credential_provider: Optional[CredentialProvider] = None, 

250 username: Optional[str] = None, 

251 password: Optional[str] = None, 

252 client_name: Optional[str] = None, 

253 lib_name: Optional[str] = "redis-py", 

254 lib_version: Optional[str] = get_lib_version(), 

255 # Encoding related kwargs 

256 encoding: str = "utf-8", 

257 encoding_errors: str = "strict", 

258 decode_responses: bool = False, 

259 # Connection related kwargs 

260 health_check_interval: float = 0, 

261 socket_connect_timeout: Optional[float] = None, 

262 socket_keepalive: bool = False, 

263 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, 

264 socket_timeout: Optional[float] = None, 

265 retry: Optional["Retry"] = None, 

266 retry_on_error: Optional[List[Type[Exception]]] = None, 

267 # SSL related kwargs 

268 ssl: bool = False, 

269 ssl_ca_certs: Optional[str] = None, 

270 ssl_ca_data: Optional[str] = None, 

271 ssl_cert_reqs: str = "required", 

272 ssl_certfile: Optional[str] = None, 

273 ssl_check_hostname: bool = False, 

274 ssl_keyfile: Optional[str] = None, 

275 ssl_min_version: Optional[ssl.TLSVersion] = None, 

276 protocol: Optional[int] = 2, 

277 address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, 

278 cache_enabled: bool = False, 

279 client_cache: Optional[AbstractCache] = None, 

280 cache_max_size: int = 100, 

281 cache_ttl: int = 0, 

282 cache_policy: str = DEFAULT_EVICTION_POLICY, 

283 cache_blacklist: List[str] = DEFAULT_BLACKLIST, 

284 cache_whitelist: List[str] = DEFAULT_WHITELIST, 

285 ) -> None: 

286 if db: 

287 raise RedisClusterException( 

288 "Argument 'db' must be 0 or None in cluster mode" 

289 ) 

290 

291 if path: 

292 raise RedisClusterException( 

293 "Unix domain socket is not supported in cluster mode" 

294 ) 

295 

296 if (not host or not port) and not startup_nodes: 

297 raise RedisClusterException( 

298 "RedisCluster requires at least one node to discover the cluster.\n" 

299 "Please provide one of the following or use RedisCluster.from_url:\n" 

300 ' - host and port: RedisCluster(host="localhost", port=6379)\n' 

301 " - startup_nodes: RedisCluster(startup_nodes=[" 

302 'ClusterNode("localhost", 6379), ClusterNode("localhost", 6380)])' 

303 ) 

304 

305 kwargs: Dict[str, Any] = { 

306 "max_connections": max_connections, 

307 "connection_class": Connection, 

308 "parser_class": ClusterParser, 

309 # Client related kwargs 

310 "credential_provider": credential_provider, 

311 "username": username, 

312 "password": password, 

313 "client_name": client_name, 

314 "lib_name": lib_name, 

315 "lib_version": lib_version, 

316 # Encoding related kwargs 

317 "encoding": encoding, 

318 "encoding_errors": encoding_errors, 

319 "decode_responses": decode_responses, 

320 # Connection related kwargs 

321 "health_check_interval": health_check_interval, 

322 "socket_connect_timeout": socket_connect_timeout, 

323 "socket_keepalive": socket_keepalive, 

324 "socket_keepalive_options": socket_keepalive_options, 

325 "socket_timeout": socket_timeout, 

326 "retry": retry, 

327 "protocol": protocol, 

328 # Client cache related kwargs 

329 "cache_enabled": cache_enabled, 

330 "client_cache": client_cache, 

331 "cache_max_size": cache_max_size, 

332 "cache_ttl": cache_ttl, 

333 "cache_policy": cache_policy, 

334 "cache_blacklist": cache_blacklist, 

335 "cache_whitelist": cache_whitelist, 

336 } 

337 

338 if ssl: 

339 # SSL related kwargs 

340 kwargs.update( 

341 { 

342 "connection_class": SSLConnection, 

343 "ssl_ca_certs": ssl_ca_certs, 

344 "ssl_ca_data": ssl_ca_data, 

345 "ssl_cert_reqs": ssl_cert_reqs, 

346 "ssl_certfile": ssl_certfile, 

347 "ssl_check_hostname": ssl_check_hostname, 

348 "ssl_keyfile": ssl_keyfile, 

349 "ssl_min_version": ssl_min_version, 

350 } 

351 ) 

352 

353 if read_from_replicas: 

354 # Call our on_connect function to configure READONLY mode 

355 kwargs["redis_connect_func"] = self.on_connect 

356 

357 self.retry = retry 

358 if retry or retry_on_error or connection_error_retry_attempts > 0: 

359 # Set a retry object for all cluster nodes 

360 self.retry = retry or Retry( 

361 default_backoff(), connection_error_retry_attempts 

362 ) 

363 if not retry_on_error: 

364 # Default errors for retrying 

365 retry_on_error = [ConnectionError, TimeoutError] 

366 self.retry.update_supported_errors(retry_on_error) 

367 kwargs.update({"retry": self.retry}) 

368 

369 kwargs["response_callbacks"] = _RedisCallbacks.copy() 

370 if kwargs.get("protocol") in ["3", 3]: 

371 kwargs["response_callbacks"].update(_RedisCallbacksRESP3) 

372 else: 

373 kwargs["response_callbacks"].update(_RedisCallbacksRESP2) 

374 self.connection_kwargs = kwargs 

375 

376 if startup_nodes: 

377 passed_nodes = [] 

378 for node in startup_nodes: 

379 passed_nodes.append( 

380 ClusterNode(node.host, node.port, **self.connection_kwargs) 

381 ) 

382 startup_nodes = passed_nodes 

383 else: 

384 startup_nodes = [] 

385 if host and port: 

386 startup_nodes.append(ClusterNode(host, port, **self.connection_kwargs)) 

387 

388 self.nodes_manager = NodesManager( 

389 startup_nodes, 

390 require_full_coverage, 

391 kwargs, 

392 address_remap=address_remap, 

393 ) 

394 self.encoder = Encoder(encoding, encoding_errors, decode_responses) 

395 self.read_from_replicas = read_from_replicas 

396 self.reinitialize_steps = reinitialize_steps 

397 self.cluster_error_retry_attempts = cluster_error_retry_attempts 

398 self.connection_error_retry_attempts = connection_error_retry_attempts 

399 self.reinitialize_counter = 0 

400 self.commands_parser = AsyncCommandsParser() 

401 self.node_flags = self.__class__.NODE_FLAGS.copy() 

402 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

403 self.response_callbacks = kwargs["response_callbacks"] 

404 self.result_callbacks = self.__class__.RESULT_CALLBACKS.copy() 

405 self.result_callbacks["CLUSTER SLOTS"] = ( 

406 lambda cmd, res, **kwargs: parse_cluster_slots( 

407 list(res.values())[0], **kwargs 

408 ) 

409 ) 

410 

411 self._initialize = True 

412 self._lock: Optional[asyncio.Lock] = None 

413 

414 async def initialize(self) -> "RedisCluster": 

415 """Get all nodes from startup nodes & creates connections if not initialized.""" 

416 if self._initialize: 

417 if not self._lock: 

418 self._lock = asyncio.Lock() 

419 async with self._lock: 

420 if self._initialize: 

421 try: 

422 await self.nodes_manager.initialize() 

423 await self.commands_parser.initialize( 

424 self.nodes_manager.default_node 

425 ) 

426 self._initialize = False 

427 except BaseException: 

428 await self.nodes_manager.aclose() 

429 await self.nodes_manager.aclose("startup_nodes") 

430 raise 

431 return self 

432 

433 async def aclose(self) -> None: 

434 """Close all connections & client if initialized.""" 

435 if not self._initialize: 

436 if not self._lock: 

437 self._lock = asyncio.Lock() 

438 async with self._lock: 

439 if not self._initialize: 

440 self._initialize = True 

441 await self.nodes_manager.aclose() 

442 await self.nodes_manager.aclose("startup_nodes") 

443 

444 @deprecated_function(version="5.0.0", reason="Use aclose() instead", name="close") 

445 async def close(self) -> None: 

446 """alias for aclose() for backwards compatibility""" 

447 await self.aclose() 

448 

449 async def __aenter__(self) -> "RedisCluster": 

450 return await self.initialize() 

451 

452 async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None: 

453 await self.aclose() 

454 

455 def __await__(self) -> Generator[Any, None, "RedisCluster"]: 

456 return self.initialize().__await__() 

457 

458 _DEL_MESSAGE = "Unclosed RedisCluster client" 

459 

460 def __del__( 

461 self, 

462 _warn: Any = warnings.warn, 

463 _grl: Any = asyncio.get_running_loop, 

464 ) -> None: 

465 if hasattr(self, "_initialize") and not self._initialize: 

466 _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self) 

467 try: 

468 context = {"client": self, "message": self._DEL_MESSAGE} 

469 _grl().call_exception_handler(context) 

470 except RuntimeError: 

471 pass 

472 

473 async def on_connect(self, connection: Connection) -> None: 

474 await connection.on_connect() 

475 

476 # Sending READONLY command to server to configure connection as 

477 # readonly. Since each cluster node may change its server type due 

478 # to a failover, we should establish a READONLY connection 

479 # regardless of the server type. If this is a primary connection, 

480 # READONLY would not affect executing write commands. 

481 await connection.send_command("READONLY") 

482 if str_if_bytes(await connection.read_response()) != "OK": 

483 raise ConnectionError("READONLY command failed") 

484 

485 def get_nodes(self) -> List["ClusterNode"]: 

486 """Get all nodes of the cluster.""" 

487 return list(self.nodes_manager.nodes_cache.values()) 

488 

489 def get_primaries(self) -> List["ClusterNode"]: 

490 """Get the primary nodes of the cluster.""" 

491 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

492 

493 def get_replicas(self) -> List["ClusterNode"]: 

494 """Get the replica nodes of the cluster.""" 

495 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

496 

497 def get_random_node(self) -> "ClusterNode": 

498 """Get a random node of the cluster.""" 

499 return random.choice(list(self.nodes_manager.nodes_cache.values())) 

500 

501 def get_default_node(self) -> "ClusterNode": 

502 """Get the default node of the client.""" 

503 return self.nodes_manager.default_node 

504 

505 def set_default_node(self, node: "ClusterNode") -> None: 

506 """ 

507 Set the default node of the client. 

508 

509 :raises DataError: if None is passed or node does not exist in cluster. 

510 """ 

511 if not node or not self.get_node(node_name=node.name): 

512 raise DataError("The requested node does not exist in the cluster.") 

513 

514 self.nodes_manager.default_node = node 

515 

516 def get_node( 

517 self, 

518 host: Optional[str] = None, 

519 port: Optional[int] = None, 

520 node_name: Optional[str] = None, 

521 ) -> Optional["ClusterNode"]: 

522 """Get node by (host, port) or node_name.""" 

523 return self.nodes_manager.get_node(host, port, node_name) 

524 

525 def get_node_from_key( 

526 self, key: str, replica: bool = False 

527 ) -> Optional["ClusterNode"]: 

528 """ 

529 Get the cluster node corresponding to the provided key. 

530 

531 :param key: 

532 :param replica: 

533 | Indicates if a replica should be returned 

534 | 

535 ``None`` will be returned if no replica holds this key 

536 

537 :raises SlotNotCoveredError: if the key is not covered by any slot. 

538 """ 

539 slot = self.keyslot(key) 

540 slot_cache = self.nodes_manager.slots_cache.get(slot) 

541 if not slot_cache: 

542 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') 

543 

544 if replica: 

545 if len(self.nodes_manager.slots_cache[slot]) < 2: 

546 return None 

547 node_idx = 1 

548 else: 

549 node_idx = 0 

550 

551 return slot_cache[node_idx] 

552 

553 def keyslot(self, key: EncodableT) -> int: 

554 """ 

555 Find the keyslot for a given key. 

556 

557 See: https://redis.io/docs/manual/scaling/#redis-cluster-data-sharding 

558 """ 

559 return key_slot(self.encoder.encode(key)) 

560 
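    # Editor's note (illustrative): keys hash via CRC16 mod 16384, and a
    # "{hash tag}" restricts hashing to the braced substring, so e.g.
    # keyslot("{user:1}:name") == keyslot("{user:1}:email"). That is what
    # lets multi-key commands stay on a single slot.
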

561 def get_encoder(self) -> Encoder: 

562 """Get the encoder object of the client.""" 

563 return self.encoder 

564 

565 def get_connection_kwargs(self) -> Dict[str, Optional[Any]]: 

566 """Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`.""" 

567 return self.connection_kwargs 

568 

569 def get_retry(self) -> Optional["Retry"]: 

570 return self.retry 

571 

572 def set_retry(self, retry: "Retry") -> None: 

573 self.retry = retry 

574 for node in self.get_nodes(): 

575 node.connection_kwargs.update({"retry": retry}) 

576 for conn in node._connections: 

577 conn.retry = retry 

578 

579 def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: 

580 """Set a custom response callback.""" 

581 self.response_callbacks[command] = callback 

582 

583 async def _determine_nodes( 

584 self, command: str, *args: Any, node_flag: Optional[str] = None 

585 ) -> List["ClusterNode"]: 

586 # Determine which nodes the command should be executed on. 

587 # Returns a list of target nodes. 

588 if not node_flag: 

589 # get the nodes group for this command if it was predefined 

590 node_flag = self.command_flags.get(command) 

591 

592 if node_flag in self.node_flags: 

593 if node_flag == self.__class__.DEFAULT_NODE: 

594 # return the cluster's default node 

595 return [self.nodes_manager.default_node] 

596 if node_flag == self.__class__.PRIMARIES: 

597 # return all primaries 

598 return self.nodes_manager.get_nodes_by_server_type(PRIMARY) 

599 if node_flag == self.__class__.REPLICAS: 

600 # return all replicas 

601 return self.nodes_manager.get_nodes_by_server_type(REPLICA) 

602 if node_flag == self.__class__.ALL_NODES: 

603 # return all nodes 

604 return list(self.nodes_manager.nodes_cache.values()) 

605 if node_flag == self.__class__.RANDOM: 

606 # return a random node 

607 return [random.choice(list(self.nodes_manager.nodes_cache.values()))] 

608 

609 # get the node that holds the key's slot 

610 return [ 

611 self.nodes_manager.get_node_from_slot( 

612 await self._determine_slot(command, *args), 

613 self.read_from_replicas and command in READ_COMMANDS, 

614 ) 

615 ] 

616 

617 async def _determine_slot(self, command: str, *args: Any) -> int: 

618 if self.command_flags.get(command) == SLOT_ID: 

619 # The command contains the slot ID 

620 return int(args[0]) 

621 

622 # Get the keys in the command 

623 

624 # EVAL and EVALSHA are common enough that it's wasteful to go to the 

625 # redis server to parse the keys. Besides, there is a bug in redis<7.0 

626 # where `self._get_command_keys()` fails anyway. So, we special case 

627 # EVAL/EVALSHA. 

628 # - issue: https://github.com/redis/redis/issues/9493 

629 # - fix: https://github.com/redis/redis/pull/9733 

630 if command.upper() in ("EVAL", "EVALSHA"): 

631 # command syntax: EVAL "script body" num_keys ... 

632 if len(args) < 2: 

633 raise RedisClusterException( 

634 f"Invalid args in command: {command, *args}" 

635 ) 

636 keys = args[2 : 2 + int(args[1])] 

637 # if there are 0 keys, that means the script can be run on any node 

638 # so we can just return a random slot 

639 if not keys: 

640 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

641 else: 

642 keys = await self.commands_parser.get_keys(command, *args) 

643 if not keys: 

644 # FCALL can call a function with 0 keys, that means the function 

645 # can be run on any node so we can just return a random slot 

646 if command.upper() in ("FCALL", "FCALL_RO"): 

647 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) 

648 raise RedisClusterException( 

649 "No way to dispatch this command to Redis Cluster. " 

650 "Missing key.\nYou can execute the command by specifying " 

651 f"target nodes.\nCommand: {args}" 

652 ) 

653 

654 # single key command 

655 if len(keys) == 1: 

656 return self.keyslot(keys[0]) 

657 

658 # multi-key command; we need to make sure all keys are mapped to 

659 # the same slot 

660 slots = {self.keyslot(key) for key in keys} 

661 if len(slots) != 1: 

662 raise RedisClusterException( 

663 f"{command} - all keys must map to the same key slot" 

664 ) 

665 

666 return slots.pop() 

667 
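    # Editor's note (illustrative): for ("EVAL", "<script>", 2, "{t}:a",
    # "{t}:b") the keys are sliced straight from the args as args[2:4]; both
    # share the "{t}" hash tag, so the single-slot check above passes. With
    # keys on different slots, the multi-key branch raises
    # RedisClusterException instead.
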

668 def _is_node_flag(self, target_nodes: Any) -> bool: 

669 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

670 

671 def _parse_target_nodes(self, target_nodes: Any) -> List["ClusterNode"]: 

672 if isinstance(target_nodes, list): 

673 nodes = target_nodes 

674 elif isinstance(target_nodes, ClusterNode): 

675 # Supports passing a single ClusterNode as a variable 

676 nodes = [target_nodes] 

677 elif isinstance(target_nodes, dict): 

678 # Supports dictionaries of the format {node_name: node}. 

679 # It enables executing commands on multiple nodes, e.g.: 

680 # rc.cluster_save_config(rc.get_primaries()) 

681 nodes = list(target_nodes.values()) 

682 else: 

683 raise TypeError( 

684 "target_nodes type can be one of the following: " 

685 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

686 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

687 f"The passed type is {type(target_nodes)}" 

688 ) 

689 return nodes 

690 

691 async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: 

692 """ 

693 Execute a raw command on the appropriate cluster node or target_nodes. 

694 

695 The command will be retried as specified by :attr:`cluster_error_retry_attempts`; 

696 if all attempts fail, an exception is raised. 

697 

698 :param args: 

699 | Raw command args 

700 :param kwargs: 

701 

702 - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode` 

703 or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`] 

704 - Rest of the kwargs are passed to the Redis connection 

705 

706 :raises RedisClusterException: if target_nodes is not provided & the command 

707 can't be mapped to a slot 

708 """ 

709 command = args[0] 

710 target_nodes = [] 

711 target_nodes_specified = False 

712 retry_attempts = self.cluster_error_retry_attempts 

713 

714 passed_targets = kwargs.pop("target_nodes", None) 

715 if passed_targets and not self._is_node_flag(passed_targets): 

716 target_nodes = self._parse_target_nodes(passed_targets) 

717 target_nodes_specified = True 

718 retry_attempts = 0 

719 

720 # Add one for the first execution 

721 execute_attempts = 1 + retry_attempts 

722 for _ in range(execute_attempts): 

723 if self._initialize: 

724 await self.initialize() 

725 if ( 

726 len(target_nodes) == 1 

727 and target_nodes[0] == self.get_default_node() 

728 ): 

729 # Replace the default cluster node 

730 self.replace_default_node() 

731 try: 

732 if not target_nodes_specified: 

733 # Determine the nodes to execute the command on 

734 target_nodes = await self._determine_nodes( 

735 *args, node_flag=passed_targets 

736 ) 

737 if not target_nodes: 

738 raise RedisClusterException( 

739 f"No targets were found to execute {args} command on" 

740 ) 

741 

742 if len(target_nodes) == 1: 

743 # Return the processed result 

744 ret = await self._execute_command(target_nodes[0], *args, **kwargs) 

745 if command in self.result_callbacks: 

746 return self.result_callbacks[command]( 

747 command, {target_nodes[0].name: ret}, **kwargs 

748 ) 

749 return ret 

750 else: 

751 keys = [node.name for node in target_nodes] 

752 values = await asyncio.gather( 

753 *( 

754 asyncio.create_task( 

755 self._execute_command(node, *args, **kwargs) 

756 ) 

757 for node in target_nodes 

758 ) 

759 ) 

760 if command in self.result_callbacks: 

761 return self.result_callbacks[command]( 

762 command, dict(zip(keys, values)), **kwargs 

763 ) 

764 return dict(zip(keys, values)) 

765 except Exception as e: 

766 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: 

767 # The nodes and slots cache should be reinitialized. 

768 # Try again with the new cluster setup. 

769 retry_attempts -= 1 

770 continue 

771 else: 

772 # raise the exception 

773 raise e 

774 

775 async def _execute_command( 

776 self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any 

777 ) -> Any: 

778 asking = moved = False 

779 redirect_addr = None 

780 ttl = self.RedisClusterRequestTTL 

781 

782 while ttl > 0: 

783 ttl -= 1 

784 try: 

785 if asking: 

786 target_node = self.get_node(node_name=redirect_addr) 

787 await target_node.execute_command("ASKING") 

788 asking = False 

789 elif moved: 

790 # MOVED occurred and the slots cache was updated, 

791 # refresh the target node 

792 slot = await self._determine_slot(*args) 

793 target_node = self.nodes_manager.get_node_from_slot( 

794 slot, self.read_from_replicas and args[0] in READ_COMMANDS 

795 ) 

796 moved = False 

797 

798 return await target_node.execute_command(*args, **kwargs) 

799 except (BusyLoadingError, MaxConnectionsError): 

800 raise 

801 except (ConnectionError, TimeoutError): 

802 # Connection retries are being handled in the node's 

803 # Retry object. 

804 # Remove the failed node from the startup nodes before we try 

805 # to reinitialize the cluster 

806 self.nodes_manager.startup_nodes.pop(target_node.name, None) 

807 # Force a full reinitialization of the node/slots setup 

808 # and try again with the new setup 

809 await self.aclose() 

810 raise 

811 except ClusterDownError: 

812 # ClusterDownError can occur during a failover; to let the cluster 

813 # self-heal, we will try to reinitialize the cluster layout 

814 # and retry executing the command 

815 await self.aclose() 

816 await asyncio.sleep(0.25) 

817 raise 

818 except MovedError as e: 

819 # First, we will try to patch the slots/nodes cache with the 

820 # redirected node output and try again. If MovedError exceeds 

821 # 'reinitialize_steps' number of times, we will force 

822 # reinitializing the tables, and then try again. 

823 # The 'reinitialize_counter' will grow faster when the same 

824 # client object is shared between multiple callers. To reduce 

825 # the reinitialization frequency, set 'reinitialize_steps' in the 

826 # RedisCluster constructor. 

827 self.reinitialize_counter += 1 

828 if ( 

829 self.reinitialize_steps 

830 and self.reinitialize_counter % self.reinitialize_steps == 0 

831 ): 

832 await self.aclose() 

833 # Reset the counter 

834 self.reinitialize_counter = 0 

835 else: 

836 self.nodes_manager._moved_exception = e 

837 moved = True 

838 except AskError as e: 

839 redirect_addr = get_node_name(host=e.host, port=e.port) 

840 asking = True 

841 except TryAgainError: 

842 if ttl < self.RedisClusterRequestTTL / 2: 

843 await asyncio.sleep(0.05) 

844 

845 raise ClusterError("TTL exhausted.") 

846 

847 def pipeline( 

848 self, transaction: Optional[Any] = None, shard_hint: Optional[Any] = None 

849 ) -> "ClusterPipeline": 

850 """ 

851 Create & return a new :class:`~.ClusterPipeline` object. 

852 

853 Cluster implementation of pipeline does not support transaction or shard_hint. 

854 

855 :raises RedisClusterException: if transaction or shard_hint are truthy values 

856 """ 

857 if shard_hint: 

858 raise RedisClusterException("shard_hint is not supported in cluster mode") 

859 

860 if transaction: 

861 raise RedisClusterException("transaction is not supported in cluster mode") 

862 

863 return ClusterPipeline(self) 

864 

865 def lock( 

866 self, 

867 name: KeyT, 

868 timeout: Optional[float] = None, 

869 sleep: float = 0.1, 

870 blocking: bool = True, 

871 blocking_timeout: Optional[float] = None, 

872 lock_class: Optional[Type[Lock]] = None, 

873 thread_local: bool = True, 

874 ) -> Lock: 

875 """ 

876 Return a new Lock object using key ``name`` that mimics 

877 the behavior of threading.Lock. 

878 

879 If specified, ``timeout`` indicates a maximum life for the lock. 

880 By default, it will remain locked until release() is called. 

881 

882 ``sleep`` indicates the amount of time to sleep per loop iteration 

883 when the lock is in blocking mode and another client is currently 

884 holding the lock. 

885 

886 ``blocking`` indicates whether calling ``acquire`` should block until 

887 the lock has been acquired or to fail immediately, causing ``acquire`` 

888 to return False and the lock not being acquired. Defaults to True. 

889 Note this value can be overridden by passing a ``blocking`` 

890 argument to ``acquire``. 

891 

892 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

893 spend trying to acquire the lock. A value of ``None`` indicates 

894 continue trying forever. ``blocking_timeout`` can be specified as a 

895 float or integer, both representing the number of seconds to wait. 

896 

897 ``lock_class`` forces the specified lock implementation. Note that as 

898 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

899 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

900 you have created your own custom lock class. 

901 

902 ``thread_local`` indicates whether the lock token is placed in 

903 thread-local storage. By default, the token is placed in thread local 

904 storage so that a thread only sees its token, not a token set by 

905 another thread. Consider the following timeline: 

906 

907 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

908 thread-1 sets the token to "abc" 

909 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

910 Lock instance. 

911 time: 5, thread-1 has not yet completed. redis expires the lock 

912 key. 

913 time: 5, thread-2 acquired `my-lock` now that it's available. 

914 thread-2 sets the token to "xyz" 

915 time: 6, thread-1 finishes its work and calls release(). if the 

916 token is *not* stored in thread local storage, then 

917 thread-1 would see the token value as "xyz" and would be 

918 able to successfully release the thread-2's lock. 

919 

920 In some use cases it's necessary to disable thread local storage. For 

921 example, if you have code where one thread acquires a lock and passes 

922 that lock instance to a worker thread to release later. If thread 

923 local storage isn't disabled in this case, the worker thread won't see 

924 the token set by the thread that acquired the lock. Our assumption 

925 is that these cases aren't common and as such default to using 

926 thread local storage.""" 

927 if lock_class is None: 

928 lock_class = Lock 

929 return lock_class( 

930 self, 

931 name, 

932 timeout=timeout, 

933 sleep=sleep, 

934 blocking=blocking, 

935 blocking_timeout=blocking_timeout, 

936 thread_local=thread_local, 

937 ) 

938 

939 
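# Editor's note: a hedged usage sketch, not part of the library source. It
# exercises the public surface documented above; host/port are placeholders
# and a reachable cluster is assumed.
async def _example_cluster_usage() -> None:
    rc = RedisCluster(host="localhost", port=6379)
    async with rc:  # __aenter__ awaits initialize()
        await rc.set("key", "value")
        print(await rc.get("key"))
        # Fan a command out to all primaries via the target_nodes kwarg.
        await rc.execute_command("PING", target_nodes=RedisCluster.PRIMARIES)
        # Queue commands on a pipeline; keys may hash to different nodes.
        print(await rc.pipeline().set("A", 1).get("A").execute())
        # Cluster-aware, Lua-based lock (see RedisCluster.lock above).
        async with rc.lock("my-lock", timeout=5):
            pass
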

940class ClusterNode: 

941 """ 

942 Create a new ClusterNode. 

943 

944 Each ClusterNode manages multiple :class:`~redis.asyncio.connection.Connection` 

945 objects for the (host, port). 

946 """ 

947 

948 __slots__ = ( 

949 "_connections", 

950 "_free", 

951 "connection_class", 

952 "connection_kwargs", 

953 "host", 

954 "max_connections", 

955 "name", 

956 "port", 

957 "response_callbacks", 

958 "server_type", 

959 ) 

960 

961 def __init__( 

962 self, 

963 host: str, 

964 port: Union[str, int], 

965 server_type: Optional[str] = None, 

966 *, 

967 max_connections: int = 2**31, 

968 connection_class: Type[Connection] = Connection, 

969 **connection_kwargs: Any, 

970 ) -> None: 

971 if host == "localhost": 

972 host = socket.gethostbyname(host) 

973 

974 connection_kwargs["host"] = host 

975 connection_kwargs["port"] = port 

976 self.host = host 

977 self.port = port 

978 self.name = get_node_name(host, port) 

979 self.server_type = server_type 

980 

981 self.max_connections = max_connections 

982 self.connection_class = connection_class 

983 self.connection_kwargs = connection_kwargs 

984 self.response_callbacks = connection_kwargs.pop("response_callbacks", {}) 

985 

986 self._connections: List[Connection] = [] 

987 self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections) 

988 

989 def __repr__(self) -> str: 

990 return ( 

991 f"[host={self.host}, port={self.port}, " 

992 f"name={self.name}, server_type={self.server_type}]" 

993 ) 

994 

995 def __eq__(self, obj: Any) -> bool: 

996 return isinstance(obj, ClusterNode) and obj.name == self.name 

997 

998 _DEL_MESSAGE = "Unclosed ClusterNode object" 

999 

1000 def __del__( 

1001 self, 

1002 _warn: Any = warnings.warn, 

1003 _grl: Any = asyncio.get_running_loop, 

1004 ) -> None: 

1005 for connection in self._connections: 

1006 if connection.is_connected: 

1007 _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self) 

1008 

1009 try: 

1010 context = {"client": self, "message": self._DEL_MESSAGE} 

1011 _grl().call_exception_handler(context) 

1012 except RuntimeError: 

1013 pass 

1014 break 

1015 

1016 async def disconnect(self) -> None: 

1017 ret = await asyncio.gather( 

1018 *( 

1019 asyncio.create_task(connection.disconnect()) 

1020 for connection in self._connections 

1021 ), 

1022 return_exceptions=True, 

1023 ) 

1024 exc = next((res for res in ret if isinstance(res, Exception)), None) 

1025 if exc: 

1026 raise exc 

1027 

1028 def acquire_connection(self) -> Connection: 

1029 try: 

1030 return self._free.popleft() 

1031 except IndexError: 

1032 if len(self._connections) < self.max_connections: 

1033 connection = self.connection_class(**self.connection_kwargs) 

1034 self._connections.append(connection) 

1035 return connection 

1036 

1037 raise MaxConnectionsError() 

1038 

1039 async def parse_response( 

1040 self, connection: Connection, command: str, **kwargs: Any 

1041 ) -> Any: 

1042 try: 

1043 if NEVER_DECODE in kwargs: 

1044 response = await connection.read_response(disable_decoding=True) 

1045 kwargs.pop(NEVER_DECODE) 

1046 else: 

1047 response = await connection.read_response() 

1048 except ResponseError: 

1049 if EMPTY_RESPONSE in kwargs: 

1050 return kwargs[EMPTY_RESPONSE] 

1051 raise 

1052 

1053 if EMPTY_RESPONSE in kwargs: 

1054 kwargs.pop(EMPTY_RESPONSE) 

1055 

1056 # Return response 

1057 if command in self.response_callbacks: 

1058 return self.response_callbacks[command](response, **kwargs) 

1059 

1060 return response 

1061 

1062 async def execute_command(self, *args: Any, **kwargs: Any) -> Any: 

1063 # Acquire connection 

1064 connection = self.acquire_connection() 

1065 keys = kwargs.pop("keys", None) 

1066 

1067 response_from_cache = await connection._get_from_local_cache(args) 

1068 if response_from_cache is not None: 

1069 self._free.append(connection) 

1070 return response_from_cache 

1071 else: 

1072 # Execute command 

1073 await connection.send_packed_command(connection.pack_command(*args), False) 

1074 

1075 # Read response 

1076 try: 

1077 response = await self.parse_response(connection, args[0], **kwargs) 

1078 connection._add_to_local_cache(args, response, keys) 

1079 return response 

1080 finally: 

1081 # Release connection 

1082 self._free.append(connection) 

1083 

1084 async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: 

1085 # Acquire connection 

1086 connection = self.acquire_connection() 

1087 

1088 # Execute command 

1089 await connection.send_packed_command( 

1090 connection.pack_commands(cmd.args for cmd in commands), False 

1091 ) 

1092 

1093 # Read responses 

1094 ret = False 

1095 for cmd in commands: 

1096 try: 

1097 cmd.result = await self.parse_response( 

1098 connection, cmd.args[0], **cmd.kwargs 

1099 ) 

1100 except Exception as e: 

1101 cmd.result = e 

1102 ret = True 

1103 

1104 # Release connection 

1105 self._free.append(connection) 

1106 

1107 return ret 

1108 

1109 
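# Editor's note: a hypothetical sketch of the pool mechanics documented
# above: acquire_connection() pops a free connection (or creates one up to
# max_connections) and execute_command() returns it to the _free deque.
# A reachable Redis server on the placeholder address is assumed.
async def _example_node_roundtrip() -> None:
    node = ClusterNode("localhost", 6379, max_connections=2)
    # No response_callbacks were passed in, so the raw reply comes back.
    print(await node.execute_command("PING"))  # b"PONG"
    await node.disconnect()  # closes every connection this node created
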

1110class NodesManager: 

1111 __slots__ = ( 

1112 "_moved_exception", 

1113 "connection_kwargs", 

1114 "default_node", 

1115 "nodes_cache", 

1116 "read_load_balancer", 

1117 "require_full_coverage", 

1118 "slots_cache", 

1119 "startup_nodes", 

1120 "address_remap", 

1121 ) 

1122 

1123 def __init__( 

1124 self, 

1125 startup_nodes: List["ClusterNode"], 

1126 require_full_coverage: bool, 

1127 connection_kwargs: Dict[str, Any], 

1128 address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, 

1129 ) -> None: 

1130 self.startup_nodes = {node.name: node for node in startup_nodes} 

1131 self.require_full_coverage = require_full_coverage 

1132 self.connection_kwargs = connection_kwargs 

1133 self.address_remap = address_remap 

1134 

1135 self.default_node: Optional["ClusterNode"] = None 

1136 self.nodes_cache: Dict[str, "ClusterNode"] = {} 

1137 self.slots_cache: Dict[int, List["ClusterNode"]] = {} 

1138 self.read_load_balancer = LoadBalancer() 

1139 self._moved_exception: Optional[MovedError] = None 

1140 

1141 def get_node( 

1142 self, 

1143 host: Optional[str] = None, 

1144 port: Optional[int] = None, 

1145 node_name: Optional[str] = None, 

1146 ) -> Optional["ClusterNode"]: 

1147 if host and port: 

1148 # the user passed host and port 

1149 if host == "localhost": 

1150 host = socket.gethostbyname(host) 

1151 return self.nodes_cache.get(get_node_name(host=host, port=port)) 

1152 elif node_name: 

1153 return self.nodes_cache.get(node_name) 

1154 else: 

1155 raise DataError( 

1156 "get_node requires one of the following: " 

1157 "1. node name " 

1158 "2. host and port" 

1159 ) 

1160 

1161 def set_nodes( 

1162 self, 

1163 old: Dict[str, "ClusterNode"], 

1164 new: Dict[str, "ClusterNode"], 

1165 remove_old: bool = False, 

1166 ) -> None: 

1167 if remove_old: 

1168 for name in list(old.keys()): 

1169 if name not in new: 

1170 task = asyncio.create_task(old.pop(name).disconnect()) # noqa 

1171 

1172 for name, node in new.items(): 

1173 if name in old: 

1174 if old[name] is node: 

1175 continue 

1176 task = asyncio.create_task(old[name].disconnect()) # noqa 

1177 old[name] = node 

1178 

1179 def _update_moved_slots(self) -> None: 

1180 e = self._moved_exception 

1181 redirected_node = self.get_node(host=e.host, port=e.port) 

1182 if redirected_node: 

1183 # The node already exists 

1184 if redirected_node.server_type != PRIMARY: 

1185 # Update the node's server type 

1186 redirected_node.server_type = PRIMARY 

1187 else: 

1188 # This is a new node, we will add it to the nodes cache 

1189 redirected_node = ClusterNode( 

1190 e.host, e.port, PRIMARY, **self.connection_kwargs 

1191 ) 

1192 self.set_nodes(self.nodes_cache, {redirected_node.name: redirected_node}) 

1193 if redirected_node in self.slots_cache[e.slot_id]: 

1194 # The MOVED error resulted from a failover, and the new slot owner 

1195 # had previously been a replica. 

1196 old_primary = self.slots_cache[e.slot_id][0] 

1197 # Update the old primary to be a replica and add it to the end of 

1198 # the slot's node list 

1199 old_primary.server_type = REPLICA 

1200 self.slots_cache[e.slot_id].append(old_primary) 

1201 # Remove the old replica, which is now a primary, from the slot's 

1202 # node list 

1203 self.slots_cache[e.slot_id].remove(redirected_node) 

1204 # Override the old primary with the new one 

1205 self.slots_cache[e.slot_id][0] = redirected_node 

1206 if self.default_node == old_primary: 

1207 # Update the default node with the new primary 

1208 self.default_node = redirected_node 

1209 else: 

1210 # The new slot owner is a new server, or a server from a different 

1211 # shard. We need to remove all current nodes from the slot's list 

1212 # (including replicas) and add just the new node. 

1213 self.slots_cache[e.slot_id] = [redirected_node] 

1214 # Reset moved_exception 

1215 self._moved_exception = None 

1216 

1217 def get_node_from_slot( 

1218 self, slot: int, read_from_replicas: bool = False 

1219 ) -> "ClusterNode": 

1220 if self._moved_exception: 

1221 self._update_moved_slots() 

1222 

1223 try: 

1224 if read_from_replicas: 

1225 # get the server index in a Round-Robin manner 

1226 primary_name = self.slots_cache[slot][0].name 

1227 node_idx = self.read_load_balancer.get_server_index( 

1228 primary_name, len(self.slots_cache[slot]) 

1229 ) 

1230 return self.slots_cache[slot][node_idx] 

1231 return self.slots_cache[slot][0] 

1232 except (IndexError, TypeError): 

1233 raise SlotNotCoveredError( 

1234 f'Slot "{slot}" not covered by the cluster. ' 

1235 f'"require_full_coverage={self.require_full_coverage}"' 

1236 ) 

1237 

1238 def get_nodes_by_server_type(self, server_type: str) -> List["ClusterNode"]: 

1239 return [ 

1240 node 

1241 for node in self.nodes_cache.values() 

1242 if node.server_type == server_type 

1243 ] 

1244 

1245 async def initialize(self) -> None: 

1246 self.read_load_balancer.reset() 

1247 tmp_nodes_cache: Dict[str, "ClusterNode"] = {} 

1248 tmp_slots: Dict[int, List["ClusterNode"]] = {} 

1249 disagreements = [] 

1250 startup_nodes_reachable = False 

1251 fully_covered = False 

1252 exception = None 

1253 for startup_node in self.startup_nodes.values(): 

1254 try: 

1255 # Make sure cluster mode is enabled on this node 

1256 try: 

1257 cluster_slots = await startup_node.execute_command("CLUSTER SLOTS") 

1258 except ResponseError: 

1259 raise RedisClusterException( 

1260 "Cluster mode is not enabled on this node" 

1261 ) 

1262 startup_nodes_reachable = True 

1263 except Exception as e: 

1264 # Try the next startup node. 

1265 # The exception is saved and raised only if we have no more nodes. 

1266 exception = e 

1267 continue 

1268 

1269 # CLUSTER SLOTS command results in the following output: 

1270 # [[from_slot, to_slot, master, replica1, ..., replicaN], ...] 

1271 # where each node is the following list: [IP, port, node_id] 

1272 # Therefore, cluster_slots[0][2][0] will be the IP address of the 

1273 # primary node of the first slot section. 

1274 # If there's only one server in the cluster, its ``host`` is '', 

1275 # so fix it to the host of the startup node 

1276 if ( 

1277 len(cluster_slots) == 1 

1278 and not cluster_slots[0][2][0] 

1279 and len(self.startup_nodes) == 1 

1280 ): 

1281 cluster_slots[0][2][0] = startup_node.host 

1282 

1283 for slot in cluster_slots: 

1284 for i in range(2, len(slot)): 

1285 slot[i] = [str_if_bytes(val) for val in slot[i]] 

1286 primary_node = slot[2] 

1287 host = primary_node[0] 

1288 if host == "": 

1289 host = startup_node.host 

1290 port = int(primary_node[1]) 

1291 host, port = self.remap_host_port(host, port) 

1292 

1293 target_node = tmp_nodes_cache.get(get_node_name(host, port)) 

1294 if not target_node: 

1295 target_node = ClusterNode( 

1296 host, port, PRIMARY, **self.connection_kwargs 

1297 ) 

1298 # add this node to the nodes cache 

1299 tmp_nodes_cache[target_node.name] = target_node 

1300 

1301 for i in range(int(slot[0]), int(slot[1]) + 1): 

1302 if i not in tmp_slots: 

1303 tmp_slots[i] = [] 

1304 tmp_slots[i].append(target_node) 

1305 replica_nodes = [slot[j] for j in range(3, len(slot))] 

1306 

1307 for replica_node in replica_nodes: 

1308 host = replica_node[0] 

1309 port = replica_node[1] 

1310 host, port = self.remap_host_port(host, port) 

1311 

1312 target_replica_node = tmp_nodes_cache.get( 

1313 get_node_name(host, port) 

1314 ) 

1315 if not target_replica_node: 

1316 target_replica_node = ClusterNode( 

1317 host, port, REPLICA, **self.connection_kwargs 

1318 ) 

1319 tmp_slots[i].append(target_replica_node) 

1320 # add this node to the nodes cache 

1321 tmp_nodes_cache[target_replica_node.name] = ( 

1322 target_replica_node 

1323 ) 

1324 else: 

1325 # Validate that 2 nodes want to use the same slot cache 

1326 # setup 

1327 tmp_slot = tmp_slots[i][0] 

1328 if tmp_slot.name != target_node.name: 

1329 disagreements.append( 

1330 f"{tmp_slot.name} vs {target_node.name} on slot: {i}" 

1331 ) 

1332 

1333 if len(disagreements) > 5: 

1334 raise RedisClusterException( 

1335 f"startup_nodes could not agree on a valid " 

1336 f'slots cache: {", ".join(disagreements)}' 

1337 ) 

1338 

1339 # Validate if all slots are covered or if we should try next startup node 

1340 fully_covered = True 

1341 for i in range(REDIS_CLUSTER_HASH_SLOTS): 

1342 if i not in tmp_slots: 

1343 fully_covered = False 

1344 break 

1345 if fully_covered: 

1346 break 

1347 

1348 if not startup_nodes_reachable: 

1349 raise RedisClusterException( 

1350 f"Redis Cluster cannot be connected. Please provide at least " 

1351 f"one reachable node: {str(exception)}" 

1352 ) from exception 

1353 

1354 # Check if the slots are not fully covered 

1355 if not fully_covered and self.require_full_coverage: 

1356 # Despite the requirement that the slots be covered, there 

1357 # isn't a full coverage 

1358 raise RedisClusterException( 

1359 f"All slots are not covered after query all startup_nodes. " 

1360 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} " 

1361 f"covered..." 

1362 ) 

1363 

1364 # Set the tmp variables to the real variables 

1365 self.slots_cache = tmp_slots 

1366 self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True) 

1367 # Populate the startup nodes with all discovered nodes 

1368 self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True) 

1369 

1370 # Set the default node 

1371 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] 

1372 # If initialize was called after a MovedError, clear it 

1373 self._moved_exception = None 

1374 

1375 async def aclose(self, attr: str = "nodes_cache") -> None: 

1376 self.default_node = None 

1377 await asyncio.gather( 

1378 *( 

1379 asyncio.create_task(node.disconnect()) 

1380 for node in getattr(self, attr).values() 

1381 ) 

1382 ) 

1383 

1384 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: 

1385 """ 

1386 Remap the host and port returned by the cluster to a different, 

1387 externally reachable address. Useful if the client cannot connect 

1388 directly to the addresses the cluster advertises. 

1389 """ 

1390 if self.address_remap: 

1391 return self.address_remap((host, port)) 

1392 return host, port 

1393 

1394 
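# Editor's note: a fabricated-but-typical illustration (not captured from a
# real server) of the CLUSTER SLOTS reply shape parsed by
# NodesManager.initialize(), plus an address_remap callable. Note that
# remap_host_port() passes a single (host, port) tuple, despite the
# Callable[[str, int], ...] annotation used elsewhere in this module.
_EXAMPLE_CLUSTER_SLOTS_REPLY = [
    # [from_slot, to_slot, [primary ip, port, node_id], [replica ...], ...]
    [0, 5460, ["10.0.0.1", 6379, "a1"], ["10.0.0.4", 6379, "b1"]],
    [5461, 10922, ["10.0.0.2", 6379, "a2"], ["10.0.0.5", 6379, "b2"]],
    [10923, 16383, ["10.0.0.3", 6379, "a3"], ["10.0.0.6", 6379, "b3"]],
]


def _example_address_remap(address: Tuple[str, int]) -> Tuple[str, int]:
    # Map internal cluster addresses to proxy-reachable ones; the port
    # offset used here is purely illustrative.
    host, port = address
    return ("proxy.local", port + 7000)
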

1395class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands): 

1396 """ 

1397 Create a new ClusterPipeline object. 

1398 

1399 Usage:: 

1400 

1401 result = await ( 

1402 rc.pipeline() 

1403 .set("A", 1) 

1404 .get("A") 

1405 .hset("K", "F", "V") 

1406 .hgetall("K") 

1407 .mset_nonatomic({"A": 2, "B": 3}) 

1408 .get("A") 

1409 .get("B") 

1410 .delete("A", "B", "K") 

1411 .execute() 

1412 ) 

1413 # result = [True, "1", 1, {"F": "V"}, True, True, "2", "3", 1, 1, 1] 

1414 

1415 Note: the commands `DELETE`, `EXISTS`, `TOUCH`, `UNLINK`, and `mset_nonatomic` 

1416 are split across slots, so each of them contributes multiple results to the array. 

1417 

1418 Retryable errors: 

1419 - :class:`~.ClusterDownError` 

1420 - :class:`~.ConnectionError` 

1421 - :class:`~.TimeoutError` 

1422 

1423 Redirection errors: 

1424 - :class:`~.TryAgainError` 

1425 - :class:`~.MovedError` 

1426 - :class:`~.AskError` 

1427 

1428 :param client: 

1429 | Existing :class:`~.RedisCluster` client 

1430 """ 

1431 

1432 __slots__ = ("_command_stack", "_client") 

1433 

1434 def __init__(self, client: RedisCluster) -> None: 

1435 self._client = client 

1436 

1437 self._command_stack: List["PipelineCommand"] = [] 

1438 

1439 async def initialize(self) -> "ClusterPipeline": 

1440 if self._client._initialize: 

1441 await self._client.initialize() 

1442 self._command_stack = [] 

1443 return self 

1444 

1445 async def __aenter__(self) -> "ClusterPipeline": 

1446 return await self.initialize() 

1447 

1448 async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None: 

1449 self._command_stack = [] 

1450 

1451 def __await__(self) -> Generator[Any, None, "ClusterPipeline"]: 

1452 return self.initialize().__await__() 

1453 

1454 def __enter__(self) -> "ClusterPipeline": 

1455 self._command_stack = [] 

1456 return self 

1457 

1458 def __exit__(self, exc_type: None, exc_value: None, traceback: None) -> None: 

1459 self._command_stack = [] 

1460 

1461 def __bool__(self) -> bool: 

1462 "Pipeline instances should always evaluate to True on Python 3+" 

1463 return True 

1464 

1465 def __len__(self) -> int: 

1466 return len(self._command_stack) 

1467 

1468 def execute_command( 

1469 self, *args: Union[KeyT, EncodableT], **kwargs: Any 

1470 ) -> "ClusterPipeline": 

1471 """ 

1472 Append a raw command to the pipeline. 

1473 

1474 :param args: 

1475 | Raw command args 

1476 :param kwargs: 

1477 

1478 - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode` 

1479 or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`] 

1480 - Rest of the kwargs are passed to the Redis connection 

1481 """ 

1482 kwargs.pop("keys", None) # the keys are used only for client side caching 

1483 self._command_stack.append( 

1484 PipelineCommand(len(self._command_stack), *args, **kwargs) 

1485 ) 

1486 return self 

1487 

1488 async def execute( 

1489 self, raise_on_error: bool = True, allow_redirections: bool = True 

1490 ) -> List[Any]: 

1491 """ 

1492 Execute the pipeline. 

1493 

1494 The commands will be retried as specified by :attr:`cluster_error_retry_attempts`; 

1495 if all attempts fail, an exception is raised. 

1496 

1497 :param raise_on_error: 

1498 | Raise the first error if there are any errors 

1499 :param allow_redirections: 

1500 | Whether to retry each failed command individually in case of redirection 

1501 errors 

1502 

1503 :raises RedisClusterException: if target_nodes is not provided & the command 

1504 can't be mapped to a slot 

1505 """ 

1506 if not self._command_stack: 

1507 return [] 

1508 

1509 try: 

1510 for _ in range(self._client.cluster_error_retry_attempts): 

1511 if self._client._initialize: 

1512 await self._client.initialize() 

1513 

1514 try: 

1515 return await self._execute( 

1516 self._client, 

1517 self._command_stack, 

1518 raise_on_error=raise_on_error, 

1519 allow_redirections=allow_redirections, 

1520 ) 

1521 except BaseException as e: 

1522 if type(e) in self.__class__.ERRORS_ALLOW_RETRY: 

1523 # Try again with the new cluster setup. 

1524 exception = e 

1525 await self._client.aclose() 

1526 await asyncio.sleep(0.25) 

1527 else: 

1528 # All other errors should be raised. 

1529 raise 

1530 

1531 # If it fails the configured number of times then raise an exception 

1532 raise exception 

1533 finally: 

1534 self._command_stack = [] 

1535 

1536 async def _execute( 

1537 self, 

1538 client: "RedisCluster", 

1539 stack: List["PipelineCommand"], 

1540 raise_on_error: bool = True, 

1541 allow_redirections: bool = True, 

1542 ) -> List[Any]: 

1543 todo = [ 

1544 cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception) 

1545 ] 

1546 

1547 nodes = {} 

1548 for cmd in todo: 

1549 passed_targets = cmd.kwargs.pop("target_nodes", None) 

1550 if passed_targets and not client._is_node_flag(passed_targets): 

1551 target_nodes = client._parse_target_nodes(passed_targets) 

1552 else: 

1553 target_nodes = await client._determine_nodes( 

1554 *cmd.args, node_flag=passed_targets 

1555 ) 

1556 if not target_nodes: 

1557 raise RedisClusterException( 

1558 f"No targets were found to execute {cmd.args} command on" 

1559 ) 

1560 if len(target_nodes) > 1: 

1561 raise RedisClusterException(f"Too many targets for command {cmd.args}") 

1562 node = target_nodes[0] 

1563 if node.name not in nodes: 

1564 nodes[node.name] = (node, []) 

1565 nodes[node.name][1].append(cmd) 

1566 

1567 errors = await asyncio.gather( 

1568 *( 

1569 asyncio.create_task(node[0].execute_pipeline(node[1])) 

1570 for node in nodes.values() 

1571 ) 

1572 ) 

1573 

1574 if any(errors): 

1575 if allow_redirections: 

1576 # send each errored command individually 

1577 for cmd in todo: 

1578 if isinstance(cmd.result, (TryAgainError, MovedError, AskError)): 

1579 try: 

1580 cmd.result = await client.execute_command( 

1581 *cmd.args, **cmd.kwargs 

1582 ) 

1583 except Exception as e: 

1584 cmd.result = e 

1585 

1586 if raise_on_error: 

1587 for cmd in todo: 

1588 result = cmd.result 

1589 if isinstance(result, Exception): 

1590 command = " ".join(map(safe_str, cmd.args)) 

1591 msg = ( 

1592 f"Command # {cmd.position + 1} ({command}) of pipeline " 

1593 f"caused error: {result.args}" 

1594 ) 

1595 result.args = (msg,) + result.args[1:] 

1596 raise result 

1597 

1598 default_node = nodes.get(client.get_default_node().name) 

1599 if default_node is not None: 

1600 # This pipeline execution used the default node, check if we need 

1601 # to replace it. 

1602 # Note: when the error is raised we'll reset the default node in the 

1603 # caller function. 

1604 for cmd in default_node[1]: 

1605 # Check if it has a command that failed with a relevant 

1606 # exception 

1607 if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY: 

1608 client.replace_default_node() 

1609 break 

1610 

1611 return [cmd.result for cmd in stack] 

1612 

1613 def _split_command_across_slots( 

1614 self, command: str, *keys: KeyT 

1615 ) -> "ClusterPipeline": 

1616 for slot_keys in self._client._partition_keys_by_slot(keys).values(): 

1617 self.execute_command(command, *slot_keys) 

1618 

1619 return self 

1620 

1621 def mset_nonatomic( 

1622 self, mapping: Mapping[AnyKeyT, EncodableT] 

1623 ) -> "ClusterPipeline": 

1624 encoder = self._client.encoder 

1625 

1626 slots_pairs = {} 

1627 for pair in mapping.items(): 

1628 slot = key_slot(encoder.encode(pair[0])) 

1629 slots_pairs.setdefault(slot, []).extend(pair) 

1630 

1631 for pairs in slots_pairs.values(): 

1632 self.execute_command("MSET", *pairs) 

1633 

1634 return self 

1635 
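    # Editor's note (illustrative): mset_nonatomic({"A": 1, "B": 2}) buckets
    # the pairs by key_slot and queues one MSET per slot; when "A" and "B"
    # land on different slots the pipeline carries two separate MSETs, hence
    # "nonatomic".
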

1636 

1637for command in PIPELINE_BLOCKED_COMMANDS: 

1638 command = command.replace(" ", "_").lower() 

1639 if command == "mset_nonatomic": 

1640 continue 

1641 

1642 setattr(ClusterPipeline, command, block_pipeline_command(command)) 

1643 

1644 
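# Editor's note: a hedged illustration of the loop above. Multi-node
# commands (KEYS among them, assuming it is listed in
# PIPELINE_BLOCKED_COMMANDS) are patched with block_pipeline_command, so
# calling them on a cluster pipeline raises immediately, without any I/O.
def _example_blocked_pipeline_command() -> None:
    rc = RedisCluster(host="localhost", port=6379)  # no connection made yet
    try:
        rc.pipeline().keys("*")  # raises: blocked in cluster pipelines
    except RedisClusterException as exc:
        print(exc)
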

1645class PipelineCommand: 

1646 def __init__(self, position: int, *args: Any, **kwargs: Any) -> None: 

1647 self.args = args 

1648 self.kwargs = kwargs 

1649 self.position = position 

1650 self.result: Union[Any, Exception] = None 

1651 

1652 def __repr__(self) -> str: 

1653 return f"[{self.position}] {self.args} ({self.kwargs})"