1from __future__ import annotations
2
3import asyncio
4from typing import (
5 TYPE_CHECKING,
6 Any,
7 AsyncIterator,
8 Awaitable,
9 Dict,
10 Iterable,
11 Iterator,
12 List,
13 Literal,
14 Mapping,
15 NoReturn,
16 Sequence,
17 overload,
18)
19
20from redis.crc import key_slot
21from redis.exceptions import RedisClusterException, RedisError
22from redis.typing import (
23 AnyKeyT,
24 AsyncClientProtocol,
25 ClusterCommandsProtocol,
26 ClusterNodeDetail,
27 EncodableT,
28 KeysT,
29 KeyT,
30 PatternT,
31 ResponseT,
32 StralgoResponse,
33 SyncClientProtocol,
34)
35from redis.utils import deprecated_function
36
37from .core import (
38 ACLCommands,
39 AsyncACLCommands,
40 AsyncDataAccessCommands,
41 AsyncFunctionCommands,
42 AsyncManagementCommands,
43 AsyncModuleCommands,
44 AsyncScriptCommands,
45 DataAccessCommands,
46 FunctionCommands,
47 HotkeysMetricsTypes,
48 ManagementCommands,
49 ModuleCommands,
50 PubSubCommands,
51 ScriptCommands,
52)
53from .helpers import list_or_args
54from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
55
56if TYPE_CHECKING:
57 from redis.asyncio.cluster import TargetNodesT
58
59# Not complete, but covers the major ones
60# https://redis.io/commands
61READ_COMMANDS = frozenset(
62 [
63 # Bit Operations
64 "BITCOUNT",
65 "BITFIELD_RO",
66 "BITPOS",
67 # Scripting
68 "EVAL_RO",
69 "EVALSHA_RO",
70 "FCALL_RO",
71 # Key Operations
72 "DBSIZE",
73 "DIGEST",
74 "DUMP",
75 "EXISTS",
76 "EXPIRETIME",
77 "PEXPIRETIME",
78 "KEYS",
79 "SCAN",
80 "PTTL",
81 "RANDOMKEY",
82 "TTL",
83 "TYPE",
84 # String Operations
85 "GET",
86 "GETBIT",
87 "GETRANGE",
88 "MGET",
89 "STRLEN",
90 "LCS",
91 # Geo Operations
92 "GEODIST",
93 "GEOHASH",
94 "GEOPOS",
95 "GEOSEARCH",
96 # Hash Operations
97 "HEXISTS",
98 "HGET",
99 "HGETALL",
100 "HKEYS",
101 "HLEN",
102 "HMGET",
103 "HSTRLEN",
104 "HVALS",
105 "HRANDFIELD",
106 "HEXPIRETIME",
107 "HPEXPIRETIME",
108 "HTTL",
109 "HPTTL",
110 "HSCAN",
111 # List Operations
112 "LINDEX",
113 "LPOS",
114 "LLEN",
115 "LRANGE",
116 # Set Operations
117 "SCARD",
118 "SDIFF",
119 "SINTER",
120 "SINTERCARD",
121 "SISMEMBER",
122 "SMISMEMBER",
123 "SMEMBERS",
124 "SRANDMEMBER",
125 "SUNION",
126 "SSCAN",
127 # Sorted Set Operations
128 "ZCARD",
129 "ZCOUNT",
130 "ZDIFF",
131 "ZINTER",
132 "ZINTERCARD",
133 "ZLEXCOUNT",
134 "ZMSCORE",
135 "ZRANDMEMBER",
136 "ZRANGE",
137 "ZRANGEBYLEX",
138 "ZRANGEBYSCORE",
139 "ZRANK",
140 "ZREVRANGE",
141 "ZREVRANGEBYLEX",
142 "ZREVRANGEBYSCORE",
143 "ZREVRANK",
144 "ZSCAN",
145 "ZSCORE",
146 "ZUNION",
147 # Stream Operations
148 "XLEN",
149 "XPENDING",
150 "XRANGE",
151 "XREAD",
152 "XREVRANGE",
153 # JSON Module
154 "JSON.ARRINDEX",
155 "JSON.ARRLEN",
156 "JSON.GET",
157 "JSON.MGET",
158 "JSON.OBJKEYS",
159 "JSON.OBJLEN",
160 "JSON.RESP",
161 "JSON.STRLEN",
162 "JSON.TYPE",
163 # RediSearch Module
164 "FT.EXPLAIN",
165 "FT.INFO",
166 "FT.PROFILE",
167 "FT.SEARCH",
168 ]
169)
170
171
172class ClusterMultiKeyCommands(ClusterCommandsProtocol):
173 """
174 A class containing commands that handle more than one key
175 """
176
177 def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
178 """Split keys into a dictionary that maps a slot to a list of keys."""
179
180 slots_to_keys = {}
181 for key in keys:
182 slot = key_slot(self.encoder.encode(key))
183 slots_to_keys.setdefault(slot, []).append(key)
184
185 return slots_to_keys
186
187 def _partition_pairs_by_slot(
188 self, mapping: Mapping[AnyKeyT, EncodableT]
189 ) -> Dict[int, List[EncodableT]]:
190 """Split pairs into a dictionary that maps a slot to a list of pairs."""
191
192 slots_to_pairs = {}
193 for pair in mapping.items():
194 slot = key_slot(self.encoder.encode(pair[0]))
195 slots_to_pairs.setdefault(slot, []).extend(pair)
196
197 return slots_to_pairs
198
199 def _execute_pipeline_by_slot(
200 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
201 ) -> List[Any]:
202 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
203 pipe = self.pipeline()
204 [
205 pipe.execute_command(
206 command,
207 *slot_args,
208 target_nodes=[
209 self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
210 ],
211 )
212 for slot, slot_args in slots_to_args.items()
213 ]
214 return pipe.execute()
215
216 def _reorder_keys_by_command(
217 self,
218 keys: Iterable[KeyT],
219 slots_to_args: Mapping[int, Iterable[EncodableT]],
220 responses: Iterable[Any],
221 ) -> List[Any]:
222 results = {
223 k: v
224 for slot_values, response in zip(slots_to_args.values(), responses)
225 for k, v in zip(slot_values, response)
226 }
227 return [results[key] for key in keys]
228
229 def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Any | None]:
230 """
231 Splits the keys into different slots and then calls MGET
232 for the keys of every slot. This operation will not be atomic
233 if keys belong to more than one slot.
234
235 Returns a list of values ordered identically to ``keys``
236
237 For more information see https://redis.io/commands/mget
238 """
239
240 # Concatenate all keys into a list
241 keys = list_or_args(keys, args)
242
243 # Split keys into slots
244 slots_to_keys = self._partition_keys_by_slot(keys)
245
246 # Execute commands using a pipeline
247 res = self._execute_pipeline_by_slot("MGET", slots_to_keys)
248
249 # Reorder keys in the order the user provided & return
250 return self._reorder_keys_by_command(keys, slots_to_keys, res)
251
252 def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
253 """
254 Sets key/values based on a mapping. Mapping is a dictionary of
255 key/value pairs. Both keys and values should be strings or types that
256 can be cast to a string via str().
257
258 Splits the keys into different slots and then calls MSET
259 for the keys of every slot. This operation will not be atomic
260 if keys belong to more than one slot.
261
262 For more information see https://redis.io/commands/mset
263 """
264
265 # Partition the keys by slot
266 slots_to_pairs = self._partition_pairs_by_slot(mapping)
267
268 # Execute commands using a pipeline & return list of replies
269 return self._execute_pipeline_by_slot("MSET", slots_to_pairs)
270
271 def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
272 """
273 Runs the given command once for the keys
274 of each slot. Returns the sum of the return values.
275 """
276
277 # Partition the keys by slot
278 slots_to_keys = self._partition_keys_by_slot(keys)
279
280 # Sum up the reply from each command
281 return sum(self._execute_pipeline_by_slot(command, slots_to_keys))
282
283 @overload
284 def exists(self: SyncClientProtocol, *keys: KeyT) -> int: ...
285
286 @overload
287 def exists(self: AsyncClientProtocol, *keys: KeyT) -> Awaitable[int]: ...
288
289 def exists(self, *keys: KeyT) -> int | Awaitable[int]:
290 """
291 Returns the number of ``names`` that exist in the
292 whole cluster. The keys are first split up into slots
293 and then an EXISTS command is sent for every slot
294
295 For more information see https://redis.io/commands/exists
296 """
297 return self._split_command_across_slots("EXISTS", *keys)
298
299 @overload
300 def delete(self: SyncClientProtocol, *keys: KeyT) -> int: ...
301
302 @overload
303 def delete(self: AsyncClientProtocol, *keys: KeyT) -> Awaitable[int]: ...
304
305 def delete(self, *keys: KeyT) -> int | Awaitable[int]:
306 """
307 Deletes the given keys in the cluster.
308 The keys are first split up into slots
309 and then an DEL command is sent for every slot
310
311 Non-existent keys are ignored.
312 Returns the number of keys that were deleted.
313
314 For more information see https://redis.io/commands/del
315 """
316 return self._split_command_across_slots("DEL", *keys)
317
318 @overload
319 def touch(self: SyncClientProtocol, *keys: KeyT) -> int: ...
320
321 @overload
322 def touch(self: AsyncClientProtocol, *keys: KeyT) -> Awaitable[int]: ...
323
324 def touch(self, *keys: KeyT) -> int | Awaitable[int]:
325 """
326 Updates the last access time of given keys across the
327 cluster.
328
329 The keys are first split up into slots
330 and then an TOUCH command is sent for every slot
331
332 Non-existent keys are ignored.
333 Returns the number of keys that were touched.
334
335 For more information see https://redis.io/commands/touch
336 """
337 return self._split_command_across_slots("TOUCH", *keys)
338
339 @overload
340 def unlink(self: SyncClientProtocol, *keys: KeyT) -> int: ...
341
342 @overload
343 def unlink(self: AsyncClientProtocol, *keys: KeyT) -> Awaitable[int]: ...
344
345 def unlink(self, *keys: KeyT) -> int | Awaitable[int]:
346 """
347 Remove the specified keys in a different thread.
348
349 The keys are first split up into slots
350 and then an TOUCH command is sent for every slot
351
352 Non-existent keys are ignored.
353 Returns the number of keys that were unlinked.
354
355 For more information see https://redis.io/commands/unlink
356 """
357 return self._split_command_across_slots("UNLINK", *keys)
358
359
360class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
361 """
362 A class containing commands that handle more than one key
363 """
364
365 async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Any | None]:
366 """
367 Splits the keys into different slots and then calls MGET
368 for the keys of every slot. This operation will not be atomic
369 if keys belong to more than one slot.
370
371 Returns a list of values ordered identically to ``keys``
372
373 For more information see https://redis.io/commands/mget
374 """
375
376 # Concatenate all keys into a list
377 keys = list_or_args(keys, args)
378
379 # Split keys into slots
380 slots_to_keys = self._partition_keys_by_slot(keys)
381
382 # Execute commands using a pipeline
383 res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)
384
385 # Reorder keys in the order the user provided & return
386 return self._reorder_keys_by_command(keys, slots_to_keys, res)
387
388 async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
389 """
390 Sets key/values based on a mapping. Mapping is a dictionary of
391 key/value pairs. Both keys and values should be strings or types that
392 can be cast to a string via str().
393
394 Splits the keys into different slots and then calls MSET
395 for the keys of every slot. This operation will not be atomic
396 if keys belong to more than one slot.
397
398 For more information see https://redis.io/commands/mset
399 """
400
401 # Partition the keys by slot
402 slots_to_pairs = self._partition_pairs_by_slot(mapping)
403
404 # Execute commands using a pipeline & return list of replies
405 return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)
406
407 async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
408 """
409 Runs the given command once for the keys
410 of each slot. Returns the sum of the return values.
411 """
412
413 # Partition the keys by slot
414 slots_to_keys = self._partition_keys_by_slot(keys)
415
416 # Sum up the reply from each command
417 return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))
418
419 async def _execute_pipeline_by_slot(
420 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
421 ) -> List[Any]:
422 if self._initialize:
423 await self.initialize()
424 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
425 pipe = self.pipeline()
426 [
427 pipe.execute_command(
428 command,
429 *slot_args,
430 target_nodes=[
431 self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
432 ],
433 )
434 for slot, slot_args in slots_to_args.items()
435 ]
436 return await pipe.execute()
437
438
439class ClusterManagementCommands(ManagementCommands):
440 """
441 A class for Redis Cluster management commands
442
443 The class inherits from Redis's core ManagementCommands class and do the
444 required adjustments to work with cluster mode
445 """
446
447 def slaveof(self, *args, **kwargs) -> NoReturn:
448 """
449 Make the server a replica of another instance, or promote it as master.
450
451 For more information see https://redis.io/commands/slaveof
452 """
453 raise RedisClusterException("SLAVEOF is not supported in cluster mode")
454
455 def replicaof(self, *args, **kwargs) -> NoReturn:
456 """
457 Make the server a replica of another instance, or promote it as master.
458
459 For more information see https://redis.io/commands/replicaof
460 """
461 raise RedisClusterException("REPLICAOF is not supported in cluster mode")
462
463 def swapdb(self, *args, **kwargs) -> NoReturn:
464 """
465 Swaps two Redis databases.
466
467 For more information see https://redis.io/commands/swapdb
468 """
469 raise RedisClusterException("SWAPDB is not supported in cluster mode")
470
471 @overload
472 def cluster_myid(
473 self: SyncClientProtocol, target_node: "TargetNodesT"
474 ) -> bytes | str: ...
475
476 @overload
477 def cluster_myid(
478 self: AsyncClientProtocol, target_node: "TargetNodesT"
479 ) -> Awaitable[bytes | str]: ...
480
481 def cluster_myid(self, target_node: "TargetNodesT") -> (bytes | str) | Awaitable[
482 bytes | str
483 ]:
484 """
485 Returns the node's id.
486
487 :target_node: 'ClusterNode'
488 The node to execute the command on
489
490 For more information check https://redis.io/commands/cluster-myid/
491 """
492 return self.execute_command("CLUSTER MYID", target_nodes=target_node)
493
494 @overload
495 def cluster_addslots(
496 self: SyncClientProtocol, target_node: "TargetNodesT", *slots: EncodableT
497 ) -> bool: ...
498
499 @overload
500 def cluster_addslots(
501 self: AsyncClientProtocol, target_node: "TargetNodesT", *slots: EncodableT
502 ) -> Awaitable[bool]: ...
503
504 def cluster_addslots(
505 self, target_node: "TargetNodesT", *slots: EncodableT
506 ) -> bool | Awaitable[bool]:
507 """
508 Assign new hash slots to receiving node. Sends to specified node.
509
510 :target_node: 'ClusterNode'
511 The node to execute the command on
512
513 For more information see https://redis.io/commands/cluster-addslots
514 """
515 return self.execute_command(
516 "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
517 )
518
519 @overload
520 def cluster_addslotsrange(
521 self: SyncClientProtocol, target_node: "TargetNodesT", *slots: EncodableT
522 ) -> bool: ...
523
524 @overload
525 def cluster_addslotsrange(
526 self: AsyncClientProtocol, target_node: "TargetNodesT", *slots: EncodableT
527 ) -> Awaitable[bool]: ...
528
529 def cluster_addslotsrange(
530 self, target_node: "TargetNodesT", *slots: EncodableT
531 ) -> bool | Awaitable[bool]:
532 """
533 Similar to the CLUSTER ADDSLOTS command.
534 The difference between the two commands is that ADDSLOTS takes a list of slots
535 to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
536 (specified by start and end slots) to assign to the node.
537
538 :target_node: 'ClusterNode'
539 The node to execute the command on
540
541 For more information see https://redis.io/commands/cluster-addslotsrange
542 """
543 return self.execute_command(
544 "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
545 )
546
547 @overload
548 def cluster_countkeysinslot(self: SyncClientProtocol, slot_id: int) -> int: ...
549
550 @overload
551 def cluster_countkeysinslot(
552 self: AsyncClientProtocol, slot_id: int
553 ) -> Awaitable[int]: ...
554
555 def cluster_countkeysinslot(self, slot_id: int) -> int | Awaitable[int]:
556 """
557 Return the number of local keys in the specified hash slot
558 Send to node based on specified slot_id
559
560 For more information see https://redis.io/commands/cluster-countkeysinslot
561 """
562 return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)
563
564 @overload
565 def cluster_count_failure_report(self: SyncClientProtocol, node_id: str) -> int: ...
566
567 @overload
568 def cluster_count_failure_report(
569 self: AsyncClientProtocol, node_id: str
570 ) -> Awaitable[int]: ...
571
572 def cluster_count_failure_report(self, node_id: str) -> int | Awaitable[int]:
573 """
574 Return the number of failure reports active for a given node
575 Sends to a random node
576
577 For more information see https://redis.io/commands/cluster-count-failure-reports
578 """
579 return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)
580
581 def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
582 """
583 Set hash slots as unbound in the cluster.
584 It determines by it self what node the slot is in and sends it there
585
586 Returns a list of the results for each processed slot.
587
588 For more information see https://redis.io/commands/cluster-delslots
589 """
590 return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]
591
592 @overload
593 def cluster_delslotsrange(self: SyncClientProtocol, *slots: EncodableT) -> bool: ...
594
595 @overload
596 def cluster_delslotsrange(
597 self: AsyncClientProtocol, *slots: EncodableT
598 ) -> Awaitable[bool]: ...
599
600 def cluster_delslotsrange(self, *slots: EncodableT) -> bool | Awaitable[bool]:
601 """
602 Similar to the CLUSTER DELSLOTS command.
603 The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
604 from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
605 from the node.
606
607 For more information see https://redis.io/commands/cluster-delslotsrange
608 """
609 return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)
610
611 @overload
612 def cluster_failover(
613 self: SyncClientProtocol,
614 target_node: "TargetNodesT",
615 option: str | None = None,
616 ) -> bool: ...
617
618 @overload
619 def cluster_failover(
620 self: AsyncClientProtocol,
621 target_node: "TargetNodesT",
622 option: str | None = None,
623 ) -> Awaitable[bool]: ...
624
625 def cluster_failover(
626 self, target_node: "TargetNodesT", option: str | None = None
627 ) -> bool | Awaitable[bool]:
628 """
629 Forces a slave to perform a manual failover of its master
630 Sends to specified node
631
632 :target_node: 'ClusterNode'
633 The node to execute the command on
634
635 For more information see https://redis.io/commands/cluster-failover
636 """
637 if option:
638 if option.upper() not in ["FORCE", "TAKEOVER"]:
639 raise RedisError(
640 f"Invalid option for CLUSTER FAILOVER command: {option}"
641 )
642 else:
643 return self.execute_command(
644 "CLUSTER FAILOVER", option, target_nodes=target_node
645 )
646 else:
647 return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)
648
649 @overload
650 def cluster_info(
651 self: SyncClientProtocol, target_nodes: "TargetNodesT" | None = None
652 ) -> dict[str, str]: ...
653
654 @overload
655 def cluster_info(
656 self: AsyncClientProtocol, target_nodes: "TargetNodesT" | None = None
657 ) -> Awaitable[dict[str, str]]: ...
658
659 def cluster_info(
660 self, target_nodes: "TargetNodesT" | None = None
661 ) -> dict[str, str] | Awaitable[dict[str, str]]:
662 """
663 Provides info about Redis Cluster node state.
664 The command will be sent to a random node in the cluster if no target
665 node is specified.
666
667 For more information see https://redis.io/commands/cluster-info
668 """
669 return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)
670
671 @overload
672 def cluster_keyslot(self: SyncClientProtocol, key: str) -> int: ...
673
674 @overload
675 def cluster_keyslot(self: AsyncClientProtocol, key: str) -> Awaitable[int]: ...
676
677 def cluster_keyslot(self, key: str) -> int | Awaitable[int]:
678 """
679 Returns the hash slot of the specified key
680 Sends to random node in the cluster
681
682 For more information see https://redis.io/commands/cluster-keyslot
683 """
684 return self.execute_command("CLUSTER KEYSLOT", key)
685
686 @overload
687 def cluster_meet(
688 self: SyncClientProtocol,
689 host: str,
690 port: int,
691 target_nodes: "TargetNodesT" | None = None,
692 ) -> bool: ...
693
694 @overload
695 def cluster_meet(
696 self: AsyncClientProtocol,
697 host: str,
698 port: int,
699 target_nodes: "TargetNodesT" | None = None,
700 ) -> Awaitable[bool]: ...
701
702 def cluster_meet(
703 self, host: str, port: int, target_nodes: "TargetNodesT" | None = None
704 ) -> bool | Awaitable[bool]:
705 """
706 Force a node cluster to handshake with another node.
707 Sends to specified node.
708
709 For more information see https://redis.io/commands/cluster-meet
710 """
711 return self.execute_command(
712 "CLUSTER MEET", host, port, target_nodes=target_nodes
713 )
714
715 @overload
716 def cluster_nodes(self: SyncClientProtocol) -> dict[str, ClusterNodeDetail]: ...
717
718 @overload
719 def cluster_nodes(
720 self: AsyncClientProtocol,
721 ) -> Awaitable[dict[str, ClusterNodeDetail]]: ...
722
723 def cluster_nodes(
724 self,
725 ) -> dict[str, ClusterNodeDetail] | Awaitable[dict[str, ClusterNodeDetail]]:
726 """
727 Get Cluster config for the node.
728 Sends to random node in the cluster
729
730 For more information see https://redis.io/commands/cluster-nodes
731 """
732 return self.execute_command("CLUSTER NODES")
733
734 @overload
735 def cluster_replicate(
736 self: SyncClientProtocol, target_nodes: "TargetNodesT", node_id: str
737 ) -> bool: ...
738
739 @overload
740 def cluster_replicate(
741 self: AsyncClientProtocol, target_nodes: "TargetNodesT", node_id: str
742 ) -> Awaitable[bool]: ...
743
744 def cluster_replicate(
745 self, target_nodes: "TargetNodesT", node_id: str
746 ) -> bool | Awaitable[bool]:
747 """
748 Reconfigure a node as a slave of the specified master node
749
750 For more information see https://redis.io/commands/cluster-replicate
751 """
752 return self.execute_command(
753 "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
754 )
755
756 @overload
757 def cluster_reset(
758 self: SyncClientProtocol,
759 soft: bool = True,
760 target_nodes: "TargetNodesT" | None = None,
761 ) -> bool: ...
762
763 @overload
764 def cluster_reset(
765 self: AsyncClientProtocol,
766 soft: bool = True,
767 target_nodes: "TargetNodesT" | None = None,
768 ) -> Awaitable[bool]: ...
769
770 def cluster_reset(
771 self, soft: bool = True, target_nodes: "TargetNodesT" | None = None
772 ) -> bool | Awaitable[bool]:
773 """
774 Reset a Redis Cluster node
775
776 If 'soft' is True then it will send 'SOFT' argument
777 If 'soft' is False then it will send 'HARD' argument
778
779 For more information see https://redis.io/commands/cluster-reset
780 """
781 return self.execute_command(
782 "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
783 )
784
785 @overload
786 def cluster_save_config(
787 self: SyncClientProtocol, target_nodes: "TargetNodesT" | None = None
788 ) -> bool: ...
789
790 @overload
791 def cluster_save_config(
792 self: AsyncClientProtocol, target_nodes: "TargetNodesT" | None = None
793 ) -> Awaitable[bool]: ...
794
795 def cluster_save_config(
796 self, target_nodes: "TargetNodesT" | None = None
797 ) -> bool | Awaitable[bool]:
798 """
799 Forces the node to save cluster state on disk
800
801 For more information see https://redis.io/commands/cluster-saveconfig
802 """
803 return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)
804
805 @overload
806 def cluster_get_keys_in_slot(
807 self: SyncClientProtocol, slot: int, num_keys: int
808 ) -> list[bytes | str]: ...
809
810 @overload
811 def cluster_get_keys_in_slot(
812 self: AsyncClientProtocol, slot: int, num_keys: int
813 ) -> Awaitable[list[bytes | str]]: ...
814
815 def cluster_get_keys_in_slot(
816 self, slot: int, num_keys: int
817 ) -> list[bytes | str] | Awaitable[list[bytes | str]]:
818 """
819 Returns the number of keys in the specified cluster slot
820
821 For more information see https://redis.io/commands/cluster-getkeysinslot
822 """
823 return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)
824
825 @overload
826 def cluster_set_config_epoch(
827 self: SyncClientProtocol, epoch: int, target_nodes: "TargetNodesT" | None = None
828 ) -> bool: ...
829
830 @overload
831 def cluster_set_config_epoch(
832 self: AsyncClientProtocol,
833 epoch: int,
834 target_nodes: "TargetNodesT" | None = None,
835 ) -> Awaitable[bool]: ...
836
837 def cluster_set_config_epoch(
838 self, epoch: int, target_nodes: "TargetNodesT" | None = None
839 ) -> bool | Awaitable[bool]:
840 """
841 Set the configuration epoch in a new node
842
843 For more information see https://redis.io/commands/cluster-set-config-epoch
844 """
845 return self.execute_command(
846 "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
847 )
848
849 @overload
850 def cluster_setslot(
851 self: SyncClientProtocol,
852 target_node: "TargetNodesT",
853 node_id: str,
854 slot_id: int,
855 state: str,
856 ) -> bool: ...
857
858 @overload
859 def cluster_setslot(
860 self: AsyncClientProtocol,
861 target_node: "TargetNodesT",
862 node_id: str,
863 slot_id: int,
864 state: str,
865 ) -> Awaitable[bool]: ...
866
867 def cluster_setslot(
868 self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
869 ) -> bool | Awaitable[bool]:
870 """
871 Bind an hash slot to a specific node
872
873 :target_node: 'ClusterNode'
874 The node to execute the command on
875
876 For more information see https://redis.io/commands/cluster-setslot
877 """
878 if state.upper() in ("IMPORTING", "NODE", "MIGRATING"):
879 return self.execute_command(
880 "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
881 )
882 elif state.upper() == "STABLE":
883 raise RedisError('For "stable" state please use cluster_setslot_stable')
884 else:
885 raise RedisError(f"Invalid slot state: {state}")
886
887 @overload
888 def cluster_setslot_stable(self: SyncClientProtocol, slot_id: int) -> bool: ...
889
890 @overload
891 def cluster_setslot_stable(
892 self: AsyncClientProtocol, slot_id: int
893 ) -> Awaitable[bool]: ...
894
895 def cluster_setslot_stable(self, slot_id: int) -> bool | Awaitable[bool]:
896 """
897 Clears migrating / importing state from the slot.
898 It determines by it self what node the slot is in and sends it there.
899
900 For more information see https://redis.io/commands/cluster-setslot
901 """
902 return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")
903
904 @overload
905 def cluster_replicas(
906 self: SyncClientProtocol,
907 node_id: str,
908 target_nodes: "TargetNodesT" | None = None,
909 ) -> dict[str, ClusterNodeDetail]: ...
910
911 @overload
912 def cluster_replicas(
913 self: AsyncClientProtocol,
914 node_id: str,
915 target_nodes: "TargetNodesT" | None = None,
916 ) -> Awaitable[dict[str, ClusterNodeDetail]]: ...
917
918 def cluster_replicas(
919 self, node_id: str, target_nodes: "TargetNodesT" | None = None
920 ) -> dict[str, ClusterNodeDetail] | Awaitable[dict[str, ClusterNodeDetail]]:
921 """
922 Provides a list of replica nodes replicating from the specified primary
923 target node.
924
925 For more information see https://redis.io/commands/cluster-replicas
926 """
927 return self.execute_command(
928 "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
929 )
930
931 @overload
932 def cluster_slots(
933 self: SyncClientProtocol, target_nodes: "TargetNodesT" | None = None
934 ) -> list[Any]: ...
935
936 @overload
937 def cluster_slots(
938 self: AsyncClientProtocol, target_nodes: "TargetNodesT" | None = None
939 ) -> Awaitable[list[Any]]: ...
940
941 def cluster_slots(
942 self, target_nodes: "TargetNodesT" | None = None
943 ) -> list[Any] | Awaitable[list[Any]]:
944 """
945 Get array of Cluster slot to node mappings
946
947 For more information see https://redis.io/commands/cluster-slots
948 """
949 return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)
950
951 @overload
952 def cluster_shards(
953 self: SyncClientProtocol, target_nodes: "TargetNodesT" | None = None
954 ) -> list[Any]: ...
955
956 @overload
957 def cluster_shards(
958 self: AsyncClientProtocol, target_nodes: "TargetNodesT" | None = None
959 ) -> Awaitable[list[Any]]: ...
960
961 def cluster_shards(
962 self, target_nodes: "TargetNodesT" | None = None
963 ) -> list[Any] | Awaitable[list[Any]]:
964 """
965 Returns details about the shards of the cluster.
966
967 For more information see https://redis.io/commands/cluster-shards
968 """
969 return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)
970
971 @overload
972 def cluster_myshardid(
973 self: SyncClientProtocol, target_nodes: "TargetNodesT" | None = None
974 ) -> bytes | str: ...
975
976 @overload
977 def cluster_myshardid(
978 self: AsyncClientProtocol, target_nodes: "TargetNodesT" | None = None
979 ) -> Awaitable[bytes | str]: ...
980
981 def cluster_myshardid(self, target_nodes: "TargetNodesT" | None = None) -> (
982 bytes | str
983 ) | Awaitable[bytes | str]:
984 """
985 Returns the shard ID of the node.
986
987 For more information see https://redis.io/commands/cluster-myshardid/
988 """
989 return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)
990
991 @overload
992 def cluster_links(
993 self: SyncClientProtocol, target_node: "TargetNodesT"
994 ) -> list[Any]: ...
995
996 @overload
997 def cluster_links(
998 self: AsyncClientProtocol, target_node: "TargetNodesT"
999 ) -> Awaitable[list[Any]]: ...
1000
1001 def cluster_links(
1002 self, target_node: "TargetNodesT"
1003 ) -> list[Any] | Awaitable[list[Any]]:
1004 """
1005 Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
1006 peer in the cluster: One for sending outbound messages towards the peer and one
1007 for receiving inbound messages from the peer.
1008
1009 This command outputs information of all such peer links as an array.
1010
1011 For more information see https://redis.io/commands/cluster-links
1012 """
1013 return self.execute_command("CLUSTER LINKS", target_nodes=target_node)
1014
1015 def cluster_flushslots(self, target_nodes: "TargetNodesT" | None = None) -> None:
1016 raise NotImplementedError(
1017 "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
1018 )
1019
1020 def cluster_bumpepoch(self, target_nodes: "TargetNodesT" | None = None) -> None:
1021 raise NotImplementedError(
1022 "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
1023 )
1024
1025 def readonly(self, target_nodes: "TargetNodesT" | None = None) -> ResponseT:
1026 """
1027 Enables read queries.
1028 The command will be sent to the default cluster node if target_nodes is
1029 not specified.
1030
1031 For more information see https://redis.io/commands/readonly
1032 """
1033 if target_nodes == "replicas" or target_nodes == "all":
1034 # read_from_replicas will only be enabled if the READONLY command
1035 # is sent to all replicas
1036 self.read_from_replicas = True
1037 return self.execute_command("READONLY", target_nodes=target_nodes)
1038
1039 def readwrite(self, target_nodes: "TargetNodesT" | None = None) -> ResponseT:
1040 """
1041 Disables read queries.
1042 The command will be sent to the default cluster node if target_nodes is
1043 not specified.
1044
1045 For more information see https://redis.io/commands/readwrite
1046 """
1047 # Reset read from replicas flag
1048 self.read_from_replicas = False
1049 return self.execute_command("READWRITE", target_nodes=target_nodes)
1050
1051 @deprecated_function(
1052 version="7.2.0",
1053 reason="Use client-side caching feature instead.",
1054 )
1055 def client_tracking_on(
1056 self,
1057 clientid: int | None = None,
1058 prefix: Sequence[KeyT] = [],
1059 bcast: bool = False,
1060 optin: bool = False,
1061 optout: bool = False,
1062 noloop: bool = False,
1063 target_nodes: "TargetNodesT" | None = "all",
1064 ) -> ResponseT:
1065 """
1066 Enables the tracking feature of the Redis server, that is used
1067 for server assisted client side caching.
1068
1069 When clientid is provided - in target_nodes only the node that owns the
1070 connection with this id should be provided.
1071 When clientid is not provided - target_nodes can be any node.
1072
1073 For more information see https://redis.io/commands/client-tracking
1074 """
1075 return self.client_tracking(
1076 True,
1077 clientid,
1078 prefix,
1079 bcast,
1080 optin,
1081 optout,
1082 noloop,
1083 target_nodes=target_nodes,
1084 )
1085
1086 @deprecated_function(
1087 version="7.2.0",
1088 reason="Use client-side caching feature instead.",
1089 )
1090 def client_tracking_off(
1091 self,
1092 clientid: int | None = None,
1093 prefix: Sequence[KeyT] = [],
1094 bcast: bool = False,
1095 optin: bool = False,
1096 optout: bool = False,
1097 noloop: bool = False,
1098 target_nodes: "TargetNodesT" | None = "all",
1099 ) -> ResponseT:
1100 """
1101 Disables the tracking feature of the Redis server, that is used
1102 for server assisted client side caching.
1103
1104 When clientid is provided - in target_nodes only the node that owns the
1105 connection with this id should be provided.
1106 When clientid is not provided - target_nodes can be any node.
1107
1108 For more information see https://redis.io/commands/client-tracking
1109 """
1110 return self.client_tracking(
1111 False,
1112 clientid,
1113 prefix,
1114 bcast,
1115 optin,
1116 optout,
1117 noloop,
1118 target_nodes=target_nodes,
1119 )
1120
1121 def hotkeys_start(
1122 self,
1123 metrics: List[HotkeysMetricsTypes],
1124 count: int | None = None,
1125 duration: int | None = None,
1126 sample_ratio: int | None = None,
1127 slots: List[int] | None = None,
1128 **kwargs,
1129 ) -> str | bytes:
1130 """
1131 Cluster client does not support hotkeys command. Please use the non-cluster client.
1132
1133 For more information see https://redis.io/commands/hotkeys-start
1134 """
1135 raise NotImplementedError(
1136 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1137 )
1138
1139 def hotkeys_stop(self, **kwargs) -> str | bytes:
1140 """
1141 Cluster client does not support hotkeys command. Please use the non-cluster client.
1142
1143 For more information see https://redis.io/commands/hotkeys-stop
1144 """
1145 raise NotImplementedError(
1146 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1147 )
1148
1149 def hotkeys_reset(self, **kwargs) -> str | bytes:
1150 """
1151 Cluster client does not support hotkeys command. Please use the non-cluster client.
1152
1153 For more information see https://redis.io/commands/hotkeys-reset
1154 """
1155 raise NotImplementedError(
1156 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1157 )
1158
1159 def hotkeys_get(self, **kwargs) -> list[dict[str | bytes, Any]]:
1160 """
1161 Cluster client does not support hotkeys command. Please use the non-cluster client.
1162
1163 For more information see https://redis.io/commands/hotkeys-get
1164 """
1165 raise NotImplementedError(
1166 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1167 )
1168
1169
1170class AsyncClusterManagementCommands(
1171 ClusterManagementCommands, AsyncManagementCommands
1172):
1173 """
1174 A class for Redis Cluster management commands
1175
1176 The class inherits from Redis's core ManagementCommands class and do the
1177 required adjustments to work with cluster mode
1178 """
1179
1180 async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
1181 """
1182 Set hash slots as unbound in the cluster.
1183 It determines by it self what node the slot is in and sends it there
1184
1185 Returns a list of the results for each processed slot.
1186
1187 For more information see https://redis.io/commands/cluster-delslots
1188 """
1189 return await asyncio.gather(
1190 *(
1191 asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
1192 for slot in slots
1193 )
1194 )
1195
1196 @deprecated_function(
1197 version="7.2.0",
1198 reason="Use client-side caching feature instead.",
1199 )
1200 async def client_tracking_on(
1201 self,
1202 clientid: int | None = None,
1203 prefix: Sequence[KeyT] = [],
1204 bcast: bool = False,
1205 optin: bool = False,
1206 optout: bool = False,
1207 noloop: bool = False,
1208 target_nodes: "TargetNodesT" | None = "all",
1209 ) -> ResponseT:
1210 """
1211 Enables the tracking feature of the Redis server, that is used
1212 for server assisted client side caching.
1213
1214 When clientid is provided - in target_nodes only the node that owns the
1215 connection with this id should be provided.
1216 When clientid is not provided - target_nodes can be any node.
1217
1218 For more information see https://redis.io/commands/client-tracking
1219 """
1220 return await self.client_tracking(
1221 True,
1222 clientid,
1223 prefix,
1224 bcast,
1225 optin,
1226 optout,
1227 noloop,
1228 target_nodes=target_nodes,
1229 )
1230
1231 @deprecated_function(
1232 version="7.2.0",
1233 reason="Use client-side caching feature instead.",
1234 )
1235 async def client_tracking_off(
1236 self,
1237 clientid: int | None = None,
1238 prefix: Sequence[KeyT] = [],
1239 bcast: bool = False,
1240 optin: bool = False,
1241 optout: bool = False,
1242 noloop: bool = False,
1243 target_nodes: "TargetNodesT" | None = "all",
1244 ) -> ResponseT:
1245 """
1246 Disables the tracking feature of the Redis server, that is used
1247 for server assisted client side caching.
1248
1249 When clientid is provided - in target_nodes only the node that owns the
1250 connection with this id should be provided.
1251 When clientid is not provided - target_nodes can be any node.
1252
1253 For more information see https://redis.io/commands/client-tracking
1254 """
1255 return await self.client_tracking(
1256 False,
1257 clientid,
1258 prefix,
1259 bcast,
1260 optin,
1261 optout,
1262 noloop,
1263 target_nodes=target_nodes,
1264 )
1265
1266 async def hotkeys_start(
1267 self,
1268 metrics: List[HotkeysMetricsTypes],
1269 count: int | None = None,
1270 duration: int | None = None,
1271 sample_ratio: int | None = None,
1272 slots: List[int] | None = None,
1273 **kwargs,
1274 ) -> Awaitable[str | bytes]:
1275 """
1276 Cluster client does not support hotkeys command. Please use the non-cluster client.
1277
1278 For more information see https://redis.io/commands/hotkeys-start
1279 """
1280 raise NotImplementedError(
1281 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1282 )
1283
1284 async def hotkeys_stop(self, **kwargs) -> Awaitable[str | bytes]:
1285 """
1286 Cluster client does not support hotkeys command. Please use the non-cluster client.
1287
1288 For more information see https://redis.io/commands/hotkeys-stop
1289 """
1290 raise NotImplementedError(
1291 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1292 )
1293
1294 async def hotkeys_reset(self, **kwargs) -> Awaitable[str | bytes]:
1295 """
1296 Cluster client does not support hotkeys command. Please use the non-cluster client.
1297
1298 For more information see https://redis.io/commands/hotkeys-reset
1299 """
1300 raise NotImplementedError(
1301 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1302 )
1303
1304 async def hotkeys_get(self, **kwargs) -> Awaitable[list[dict[str | bytes, Any]]]:
1305 """
1306 Cluster client does not support hotkeys command. Please use the non-cluster client.
1307
1308 For more information see https://redis.io/commands/hotkeys-get
1309 """
1310 raise NotImplementedError(
1311 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1312 )
1313
1314
1315class ClusterDataAccessCommands(DataAccessCommands):
1316 """
1317 A class for Redis Cluster Data Access Commands
1318
1319 The class inherits from Redis's core DataAccessCommand class and do the
1320 required adjustments to work with cluster mode
1321 """
1322
1323 @overload
1324 def stralgo(
1325 self: SyncClientProtocol,
1326 algo: Literal["LCS"],
1327 value1: KeyT,
1328 value2: KeyT,
1329 specific_argument: Literal["strings"] | Literal["keys"] = "strings",
1330 len: bool = False,
1331 idx: bool = False,
1332 minmatchlen: int | None = None,
1333 withmatchlen: bool = False,
1334 **kwargs,
1335 ) -> StralgoResponse: ...
1336
1337 @overload
1338 def stralgo(
1339 self: AsyncClientProtocol,
1340 algo: Literal["LCS"],
1341 value1: KeyT,
1342 value2: KeyT,
1343 specific_argument: Literal["strings"] | Literal["keys"] = "strings",
1344 len: bool = False,
1345 idx: bool = False,
1346 minmatchlen: int | None = None,
1347 withmatchlen: bool = False,
1348 **kwargs,
1349 ) -> Awaitable[StralgoResponse]: ...
1350
1351 def stralgo(
1352 self,
1353 algo: Literal["LCS"],
1354 value1: KeyT,
1355 value2: KeyT,
1356 specific_argument: Literal["strings"] | Literal["keys"] = "strings",
1357 len: bool = False,
1358 idx: bool = False,
1359 minmatchlen: int | None = None,
1360 withmatchlen: bool = False,
1361 **kwargs,
1362 ) -> StralgoResponse | Awaitable[StralgoResponse]:
1363 """
1364 Implements complex algorithms that operate on strings.
1365 Right now the only algorithm implemented is the LCS algorithm
1366 (longest common substring). However new algorithms could be
1367 implemented in the future.
1368
1369 ``algo`` Right now must be LCS
1370 ``value1`` and ``value2`` Can be two strings or two keys
1371 ``specific_argument`` Specifying if the arguments to the algorithm
1372 will be keys or strings. strings is the default.
1373 ``len`` Returns just the len of the match.
1374 ``idx`` Returns the match positions in each string.
1375 ``minmatchlen`` Restrict the list of matches to the ones of a given
1376 minimal length. Can be provided only when ``idx`` set to True.
1377 ``withmatchlen`` Returns the matches with the len of the match.
1378 Can be provided only when ``idx`` set to True.
1379
1380 For more information see https://redis.io/commands/stralgo
1381 """
1382 target_nodes = kwargs.pop("target_nodes", None)
1383 if specific_argument == "strings" and target_nodes is None:
1384 target_nodes = "default-node"
1385 kwargs.update({"target_nodes": target_nodes})
1386 return super().stralgo(
1387 algo,
1388 value1,
1389 value2,
1390 specific_argument,
1391 len,
1392 idx,
1393 minmatchlen,
1394 withmatchlen,
1395 **kwargs,
1396 )
1397
1398 def scan_iter(
1399 self,
1400 match: PatternT | None = None,
1401 count: int | None = None,
1402 _type: str | None = None,
1403 **kwargs,
1404 ) -> Iterator:
1405 # Do the first query with cursor=0 for all nodes
1406 cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
1407 yield from data
1408
1409 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
1410 if cursors:
1411 # Get nodes by name
1412 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
1413
1414 # Iterate over each node till its cursor is 0
1415 kwargs.pop("target_nodes", None)
1416 while cursors:
1417 for name, cursor in cursors.items():
1418 cur, data = self.scan(
1419 cursor=cursor,
1420 match=match,
1421 count=count,
1422 _type=_type,
1423 target_nodes=nodes[name],
1424 **kwargs,
1425 )
1426 yield from data
1427 cursors[name] = cur[name]
1428
1429 cursors = {
1430 name: cursor for name, cursor in cursors.items() if cursor != 0
1431 }
1432
1433
1434class AsyncClusterDataAccessCommands(
1435 ClusterDataAccessCommands, AsyncDataAccessCommands
1436):
1437 """
1438 A class for Redis Cluster Data Access Commands
1439
1440 The class inherits from Redis's core DataAccessCommand class and do the
1441 required adjustments to work with cluster mode
1442 """
1443
1444 async def scan_iter(
1445 self,
1446 match: PatternT | None = None,
1447 count: int | None = None,
1448 _type: str | None = None,
1449 **kwargs,
1450 ) -> AsyncIterator:
1451 # Do the first query with cursor=0 for all nodes
1452 cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
1453 for value in data:
1454 yield value
1455
1456 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
1457 if cursors:
1458 # Get nodes by name
1459 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
1460
1461 # Iterate over each node till its cursor is 0
1462 kwargs.pop("target_nodes", None)
1463 while cursors:
1464 for name, cursor in cursors.items():
1465 cur, data = await self.scan(
1466 cursor=cursor,
1467 match=match,
1468 count=count,
1469 _type=_type,
1470 target_nodes=nodes[name],
1471 **kwargs,
1472 )
1473 for value in data:
1474 yield value
1475 cursors[name] = cur[name]
1476
1477 cursors = {
1478 name: cursor for name, cursor in cursors.items() if cursor != 0
1479 }
1480
1481
1482class RedisClusterCommands(
1483 ClusterMultiKeyCommands,
1484 ClusterManagementCommands,
1485 ACLCommands,
1486 PubSubCommands,
1487 ClusterDataAccessCommands,
1488 ScriptCommands,
1489 FunctionCommands,
1490 ModuleCommands,
1491 RedisModuleCommands,
1492):
1493 """
1494 A class for all Redis Cluster commands
1495
1496 For key-based commands, the target node(s) will be internally determined
1497 by the keys' hash slot.
1498 Non-key-based commands can be executed with the 'target_nodes' argument to
1499 target specific nodes. By default, if target_nodes is not specified, the
1500 command will be executed on the default cluster node.
1501
1502 :param :target_nodes: type can be one of the followings:
1503 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
1504 - 'ClusterNode'
1505 - 'list(ClusterNodes)'
1506 - 'dict(any:clusterNodes)'
1507
1508 for example:
1509 r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
1510 """
1511
1512
1513class AsyncRedisClusterCommands(
1514 AsyncClusterMultiKeyCommands,
1515 AsyncClusterManagementCommands,
1516 AsyncACLCommands,
1517 PubSubCommands,
1518 AsyncClusterDataAccessCommands,
1519 AsyncScriptCommands,
1520 AsyncFunctionCommands,
1521 AsyncModuleCommands,
1522 AsyncRedisModuleCommands,
1523):
1524 """
1525 A class for all Redis Cluster commands
1526
1527 For key-based commands, the target node(s) will be internally determined
1528 by the keys' hash slot.
1529 Non-key-based commands can be executed with the 'target_nodes' argument to
1530 target specific nodes. By default, if target_nodes is not specified, the
1531 command will be executed on the default cluster node.
1532
1533 :param :target_nodes: type can be one of the followings:
1534 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
1535 - 'ClusterNode'
1536 - 'list(ClusterNodes)'
1537 - 'dict(any:clusterNodes)'
1538
1539 for example:
1540 r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
1541 """