Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/commands/cluster.py: 37%
187 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1import asyncio
2from typing import (
3 TYPE_CHECKING,
4 Any,
5 AsyncIterator,
6 Dict,
7 Iterable,
8 Iterator,
9 List,
10 Literal,
11 Mapping,
12 NoReturn,
13 Optional,
14 Union,
15)
17from redis.crc import key_slot
18from redis.exceptions import RedisClusterException, RedisError
19from redis.typing import (
20 AnyKeyT,
21 ClusterCommandsProtocol,
22 EncodableT,
23 KeysT,
24 KeyT,
25 PatternT,
26 ResponseT,
27)
29from .core import (
30 ACLCommands,
31 AsyncACLCommands,
32 AsyncDataAccessCommands,
33 AsyncFunctionCommands,
34 AsyncGearsCommands,
35 AsyncManagementCommands,
36 AsyncModuleCommands,
37 AsyncScriptCommands,
38 DataAccessCommands,
39 FunctionCommands,
40 GearsCommands,
41 ManagementCommands,
42 ModuleCommands,
43 PubSubCommands,
44 ScriptCommands,
45)
46from .helpers import list_or_args
47from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
49if TYPE_CHECKING:
50 from redis.asyncio.cluster import TargetNodesT
# Commands that only read data. The multi-key helpers in this module check
# membership in this set (together with the client's ``read_from_replicas``
# flag) when picking the node for a slot.
# Not complete, but covers the major ones
# https://redis.io/commands
READ_COMMANDS = frozenset(
    (
        "BITCOUNT",
        "BITPOS",
        "EVAL_RO",
        "EVALSHA_RO",
        "EXISTS",
        "GEODIST",
        "GEOHASH",
        "GEOPOS",
        "GEORADIUS",
        "GEORADIUSBYMEMBER",
        "GET",
        "GETBIT",
        "GETRANGE",
        "HEXISTS",
        "HGET",
        "HGETALL",
        "HKEYS",
        "HLEN",
        "HMGET",
        "HSTRLEN",
        "HVALS",
        "KEYS",
        "LINDEX",
        "LLEN",
        "LRANGE",
        "MGET",
        "PTTL",
        "RANDOMKEY",
        "SCARD",
        "SDIFF",
        "SINTER",
        "SISMEMBER",
        "SMEMBERS",
        "SRANDMEMBER",
        "STRLEN",
        "SUNION",
        "TTL",
        "ZCARD",
        "ZCOUNT",
        "ZRANGE",
        "ZSCORE",
    )
)
class ClusterMultiKeyCommands(ClusterCommandsProtocol):
    """
    A class containing commands that handle more than one key.

    Keys are grouped by hash slot and one command per slot is queued on a
    pipeline, so an operation is not atomic when its keys span several slots.
    """

    def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
        """Split keys into a dictionary that maps a slot to a list of keys."""

        slots_to_keys: Dict[int, List[KeyT]] = {}
        for key in keys:
            # The slot is computed on the encoded form of the key.
            slot = key_slot(self.encoder.encode(key))
            slots_to_keys.setdefault(slot, []).append(key)

        return slots_to_keys

    def _partition_pairs_by_slot(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> Dict[int, List[EncodableT]]:
        """
        Split key/value pairs into a dictionary that maps a slot to a flat
        ``[key1, value1, key2, value2, ...]`` argument list.
        """

        slots_to_pairs: Dict[int, List[EncodableT]] = {}
        for key, value in mapping.items():
            slot = key_slot(self.encoder.encode(key))
            # extend() (not append) flattens the pair into the argument list.
            slots_to_pairs.setdefault(slot, []).extend((key, value))

        return slots_to_pairs

    def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and execute it.

        Each queued command targets the node returned by the nodes manager for
        that slot. Returns whatever ``pipe.execute()`` returns.
        """
        # Replicas are only considered for commands listed in READ_COMMANDS.
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        for slot, slot_args in slots_to_args.items():
            target_node = self.nodes_manager.get_node_from_slot(
                slot, read_from_replicas
            )
            pipe.execute_command(command, *slot_args, target_nodes=[target_node])
        return pipe.execute()

    def _reorder_keys_by_command(
        self,
        keys: Iterable[KeyT],
        slots_to_args: Mapping[int, Iterable[EncodableT]],
        responses: Iterable[Any],
    ) -> List[Any]:
        """
        Return the per-key results in the order of ``keys``.

        ``responses`` is paired positionally with ``slots_to_args.values()``,
        and each response is paired positionally with that slot's arguments.
        """
        results = {
            k: v
            for slot_values, response in zip(slots_to_args.values(), responses)
            for k, v in zip(slot_values, response)
        }
        return [results[key] for key in keys]

    def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(self._execute_pipeline_by_slot(command, slots_to_keys))

    def exists(self, *keys: KeyT) -> ResponseT:
        """
        Returns the number of ``keys`` that exist in the
        whole cluster. The keys are first split up into slots
        and then an EXISTS command is sent for every slot

        For more information see https://redis.io/commands/exists
        """
        return self._split_command_across_slots("EXISTS", *keys)

    def delete(self, *keys: KeyT) -> ResponseT:
        """
        Deletes the given keys in the cluster.
        The keys are first split up into slots
        and then a DEL command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were deleted.

        For more information see https://redis.io/commands/del
        """
        return self._split_command_across_slots("DEL", *keys)

    def touch(self, *keys: KeyT) -> ResponseT:
        """
        Updates the last access time of given keys across the
        cluster.

        The keys are first split up into slots
        and then a TOUCH command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were touched.

        For more information see https://redis.io/commands/touch
        """
        return self._split_command_across_slots("TOUCH", *keys)

    def unlink(self, *keys: KeyT) -> ResponseT:
        """
        Remove the specified keys in a different thread.

        The keys are first split up into slots
        and then an UNLINK command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were unlinked.

        For more information see https://redis.io/commands/unlink
        """
        return self._split_command_across_slots("UNLINK", *keys)
class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
    """
    Asynchronous variant of the commands that handle more than one key.

    The partitioning and reordering helpers are inherited unchanged; the
    methods that execute the pipeline are redefined as coroutines. The
    inherited ``exists``/``delete``/``touch``/``unlink`` return the result of
    ``_split_command_across_slots``, which is a coroutine in this class.
    """

    async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

    async def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and await its execution.

        Calls ``self.initialize()`` first when ``self._initialize`` is set.
        Each queued command targets the node returned by the nodes manager for
        that slot.
        """
        if self._initialize:
            await self.initialize()
        # Replicas are only considered for commands listed in READ_COMMANDS.
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        for slot, slot_args in slots_to_args.items():
            target_node = self.nodes_manager.get_node_from_slot(
                slot, read_from_replicas
            )
            pipe.execute_command(command, *slot_args, target_nodes=[target_node])
        return await pipe.execute()
class ClusterManagementCommands(ManagementCommands):
    """
    A class for Redis Cluster management commands

    The class inherits from Redis's core ManagementCommands class and makes
    the required adjustments to work with cluster mode
    """

    def slaveof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Always raises RedisClusterException: not supported in cluster mode.

        For more information see https://redis.io/commands/slaveof
        """
        raise RedisClusterException("SLAVEOF is not supported in cluster mode")

    def replicaof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Always raises RedisClusterException: not supported in cluster mode.

        For more information see https://redis.io/commands/replicaof
        """
        raise RedisClusterException("REPLICAOF is not supported in cluster mode")

    def swapdb(self, *args, **kwargs) -> NoReturn:
        """
        Swaps two Redis databases.

        Always raises RedisClusterException: not supported in cluster mode.

        For more information see https://redis.io/commands/swapdb
        """
        raise RedisClusterException("SWAPDB is not supported in cluster mode")

    def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Returns the node's id.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information check https://redis.io/commands/cluster-myid/
        """
        return self.execute_command("CLUSTER MYID", target_nodes=target_node)

    def cluster_addslots(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Assign new hash slots to receiving node. Sends to specified node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslots
        """
        return self.execute_command(
            "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
        )

    def cluster_addslotsrange(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Similar to the CLUSTER ADDSLOTS command.
        The difference between the two commands is that ADDSLOTS takes a list of slots
        to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
        (specified by start and end slots) to assign to the node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslotsrange
        """
        return self.execute_command(
            "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
        )

    def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
        """
        Return the number of local keys in the specified hash slot
        Send to node based on specified slot_id

        For more information see https://redis.io/commands/cluster-countkeysinslot
        """
        return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)

    def cluster_count_failure_report(self, node_id: str) -> ResponseT:
        """
        Return the number of failure reports active for a given node
        Sends to a random node

        For more information see https://redis.io/commands/cluster-count-failure-reports
        """
        return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)

    def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.
        It determines by itself what node the slot is in and sends it there

        One CLUSTER DELSLOTS command is issued per slot.
        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]

    def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
        """
        Similar to the CLUSTER DELSLOTS command.
        The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
        from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
        from the node.

        For more information see https://redis.io/commands/cluster-delslotsrange
        """
        return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)

    def cluster_failover(
        self, target_node: "TargetNodesT", option: Optional[str] = None
    ) -> ResponseT:
        """
        Forces a slave to perform a manual failover of its master
        Sends to specified node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :option: optional 'FORCE' or 'TAKEOVER' (checked case-insensitively,
            sent as given). Any other non-empty value raises RedisError.

        For more information see https://redis.io/commands/cluster-failover
        """
        if not option:
            return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)

        if option.upper() not in ("FORCE", "TAKEOVER"):
            raise RedisError(
                f"Invalid option for CLUSTER FAILOVER command: {option}"
            )

        return self.execute_command(
            "CLUSTER FAILOVER", option, target_nodes=target_node
        )

    def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Provides info about Redis Cluster node state.
        The command will be sent to a random node in the cluster if no target
        node is specified.

        For more information see https://redis.io/commands/cluster-info
        """
        return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)

    def cluster_keyslot(self, key: str) -> ResponseT:
        """
        Returns the hash slot of the specified key
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-keyslot
        """
        return self.execute_command("CLUSTER KEYSLOT", key)

    def cluster_meet(
        self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Force a node cluster to handshake with another node.
        Sends to specified node.

        For more information see https://redis.io/commands/cluster-meet
        """
        return self.execute_command(
            "CLUSTER MEET", host, port, target_nodes=target_nodes
        )

    def cluster_nodes(self) -> ResponseT:
        """
        Get Cluster config for the node.
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-nodes
        """
        return self.execute_command("CLUSTER NODES")

    def cluster_replicate(
        self, target_nodes: "TargetNodesT", node_id: str
    ) -> ResponseT:
        """
        Reconfigure a node as a slave of the specified master node

        For more information see https://redis.io/commands/cluster-replicate
        """
        return self.execute_command(
            "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
        )

    def cluster_reset(
        self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Reset a Redis Cluster node

        If 'soft' is True then it will send 'SOFT' argument
        If 'soft' is False then it will send 'HARD' argument

        For more information see https://redis.io/commands/cluster-reset
        """
        reset_mode = b"SOFT" if soft else b"HARD"
        return self.execute_command(
            "CLUSTER RESET", reset_mode, target_nodes=target_nodes
        )

    def cluster_save_config(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Forces the node to save cluster state on disk

        For more information see https://redis.io/commands/cluster-saveconfig
        """
        return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)

    def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
        """
        Returns up to ``num_keys`` key names stored in the specified cluster slot

        For more information see https://redis.io/commands/cluster-getkeysinslot
        """
        return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)

    def cluster_set_config_epoch(
        self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Set the configuration epoch in a new node

        For more information see https://redis.io/commands/cluster-set-config-epoch
        """
        return self.execute_command(
            "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
        )

    def cluster_setslot(
        self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
    ) -> ResponseT:
        """
        Bind a hash slot to a specific node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :state: one of 'IMPORTING', 'NODE' or 'MIGRATING' (checked
            case-insensitively, sent as given). 'STABLE' and any other value
            raise RedisError; use cluster_setslot_stable for 'STABLE'.

        For more information see https://redis.io/commands/cluster-setslot
        """
        normalized_state = state.upper()
        if normalized_state in ("IMPORTING", "NODE", "MIGRATING"):
            return self.execute_command(
                "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
            )
        if normalized_state == "STABLE":
            raise RedisError('For "stable" state please use cluster_setslot_stable')
        raise RedisError(f"Invalid slot state: {state}")

    def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
        """
        Clears migrating / importing state from the slot.
        It determines by itself what node the slot is in and sends it there.

        For more information see https://redis.io/commands/cluster-setslot
        """
        return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")

    def cluster_replicas(
        self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Provides a list of replica nodes replicating from the specified primary
        target node.

        For more information see https://redis.io/commands/cluster-replicas
        """
        return self.execute_command(
            "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
        )

    def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Get array of Cluster slot to node mappings

        For more information see https://redis.io/commands/cluster-slots
        """
        return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)

    def cluster_shards(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns details about the shards of the cluster.

        For more information see https://redis.io/commands/cluster-shards
        """
        return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)

    def cluster_myshardid(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns the shard ID of the node.

        For more information see https://redis.io/commands/cluster-myshardid/
        """
        return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)

    def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
        peer in the cluster: One for sending outbound messages towards the peer and one
        for receiving inbound messages from the peer.

        This command outputs information of all such peer links as an array.

        For more information see https://redis.io/commands/cluster-links
        """
        return self.execute_command("CLUSTER LINKS", target_nodes=target_node)

    def cluster_flushslots(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> NoReturn:
        """
        Not implemented in the client; always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
        )

    def cluster_bumpepoch(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> NoReturn:
        """
        Not implemented in the client; always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
        )

    def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Enables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readonly
        """
        if target_nodes in ("replicas", "all"):
            # read_from_replicas will only be enabled if the READONLY command
            # is sent to all replicas
            self.read_from_replicas = True
        return self.execute_command("READONLY", target_nodes=target_nodes)

    def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Disables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readwrite
        """
        # Reset read from replicas flag
        self.read_from_replicas = False
        return self.execute_command("READWRITE", target_nodes=target_nodes)

    def gears_refresh_cluster(self, **kwargs) -> ResponseT:
        """
        On an OSS cluster, before executing any gears function, you must call
        this command.
        """
        return self.execute_command("REDISGEARS_2.REFRESHCLUSTER", **kwargs)
class AsyncClusterManagementCommands(
    ClusterManagementCommands, AsyncManagementCommands
):
    """
    Asynchronous Redis Cluster management commands.

    Builds on ClusterManagementCommands and the core AsyncManagementCommands;
    only ``cluster_delslots`` is redefined here so the per-slot commands can
    be awaited together.
    """

    async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.
        It determines by itself what node the slot is in and sends it there

        One CLUSTER DELSLOTS command is scheduled as a task per slot and all
        of them are gathered.
        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        pending = [
            asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
            for slot in slots
        ]
        return await asyncio.gather(*pending)
class ClusterDataAccessCommands(DataAccessCommands):
    """
    A class for Redis Cluster Data Access Commands

    Extends the core DataAccessCommands class with the adjustments needed in
    cluster mode.
    """

    def stralgo(
        self,
        algo: Literal["LCS"],
        value1: KeyT,
        value2: KeyT,
        specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
        len: bool = False,
        idx: bool = False,
        minmatchlen: Optional[int] = None,
        withmatchlen: bool = False,
        **kwargs,
    ) -> ResponseT:
        """
        Implements complex algorithms that operate on strings.
        Right now the only algorithm implemented is the LCS algorithm
        (longest common substring). However new algorithms could be
        implemented in the future.

        ``algo`` Right now must be LCS
        ``value1`` and ``value2`` Can be two strings or two keys
        ``specific_argument`` Specifying if the arguments to the algorithm
        will be keys or strings. strings is the default.
        ``len`` Returns just the len of the match.
        ``idx`` Returns the match positions in each string.
        ``minmatchlen`` Restrict the list of matches to the ones of a given
        minimal length. Can be provided only when ``idx`` set to True.
        ``withmatchlen`` Returns the matches with the len of the match.
        Can be provided only when ``idx`` set to True.

        When ``specific_argument`` is "strings" and no ``target_nodes`` keyword
        is given, the command is targeted at "default-node".

        For more information see https://redis.io/commands/stralgo
        """
        chosen_nodes = kwargs.pop("target_nodes", None)
        if chosen_nodes is None and specific_argument == "strings":
            chosen_nodes = "default-node"
        kwargs["target_nodes"] = chosen_nodes
        return super().stralgo(
            algo,
            value1,
            value2,
            specific_argument,
            len,
            idx,
            minmatchlen,
            withmatchlen,
            **kwargs,
        )

    def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> Iterator:
        """
        Iterate over the results of SCAN, following each node's cursor until
        it reaches 0.

        The first ``scan`` call is made without a cursor argument and with the
        caller's keyword arguments; its cursors are keyed by node name. Each
        node with a non-zero cursor is then scanned individually.
        """
        # Initial query: one call covering every targeted node.
        cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
        yield from data

        pending = {name: cursor for name, cursor in cursors.items() if cursor != 0}
        if not pending:
            return

        # Resolve node objects once, by name.
        nodes = {name: self.get_node(node_name=name) for name in pending}

        # Follow-up calls target a single node, so drop any caller target.
        kwargs.pop("target_nodes", None)
        while pending:
            still_pending = {}
            for name, cursor in pending.items():
                cur, data = self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                yield from data
                next_cursor = cur[name]
                # A cursor of 0 means this node has been fully scanned.
                if next_cursor != 0:
                    still_pending[name] = next_cursor

            pending = still_pending
class AsyncClusterDataAccessCommands(
    ClusterDataAccessCommands, AsyncDataAccessCommands
):
    """
    Asynchronous Redis Cluster Data Access Commands

    Combines ClusterDataAccessCommands with the core AsyncDataAccessCommands
    and redefines ``scan_iter`` as an async generator.
    """

    async def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> AsyncIterator:
        """
        Asynchronously iterate over the results of SCAN, following each node's
        cursor until it reaches 0.

        The first ``scan`` call is made without a cursor argument and with the
        caller's keyword arguments; its cursors are keyed by node name. Each
        node with a non-zero cursor is then scanned individually.
        """
        # Initial query: one call covering every targeted node.
        cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
        for value in data:
            yield value

        pending = {name: cursor for name, cursor in cursors.items() if cursor != 0}
        if not pending:
            return

        # Resolve node objects once, by name.
        nodes = {name: self.get_node(node_name=name) for name in pending}

        # Follow-up calls target a single node, so drop any caller target.
        kwargs.pop("target_nodes", None)
        while pending:
            still_pending = {}
            for name, cursor in pending.items():
                cur, data = await self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                for value in data:
                    yield value
                next_cursor = cur[name]
                # A cursor of 0 means this node has been fully scanned.
                if next_cursor != 0:
                    still_pending[name] = next_cursor

            pending = still_pending
class RedisClusterCommands(
    ClusterMultiKeyCommands,
    ClusterManagementCommands,
    ACLCommands,
    PubSubCommands,
    ClusterDataAccessCommands,
    ScriptCommands,
    FunctionCommands,
    GearsCommands,
    ModuleCommands,
    RedisModuleCommands,
):
    """
    A class for all Redis Cluster commands

    For key-based commands, the target node(s) will be internally determined
    by the keys' hash slot.
    Non-key-based commands can be executed with the 'target_nodes' argument to
    target specific nodes. By default, if target_nodes is not specified, the
    command will be executed on the default cluster node.

    :param target_nodes: type can be one of the following:
        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """
class AsyncRedisClusterCommands(
    AsyncClusterMultiKeyCommands,
    AsyncClusterManagementCommands,
    AsyncACLCommands,
    AsyncClusterDataAccessCommands,
    AsyncScriptCommands,
    AsyncFunctionCommands,
    AsyncGearsCommands,
    AsyncModuleCommands,
    AsyncRedisModuleCommands,
):
    """
    A class for all Redis Cluster commands (asynchronous variants)

    For key-based commands, the target node(s) will be internally determined
    by the keys' hash slot.
    Non-key-based commands can be executed with the 'target_nodes' argument to
    target specific nodes. By default, if target_nodes is not specified, the
    command will be executed on the default cluster node.

    :param target_nodes: type can be one of the following:
        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """