Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/commands/cluster.py: 37%

186 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:16 +0000

1import asyncio 

2from typing import ( 

3 TYPE_CHECKING, 

4 Any, 

5 AsyncIterator, 

6 Dict, 

7 Iterable, 

8 Iterator, 

9 List, 

10 Mapping, 

11 NoReturn, 

12 Optional, 

13 Union, 

14) 

15 

16from redis.compat import Literal 

17from redis.crc import key_slot 

18from redis.exceptions import RedisClusterException, RedisError 

19from redis.typing import ( 

20 AnyKeyT, 

21 ClusterCommandsProtocol, 

22 EncodableT, 

23 KeysT, 

24 KeyT, 

25 PatternT, 

26) 

27 

28from .core import ( 

29 ACLCommands, 

30 AsyncACLCommands, 

31 AsyncDataAccessCommands, 

32 AsyncFunctionCommands, 

33 AsyncManagementCommands, 

34 AsyncScriptCommands, 

35 DataAccessCommands, 

36 FunctionCommands, 

37 ManagementCommands, 

38 PubSubCommands, 

39 ResponseT, 

40 ScriptCommands, 

41) 

42from .helpers import list_or_args 

43from .redismodules import RedisModuleCommands 

44 

45if TYPE_CHECKING: 

46 from redis.asyncio.cluster import TargetNodesT 

47 

48# Not complete, but covers the major ones 

49# https://redis.io/commands 

50READ_COMMANDS = frozenset( 

51 [ 

52 "BITCOUNT", 

53 "BITPOS", 

54 "EVAL_RO", 

55 "EVALSHA_RO", 

56 "EXISTS", 

57 "GEODIST", 

58 "GEOHASH", 

59 "GEOPOS", 

60 "GEORADIUS", 

61 "GEORADIUSBYMEMBER", 

62 "GET", 

63 "GETBIT", 

64 "GETRANGE", 

65 "HEXISTS", 

66 "HGET", 

67 "HGETALL", 

68 "HKEYS", 

69 "HLEN", 

70 "HMGET", 

71 "HSTRLEN", 

72 "HVALS", 

73 "KEYS", 

74 "LINDEX", 

75 "LLEN", 

76 "LRANGE", 

77 "MGET", 

78 "PTTL", 

79 "RANDOMKEY", 

80 "SCARD", 

81 "SDIFF", 

82 "SINTER", 

83 "SISMEMBER", 

84 "SMEMBERS", 

85 "SRANDMEMBER", 

86 "STRLEN", 

87 "SUNION", 

88 "TTL", 

89 "ZCARD", 

90 "ZCOUNT", 

91 "ZRANGE", 

92 "ZSCORE", 

93 ] 

94) 

95 

96 

97class ClusterMultiKeyCommands(ClusterCommandsProtocol): 

98 """ 

99 A class containing commands that handle more than one key 

100 """ 

101 

102 def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]: 

103 """Split keys into a dictionary that maps a slot to a list of keys.""" 

104 

105 slots_to_keys = {} 

106 for key in keys: 

107 slot = key_slot(self.encoder.encode(key)) 

108 slots_to_keys.setdefault(slot, []).append(key) 

109 

110 return slots_to_keys 

111 

112 def _partition_pairs_by_slot( 

113 self, mapping: Mapping[AnyKeyT, EncodableT] 

114 ) -> Dict[int, List[EncodableT]]: 

115 """Split pairs into a dictionary that maps a slot to a list of pairs.""" 

116 

117 slots_to_pairs = {} 

118 for pair in mapping.items(): 

119 slot = key_slot(self.encoder.encode(pair[0])) 

120 slots_to_pairs.setdefault(slot, []).extend(pair) 

121 

122 return slots_to_pairs 

123 

124 def _execute_pipeline_by_slot( 

125 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

126 ) -> List[Any]: 

127 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

128 pipe = self.pipeline() 

129 [ 

130 pipe.execute_command( 

131 command, 

132 *slot_args, 

133 target_nodes=[ 

134 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

135 ], 

136 ) 

137 for slot, slot_args in slots_to_args.items() 

138 ] 

139 return pipe.execute() 

140 

141 def _reorder_keys_by_command( 

142 self, 

143 keys: Iterable[KeyT], 

144 slots_to_args: Mapping[int, Iterable[EncodableT]], 

145 responses: Iterable[Any], 

146 ) -> List[Any]: 

147 results = { 

148 k: v 

149 for slot_values, response in zip(slots_to_args.values(), responses) 

150 for k, v in zip(slot_values, response) 

151 } 

152 return [results[key] for key in keys] 

153 

154 def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

155 """ 

156 Splits the keys into different slots and then calls MGET 

157 for the keys of every slot. This operation will not be atomic 

158 if keys belong to more than one slot. 

159 

160 Returns a list of values ordered identically to ``keys`` 

161 

162 For more information see https://redis.io/commands/mget 

163 """ 

164 

165 # Concatenate all keys into a list 

166 keys = list_or_args(keys, args) 

167 

168 # Split keys into slots 

169 slots_to_keys = self._partition_keys_by_slot(keys) 

170 

171 # Execute commands using a pipeline 

172 res = self._execute_pipeline_by_slot("MGET", slots_to_keys) 

173 

174 # Reorder keys in the order the user provided & return 

175 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

176 

177 def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

178 """ 

179 Sets key/values based on a mapping. Mapping is a dictionary of 

180 key/value pairs. Both keys and values should be strings or types that 

181 can be cast to a string via str(). 

182 

183 Splits the keys into different slots and then calls MSET 

184 for the keys of every slot. This operation will not be atomic 

185 if keys belong to more than one slot. 

186 

187 For more information see https://redis.io/commands/mset 

188 """ 

189 

190 # Partition the keys by slot 

191 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

192 

193 # Execute commands using a pipeline & return list of replies 

194 return self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

195 

196 def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

197 """ 

198 Runs the given command once for the keys 

199 of each slot. Returns the sum of the return values. 

200 """ 

201 

202 # Partition the keys by slot 

203 slots_to_keys = self._partition_keys_by_slot(keys) 

204 

205 # Sum up the reply from each command 

206 return sum(self._execute_pipeline_by_slot(command, slots_to_keys)) 

207 

208 def exists(self, *keys: KeyT) -> ResponseT: 

209 """ 

210 Returns the number of ``names`` that exist in the 

211 whole cluster. The keys are first split up into slots 

212 and then an EXISTS command is sent for every slot 

213 

214 For more information see https://redis.io/commands/exists 

215 """ 

216 return self._split_command_across_slots("EXISTS", *keys) 

217 

218 def delete(self, *keys: KeyT) -> ResponseT: 

219 """ 

220 Deletes the given keys in the cluster. 

221 The keys are first split up into slots 

222 and then an DEL command is sent for every slot 

223 

224 Non-existant keys are ignored. 

225 Returns the number of keys that were deleted. 

226 

227 For more information see https://redis.io/commands/del 

228 """ 

229 return self._split_command_across_slots("DEL", *keys) 

230 

231 def touch(self, *keys: KeyT) -> ResponseT: 

232 """ 

233 Updates the last access time of given keys across the 

234 cluster. 

235 

236 The keys are first split up into slots 

237 and then an TOUCH command is sent for every slot 

238 

239 Non-existant keys are ignored. 

240 Returns the number of keys that were touched. 

241 

242 For more information see https://redis.io/commands/touch 

243 """ 

244 return self._split_command_across_slots("TOUCH", *keys) 

245 

246 def unlink(self, *keys: KeyT) -> ResponseT: 

247 """ 

248 Remove the specified keys in a different thread. 

249 

250 The keys are first split up into slots 

251 and then an TOUCH command is sent for every slot 

252 

253 Non-existant keys are ignored. 

254 Returns the number of keys that were unlinked. 

255 

256 For more information see https://redis.io/commands/unlink 

257 """ 

258 return self._split_command_across_slots("UNLINK", *keys) 

259 

260 

261class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands): 

262 """ 

263 A class containing commands that handle more than one key 

264 """ 

265 

266 async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 

267 """ 

268 Splits the keys into different slots and then calls MGET 

269 for the keys of every slot. This operation will not be atomic 

270 if keys belong to more than one slot. 

271 

272 Returns a list of values ordered identically to ``keys`` 

273 

274 For more information see https://redis.io/commands/mget 

275 """ 

276 

277 # Concatenate all keys into a list 

278 keys = list_or_args(keys, args) 

279 

280 # Split keys into slots 

281 slots_to_keys = self._partition_keys_by_slot(keys) 

282 

283 # Execute commands using a pipeline 

284 res = await self._execute_pipeline_by_slot("MGET", slots_to_keys) 

285 

286 # Reorder keys in the order the user provided & return 

287 return self._reorder_keys_by_command(keys, slots_to_keys, res) 

288 

289 async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 

290 """ 

291 Sets key/values based on a mapping. Mapping is a dictionary of 

292 key/value pairs. Both keys and values should be strings or types that 

293 can be cast to a string via str(). 

294 

295 Splits the keys into different slots and then calls MSET 

296 for the keys of every slot. This operation will not be atomic 

297 if keys belong to more than one slot. 

298 

299 For more information see https://redis.io/commands/mset 

300 """ 

301 

302 # Partition the keys by slot 

303 slots_to_pairs = self._partition_pairs_by_slot(mapping) 

304 

305 # Execute commands using a pipeline & return list of replies 

306 return await self._execute_pipeline_by_slot("MSET", slots_to_pairs) 

307 

308 async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 

309 """ 

310 Runs the given command once for the keys 

311 of each slot. Returns the sum of the return values. 

312 """ 

313 

314 # Partition the keys by slot 

315 slots_to_keys = self._partition_keys_by_slot(keys) 

316 

317 # Sum up the reply from each command 

318 return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) 

319 

320 async def _execute_pipeline_by_slot( 

321 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 

322 ) -> List[Any]: 

323 if self._initialize: 

324 await self.initialize() 

325 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 

326 pipe = self.pipeline() 

327 [ 

328 pipe.execute_command( 

329 command, 

330 *slot_args, 

331 target_nodes=[ 

332 self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 

333 ], 

334 ) 

335 for slot, slot_args in slots_to_args.items() 

336 ] 

337 return await pipe.execute() 

338 

339 

340class ClusterManagementCommands(ManagementCommands): 

341 """ 

342 A class for Redis Cluster management commands 

343 

344 The class inherits from Redis's core ManagementCommands class and do the 

345 required adjustments to work with cluster mode 

346 """ 

347 

348 def slaveof(self, *args, **kwargs) -> NoReturn: 

349 """ 

350 Make the server a replica of another instance, or promote it as master. 

351 

352 For more information see https://redis.io/commands/slaveof 

353 """ 

354 raise RedisClusterException("SLAVEOF is not supported in cluster mode") 

355 

356 def replicaof(self, *args, **kwargs) -> NoReturn: 

357 """ 

358 Make the server a replica of another instance, or promote it as master. 

359 

360 For more information see https://redis.io/commands/replicaof 

361 """ 

362 raise RedisClusterException("REPLICAOF is not supported in cluster mode") 

363 

364 def swapdb(self, *args, **kwargs) -> NoReturn: 

365 """ 

366 Swaps two Redis databases. 

367 

368 For more information see https://redis.io/commands/swapdb 

369 """ 

370 raise RedisClusterException("SWAPDB is not supported in cluster mode") 

371 

372 def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT: 

373 """ 

374 Returns the node's id. 

375 

376 :target_node: 'ClusterNode' 

377 The node to execute the command on 

378 

379 For more information check https://redis.io/commands/cluster-myid/ 

380 """ 

381 return self.execute_command("CLUSTER MYID", target_nodes=target_node) 

382 

383 def cluster_addslots( 

384 self, target_node: "TargetNodesT", *slots: EncodableT 

385 ) -> ResponseT: 

386 """ 

387 Assign new hash slots to receiving node. Sends to specified node. 

388 

389 :target_node: 'ClusterNode' 

390 The node to execute the command on 

391 

392 For more information see https://redis.io/commands/cluster-addslots 

393 """ 

394 return self.execute_command( 

395 "CLUSTER ADDSLOTS", *slots, target_nodes=target_node 

396 ) 

397 

398 def cluster_addslotsrange( 

399 self, target_node: "TargetNodesT", *slots: EncodableT 

400 ) -> ResponseT: 

401 """ 

402 Similar to the CLUSTER ADDSLOTS command. 

403 The difference between the two commands is that ADDSLOTS takes a list of slots 

404 to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges 

405 (specified by start and end slots) to assign to the node. 

406 

407 :target_node: 'ClusterNode' 

408 The node to execute the command on 

409 

410 For more information see https://redis.io/commands/cluster-addslotsrange 

411 """ 

412 return self.execute_command( 

413 "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node 

414 ) 

415 

416 def cluster_countkeysinslot(self, slot_id: int) -> ResponseT: 

417 """ 

418 Return the number of local keys in the specified hash slot 

419 Send to node based on specified slot_id 

420 

421 For more information see https://redis.io/commands/cluster-countkeysinslot 

422 """ 

423 return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id) 

424 

425 def cluster_count_failure_report(self, node_id: str) -> ResponseT: 

426 """ 

427 Return the number of failure reports active for a given node 

428 Sends to a random node 

429 

430 For more information see https://redis.io/commands/cluster-count-failure-reports 

431 """ 

432 return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id) 

433 

434 def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

435 """ 

436 Set hash slots as unbound in the cluster. 

437 It determines by it self what node the slot is in and sends it there 

438 

439 Returns a list of the results for each processed slot. 

440 

441 For more information see https://redis.io/commands/cluster-delslots 

442 """ 

443 return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots] 

444 

445 def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT: 

446 """ 

447 Similar to the CLUSTER DELSLOTS command. 

448 The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove 

449 from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove 

450 from the node. 

451 

452 For more information see https://redis.io/commands/cluster-delslotsrange 

453 """ 

454 return self.execute_command("CLUSTER DELSLOTSRANGE", *slots) 

455 

456 def cluster_failover( 

457 self, target_node: "TargetNodesT", option: Optional[str] = None 

458 ) -> ResponseT: 

459 """ 

460 Forces a slave to perform a manual failover of its master 

461 Sends to specified node 

462 

463 :target_node: 'ClusterNode' 

464 The node to execute the command on 

465 

466 For more information see https://redis.io/commands/cluster-failover 

467 """ 

468 if option: 

469 if option.upper() not in ["FORCE", "TAKEOVER"]: 

470 raise RedisError( 

471 f"Invalid option for CLUSTER FAILOVER command: {option}" 

472 ) 

473 else: 

474 return self.execute_command( 

475 "CLUSTER FAILOVER", option, target_nodes=target_node 

476 ) 

477 else: 

478 return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node) 

479 

480 def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

481 """ 

482 Provides info about Redis Cluster node state. 

483 The command will be sent to a random node in the cluster if no target 

484 node is specified. 

485 

486 For more information see https://redis.io/commands/cluster-info 

487 """ 

488 return self.execute_command("CLUSTER INFO", target_nodes=target_nodes) 

489 

490 def cluster_keyslot(self, key: str) -> ResponseT: 

491 """ 

492 Returns the hash slot of the specified key 

493 Sends to random node in the cluster 

494 

495 For more information see https://redis.io/commands/cluster-keyslot 

496 """ 

497 return self.execute_command("CLUSTER KEYSLOT", key) 

498 

499 def cluster_meet( 

500 self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None 

501 ) -> ResponseT: 

502 """ 

503 Force a node cluster to handshake with another node. 

504 Sends to specified node. 

505 

506 For more information see https://redis.io/commands/cluster-meet 

507 """ 

508 return self.execute_command( 

509 "CLUSTER MEET", host, port, target_nodes=target_nodes 

510 ) 

511 

512 def cluster_nodes(self) -> ResponseT: 

513 """ 

514 Get Cluster config for the node. 

515 Sends to random node in the cluster 

516 

517 For more information see https://redis.io/commands/cluster-nodes 

518 """ 

519 return self.execute_command("CLUSTER NODES") 

520 

521 def cluster_replicate( 

522 self, target_nodes: "TargetNodesT", node_id: str 

523 ) -> ResponseT: 

524 """ 

525 Reconfigure a node as a slave of the specified master node 

526 

527 For more information see https://redis.io/commands/cluster-replicate 

528 """ 

529 return self.execute_command( 

530 "CLUSTER REPLICATE", node_id, target_nodes=target_nodes 

531 ) 

532 

533 def cluster_reset( 

534 self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None 

535 ) -> ResponseT: 

536 """ 

537 Reset a Redis Cluster node 

538 

539 If 'soft' is True then it will send 'SOFT' argument 

540 If 'soft' is False then it will send 'HARD' argument 

541 

542 For more information see https://redis.io/commands/cluster-reset 

543 """ 

544 return self.execute_command( 

545 "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes 

546 ) 

547 

548 def cluster_save_config( 

549 self, target_nodes: Optional["TargetNodesT"] = None 

550 ) -> ResponseT: 

551 """ 

552 Forces the node to save cluster state on disk 

553 

554 For more information see https://redis.io/commands/cluster-saveconfig 

555 """ 

556 return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes) 

557 

558 def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT: 

559 """ 

560 Returns the number of keys in the specified cluster slot 

561 

562 For more information see https://redis.io/commands/cluster-getkeysinslot 

563 """ 

564 return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) 

565 

566 def cluster_set_config_epoch( 

567 self, epoch: int, target_nodes: Optional["TargetNodesT"] = None 

568 ) -> ResponseT: 

569 """ 

570 Set the configuration epoch in a new node 

571 

572 For more information see https://redis.io/commands/cluster-set-config-epoch 

573 """ 

574 return self.execute_command( 

575 "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes 

576 ) 

577 

578 def cluster_setslot( 

579 self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str 

580 ) -> ResponseT: 

581 """ 

582 Bind an hash slot to a specific node 

583 

584 :target_node: 'ClusterNode' 

585 The node to execute the command on 

586 

587 For more information see https://redis.io/commands/cluster-setslot 

588 """ 

589 if state.upper() in ("IMPORTING", "NODE", "MIGRATING"): 

590 return self.execute_command( 

591 "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node 

592 ) 

593 elif state.upper() == "STABLE": 

594 raise RedisError('For "stable" state please use ' "cluster_setslot_stable") 

595 else: 

596 raise RedisError(f"Invalid slot state: {state}") 

597 

598 def cluster_setslot_stable(self, slot_id: int) -> ResponseT: 

599 """ 

600 Clears migrating / importing state from the slot. 

601 It determines by it self what node the slot is in and sends it there. 

602 

603 For more information see https://redis.io/commands/cluster-setslot 

604 """ 

605 return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") 

606 

607 def cluster_replicas( 

608 self, node_id: str, target_nodes: Optional["TargetNodesT"] = None 

609 ) -> ResponseT: 

610 """ 

611 Provides a list of replica nodes replicating from the specified primary 

612 target node. 

613 

614 For more information see https://redis.io/commands/cluster-replicas 

615 """ 

616 return self.execute_command( 

617 "CLUSTER REPLICAS", node_id, target_nodes=target_nodes 

618 ) 

619 

620 def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

621 """ 

622 Get array of Cluster slot to node mappings 

623 

624 For more information see https://redis.io/commands/cluster-slots 

625 """ 

626 return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes) 

627 

628 def cluster_shards(self, target_nodes=None): 

629 """ 

630 Returns details about the shards of the cluster. 

631 

632 For more information see https://redis.io/commands/cluster-shards 

633 """ 

634 return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes) 

635 

636 def cluster_myshardid(self, target_nodes=None): 

637 """ 

638 Returns the shard ID of the node. 

639 

640 For more information see https://redis.io/commands/cluster-myshardid/ 

641 """ 

642 return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes) 

643 

644 def cluster_links(self, target_node: "TargetNodesT") -> ResponseT: 

645 """ 

646 Each node in a Redis Cluster maintains a pair of long-lived TCP link with each 

647 peer in the cluster: One for sending outbound messages towards the peer and one 

648 for receiving inbound messages from the peer. 

649 

650 This command outputs information of all such peer links as an array. 

651 

652 For more information see https://redis.io/commands/cluster-links 

653 """ 

654 return self.execute_command("CLUSTER LINKS", target_nodes=target_node) 

655 

656 def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

657 raise NotImplementedError( 

658 "CLUSTER FLUSHSLOTS is intentionally not implemented in the client." 

659 ) 

660 

661 def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 

662 raise NotImplementedError( 

663 "CLUSTER BUMPEPOCH is intentionally not implemented in the client." 

664 ) 

665 

666 def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

667 """ 

668 Enables read queries. 

669 The command will be sent to the default cluster node if target_nodes is 

670 not specified. 

671 

672 For more information see https://redis.io/commands/readonly 

673 """ 

674 if target_nodes == "replicas" or target_nodes == "all": 

675 # read_from_replicas will only be enabled if the READONLY command 

676 # is sent to all replicas 

677 self.read_from_replicas = True 

678 return self.execute_command("READONLY", target_nodes=target_nodes) 

679 

680 def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 

681 """ 

682 Disables read queries. 

683 The command will be sent to the default cluster node if target_nodes is 

684 not specified. 

685 

686 For more information see https://redis.io/commands/readwrite 

687 """ 

688 # Reset read from replicas flag 

689 self.read_from_replicas = False 

690 return self.execute_command("READWRITE", target_nodes=target_nodes) 

691 

692 

693class AsyncClusterManagementCommands( 

694 ClusterManagementCommands, AsyncManagementCommands 

695): 

696 """ 

697 A class for Redis Cluster management commands 

698 

699 The class inherits from Redis's core ManagementCommands class and do the 

700 required adjustments to work with cluster mode 

701 """ 

702 

703 async def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 

704 """ 

705 Set hash slots as unbound in the cluster. 

706 It determines by it self what node the slot is in and sends it there 

707 

708 Returns a list of the results for each processed slot. 

709 

710 For more information see https://redis.io/commands/cluster-delslots 

711 """ 

712 return await asyncio.gather( 

713 *( 

714 asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot)) 

715 for slot in slots 

716 ) 

717 ) 

718 

719 

720class ClusterDataAccessCommands(DataAccessCommands): 

721 """ 

722 A class for Redis Cluster Data Access Commands 

723 

724 The class inherits from Redis's core DataAccessCommand class and do the 

725 required adjustments to work with cluster mode 

726 """ 

727 

728 def stralgo( 

729 self, 

730 algo: Literal["LCS"], 

731 value1: KeyT, 

732 value2: KeyT, 

733 specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", 

734 len: bool = False, 

735 idx: bool = False, 

736 minmatchlen: Optional[int] = None, 

737 withmatchlen: bool = False, 

738 **kwargs, 

739 ) -> ResponseT: 

740 """ 

741 Implements complex algorithms that operate on strings. 

742 Right now the only algorithm implemented is the LCS algorithm 

743 (longest common substring). However new algorithms could be 

744 implemented in the future. 

745 

746 ``algo`` Right now must be LCS 

747 ``value1`` and ``value2`` Can be two strings or two keys 

748 ``specific_argument`` Specifying if the arguments to the algorithm 

749 will be keys or strings. strings is the default. 

750 ``len`` Returns just the len of the match. 

751 ``idx`` Returns the match positions in each string. 

752 ``minmatchlen`` Restrict the list of matches to the ones of a given 

753 minimal length. Can be provided only when ``idx`` set to True. 

754 ``withmatchlen`` Returns the matches with the len of the match. 

755 Can be provided only when ``idx`` set to True. 

756 

757 For more information see https://redis.io/commands/stralgo 

758 """ 

759 target_nodes = kwargs.pop("target_nodes", None) 

760 if specific_argument == "strings" and target_nodes is None: 

761 target_nodes = "default-node" 

762 kwargs.update({"target_nodes": target_nodes}) 

763 return super().stralgo( 

764 algo, 

765 value1, 

766 value2, 

767 specific_argument, 

768 len, 

769 idx, 

770 minmatchlen, 

771 withmatchlen, 

772 **kwargs, 

773 ) 

774 

775 def scan_iter( 

776 self, 

777 match: Optional[PatternT] = None, 

778 count: Optional[int] = None, 

779 _type: Optional[str] = None, 

780 **kwargs, 

781 ) -> Iterator: 

782 # Do the first query with cursor=0 for all nodes 

783 cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) 

784 yield from data 

785 

786 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

787 if cursors: 

788 # Get nodes by name 

789 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

790 

791 # Iterate over each node till its cursor is 0 

792 kwargs.pop("target_nodes", None) 

793 while cursors: 

794 for name, cursor in cursors.items(): 

795 cur, data = self.scan( 

796 cursor=cursor, 

797 match=match, 

798 count=count, 

799 _type=_type, 

800 target_nodes=nodes[name], 

801 **kwargs, 

802 ) 

803 yield from data 

804 cursors[name] = cur[name] 

805 

806 cursors = { 

807 name: cursor for name, cursor in cursors.items() if cursor != 0 

808 } 

809 

810 

811class AsyncClusterDataAccessCommands( 

812 ClusterDataAccessCommands, AsyncDataAccessCommands 

813): 

814 """ 

815 A class for Redis Cluster Data Access Commands 

816 

817 The class inherits from Redis's core DataAccessCommand class and do the 

818 required adjustments to work with cluster mode 

819 """ 

820 

821 async def scan_iter( 

822 self, 

823 match: Optional[PatternT] = None, 

824 count: Optional[int] = None, 

825 _type: Optional[str] = None, 

826 **kwargs, 

827 ) -> AsyncIterator: 

828 # Do the first query with cursor=0 for all nodes 

829 cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) 

830 for value in data: 

831 yield value 

832 

833 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 

834 if cursors: 

835 # Get nodes by name 

836 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 

837 

838 # Iterate over each node till its cursor is 0 

839 kwargs.pop("target_nodes", None) 

840 while cursors: 

841 for name, cursor in cursors.items(): 

842 cur, data = await self.scan( 

843 cursor=cursor, 

844 match=match, 

845 count=count, 

846 _type=_type, 

847 target_nodes=nodes[name], 

848 **kwargs, 

849 ) 

850 for value in data: 

851 yield value 

852 cursors[name] = cur[name] 

853 

854 cursors = { 

855 name: cursor for name, cursor in cursors.items() if cursor != 0 

856 } 

857 

858 

859class RedisClusterCommands( 

860 ClusterMultiKeyCommands, 

861 ClusterManagementCommands, 

862 ACLCommands, 

863 PubSubCommands, 

864 ClusterDataAccessCommands, 

865 ScriptCommands, 

866 FunctionCommands, 

867 RedisModuleCommands, 

868): 

869 """ 

870 A class for all Redis Cluster commands 

871 

872 For key-based commands, the target node(s) will be internally determined 

873 by the keys' hash slot. 

874 Non-key-based commands can be executed with the 'target_nodes' argument to 

875 target specific nodes. By default, if target_nodes is not specified, the 

876 command will be executed on the default cluster node. 

877 

878 :param :target_nodes: type can be one of the followings: 

879 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

880 - 'ClusterNode' 

881 - 'list(ClusterNodes)' 

882 - 'dict(any:clusterNodes)' 

883 

884 for example: 

885 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

886 """ 

887 

888 

889class AsyncRedisClusterCommands( 

890 AsyncClusterMultiKeyCommands, 

891 AsyncClusterManagementCommands, 

892 AsyncACLCommands, 

893 AsyncClusterDataAccessCommands, 

894 AsyncScriptCommands, 

895 AsyncFunctionCommands, 

896): 

897 """ 

898 A class for all Redis Cluster commands 

899 

900 For key-based commands, the target node(s) will be internally determined 

901 by the keys' hash slot. 

902 Non-key-based commands can be executed with the 'target_nodes' argument to 

903 target specific nodes. By default, if target_nodes is not specified, the 

904 command will be executed on the default cluster node. 

905 

906 :param :target_nodes: type can be one of the followings: 

907 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 

908 - 'ClusterNode' 

909 - 'list(ClusterNodes)' 

910 - 'dict(any:clusterNodes)' 

911 

912 for example: 

913 r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 

914 """