Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/commands/cluster.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

185 statements  

1import asyncio 

2from typing import ( 

3 TYPE_CHECKING, 

4 Any, 

5 AsyncIterator, 

6 Dict, 

7 Iterable, 

8 Iterator, 

9 List, 

10 Literal, 

11 Mapping, 

12 NoReturn, 

13 Optional, 

14 Union, 

15) 

16 

17from redis.crc import key_slot 

18from redis.exceptions import RedisClusterException, RedisError 

19from redis.typing import ( 

20 AnyKeyT, 

21 ClusterCommandsProtocol, 

22 EncodableT, 

23 KeysT, 

24 KeyT, 

25 PatternT, 

26 ResponseT, 

27) 

28 

29from .core import ( 

30 ACLCommands, 

31 AsyncACLCommands, 

32 AsyncDataAccessCommands, 

33 AsyncFunctionCommands, 

34 AsyncManagementCommands, 

35 AsyncModuleCommands, 

36 AsyncScriptCommands, 

37 DataAccessCommands, 

38 FunctionCommands, 

39 ManagementCommands, 

40 ModuleCommands, 

41 PubSubCommands, 

42 ScriptCommands, 

43) 

44from .helpers import list_or_args 

45from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands 

46 

47if TYPE_CHECKING: 

48 from redis.asyncio.cluster import TargetNodesT 

49 

# Commands that may be routed to a replica when ``read_from_replicas`` is
# enabled. The collection is not exhaustive; it covers the major commands.
# Full command reference: https://redis.io/commands
READ_COMMANDS = frozenset(
    {
        # --- bitmaps ---
        "BITCOUNT",
        "BITFIELD_RO",
        "BITPOS",
        # --- scripts and functions ---
        "EVAL_RO",
        "EVALSHA_RO",
        "FCALL_RO",
        # --- generic key commands ---
        "DBSIZE",
        "DIGEST",
        "DUMP",
        "EXISTS",
        "EXPIRETIME",
        "PEXPIRETIME",
        "KEYS",
        "SCAN",
        "PTTL",
        "RANDOMKEY",
        "TTL",
        "TYPE",
        # --- strings ---
        "GET",
        "GETBIT",
        "GETRANGE",
        "MGET",
        "STRLEN",
        "LCS",
        # --- geospatial ---
        "GEODIST",
        "GEOHASH",
        "GEOPOS",
        "GEOSEARCH",
        # --- hashes ---
        "HEXISTS",
        "HGET",
        "HGETALL",
        "HKEYS",
        "HLEN",
        "HMGET",
        "HSTRLEN",
        "HVALS",
        "HRANDFIELD",
        "HEXPIRETIME",
        "HPEXPIRETIME",
        "HTTL",
        "HPTTL",
        "HSCAN",
        # --- lists ---
        "LINDEX",
        "LPOS",
        "LLEN",
        "LRANGE",
        # --- sets ---
        "SCARD",
        "SDIFF",
        "SINTER",
        "SINTERCARD",
        "SISMEMBER",
        "SMISMEMBER",
        "SMEMBERS",
        "SRANDMEMBER",
        "SUNION",
        "SSCAN",
        # --- sorted sets ---
        "ZCARD",
        "ZCOUNT",
        "ZDIFF",
        "ZINTER",
        "ZINTERCARD",
        "ZLEXCOUNT",
        "ZMSCORE",
        "ZRANDMEMBER",
        "ZRANGE",
        "ZRANGEBYLEX",
        "ZRANGEBYSCORE",
        "ZRANK",
        "ZREVRANGE",
        "ZREVRANGEBYLEX",
        "ZREVRANGEBYSCORE",
        "ZREVRANK",
        "ZSCAN",
        "ZSCORE",
        "ZUNION",
        # --- streams ---
        "XLEN",
        "XPENDING",
        "XRANGE",
        "XREAD",
        "XREVRANGE",
        # --- JSON module ---
        "JSON.ARRINDEX",
        "JSON.ARRLEN",
        "JSON.GET",
        "JSON.MGET",
        "JSON.OBJKEYS",
        "JSON.OBJLEN",
        "JSON.RESP",
        "JSON.STRLEN",
        "JSON.TYPE",
        # --- RediSearch module ---
        "FT.EXPLAIN",
        "FT.INFO",
        "FT.PROFILE",
        "FT.SEARCH",
    }
)

161 

162 

class ClusterMultiKeyCommands(ClusterCommandsProtocol):
    """
    A class containing commands that handle more than one key
    """

    def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
        """Split keys into a dictionary that maps a slot to a list of keys."""

        slots_to_keys: Dict[int, List[KeyT]] = {}
        for key in keys:
            # The slot is computed from the encoded form of the key.
            slot = key_slot(self.encoder.encode(key))
            slots_to_keys.setdefault(slot, []).append(key)

        return slots_to_keys

    def _partition_pairs_by_slot(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> Dict[int, List[EncodableT]]:
        """
        Split pairs into a dictionary that maps a slot to a list of pairs.

        Each list is flat: ``[key1, value1, key2, value2, ...]``, i.e. already
        in the argument layout that is later unpacked into the command.
        """

        slots_to_pairs: Dict[int, List[EncodableT]] = {}
        for pair in mapping.items():
            slot = key_slot(self.encoder.encode(pair[0]))
            # extend (not append) so key and value become adjacent arguments
            slots_to_pairs.setdefault(slot, []).extend(pair)

        return slots_to_pairs

    def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and execute the pipeline.

        Every queued command receives the arguments of its slot and is
        targeted at the node returned by ``nodes_manager.get_node_from_slot``.
        A replica may be chosen only when ``self.read_from_replicas`` is set
        and ``command`` is listed in ``READ_COMMANDS``.

        Returns the list produced by ``pipe.execute()``.
        """
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        # Plain loop: the calls are made for their side effect (queueing on the
        # pipeline), so there is no point in collecting their return values.
        for slot, slot_args in slots_to_args.items():
            pipe.execute_command(
                command,
                *slot_args,
                target_nodes=[
                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                ],
            )
        return pipe.execute()

    def _reorder_keys_by_command(
        self,
        keys: Iterable[KeyT],
        slots_to_args: Mapping[int, Iterable[EncodableT]],
        responses: Iterable[Any],
    ) -> List[Any]:
        """
        Map per-slot responses back onto the caller's key order.

        ``responses`` is paired positionally with ``slots_to_args.values()``,
        and within each pair every argument is paired with the value at the
        same position of that slot's response. The result holds the value
        found for each entry of ``keys``, in the order of ``keys``.
        """
        results = {
            k: v
            for slot_values, response in zip(slots_to_args.values(), responses)
            for k, v in zip(slot_values, response)
        }
        return [results[key] for key in keys]

    def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per MSET command that was sent.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(self._execute_pipeline_by_slot(command, slots_to_keys))

    def exists(self, *keys: KeyT) -> ResponseT:
        """
        Returns the number of ``keys`` that exist in the
        whole cluster. The keys are first split up into slots
        and then an EXISTS command is sent for every slot

        For more information see https://redis.io/commands/exists
        """
        return self._split_command_across_slots("EXISTS", *keys)

    def delete(self, *keys: KeyT) -> ResponseT:
        """
        Deletes the given keys in the cluster.
        The keys are first split up into slots
        and then a DEL command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were deleted.

        For more information see https://redis.io/commands/del
        """
        return self._split_command_across_slots("DEL", *keys)

    def touch(self, *keys: KeyT) -> ResponseT:
        """
        Updates the last access time of given keys across the
        cluster.

        The keys are first split up into slots
        and then a TOUCH command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were touched.

        For more information see https://redis.io/commands/touch
        """
        return self._split_command_across_slots("TOUCH", *keys)

    def unlink(self, *keys: KeyT) -> ResponseT:
        """
        Remove the specified keys in a different thread.

        The keys are first split up into slots
        and then an UNLINK command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were unlinked.

        For more information see https://redis.io/commands/unlink
        """
        return self._split_command_across_slots("UNLINK", *keys)

325 

326 

class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
    """
    A class containing commands that handle more than one key

    Asynchronous variant: the methods that run the pipeline are coroutines,
    while the key-partitioning helpers are inherited unchanged.
    """

    async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per MSET command that was sent.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

    async def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and await its execution.

        ``self.initialize()`` is awaited first when ``self._initialize`` is
        truthy. Every queued command receives the arguments of its slot and
        is targeted at the node returned by
        ``nodes_manager.get_node_from_slot``. A replica may be chosen only
        when ``self.read_from_replicas`` is set and ``command`` is listed in
        ``READ_COMMANDS``.

        Returns the list produced by ``pipe.execute()``.
        """
        if self._initialize:
            await self.initialize()
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        # Plain loop: the calls are made for their side effect (queueing on the
        # pipeline), so there is no point in collecting their return values.
        for slot, slot_args in slots_to_args.items():
            pipe.execute_command(
                command,
                *slot_args,
                target_nodes=[
                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                ],
            )
        return await pipe.execute()

404 

405 

class ClusterManagementCommands(ManagementCommands):
    """
    A class for Redis Cluster management commands

    The class inherits from Redis's core ManagementCommands class and makes
    the required adjustments to work with cluster mode
    """

    def slaveof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Not supported in cluster mode: always raises RedisClusterException.

        For more information see https://redis.io/commands/slaveof
        """
        raise RedisClusterException("SLAVEOF is not supported in cluster mode")

    def replicaof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Not supported in cluster mode: always raises RedisClusterException.

        For more information see https://redis.io/commands/replicaof
        """
        raise RedisClusterException("REPLICAOF is not supported in cluster mode")

    def swapdb(self, *args, **kwargs) -> NoReturn:
        """
        Swaps two Redis databases.

        Not supported in cluster mode: always raises RedisClusterException.

        For more information see https://redis.io/commands/swapdb
        """
        raise RedisClusterException("SWAPDB is not supported in cluster mode")

    def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Returns the node's id.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information check https://redis.io/commands/cluster-myid/
        """
        return self.execute_command("CLUSTER MYID", target_nodes=target_node)

    def cluster_addslots(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Assign new hash slots to receiving node. Sends to specified node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslots
        """
        return self.execute_command(
            "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
        )

    def cluster_addslotsrange(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Similar to the CLUSTER ADDSLOTS command.
        The difference between the two commands is that ADDSLOTS takes a list of slots
        to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
        (specified by start and end slots) to assign to the node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslotsrange
        """
        return self.execute_command(
            "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
        )

    def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
        """
        Return the number of local keys in the specified hash slot
        Send to node based on specified slot_id

        For more information see https://redis.io/commands/cluster-countkeysinslot
        """
        return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)

    def cluster_count_failure_report(self, node_id: str) -> ResponseT:
        """
        Return the number of failure reports active for a given node
        Sends to a random node

        For more information see https://redis.io/commands/cluster-count-failure-reports
        """
        return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)

    def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.
        It determines by itself what node the slot is in and sends it there

        One CLUSTER DELSLOTS command is issued per slot.
        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]

    def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
        """
        Similar to the CLUSTER DELSLOTS command.
        The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
        from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
        from the node.

        For more information see https://redis.io/commands/cluster-delslotsrange
        """
        return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)

    def cluster_failover(
        self, target_node: "TargetNodesT", option: Optional[str] = None
    ) -> ResponseT:
        """
        Forces a replica to perform a manual failover of its master
        Sends to specified node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :option: optional 'FORCE' or 'TAKEOVER' (case-insensitive); it is
            forwarded to the server exactly as given

        Raises RedisError when ``option`` is given but is neither of the two.

        For more information see https://redis.io/commands/cluster-failover
        """
        if not option:
            return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)

        if option.upper() not in ("FORCE", "TAKEOVER"):
            raise RedisError(
                f"Invalid option for CLUSTER FAILOVER command: {option}"
            )

        return self.execute_command(
            "CLUSTER FAILOVER", option, target_nodes=target_node
        )

    def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Provides info about Redis Cluster node state.
        The command will be sent to a random node in the cluster if no target
        node is specified.

        For more information see https://redis.io/commands/cluster-info
        """
        return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)

    def cluster_keyslot(self, key: str) -> ResponseT:
        """
        Returns the hash slot of the specified key
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-keyslot
        """
        return self.execute_command("CLUSTER KEYSLOT", key)

    def cluster_meet(
        self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Force a node cluster to handshake with another node.
        Sends to specified node.

        For more information see https://redis.io/commands/cluster-meet
        """
        return self.execute_command(
            "CLUSTER MEET", host, port, target_nodes=target_nodes
        )

    def cluster_nodes(self) -> ResponseT:
        """
        Get Cluster config for the node.
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-nodes
        """
        return self.execute_command("CLUSTER NODES")

    def cluster_replicate(
        self, target_nodes: "TargetNodesT", node_id: str
    ) -> ResponseT:
        """
        Reconfigure a node as a replica of the specified master node

        For more information see https://redis.io/commands/cluster-replicate
        """
        return self.execute_command(
            "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
        )

    def cluster_reset(
        self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Reset a Redis Cluster node

        If 'soft' is True then it will send 'SOFT' argument
        If 'soft' is False then it will send 'HARD' argument

        For more information see https://redis.io/commands/cluster-reset
        """
        return self.execute_command(
            "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
        )

    def cluster_save_config(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Forces the node to save cluster state on disk

        For more information see https://redis.io/commands/cluster-saveconfig
        """
        return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)

    def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
        """
        Returns key names stored in the specified cluster slot; ``num_keys``
        is the maximum number of keys to return

        For more information see https://redis.io/commands/cluster-getkeysinslot
        """
        return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)

    def cluster_set_config_epoch(
        self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Set the configuration epoch in a new node

        For more information see https://redis.io/commands/cluster-set-config-epoch
        """
        return self.execute_command(
            "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
        )

    def cluster_setslot(
        self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
    ) -> ResponseT:
        """
        Bind a hash slot to a specific node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :state: one of 'IMPORTING', 'NODE' or 'MIGRATING' (case-insensitive);
            it is forwarded to the server exactly as given

        Raises RedisError for 'STABLE' (use cluster_setslot_stable instead)
        and for any other unrecognised state.

        For more information see https://redis.io/commands/cluster-setslot
        """
        # Normalise once; the original spelling is still what gets sent.
        normalized_state = state.upper()
        if normalized_state in ("IMPORTING", "NODE", "MIGRATING"):
            return self.execute_command(
                "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
            )
        if normalized_state == "STABLE":
            raise RedisError('For "stable" state please use cluster_setslot_stable')
        raise RedisError(f"Invalid slot state: {state}")

    def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
        """
        Clears migrating / importing state from the slot.
        It determines by itself what node the slot is in and sends it there.

        For more information see https://redis.io/commands/cluster-setslot
        """
        return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")

    def cluster_replicas(
        self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Provides a list of replica nodes replicating from the specified primary
        target node.

        For more information see https://redis.io/commands/cluster-replicas
        """
        return self.execute_command(
            "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
        )

    def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Get array of Cluster slot to node mappings

        For more information see https://redis.io/commands/cluster-slots
        """
        return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)

    def cluster_shards(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns details about the shards of the cluster.

        For more information see https://redis.io/commands/cluster-shards
        """
        return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)

    def cluster_myshardid(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns the shard ID of the node.

        For more information see https://redis.io/commands/cluster-myshardid/
        """
        return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)

    def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Each node in a Redis Cluster maintains a pair of long-lived TCP links with each
        peer in the cluster: One for sending outbound messages towards the peer and one
        for receiving inbound messages from the peer.

        This command outputs information of all such peer links as an array.

        For more information see https://redis.io/commands/cluster-links
        """
        return self.execute_command("CLUSTER LINKS", target_nodes=target_node)

    def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
        """
        Placeholder for CLUSTER FLUSHSLOTS.

        Never sends anything to the server: always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
        )

    def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
        """
        Placeholder for CLUSTER BUMPEPOCH.

        Never sends anything to the server: always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
        )

    def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Enables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readonly
        """
        if target_nodes in ("replicas", "all"):
            # read_from_replicas will only be enabled if the READONLY command
            # is sent to all replicas
            self.read_from_replicas = True
        return self.execute_command("READONLY", target_nodes=target_nodes)

    def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Disables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readwrite
        """
        # Reset read from replicas flag
        self.read_from_replicas = False
        return self.execute_command("READWRITE", target_nodes=target_nodes)

757 

758 

class AsyncClusterManagementCommands(
    ClusterManagementCommands, AsyncManagementCommands
):
    """
    Asynchronous flavour of the Redis Cluster management commands.

    Combines ClusterManagementCommands with the core AsyncManagementCommands
    and overrides the commands that need cluster-specific async handling.
    """

    async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Mark the given hash slots as unbound in the cluster.

        A separate CLUSTER DELSLOTS command is scheduled as a task for every
        slot, and all of the tasks are awaited together.

        Returns the per-slot results, in the same order as ``slots``.

        For more information see https://redis.io/commands/cluster-delslots
        """
        delslots_tasks = [
            asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot_id))
            for slot_id in slots
        ]
        return await asyncio.gather(*delslots_tasks)

784 

785 

class ClusterDataAccessCommands(DataAccessCommands):
    """
    Data access commands adapted for Redis Cluster.

    Extends the core DataAccessCommands class, overriding the commands whose
    behaviour has to differ when running against a cluster.
    """

    def stralgo(
        self,
        algo: Literal["LCS"],
        value1: KeyT,
        value2: KeyT,
        specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
        len: bool = False,
        idx: bool = False,
        minmatchlen: Optional[int] = None,
        withmatchlen: bool = False,
        **kwargs,
    ) -> ResponseT:
        """
        Run a string algorithm; LCS is currently the only one available.

        ``algo`` must be LCS.
        ``value1`` and ``value2`` are either two strings or two keys.
        ``specific_argument`` tells whether the two values are "strings"
        (the default) or "keys".
        ``len`` returns only the length of the match.
        ``idx`` returns the match positions in each string.
        ``minmatchlen`` keeps only matches of at least the given length;
        valid only together with ``idx``.
        ``withmatchlen`` adds the length of each match to the result;
        valid only together with ``idx``.

        When the values are plain strings and the caller gave no
        ``target_nodes``, the command is directed at "default-node".

        For more information see https://redis.io/commands/stralgo
        """
        chosen_nodes = kwargs.pop("target_nodes", None)
        if chosen_nodes is None and specific_argument == "strings":
            chosen_nodes = "default-node"
        kwargs["target_nodes"] = chosen_nodes

        return super().stralgo(
            algo,
            value1,
            value2,
            specific_argument,
            len,
            idx,
            minmatchlen,
            withmatchlen,
            **kwargs,
        )

    def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> Iterator:
        """
        Yield the items returned by SCAN, following every node's cursor
        until it reaches 0.

        ``match``, ``count``, ``_type`` and any extra keyword arguments are
        forwarded to every ``scan`` call.
        """
        # Opening round: no cursor is passed, and the reply carries one
        # cursor per node name.
        first_cursors, batch = self.scan(
            match=match, count=count, _type=_type, **kwargs
        )
        yield from batch

        open_cursors = {
            node_name: position
            for node_name, position in first_cursors.items()
            if position != 0
        }
        if not open_cursors:
            return

        # Resolve each unfinished node name to its node once, up front
        node_by_name = {
            node_name: self.get_node(node_name=node_name)
            for node_name in open_cursors
        }

        # From here on every call targets one explicit node, so a
        # caller-supplied target_nodes must not be forwarded as well.
        kwargs.pop("target_nodes", None)
        while open_cursors:
            unfinished = {}
            for node_name, position in open_cursors.items():
                reply_cursors, batch = self.scan(
                    cursor=position,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=node_by_name[node_name],
                    **kwargs,
                )
                yield from batch
                next_position = reply_cursors[node_name]
                if next_position != 0:
                    unfinished[node_name] = next_position

            open_cursors = unfinished

875 

876 

class AsyncClusterDataAccessCommands(
    ClusterDataAccessCommands, AsyncDataAccessCommands
):
    """
    Asynchronous flavour of the Redis Cluster data access commands.

    Combines ClusterDataAccessCommands with the core AsyncDataAccessCommands
    and overrides the commands that need cluster-specific async handling.
    """

    async def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> AsyncIterator:
        """
        Asynchronously yield the items returned by SCAN, following every
        node's cursor until it reaches 0.

        ``match``, ``count``, ``_type`` and any extra keyword arguments are
        forwarded to every ``scan`` call.
        """
        # Opening round: no cursor is passed, and the reply carries one
        # cursor per node name.
        first_cursors, batch = await self.scan(
            match=match, count=count, _type=_type, **kwargs
        )
        for item in batch:
            yield item

        open_cursors = {
            node_name: position
            for node_name, position in first_cursors.items()
            if position != 0
        }
        if not open_cursors:
            return

        # Resolve each unfinished node name to its node once, up front
        node_by_name = {
            node_name: self.get_node(node_name=node_name)
            for node_name in open_cursors
        }

        # From here on every call targets one explicit node, so a
        # caller-supplied target_nodes must not be forwarded as well.
        kwargs.pop("target_nodes", None)
        while open_cursors:
            unfinished = {}
            for node_name, position in open_cursors.items():
                reply_cursors, batch = await self.scan(
                    cursor=position,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=node_by_name[node_name],
                    **kwargs,
                )
                for item in batch:
                    yield item
                next_position = reply_cursors[node_name]
                if next_position != 0:
                    unfinished[node_name] = next_position

            open_cursors = unfinished

923 

924 

class RedisClusterCommands(
    ClusterMultiKeyCommands,
    ClusterManagementCommands,
    ACLCommands,
    PubSubCommands,
    ClusterDataAccessCommands,
    ScriptCommands,
    FunctionCommands,
    ModuleCommands,
    RedisModuleCommands,
):
    """
    A class for all Redis Cluster commands

    It adds no methods of its own; it only combines the cluster-specific
    command classes of this module with the core command classes.

    For key-based commands, the target node(s) will be internally determined
    by the keys' hash slot.
    Non-key-based commands can be executed with the 'target_nodes' argument to
    target specific nodes. By default, if target_nodes is not specified, the
    command will be executed on the default cluster node.

    :param target_nodes: can be one of the following:
        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """

954 

955 

class AsyncRedisClusterCommands(
    AsyncClusterMultiKeyCommands,
    AsyncClusterManagementCommands,
    AsyncACLCommands,
    AsyncClusterDataAccessCommands,
    AsyncScriptCommands,
    AsyncFunctionCommands,
    AsyncModuleCommands,
    AsyncRedisModuleCommands,
):
    """
    A class for all Redis Cluster commands (asynchronous variant)

    It adds no methods of its own; it only combines the asynchronous
    cluster-specific command classes of this module with the asynchronous
    core command classes.

    For key-based commands, the target node(s) will be internally determined
    by the keys' hash slot.
    Non-key-based commands can be executed with the 'target_nodes' argument to
    target specific nodes. By default, if target_nodes is not specified, the
    command will be executed on the default cluster node.

    :param target_nodes: can be one of the following:
        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """