Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/commands/cluster.py: 37%
184 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:09 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:09 +0000
1import asyncio
2from typing import (
3 TYPE_CHECKING,
4 Any,
5 AsyncIterator,
6 Dict,
7 Iterable,
8 Iterator,
9 List,
10 Mapping,
11 NoReturn,
12 Optional,
13 Union,
14)
16from redis.compat import Literal
17from redis.crc import key_slot
18from redis.exceptions import RedisClusterException, RedisError
19from redis.typing import (
20 AnyKeyT,
21 ClusterCommandsProtocol,
22 EncodableT,
23 KeysT,
24 KeyT,
25 PatternT,
26)
28from .core import (
29 ACLCommands,
30 AsyncACLCommands,
31 AsyncDataAccessCommands,
32 AsyncFunctionCommands,
33 AsyncManagementCommands,
34 AsyncScriptCommands,
35 DataAccessCommands,
36 FunctionCommands,
37 ManagementCommands,
38 PubSubCommands,
39 ResponseT,
40 ScriptCommands,
41)
42from .helpers import list_or_args
43from .redismodules import RedisModuleCommands
45if TYPE_CHECKING:
46 from redis.asyncio.cluster import TargetNodesT
49# Not complete, but covers the major ones
50# https://redis.io/commands
51READ_COMMANDS = frozenset(
52 [
53 "BITCOUNT",
54 "BITPOS",
55 "EVAL_RO",
56 "EVALSHA_RO",
57 "EXISTS",
58 "GEODIST",
59 "GEOHASH",
60 "GEOPOS",
61 "GEORADIUS",
62 "GEORADIUSBYMEMBER",
63 "GET",
64 "GETBIT",
65 "GETRANGE",
66 "HEXISTS",
67 "HGET",
68 "HGETALL",
69 "HKEYS",
70 "HLEN",
71 "HMGET",
72 "HSTRLEN",
73 "HVALS",
74 "KEYS",
75 "LINDEX",
76 "LLEN",
77 "LRANGE",
78 "MGET",
79 "PTTL",
80 "RANDOMKEY",
81 "SCARD",
82 "SDIFF",
83 "SINTER",
84 "SISMEMBER",
85 "SMEMBERS",
86 "SRANDMEMBER",
87 "STRLEN",
88 "SUNION",
89 "TTL",
90 "ZCARD",
91 "ZCOUNT",
92 "ZRANGE",
93 "ZSCORE",
94 ]
95)
98class ClusterMultiKeyCommands(ClusterCommandsProtocol):
99 """
100 A class containing commands that handle more than one key
101 """
103 def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
104 """Split keys into a dictionary that maps a slot to a list of keys."""
106 slots_to_keys = {}
107 for key in keys:
108 slot = key_slot(self.encoder.encode(key))
109 slots_to_keys.setdefault(slot, []).append(key)
111 return slots_to_keys
113 def _partition_pairs_by_slot(
114 self, mapping: Mapping[AnyKeyT, EncodableT]
115 ) -> Dict[int, List[EncodableT]]:
116 """Split pairs into a dictionary that maps a slot to a list of pairs."""
118 slots_to_pairs = {}
119 for pair in mapping.items():
120 slot = key_slot(self.encoder.encode(pair[0]))
121 slots_to_pairs.setdefault(slot, []).extend(pair)
123 return slots_to_pairs
125 def _execute_pipeline_by_slot(
126 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
127 ) -> List[Any]:
128 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
129 pipe = self.pipeline()
130 [
131 pipe.execute_command(
132 command,
133 *slot_args,
134 target_nodes=[
135 self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
136 ],
137 )
138 for slot, slot_args in slots_to_args.items()
139 ]
140 return pipe.execute()
142 def _reorder_keys_by_command(
143 self,
144 keys: Iterable[KeyT],
145 slots_to_args: Mapping[int, Iterable[EncodableT]],
146 responses: Iterable[Any],
147 ) -> List[Any]:
148 results = {
149 k: v
150 for slot_values, response in zip(slots_to_args.values(), responses)
151 for k, v in zip(slot_values, response)
152 }
153 return [results[key] for key in keys]
155 def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
156 """
157 Splits the keys into different slots and then calls MGET
158 for the keys of every slot. This operation will not be atomic
159 if keys belong to more than one slot.
161 Returns a list of values ordered identically to ``keys``
163 For more information see https://redis.io/commands/mget
164 """
166 # Concatenate all keys into a list
167 keys = list_or_args(keys, args)
169 # Split keys into slots
170 slots_to_keys = self._partition_keys_by_slot(keys)
172 # Execute commands using a pipeline
173 res = self._execute_pipeline_by_slot("MGET", slots_to_keys)
175 # Reorder keys in the order the user provided & return
176 return self._reorder_keys_by_command(keys, slots_to_keys, res)
178 def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
179 """
180 Sets key/values based on a mapping. Mapping is a dictionary of
181 key/value pairs. Both keys and values should be strings or types that
182 can be cast to a string via str().
184 Splits the keys into different slots and then calls MSET
185 for the keys of every slot. This operation will not be atomic
186 if keys belong to more than one slot.
188 For more information see https://redis.io/commands/mset
189 """
191 # Partition the keys by slot
192 slots_to_pairs = self._partition_pairs_by_slot(mapping)
194 # Execute commands using a pipeline & return list of replies
195 return self._execute_pipeline_by_slot("MSET", slots_to_pairs)
197 def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
198 """
199 Runs the given command once for the keys
200 of each slot. Returns the sum of the return values.
201 """
203 # Partition the keys by slot
204 slots_to_keys = self._partition_keys_by_slot(keys)
206 # Sum up the reply from each command
207 return sum(self._execute_pipeline_by_slot(command, slots_to_keys))
209 def exists(self, *keys: KeyT) -> ResponseT:
210 """
211 Returns the number of ``names`` that exist in the
212 whole cluster. The keys are first split up into slots
213 and then an EXISTS command is sent for every slot
215 For more information see https://redis.io/commands/exists
216 """
217 return self._split_command_across_slots("EXISTS", *keys)
219 def delete(self, *keys: KeyT) -> ResponseT:
220 """
221 Deletes the given keys in the cluster.
222 The keys are first split up into slots
223 and then an DEL command is sent for every slot
225 Non-existant keys are ignored.
226 Returns the number of keys that were deleted.
228 For more information see https://redis.io/commands/del
229 """
230 return self._split_command_across_slots("DEL", *keys)
232 def touch(self, *keys: KeyT) -> ResponseT:
233 """
234 Updates the last access time of given keys across the
235 cluster.
237 The keys are first split up into slots
238 and then an TOUCH command is sent for every slot
240 Non-existant keys are ignored.
241 Returns the number of keys that were touched.
243 For more information see https://redis.io/commands/touch
244 """
245 return self._split_command_across_slots("TOUCH", *keys)
247 def unlink(self, *keys: KeyT) -> ResponseT:
248 """
249 Remove the specified keys in a different thread.
251 The keys are first split up into slots
252 and then an TOUCH command is sent for every slot
254 Non-existant keys are ignored.
255 Returns the number of keys that were unlinked.
257 For more information see https://redis.io/commands/unlink
258 """
259 return self._split_command_across_slots("UNLINK", *keys)
262class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
263 """
264 A class containing commands that handle more than one key
265 """
267 async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
268 """
269 Splits the keys into different slots and then calls MGET
270 for the keys of every slot. This operation will not be atomic
271 if keys belong to more than one slot.
273 Returns a list of values ordered identically to ``keys``
275 For more information see https://redis.io/commands/mget
276 """
278 # Concatenate all keys into a list
279 keys = list_or_args(keys, args)
281 # Split keys into slots
282 slots_to_keys = self._partition_keys_by_slot(keys)
284 # Execute commands using a pipeline
285 res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)
287 # Reorder keys in the order the user provided & return
288 return self._reorder_keys_by_command(keys, slots_to_keys, res)
290 async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
291 """
292 Sets key/values based on a mapping. Mapping is a dictionary of
293 key/value pairs. Both keys and values should be strings or types that
294 can be cast to a string via str().
296 Splits the keys into different slots and then calls MSET
297 for the keys of every slot. This operation will not be atomic
298 if keys belong to more than one slot.
300 For more information see https://redis.io/commands/mset
301 """
303 # Partition the keys by slot
304 slots_to_pairs = self._partition_pairs_by_slot(mapping)
306 # Execute commands using a pipeline & return list of replies
307 return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)
309 async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
310 """
311 Runs the given command once for the keys
312 of each slot. Returns the sum of the return values.
313 """
315 # Partition the keys by slot
316 slots_to_keys = self._partition_keys_by_slot(keys)
318 # Sum up the reply from each command
319 return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))
321 async def _execute_pipeline_by_slot(
322 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
323 ) -> List[Any]:
324 if self._initialize:
325 await self.initialize()
326 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
327 pipe = self.pipeline()
328 [
329 pipe.execute_command(
330 command,
331 *slot_args,
332 target_nodes=[
333 self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
334 ],
335 )
336 for slot, slot_args in slots_to_args.items()
337 ]
338 return await pipe.execute()
341class ClusterManagementCommands(ManagementCommands):
342 """
343 A class for Redis Cluster management commands
345 The class inherits from Redis's core ManagementCommands class and do the
346 required adjustments to work with cluster mode
347 """
349 def slaveof(self, *args, **kwargs) -> NoReturn:
350 """
351 Make the server a replica of another instance, or promote it as master.
353 For more information see https://redis.io/commands/slaveof
354 """
355 raise RedisClusterException("SLAVEOF is not supported in cluster mode")
357 def replicaof(self, *args, **kwargs) -> NoReturn:
358 """
359 Make the server a replica of another instance, or promote it as master.
361 For more information see https://redis.io/commands/replicaof
362 """
363 raise RedisClusterException("REPLICAOF is not supported in cluster mode")
365 def swapdb(self, *args, **kwargs) -> NoReturn:
366 """
367 Swaps two Redis databases.
369 For more information see https://redis.io/commands/swapdb
370 """
371 raise RedisClusterException("SWAPDB is not supported in cluster mode")
373 def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
374 """
375 Returns the node's id.
377 :target_node: 'ClusterNode'
378 The node to execute the command on
380 For more information check https://redis.io/commands/cluster-myid/
381 """
382 return self.execute_command("CLUSTER MYID", target_nodes=target_node)
384 def cluster_addslots(
385 self, target_node: "TargetNodesT", *slots: EncodableT
386 ) -> ResponseT:
387 """
388 Assign new hash slots to receiving node. Sends to specified node.
390 :target_node: 'ClusterNode'
391 The node to execute the command on
393 For more information see https://redis.io/commands/cluster-addslots
394 """
395 return self.execute_command(
396 "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
397 )
399 def cluster_addslotsrange(
400 self, target_node: "TargetNodesT", *slots: EncodableT
401 ) -> ResponseT:
402 """
403 Similar to the CLUSTER ADDSLOTS command.
404 The difference between the two commands is that ADDSLOTS takes a list of slots
405 to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
406 (specified by start and end slots) to assign to the node.
408 :target_node: 'ClusterNode'
409 The node to execute the command on
411 For more information see https://redis.io/commands/cluster-addslotsrange
412 """
413 return self.execute_command(
414 "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
415 )
417 def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
418 """
419 Return the number of local keys in the specified hash slot
420 Send to node based on specified slot_id
422 For more information see https://redis.io/commands/cluster-countkeysinslot
423 """
424 return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)
426 def cluster_count_failure_report(self, node_id: str) -> ResponseT:
427 """
428 Return the number of failure reports active for a given node
429 Sends to a random node
431 For more information see https://redis.io/commands/cluster-count-failure-reports
432 """
433 return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)
435 def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
436 """
437 Set hash slots as unbound in the cluster.
438 It determines by it self what node the slot is in and sends it there
440 Returns a list of the results for each processed slot.
442 For more information see https://redis.io/commands/cluster-delslots
443 """
444 return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]
446 def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
447 """
448 Similar to the CLUSTER DELSLOTS command.
449 The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
450 from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
451 from the node.
453 For more information see https://redis.io/commands/cluster-delslotsrange
454 """
455 return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)
457 def cluster_failover(
458 self, target_node: "TargetNodesT", option: Optional[str] = None
459 ) -> ResponseT:
460 """
461 Forces a slave to perform a manual failover of its master
462 Sends to specified node
464 :target_node: 'ClusterNode'
465 The node to execute the command on
467 For more information see https://redis.io/commands/cluster-failover
468 """
469 if option:
470 if option.upper() not in ["FORCE", "TAKEOVER"]:
471 raise RedisError(
472 f"Invalid option for CLUSTER FAILOVER command: {option}"
473 )
474 else:
475 return self.execute_command(
476 "CLUSTER FAILOVER", option, target_nodes=target_node
477 )
478 else:
479 return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)
481 def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
482 """
483 Provides info about Redis Cluster node state.
484 The command will be sent to a random node in the cluster if no target
485 node is specified.
487 For more information see https://redis.io/commands/cluster-info
488 """
489 return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)
491 def cluster_keyslot(self, key: str) -> ResponseT:
492 """
493 Returns the hash slot of the specified key
494 Sends to random node in the cluster
496 For more information see https://redis.io/commands/cluster-keyslot
497 """
498 return self.execute_command("CLUSTER KEYSLOT", key)
500 def cluster_meet(
501 self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
502 ) -> ResponseT:
503 """
504 Force a node cluster to handshake with another node.
505 Sends to specified node.
507 For more information see https://redis.io/commands/cluster-meet
508 """
509 return self.execute_command(
510 "CLUSTER MEET", host, port, target_nodes=target_nodes
511 )
513 def cluster_nodes(self) -> ResponseT:
514 """
515 Get Cluster config for the node.
516 Sends to random node in the cluster
518 For more information see https://redis.io/commands/cluster-nodes
519 """
520 return self.execute_command("CLUSTER NODES")
522 def cluster_replicate(
523 self, target_nodes: "TargetNodesT", node_id: str
524 ) -> ResponseT:
525 """
526 Reconfigure a node as a slave of the specified master node
528 For more information see https://redis.io/commands/cluster-replicate
529 """
530 return self.execute_command(
531 "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
532 )
534 def cluster_reset(
535 self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
536 ) -> ResponseT:
537 """
538 Reset a Redis Cluster node
540 If 'soft' is True then it will send 'SOFT' argument
541 If 'soft' is False then it will send 'HARD' argument
543 For more information see https://redis.io/commands/cluster-reset
544 """
545 return self.execute_command(
546 "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
547 )
549 def cluster_save_config(
550 self, target_nodes: Optional["TargetNodesT"] = None
551 ) -> ResponseT:
552 """
553 Forces the node to save cluster state on disk
555 For more information see https://redis.io/commands/cluster-saveconfig
556 """
557 return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)
559 def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
560 """
561 Returns the number of keys in the specified cluster slot
563 For more information see https://redis.io/commands/cluster-getkeysinslot
564 """
565 return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)
567 def cluster_set_config_epoch(
568 self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
569 ) -> ResponseT:
570 """
571 Set the configuration epoch in a new node
573 For more information see https://redis.io/commands/cluster-set-config-epoch
574 """
575 return self.execute_command(
576 "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
577 )
579 def cluster_setslot(
580 self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
581 ) -> ResponseT:
582 """
583 Bind an hash slot to a specific node
585 :target_node: 'ClusterNode'
586 The node to execute the command on
588 For more information see https://redis.io/commands/cluster-setslot
589 """
590 if state.upper() in ("IMPORTING", "NODE", "MIGRATING"):
591 return self.execute_command(
592 "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
593 )
594 elif state.upper() == "STABLE":
595 raise RedisError('For "stable" state please use ' "cluster_setslot_stable")
596 else:
597 raise RedisError(f"Invalid slot state: {state}")
599 def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
600 """
601 Clears migrating / importing state from the slot.
602 It determines by it self what node the slot is in and sends it there.
604 For more information see https://redis.io/commands/cluster-setslot
605 """
606 return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")
608 def cluster_replicas(
609 self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
610 ) -> ResponseT:
611 """
612 Provides a list of replica nodes replicating from the specified primary
613 target node.
615 For more information see https://redis.io/commands/cluster-replicas
616 """
617 return self.execute_command(
618 "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
619 )
621 def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
622 """
623 Get array of Cluster slot to node mappings
625 For more information see https://redis.io/commands/cluster-slots
626 """
627 return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)
629 def cluster_shards(self, target_nodes=None):
630 """
631 Returns details about the shards of the cluster.
633 For more information see https://redis.io/commands/cluster-shards
634 """
635 return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)
637 def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
638 """
639 Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
640 peer in the cluster: One for sending outbound messages towards the peer and one
641 for receiving inbound messages from the peer.
643 This command outputs information of all such peer links as an array.
645 For more information see https://redis.io/commands/cluster-links
646 """
647 return self.execute_command("CLUSTER LINKS", target_nodes=target_node)
649 def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
650 raise NotImplementedError(
651 "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
652 )
654 def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
655 raise NotImplementedError(
656 "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
657 )
659 def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
660 """
661 Enables read queries.
662 The command will be sent to the default cluster node if target_nodes is
663 not specified.
665 For more information see https://redis.io/commands/readonly
666 """
667 if target_nodes == "replicas" or target_nodes == "all":
668 # read_from_replicas will only be enabled if the READONLY command
669 # is sent to all replicas
670 self.read_from_replicas = True
671 return self.execute_command("READONLY", target_nodes=target_nodes)
673 def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
674 """
675 Disables read queries.
676 The command will be sent to the default cluster node if target_nodes is
677 not specified.
679 For more information see https://redis.io/commands/readwrite
680 """
681 # Reset read from replicas flag
682 self.read_from_replicas = False
683 return self.execute_command("READWRITE", target_nodes=target_nodes)
686class AsyncClusterManagementCommands(
687 ClusterManagementCommands, AsyncManagementCommands
688):
689 """
690 A class for Redis Cluster management commands
692 The class inherits from Redis's core ManagementCommands class and do the
693 required adjustments to work with cluster mode
694 """
696 async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
697 """
698 Set hash slots as unbound in the cluster.
699 It determines by it self what node the slot is in and sends it there
701 Returns a list of the results for each processed slot.
703 For more information see https://redis.io/commands/cluster-delslots
704 """
705 return await asyncio.gather(
706 *(
707 asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
708 for slot in slots
709 )
710 )
713class ClusterDataAccessCommands(DataAccessCommands):
714 """
715 A class for Redis Cluster Data Access Commands
717 The class inherits from Redis's core DataAccessCommand class and do the
718 required adjustments to work with cluster mode
719 """
721 def stralgo(
722 self,
723 algo: Literal["LCS"],
724 value1: KeyT,
725 value2: KeyT,
726 specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
727 len: bool = False,
728 idx: bool = False,
729 minmatchlen: Optional[int] = None,
730 withmatchlen: bool = False,
731 **kwargs,
732 ) -> ResponseT:
733 """
734 Implements complex algorithms that operate on strings.
735 Right now the only algorithm implemented is the LCS algorithm
736 (longest common substring). However new algorithms could be
737 implemented in the future.
739 ``algo`` Right now must be LCS
740 ``value1`` and ``value2`` Can be two strings or two keys
741 ``specific_argument`` Specifying if the arguments to the algorithm
742 will be keys or strings. strings is the default.
743 ``len`` Returns just the len of the match.
744 ``idx`` Returns the match positions in each string.
745 ``minmatchlen`` Restrict the list of matches to the ones of a given
746 minimal length. Can be provided only when ``idx`` set to True.
747 ``withmatchlen`` Returns the matches with the len of the match.
748 Can be provided only when ``idx`` set to True.
750 For more information see https://redis.io/commands/stralgo
751 """
752 target_nodes = kwargs.pop("target_nodes", None)
753 if specific_argument == "strings" and target_nodes is None:
754 target_nodes = "default-node"
755 kwargs.update({"target_nodes": target_nodes})
756 return super().stralgo(
757 algo,
758 value1,
759 value2,
760 specific_argument,
761 len,
762 idx,
763 minmatchlen,
764 withmatchlen,
765 **kwargs,
766 )
768 def scan_iter(
769 self,
770 match: Optional[PatternT] = None,
771 count: Optional[int] = None,
772 _type: Optional[str] = None,
773 **kwargs,
774 ) -> Iterator:
775 # Do the first query with cursor=0 for all nodes
776 cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
777 yield from data
779 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
780 if cursors:
781 # Get nodes by name
782 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
784 # Iterate over each node till its cursor is 0
785 kwargs.pop("target_nodes", None)
786 while cursors:
787 for name, cursor in cursors.items():
788 cur, data = self.scan(
789 cursor=cursor,
790 match=match,
791 count=count,
792 _type=_type,
793 target_nodes=nodes[name],
794 **kwargs,
795 )
796 yield from data
797 cursors[name] = cur[name]
799 cursors = {
800 name: cursor for name, cursor in cursors.items() if cursor != 0
801 }
804class AsyncClusterDataAccessCommands(
805 ClusterDataAccessCommands, AsyncDataAccessCommands
806):
807 """
808 A class for Redis Cluster Data Access Commands
810 The class inherits from Redis's core DataAccessCommand class and do the
811 required adjustments to work with cluster mode
812 """
814 async def scan_iter(
815 self,
816 match: Optional[PatternT] = None,
817 count: Optional[int] = None,
818 _type: Optional[str] = None,
819 **kwargs,
820 ) -> AsyncIterator:
821 # Do the first query with cursor=0 for all nodes
822 cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
823 for value in data:
824 yield value
826 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
827 if cursors:
828 # Get nodes by name
829 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
831 # Iterate over each node till its cursor is 0
832 kwargs.pop("target_nodes", None)
833 while cursors:
834 for name, cursor in cursors.items():
835 cur, data = await self.scan(
836 cursor=cursor,
837 match=match,
838 count=count,
839 _type=_type,
840 target_nodes=nodes[name],
841 **kwargs,
842 )
843 for value in data:
844 yield value
845 cursors[name] = cur[name]
847 cursors = {
848 name: cursor for name, cursor in cursors.items() if cursor != 0
849 }
852class RedisClusterCommands(
853 ClusterMultiKeyCommands,
854 ClusterManagementCommands,
855 ACLCommands,
856 PubSubCommands,
857 ClusterDataAccessCommands,
858 ScriptCommands,
859 FunctionCommands,
860 RedisModuleCommands,
861):
862 """
863 A class for all Redis Cluster commands
865 For key-based commands, the target node(s) will be internally determined
866 by the keys' hash slot.
867 Non-key-based commands can be executed with the 'target_nodes' argument to
868 target specific nodes. By default, if target_nodes is not specified, the
869 command will be executed on the default cluster node.
871 :param :target_nodes: type can be one of the followings:
872 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
873 - 'ClusterNode'
874 - 'list(ClusterNodes)'
875 - 'dict(any:clusterNodes)'
877 for example:
878 r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
879 """
882class AsyncRedisClusterCommands(
883 AsyncClusterMultiKeyCommands,
884 AsyncClusterManagementCommands,
885 AsyncACLCommands,
886 AsyncClusterDataAccessCommands,
887 AsyncScriptCommands,
888 AsyncFunctionCommands,
889):
890 """
891 A class for all Redis Cluster commands
893 For key-based commands, the target node(s) will be internally determined
894 by the keys' hash slot.
895 Non-key-based commands can be executed with the 'target_nodes' argument to
896 target specific nodes. By default, if target_nodes is not specified, the
897 command will be executed on the default cluster node.
899 :param :target_nodes: type can be one of the followings:
900 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
901 - 'ClusterNode'
902 - 'list(ClusterNodes)'
903 - 'dict(any:clusterNodes)'
905 for example:
906 r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
907 """