Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/commands/cluster.py: 37%

187 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import asyncio 

2from typing import ( 

3 TYPE_CHECKING, 

4 Any, 

5 AsyncIterator, 

6 Dict, 

7 Iterable, 

8 Iterator, 

9 List, 

10 Literal, 

11 Mapping, 

12 NoReturn, 

13 Optional, 

14 Union, 

15) 

16 

17from redis.crc import key_slot 

18from redis.exceptions import RedisClusterException, RedisError 

19from redis.typing import ( 

20 AnyKeyT, 

21 ClusterCommandsProtocol, 

22 EncodableT, 

23 KeysT, 

24 KeyT, 

25 PatternT, 

26 ResponseT, 

27) 

28 

29from .core import ( 

30 ACLCommands, 

31 AsyncACLCommands, 

32 AsyncDataAccessCommands, 

33 AsyncFunctionCommands, 

34 AsyncGearsCommands, 

35 AsyncManagementCommands, 

36 AsyncModuleCommands, 

37 AsyncScriptCommands, 

38 DataAccessCommands, 

39 FunctionCommands, 

40 GearsCommands, 

41 ManagementCommands, 

42 ModuleCommands, 

43 PubSubCommands, 

44 ScriptCommands, 

45) 

46from .helpers import list_or_args 

47from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands 

48 

49if TYPE_CHECKING: 

50 from redis.asyncio.cluster import TargetNodesT 

51 

52# Not complete, but covers the major ones 

53# https://redis.io/commands 

54READ_COMMANDS = frozenset( 

55 [ 

56 "BITCOUNT", 

57 "BITPOS", 

58 "EVAL_RO", 

59 "EVALSHA_RO", 

60 "EXISTS", 

61 "GEODIST", 

62 "GEOHASH", 

63 "GEOPOS", 

64 "GEORADIUS", 

65 "GEORADIUSBYMEMBER", 

66 "GET", 

67 "GETBIT", 

68 "GETRANGE", 

69 "HEXISTS", 

70 "HGET", 

71 "HGETALL", 

72 "HKEYS", 

73 "HLEN", 

74 "HMGET", 

75 "HSTRLEN", 

76 "HVALS", 

77 "KEYS", 

78 "LINDEX", 

79 "LLEN", 

80 "LRANGE", 

81 "MGET", 

82 "PTTL", 

83 "RANDOMKEY", 

84 "SCARD", 

85 "SDIFF", 

86 "SINTER", 

87 "SISMEMBER", 

88 "SMEMBERS", 

89 "SRANDMEMBER", 

90 "STRLEN", 

91 "SUNION", 

92 "TTL", 

93 "ZCARD", 

94 "ZCOUNT", 

95 "ZRANGE", 

96 "ZSCORE", 

97 ] 

98) 

99 

100 

101class ClusterMultiKeyCommands(ClusterCommandsProtocol): 

102 """ 

103 A class containing commands that handle more than one key 

104 """ 

105 

106 def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]: 

107 """Split keys into a dictionary that maps a slot to a list of keys.""" 

108 

109 slots_to_keys = {} 

110 for key in keys: 

111 slot = key_slot(self.encoder.encode(key)) 

112 slots_to_keys.setdefault(slot, []).append(key) 

113 

114 return slots_to_keys 

115 

116 def _partition_pairs_by_slot( 

117 self, mapping: Mapping[AnyKeyT, EncodableT] 

118 ) -> Dict[int, List[EncodableT]]: 

119 """Split pairs into a dictionary that maps a slot to a list of pairs.""" 

120 

121 slots_to_pairs = {} 

122 for pair in mapping.items(): 

123 slot = key_slot(self.encoder.encode(pair[0])) 

124 slots_to_pairs.setdefault(slot, []).extend(pair) 

125 

126 return slots_to_pairs 

127 

128 def _execute_pipeline_by_slot( 

129 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

130 ) -> List[Any]: 

131 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

132 pipe = self.pipeline() 

133 [ 

134 pipe.execute_command( 

135 command, 

136 *slot_args, 

137 target_nodes=[ 

138 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

139 ], 

140 ) 

141 for slot, slot_args in slots_to_args.items() 

142 ] 

143 return pipe.execute() 

144 

145 def _reorder_keys_by_command( 

146 self, 

147 keys: Iterable[KeyT], 

148 slots_to_args: Mapping[int, Iterable[EncodableT]], 

149 responses: Iterable[Any], 

150 ) -> List[Any]: 

151 results = { 

152 k: v 

153 for slot_values, response in zip(slots_to_args.values(), responses) 

154 for k, v in zip(slot_values, response) 

155 } 

156 return [results[key] for key in keys] 

157 

158 def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

159 """ 

160 Splits the keys into different slots and then calls MGET 

161 for the keys of every slot. This operation will not be atomic 

162 if keys belong to more than one slot. 

163 

164 Returns a list of values ordered identically to ``keys`` 

165 

166 For more information see https://redis.io/commands/mget 

167 """ 

168 

169 # Concatenate all keys into a list 

170 keys = list_or_args(keys, args) 

171 

172 # Split keys into slots 

173 slots_to_keys = self._partition_keys_by_slot(keys) 

174 

175 # Execute commands using a pipeline 

176 res = self._execute_pipeline_by_slot("MGET", slots_to_keys) 

177 

178 # Reorder keys in the order the user provided & return 

179 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

180 

181 def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

182 """ 

183 Sets key/values based on a mapping. Mapping is a dictionary of 

184 key/value pairs. Both keys and values should be strings or types that 

185 can be cast to a string via str(). 

186 

187 Splits the keys into different slots and then calls MSET 

188 for the keys of every slot. This operation will not be atomic 

189 if keys belong to more than one slot. 

190 

191 For more information see https://redis.io/commands/mset 

192 """ 

193 

194 # Partition the keys by slot 

195 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

196 

197 # Execute commands using a pipeline & return list of replies 

198 return self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

199 

200 def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

201 """ 

202 Runs the given command once for the keys 

203 of each slot. Returns the sum of the return values. 

204 """ 

205 

206 # Partition the keys by slot 

207 slots_to_keys = self._partition_keys_by_slot(keys) 

208 

209 # Sum up the reply from each command 

210 return sum(self._execute_pipeline_by_slot(command, slots_to_keys)) 

211 

212 def exists(self, *keys: KeyT) -> ResponseT: 

213 """ 

214 Returns the number of ``names`` that exist in the 

215 whole cluster. The keys are first split up into slots 

216 and then an EXISTS command is sent for every slot 

217 

218 For more information see https://redis.io/commands/exists 

219 """ 

220 return self._split_command_across_slots("EXISTS", *keys) 

221 

222 def delete(self, *keys: KeyT) -> ResponseT: 

223 """ 

224 Deletes the given keys in the cluster. 

225 The keys are first split up into slots 

226 and then an DEL command is sent for every slot 

227 

228 Non-existent keys are ignored. 

229 Returns the number of keys that were deleted. 

230 

231 For more information see https://redis.io/commands/del 

232 """ 

233 return self._split_command_across_slots("DEL", *keys) 

234 

235 def touch(self, *keys: KeyT) -> ResponseT: 

236 """ 

237 Updates the last access time of given keys across the 

238 cluster. 

239 

240 The keys are first split up into slots 

241 and then an TOUCH command is sent for every slot 

242 

243 Non-existent keys are ignored. 

244 Returns the number of keys that were touched. 

245 

246 For more information see https://redis.io/commands/touch 

247 """ 

248 return self._split_command_across_slots("TOUCH", *keys) 

249 

250 def unlink(self, *keys: KeyT) -> ResponseT: 

251 """ 

252 Remove the specified keys in a different thread. 

253 

254 The keys are first split up into slots 

255 and then an TOUCH command is sent for every slot 

256 

257 Non-existent keys are ignored. 

258 Returns the number of keys that were unlinked. 

259 

260 For more information see https://redis.io/commands/unlink 

261 """ 

262 return self._split_command_across_slots("UNLINK", *keys) 

263 

264 

265class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands): 

266 """ 

267 A class containing commands that handle more than one key 

268 """ 

269 

270 async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

271 """ 

272 Splits the keys into different slots and then calls MGET 

273 for the keys of every slot. This operation will not be atomic 

274 if keys belong to more than one slot. 

275 

276 Returns a list of values ordered identically to ``keys`` 

277 

278 For more information see https://redis.io/commands/mget 

279 """ 

280 

281 # Concatenate all keys into a list 

282 keys = list_or_args(keys, args) 

283 

284 # Split keys into slots 

285 slots_to_keys = self._partition_keys_by_slot(keys) 

286 

287 # Execute commands using a pipeline 

288 res = await self._execute_pipeline_by_slot("MGET", slots_to_keys) 

289 

290 # Reorder keys in the order the user provided & return 

291 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

292 

293 async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

294 """ 

295 Sets key/values based on a mapping. Mapping is a dictionary of 

296 key/value pairs. Both keys and values should be strings or types that 

297 can be cast to a string via str(). 

298 

299 Splits the keys into different slots and then calls MSET 

300 for the keys of every slot. This operation will not be atomic 

301 if keys belong to more than one slot. 

302 

303 For more information see https://redis.io/commands/mset 

304 """ 

305 

306 # Partition the keys by slot 

307 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

308 

309 # Execute commands using a pipeline & return list of replies 

310 return await self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

311 

312 async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

313 """ 

314 Runs the given command once for the keys 

315 of each slot. Returns the sum of the return values. 

316 """ 

317 

318 # Partition the keys by slot 

319 slots_to_keys = self._partition_keys_by_slot(keys) 

320 

321 # Sum up the reply from each command 

322 return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) 

323 

324 async def _execute_pipeline_by_slot( 

325 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

326 ) -> List[Any]: 

327 if self._initialize: 

328 await self.initialize() 

329 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

330 pipe = self.pipeline() 

331 [ 

332 pipe.execute_command( 

333 command, 

334 *slot_args, 

335 target_nodes=[ 

336 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

337 ], 

338 ) 

339 for slot, slot_args in slots_to_args.items() 

340 ] 

341 return await pipe.execute() 

342 

343 

344class ClusterManagementCommands(ManagementCommands): 

345 """ 

346 A class for Redis Cluster management commands 

347 

348 The class inherits from Redis's core ManagementCommands class and do the 

349 required adjustments to work with cluster mode 

350 """ 

351 

352 def slaveof(self, *args, **kwargs) -> NoReturn: 

353 """ 

354 Make the server a replica of another instance, or promote it as master. 

355 

356 For more information see https://redis.io/commands/slaveof 

357 """ 

358 raise RedisClusterException("SLAVEOF is not supported in cluster mode") 

359 

360 def replicaof(self, *args, **kwargs) -> NoReturn: 

361 """ 

362 Make the server a replica of another instance, or promote it as master. 

363 

364 For more information see https://redis.io/commands/replicaof 

365 """ 

366 raise RedisClusterException("REPLICAOF is not supported in cluster mode") 

367 

368 def swapdb(self, *args, **kwargs) -> NoReturn: 

369 """ 

370 Swaps two Redis databases. 

371 

372 For more information see https://redis.io/commands/swapdb 

373 """ 

374 raise RedisClusterException("SWAPDB is not supported in cluster mode") 

375 

376 def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT: 

377 """ 

378 Returns the node's id. 

379 

380 :target_node: 'ClusterNode' 

381 The node to execute the command on 

382 

383 For more information check https://redis.io/commands/cluster-myid/ 

384 """ 

385 return self.execute_command("CLUSTER MYID", target_nodes=target_node) 

386 

387 def cluster_addslots( 

388 self, target_node: "TargetNodesT", *slots: EncodableT 

389 ) -> ResponseT: 

390 """ 

391 Assign new hash slots to receiving node. Sends to specified node. 

392 

393 :target_node: 'ClusterNode' 

394 The node to execute the command on 

395 

396 For more information see https://redis.io/commands/cluster-addslots 

397 """ 

398 return self.execute_command( 

399 "CLUSTER ADDSLOTS", *slots, target_nodes=target_node 

400 ) 

401 

402 def cluster_addslotsrange( 

403 self, target_node: "TargetNodesT", *slots: EncodableT 

404 ) -> ResponseT: 

405 """ 

406 Similar to the CLUSTER ADDSLOTS command. 

407 The difference between the two commands is that ADDSLOTS takes a list of slots 

408 to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges 

409 (specified by start and end slots) to assign to the node. 

410 

411 :target_node: 'ClusterNode' 

412 The node to execute the command on 

413 

414 For more information see https://redis.io/commands/cluster-addslotsrange 

415 """ 

416 return self.execute_command( 

417 "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node 

418 ) 

419 

420 def cluster_countkeysinslot(self, slot_id: int) -> ResponseT: 

421 """ 

422 Return the number of local keys in the specified hash slot 

423 Send to node based on specified slot_id 

424 

425 For more information see https://redis.io/commands/cluster-countkeysinslot 

426 """ 

427 return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id) 

428 

429 def cluster_count_failure_report(self, node_id: str) -> ResponseT: 

430 """ 

431 Return the number of failure reports active for a given node 

432 Sends to a random node 

433 

434 For more information see https://redis.io/commands/cluster-count-failure-reports 

435 """ 

436 return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id) 

437 

438 def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

439 """ 

440 Set hash slots as unbound in the cluster. 

441 It determines by it self what node the slot is in and sends it there 

442 

443 Returns a list of the results for each processed slot. 

444 

445 For more information see https://redis.io/commands/cluster-delslots 

446 """ 

447 return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots] 

448 

449 def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT: 

450 """ 

451 Similar to the CLUSTER DELSLOTS command. 

452 The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove 

453 from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove 

454 from the node. 

455 

456 For more information see https://redis.io/commands/cluster-delslotsrange 

457 """ 

458 return self.execute_command("CLUSTER DELSLOTSRANGE", *slots) 

459 

460 def cluster_failover( 

461 self, target_node: "TargetNodesT", option: Optional[str] = None 

462 ) -> ResponseT: 

463 """ 

464 Forces a slave to perform a manual failover of its master 

465 Sends to specified node 

466 

467 :target_node: 'ClusterNode' 

468 The node to execute the command on 

469 

470 For more information see https://redis.io/commands/cluster-failover 

471 """ 

472 if option: 

473 if option.upper() not in ["FORCE", "TAKEOVER"]: 

474 raise RedisError( 

475 f"Invalid option for CLUSTER FAILOVER command: {option}" 

476 ) 

477 else: 

478 return self.execute_command( 

479 "CLUSTER FAILOVER", option, target_nodes=target_node 

480 ) 

481 else: 

482 return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node) 

483 

484 def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

485 """ 

486 Provides info about Redis Cluster node state. 

487 The command will be sent to a random node in the cluster if no target 

488 node is specified. 

489 

490 For more information see https://redis.io/commands/cluster-info 

491 """ 

492 return self.execute_command("CLUSTER INFO", target_nodes=target_nodes) 

493 

494 def cluster_keyslot(self, key: str) -> ResponseT: 

495 """ 

496 Returns the hash slot of the specified key 

497 Sends to random node in the cluster 

498 

499 For more information see https://redis.io/commands/cluster-keyslot 

500 """ 

501 return self.execute_command("CLUSTER KEYSLOT", key) 

502 

503 def cluster_meet( 

504 self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None 

505 ) -> ResponseT: 

506 """ 

507 Force a node cluster to handshake with another node. 

508 Sends to specified node. 

509 

510 For more information see https://redis.io/commands/cluster-meet 

511 """ 

512 return self.execute_command( 

513 "CLUSTER MEET", host, port, target_nodes=target_nodes 

514 ) 

515 

516 def cluster_nodes(self) -> ResponseT: 

517 """ 

518 Get Cluster config for the node. 

519 Sends to random node in the cluster 

520 

521 For more information see https://redis.io/commands/cluster-nodes 

522 """ 

523 return self.execute_command("CLUSTER NODES") 

524 

525 def cluster_replicate( 

526 self, target_nodes: "TargetNodesT", node_id: str 

527 ) -> ResponseT: 

528 """ 

529 Reconfigure a node as a slave of the specified master node 

530 

531 For more information see https://redis.io/commands/cluster-replicate 

532 """ 

533 return self.execute_command( 

534 "CLUSTER REPLICATE", node_id, target_nodes=target_nodes 

535 ) 

536 

537 def cluster_reset( 

538 self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None 

539 ) -> ResponseT: 

540 """ 

541 Reset a Redis Cluster node 

542 

543 If 'soft' is True then it will send 'SOFT' argument 

544 If 'soft' is False then it will send 'HARD' argument 

545 

546 For more information see https://redis.io/commands/cluster-reset 

547 """ 

548 return self.execute_command( 

549 "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes 

550 ) 

551 

552 def cluster_save_config( 

553 self, target_nodes: Optional["TargetNodesT"] = None 

554 ) -> ResponseT: 

555 """ 

556 Forces the node to save cluster state on disk 

557 

558 For more information see https://redis.io/commands/cluster-saveconfig 

559 """ 

560 return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes) 

561 

562 def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT: 

563 """ 

564 Returns the number of keys in the specified cluster slot 

565 

566 For more information see https://redis.io/commands/cluster-getkeysinslot 

567 """ 

568 return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) 

569 

570 def cluster_set_config_epoch( 

571 self, epoch: int, target_nodes: Optional["TargetNodesT"] = None 

572 ) -> ResponseT: 

573 """ 

574 Set the configuration epoch in a new node 

575 

576 For more information see https://redis.io/commands/cluster-set-config-epoch 

577 """ 

578 return self.execute_command( 

579 "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes 

580 ) 

581 

582 def cluster_setslot( 

583 self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str 

584 ) -> ResponseT: 

585 """ 

586 Bind an hash slot to a specific node 

587 

588 :target_node: 'ClusterNode' 

589 The node to execute the command on 

590 

591 For more information see https://redis.io/commands/cluster-setslot 

592 """ 

593 if state.upper() in ("IMPORTING", "NODE", "MIGRATING"): 

594 return self.execute_command( 

595 "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node 

596 ) 

597 elif state.upper() == "STABLE": 

598 raise RedisError('For "stable" state please use ' "cluster_setslot_stable") 

599 else: 

600 raise RedisError(f"Invalid slot state: {state}") 

601 

602 def cluster_setslot_stable(self, slot_id: int) -> ResponseT: 

603 """ 

604 Clears migrating / importing state from the slot. 

605 It determines by it self what node the slot is in and sends it there. 

606 

607 For more information see https://redis.io/commands/cluster-setslot 

608 """ 

609 return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") 

610 

611 def cluster_replicas( 

612 self, node_id: str, target_nodes: Optional["TargetNodesT"] = None 

613 ) -> ResponseT: 

614 """ 

615 Provides a list of replica nodes replicating from the specified primary 

616 target node. 

617 

618 For more information see https://redis.io/commands/cluster-replicas 

619 """ 

620 return self.execute_command( 

621 "CLUSTER REPLICAS", node_id, target_nodes=target_nodes 

622 ) 

623 

624 def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

625 """ 

626 Get array of Cluster slot to node mappings 

627 

628 For more information see https://redis.io/commands/cluster-slots 

629 """ 

630 return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes) 

631 

632 def cluster_shards(self, target_nodes=None): 

633 """ 

634 Returns details about the shards of the cluster. 

635 

636 For more information see https://redis.io/commands/cluster-shards 

637 """ 

638 return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes) 

639 

640 def cluster_myshardid(self, target_nodes=None): 

641 """ 

642 Returns the shard ID of the node. 

643 

644 For more information see https://redis.io/commands/cluster-myshardid/ 

645 """ 

646 return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes) 

647 

648 def cluster_links(self, target_node: "TargetNodesT") -> ResponseT: 

649 """ 

650 Each node in a Redis Cluster maintains a pair of long-lived TCP link with each 

651 peer in the cluster: One for sending outbound messages towards the peer and one 

652 for receiving inbound messages from the peer. 

653 

654 This command outputs information of all such peer links as an array. 

655 

656 For more information see https://redis.io/commands/cluster-links 

657 """ 

658 return self.execute_command("CLUSTER LINKS", target_nodes=target_node) 

659 

660 def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

661 raise NotImplementedError( 

662 "CLUSTER FLUSHSLOTS is intentionally not implemented in the client." 

663 ) 

664 

665 def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

666 raise NotImplementedError( 

667 "CLUSTER BUMPEPOCH is intentionally not implemented in the client." 

668 ) 

669 

670 def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

671 """ 

672 Enables read queries. 

673 The command will be sent to the default cluster node if target_nodes is 

674 not specified. 

675 

676 For more information see https://redis.io/commands/readonly 

677 """ 

678 if target_nodes == "replicas" or target_nodes == "all": 

679 # read_from_replicas will only be enabled if the READONLY command 

680 # is sent to all replicas 

681 self.read_from_replicas = True 

682 return self.execute_command("READONLY", target_nodes=target_nodes) 

683 

684 def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

685 """ 

686 Disables read queries. 

687 The command will be sent to the default cluster node if target_nodes is 

688 not specified. 

689 

690 For more information see https://redis.io/commands/readwrite 

691 """ 

692 # Reset read from replicas flag 

693 self.read_from_replicas = False 

694 return self.execute_command("READWRITE", target_nodes=target_nodes) 

695 

696 def gears_refresh_cluster(self, **kwargs) -> ResponseT: 

697 """ 

698 On an OSS cluster, before executing any gears function, you must call this command. # noqa 

699 """ 

700 return self.execute_command("REDISGEARS_2.REFRESHCLUSTER", **kwargs) 

701 

702 

703class AsyncClusterManagementCommands( 

704 ClusterManagementCommands, AsyncManagementCommands 

705): 

706 """ 

707 A class for Redis Cluster management commands 

708 

709 The class inherits from Redis's core ManagementCommands class and do the 

710 required adjustments to work with cluster mode 

711 """ 

712 

713 async def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

714 """ 

715 Set hash slots as unbound in the cluster. 

716 It determines by it self what node the slot is in and sends it there 

717 

718 Returns a list of the results for each processed slot. 

719 

720 For more information see https://redis.io/commands/cluster-delslots 

721 """ 

722 return await asyncio.gather( 

723 *( 

724 asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot)) 

725 for slot in slots 

726 ) 

727 ) 

728 

729 

730class ClusterDataAccessCommands(DataAccessCommands): 

731 """ 

732 A class for Redis Cluster Data Access Commands 

733 

734 The class inherits from Redis's core DataAccessCommand class and do the 

735 required adjustments to work with cluster mode 

736 """ 

737 

738 def stralgo( 

739 self, 

740 algo: Literal["LCS"], 

741 value1: KeyT, 

742 value2: KeyT, 

743 specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", 

744 len: bool = False, 

745 idx: bool = False, 

746 minmatchlen: Optional[int] = None, 

747 withmatchlen: bool = False, 

748 **kwargs, 

749 ) -> ResponseT: 

750 """ 

751 Implements complex algorithms that operate on strings. 

752 Right now the only algorithm implemented is the LCS algorithm 

753 (longest common substring). However new algorithms could be 

754 implemented in the future. 

755 

756 ``algo`` Right now must be LCS 

757 ``value1`` and ``value2`` Can be two strings or two keys 

758 ``specific_argument`` Specifying if the arguments to the algorithm 

759 will be keys or strings. strings is the default. 

760 ``len`` Returns just the len of the match. 

761 ``idx`` Returns the match positions in each string. 

762 ``minmatchlen`` Restrict the list of matches to the ones of a given 

763 minimal length. Can be provided only when ``idx`` set to True. 

764 ``withmatchlen`` Returns the matches with the len of the match. 

765 Can be provided only when ``idx`` set to True. 

766 

767 For more information see https://redis.io/commands/stralgo 

768 """ 

769 target_nodes = kwargs.pop("target_nodes", None) 

770 if specific_argument == "strings" and target_nodes is None: 

771 target_nodes = "default-node" 

772 kwargs.update({"target_nodes": target_nodes}) 

773 return super().stralgo( 

774 algo, 

775 value1, 

776 value2, 

777 specific_argument, 

778 len, 

779 idx, 

780 minmatchlen, 

781 withmatchlen, 

782 **kwargs, 

783 ) 

784 

785 def scan_iter( 

786 self, 

787 match: Optional[PatternT] = None, 

788 count: Optional[int] = None, 

789 _type: Optional[str] = None, 

790 **kwargs, 

791 ) -> Iterator: 

792 # Do the first query with cursor=0 for all nodes 

793 cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) 

794 yield from data 

795 

796 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

797 if cursors: 

798 # Get nodes by name 

799 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

800 

801 # Iterate over each node till its cursor is 0 

802 kwargs.pop("target_nodes", None) 

803 while cursors: 

804 for name, cursor in cursors.items(): 

805 cur, data = self.scan( 

806 cursor=cursor, 

807 match=match, 

808 count=count, 

809 _type=_type, 

810 target_nodes=nodes[name], 

811 **kwargs, 

812 ) 

813 yield from data 

814 cursors[name] = cur[name] 

815 

816 cursors = { 

817 name: cursor for name, cursor in cursors.items() if cursor != 0 

818 } 

819 

820 

821class AsyncClusterDataAccessCommands( 

822 ClusterDataAccessCommands, AsyncDataAccessCommands 

823): 

824 """ 

825 A class for Redis Cluster Data Access Commands 

826 

827 The class inherits from Redis's core DataAccessCommand class and do the 

828 required adjustments to work with cluster mode 

829 """ 

830 

831 async def scan_iter( 

832 self, 

833 match: Optional[PatternT] = None, 

834 count: Optional[int] = None, 

835 _type: Optional[str] = None, 

836 **kwargs, 

837 ) -> AsyncIterator: 

838 # Do the first query with cursor=0 for all nodes 

839 cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) 

840 for value in data: 

841 yield value 

842 

843 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

844 if cursors: 

845 # Get nodes by name 

846 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

847 

848 # Iterate over each node till its cursor is 0 

849 kwargs.pop("target_nodes", None) 

850 while cursors: 

851 for name, cursor in cursors.items(): 

852 cur, data = await self.scan( 

853 cursor=cursor, 

854 match=match, 

855 count=count, 

856 _type=_type, 

857 target_nodes=nodes[name], 

858 **kwargs, 

859 ) 

860 for value in data: 

861 yield value 

862 cursors[name] = cur[name] 

863 

864 cursors = { 

865 name: cursor for name, cursor in cursors.items() if cursor != 0 

866 } 

867 

868 

869class RedisClusterCommands( 

870 ClusterMultiKeyCommands, 

871 ClusterManagementCommands, 

872 ACLCommands, 

873 PubSubCommands, 

874 ClusterDataAccessCommands, 

875 ScriptCommands, 

876 FunctionCommands, 

877 GearsCommands, 

878 ModuleCommands, 

879 RedisModuleCommands, 

880): 

881 """ 

882 A class for all Redis Cluster commands 

883 

884 For key-based commands, the target node(s) will be internally determined 

885 by the keys' hash slot. 

886 Non-key-based commands can be executed with the 'target_nodes' argument to 

887 target specific nodes. By default, if target_nodes is not specified, the 

888 command will be executed on the default cluster node. 

889 

890 :param :target_nodes: type can be one of the followings: 

891 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

892 - 'ClusterNode' 

893 - 'list(ClusterNodes)' 

894 - 'dict(any:clusterNodes)' 

895 

896 for example: 

897 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

898 """ 

899 

900 

901class AsyncRedisClusterCommands( 

902 AsyncClusterMultiKeyCommands, 

903 AsyncClusterManagementCommands, 

904 AsyncACLCommands, 

905 AsyncClusterDataAccessCommands, 

906 AsyncScriptCommands, 

907 AsyncFunctionCommands, 

908 AsyncGearsCommands, 

909 AsyncModuleCommands, 

910 AsyncRedisModuleCommands, 

911): 

912 """ 

913 A class for all Redis Cluster commands 

914 

915 For key-based commands, the target node(s) will be internally determined 

916 by the keys' hash slot. 

917 Non-key-based commands can be executed with the 'target_nodes' argument to 

918 target specific nodes. By default, if target_nodes is not specified, the 

919 command will be executed on the default cluster node. 

920 

921 :param :target_nodes: type can be one of the followings: 

922 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

923 - 'ClusterNode' 

924 - 'list(ClusterNodes)' 

925 - 'dict(any:clusterNodes)' 

926 

927 for example: 

928 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

929 """