1import asyncio
2from typing import (
3 TYPE_CHECKING,
4 Any,
5 AsyncIterator,
6 Awaitable,
7 Dict,
8 Iterable,
9 Iterator,
10 List,
11 Literal,
12 Mapping,
13 NoReturn,
14 Optional,
15 Sequence,
16 Union,
17)
18
19from redis.crc import key_slot
20from redis.exceptions import RedisClusterException, RedisError
21from redis.typing import (
22 AnyKeyT,
23 ClusterCommandsProtocol,
24 EncodableT,
25 KeysT,
26 KeyT,
27 PatternT,
28 ResponseT,
29)
30from redis.utils import deprecated_function
31
32from .core import (
33 ACLCommands,
34 AsyncACLCommands,
35 AsyncDataAccessCommands,
36 AsyncFunctionCommands,
37 AsyncManagementCommands,
38 AsyncModuleCommands,
39 AsyncScriptCommands,
40 DataAccessCommands,
41 FunctionCommands,
42 HotkeysMetricsTypes,
43 ManagementCommands,
44 ModuleCommands,
45 PubSubCommands,
46 ScriptCommands,
47)
48from .helpers import list_or_args
49from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
50
51if TYPE_CHECKING:
52 from redis.asyncio.cluster import TargetNodesT
53
54# Not complete, but covers the major ones
55# https://redis.io/commands
56READ_COMMANDS = frozenset(
57 [
58 # Bit Operations
59 "BITCOUNT",
60 "BITFIELD_RO",
61 "BITPOS",
62 # Scripting
63 "EVAL_RO",
64 "EVALSHA_RO",
65 "FCALL_RO",
66 # Key Operations
67 "DBSIZE",
68 "DIGEST",
69 "DUMP",
70 "EXISTS",
71 "EXPIRETIME",
72 "PEXPIRETIME",
73 "KEYS",
74 "SCAN",
75 "PTTL",
76 "RANDOMKEY",
77 "TTL",
78 "TYPE",
79 # String Operations
80 "GET",
81 "GETBIT",
82 "GETRANGE",
83 "MGET",
84 "STRLEN",
85 "LCS",
86 # Geo Operations
87 "GEODIST",
88 "GEOHASH",
89 "GEOPOS",
90 "GEOSEARCH",
91 # Hash Operations
92 "HEXISTS",
93 "HGET",
94 "HGETALL",
95 "HKEYS",
96 "HLEN",
97 "HMGET",
98 "HSTRLEN",
99 "HVALS",
100 "HRANDFIELD",
101 "HEXPIRETIME",
102 "HPEXPIRETIME",
103 "HTTL",
104 "HPTTL",
105 "HSCAN",
106 # List Operations
107 "LINDEX",
108 "LPOS",
109 "LLEN",
110 "LRANGE",
111 # Set Operations
112 "SCARD",
113 "SDIFF",
114 "SINTER",
115 "SINTERCARD",
116 "SISMEMBER",
117 "SMISMEMBER",
118 "SMEMBERS",
119 "SRANDMEMBER",
120 "SUNION",
121 "SSCAN",
122 # Sorted Set Operations
123 "ZCARD",
124 "ZCOUNT",
125 "ZDIFF",
126 "ZINTER",
127 "ZINTERCARD",
128 "ZLEXCOUNT",
129 "ZMSCORE",
130 "ZRANDMEMBER",
131 "ZRANGE",
132 "ZRANGEBYLEX",
133 "ZRANGEBYSCORE",
134 "ZRANK",
135 "ZREVRANGE",
136 "ZREVRANGEBYLEX",
137 "ZREVRANGEBYSCORE",
138 "ZREVRANK",
139 "ZSCAN",
140 "ZSCORE",
141 "ZUNION",
142 # Stream Operations
143 "XLEN",
144 "XPENDING",
145 "XRANGE",
146 "XREAD",
147 "XREVRANGE",
148 # JSON Module
149 "JSON.ARRINDEX",
150 "JSON.ARRLEN",
151 "JSON.GET",
152 "JSON.MGET",
153 "JSON.OBJKEYS",
154 "JSON.OBJLEN",
155 "JSON.RESP",
156 "JSON.STRLEN",
157 "JSON.TYPE",
158 # RediSearch Module
159 "FT.EXPLAIN",
160 "FT.INFO",
161 "FT.PROFILE",
162 "FT.SEARCH",
163 ]
164)
165
166
167class ClusterMultiKeyCommands(ClusterCommandsProtocol):
168 """
169 A class containing commands that handle more than one key
170 """
171
172 def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
173 """Split keys into a dictionary that maps a slot to a list of keys."""
174
175 slots_to_keys = {}
176 for key in keys:
177 slot = key_slot(self.encoder.encode(key))
178 slots_to_keys.setdefault(slot, []).append(key)
179
180 return slots_to_keys
181
182 def _partition_pairs_by_slot(
183 self, mapping: Mapping[AnyKeyT, EncodableT]
184 ) -> Dict[int, List[EncodableT]]:
185 """Split pairs into a dictionary that maps a slot to a list of pairs."""
186
187 slots_to_pairs = {}
188 for pair in mapping.items():
189 slot = key_slot(self.encoder.encode(pair[0]))
190 slots_to_pairs.setdefault(slot, []).extend(pair)
191
192 return slots_to_pairs
193
194 def _execute_pipeline_by_slot(
195 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
196 ) -> List[Any]:
197 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
198 pipe = self.pipeline()
199 [
200 pipe.execute_command(
201 command,
202 *slot_args,
203 target_nodes=[
204 self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
205 ],
206 )
207 for slot, slot_args in slots_to_args.items()
208 ]
209 return pipe.execute()
210
211 def _reorder_keys_by_command(
212 self,
213 keys: Iterable[KeyT],
214 slots_to_args: Mapping[int, Iterable[EncodableT]],
215 responses: Iterable[Any],
216 ) -> List[Any]:
217 results = {
218 k: v
219 for slot_values, response in zip(slots_to_args.values(), responses)
220 for k, v in zip(slot_values, response)
221 }
222 return [results[key] for key in keys]
223
224 def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
225 """
226 Splits the keys into different slots and then calls MGET
227 for the keys of every slot. This operation will not be atomic
228 if keys belong to more than one slot.
229
230 Returns a list of values ordered identically to ``keys``
231
232 For more information see https://redis.io/commands/mget
233 """
234
235 # Concatenate all keys into a list
236 keys = list_or_args(keys, args)
237
238 # Split keys into slots
239 slots_to_keys = self._partition_keys_by_slot(keys)
240
241 # Execute commands using a pipeline
242 res = self._execute_pipeline_by_slot("MGET", slots_to_keys)
243
244 # Reorder keys in the order the user provided & return
245 return self._reorder_keys_by_command(keys, slots_to_keys, res)
246
247 def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
248 """
249 Sets key/values based on a mapping. Mapping is a dictionary of
250 key/value pairs. Both keys and values should be strings or types that
251 can be cast to a string via str().
252
253 Splits the keys into different slots and then calls MSET
254 for the keys of every slot. This operation will not be atomic
255 if keys belong to more than one slot.
256
257 For more information see https://redis.io/commands/mset
258 """
259
260 # Partition the keys by slot
261 slots_to_pairs = self._partition_pairs_by_slot(mapping)
262
263 # Execute commands using a pipeline & return list of replies
264 return self._execute_pipeline_by_slot("MSET", slots_to_pairs)
265
266 def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
267 """
268 Runs the given command once for the keys
269 of each slot. Returns the sum of the return values.
270 """
271
272 # Partition the keys by slot
273 slots_to_keys = self._partition_keys_by_slot(keys)
274
275 # Sum up the reply from each command
276 return sum(self._execute_pipeline_by_slot(command, slots_to_keys))
277
278 def exists(self, *keys: KeyT) -> ResponseT:
279 """
280 Returns the number of ``names`` that exist in the
281 whole cluster. The keys are first split up into slots
282 and then an EXISTS command is sent for every slot
283
284 For more information see https://redis.io/commands/exists
285 """
286 return self._split_command_across_slots("EXISTS", *keys)
287
288 def delete(self, *keys: KeyT) -> ResponseT:
289 """
290 Deletes the given keys in the cluster.
291 The keys are first split up into slots
292 and then an DEL command is sent for every slot
293
294 Non-existent keys are ignored.
295 Returns the number of keys that were deleted.
296
297 For more information see https://redis.io/commands/del
298 """
299 return self._split_command_across_slots("DEL", *keys)
300
301 def touch(self, *keys: KeyT) -> ResponseT:
302 """
303 Updates the last access time of given keys across the
304 cluster.
305
306 The keys are first split up into slots
307 and then an TOUCH command is sent for every slot
308
309 Non-existent keys are ignored.
310 Returns the number of keys that were touched.
311
312 For more information see https://redis.io/commands/touch
313 """
314 return self._split_command_across_slots("TOUCH", *keys)
315
316 def unlink(self, *keys: KeyT) -> ResponseT:
317 """
318 Remove the specified keys in a different thread.
319
320 The keys are first split up into slots
321 and then an TOUCH command is sent for every slot
322
323 Non-existent keys are ignored.
324 Returns the number of keys that were unlinked.
325
326 For more information see https://redis.io/commands/unlink
327 """
328 return self._split_command_across_slots("UNLINK", *keys)
329
330
331class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
332 """
333 A class containing commands that handle more than one key
334 """
335
336 async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
337 """
338 Splits the keys into different slots and then calls MGET
339 for the keys of every slot. This operation will not be atomic
340 if keys belong to more than one slot.
341
342 Returns a list of values ordered identically to ``keys``
343
344 For more information see https://redis.io/commands/mget
345 """
346
347 # Concatenate all keys into a list
348 keys = list_or_args(keys, args)
349
350 # Split keys into slots
351 slots_to_keys = self._partition_keys_by_slot(keys)
352
353 # Execute commands using a pipeline
354 res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)
355
356 # Reorder keys in the order the user provided & return
357 return self._reorder_keys_by_command(keys, slots_to_keys, res)
358
359 async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
360 """
361 Sets key/values based on a mapping. Mapping is a dictionary of
362 key/value pairs. Both keys and values should be strings or types that
363 can be cast to a string via str().
364
365 Splits the keys into different slots and then calls MSET
366 for the keys of every slot. This operation will not be atomic
367 if keys belong to more than one slot.
368
369 For more information see https://redis.io/commands/mset
370 """
371
372 # Partition the keys by slot
373 slots_to_pairs = self._partition_pairs_by_slot(mapping)
374
375 # Execute commands using a pipeline & return list of replies
376 return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)
377
378 async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
379 """
380 Runs the given command once for the keys
381 of each slot. Returns the sum of the return values.
382 """
383
384 # Partition the keys by slot
385 slots_to_keys = self._partition_keys_by_slot(keys)
386
387 # Sum up the reply from each command
388 return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))
389
390 async def _execute_pipeline_by_slot(
391 self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
392 ) -> List[Any]:
393 if self._initialize:
394 await self.initialize()
395 read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
396 pipe = self.pipeline()
397 [
398 pipe.execute_command(
399 command,
400 *slot_args,
401 target_nodes=[
402 self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
403 ],
404 )
405 for slot, slot_args in slots_to_args.items()
406 ]
407 return await pipe.execute()
408
409
410class ClusterManagementCommands(ManagementCommands):
411 """
412 A class for Redis Cluster management commands
413
414 The class inherits from Redis's core ManagementCommands class and do the
415 required adjustments to work with cluster mode
416 """
417
418 def slaveof(self, *args, **kwargs) -> NoReturn:
419 """
420 Make the server a replica of another instance, or promote it as master.
421
422 For more information see https://redis.io/commands/slaveof
423 """
424 raise RedisClusterException("SLAVEOF is not supported in cluster mode")
425
426 def replicaof(self, *args, **kwargs) -> NoReturn:
427 """
428 Make the server a replica of another instance, or promote it as master.
429
430 For more information see https://redis.io/commands/replicaof
431 """
432 raise RedisClusterException("REPLICAOF is not supported in cluster mode")
433
434 def swapdb(self, *args, **kwargs) -> NoReturn:
435 """
436 Swaps two Redis databases.
437
438 For more information see https://redis.io/commands/swapdb
439 """
440 raise RedisClusterException("SWAPDB is not supported in cluster mode")
441
442 def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
443 """
444 Returns the node's id.
445
446 :target_node: 'ClusterNode'
447 The node to execute the command on
448
449 For more information check https://redis.io/commands/cluster-myid/
450 """
451 return self.execute_command("CLUSTER MYID", target_nodes=target_node)
452
453 def cluster_addslots(
454 self, target_node: "TargetNodesT", *slots: EncodableT
455 ) -> ResponseT:
456 """
457 Assign new hash slots to receiving node. Sends to specified node.
458
459 :target_node: 'ClusterNode'
460 The node to execute the command on
461
462 For more information see https://redis.io/commands/cluster-addslots
463 """
464 return self.execute_command(
465 "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
466 )
467
468 def cluster_addslotsrange(
469 self, target_node: "TargetNodesT", *slots: EncodableT
470 ) -> ResponseT:
471 """
472 Similar to the CLUSTER ADDSLOTS command.
473 The difference between the two commands is that ADDSLOTS takes a list of slots
474 to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
475 (specified by start and end slots) to assign to the node.
476
477 :target_node: 'ClusterNode'
478 The node to execute the command on
479
480 For more information see https://redis.io/commands/cluster-addslotsrange
481 """
482 return self.execute_command(
483 "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
484 )
485
486 def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
487 """
488 Return the number of local keys in the specified hash slot
489 Send to node based on specified slot_id
490
491 For more information see https://redis.io/commands/cluster-countkeysinslot
492 """
493 return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)
494
495 def cluster_count_failure_report(self, node_id: str) -> ResponseT:
496 """
497 Return the number of failure reports active for a given node
498 Sends to a random node
499
500 For more information see https://redis.io/commands/cluster-count-failure-reports
501 """
502 return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)
503
504 def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
505 """
506 Set hash slots as unbound in the cluster.
507 It determines by it self what node the slot is in and sends it there
508
509 Returns a list of the results for each processed slot.
510
511 For more information see https://redis.io/commands/cluster-delslots
512 """
513 return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]
514
515 def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
516 """
517 Similar to the CLUSTER DELSLOTS command.
518 The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
519 from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
520 from the node.
521
522 For more information see https://redis.io/commands/cluster-delslotsrange
523 """
524 return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)
525
526 def cluster_failover(
527 self, target_node: "TargetNodesT", option: Optional[str] = None
528 ) -> ResponseT:
529 """
530 Forces a slave to perform a manual failover of its master
531 Sends to specified node
532
533 :target_node: 'ClusterNode'
534 The node to execute the command on
535
536 For more information see https://redis.io/commands/cluster-failover
537 """
538 if option:
539 if option.upper() not in ["FORCE", "TAKEOVER"]:
540 raise RedisError(
541 f"Invalid option for CLUSTER FAILOVER command: {option}"
542 )
543 else:
544 return self.execute_command(
545 "CLUSTER FAILOVER", option, target_nodes=target_node
546 )
547 else:
548 return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)
549
550 def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
551 """
552 Provides info about Redis Cluster node state.
553 The command will be sent to a random node in the cluster if no target
554 node is specified.
555
556 For more information see https://redis.io/commands/cluster-info
557 """
558 return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)
559
560 def cluster_keyslot(self, key: str) -> ResponseT:
561 """
562 Returns the hash slot of the specified key
563 Sends to random node in the cluster
564
565 For more information see https://redis.io/commands/cluster-keyslot
566 """
567 return self.execute_command("CLUSTER KEYSLOT", key)
568
569 def cluster_meet(
570 self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
571 ) -> ResponseT:
572 """
573 Force a node cluster to handshake with another node.
574 Sends to specified node.
575
576 For more information see https://redis.io/commands/cluster-meet
577 """
578 return self.execute_command(
579 "CLUSTER MEET", host, port, target_nodes=target_nodes
580 )
581
582 def cluster_nodes(self) -> ResponseT:
583 """
584 Get Cluster config for the node.
585 Sends to random node in the cluster
586
587 For more information see https://redis.io/commands/cluster-nodes
588 """
589 return self.execute_command("CLUSTER NODES")
590
591 def cluster_replicate(
592 self, target_nodes: "TargetNodesT", node_id: str
593 ) -> ResponseT:
594 """
595 Reconfigure a node as a slave of the specified master node
596
597 For more information see https://redis.io/commands/cluster-replicate
598 """
599 return self.execute_command(
600 "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
601 )
602
603 def cluster_reset(
604 self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
605 ) -> ResponseT:
606 """
607 Reset a Redis Cluster node
608
609 If 'soft' is True then it will send 'SOFT' argument
610 If 'soft' is False then it will send 'HARD' argument
611
612 For more information see https://redis.io/commands/cluster-reset
613 """
614 return self.execute_command(
615 "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
616 )
617
618 def cluster_save_config(
619 self, target_nodes: Optional["TargetNodesT"] = None
620 ) -> ResponseT:
621 """
622 Forces the node to save cluster state on disk
623
624 For more information see https://redis.io/commands/cluster-saveconfig
625 """
626 return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)
627
628 def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
629 """
630 Returns the number of keys in the specified cluster slot
631
632 For more information see https://redis.io/commands/cluster-getkeysinslot
633 """
634 return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)
635
636 def cluster_set_config_epoch(
637 self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
638 ) -> ResponseT:
639 """
640 Set the configuration epoch in a new node
641
642 For more information see https://redis.io/commands/cluster-set-config-epoch
643 """
644 return self.execute_command(
645 "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
646 )
647
648 def cluster_setslot(
649 self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
650 ) -> ResponseT:
651 """
652 Bind an hash slot to a specific node
653
654 :target_node: 'ClusterNode'
655 The node to execute the command on
656
657 For more information see https://redis.io/commands/cluster-setslot
658 """
659 if state.upper() in ("IMPORTING", "NODE", "MIGRATING"):
660 return self.execute_command(
661 "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
662 )
663 elif state.upper() == "STABLE":
664 raise RedisError('For "stable" state please use cluster_setslot_stable')
665 else:
666 raise RedisError(f"Invalid slot state: {state}")
667
668 def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
669 """
670 Clears migrating / importing state from the slot.
671 It determines by it self what node the slot is in and sends it there.
672
673 For more information see https://redis.io/commands/cluster-setslot
674 """
675 return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")
676
677 def cluster_replicas(
678 self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
679 ) -> ResponseT:
680 """
681 Provides a list of replica nodes replicating from the specified primary
682 target node.
683
684 For more information see https://redis.io/commands/cluster-replicas
685 """
686 return self.execute_command(
687 "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
688 )
689
690 def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
691 """
692 Get array of Cluster slot to node mappings
693
694 For more information see https://redis.io/commands/cluster-slots
695 """
696 return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)
697
698 def cluster_shards(self, target_nodes=None):
699 """
700 Returns details about the shards of the cluster.
701
702 For more information see https://redis.io/commands/cluster-shards
703 """
704 return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)
705
706 def cluster_myshardid(self, target_nodes=None):
707 """
708 Returns the shard ID of the node.
709
710 For more information see https://redis.io/commands/cluster-myshardid/
711 """
712 return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)
713
714 def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
715 """
716 Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
717 peer in the cluster: One for sending outbound messages towards the peer and one
718 for receiving inbound messages from the peer.
719
720 This command outputs information of all such peer links as an array.
721
722 For more information see https://redis.io/commands/cluster-links
723 """
724 return self.execute_command("CLUSTER LINKS", target_nodes=target_node)
725
726 def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
727 raise NotImplementedError(
728 "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
729 )
730
731 def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
732 raise NotImplementedError(
733 "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
734 )
735
736 def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
737 """
738 Enables read queries.
739 The command will be sent to the default cluster node if target_nodes is
740 not specified.
741
742 For more information see https://redis.io/commands/readonly
743 """
744 if target_nodes == "replicas" or target_nodes == "all":
745 # read_from_replicas will only be enabled if the READONLY command
746 # is sent to all replicas
747 self.read_from_replicas = True
748 return self.execute_command("READONLY", target_nodes=target_nodes)
749
750 def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
751 """
752 Disables read queries.
753 The command will be sent to the default cluster node if target_nodes is
754 not specified.
755
756 For more information see https://redis.io/commands/readwrite
757 """
758 # Reset read from replicas flag
759 self.read_from_replicas = False
760 return self.execute_command("READWRITE", target_nodes=target_nodes)
761
762 @deprecated_function(
763 version="7.2.0",
764 reason="Use client-side caching feature instead.",
765 )
766 def client_tracking_on(
767 self,
768 clientid: Optional[int] = None,
769 prefix: Sequence[KeyT] = [],
770 bcast: bool = False,
771 optin: bool = False,
772 optout: bool = False,
773 noloop: bool = False,
774 target_nodes: Optional["TargetNodesT"] = "all",
775 ) -> ResponseT:
776 """
777 Enables the tracking feature of the Redis server, that is used
778 for server assisted client side caching.
779
780 When clientid is provided - in target_nodes only the node that owns the
781 connection with this id should be provided.
782 When clientid is not provided - target_nodes can be any node.
783
784 For more information see https://redis.io/commands/client-tracking
785 """
786 return self.client_tracking(
787 True,
788 clientid,
789 prefix,
790 bcast,
791 optin,
792 optout,
793 noloop,
794 target_nodes=target_nodes,
795 )
796
797 @deprecated_function(
798 version="7.2.0",
799 reason="Use client-side caching feature instead.",
800 )
801 def client_tracking_off(
802 self,
803 clientid: Optional[int] = None,
804 prefix: Sequence[KeyT] = [],
805 bcast: bool = False,
806 optin: bool = False,
807 optout: bool = False,
808 noloop: bool = False,
809 target_nodes: Optional["TargetNodesT"] = "all",
810 ) -> ResponseT:
811 """
812 Disables the tracking feature of the Redis server, that is used
813 for server assisted client side caching.
814
815 When clientid is provided - in target_nodes only the node that owns the
816 connection with this id should be provided.
817 When clientid is not provided - target_nodes can be any node.
818
819 For more information see https://redis.io/commands/client-tracking
820 """
821 return self.client_tracking(
822 False,
823 clientid,
824 prefix,
825 bcast,
826 optin,
827 optout,
828 noloop,
829 target_nodes=target_nodes,
830 )
831
832 def hotkeys_start(
833 self,
834 metrics: List[HotkeysMetricsTypes],
835 count: Optional[int] = None,
836 duration: Optional[int] = None,
837 sample_ratio: Optional[int] = None,
838 slots: Optional[List[int]] = None,
839 **kwargs,
840 ) -> Union[str, bytes]:
841 """
842 Cluster client does not support hotkeys command. Please use the non-cluster client.
843
844 For more information see https://redis.io/commands/hotkeys-start
845 """
846 raise NotImplementedError(
847 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
848 )
849
850 def hotkeys_stop(self, **kwargs) -> Union[str, bytes]:
851 """
852 Cluster client does not support hotkeys command. Please use the non-cluster client.
853
854 For more information see https://redis.io/commands/hotkeys-stop
855 """
856 raise NotImplementedError(
857 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
858 )
859
860 def hotkeys_reset(self, **kwargs) -> Union[str, bytes]:
861 """
862 Cluster client does not support hotkeys command. Please use the non-cluster client.
863
864 For more information see https://redis.io/commands/hotkeys-reset
865 """
866 raise NotImplementedError(
867 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
868 )
869
870 def hotkeys_get(self, **kwargs) -> list[dict[Union[str, bytes], Any]]:
871 """
872 Cluster client does not support hotkeys command. Please use the non-cluster client.
873
874 For more information see https://redis.io/commands/hotkeys-get
875 """
876 raise NotImplementedError(
877 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
878 )
879
880
881class AsyncClusterManagementCommands(
882 ClusterManagementCommands, AsyncManagementCommands
883):
884 """
885 A class for Redis Cluster management commands
886
887 The class inherits from Redis's core ManagementCommands class and do the
888 required adjustments to work with cluster mode
889 """
890
891 async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
892 """
893 Set hash slots as unbound in the cluster.
894 It determines by it self what node the slot is in and sends it there
895
896 Returns a list of the results for each processed slot.
897
898 For more information see https://redis.io/commands/cluster-delslots
899 """
900 return await asyncio.gather(
901 *(
902 asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
903 for slot in slots
904 )
905 )
906
907 @deprecated_function(
908 version="7.2.0",
909 reason="Use client-side caching feature instead.",
910 )
911 async def client_tracking_on(
912 self,
913 clientid: Optional[int] = None,
914 prefix: Sequence[KeyT] = [],
915 bcast: bool = False,
916 optin: bool = False,
917 optout: bool = False,
918 noloop: bool = False,
919 target_nodes: Optional["TargetNodesT"] = "all",
920 ) -> ResponseT:
921 """
922 Enables the tracking feature of the Redis server, that is used
923 for server assisted client side caching.
924
925 When clientid is provided - in target_nodes only the node that owns the
926 connection with this id should be provided.
927 When clientid is not provided - target_nodes can be any node.
928
929 For more information see https://redis.io/commands/client-tracking
930 """
931 return await self.client_tracking(
932 True,
933 clientid,
934 prefix,
935 bcast,
936 optin,
937 optout,
938 noloop,
939 target_nodes=target_nodes,
940 )
941
942 @deprecated_function(
943 version="7.2.0",
944 reason="Use client-side caching feature instead.",
945 )
946 async def client_tracking_off(
947 self,
948 clientid: Optional[int] = None,
949 prefix: Sequence[KeyT] = [],
950 bcast: bool = False,
951 optin: bool = False,
952 optout: bool = False,
953 noloop: bool = False,
954 target_nodes: Optional["TargetNodesT"] = "all",
955 ) -> ResponseT:
956 """
957 Disables the tracking feature of the Redis server, that is used
958 for server assisted client side caching.
959
960 When clientid is provided - in target_nodes only the node that owns the
961 connection with this id should be provided.
962 When clientid is not provided - target_nodes can be any node.
963
964 For more information see https://redis.io/commands/client-tracking
965 """
966 return await self.client_tracking(
967 False,
968 clientid,
969 prefix,
970 bcast,
971 optin,
972 optout,
973 noloop,
974 target_nodes=target_nodes,
975 )
976
977 async def hotkeys_start(
978 self,
979 metrics: List[HotkeysMetricsTypes],
980 count: Optional[int] = None,
981 duration: Optional[int] = None,
982 sample_ratio: Optional[int] = None,
983 slots: Optional[List[int]] = None,
984 **kwargs,
985 ) -> Awaitable[Union[str, bytes]]:
986 """
987 Cluster client does not support hotkeys command. Please use the non-cluster client.
988
989 For more information see https://redis.io/commands/hotkeys-start
990 """
991 raise NotImplementedError(
992 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
993 )
994
995 async def hotkeys_stop(self, **kwargs) -> Awaitable[Union[str, bytes]]:
996 """
997 Cluster client does not support hotkeys command. Please use the non-cluster client.
998
999 For more information see https://redis.io/commands/hotkeys-stop
1000 """
1001 raise NotImplementedError(
1002 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1003 )
1004
1005 async def hotkeys_reset(self, **kwargs) -> Awaitable[Union[str, bytes]]:
1006 """
1007 Cluster client does not support hotkeys command. Please use the non-cluster client.
1008
1009 For more information see https://redis.io/commands/hotkeys-reset
1010 """
1011 raise NotImplementedError(
1012 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1013 )
1014
1015 async def hotkeys_get(
1016 self, **kwargs
1017 ) -> Awaitable[list[dict[Union[str, bytes], Any]]]:
1018 """
1019 Cluster client does not support hotkeys command. Please use the non-cluster client.
1020
1021 For more information see https://redis.io/commands/hotkeys-get
1022 """
1023 raise NotImplementedError(
1024 "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
1025 )
1026
1027
1028class ClusterDataAccessCommands(DataAccessCommands):
1029 """
1030 A class for Redis Cluster Data Access Commands
1031
1032 The class inherits from Redis's core DataAccessCommand class and do the
1033 required adjustments to work with cluster mode
1034 """
1035
1036 def stralgo(
1037 self,
1038 algo: Literal["LCS"],
1039 value1: KeyT,
1040 value2: KeyT,
1041 specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
1042 len: bool = False,
1043 idx: bool = False,
1044 minmatchlen: Optional[int] = None,
1045 withmatchlen: bool = False,
1046 **kwargs,
1047 ) -> ResponseT:
1048 """
1049 Implements complex algorithms that operate on strings.
1050 Right now the only algorithm implemented is the LCS algorithm
1051 (longest common substring). However new algorithms could be
1052 implemented in the future.
1053
1054 ``algo`` Right now must be LCS
1055 ``value1`` and ``value2`` Can be two strings or two keys
1056 ``specific_argument`` Specifying if the arguments to the algorithm
1057 will be keys or strings. strings is the default.
1058 ``len`` Returns just the len of the match.
1059 ``idx`` Returns the match positions in each string.
1060 ``minmatchlen`` Restrict the list of matches to the ones of a given
1061 minimal length. Can be provided only when ``idx`` set to True.
1062 ``withmatchlen`` Returns the matches with the len of the match.
1063 Can be provided only when ``idx`` set to True.
1064
1065 For more information see https://redis.io/commands/stralgo
1066 """
1067 target_nodes = kwargs.pop("target_nodes", None)
1068 if specific_argument == "strings" and target_nodes is None:
1069 target_nodes = "default-node"
1070 kwargs.update({"target_nodes": target_nodes})
1071 return super().stralgo(
1072 algo,
1073 value1,
1074 value2,
1075 specific_argument,
1076 len,
1077 idx,
1078 minmatchlen,
1079 withmatchlen,
1080 **kwargs,
1081 )
1082
1083 def scan_iter(
1084 self,
1085 match: Optional[PatternT] = None,
1086 count: Optional[int] = None,
1087 _type: Optional[str] = None,
1088 **kwargs,
1089 ) -> Iterator:
1090 # Do the first query with cursor=0 for all nodes
1091 cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
1092 yield from data
1093
1094 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
1095 if cursors:
1096 # Get nodes by name
1097 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
1098
1099 # Iterate over each node till its cursor is 0
1100 kwargs.pop("target_nodes", None)
1101 while cursors:
1102 for name, cursor in cursors.items():
1103 cur, data = self.scan(
1104 cursor=cursor,
1105 match=match,
1106 count=count,
1107 _type=_type,
1108 target_nodes=nodes[name],
1109 **kwargs,
1110 )
1111 yield from data
1112 cursors[name] = cur[name]
1113
1114 cursors = {
1115 name: cursor for name, cursor in cursors.items() if cursor != 0
1116 }
1117
1118
1119class AsyncClusterDataAccessCommands(
1120 ClusterDataAccessCommands, AsyncDataAccessCommands
1121):
1122 """
1123 A class for Redis Cluster Data Access Commands
1124
1125 The class inherits from Redis's core DataAccessCommand class and do the
1126 required adjustments to work with cluster mode
1127 """
1128
1129 async def scan_iter(
1130 self,
1131 match: Optional[PatternT] = None,
1132 count: Optional[int] = None,
1133 _type: Optional[str] = None,
1134 **kwargs,
1135 ) -> AsyncIterator:
1136 # Do the first query with cursor=0 for all nodes
1137 cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
1138 for value in data:
1139 yield value
1140
1141 cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
1142 if cursors:
1143 # Get nodes by name
1144 nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
1145
1146 # Iterate over each node till its cursor is 0
1147 kwargs.pop("target_nodes", None)
1148 while cursors:
1149 for name, cursor in cursors.items():
1150 cur, data = await self.scan(
1151 cursor=cursor,
1152 match=match,
1153 count=count,
1154 _type=_type,
1155 target_nodes=nodes[name],
1156 **kwargs,
1157 )
1158 for value in data:
1159 yield value
1160 cursors[name] = cur[name]
1161
1162 cursors = {
1163 name: cursor for name, cursor in cursors.items() if cursor != 0
1164 }
1165
1166
1167class RedisClusterCommands(
1168 ClusterMultiKeyCommands,
1169 ClusterManagementCommands,
1170 ACLCommands,
1171 PubSubCommands,
1172 ClusterDataAccessCommands,
1173 ScriptCommands,
1174 FunctionCommands,
1175 ModuleCommands,
1176 RedisModuleCommands,
1177):
1178 """
1179 A class for all Redis Cluster commands
1180
1181 For key-based commands, the target node(s) will be internally determined
1182 by the keys' hash slot.
1183 Non-key-based commands can be executed with the 'target_nodes' argument to
1184 target specific nodes. By default, if target_nodes is not specified, the
1185 command will be executed on the default cluster node.
1186
1187 :param :target_nodes: type can be one of the followings:
1188 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
1189 - 'ClusterNode'
1190 - 'list(ClusterNodes)'
1191 - 'dict(any:clusterNodes)'
1192
1193 for example:
1194 r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
1195 """
1196
1197
1198class AsyncRedisClusterCommands(
1199 AsyncClusterMultiKeyCommands,
1200 AsyncClusterManagementCommands,
1201 AsyncACLCommands,
1202 AsyncClusterDataAccessCommands,
1203 AsyncScriptCommands,
1204 AsyncFunctionCommands,
1205 AsyncModuleCommands,
1206 AsyncRedisModuleCommands,
1207):
1208 """
1209 A class for all Redis Cluster commands
1210
1211 For key-based commands, the target node(s) will be internally determined
1212 by the keys' hash slot.
1213 Non-key-based commands can be executed with the 'target_nodes' argument to
1214 target specific nodes. By default, if target_nodes is not specified, the
1215 command will be executed on the default cluster node.
1216
1217 :param :target_nodes: type can be one of the followings:
1218 - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
1219 - 'ClusterNode'
1220 - 'list(ClusterNodes)'
1221 - 'dict(any:clusterNodes)'
1222
1223 for example:
1224 r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
1225 """