Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1833 statements  

1import logging 

2import random 

3import socket 

4import sys 

5import threading 

6import time 

7import weakref 

8from abc import ABC, abstractmethod 

9from collections import OrderedDict, defaultdict 

10from concurrent.futures import Future, ThreadPoolExecutor 

11from copy import copy 

12from enum import Enum 

13from itertools import chain 

14from types import MethodType 

15from typing import ( 

16 TYPE_CHECKING, 

17 Any, 

18 Callable, 

19 Dict, 

20 List, 

21 Literal, 

22 Optional, 

23 Set, 

24 Tuple, 

25 Type, 

26 Union, 

27) 

28 

29if TYPE_CHECKING: 

30 from redis.keyspace_notifications import ClusterKeyspaceNotifications 

31 

32from redis._defaults import DEFAULT_RETRY_BASE, DEFAULT_RETRY_CAP, DEFAULT_RETRY_COUNT 

33from redis._parsers import CommandsParser, Encoder 

34from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy 

35from redis._parsers.helpers import parse_scan 

36from redis.backoff import ExponentialWithJitterBackoff, NoBackoff 

37from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface 

38from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis 

39from redis.commands import READ_COMMANDS, RedisClusterCommands 

40from redis.commands.helpers import list_or_args, parse_pubsub_subscriptions 

41from redis.commands.policies import PolicyResolver, StaticPolicyResolver 

42from redis.connection import ( 

43 Connection, 

44 ConnectionPool, 

45 parse_url, 

46) 

47from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot 

48from redis.event import ( 

49 AfterPooledConnectionsInstantiationEvent, 

50 AfterPubSubConnectionInstantiationEvent, 

51 AfterSlotsCacheRefreshEvent, 

52 ClientType, 

53 EventDispatcher, 

54 EventListenerInterface, 

55) 

56from redis.exceptions import ( 

57 AskError, 

58 AuthenticationError, 

59 ClusterDownError, 

60 ClusterError, 

61 ConnectionError, 

62 CrossSlotTransactionError, 

63 DataError, 

64 ExecAbortError, 

65 InvalidPipelineStack, 

66 MaxConnectionsError, 

67 MovedError, 

68 RedisClusterException, 

69 RedisError, 

70 ResponseError, 

71 SlotNotCoveredError, 

72 TimeoutError, 

73 TryAgainError, 

74 WatchError, 

75) 

76from redis.lock import Lock 

77from redis.maint_notifications import ( 

78 MaintNotificationsConfig, 

79 OSSMaintNotificationsHandler, 

80) 

81from redis.observability.recorder import ( 

82 record_error_count, 

83 record_operation_duration, 

84) 

85from redis.retry import Retry 

86from redis.typing import ChannelT, PubSubHandler, Subscription 

87from redis.utils import ( 

88 check_protocol_version, 

89 deprecated_args, 

90 deprecated_function, 

91 dict_merge, 

92 list_keys_to_dict, 

93 merge_result, 

94 safe_str, 

95 str_if_bytes, 

96 truncate_text, 

97) 

98 

99logger = logging.getLogger(__name__) 

100 

101 

def is_debug_log_enabled():
    """Report whether this module's logger would emit DEBUG-level records."""
    debug_active = logger.isEnabledFor(logging.DEBUG)
    return debug_active

104 

105 

def get_node_name(host: str, port: Union[str, int]) -> str:
    """Build the ``host:port`` string that identifies a node."""
    return "{}:{}".format(host, port)

108 

109 

@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    """
    Return a connection for ``redis_node``.

    The client's own ``connection`` attribute is preferred when it is truthy;
    otherwise a connection is taken from the client's connection pool.
    Extra positional and keyword arguments are accepted but not used here.
    """
    dedicated = redis_node.connection
    if dedicated:
        return dedicated
    return redis_node.connection_pool.get_connection()

117 

118 

def parse_scan_result(command, res, **options):
    """
    Merge per-node SCAN replies into a single result.

    :param command: command name (unused; part of the result-callback signature)
    :param res: mapping of node name -> raw SCAN reply from that node
    :param options: forwarded unchanged to ``parse_scan``
    :return: ``(cursors, keys)`` where ``cursors`` maps node name -> next
        cursor and ``keys`` is the concatenation of every node's batch
    """
    cursors = {}
    collected = []
    for node_name in res:
        next_cursor, batch = parse_scan(res[node_name], **options)
        cursors[node_name] = next_cursor
        collected.extend(batch)

    return cursors, collected

128 

129 

def parse_pubsub_numsub(command, res, **options):
    """
    Sum per-channel subscriber counts across all node replies.

    :param command: command name (unused; part of the result-callback signature)
    :param res: mapping of node name -> iterable of ``(channel, count)`` pairs
    :return: list of ``(channel, total)`` tuples, ordered by the first time
        each channel was seen
    """
    totals = OrderedDict()
    for node_pairs in res.values():
        for channel, count in node_pairs:
            if channel in totals:
                totals[channel] += count
            else:
                totals[channel] = count

    return list(totals.items())

141 

142 

def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """
    Convert a CLUSTER SLOTS reply into a mapping keyed by slot range.

    Each reply entry is ``[start, end, primary, *replicas]``. The result maps
    ``(start, end)`` to ``{"primary": (host, port), "replicas": [(host, port), ...]}``.
    An empty host in the reply is replaced with ``options["current_host"]``
    (empty string when that option is absent).
    """
    fallback_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        # First element is the host (possibly bytes or empty), second the port.
        host = str_if_bytes(args[0])
        return (host or fallback_host), args[1]

    parsed = {}
    for entry in resp:
        start, end, primary = entry[:3]
        parsed[(start, end)] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in entry[3:]],
        }

    return parsed

161 

162 

def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.

    A reply whose entries are already dicts is returned untouched. Otherwise
    each entry is treated as a flat ``[key, slots, key, nodes]`` list: the flat
    slot list at index 1 is paired into ``(start, end)`` tuples and each node
    at index 3 (a flat ``[name, value, ...]`` list) becomes a dict. Node
    attribute names are kept exactly as delivered.

    An empty (or ``None``) reply is returned as-is instead of raising
    ``IndexError`` on ``resp[0]``.
    """
    if not resp:
        return resp
    if isinstance(resp[0], dict):
        return resp

    shards = []
    for shard_resp in resp:
        flat_slots = shard_resp[1]
        slots = [
            (flat_slots[i], flat_slots[i + 1]) for i in range(0, len(flat_slots), 2)
        ]
        nodes = [
            {node[i]: node[i + 1] for i in range(0, len(node), 2)}
            for node in shard_resp[3]
        ]
        shards.append({"slots": slots, "nodes": nodes})

    return shards

183 

184 

def parse_cluster_shards_with_str_keys(resp, **options):
    """
    Parse CLUSTER SHARDS so the top-level shard keys are always strings.

    Non-dict replies are delegated to ``parse_cluster_shards``. For replies
    that already arrive as dicts, only the structural keys are normalized to
    ``"slots"``/``"nodes"`` (the bytes key is preferred when both exist);
    list-typed slot entries become tuples and node dicts are shallow-copied
    with their keys left as delivered.
    """
    if not resp:
        return resp
    if not isinstance(resp[0], dict):
        return parse_cluster_shards(resp, **options)

    normalized = []
    for shard_resp in resp:
        if b"slots" in shard_resp:
            slots = shard_resp[b"slots"]
        else:
            slots = shard_resp.get("slots", [])
        if b"nodes" in shard_resp:
            nodes = shard_resp[b"nodes"]
        else:
            nodes = shard_resp.get("nodes", [])

        slot_items = []
        for slot in slots:
            slot_items.append(tuple(slot) if isinstance(slot, list) else slot)

        node_items = []
        for node in nodes:
            node_items.append(dict(node) if isinstance(node, dict) else node)

        normalized.append({"slots": slot_items, "nodes": node_items})
    return normalized

211 

212 

def parse_cluster_shards_unified(resp, **options):
    """
    Parse CLUSTER SHARDS into one shape regardless of wire format.

    The result is a list of ``{"slots": ..., "nodes": [...]}`` dicts whose
    top-level keys and node attribute names are strings. Dict-style replies
    keep their slots value as delivered; flat list replies get their slots
    paired into ``(start, end)`` tuples.
    """
    if not resp:
        return resp

    unified = []

    if not isinstance(resp[0], dict):
        # Flat layout: index 1 holds the flat slot list, index 3 the nodes.
        for entry in resp:
            flat_slots = entry[1]
            slot_pairs = [
                (flat_slots[i], flat_slots[i + 1])
                for i in range(0, len(flat_slots), 2)
            ]
            node_dicts = [
                {str_if_bytes(node[i]): node[i + 1] for i in range(0, len(node), 2)}
                for node in entry[3]
            ]
            unified.append({"slots": slot_pairs, "nodes": node_dicts})
        return unified

    # Dict layout: accept either bytes or str structural keys.
    for shard_resp in resp:
        if b"slots" in shard_resp:
            slots = shard_resp[b"slots"]
        else:
            slots = shard_resp.get("slots", [])
        if b"nodes" in shard_resp:
            nodes = shard_resp[b"nodes"]
        else:
            nodes = shard_resp.get("nodes", [])

        node_dicts = []
        for node in nodes:
            if isinstance(node, dict):
                node_dicts.append({str_if_bytes(k): v for k, v in node.items()})
            else:
                node_dicts.append(node)
        unified.append({"slots": slots, "nodes": node_dicts})
    return unified

252 

253 

def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.

    Returns the shard id as ``str``. A bytes reply is decoded as UTF-8; a
    reply that is already a ``str`` (for example when the connection decodes
    responses) is returned unchanged instead of failing on ``.decode``.
    """
    if isinstance(resp, bytes):
        return resp.decode("utf-8")
    return resp

259 

260 

# Node role labels. NOTE(review): presumably used as the server type of a
# cluster node elsewhere in this module -- confirm against ClusterNode.
PRIMARY = "primary"
REPLICA = "replica"
# Routing flag for commands that are targeted by slot id; used as a value in
# AbstractRedisCluster.COMMAND_FLAGS.
SLOT_ID = "slot-id"

264 

# Keyword arguments that may be forwarded to per-node connections.
REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "driver_info",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "legacy_responses",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_read_size",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_ca_path",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
    "maint_notifications_config",
)
# Allowed keys that are nevertheless stripped by cleanup_kwargs().
KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Return only the kwargs that are allowed and not disabled.

    A key survives when it appears in ``REDIS_ALLOWED_KEYS`` and does not
    appear in ``KWARGS_DISABLED_KEYS``; everything else is dropped.
    """
    connection_kwargs = {}
    for name, value in kwargs.items():
        if name in KWARGS_DISABLED_KEYS or name not in REDIS_ALLOWED_KEYS:
            continue
        connection_kwargs[name] = value

    return connection_kwargs

326 

327 

class MaintNotificationsAbstractRedisCluster:
    """
    Mixin that holds the maintenance-notifications logic for a cluster client.

    It is meant to be combined with RedisCluster through multiple inheritance
    and relies on the host class providing ``get_nodes()`` and
    ``nodes_manager``.
    """

    def __init__(
        self,
        maint_notifications_config: Optional[MaintNotificationsConfig],
        **kwargs,
    ):
        """
        Validate the configuration and wire up the notifications handler.

        :param maint_notifications_config: explicit configuration, or ``None``
            to get a default ``MaintNotificationsConfig`` when the protocol
            check for version 3 passes
        :param kwargs: only ``protocol`` is read here
        :raises RedisError: when notifications are enabled but the protocol
            check for version 3 fails
        """
        resp3 = check_protocol_version(kwargs.get("protocol"), 3)

        wants_notifications = (
            maint_notifications_config and maint_notifications_config.enabled
        )
        if wants_notifications and not resp3:
            raise RedisError(
                "Maintenance notifications handlers on connection are only supported with RESP version 3"
            )
        if resp3 and maint_notifications_config is None:
            maint_notifications_config = MaintNotificationsConfig()

        self.maint_notifications_config = maint_notifications_config

        if not (
            self.maint_notifications_config and self.maint_notifications_config.enabled
        ):
            self._oss_cluster_maint_notifications_handler = None
            return

        self._oss_cluster_maint_notifications_handler = OSSMaintNotificationsHandler(
            self, self.maint_notifications_config
        )
        # Connections created from now on pick the handler up from the kwargs.
        self._update_connection_kwargs_for_maint_notifications(
            self._oss_cluster_maint_notifications_handler
        )
        # Nodes that already own a client were built by the RedisCluster
        # constructor, so their pools are patched in place.
        for node in self.get_nodes():
            if node.redis_connection is None:
                continue
            pool = node.redis_connection.connection_pool
            pool.update_maint_notifications_config(
                self.maint_notifications_config,
                oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
            )

    def _update_connection_kwargs_for_maint_notifications(
        self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
    ):
        """
        Store the handler in the nodes manager's connection kwargs so that
        future connections receive it.
        """
        handler_kwargs = {
            "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
        }
        self.nodes_manager.connection_kwargs.update(handler_kwargs)

389 

390 

class AbstractRedisCluster:
    """
    Shared constants and routing tables for cluster clients.

    Holds the node-selection flags, the command -> routing-flag table,
    response/result callbacks, the set of retryable errors, and the logic to
    replace the default node.
    """

    RedisClusterRequestTTL = 16

    # Node-selection flags accepted as routing targets.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # Command name -> routing flag (one of the flags above, or SLOT_ID).
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # NOTE(review): this is a one-element tuple wrapping a list (note the
    # trailing comma), so ``"FT.SEARCH" in SEARCH_COMMANDS`` is False. The
    # shape is kept for backward compatibility -- confirm whether a flat
    # sequence was intended.
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    # Per-command parsers applied to a single node's reply.
    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Per-command reducers that fold a {node: reply} mapping into one result.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    # Exception types listed as retryable.
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: Optional["ClusterNode"] = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
            return

        curr_node = self.get_default_node()
        primaries = [node for node in self.get_primaries() if node != curr_node]
        if primaries:
            # Choose a primary if the cluster contains different primaries
            self.nodes_manager.default_node = random.choice(primaries)
            return

        # Otherwise, fall back to a replica if the cluster contains
        # replicas other than the current default node
        replicas = [node for node in self.get_replicas() if node != curr_node]
        if replicas:
            self.nodes_manager.default_node = random.choice(replicas)

636 

637 

638class RedisCluster( 

639 AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands 

640): 

641 # Type discrimination marker for @overload self-type pattern 

642 _is_async_client: Literal[False] = False 

643 

644 @classmethod 

645 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster": 

646 """ 

647 Return a Redis client object configured from the given URL 

648 

649 For example:: 

650 

651 redis://[[username]:[password]]@localhost:6379/0 

652 rediss://[[username]:[password]]@localhost:6379/0 

653 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

654 

655 Three URL schemes are supported: 

656 

657 - `redis://` creates a TCP socket connection. See more at: 

658 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

659 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

660 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

661 - ``unix://``: creates a Unix Domain Socket connection. 

662 

663 The username, password, hostname, path and all querystring values 

664 are passed through urllib.parse.unquote in order to replace any 

665 percent-encoded values with their corresponding characters. 

666 

667 There are several ways to specify a database number. The first value 

668 found will be used: 

669 

670 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

671 2. If using the redis:// or rediss:// schemes, the path argument 

672 of the url, e.g. redis://localhost/0 

673 3. A ``db`` keyword argument to this function. 

674 

675 If none of these options are specified, the default db=0 is used. 

676 

677 All querystring options are cast to their appropriate Python types. 

678 Boolean arguments can be specified with string values "True"/"False" 

679 or "Yes"/"No". Values that cannot be properly cast cause a 

680 ``ValueError`` to be raised. Once parsed, the querystring arguments 

681 and keyword arguments are passed to the ``ConnectionPool``'s 

682 class initializer. In the case of conflicting arguments, querystring 

683 arguments always win. 

684 

685 """ 

686 return cls(url=url, **kwargs) 

687 

688 @deprecated_args( 

689 args_to_warn=["read_from_replicas"], 

690 reason="Please configure the 'load_balancing_strategy' instead", 

691 version="5.3.0", 

692 ) 

693 @deprecated_args( 

694 args_to_warn=[ 

695 "cluster_error_retry_attempts", 

696 ], 

697 reason="Please configure the 'retry' object instead", 

698 version="6.0.0", 

699 ) 

700 def __init__( 

701 self, 

702 host: Optional[str] = None, 

703 port: int = 6379, 

704 startup_nodes: Optional[List["ClusterNode"]] = None, 

705 cluster_error_retry_attempts: int = DEFAULT_RETRY_COUNT, 

706 retry: Optional["Retry"] = None, 

707 require_full_coverage: bool = True, 

708 reinitialize_steps: int = 5, 

709 read_from_replicas: bool = False, 

710 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None, 

711 dynamic_startup_nodes: bool = True, 

712 url: Optional[str] = None, 

713 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, 

714 cache: Optional[CacheInterface] = None, 

715 cache_config: Optional[CacheConfig] = None, 

716 event_dispatcher: Optional[EventDispatcher] = None, 

717 policy_resolver: PolicyResolver = StaticPolicyResolver(), 

718 maint_notifications_config: Optional[MaintNotificationsConfig] = None, 

719 **kwargs, 

720 ): 

721 """ 

722 Initialize a new RedisCluster client. 

723 

724 :param startup_nodes: 

725 List of nodes from which initial bootstrapping can be done 

726 :param host: 

727 Can be used to point to a startup node 

728 :param port: 

729 Can be used to point to a startup node 

730 :param require_full_coverage: 

731 When set to False (default value): the client will not require a 

732 full coverage of the slots. However, if not all slots are covered, 

733 and at least one node has 'cluster-require-full-coverage' set to 

734 'yes,' the server will throw a ClusterDownError for some key-based 

735 commands. See - 

736 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters 

737 When set to True: all slots must be covered to construct the 

738 cluster client. If not all slots are covered, RedisClusterException 

739 will be thrown. 

740 :param read_from_replicas: 

741 @deprecated - please use load_balancing_strategy instead 

742 Enable read from replicas in READONLY mode. You can read possibly 

743 stale data. 

744 When set to true, read commands will be assigned between the 

745 primary and its replications in a Round-Robin manner. 

746 :param load_balancing_strategy: 

747 Enable read from replicas in READONLY mode and defines the load balancing 

748 strategy that will be used for cluster node selection. 

749 The data read from replicas is eventually consistent with the data in primary nodes. 

750 :param dynamic_startup_nodes: 

751 Set the RedisCluster's startup nodes to all of the discovered nodes. 

752 If true (default value), the cluster's discovered nodes will be used to 

753 determine the cluster nodes-slots mapping in the next topology refresh. 

754 It will remove the initial passed startup nodes if their endpoints aren't 

755 listed in the CLUSTER SLOTS output. 

756 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists 

757 specific IP addresses, it is best to set it to false. 

758 :param cluster_error_retry_attempts: 

759 @deprecated - Please configure the 'retry' object instead 

760 In case 'retry' object is set - this argument is ignored! 

761 

762 Number of times to retry before raising an error when 

763 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or 

764 :class:`~.ClusterDownError` are encountered 

765 :param retry: 

766 A retry object that defines the retry strategy and the number of 

767 retries for the cluster client. 

768 In current implementation for the cluster client (starting form redis-py version 6.0.0) 

769 the retry object is not yet fully utilized, instead it is used just to determine 

770 the number of retries for the cluster client. 

771 In the future releases the retry object will be used to handle the cluster client retries! 

772 :param reinitialize_steps: 

773 Specifies the number of MOVED errors that need to occur before 

774 reinitializing the whole cluster topology. If a MOVED error occurs 

775 and the cluster does not need to be reinitialized on this current 

776 error handling, only the MOVED slot will be patched with the 

777 redirected node. 

778 To reinitialize the cluster on every MOVED error, set 

779 reinitialize_steps to 1. 

780 To avoid reinitializing the cluster on moved errors, set 

781 reinitialize_steps to 0. 

782 :param address_remap: 

783 An optional callable which, when provided with an internal network 

784 address of a node, e.g. a `(host, port)` tuple, will return the address 

785 where the node is reachable. This can be used to map the addresses at 

786 which the nodes _think_ they are, to addresses at which a client may 

787 reach them, such as when they sit behind a proxy. 

788 

789 :param maint_notifications_config: 

790 Configures the nodes connections to support maintenance notifications - see 

791 `redis.maint_notifications.MaintNotificationsConfig` for details. 

792 Only supported with RESP3. 

793 If not provided and protocol is RESP3, the maintenance notifications 

794 will be enabled by default (logic is included in the NodesManager 

795 initialization). 

796 :**kwargs: 

797 Extra arguments that will be sent into Redis instance when created 

798 (See Official redis-py doc for supported kwargs - the only limitation 

799 is that you can't provide 'retry' object as part of kwargs. 

800 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) 

801 Some kwargs are not supported and will raise a 

802 RedisClusterException: 

803 - db (Redis do not support database SELECT in cluster mode) 

804 

805 """ 

806 if startup_nodes is None: 

807 startup_nodes = [] 

808 

809 if "db" in kwargs: 

810 # Argument 'db' is not possible to use in cluster mode 

811 raise RedisClusterException( 

812 "Argument 'db' is not possible to use in cluster mode" 

813 ) 

814 

815 if "retry" in kwargs: 

816 # Argument 'retry' is not possible to be used in kwargs when in cluster mode 

817 # the kwargs are set to the lower level connections to the cluster nodes 

818 # and there we provide retry configuration without retries allowed. 

819 # The retries should be handled on cluster client level. 

820 raise RedisClusterException( 

821 "The 'retry' argument cannot be used in kwargs when running in cluster mode." 

822 ) 

823 

824 # Get the startup node/s 

825 from_url = False 

826 if url is not None: 

827 from_url = True 

828 url_options = parse_url(url) 

829 if "path" in url_options: 

830 raise RedisClusterException( 

831 "RedisCluster does not currently support Unix Domain " 

832 "Socket connections" 

833 ) 

834 if "db" in url_options and url_options["db"] != 0: 

835 # Argument 'db' is not possible to use in cluster mode 

836 raise RedisClusterException( 

837 "A ``db`` querystring option can only be 0 in cluster mode" 

838 ) 

839 kwargs.update(url_options) 

840 host = kwargs.get("host") 

841 port = kwargs.get("port", port) 

842 startup_nodes.append(ClusterNode(host, port)) 

843 elif host is not None and port is not None: 

844 startup_nodes.append(ClusterNode(host, port)) 

845 elif len(startup_nodes) == 0: 

846 # No startup node was provided 

847 raise RedisClusterException( 

848 "RedisCluster requires at least one node to discover the " 

849 "cluster. Please provide one of the followings:\n" 

850 "1. host and port, for example:\n" 

851 " RedisCluster(host='localhost', port=6379)\n" 

852 "2. list of startup nodes, for example:\n" 

853 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," 

854 " ClusterNode('localhost', 6378)])" 

855 ) 

856 # Update the connection arguments 

857 # Whenever a new connection is established, RedisCluster's on_connect 

858 # method should be run 

859 # If the user passed on_connect function we'll save it and run it 

860 # inside the RedisCluster.on_connect() function 

861 self.user_on_connect_func = kwargs.pop("redis_connect_func", None) 

862 kwargs.update({"redis_connect_func": self.on_connect}) 

863 kwargs = cleanup_kwargs(**kwargs) 

864 if retry: 

865 self.retry = retry 

866 else: 

867 self.retry = Retry( 

868 backoff=ExponentialWithJitterBackoff( 

869 base=DEFAULT_RETRY_BASE, cap=DEFAULT_RETRY_CAP 

870 ), 

871 retries=cluster_error_retry_attempts, 

872 ) 

873 

874 self.encoder = Encoder( 

875 kwargs.get("encoding", "utf-8"), 

876 kwargs.get("encoding_errors", "strict"), 

877 kwargs.get("decode_responses", False), 

878 ) 

879 protocol = kwargs.get("protocol", None) 

880 if (cache_config or cache) and not check_protocol_version(protocol, 3): 

881 raise RedisError("Client caching is only supported with RESP version 3") 

882 

883 if maint_notifications_config and not check_protocol_version(protocol, 3): 

884 raise RedisError( 

885 "Maintenance notifications are only supported with RESP version 3" 

886 ) 

887 if check_protocol_version(protocol, 3) and maint_notifications_config is None: 

888 maint_notifications_config = MaintNotificationsConfig() 

889 

890 self.command_flags = self.__class__.COMMAND_FLAGS.copy() 

891 self.node_flags = self.__class__.NODE_FLAGS.copy() 

892 self.read_from_replicas = read_from_replicas 

893 self.load_balancing_strategy = load_balancing_strategy 

894 self.reinitialize_counter = 0 

895 self.reinitialize_steps = reinitialize_steps 

896 if event_dispatcher is None: 

897 self._event_dispatcher = EventDispatcher() 

898 else: 

899 self._event_dispatcher = event_dispatcher 

900 self.startup_nodes = startup_nodes 

901 

902 self.nodes_manager = NodesManager( 

903 startup_nodes=startup_nodes, 

904 from_url=from_url, 

905 require_full_coverage=require_full_coverage, 

906 dynamic_startup_nodes=dynamic_startup_nodes, 

907 address_remap=address_remap, 

908 cache=cache, 

909 cache_config=cache_config, 

910 event_dispatcher=self._event_dispatcher, 

911 maint_notifications_config=maint_notifications_config, 

912 **kwargs, 

913 ) 

914 

915 cluster_response_callbacks = dict( 

916 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS 

917 ) 

918 legacy_responses = kwargs.get("legacy_responses", True) 

919 protocol = kwargs.get("protocol") 

920 if not legacy_responses: 

921 cluster_response_callbacks["CLUSTER SHARDS"] = parse_cluster_shards_unified 

922 elif protocol is None: 

923 cluster_response_callbacks["CLUSTER SHARDS"] = ( 

924 parse_cluster_shards_with_str_keys 

925 ) 

926 self.cluster_response_callbacks = CaseInsensitiveDict( 

927 cluster_response_callbacks 

928 ) 

929 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS) 

930 

931 # For backward compatibility, mapping from existing policies to new one 

932 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = { 

933 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS, 

934 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS, 

935 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES, 

936 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS, 

937 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE, 

938 SLOT_ID: RequestPolicy.DEFAULT_KEYED, 

939 } 

940 

941 self._policies_callback_mapping: dict[ 

942 Union[RequestPolicy, ResponsePolicy], Callable 

943 ] = { 

944 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [ 

945 self.get_random_primary_or_all_nodes(command_name) 

946 ], 

947 RequestPolicy.DEFAULT_KEYED: lambda command, 

948 *args: self.get_nodes_from_slot(command, *args), 

949 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()], 

950 RequestPolicy.ALL_SHARDS: self.get_primaries, 

951 RequestPolicy.ALL_NODES: self.get_nodes, 

952 RequestPolicy.ALL_REPLICAS: self.get_replicas, 

953 RequestPolicy.MULTI_SHARD: lambda *args, 

954 **kwargs: self._split_multi_shard_command(*args, **kwargs), 

955 RequestPolicy.SPECIAL: self.get_special_nodes, 

956 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res, 

957 ResponsePolicy.DEFAULT_KEYED: lambda res: res, 

958 } 

959 

960 self._policy_resolver = policy_resolver 

961 self.commands_parser = CommandsParser(self) 

962 

963 # Node where FT.AGGREGATE command is executed. 

964 self._aggregate_nodes = None 

965 self._lock = threading.RLock() 

966 

967 MaintNotificationsAbstractRedisCluster.__init__( 

968 self, maint_notifications_config, **kwargs 

969 ) 

970 

971 def __enter__(self): 

972 return self 

973 

974 def __exit__(self, exc_type, exc_value, traceback): 

975 self.close() 

976 

977 def __del__(self): 

978 try: 

979 self.close() 

980 except Exception: 

981 pass 

982 

def disconnect_connection_pools(self):
    """
    Disconnect the connection pool of every node returned by ``get_nodes()``.

    Nodes without a ``redis_connection`` are skipped, and an ``OSError``
    raised while disconnecting a pool is ignored.
    """
    for cluster_node in self.get_nodes():
        if not cluster_node.redis_connection:
            continue
        try:
            cluster_node.redis_connection.connection_pool.disconnect()
        except OSError:
            # Client was already disconnected. do nothing
            continue

991 

def on_connect(self, connection):
    """
    Run the connection's own ``on_connect()`` and then apply the
    cluster-specific setup: send READONLY when replica reads or a load
    balancing strategy are configured, and finally invoke the user supplied
    on-connect callback, if any.

    Raises ``ConnectionError`` when the READONLY reply is not ``OK``.
    """
    connection.on_connect()

    wants_readonly = self.read_from_replicas or self.load_balancing_strategy
    if wants_readonly:
        # A node may switch between primary and replica after a failover, so
        # the connection is put in READONLY mode regardless of the current
        # server type. On a primary, READONLY does not affect write commands.
        connection.send_command("READONLY")
        reply = str_if_bytes(connection.read_response())
        if reply != "OK":
            raise ConnectionError("READONLY command failed")

    user_callback = self.user_on_connect_func
    if user_callback is not None:
        user_callback(connection)

1011 

def get_redis_connection(self, node: "ClusterNode") -> Redis:
    """
    Return the Redis client attached to ``node``, creating it on first use.

    Creation happens while holding ``self._lock``, and the node is checked
    again once the lock is held so the client is only created when it is
    still missing.
    """
    if node.redis_connection:
        return node.redis_connection

    with self._lock:
        # Re-check under the lock before asking the nodes manager to create it.
        if not node.redis_connection:
            self.nodes_manager.create_redis_connections([node])
    return node.redis_connection

1018 

def get_node(self, host=None, port=None, node_name=None):
    """
    Look up a node through the nodes manager, forwarding ``host``, ``port``
    and ``node_name`` unchanged.
    """
    manager = self.nodes_manager
    return manager.get_node(host, port, node_name)

1021 

def get_primaries(self):
    """Return the nodes the nodes manager reports for the PRIMARY server type."""
    manager = self.nodes_manager
    return manager.get_nodes_by_server_type(PRIMARY)

1024 

def get_replicas(self):
    """Return the nodes the nodes manager reports for the REPLICA server type."""
    manager = self.nodes_manager
    return manager.get_nodes_by_server_type(REPLICA)

1027 

def get_random_node(self):
    """Return one node picked at random from the nodes cache."""
    candidates = list(self.nodes_manager.nodes_cache.values())
    return random.choice(candidates)

1030 

def get_random_primary_or_all_nodes(self, command_name):
    """
    Pick a random node for ``command_name``.

    When ``read_from_replicas`` is enabled and the command is one of
    READ_COMMANDS, the node is drawn from all cached nodes; otherwise a
    random primary is returned.
    """
    if not self.read_from_replicas or command_name not in READ_COMMANDS:
        return self.get_random_primary_node()

    return self.get_random_node()

1039 

def get_nodes(self):
    """Return a new list holding every node currently in the nodes cache."""
    cached_nodes = self.nodes_manager.nodes_cache
    return list(cached_nodes.values())

1042 

def get_node_from_key(self, key, replica=False):
    """
    Get the node that holds the key's slot.

    :param key: key whose slot is computed with ``keyslot()``
    :param replica: when True, return the node stored right after the
        primary in the slot's node list instead of the primary
    :return: the selected node, or None when ``replica`` is True but the
        slot has fewer than two nodes
    :raises SlotNotCoveredError: when the slots cache has no nodes for the
        key's slot
    """
    slot = self.keyslot(key)
    # Work on a single snapshot of the slot entry. Looking the slot up a
    # second time could observe a different value if the cache is refreshed
    # in between.
    slot_nodes = self.nodes_manager.slots_cache.get(slot)
    if slot_nodes is None or len(slot_nodes) == 0:
        raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')

    if not replica:
        # The primary is stored first.
        return slot_nodes[0]
    if len(slot_nodes) < 2:
        return None
    return slot_nodes[1]

1062 

def get_default_node(self):
    """
    Return the node the nodes manager currently holds as the cluster's
    default node.
    """
    manager = self.nodes_manager
    return manager.default_node

1068 

def get_nodes_from_slot(self, command: str, *args):
    """
    Return a one-element list with the node serving the slot of the
    command described by ``args``.

    Replica reads and the load balancing strategy are only forwarded to the
    nodes manager when ``command`` is one of READ_COMMANDS.
    """
    slot = self.determine_slot(*args)
    is_read_command = command in READ_COMMANDS
    use_replicas = self.read_from_replicas and is_read_command
    strategy = self.load_balancing_strategy if is_read_command else None
    return [self.nodes_manager.get_node_from_slot(slot, use_replicas, strategy)]

1081 

1082 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]: 

1083 """ 

1084 Splits the command with Multi-Shard policy, to the multiple commands 

1085 """ 

1086 keys = self._get_command_keys(*args) 

1087 commands = [] 

1088 

1089 for key in keys: 

1090 commands.append( 

1091 { 

1092 "args": (args[0], key), 

1093 "kwargs": kwargs, 

1094 } 

1095 ) 

1096 

1097 return commands 

1098 

def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
    """
    Return the nodes remembered for commands with a special policy.

    These are the nodes stored in ``self._aggregate_nodes``; a
    ``RedisClusterException`` is raised while that value is empty or unset.
    """
    if self._aggregate_nodes:
        return self._aggregate_nodes

    raise RedisClusterException(
        "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
    )

1109 

def get_random_primary_node(self) -> "ClusterNode":
    """Return one node picked at random from ``get_primaries()``."""
    primaries = self.get_primaries()
    return random.choice(primaries)

1115 

1116 def _evaluate_all_succeeded(self, res): 

1117 """ 

1118 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED 

1119 """ 

1120 first_successful_response = None 

1121 

1122 if isinstance(res, dict): 

1123 for key, value in res.items(): 

1124 if value: 

1125 if first_successful_response is None: 

1126 first_successful_response = {key: value} 

1127 else: 

1128 return {key: False} 

1129 else: 

1130 for response in res: 

1131 if response: 

1132 if first_successful_response is None: 

1133 # Dynamically resolve type 

1134 first_successful_response = type(response)(response) 

1135 else: 

1136 return type(response)(False) 

1137 

1138 return first_successful_response 

1139 

def set_default_node(self, node):
    """
    Set the default node of the cluster.

    The node is only accepted when it is not None and ``get_node()`` finds a
    node with the same name.

    :param node: 'ClusterNode'
    :return True if the default node was set, else False
    """
    is_known = node is not None and self.get_node(node_name=node.name) is not None
    if is_known:
        self.nodes_manager.default_node = node
    return is_known

1150 

def set_retry(self, retry: Retry) -> None:
    """Replace the retry object stored on this client with ``retry``."""
    self.retry = retry

1153 

def monitor(self, target_node=None):
    """
    Return a Monitor object for ``target_node``, falling back to the
    cluster's default node when no target is given.

    The Monitor wraps the MONITOR command: ``next_command()`` returns one
    monitored command and ``listen()`` yields them.

    Raises ``RedisClusterException`` when the chosen node has no
    ``redis_connection``.
    """
    node = self.get_default_node() if target_node is None else target_node
    if node.redis_connection is None:
        raise RedisClusterException(
            f"Cluster Node {node.name} has no redis_connection"
        )
    return node.redis_connection.monitor()

1170 

def pubsub(self, node=None, host=None, port=None, **kwargs):
    """
    Build a ClusterPubSub bound to this client.

    The target can be given as a ClusterNode or as a host and port pair;
    any extra keyword arguments are forwarded to ClusterPubSub unchanged.
    """
    pubsub_instance = ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
    return pubsub_instance

1177 

def keyspace_notifications(
    self,
    key_prefix: Union[str, bytes, None] = None,
    ignore_subscribe_messages: bool = True,
) -> "ClusterKeyspaceNotifications":
    """
    Create a :class:`~redis.keyspace_notifications.ClusterKeyspaceNotifications`
    helper for subscribing to keyspace and keyevent notifications across
    all primary nodes in the cluster.

    Note: every Redis cluster node must have keyspace notifications enabled
    through the ``notify-keyspace-events`` configuration option.

    Args:
        key_prefix: Optional prefix to filter and strip from keys in
                    notifications.
        ignore_subscribe_messages: If True, subscribe/unsubscribe
                                   confirmations are not returned by
                                   get_message/listen.
    """
    # Imported here rather than at module level; the module-level import is
    # only performed under TYPE_CHECKING.
    from redis.keyspace_notifications import ClusterKeyspaceNotifications

    notifications = ClusterKeyspaceNotifications(
        self,
        key_prefix=key_prefix,
        ignore_subscribe_messages=ignore_subscribe_messages,
    )
    return notifications

1205 

def pipeline(self, transaction=None, shard_hint=None):
    """
    Cluster impl:
        Pipelines behave differently in cluster mode than in normal mode.
        A ClusterPipeline is created that shares this client's nodes
        manager, commands parser, callbacks, retry object, lock and event
        dispatcher so that simulating pipelines works correctly.

    Raises ``RedisClusterException`` when a truthy ``shard_hint`` is given.
    """
    if shard_hint:
        raise RedisClusterException("shard_hint is deprecated in cluster mode")

    manager = self.nodes_manager
    return ClusterPipeline(
        nodes_manager=manager,
        commands_parser=self.commands_parser,
        startup_nodes=manager.startup_nodes,
        result_callbacks=self.result_callbacks,
        cluster_response_callbacks=self.cluster_response_callbacks,
        read_from_replicas=self.read_from_replicas,
        load_balancing_strategy=self.load_balancing_strategy,
        reinitialize_steps=self.reinitialize_steps,
        retry=self.retry,
        lock=self._lock,
        transaction=transaction,
        event_dispatcher=self._event_dispatcher,
    )

1232 

def lock(
    self,
    name,
    timeout=None,
    sleep=0.1,
    blocking=True,
    blocking_timeout=None,
    lock_class=None,
    thread_local=True,
    raise_on_release_error: bool = True,
):
    """
    Create a Lock object on key ``name`` that mimics the behavior of
    threading.Lock.

    ``timeout`` is the maximum life of the lock. When it is not given, the
    lock stays held until release() is called.

    ``sleep`` is how long to sleep per loop iteration while the lock is in
    blocking mode and another client is holding it.

    ``blocking`` says whether ``acquire`` should wait until the lock is
    obtained, or fail immediately (``acquire`` returns False and the lock
    is not acquired). Defaults to True. A ``blocking`` argument passed to
    ``acquire`` overrides this value.

    ``blocking_timeout`` is the maximum number of seconds (float or
    integer) to spend trying to acquire the lock. ``None`` means keep
    trying forever.

    ``lock_class`` forces a specific lock implementation. As of redis-py
    3.0 the only implemented lock class is ``Lock`` (a Lua-based lock), so
    this is mostly useful for custom lock classes.

    ``thread_local`` says whether the lock token lives in thread-local
    storage. By default it does, so a thread only sees its own token and
    not one set by another thread. Consider the following timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release the thread-2's lock.

    ``raise_on_release_error`` says whether to raise an exception when the
    lock is no longer owned on leaving the context manager. The default,
    True, raises; with False a warning is logged and the exception is
    suppressed.

    Thread-local storage sometimes has to be disabled, for example when one
    thread acquires a lock and hands the lock instance to a worker thread
    that releases it later: with thread-local storage enabled the worker
    would not see the token set by the acquiring thread. Such cases are
    assumed to be uncommon, hence the default.
    """
    lock_factory = Lock if lock_class is None else lock_class
    return lock_factory(
        self,
        name,
        timeout=timeout,
        sleep=sleep,
        blocking=blocking,
        blocking_timeout=blocking_timeout,
        thread_local=thread_local,
        raise_on_release_error=raise_on_release_error,
    )

1313 

def set_response_callback(self, command, callback):
    """Register ``callback`` as the cluster response callback for ``command``."""
    callbacks = self.cluster_response_callbacks
    callbacks[command] = callback

1317 

def _determine_nodes(
    self, *args, request_policy: RequestPolicy, **kwargs
) -> List["ClusterNode"]:
    """
    Determine the nodes a command should be executed on.

    A ``nodes_flag`` keyword argument, or failing that the flag predefined
    for the command in ``self.command_flags``, overrides ``request_policy``
    whenever it has an entry in the legacy flag-to-policy mapping. The
    callback registered for the resulting policy then produces the nodes.
    """
    command = args[0].upper()
    if len(args) >= 2:
        # Two-word commands take precedence when they have a predefined flag.
        two_word_command = f"{args[0]} {args[1]}".upper()
        if two_word_command in self.command_flags:
            command = two_word_command

    user_flag = kwargs.pop("nodes_flag", None)
    if user_flag is None:
        # get the nodes group for this command if it was predefined
        command_flag = self.command_flags.get(command)
    else:
        # nodes flag passed by the user
        command_flag = user_flag

    request_policy = self._command_flags_mapping.get(command_flag, request_policy)
    resolve_nodes = self._policies_callback_mapping[request_policy]

    # The callbacks expect different arguments depending on the policy.
    if request_policy == RequestPolicy.DEFAULT_KEYED:
        nodes = resolve_nodes(command, *args)
    elif request_policy == RequestPolicy.MULTI_SHARD:
        nodes = resolve_nodes(*args, **kwargs)
    elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
        nodes = resolve_nodes(args[0])
    else:
        nodes = resolve_nodes()

    # Remember where FT.AGGREGATE ran; get_special_nodes() reads this value.
    if args[0].lower() == "ft.aggregate":
        self._aggregate_nodes = nodes

    return nodes

1354 

1355 def _should_reinitialized(self): 

1356 # To reinitialize the cluster on every MOVED error, 

1357 # set reinitialize_steps to 1. 

1358 # To avoid reinitializing the cluster on moved errors, set 

1359 # reinitialize_steps to 0. 

1360 if self.reinitialize_steps == 0: 

1361 return False 

1362 else: 

1363 return self.reinitialize_counter % self.reinitialize_steps == 0 

1364 

def keyslot(self, key):
    """
    Calculate the key slot of ``key`` after encoding it with the client's
    encoder.
    See Keys distribution model in https://redis.io/topics/cluster-spec
    """
    encoded_key = self.encoder.encode(key)
    return key_slot(encoded_key)

1372 

1373 def _get_command_keys(self, *args): 

1374 """ 

1375 Get the keys in the command. If the command has no keys in in, None is 

1376 returned. 

1377 

1378 NOTE: Due to a bug in redis<7.0, this function does not work properly 

1379 for EVAL or EVALSHA when the `numkeys` arg is 0. 

1380 - issue: https://github.com/redis/redis/issues/9493 

1381 - fix: https://github.com/redis/redis/pull/9733 

1382 

1383 So, don't use this function with EVAL or EVALSHA. 

1384 """ 

1385 redis_conn = self.get_default_node().redis_connection 

1386 return self.commands_parser.get_keys(redis_conn, *args) 

1387 

def determine_slot(self, *args) -> Optional[int]:
    """
    Work out which slot a command maps to, based on its args.

    Returns None for CLIENT TRACKING, which has no keys. EVAL/EVALSHA and
    FCALL/FCALL_RO invoked without keys get a random slot.

    Raises a RedisClusterException if there's a missing key and we can't
    determine what slots to map the command to; or, if the keys don't
    all map to the same key slot.
    """
    command = args[0]
    if self.command_flags.get(command) == SLOT_ID:
        # The command contains the slot ID
        return args[1]

    command_upper = command.upper()

    # CLIENT TRACKING is a special case.
    # It doesn't have any keys, it needs to be sent to the provided nodes
    # By default it will be sent to all nodes.
    if command_upper == "CLIENT TRACKING":
        return None

    if command_upper in ("EVAL", "EVALSHA"):
        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So they are parsed
        # locally. Command syntax: EVAL "script body" num_keys ...
        if len(args) <= 2:
            raise RedisClusterException(f"Invalid args in command: {args}")
        key_count = int(args[2])
        keys = args[3 : 3 + key_count]
        if len(keys) == 0:
            # With 0 keys the script can be run on any node, so any slot will do.
            return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
    else:
        keys = self._get_command_keys(*args)
        if keys is None or len(keys) == 0:
            # FCALL can call a function with 0 keys, that means the function
            # can be run on any node so we can just return a random slot
            if command_upper in ("FCALL", "FCALL_RO"):
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            raise RedisClusterException(
                "No way to dispatch this command to Redis Cluster. "
                "Missing key.\nYou can execute the command by specifying "
                f"target nodes.\nCommand: {args}"
            )

    # single key command
    if len(keys) == 1:
        return self.keyslot(keys[0])

    # multi-key command; all keys have to be mapped to the same slot
    distinct_slots = {self.keyslot(key) for key in keys}
    if len(distinct_slots) != 1:
        raise RedisClusterException(
            f"{command} - all keys must map to the same key slot"
        )

    return distinct_slots.pop()

1450 

def get_encoder(self):
    """Return the encoder stored on this client (``self.encoder``)."""
    encoder = self.encoder
    return encoder

1456 

def get_connection_kwargs(self):
    """Return the connections' keyword arguments held by the nodes manager."""
    manager = self.nodes_manager
    return manager.connection_kwargs

1462 

1463 def _is_nodes_flag(self, target_nodes): 

1464 return isinstance(target_nodes, str) and target_nodes in self.node_flags 

1465 

1466 def _parse_target_nodes(self, target_nodes): 

1467 if isinstance(target_nodes, list): 

1468 nodes = target_nodes 

1469 elif isinstance(target_nodes, ClusterNode): 

1470 # Supports passing a single ClusterNode as a variable 

1471 nodes = [target_nodes] 

1472 elif isinstance(target_nodes, dict): 

1473 # Supports dictionaries of the format {node_name: node}. 

1474 # It enables to execute commands with multi nodes as follows: 

1475 # rc.cluster_save_config(rc.get_primaries()) 

1476 nodes = target_nodes.values() 

1477 else: 

1478 raise TypeError( 

1479 "target_nodes type can be one of the following: " 

1480 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

1481 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

1482 f"The passed type is {type(target_nodes)}" 

1483 ) 

1484 return nodes 

1485 

def execute_command(self, *args, **kwargs):
    """
    Execute a command on the cluster.

    Positional and keyword arguments are forwarded unchanged to
    ``_internal_execute_command()``, whose result is returned.
    """
    result = self._internal_execute_command(*args, **kwargs)
    return result

1488 

def _internal_execute_command(self, *args, **kwargs):
    """
    Wrapper for ERRORS_ALLOW_RETRY error handling.

    Resolves the command's request/response policies, picks the target
    nodes, executes the command on each of them and returns the processed
    result.

    The command is attempted once plus ``self.retry.get_retries()`` more
    times when it fails with an error type listed in ERRORS_ALLOW_RETRY.
    When explicit target nodes were passed, no retry is performed.

    Once the attempts are used up, or on any other error, the exception is
    raised to the caller.

    Key argument :target_nodes: can be passed with the following types:
        nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
        ClusterNode
        list<ClusterNode>
        dict<Any, ClusterNode>
    """
    target_nodes_specified = False
    is_default_node = False
    target_nodes = None
    passed_targets = kwargs.pop("target_nodes", None)
    command_policies = self._policy_resolver.resolve(args[0].lower())

    # A nodes flag (string) is resolved later by _determine_nodes(); any
    # other non-None value is treated as an explicit set of nodes.
    if passed_targets is not None and not self._is_nodes_flag(passed_targets):
        target_nodes = self._parse_target_nodes(passed_targets)
        target_nodes_specified = True

    if not command_policies and not target_nodes_specified:
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        # We only could resolve key properties if command is not
        # in a list of pre-defined request policies
        command_flag = self.command_flags.get(command)
        if not command_flag:
            # Fallback to default policy
            if not self.get_default_node():
                slot = None
            else:
                slot = self.determine_slot(*args)
            if slot is None:
                command_policies = CommandPolicies()
            else:
                command_policies = CommandPolicies(
                    request_policy=RequestPolicy.DEFAULT_KEYED,
                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                )
        else:
            # Translate the legacy command flag into a request policy when
            # a mapping exists for it.
            if command_flag in self._command_flags_mapping:
                command_policies = CommandPolicies(
                    request_policy=self._command_flags_mapping[command_flag]
                )
            else:
                command_policies = CommandPolicies()
    elif not command_policies and target_nodes_specified:
        command_policies = CommandPolicies()

    # If an error that allows retrying was thrown, the nodes and slots
    # cache were reinitialized. We will retry executing the command with
    # the updated cluster setup only when the target nodes can be
    # determined again with the new cache tables. Therefore, when target
    # nodes were passed to this function, we cannot retry the command
    # execution since the nodes may not be valid anymore after the tables
    # were reinitialized. So in case of passed target nodes,
    # retry_attempts will be set to 0.
    retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
    # Add one for the first execution
    execute_attempts = 1 + retry_attempts
    failure_count = 0

    # Start timing for observability
    start_time = time.monotonic()

    for _ in range(execute_attempts):
        try:
            # Per-attempt results, keyed by node name.
            res = {}
            if not target_nodes_specified:
                # Determine the nodes to execute the command on
                target_nodes = self._determine_nodes(
                    *args,
                    request_policy=command_policies.request_policy,
                    nodes_flag=passed_targets,
                )

                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )
            # Remember whether the single target is the default node, so it
            # can be replaced if this attempt fails with a retryable error.
            if (
                len(target_nodes) == 1
                and target_nodes[0] == self.get_default_node()
            ):
                is_default_node = True
            for node in target_nodes:
                res[node.name] = self._execute_command(node, *args, **kwargs)

                # With ONE_SUCCEEDED the first node that answers is enough.
                if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                    break

            # Return the processed result
            return self._process_result(
                args[0],
                res,
                response_policy=command_policies.response_policy,
                **kwargs,
            )
        except Exception as e:
            # The exact exception type has to be listed in
            # ERRORS_ALLOW_RETRY; subclasses are not matched.
            if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                if is_default_node:
                    # Replace the default cluster node
                    self.replace_default_node()
                # The nodes and slots cache were reinitialized.
                # Try again with the new cluster setup.
                retry_attempts -= 1
                failure_count += 1

                # Metrics are only recorded when the error carries the
                # connection (or node) it happened on.
                if hasattr(e, "connection"):
                    self._record_command_metric(
                        command_name=args[0],
                        duration_seconds=time.monotonic() - start_time,
                        connection=e.connection,
                        error=e,
                    )

                    self._record_error_metric(
                        error=e,
                        connection=e.connection,
                        retry_attempts=failure_count,
                    )
                continue
            else:
                # raise the exception
                if hasattr(e, "connection"):
                    self._record_error_metric(
                        error=e,
                        connection=e.connection,
                        retry_attempts=failure_count,
                        is_internal=False,
                    )
                raise e

1629 

1630 def _execute_command(self, target_node, *args, **kwargs): 

1631 """ 

1632 Send a command to a node in the cluster 

1633 """ 

1634 command = args[0] 

1635 redis_node = None 

1636 connection = None 

1637 redirect_addr = None 

1638 asking = False 

1639 moved = False 

1640 ttl = int(self.RedisClusterRequestTTL) 

1641 

1642 # Start timing for observability 

1643 start_time = time.monotonic() 

1644 

1645 while ttl > 0: 

1646 ttl -= 1 

1647 try: 

1648 if asking: 

1649 target_node = self.get_node(node_name=redirect_addr) 

1650 elif moved: 

1651 # MOVED occurred and the slots cache was updated, 

1652 # refresh the target node 

1653 slot = self.determine_slot(*args) 

1654 target_node = self.nodes_manager.get_node_from_slot( 

1655 slot, 

1656 self.read_from_replicas and command in READ_COMMANDS, 

1657 self.load_balancing_strategy 

1658 if command in READ_COMMANDS 

1659 else None, 

1660 ) 

1661 moved = False 

1662 

1663 redis_node = self.get_redis_connection(target_node) 

1664 connection = get_connection(redis_node) 

1665 if asking: 

1666 connection.send_command("ASKING") 

1667 redis_node.parse_response(connection, "ASKING", **kwargs) 

1668 asking = False 

1669 connection.send_command(*args, **kwargs) 

1670 response = redis_node.parse_response(connection, command, **kwargs) 

1671 

1672 # Remove keys entry, it needs only for cache. 

1673 kwargs.pop("keys", None) 

1674 

1675 if command in self.cluster_response_callbacks: 

1676 response = self.cluster_response_callbacks[command]( 

1677 response, **kwargs 

1678 ) 

1679 

1680 self._record_command_metric( 

1681 command_name=command, 

1682 duration_seconds=time.monotonic() - start_time, 

1683 connection=connection, 

1684 ) 

1685 return response 

1686 except AuthenticationError as e: 

1687 e.connection = connection if connection is not None else target_node 

1688 self._record_command_metric( 

1689 command_name=command, 

1690 duration_seconds=time.monotonic() - start_time, 

1691 connection=e.connection, 

1692 error=e, 

1693 ) 

1694 raise 

1695 except MaxConnectionsError as e: 

1696 # MaxConnectionsError indicates client-side resource exhaustion 

1697 # (too many connections in the pool), not a node failure. 

1698 # Don't treat this as a node failure - just re-raise the error 

1699 # without reinitializing the cluster. 

1700 # The connection in the error is used to report the metrics based on host and port info 

1701 # so we use the target node object which contains the host and port info 

1702 # because we did not get the connection yet 

1703 e.connection = target_node 

1704 self._record_command_metric( 

1705 command_name=command, 

1706 duration_seconds=time.monotonic() - start_time, 

1707 connection=e.connection, 

1708 error=e, 

1709 ) 

1710 raise 

1711 except (ConnectionError, TimeoutError) as e: 

1712 if is_debug_log_enabled(): 

1713 socket_address = self._extracts_socket_address(connection) 

1714 args_log_str = truncate_text(" ".join(map(safe_str, args))) 

1715 logger.debug( 

1716 f"{type(e).__name__} received for command {args_log_str}, on node {target_node.name}, " 

1717 f"and connection: {connection} using local socket address: {socket_address}, error: {e}" 

1718 ) 

1719 # this is used to report the metrics based on host and port info 

1720 e.connection = connection if connection else target_node 

1721 

1722 # ConnectionError can also be raised if we couldn't get a 

1723 # connection from the pool before timing out, so check that 

1724 # this is an actual connection before attempting to disconnect. 

1725 if connection is not None: 

1726 connection.disconnect() 

1727 

1728 # Instead of setting to None, properly handle the pool 

1729 # Get the pool safely - redis_connection could be set to None 

1730 # by another thread between the check and access 

1731 redis_conn = target_node.redis_connection 

1732 if redis_conn is not None: 

1733 pool = redis_conn.connection_pool 

1734 if pool is not None: 

1735 with pool._lock: 

1736 # take care for the active connections in the pool 

1737 pool.update_active_connections_for_reconnect() 

1738 # disconnect all free connections 

1739 pool.disconnect_free_connections() 

1740 

1741 # Move the failed node to the end of the cached nodes list 

1742 self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name) 

1743 

1744 # DON'T set redis_connection = None - keep the pool for reuse 

1745 # provide the name of the failed node so we can try it last 

1746 self.nodes_manager.initialize(last_failed_node_name=target_node.name) 

1747 self._record_command_metric( 

1748 command_name=command, 

1749 duration_seconds=time.monotonic() - start_time, 

1750 connection=e.connection, 

1751 error=e, 

1752 ) 

1753 raise e 

1754 except MovedError as e: 

1755 if is_debug_log_enabled(): 

1756 socket_address = self._extracts_socket_address(connection) 

1757 args_log_str = truncate_text(" ".join(map(safe_str, args))) 

1758 logger.debug( 

1759 f"MOVED error received for command {args_log_str}, on node {target_node.name}, " 

1760 f"and connection: {connection} using local socket address: {socket_address}, error: {e}" 

1761 ) 

1762 # First, we will try to patch the slots/nodes cache with the 

1763 # redirected node output and try again. If MovedError exceeds 

1764 # 'reinitialize_steps' number of times, we will force 

1765 # reinitializing the tables, and then try again. 

1766 # 'reinitialize_steps' counter will increase faster when 

1767 # the same client object is shared between multiple threads. To 

1768 # reduce the frequency you can set this variable in the 

1769 # RedisCluster constructor. 

1770 self.reinitialize_counter += 1 

1771 if self._should_reinitialized(): 

1772 # during this call all connections are closed or marked for disconnect, 

1773 # so we don't need to disconnect the changed node's connections 

1774 self.nodes_manager.initialize( 

1775 additional_startup_nodes_info=[(e.host, e.port)] 

1776 ) 

1777 # Reset the counter 

1778 self.reinitialize_counter = 0 

1779 else: 

1780 self.nodes_manager.move_slot(e) 

1781 moved = True 

1782 self._record_command_metric( 

1783 command_name=command, 

1784 duration_seconds=time.monotonic() - start_time, 

1785 connection=connection, 

1786 error=e, 

1787 ) 

1788 self._record_error_metric( 

1789 error=e, 

1790 connection=connection, 

1791 ) 

1792 except TryAgainError as e: 

1793 if is_debug_log_enabled(): 

1794 socket_address = self._extracts_socket_address(connection) 

1795 args_log_str = truncate_text(" ".join(map(safe_str, args))) 

1796 logger.debug( 

1797 f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, " 

1798 f"and connection: {connection} using local socket address: {socket_address}" 

1799 ) 

1800 if ttl < self.RedisClusterRequestTTL / 2: 

1801 time.sleep(0.05) 

1802 

1803 self._record_command_metric( 

1804 command_name=command, 

1805 duration_seconds=time.monotonic() - start_time, 

1806 connection=connection, 

1807 error=e, 

1808 ) 

1809 self._record_error_metric( 

1810 error=e, 

1811 connection=connection, 

1812 ) 

1813 except AskError as e: 

1814 if is_debug_log_enabled(): 

1815 socket_address = self._extracts_socket_address(connection) 

1816 args_log_str = truncate_text(" ".join(map(safe_str, args))) 

1817 logger.debug( 

1818 f"ASK error received for command {args_log_str}, on node {target_node.name}, " 

1819 f"and connection: {connection} using local socket address: {socket_address}, error: {e}" 

1820 ) 

1821 redirect_addr = get_node_name(host=e.host, port=e.port) 

1822 asking = True 

1823 

1824 self._record_command_metric( 

1825 command_name=command, 

1826 duration_seconds=time.monotonic() - start_time, 

1827 connection=connection, 

1828 error=e, 

1829 ) 

1830 self._record_error_metric( 

1831 error=e, 

1832 connection=connection, 

1833 ) 

1834 except (ClusterDownError, SlotNotCoveredError) as e: 

1835 # ClusterDownError can occur during a failover and to get 

1836 # self-healed, we will try to reinitialize the cluster layout 

1837 # and retry executing the command 

1838 

1839 # SlotNotCoveredError can occur when the cluster is not fully 

1840 # initialized or can be temporary issue. 

1841 # We will try to reinitialize the cluster topology 

1842 # and retry executing the command 

1843 

1844 time.sleep(0.25) 

1845 self.nodes_manager.initialize() 

1846 

1847 # if we have a connection, use it, otherwise use the target node 

1848 # object which contains the host and port info 

1849 # this is used to report the metrics based on host and port info 

1850 e.connection = connection if connection else target_node 

1851 self._record_command_metric( 

1852 command_name=command, 

1853 duration_seconds=time.monotonic() - start_time, 

1854 connection=e.connection, 

1855 error=e, 

1856 ) 

1857 raise 

1858 except ResponseError as e: 

1859 # this is used to report the metrics based on host and port info 

1860 # ResponseError typically happens after get_connection() succeeds, 

1861 # so connection should be available 

1862 e.connection = connection if connection else target_node 

1863 self._record_command_metric( 

1864 command_name=command, 

1865 duration_seconds=time.monotonic() - start_time, 

1866 connection=e.connection, 

1867 error=e, 

1868 ) 

1869 raise 

1870 except Exception as e: 

1871 if connection: 

1872 connection.disconnect() 

1873 

1874 # if we have a connection, use it, otherwise use the target node 

1875 # object which contains the host and port info 

1876 # this is used to report the metrics based on host and port info 

1877 e.connection = connection if connection else target_node 

1878 self._record_command_metric( 

1879 command_name=command, 

1880 duration_seconds=time.monotonic() - start_time, 

1881 connection=e.connection, 

1882 error=e, 

1883 ) 

1884 raise e 

1885 finally: 

1886 if connection is not None: 

1887 redis_node.connection_pool.release(connection) 

1888 

1889 e = ClusterError("TTL exhausted.") 

1890 # In this case we should have an active connection. 

1891 # If we are here, we have received many MOVED or ASK errors and finally exhausted the TTL. 

1892 # This means that we used an active connection to read from the socket. 

1893 # This is used to report metrics based on the host and port information. 

1894 e.connection = connection 

1895 self._record_command_metric( 

1896 command_name=command, 

1897 duration_seconds=time.monotonic() - start_time, 

1898 connection=connection, 

1899 error=e, 

1900 ) 

1901 raise e 

1902 

def _record_command_metric(
    self,
    command_name: str,
    duration_seconds: float,
    connection: Connection,
    error=None,
):
    """
    Report the duration of one command attempt to the operation-duration metric.

    :param command_name: name of the executed command
    :param duration_seconds: elapsed time of the attempt, in seconds
    :param connection: object exposing ``host`` and ``port`` (and optionally
        ``db``). Callers in this file pass either a connection or a cluster
        node. A falsy value is reported as host "unknown", port 0, db "0".
    :param error: exception raised by the attempt, if any
    """
    if connection:
        server_address = connection.host
        server_port = connection.port
        # Cluster nodes have no ``db`` attribute; fall back to database "0".
        db_namespace = str(connection.db) if hasattr(connection, "db") else "0"
    else:
        server_address = "unknown"
        server_port = 0
        db_namespace = "0"

    record_operation_duration(
        command_name=command_name,
        duration_seconds=duration_seconds,
        server_address=server_address,
        server_port=server_port,
        db_namespace=db_namespace,
        error=error,
    )

1925 

def _record_error_metric(
    self,
    error: Exception,
    connection: Connection,
    is_internal: bool = True,
    retry_attempts: Optional[int] = None,
):
    """
    Report one error occurrence to the error-count metric.

    The connection's host and port are used both as the server address and
    as the network peer address.

    :param error: the exception being counted
    :param connection: object exposing ``host`` and ``port``
    :param is_internal: forwarded unchanged to the metric recorder
    :param retry_attempts: number of retries so far; ``None`` is reported as 0
    """
    peer_host = connection.host
    peer_port = connection.port
    attempts = 0 if retry_attempts is None else retry_attempts

    record_error_count(
        server_address=peer_host,
        server_port=peer_port,
        network_peer_address=peer_host,
        network_peer_port=peer_port,
        error_type=error,
        retry_attempts=attempts,
        is_internal=is_internal,
    )

1945 

1946 def _extracts_socket_address( 

1947 self, connection: Optional[Connection] 

1948 ) -> Optional[int]: 

1949 if connection is None: 

1950 return None 

1951 try: 

1952 socket_address = ( 

1953 connection._sock.getsockname() if connection._sock else None 

1954 ) 

1955 socket_address = socket_address[1] if socket_address else None 

1956 except (AttributeError, OSError): 

1957 pass 

1958 return socket_address 

1959 

def close(self) -> None:
    """
    Close the nodes manager, if one has been set.

    ``AttributeError`` is swallowed so that calling this on a partially
    constructed client is harmless.
    """
    try:
        with self._lock:
            manager = self.nodes_manager
            if manager:
                manager.close()
    except AttributeError:
        # RedisCluster's __init__ can fail before nodes_manager is set
        return

1968 

1969 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs): 

1970 """ 

1971 Process the result of the executed command. 

1972 The function would return a dict or a single value. 

1973 

1974 :type command: str 

1975 :type res: dict 

1976 

1977 `res` should be in the following format: 

1978 Dict<node_name, command_result> 

1979 """ 

1980 if command in self.result_callbacks: 

1981 res = self.result_callbacks[command](command, res, **kwargs) 

1982 elif len(res) == 1: 

1983 # When we execute the command on a single node, we can 

1984 # remove the dictionary and return a single response 

1985 res = list(res.values())[0] 

1986 

1987 return self._policies_callback_mapping[response_policy](res) 

1988 

def load_external_module(self, funcname, func):
    """
    Attach an externally defined redis module function to this client.

    ``funcname`` - name of the attribute to create on the client
    ``func`` - the object stored under that attribute
    """
    setattr(self, funcname, func)

1998 

def transaction(self, func, *watches, **kwargs):
    """
    Run the callable `func` inside a transactional pipeline, retrying for as
    long as the watched keys keep changing.

    `func` receives the Pipeline object as its only argument. Recognised
    keyword options: ``shard_hint`` (passed to ``pipeline``),
    ``value_from_callable`` (return what `func` returned instead of the
    EXEC result) and ``watch_delay`` (seconds to sleep after a WatchError
    before retrying).
    """
    shard_hint = kwargs.pop("shard_hint", None)
    value_from_callable = kwargs.pop("value_from_callable", False)
    watch_delay = kwargs.pop("watch_delay", None)

    with self.pipeline(True, shard_hint) as pipe:
        while True:
            try:
                if watches:
                    pipe.watch(*watches)
                callable_result = func(pipe)
                execute_result = pipe.execute()
            except WatchError:
                # A watched key changed underneath us: optionally back off,
                # then run the whole attempt again.
                if watch_delay is not None and watch_delay > 0:
                    time.sleep(watch_delay)
                continue

            if value_from_callable:
                return callable_result
            return execute_result

2020 

2021 

class ClusterNode:
    """
    One node of the cluster: its address, role and (optional) Redis client.

    Two nodes compare equal, and hash alike, when their ``name`` matches.
    """

    def __init__(self, host, port, server_type=None, redis_connection=None):
        # "localhost" is resolved up front so the stored host and the node
        # name are built from the resolved address.
        resolved_host = socket.gethostbyname(host) if host == "localhost" else host

        self.host = resolved_host
        self.port = port
        self.name = get_node_name(resolved_host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        fields = (
            f"host={self.host}",
            f"port={self.port}",
            f"name={self.name}",
            f"server_type={self.server_type}",
            f"redis_connection={self.redis_connection}",
        )
        return "[" + ",".join(fields) + "]"

    def __eq__(self, obj):
        if not isinstance(obj, ClusterNode):
            return False
        return obj.name == self.name

    def __hash__(self):
        return hash(self.name)

2047 

2048 

class LoadBalancingStrategy(Enum):
    """
    Strategies for choosing which node of a slot serves a command.

    Indices refer to a slot's node list, where index 0 is treated as the
    primary and the remaining entries as replicas.
    """

    # Rotate over every node of the slot, primary included.
    ROUND_ROBIN = "round_robin"
    # Rotate over the replicas only (index 0 is skipped).
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    # Pick any node of the slot at random, primary included.
    RANDOM = "random"
    # Pick one of the replicas at random.
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Chooses an index into a slot's node list, round-robin or at random.

    Round-robin state is kept per primary name and guarded by an internal
    lock. Every returned index is within ``range(list_size)``. A replica-only
    strategy falls back to index 0 (the primary) when the list holds no
    replica.
    """

    def __init__(self, start_index: int = 0) -> None:
        # Next round-robin index to hand out, keyed by primary node name.
        self.primary_to_idx: dict[str, int] = {}
        # Index used the first time a primary is seen.
        self.start_index: int = start_index
        self._lock: threading.Lock = threading.Lock()

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        """
        Return the index of the node that should serve the next command.

        :param primary: name of the slot's primary; keys the round-robin state
        :param list_size: number of nodes currently serving the slot
        :param load_balancing_strategy: how to choose among those nodes
        :return: an index in ``range(list_size)``
        """
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_server_index(
                list_size,
                replicas_only=True,
            )
        elif load_balancing_strategy == LoadBalancingStrategy.RANDOM:
            return self._get_random_server_index(
                list_size,
                replicas_only=False,
            )
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        """Forget all round-robin positions."""
        with self._lock:
            self.primary_to_idx.clear()

    def _get_random_server_index(self, list_size: int, replicas_only: bool) -> int:
        # With a single node there is no replica to choose; use the primary
        # instead of asking randint() for an empty range.
        lowest = 1 if replicas_only and list_size > 1 else 0
        return random.randint(lowest, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        with self._lock:
            server_index = self.primary_to_idx.setdefault(primary, self.start_index)
            # The stored position (or start_index) may have been computed for
            # a longer node list; wrap it so the result is always a valid
            # index for the current list.
            server_index %= list_size
            if replicas_only and server_index == 0 and list_size > 1:
                # skip the primary node index
                server_index = 1
            # Update the index for the next round
            self.primary_to_idx[primary] = (server_index + 1) % list_size
            return server_index

2107 

2108 

2109class NodesManager: 

def __init__(
    self,
    startup_nodes: list[ClusterNode],
    from_url=False,
    require_full_coverage=False,
    lock: Optional[threading.RLock] = None,
    dynamic_startup_nodes=True,
    connection_pool_class=ConnectionPool,
    address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
    cache: Optional[CacheInterface] = None,
    cache_config: Optional[CacheConfig] = None,
    cache_factory: Optional[CacheFactoryInterface] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
    maint_notifications_config: Optional[MaintNotificationsConfig] = None,
    **kwargs,
):
    """
    Set up the manager's state and run the first topology discovery.

    Remaining keyword arguments are stored as ``connection_kwargs``. The
    cache is chosen in this order: a truthy ``cache``, then
    ``cache_factory``, then ``cache_config``. The constructor finishes by
    calling ``initialize()``.
    """
    # Topology state
    self.nodes_cache: dict[str, ClusterNode] = {}
    self.slots_cache: dict[int, list[ClusterNode]] = {}
    self.startup_nodes: dict[str, ClusterNode] = {
        node.name: node for node in startup_nodes
    }
    self.default_node: Optional[ClusterNode] = None
    self._epoch: int = 0

    # Configuration
    self.from_url = from_url
    self._require_full_coverage = require_full_coverage
    self._dynamic_startup_nodes = dynamic_startup_nodes
    self.connection_pool_class = connection_pool_class
    self.address_remap = address_remap

    self._cache: Optional[CacheInterface] = None
    if cache:
        self._cache = cache
    elif cache_factory is not None:
        self._cache = cache_factory.get_cache()
    elif cache_config is not None:
        self._cache = CacheFactory(cache_config).get_cache()

    self.connection_kwargs = kwargs
    self.read_load_balancer = LoadBalancer()

    # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock
    self._lock = threading.RLock() if lock is None else lock

    # initialize holds _initialization_lock to dedup multiple calls to reinitialize;
    # note that if we hold both _lock and _initialization_lock, we _must_ acquire
    # _initialization_lock first (ie: to have a consistent order) to avoid deadlock.
    self._initialization_lock: threading.RLock = threading.RLock()

    self._event_dispatcher = (
        EventDispatcher() if event_dispatcher is None else event_dispatcher
    )
    self._credential_provider = self.connection_kwargs.get("credential_provider")
    self.maint_notifications_config = maint_notifications_config

    self.initialize()

2167 

def get_node(
    self,
    host: Optional[str] = None,
    port: Optional[int] = None,
    node_name: Optional[str] = None,
) -> Optional[ClusterNode]:
    """
    Look up a node in the nodes cache.

    The node is identified either by ``host`` and ``port`` (both required;
    "localhost" is resolved first) or, failing that, by ``node_name``.
    :return: ClusterNode if the node exists, else None
    """
    if host and port:
        # the user passed host and port
        if host == "localhost":
            host = socket.gethostbyname(host)
        lookup_name = get_node_name(host=host, port=port)
    elif node_name:
        lookup_name = node_name
    else:
        return None

    with self._lock:
        return self.nodes_cache.get(lookup_name)

2190 

def move_slot(self, e: Union[AskError, MovedError]):
    """
    Update the slot's node with the redirected one

    The node named by ``e.host``/``e.port`` becomes the primary for
    ``e.slot_id``. It is added to the nodes cache when unknown. An
    ``AfterSlotsCacheRefreshEvent`` is dispatched only when the slot's node
    list actually changed, and exceptions raised by listeners are logged
    rather than propagated.

    :param e: redirection error carrying ``host``, ``port`` and ``slot_id``
    """
    node_changed = False
    with self._lock:
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            if redirected_node.server_type is not PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node

        # Raises KeyError if the slot has no entry in the slots cache.
        slot_nodes = self.slots_cache[e.slot_id]
        if redirected_node not in slot_nodes:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replications) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
            node_changed = True
        elif redirected_node is not slot_nodes[0]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = slot_nodes[0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            slot_nodes.append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            slot_nodes.remove(redirected_node)
            # Override the old primary with the new one
            slot_nodes[0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
            node_changed = True
        # else: circular MOVED to current primary -> no-op
    # Dispatch outside the lock so listeners can acquire their own locks
    # without risk of deadlock. Skipped on the no-op branch to avoid
    # needless reconciliation walks under MOVED storms. A listener must
    # not break slots-cache refresh; log and continue so a single buggy
    # listener cannot starve the rest.
    if node_changed:
        try:
            self._event_dispatcher.dispatch(AfterSlotsCacheRefreshEvent())
        except Exception as exc:
            # Don't shadow the method parameter ``e``: ``except as`` binds
            # the listener exception in the function scope and ``del``s
            # the name on block exit (PEP 3134), which would also wipe
            # out the original AskError/MovedError parameter.
            logger.exception(
                "listener raised during slots-cache refresh: %s: %s",
                type(exc).__name__,
                exc,
            )

2251 

@deprecated_args(
    args_to_warn=["server_type"],
    reason=(
        "In case you need select some load balancing strategy "
        "that will use replicas, please set it through 'load_balancing_strategy'"
    ),
    version="5.3.0",
)
def get_node_from_slot(
    self,
    slot: int,
    read_from_replicas: bool = False,
    load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
    server_type: Optional[Literal["primary", "replica"]] = None,
) -> ClusterNode:
    """
    Return a node that serves the given hash slot.

    When the slot has more than one node and a load balancing strategy is in
    effect (``read_from_replicas=True`` implies round robin), the read load
    balancer picks the node. Otherwise the primary is returned, unless the
    deprecated ``server_type`` asks for a replica, in which case one is
    chosen at random.

    :raises SlotNotCoveredError: if no node is cached for the slot
    """
    if load_balancing_strategy is None and read_from_replicas is True:
        load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

    with self._lock:
        slot_nodes = self.slots_cache.get(slot)
        if slot_nodes is None or len(slot_nodes) == 0:
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                + f'"require_full_coverage={self._require_full_coverage}"'
            )

        node_count = len(slot_nodes)
        if node_count > 1 and load_balancing_strategy:
            # get the server index using the strategy defined in load_balancing_strategy
            node_idx = self.read_load_balancer.get_server_index(
                slot_nodes[0].name, node_count, load_balancing_strategy
            )
        elif server_type is None or server_type == PRIMARY or node_count == 1:
            # return a primary
            node_idx = 0
        else:
            # return a replica
            # randomly choose one of the replicas
            node_idx = random.randint(1, node_count - 1)

        return slot_nodes[node_idx]

2300 

def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]):
    """
    Collect the cached nodes whose ``server_type`` equals the requested one.

    :param server_type: 'primary' or 'replica'
    :return: list of ClusterNode, in nodes-cache order
    """
    with self._lock:
        matching = []
        for candidate in self.nodes_cache.values():
            if candidate.server_type == server_type:
                matching.append(candidate)
        return matching

2313 

@deprecated_function(
    reason="This method is not used anymore internally. The startup nodes are populated automatically.",
    version="7.0.2",
)
def populate_startup_nodes(self, nodes):
    """
    Register each given node as a startup node, keyed by its name.

    A node whose name is already registered replaces the existing entry, so
    duplicates collapse into one.
    """
    with self._lock:
        for startup_node in nodes:
            self.startup_nodes[startup_node.name] = startup_node

2325 

def move_node_to_end_of_cached_nodes(self, node_name: str) -> None:
    """
    Push a failing node to the back of ``startup_nodes`` and ``nodes_cache``.

    This makes the node the last one tried during reinitialization and the
    last candidate when the default node is selected. A mapping that does
    not contain the node, or holds only one entry, is left untouched.
    """
    # startup_nodes drives reinitialization order; nodes_cache order affects
    # get_nodes_by_server_type, which is used to select the default_node
    # during initialize().
    for mapping in (self.startup_nodes, self.nodes_cache):
        if node_name in mapping and len(mapping) > 1:
            # Pop and re-insert so the entry lands at the end.
            mapping[node_name] = mapping.pop(node_name)

2342 

def check_slots_coverage(self, slots_cache):
    """
    Tell whether every hash slot has an entry in ``slots_cache``.

    The result decides whether the next startup node still has to be queried.
    """
    return all(slot in slots_cache for slot in range(REDIS_CLUSTER_HASH_SLOTS))

2350 

def create_redis_connections(self, nodes):
    """
    This function will create a redis connection to all nodes in :nodes:

    Nodes that already carry a ``redis_connection`` are left as they are.
    Afterwards an ``AfterPooledConnectionsInstantiationEvent`` carrying the
    collected connection pools is dispatched.
    """
    connection_pools = []
    for node in nodes:
        if node.redis_connection is None:
            node.redis_connection = self.create_redis_node(
                host=node.host,
                port=node.port,
                maint_notifications_config=self.maint_notifications_config,
                **self.connection_kwargs,
            )
            # NOTE(review): the nesting of this append could not be recovered
            # from the flattened listing; it is placed inside the ``if`` so
            # only newly created pools are reported - confirm against the
            # original file.
            connection_pools.append(node.redis_connection.connection_pool)

    self._event_dispatcher.dispatch(
        AfterPooledConnectionsInstantiationEvent(
            connection_pools, ClientType.SYNC, self._credential_provider
        )
    )

2371 

def create_redis_node(
    self,
    host,
    port,
    **kwargs,
):
    """
    Build the ``Redis`` client used to talk to a single cluster node.

    When the manager was created from a URL, the client wraps a pool of
    ``connection_pool_class``; otherwise the arguments go straight to
    ``Redis``. In both cases the manager's cache and a no-retry policy are
    supplied.
    """
    # We are configuring the connection pool not to retry
    # connections on lower level clients to avoid retrying
    # connections to nodes that are not reachable
    # and to avoid blocking the connection pool.
    # The only error that will have some handling in the lower
    # level clients is ConnectionError which will trigger disconnection
    # of the socket.
    # The retries will be handled on cluster client level
    # where we will have proper handling of the cluster topology
    node_retry_config = Retry(
        backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
    )

    if not self.from_url:
        return Redis(
            host=host,
            port=port,
            cache=self._cache,
            retry=node_retry_config,
            **kwargs,
        )

    # Create a redis node with a custom connection pool
    kwargs.update(
        host=host,
        port=port,
        cache=self._cache,
        retry=node_retry_config,
    )
    return Redis(connection_pool=self.connection_pool_class(**kwargs))

2407 

def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
    """
    Return the node for ``host:port`` from ``tmp_nodes_cache``, creating and
    registering it there when it is missing.

    A newly created node reuses the Redis client of the node with the same
    name in the current nodes cache, if there is one.
    """
    node_name = get_node_name(host, port)
    # check if we already have this node in the tmp_nodes_cache
    cached_node = tmp_nodes_cache.get(node_name)
    if cached_node is not None:
        return cached_node

    # before creating a new cluster node, check if the cluster node already
    # exists in the current nodes cache and has a valid connection so we can
    # reuse it
    redis_connection: Optional[Redis] = None
    with self._lock:
        previous_node = self.nodes_cache.get(node_name)
        if previous_node:
            redis_connection = previous_node.redis_connection

    # don't update the old ClusterNode, so we don't update its role
    # outside of the lock
    new_node = ClusterNode(host, port, role, redis_connection)
    # add this node to the nodes cache
    tmp_nodes_cache[new_node.name] = new_node
    return new_node

2428 

2429 def _get_epoch(self) -> int: 

2430 """ 

2431 Get the current epoch value. This method exists primarily to allow 

2432 tests to mock the epoch fetch and control race condition timing. 

2433 """ 

2434 with self._lock: 

2435 return self._epoch 

2436 

def initialize(
    self,
    additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None,
    disconnect_startup_nodes_pools: bool = True,
    last_failed_node_name: Optional[str] = None,
):
    """
    Initializes the nodes cache, slots cache and redis connections.

    Startup nodes are queried with ``CLUSTER SLOTS`` one after another until
    one answer covers every slot or the candidates run out. The collected
    topology then replaces the cached one, the epoch is incremented and an
    ``AfterSlotsCacheRefreshEvent`` is dispatched. If another thread finished
    a re-initialization between this call's epoch read and its acquisition
    of the initialization lock, the call returns without doing anything.

    :additional_startup_nodes_info:
        Additional nodes to add temporarily to the startup nodes.
        The additional nodes will be used just in the process of extraction of the slots
        and nodes information from the cluster.
        This is useful when we want to add new nodes to the cluster
        and initialize the client
        with them.
        The format of the list is a list of tuples, where each tuple contains
        the host and port of the node.
    :disconnect_startup_nodes_pools:
        Whether to disconnect the connection pool of the startup nodes
        after the initialization is complete. This is useful when the
        startup nodes are not part of the cluster and we want to avoid
        keeping the connection open.
    :last_failed_node_name:
        Name of the node that just failed and should be tried only after
        other startup and additional startup nodes during this refresh.

    :raises RedisClusterException: if no startup node is reachable, if the
        answers disagree about slot ownership more than five times, or if
        full coverage is required but not reached.
    """
    self.reset()
    tmp_nodes_cache = {}
    tmp_slots = {}
    disagreements = []
    startup_nodes_reachable = False
    fully_covered = False
    kwargs = self.connection_kwargs
    exception = None
    # Read before taking the initialization lock; compared again below to
    # detect a refresh completed by another thread in the meantime.
    epoch = self._get_epoch()
    if additional_startup_nodes_info is None:
        additional_startup_nodes_info = []

    with self._initialization_lock:
        with self._lock:
            if epoch != self._epoch:
                # another thread has already re-initialized the nodes; don't
                # bother running again
                return

        with self._lock:
            startup_nodes = list(self.startup_nodes.values())
        # The node that just failed is pulled out of the candidates and
        # queried last.
        deferred_failed_nodes = []
        if last_failed_node_name is not None:
            for index, node in enumerate(startup_nodes):
                if node.name == last_failed_node_name:
                    deferred_failed_nodes.append(startup_nodes.pop(index))
                    break
        if len(startup_nodes) > 1:
            # Vary which startup node is queried first so clients do not
            # all reinitialize through the same node.
            random.shuffle(startup_nodes)

        additional_startup_nodes = [
            ClusterNode(host, port) for host, port in additional_startup_nodes_info
        ]
        if last_failed_node_name is not None:
            for index, node in enumerate(additional_startup_nodes):
                if node.name == last_failed_node_name:
                    # Defer it only once, even if it was also a startup node.
                    if not deferred_failed_nodes:
                        deferred_failed_nodes.append(node)
                    additional_startup_nodes.pop(index)
                    break
        if is_debug_log_enabled():
            logger.debug(
                f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; "
                f"and startup nodes: {[node.name for node in startup_nodes]}"
            )

        for startup_node in chain(
            startup_nodes,
            additional_startup_nodes,
            deferred_failed_nodes,
        ):
            try:
                if startup_node.redis_connection:
                    r = startup_node.redis_connection

                else:
                    # Create a new Redis connection
                    if is_debug_log_enabled():
                        socket_timeout = kwargs.get("socket_timeout", "not set")
                        socket_connect_timeout = kwargs.get(
                            "socket_connect_timeout", "not set"
                        )
                        maint_enabled = (
                            self.maint_notifications_config.enabled
                            if self.maint_notifications_config
                            else False
                        )
                        logger.debug(
                            "Topology refresh: Creating new Redis connection to "
                            f"{startup_node.host}:{startup_node.port}; "
                            f"with socket_timeout: {socket_timeout}, and "
                            f"socket_connect_timeout: {socket_connect_timeout}, "
                            "and maint_notifications enabled: "
                            f"{maint_enabled}"
                        )
                    r = self.create_redis_node(
                        startup_node.host,
                        startup_node.port,
                        maint_notifications_config=self.maint_notifications_config,
                        **kwargs,
                    )
                    if startup_node in self.startup_nodes.values():
                        self.startup_nodes[startup_node.name].redis_connection = r
                    else:
                        startup_node.redis_connection = r
                try:
                    # Make sure cluster mode is enabled on this node
                    cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
                    if disconnect_startup_nodes_pools:
                        with r.connection_pool._lock:
                            # take care to clear connections before we move on
                            # mark all active connections for reconnect - they will be
                            # reconnected on next use, but will allow current in flight commands to complete first
                            r.connection_pool.update_active_connections_for_reconnect()
                            # Needed to clear READONLY state when it is no longer applicable
                            r.connection_pool.disconnect_free_connections()
                except ResponseError:
                    raise RedisClusterException(
                        "Cluster mode is not enabled on this node"
                    )
                startup_nodes_reachable = True
            except Exception as e:
                # Try the next startup node.
                # The exception is saved and raised only if we have no more nodes.
                exception = e
                continue

            # CLUSTER SLOTS command results in the following output:
            # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
            # where each node contains the following list: [IP, port, node_id]
            # Therefore, cluster_slots[0][2][0] will be the IP address of the
            # primary node of the first slot section.
            # If there's only one server in the cluster, its ``host`` is ''
            # Fix it to the host in startup_nodes
            if (
                len(cluster_slots) == 1
                and len(cluster_slots[0][2][0]) == 0
                and len(self.startup_nodes) == 1
            ):
                cluster_slots[0][2][0] = startup_node.host

            for slot in cluster_slots:
                primary_node = slot[2]
                host = str_if_bytes(primary_node[0])
                if host == "":
                    # An empty host is replaced by the address we queried.
                    host = startup_node.host
                port = int(primary_node[1])
                host, port = self.remap_host_port(host, port)

                # Primary first, replicas after it.
                nodes_for_slot = []

                target_node = self._get_or_create_cluster_node(
                    host, port, PRIMARY, tmp_nodes_cache
                )
                nodes_for_slot.append(target_node)

                replica_nodes = slot[3:]
                for replica_node in replica_nodes:
                    host = str_if_bytes(replica_node[0])
                    port = int(replica_node[1])
                    host, port = self.remap_host_port(host, port)
                    target_replica_node = self._get_or_create_cluster_node(
                        host, port, REPLICA, tmp_nodes_cache
                    )
                    nodes_for_slot.append(target_replica_node)

                # Every slot of the range shares the same list object.
                for i in range(int(slot[0]), int(slot[1]) + 1):
                    if i not in tmp_slots:
                        tmp_slots[i] = nodes_for_slot
                    else:
                        # Validate that 2 nodes want to use the same slot cache
                        # setup
                        tmp_slot = tmp_slots[i][0]
                        if tmp_slot.name != target_node.name:
                            disagreements.append(
                                f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                            )

                            # NOTE(review): nesting of this check under the
                            # mismatch branch is reconstructed from a
                            # flattened listing - confirm.
                            if len(disagreements) > 5:
                                raise RedisClusterException(
                                    f"startup_nodes could not agree on a valid "
                                    f"slots cache: {', '.join(disagreements)}"
                                )

            fully_covered = self.check_slots_coverage(tmp_slots)
            if fully_covered:
                # Don't need to continue to the next startup node if all
                # slots are covered
                break

        if not startup_nodes_reachable:
            raise RedisClusterException(
                f"Redis Cluster cannot be connected. Please provide at least "
                f"one reachable node: {str(exception)}"
            ) from exception

        # Create Redis connections to all nodes
        self.create_redis_connections(list(tmp_nodes_cache.values()))

        # Check if the slots are not fully covered
        if not fully_covered and self._require_full_coverage:
            # Despite the requirement that the slots be covered, there
            # isn't a full coverage
            raise RedisClusterException(
                f"All slots are not covered after query all startup_nodes. "
                f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                f"covered..."
            )

        # Set the tmp variables to the real variables
        with self._lock:
            self.nodes_cache = tmp_nodes_cache
            self.slots_cache = tmp_slots
            # Set the default node
            self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
            if self._dynamic_startup_nodes:
                # Populate the startup nodes with all discovered nodes
                self.startup_nodes = tmp_nodes_cache
            # Increment the epoch to signal that initialization has completed
            self._epoch += 1
    # NOTE(review): the flattened listing does not show whether this dispatch
    # runs inside or outside ``_initialization_lock``; it is placed outside
    # both locks here - confirm against the original file.
    # Dispatch so listeners (e.g. ClusterPubSub) can reconcile per-node
    # state after slot ownership may have changed. A listener must not
    # break slots-cache refresh; log and continue so a single buggy
    # listener cannot starve the rest.
    try:
        self._event_dispatcher.dispatch(AfterSlotsCacheRefreshEvent())
    except Exception as e:
        logger.exception(
            "listener raised during slots-cache refresh: %s: %s",
            type(e).__name__,
            e,
        )

2679 

def close(self) -> None:
    """
    Clear the default node and close the Redis connection of every cached
    node that has one.
    """
    # Take the snapshot while holding the lock; the connections are then
    # closed from the snapshot rather than from the live cache.
    with self._lock:
        self.default_node = None
        cached_nodes = tuple(self.nodes_cache.values())
    for cached_node in cached_nodes:
        client = cached_node.redis_connection
        # Nodes without a (truthy) redis_connection have nothing to close.
        if client:
            client.close()

2687 

def reset(self):
    """
    Reset the read load balancer, if one is configured.

    When ``read_load_balancer`` is ``None`` this is a no-op.
    """
    # Calling ``.reset()`` on None raises AttributeError (not TypeError), so
    # the "no balancer" case has to be tested for explicitly rather than
    # handled through exception handling.
    load_balancer = self.read_load_balancer
    if load_balancer is not None:
        load_balancer.reset()

2694 

def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
    """
    Translate an address reported by the cluster into the address the
    client should actually connect to.

    When ``self.address_remap`` is set it is called with the ``(host, port)``
    tuple and its result is returned; otherwise the address is returned
    unchanged. Useful if the client is not connecting directly to the
    cluster.
    """
    remap = self.address_remap
    if not remap:
        return host, port
    return remap((host, port))

2704 

def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]:
    """
    Return the cached node whose connection pool is configured with the
    same host/port as ``connection``, or ``None`` when no cached node
    matches.

    Nodes without a Redis client (falsy ``redis_connection``) are skipped.
    """
    wanted_name = get_node_name(connection.host, connection.port)
    with self._lock:
        # Iterate a snapshot of the cache values.
        for candidate in tuple(self.nodes_cache.values()):
            client = candidate.redis_connection
            if not client:
                continue
            pool_kwargs = client.connection_pool.connection_kwargs
            candidate_name = get_node_name(
                pool_kwargs.get("host"), pool_kwargs.get("port")
            )
            if wanted_name == candidate_name:
                return candidate
    return None

2716 

2717 

def _unregister_slots_cache_listener(
    dispatcher_ref: "weakref.ref[EventDispatcher]",
    listener: EventListenerInterface,
    event_type: Type[object],
) -> None:
    """
    Unregister ``listener`` for ``event_type`` from the dispatcher behind
    ``dispatcher_ref``; do nothing if that dispatcher is already gone.
    """
    # Module-level finalizer callback. Kept free of strong references to the
    # owning ClusterPubSub so attaching it via weakref.finalize does not
    # extend the pubsub's lifetime.
    live_dispatcher = dispatcher_ref()
    if live_dispatcher is None:
        return
    live_dispatcher.unregister_listeners({event_type: [listener]})

2729 

2730 

class ClusterPubSubSlotsCacheListener(EventListenerInterface):
    """
    Event listener that relays AfterSlotsCacheRefreshEvent to a
    ClusterPubSub by calling its ``on_slots_changed()``.

    Only a weak reference to the pubsub is stored, so the listener does not
    keep the instance alive. Removing the dispatcher's strong reference to
    this listener is handled by a ``weakref.finalize`` attached to the
    owning ClusterPubSub in ``ClusterPubSub.__init__``.
    """

    def __init__(self, pubsub: "ClusterPubSub") -> None:
        self._pubsub_ref: "weakref.ref[ClusterPubSub]" = weakref.ref(pubsub)

    def listen(self, event: object) -> None:
        """Forward the notification to the pubsub if it is still alive."""
        target = self._pubsub_ref()
        if target is not None:
            try:
                target.on_slots_changed()
            except Exception as exc:
                # Listeners must not break slots-cache refresh; log and
                # continue so a single buggy pubsub cannot starve the rest.
                logger.exception(
                    "pubsub %r raised during slots-cache change: %s: %s",
                    target,
                    type(exc).__name__,
                    exc,
                )
        # else: race window between pubsub GC and the finalizer running;
        # safe no-op, the finalizer will remove this listener shortly.

2761 

2762 

class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis.readthedocs.io/en/stable/clustering.html#known-pubsub-limitations
    """

    def __init__(
        self,
        redis_cluster,
        node=None,
        host=None,
        port=None,
        push_handler_func=None,
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true or load_balancing_strategy is set, a replica can be selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        self.set_pubsub_node(redis_cluster, node, host, port)
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        self.node_pubsub_mapping = {}
        # Reverse index: shard channel (normalized) -> owning node.name. Used to
        # route sunsubscribe calls and reconcile subscriptions after slot
        # migration / failover.
        self._shard_channel_to_node: dict = {}
        # Dedicated lock for shard-subscription bookkeeping. Distinct from
        # PubSub.self._lock (which serializes wire I/O on the cluster-level
        # connection used by aclose / send_command / regular subscribe) so
        # that reconciliation cannot starve those unrelated paths during
        # long per-channel migrations.
        self._shard_state_lock: threading.RLock = threading.RLock()
        # Worker executor for off-loading slot-migration reconciliation from
        # the dispatch call site (mirrors async's asyncio.create_task model so
        # the thread that triggered MovedError / topology refresh is not
        # blocked on per-channel sunsubscribe / ssubscribe network I/O).
        # Lazy-created on first on_slots_changed() to avoid a persistent
        # worker thread for pubsubs that never see a slot migration.
        # Initialized before super().__init__() because PubSub.__init__ calls
        # self.reset(), which resolves to ClusterPubSub.reset() and reads
        # these attributes.
        self._reconcile_executor: Optional[ThreadPoolExecutor] = None
        # In-flight reconciliation futures; tracked so reset() can cancel
        # pending work and so exceptions surface via a done-callback.
        self._reconcile_futures: Set[Future] = set()
        # NOTE: this rebinds the name on the instance, shadowing the method
        # with a generator object; reset() recreates it through type(self).
        self._pubsubs_generator = self._pubsubs_generator()
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        super().__init__(
            connection_pool=connection_pool,
            encoder=redis_cluster.encoder,
            push_handler_func=push_handler_func,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )
        # Subscribe to slots-cache change notifications so shard subscriptions
        # can be reconciled automatically after topology refreshes.
        nm_dispatcher = redis_cluster.nodes_manager._event_dispatcher
        self._slots_cache_listener = ClusterPubSubSlotsCacheListener(self)
        nm_dispatcher.register_listeners(
            {AfterSlotsCacheRefreshEvent: [self._slots_cache_listener]}
        )
        # Deterministic GC-time cleanup so short-lived pubsubs do not leak
        # listeners in the dispatcher when no slots-refresh event ever fires.
        weakref.finalize(
            self,
            _unregister_slots_cache_listener,
            weakref.ref(nm_dispatcher),
            self._slots_cache_listener,
            AfterSlotsCacheRefreshEvent,
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        The pubsub node will be set according to the passed node, host and port
        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.
        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            pubsub_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, node, host, port)
            pubsub_node = node
        elif host or port:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            pubsub_node = None

        self.node = pubsub_node

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        if node is None or redis_cluster.get_node(node_name=node.name) is None:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )

    def execute_command(self, *args):
        """
        Execute a subscribe/unsubscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            if self.connection_pool is None:
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    channel = args[1]
                    slot = self.cluster.keyslot(channel)
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot,
                        self.cluster.read_from_replicas,
                        self.cluster.load_balancing_strategy,
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                redis_connection = self.cluster.get_redis_connection(node)
                self.connection_pool = redis_connection.connection_pool
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _resubscribe_shard_channels(self) -> None:
        """Re-issue SSUBSCRIBE for the tracked shard channels, one call per slot."""
        # A single node can own multiple slot ranges, so a batched
        # ``SSUBSCRIBE`` covering every tracked channel would be rejected by
        # Redis with a ``CROSSSLOT`` error. Group by hash slot and emit one
        # ``SSUBSCRIBE`` per slot.
        by_slot: defaultdict[int, dict] = defaultdict(dict)
        for k, v in self.shard_channels.items():
            by_slot[key_slot(self.encoder.encode(k))][k] = v
        for subscriptions in by_slot.values():
            self._resubscribe(subscriptions, self.ssubscribe)

    def _get_node_pubsub(self, node):
        """
        Return the per-node pubsub for ``node``, creating and caching it in
        ``node_pubsub_mapping`` (keyed by ``node.name``) on first use.
        """
        try:
            return self.node_pubsub_mapping[node.name]
        except KeyError:
            redis_connection = self.cluster.get_redis_connection(node)
            pubsub = redis_connection.pubsub(
                push_handler_func=self.push_handler_func,
            )
            # Replay shard subscriptions on reconnect with slot-aware grouping
            # so that channels spanning multiple slots owned by this node do
            # not trigger a CROSSSLOT error.
            pubsub._resubscribe_shard_channels = MethodType(
                ClusterPubSub._resubscribe_shard_channels, pubsub
            )
            self.node_pubsub_mapping[node.name] = pubsub
            return pubsub

    def _find_node_name_for_pubsub(self, pubsub):
        """
        Return the ``node_pubsub_mapping`` key whose value is ``pubsub``
        (identity comparison), or ``None`` if it is not in the mapping.
        """
        for node_name, node_pubsub in self.node_pubsub_mapping.items():
            if node_pubsub is pubsub:
                return node_name
        return None

    def _sharded_message_generator(self, timeout=0.0):
        """
        Poll per-node pubsubs in round-robin order, at most one poll per
        mapping entry, and return ``(pubsub, message)`` for the first one
        that produces a message, or ``(None, None)`` when none does.
        """
        for _ in range(len(self.node_pubsub_mapping)):
            pubsub = next(self._pubsubs_generator, None)
            if pubsub is None:
                # _pubsubs_generator returns, and then stays exhausted, once
                # it observes an empty mapping. That can happen when the
                # mapping is emptied between the len() above and this call
                # (e.g. by reinitialize_shard_subscriptions, which
                # on_slots_changed schedules on a worker thread). A bare
                # next() would raise StopIteration here and on every later
                # poll, so start a fresh generator and report "no message".
                # type(self) bypasses the instance-level shadowing of the
                # method name established in __init__.
                self._pubsubs_generator = type(self)._pubsubs_generator(self)
                return None, None
            # Don't pass ignore_subscribe_messages here - let get_sharded_message
            # handle the filtering after processing subscription state changes
            message = pubsub.get_message(
                ignore_subscribe_messages=False, timeout=timeout
            )
            if message is not None:
                return pubsub, message
        return None, None

    def _pubsubs_generator(self):
        """
        Yield the per-node pubsubs over and over, re-reading
        ``node_pubsub_mapping`` at the start of each cycle. Finishes when
        a cycle starts with an empty mapping.
        """
        while True:
            current_nodes = list(self.node_pubsub_mapping.values())
            if not current_nodes:
                return  # Avoid infinite loop when no subscriptions exist
            yield from current_nodes

    def get_sharded_message(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    ):
        """
        Fetch the next message from the per-node shard pubsubs.

        When ``target_node`` is given only that node's pubsub is polled;
        otherwise the per-node pubsubs are polled round-robin. Returns the
        message dict, or ``None`` when no message is available or when the
        message is an ssubscribe/sunsubscribe confirmation and either
        ``ignore_subscribe_messages`` flag is set.
        """
        if target_node:
            # Use .get(): migration-driven cleanup in the sunsubscribe branch
            # below and reset() both remove entries from node_pubsub_mapping,
            # so a caller polling with target_node may race the cleanup. Match
            # the async counterpart's None-handling rather than raising
            # KeyError. None pubsub falls through to "no message available".
            pubsub = self.node_pubsub_mapping.get(target_node.name)
            if pubsub is not None:
                # Don't pass ignore_subscribe_messages here - let get_sharded_message
                # handle the filtering after processing subscription state changes
                message = pubsub.get_message(
                    ignore_subscribe_messages=False, timeout=timeout
                )
            else:
                message = None
        else:
            pubsub, message = self._sharded_message_generator(timeout=timeout)
        if message is None:
            return None
        # Only sunsubscribe mutates cluster-level shard state; bypassing the
        # lock on the data-message hot path keeps smessage delivery from
        # competing with the reconciliation worker for _shard_state_lock.
        if str_if_bytes(message["type"]) == "sunsubscribe":
            # Serialize state mutation against reinitialize_shard_subscriptions
            # (worker thread). The blocking get_message above intentionally
            # runs outside the lock so reconciliation is not stalled by long
            # polls.
            with self._shard_state_lock:
                if message["channel"] in self.pending_unsubscribe_shard_channels:
                    # User-initiated sunsubscribe: drop from cluster-level tracking.
                    self.pending_unsubscribe_shard_channels.remove(message["channel"])
                    self.shard_channels.pop(message["channel"], None)
                    self._shard_channel_to_node.pop(message["channel"], None)
                # Drop the per-node pubsub that delivered the confirmation once
                # it no longer holds any shard subscriptions, regardless of
                # whether the sunsubscribe was user-initiated or driven by
                # slot-migration reconciliation (_migrate_shard_channel, which
                # intentionally does not add the channel to
                # pending_unsubscribe_shard_channels). This releases the
                # dedicated connection that would otherwise linger.
                # Identifying the receiving pubsub directly (rather than via
                # the cluster's current slot map) is required after slot
                # migration, where the channel's owner is no longer the node
                # that received our original SSUBSCRIBE.
                if pubsub is not None and not pubsub.subscribed:
                    name = self._find_node_name_for_pubsub(pubsub)
                    if name is not None:
                        try:
                            pubsub.reset()
                        except Exception:
                            # Best-effort teardown of a pubsub that is being
                            # discarded anyway.
                            pass
                        self.node_pubsub_mapping.pop(name, None)
                # Mirror PubSub.handle_message: the empty-check belongs in the
                # unsubscribe branch since that is the only path that can
                # reduce shard_channels here.
                if not self.channels and not self.patterns and not self.shard_channels:
                    self.subscribed_event.clear()
        # Only suppress subscribe/unsubscribe messages, not data messages (smessage)
        if str_if_bytes(message["type"]) in ("ssubscribe", "sunsubscribe"):
            if self.ignore_subscribe_messages or ignore_subscribe_messages:
                return None
        return message

    def ssubscribe(
        self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
    ) -> None:
        """
        Subscribe to shard channels.

        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. ``Subscription`` objects can also be
        supplied positionally with an optional handler.
        """
        s_channels = parse_pubsub_subscriptions(args, kwargs)
        # Serialize against reinitialize_shard_subscriptions (worker thread)
        # so the reverse index, shard_channels, and node_pubsub_mapping are
        # not mutated concurrently.
        with self._shard_state_lock:
            for s_channel, handler in s_channels.items():
                node = self.cluster.get_node_from_key(s_channel)
                if not node:
                    continue
                # Lazy re-route: if this channel is already tracked against a
                # different node (e.g. after a slot migration), migrate it now
                # so the caller's intent is applied on the current owner.
                normalized_key = next(iter(self._normalize_keys({s_channel: None})))
                old_name = self._shard_channel_to_node.get(normalized_key)
                if old_name and old_name != node.name:
                    # Match PubSub.ssubscribe() dict.update() semantics: the
                    # caller's newly supplied handler (including None) always
                    # overrides any previously registered handler.
                    self._migrate_shard_channel(
                        normalized_key,
                        handler,
                        old_name,
                        node,
                    )
                    continue
                pubsub = self._get_node_pubsub(node)
                if handler:
                    pubsub.ssubscribe(Subscription(s_channel, handler))
                else:
                    pubsub.ssubscribe(s_channel)
                self.shard_channels.update(pubsub.shard_channels)
                self._shard_channel_to_node[normalized_key] = node.name
                self.pending_unsubscribe_shard_channels.difference_update(
                    self._normalize_keys({s_channel: None})
                )
                if pubsub.subscribed and not self.subscribed:
                    self.subscribed_event.set()
                    self.health_check_response_counter = 0

    def sunsubscribe(self, *args):
        """
        Unsubscribe from the given shard channels, or from every tracked
        shard channel when called without arguments.

        Channels for which no per-node pubsub can be found are skipped.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        else:
            args = list(self.shard_channels)

        # Serialize against reinitialize_shard_subscriptions: the reverse
        # index and node_pubsub_mapping must not change between the lookup
        # and the per-node sunsubscribe call below.
        with self._shard_state_lock:
            for s_channel in args:
                normalized_key = next(iter(self._normalize_keys({s_channel: None})))
                # Route via the reverse index so we unsubscribe on the node
                # that actually holds the subscription. After a slot migration
                # the cluster's current owner may no longer be that node.
                name = self._shard_channel_to_node.get(normalized_key)
                if name and name in self.node_pubsub_mapping:
                    p = self.node_pubsub_mapping[name]
                else:
                    node = self.cluster.get_node_from_key(s_channel)
                    if not node or node.name not in self.node_pubsub_mapping:
                        continue
                    p = self.node_pubsub_mapping[node.name]
                p.sunsubscribe(s_channel)
                self.pending_unsubscribe_shard_channels.update(
                    p.pending_unsubscribe_shard_channels
                )

    def reinitialize_shard_subscriptions(self):
        """
        Reconcile per-node shard subscriptions against the cluster's current
        slot ownership map. For each tracked shard channel whose owning node
        has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on
        the old node's pubsub and ssubscribe on the new owner's pubsub,
        preserving any registered handler.

        Raises SlotNotCoveredError when at least one channel's slot is not
        covered, and re-raises the first connectivity error when every
        attempted migration in this pass failed.
        """
        uncovered: list = []
        made_progress = False
        first_migrate_error: Optional[BaseException] = None
        with self._shard_state_lock:
            for channel, handler in list(self.shard_channels.items()):
                try:
                    new_node = self.cluster.get_node_from_key(channel)
                except SlotNotCoveredError:
                    # Slot is transiently uncovered (mid-migration / partial
                    # topology refresh). Defer this channel so coverable
                    # siblings still reconcile this pass; we surface the
                    # error below so the caller (and logs) know not every
                    # channel was reconciled. Retry happens on the next
                    # slots-cache change notification.
                    uncovered.append(channel)
                    continue
                old_name = self._shard_channel_to_node.get(channel)
                if old_name == new_node.name:
                    continue
                try:
                    self._migrate_shard_channel(channel, handler, old_name, new_node)
                    made_progress = True
                except (ConnectionError, TimeoutError, OSError) as e:
                    # Transient connectivity error while subscribing on the
                    # new owner (or unsubscribing on the old owner if its
                    # handler chose to re-raise). Do not abort reconciliation
                    # for sibling channels: _shard_channel_to_node was not
                    # advanced for this channel, so the next slots-cache
                    # change notification will retry it.
                    logger.warning(
                        "shard channel %r migration deferred: %s: %s",
                        channel,
                        type(e).__name__,
                        e,
                    )
                    if first_migrate_error is None:
                        first_migrate_error = e
                    continue
            # Garbage-collect per-node pubsubs that no longer hold any
            # subscription so their connections are released.
            for name, pubsub in list(self.node_pubsub_mapping.items()):
                if not pubsub.subscribed:
                    try:
                        pubsub.reset()
                    except Exception:
                        # Best-effort teardown of a pubsub that is being
                        # discarded anyway.
                        pass
                    self.node_pubsub_mapping.pop(name, None)
        if uncovered:
            # Surface the uncovered channels so the caller (and observer
            # notification path) knows reconciliation was incomplete. All
            # coverable siblings have already been migrated above.
            raise SlotNotCoveredError(
                f"{len(uncovered)} shard channel(s) left unreconciled; "
                f"slot(s) not covered by the cluster: {uncovered!r}"
            )
        if first_migrate_error is not None and not made_progress:
            # Every migration attempted in this pass failed transiently and
            # nothing else made progress. Re-raise the first caught error
            # (typically the root cause; later failures are often downstream
            # symptoms of the same unreachable node) so the worker's done-
            # callback surfaces a single representative failure through the
            # same logger channel used for SlotNotCoveredError. Per-channel
            # WARNINGs above preserve the full forensic detail.
            raise first_migrate_error

    def _migrate_shard_channel(self, channel, handler, old_name, new_node):
        """
        Move one shard channel subscription from the pubsub registered under
        ``old_name`` (if any) to the pubsub of ``new_node``, then update the
        cluster-level tracking structures to point at ``new_node``.
        """
        # Detach from the old per-node pubsub, best-effort: the old node may
        # already be unreachable during migration / failover.
        if old_name and old_name in self.node_pubsub_mapping:
            old_pubsub = self.node_pubsub_mapping[old_name]
            try:
                old_pubsub.sunsubscribe(channel)
            except (ConnectionError, TimeoutError, OSError):
                # redis-py's Connection has already called ``disconnect()``
                # before raising (see Connection.read_response /
                # send_packed_command with ``disconnect_on_error=True``),
                # so ``old_pubsub``'s dedicated socket is gone. Two cases:
                #
                # 1. The old node is no longer in the cluster topology
                #    (e.g. removed by failover / topology refresh): no
                #    reconnect target exists, so ``old_pubsub.subscribed``
                #    would stay True forever and the end-of-pass GC block
                #    would skip it. Drop it eagerly so the round-robin
                #    generator does not keep yielding a dead pubsub that
                #    produces periodic errors from ``get_sharded_message``.
                # 2. The old node is still known (transiently slow /
                #    unreachable): ``PubSub._execute`` auto-reconnects and
                #    ``on_connect`` re-subscribes to remaining channels,
                #    so other subscriptions on the same pubsub recover
                #    naturally. Leave it alone.
                if self.cluster.get_node(node_name=old_name) is None:
                    try:
                        old_pubsub.reset()
                    except Exception:
                        # Best-effort teardown of a pubsub that is being
                        # discarded anyway.
                        pass
                    self.node_pubsub_mapping.pop(old_name, None)
        # Attach to the new per-node pubsub, preserving the handler. Decode to
        # a text key only when we must pass it as a kwarg (handler present).
        new_pubsub = self._get_node_pubsub(new_node)
        if handler:
            new_pubsub.ssubscribe(Subscription(channel, handler))
        else:
            new_pubsub.ssubscribe(channel)
        self.shard_channels.update(new_pubsub.shard_channels)
        normalized_key = next(iter(self._normalize_keys({channel: None})))
        self._shard_channel_to_node[normalized_key] = new_node.name
        self.pending_unsubscribe_shard_channels.difference_update(
            self._normalize_keys({channel: None})
        )
        if new_pubsub.subscribed and not self.subscribed:
            self.subscribed_event.set()
            self.health_check_response_counter = 0

    def on_slots_changed(self):
        """
        Schedule ``reinitialize_shard_subscriptions`` on the reconciliation
        executor. Does nothing when no shard channels are tracked.
        """
        # Observer hook invoked by NodesManager after a slots-cache refresh.
        # Schedule reconciliation on a dedicated worker thread so the caller
        # (typically MovedError handling in _execute_command or the topology
        # refresh thread in initialize()) is not blocked on the network I/O
        # performed by reinitialize_shard_subscriptions. Mirrors the async
        # path's asyncio.create_task model. No-op when there are no shard
        # subscriptions to reconcile.
        if not self.shard_channels:
            return
        # Serialize lazy executor creation and submission against concurrent
        # on_slots_changed calls (EventDispatcher releases its lock before
        # invoking listeners, so two MovedError-handling threads can land
        # here at once) and against reset() which tears the executor down.
        # Without this, two threads could each create a ThreadPoolExecutor
        # and one would be orphaned (leaking its worker thread); a reset()
        # interleaved between create and submit() could also raise
        # RuntimeError("cannot schedule new futures after shutdown").
        with self._shard_state_lock:
            if self._reconcile_executor is None:
                self._reconcile_executor = ThreadPoolExecutor(
                    max_workers=1,
                    thread_name_prefix="redis-cluster-pubsub-reconcile",
                )
            future = self._reconcile_executor.submit(
                self.reinitialize_shard_subscriptions
            )
            self._reconcile_futures.add(future)
            future.add_done_callback(self._discard_reconcile_future)
            # Consume the future's exception (if any) so it is not silently lost.
            # reinitialize_shard_subscriptions surfaces SlotNotCoveredError when
            # a slot is still transiently uncovered; route it through the same
            # logger channel as the async path for consistent observability.
            future.add_done_callback(self._log_reconcile_future_exception)

    def _discard_reconcile_future(self, future: "Future") -> None:
        """Done-callback: stop tracking a finished reconciliation future."""
        # Done-callback fires on the worker thread. Take _shard_state_lock so
        # the discard observes the same mutual-exclusion discipline as the
        # add() / clear() sites; without it the set mutation is correct only
        # because of CPython's GIL and would race under free-threaded builds.
        with self._shard_state_lock:
            self._reconcile_futures.discard(future)

    @staticmethod
    def _log_reconcile_future_exception(future: "Future") -> None:
        """Done-callback: log the exception of a failed, non-cancelled future."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error(
                "shard subscription reconciliation failed: %r", exc, exc_info=exc
            )

    def reset(self) -> None:
        """
        Shut down the reconciliation executor, reset and drop every per-node
        pubsub, recreate the round-robin generator, run ``PubSub.reset()``
        and clear the channel-to-node reverse index.
        """
        # Hold _shard_state_lock across the entire teardown so it observes
        # the same mutual-exclusion discipline as ssubscribe / sunsubscribe /
        # get_sharded_message / reinitialize_shard_subscriptions, which all
        # mutate shard_channels, _shard_channel_to_node, and
        # node_pubsub_mapping under this lock. Without it, super().reset()
        # rebinds shard_channels and pending_unsubscribe_shard_channels in
        # parallel with a concurrent user-thread mutation, silently dropping
        # subscription intent. cancel_futures drops queued reconciliation
        # work; the currently-running task (if any) is already serialized
        # against us by this same lock - shutdown(wait=False) avoids waiting
        # on the worker thread's join, not on its critical section.
        with self._shard_state_lock:
            if self._reconcile_executor is not None:
                self._reconcile_executor.shutdown(wait=False, cancel_futures=True)
                self._reconcile_executor = None
            self._reconcile_futures.clear()
            # Tear down per-node pubsubs (parity with async aclose) so they
            # don't leak their dedicated connections and don't replay stale
            # shard_channels via PubSub.on_connect on a subsequent reconnect.
            # Errors are swallowed because reset() is also a fallback path
            # from __del__; we cannot let one buggy per-node pubsub mask the
            # rest of the teardown.
            for pubsub in self.node_pubsub_mapping.values():
                try:
                    pubsub.reset()
                except Exception:
                    pass
            # Drop the now-dead per-node pubsubs from the mapping so the
            # round-robin in _pubsubs_generator / _sharded_message_generator
            # cannot yield them between teardown and re-subscription.
            self.node_pubsub_mapping.clear()
            # _pubsubs_generator captures node_pubsub_mapping.values() into
            # a local list inside ``yield from``; clearing the mapping does
            # not reach references already held by that captured snapshot,
            # so a generator suspended mid-yield-from would still surface
            # the now-reset() per-node pubsubs after re-subscription.
            # Recreate it to drop the captured list. type(self) bypasses
            # the instance-level self-shadow established at __init__
            # (self._pubsubs_generator = self._pubsubs_generator()).
            self._pubsubs_generator = type(self)._pubsubs_generator(self)
            super().reset()
            self._shard_channel_to_node = {}

    def get_redis_connection(self):
        """
        Get the Redis connection of the pubsub connected node.

        Returns ``None`` when no pubsub node has been selected yet.
        """
        if self.node is not None:
            return self.node.redis_connection
        return None

    def disconnect(self):
        """
        Disconnect the pubsub connection.

        Also disconnects the connection of every per-node shard pubsub that
        currently has one.
        """
        if self.connection:
            self.connection.disconnect()
        # Iterate a snapshot: entries can be removed from node_pubsub_mapping
        # by the reconciliation worker while this loop runs, and iterating
        # the live dict view would then raise RuntimeError.
        for pubsub in tuple(self.node_pubsub_mapping.values()):
            if pubsub.connection:
                pubsub.connection.disconnect()

3377 

3378 

class ClusterPipeline(RedisCluster):
    """
    Support for Redis pipeline
    in cluster mode

    The pipeline is a thin facade: every operation is forwarded to the
    execution strategy selected in ``__init__`` (``PipelineStrategy`` by
    default, ``TransactionStrategy`` when ``transaction`` is true).
    """

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        MovedError,
        AskError,
        TryAgainError,
    )

    NO_SLOTS_COMMANDS = {"UNWATCH"}
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        cluster_error_retry_attempts: int = DEFAULT_RETRY_COUNT,
        reinitialize_steps: int = 5,
        retry: Optional[Retry] = None,
        lock=None,
        transaction=False,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        Set up the pipeline state and pick its execution strategy.

        :param retry: retry policy to use; when ``None`` a ``Retry`` with
            exponential-with-jitter backoff and
            ``cluster_error_retry_attempts`` retries is created.
        :param lock: lock stored on the pipeline; a new ``threading.RLock``
            is created when ``None``.
        :param transaction: when true the pipeline uses
            ``TransactionStrategy``, otherwise ``PipelineStrategy``.
        :param kwargs: only ``encoding``, ``encoding_errors`` and
            ``decode_responses`` are read here, to build the ``Encoder``.
        """
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.cluster_response_callbacks = cluster_response_callbacks
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if retry is not None:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(
                    base=DEFAULT_RETRY_BASE, cap=DEFAULT_RETRY_CAP
                ),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        # Keep a handle on the non-pipelined execute_command of the parent
        # class; execute_command is overridden below to queue commands.
        self.parent_execute_command = super().execute_command
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self) if not transaction else TransactionStrategy(self)
        )

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver

        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

    def __repr__(self):
        """Return the class name of the pipeline."""
        return f"{type(self).__name__}"

    def __enter__(self):
        """Enter the context manager; the pipeline itself is the target."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Reset the pipeline when the ``with`` block ends."""
        self.reset()

    def __del__(self):
        # Best-effort cleanup: errors are ignored because __del__ may run on
        # a partially initialised object.
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self):
        """Return the number of commands currently queued."""
        return len(self._execution_strategy.command_queue)

    def __bool__(self):
        "Pipeline instances should always evaluate to True on Python 3+"
        return True

    def execute_command(self, *args, **kwargs):
        """
        Hand the command to the active execution strategy.
        """
        return self._execution_strategy.execute_command(*args, **kwargs)

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        return self._execution_strategy.execute_command(*args, **options)

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        self._execution_strategy.annotate_exception(exception, number, command)

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Execute all the commands in the current pipeline

        The pipeline is reset afterwards, whether or not execution raised.
        """

        try:
            return self._execution_strategy.execute(raise_on_error)
        finally:
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._execution_strategy.reset()

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """Send ``stack`` through the active execution strategy."""
        return self._execution_strategy.send_cluster_commands(
            stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
        )

    def exists(self, *keys):
        """Forward EXISTS for ``keys`` to the active execution strategy."""
        return self._execution_strategy.exists(*keys)

    def eval(self):
        """Forward to the active execution strategy's ``eval()``."""
        return self._execution_strategy.eval()

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        self._execution_strategy.multi()

    def load_scripts(self):
        """Forward to the active execution strategy's ``load_scripts()``."""
        self._execution_strategy.load_scripts()

    def discard(self):
        """Forward to the active execution strategy's ``discard()``."""
        self._execution_strategy.discard()

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        # Propagate the strategy's result instead of silently dropping it.
        return self._execution_strategy.watch(*names)

    def unwatch(self):
        """Unwatches all previously specified keys"""
        return self._execution_strategy.unwatch()

    def script_load_for_pipeline(self, *args, **kwargs):
        """Forward to the strategy's ``script_load_for_pipeline()``."""
        self._execution_strategy.script_load_for_pipeline(*args, **kwargs)

    def delete(self, *names):
        """
        Queue deletion of the key given in ``names``.

        Returns whatever the strategy returns, so the call can be chained
        like the other pipelined commands.
        """
        return self._execution_strategy.delete(*names)

    def unlink(self, *names):
        """
        Queue unlinking of the key given in ``names``.

        Returns whatever the strategy returns, so the call can be chained
        like the other pipelined commands.
        """
        return self._execution_strategy.unlink(*names)

3609 

3610 

def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Build a stand-in for a command that must not be pipelined in
    cluster mode.

    The returned callable accepts any arguments and always raises
    ``RedisClusterException`` naming the blocked function.
    """

    def _reject(*args, **kwargs):
        message = (
            f"ERROR: Calling pipelined function {name} is blocked "
            "when running redis in cluster mode..."
        )
        raise RedisClusterException(message)

    return _reject

3624 

3625 

# Commands that cannot be queued on a cluster pipeline.
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSETEX",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)

# Shadow every blocked command on ClusterPipeline with a stub that raises.
# The attribute name is the command lower-cased with spaces replaced by
# underscores, e.g. "CONFIG GET" -> "config_get".
for command in (
    raw_name.replace(" ", "_").lower() for raw_name in PIPELINE_BLOCKED_COMMANDS
):
    setattr(ClusterPipeline, command, block_pipeline_command(command))

3703 

3704 

class PipelineCommand:
    """
    A single queued pipeline command together with its execution state.
    """

    def __init__(self, args, options=None, position=None):
        # What was queued: the command arguments, its keyword options and
        # its index in the pipeline.
        self.args = args
        self.options = {} if options is None else options
        self.position = position
        # Execution state, filled in while the pipeline runs.
        self.result = None
        self.node = None
        self.asking = False
        self.command_policies: Optional[CommandPolicies] = None

3718 

3719 

class NodeCommands:
    """
    The batch of pipeline commands bound for one node, together with the
    connection (and its pool) used to send them.
    """

    def __init__(
        self, parse_response, connection_pool: ConnectionPool, connection: Connection
    ):
        """Store the response parser, pool and connection; start empty."""
        self.parse_response = parse_response
        self.connection_pool = connection_pool
        self.connection = connection
        self.commands = []

    def append(self, c):
        """Add command ``c`` to this node's batch."""
        self.commands.append(c)

    def write(self):
        """
        Send the whole batch to the node in a single packed request.

        A connection or timeout error is not raised; it is stored as the
        result of every command in the batch.
        """
        conn = self.connection
        batch = self.commands

        # Results are about to be overwritten, so clear whatever a previous
        # run may have left behind.
        for queued in batch:
            queued.result = None

        # Pack everything into one request to cut network round trips.
        try:
            conn.send_packed_command(
                conn.pack_commands([queued.args for queued in batch])
            )
        except (ConnectionError, TimeoutError) as exc:
            for queued in batch:
                queued.result = exc

    def read(self):
        """
        Read one response per command, in the order they were written.

        Commands that already carry a result are skipped: ``write()`` stored
        an error on them, and parsing a response from a connection that is
        no longer open can fail in unpredictable ways. A connection or
        timeout error while reading is stored on every command of the batch
        and reading stops; any other ``RedisError`` is stored on the command
        that caused it.
        """
        conn = self.connection
        for queued in self.commands:
            if queued.result is not None:
                continue
            try:
                queued.result = self.parse_response(
                    conn, queued.args[0], **queued.options
                )
            except (ConnectionError, TimeoutError) as exc:
                for affected in self.commands:
                    affected.result = exc
                return
            except RedisError as exc:
                queued.result = exc

3789 

3790 

class ExecutionStrategy(ABC):
    """
    Interface every pipeline execution strategy has to implement.
    """

    @property
    @abstractmethod
    def command_queue(self):
        """Commands currently queued by this strategy."""

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        """
        Execution flow for current execution strategy.

        See: ClusterPipeline.execute_command()
        """

    @abstractmethod
    def annotate_exception(self, exception, number, command):
        """
        Annotate exception according to current execution strategy.

        See: ClusterPipeline.annotate_exception()
        """

    @abstractmethod
    def pipeline_execute_command(self, *args, **options):
        """
        Pipeline execution flow for current execution strategy.

        See: ClusterPipeline.pipeline_execute_command()
        """

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Executes current execution strategy.

        See: ClusterPipeline.execute()
        """

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Sends commands according to current execution strategy.

        See: ClusterPipeline.send_cluster_commands()
        """

    @abstractmethod
    def reset(self):
        """
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        """

    @abstractmethod
    def exists(self, *keys):
        """See: ClusterPipeline.exists()"""

    @abstractmethod
    def eval(self):
        """See: ClusterPipeline.eval()"""

    @abstractmethod
    def multi(self):
        """
        Starts transactional context.

        See: ClusterPipeline.multi()
        """

    @abstractmethod
    def load_scripts(self):
        """See: ClusterPipeline.load_scripts()"""

    @abstractmethod
    def watch(self, *names):
        """See: ClusterPipeline.watch()"""

    @abstractmethod
    def unwatch(self):
        """
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        """

    @abstractmethod
    def script_load_for_pipeline(self, *args, **kwargs):
        """See: ClusterPipeline.script_load_for_pipeline()"""

    @abstractmethod
    def delete(self, *names):
        """
        Delete a key specified by ``names``

        See: ClusterPipeline.delete()
        """

    @abstractmethod
    def unlink(self, *names):
        """
        Unlink a key specified by ``names``

        See: ClusterPipeline.unlink()
        """

    @abstractmethod
    def discard(self):
        """See: ClusterPipeline.discard()"""

3912 

3913 

class AbstractStrategy(ExecutionStrategy):
    """
    Shared base for the pipeline execution strategies.

    Owns the command queue and implements the operations that do not depend
    on how the queue is eventually executed.
    """

    def __init__(
        self,
        pipe: ClusterPipeline,
    ):
        """Bind the strategy to ``pipe`` and start with an empty queue."""
        self._command_queue: List[PipelineCommand] = []
        self._pipe = pipe
        self._nodes_manager = self._pipe.nodes_manager

    @property
    def command_queue(self):
        """Commands queued so far, in the order they were added."""
        return self._command_queue

    @command_queue.setter
    def command_queue(self, queue: List[PipelineCommand]):
        self._command_queue = queue

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        pass

    def pipeline_execute_command(self, *args, **options):
        """
        Append the command to the queue and return the owning pipeline.

        The command's position is its index in the queue at the time it is
        added.
        """
        self._command_queue.append(
            PipelineCommand(args, options, len(self._command_queue))
        )
        return self._pipe

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        pass

    @abstractmethod
    def reset(self):
        pass

    def exists(self, *keys):
        """Run EXISTS for ``keys`` through ``execute_command``."""
        return self.execute_command("EXISTS", *keys)

    def eval(self):
        """Not supported; always raises ``RedisClusterException``."""
        raise RedisClusterException("method eval() is not implemented")

    def load_scripts(self):
        """Not supported; always raises ``RedisClusterException``."""
        raise RedisClusterException("method load_scripts() is not implemented")

    def script_load_for_pipeline(self, *args, **kwargs):
        """Not supported; always raises ``RedisClusterException``."""
        raise RedisClusterException(
            "method script_load_for_pipeline() is not implemented"
        )

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled

        The first element of ``exception.args`` is replaced by a message
        naming the command number and the (truncated) command text; any
        further arguments are kept.
        """
        cmd = " ".join(map(safe_str, command))
        # An exception raised without arguments has an empty ``args`` tuple;
        # indexing it would raise IndexError and hide the real failure.
        original_message = exception.args[0] if exception.args else ""
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {original_message}"
        )
        exception.args = (msg,) + exception.args[1:]

3982 

3983 

3984class PipelineStrategy(AbstractStrategy): 

3985 def __init__(self, pipe: ClusterPipeline): 

3986 super().__init__(pipe) 

3987 self.command_flags = pipe.command_flags 

3988 

3989 def execute_command(self, *args, **kwargs): 

3990 return self.pipeline_execute_command(*args, **kwargs) 

3991 

3992 def _raise_first_error(self, stack, start_time): 

3993 """ 

3994 Raise the first exception on the stack 

3995 """ 

3996 for c in stack: 

3997 r = c.result 

3998 if isinstance(r, Exception): 

3999 self.annotate_exception(r, c.position + 1, c.args) 

4000 

4001 record_operation_duration( 

4002 command_name="PIPELINE", 

4003 duration_seconds=time.monotonic() - start_time, 

4004 error=r, 

4005 ) 

4006 

4007 raise r 

4008 

4009 def execute(self, raise_on_error: bool = True) -> List[Any]: 

4010 stack = self._command_queue 

4011 if not stack: 

4012 return [] 

4013 

4014 try: 

4015 return self.send_cluster_commands(stack, raise_on_error) 

4016 finally: 

4017 self.reset() 

4018 

4019 def reset(self): 

4020 """ 

4021 Reset back to empty pipeline. 

4022 """ 

4023 self._command_queue = [] 

4024 

4025 def send_cluster_commands( 

4026 self, stack, raise_on_error=True, allow_redirections=True 

4027 ): 

4028 """ 

4029 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling. 

4030 

4031 If one of the retryable exceptions has been thrown we assume that: 

4032 - connection_pool was disconnected 

4033 - connection_pool was reset 

4034 - refresh_table_asap set to True 

4035 

4036 It will try the number of times specified by 

4037 the retries in config option "self.retry" 

4038 which defaults to 10 unless manually configured. 

4039 

4040 If it reaches the number of times, the command will 

4041 raises ClusterDownException. 

4042 """ 

4043 if not stack: 

4044 return [] 

4045 retry_attempts = self._pipe.retry.get_retries() 

4046 while True: 

4047 try: 

4048 return self._send_cluster_commands( 

4049 stack, 

4050 raise_on_error=raise_on_error, 

4051 allow_redirections=allow_redirections, 

4052 ) 

4053 except RedisCluster.ERRORS_ALLOW_RETRY as e: 

4054 if retry_attempts > 0: 

4055 # Try again with the new cluster setup. All other errors 

4056 # should be raised. 

4057 retry_attempts -= 1 

4058 pass 

4059 else: 

4060 raise e 

4061 

4062 def _send_cluster_commands( 

4063 self, stack, raise_on_error=True, allow_redirections=True 

4064 ): 

4065 """ 

4066 Send a bunch of cluster commands to the redis cluster. 

4067 

4068 `allow_redirections` If the pipeline should follow 

4069 `ASK` & `MOVED` responses automatically. If set 

4070 to false it will raise RedisClusterException. 

4071 """ 

4072 # the first time sending the commands we send all of 

4073 # the commands that were queued up. 

4074 # if we have to run through it again, we only retry 

4075 # the commands that failed. 

4076 attempt = sorted(stack, key=lambda x: x.position) 

4077 is_default_node = False 

4078 # build a list of node objects based on node names we need to 

4079 nodes: dict[str, NodeCommands] = {} 

4080 nodes_written = 0 

4081 nodes_read = 0 

4082 

4083 try: 

4084 # as we move through each command that still needs to be processed, 

4085 # we figure out the slot number that command maps to, then from 

4086 # the slot determine the node. 

4087 for c in attempt: 

4088 command_policies = self._pipe._policy_resolver.resolve( 

4089 c.args[0].lower() 

4090 ) 

4091 # refer to our internal node -> slot table that 

4092 # tells us where a given command should route to. 

4093 # (it might be possible we have a cached node that no longer 

4094 # exists in the cluster, which is why we do this in a loop) 

4095 passed_targets = c.options.pop("target_nodes", None) 

4096 if passed_targets and not self._is_nodes_flag(passed_targets): 

4097 target_nodes = self._parse_target_nodes(passed_targets) 

4098 

4099 if not command_policies: 

4100 command_policies = CommandPolicies() 

4101 else: 

4102 if not command_policies: 

4103 command = c.args[0].upper() 

4104 if ( 

4105 len(c.args) >= 2 

4106 and f"{c.args[0]} {c.args[1]}".upper() 

4107 in self._pipe.command_flags 

4108 ): 

4109 command = f"{c.args[0]} {c.args[1]}".upper() 

4110 

4111 # We only could resolve key properties if command is not 

4112 # in a list of pre-defined request policies 

4113 command_flag = self.command_flags.get(command) 

4114 if not command_flag: 

4115 # Fallback to default policy 

4116 if not self._pipe.get_default_node(): 

4117 keys = None 

4118 else: 

4119 keys = self._pipe._get_command_keys(*c.args) 

4120 if not keys or len(keys) == 0: 

4121 command_policies = CommandPolicies() 

4122 else: 

4123 command_policies = CommandPolicies( 

4124 request_policy=RequestPolicy.DEFAULT_KEYED, 

4125 response_policy=ResponsePolicy.DEFAULT_KEYED, 

4126 ) 

4127 else: 

4128 if command_flag in self._pipe._command_flags_mapping: 

4129 command_policies = CommandPolicies( 

4130 request_policy=self._pipe._command_flags_mapping[ 

4131 command_flag 

4132 ] 

4133 ) 

4134 else: 

4135 command_policies = CommandPolicies() 

4136 

4137 target_nodes = self._determine_nodes( 

4138 *c.args, 

4139 request_policy=command_policies.request_policy, 

4140 node_flag=passed_targets, 

4141 ) 

4142 if not target_nodes: 

4143 raise RedisClusterException( 

4144 f"No targets were found to execute {c.args} command on" 

4145 ) 

4146 c.command_policies = command_policies 

4147 if len(target_nodes) > 1: 

4148 raise RedisClusterException( 

4149 f"Too many targets for command {c.args}" 

4150 ) 

4151 

4152 node = target_nodes[0] 

4153 if node == self._pipe.get_default_node(): 

4154 is_default_node = True 

4155 

4156 # now that we know the name of the node 

4157 # ( it's just a string in the form of host:port ) 

4158 # we can build a list of commands for each node. 

4159 node_name = node.name 

4160 if node_name not in nodes: 

4161 redis_node = self._pipe.get_redis_connection(node) 

4162 try: 

4163 connection = get_connection(redis_node) 

4164 except (ConnectionError, TimeoutError): 

4165 # Release any connections we've already acquired before clearing nodes 

4166 for n in nodes.values(): 

4167 n.connection_pool.release(n.connection) 

4168 # Connection retries are being handled in the node's 

4169 # Retry object. Reinitialize the node -> slot table. 

4170 self._nodes_manager.initialize() 

4171 if is_default_node: 

4172 self._pipe.replace_default_node() 

4173 nodes = {} 

4174 raise 

4175 nodes[node_name] = NodeCommands( 

4176 redis_node.parse_response, 

4177 redis_node.connection_pool, 

4178 connection, 

4179 ) 

4180 nodes[node_name].append(c) 

4181 

4182 # send the commands in sequence. 

4183 # we write to all the open sockets for each node first, 

4184 # before reading anything 

4185 # this allows us to flush all the requests out across the 

4186 # network 

4187 # so that we can read them from different sockets as they come back. 

4188 # we don't multiplex on the sockets as they come available, 

4189 # but that shouldn't make too much difference. 

4190 

4191 # Start timing for observability 

4192 start_time = time.monotonic() 

4193 

4194 node_commands = nodes.values() 

4195 for n in node_commands: 

4196 nodes_written += 1 

4197 n.write() 

4198 

4199 for n in node_commands: 

4200 n.read() 

4201 

4202 # Find the first error in this node's commands, if any 

4203 node_error = None 

4204 for cmd in n.commands: 

4205 if isinstance(cmd.result, Exception): 

4206 node_error = cmd.result 

4207 break 

4208 

4209 record_operation_duration( 

4210 command_name="PIPELINE", 

4211 duration_seconds=time.monotonic() - start_time, 

4212 server_address=n.connection.host, 

4213 server_port=n.connection.port, 

4214 db_namespace=str(n.connection.db), 

4215 error=node_error, 

4216 ) 

4217 nodes_read += 1 

4218 finally: 

4219 # release all the redis connections we allocated earlier 

4220 # back into the connection pool. 

4221 # if the connection is dirty (that is: we've written 

4222 # commands to it, but haven't read the responses), we need 

4223 # to close the connection before returning it to the pool. 

4224 # otherwise, the next caller to use this connection will 

4225 # read the response from _this_ request, not its own request. 

4226 # disconnecting discards the dirty state & forces the next 

4227 # caller to reconnect. 

4228 # NOTE: dicts have a consistent ordering; we're iterating 

4229 # through nodes.values() in the same order as we are when 

4230 # reading / writing to the connections above, which is critical 

4231 # for how we're using the nodes_written/nodes_read offsets. 

4232 for i, n in enumerate(nodes.values()): 

4233 if i < nodes_written and i >= nodes_read: 

4234 n.connection.disconnect() 

4235 n.connection_pool.release(n.connection) 

4236 

4237 # if the response isn't an exception it is a 

4238 # valid response from the node 

4239 # we're all done with that command, YAY! 

4240 # if we have more commands to attempt, we've run into problems. 

4241 # collect all the commands we are allowed to retry. 

4242 # (MOVED, ASK, or connection errors or timeout errors) 

4243 attempt = sorted( 

4244 ( 

4245 c 

4246 for c in attempt 

4247 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) 

4248 ), 

4249 key=lambda x: x.position, 

4250 ) 

4251 if attempt and allow_redirections: 

4252 # RETRY MAGIC HAPPENS HERE! 

4253 # send these remaining commands one at a time using `execute_command` 

4254 # in the main client. This keeps our retry logic 

4255 # in one place mostly, 

4256 # and allows us to be more confident in correctness of behavior. 

4257 # at this point any speed gains from pipelining have been lost 

4258 # anyway, so we might as well make the best 

4259 # attempt to get the correct behavior. 

4260 # 

4261 # The client command will handle retries for each 

4262 # individual command sequentially as we pass each 

4263 # one into `execute_command`. Any exceptions 

4264 # that bubble out should only appear once all 

4265 # retries have been exhausted. 

4266 # 

4267 # If a lot of commands have failed, we'll be setting the 

4268 # flag to rebuild the slots table from scratch. 

4269 # So MOVED errors should correct themselves fairly quickly. 

4270 self._pipe.reinitialize_counter += 1 

4271 if self._pipe._should_reinitialized(): 

4272 self._nodes_manager.initialize() 

4273 if is_default_node: 

4274 self._pipe.replace_default_node() 

4275 for c in attempt: 

4276 try: 

4277 # send each command individually like we 

4278 # do in the main client. 

4279 c.result = self._pipe.parent_execute_command(*c.args, **c.options) 

4280 except RedisError as e: 

4281 c.result = e 

4282 

4283 # turn the response back into a simple flat array that corresponds 

4284 # to the sequence of commands issued in the stack in pipeline.execute() 

4285 response = [] 

4286 for c in sorted(stack, key=lambda x: x.position): 

4287 if c.args[0] in self._pipe.cluster_response_callbacks: 

4288 # Remove keys entry, it needs only for cache. 

4289 c.options.pop("keys", None) 

4290 c.result = self._pipe._policies_callback_mapping[ 

4291 c.command_policies.response_policy 

4292 ]( 

4293 self._pipe.cluster_response_callbacks[c.args[0]]( 

4294 c.result, **c.options 

4295 ) 

4296 ) 

4297 response.append(c.result) 

4298 

4299 if raise_on_error: 

4300 self._raise_first_error(stack, start_time) 

4301 

4302 return response 

4303 

4304 def _is_nodes_flag(self, target_nodes): 

4305 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags 

4306 

4307 def _parse_target_nodes(self, target_nodes): 

4308 if isinstance(target_nodes, list): 

4309 nodes = target_nodes 

4310 elif isinstance(target_nodes, ClusterNode): 

4311 # Supports passing a single ClusterNode as a variable 

4312 nodes = [target_nodes] 

4313 elif isinstance(target_nodes, dict): 

4314 # Supports dictionaries of the format {node_name: node}. 

4315 # It enables to execute commands with multi nodes as follows: 

4316 # rc.cluster_save_config(rc.get_primaries()) 

4317 nodes = target_nodes.values() 

4318 else: 

4319 raise TypeError( 

4320 "target_nodes type can be one of the following: " 

4321 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," 

4322 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " 

4323 f"The passed type is {type(target_nodes)}" 

4324 ) 

4325 return nodes 

4326 

4327 def _determine_nodes( 

4328 self, *args, request_policy: RequestPolicy, **kwargs 

4329 ) -> List["ClusterNode"]: 

4330 # Determine which nodes should be executed the command on. 

4331 # Returns a list of target nodes. 

4332 command = args[0].upper() 

4333 if ( 

4334 len(args) >= 2 

4335 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags 

4336 ): 

4337 command = f"{args[0]} {args[1]}".upper() 

4338 

4339 nodes_flag = kwargs.pop("nodes_flag", None) 

4340 if nodes_flag is not None: 

4341 # nodes flag passed by the user 

4342 command_flag = nodes_flag 

4343 else: 

4344 # get the nodes group for this command if it was predefined 

4345 command_flag = self._pipe.command_flags.get(command) 

4346 

4347 if command_flag in self._pipe._command_flags_mapping: 

4348 request_policy = self._pipe._command_flags_mapping[command_flag] 

4349 

4350 policy_callback = self._pipe._policies_callback_mapping[request_policy] 

4351 

4352 if request_policy == RequestPolicy.DEFAULT_KEYED: 

4353 nodes = policy_callback(command, *args) 

4354 elif request_policy == RequestPolicy.MULTI_SHARD: 

4355 nodes = policy_callback(*args, **kwargs) 

4356 elif request_policy == RequestPolicy.DEFAULT_KEYLESS: 

4357 nodes = policy_callback(args[0]) 

4358 else: 

4359 nodes = policy_callback() 

4360 

4361 if args[0].lower() == "ft.aggregate": 

4362 self._aggregate_nodes = nodes 

4363 

4364 return nodes 

4365 

4366 def multi(self): 

4367 raise RedisClusterException( 

4368 "method multi() is not supported outside of transactional context" 

4369 ) 

4370 

4371 def discard(self): 

4372 raise RedisClusterException( 

4373 "method discard() is not supported outside of transactional context" 

4374 ) 

4375 

4376 def watch(self, *names): 

4377 raise RedisClusterException( 

4378 "method watch() is not supported outside of transactional context" 

4379 ) 

4380 

def unwatch(self, *names):
    """Always raises: ``unwatch()`` is only available in a transactional context."""
    message = "method unwatch() is not supported outside of transactional context"
    raise RedisClusterException(message)

4385 

def delete(self, *names):
    """
    Queue a ``DEL`` for exactly one key.

    :raises RedisClusterException: when zero or more than one key is given
    """
    if len(names) == 1:
        (key,) = names
        return self.execute_command("DEL", key)

    raise RedisClusterException(
        "deleting multiple keys is not implemented in pipeline command"
    )

4393 

def unlink(self, *names):
    """
    Queue an ``UNLINK`` for exactly one key.

    :raises RedisClusterException: when zero or more than one key is given
    """
    if len(names) == 1:
        (key,) = names
        return self.execute_command("UNLINK", key)

    raise RedisClusterException(
        "unlinking multiple keys is not implemented in pipeline command"
    )

4401 

4402 

class TransactionStrategy(AbstractStrategy):
    # Commands for which no slot is computed or required.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands sent right away instead of being queued.
    IMMEDIATE_EXECUTE_COMMANDS = {"UNWATCH", "WATCH"}
    # Commands after whose response the strategy stops considering itself
    # to be watching.
    UNWATCH_COMMANDS = {"UNWATCH", "EXEC", "DISCARD"}
    # Redirection errors, added to the errors the transaction retry handles.
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    # Errors treated as connection/cluster failures by this strategy.
    CONNECTION_ERRORS = (ConnectionError, OSError, ClusterDownError, SlotNotCoveredError)

4414 

def __init__(self, pipe: ClusterPipeline):
    """
    Set up an idle transaction strategy bound to ``pipe``.

    The retry object is a copy of the pipeline's, extended so that it also
    handles ``RedisCluster.ERRORS_ALLOW_RETRY`` and the slot redirect errors.
    """
    super().__init__(pipe)

    # Transaction state: nothing queued, watched or executing yet.
    self._executing = False
    self._watching = False
    self._explicit_transaction = False
    self._transaction_connection: Optional[Connection] = None
    self._pipeline_slots: Set[int] = set()

    # Work on a copy so the pipeline's own retry object is left untouched.
    retry = copy(self._pipe.retry)
    self._retry = retry
    retry.update_supported_errors(
        RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
    )

4426 

def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
    """
    Find a connection for a pipeline transaction.

    For running an atomic transaction, watch keys ensure that contents have not been
    altered as long as the watch commands for those keys were sent over the same
    connection. So once we start watching a key, we fetch a connection to the
    node that owns that slot and reuse it.

    If a connection is already held but no longer belongs to the pool of the
    node that currently serves the slot, it is handed back to its previous
    owner (when that owner can still be found) and a new one is fetched.

    :return: ``(redis_node, connection)`` for the node serving the slot
    :raises RedisClusterException: when no slot has been recorded yet
    """
    if not self._pipeline_slots:
        raise RedisClusterException(
            "At least a command with a key is needed to identify a node"
        )

    slot = next(iter(self._pipeline_slots))
    node: ClusterNode = self._nodes_manager.get_node_from_slot(slot, False)
    redis_node: Redis = self._pipe.get_redis_connection(node)

    held = self._transaction_connection
    if held and not redis_node.connection_pool.owns_connection(held):
        previous_node = self._nodes_manager.find_connection_owner(held)
        # The owner lookup may find nothing, and the pool hangs off the
        # owner's ``redis_connection`` -- same handling as the other
        # ``find_connection_owner`` call sites of this strategy.
        if previous_node and previous_node.redis_connection:
            previous_node.redis_connection.connection_pool.release(held)
        self._transaction_connection = None

    if not self._transaction_connection:
        self._transaction_connection = get_connection(redis_node)

    return redis_node, self._transaction_connection

4459 

def execute_command(self, *args, **kwargs):
    """
    Send a command immediately or queue it, recording its slot.

    A command is executed immediately when no explicit transaction has been
    started and either the strategy is watching or the command is one of
    ``IMMEDIATE_EXECUTE_COMMANDS``; otherwise it is queued through
    ``pipeline_execute_command``.

    :raises CrossSlotTransactionError: an immediate command targets a slot
        other than the one(s) already recorded
    :raises RedisClusterException: an immediate command has no slot and is
        not listed in ``NO_SLOTS_COMMANDS``
    """
    command = args[0]

    slot_number: Optional[int] = None
    if command not in ClusterPipeline.NO_SLOTS_COMMANDS:
        slot_number = self._pipe.determine_slot(*args)

    run_immediately = (
        self._watching or command in self.IMMEDIATE_EXECUTE_COMMANDS
    ) and not self._explicit_transaction

    if not run_immediately:
        # Queued path: just remember the slot and buffer the command.
        if slot_number is not None:
            self._pipeline_slots.add(slot_number)
        return self.pipeline_execute_command(*args, **kwargs)

    if command == "WATCH":
        self._validate_watch()

    if slot_number is None:
        if command not in self.NO_SLOTS_COMMANDS:
            raise RedisClusterException(
                f"Cannot identify slot number for command: {args[0]},"
                "it cannot be triggered in a transaction"
            )
    else:
        known_slots = self._pipeline_slots
        if known_slots and slot_number not in known_slots:
            raise CrossSlotTransactionError(
                "Cannot watch or send commands on different slots"
            )
        known_slots.add(slot_number)

    return self._immediate_execute_command(*args, **kwargs)

4490 

def _validate_watch(self):
    """
    Mark the strategy as watching.

    :raises RedisError: when an explicit transaction has already been started
    """
    if not self._explicit_transaction:
        self._watching = True
        return

    raise RedisError("Cannot issue a WATCH after a MULTI")

4496 

4497 def _immediate_execute_command(self, *args, **options): 

4498 return self._retry.call_with_retry( 

4499 lambda: self._get_connection_and_send_command(*args, **options), 

4500 self._reinitialize_on_error, 

4501 with_failure_count=True, 

4502 ) 

4503 

def _get_connection_and_send_command(self, *args, **options):
    """
    Send one command over the transaction connection and record its duration.

    The operation duration is recorded both on success and on failure. On
    failure the connection is also attached to the exception as
    ``connection`` before it is re-raised.

    :return: the parsed response of the command
    """
    redis_node, connection = self._get_client_and_connection_for_transaction()

    # Start timing for observability
    started_at = time.monotonic()

    def _record(**extra):
        record_operation_duration(
            command_name=args[0],
            duration_seconds=time.monotonic() - started_at,
            server_address=connection.host,
            server_port=connection.port,
            db_namespace=str(connection.db),
            **extra,
        )

    try:
        response = self._send_command_parse_response(
            connection, redis_node, args[0], *args, **options
        )
        _record()
        return response
    except Exception as exc:
        if connection:
            # this is used to report the metrics based on host and port info
            exc.connection = connection
            _record(error=exc)
        raise

4537 

def _send_command_parse_response(
    self, conn, redis_node: Redis, command_name, *args, **options
):
    """
    Send ``args`` over ``conn`` and return the response parsed by ``redis_node``.

    When ``command_name`` is one of ``UNWATCH_COMMANDS`` the strategy stops
    considering itself to be watching.
    """
    conn.send_command(*args)
    parsed = redis_node.parse_response(conn, command_name, **options)

    ends_watch = command_name in self.UNWATCH_COMMANDS
    if ends_watch:
        self._watching = False

    return parsed

4551 

def _reinitialize_on_error(self, error, failure_count):
    """
    Failure callback used by the retry object for transactional operations.

    * Records an error-count metric when the error carries a ``connection``.
    * Raises ``WatchError`` when a slot redirect error arrives while keys
      are watched and a transaction is executing.
    * For slot redirect or connection errors: disconnects and releases the
      transaction connection, bumps the pipeline's reinitialize counter and
      either re-initializes the nodes manager (resetting that counter) or,
      for an ``AskError``, calls ``move_slot`` on the nodes manager.
    * Always clears the executing flag before returning.

    :param error: the exception raised by the failed attempt
    :param failure_count: number of failures so far, reported with the metric
    :raises WatchError: see above
    """
    if hasattr(error, "connection"):
        record_error_count(
            server_address=error.connection.host,
            server_port=error.connection.port,
            network_peer_address=error.connection.host,
            network_peer_port=error.connection.port,
            error_type=error,
            retry_attempts=failure_count,
            is_internal=True,
        )

    if self._watching:
        if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
            raise WatchError("Slot rebalancing occurred while watching keys")

    if (
        type(error) in self.SLOT_REDIRECT_ERRORS
        or type(error) in self.CONNECTION_ERRORS
    ):
        if self._transaction_connection:
            if is_debug_log_enabled():
                logger.debug(
                    f"Operation failed, "
                    f"with connection: {self._transaction_connection}, "
                    f"details: {self._transaction_connection.extract_connection_details()}",
                )
            # Disconnect and release back to pool
            self._transaction_connection.disconnect()
            node = self._nodes_manager.find_connection_owner(
                self._transaction_connection
            )
            if node and node.redis_connection:
                node.redis_connection.connection_pool.release(
                    self._transaction_connection
                )
            self._transaction_connection = None

        self._pipe.reinitialize_counter += 1
        if self._pipe._should_reinitialized():
            self._nodes_manager.initialize()
            # Reset the counter that was incremented above: it lives on the
            # pipeline, not on this strategy object.
            self._pipe.reinitialize_counter = 0
        else:
            if isinstance(error, AskError):
                self._nodes_manager.move_slot(error)

    self._executing = False

4599 

def _raise_first_error(self, responses, stack, start_time):
    """
    Raise the first exception found in ``responses``.

    The exception is annotated with the 1-based position and arguments of the
    command it belongs to, and a "TRANSACTION" operation duration is recorded
    before it is raised. Returns ``None`` when no response is an exception.
    """
    first_failure = next(
        (
            (result, cmd)
            for result, cmd in zip(responses, stack)
            if isinstance(result, Exception)
        ),
        None,
    )
    if first_failure is None:
        return

    failure, cmd = first_failure
    self.annotate_exception(failure, cmd.position + 1, cmd.args)

    connection = self._transaction_connection
    record_operation_duration(
        command_name="TRANSACTION",
        duration_seconds=time.monotonic() - start_time,
        server_address=connection.host,
        server_port=connection.port,
        db_namespace=str(connection.db),
    )

    raise failure

4617 

def execute(self, raise_on_error: bool = True) -> List[Any]:
    """
    Execute the queued commands as a transaction.

    Returns an empty list without contacting the cluster when nothing is
    queued, unless the strategy is watching and has recorded a slot.
    """
    queued = self._command_queue
    if queued or (self._watching and self._pipeline_slots):
        return self._execute_transaction_with_retries(queued, raise_on_error)

    return []

4624 

4625 def _execute_transaction_with_retries( 

4626 self, stack: List["PipelineCommand"], raise_on_error: bool 

4627 ): 

4628 return self._retry.call_with_retry( 

4629 lambda: self._execute_transaction(stack, raise_on_error), 

4630 lambda error, failure_count: self._reinitialize_on_error( 

4631 error, failure_count 

4632 ), 

4633 with_failure_count=True, 

4634 ) 

4635 

def _execute_transaction(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    """
    Send the queued commands wrapped in MULTI/EXEC and collect the results.

    Commands whose options contain ``EMPTY_RESPONSE`` are not sent; the value
    stored under that option is inserted into the result at the command's
    index instead.

    Exceptions seen while reading the MULTI and per-command replies are kept
    separately from those placeholders, so that the first real exception can
    be re-raised when EXEC aborts and only ``(index, value)`` pairs are
    inserted into the EXEC response.

    :param stack: the commands to wrap in MULTI/EXEC
    :param raise_on_error: raise the first exception found in the response
    :return: the per-command results, after response callbacks were applied
    :raises CrossSlotTransactionError: more than one slot was recorded
    :raises WatchError: EXEC returned ``None``
    :raises InvalidPipelineStack: response length differs from the queue length
    """
    if len(self._pipeline_slots) > 1:
        raise CrossSlotTransactionError(
            "All keys involved in a cluster transaction must map to the same slot"
        )

    self._executing = True

    redis_node, connection = self._get_client_and_connection_for_transaction()

    stack = chain(
        [PipelineCommand(("MULTI",))],
        stack,
        [PipelineCommand(("EXEC",))],
    )
    commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
    packed_commands = connection.pack_commands(commands)

    # Start timing for observability
    start_time = time.monotonic()

    connection.send_packed_command(packed_commands)

    # Exceptions raised while reading the MULTI / queueing replies.
    queue_errors = []
    # (index, value) placeholders for commands that were never sent.
    empty_responses = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        redis_node.parse_response(connection, "MULTI")
    except ResponseError as e:
        self.annotate_exception(e, 0, "MULTI")
        queue_errors.append(e)
    except self.CONNECTION_ERRORS as cluster_error:
        self.annotate_exception(cluster_error, 0, "MULTI")
        raise

    # and all the other commands
    for i, command in enumerate(self._command_queue):
        if EMPTY_RESPONSE in command.options:
            empty_responses.append((i, command.options[EMPTY_RESPONSE]))
            continue

        try:
            redis_node.parse_response(connection, "_")
        except self.SLOT_REDIRECT_ERRORS as slot_error:
            self.annotate_exception(slot_error, i + 1, command.args)
            queue_errors.append(slot_error)
        except self.CONNECTION_ERRORS as cluster_error:
            self.annotate_exception(cluster_error, i + 1, command.args)
            raise
        except ResponseError as e:
            self.annotate_exception(e, i + 1, command.args)
            queue_errors.append(e)

    response = None
    # parse the EXEC.
    try:
        response = redis_node.parse_response(connection, "EXEC")
    except ExecAbortError:
        # Prefer the error that caused the abort over the generic abort.
        if queue_errors:
            raise queue_errors[0]
        raise

    self._executing = False

    record_operation_duration(
        command_name="TRANSACTION",
        duration_seconds=time.monotonic() - start_time,
        server_address=connection.host,
        server_port=connection.port,
        db_namespace=str(connection.db),
    )

    # EXEC clears any watched keys
    self._watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put the placeholders of the unsent commands into the response
    for i, value in empty_responses:
        response.insert(i, value)

    if len(response) != len(self._command_queue):
        raise InvalidPipelineStack(
            "Unexpected response length for cluster pipeline EXEC."
            " Command stack was {} but response had length {}".format(
                [c.args[0] for c in self._command_queue], len(response)
            )
        )

    # find any errors in the response and raise if necessary
    if raise_on_error or queue_errors or empty_responses:
        self._raise_first_error(
            response,
            self._command_queue,
            start_time,
        )

    # We have to run response callbacks manually
    data = []
    callbacks = self._pipe.cluster_response_callbacks
    for r, cmd in zip(response, self._command_queue):
        if not isinstance(r, Exception):
            command_name = cmd.args[0]
            if command_name in callbacks:
                r = callbacks[command_name](r, **cmd.options)
        data.append(r)
    return data

4748 

def reset(self):
    """
    Drop queued commands and return the strategy to its idle state.

    If a transaction connection is held it is handed back to the pool of the
    node that owns it; when keys are being watched an ``UNWATCH`` is sent
    first. Should that fail with one of ``CONNECTION_ERRORS`` the connection
    is disconnected before being handed back.
    """
    self._command_queue = []

    def _hand_back_connection():
        # Give the connection back to its owner's pool (when the owner can
        # be found) and forget about it.
        owner = self._nodes_manager.find_connection_owner(
            self._transaction_connection
        )
        if owner and owner.redis_connection:
            owner.redis_connection.connection_pool.release(
                self._transaction_connection
            )
        self._transaction_connection = None

    # make sure to reset the connection state in the event that we were
    # watching something
    if self._transaction_connection:
        try:
            if self._watching:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self._transaction_connection.send_command("UNWATCH")
                self._transaction_connection.read_response()
            # we can safely return the connection to the pool here since we're
            # sure we're no longer WATCHing anything
            _hand_back_connection()
        except self.CONNECTION_ERRORS:
            # disconnect will also remove any previous WATCHes
            if self._transaction_connection:
                self._transaction_connection.disconnect()
                _hand_back_connection()

    # clean up the other instance attributes
    self._executing = False
    self._pipeline_slots = set()
    self._explicit_transaction = False
    self._watching = False

4789 

def send_cluster_commands(
    self, stack, raise_on_error=True, allow_redirections=True
):
    """Always raises: not available in a transactional context."""
    message = "send_cluster_commands cannot be executed in transactional context."
    raise NotImplementedError(message)

4796 

def multi(self):
    """
    Start an explicit transaction.

    :raises RedisError: when a transaction was already started, or when
        commands have already been queued
    """
    if self._explicit_transaction:
        problem = "Cannot issue nested calls to MULTI"
    elif self._command_queue:
        problem = "Commands without an initial WATCH have already been issued"
    else:
        self._explicit_transaction = True
        return

    raise RedisError(problem)

4805 

def watch(self, *names):
    """
    Issue ``WATCH`` for ``names`` through ``execute_command``.

    :raises RedisError: when an explicit transaction has already been started
    """
    if not self._explicit_transaction:
        return self.execute_command("WATCH", *names)

    raise RedisError("Cannot issue a WATCH after a MULTI")

4811 

def unwatch(self):
    """Issue ``UNWATCH`` when watching; otherwise return ``True`` right away."""
    return self.execute_command("UNWATCH") if self._watching else True

4817 

def discard(self):
    """Abandon the current transaction by delegating to ``reset()``."""
    self.reset()
    return None

4820 

def delete(self, *names):
    """Issue ``DEL`` for all of ``names`` through ``execute_command``."""
    command = ("DEL", *names)
    return self.execute_command(*command)

4823 

def unlink(self, *names):
    """Issue ``UNLINK`` for all of ``names`` through ``execute_command``."""
    command = ("UNLINK", *names)
    return self.execute_command(*command)