Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/commands/cluster.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

185 statements  

1import asyncio 

2from typing import ( 

3 TYPE_CHECKING, 

4 Any, 

5 AsyncIterator, 

6 Dict, 

7 Iterable, 

8 Iterator, 

9 List, 

10 Literal, 

11 Mapping, 

12 NoReturn, 

13 Optional, 

14 Union, 

15) 

16 

17from redis.crc import key_slot 

18from redis.exceptions import RedisClusterException, RedisError 

19from redis.typing import ( 

20 AnyKeyT, 

21 ClusterCommandsProtocol, 

22 EncodableT, 

23 KeysT, 

24 KeyT, 

25 PatternT, 

26 ResponseT, 

27) 

28 

29from .core import ( 

30 ACLCommands, 

31 AsyncACLCommands, 

32 AsyncDataAccessCommands, 

33 AsyncFunctionCommands, 

34 AsyncManagementCommands, 

35 AsyncModuleCommands, 

36 AsyncScriptCommands, 

37 DataAccessCommands, 

38 FunctionCommands, 

39 ManagementCommands, 

40 ModuleCommands, 

41 PubSubCommands, 

42 ScriptCommands, 

43) 

44from .helpers import list_or_args 

45from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands 

46 

47if TYPE_CHECKING: 

48 from redis.asyncio.cluster import TargetNodesT 

49 

# Commands treated as read-only by the cluster helpers in this module: a
# command listed here may be routed to a replica when ``read_from_replicas``
# is enabled. The set is not complete, but covers the major ones.
# https://redis.io/commands
READ_COMMANDS = frozenset(
    (
        # Bitmaps
        "BITCOUNT", "BITPOS", "GETBIT",
        # Scripting
        "EVAL_RO", "EVALSHA_RO",
        # Generic / keyspace
        "EXISTS", "KEYS", "PTTL", "RANDOMKEY", "TTL",
        # Geo
        "GEODIST", "GEOHASH", "GEOPOS", "GEORADIUS", "GEORADIUSBYMEMBER",
        # Strings
        "GET", "GETRANGE", "MGET", "STRLEN",
        # Hashes
        "HEXISTS", "HGET", "HGETALL", "HKEYS", "HLEN", "HMGET", "HSTRLEN", "HVALS",
        # Lists
        "LINDEX", "LLEN", "LRANGE",
        # Sets
        "SCARD", "SDIFF", "SINTER", "SISMEMBER", "SMEMBERS", "SRANDMEMBER", "SUNION",
        # Sorted sets
        "ZCARD", "ZCOUNT", "ZRANGE", "ZSCORE",
    )
)

97 

98 

class ClusterMultiKeyCommands(ClusterCommandsProtocol):
    """
    A class containing commands that handle more than one key

    The helpers below group the keys by hash slot and queue one command per
    slot on a single pipeline.
    """

    def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
        """Split keys into a dictionary that maps a slot to a list of keys."""

        slots_to_keys: Dict[int, List[KeyT]] = {}
        for key in keys:
            # The slot is computed from the encoded form of the key.
            slot = key_slot(self.encoder.encode(key))
            slots_to_keys.setdefault(slot, []).append(key)

        return slots_to_keys

    def _partition_pairs_by_slot(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> Dict[int, List[EncodableT]]:
        """Split pairs into a dictionary that maps a slot to a list of pairs."""

        slots_to_pairs: Dict[int, List[EncodableT]] = {}
        for key, value in mapping.items():
            slot = key_slot(self.encoder.encode(key))
            # Pairs are stored flattened (key, value, key, value, ...) so the
            # list can be splatted directly as command arguments.
            slots_to_pairs.setdefault(slot, []).extend((key, value))

        return slots_to_pairs

    def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and execute it.

        Each queued command receives the arguments of its slot and is targeted
        at the node returned by ``nodes_manager.get_node_from_slot``. Replicas
        are only considered when ``read_from_replicas`` is set and ``command``
        is listed in ``READ_COMMANDS``.

        Returns whatever the pipeline's ``execute()`` returns.
        """
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        for slot, slot_args in slots_to_args.items():
            pipe.execute_command(
                command,
                *slot_args,
                target_nodes=[
                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                ],
            )
        return pipe.execute()

    def _reorder_keys_by_command(
        self,
        keys: Iterable[KeyT],
        slots_to_args: Mapping[int, Iterable[EncodableT]],
        responses: Iterable[Any],
    ) -> List[Any]:
        """
        Return the per-key results in the order of ``keys``.

        Every slot's argument list is zipped with the matching entry of
        ``responses`` to build a key -> value lookup.
        NOTE(review): this relies on ``responses`` being ordered like
        ``slots_to_args`` - confirm against the pipeline implementation.
        """
        results = {
            k: v
            for slot_values, response in zip(slots_to_args.values(), responses)
            for k, v in zip(slot_values, response)
        }
        return [results[key] for key in keys]

    def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per slot.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(self._execute_pipeline_by_slot(command, slots_to_keys))

    def exists(self, *keys: KeyT) -> ResponseT:
        """
        Returns the number of ``keys`` that exist in the
        whole cluster. The keys are first split up into slots
        and then an EXISTS command is sent for every slot

        For more information see https://redis.io/commands/exists
        """
        return self._split_command_across_slots("EXISTS", *keys)

    def delete(self, *keys: KeyT) -> ResponseT:
        """
        Deletes the given keys in the cluster.
        The keys are first split up into slots
        and then a DEL command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were deleted.

        For more information see https://redis.io/commands/del
        """
        return self._split_command_across_slots("DEL", *keys)

    def touch(self, *keys: KeyT) -> ResponseT:
        """
        Updates the last access time of given keys across the
        cluster.

        The keys are first split up into slots
        and then a TOUCH command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were touched.

        For more information see https://redis.io/commands/touch
        """
        return self._split_command_across_slots("TOUCH", *keys)

    def unlink(self, *keys: KeyT) -> ResponseT:
        """
        Remove the specified keys in a different thread.

        The keys are first split up into slots
        and then an UNLINK command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were unlinked.

        For more information see https://redis.io/commands/unlink
        """
        return self._split_command_across_slots("UNLINK", *keys)

261 

262 

class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
    """
    A class containing commands that handle more than one key

    Asynchronous counterpart of ``ClusterMultiKeyCommands``: the partitioning
    helpers are inherited, the methods that execute the pipeline are
    overridden as coroutines.
    """

    async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per slot.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

    async def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and await its execution.

        Each queued command receives the arguments of its slot and is targeted
        at the node returned by ``nodes_manager.get_node_from_slot``. Replicas
        are only considered when ``read_from_replicas`` is set and ``command``
        is listed in ``READ_COMMANDS``.

        Returns whatever the pipeline's ``execute()`` resolves to.
        """
        # Run the client's initialize() first while the _initialize flag is set.
        if self._initialize:
            await self.initialize()
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        for slot, slot_args in slots_to_args.items():
            pipe.execute_command(
                command,
                *slot_args,
                target_nodes=[
                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                ],
            )
        return await pipe.execute()

340 

341 

class ClusterManagementCommands(ManagementCommands):
    """
    A class for Redis Cluster management commands

    The class inherits from Redis's core ManagementCommands class and does the
    required adjustments to work with cluster mode
    """

    def slaveof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Always raises RedisClusterException in cluster mode.

        For more information see https://redis.io/commands/slaveof
        """
        raise RedisClusterException("SLAVEOF is not supported in cluster mode")

    def replicaof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Always raises RedisClusterException in cluster mode.

        For more information see https://redis.io/commands/replicaof
        """
        raise RedisClusterException("REPLICAOF is not supported in cluster mode")

    def swapdb(self, *args, **kwargs) -> NoReturn:
        """
        Swaps two Redis databases.

        Always raises RedisClusterException in cluster mode.

        For more information see https://redis.io/commands/swapdb
        """
        raise RedisClusterException("SWAPDB is not supported in cluster mode")

    def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Returns the node's id.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information check https://redis.io/commands/cluster-myid/
        """
        return self.execute_command("CLUSTER MYID", target_nodes=target_node)

    def cluster_addslots(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Assign new hash slots to receiving node. Sends to specified node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslots
        """
        return self.execute_command(
            "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
        )

    def cluster_addslotsrange(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Similar to the CLUSTER ADDSLOTS command.
        The difference between the two commands is that ADDSLOTS takes a list of slots
        to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
        (specified by start and end slots) to assign to the node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslotsrange
        """
        return self.execute_command(
            "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
        )

    def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
        """
        Return the number of local keys in the specified hash slot
        Send to node based on specified slot_id

        For more information see https://redis.io/commands/cluster-countkeysinslot
        """
        return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)

    def cluster_count_failure_report(self, node_id: str) -> ResponseT:
        """
        Return the number of failure reports active for a given node
        Sends to a random node

        For more information see https://redis.io/commands/cluster-count-failure-reports
        """
        return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)

    def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.
        It determines by itself what node the slot is in and sends it there

        One CLUSTER DELSLOTS command is issued per slot.
        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]

    def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
        """
        Similar to the CLUSTER DELSLOTS command.
        The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
        from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
        from the node.

        For more information see https://redis.io/commands/cluster-delslotsrange
        """
        return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)

    def cluster_failover(
        self, target_node: "TargetNodesT", option: Optional[str] = None
    ) -> ResponseT:
        """
        Forces a slave to perform a manual failover of its master
        Sends to specified node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :option: optional 'FORCE' or 'TAKEOVER' (case-insensitive); any other
            non-empty value raises RedisError

        For more information see https://redis.io/commands/cluster-failover
        """
        # No option (None or empty string): plain failover.
        if not option:
            return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)

        if option.upper() not in ("FORCE", "TAKEOVER"):
            raise RedisError(
                f"Invalid option for CLUSTER FAILOVER command: {option}"
            )

        # The option is forwarded exactly as the caller spelled it.
        return self.execute_command(
            "CLUSTER FAILOVER", option, target_nodes=target_node
        )

    def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Provides info about Redis Cluster node state.
        The command will be sent to a random node in the cluster if no target
        node is specified.

        For more information see https://redis.io/commands/cluster-info
        """
        return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)

    def cluster_keyslot(self, key: str) -> ResponseT:
        """
        Returns the hash slot of the specified key
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-keyslot
        """
        return self.execute_command("CLUSTER KEYSLOT", key)

    def cluster_meet(
        self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Force a node cluster to handshake with another node.
        Sends to specified node.

        For more information see https://redis.io/commands/cluster-meet
        """
        return self.execute_command(
            "CLUSTER MEET", host, port, target_nodes=target_nodes
        )

    def cluster_nodes(self) -> ResponseT:
        """
        Get Cluster config for the node.
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-nodes
        """
        return self.execute_command("CLUSTER NODES")

    def cluster_replicate(
        self, target_nodes: "TargetNodesT", node_id: str
    ) -> ResponseT:
        """
        Reconfigure a node as a slave of the specified master node

        For more information see https://redis.io/commands/cluster-replicate
        """
        return self.execute_command(
            "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
        )

    def cluster_reset(
        self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Reset a Redis Cluster node

        If 'soft' is True then it will send 'SOFT' argument
        If 'soft' is False then it will send 'HARD' argument

        For more information see https://redis.io/commands/cluster-reset
        """
        mode = b"SOFT" if soft else b"HARD"
        return self.execute_command("CLUSTER RESET", mode, target_nodes=target_nodes)

    def cluster_save_config(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Forces the node to save cluster state on disk

        For more information see https://redis.io/commands/cluster-saveconfig
        """
        return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)

    def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
        """
        Returns the names of keys stored in the specified cluster slot,
        limited to at most ``num_keys`` entries.

        For more information see https://redis.io/commands/cluster-getkeysinslot
        """
        return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)

    def cluster_set_config_epoch(
        self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Set the configuration epoch in a new node

        For more information see https://redis.io/commands/cluster-set-config-epoch
        """
        return self.execute_command(
            "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
        )

    def cluster_setslot(
        self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
    ) -> ResponseT:
        """
        Bind a hash slot to a specific node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :state: one of 'IMPORTING', 'NODE' or 'MIGRATING' (case-insensitive).
            'STABLE' and any other value raise RedisError; use
            cluster_setslot_stable for the stable state.

        For more information see https://redis.io/commands/cluster-setslot
        """
        normalized = state.upper()
        if normalized in ("IMPORTING", "NODE", "MIGRATING"):
            # The state is forwarded exactly as the caller spelled it.
            return self.execute_command(
                "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
            )
        if normalized == "STABLE":
            raise RedisError('For "stable" state please use cluster_setslot_stable')
        raise RedisError(f"Invalid slot state: {state}")

    def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
        """
        Clears migrating / importing state from the slot.
        It determines by itself what node the slot is in and sends it there.

        For more information see https://redis.io/commands/cluster-setslot
        """
        return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")

    def cluster_replicas(
        self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Provides a list of replica nodes replicating from the specified primary
        target node.

        For more information see https://redis.io/commands/cluster-replicas
        """
        return self.execute_command(
            "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
        )

    def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Get array of Cluster slot to node mappings

        For more information see https://redis.io/commands/cluster-slots
        """
        return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)

    def cluster_shards(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns details about the shards of the cluster.

        For more information see https://redis.io/commands/cluster-shards
        """
        return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)

    def cluster_myshardid(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns the shard ID of the node.

        For more information see https://redis.io/commands/cluster-myshardid/
        """
        return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)

    def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Each node in a Redis Cluster maintains a pair of long-lived TCP links with
        each peer in the cluster: One for sending outbound messages towards the peer
        and one for receiving inbound messages from the peer.

        This command outputs information of all such peer links as an array.

        For more information see https://redis.io/commands/cluster-links
        """
        return self.execute_command("CLUSTER LINKS", target_nodes=target_node)

    def cluster_flushslots(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> NoReturn:
        """
        CLUSTER FLUSHSLOTS is deliberately unsupported by this client.

        Always raises NotImplementedError; ``target_nodes`` is ignored.
        """
        raise NotImplementedError(
            "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
        )

    def cluster_bumpepoch(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> NoReturn:
        """
        CLUSTER BUMPEPOCH is deliberately unsupported by this client.

        Always raises NotImplementedError; ``target_nodes`` is ignored.
        """
        raise NotImplementedError(
            "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
        )

    def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Enables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readonly
        """
        if target_nodes in ("replicas", "all"):
            # read_from_replicas will only be enabled if the READONLY command
            # is sent to all replicas
            self.read_from_replicas = True
        return self.execute_command("READONLY", target_nodes=target_nodes)

    def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Disables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readwrite
        """
        # Reset read from replicas flag
        self.read_from_replicas = False
        return self.execute_command("READWRITE", target_nodes=target_nodes)

693 

694 

class AsyncClusterManagementCommands(
    ClusterManagementCommands, AsyncManagementCommands
):
    """
    Asynchronous flavour of the Redis Cluster management commands.

    Combines ClusterManagementCommands with the core AsyncManagementCommands
    and overrides the commands that need cluster-specific async handling.
    """

    async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Mark the given hash slots as unbound in the cluster.

        A separate CLUSTER DELSLOTS command is scheduled as a task for every
        slot, and all of them are awaited together.

        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        pending = [
            asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
            for slot in slots
        ]
        return await asyncio.gather(*pending)

720 

721 

class ClusterDataAccessCommands(DataAccessCommands):
    """
    A class for Redis Cluster Data Access Commands

    The class inherits from Redis's core DataAccessCommand class and does the
    required adjustments to work with cluster mode
    """

    def stralgo(
        self,
        algo: Literal["LCS"],
        value1: KeyT,
        value2: KeyT,
        specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
        len: bool = False,
        idx: bool = False,
        minmatchlen: Optional[int] = None,
        withmatchlen: bool = False,
        **kwargs,
    ) -> ResponseT:
        """
        Implements complex algorithms that operate on strings.
        Right now the only algorithm implemented is the LCS algorithm
        (longest common substring). However new algorithms could be
        implemented in the future.

        ``algo`` Right now must be LCS
        ``value1`` and ``value2`` Can be two strings or two keys
        ``specific_argument`` Specifying if the arguments to the algorithm
        will be keys or strings. strings is the default.
        ``len`` Returns just the len of the match.
        ``idx`` Returns the match positions in each string.
        ``minmatchlen`` Restrict the list of matches to the ones of a given
        minimal length. Can be provided only when ``idx`` set to True.
        ``withmatchlen`` Returns the matches with the len of the match.
        Can be provided only when ``idx`` set to True.

        When the arguments are plain strings and the caller gave no
        ``target_nodes``, the command is sent with
        ``target_nodes="default-node"``.

        For more information see https://redis.io/commands/stralgo
        """
        target_nodes = kwargs.pop("target_nodes", None)
        if specific_argument == "strings" and target_nodes is None:
            target_nodes = "default-node"
        # Always forwarded, even when it is still None.
        kwargs["target_nodes"] = target_nodes
        return super().stralgo(
            algo,
            value1,
            value2,
            specific_argument,
            len,
            idx,
            minmatchlen,
            withmatchlen,
            **kwargs,
        )

    def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> Iterator:
        """
        Make an iterator using the SCAN command so that the client doesn't
        need to remember the cursor position of every node.

        The first SCAN is issued with the caller's ``kwargs`` (including any
        ``target_nodes``) and yields a mapping of node name to cursor. Every
        node whose cursor is not 0 is then scanned again individually until
        its cursor becomes 0.

        ``match`` allows for filtering the keys by pattern
        ``count`` provides a hint to Redis about the number of keys to
        return per batch
        ``_type`` filters the returned values by a particular Redis type
        """
        # Do the first query with cursor=0 for all nodes
        cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
        yield from data

        # Keep only the nodes that still have data left to scan
        cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
        if not cursors:
            return

        # Get nodes by name
        nodes = {name: self.get_node(node_name=name) for name in cursors}

        # From here on each node is targeted explicitly, so the caller's
        # target_nodes must not be passed a second time.
        kwargs.pop("target_nodes", None)

        # Iterate over each node till its cursor is 0
        while cursors:
            # Collect the next round separately instead of reassigning entries
            # of the dictionary that is being iterated.
            remaining = {}
            for name, cursor in cursors.items():
                cur, data = self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                yield from data
                if cur[name] != 0:
                    remaining[name] = cur[name]
            cursors = remaining

811 

812 

class AsyncClusterDataAccessCommands(
    ClusterDataAccessCommands, AsyncDataAccessCommands
):
    """
    A class for Redis Cluster Data Access Commands

    The class inherits from Redis's core DataAccessCommand class and does the
    required adjustments to work with cluster mode
    """

    async def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> AsyncIterator:
        """
        Make an async iterator using the SCAN command so that the client
        doesn't need to remember the cursor position of every node.

        The first SCAN is issued with the caller's ``kwargs`` (including any
        ``target_nodes``) and yields a mapping of node name to cursor. Every
        node whose cursor is not 0 is then scanned again individually until
        its cursor becomes 0.

        ``match`` allows for filtering the keys by pattern
        ``count`` provides a hint to Redis about the number of keys to
        return per batch
        ``_type`` filters the returned values by a particular Redis type
        """
        # Do the first query with cursor=0 for all nodes
        cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
        for value in data:
            yield value

        # Keep only the nodes that still have data left to scan
        cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
        if not cursors:
            return

        # Get nodes by name
        nodes = {name: self.get_node(node_name=name) for name in cursors}

        # From here on each node is targeted explicitly, so the caller's
        # target_nodes must not be passed a second time.
        kwargs.pop("target_nodes", None)

        # Iterate over each node till its cursor is 0
        while cursors:
            # Collect the next round separately instead of reassigning entries
            # of the dictionary that is being iterated.
            remaining = {}
            for name, cursor in cursors.items():
                cur, data = await self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                for value in data:
                    yield value
                if cur[name] != 0:
                    remaining[name] = cur[name]
            cursors = remaining

859 

860 

class RedisClusterCommands(
    ClusterMultiKeyCommands,
    ClusterManagementCommands,
    ACLCommands,
    PubSubCommands,
    ClusterDataAccessCommands,
    ScriptCommands,
    FunctionCommands,
    ModuleCommands,
    RedisModuleCommands,
):
    """
    Aggregate of every command mixin available on a Redis Cluster client.

    Key-based commands are routed internally: the target node(s) are derived
    from the hash slot of the keys.
    Commands that are not key-based accept a 'target_nodes' argument that
    selects the node(s) to run on. When it is omitted, the command is
    executed on the default cluster node.

    :param target_nodes: may be any of the following:
        - a nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """

890 

891 

class AsyncRedisClusterCommands(
    AsyncClusterMultiKeyCommands,
    AsyncClusterManagementCommands,
    AsyncACLCommands,
    AsyncClusterDataAccessCommands,
    AsyncScriptCommands,
    AsyncFunctionCommands,
    AsyncModuleCommands,
    AsyncRedisModuleCommands,
):
    """
    Aggregate of every command mixin available on an async Redis Cluster
    client.

    Key-based commands are routed internally: the target node(s) are derived
    from the hash slot of the keys.
    Commands that are not key-based accept a 'target_nodes' argument that
    selects the node(s) to run on. When it is omitted, the command is
    executed on the default cluster node.

    :param target_nodes: may be any of the following:
        - a nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """