Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/commands/cluster.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

214 statements  

1import asyncio 

2from typing import ( 

3 TYPE_CHECKING, 

4 Any, 

5 AsyncIterator, 

6 Awaitable, 

7 Dict, 

8 Iterable, 

9 Iterator, 

10 List, 

11 Literal, 

12 Mapping, 

13 NoReturn, 

14 Optional, 

15 Sequence, 

16 Union, 

17) 

18 

19from redis.crc import key_slot 

20from redis.exceptions import RedisClusterException, RedisError 

21from redis.typing import ( 

22 AnyKeyT, 

23 ClusterCommandsProtocol, 

24 EncodableT, 

25 KeysT, 

26 KeyT, 

27 PatternT, 

28 ResponseT, 

29) 

30from redis.utils import deprecated_function 

31 

32from .core import ( 

33 ACLCommands, 

34 AsyncACLCommands, 

35 AsyncDataAccessCommands, 

36 AsyncFunctionCommands, 

37 AsyncManagementCommands, 

38 AsyncModuleCommands, 

39 AsyncScriptCommands, 

40 DataAccessCommands, 

41 FunctionCommands, 

42 HotkeysMetricsTypes, 

43 ManagementCommands, 

44 ModuleCommands, 

45 PubSubCommands, 

46 ScriptCommands, 

47) 

48from .helpers import list_or_args 

49from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands 

50 

51if TYPE_CHECKING: 

52 from redis.asyncio.cluster import TargetNodesT 

53 

54# Not complete, but covers the major ones 

55# https://redis.io/commands 

56READ_COMMANDS = frozenset( 

57 [ 

58 # Bit Operations 

59 "BITCOUNT", 

60 "BITFIELD_RO", 

61 "BITPOS", 

62 # Scripting 

63 "EVAL_RO", 

64 "EVALSHA_RO", 

65 "FCALL_RO", 

66 # Key Operations 

67 "DBSIZE", 

68 "DIGEST", 

69 "DUMP", 

70 "EXISTS", 

71 "EXPIRETIME", 

72 "PEXPIRETIME", 

73 "KEYS", 

74 "SCAN", 

75 "PTTL", 

76 "RANDOMKEY", 

77 "TTL", 

78 "TYPE", 

79 # String Operations 

80 "GET", 

81 "GETBIT", 

82 "GETRANGE", 

83 "MGET", 

84 "STRLEN", 

85 "LCS", 

86 # Geo Operations 

87 "GEODIST", 

88 "GEOHASH", 

89 "GEOPOS", 

90 "GEOSEARCH", 

91 # Hash Operations 

92 "HEXISTS", 

93 "HGET", 

94 "HGETALL", 

95 "HKEYS", 

96 "HLEN", 

97 "HMGET", 

98 "HSTRLEN", 

99 "HVALS", 

100 "HRANDFIELD", 

101 "HEXPIRETIME", 

102 "HPEXPIRETIME", 

103 "HTTL", 

104 "HPTTL", 

105 "HSCAN", 

106 # List Operations 

107 "LINDEX", 

108 "LPOS", 

109 "LLEN", 

110 "LRANGE", 

111 # Set Operations 

112 "SCARD", 

113 "SDIFF", 

114 "SINTER", 

115 "SINTERCARD", 

116 "SISMEMBER", 

117 "SMISMEMBER", 

118 "SMEMBERS", 

119 "SRANDMEMBER", 

120 "SUNION", 

121 "SSCAN", 

122 # Sorted Set Operations 

123 "ZCARD", 

124 "ZCOUNT", 

125 "ZDIFF", 

126 "ZINTER", 

127 "ZINTERCARD", 

128 "ZLEXCOUNT", 

129 "ZMSCORE", 

130 "ZRANDMEMBER", 

131 "ZRANGE", 

132 "ZRANGEBYLEX", 

133 "ZRANGEBYSCORE", 

134 "ZRANK", 

135 "ZREVRANGE", 

136 "ZREVRANGEBYLEX", 

137 "ZREVRANGEBYSCORE", 

138 "ZREVRANK", 

139 "ZSCAN", 

140 "ZSCORE", 

141 "ZUNION", 

142 # Stream Operations 

143 "XLEN", 

144 "XPENDING", 

145 "XRANGE", 

146 "XREAD", 

147 "XREVRANGE", 

148 # JSON Module 

149 "JSON.ARRINDEX", 

150 "JSON.ARRLEN", 

151 "JSON.GET", 

152 "JSON.MGET", 

153 "JSON.OBJKEYS", 

154 "JSON.OBJLEN", 

155 "JSON.RESP", 

156 "JSON.STRLEN", 

157 "JSON.TYPE", 

158 # RediSearch Module 

159 "FT.EXPLAIN", 

160 "FT.INFO", 

161 "FT.PROFILE", 

162 "FT.SEARCH", 

163 ] 

164) 

165 

166 

167class ClusterMultiKeyCommands(ClusterCommandsProtocol): 

168 """ 

169 A class containing commands that handle more than one key 

170 """ 

171 

172 def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]: 

173 """Split keys into a dictionary that maps a slot to a list of keys.""" 

174 

175 slots_to_keys = {} 

176 for key in keys: 

177 slot = key_slot(self.encoder.encode(key)) 

178 slots_to_keys.setdefault(slot, []).append(key) 

179 

180 return slots_to_keys 

181 

182 def _partition_pairs_by_slot( 

183 self, mapping: Mapping[AnyKeyT, EncodableT] 

184 ) -> Dict[int, List[EncodableT]]: 

185 """Split pairs into a dictionary that maps a slot to a list of pairs.""" 

186 

187 slots_to_pairs = {} 

188 for pair in mapping.items(): 

189 slot = key_slot(self.encoder.encode(pair[0])) 

190 slots_to_pairs.setdefault(slot, []).extend(pair) 

191 

192 return slots_to_pairs 

193 

194 def _execute_pipeline_by_slot( 

195 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

196 ) -> List[Any]: 

197 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

198 pipe = self.pipeline() 

199 [ 

200 pipe.execute_command( 

201 command, 

202 *slot_args, 

203 target_nodes=[ 

204 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

205 ], 

206 ) 

207 for slot, slot_args in slots_to_args.items() 

208 ] 

209 return pipe.execute() 

210 

211 def _reorder_keys_by_command( 

212 self, 

213 keys: Iterable[KeyT], 

214 slots_to_args: Mapping[int, Iterable[EncodableT]], 

215 responses: Iterable[Any], 

216 ) -> List[Any]: 

217 results = { 

218 k: v 

219 for slot_values, response in zip(slots_to_args.values(), responses) 

220 for k, v in zip(slot_values, response) 

221 } 

222 return [results[key] for key in keys] 

223 

224 def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

225 """ 

226 Splits the keys into different slots and then calls MGET 

227 for the keys of every slot. This operation will not be atomic 

228 if keys belong to more than one slot. 

229 

230 Returns a list of values ordered identically to ``keys`` 

231 

232 For more information see https://redis.io/commands/mget 

233 """ 

234 

235 # Concatenate all keys into a list 

236 keys = list_or_args(keys, args) 

237 

238 # Split keys into slots 

239 slots_to_keys = self._partition_keys_by_slot(keys) 

240 

241 # Execute commands using a pipeline 

242 res = self._execute_pipeline_by_slot("MGET", slots_to_keys) 

243 

244 # Reorder keys in the order the user provided & return 

245 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

246 

247 def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

248 """ 

249 Sets key/values based on a mapping. Mapping is a dictionary of 

250 key/value pairs. Both keys and values should be strings or types that 

251 can be cast to a string via str(). 

252 

253 Splits the keys into different slots and then calls MSET 

254 for the keys of every slot. This operation will not be atomic 

255 if keys belong to more than one slot. 

256 

257 For more information see https://redis.io/commands/mset 

258 """ 

259 

260 # Partition the keys by slot 

261 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

262 

263 # Execute commands using a pipeline & return list of replies 

264 return self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

265 

266 def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

267 """ 

268 Runs the given command once for the keys 

269 of each slot. Returns the sum of the return values. 

270 """ 

271 

272 # Partition the keys by slot 

273 slots_to_keys = self._partition_keys_by_slot(keys) 

274 

275 # Sum up the reply from each command 

276 return sum(self._execute_pipeline_by_slot(command, slots_to_keys)) 

277 

278 def exists(self, *keys: KeyT) -> ResponseT: 

279 """ 

280 Returns the number of ``names`` that exist in the 

281 whole cluster. The keys are first split up into slots 

282 and then an EXISTS command is sent for every slot 

283 

284 For more information see https://redis.io/commands/exists 

285 """ 

286 return self._split_command_across_slots("EXISTS", *keys) 

287 

288 def delete(self, *keys: KeyT) -> ResponseT: 

289 """ 

290 Deletes the given keys in the cluster. 

291 The keys are first split up into slots 

292 and then an DEL command is sent for every slot 

293 

294 Non-existent keys are ignored. 

295 Returns the number of keys that were deleted. 

296 

297 For more information see https://redis.io/commands/del 

298 """ 

299 return self._split_command_across_slots("DEL", *keys) 

300 

301 def touch(self, *keys: KeyT) -> ResponseT: 

302 """ 

303 Updates the last access time of given keys across the 

304 cluster. 

305 

306 The keys are first split up into slots 

307 and then an TOUCH command is sent for every slot 

308 

309 Non-existent keys are ignored. 

310 Returns the number of keys that were touched. 

311 

312 For more information see https://redis.io/commands/touch 

313 """ 

314 return self._split_command_across_slots("TOUCH", *keys) 

315 

316 def unlink(self, *keys: KeyT) -> ResponseT: 

317 """ 

318 Remove the specified keys in a different thread. 

319 

320 The keys are first split up into slots 

321 and then an TOUCH command is sent for every slot 

322 

323 Non-existent keys are ignored. 

324 Returns the number of keys that were unlinked. 

325 

326 For more information see https://redis.io/commands/unlink 

327 """ 

328 return self._split_command_across_slots("UNLINK", *keys) 

329 

330 

331class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands): 

332 """ 

333 A class containing commands that handle more than one key 

334 """ 

335 

336 async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

337 """ 

338 Splits the keys into different slots and then calls MGET 

339 for the keys of every slot. This operation will not be atomic 

340 if keys belong to more than one slot. 

341 

342 Returns a list of values ordered identically to ``keys`` 

343 

344 For more information see https://redis.io/commands/mget 

345 """ 

346 

347 # Concatenate all keys into a list 

348 keys = list_or_args(keys, args) 

349 

350 # Split keys into slots 

351 slots_to_keys = self._partition_keys_by_slot(keys) 

352 

353 # Execute commands using a pipeline 

354 res = await self._execute_pipeline_by_slot("MGET", slots_to_keys) 

355 

356 # Reorder keys in the order the user provided & return 

357 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

358 

359 async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

360 """ 

361 Sets key/values based on a mapping. Mapping is a dictionary of 

362 key/value pairs. Both keys and values should be strings or types that 

363 can be cast to a string via str(). 

364 

365 Splits the keys into different slots and then calls MSET 

366 for the keys of every slot. This operation will not be atomic 

367 if keys belong to more than one slot. 

368 

369 For more information see https://redis.io/commands/mset 

370 """ 

371 

372 # Partition the keys by slot 

373 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

374 

375 # Execute commands using a pipeline & return list of replies 

376 return await self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

377 

378 async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

379 """ 

380 Runs the given command once for the keys 

381 of each slot. Returns the sum of the return values. 

382 """ 

383 

384 # Partition the keys by slot 

385 slots_to_keys = self._partition_keys_by_slot(keys) 

386 

387 # Sum up the reply from each command 

388 return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) 

389 

390 async def _execute_pipeline_by_slot( 

391 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

392 ) -> List[Any]: 

393 if self._initialize: 

394 await self.initialize() 

395 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

396 pipe = self.pipeline() 

397 [ 

398 pipe.execute_command( 

399 command, 

400 *slot_args, 

401 target_nodes=[ 

402 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

403 ], 

404 ) 

405 for slot, slot_args in slots_to_args.items() 

406 ] 

407 return await pipe.execute() 

408 

409 

410class ClusterManagementCommands(ManagementCommands): 

411 """ 

412 A class for Redis Cluster management commands 

413 

414 The class inherits from Redis's core ManagementCommands class and do the 

415 required adjustments to work with cluster mode 

416 """ 

417 

418 def slaveof(self, *args, **kwargs) -> NoReturn: 

419 """ 

420 Make the server a replica of another instance, or promote it as master. 

421 

422 For more information see https://redis.io/commands/slaveof 

423 """ 

424 raise RedisClusterException("SLAVEOF is not supported in cluster mode") 

425 

426 def replicaof(self, *args, **kwargs) -> NoReturn: 

427 """ 

428 Make the server a replica of another instance, or promote it as master. 

429 

430 For more information see https://redis.io/commands/replicaof 

431 """ 

432 raise RedisClusterException("REPLICAOF is not supported in cluster mode") 

433 

434 def swapdb(self, *args, **kwargs) -> NoReturn: 

435 """ 

436 Swaps two Redis databases. 

437 

438 For more information see https://redis.io/commands/swapdb 

439 """ 

440 raise RedisClusterException("SWAPDB is not supported in cluster mode") 

441 

442 def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT: 

443 """ 

444 Returns the node's id. 

445 

446 :target_node: 'ClusterNode' 

447 The node to execute the command on 

448 

449 For more information check https://redis.io/commands/cluster-myid/ 

450 """ 

451 return self.execute_command("CLUSTER MYID", target_nodes=target_node) 

452 

453 def cluster_addslots( 

454 self, target_node: "TargetNodesT", *slots: EncodableT 

455 ) -> ResponseT: 

456 """ 

457 Assign new hash slots to receiving node. Sends to specified node. 

458 

459 :target_node: 'ClusterNode' 

460 The node to execute the command on 

461 

462 For more information see https://redis.io/commands/cluster-addslots 

463 """ 

464 return self.execute_command( 

465 "CLUSTER ADDSLOTS", *slots, target_nodes=target_node 

466 ) 

467 

468 def cluster_addslotsrange( 

469 self, target_node: "TargetNodesT", *slots: EncodableT 

470 ) -> ResponseT: 

471 """ 

472 Similar to the CLUSTER ADDSLOTS command. 

473 The difference between the two commands is that ADDSLOTS takes a list of slots 

474 to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges 

475 (specified by start and end slots) to assign to the node. 

476 

477 :target_node: 'ClusterNode' 

478 The node to execute the command on 

479 

480 For more information see https://redis.io/commands/cluster-addslotsrange 

481 """ 

482 return self.execute_command( 

483 "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node 

484 ) 

485 

486 def cluster_countkeysinslot(self, slot_id: int) -> ResponseT: 

487 """ 

488 Return the number of local keys in the specified hash slot 

489 Send to node based on specified slot_id 

490 

491 For more information see https://redis.io/commands/cluster-countkeysinslot 

492 """ 

493 return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id) 

494 

495 def cluster_count_failure_report(self, node_id: str) -> ResponseT: 

496 """ 

497 Return the number of failure reports active for a given node 

498 Sends to a random node 

499 

500 For more information see https://redis.io/commands/cluster-count-failure-reports 

501 """ 

502 return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id) 

503 

504 def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

505 """ 

506 Set hash slots as unbound in the cluster. 

507 It determines by it self what node the slot is in and sends it there 

508 

509 Returns a list of the results for each processed slot. 

510 

511 For more information see https://redis.io/commands/cluster-delslots 

512 """ 

513 return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots] 

514 

515 def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT: 

516 """ 

517 Similar to the CLUSTER DELSLOTS command. 

518 The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove 

519 from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove 

520 from the node. 

521 

522 For more information see https://redis.io/commands/cluster-delslotsrange 

523 """ 

524 return self.execute_command("CLUSTER DELSLOTSRANGE", *slots) 

525 

526 def cluster_failover( 

527 self, target_node: "TargetNodesT", option: Optional[str] = None 

528 ) -> ResponseT: 

529 """ 

530 Forces a slave to perform a manual failover of its master 

531 Sends to specified node 

532 

533 :target_node: 'ClusterNode' 

534 The node to execute the command on 

535 

536 For more information see https://redis.io/commands/cluster-failover 

537 """ 

538 if option: 

539 if option.upper() not in ["FORCE", "TAKEOVER"]: 

540 raise RedisError( 

541 f"Invalid option for CLUSTER FAILOVER command: {option}" 

542 ) 

543 else: 

544 return self.execute_command( 

545 "CLUSTER FAILOVER", option, target_nodes=target_node 

546 ) 

547 else: 

548 return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node) 

549 

550 def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

551 """ 

552 Provides info about Redis Cluster node state. 

553 The command will be sent to a random node in the cluster if no target 

554 node is specified. 

555 

556 For more information see https://redis.io/commands/cluster-info 

557 """ 

558 return self.execute_command("CLUSTER INFO", target_nodes=target_nodes) 

559 

560 def cluster_keyslot(self, key: str) -> ResponseT: 

561 """ 

562 Returns the hash slot of the specified key 

563 Sends to random node in the cluster 

564 

565 For more information see https://redis.io/commands/cluster-keyslot 

566 """ 

567 return self.execute_command("CLUSTER KEYSLOT", key) 

568 

569 def cluster_meet( 

570 self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None 

571 ) -> ResponseT: 

572 """ 

573 Force a node cluster to handshake with another node. 

574 Sends to specified node. 

575 

576 For more information see https://redis.io/commands/cluster-meet 

577 """ 

578 return self.execute_command( 

579 "CLUSTER MEET", host, port, target_nodes=target_nodes 

580 ) 

581 

582 def cluster_nodes(self) -> ResponseT: 

583 """ 

584 Get Cluster config for the node. 

585 Sends to random node in the cluster 

586 

587 For more information see https://redis.io/commands/cluster-nodes 

588 """ 

589 return self.execute_command("CLUSTER NODES") 

590 

591 def cluster_replicate( 

592 self, target_nodes: "TargetNodesT", node_id: str 

593 ) -> ResponseT: 

594 """ 

595 Reconfigure a node as a slave of the specified master node 

596 

597 For more information see https://redis.io/commands/cluster-replicate 

598 """ 

599 return self.execute_command( 

600 "CLUSTER REPLICATE", node_id, target_nodes=target_nodes 

601 ) 

602 

603 def cluster_reset( 

604 self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None 

605 ) -> ResponseT: 

606 """ 

607 Reset a Redis Cluster node 

608 

609 If 'soft' is True then it will send 'SOFT' argument 

610 If 'soft' is False then it will send 'HARD' argument 

611 

612 For more information see https://redis.io/commands/cluster-reset 

613 """ 

614 return self.execute_command( 

615 "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes 

616 ) 

617 

618 def cluster_save_config( 

619 self, target_nodes: Optional["TargetNodesT"] = None 

620 ) -> ResponseT: 

621 """ 

622 Forces the node to save cluster state on disk 

623 

624 For more information see https://redis.io/commands/cluster-saveconfig 

625 """ 

626 return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes) 

627 

628 def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT: 

629 """ 

630 Returns the number of keys in the specified cluster slot 

631 

632 For more information see https://redis.io/commands/cluster-getkeysinslot 

633 """ 

634 return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) 

635 

636 def cluster_set_config_epoch( 

637 self, epoch: int, target_nodes: Optional["TargetNodesT"] = None 

638 ) -> ResponseT: 

639 """ 

640 Set the configuration epoch in a new node 

641 

642 For more information see https://redis.io/commands/cluster-set-config-epoch 

643 """ 

644 return self.execute_command( 

645 "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes 

646 ) 

647 

648 def cluster_setslot( 

649 self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str 

650 ) -> ResponseT: 

651 """ 

652 Bind an hash slot to a specific node 

653 

654 :target_node: 'ClusterNode' 

655 The node to execute the command on 

656 

657 For more information see https://redis.io/commands/cluster-setslot 

658 """ 

659 if state.upper() in ("IMPORTING", "NODE", "MIGRATING"): 

660 return self.execute_command( 

661 "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node 

662 ) 

663 elif state.upper() == "STABLE": 

664 raise RedisError('For "stable" state please use cluster_setslot_stable') 

665 else: 

666 raise RedisError(f"Invalid slot state: {state}") 

667 

668 def cluster_setslot_stable(self, slot_id: int) -> ResponseT: 

669 """ 

670 Clears migrating / importing state from the slot. 

671 It determines by it self what node the slot is in and sends it there. 

672 

673 For more information see https://redis.io/commands/cluster-setslot 

674 """ 

675 return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") 

676 

677 def cluster_replicas( 

678 self, node_id: str, target_nodes: Optional["TargetNodesT"] = None 

679 ) -> ResponseT: 

680 """ 

681 Provides a list of replica nodes replicating from the specified primary 

682 target node. 

683 

684 For more information see https://redis.io/commands/cluster-replicas 

685 """ 

686 return self.execute_command( 

687 "CLUSTER REPLICAS", node_id, target_nodes=target_nodes 

688 ) 

689 

690 def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

691 """ 

692 Get array of Cluster slot to node mappings 

693 

694 For more information see https://redis.io/commands/cluster-slots 

695 """ 

696 return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes) 

697 

698 def cluster_shards(self, target_nodes=None): 

699 """ 

700 Returns details about the shards of the cluster. 

701 

702 For more information see https://redis.io/commands/cluster-shards 

703 """ 

704 return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes) 

705 

706 def cluster_myshardid(self, target_nodes=None): 

707 """ 

708 Returns the shard ID of the node. 

709 

710 For more information see https://redis.io/commands/cluster-myshardid/ 

711 """ 

712 return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes) 

713 

714 def cluster_links(self, target_node: "TargetNodesT") -> ResponseT: 

715 """ 

716 Each node in a Redis Cluster maintains a pair of long-lived TCP link with each 

717 peer in the cluster: One for sending outbound messages towards the peer and one 

718 for receiving inbound messages from the peer. 

719 

720 This command outputs information of all such peer links as an array. 

721 

722 For more information see https://redis.io/commands/cluster-links 

723 """ 

724 return self.execute_command("CLUSTER LINKS", target_nodes=target_node) 

725 

726 def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

727 raise NotImplementedError( 

728 "CLUSTER FLUSHSLOTS is intentionally not implemented in the client." 

729 ) 

730 

731 def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

732 raise NotImplementedError( 

733 "CLUSTER BUMPEPOCH is intentionally not implemented in the client." 

734 ) 

735 

736 def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

737 """ 

738 Enables read queries. 

739 The command will be sent to the default cluster node if target_nodes is 

740 not specified. 

741 

742 For more information see https://redis.io/commands/readonly 

743 """ 

744 if target_nodes == "replicas" or target_nodes == "all": 

745 # read_from_replicas will only be enabled if the READONLY command 

746 # is sent to all replicas 

747 self.read_from_replicas = True 

748 return self.execute_command("READONLY", target_nodes=target_nodes) 

749 

750 def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

751 """ 

752 Disables read queries. 

753 The command will be sent to the default cluster node if target_nodes is 

754 not specified. 

755 

756 For more information see https://redis.io/commands/readwrite 

757 """ 

758 # Reset read from replicas flag 

759 self.read_from_replicas = False 

760 return self.execute_command("READWRITE", target_nodes=target_nodes) 

761 

762 @deprecated_function( 

763 version="7.2.0", 

764 reason="Use client-side caching feature instead.", 

765 ) 

766 def client_tracking_on( 

767 self, 

768 clientid: Optional[int] = None, 

769 prefix: Sequence[KeyT] = [], 

770 bcast: bool = False, 

771 optin: bool = False, 

772 optout: bool = False, 

773 noloop: bool = False, 

774 target_nodes: Optional["TargetNodesT"] = "all", 

775 ) -> ResponseT: 

776 """ 

777 Enables the tracking feature of the Redis server, that is used 

778 for server assisted client side caching. 

779 

780 When clientid is provided - in target_nodes only the node that owns the 

781 connection with this id should be provided. 

782 When clientid is not provided - target_nodes can be any node. 

783 

784 For more information see https://redis.io/commands/client-tracking 

785 """ 

786 return self.client_tracking( 

787 True, 

788 clientid, 

789 prefix, 

790 bcast, 

791 optin, 

792 optout, 

793 noloop, 

794 target_nodes=target_nodes, 

795 ) 

796 

797 @deprecated_function( 

798 version="7.2.0", 

799 reason="Use client-side caching feature instead.", 

800 ) 

801 def client_tracking_off( 

802 self, 

803 clientid: Optional[int] = None, 

804 prefix: Sequence[KeyT] = [], 

805 bcast: bool = False, 

806 optin: bool = False, 

807 optout: bool = False, 

808 noloop: bool = False, 

809 target_nodes: Optional["TargetNodesT"] = "all", 

810 ) -> ResponseT: 

811 """ 

812 Disables the tracking feature of the Redis server, that is used 

813 for server assisted client side caching. 

814 

815 When clientid is provided - in target_nodes only the node that owns the 

816 connection with this id should be provided. 

817 When clientid is not provided - target_nodes can be any node. 

818 

819 For more information see https://redis.io/commands/client-tracking 

820 """ 

821 return self.client_tracking( 

822 False, 

823 clientid, 

824 prefix, 

825 bcast, 

826 optin, 

827 optout, 

828 noloop, 

829 target_nodes=target_nodes, 

830 ) 

831 

832 def hotkeys_start( 

833 self, 

834 metrics: List[HotkeysMetricsTypes], 

835 count: Optional[int] = None, 

836 duration: Optional[int] = None, 

837 sample_ratio: Optional[int] = None, 

838 slots: Optional[List[int]] = None, 

839 **kwargs, 

840 ) -> Union[str, bytes]: 

841 """ 

842 Cluster client does not support hotkeys command. Please use the non-cluster client. 

843 

844 For more information see https://redis.io/commands/hotkeys-start 

845 """ 

846 raise NotImplementedError( 

847 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client." 

848 ) 

849 

850 def hotkeys_stop(self, **kwargs) -> Union[str, bytes]: 

851 """ 

852 Cluster client does not support hotkeys command. Please use the non-cluster client. 

853 

854 For more information see https://redis.io/commands/hotkeys-stop 

855 """ 

856 raise NotImplementedError( 

857 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client." 

858 ) 

859 

860 def hotkeys_reset(self, **kwargs) -> Union[str, bytes]: 

861 """ 

862 Cluster client does not support hotkeys command. Please use the non-cluster client. 

863 

864 For more information see https://redis.io/commands/hotkeys-reset 

865 """ 

866 raise NotImplementedError( 

867 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client." 

868 ) 

869 

870 def hotkeys_get(self, **kwargs) -> list[dict[Union[str, bytes], Any]]: 

871 """ 

872 Cluster client does not support hotkeys command. Please use the non-cluster client. 

873 

874 For more information see https://redis.io/commands/hotkeys-get 

875 """ 

876 raise NotImplementedError( 

877 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client." 

878 ) 

879 

880 

881class AsyncClusterManagementCommands( 

882 ClusterManagementCommands, AsyncManagementCommands 

883): 

884 """ 

885 A class for Redis Cluster management commands 

886 

887 The class inherits from Redis's core ManagementCommands class and do the 

888 required adjustments to work with cluster mode 

889 """ 

890 

891 async def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

892 """ 

893 Set hash slots as unbound in the cluster. 

894 It determines by it self what node the slot is in and sends it there 

895 

896 Returns a list of the results for each processed slot. 

897 

898 For more information see https://redis.io/commands/cluster-delslots 

899 """ 

900 return await asyncio.gather( 

901 *( 

902 asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot)) 

903 for slot in slots 

904 ) 

905 ) 

906 

907 @deprecated_function( 

908 version="7.2.0", 

909 reason="Use client-side caching feature instead.", 

910 ) 

911 async def client_tracking_on( 

912 self, 

913 clientid: Optional[int] = None, 

914 prefix: Sequence[KeyT] = [], 

915 bcast: bool = False, 

916 optin: bool = False, 

917 optout: bool = False, 

918 noloop: bool = False, 

919 target_nodes: Optional["TargetNodesT"] = "all", 

920 ) -> ResponseT: 

921 """ 

922 Enables the tracking feature of the Redis server, that is used 

923 for server assisted client side caching. 

924 

925 When clientid is provided - in target_nodes only the node that owns the 

926 connection with this id should be provided. 

927 When clientid is not provided - target_nodes can be any node. 

928 

929 For more information see https://redis.io/commands/client-tracking 

930 """ 

931 return await self.client_tracking( 

932 True, 

933 clientid, 

934 prefix, 

935 bcast, 

936 optin, 

937 optout, 

938 noloop, 

939 target_nodes=target_nodes, 

940 ) 

941 

942 @deprecated_function( 

943 version="7.2.0", 

944 reason="Use client-side caching feature instead.", 

945 ) 

946 async def client_tracking_off( 

947 self, 

948 clientid: Optional[int] = None, 

949 prefix: Sequence[KeyT] = [], 

950 bcast: bool = False, 

951 optin: bool = False, 

952 optout: bool = False, 

953 noloop: bool = False, 

954 target_nodes: Optional["TargetNodesT"] = "all", 

955 ) -> ResponseT: 

956 """ 

957 Disables the tracking feature of the Redis server, that is used 

958 for server assisted client side caching. 

959 

960 When clientid is provided - in target_nodes only the node that owns the 

961 connection with this id should be provided. 

962 When clientid is not provided - target_nodes can be any node. 

963 

964 For more information see https://redis.io/commands/client-tracking 

965 """ 

966 return await self.client_tracking( 

967 False, 

968 clientid, 

969 prefix, 

970 bcast, 

971 optin, 

972 optout, 

973 noloop, 

974 target_nodes=target_nodes, 

975 ) 

976 

977 async def hotkeys_start( 

978 self, 

979 metrics: List[HotkeysMetricsTypes], 

980 count: Optional[int] = None, 

981 duration: Optional[int] = None, 

982 sample_ratio: Optional[int] = None, 

983 slots: Optional[List[int]] = None, 

984 **kwargs, 

985 ) -> Awaitable[Union[str, bytes]]: 

986 """ 

987 Cluster client does not support hotkeys command. Please use the non-cluster client. 

988 

989 For more information see https://redis.io/commands/hotkeys-start 

990 """ 

991 raise NotImplementedError( 

992 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client." 

993 ) 

994 

995 async def hotkeys_stop(self, **kwargs) -> Awaitable[Union[str, bytes]]: 

996 """ 

997 Cluster client does not support hotkeys command. Please use the non-cluster client. 

998 

999 For more information see https://redis.io/commands/hotkeys-stop 

1000 """ 

1001 raise NotImplementedError( 

1002 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client." 

1003 ) 

1004 

1005 async def hotkeys_reset(self, **kwargs) -> Awaitable[Union[str, bytes]]: 

1006 """ 

1007 Cluster client does not support hotkeys command. Please use the non-cluster client. 

1008 

1009 For more information see https://redis.io/commands/hotkeys-reset 

1010 """ 

1011 raise NotImplementedError( 

1012 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client." 

1013 ) 

1014 

1015 async def hotkeys_get( 

1016 self, **kwargs 

1017 ) -> Awaitable[list[dict[Union[str, bytes], Any]]]: 

1018 """ 

1019 Cluster client does not support hotkeys command. Please use the non-cluster client. 

1020 

1021 For more information see https://redis.io/commands/hotkeys-get 

1022 """ 

1023 raise NotImplementedError( 

1024 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client." 

1025 ) 

1026 

1027 

1028class ClusterDataAccessCommands(DataAccessCommands): 

1029 """ 

1030 A class for Redis Cluster Data Access Commands 

1031 

1032 The class inherits from Redis's core DataAccessCommand class and do the 

1033 required adjustments to work with cluster mode 

1034 """ 

1035 

1036 def stralgo( 

1037 self, 

1038 algo: Literal["LCS"], 

1039 value1: KeyT, 

1040 value2: KeyT, 

1041 specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", 

1042 len: bool = False, 

1043 idx: bool = False, 

1044 minmatchlen: Optional[int] = None, 

1045 withmatchlen: bool = False, 

1046 **kwargs, 

1047 ) -> ResponseT: 

1048 """ 

1049 Implements complex algorithms that operate on strings. 

1050 Right now the only algorithm implemented is the LCS algorithm 

1051 (longest common substring). However new algorithms could be 

1052 implemented in the future. 

1053 

1054 ``algo`` Right now must be LCS 

1055 ``value1`` and ``value2`` Can be two strings or two keys 

1056 ``specific_argument`` Specifying if the arguments to the algorithm 

1057 will be keys or strings. strings is the default. 

1058 ``len`` Returns just the len of the match. 

1059 ``idx`` Returns the match positions in each string. 

1060 ``minmatchlen`` Restrict the list of matches to the ones of a given 

1061 minimal length. Can be provided only when ``idx`` set to True. 

1062 ``withmatchlen`` Returns the matches with the len of the match. 

1063 Can be provided only when ``idx`` set to True. 

1064 

1065 For more information see https://redis.io/commands/stralgo 

1066 """ 

1067 target_nodes = kwargs.pop("target_nodes", None) 

1068 if specific_argument == "strings" and target_nodes is None: 

1069 target_nodes = "default-node" 

1070 kwargs.update({"target_nodes": target_nodes}) 

1071 return super().stralgo( 

1072 algo, 

1073 value1, 

1074 value2, 

1075 specific_argument, 

1076 len, 

1077 idx, 

1078 minmatchlen, 

1079 withmatchlen, 

1080 **kwargs, 

1081 ) 

1082 

1083 def scan_iter( 

1084 self, 

1085 match: Optional[PatternT] = None, 

1086 count: Optional[int] = None, 

1087 _type: Optional[str] = None, 

1088 **kwargs, 

1089 ) -> Iterator: 

1090 # Do the first query with cursor=0 for all nodes 

1091 cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) 

1092 yield from data 

1093 

1094 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

1095 if cursors: 

1096 # Get nodes by name 

1097 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

1098 

1099 # Iterate over each node till its cursor is 0 

1100 kwargs.pop("target_nodes", None) 

1101 while cursors: 

1102 for name, cursor in cursors.items(): 

1103 cur, data = self.scan( 

1104 cursor=cursor, 

1105 match=match, 

1106 count=count, 

1107 _type=_type, 

1108 target_nodes=nodes[name], 

1109 **kwargs, 

1110 ) 

1111 yield from data 

1112 cursors[name] = cur[name] 

1113 

1114 cursors = { 

1115 name: cursor for name, cursor in cursors.items() if cursor != 0 

1116 } 

1117 

1118 

1119class AsyncClusterDataAccessCommands( 

1120 ClusterDataAccessCommands, AsyncDataAccessCommands 

1121): 

1122 """ 

1123 A class for Redis Cluster Data Access Commands 

1124 

1125 The class inherits from Redis's core DataAccessCommand class and do the 

1126 required adjustments to work with cluster mode 

1127 """ 

1128 

1129 async def scan_iter( 

1130 self, 

1131 match: Optional[PatternT] = None, 

1132 count: Optional[int] = None, 

1133 _type: Optional[str] = None, 

1134 **kwargs, 

1135 ) -> AsyncIterator: 

1136 # Do the first query with cursor=0 for all nodes 

1137 cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) 

1138 for value in data: 

1139 yield value 

1140 

1141 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

1142 if cursors: 

1143 # Get nodes by name 

1144 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

1145 

1146 # Iterate over each node till its cursor is 0 

1147 kwargs.pop("target_nodes", None) 

1148 while cursors: 

1149 for name, cursor in cursors.items(): 

1150 cur, data = await self.scan( 

1151 cursor=cursor, 

1152 match=match, 

1153 count=count, 

1154 _type=_type, 

1155 target_nodes=nodes[name], 

1156 **kwargs, 

1157 ) 

1158 for value in data: 

1159 yield value 

1160 cursors[name] = cur[name] 

1161 

1162 cursors = { 

1163 name: cursor for name, cursor in cursors.items() if cursor != 0 

1164 } 

1165 

1166 

1167class RedisClusterCommands( 

1168 ClusterMultiKeyCommands, 

1169 ClusterManagementCommands, 

1170 ACLCommands, 

1171 PubSubCommands, 

1172 ClusterDataAccessCommands, 

1173 ScriptCommands, 

1174 FunctionCommands, 

1175 ModuleCommands, 

1176 RedisModuleCommands, 

1177): 

1178 """ 

1179 A class for all Redis Cluster commands 

1180 

1181 For key-based commands, the target node(s) will be internally determined 

1182 by the keys' hash slot. 

1183 Non-key-based commands can be executed with the 'target_nodes' argument to 

1184 target specific nodes. By default, if target_nodes is not specified, the 

1185 command will be executed on the default cluster node. 

1186 

1187 :param :target_nodes: type can be one of the followings: 

1188 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

1189 - 'ClusterNode' 

1190 - 'list(ClusterNodes)' 

1191 - 'dict(any:clusterNodes)' 

1192 

1193 for example: 

1194 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

1195 """ 

1196 

1197 

1198class AsyncRedisClusterCommands( 

1199 AsyncClusterMultiKeyCommands, 

1200 AsyncClusterManagementCommands, 

1201 AsyncACLCommands, 

1202 AsyncClusterDataAccessCommands, 

1203 AsyncScriptCommands, 

1204 AsyncFunctionCommands, 

1205 AsyncModuleCommands, 

1206 AsyncRedisModuleCommands, 

1207): 

1208 """ 

1209 A class for all Redis Cluster commands 

1210 

1211 For key-based commands, the target node(s) will be internally determined 

1212 by the keys' hash slot. 

1213 Non-key-based commands can be executed with the 'target_nodes' argument to 

1214 target specific nodes. By default, if target_nodes is not specified, the 

1215 command will be executed on the default cluster node. 

1216 

1217 :param :target_nodes: type can be one of the followings: 

1218 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

1219 - 'ClusterNode' 

1220 - 'list(ClusterNodes)' 

1221 - 'dict(any:clusterNodes)' 

1222 

1223 for example: 

1224 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

1225 """