Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/commands/cluster.py: 37%
186 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:16 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:16 +0000
1import asyncio
2from typing import (
3 TYPE_CHECKING,
4 Any,
5 AsyncIterator,
6 Dict,
7 Iterable,
8 Iterator,
9 List,
10 Mapping,
11 NoReturn,
12 Optional,
13 Union,
14)
16from redis.compat import Literal
17from redis.crc import key_slot
18from redis.exceptions import RedisClusterException, RedisError
19from redis.typing import (
20 AnyKeyT,
21 ClusterCommandsProtocol,
22 EncodableT,
23 KeysT,
24 KeyT,
25 PatternT,
26)
28from .core import (
29 ACLCommands,
30 AsyncACLCommands,
31 AsyncDataAccessCommands,
32 AsyncFunctionCommands,
33 AsyncManagementCommands,
34 AsyncScriptCommands,
35 DataAccessCommands,
36 FunctionCommands,
37 ManagementCommands,
38 PubSubCommands,
39 ResponseT,
40 ScriptCommands,
41)
42from .helpers import list_or_args
43from .redismodules import RedisModuleCommands
45if TYPE_CHECKING:
46 from redis.asyncio.cluster import TargetNodesT
48# Not complete, but covers the major ones
49# https://redis.io/commands
# Commands that ``_execute_pipeline_by_slot`` may route to a replica when
# ``read_from_replicas`` is enabled on the client.
READ_COMMANDS = frozenset(
    (
        "BITCOUNT",
        "BITPOS",
        "EVAL_RO",
        "EVALSHA_RO",
        "EXISTS",
        "GEODIST",
        "GEOHASH",
        "GEOPOS",
        "GEORADIUS",
        "GEORADIUSBYMEMBER",
        "GET",
        "GETBIT",
        "GETRANGE",
        "HEXISTS",
        "HGET",
        "HGETALL",
        "HKEYS",
        "HLEN",
        "HMGET",
        "HSTRLEN",
        "HVALS",
        "KEYS",
        "LINDEX",
        "LLEN",
        "LRANGE",
        "MGET",
        "PTTL",
        "RANDOMKEY",
        "SCARD",
        "SDIFF",
        "SINTER",
        "SISMEMBER",
        "SMEMBERS",
        "SRANDMEMBER",
        "STRLEN",
        "SUNION",
        "TTL",
        "ZCARD",
        "ZCOUNT",
        "ZRANGE",
        "ZSCORE",
    )
)
class ClusterMultiKeyCommands(ClusterCommandsProtocol):
    """
    A class containing commands that handle more than one key.

    The helpers group the caller's keys by hash slot and queue one command
    per slot on a pipeline, so a multi-key call is turned into one command
    for each slot that is involved.
    """

    def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
        """Split keys into a dictionary that maps a slot to a list of keys."""
        slots_to_keys = {}
        for key in keys:
            # The slot is computed from the encoded form of the key.
            slot = key_slot(self.encoder.encode(key))
            slots_to_keys.setdefault(slot, []).append(key)

        return slots_to_keys

    def _partition_pairs_by_slot(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> Dict[int, List[EncodableT]]:
        """
        Split pairs into a dictionary that maps a slot to a list of pairs.

        Each list is flat (``[key1, value1, key2, value2, ...]``) so it can
        be unpacked directly as command arguments.
        """
        slots_to_pairs = {}
        for pair in mapping.items():
            slot = key_slot(self.encoder.encode(pair[0]))
            # ``extend`` flattens the (key, value) tuple into the list.
            slots_to_pairs.setdefault(slot, []).extend(pair)

        return slots_to_pairs

    def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and execute it.

        :param command: name of the command to run for every slot
        :param slots_to_args: maps a slot to the arguments for that slot
        :return: the pipeline replies, one per entry of ``slots_to_args``
        """
        # A replica is only requested when the client allows replica reads
        # and the command is listed in READ_COMMANDS.
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        for slot, slot_args in slots_to_args.items():
            pipe.execute_command(
                command,
                *slot_args,
                target_nodes=[
                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                ],
            )
        return pipe.execute()

    def _reorder_keys_by_command(
        self,
        keys: Iterable[KeyT],
        slots_to_args: Mapping[int, Iterable[EncodableT]],
        responses: Iterable[Any],
    ) -> List[Any]:
        """
        Return the per-slot replies rearranged into the order of ``keys``.

        ``responses`` is paired positionally with ``slots_to_args.values()``,
        and each reply is paired positionally with the arguments sent for
        that slot.
        """
        results = {
            k: v
            for slot_values, response in zip(slots_to_args.values(), responses)
            for k, v in zip(slot_values, response)
        }
        return [results[key] for key in keys]

    def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per slot.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(self._execute_pipeline_by_slot(command, slots_to_keys))

    def exists(self, *keys: KeyT) -> ResponseT:
        """
        Returns the number of ``keys`` that exist in the
        whole cluster. The keys are first split up into slots
        and then an EXISTS command is sent for every slot

        For more information see https://redis.io/commands/exists
        """
        return self._split_command_across_slots("EXISTS", *keys)

    def delete(self, *keys: KeyT) -> ResponseT:
        """
        Deletes the given keys in the cluster.
        The keys are first split up into slots
        and then a DEL command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were deleted.

        For more information see https://redis.io/commands/del
        """
        return self._split_command_across_slots("DEL", *keys)

    def touch(self, *keys: KeyT) -> ResponseT:
        """
        Updates the last access time of given keys across the
        cluster.

        The keys are first split up into slots
        and then a TOUCH command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were touched.

        For more information see https://redis.io/commands/touch
        """
        return self._split_command_across_slots("TOUCH", *keys)

    def unlink(self, *keys: KeyT) -> ResponseT:
        """
        Remove the specified keys in a different thread.

        The keys are first split up into slots
        and then an UNLINK command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were unlinked.

        For more information see https://redis.io/commands/unlink
        """
        return self._split_command_across_slots("UNLINK", *keys)
class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
    """
    A class containing commands that handle more than one key.

    Asynchronous counterpart of ``ClusterMultiKeyCommands``: the slot
    partitioning helpers are inherited, while every method that executes
    the pipeline is redefined as a coroutine.
    """

    async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per slot.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

    async def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and await its execution.

        :param command: name of the command to run for every slot
        :param slots_to_args: maps a slot to the arguments for that slot
        :return: the pipeline replies, one per entry of ``slots_to_args``
        """
        # Run the client's initialization first while ``_initialize`` is set.
        if self._initialize:
            await self.initialize()
        # A replica is only requested when the client allows replica reads
        # and the command is listed in READ_COMMANDS.
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        for slot, slot_args in slots_to_args.items():
            pipe.execute_command(
                command,
                *slot_args,
                target_nodes=[
                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                ],
            )
        return await pipe.execute()
class ClusterManagementCommands(ManagementCommands):
    """
    A class for Redis Cluster management commands

    The class inherits from Redis's core ManagementCommands class and makes
    the required adjustments to work with cluster mode.

    Methods that take ``target_node`` / ``target_nodes`` forward it to
    ``execute_command`` as the ``target_nodes`` keyword. Methods without such
    a parameter leave node selection entirely to ``execute_command``.
    """

    def slaveof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Always raises ``RedisClusterException``: not supported in cluster mode.

        For more information see https://redis.io/commands/slaveof
        """
        raise RedisClusterException("SLAVEOF is not supported in cluster mode")

    def replicaof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Always raises ``RedisClusterException``: not supported in cluster mode.

        For more information see https://redis.io/commands/replicaof
        """
        raise RedisClusterException("REPLICAOF is not supported in cluster mode")

    def swapdb(self, *args, **kwargs) -> NoReturn:
        """
        Swaps two Redis databases.

        Always raises ``RedisClusterException``: not supported in cluster mode.

        For more information see https://redis.io/commands/swapdb
        """
        raise RedisClusterException("SWAPDB is not supported in cluster mode")

    def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Returns the node's id.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information check https://redis.io/commands/cluster-myid/
        """
        return self.execute_command("CLUSTER MYID", target_nodes=target_node)

    def cluster_addslots(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Assign new hash slots to receiving node. Sends to specified node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslots
        """
        return self.execute_command(
            "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
        )

    def cluster_addslotsrange(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Similar to the CLUSTER ADDSLOTS command.
        The difference between the two commands is that ADDSLOTS takes a list of slots
        to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
        (specified by start and end slots) to assign to the node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslotsrange
        """
        return self.execute_command(
            "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
        )

    def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
        """
        Return the number of local keys in the specified hash slot.

        No target node is passed; routing is left to ``execute_command``.

        For more information see https://redis.io/commands/cluster-countkeysinslot
        """
        return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)

    def cluster_count_failure_report(self, node_id: str) -> ResponseT:
        """
        Return the number of failure reports active for a given node.

        No target node is passed; routing is left to ``execute_command``.

        For more information see https://redis.io/commands/cluster-count-failure-reports
        """
        return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)

    def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.

        One CLUSTER DELSLOTS command is issued per slot; no target node is
        passed, so routing is left to ``execute_command``.

        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]

    def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
        """
        Similar to the CLUSTER DELSLOTS command.
        The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
        from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
        from the node.

        For more information see https://redis.io/commands/cluster-delslotsrange
        """
        return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)

    def cluster_failover(
        self, target_node: "TargetNodesT", option: Optional[str] = None
    ) -> ResponseT:
        """
        Forces a slave to perform a manual failover of its master
        Sends to specified node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :option: optional "FORCE" or "TAKEOVER" (checked case-insensitively)

        Raises ``RedisError`` when ``option`` is given but is neither value.

        For more information see https://redis.io/commands/cluster-failover
        """
        if not option:
            return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)
        if option.upper() not in ["FORCE", "TAKEOVER"]:
            raise RedisError(
                f"Invalid option for CLUSTER FAILOVER command: {option}"
            )
        # The option is forwarded exactly as the caller spelled it.
        return self.execute_command(
            "CLUSTER FAILOVER", option, target_nodes=target_node
        )

    def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Provides info about Redis Cluster node state.

        When ``target_nodes`` is omitted, node selection is left to
        ``execute_command``.

        For more information see https://redis.io/commands/cluster-info
        """
        return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)

    def cluster_keyslot(self, key: str) -> ResponseT:
        """
        Returns the hash slot of the specified key.

        No target node is passed; routing is left to ``execute_command``.

        For more information see https://redis.io/commands/cluster-keyslot
        """
        return self.execute_command("CLUSTER KEYSLOT", key)

    def cluster_meet(
        self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Force a node cluster to handshake with another node.
        Sends to specified node.

        For more information see https://redis.io/commands/cluster-meet
        """
        return self.execute_command(
            "CLUSTER MEET", host, port, target_nodes=target_nodes
        )

    def cluster_nodes(self) -> ResponseT:
        """
        Get Cluster config for the node.

        No target node is passed; routing is left to ``execute_command``.

        For more information see https://redis.io/commands/cluster-nodes
        """
        return self.execute_command("CLUSTER NODES")

    def cluster_replicate(
        self, target_nodes: "TargetNodesT", node_id: str
    ) -> ResponseT:
        """
        Reconfigure a node as a slave of the specified master node

        For more information see https://redis.io/commands/cluster-replicate
        """
        return self.execute_command(
            "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
        )

    def cluster_reset(
        self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Reset a Redis Cluster node

        If 'soft' is True then it will send 'SOFT' argument
        If 'soft' is False then it will send 'HARD' argument

        For more information see https://redis.io/commands/cluster-reset
        """
        return self.execute_command(
            "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
        )

    def cluster_save_config(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Forces the node to save cluster state on disk

        For more information see https://redis.io/commands/cluster-saveconfig
        """
        return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)

    def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
        """
        Returns up to ``num_keys`` key names stored in the specified hash slot.

        For more information see https://redis.io/commands/cluster-getkeysinslot
        """
        return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)

    def cluster_set_config_epoch(
        self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Set the configuration epoch in a new node

        For more information see https://redis.io/commands/cluster-set-config-epoch
        """
        return self.execute_command(
            "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
        )

    def cluster_setslot(
        self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
    ) -> ResponseT:
        """
        Bind a hash slot to a specific node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :state: "IMPORTING", "NODE" or "MIGRATING" (checked case-insensitively)

        Raises ``RedisError`` for "STABLE" (use ``cluster_setslot_stable``
        instead) and for any other unrecognised state.

        For more information see https://redis.io/commands/cluster-setslot
        """
        normalized_state = state.upper()
        if normalized_state in ("IMPORTING", "NODE", "MIGRATING"):
            # The state is forwarded exactly as the caller spelled it.
            return self.execute_command(
                "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
            )
        if normalized_state == "STABLE":
            raise RedisError('For "stable" state please use ' "cluster_setslot_stable")
        raise RedisError(f"Invalid slot state: {state}")

    def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
        """
        Clears migrating / importing state from the slot.

        No target node is passed; routing is left to ``execute_command``.

        For more information see https://redis.io/commands/cluster-setslot
        """
        return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")

    def cluster_replicas(
        self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Provides a list of replica nodes replicating from the specified primary
        target node.

        For more information see https://redis.io/commands/cluster-replicas
        """
        return self.execute_command(
            "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
        )

    def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Get array of Cluster slot to node mappings

        For more information see https://redis.io/commands/cluster-slots
        """
        return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)

    def cluster_shards(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns details about the shards of the cluster.

        For more information see https://redis.io/commands/cluster-shards
        """
        return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)

    def cluster_myshardid(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns the shard ID of the node.

        For more information see https://redis.io/commands/cluster-myshardid/
        """
        return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)

    def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Each node in a Redis Cluster maintains a pair of long-lived TCP links with each
        peer in the cluster: One for sending outbound messages towards the peer and one
        for receiving inbound messages from the peer.

        This command outputs information of all such peer links as an array.

        For more information see https://redis.io/commands/cluster-links
        """
        return self.execute_command("CLUSTER LINKS", target_nodes=target_node)

    def cluster_flushslots(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> NoReturn:
        """Always raises ``NotImplementedError``; no command is sent."""
        raise NotImplementedError(
            "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
        )

    def cluster_bumpepoch(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> NoReturn:
        """Always raises ``NotImplementedError``; no command is sent."""
        raise NotImplementedError(
            "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
        )

    def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Enables read queries.

        When ``target_nodes`` is omitted, node selection is left to
        ``execute_command``.

        For more information see https://redis.io/commands/readonly
        """
        if target_nodes == "replicas" or target_nodes == "all":
            # read_from_replicas will only be enabled if the READONLY command
            # is sent to all replicas
            self.read_from_replicas = True
        return self.execute_command("READONLY", target_nodes=target_nodes)

    def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Disables read queries.

        When ``target_nodes`` is omitted, node selection is left to
        ``execute_command``.

        For more information see https://redis.io/commands/readwrite
        """
        # Reset read from replicas flag
        self.read_from_replicas = False
        return self.execute_command("READWRITE", target_nodes=target_nodes)
class AsyncClusterManagementCommands(
    ClusterManagementCommands, AsyncManagementCommands
):
    """
    Asynchronous Redis Cluster management commands.

    Combines the cluster-specific management commands with the core
    asynchronous ManagementCommands and redefines the one method that
    issues several commands per call as a coroutine.
    """

    async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.

        One CLUSTER DELSLOTS command is scheduled as a task for every slot
        and all tasks are awaited together.

        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        pending = [
            asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
            for slot in slots
        ]
        return await asyncio.gather(*pending)
class ClusterDataAccessCommands(DataAccessCommands):
    """
    Redis Cluster flavour of the data access commands.

    Extends the core DataAccessCommands class, overriding the commands that
    need cluster-aware handling.
    """

    def stralgo(
        self,
        algo: Literal["LCS"],
        value1: KeyT,
        value2: KeyT,
        specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
        len: bool = False,
        idx: bool = False,
        minmatchlen: Optional[int] = None,
        withmatchlen: bool = False,
        **kwargs,
    ) -> ResponseT:
        """
        Run a string algorithm; at present only LCS (longest common
        substring) is available, though more may be added later.

        ``algo`` Must currently be LCS
        ``value1`` and ``value2`` Either two strings or two keys
        ``specific_argument`` Whether the values are keys or strings;
        strings is the default.
        ``len`` Return only the length of the match.
        ``idx`` Return the match positions in each string.
        ``minmatchlen`` Only report matches of at least this length. Can be
        provided only when ``idx`` set to True.
        ``withmatchlen`` Include the length of every match.
        Can be provided only when ``idx`` set to True.

        When the values are plain strings and the caller gave no
        ``target_nodes``, the call is directed at "default-node".

        For more information see https://redis.io/commands/stralgo
        """
        chosen_nodes = kwargs.pop("target_nodes", None)
        if chosen_nodes is None and specific_argument == "strings":
            chosen_nodes = "default-node"
        # Always forwarded, even when it is still None.
        kwargs["target_nodes"] = chosen_nodes
        return super().stralgo(
            algo,
            value1,
            value2,
            specific_argument,
            len,
            idx,
            minmatchlen,
            withmatchlen,
            **kwargs,
        )

    def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> Iterator:
        """
        Iterate over the results of SCAN, following every node's cursor
        until it reaches 0.

        The first ``scan`` call carries no cursor and yields a mapping of
        node name to next cursor; each node with a non-zero cursor is then
        scanned individually until it is exhausted.
        """
        # Initial round: one call, cursors come back keyed by node name
        pending, batch = self.scan(match=match, count=count, _type=_type, **kwargs)
        yield from batch

        pending = {name: cur for name, cur in pending.items() if cur != 0}
        if not pending:
            return

        # Resolve every unfinished node once, up front
        nodes = {name: self.get_node(node_name=name) for name in pending.keys()}

        # Follow-up calls name their node explicitly
        kwargs.pop("target_nodes", None)
        while pending:
            unfinished = {}
            for name, cursor in pending.items():
                reply_cursors, batch = self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                yield from batch
                next_cursor = reply_cursors[name]
                # A cursor of 0 means this node has been fully scanned
                if next_cursor != 0:
                    unfinished[name] = next_cursor
            pending = unfinished
class AsyncClusterDataAccessCommands(
    ClusterDataAccessCommands, AsyncDataAccessCommands
):
    """
    Asynchronous Redis Cluster flavour of the data access commands.

    Combines the cluster data access commands with the core asynchronous
    DataAccessCommands and redefines ``scan_iter`` as an async generator.
    """

    async def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> AsyncIterator:
        """
        Asynchronously iterate over the results of SCAN, following every
        node's cursor until it reaches 0.

        The first ``scan`` call carries no cursor and yields a mapping of
        node name to next cursor; each node with a non-zero cursor is then
        scanned individually until it is exhausted.
        """
        # Initial round: one call, cursors come back keyed by node name
        pending, batch = await self.scan(
            match=match, count=count, _type=_type, **kwargs
        )
        for item in batch:
            yield item

        pending = {name: cur for name, cur in pending.items() if cur != 0}
        if not pending:
            return

        # Resolve every unfinished node once, up front
        nodes = {name: self.get_node(node_name=name) for name in pending.keys()}

        # Follow-up calls name their node explicitly
        kwargs.pop("target_nodes", None)
        while pending:
            unfinished = {}
            for name, cursor in pending.items():
                reply_cursors, batch = await self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                for item in batch:
                    yield item
                next_cursor = reply_cursors[name]
                # A cursor of 0 means this node has been fully scanned
                if next_cursor != 0:
                    unfinished[name] = next_cursor
            pending = unfinished
class RedisClusterCommands(
    ClusterMultiKeyCommands,
    ClusterManagementCommands,
    ACLCommands,
    PubSubCommands,
    ClusterDataAccessCommands,
    ScriptCommands,
    FunctionCommands,
    RedisModuleCommands,
):
    """
    A class for all Redis Cluster commands

    For key-based commands, the target node(s) will be internally determined
    by the keys' hash slot.
    Non-key-based commands can be executed with the 'target_nodes' argument to
    target specific nodes. By default, if target_nodes is not specified, the
    command will be executed on the default cluster node.

    :param target_nodes: type can be one of the following:
        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """
class AsyncRedisClusterCommands(
    AsyncClusterMultiKeyCommands,
    AsyncClusterManagementCommands,
    AsyncACLCommands,
    AsyncClusterDataAccessCommands,
    AsyncScriptCommands,
    AsyncFunctionCommands,
):
    """
    A class for all asynchronous Redis Cluster commands

    For key-based commands, the target node(s) will be internally determined
    by the keys' hash slot.
    Non-key-based commands can be executed with the 'target_nodes' argument to
    target specific nodes. By default, if target_nodes is not specified, the
    command will be executed on the default cluster node.

    :param target_nodes: type can be one of the following:
        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """