Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/commands/cluster.py: 37%

184 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 07:09 +0000

1import asyncio 

2from typing import ( 

3 TYPE_CHECKING, 

4 Any, 

5 AsyncIterator, 

6 Dict, 

7 Iterable, 

8 Iterator, 

9 List, 

10 Mapping, 

11 NoReturn, 

12 Optional, 

13 Union, 

14) 

15 

16from redis.compat import Literal 

17from redis.crc import key_slot 

18from redis.exceptions import RedisClusterException, RedisError 

19from redis.typing import ( 

20 AnyKeyT, 

21 ClusterCommandsProtocol, 

22 EncodableT, 

23 KeysT, 

24 KeyT, 

25 PatternT, 

26) 

27 

28from .core import ( 

29 ACLCommands, 

30 AsyncACLCommands, 

31 AsyncDataAccessCommands, 

32 AsyncFunctionCommands, 

33 AsyncManagementCommands, 

34 AsyncScriptCommands, 

35 DataAccessCommands, 

36 FunctionCommands, 

37 ManagementCommands, 

38 PubSubCommands, 

39 ResponseT, 

40 ScriptCommands, 

41) 

42from .helpers import list_or_args 

43from .redismodules import RedisModuleCommands 

44 

45if TYPE_CHECKING: 

46 from redis.asyncio.cluster import TargetNodesT 

47 

48 

49# Not complete, but covers the major ones 

50# https://redis.io/commands 

51READ_COMMANDS = frozenset( 

52 [ 

53 "BITCOUNT", 

54 "BITPOS", 

55 "EVAL_RO", 

56 "EVALSHA_RO", 

57 "EXISTS", 

58 "GEODIST", 

59 "GEOHASH", 

60 "GEOPOS", 

61 "GEORADIUS", 

62 "GEORADIUSBYMEMBER", 

63 "GET", 

64 "GETBIT", 

65 "GETRANGE", 

66 "HEXISTS", 

67 "HGET", 

68 "HGETALL", 

69 "HKEYS", 

70 "HLEN", 

71 "HMGET", 

72 "HSTRLEN", 

73 "HVALS", 

74 "KEYS", 

75 "LINDEX", 

76 "LLEN", 

77 "LRANGE", 

78 "MGET", 

79 "PTTL", 

80 "RANDOMKEY", 

81 "SCARD", 

82 "SDIFF", 

83 "SINTER", 

84 "SISMEMBER", 

85 "SMEMBERS", 

86 "SRANDMEMBER", 

87 "STRLEN", 

88 "SUNION", 

89 "TTL", 

90 "ZCARD", 

91 "ZCOUNT", 

92 "ZRANGE", 

93 "ZSCORE", 

94 ] 

95) 

96 

97 

98class ClusterMultiKeyCommands(ClusterCommandsProtocol): 

99 """ 

100 A class containing commands that handle more than one key 

101 """ 

102 

103 def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]: 

104 """Split keys into a dictionary that maps a slot to a list of keys.""" 

105 

106 slots_to_keys = {} 

107 for key in keys: 

108 slot = key_slot(self.encoder.encode(key)) 

109 slots_to_keys.setdefault(slot, []).append(key) 

110 

111 return slots_to_keys 

112 

113 def _partition_pairs_by_slot( 

114 self, mapping: Mapping[AnyKeyT, EncodableT] 

115 ) -> Dict[int, List[EncodableT]]: 

116 """Split pairs into a dictionary that maps a slot to a list of pairs.""" 

117 

118 slots_to_pairs = {} 

119 for pair in mapping.items(): 

120 slot = key_slot(self.encoder.encode(pair[0])) 

121 slots_to_pairs.setdefault(slot, []).extend(pair) 

122 

123 return slots_to_pairs 

124 

125 def _execute_pipeline_by_slot( 

126 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

127 ) -> List[Any]: 

128 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

129 pipe = self.pipeline() 

130 [ 

131 pipe.execute_command( 

132 command, 

133 *slot_args, 

134 target_nodes=[ 

135 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

136 ], 

137 ) 

138 for slot, slot_args in slots_to_args.items() 

139 ] 

140 return pipe.execute() 

141 

142 def _reorder_keys_by_command( 

143 self, 

144 keys: Iterable[KeyT], 

145 slots_to_args: Mapping[int, Iterable[EncodableT]], 

146 responses: Iterable[Any], 

147 ) -> List[Any]: 

148 results = { 

149 k: v 

150 for slot_values, response in zip(slots_to_args.values(), responses) 

151 for k, v in zip(slot_values, response) 

152 } 

153 return [results[key] for key in keys] 

154 

155 def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

156 """ 

157 Splits the keys into different slots and then calls MGET 

158 for the keys of every slot. This operation will not be atomic 

159 if keys belong to more than one slot. 

160 

161 Returns a list of values ordered identically to ``keys`` 

162 

163 For more information see https://redis.io/commands/mget 

164 """ 

165 

166 # Concatenate all keys into a list 

167 keys = list_or_args(keys, args) 

168 

169 # Split keys into slots 

170 slots_to_keys = self._partition_keys_by_slot(keys) 

171 

172 # Execute commands using a pipeline 

173 res = self._execute_pipeline_by_slot("MGET", slots_to_keys) 

174 

175 # Reorder keys in the order the user provided & return 

176 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

177 

178 def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

179 """ 

180 Sets key/values based on a mapping. Mapping is a dictionary of 

181 key/value pairs. Both keys and values should be strings or types that 

182 can be cast to a string via str(). 

183 

184 Splits the keys into different slots and then calls MSET 

185 for the keys of every slot. This operation will not be atomic 

186 if keys belong to more than one slot. 

187 

188 For more information see https://redis.io/commands/mset 

189 """ 

190 

191 # Partition the keys by slot 

192 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

193 

194 # Execute commands using a pipeline & return list of replies 

195 return self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

196 

197 def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

198 """ 

199 Runs the given command once for the keys 

200 of each slot. Returns the sum of the return values. 

201 """ 

202 

203 # Partition the keys by slot 

204 slots_to_keys = self._partition_keys_by_slot(keys) 

205 

206 # Sum up the reply from each command 

207 return sum(self._execute_pipeline_by_slot(command, slots_to_keys)) 

208 

209 def exists(self, *keys: KeyT) -> ResponseT: 

210 """ 

211 Returns the number of ``names`` that exist in the 

212 whole cluster. The keys are first split up into slots 

213 and then an EXISTS command is sent for every slot 

214 

215 For more information see https://redis.io/commands/exists 

216 """ 

217 return self._split_command_across_slots("EXISTS", *keys) 

218 

219 def delete(self, *keys: KeyT) -> ResponseT: 

220 """ 

221 Deletes the given keys in the cluster. 

222 The keys are first split up into slots 

223 and then an DEL command is sent for every slot 

224 

225 Non-existant keys are ignored. 

226 Returns the number of keys that were deleted. 

227 

228 For more information see https://redis.io/commands/del 

229 """ 

230 return self._split_command_across_slots("DEL", *keys) 

231 

232 def touch(self, *keys: KeyT) -> ResponseT: 

233 """ 

234 Updates the last access time of given keys across the 

235 cluster. 

236 

237 The keys are first split up into slots 

238 and then an TOUCH command is sent for every slot 

239 

240 Non-existant keys are ignored. 

241 Returns the number of keys that were touched. 

242 

243 For more information see https://redis.io/commands/touch 

244 """ 

245 return self._split_command_across_slots("TOUCH", *keys) 

246 

247 def unlink(self, *keys: KeyT) -> ResponseT: 

248 """ 

249 Remove the specified keys in a different thread. 

250 

251 The keys are first split up into slots 

252 and then an TOUCH command is sent for every slot 

253 

254 Non-existant keys are ignored. 

255 Returns the number of keys that were unlinked. 

256 

257 For more information see https://redis.io/commands/unlink 

258 """ 

259 return self._split_command_across_slots("UNLINK", *keys) 

260 

261 

262class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands): 

263 """ 

264 A class containing commands that handle more than one key 

265 """ 

266 

267 async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

268 """ 

269 Splits the keys into different slots and then calls MGET 

270 for the keys of every slot. This operation will not be atomic 

271 if keys belong to more than one slot. 

272 

273 Returns a list of values ordered identically to ``keys`` 

274 

275 For more information see https://redis.io/commands/mget 

276 """ 

277 

278 # Concatenate all keys into a list 

279 keys = list_or_args(keys, args) 

280 

281 # Split keys into slots 

282 slots_to_keys = self._partition_keys_by_slot(keys) 

283 

284 # Execute commands using a pipeline 

285 res = await self._execute_pipeline_by_slot("MGET", slots_to_keys) 

286 

287 # Reorder keys in the order the user provided & return 

288 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

289 

290 async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

291 """ 

292 Sets key/values based on a mapping. Mapping is a dictionary of 

293 key/value pairs. Both keys and values should be strings or types that 

294 can be cast to a string via str(). 

295 

296 Splits the keys into different slots and then calls MSET 

297 for the keys of every slot. This operation will not be atomic 

298 if keys belong to more than one slot. 

299 

300 For more information see https://redis.io/commands/mset 

301 """ 

302 

303 # Partition the keys by slot 

304 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

305 

306 # Execute commands using a pipeline & return list of replies 

307 return await self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

308 

309 async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

310 """ 

311 Runs the given command once for the keys 

312 of each slot. Returns the sum of the return values. 

313 """ 

314 

315 # Partition the keys by slot 

316 slots_to_keys = self._partition_keys_by_slot(keys) 

317 

318 # Sum up the reply from each command 

319 return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) 

320 

321 async def _execute_pipeline_by_slot( 

322 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

323 ) -> List[Any]: 

324 if self._initialize: 

325 await self.initialize() 

326 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

327 pipe = self.pipeline() 

328 [ 

329 pipe.execute_command( 

330 command, 

331 *slot_args, 

332 target_nodes=[ 

333 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

334 ], 

335 ) 

336 for slot, slot_args in slots_to_args.items() 

337 ] 

338 return await pipe.execute() 

339 

340 

341class ClusterManagementCommands(ManagementCommands): 

342 """ 

343 A class for Redis Cluster management commands 

344 

345 The class inherits from Redis's core ManagementCommands class and do the 

346 required adjustments to work with cluster mode 

347 """ 

348 

349 def slaveof(self, *args, **kwargs) -> NoReturn: 

350 """ 

351 Make the server a replica of another instance, or promote it as master. 

352 

353 For more information see https://redis.io/commands/slaveof 

354 """ 

355 raise RedisClusterException("SLAVEOF is not supported in cluster mode") 

356 

357 def replicaof(self, *args, **kwargs) -> NoReturn: 

358 """ 

359 Make the server a replica of another instance, or promote it as master. 

360 

361 For more information see https://redis.io/commands/replicaof 

362 """ 

363 raise RedisClusterException("REPLICAOF is not supported in cluster mode") 

364 

365 def swapdb(self, *args, **kwargs) -> NoReturn: 

366 """ 

367 Swaps two Redis databases. 

368 

369 For more information see https://redis.io/commands/swapdb 

370 """ 

371 raise RedisClusterException("SWAPDB is not supported in cluster mode") 

372 

373 def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT: 

374 """ 

375 Returns the node's id. 

376 

377 :target_node: 'ClusterNode' 

378 The node to execute the command on 

379 

380 For more information check https://redis.io/commands/cluster-myid/ 

381 """ 

382 return self.execute_command("CLUSTER MYID", target_nodes=target_node) 

383 

384 def cluster_addslots( 

385 self, target_node: "TargetNodesT", *slots: EncodableT 

386 ) -> ResponseT: 

387 """ 

388 Assign new hash slots to receiving node. Sends to specified node. 

389 

390 :target_node: 'ClusterNode' 

391 The node to execute the command on 

392 

393 For more information see https://redis.io/commands/cluster-addslots 

394 """ 

395 return self.execute_command( 

396 "CLUSTER ADDSLOTS", *slots, target_nodes=target_node 

397 ) 

398 

399 def cluster_addslotsrange( 

400 self, target_node: "TargetNodesT", *slots: EncodableT 

401 ) -> ResponseT: 

402 """ 

403 Similar to the CLUSTER ADDSLOTS command. 

404 The difference between the two commands is that ADDSLOTS takes a list of slots 

405 to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges 

406 (specified by start and end slots) to assign to the node. 

407 

408 :target_node: 'ClusterNode' 

409 The node to execute the command on 

410 

411 For more information see https://redis.io/commands/cluster-addslotsrange 

412 """ 

413 return self.execute_command( 

414 "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node 

415 ) 

416 

417 def cluster_countkeysinslot(self, slot_id: int) -> ResponseT: 

418 """ 

419 Return the number of local keys in the specified hash slot 

420 Send to node based on specified slot_id 

421 

422 For more information see https://redis.io/commands/cluster-countkeysinslot 

423 """ 

424 return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id) 

425 

426 def cluster_count_failure_report(self, node_id: str) -> ResponseT: 

427 """ 

428 Return the number of failure reports active for a given node 

429 Sends to a random node 

430 

431 For more information see https://redis.io/commands/cluster-count-failure-reports 

432 """ 

433 return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id) 

434 

435 def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

436 """ 

437 Set hash slots as unbound in the cluster. 

438 It determines by it self what node the slot is in and sends it there 

439 

440 Returns a list of the results for each processed slot. 

441 

442 For more information see https://redis.io/commands/cluster-delslots 

443 """ 

444 return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots] 

445 

446 def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT: 

447 """ 

448 Similar to the CLUSTER DELSLOTS command. 

449 The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove 

450 from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove 

451 from the node. 

452 

453 For more information see https://redis.io/commands/cluster-delslotsrange 

454 """ 

455 return self.execute_command("CLUSTER DELSLOTSRANGE", *slots) 

456 

457 def cluster_failover( 

458 self, target_node: "TargetNodesT", option: Optional[str] = None 

459 ) -> ResponseT: 

460 """ 

461 Forces a slave to perform a manual failover of its master 

462 Sends to specified node 

463 

464 :target_node: 'ClusterNode' 

465 The node to execute the command on 

466 

467 For more information see https://redis.io/commands/cluster-failover 

468 """ 

469 if option: 

470 if option.upper() not in ["FORCE", "TAKEOVER"]: 

471 raise RedisError( 

472 f"Invalid option for CLUSTER FAILOVER command: {option}" 

473 ) 

474 else: 

475 return self.execute_command( 

476 "CLUSTER FAILOVER", option, target_nodes=target_node 

477 ) 

478 else: 

479 return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node) 

480 

481 def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

482 """ 

483 Provides info about Redis Cluster node state. 

484 The command will be sent to a random node in the cluster if no target 

485 node is specified. 

486 

487 For more information see https://redis.io/commands/cluster-info 

488 """ 

489 return self.execute_command("CLUSTER INFO", target_nodes=target_nodes) 

490 

491 def cluster_keyslot(self, key: str) -> ResponseT: 

492 """ 

493 Returns the hash slot of the specified key 

494 Sends to random node in the cluster 

495 

496 For more information see https://redis.io/commands/cluster-keyslot 

497 """ 

498 return self.execute_command("CLUSTER KEYSLOT", key) 

499 

500 def cluster_meet( 

501 self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None 

502 ) -> ResponseT: 

503 """ 

504 Force a node cluster to handshake with another node. 

505 Sends to specified node. 

506 

507 For more information see https://redis.io/commands/cluster-meet 

508 """ 

509 return self.execute_command( 

510 "CLUSTER MEET", host, port, target_nodes=target_nodes 

511 ) 

512 

513 def cluster_nodes(self) -> ResponseT: 

514 """ 

515 Get Cluster config for the node. 

516 Sends to random node in the cluster 

517 

518 For more information see https://redis.io/commands/cluster-nodes 

519 """ 

520 return self.execute_command("CLUSTER NODES") 

521 

522 def cluster_replicate( 

523 self, target_nodes: "TargetNodesT", node_id: str 

524 ) -> ResponseT: 

525 """ 

526 Reconfigure a node as a slave of the specified master node 

527 

528 For more information see https://redis.io/commands/cluster-replicate 

529 """ 

530 return self.execute_command( 

531 "CLUSTER REPLICATE", node_id, target_nodes=target_nodes 

532 ) 

533 

534 def cluster_reset( 

535 self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None 

536 ) -> ResponseT: 

537 """ 

538 Reset a Redis Cluster node 

539 

540 If 'soft' is True then it will send 'SOFT' argument 

541 If 'soft' is False then it will send 'HARD' argument 

542 

543 For more information see https://redis.io/commands/cluster-reset 

544 """ 

545 return self.execute_command( 

546 "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes 

547 ) 

548 

549 def cluster_save_config( 

550 self, target_nodes: Optional["TargetNodesT"] = None 

551 ) -> ResponseT: 

552 """ 

553 Forces the node to save cluster state on disk 

554 

555 For more information see https://redis.io/commands/cluster-saveconfig 

556 """ 

557 return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes) 

558 

559 def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT: 

560 """ 

561 Returns the number of keys in the specified cluster slot 

562 

563 For more information see https://redis.io/commands/cluster-getkeysinslot 

564 """ 

565 return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) 

566 

567 def cluster_set_config_epoch( 

568 self, epoch: int, target_nodes: Optional["TargetNodesT"] = None 

569 ) -> ResponseT: 

570 """ 

571 Set the configuration epoch in a new node 

572 

573 For more information see https://redis.io/commands/cluster-set-config-epoch 

574 """ 

575 return self.execute_command( 

576 "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes 

577 ) 

578 

579 def cluster_setslot( 

580 self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str 

581 ) -> ResponseT: 

582 """ 

583 Bind an hash slot to a specific node 

584 

585 :target_node: 'ClusterNode' 

586 The node to execute the command on 

587 

588 For more information see https://redis.io/commands/cluster-setslot 

589 """ 

590 if state.upper() in ("IMPORTING", "NODE", "MIGRATING"): 

591 return self.execute_command( 

592 "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node 

593 ) 

594 elif state.upper() == "STABLE": 

595 raise RedisError('For "stable" state please use ' "cluster_setslot_stable") 

596 else: 

597 raise RedisError(f"Invalid slot state: {state}") 

598 

599 def cluster_setslot_stable(self, slot_id: int) -> ResponseT: 

600 """ 

601 Clears migrating / importing state from the slot. 

602 It determines by it self what node the slot is in and sends it there. 

603 

604 For more information see https://redis.io/commands/cluster-setslot 

605 """ 

606 return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") 

607 

608 def cluster_replicas( 

609 self, node_id: str, target_nodes: Optional["TargetNodesT"] = None 

610 ) -> ResponseT: 

611 """ 

612 Provides a list of replica nodes replicating from the specified primary 

613 target node. 

614 

615 For more information see https://redis.io/commands/cluster-replicas 

616 """ 

617 return self.execute_command( 

618 "CLUSTER REPLICAS", node_id, target_nodes=target_nodes 

619 ) 

620 

621 def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

622 """ 

623 Get array of Cluster slot to node mappings 

624 

625 For more information see https://redis.io/commands/cluster-slots 

626 """ 

627 return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes) 

628 

629 def cluster_shards(self, target_nodes=None): 

630 """ 

631 Returns details about the shards of the cluster. 

632 

633 For more information see https://redis.io/commands/cluster-shards 

634 """ 

635 return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes) 

636 

637 def cluster_links(self, target_node: "TargetNodesT") -> ResponseT: 

638 """ 

639 Each node in a Redis Cluster maintains a pair of long-lived TCP link with each 

640 peer in the cluster: One for sending outbound messages towards the peer and one 

641 for receiving inbound messages from the peer. 

642 

643 This command outputs information of all such peer links as an array. 

644 

645 For more information see https://redis.io/commands/cluster-links 

646 """ 

647 return self.execute_command("CLUSTER LINKS", target_nodes=target_node) 

648 

649 def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

650 raise NotImplementedError( 

651 "CLUSTER FLUSHSLOTS is intentionally not implemented in the client." 

652 ) 

653 

654 def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

655 raise NotImplementedError( 

656 "CLUSTER BUMPEPOCH is intentionally not implemented in the client." 

657 ) 

658 

659 def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

660 """ 

661 Enables read queries. 

662 The command will be sent to the default cluster node if target_nodes is 

663 not specified. 

664 

665 For more information see https://redis.io/commands/readonly 

666 """ 

667 if target_nodes == "replicas" or target_nodes == "all": 

668 # read_from_replicas will only be enabled if the READONLY command 

669 # is sent to all replicas 

670 self.read_from_replicas = True 

671 return self.execute_command("READONLY", target_nodes=target_nodes) 

672 

673 def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

674 """ 

675 Disables read queries. 

676 The command will be sent to the default cluster node if target_nodes is 

677 not specified. 

678 

679 For more information see https://redis.io/commands/readwrite 

680 """ 

681 # Reset read from replicas flag 

682 self.read_from_replicas = False 

683 return self.execute_command("READWRITE", target_nodes=target_nodes) 

684 

685 

686class AsyncClusterManagementCommands( 

687 ClusterManagementCommands, AsyncManagementCommands 

688): 

689 """ 

690 A class for Redis Cluster management commands 

691 

692 The class inherits from Redis's core ManagementCommands class and do the 

693 required adjustments to work with cluster mode 

694 """ 

695 

696 async def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

697 """ 

698 Set hash slots as unbound in the cluster. 

699 It determines by it self what node the slot is in and sends it there 

700 

701 Returns a list of the results for each processed slot. 

702 

703 For more information see https://redis.io/commands/cluster-delslots 

704 """ 

705 return await asyncio.gather( 

706 *( 

707 asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot)) 

708 for slot in slots 

709 ) 

710 ) 

711 

712 

713class ClusterDataAccessCommands(DataAccessCommands): 

714 """ 

715 A class for Redis Cluster Data Access Commands 

716 

717 The class inherits from Redis's core DataAccessCommand class and do the 

718 required adjustments to work with cluster mode 

719 """ 

720 

721 def stralgo( 

722 self, 

723 algo: Literal["LCS"], 

724 value1: KeyT, 

725 value2: KeyT, 

726 specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", 

727 len: bool = False, 

728 idx: bool = False, 

729 minmatchlen: Optional[int] = None, 

730 withmatchlen: bool = False, 

731 **kwargs, 

732 ) -> ResponseT: 

733 """ 

734 Implements complex algorithms that operate on strings. 

735 Right now the only algorithm implemented is the LCS algorithm 

736 (longest common substring). However new algorithms could be 

737 implemented in the future. 

738 

739 ``algo`` Right now must be LCS 

740 ``value1`` and ``value2`` Can be two strings or two keys 

741 ``specific_argument`` Specifying if the arguments to the algorithm 

742 will be keys or strings. strings is the default. 

743 ``len`` Returns just the len of the match. 

744 ``idx`` Returns the match positions in each string. 

745 ``minmatchlen`` Restrict the list of matches to the ones of a given 

746 minimal length. Can be provided only when ``idx`` set to True. 

747 ``withmatchlen`` Returns the matches with the len of the match. 

748 Can be provided only when ``idx`` set to True. 

749 

750 For more information see https://redis.io/commands/stralgo 

751 """ 

752 target_nodes = kwargs.pop("target_nodes", None) 

753 if specific_argument == "strings" and target_nodes is None: 

754 target_nodes = "default-node" 

755 kwargs.update({"target_nodes": target_nodes}) 

756 return super().stralgo( 

757 algo, 

758 value1, 

759 value2, 

760 specific_argument, 

761 len, 

762 idx, 

763 minmatchlen, 

764 withmatchlen, 

765 **kwargs, 

766 ) 

767 

768 def scan_iter( 

769 self, 

770 match: Optional[PatternT] = None, 

771 count: Optional[int] = None, 

772 _type: Optional[str] = None, 

773 **kwargs, 

774 ) -> Iterator: 

775 # Do the first query with cursor=0 for all nodes 

776 cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) 

777 yield from data 

778 

779 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

780 if cursors: 

781 # Get nodes by name 

782 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

783 

784 # Iterate over each node till its cursor is 0 

785 kwargs.pop("target_nodes", None) 

786 while cursors: 

787 for name, cursor in cursors.items(): 

788 cur, data = self.scan( 

789 cursor=cursor, 

790 match=match, 

791 count=count, 

792 _type=_type, 

793 target_nodes=nodes[name], 

794 **kwargs, 

795 ) 

796 yield from data 

797 cursors[name] = cur[name] 

798 

799 cursors = { 

800 name: cursor for name, cursor in cursors.items() if cursor != 0 

801 } 

802 

803 

804class AsyncClusterDataAccessCommands( 

805 ClusterDataAccessCommands, AsyncDataAccessCommands 

806): 

807 """ 

808 A class for Redis Cluster Data Access Commands 

809 

810 The class inherits from Redis's core DataAccessCommand class and do the 

811 required adjustments to work with cluster mode 

812 """ 

813 

814 async def scan_iter( 

815 self, 

816 match: Optional[PatternT] = None, 

817 count: Optional[int] = None, 

818 _type: Optional[str] = None, 

819 **kwargs, 

820 ) -> AsyncIterator: 

821 # Do the first query with cursor=0 for all nodes 

822 cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) 

823 for value in data: 

824 yield value 

825 

826 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

827 if cursors: 

828 # Get nodes by name 

829 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

830 

831 # Iterate over each node till its cursor is 0 

832 kwargs.pop("target_nodes", None) 

833 while cursors: 

834 for name, cursor in cursors.items(): 

835 cur, data = await self.scan( 

836 cursor=cursor, 

837 match=match, 

838 count=count, 

839 _type=_type, 

840 target_nodes=nodes[name], 

841 **kwargs, 

842 ) 

843 for value in data: 

844 yield value 

845 cursors[name] = cur[name] 

846 

847 cursors = { 

848 name: cursor for name, cursor in cursors.items() if cursor != 0 

849 } 

850 

851 

852class RedisClusterCommands( 

853 ClusterMultiKeyCommands, 

854 ClusterManagementCommands, 

855 ACLCommands, 

856 PubSubCommands, 

857 ClusterDataAccessCommands, 

858 ScriptCommands, 

859 FunctionCommands, 

860 RedisModuleCommands, 

861): 

862 """ 

863 A class for all Redis Cluster commands 

864 

865 For key-based commands, the target node(s) will be internally determined 

866 by the keys' hash slot. 

867 Non-key-based commands can be executed with the 'target_nodes' argument to 

868 target specific nodes. By default, if target_nodes is not specified, the 

869 command will be executed on the default cluster node. 

870 

871 :param :target_nodes: type can be one of the followings: 

872 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

873 - 'ClusterNode' 

874 - 'list(ClusterNodes)' 

875 - 'dict(any:clusterNodes)' 

876 

877 for example: 

878 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

879 """ 

880 

881 

882class AsyncRedisClusterCommands( 

883 AsyncClusterMultiKeyCommands, 

884 AsyncClusterManagementCommands, 

885 AsyncACLCommands, 

886 AsyncClusterDataAccessCommands, 

887 AsyncScriptCommands, 

888 AsyncFunctionCommands, 

889): 

890 """ 

891 A class for all Redis Cluster commands 

892 

893 For key-based commands, the target node(s) will be internally determined 

894 by the keys' hash slot. 

895 Non-key-based commands can be executed with the 'target_nodes' argument to 

896 target specific nodes. By default, if target_nodes is not specified, the 

897 command will be executed on the default cluster node. 

898 

899 :param :target_nodes: type can be one of the followings: 

900 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

901 - 'ClusterNode' 

902 - 'list(ClusterNodes)' 

903 - 'dict(any:clusterNodes)' 

904 

905 for example: 

906 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

907 """