Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/commands/cluster.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

198 statements  

1import asyncio 

2from typing import ( 

3 TYPE_CHECKING, 

4 Any, 

5 AsyncIterator, 

6 Dict, 

7 Iterable, 

8 Iterator, 

9 List, 

10 Literal, 

11 Mapping, 

12 NoReturn, 

13 Optional, 

14 Sequence, 

15 Union, 

16) 

17 

18from redis.crc import key_slot 

19from redis.exceptions import RedisClusterException, RedisError 

20from redis.typing import ( 

21 AnyKeyT, 

22 ClusterCommandsProtocol, 

23 EncodableT, 

24 KeysT, 

25 KeyT, 

26 PatternT, 

27 ResponseT, 

28) 

29from redis.utils import deprecated_function 

30 

31from .core import ( 

32 ACLCommands, 

33 AsyncACLCommands, 

34 AsyncDataAccessCommands, 

35 AsyncFunctionCommands, 

36 AsyncManagementCommands, 

37 AsyncModuleCommands, 

38 AsyncScriptCommands, 

39 DataAccessCommands, 

40 FunctionCommands, 

41 ManagementCommands, 

42 ModuleCommands, 

43 PubSubCommands, 

44 ScriptCommands, 

45) 

46from .helpers import list_or_args 

47from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands 

48 

49if TYPE_CHECKING: 

50 from redis.asyncio.cluster import TargetNodesT 

51 

52# Not complete, but covers the major ones 

53# https://redis.io/commands 

54READ_COMMANDS = frozenset( 

55 [ 

56 # Bit Operations 

57 "BITCOUNT", 

58 "BITFIELD_RO", 

59 "BITPOS", 

60 # Scripting 

61 "EVAL_RO", 

62 "EVALSHA_RO", 

63 "FCALL_RO", 

64 # Key Operations 

65 "DBSIZE", 

66 "DIGEST", 

67 "DUMP", 

68 "EXISTS", 

69 "EXPIRETIME", 

70 "PEXPIRETIME", 

71 "KEYS", 

72 "SCAN", 

73 "PTTL", 

74 "RANDOMKEY", 

75 "TTL", 

76 "TYPE", 

77 # String Operations 

78 "GET", 

79 "GETBIT", 

80 "GETRANGE", 

81 "MGET", 

82 "STRLEN", 

83 "LCS", 

84 # Geo Operations 

85 "GEODIST", 

86 "GEOHASH", 

87 "GEOPOS", 

88 "GEOSEARCH", 

89 # Hash Operations 

90 "HEXISTS", 

91 "HGET", 

92 "HGETALL", 

93 "HKEYS", 

94 "HLEN", 

95 "HMGET", 

96 "HSTRLEN", 

97 "HVALS", 

98 "HRANDFIELD", 

99 "HEXPIRETIME", 

100 "HPEXPIRETIME", 

101 "HTTL", 

102 "HPTTL", 

103 "HSCAN", 

104 # List Operations 

105 "LINDEX", 

106 "LPOS", 

107 "LLEN", 

108 "LRANGE", 

109 # Set Operations 

110 "SCARD", 

111 "SDIFF", 

112 "SINTER", 

113 "SINTERCARD", 

114 "SISMEMBER", 

115 "SMISMEMBER", 

116 "SMEMBERS", 

117 "SRANDMEMBER", 

118 "SUNION", 

119 "SSCAN", 

120 # Sorted Set Operations 

121 "ZCARD", 

122 "ZCOUNT", 

123 "ZDIFF", 

124 "ZINTER", 

125 "ZINTERCARD", 

126 "ZLEXCOUNT", 

127 "ZMSCORE", 

128 "ZRANDMEMBER", 

129 "ZRANGE", 

130 "ZRANGEBYLEX", 

131 "ZRANGEBYSCORE", 

132 "ZRANK", 

133 "ZREVRANGE", 

134 "ZREVRANGEBYLEX", 

135 "ZREVRANGEBYSCORE", 

136 "ZREVRANK", 

137 "ZSCAN", 

138 "ZSCORE", 

139 "ZUNION", 

140 # Stream Operations 

141 "XLEN", 

142 "XPENDING", 

143 "XRANGE", 

144 "XREAD", 

145 "XREVRANGE", 

146 # JSON Module 

147 "JSON.ARRINDEX", 

148 "JSON.ARRLEN", 

149 "JSON.GET", 

150 "JSON.MGET", 

151 "JSON.OBJKEYS", 

152 "JSON.OBJLEN", 

153 "JSON.RESP", 

154 "JSON.STRLEN", 

155 "JSON.TYPE", 

156 # RediSearch Module 

157 "FT.EXPLAIN", 

158 "FT.INFO", 

159 "FT.PROFILE", 

160 "FT.SEARCH", 

161 ] 

162) 

163 

164 

165class ClusterMultiKeyCommands(ClusterCommandsProtocol): 

166 """ 

167 A class containing commands that handle more than one key 

168 """ 

169 

170 def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]: 

171 """Split keys into a dictionary that maps a slot to a list of keys.""" 

172 

173 slots_to_keys = {} 

174 for key in keys: 

175 slot = key_slot(self.encoder.encode(key)) 

176 slots_to_keys.setdefault(slot, []).append(key) 

177 

178 return slots_to_keys 

179 

180 def _partition_pairs_by_slot( 

181 self, mapping: Mapping[AnyKeyT, EncodableT] 

182 ) -> Dict[int, List[EncodableT]]: 

183 """Split pairs into a dictionary that maps a slot to a list of pairs.""" 

184 

185 slots_to_pairs = {} 

186 for pair in mapping.items(): 

187 slot = key_slot(self.encoder.encode(pair[0])) 

188 slots_to_pairs.setdefault(slot, []).extend(pair) 

189 

190 return slots_to_pairs 

191 

192 def _execute_pipeline_by_slot( 

193 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

194 ) -> List[Any]: 

195 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

196 pipe = self.pipeline() 

197 [ 

198 pipe.execute_command( 

199 command, 

200 *slot_args, 

201 target_nodes=[ 

202 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

203 ], 

204 ) 

205 for slot, slot_args in slots_to_args.items() 

206 ] 

207 return pipe.execute() 

208 

209 def _reorder_keys_by_command( 

210 self, 

211 keys: Iterable[KeyT], 

212 slots_to_args: Mapping[int, Iterable[EncodableT]], 

213 responses: Iterable[Any], 

214 ) -> List[Any]: 

215 results = { 

216 k: v 

217 for slot_values, response in zip(slots_to_args.values(), responses) 

218 for k, v in zip(slot_values, response) 

219 } 

220 return [results[key] for key in keys] 

221 

222 def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

223 """ 

224 Splits the keys into different slots and then calls MGET 

225 for the keys of every slot. This operation will not be atomic 

226 if keys belong to more than one slot. 

227 

228 Returns a list of values ordered identically to ``keys`` 

229 

230 For more information see https://redis.io/commands/mget 

231 """ 

232 

233 # Concatenate all keys into a list 

234 keys = list_or_args(keys, args) 

235 

236 # Split keys into slots 

237 slots_to_keys = self._partition_keys_by_slot(keys) 

238 

239 # Execute commands using a pipeline 

240 res = self._execute_pipeline_by_slot("MGET", slots_to_keys) 

241 

242 # Reorder keys in the order the user provided & return 

243 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

244 

245 def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

246 """ 

247 Sets key/values based on a mapping. Mapping is a dictionary of 

248 key/value pairs. Both keys and values should be strings or types that 

249 can be cast to a string via str(). 

250 

251 Splits the keys into different slots and then calls MSET 

252 for the keys of every slot. This operation will not be atomic 

253 if keys belong to more than one slot. 

254 

255 For more information see https://redis.io/commands/mset 

256 """ 

257 

258 # Partition the keys by slot 

259 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

260 

261 # Execute commands using a pipeline & return list of replies 

262 return self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

263 

264 def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

265 """ 

266 Runs the given command once for the keys 

267 of each slot. Returns the sum of the return values. 

268 """ 

269 

270 # Partition the keys by slot 

271 slots_to_keys = self._partition_keys_by_slot(keys) 

272 

273 # Sum up the reply from each command 

274 return sum(self._execute_pipeline_by_slot(command, slots_to_keys)) 

275 

276 def exists(self, *keys: KeyT) -> ResponseT: 

277 """ 

278 Returns the number of ``names`` that exist in the 

279 whole cluster. The keys are first split up into slots 

280 and then an EXISTS command is sent for every slot 

281 

282 For more information see https://redis.io/commands/exists 

283 """ 

284 return self._split_command_across_slots("EXISTS", *keys) 

285 

286 def delete(self, *keys: KeyT) -> ResponseT: 

287 """ 

288 Deletes the given keys in the cluster. 

289 The keys are first split up into slots 

290 and then an DEL command is sent for every slot 

291 

292 Non-existent keys are ignored. 

293 Returns the number of keys that were deleted. 

294 

295 For more information see https://redis.io/commands/del 

296 """ 

297 return self._split_command_across_slots("DEL", *keys) 

298 

299 def touch(self, *keys: KeyT) -> ResponseT: 

300 """ 

301 Updates the last access time of given keys across the 

302 cluster. 

303 

304 The keys are first split up into slots 

305 and then an TOUCH command is sent for every slot 

306 

307 Non-existent keys are ignored. 

308 Returns the number of keys that were touched. 

309 

310 For more information see https://redis.io/commands/touch 

311 """ 

312 return self._split_command_across_slots("TOUCH", *keys) 

313 

314 def unlink(self, *keys: KeyT) -> ResponseT: 

315 """ 

316 Remove the specified keys in a different thread. 

317 

318 The keys are first split up into slots 

319 and then an TOUCH command is sent for every slot 

320 

321 Non-existent keys are ignored. 

322 Returns the number of keys that were unlinked. 

323 

324 For more information see https://redis.io/commands/unlink 

325 """ 

326 return self._split_command_across_slots("UNLINK", *keys) 

327 

328 

329class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands): 

330 """ 

331 A class containing commands that handle more than one key 

332 """ 

333 

334 async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

335 """ 

336 Splits the keys into different slots and then calls MGET 

337 for the keys of every slot. This operation will not be atomic 

338 if keys belong to more than one slot. 

339 

340 Returns a list of values ordered identically to ``keys`` 

341 

342 For more information see https://redis.io/commands/mget 

343 """ 

344 

345 # Concatenate all keys into a list 

346 keys = list_or_args(keys, args) 

347 

348 # Split keys into slots 

349 slots_to_keys = self._partition_keys_by_slot(keys) 

350 

351 # Execute commands using a pipeline 

352 res = await self._execute_pipeline_by_slot("MGET", slots_to_keys) 

353 

354 # Reorder keys in the order the user provided & return 

355 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

356 

357 async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

358 """ 

359 Sets key/values based on a mapping. Mapping is a dictionary of 

360 key/value pairs. Both keys and values should be strings or types that 

361 can be cast to a string via str(). 

362 

363 Splits the keys into different slots and then calls MSET 

364 for the keys of every slot. This operation will not be atomic 

365 if keys belong to more than one slot. 

366 

367 For more information see https://redis.io/commands/mset 

368 """ 

369 

370 # Partition the keys by slot 

371 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

372 

373 # Execute commands using a pipeline & return list of replies 

374 return await self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

375 

376 async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

377 """ 

378 Runs the given command once for the keys 

379 of each slot. Returns the sum of the return values. 

380 """ 

381 

382 # Partition the keys by slot 

383 slots_to_keys = self._partition_keys_by_slot(keys) 

384 

385 # Sum up the reply from each command 

386 return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) 

387 

388 async def _execute_pipeline_by_slot( 

389 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

390 ) -> List[Any]: 

391 if self._initialize: 

392 await self.initialize() 

393 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

394 pipe = self.pipeline() 

395 [ 

396 pipe.execute_command( 

397 command, 

398 *slot_args, 

399 target_nodes=[ 

400 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

401 ], 

402 ) 

403 for slot, slot_args in slots_to_args.items() 

404 ] 

405 return await pipe.execute() 

406 

407 

408class ClusterManagementCommands(ManagementCommands): 

409 """ 

410 A class for Redis Cluster management commands 

411 

412 The class inherits from Redis's core ManagementCommands class and do the 

413 required adjustments to work with cluster mode 

414 """ 

415 

416 def slaveof(self, *args, **kwargs) -> NoReturn: 

417 """ 

418 Make the server a replica of another instance, or promote it as master. 

419 

420 For more information see https://redis.io/commands/slaveof 

421 """ 

422 raise RedisClusterException("SLAVEOF is not supported in cluster mode") 

423 

424 def replicaof(self, *args, **kwargs) -> NoReturn: 

425 """ 

426 Make the server a replica of another instance, or promote it as master. 

427 

428 For more information see https://redis.io/commands/replicaof 

429 """ 

430 raise RedisClusterException("REPLICAOF is not supported in cluster mode") 

431 

432 def swapdb(self, *args, **kwargs) -> NoReturn: 

433 """ 

434 Swaps two Redis databases. 

435 

436 For more information see https://redis.io/commands/swapdb 

437 """ 

438 raise RedisClusterException("SWAPDB is not supported in cluster mode") 

439 

440 def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT: 

441 """ 

442 Returns the node's id. 

443 

444 :target_node: 'ClusterNode' 

445 The node to execute the command on 

446 

447 For more information check https://redis.io/commands/cluster-myid/ 

448 """ 

449 return self.execute_command("CLUSTER MYID", target_nodes=target_node) 

450 

451 def cluster_addslots( 

452 self, target_node: "TargetNodesT", *slots: EncodableT 

453 ) -> ResponseT: 

454 """ 

455 Assign new hash slots to receiving node. Sends to specified node. 

456 

457 :target_node: 'ClusterNode' 

458 The node to execute the command on 

459 

460 For more information see https://redis.io/commands/cluster-addslots 

461 """ 

462 return self.execute_command( 

463 "CLUSTER ADDSLOTS", *slots, target_nodes=target_node 

464 ) 

465 

466 def cluster_addslotsrange( 

467 self, target_node: "TargetNodesT", *slots: EncodableT 

468 ) -> ResponseT: 

469 """ 

470 Similar to the CLUSTER ADDSLOTS command. 

471 The difference between the two commands is that ADDSLOTS takes a list of slots 

472 to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges 

473 (specified by start and end slots) to assign to the node. 

474 

475 :target_node: 'ClusterNode' 

476 The node to execute the command on 

477 

478 For more information see https://redis.io/commands/cluster-addslotsrange 

479 """ 

480 return self.execute_command( 

481 "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node 

482 ) 

483 

484 def cluster_countkeysinslot(self, slot_id: int) -> ResponseT: 

485 """ 

486 Return the number of local keys in the specified hash slot 

487 Send to node based on specified slot_id 

488 

489 For more information see https://redis.io/commands/cluster-countkeysinslot 

490 """ 

491 return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id) 

492 

493 def cluster_count_failure_report(self, node_id: str) -> ResponseT: 

494 """ 

495 Return the number of failure reports active for a given node 

496 Sends to a random node 

497 

498 For more information see https://redis.io/commands/cluster-count-failure-reports 

499 """ 

500 return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id) 

501 

502 def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

503 """ 

504 Set hash slots as unbound in the cluster. 

505 It determines by it self what node the slot is in and sends it there 

506 

507 Returns a list of the results for each processed slot. 

508 

509 For more information see https://redis.io/commands/cluster-delslots 

510 """ 

511 return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots] 

512 

513 def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT: 

514 """ 

515 Similar to the CLUSTER DELSLOTS command. 

516 The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove 

517 from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove 

518 from the node. 

519 

520 For more information see https://redis.io/commands/cluster-delslotsrange 

521 """ 

522 return self.execute_command("CLUSTER DELSLOTSRANGE", *slots) 

523 

524 def cluster_failover( 

525 self, target_node: "TargetNodesT", option: Optional[str] = None 

526 ) -> ResponseT: 

527 """ 

528 Forces a slave to perform a manual failover of its master 

529 Sends to specified node 

530 

531 :target_node: 'ClusterNode' 

532 The node to execute the command on 

533 

534 For more information see https://redis.io/commands/cluster-failover 

535 """ 

536 if option: 

537 if option.upper() not in ["FORCE", "TAKEOVER"]: 

538 raise RedisError( 

539 f"Invalid option for CLUSTER FAILOVER command: {option}" 

540 ) 

541 else: 

542 return self.execute_command( 

543 "CLUSTER FAILOVER", option, target_nodes=target_node 

544 ) 

545 else: 

546 return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node) 

547 

548 def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

549 """ 

550 Provides info about Redis Cluster node state. 

551 The command will be sent to a random node in the cluster if no target 

552 node is specified. 

553 

554 For more information see https://redis.io/commands/cluster-info 

555 """ 

556 return self.execute_command("CLUSTER INFO", target_nodes=target_nodes) 

557 

558 def cluster_keyslot(self, key: str) -> ResponseT: 

559 """ 

560 Returns the hash slot of the specified key 

561 Sends to random node in the cluster 

562 

563 For more information see https://redis.io/commands/cluster-keyslot 

564 """ 

565 return self.execute_command("CLUSTER KEYSLOT", key) 

566 

567 def cluster_meet( 

568 self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None 

569 ) -> ResponseT: 

570 """ 

571 Force a node cluster to handshake with another node. 

572 Sends to specified node. 

573 

574 For more information see https://redis.io/commands/cluster-meet 

575 """ 

576 return self.execute_command( 

577 "CLUSTER MEET", host, port, target_nodes=target_nodes 

578 ) 

579 

580 def cluster_nodes(self) -> ResponseT: 

581 """ 

582 Get Cluster config for the node. 

583 Sends to random node in the cluster 

584 

585 For more information see https://redis.io/commands/cluster-nodes 

586 """ 

587 return self.execute_command("CLUSTER NODES") 

588 

589 def cluster_replicate( 

590 self, target_nodes: "TargetNodesT", node_id: str 

591 ) -> ResponseT: 

592 """ 

593 Reconfigure a node as a slave of the specified master node 

594 

595 For more information see https://redis.io/commands/cluster-replicate 

596 """ 

597 return self.execute_command( 

598 "CLUSTER REPLICATE", node_id, target_nodes=target_nodes 

599 ) 

600 

601 def cluster_reset( 

602 self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None 

603 ) -> ResponseT: 

604 """ 

605 Reset a Redis Cluster node 

606 

607 If 'soft' is True then it will send 'SOFT' argument 

608 If 'soft' is False then it will send 'HARD' argument 

609 

610 For more information see https://redis.io/commands/cluster-reset 

611 """ 

612 return self.execute_command( 

613 "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes 

614 ) 

615 

616 def cluster_save_config( 

617 self, target_nodes: Optional["TargetNodesT"] = None 

618 ) -> ResponseT: 

619 """ 

620 Forces the node to save cluster state on disk 

621 

622 For more information see https://redis.io/commands/cluster-saveconfig 

623 """ 

624 return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes) 

625 

626 def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT: 

627 """ 

628 Returns the number of keys in the specified cluster slot 

629 

630 For more information see https://redis.io/commands/cluster-getkeysinslot 

631 """ 

632 return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) 

633 

634 def cluster_set_config_epoch( 

635 self, epoch: int, target_nodes: Optional["TargetNodesT"] = None 

636 ) -> ResponseT: 

637 """ 

638 Set the configuration epoch in a new node 

639 

640 For more information see https://redis.io/commands/cluster-set-config-epoch 

641 """ 

642 return self.execute_command( 

643 "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes 

644 ) 

645 

646 def cluster_setslot( 

647 self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str 

648 ) -> ResponseT: 

649 """ 

650 Bind an hash slot to a specific node 

651 

652 :target_node: 'ClusterNode' 

653 The node to execute the command on 

654 

655 For more information see https://redis.io/commands/cluster-setslot 

656 """ 

657 if state.upper() in ("IMPORTING", "NODE", "MIGRATING"): 

658 return self.execute_command( 

659 "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node 

660 ) 

661 elif state.upper() == "STABLE": 

662 raise RedisError('For "stable" state please use cluster_setslot_stable') 

663 else: 

664 raise RedisError(f"Invalid slot state: {state}") 

665 

666 def cluster_setslot_stable(self, slot_id: int) -> ResponseT: 

667 """ 

668 Clears migrating / importing state from the slot. 

669 It determines by it self what node the slot is in and sends it there. 

670 

671 For more information see https://redis.io/commands/cluster-setslot 

672 """ 

673 return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") 

674 

675 def cluster_replicas( 

676 self, node_id: str, target_nodes: Optional["TargetNodesT"] = None 

677 ) -> ResponseT: 

678 """ 

679 Provides a list of replica nodes replicating from the specified primary 

680 target node. 

681 

682 For more information see https://redis.io/commands/cluster-replicas 

683 """ 

684 return self.execute_command( 

685 "CLUSTER REPLICAS", node_id, target_nodes=target_nodes 

686 ) 

687 

688 def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

689 """ 

690 Get array of Cluster slot to node mappings 

691 

692 For more information see https://redis.io/commands/cluster-slots 

693 """ 

694 return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes) 

695 

696 def cluster_shards(self, target_nodes=None): 

697 """ 

698 Returns details about the shards of the cluster. 

699 

700 For more information see https://redis.io/commands/cluster-shards 

701 """ 

702 return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes) 

703 

704 def cluster_myshardid(self, target_nodes=None): 

705 """ 

706 Returns the shard ID of the node. 

707 

708 For more information see https://redis.io/commands/cluster-myshardid/ 

709 """ 

710 return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes) 

711 

712 def cluster_links(self, target_node: "TargetNodesT") -> ResponseT: 

713 """ 

714 Each node in a Redis Cluster maintains a pair of long-lived TCP link with each 

715 peer in the cluster: One for sending outbound messages towards the peer and one 

716 for receiving inbound messages from the peer. 

717 

718 This command outputs information of all such peer links as an array. 

719 

720 For more information see https://redis.io/commands/cluster-links 

721 """ 

722 return self.execute_command("CLUSTER LINKS", target_nodes=target_node) 

723 

724 def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

725 raise NotImplementedError( 

726 "CLUSTER FLUSHSLOTS is intentionally not implemented in the client." 

727 ) 

728 

729 def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

730 raise NotImplementedError( 

731 "CLUSTER BUMPEPOCH is intentionally not implemented in the client." 

732 ) 

733 

734 def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

735 """ 

736 Enables read queries. 

737 The command will be sent to the default cluster node if target_nodes is 

738 not specified. 

739 

740 For more information see https://redis.io/commands/readonly 

741 """ 

742 if target_nodes == "replicas" or target_nodes == "all": 

743 # read_from_replicas will only be enabled if the READONLY command 

744 # is sent to all replicas 

745 self.read_from_replicas = True 

746 return self.execute_command("READONLY", target_nodes=target_nodes) 

747 

748 def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

749 """ 

750 Disables read queries. 

751 The command will be sent to the default cluster node if target_nodes is 

752 not specified. 

753 

754 For more information see https://redis.io/commands/readwrite 

755 """ 

756 # Reset read from replicas flag 

757 self.read_from_replicas = False 

758 return self.execute_command("READWRITE", target_nodes=target_nodes) 

759 

760 @deprecated_function( 

761 version="7.2.0", 

762 reason="Use client-side caching feature instead.", 

763 ) 

764 def client_tracking_on( 

765 self, 

766 clientid: Optional[int] = None, 

767 prefix: Sequence[KeyT] = [], 

768 bcast: bool = False, 

769 optin: bool = False, 

770 optout: bool = False, 

771 noloop: bool = False, 

772 target_nodes: Optional["TargetNodesT"] = "all", 

773 ) -> ResponseT: 

774 """ 

775 Enables the tracking feature of the Redis server, that is used 

776 for server assisted client side caching. 

777 

778 When clientid is provided - in target_nodes only the node that owns the 

779 connection with this id should be provided. 

780 When clientid is not provided - target_nodes can be any node. 

781 

782 For more information see https://redis.io/commands/client-tracking 

783 """ 

784 return self.client_tracking( 

785 True, 

786 clientid, 

787 prefix, 

788 bcast, 

789 optin, 

790 optout, 

791 noloop, 

792 target_nodes=target_nodes, 

793 ) 

794 

795 @deprecated_function( 

796 version="7.2.0", 

797 reason="Use client-side caching feature instead.", 

798 ) 

799 def client_tracking_off( 

800 self, 

801 clientid: Optional[int] = None, 

802 prefix: Sequence[KeyT] = [], 

803 bcast: bool = False, 

804 optin: bool = False, 

805 optout: bool = False, 

806 noloop: bool = False, 

807 target_nodes: Optional["TargetNodesT"] = "all", 

808 ) -> ResponseT: 

809 """ 

810 Disables the tracking feature of the Redis server, that is used 

811 for server assisted client side caching. 

812 

813 When clientid is provided - in target_nodes only the node that owns the 

814 connection with this id should be provided. 

815 When clientid is not provided - target_nodes can be any node. 

816 

817 For more information see https://redis.io/commands/client-tracking 

818 """ 

819 return self.client_tracking( 

820 False, 

821 clientid, 

822 prefix, 

823 bcast, 

824 optin, 

825 optout, 

826 noloop, 

827 target_nodes=target_nodes, 

828 ) 

829 

830 

831class AsyncClusterManagementCommands( 

832 ClusterManagementCommands, AsyncManagementCommands 

833): 

834 """ 

835 A class for Redis Cluster management commands 

836 

837 The class inherits from Redis's core ManagementCommands class and do the 

838 required adjustments to work with cluster mode 

839 """ 

840 

841 async def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

842 """ 

843 Set hash slots as unbound in the cluster. 

844 It determines by it self what node the slot is in and sends it there 

845 

846 Returns a list of the results for each processed slot. 

847 

848 For more information see https://redis.io/commands/cluster-delslots 

849 """ 

850 return await asyncio.gather( 

851 *( 

852 asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot)) 

853 for slot in slots 

854 ) 

855 ) 

856 

857 @deprecated_function( 

858 version="7.2.0", 

859 reason="Use client-side caching feature instead.", 

860 ) 

861 async def client_tracking_on( 

862 self, 

863 clientid: Optional[int] = None, 

864 prefix: Sequence[KeyT] = [], 

865 bcast: bool = False, 

866 optin: bool = False, 

867 optout: bool = False, 

868 noloop: bool = False, 

869 target_nodes: Optional["TargetNodesT"] = "all", 

870 ) -> ResponseT: 

871 """ 

872 Enables the tracking feature of the Redis server, that is used 

873 for server assisted client side caching. 

874 

875 When clientid is provided - in target_nodes only the node that owns the 

876 connection with this id should be provided. 

877 When clientid is not provided - target_nodes can be any node. 

878 

879 For more information see https://redis.io/commands/client-tracking 

880 """ 

881 return await self.client_tracking( 

882 True, 

883 clientid, 

884 prefix, 

885 bcast, 

886 optin, 

887 optout, 

888 noloop, 

889 target_nodes=target_nodes, 

890 ) 

891 

892 @deprecated_function( 

893 version="7.2.0", 

894 reason="Use client-side caching feature instead.", 

895 ) 

896 async def client_tracking_off( 

897 self, 

898 clientid: Optional[int] = None, 

899 prefix: Sequence[KeyT] = [], 

900 bcast: bool = False, 

901 optin: bool = False, 

902 optout: bool = False, 

903 noloop: bool = False, 

904 target_nodes: Optional["TargetNodesT"] = "all", 

905 ) -> ResponseT: 

906 """ 

907 Disables the tracking feature of the Redis server, that is used 

908 for server assisted client side caching. 

909 

910 When clientid is provided - in target_nodes only the node that owns the 

911 connection with this id should be provided. 

912 When clientid is not provided - target_nodes can be any node. 

913 

914 For more information see https://redis.io/commands/client-tracking 

915 """ 

916 return await self.client_tracking( 

917 False, 

918 clientid, 

919 prefix, 

920 bcast, 

921 optin, 

922 optout, 

923 noloop, 

924 target_nodes=target_nodes, 

925 ) 

926 

927 

928class ClusterDataAccessCommands(DataAccessCommands): 

929 """ 

930 A class for Redis Cluster Data Access Commands 

931 

932 The class inherits from Redis's core DataAccessCommand class and do the 

933 required adjustments to work with cluster mode 

934 """ 

935 

936 def stralgo( 

937 self, 

938 algo: Literal["LCS"], 

939 value1: KeyT, 

940 value2: KeyT, 

941 specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", 

942 len: bool = False, 

943 idx: bool = False, 

944 minmatchlen: Optional[int] = None, 

945 withmatchlen: bool = False, 

946 **kwargs, 

947 ) -> ResponseT: 

948 """ 

949 Implements complex algorithms that operate on strings. 

950 Right now the only algorithm implemented is the LCS algorithm 

951 (longest common substring). However new algorithms could be 

952 implemented in the future. 

953 

954 ``algo`` Right now must be LCS 

955 ``value1`` and ``value2`` Can be two strings or two keys 

956 ``specific_argument`` Specifying if the arguments to the algorithm 

957 will be keys or strings. strings is the default. 

958 ``len`` Returns just the len of the match. 

959 ``idx`` Returns the match positions in each string. 

960 ``minmatchlen`` Restrict the list of matches to the ones of a given 

961 minimal length. Can be provided only when ``idx`` set to True. 

962 ``withmatchlen`` Returns the matches with the len of the match. 

963 Can be provided only when ``idx`` set to True. 

964 

965 For more information see https://redis.io/commands/stralgo 

966 """ 

967 target_nodes = kwargs.pop("target_nodes", None) 

968 if specific_argument == "strings" and target_nodes is None: 

969 target_nodes = "default-node" 

970 kwargs.update({"target_nodes": target_nodes}) 

971 return super().stralgo( 

972 algo, 

973 value1, 

974 value2, 

975 specific_argument, 

976 len, 

977 idx, 

978 minmatchlen, 

979 withmatchlen, 

980 **kwargs, 

981 ) 

982 

983 def scan_iter( 

984 self, 

985 match: Optional[PatternT] = None, 

986 count: Optional[int] = None, 

987 _type: Optional[str] = None, 

988 **kwargs, 

989 ) -> Iterator: 

990 # Do the first query with cursor=0 for all nodes 

991 cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) 

992 yield from data 

993 

994 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

995 if cursors: 

996 # Get nodes by name 

997 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

998 

999 # Iterate over each node till its cursor is 0 

1000 kwargs.pop("target_nodes", None) 

1001 while cursors: 

1002 for name, cursor in cursors.items(): 

1003 cur, data = self.scan( 

1004 cursor=cursor, 

1005 match=match, 

1006 count=count, 

1007 _type=_type, 

1008 target_nodes=nodes[name], 

1009 **kwargs, 

1010 ) 

1011 yield from data 

1012 cursors[name] = cur[name] 

1013 

1014 cursors = { 

1015 name: cursor for name, cursor in cursors.items() if cursor != 0 

1016 } 

1017 

1018 

1019class AsyncClusterDataAccessCommands( 

1020 ClusterDataAccessCommands, AsyncDataAccessCommands 

1021): 

1022 """ 

1023 A class for Redis Cluster Data Access Commands 

1024 

1025 The class inherits from Redis's core DataAccessCommand class and do the 

1026 required adjustments to work with cluster mode 

1027 """ 

1028 

1029 async def scan_iter( 

1030 self, 

1031 match: Optional[PatternT] = None, 

1032 count: Optional[int] = None, 

1033 _type: Optional[str] = None, 

1034 **kwargs, 

1035 ) -> AsyncIterator: 

1036 # Do the first query with cursor=0 for all nodes 

1037 cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) 

1038 for value in data: 

1039 yield value 

1040 

1041 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

1042 if cursors: 

1043 # Get nodes by name 

1044 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

1045 

1046 # Iterate over each node till its cursor is 0 

1047 kwargs.pop("target_nodes", None) 

1048 while cursors: 

1049 for name, cursor in cursors.items(): 

1050 cur, data = await self.scan( 

1051 cursor=cursor, 

1052 match=match, 

1053 count=count, 

1054 _type=_type, 

1055 target_nodes=nodes[name], 

1056 **kwargs, 

1057 ) 

1058 for value in data: 

1059 yield value 

1060 cursors[name] = cur[name] 

1061 

1062 cursors = { 

1063 name: cursor for name, cursor in cursors.items() if cursor != 0 

1064 } 

1065 

1066 

1067class RedisClusterCommands( 

1068 ClusterMultiKeyCommands, 

1069 ClusterManagementCommands, 

1070 ACLCommands, 

1071 PubSubCommands, 

1072 ClusterDataAccessCommands, 

1073 ScriptCommands, 

1074 FunctionCommands, 

1075 ModuleCommands, 

1076 RedisModuleCommands, 

1077): 

1078 """ 

1079 A class for all Redis Cluster commands 

1080 

1081 For key-based commands, the target node(s) will be internally determined 

1082 by the keys' hash slot. 

1083 Non-key-based commands can be executed with the 'target_nodes' argument to 

1084 target specific nodes. By default, if target_nodes is not specified, the 

1085 command will be executed on the default cluster node. 

1086 

1087 :param :target_nodes: type can be one of the followings: 

1088 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

1089 - 'ClusterNode' 

1090 - 'list(ClusterNodes)' 

1091 - 'dict(any:clusterNodes)' 

1092 

1093 for example: 

1094 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

1095 """ 

1096 

1097 

1098class AsyncRedisClusterCommands( 

1099 AsyncClusterMultiKeyCommands, 

1100 AsyncClusterManagementCommands, 

1101 AsyncACLCommands, 

1102 AsyncClusterDataAccessCommands, 

1103 AsyncScriptCommands, 

1104 AsyncFunctionCommands, 

1105 AsyncModuleCommands, 

1106 AsyncRedisModuleCommands, 

1107): 

1108 """ 

1109 A class for all Redis Cluster commands 

1110 

1111 For key-based commands, the target node(s) will be internally determined 

1112 by the keys' hash slot. 

1113 Non-key-based commands can be executed with the 'target_nodes' argument to 

1114 target specific nodes. By default, if target_nodes is not specified, the 

1115 command will be executed on the default cluster node. 

1116 

1117 :param :target_nodes: type can be one of the followings: 

1118 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

1119 - 'ClusterNode' 

1120 - 'list(ClusterNodes)' 

1121 - 'dict(any:clusterNodes)' 

1122 

1123 for example: 

1124 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

1125 """