1import asyncio
2from typing import (
3 TYPE_CHECKING,
4 Any,
5 AsyncIterator,
6 Dict,
7 Iterable,
8 Iterator,
9 List,
10 Literal,
11 Mapping,
12 NoReturn,
13 Optional,
14 Union,
15)
16
17from redis.crc import key_slot
18from redis.exceptions import RedisClusterException, RedisError
19from redis.typing import (
20 AnyKeyT,
21 ClusterCommandsProtocol,
22 EncodableT,
23 KeysT,
24 KeyT,
25 PatternT,
26 ResponseT,
27)
28
29from .core import (
30 ACLCommands,
31 AsyncACLCommands,
32 AsyncDataAccessCommands,
33 AsyncFunctionCommands,
34 AsyncManagementCommands,
35 AsyncModuleCommands,
36 AsyncScriptCommands,
37 DataAccessCommands,
38 FunctionCommands,
39 ManagementCommands,
40 ModuleCommands,
41 PubSubCommands,
42 ScriptCommands,
43)
44from .helpers import list_or_args
45from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
46
47if TYPE_CHECKING:
48 from redis.asyncio.cluster import TargetNodesT
49
50# Not complete, but covers the major ones
51# https://redis.io/commands
52READ_COMMANDS = frozenset(
53 [
54 # Bit Operations
55 "BITCOUNT",
56 "BITFIELD_RO",
57 "BITPOS",
58 # Scripting
59 "EVAL_RO",
60 "EVALSHA_RO",
61 "FCALL_RO",
62 # Key Operations
63 "DBSIZE",
64 "DIGEST",
65 "DUMP",
66 "EXISTS",
67 "EXPIRETIME",
68 "PEXPIRETIME",
69 "KEYS",
70 "SCAN",
71 "PTTL",
72 "RANDOMKEY",
73 "TTL",
74 "TYPE",
75 # String Operations
76 "GET",
77 "GETBIT",
78 "GETRANGE",
79 "MGET",
80 "STRLEN",
81 "LCS",
82 # Geo Operations
83 "GEODIST",
84 "GEOHASH",
85 "GEOPOS",
86 "GEOSEARCH",
87 # Hash Operations
88 "HEXISTS",
89 "HGET",
90 "HGETALL",
91 "HKEYS",
92 "HLEN",
93 "HMGET",
94 "HSTRLEN",
95 "HVALS",
96 "HRANDFIELD",
97 "HEXPIRETIME",
98 "HPEXPIRETIME",
99 "HTTL",
100 "HPTTL",
101 "HSCAN",
102 # List Operations
103 "LINDEX",
104 "LPOS",
105 "LLEN",
106 "LRANGE",
107 # Set Operations
108 "SCARD",
109 "SDIFF",
110 "SINTER",
111 "SINTERCARD",
112 "SISMEMBER",
113 "SMISMEMBER",
114 "SMEMBERS",
115 "SRANDMEMBER",
116 "SUNION",
117 "SSCAN",
118 # Sorted Set Operations
119 "ZCARD",
120 "ZCOUNT",
121 "ZDIFF",
122 "ZINTER",
123 "ZINTERCARD",
124 "ZLEXCOUNT",
125 "ZMSCORE",
126 "ZRANDMEMBER",
127 "ZRANGE",
128 "ZRANGEBYLEX",
129 "ZRANGEBYSCORE",
130 "ZRANK",
131 "ZREVRANGE",
132 "ZREVRANGEBYLEX",
133 "ZREVRANGEBYSCORE",
134 "ZREVRANK",
135 "ZSCAN",
136 "ZSCORE",
137 "ZUNION",
138 # Stream Operations
139 "XLEN",
140 "XPENDING",
141 "XRANGE",
142 "XREAD",
143 "XREVRANGE",
144 # JSON Module
145 "JSON.ARRINDEX",
146 "JSON.ARRLEN",
147 "JSON.GET",
148 "JSON.MGET",
149 "JSON.OBJKEYS",
150 "JSON.OBJLEN",
151 "JSON.RESP",
152 "JSON.STRLEN",
153 "JSON.TYPE",
154 # RediSearch Module
155 "FT.EXPLAIN",
156 "FT.INFO",
157 "FT.PROFILE",
158 "FT.SEARCH",
159 ]
160)
161
162
163class ClusterMultiKeyCommands(ClusterCommandsProtocol):
164 """
165 A class containing commands that handle more than one key
166 """
167
168 def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
169 """Split keys into a dictionary that maps a slot to a list of keys."""
170
171 slots_to_keys = {}
172 for key in keys:
173 slot = key_slot(self.encoder.encode(key))
174 slots_to_keys.setdefault(slot, []).append(key)
175
176 return slots_to_keys
177
178 def _partition_pairs_by_slot(
179 self, mapping: Mapping[AnyKeyT, EncodableT]
180 ) -> Dict[int, List[EncodableT]]:
181 """Split pairs into a dictionary that maps a slot to a list of pairs."""
182
183 slots_to_pairs = {}
184 for pair in mapping.items():
185 slot = key_slot(self.encoder.encode(pair[0]))
186 slots_to_pairs.setdefault(slot, []).extend(pair)
187
188 return slots_to_pairs
189
190 def _execute_pipeline_by_slot(
191 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
192 ) -> List[Any]:
193 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
194 pipe = self.pipeline()
195 [
196 pipe.execute_command(
197 command,
198 *slot_args,
199 target_nodes=[
200 self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
201 ],
202 )
203 for slot, slot_args in slots_to_args.items()
204 ]
205 return pipe.execute()
206
207 def _reorder_keys_by_command(
208 self,
209 keys: Iterable[KeyT],
210 slots_to_args: Mapping[int, Iterable[EncodableT]],
211 responses: Iterable[Any],
212 ) -> List[Any]:
213 results = {
214 k: v
215 for slot_values, response in zip(slots_to_args.values(), responses)
216 for k, v in zip(slot_values, response)
217 }
218 return [results[key] for key in keys]
219
220 def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
221 """
222 Splits the keys into different slots and then calls MGET
223 for the keys of every slot. This operation will not be atomic
224 if keys belong to more than one slot.
225
226 Returns a list of values ordered identically to ``keys``
227
228 For more information see https://redis.io/commands/mget
229 """
230
231 # Concatenate all keys into a list
232 keys = list_or_args(keys, args)
233
234 # Split keys into slots
235 slots_to_keys = self._partition_keys_by_slot(keys)
236
237 # Execute commands using a pipeline
238 res = self._execute_pipeline_by_slot("MGET", slots_to_keys)
239
240 # Reorder keys in the order the user provided & return
241 return self._reorder_keys_by_command(keys, slots_to_keys, res)
242
243 def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
244 """
245 Sets key/values based on a mapping. Mapping is a dictionary of
246 key/value pairs. Both keys and values should be strings or types that
247 can be cast to a string via str().
248
249 Splits the keys into different slots and then calls MSET
250 for the keys of every slot. This operation will not be atomic
251 if keys belong to more than one slot.
252
253 For more information see https://redis.io/commands/mset
254 """
255
256 # Partition the keys by slot
257 slots_to_pairs = self._partition_pairs_by_slot(mapping)
258
259 # Execute commands using a pipeline & return list of replies
260 return self._execute_pipeline_by_slot("MSET", slots_to_pairs)
261
262 def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
263 """
264 Runs the given command once for the keys
265 of each slot. Returns the sum of the return values.
266 """
267
268 # Partition the keys by slot
269 slots_to_keys = self._partition_keys_by_slot(keys)
270
271 # Sum up the reply from each command
272 return sum(self._execute_pipeline_by_slot(command, slots_to_keys))
273
274 def exists(self, *keys: KeyT) -> ResponseT:
275 """
276 Returns the number of ``names`` that exist in the
277 whole cluster. The keys are first split up into slots
278 and then an EXISTS command is sent for every slot
279
280 For more information see https://redis.io/commands/exists
281 """
282 return self._split_command_across_slots("EXISTS", *keys)
283
284 def delete(self, *keys: KeyT) -> ResponseT:
285 """
286 Deletes the given keys in the cluster.
287 The keys are first split up into slots
288 and then an DEL command is sent for every slot
289
290 Non-existent keys are ignored.
291 Returns the number of keys that were deleted.
292
293 For more information see https://redis.io/commands/del
294 """
295 return self._split_command_across_slots("DEL", *keys)
296
297 def touch(self, *keys: KeyT) -> ResponseT:
298 """
299 Updates the last access time of given keys across the
300 cluster.
301
302 The keys are first split up into slots
303 and then an TOUCH command is sent for every slot
304
305 Non-existent keys are ignored.
306 Returns the number of keys that were touched.
307
308 For more information see https://redis.io/commands/touch
309 """
310 return self._split_command_across_slots("TOUCH", *keys)
311
312 def unlink(self, *keys: KeyT) -> ResponseT:
313 """
314 Remove the specified keys in a different thread.
315
316 The keys are first split up into slots
317 and then an TOUCH command is sent for every slot
318
319 Non-existent keys are ignored.
320 Returns the number of keys that were unlinked.
321
322 For more information see https://redis.io/commands/unlink
323 """
324 return self._split_command_across_slots("UNLINK", *keys)
325
326
327class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
328 """
329 A class containing commands that handle more than one key
330 """
331
332 async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
333 """
334 Splits the keys into different slots and then calls MGET
335 for the keys of every slot. This operation will not be atomic
336 if keys belong to more than one slot.
337
338 Returns a list of values ordered identically to ``keys``
339
340 For more information see https://redis.io/commands/mget
341 """
342
343 # Concatenate all keys into a list
344 keys = list_or_args(keys, args)
345
346 # Split keys into slots
347 slots_to_keys = self._partition_keys_by_slot(keys)
348
349 # Execute commands using a pipeline
350 res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)
351
352 # Reorder keys in the order the user provided & return
353 return self._reorder_keys_by_command(keys, slots_to_keys, res)
354
355 async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
356 """
357 Sets key/values based on a mapping. Mapping is a dictionary of
358 key/value pairs. Both keys and values should be strings or types that
359 can be cast to a string via str().
360
361 Splits the keys into different slots and then calls MSET
362 for the keys of every slot. This operation will not be atomic
363 if keys belong to more than one slot.
364
365 For more information see https://redis.io/commands/mset
366 """
367
368 # Partition the keys by slot
369 slots_to_pairs = self._partition_pairs_by_slot(mapping)
370
371 # Execute commands using a pipeline & return list of replies
372 return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)
373
374 async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
375 """
376 Runs the given command once for the keys
377 of each slot. Returns the sum of the return values.
378 """
379
380 # Partition the keys by slot
381 slots_to_keys = self._partition_keys_by_slot(keys)
382
383 # Sum up the reply from each command
384 return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))
385
386 async def _execute_pipeline_by_slot(
387 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
388 ) -> List[Any]:
389 if self._initialize:
390 await self.initialize()
391 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
392 pipe = self.pipeline()
393 [
394 pipe.execute_command(
395 command,
396 *slot_args,
397 target_nodes=[
398 self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
399 ],
400 )
401 for slot, slot_args in slots_to_args.items()
402 ]
403 return await pipe.execute()
404
405
406class ClusterManagementCommands(ManagementCommands):
407 """
408 A class for Redis Cluster management commands
409
410 The class inherits from Redis's core ManagementCommands class and do the
411 required adjustments to work with cluster mode
412 """
413
414 def slaveof(self, *args, **kwargs) -> NoReturn:
415 """
416 Make the server a replica of another instance, or promote it as master.
417
418 For more information see https://redis.io/commands/slaveof
419 """
420 raise RedisClusterException("SLAVEOF is not supported in cluster mode")
421
422 def replicaof(self, *args, **kwargs) -> NoReturn:
423 """
424 Make the server a replica of another instance, or promote it as master.
425
426 For more information see https://redis.io/commands/replicaof
427 """
428 raise RedisClusterException("REPLICAOF is not supported in cluster mode")
429
430 def swapdb(self, *args, **kwargs) -> NoReturn:
431 """
432 Swaps two Redis databases.
433
434 For more information see https://redis.io/commands/swapdb
435 """
436 raise RedisClusterException("SWAPDB is not supported in cluster mode")
437
438 def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
439 """
440 Returns the node's id.
441
442 :target_node: 'ClusterNode'
443 The node to execute the command on
444
445 For more information check https://redis.io/commands/cluster-myid/
446 """
447 return self.execute_command("CLUSTER MYID", target_nodes=target_node)
448
449 def cluster_addslots(
450 self, target_node: "TargetNodesT", *slots: EncodableT
451 ) -> ResponseT:
452 """
453 Assign new hash slots to receiving node. Sends to specified node.
454
455 :target_node: 'ClusterNode'
456 The node to execute the command on
457
458 For more information see https://redis.io/commands/cluster-addslots
459 """
460 return self.execute_command(
461 "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
462 )
463
464 def cluster_addslotsrange(
465 self, target_node: "TargetNodesT", *slots: EncodableT
466 ) -> ResponseT:
467 """
468 Similar to the CLUSTER ADDSLOTS command.
469 The difference between the two commands is that ADDSLOTS takes a list of slots
470 to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
471 (specified by start and end slots) to assign to the node.
472
473 :target_node: 'ClusterNode'
474 The node to execute the command on
475
476 For more information see https://redis.io/commands/cluster-addslotsrange
477 """
478 return self.execute_command(
479 "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
480 )
481
482 def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
483 """
484 Return the number of local keys in the specified hash slot
485 Send to node based on specified slot_id
486
487 For more information see https://redis.io/commands/cluster-countkeysinslot
488 """
489 return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)
490
491 def cluster_count_failure_report(self, node_id: str) -> ResponseT:
492 """
493 Return the number of failure reports active for a given node
494 Sends to a random node
495
496 For more information see https://redis.io/commands/cluster-count-failure-reports
497 """
498 return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)
499
500 def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
501 """
502 Set hash slots as unbound in the cluster.
503 It determines by it self what node the slot is in and sends it there
504
505 Returns a list of the results for each processed slot.
506
507 For more information see https://redis.io/commands/cluster-delslots
508 """
509 return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]
510
511 def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
512 """
513 Similar to the CLUSTER DELSLOTS command.
514 The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
515 from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
516 from the node.
517
518 For more information see https://redis.io/commands/cluster-delslotsrange
519 """
520 return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)
521
522 def cluster_failover(
523 self, target_node: "TargetNodesT", option: Optional[str] = None
524 ) -> ResponseT:
525 """
526 Forces a slave to perform a manual failover of its master
527 Sends to specified node
528
529 :target_node: 'ClusterNode'
530 The node to execute the command on
531
532 For more information see https://redis.io/commands/cluster-failover
533 """
534 if option:
535 if option.upper() not in ["FORCE", "TAKEOVER"]:
536 raise RedisError(
537 f"Invalid option for CLUSTER FAILOVER command: {option}"
538 )
539 else:
540 return self.execute_command(
541 "CLUSTER FAILOVER", option, target_nodes=target_node
542 )
543 else:
544 return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)
545
546 def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
547 """
548 Provides info about Redis Cluster node state.
549 The command will be sent to a random node in the cluster if no target
550 node is specified.
551
552 For more information see https://redis.io/commands/cluster-info
553 """
554 return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)
555
556 def cluster_keyslot(self, key: str) -> ResponseT:
557 """
558 Returns the hash slot of the specified key
559 Sends to random node in the cluster
560
561 For more information see https://redis.io/commands/cluster-keyslot
562 """
563 return self.execute_command("CLUSTER KEYSLOT", key)
564
565 def cluster_meet(
566 self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
567 ) -> ResponseT:
568 """
569 Force a node cluster to handshake with another node.
570 Sends to specified node.
571
572 For more information see https://redis.io/commands/cluster-meet
573 """
574 return self.execute_command(
575 "CLUSTER MEET", host, port, target_nodes=target_nodes
576 )
577
578 def cluster_nodes(self) -> ResponseT:
579 """
580 Get Cluster config for the node.
581 Sends to random node in the cluster
582
583 For more information see https://redis.io/commands/cluster-nodes
584 """
585 return self.execute_command("CLUSTER NODES")
586
587 def cluster_replicate(
588 self, target_nodes: "TargetNodesT", node_id: str
589 ) -> ResponseT:
590 """
591 Reconfigure a node as a slave of the specified master node
592
593 For more information see https://redis.io/commands/cluster-replicate
594 """
595 return self.execute_command(
596 "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
597 )
598
599 def cluster_reset(
600 self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
601 ) -> ResponseT:
602 """
603 Reset a Redis Cluster node
604
605 If 'soft' is True then it will send 'SOFT' argument
606 If 'soft' is False then it will send 'HARD' argument
607
608 For more information see https://redis.io/commands/cluster-reset
609 """
610 return self.execute_command(
611 "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
612 )
613
614 def cluster_save_config(
615 self, target_nodes: Optional["TargetNodesT"] = None
616 ) -> ResponseT:
617 """
618 Forces the node to save cluster state on disk
619
620 For more information see https://redis.io/commands/cluster-saveconfig
621 """
622 return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)
623
624 def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
625 """
626 Returns the number of keys in the specified cluster slot
627
628 For more information see https://redis.io/commands/cluster-getkeysinslot
629 """
630 return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)
631
632 def cluster_set_config_epoch(
633 self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
634 ) -> ResponseT:
635 """
636 Set the configuration epoch in a new node
637
638 For more information see https://redis.io/commands/cluster-set-config-epoch
639 """
640 return self.execute_command(
641 "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
642 )
643
644 def cluster_setslot(
645 self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
646 ) -> ResponseT:
647 """
648 Bind an hash slot to a specific node
649
650 :target_node: 'ClusterNode'
651 The node to execute the command on
652
653 For more information see https://redis.io/commands/cluster-setslot
654 """
655 if state.upper() in ("IMPORTING", "NODE", "MIGRATING"):
656 return self.execute_command(
657 "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
658 )
659 elif state.upper() == "STABLE":
660 raise RedisError('For "stable" state please use cluster_setslot_stable')
661 else:
662 raise RedisError(f"Invalid slot state: {state}")
663
664 def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
665 """
666 Clears migrating / importing state from the slot.
667 It determines by it self what node the slot is in and sends it there.
668
669 For more information see https://redis.io/commands/cluster-setslot
670 """
671 return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")
672
673 def cluster_replicas(
674 self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
675 ) -> ResponseT:
676 """
677 Provides a list of replica nodes replicating from the specified primary
678 target node.
679
680 For more information see https://redis.io/commands/cluster-replicas
681 """
682 return self.execute_command(
683 "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
684 )
685
686 def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
687 """
688 Get array of Cluster slot to node mappings
689
690 For more information see https://redis.io/commands/cluster-slots
691 """
692 return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)
693
694 def cluster_shards(self, target_nodes=None):
695 """
696 Returns details about the shards of the cluster.
697
698 For more information see https://redis.io/commands/cluster-shards
699 """
700 return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)
701
702 def cluster_myshardid(self, target_nodes=None):
703 """
704 Returns the shard ID of the node.
705
706 For more information see https://redis.io/commands/cluster-myshardid/
707 """
708 return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)
709
710 def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
711 """
712 Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
713 peer in the cluster: One for sending outbound messages towards the peer and one
714 for receiving inbound messages from the peer.
715
716 This command outputs information of all such peer links as an array.
717
718 For more information see https://redis.io/commands/cluster-links
719 """
720 return self.execute_command("CLUSTER LINKS", target_nodes=target_node)
721
722 def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
723 raise NotImplementedError(
724 "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
725 )
726
727 def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
728 raise NotImplementedError(
729 "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
730 )
731
732 def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
733 """
734 Enables read queries.
735 The command will be sent to the default cluster node if target_nodes is
736 not specified.
737
738 For more information see https://redis.io/commands/readonly
739 """
740 if target_nodes == "replicas" or target_nodes == "all":
741 # read_from_replicas will only be enabled if the READONLY command
742 # is sent to all replicas
743 self.read_from_replicas = True
744 return self.execute_command("READONLY", target_nodes=target_nodes)
745
746 def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
747 """
748 Disables read queries.
749 The command will be sent to the default cluster node if target_nodes is
750 not specified.
751
752 For more information see https://redis.io/commands/readwrite
753 """
754 # Reset read from replicas flag
755 self.read_from_replicas = False
756 return self.execute_command("READWRITE", target_nodes=target_nodes)
757
758
759class AsyncClusterManagementCommands(
760 ClusterManagementCommands, AsyncManagementCommands
761):
762 """
763 A class for Redis Cluster management commands
764
765 The class inherits from Redis's core ManagementCommands class and do the
766 required adjustments to work with cluster mode
767 """
768
769 async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
770 """
771 Set hash slots as unbound in the cluster.
772 It determines by it self what node the slot is in and sends it there
773
774 Returns a list of the results for each processed slot.
775
776 For more information see https://redis.io/commands/cluster-delslots
777 """
778 return await asyncio.gather(
779 *(
780 asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
781 for slot in slots
782 )
783 )
784
785
786class ClusterDataAccessCommands(DataAccessCommands):
787 """
788 A class for Redis Cluster Data Access Commands
789
790 The class inherits from Redis's core DataAccessCommand class and do the
791 required adjustments to work with cluster mode
792 """
793
794 def stralgo(
795 self,
796 algo: Literal["LCS"],
797 value1: KeyT,
798 value2: KeyT,
799 specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
800 len: bool = False,
801 idx: bool = False,
802 minmatchlen: Optional[int] = None,
803 withmatchlen: bool = False,
804 **kwargs,
805 ) -> ResponseT:
806 """
807 Implements complex algorithms that operate on strings.
808 Right now the only algorithm implemented is the LCS algorithm
809 (longest common substring). However new algorithms could be
810 implemented in the future.
811
812 ``algo`` Right now must be LCS
813 ``value1`` and ``value2`` Can be two strings or two keys
814 ``specific_argument`` Specifying if the arguments to the algorithm
815 will be keys or strings. strings is the default.
816 ``len`` Returns just the len of the match.
817 ``idx`` Returns the match positions in each string.
818 ``minmatchlen`` Restrict the list of matches to the ones of a given
819 minimal length. Can be provided only when ``idx`` set to True.
820 ``withmatchlen`` Returns the matches with the len of the match.
821 Can be provided only when ``idx`` set to True.
822
823 For more information see https://redis.io/commands/stralgo
824 """
825 target_nodes = kwargs.pop("target_nodes", None)
826 if specific_argument == "strings" and target_nodes is None:
827 target_nodes = "default-node"
828 kwargs.update({"target_nodes": target_nodes})
829 return super().stralgo(
830 algo,
831 value1,
832 value2,
833 specific_argument,
834 len,
835 idx,
836 minmatchlen,
837 withmatchlen,
838 **kwargs,
839 )
840
841 def scan_iter(
842 self,
843 match: Optional[PatternT] = None,
844 count: Optional[int] = None,
845 _type: Optional[str] = None,
846 **kwargs,
847 ) -> Iterator:
848 # Do the first query with cursor=0 for all nodes
849 cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
850 yield from data
851
852 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
853 if cursors:
854 # Get nodes by name
855 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
856
857 # Iterate over each node till its cursor is 0
858 kwargs.pop("target_nodes", None)
859 while cursors:
860 for name, cursor in cursors.items():
861 cur, data = self.scan(
862 cursor=cursor,
863 match=match,
864 count=count,
865 _type=_type,
866 target_nodes=nodes[name],
867 **kwargs,
868 )
869 yield from data
870 cursors[name] = cur[name]
871
872 cursors = {
873 name: cursor for name, cursor in cursors.items() if cursor != 0
874 }
875
876
877class AsyncClusterDataAccessCommands(
878 ClusterDataAccessCommands, AsyncDataAccessCommands
879):
880 """
881 A class for Redis Cluster Data Access Commands
882
883 The class inherits from Redis's core DataAccessCommand class and do the
884 required adjustments to work with cluster mode
885 """
886
887 async def scan_iter(
888 self,
889 match: Optional[PatternT] = None,
890 count: Optional[int] = None,
891 _type: Optional[str] = None,
892 **kwargs,
893 ) -> AsyncIterator:
894 # Do the first query with cursor=0 for all nodes
895 cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
896 for value in data:
897 yield value
898
899 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
900 if cursors:
901 # Get nodes by name
902 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
903
904 # Iterate over each node till its cursor is 0
905 kwargs.pop("target_nodes", None)
906 while cursors:
907 for name, cursor in cursors.items():
908 cur, data = await self.scan(
909 cursor=cursor,
910 match=match,
911 count=count,
912 _type=_type,
913 target_nodes=nodes[name],
914 **kwargs,
915 )
916 for value in data:
917 yield value
918 cursors[name] = cur[name]
919
920 cursors = {
921 name: cursor for name, cursor in cursors.items() if cursor != 0
922 }
923
924
925class RedisClusterCommands(
926 ClusterMultiKeyCommands,
927 ClusterManagementCommands,
928 ACLCommands,
929 PubSubCommands,
930 ClusterDataAccessCommands,
931 ScriptCommands,
932 FunctionCommands,
933 ModuleCommands,
934 RedisModuleCommands,
935):
936 """
937 A class for all Redis Cluster commands
938
939 For key-based commands, the target node(s) will be internally determined
940 by the keys' hash slot.
941 Non-key-based commands can be executed with the 'target_nodes' argument to
942 target specific nodes. By default, if target_nodes is not specified, the
943 command will be executed on the default cluster node.
944
945 :param :target_nodes: type can be one of the followings:
946 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
947 - 'ClusterNode'
948 - 'list(ClusterNodes)'
949 - 'dict(any:clusterNodes)'
950
951 for example:
952 r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
953 """
954
955
956class AsyncRedisClusterCommands(
957 AsyncClusterMultiKeyCommands,
958 AsyncClusterManagementCommands,
959 AsyncACLCommands,
960 AsyncClusterDataAccessCommands,
961 AsyncScriptCommands,
962 AsyncFunctionCommands,
963 AsyncModuleCommands,
964 AsyncRedisModuleCommands,
965):
966 """
967 A class for all Redis Cluster commands
968
969 For key-based commands, the target node(s) will be internally determined
970 by the keys' hash slot.
971 Non-key-based commands can be executed with the 'target_nodes' argument to
972 target specific nodes. By default, if target_nodes is not specified, the
973 command will be executed on the default cluster node.
974
975 :param :target_nodes: type can be one of the followings:
976 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
977 - 'ClusterNode'
978 - 'list(ClusterNodes)'
979 - 'dict(any:clusterNodes)'
980
981 for example:
982 r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
983 """