1import asyncio
2from typing import (
3 TYPE_CHECKING,
4 Any,
5 AsyncIterator,
6 Dict,
7 Iterable,
8 Iterator,
9 List,
10 Literal,
11 Mapping,
12 NoReturn,
13 Optional,
14 Sequence,
15 Union,
16)
17
18from redis.crc import key_slot
19from redis.exceptions import RedisClusterException, RedisError
20from redis.typing import (
21 AnyKeyT,
22 ClusterCommandsProtocol,
23 EncodableT,
24 KeysT,
25 KeyT,
26 PatternT,
27 ResponseT,
28)
29from redis.utils import deprecated_function
30
31from .core import (
32 ACLCommands,
33 AsyncACLCommands,
34 AsyncDataAccessCommands,
35 AsyncFunctionCommands,
36 AsyncManagementCommands,
37 AsyncModuleCommands,
38 AsyncScriptCommands,
39 DataAccessCommands,
40 FunctionCommands,
41 ManagementCommands,
42 ModuleCommands,
43 PubSubCommands,
44 ScriptCommands,
45)
46from .helpers import list_or_args
47from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
48
49if TYPE_CHECKING:
50 from redis.asyncio.cluster import TargetNodesT
51
# Commands that only read data. The set is not exhaustive, but it covers the
# major ones (see https://redis.io/commands). It is consulted when deciding
# whether a command may be routed to a replica.
READ_COMMANDS = frozenset(
    {
        # Bitmaps
        "BITCOUNT",
        "BITFIELD_RO",
        "BITPOS",
        # Scripts and functions
        "EVALSHA_RO",
        "EVAL_RO",
        "FCALL_RO",
        # Generic key commands
        "DBSIZE",
        "DIGEST",
        "DUMP",
        "EXISTS",
        "EXPIRETIME",
        "KEYS",
        "PEXPIRETIME",
        "PTTL",
        "RANDOMKEY",
        "SCAN",
        "TTL",
        "TYPE",
        # Strings
        "GET",
        "GETBIT",
        "GETRANGE",
        "LCS",
        "MGET",
        "STRLEN",
        # Geospatial
        "GEODIST",
        "GEOHASH",
        "GEOPOS",
        "GEOSEARCH",
        # Hashes
        "HEXISTS",
        "HEXPIRETIME",
        "HGET",
        "HGETALL",
        "HKEYS",
        "HLEN",
        "HMGET",
        "HPEXPIRETIME",
        "HPTTL",
        "HRANDFIELD",
        "HSCAN",
        "HSTRLEN",
        "HTTL",
        "HVALS",
        # Lists
        "LINDEX",
        "LLEN",
        "LPOS",
        "LRANGE",
        # Sets
        "SCARD",
        "SDIFF",
        "SINTER",
        "SINTERCARD",
        "SISMEMBER",
        "SMEMBERS",
        "SMISMEMBER",
        "SRANDMEMBER",
        "SSCAN",
        "SUNION",
        # Sorted sets
        "ZCARD",
        "ZCOUNT",
        "ZDIFF",
        "ZINTER",
        "ZINTERCARD",
        "ZLEXCOUNT",
        "ZMSCORE",
        "ZRANDMEMBER",
        "ZRANGE",
        "ZRANGEBYLEX",
        "ZRANGEBYSCORE",
        "ZRANK",
        "ZREVRANGE",
        "ZREVRANGEBYLEX",
        "ZREVRANGEBYSCORE",
        "ZREVRANK",
        "ZSCAN",
        "ZSCORE",
        "ZUNION",
        # Streams
        "XLEN",
        "XPENDING",
        "XRANGE",
        "XREAD",
        "XREVRANGE",
        # JSON module
        "JSON.ARRINDEX",
        "JSON.ARRLEN",
        "JSON.GET",
        "JSON.MGET",
        "JSON.OBJKEYS",
        "JSON.OBJLEN",
        "JSON.RESP",
        "JSON.STRLEN",
        "JSON.TYPE",
        # RediSearch module
        "FT.EXPLAIN",
        "FT.INFO",
        "FT.PROFILE",
        "FT.SEARCH",
    }
)
163
164
class ClusterMultiKeyCommands(ClusterCommandsProtocol):
    """
    A class containing commands that handle more than one key

    Keys are grouped by hash slot and one command is issued per slot through
    a pipeline, so an operation whose keys span several slots is not atomic.
    """

    def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
        """Split keys into a dictionary that maps a slot to a list of keys."""

        slots_to_keys: Dict[int, List[KeyT]] = {}
        for key in keys:
            # The slot is computed from the encoded form of the key
            slot = key_slot(self.encoder.encode(key))
            slots_to_keys.setdefault(slot, []).append(key)

        return slots_to_keys

    def _partition_pairs_by_slot(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> Dict[int, List[EncodableT]]:
        """
        Split pairs into a dictionary that maps a slot to a list of pairs.

        Each list is flat (``[key1, value1, key2, value2, ...]``) so that it
        can be passed directly as the arguments of a command.
        """

        slots_to_pairs: Dict[int, List[EncodableT]] = {}
        for pair in mapping.items():
            slot = key_slot(self.encoder.encode(pair[0]))
            slots_to_pairs.setdefault(slot, []).extend(pair)

        return slots_to_pairs

    def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Run ``command`` once per slot through a single pipeline.

        ``slots_to_args`` maps a slot to the arguments of the command to run
        for that slot. Each command is targeted at the node returned by the
        nodes manager for the slot; replicas are only considered when
        ``read_from_replicas`` is set and ``command`` is in READ_COMMANDS.

        Returns the list of replies produced by executing the pipeline.
        """
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        for slot, slot_args in slots_to_args.items():
            pipe.execute_command(
                command,
                *slot_args,
                target_nodes=[
                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                ],
            )
        return pipe.execute()

    def _reorder_keys_by_command(
        self,
        keys: Iterable[KeyT],
        slots_to_args: Mapping[int, Iterable[EncodableT]],
        responses: Iterable[Any],
    ) -> List[Any]:
        """
        Return the per-key results in the order given by ``keys``.

        The argument list of every slot is paired with the response at the
        same position in ``responses``, and every argument of that list is
        paired with the value at the same position in that response.
        """
        results = {
            k: v
            for slot_values, response in zip(slots_to_args.values(), responses)
            for k, v in zip(slot_values, response)
        }
        return [results[key] for key in keys]

    def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """

        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per MSET command that was sent.

        For more information see https://redis.io/commands/mset
        """

        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """

        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(self._execute_pipeline_by_slot(command, slots_to_keys))

    def exists(self, *keys: KeyT) -> ResponseT:
        """
        Returns the number of ``keys`` that exist in the
        whole cluster. The keys are first split up into slots
        and then an EXISTS command is sent for every slot

        For more information see https://redis.io/commands/exists
        """
        return self._split_command_across_slots("EXISTS", *keys)

    def delete(self, *keys: KeyT) -> ResponseT:
        """
        Deletes the given keys in the cluster.
        The keys are first split up into slots
        and then a DEL command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were deleted.

        For more information see https://redis.io/commands/del
        """
        return self._split_command_across_slots("DEL", *keys)

    def touch(self, *keys: KeyT) -> ResponseT:
        """
        Updates the last access time of given keys across the
        cluster.

        The keys are first split up into slots
        and then a TOUCH command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were touched.

        For more information see https://redis.io/commands/touch
        """
        return self._split_command_across_slots("TOUCH", *keys)

    def unlink(self, *keys: KeyT) -> ResponseT:
        """
        Remove the specified keys in a different thread.

        The keys are first split up into slots
        and then an UNLINK command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were unlinked.

        For more information see https://redis.io/commands/unlink
        """
        return self._split_command_across_slots("UNLINK", *keys)
327
328
class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
    """
    Asynchronous flavour of the commands that handle more than one key.

    The slot partitioning and result reordering helpers are inherited; only
    the methods that have to await the pipeline are redefined here.
    """

    async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Run MGET once per slot for the given keys.

        The keys are grouped by slot and one MGET is sent for the keys of
        each slot, so the operation is not atomic when the keys belong to
        more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """
        # Flatten ``keys`` and ``args`` into one list
        all_keys = list_or_args(keys, args)

        # Group the keys by slot and fetch every group through a pipeline
        keys_by_slot = self._partition_keys_by_slot(all_keys)
        replies = await self._execute_pipeline_by_slot("MGET", keys_by_slot)

        # Hand the values back in the order the caller supplied the keys
        return self._reorder_keys_by_command(all_keys, keys_by_slot, replies)

    async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Run MSET once per slot for the given key/value mapping.

        ``mapping`` is a dictionary of key/value pairs. Both keys and values
        should be strings or types that can be cast to a string via str().

        The pairs are grouped by the slot of their key and one MSET is sent
        for the pairs of each slot, so the operation is not atomic when the
        keys belong to more than one slot.

        For more information see https://redis.io/commands/mset
        """
        pairs_by_slot = self._partition_pairs_by_slot(mapping)
        replies = await self._execute_pipeline_by_slot("MSET", pairs_by_slot)
        return replies

    async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Run ``command`` once for the keys of each slot and return the sum
        of the replies.
        """
        keys_by_slot = self._partition_keys_by_slot(keys)
        replies = await self._execute_pipeline_by_slot(command, keys_by_slot)
        return sum(replies)

    async def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and await its replies.

        The client is initialized first when ``self._initialize`` is set.
        Every queued command targets the node the nodes manager returns for
        its slot; replicas are only considered when ``read_from_replicas``
        is set and ``command`` is in READ_COMMANDS.
        """
        if self._initialize:
            await self.initialize()

        use_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipeline = self.pipeline()
        for slot, arguments in slots_to_args.items():
            node = self.nodes_manager.get_node_from_slot(slot, use_replicas)
            pipeline.execute_command(command, *arguments, target_nodes=[node])

        return await pipeline.execute()
406
407
class ClusterManagementCommands(ManagementCommands):
    """
    A class for Redis Cluster management commands

    The class inherits from Redis's core ManagementCommands class and makes
    the adjustments required to work with cluster mode
    """

    def slaveof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Always raises RedisClusterException: not supported in cluster mode.

        For more information see https://redis.io/commands/slaveof
        """
        raise RedisClusterException("SLAVEOF is not supported in cluster mode")

    def replicaof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Always raises RedisClusterException: not supported in cluster mode.

        For more information see https://redis.io/commands/replicaof
        """
        raise RedisClusterException("REPLICAOF is not supported in cluster mode")

    def swapdb(self, *args, **kwargs) -> NoReturn:
        """
        Swaps two Redis databases.

        Always raises RedisClusterException: not supported in cluster mode.

        For more information see https://redis.io/commands/swapdb
        """
        raise RedisClusterException("SWAPDB is not supported in cluster mode")

    def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Returns the node's id.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information check https://redis.io/commands/cluster-myid/
        """
        return self.execute_command("CLUSTER MYID", target_nodes=target_node)

    def cluster_addslots(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Assign new hash slots to receiving node. Sends to specified node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslots
        """
        return self.execute_command(
            "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
        )

    def cluster_addslotsrange(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Similar to the CLUSTER ADDSLOTS command.
        The difference between the two commands is that ADDSLOTS takes a list of slots
        to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
        (specified by start and end slots) to assign to the node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslotsrange
        """
        return self.execute_command(
            "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
        )

    def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
        """
        Return the number of local keys in the specified hash slot
        Send to node based on specified slot_id

        For more information see https://redis.io/commands/cluster-countkeysinslot
        """
        return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)

    def cluster_count_failure_report(self, node_id: str) -> ResponseT:
        """
        Return the number of failure reports active for a given node
        Sends to a random node

        For more information see https://redis.io/commands/cluster-count-failure-reports
        """
        return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)

    def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.
        It determines by itself what node the slot is in and sends it there

        One CLUSTER DELSLOTS command is sent per slot.
        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]

    def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
        """
        Similar to the CLUSTER DELSLOTS command.
        The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
        from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
        from the node.

        For more information see https://redis.io/commands/cluster-delslotsrange
        """
        return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)

    def cluster_failover(
        self, target_node: "TargetNodesT", option: Optional[str] = None
    ) -> ResponseT:
        """
        Forces a slave to perform a manual failover of its master
        Sends to specified node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :option: 'str'
            Optional; must be 'FORCE' or 'TAKEOVER' (case-insensitive).
            Any other non-empty value raises RedisError.

        For more information see https://redis.io/commands/cluster-failover
        """
        if not option:
            return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)

        if option.upper() not in ["FORCE", "TAKEOVER"]:
            raise RedisError(
                f"Invalid option for CLUSTER FAILOVER command: {option}"
            )

        # The option is forwarded exactly as the caller spelled it
        return self.execute_command(
            "CLUSTER FAILOVER", option, target_nodes=target_node
        )

    def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Provides info about Redis Cluster node state.
        The command will be sent to a random node in the cluster if no target
        node is specified.

        For more information see https://redis.io/commands/cluster-info
        """
        return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)

    def cluster_keyslot(self, key: str) -> ResponseT:
        """
        Returns the hash slot of the specified key
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-keyslot
        """
        return self.execute_command("CLUSTER KEYSLOT", key)

    def cluster_meet(
        self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Force a node cluster to handshake with another node.
        Sends to specified node.

        For more information see https://redis.io/commands/cluster-meet
        """
        return self.execute_command(
            "CLUSTER MEET", host, port, target_nodes=target_nodes
        )

    def cluster_nodes(self) -> ResponseT:
        """
        Get Cluster config for the node.
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-nodes
        """
        return self.execute_command("CLUSTER NODES")

    def cluster_replicate(
        self, target_nodes: "TargetNodesT", node_id: str
    ) -> ResponseT:
        """
        Reconfigure a node as a slave of the specified master node

        For more information see https://redis.io/commands/cluster-replicate
        """
        return self.execute_command(
            "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
        )

    def cluster_reset(
        self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Reset a Redis Cluster node

        If 'soft' is True then it will send 'SOFT' argument
        If 'soft' is False then it will send 'HARD' argument

        For more information see https://redis.io/commands/cluster-reset
        """
        return self.execute_command(
            "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
        )

    def cluster_save_config(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Forces the node to save cluster state on disk

        For more information see https://redis.io/commands/cluster-saveconfig
        """
        return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)

    def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
        """
        Returns up to ``num_keys`` key names stored in the specified
        cluster slot

        For more information see https://redis.io/commands/cluster-getkeysinslot
        """
        return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)

    def cluster_set_config_epoch(
        self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Set the configuration epoch in a new node

        For more information see https://redis.io/commands/cluster-set-config-epoch
        """
        return self.execute_command(
            "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
        )

    def cluster_setslot(
        self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
    ) -> ResponseT:
        """
        Bind a hash slot to a specific node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :state: 'str'
            One of 'IMPORTING', 'NODE' or 'MIGRATING' (case-insensitive).
            'STABLE' and any other value raise RedisError; use
            cluster_setslot_stable for the former.

        For more information see https://redis.io/commands/cluster-setslot
        """
        normalized_state = state.upper()
        if normalized_state in ("IMPORTING", "NODE", "MIGRATING"):
            # The state is forwarded exactly as the caller spelled it
            return self.execute_command(
                "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
            )
        if normalized_state == "STABLE":
            raise RedisError('For "stable" state please use cluster_setslot_stable')
        raise RedisError(f"Invalid slot state: {state}")

    def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
        """
        Clears migrating / importing state from the slot.
        It determines by itself what node the slot is in and sends it there.

        For more information see https://redis.io/commands/cluster-setslot
        """
        return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")

    def cluster_replicas(
        self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Provides a list of replica nodes replicating from the specified primary
        target node.

        For more information see https://redis.io/commands/cluster-replicas
        """
        return self.execute_command(
            "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
        )

    def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Get array of Cluster slot to node mappings

        For more information see https://redis.io/commands/cluster-slots
        """
        return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)

    def cluster_shards(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns details about the shards of the cluster.

        For more information see https://redis.io/commands/cluster-shards
        """
        return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)

    def cluster_myshardid(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns the shard ID of the node.

        For more information see https://redis.io/commands/cluster-myshardid/
        """
        return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)

    def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
        peer in the cluster: One for sending outbound messages towards the peer and one
        for receiving inbound messages from the peer.

        This command outputs information of all such peer links as an array.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-links
        """
        return self.execute_command("CLUSTER LINKS", target_nodes=target_node)

    def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
        """
        CLUSTER FLUSHSLOTS is intentionally not implemented in the client.

        Always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
        )

    def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
        """
        CLUSTER BUMPEPOCH is intentionally not implemented in the client.

        Always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
        )

    def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Enables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readonly
        """
        if target_nodes == "replicas" or target_nodes == "all":
            # read_from_replicas will only be enabled if the READONLY command
            # is sent to all replicas
            self.read_from_replicas = True
        return self.execute_command("READONLY", target_nodes=target_nodes)

    def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Disables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readwrite
        """
        # Reset read from replicas flag
        self.read_from_replicas = False
        return self.execute_command("READWRITE", target_nodes=target_nodes)

    @deprecated_function(
        version="7.2.0",
        reason="Use client-side caching feature instead.",
    )
    def client_tracking_on(
        self,
        clientid: Optional[int] = None,
        prefix: Sequence[KeyT] = [],
        bcast: bool = False,
        optin: bool = False,
        optout: bool = False,
        noloop: bool = False,
        target_nodes: Optional["TargetNodesT"] = "all",
    ) -> ResponseT:
        """
        Enables the tracking feature of the Redis server, that is used
        for server assisted client side caching.

        When clientid is provided - in target_nodes only the node that owns the
        connection with this id should be provided.
        When clientid is not provided - target_nodes can be any node.

        For more information see https://redis.io/commands/client-tracking
        """
        # NOTE: the ``prefix`` default is a shared list; it is only passed
        # along here and must not be mutated.
        return self.client_tracking(
            True,
            clientid,
            prefix,
            bcast,
            optin,
            optout,
            noloop,
            target_nodes=target_nodes,
        )

    @deprecated_function(
        version="7.2.0",
        reason="Use client-side caching feature instead.",
    )
    def client_tracking_off(
        self,
        clientid: Optional[int] = None,
        prefix: Sequence[KeyT] = [],
        bcast: bool = False,
        optin: bool = False,
        optout: bool = False,
        noloop: bool = False,
        target_nodes: Optional["TargetNodesT"] = "all",
    ) -> ResponseT:
        """
        Disables the tracking feature of the Redis server, that is used
        for server assisted client side caching.

        When clientid is provided - in target_nodes only the node that owns the
        connection with this id should be provided.
        When clientid is not provided - target_nodes can be any node.

        For more information see https://redis.io/commands/client-tracking
        """
        # NOTE: the ``prefix`` default is a shared list; it is only passed
        # along here and must not be mutated.
        return self.client_tracking(
            False,
            clientid,
            prefix,
            bcast,
            optin,
            optout,
            noloop,
            target_nodes=target_nodes,
        )
829
830
class AsyncClusterManagementCommands(
    ClusterManagementCommands, AsyncManagementCommands
):
    """
    Asynchronous Redis Cluster management commands.

    Builds on the cluster management commands and the core asynchronous
    management commands, redefining the methods that need to await.
    """

    async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.

        One CLUSTER DELSLOTS command is scheduled per slot as its own task
        and all of them are awaited together.

        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        pending = [
            asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
            for slot in slots
        ]
        return await asyncio.gather(*pending)

    @deprecated_function(
        version="7.2.0",
        reason="Use client-side caching feature instead.",
    )
    async def client_tracking_on(
        self,
        clientid: Optional[int] = None,
        prefix: Sequence[KeyT] = [],
        bcast: bool = False,
        optin: bool = False,
        optout: bool = False,
        noloop: bool = False,
        target_nodes: Optional["TargetNodesT"] = "all",
    ) -> ResponseT:
        """
        Turn on the tracking feature of the Redis server, which is used
        for server assisted client side caching.

        When clientid is provided - in target_nodes only the node that owns the
        connection with this id should be provided.
        When clientid is not provided - target_nodes can be any node.

        For more information see https://redis.io/commands/client-tracking
        """
        # Forwarded positionally, in the same order as they are declared
        flags = (bcast, optin, optout, noloop)
        return await self.client_tracking(
            True, clientid, prefix, *flags, target_nodes=target_nodes
        )

    @deprecated_function(
        version="7.2.0",
        reason="Use client-side caching feature instead.",
    )
    async def client_tracking_off(
        self,
        clientid: Optional[int] = None,
        prefix: Sequence[KeyT] = [],
        bcast: bool = False,
        optin: bool = False,
        optout: bool = False,
        noloop: bool = False,
        target_nodes: Optional["TargetNodesT"] = "all",
    ) -> ResponseT:
        """
        Turn off the tracking feature of the Redis server, which is used
        for server assisted client side caching.

        When clientid is provided - in target_nodes only the node that owns the
        connection with this id should be provided.
        When clientid is not provided - target_nodes can be any node.

        For more information see https://redis.io/commands/client-tracking
        """
        # Forwarded positionally, in the same order as they are declared
        flags = (bcast, optin, optout, noloop)
        return await self.client_tracking(
            False, clientid, prefix, *flags, target_nodes=target_nodes
        )
926
927
class ClusterDataAccessCommands(DataAccessCommands):
    """
    Redis Cluster data access commands.

    Extends the core DataAccessCommands class with the adjustments needed
    to work in cluster mode.
    """

    def stralgo(
        self,
        algo: Literal["LCS"],
        value1: KeyT,
        value2: KeyT,
        specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
        len: bool = False,
        idx: bool = False,
        minmatchlen: Optional[int] = None,
        withmatchlen: bool = False,
        **kwargs,
    ) -> ResponseT:
        """
        Implements complex algorithms that operate on strings.
        Right now the only algorithm implemented is the LCS algorithm
        (longest common substring). However new algorithms could be
        implemented in the future.

        ``algo`` Right now must be LCS
        ``value1`` and ``value2`` Can be two strings or two keys
        ``specific_argument`` Specifying if the arguments to the algorithm
        will be keys or strings. strings is the default.
        ``len`` Returns just the len of the match.
        ``idx`` Returns the match positions in each string.
        ``minmatchlen`` Restrict the list of matches to the ones of a given
        minimal length. Can be provided only when ``idx`` set to True.
        ``withmatchlen`` Returns the matches with the len of the match.
        Can be provided only when ``idx`` set to True.

        When the arguments are strings and no ``target_nodes`` keyword is
        given, the command is targeted at "default-node".

        For more information see https://redis.io/commands/stralgo
        """
        routing = kwargs.pop("target_nodes", None)
        if specific_argument == "strings" and routing is None:
            routing = "default-node"
        # The key is always forwarded, even when its value is still None
        kwargs["target_nodes"] = routing

        return super().stralgo(
            algo,
            value1,
            value2,
            specific_argument,
            len,
            idx,
            minmatchlen,
            withmatchlen,
            **kwargs,
        )

    def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> Iterator:
        """
        Yield the items returned by SCAN until every node's cursor is 0.

        The first ``scan`` call uses the caller's ``kwargs`` unchanged; its
        reply is a mapping of node name to cursor plus the first batch of
        data. Every node whose cursor is not 0 is then scanned individually
        until its cursor becomes 0.
        """
        # First round: cursor=0, sent wherever the caller's kwargs point
        first_cursors, first_batch = self.scan(
            match=match, count=count, _type=_type, **kwargs
        )
        yield from first_batch

        pending = {
            name: cursor for name, cursor in first_cursors.items() if cursor != 0
        }
        if not pending:
            return

        # Resolve each unfinished node once, by name
        nodes = {name: self.get_node(node_name=name) for name in pending}

        # Follow-up rounds target one node at a time, so any caller-supplied
        # target_nodes must not be forwarded again
        kwargs.pop("target_nodes", None)
        while pending:
            unfinished = {}
            for name, cursor in pending.items():
                reply_cursors, batch = self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                yield from batch

                next_cursor = reply_cursors[name]
                if next_cursor != 0:
                    unfinished[name] = next_cursor

            pending = unfinished
1017
1018
class AsyncClusterDataAccessCommands(
    ClusterDataAccessCommands, AsyncDataAccessCommands
):
    """
    Asynchronous Redis Cluster data access commands.

    Combines the cluster data access commands with the core asynchronous
    DataAccessCommands and redefines what has to await.
    """

    async def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> AsyncIterator:
        """
        Yield the items returned by SCAN until every node's cursor is 0.

        The first ``scan`` call uses the caller's ``kwargs`` unchanged; its
        reply is a mapping of node name to cursor plus the first batch of
        data. Every node whose cursor is not 0 is then scanned individually
        until its cursor becomes 0.
        """
        # First round: cursor=0, sent wherever the caller's kwargs point
        first_cursors, first_batch = await self.scan(
            match=match, count=count, _type=_type, **kwargs
        )
        for item in first_batch:
            yield item

        pending = {
            name: cursor for name, cursor in first_cursors.items() if cursor != 0
        }
        if not pending:
            return

        # Resolve each unfinished node once, by name
        nodes = {name: self.get_node(node_name=name) for name in pending}

        # Follow-up rounds target one node at a time, so any caller-supplied
        # target_nodes must not be forwarded again
        kwargs.pop("target_nodes", None)
        while pending:
            unfinished = {}
            for name, cursor in pending.items():
                reply_cursors, batch = await self.scan(
                    cursor=cursor,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes[name],
                    **kwargs,
                )
                for item in batch:
                    yield item

                next_cursor = reply_cursors[name]
                if next_cursor != 0:
                    unfinished[name] = next_cursor

            pending = unfinished
1065
1066
class RedisClusterCommands(
    ClusterMultiKeyCommands,
    ClusterManagementCommands,
    ACLCommands,
    PubSubCommands,
    ClusterDataAccessCommands,
    ScriptCommands,
    FunctionCommands,
    ModuleCommands,
    RedisModuleCommands,
):
    """
    Aggregates every Redis Cluster command group into a single class.

    Key-based commands are routed internally: the target node(s) are
    determined by the hash slot of the keys.
    Commands that are not key-based accept a 'target_nodes' argument that
    selects the nodes to run on. When target_nodes is not specified, the
    command is executed on the default cluster node.

    :param target_nodes: may be one of the following:
        - a nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """
1096
1097
class AsyncRedisClusterCommands(
    AsyncClusterMultiKeyCommands,
    AsyncClusterManagementCommands,
    AsyncACLCommands,
    AsyncClusterDataAccessCommands,
    AsyncScriptCommands,
    AsyncFunctionCommands,
    AsyncModuleCommands,
    AsyncRedisModuleCommands,
):
    """
    Aggregates every asynchronous Redis Cluster command group into a
    single class.

    Key-based commands are routed internally: the target node(s) are
    determined by the hash slot of the keys.
    Commands that are not key-based accept a 'target_nodes' argument that
    selects the nodes to run on. When target_nodes is not specified, the
    command is executed on the default cluster node.

    :param target_nodes: may be one of the following:
        - a nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    for example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """