1import asyncio 
    2from typing import ( 
    3    TYPE_CHECKING, 
    4    Any, 
    5    AsyncIterator, 
    6    Dict, 
    7    Iterable, 
    8    Iterator, 
    9    List, 
    10    Literal, 
    11    Mapping, 
    12    NoReturn, 
    13    Optional, 
    14    Union, 
    15) 
    16 
    17from redis.crc import key_slot 
    18from redis.exceptions import RedisClusterException, RedisError 
    19from redis.typing import ( 
    20    AnyKeyT, 
    21    ClusterCommandsProtocol, 
    22    EncodableT, 
    23    KeysT, 
    24    KeyT, 
    25    PatternT, 
    26    ResponseT, 
    27) 
    28 
    29from .core import ( 
    30    ACLCommands, 
    31    AsyncACLCommands, 
    32    AsyncDataAccessCommands, 
    33    AsyncFunctionCommands, 
    34    AsyncManagementCommands, 
    35    AsyncModuleCommands, 
    36    AsyncScriptCommands, 
    37    DataAccessCommands, 
    38    FunctionCommands, 
    39    ManagementCommands, 
    40    ModuleCommands, 
    41    PubSubCommands, 
    42    ScriptCommands, 
    43) 
    44from .helpers import list_or_args 
    45from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands 
    46 
    47if TYPE_CHECKING: 
    48    from redis.asyncio.cluster import TargetNodesT 
    49 
    50# Not complete, but covers the major ones 
    51# https://redis.io/commands 
    52READ_COMMANDS = frozenset( 
    53    [ 
    54        "BITCOUNT", 
    55        "BITPOS", 
    56        "EVAL_RO", 
    57        "EVALSHA_RO", 
    58        "EXISTS", 
    59        "GEODIST", 
    60        "GEOHASH", 
    61        "GEOPOS", 
    62        "GEORADIUS", 
    63        "GEORADIUSBYMEMBER", 
    64        "GET", 
    65        "GETBIT", 
    66        "GETRANGE", 
    67        "HEXISTS", 
    68        "HGET", 
    69        "HGETALL", 
    70        "HKEYS", 
    71        "HLEN", 
    72        "HMGET", 
    73        "HSTRLEN", 
    74        "HVALS", 
    75        "KEYS", 
    76        "LINDEX", 
    77        "LLEN", 
    78        "LRANGE", 
    79        "MGET", 
    80        "PTTL", 
    81        "RANDOMKEY", 
    82        "SCARD", 
    83        "SDIFF", 
    84        "SINTER", 
    85        "SISMEMBER", 
    86        "SMEMBERS", 
    87        "SRANDMEMBER", 
    88        "STRLEN", 
    89        "SUNION", 
    90        "TTL", 
    91        "ZCARD", 
    92        "ZCOUNT", 
    93        "ZRANGE", 
    94        "ZSCORE", 
    95    ] 
    96) 
    97 
    98 
    99class ClusterMultiKeyCommands(ClusterCommandsProtocol): 
    100    """ 
    101    A class containing commands that handle more than one key 
    102    """ 
    103 
    104    def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]: 
    105        """Split keys into a dictionary that maps a slot to a list of keys.""" 
    106 
    107        slots_to_keys = {} 
    108        for key in keys: 
    109            slot = key_slot(self.encoder.encode(key)) 
    110            slots_to_keys.setdefault(slot, []).append(key) 
    111 
    112        return slots_to_keys 
    113 
    114    def _partition_pairs_by_slot( 
    115        self, mapping: Mapping[AnyKeyT, EncodableT] 
    116    ) -> Dict[int, List[EncodableT]]: 
    117        """Split pairs into a dictionary that maps a slot to a list of pairs.""" 
    118 
    119        slots_to_pairs = {} 
    120        for pair in mapping.items(): 
    121            slot = key_slot(self.encoder.encode(pair[0])) 
    122            slots_to_pairs.setdefault(slot, []).extend(pair) 
    123 
    124        return slots_to_pairs 
    125 
    126    def _execute_pipeline_by_slot( 
    127        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 
    128    ) -> List[Any]: 
    129        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 
    130        pipe = self.pipeline() 
    131        [ 
    132            pipe.execute_command( 
    133                command, 
    134                *slot_args, 
    135                target_nodes=[ 
    136                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 
    137                ], 
    138            ) 
    139            for slot, slot_args in slots_to_args.items() 
    140        ] 
    141        return pipe.execute() 
    142 
    143    def _reorder_keys_by_command( 
    144        self, 
    145        keys: Iterable[KeyT], 
    146        slots_to_args: Mapping[int, Iterable[EncodableT]], 
    147        responses: Iterable[Any], 
    148    ) -> List[Any]: 
    149        results = { 
    150            k: v 
    151            for slot_values, response in zip(slots_to_args.values(), responses) 
    152            for k, v in zip(slot_values, response) 
    153        } 
    154        return [results[key] for key in keys] 
    155 
    156    def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 
    157        """ 
    158        Splits the keys into different slots and then calls MGET 
    159        for the keys of every slot. This operation will not be atomic 
    160        if keys belong to more than one slot. 
    161 
    162        Returns a list of values ordered identically to ``keys`` 
    163 
    164        For more information see https://redis.io/commands/mget 
    165        """ 
    166 
    167        # Concatenate all keys into a list 
    168        keys = list_or_args(keys, args) 
    169 
    170        # Split keys into slots 
    171        slots_to_keys = self._partition_keys_by_slot(keys) 
    172 
    173        # Execute commands using a pipeline 
    174        res = self._execute_pipeline_by_slot("MGET", slots_to_keys) 
    175 
    176        # Reorder keys in the order the user provided & return 
    177        return self._reorder_keys_by_command(keys, slots_to_keys, res) 
    178 
    179    def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 
    180        """ 
    181        Sets key/values based on a mapping. Mapping is a dictionary of 
    182        key/value pairs. Both keys and values should be strings or types that 
    183        can be cast to a string via str(). 
    184 
    185        Splits the keys into different slots and then calls MSET 
    186        for the keys of every slot. This operation will not be atomic 
    187        if keys belong to more than one slot. 
    188 
    189        For more information see https://redis.io/commands/mset 
    190        """ 
    191 
    192        # Partition the keys by slot 
    193        slots_to_pairs = self._partition_pairs_by_slot(mapping) 
    194 
    195        # Execute commands using a pipeline & return list of replies 
    196        return self._execute_pipeline_by_slot("MSET", slots_to_pairs) 
    197 
    198    def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 
    199        """ 
    200        Runs the given command once for the keys 
    201        of each slot. Returns the sum of the return values. 
    202        """ 
    203 
    204        # Partition the keys by slot 
    205        slots_to_keys = self._partition_keys_by_slot(keys) 
    206 
    207        # Sum up the reply from each command 
    208        return sum(self._execute_pipeline_by_slot(command, slots_to_keys)) 
    209 
    210    def exists(self, *keys: KeyT) -> ResponseT: 
    211        """ 
    212        Returns the number of ``names`` that exist in the 
    213        whole cluster. The keys are first split up into slots 
    214        and then an EXISTS command is sent for every slot 
    215 
    216        For more information see https://redis.io/commands/exists 
    217        """ 
    218        return self._split_command_across_slots("EXISTS", *keys) 
    219 
    220    def delete(self, *keys: KeyT) -> ResponseT: 
    221        """ 
    222        Deletes the given keys in the cluster. 
    223        The keys are first split up into slots 
    224        and then an DEL command is sent for every slot 
    225 
    226        Non-existent keys are ignored. 
    227        Returns the number of keys that were deleted. 
    228 
    229        For more information see https://redis.io/commands/del 
    230        """ 
    231        return self._split_command_across_slots("DEL", *keys) 
    232 
    233    def touch(self, *keys: KeyT) -> ResponseT: 
    234        """ 
    235        Updates the last access time of given keys across the 
    236        cluster. 
    237 
    238        The keys are first split up into slots 
    239        and then an TOUCH command is sent for every slot 
    240 
    241        Non-existent keys are ignored. 
    242        Returns the number of keys that were touched. 
    243 
    244        For more information see https://redis.io/commands/touch 
    245        """ 
    246        return self._split_command_across_slots("TOUCH", *keys) 
    247 
    248    def unlink(self, *keys: KeyT) -> ResponseT: 
    249        """ 
    250        Remove the specified keys in a different thread. 
    251 
    252        The keys are first split up into slots 
    253        and then an TOUCH command is sent for every slot 
    254 
    255        Non-existent keys are ignored. 
    256        Returns the number of keys that were unlinked. 
    257 
    258        For more information see https://redis.io/commands/unlink 
    259        """ 
    260        return self._split_command_across_slots("UNLINK", *keys) 
    261 
    262 
    263class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands): 
    264    """ 
    265    A class containing commands that handle more than one key 
    266    """ 
    267 
    268    async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]: 
    269        """ 
    270        Splits the keys into different slots and then calls MGET 
    271        for the keys of every slot. This operation will not be atomic 
    272        if keys belong to more than one slot. 
    273 
    274        Returns a list of values ordered identically to ``keys`` 
    275 
    276        For more information see https://redis.io/commands/mget 
    277        """ 
    278 
    279        # Concatenate all keys into a list 
    280        keys = list_or_args(keys, args) 
    281 
    282        # Split keys into slots 
    283        slots_to_keys = self._partition_keys_by_slot(keys) 
    284 
    285        # Execute commands using a pipeline 
    286        res = await self._execute_pipeline_by_slot("MGET", slots_to_keys) 
    287 
    288        # Reorder keys in the order the user provided & return 
    289        return self._reorder_keys_by_command(keys, slots_to_keys, res) 
    290 
    291    async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: 
    292        """ 
    293        Sets key/values based on a mapping. Mapping is a dictionary of 
    294        key/value pairs. Both keys and values should be strings or types that 
    295        can be cast to a string via str(). 
    296 
    297        Splits the keys into different slots and then calls MSET 
    298        for the keys of every slot. This operation will not be atomic 
    299        if keys belong to more than one slot. 
    300 
    301        For more information see https://redis.io/commands/mset 
    302        """ 
    303 
    304        # Partition the keys by slot 
    305        slots_to_pairs = self._partition_pairs_by_slot(mapping) 
    306 
    307        # Execute commands using a pipeline & return list of replies 
    308        return await self._execute_pipeline_by_slot("MSET", slots_to_pairs) 
    309 
    310    async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: 
    311        """ 
    312        Runs the given command once for the keys 
    313        of each slot. Returns the sum of the return values. 
    314        """ 
    315 
    316        # Partition the keys by slot 
    317        slots_to_keys = self._partition_keys_by_slot(keys) 
    318 
    319        # Sum up the reply from each command 
    320        return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) 
    321 
    322    async def _execute_pipeline_by_slot( 
    323        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] 
    324    ) -> List[Any]: 
    325        if self._initialize: 
    326            await self.initialize() 
    327        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS 
    328        pipe = self.pipeline() 
    329        [ 
    330            pipe.execute_command( 
    331                command, 
    332                *slot_args, 
    333                target_nodes=[ 
    334                    self.nodes_manager.get_node_from_slot(slot, read_from_replicas) 
    335                ], 
    336            ) 
    337            for slot, slot_args in slots_to_args.items() 
    338        ] 
    339        return await pipe.execute() 
    340 
    341 
    342class ClusterManagementCommands(ManagementCommands): 
    343    """ 
    344    A class for Redis Cluster management commands 
    345 
    346    The class inherits from Redis's core ManagementCommands class and do the 
    347    required adjustments to work with cluster mode 
    348    """ 
    349 
    350    def slaveof(self, *args, **kwargs) -> NoReturn: 
    351        """ 
    352        Make the server a replica of another instance, or promote it as master. 
    353 
    354        For more information see https://redis.io/commands/slaveof 
    355        """ 
    356        raise RedisClusterException("SLAVEOF is not supported in cluster mode") 
    357 
    358    def replicaof(self, *args, **kwargs) -> NoReturn: 
    359        """ 
    360        Make the server a replica of another instance, or promote it as master. 
    361 
    362        For more information see https://redis.io/commands/replicaof 
    363        """ 
    364        raise RedisClusterException("REPLICAOF is not supported in cluster mode") 
    365 
    366    def swapdb(self, *args, **kwargs) -> NoReturn: 
    367        """ 
    368        Swaps two Redis databases. 
    369 
    370        For more information see https://redis.io/commands/swapdb 
    371        """ 
    372        raise RedisClusterException("SWAPDB is not supported in cluster mode") 
    373 
    374    def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT: 
    375        """ 
    376        Returns the node's id. 
    377 
    378        :target_node: 'ClusterNode' 
    379            The node to execute the command on 
    380 
    381        For more information check https://redis.io/commands/cluster-myid/ 
    382        """ 
    383        return self.execute_command("CLUSTER MYID", target_nodes=target_node) 
    384 
    385    def cluster_addslots( 
    386        self, target_node: "TargetNodesT", *slots: EncodableT 
    387    ) -> ResponseT: 
    388        """ 
    389        Assign new hash slots to receiving node. Sends to specified node. 
    390 
    391        :target_node: 'ClusterNode' 
    392            The node to execute the command on 
    393 
    394        For more information see https://redis.io/commands/cluster-addslots 
    395        """ 
    396        return self.execute_command( 
    397            "CLUSTER ADDSLOTS", *slots, target_nodes=target_node 
    398        ) 
    399 
    400    def cluster_addslotsrange( 
    401        self, target_node: "TargetNodesT", *slots: EncodableT 
    402    ) -> ResponseT: 
    403        """ 
    404        Similar to the CLUSTER ADDSLOTS command. 
    405        The difference between the two commands is that ADDSLOTS takes a list of slots 
    406        to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges 
    407        (specified by start and end slots) to assign to the node. 
    408 
    409        :target_node: 'ClusterNode' 
    410            The node to execute the command on 
    411 
    412        For more information see https://redis.io/commands/cluster-addslotsrange 
    413        """ 
    414        return self.execute_command( 
    415            "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node 
    416        ) 
    417 
    418    def cluster_countkeysinslot(self, slot_id: int) -> ResponseT: 
    419        """ 
    420        Return the number of local keys in the specified hash slot 
    421        Send to node based on specified slot_id 
    422 
    423        For more information see https://redis.io/commands/cluster-countkeysinslot 
    424        """ 
    425        return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id) 
    426 
    427    def cluster_count_failure_report(self, node_id: str) -> ResponseT: 
    428        """ 
    429        Return the number of failure reports active for a given node 
    430        Sends to a random node 
    431 
    432        For more information see https://redis.io/commands/cluster-count-failure-reports 
    433        """ 
    434        return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id) 
    435 
    436    def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 
    437        """ 
    438        Set hash slots as unbound in the cluster. 
    439        It determines by it self what node the slot is in and sends it there 
    440 
    441        Returns a list of the results for each processed slot. 
    442 
    443        For more information see https://redis.io/commands/cluster-delslots 
    444        """ 
    445        return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots] 
    446 
    447    def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT: 
    448        """ 
    449        Similar to the CLUSTER DELSLOTS command. 
    450        The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove 
    451        from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove 
    452        from the node. 
    453 
    454        For more information see https://redis.io/commands/cluster-delslotsrange 
    455        """ 
    456        return self.execute_command("CLUSTER DELSLOTSRANGE", *slots) 
    457 
    458    def cluster_failover( 
    459        self, target_node: "TargetNodesT", option: Optional[str] = None 
    460    ) -> ResponseT: 
    461        """ 
    462        Forces a slave to perform a manual failover of its master 
    463        Sends to specified node 
    464 
    465        :target_node: 'ClusterNode' 
    466            The node to execute the command on 
    467 
    468        For more information see https://redis.io/commands/cluster-failover 
    469        """ 
    470        if option: 
    471            if option.upper() not in ["FORCE", "TAKEOVER"]: 
    472                raise RedisError( 
    473                    f"Invalid option for CLUSTER FAILOVER command: {option}" 
    474                ) 
    475            else: 
    476                return self.execute_command( 
    477                    "CLUSTER FAILOVER", option, target_nodes=target_node 
    478                ) 
    479        else: 
    480            return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node) 
    481 
    482    def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 
    483        """ 
    484        Provides info about Redis Cluster node state. 
    485        The command will be sent to a random node in the cluster if no target 
    486        node is specified. 
    487 
    488        For more information see https://redis.io/commands/cluster-info 
    489        """ 
    490        return self.execute_command("CLUSTER INFO", target_nodes=target_nodes) 
    491 
    492    def cluster_keyslot(self, key: str) -> ResponseT: 
    493        """ 
    494        Returns the hash slot of the specified key 
    495        Sends to random node in the cluster 
    496 
    497        For more information see https://redis.io/commands/cluster-keyslot 
    498        """ 
    499        return self.execute_command("CLUSTER KEYSLOT", key) 
    500 
    501    def cluster_meet( 
    502        self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None 
    503    ) -> ResponseT: 
    504        """ 
    505        Force a node cluster to handshake with another node. 
    506        Sends to specified node. 
    507 
    508        For more information see https://redis.io/commands/cluster-meet 
    509        """ 
    510        return self.execute_command( 
    511            "CLUSTER MEET", host, port, target_nodes=target_nodes 
    512        ) 
    513 
    514    def cluster_nodes(self) -> ResponseT: 
    515        """ 
    516        Get Cluster config for the node. 
    517        Sends to random node in the cluster 
    518 
    519        For more information see https://redis.io/commands/cluster-nodes 
    520        """ 
    521        return self.execute_command("CLUSTER NODES") 
    522 
    523    def cluster_replicate( 
    524        self, target_nodes: "TargetNodesT", node_id: str 
    525    ) -> ResponseT: 
    526        """ 
    527        Reconfigure a node as a slave of the specified master node 
    528 
    529        For more information see https://redis.io/commands/cluster-replicate 
    530        """ 
    531        return self.execute_command( 
    532            "CLUSTER REPLICATE", node_id, target_nodes=target_nodes 
    533        ) 
    534 
    535    def cluster_reset( 
    536        self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None 
    537    ) -> ResponseT: 
    538        """ 
    539        Reset a Redis Cluster node 
    540 
    541        If 'soft' is True then it will send 'SOFT' argument 
    542        If 'soft' is False then it will send 'HARD' argument 
    543 
    544        For more information see https://redis.io/commands/cluster-reset 
    545        """ 
    546        return self.execute_command( 
    547            "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes 
    548        ) 
    549 
    550    def cluster_save_config( 
    551        self, target_nodes: Optional["TargetNodesT"] = None 
    552    ) -> ResponseT: 
    553        """ 
    554        Forces the node to save cluster state on disk 
    555 
    556        For more information see https://redis.io/commands/cluster-saveconfig 
    557        """ 
    558        return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes) 
    559 
    560    def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT: 
    561        """ 
    562        Returns the number of keys in the specified cluster slot 
    563 
    564        For more information see https://redis.io/commands/cluster-getkeysinslot 
    565        """ 
    566        return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) 
    567 
    568    def cluster_set_config_epoch( 
    569        self, epoch: int, target_nodes: Optional["TargetNodesT"] = None 
    570    ) -> ResponseT: 
    571        """ 
    572        Set the configuration epoch in a new node 
    573 
    574        For more information see https://redis.io/commands/cluster-set-config-epoch 
    575        """ 
    576        return self.execute_command( 
    577            "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes 
    578        ) 
    579 
    580    def cluster_setslot( 
    581        self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str 
    582    ) -> ResponseT: 
    583        """ 
    584        Bind an hash slot to a specific node 
    585 
    586        :target_node: 'ClusterNode' 
    587            The node to execute the command on 
    588 
    589        For more information see https://redis.io/commands/cluster-setslot 
    590        """ 
    591        if state.upper() in ("IMPORTING", "NODE", "MIGRATING"): 
    592            return self.execute_command( 
    593                "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node 
    594            ) 
    595        elif state.upper() == "STABLE": 
    596            raise RedisError('For "stable" state please use cluster_setslot_stable') 
    597        else: 
    598            raise RedisError(f"Invalid slot state: {state}") 
    599 
    600    def cluster_setslot_stable(self, slot_id: int) -> ResponseT: 
    601        """ 
    602        Clears migrating / importing state from the slot. 
    603        It determines by it self what node the slot is in and sends it there. 
    604 
    605        For more information see https://redis.io/commands/cluster-setslot 
    606        """ 
    607        return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") 
    608 
    609    def cluster_replicas( 
    610        self, node_id: str, target_nodes: Optional["TargetNodesT"] = None 
    611    ) -> ResponseT: 
    612        """ 
    613        Provides a list of replica nodes replicating from the specified primary 
    614        target node. 
    615 
    616        For more information see https://redis.io/commands/cluster-replicas 
    617        """ 
    618        return self.execute_command( 
    619            "CLUSTER REPLICAS", node_id, target_nodes=target_nodes 
    620        ) 
    621 
    622    def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 
    623        """ 
    624        Get array of Cluster slot to node mappings 
    625 
    626        For more information see https://redis.io/commands/cluster-slots 
    627        """ 
    628        return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes) 
    629 
    630    def cluster_shards(self, target_nodes=None): 
    631        """ 
    632        Returns details about the shards of the cluster. 
    633 
    634        For more information see https://redis.io/commands/cluster-shards 
    635        """ 
    636        return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes) 
    637 
    638    def cluster_myshardid(self, target_nodes=None): 
    639        """ 
    640        Returns the shard ID of the node. 
    641 
    642        For more information see https://redis.io/commands/cluster-myshardid/ 
    643        """ 
    644        return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes) 
    645 
    646    def cluster_links(self, target_node: "TargetNodesT") -> ResponseT: 
    647        """ 
    648        Each node in a Redis Cluster maintains a pair of long-lived TCP link with each 
    649        peer in the cluster: One for sending outbound messages towards the peer and one 
    650        for receiving inbound messages from the peer. 
    651 
    652        This command outputs information of all such peer links as an array. 
    653 
    654        For more information see https://redis.io/commands/cluster-links 
    655        """ 
    656        return self.execute_command("CLUSTER LINKS", target_nodes=target_node) 
    657 
    658    def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 
    659        raise NotImplementedError( 
    660            "CLUSTER FLUSHSLOTS is intentionally not implemented in the client." 
    661        ) 
    662 
    663    def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None: 
    664        raise NotImplementedError( 
    665            "CLUSTER BUMPEPOCH is intentionally not implemented in the client." 
    666        ) 
    667 
    668    def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 
    669        """ 
    670        Enables read queries. 
    671        The command will be sent to the default cluster node if target_nodes is 
    672        not specified. 
    673 
    674        For more information see https://redis.io/commands/readonly 
    675        """ 
    676        if target_nodes == "replicas" or target_nodes == "all": 
    677            # read_from_replicas will only be enabled if the READONLY command 
    678            # is sent to all replicas 
    679            self.read_from_replicas = True 
    680        return self.execute_command("READONLY", target_nodes=target_nodes) 
    681 
    682    def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: 
    683        """ 
    684        Disables read queries. 
    685        The command will be sent to the default cluster node if target_nodes is 
    686        not specified. 
    687 
    688        For more information see https://redis.io/commands/readwrite 
    689        """ 
    690        # Reset read from replicas flag 
    691        self.read_from_replicas = False 
    692        return self.execute_command("READWRITE", target_nodes=target_nodes) 
    693 
    694 
    695class AsyncClusterManagementCommands( 
    696    ClusterManagementCommands, AsyncManagementCommands 
    697): 
    698    """ 
    699    A class for Redis Cluster management commands 
    700 
    701    The class inherits from Redis's core ManagementCommands class and do the 
    702    required adjustments to work with cluster mode 
    703    """ 
    704 
    705    async def cluster_delslots(self, *slots: EncodableT) -> List[bool]: 
    706        """ 
    707        Set hash slots as unbound in the cluster. 
    708        It determines by it self what node the slot is in and sends it there 
    709 
    710        Returns a list of the results for each processed slot. 
    711 
    712        For more information see https://redis.io/commands/cluster-delslots 
    713        """ 
    714        return await asyncio.gather( 
    715            *( 
    716                asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot)) 
    717                for slot in slots 
    718            ) 
    719        ) 
    720 
    721 
    722class ClusterDataAccessCommands(DataAccessCommands): 
    723    """ 
    724    A class for Redis Cluster Data Access Commands 
    725 
    726    The class inherits from Redis's core DataAccessCommand class and do the 
    727    required adjustments to work with cluster mode 
    728    """ 
    729 
    def stralgo(
        self,
        algo: Literal["LCS"],
        value1: KeyT,
        value2: KeyT,
        specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
        len: bool = False,
        idx: bool = False,
        minmatchlen: Optional[int] = None,
        withmatchlen: bool = False,
        **kwargs,
    ) -> ResponseT:
        """
        Implements complex algorithms that operate on strings.
        Right now the only algorithm implemented is the LCS algorithm
        (longest common substring). However new algorithms could be
        implemented in the future.

        ``algo`` Right now must be LCS
        ``value1`` and ``value2`` Can be two strings or two keys
        ``specific_argument`` Specifying if the arguments to the algorithm
        will be keys or strings. strings is the default.
        ``len`` Returns just the len of the match.
        ``idx`` Returns the match positions in each string.
        ``minmatchlen`` Restrict the list of matches to the ones of a given
        minimal length. Can be provided only when ``idx`` set to True.
        ``withmatchlen`` Returns the matches with the len of the match.
        Can be provided only when ``idx`` set to True.

        A ``target_nodes`` keyword argument is always forwarded to the parent
        implementation; when the caller gave none and the arguments are plain
        strings it is set to "default-node".

        For more information see https://redis.io/commands/stralgo
        """
        routing = kwargs.pop("target_nodes", None)
        if specific_argument == "strings" and routing is None:
            # No explicit target was given for a call on plain strings
            routing = "default-node"
        kwargs["target_nodes"] = routing

        positional = (
            algo,
            value1,
            value2,
            specific_argument,
            len,
            idx,
            minmatchlen,
            withmatchlen,
        )
        return super().stralgo(*positional, **kwargs)
    776 
    777    def scan_iter( 
    778        self, 
    779        match: Optional[PatternT] = None, 
    780        count: Optional[int] = None, 
    781        _type: Optional[str] = None, 
    782        **kwargs, 
    783    ) -> Iterator: 
    784        # Do the first query with cursor=0 for all nodes 
    785        cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) 
    786        yield from data 
    787 
    788        cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 
    789        if cursors: 
    790            # Get nodes by name 
    791            nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 
    792 
    793            # Iterate over each node till its cursor is 0 
    794            kwargs.pop("target_nodes", None) 
    795            while cursors: 
    796                for name, cursor in cursors.items(): 
    797                    cur, data = self.scan( 
    798                        cursor=cursor, 
    799                        match=match, 
    800                        count=count, 
    801                        _type=_type, 
    802                        target_nodes=nodes[name], 
    803                        **kwargs, 
    804                    ) 
    805                    yield from data 
    806                    cursors[name] = cur[name] 
    807 
    808                cursors = { 
    809                    name: cursor for name, cursor in cursors.items() if cursor != 0 
    810                } 
    811 
    812 
    813class AsyncClusterDataAccessCommands( 
    814    ClusterDataAccessCommands, AsyncDataAccessCommands 
    815): 
    816    """ 
    817    A class for Redis Cluster Data Access Commands 
    818 
    819    The class inherits from Redis's core DataAccessCommand class and do the 
    820    required adjustments to work with cluster mode 
    821    """ 
    822 
    823    async def scan_iter( 
    824        self, 
    825        match: Optional[PatternT] = None, 
    826        count: Optional[int] = None, 
    827        _type: Optional[str] = None, 
    828        **kwargs, 
    829    ) -> AsyncIterator: 
    830        # Do the first query with cursor=0 for all nodes 
    831        cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) 
    832        for value in data: 
    833            yield value 
    834 
    835        cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} 
    836        if cursors: 
    837            # Get nodes by name 
    838            nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} 
    839 
    840            # Iterate over each node till its cursor is 0 
    841            kwargs.pop("target_nodes", None) 
    842            while cursors: 
    843                for name, cursor in cursors.items(): 
    844                    cur, data = await self.scan( 
    845                        cursor=cursor, 
    846                        match=match, 
    847                        count=count, 
    848                        _type=_type, 
    849                        target_nodes=nodes[name], 
    850                        **kwargs, 
    851                    ) 
    852                    for value in data: 
    853                        yield value 
    854                    cursors[name] = cur[name] 
    855 
    856                cursors = { 
    857                    name: cursor for name, cursor in cursors.items() if cursor != 0 
    858                } 
    859 
    860 
    861class RedisClusterCommands( 
    862    ClusterMultiKeyCommands, 
    863    ClusterManagementCommands, 
    864    ACLCommands, 
    865    PubSubCommands, 
    866    ClusterDataAccessCommands, 
    867    ScriptCommands, 
    868    FunctionCommands, 
    869    ModuleCommands, 
    870    RedisModuleCommands, 
    871): 
    872    """ 
    873    A class for all Redis Cluster commands 
    874 
    875    For key-based commands, the target node(s) will be internally determined 
    876    by the keys' hash slot. 
    877    Non-key-based commands can be executed with the 'target_nodes' argument to 
    878    target specific nodes. By default, if target_nodes is not specified, the 
    879    command will be executed on the default cluster node. 
    880 
    881    :param :target_nodes: type can be one of the followings: 
    882        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 
    883        - 'ClusterNode' 
    884        - 'list(ClusterNodes)' 
    885        - 'dict(any:clusterNodes)' 
    886 
    887    for example: 
    888        r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 
    889    """ 
    890 
    891 
    892class AsyncRedisClusterCommands( 
    893    AsyncClusterMultiKeyCommands, 
    894    AsyncClusterManagementCommands, 
    895    AsyncACLCommands, 
    896    AsyncClusterDataAccessCommands, 
    897    AsyncScriptCommands, 
    898    AsyncFunctionCommands, 
    899    AsyncModuleCommands, 
    900    AsyncRedisModuleCommands, 
    901): 
    902    """ 
    903    A class for all Redis Cluster commands 
    904 
    905    For key-based commands, the target node(s) will be internally determined 
    906    by the keys' hash slot. 
    907    Non-key-based commands can be executed with the 'target_nodes' argument to 
    908    target specific nodes. By default, if target_nodes is not specified, the 
    909    command will be executed on the default cluster node. 
    910 
    911    :param :target_nodes: type can be one of the followings: 
    912        - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM 
    913        - 'ClusterNode' 
    914        - 'list(ClusterNodes)' 
    915        - 'dict(any:clusterNodes)' 
    916 
    917    for example: 
    918        r.cluster_info(target_nodes=RedisCluster.ALL_NODES) 
    919    """