1import asyncio
2from typing import (
3 TYPE_CHECKING,
4 Any,
5 AsyncIterator,
6 Dict,
7 Iterable,
8 Iterator,
9 List,
10 Literal,
11 Mapping,
12 NoReturn,
13 Optional,
14 Union,
15)
16
17from redis.crc import key_slot
18from redis.exceptions import RedisClusterException, RedisError
19from redis.typing import (
20 AnyKeyT,
21 ClusterCommandsProtocol,
22 EncodableT,
23 KeysT,
24 KeyT,
25 PatternT,
26 ResponseT,
27)
28
29from .core import (
30 ACLCommands,
31 AsyncACLCommands,
32 AsyncDataAccessCommands,
33 AsyncFunctionCommands,
34 AsyncManagementCommands,
35 AsyncModuleCommands,
36 AsyncScriptCommands,
37 DataAccessCommands,
38 FunctionCommands,
39 ManagementCommands,
40 ModuleCommands,
41 PubSubCommands,
42 ScriptCommands,
43)
44from .helpers import list_or_args
45from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
46
47if TYPE_CHECKING:
48 from redis.asyncio.cluster import TargetNodesT
49
# Read-only commands that may be routed to replica nodes when reading from
# replicas is enabled. The set covers the major commands but is not
# exhaustive; see https://redis.io/commands
READ_COMMANDS = frozenset(
    {
        # Strings and bitmaps
        "BITCOUNT", "BITPOS", "GET", "GETBIT", "GETRANGE", "MGET", "STRLEN",
        # Scripting
        "EVAL_RO", "EVALSHA_RO",
        # Keyspace
        "EXISTS", "KEYS", "PTTL", "RANDOMKEY", "TTL",
        # Geo
        "GEODIST", "GEOHASH", "GEOPOS", "GEORADIUS", "GEORADIUSBYMEMBER",
        # Hashes
        "HEXISTS", "HGET", "HGETALL", "HKEYS", "HLEN", "HMGET", "HSTRLEN", "HVALS",
        # Lists
        "LINDEX", "LLEN", "LRANGE",
        # Sets
        "SCARD", "SDIFF", "SINTER", "SISMEMBER", "SMEMBERS", "SRANDMEMBER", "SUNION",
        # Sorted sets
        "ZCARD", "ZCOUNT", "ZRANGE", "ZSCORE",
    }
)
97
98
class ClusterMultiKeyCommands(ClusterCommandsProtocol):
    """
    A class containing commands that handle more than one key
    """

    def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
        """Split keys into a dictionary that maps a slot to a list of keys."""

        slots_to_keys: Dict[int, List[KeyT]] = {}
        for key in keys:
            # The slot is computed from the encoded form of the key
            slot = key_slot(self.encoder.encode(key))
            slots_to_keys.setdefault(slot, []).append(key)

        return slots_to_keys

    def _partition_pairs_by_slot(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> Dict[int, List[EncodableT]]:
        """
        Split pairs into a dictionary that maps a slot to a list of pairs.

        Each list is flat (``[key1, value1, key2, value2, ...]``) so it can
        be unpacked directly as command arguments.
        """

        slots_to_pairs: Dict[int, List[EncodableT]] = {}
        for key, value in mapping.items():
            slot = key_slot(self.encoder.encode(key))
            slots_to_pairs.setdefault(slot, []).extend((key, value))

        return slots_to_pairs

    def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and execute it.

        Every queued command receives the arguments of its slot and targets
        the node returned by ``nodes_manager.get_node_from_slot`` for that
        slot. A replica may be selected only when ``read_from_replicas`` is
        set and ``command`` is one of ``READ_COMMANDS``.

        Returns the pipeline's replies, one per slot, in the iteration order
        of ``slots_to_args``.
        """
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        # Plain loop: the calls are made only for their queuing side effect
        for slot, slot_args in slots_to_args.items():
            pipe.execute_command(
                command,
                *slot_args,
                target_nodes=[
                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                ],
            )
        return pipe.execute()

    def _reorder_keys_by_command(
        self,
        keys: Iterable[KeyT],
        slots_to_args: Mapping[int, Iterable[EncodableT]],
        responses: Iterable[Any],
    ) -> List[Any]:
        """
        Return the per-slot ``responses`` as one list ordered like ``keys``.

        ``responses`` must be in the same order as ``slots_to_args`` and each
        response must line up positionally with the keys of its slot.
        """
        results = {
            k: v
            for slot_values, response in zip(slots_to_args.values(), responses)
            for k, v in zip(slot_values, response)
        }
        return [results[key] for key in keys]

    def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per slot.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(self._execute_pipeline_by_slot(command, slots_to_keys))

    def exists(self, *keys: KeyT) -> ResponseT:
        """
        Returns the number of ``keys`` that exist in the
        whole cluster. The keys are first split up into slots
        and then an EXISTS command is sent for every slot

        For more information see https://redis.io/commands/exists
        """
        return self._split_command_across_slots("EXISTS", *keys)

    def delete(self, *keys: KeyT) -> ResponseT:
        """
        Deletes the given keys in the cluster.
        The keys are first split up into slots
        and then a DEL command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were deleted.

        For more information see https://redis.io/commands/del
        """
        return self._split_command_across_slots("DEL", *keys)

    def touch(self, *keys: KeyT) -> ResponseT:
        """
        Updates the last access time of given keys across the
        cluster.

        The keys are first split up into slots
        and then a TOUCH command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were touched.

        For more information see https://redis.io/commands/touch
        """
        return self._split_command_across_slots("TOUCH", *keys)

    def unlink(self, *keys: KeyT) -> ResponseT:
        """
        Remove the specified keys in a different thread.

        The keys are first split up into slots
        and then an UNLINK command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were unlinked.

        For more information see https://redis.io/commands/unlink
        """
        return self._split_command_across_slots("UNLINK", *keys)
261
262
class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
    """
    A class containing commands that handle more than one key
    (asynchronous variants of the pipeline-based helpers)
    """

    async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per slot.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

    async def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and await its execution.

        The client is initialized first when ``self._initialize`` is set.
        Every queued command receives the arguments of its slot and targets
        the node returned by ``nodes_manager.get_node_from_slot`` for that
        slot. A replica may be selected only when ``read_from_replicas`` is
        set and ``command`` is one of ``READ_COMMANDS``.

        Returns the pipeline's replies, one per slot, in the iteration order
        of ``slots_to_args``.
        """
        if self._initialize:
            await self.initialize()
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        # Plain loop: the calls are made only for their queuing side effect
        for slot, slot_args in slots_to_args.items():
            pipe.execute_command(
                command,
                *slot_args,
                target_nodes=[
                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                ],
            )
        return await pipe.execute()
340
341
class ClusterManagementCommands(ManagementCommands):
    """
    A class for Redis Cluster management commands

    The class inherits from Redis's core ManagementCommands class and makes the
    required adjustments to work with cluster mode
    """

    def slaveof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Not supported in cluster mode: always raises RedisClusterException.

        For more information see https://redis.io/commands/slaveof
        """
        raise RedisClusterException("SLAVEOF is not supported in cluster mode")

    def replicaof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Not supported in cluster mode: always raises RedisClusterException.

        For more information see https://redis.io/commands/replicaof
        """
        raise RedisClusterException("REPLICAOF is not supported in cluster mode")

    def swapdb(self, *args, **kwargs) -> NoReturn:
        """
        Swaps two Redis databases.

        Not supported in cluster mode: always raises RedisClusterException.

        For more information see https://redis.io/commands/swapdb
        """
        raise RedisClusterException("SWAPDB is not supported in cluster mode")

    def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Returns the node's id.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information check https://redis.io/commands/cluster-myid/
        """
        return self.execute_command("CLUSTER MYID", target_nodes=target_node)

    def cluster_addslots(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Assign new hash slots to receiving node. Sends to specified node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslots
        """
        return self.execute_command(
            "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
        )

    def cluster_addslotsrange(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Similar to the CLUSTER ADDSLOTS command.
        The difference between the two commands is that ADDSLOTS takes a list of slots
        to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
        (specified by start and end slots) to assign to the node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslotsrange
        """
        return self.execute_command(
            "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
        )

    def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
        """
        Return the number of local keys in the specified hash slot
        Send to node based on specified slot_id

        For more information see https://redis.io/commands/cluster-countkeysinslot
        """
        return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)

    def cluster_count_failure_report(self, node_id: str) -> ResponseT:
        """
        Return the number of failure reports active for a given node
        Sends to a random node

        For more information see https://redis.io/commands/cluster-count-failure-reports
        """
        return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)

    def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.
        It determines by itself what node the slot is in and sends it there

        A separate CLUSTER DELSLOTS command is issued for every slot.
        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]

    def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
        """
        Similar to the CLUSTER DELSLOTS command.
        The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
        from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
        from the node.

        For more information see https://redis.io/commands/cluster-delslotsrange
        """
        return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)

    def cluster_failover(
        self, target_node: "TargetNodesT", option: Optional[str] = None
    ) -> ResponseT:
        """
        Forces a slave to perform a manual failover of its master
        Sends to specified node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :option: optional, 'FORCE' or 'TAKEOVER' (case-insensitive).
            Any other non-empty value raises RedisError.

        For more information see https://redis.io/commands/cluster-failover
        """
        # No option (None or empty string): plain manual failover
        if not option:
            return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)

        if option.upper() not in ("FORCE", "TAKEOVER"):
            raise RedisError(
                f"Invalid option for CLUSTER FAILOVER command: {option}"
            )

        # The option is forwarded exactly as the caller spelled it
        return self.execute_command(
            "CLUSTER FAILOVER", option, target_nodes=target_node
        )

    def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Provides info about Redis Cluster node state.
        The command will be sent to a random node in the cluster if no target
        node is specified.

        For more information see https://redis.io/commands/cluster-info
        """
        return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)

    def cluster_keyslot(self, key: str) -> ResponseT:
        """
        Returns the hash slot of the specified key
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-keyslot
        """
        return self.execute_command("CLUSTER KEYSLOT", key)

    def cluster_meet(
        self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Force a node cluster to handshake with another node.
        Sends to specified node.

        For more information see https://redis.io/commands/cluster-meet
        """
        return self.execute_command(
            "CLUSTER MEET", host, port, target_nodes=target_nodes
        )

    def cluster_nodes(self) -> ResponseT:
        """
        Get Cluster config for the node.
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-nodes
        """
        return self.execute_command("CLUSTER NODES")

    def cluster_replicate(
        self, target_nodes: "TargetNodesT", node_id: str
    ) -> ResponseT:
        """
        Reconfigure a node as a slave of the specified master node

        For more information see https://redis.io/commands/cluster-replicate
        """
        return self.execute_command(
            "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
        )

    def cluster_reset(
        self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Reset a Redis Cluster node

        If 'soft' is True then it will send 'SOFT' argument
        If 'soft' is False then it will send 'HARD' argument

        For more information see https://redis.io/commands/cluster-reset
        """
        return self.execute_command(
            "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
        )

    def cluster_save_config(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Forces the node to save cluster state on disk

        For more information see https://redis.io/commands/cluster-saveconfig
        """
        return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)

    def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
        """
        Returns up to ``num_keys`` key names stored in the specified cluster slot

        For more information see https://redis.io/commands/cluster-getkeysinslot
        """
        return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)

    def cluster_set_config_epoch(
        self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Set the configuration epoch in a new node

        For more information see https://redis.io/commands/cluster-set-config-epoch
        """
        return self.execute_command(
            "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
        )

    def cluster_setslot(
        self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
    ) -> ResponseT:
        """
        Bind a hash slot to a specific node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :state: 'IMPORTING', 'NODE' or 'MIGRATING' (case-insensitive).
            'STABLE' and any other value raise RedisError; use
            cluster_setslot_stable for the stable state.

        For more information see https://redis.io/commands/cluster-setslot
        """
        # Normalize once for validation; the original spelling is what is sent
        normalized_state = state.upper()
        if normalized_state in ("IMPORTING", "NODE", "MIGRATING"):
            return self.execute_command(
                "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
            )
        if normalized_state == "STABLE":
            raise RedisError('For "stable" state please use cluster_setslot_stable')
        raise RedisError(f"Invalid slot state: {state}")

    def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
        """
        Clears migrating / importing state from the slot.
        It determines by itself what node the slot is in and sends it there.

        For more information see https://redis.io/commands/cluster-setslot
        """
        return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")

    def cluster_replicas(
        self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Provides a list of replica nodes replicating from the specified primary
        target node.

        For more information see https://redis.io/commands/cluster-replicas
        """
        return self.execute_command(
            "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
        )

    def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Get array of Cluster slot to node mappings

        For more information see https://redis.io/commands/cluster-slots
        """
        return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)

    def cluster_shards(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns details about the shards of the cluster.

        For more information see https://redis.io/commands/cluster-shards
        """
        return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)

    def cluster_myshardid(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns the shard ID of the node.

        For more information see https://redis.io/commands/cluster-myshardid/
        """
        return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)

    def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
        peer in the cluster: One for sending outbound messages towards the peer and one
        for receiving inbound messages from the peer.

        This command outputs information of all such peer links as an array.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-links
        """
        return self.execute_command("CLUSTER LINKS", target_nodes=target_node)

    def cluster_flushslots(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> NoReturn:
        """
        CLUSTER FLUSHSLOTS is deliberately unsupported by this client.

        Always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
        )

    def cluster_bumpepoch(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> NoReturn:
        """
        CLUSTER BUMPEPOCH is deliberately unsupported by this client.

        Always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
        )

    def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Enables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readonly
        """
        if target_nodes == "replicas" or target_nodes == "all":
            # read_from_replicas will only be enabled if the READONLY command
            # is sent to all replicas
            self.read_from_replicas = True
        return self.execute_command("READONLY", target_nodes=target_nodes)

    def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Disables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readwrite
        """
        # Reset read from replicas flag
        self.read_from_replicas = False
        return self.execute_command("READWRITE", target_nodes=target_nodes)
693
694
class AsyncClusterManagementCommands(
    ClusterManagementCommands, AsyncManagementCommands
):
    """
    Asynchronous Redis Cluster management commands

    Builds on ClusterManagementCommands and the core AsyncManagementCommands,
    overriding what needs awaiting in cluster mode
    """

    async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.
        It determines by itself what node the slot is in and sends it there

        One CLUSTER DELSLOTS command is scheduled as a task per slot and all
        of them are awaited together.
        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        # Schedule every per-slot command before waiting on any of them
        pending = [
            asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
            for slot in slots
        ]
        return await asyncio.gather(*pending)
720
721
class ClusterDataAccessCommands(DataAccessCommands):
    """
    Redis Cluster flavour of the data access commands

    Extends the core DataAccessCommands class, adapting the commands that
    need special handling in cluster mode
    """

    def stralgo(
        self,
        algo: Literal["LCS"],
        value1: KeyT,
        value2: KeyT,
        specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
        len: bool = False,
        idx: bool = False,
        minmatchlen: Optional[int] = None,
        withmatchlen: bool = False,
        **kwargs,
    ) -> ResponseT:
        """
        Implements complex algorithms that operate on strings.
        Right now the only algorithm implemented is the LCS algorithm
        (longest common substring). However new algorithms could be
        implemented in the future.

        ``algo`` Right now must be LCS
        ``value1`` and ``value2`` Can be two strings or two keys
        ``specific_argument`` Specifying if the arguments to the algorithm
        will be keys or strings. strings is the default.
        ``len`` Returns just the len of the match.
        ``idx`` Returns the match positions in each string.
        ``minmatchlen`` Restrict the list of matches to the ones of a given
        minimal length. Can be provided only when ``idx`` set to True.
        ``withmatchlen`` Returns the matches with the len of the match.
        Can be provided only when ``idx`` set to True.

        When the arguments are plain strings and no ``target_nodes`` keyword
        is given, the command is directed to "default-node".

        For more information see https://redis.io/commands/stralgo
        """
        node_target = kwargs.pop("target_nodes", None)
        if node_target is None and specific_argument == "strings":
            node_target = "default-node"
        # target_nodes is always forwarded, even when it stays None
        kwargs["target_nodes"] = node_target
        return super().stralgo(
            algo,
            value1,
            value2,
            specific_argument,
            len,
            idx,
            minmatchlen,
            withmatchlen,
            **kwargs,
        )

    def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> Iterator:
        """
        Yield items from SCAN, following each node's cursor until it is 0.

        The first ``scan`` call is made with the caller's keyword arguments
        and returns a mapping of node name to cursor. Every node whose cursor
        is non-zero is then scanned individually until its cursor reaches 0.
        """
        # Do the first query with cursor=0 for all nodes
        first_cursors, data = self.scan(
            match=match, count=count, _type=_type, **kwargs
        )
        yield from data

        pending = {
            name: cursor for name, cursor in first_cursors.items() if cursor != 0
        }
        if not pending:
            return

        # Get nodes by name
        nodes = {name: self.get_node(node_name=name) for name in pending}

        # Follow-up scans address one node each, so drop the caller's target
        kwargs.pop("target_nodes", None)
        while pending:
            unfinished = {}
            for name, cursor in pending.items():
                reply, data = self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                yield from data
                next_cursor = reply[name]
                # Only nodes with a non-zero cursor need another round
                if next_cursor != 0:
                    unfinished[name] = next_cursor
            pending = unfinished
811
812
class AsyncClusterDataAccessCommands(
    ClusterDataAccessCommands, AsyncDataAccessCommands
):
    """
    Asynchronous Redis Cluster flavour of the data access commands

    Extends ClusterDataAccessCommands and the core AsyncDataAccessCommands,
    adapting the commands that need special handling in cluster mode
    """

    async def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> AsyncIterator:
        """
        Yield items from SCAN, following each node's cursor until it is 0.

        The first ``scan`` call is awaited with the caller's keyword arguments
        and returns a mapping of node name to cursor. Every node whose cursor
        is non-zero is then scanned individually until its cursor reaches 0.
        """
        # Do the first query with cursor=0 for all nodes
        first_cursors, data = await self.scan(
            match=match, count=count, _type=_type, **kwargs
        )
        for value in data:
            yield value

        pending = {
            name: cursor for name, cursor in first_cursors.items() if cursor != 0
        }
        if not pending:
            return

        # Get nodes by name
        nodes = {name: self.get_node(node_name=name) for name in pending}

        # Follow-up scans address one node each, so drop the caller's target
        kwargs.pop("target_nodes", None)
        while pending:
            unfinished = {}
            for name, cursor in pending.items():
                reply, data = await self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                for value in data:
                    yield value
                next_cursor = reply[name]
                # Only nodes with a non-zero cursor need another round
                if next_cursor != 0:
                    unfinished[name] = next_cursor
            pending = unfinished
859
860
class RedisClusterCommands(
    ClusterMultiKeyCommands,
    ClusterManagementCommands,
    ACLCommands,
    PubSubCommands,
    ClusterDataAccessCommands,
    ScriptCommands,
    FunctionCommands,
    ModuleCommands,
    RedisModuleCommands,
):
    """
    A class for all Redis Cluster commands

    For key-based commands, the target node(s) will be internally determined
    by the keys' hash slot.
    Non-key-based commands can be executed with the 'target_nodes' argument to
    target specific nodes. By default, if target_nodes is not specified, the
    command will be executed on the default cluster node.

    :param target_nodes: can be one of the following:
        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """
890
891
class AsyncRedisClusterCommands(
    AsyncClusterMultiKeyCommands,
    AsyncClusterManagementCommands,
    AsyncACLCommands,
    AsyncClusterDataAccessCommands,
    AsyncScriptCommands,
    AsyncFunctionCommands,
    AsyncModuleCommands,
    AsyncRedisModuleCommands,
):
    """
    A class for all Redis Cluster commands, combining the asynchronous
    command classes

    For key-based commands, the target node(s) will be internally determined
    by the keys' hash slot.
    Non-key-based commands can be executed with the 'target_nodes' argument to
    target specific nodes. By default, if target_nodes is not specified, the
    command will be executed on the default cluster node.

    :param target_nodes: can be one of the following:
        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """