import random
import socket
import sys
import threading
import time
from abc import ABC, abstractmethod
from collections import OrderedDict
from copy import copy
from enum import Enum
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from redis._parsers import CommandsParser, Encoder
from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
from redis._parsers.helpers import parse_scan
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
from redis.commands import READ_COMMANDS, RedisClusterCommands
from redis.commands.helpers import list_or_args
from redis.commands.policies import PolicyResolver, StaticPolicyResolver
from redis.connection import (
    Connection,
    ConnectionPool,
    parse_url,
)
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    AskError,
    AuthenticationError,
    ClusterDownError,
    ClusterError,
    ConnectionError,
    CrossSlotTransactionError,
    DataError,
    ExecAbortError,
    InvalidPipelineStack,
    MaxConnectionsError,
    MovedError,
    RedisClusterException,
    RedisError,
    ResponseError,
    SlotNotCoveredError,
    TimeoutError,
    TryAgainError,
    WatchError,
)
from redis.lock import Lock
from redis.maint_notifications import MaintNotificationsConfig
from redis.retry import Retry
from redis.utils import (
    deprecated_args,
    dict_merge,
    list_keys_to_dict,
    merge_result,
    safe_str,
    str_if_bytes,
    truncate_text,
)


def get_node_name(host: str, port: Union[str, int]) -> str:
    return f"{host}:{port}"


@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    return redis_node.connection or redis_node.connection_pool.get_connection()


def parse_scan_result(command, res, **options):
    cursors = {}
    ret = []
    for node_name, response in res.items():
        cursor, r = parse_scan(response, **options)
        cursors[node_name] = cursor
        ret += r

    return cursors, ret


def parse_pubsub_numsub(command, res, **options):
    numsub_d = OrderedDict()
    for numsub_tups in res.values():
        for channel, numsubbed in numsub_tups:
            try:
                numsub_d[channel] += numsubbed
            except KeyError:
                numsub_d[channel] = numsubbed

    ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
    return ret_numsub


def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        return str_if_bytes(args[0]) or current_host, args[1]

    slots = {}
    for slot in resp:
        start, end, primary = slot[:3]
        replicas = slot[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }

    return slots
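
# An illustrative sketch (not part of the library): given a raw CLUSTER SLOTS
# reply such as
#
#     [[0, 5460, [b"10.0.0.1", 7000, ...], [b"10.0.0.2", 7001, ...]]]
#
# parse_cluster_slots() returns a mapping along the lines of
#
#     {(0, 5460): {"primary": ("10.0.0.1", 7000),
#                  "replicas": [("10.0.0.2", 7001)]}}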


def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.
    """
    if isinstance(resp[0], dict):
        return resp
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], (x[1][i + 1])))
        nodes = x[3]
        for node in nodes:
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards


def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.
    """
    return resp.decode("utf-8")


PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"

REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_ca_path",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
)
KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    connection_kwargs = {
        k: v
        for k, v in kwargs.items()
        if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
    }

    return connection_kwargs


class AbstractRedisCluster:
    RedisClusterRequestTTL = 16

    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, choose a replica if the cluster contains different replicas
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)


class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
    @classmethod
    def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://``: creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.

        """
        return cls(url=url, **kwargs)
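
    # A minimal usage sketch (illustrative; the URL and port are assumptions,
    # not prescribed by this module):
    #
    #     rc = RedisCluster.from_url("redis://localhost:7000")
    #     rc.set("foo", "bar")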

    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        **kwargs,
    ):
        """
        Initialize a new RedisCluster client.

        :param startup_nodes:
            List of nodes from which initial bootstrapping can be done
        :param host:
            Can be used to point to a startup node
        :param port:
            Can be used to point to a startup node
        :param require_full_coverage:
            When set to False: the client will not require full coverage of
            the slots. However, if not all slots are covered, and at least
            one node has 'cluster-require-full-coverage' set to 'yes', the
            server will throw a ClusterDownError for some key-based commands.
            See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True (default value): all slots must be covered to
            construct the cluster client. If not all slots are covered,
            RedisClusterException will be thrown.
        :param read_from_replicas:
            @deprecated - please use load_balancing_strategy instead
            Enable read from replicas in READONLY mode. You can read possibly
            stale data.
            When set to true, read commands will be distributed between the
            primary and its replicas in a round-robin manner.
        :param load_balancing_strategy:
            Enable read from replicas in READONLY mode and defines the load balancing
            strategy that will be used for cluster node selection.
            The data read from replicas is eventually consistent with the data in primary nodes.
        :param dynamic_startup_nodes:
            Set the RedisCluster's startup nodes to all of the discovered nodes.
            If true (default value), the cluster's discovered nodes will be used to
            determine the cluster nodes-slots mapping in the next topology refresh.
            It will remove the initial passed startup nodes if their endpoints aren't
            listed in the CLUSTER SLOTS output.
            If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
            specific IP addresses, it is best to set it to false.
        :param cluster_error_retry_attempts:
            @deprecated - Please configure the 'retry' object instead
            In case the 'retry' object is set - this argument is ignored!

            Number of times to retry before raising an error when
            :class:`~.TimeoutError`, :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError`
            or :class:`~.ClusterDownError` are encountered
        :param retry:
            A retry object that defines the retry strategy and the number of
            retries for the cluster client.
            In the current implementation of the cluster client (starting from
            redis-py version 6.0.0) the retry object is not yet fully utilized;
            it is used just to determine the number of retries for the cluster
            client. In future releases the retry object will be used to handle
            the cluster client retries!
        :param reinitialize_steps:
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.
        :param address_remap:
            An optional callable which, when provided with an internal network
            address of a node, e.g. a `(host, port)` tuple, will return the address
            where the node is reachable. This can be used to map the addresses at
            which the nodes _think_ they are, to addresses at which a client may
            reach them, such as when they sit behind a proxy.

        :**kwargs:
            Extra arguments that will be passed to the Redis instances when created
            (See the official redis-py docs for supported kwargs - the only limitation
            is that you can't provide a 'retry' object as part of kwargs.
            [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
            Some kwargs are not supported and will raise a
            RedisClusterException:
            - db (Redis does not support database SELECT in cluster mode)
        """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # Argument 'retry' is not possible to be used in kwargs when in cluster mode
            # the kwargs are set to the lower level connections to the cluster nodes
            # and there we provide retry configuration without retries allowed.
            # The retries should be handled on cluster client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node/s
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the following:\n"
                "1. host and port, for example:\n"
                "   RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "   RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments.
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run.
        # If the user passed an on_connect function we'll save it and run it
        # inside the RedisCluster.on_connect() function.
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and protocol not in [3, "3"]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes
        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        # For backward compatibility, mapping from existing policies to new ones
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver
        self.commands_parser = CommandsParser(self)

        # Node where the FT.AGGREGATE command is executed.
        self._aggregate_nodes = None
        self._lock = threading.RLock()
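
    # A minimal construction sketch (illustrative; the node addresses are
    # assumptions):
    #
    #     nodes = [ClusterNode("localhost", 7000), ClusterNode("localhost", 7001)]
    #     rc = RedisCluster(startup_nodes=nodes, decode_responses=True)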

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def disconnect_connection_pools(self):
        for node in self.get_nodes():
            if node.redis_connection:
                try:
                    node.redis_connection.connection_pool.disconnect()
                except OSError:
                    # Client was already disconnected. do nothing
                    pass

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate and select a database and send
        READONLY if it is set during object initialization.
        """
        connection.on_connect()

        if self.read_from_replicas or self.load_balancing_strategy:
            # Sending READONLY command to server to configure connection as
            # readonly. Since each cluster node may change its server type due
            # to a failover, we should establish a READONLY connection
            # regardless of the server type. If this is a primary connection,
            # READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

    def get_node(self, host=None, port=None, node_name=None):
        return self.nodes_manager.get_node(host, port, node_name)

    def get_primaries(self):
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

    def get_replicas(self):
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)

    def get_random_node(self):
        return random.choice(list(self.nodes_manager.nodes_cache.values()))

    def get_random_primary_or_all_nodes(self, command_name):
        """
        Return a random node (from all nodes) when READONLY mode applies to
        this read command, or a random primary node otherwise.
        """
        if self.read_from_replicas and command_name in READ_COMMANDS:
            return self.get_random_node()

        return self.get_random_primary_node()

    def get_nodes(self):
        return list(self.nodes_manager.nodes_cache.values())

    def get_node_from_key(self, key, replica=False):
        """
        Get the node that holds the key's slot.
        If ``replica`` is set to True but the slot doesn't have any replicas,
        None is returned.
        """
        slot = self.keyslot(key)
        slot_cache = self.nodes_manager.slots_cache.get(slot)
        if slot_cache is None or len(slot_cache) == 0:
            raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
        if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        elif replica:
            node_idx = 1
        else:
            # primary
            node_idx = 0

        return slot_cache[node_idx]
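
    # An illustrative sketch: the primary for a key's slot, or one replica
    # (None when the slot has no replica):
    #
    #     primary = rc.get_node_from_key("foo")
    #     replica = rc.get_node_from_key("foo", replica=True)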

    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node

    def get_nodes_from_slot(self, command: str, *args):
        """
        Returns a list of nodes that hold the specified keys' slots.
        """
        # get the node that holds the key's slot
        slot = self.determine_slot(*args)
        node = self.nodes_manager.get_node_from_slot(
            slot,
            self.read_from_replicas and command in READ_COMMANDS,
            self.load_balancing_strategy if command in READ_COMMANDS else None,
        )
        return [node]

    def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
        """
        Split a command with the multi-shard policy into multiple
        single-key commands.
        """
        keys = self._get_command_keys(*args)
        commands = []

        for key in keys:
            commands.append(
                {
                    "args": (args[0], key),
                    "kwargs": kwargs,
                }
            )

        return commands

    def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
        """
        Returns a list of nodes for commands with a special policy.
        """
        if not self._aggregate_nodes:
            raise RedisClusterException(
                "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
            )

        return self._aggregate_nodes

    def get_random_primary_node(self) -> "ClusterNode":
        """
        Returns a random primary node
        """
        return random.choice(self.get_primaries())

    def _evaluate_all_succeeded(self, res):
        """
        Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
        """
        first_successful_response = None

        if isinstance(res, dict):
            for key, value in res.items():
                if value:
                    if first_successful_response is None:
                        first_successful_response = {key: value}
                else:
                    return {key: False}
        else:
            for response in res:
                if response:
                    if first_successful_response is None:
                        # Dynamically resolve type
                        first_successful_response = type(response)(response)
                else:
                    return type(response)(False)

        return first_successful_response

    def set_default_node(self, node):
        """
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return: True if the default node was set, else False
        """
        if node is None or self.get_node(node_name=node.name) is None:
            return False
        self.nodes_manager.default_node = node
        return True

    def set_retry(self, retry: Retry) -> None:
        self.retry = retry

    def monitor(self, target_node=None):
        """
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node was
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        The next_command() method returns one command from monitor;
        the listen() method yields commands from monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()

    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host & port, to get a pubsub instance
        connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
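
    # A short usage sketch (illustrative):
    #
    #     p = rc.pubsub(node=rc.get_default_node())
    #     p.subscribe("my-channel")
    #     message = p.get_message(timeout=1.0)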

    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
        Pipelines do not work in cluster mode the same way they
        do in normal mode. Create a clone of this object so
        that simulating pipelines will work correctly. Each
        command will be called directly when used and
        calling execute() will only return the result stack.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            read_from_replicas=self.read_from_replicas,
            load_balancing_strategy=self.load_balancing_strategy,
            reinitialize_steps=self.reinitialize_steps,
            retry=self.retry,
            lock=self._lock,
            transaction=transaction,
        )
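
    # A short usage sketch (illustrative):
    #
    #     with rc.pipeline() as pipe:
    #         pipe.set("foo", "1")
    #         pipe.incr("foo")
    #         results = pipe.execute()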

    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
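
    # A short usage sketch (illustrative):
    #
    #     with rc.lock("my-lock", timeout=5, blocking_timeout=3):
    #         ...  # critical section; the lock auto-expires after 5 seconds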

    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.cluster_response_callbacks[command] = callback

    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        """
        Determine the nodes the command should be executed on.
        """
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)

        if command_flag in self._command_flags_mapping:
            request_policy = self._command_flags_mapping[command_flag]

        policy_callback = self._policies_callback_mapping[request_policy]

        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()

        if args[0].lower() == "ft.aggregate":
            self._aggregate_nodes = nodes

        return nodes

    def _should_reinitialized(self):
        # To reinitialize the cluster on every MOVED error,
        # set reinitialize_steps to 1.
        # To avoid reinitializing the cluster on moved errors, set
        # reinitialize_steps to 0.
        if self.reinitialize_steps == 0:
            return False
        else:
            return self.reinitialize_counter % self.reinitialize_steps == 0

    def keyslot(self, key):
        """
        Calculate keyslot for a given key.
        See Keys distribution model in https://redis.io/topics/cluster-spec
        """
        k = self.encoder.encode(key)
        return key_slot(k)
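
    # An illustrative sketch: hash tags force keys into the same slot, e.g.
    #
    #     rc.keyslot("{user:1000}:profile") == rc.keyslot("{user:1000}:cart")
    #
    # so multi-key commands over such keys can be dispatched to a single node.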

    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in it, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
         - issue: https://github.com/redis/redis/issues/9493
         - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)

    def determine_slot(self, *args) -> Optional[int]:
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # CLIENT TRACKING is a special case.
        # It doesn't have any keys, it needs to be sent to the provided nodes
        # By default it will be sent to all nodes.
        if command.upper() == "CLIENT TRACKING":
            return None

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, that means the function
                # can be run on any node so we can just return a random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
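
    # An illustrative sketch: multi-key commands over keys in different slots
    # raise RedisClusterException, while hash-tagged keys sharing a slot are
    # dispatched together:
    #
    #     rc.mget("{tag}:a", "{tag}:b")  # same slot -> routed to a single node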

    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder

    def get_connection_kwargs(self):
        """
        Get the connections' keyword arguments
        """
        return self.nodes_manager.connection_kwargs

    def _is_nodes_flag(self, target_nodes):
        return isinstance(target_nodes, str) and target_nodes in self.node_flags

    def _parse_target_nodes(self, target_nodes):
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables executing commands on multiple nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def execute_command(self, *args, **kwargs):
        return self._internal_execute_command(*args, **kwargs)

    def _internal_execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the retries property of
        "self.retry", which defaults to 3 unless manually configured.

        If it reaches that number of attempts, the command will raise the
        exception.

        The keyword argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        command_policies = self._policy_resolver.resolve(args[0].lower())

        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True

        if not command_policies and not target_nodes_specified:
            command = args[0].upper()
            if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
                command = f"{args[0]} {args[1]}".upper()

            # We can only resolve key properties if the command is not
            # in the list of pre-defined request policies
            command_flag = self.command_flags.get(command)
            if not command_flag:
                # Fallback to default policy
                if not self.get_default_node():
                    slot = None
                else:
                    slot = self.determine_slot(*args)
                if not slot:
                    command_policies = CommandPolicies()
                else:
                    command_policies = CommandPolicies(
                        request_policy=RequestPolicy.DEFAULT_KEYED,
                        response_policy=ResponsePolicy.DEFAULT_KEYED,
                    )
            else:
                if command_flag in self._command_flags_mapping:
                    command_policies = CommandPolicies(
                        request_policy=self._command_flags_mapping[command_flag]
                    )
                else:
                    command_policies = CommandPolicies()
        elif not command_policies and target_nodes_specified:
            command_policies = CommandPolicies()

        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args,
                        request_policy=command_policies.request_policy,
                        nodes_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)

                    if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                        break

                # Return the processed result
                return self._process_result(
                    args[0],
                    res,
                    response_policy=command_policies.response_policy,
                    **kwargs,
                )
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    continue
                else:
                    # raise the exception
                    raise e

    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove the keys entry; it is needed only for caching.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )
                return response
            except AuthenticationError:
                raise
            except MaxConnectionsError:
                # MaxConnectionsError indicates client-side resource exhaustion
                # (too many connections in the pool), not a node failure.
                # Don't treat this as a node failure - just re-raise the error
                # without reinitializing the cluster.
                raise
            except (ConnectionError, TimeoutError) as e:
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # The 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                    moved = True
            except TryAgainError:
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except (ClusterDownError, SlotNotCoveredError):
                # ClusterDownError can occur during a failover and to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command

                # SlotNotCoveredError can occur when the cluster is not fully
                # initialized or can be a temporary issue.
                # We will try to reinitialize the cluster topology
                # and retry executing the command

                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise
            except ResponseError:
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")

    def close(self) -> None:
        try:
            with self._lock:
                if self.nodes_manager:
                    self.nodes_manager.close()
        except AttributeError:
            # RedisCluster's __init__ can fail before nodes_manager is set
            pass

    def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
        """
        Process the result of the executed command.
        The function returns a dict or a single value.

        :type command: str
        :type res: dict

        `res` should be in the following format:
            Dict<node_name, command_result>
        """
        if command in self.result_callbacks:
            res = self.result_callbacks[command](command, res, **kwargs)
        elif len(res) == 1:
            # When we execute the command on a single node, we can
            # remove the dictionary and return a single response
            res = list(res.values())[0]

        return self._policies_callback_mapping[response_policy](res)

    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        ``funcname`` - A string containing the name of the function to create
        ``func`` - The function, being added to this class.
        """
        setattr(self, funcname, func)

    def transaction(self, func, *watches, **kwargs):
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
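
    # A short usage sketch (illustrative; in cluster mode the watched and
    # mutated keys are assumed to share one slot via a hash tag):
    #
    #     def incr_if_exists(pipe):
    #         if pipe.get("{tag}:counter") is not None:
    #             pipe.multi()
    #             pipe.incr("{tag}:counter")
    #
    #     rc.transaction(incr_if_exists, "{tag}:counter")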


class ClusterNode:
    def __init__(self, host, port, server_type=None, redis_connection=None):
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        try:
            if self.redis_connection is not None:
                self.redis_connection.close()
        except Exception:
            # Ignore errors when closing the connection
            pass


class LoadBalancingStrategy(Enum):
    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        server_index = self.primary_to_idx.setdefault(primary, self.start_index)
        if replicas_only and server_index == 0:
            # skip the primary node index
            server_index = 1
        # Update the index for the next round
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index
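
# An illustrative sketch of the round-robin behavior (assuming a slot served
# by one primary and two replicas, i.e. list_size == 3; index 0 is the primary):
#
#     lb = LoadBalancer()
#     [lb.get_server_index("node1:7000", 3) for _ in range(4)]  # -> [0, 1, 2, 0]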


class NodesManager:
    def __init__(
        self,
        startup_nodes,
        from_url=False,
        require_full_coverage=False,
        lock=None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
        self.nodes_cache: Dict[str, Redis] = {}
        self.slots_cache = {}
        self.startup_nodes = {}
        self.default_node = None
        self.populate_startup_nodes(startup_nodes)
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        self._cache = cache
        self._cache_config = cache_config
        self._cache_factory = cache_factory
        self._moved_exception = None
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.initialize()

    def get_node(self, host=None, port=None, node_name=None):
        """
        Get the requested node from the cluster's nodes.
        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            return None

    def update_moved_exception(self, exception):
        self._moved_exception = exception

    def _update_moved_slots(self):
        """
        Update the slot's node with the redirected one
        """
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            if redirected_node.server_type is not PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node
        if redirected_node in self.slots_cache[e.slot_id]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = self.slots_cache[e.slot_id][0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            self.slots_cache[e.slot_id].append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            self.slots_cache[e.slot_id].remove(redirected_node)
            # Override the old primary with the new one
            self.slots_cache[e.slot_id][0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        else:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replicas) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        # Reset moved_exception
        self._moved_exception = None
1720 @deprecated_args(
1721 args_to_warn=["server_type"],
1722 reason=(
1723            "In case you need to select a load balancing strategy "
1724 "that will use replicas, please set it through 'load_balancing_strategy'"
1725 ),
1726 version="5.3.0",
1727 )
1728 def get_node_from_slot(
1729 self,
1730 slot,
1731 read_from_replicas=False,
1732 load_balancing_strategy=None,
1733 server_type=None,
1734 ) -> ClusterNode:
1735 """
1736        Gets a node that serves this hash slot
1737 """
1738 if self._moved_exception:
1739 with self._lock:
1740 if self._moved_exception:
1741 self._update_moved_slots()
1743 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
1744 raise SlotNotCoveredError(
1745 f'Slot "{slot}" not covered by the cluster. '
1746 f'"require_full_coverage={self._require_full_coverage}"'
1747 )
1749 if read_from_replicas is True and load_balancing_strategy is None:
1750 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
1752 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
1753 # get the server index using the strategy defined in load_balancing_strategy
1754 primary_name = self.slots_cache[slot][0].name
1755 node_idx = self.read_load_balancer.get_server_index(
1756 primary_name, len(self.slots_cache[slot]), load_balancing_strategy
1757 )
1758 elif (
1759 server_type is None
1760 or server_type == PRIMARY
1761 or len(self.slots_cache[slot]) == 1
1762 ):
1763 # return a primary
1764 node_idx = 0
1765 else:
1766 # return a replica
1767 # randomly choose one of the replicas
1768 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)
1770 return self.slots_cache[slot][node_idx]
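    # A hedged routing sketch: key_slot() (imported above from redis.crc)
    # computes the hash slot for a key, and get_node_from_slot() maps it to a
    # ClusterNode. The address and key are illustrative.
    #
    #     from redis.cluster import RedisCluster, LoadBalancingStrategy
    #     from redis.crc import key_slot
    #
    #     rc = RedisCluster(host="127.0.0.1", port=7000)
    #     slot = key_slot(b"user:1000")  # value in range 0..16383
    #     primary = rc.nodes_manager.get_node_from_slot(slot)
    #     any_owner = rc.nodes_manager.get_node_from_slot(
    #         slot, load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN
    #     )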
1772 def get_nodes_by_server_type(self, server_type):
1773 """
1774 Get all nodes with the specified server type
1775 :param server_type: 'primary' or 'replica'
1776 :return: list of ClusterNode
1777 """
1778 return [
1779 node
1780 for node in self.nodes_cache.values()
1781 if node.server_type == server_type
1782 ]
1784 def populate_startup_nodes(self, nodes):
1785 """
1786        Populate all startup nodes and filter out any duplicates
1787 """
1788 for n in nodes:
1789 self.startup_nodes[n.name] = n
1791 def check_slots_coverage(self, slots_cache):
1792 # Validate if all slots are covered or if we should try next
1793 # startup node
1794 for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
1795 if i not in slots_cache:
1796 return False
1797 return True
1799 def create_redis_connections(self, nodes):
1800 """
1801        Create a Redis connection to each node in :nodes:
1802 """
1803 connection_pools = []
1804 for node in nodes:
1805 if node.redis_connection is None:
1806 node.redis_connection = self.create_redis_node(
1807 host=node.host, port=node.port, **self.connection_kwargs
1808 )
1809 connection_pools.append(node.redis_connection.connection_pool)
1811 self._event_dispatcher.dispatch(
1812 AfterPooledConnectionsInstantiationEvent(
1813 connection_pools, ClientType.SYNC, self._credential_provider
1814 )
1815 )
1817 def create_redis_node(self, host, port, **kwargs):
1818 # We are configuring the connection pool not to retry
1819 # connections on lower level clients to avoid retrying
1820 # connections to nodes that are not reachable
1821 # and to avoid blocking the connection pool.
1822 # The only error that will have some handling in the lower
1823 # level clients is ConnectionError which will trigger disconnection
1824 # of the socket.
1825 # The retries will be handled on cluster client level
1826 # where we will have proper handling of the cluster topology
1827 node_retry_config = Retry(
1828 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
1829 )
1831 protocol = kwargs.get("protocol", None)
1832 if protocol in [3, "3"]:
1833 kwargs.update(
1834 {"maint_notifications_config": MaintNotificationsConfig(enabled=False)}
1835 )
1836 if self.from_url:
1837            # Create a redis node with a customized connection pool
1838 kwargs.update({"host": host})
1839 kwargs.update({"port": port})
1840 kwargs.update({"cache": self._cache})
1841 kwargs.update({"retry": node_retry_config})
1842 r = Redis(connection_pool=self.connection_pool_class(**kwargs))
1843 else:
1844 r = Redis(
1845 host=host,
1846 port=port,
1847 cache=self._cache,
1848 retry=node_retry_config,
1849 **kwargs,
1850 )
1851 return r
1853 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
1854 node_name = get_node_name(host, port)
1855 # check if we already have this node in the tmp_nodes_cache
1856 target_node = tmp_nodes_cache.get(node_name)
1857 if target_node is None:
1858 # before creating a new cluster node, check if the cluster node already
1859 # exists in the current nodes cache and has a valid connection so we can
1860 # reuse it
1861 target_node = self.nodes_cache.get(node_name)
1862 if target_node is None or target_node.redis_connection is None:
1863 # create new cluster node for this cluster
1864 target_node = ClusterNode(host, port, role)
1865 if target_node.server_type != role:
1866 target_node.server_type = role
1867 # add this node to the nodes cache
1868 tmp_nodes_cache[target_node.name] = target_node
1870 return target_node
1872 def initialize(self):
1873 """
1874 Initializes the nodes cache, slots cache and redis connections.
1875 :startup_nodes:
1876 Responsible for discovering other nodes in the cluster
1877 """
1878 self.reset()
1879 tmp_nodes_cache = {}
1880 tmp_slots = {}
1881 disagreements = []
1882 startup_nodes_reachable = False
1883 fully_covered = False
1884 kwargs = self.connection_kwargs
1885 exception = None
1886 # Convert to tuple to prevent RuntimeError if self.startup_nodes
1887 # is modified during iteration
1888 for startup_node in tuple(self.startup_nodes.values()):
1889 try:
1890 if startup_node.redis_connection:
1891 r = startup_node.redis_connection
1892 else:
1893 # Create a new Redis connection
1894 r = self.create_redis_node(
1895 startup_node.host, startup_node.port, **kwargs
1896 )
1897 self.startup_nodes[startup_node.name].redis_connection = r
1898 # Make sure cluster mode is enabled on this node
1899 try:
1900 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1901 r.connection_pool.disconnect()
1902 except ResponseError:
1903 raise RedisClusterException(
1904 "Cluster mode is not enabled on this node"
1905 )
1906 startup_nodes_reachable = True
1907 except Exception as e:
1908 # Try the next startup node.
1909 # The exception is saved and raised only if we have no more nodes.
1910 exception = e
1911 continue
1913 # CLUSTER SLOTS command results in the following output:
1914 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
1915 # where each node contains the following list: [IP, port, node_id]
1916 # Therefore, cluster_slots[0][2][0] will be the IP address of the
1917 # primary node of the first slot section.
1918 # If there's only one server in the cluster, its ``host`` is ''
1919 # Fix it to the host in startup_nodes
1920 if (
1921 len(cluster_slots) == 1
1922 and len(cluster_slots[0][2][0]) == 0
1923 and len(self.startup_nodes) == 1
1924 ):
1925 cluster_slots[0][2][0] = startup_node.host
1927 for slot in cluster_slots:
1928 primary_node = slot[2]
1929 host = str_if_bytes(primary_node[0])
1930 if host == "":
1931 host = startup_node.host
1932 port = int(primary_node[1])
1933 host, port = self.remap_host_port(host, port)
1935 nodes_for_slot = []
1937 target_node = self._get_or_create_cluster_node(
1938 host, port, PRIMARY, tmp_nodes_cache
1939 )
1940 nodes_for_slot.append(target_node)
1942 replica_nodes = slot[3:]
1943 for replica_node in replica_nodes:
1944 host = str_if_bytes(replica_node[0])
1945 port = int(replica_node[1])
1946 host, port = self.remap_host_port(host, port)
1947 target_replica_node = self._get_or_create_cluster_node(
1948 host, port, REPLICA, tmp_nodes_cache
1949 )
1950 nodes_for_slot.append(target_replica_node)
1952 for i in range(int(slot[0]), int(slot[1]) + 1):
1953 if i not in tmp_slots:
1954 tmp_slots[i] = nodes_for_slot
1955 else:
1956 # Validate that 2 nodes want to use the same slot cache
1957 # setup
1958 tmp_slot = tmp_slots[i][0]
1959 if tmp_slot.name != target_node.name:
1960 disagreements.append(
1961 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
1962 )
1964 if len(disagreements) > 5:
1965 raise RedisClusterException(
1966 f"startup_nodes could not agree on a valid "
1967 f"slots cache: {', '.join(disagreements)}"
1968 )
1970 fully_covered = self.check_slots_coverage(tmp_slots)
1971 if fully_covered:
1972 # Don't need to continue to the next startup node if all
1973 # slots are covered
1974 break
1976 if not startup_nodes_reachable:
1977 raise RedisClusterException(
1978                f"Could not connect to Redis Cluster. Please provide at least "
1979 f"one reachable node: {str(exception)}"
1980 ) from exception
1982 if self._cache is None and self._cache_config is not None:
1983 if self._cache_factory is None:
1984 self._cache = CacheFactory(self._cache_config).get_cache()
1985 else:
1986 self._cache = self._cache_factory.get_cache()
1988 # Create Redis connections to all nodes
1989 self.create_redis_connections(list(tmp_nodes_cache.values()))
1991 # Check if the slots are not fully covered
1992 if not fully_covered and self._require_full_coverage:
1993            # Full slot coverage was required, but the discovered slots
1994            # do not provide it
1995            raise RedisClusterException(
1996                f"Not all slots are covered after querying all startup_nodes. "
1997 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
1998 f"covered..."
1999 )
2001 # Set the tmp variables to the real variables
2002 self.nodes_cache = tmp_nodes_cache
2003 self.slots_cache = tmp_slots
2004 # Set the default node
2005 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
2006 if self._dynamic_startup_nodes:
2007 # Populate the startup nodes with all discovered nodes
2008 self.startup_nodes = tmp_nodes_cache
2009 # If initialize was called after a MovedError, clear it
2010 self._moved_exception = None
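    # A hedged construction sketch: initialize() issues CLUSTER SLOTS against
    # the startup nodes, so one reachable node is enough to discover the whole
    # topology. Addresses are illustrative.
    #
    #     from redis.cluster import ClusterNode, RedisCluster
    #
    #     nodes = [ClusterNode("127.0.0.1", 7000), ClusterNode("127.0.0.1", 7001)]
    #     rc = RedisCluster(startup_nodes=nodes, require_full_coverage=True)
    #     len(rc.nodes_manager.slots_cache)  # 16384 when fully covered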
2012 def close(self) -> None:
2013 self.default_node = None
2014 for node in self.nodes_cache.values():
2015 if node.redis_connection:
2016 node.redis_connection.close()
2018 def reset(self):
2019 try:
2020 self.read_load_balancer.reset()
2021 except TypeError:
2022 # The read_load_balancer is None, do nothing
2023 pass
2025 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
2026 """
2027 Remap the host and port returned from the cluster to a different
2028 internal value. Useful if the client is not connecting directly
2029 to the cluster.
2030 """
2031 if self.address_remap:
2032 return self.address_remap((host, port))
2033 return host, port
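    # A hedged sketch of an address_remap hook, useful when the addresses the
    # cluster announces differ from the ones the client can reach (e.g. behind
    # a local port-forward). The mapping below is illustrative.
    #
    #     from redis.cluster import RedisCluster
    #
    #     PORT_MAP = {6379: 16379, 6380: 16380}  # announced port -> reachable port
    #
    #     def remap(address):
    #         host, port = address
    #         return "127.0.0.1", PORT_MAP.get(port, port)
    #
    #     rc = RedisCluster(host="127.0.0.1", port=16379, address_remap=remap)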
2035 def find_connection_owner(self, connection: Connection) -> Optional[Redis]:
2036 node_name = get_node_name(connection.host, connection.port)
2037 for node in tuple(self.nodes_cache.values()):
2038 if node.redis_connection:
2039 conn_args = node.redis_connection.connection_pool.connection_kwargs
2040 if node_name == get_node_name(
2041 conn_args.get("host"), conn_args.get("port")
2042 ):
2043 return node
2046class ClusterPubSub(PubSub):
2047 """
2048 Wrapper for PubSub class.
2050 IMPORTANT: before using ClusterPubSub, read about the known limitations
2051    with pubsub in Cluster mode and learn how to work around them:
2052 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
2053 """
2055 def __init__(
2056 self,
2057 redis_cluster,
2058 node=None,
2059 host=None,
2060 port=None,
2061 push_handler_func=None,
2062 event_dispatcher: Optional["EventDispatcher"] = None,
2063 **kwargs,
2064 ):
2065 """
2066 When a pubsub instance is created without specifying a node, a single
2067 node will be transparently chosen for the pubsub connection on the
2068 first command execution. The node will be determined by:
2069 1. Hashing the channel name in the request to find its keyslot
2070 2. Selecting a node that handles the keyslot: If read_from_replicas is
2071 set to true or load_balancing_strategy is set, a replica can be selected.
2073 :type redis_cluster: RedisCluster
2074 :type node: ClusterNode
2075 :type host: str
2076 :type port: int
2077 """
2078 self.node = None
2079 self.set_pubsub_node(redis_cluster, node, host, port)
2080 connection_pool = (
2081 None
2082 if self.node is None
2083 else redis_cluster.get_redis_connection(self.node).connection_pool
2084 )
2085 self.cluster = redis_cluster
2086 self.node_pubsub_mapping = {}
2087 self._pubsubs_generator = self._pubsubs_generator()
2088 if event_dispatcher is None:
2089 self._event_dispatcher = EventDispatcher()
2090 else:
2091 self._event_dispatcher = event_dispatcher
2092 super().__init__(
2093 connection_pool=connection_pool,
2094 encoder=redis_cluster.encoder,
2095 push_handler_func=push_handler_func,
2096 event_dispatcher=self._event_dispatcher,
2097 **kwargs,
2098 )
2100 def set_pubsub_node(self, cluster, node=None, host=None, port=None):
2101 """
2102        The pubsub node will be set according to the passed node, host and port.
2103 When none of the node, host, or port are specified - the node is set
2104 to None and will be determined by the keyslot of the channel in the
2105 first command to be executed.
2106 RedisClusterException will be thrown if the passed node does not exist
2107 in the cluster.
2108 If host is passed without port, or vice versa, a DataError will be
2109 thrown.
2110 :type cluster: RedisCluster
2111 :type node: ClusterNode
2112 :type host: str
2113 :type port: int
2114 """
2115 if node is not None:
2116 # node is passed by the user
2117 self._raise_on_invalid_node(cluster, node, node.host, node.port)
2118 pubsub_node = node
2119 elif host is not None and port is not None:
2120 # host and port passed by the user
2121 node = cluster.get_node(host=host, port=port)
2122 self._raise_on_invalid_node(cluster, node, host, port)
2123 pubsub_node = node
2124        elif any([host, port]):
2125 # only 'host' or 'port' passed
2126 raise DataError("Passing a host requires passing a port, and vice versa")
2127 else:
2128 # nothing passed by the user. set node to None
2129 pubsub_node = None
2131 self.node = pubsub_node
2133 def get_pubsub_node(self):
2134 """
2135 Get the node that is being used as the pubsub connection
2136 """
2137 return self.node
2139 def _raise_on_invalid_node(self, redis_cluster, node, host, port):
2140 """
2141 Raise a RedisClusterException if the node is None or doesn't exist in
2142 the cluster.
2143 """
2144 if node is None or redis_cluster.get_node(node_name=node.name) is None:
2145 raise RedisClusterException(
2146 f"Node {host}:{port} doesn't exist in the cluster"
2147 )
2149 def execute_command(self, *args):
2150 """
2151 Execute a subscribe/unsubscribe command.
2153        Code taken from redis-py and tweaked to make it work within a cluster.
2154 """
2155 # NOTE: don't parse the response in this function -- it could pull a
2156 # legitimate message off the stack if the connection is already
2157 # subscribed to one or more channels
2159 if self.connection is None:
2160 if self.connection_pool is None:
2161 if len(args) > 1:
2162 # Hash the first channel and get one of the nodes holding
2163 # this slot
2164 channel = args[1]
2165 slot = self.cluster.keyslot(channel)
2166 node = self.cluster.nodes_manager.get_node_from_slot(
2167 slot,
2168 self.cluster.read_from_replicas,
2169 self.cluster.load_balancing_strategy,
2170 )
2171 else:
2172 # Get a random node
2173 node = self.cluster.get_random_node()
2174 self.node = node
2175 redis_connection = self.cluster.get_redis_connection(node)
2176 self.connection_pool = redis_connection.connection_pool
2177 self.connection = self.connection_pool.get_connection()
2178 # register a callback that re-subscribes to any channels we
2179 # were listening to when we were disconnected
2180 self.connection.register_connect_callback(self.on_connect)
2181 if self.push_handler_func is not None:
2182 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
2183 self._event_dispatcher.dispatch(
2184 AfterPubSubConnectionInstantiationEvent(
2185 self.connection, self.connection_pool, ClientType.SYNC, self._lock
2186 )
2187 )
2188 connection = self.connection
2189 self._execute(connection, connection.send_command, *args)
2191 def _get_node_pubsub(self, node):
2192 try:
2193 return self.node_pubsub_mapping[node.name]
2194 except KeyError:
2195 pubsub = node.redis_connection.pubsub(
2196 push_handler_func=self.push_handler_func
2197 )
2198 self.node_pubsub_mapping[node.name] = pubsub
2199 return pubsub
2201 def _sharded_message_generator(self):
2202 for _ in range(len(self.node_pubsub_mapping)):
2203 pubsub = next(self._pubsubs_generator)
2204 message = pubsub.get_message()
2205 if message is not None:
2206 return message
2207 return None
2209 def _pubsubs_generator(self):
2210 while True:
2211 current_nodes = list(self.node_pubsub_mapping.values())
2212 yield from current_nodes
2214 def get_sharded_message(
2215 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
2216 ):
2217 if target_node:
2218 message = self.node_pubsub_mapping[target_node.name].get_message(
2219 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
2220 )
2221 else:
2222 message = self._sharded_message_generator()
2223 if message is None:
2224 return None
2225 elif str_if_bytes(message["type"]) == "sunsubscribe":
2226 if message["channel"] in self.pending_unsubscribe_shard_channels:
2227 self.pending_unsubscribe_shard_channels.remove(message["channel"])
2228 self.shard_channels.pop(message["channel"], None)
2229 node = self.cluster.get_node_from_key(message["channel"])
2230 if self.node_pubsub_mapping[node.name].subscribed is False:
2231 self.node_pubsub_mapping.pop(node.name)
2232 if not self.channels and not self.patterns and not self.shard_channels:
2233 # There are no subscriptions anymore, set subscribed_event flag
2234 # to false
2235 self.subscribed_event.clear()
2236 if self.ignore_subscribe_messages or ignore_subscribe_messages:
2237 return None
2238 return message
2240 def ssubscribe(self, *args, **kwargs):
2241 if args:
2242 args = list_or_args(args[0], args[1:])
2243 s_channels = dict.fromkeys(args)
2244 s_channels.update(kwargs)
2245 for s_channel, handler in s_channels.items():
2246 node = self.cluster.get_node_from_key(s_channel)
2247 pubsub = self._get_node_pubsub(node)
2248 if handler:
2249 pubsub.ssubscribe(**{s_channel: handler})
2250 else:
2251 pubsub.ssubscribe(s_channel)
2252 self.shard_channels.update(pubsub.shard_channels)
2253 self.pending_unsubscribe_shard_channels.difference_update(
2254 self._normalize_keys({s_channel: None})
2255 )
2256 if pubsub.subscribed and not self.subscribed:
2257 self.subscribed_event.set()
2258 self.health_check_response_counter = 0
2260 def sunsubscribe(self, *args):
2261 if args:
2262 args = list_or_args(args[0], args[1:])
2263 else:
2264 args = self.shard_channels
2266 for s_channel in args:
2267 node = self.cluster.get_node_from_key(s_channel)
2268 p = self._get_node_pubsub(node)
2269 p.sunsubscribe(s_channel)
2270 self.pending_unsubscribe_shard_channels.update(
2271 p.pending_unsubscribe_shard_channels
2272 )
2274 def get_redis_connection(self):
2275 """
2276 Get the Redis connection of the pubsub connected node.
2277 """
2278 if self.node is not None:
2279 return self.node.redis_connection
2281 def disconnect(self):
2282 """
2283 Disconnect the pubsub connection.
2284 """
2285 if self.connection:
2286 self.connection.disconnect()
2287 for pubsub in self.node_pubsub_mapping.values():
2288 pubsub.connection.disconnect()
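    # A hedged sharded pub/sub sketch (requires Redis 7.0+; the address and
    # channel name are illustrative): ssubscribe() routes each shard channel
    # to the node that owns its keyslot, and get_sharded_message() polls the
    # per-node pubsub objects in turn.
    #
    #     from redis.cluster import RedisCluster
    #
    #     rc = RedisCluster(host="127.0.0.1", port=7000)
    #     p = rc.pubsub()
    #     p.ssubscribe("orders")          # SSUBSCRIBE on the owning shard
    #     rc.spublish("orders", "hello")  # SPUBLISH may run from any client
    #     msg = p.get_sharded_message(ignore_subscribe_messages=True, timeout=1.0)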
2291class ClusterPipeline(RedisCluster):
2292 """
2293 Support for Redis pipeline
2294 in cluster mode
2295 """
2297 ERRORS_ALLOW_RETRY = (
2298 ConnectionError,
2299 TimeoutError,
2300 MovedError,
2301 AskError,
2302 TryAgainError,
2303 )
2305 NO_SLOTS_COMMANDS = {"UNWATCH"}
2306 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
2307 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
2309 @deprecated_args(
2310 args_to_warn=[
2311 "cluster_error_retry_attempts",
2312 ],
2313 reason="Please configure the 'retry' object instead",
2314 version="6.0.0",
2315 )
2316 def __init__(
2317 self,
2318 nodes_manager: "NodesManager",
2319 commands_parser: "CommandsParser",
2320 result_callbacks: Optional[Dict[str, Callable]] = None,
2321 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
2322 startup_nodes: Optional[List["ClusterNode"]] = None,
2323 read_from_replicas: bool = False,
2324 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
2325 cluster_error_retry_attempts: int = 3,
2326 reinitialize_steps: int = 5,
2327 retry: Optional[Retry] = None,
2328 lock=None,
2329 transaction=False,
2330 policy_resolver: PolicyResolver = StaticPolicyResolver(),
2331 **kwargs,
2332 ):
2333 """ """
2334 self.command_stack = []
2335 self.nodes_manager = nodes_manager
2336 self.commands_parser = commands_parser
2337 self.refresh_table_asap = False
2338 self.result_callbacks = (
2339 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
2340 )
2341 self.startup_nodes = startup_nodes if startup_nodes else []
2342 self.read_from_replicas = read_from_replicas
2343 self.load_balancing_strategy = load_balancing_strategy
2344 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
2345 self.cluster_response_callbacks = cluster_response_callbacks
2346 self.reinitialize_counter = 0
2347 self.reinitialize_steps = reinitialize_steps
2348 if retry is not None:
2349 self.retry = retry
2350 else:
2351 self.retry = Retry(
2352 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
2353 retries=cluster_error_retry_attempts,
2354 )
2356 self.encoder = Encoder(
2357 kwargs.get("encoding", "utf-8"),
2358 kwargs.get("encoding_errors", "strict"),
2359 kwargs.get("decode_responses", False),
2360 )
2361 if lock is None:
2362 lock = threading.RLock()
2363 self._lock = lock
2364 self.parent_execute_command = super().execute_command
2365 self._execution_strategy: ExecutionStrategy = (
2366 PipelineStrategy(self) if not transaction else TransactionStrategy(self)
2367 )
2369 # For backward compatibility, mapping from existing policies to new one
2370 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
2371 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
2372 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
2373 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
2374 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
2375 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
2376 SLOT_ID: RequestPolicy.DEFAULT_KEYED,
2377 }
2379 self._policies_callback_mapping: dict[
2380 Union[RequestPolicy, ResponsePolicy], Callable
2381 ] = {
2382 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
2383 self.get_random_primary_or_all_nodes(command_name)
2384 ],
2385 RequestPolicy.DEFAULT_KEYED: lambda command,
2386 *args: self.get_nodes_from_slot(command, *args),
2387 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
2388 RequestPolicy.ALL_SHARDS: self.get_primaries,
2389 RequestPolicy.ALL_NODES: self.get_nodes,
2390 RequestPolicy.ALL_REPLICAS: self.get_replicas,
2391 RequestPolicy.MULTI_SHARD: lambda *args,
2392 **kwargs: self._split_multi_shard_command(*args, **kwargs),
2393 RequestPolicy.SPECIAL: self.get_special_nodes,
2394 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
2395 ResponsePolicy.DEFAULT_KEYED: lambda res: res,
2396 }
2398 self._policy_resolver = policy_resolver
2400 def __repr__(self):
2401 """ """
2402 return f"{type(self).__name__}"
2404 def __enter__(self):
2405 """ """
2406 return self
2408 def __exit__(self, exc_type, exc_value, traceback):
2409 """ """
2410 self.reset()
2412 def __del__(self):
2413 try:
2414 self.reset()
2415 except Exception:
2416 pass
2418 def __len__(self):
2419 """ """
2420 return len(self._execution_strategy.command_queue)
2422 def __bool__(self):
2423 "Pipeline instances should always evaluate to True on Python 3+"
2424 return True
2426 def execute_command(self, *args, **kwargs):
2427 """
2428 Wrapper function for pipeline_execute_command
2429 """
2430 return self._execution_strategy.execute_command(*args, **kwargs)
2432 def pipeline_execute_command(self, *args, **options):
2433 """
2434 Stage a command to be executed when execute() is next called
2436 Returns the current Pipeline object back so commands can be
2437 chained together, such as:
2439 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
2441 At some other point, you can then run: pipe.execute(),
2442 which will execute all commands queued in the pipe.
2443 """
2444 return self._execution_strategy.execute_command(*args, **options)
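    # A hedged usage sketch (the address and keys are illustrative): commands
    # are only buffered here; they are grouped per node and sent when
    # execute() is called.
    #
    #     from redis.cluster import RedisCluster
    #
    #     rc = RedisCluster(host="127.0.0.1", port=7000)
    #     pipe = rc.pipeline()
    #     pipe.set("foo", "bar").incr("baz")
    #     results = pipe.execute()  # e.g. [True, 1]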
2446 def annotate_exception(self, exception, number, command):
2447 """
2448 Provides extra context to the exception prior to it being handled
2449 """
2450 self._execution_strategy.annotate_exception(exception, number, command)
2452 def execute(self, raise_on_error: bool = True) -> List[Any]:
2453 """
2454 Execute all the commands in the current pipeline
2455 """
2457 try:
2458 return self._execution_strategy.execute(raise_on_error)
2459 finally:
2460 self.reset()
2462 def reset(self):
2463 """
2464 Reset back to empty pipeline.
2465 """
2466 self._execution_strategy.reset()
2468 def send_cluster_commands(
2469 self, stack, raise_on_error=True, allow_redirections=True
2470 ):
2471 return self._execution_strategy.send_cluster_commands(
2472 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
2473 )
2475 def exists(self, *keys):
2476 return self._execution_strategy.exists(*keys)
2478 def eval(self):
2479 """ """
2480 return self._execution_strategy.eval()
2482 def multi(self):
2483 """
2484 Start a transactional block of the pipeline after WATCH commands
2485 are issued. End the transactional block with `execute`.
2486 """
2487 self._execution_strategy.multi()
2489 def load_scripts(self):
2490 """ """
2491 self._execution_strategy.load_scripts()
2493 def discard(self):
2494 """ """
2495 self._execution_strategy.discard()
2497 def watch(self, *names):
2498 """Watches the values at keys ``names``"""
2499 self._execution_strategy.watch(*names)
2501 def unwatch(self):
2502 """Unwatches all previously specified keys"""
2503 self._execution_strategy.unwatch()
2505 def script_load_for_pipeline(self, *args, **kwargs):
2506 self._execution_strategy.script_load_for_pipeline(*args, **kwargs)
2508 def delete(self, *names):
2509 self._execution_strategy.delete(*names)
2511 def unlink(self, *names):
2512 self._execution_strategy.unlink(*names)
2515def block_pipeline_command(name: str) -> Callable[..., Any]:
2516 """
2517    Raises an error because some pipelined commands should
2518    be blocked when running in cluster mode
2519 """
2521 def inner(*args, **kwargs):
2522 raise RedisClusterException(
2523 f"ERROR: Calling pipelined function {name} is blocked "
2524 f"when running redis in cluster mode..."
2525 )
2527 return inner
2530# Blocked pipeline commands
2531PIPELINE_BLOCKED_COMMANDS = (
2532 "BGREWRITEAOF",
2533 "BGSAVE",
2534 "BITOP",
2535 "BRPOPLPUSH",
2536 "CLIENT GETNAME",
2537 "CLIENT KILL",
2538 "CLIENT LIST",
2539 "CLIENT SETNAME",
2540 "CLIENT",
2541 "CONFIG GET",
2542 "CONFIG RESETSTAT",
2543 "CONFIG REWRITE",
2544 "CONFIG SET",
2545 "CONFIG",
2546 "DBSIZE",
2547 "ECHO",
2548 "EVALSHA",
2549 "FLUSHALL",
2550 "FLUSHDB",
2551 "INFO",
2552 "KEYS",
2553 "LASTSAVE",
2554 "MGET",
2555 "MGET NONATOMIC",
2556 "MOVE",
2557 "MSET",
2558 "MSETEX",
2559 "MSET NONATOMIC",
2560 "MSETNX",
2561 "PFCOUNT",
2562 "PFMERGE",
2563 "PING",
2564 "PUBLISH",
2565 "RANDOMKEY",
2566 "READONLY",
2567 "READWRITE",
2568 "RENAME",
2569 "RENAMENX",
2570 "RPOPLPUSH",
2571 "SAVE",
2572 "SCAN",
2573 "SCRIPT EXISTS",
2574 "SCRIPT FLUSH",
2575 "SCRIPT KILL",
2576 "SCRIPT LOAD",
2577 "SCRIPT",
2578 "SDIFF",
2579 "SDIFFSTORE",
2580 "SENTINEL GET MASTER ADDR BY NAME",
2581 "SENTINEL MASTER",
2582 "SENTINEL MASTERS",
2583 "SENTINEL MONITOR",
2584 "SENTINEL REMOVE",
2585 "SENTINEL SENTINELS",
2586 "SENTINEL SET",
2587 "SENTINEL SLAVES",
2588 "SENTINEL",
2589 "SHUTDOWN",
2590 "SINTER",
2591 "SINTERSTORE",
2592 "SLAVEOF",
2593 "SLOWLOG GET",
2594 "SLOWLOG LEN",
2595 "SLOWLOG RESET",
2596 "SLOWLOG",
2597 "SMOVE",
2598 "SORT",
2599 "SUNION",
2600 "SUNIONSTORE",
2601 "TIME",
2602)
2603for command in PIPELINE_BLOCKED_COMMANDS:
2604 command = command.replace(" ", "_").lower()
2606 setattr(ClusterPipeline, command, block_pipeline_command(command))
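# A hedged sketch of the effect of the loop above: every name in
# PIPELINE_BLOCKED_COMMANDS becomes a ClusterPipeline method that raises as
# soon as it is called. The address below is illustrative.
#
#     from redis.cluster import RedisCluster
#     from redis.exceptions import RedisClusterException
#
#     rc = RedisCluster(host="127.0.0.1", port=7000)
#     pipe = rc.pipeline()
#     try:
#         pipe.keys("*")  # KEYS is in PIPELINE_BLOCKED_COMMANDS
#     except RedisClusterException as e:
#         print(e)  # ERROR: Calling pipelined function keys is blocked ...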
2609class PipelineCommand:
2610 """ """
2612 def __init__(self, args, options=None, position=None):
2613 self.args = args
2614 if options is None:
2615 options = {}
2616 self.options = options
2617 self.position = position
2618 self.result = None
2619 self.node = None
2620 self.asking = False
2621 self.command_policies: Optional[CommandPolicies] = None
2624class NodeCommands:
2625 """ """
2627 def __init__(self, parse_response, connection_pool, connection):
2628 """ """
2629 self.parse_response = parse_response
2630 self.connection_pool = connection_pool
2631 self.connection = connection
2632 self.commands = []
2634 def append(self, c):
2635 """ """
2636 self.commands.append(c)
2638 def write(self):
2639 """
2640 Code borrowed from Redis so it can be fixed
2641 """
2642 connection = self.connection
2643 commands = self.commands
2645 # We are going to clobber the commands with the write, so go ahead
2646 # and ensure that nothing is sitting there from a previous run.
2647 for c in commands:
2648 c.result = None
2650 # build up all commands into a single request to increase network perf
2651 # send all the commands and catch connection and timeout errors.
2652 try:
2653 connection.send_packed_command(
2654 connection.pack_commands([c.args for c in commands])
2655 )
2656 except (ConnectionError, TimeoutError) as e:
2657 for c in commands:
2658 c.result = e
2660 def read(self):
2661 """ """
2662 connection = self.connection
2663 for c in self.commands:
2664 # if there is a result on this command,
2665 # it means we ran into an exception
2666 # like a connection error. Trying to parse
2667 # a response on a connection that
2668 # is no longer open will result in a
2669 # connection error raised by redis-py.
2670 # but redis-py doesn't check in parse_response
2671 # that the sock object is
2672 # still set and if you try to
2673 # read from a closed connection, it will
2674 # result in an AttributeError because
2675 # it will do a readline() call on None.
2676 # This can have all kinds of nasty side-effects.
2677 # Treating this case as a connection error
2678 # is fine because it will dump
2679 # the connection object back into the
2680 # pool and on the next write, it will
2681 # explicitly open the connection and all will be well.
2682 if c.result is None:
2683 try:
2684 c.result = self.parse_response(connection, c.args[0], **c.options)
2685 except (ConnectionError, TimeoutError) as e:
2686 for c in self.commands:
2687 c.result = e
2688 return
2689 except RedisError:
2690 c.result = sys.exc_info()[1]
2693class ExecutionStrategy(ABC):
2694 @property
2695 @abstractmethod
2696 def command_queue(self):
2697 pass
2699 @abstractmethod
2700 def execute_command(self, *args, **kwargs):
2701 """
2702 Execution flow for current execution strategy.
2704 See: ClusterPipeline.execute_command()
2705 """
2706 pass
2708 @abstractmethod
2709 def annotate_exception(self, exception, number, command):
2710 """
2711 Annotate exception according to current execution strategy.
2713 See: ClusterPipeline.annotate_exception()
2714 """
2715 pass
2717 @abstractmethod
2718 def pipeline_execute_command(self, *args, **options):
2719 """
2720 Pipeline execution flow for current execution strategy.
2722 See: ClusterPipeline.pipeline_execute_command()
2723 """
2724 pass
2726 @abstractmethod
2727 def execute(self, raise_on_error: bool = True) -> List[Any]:
2728 """
2729 Executes current execution strategy.
2731 See: ClusterPipeline.execute()
2732 """
2733 pass
2735 @abstractmethod
2736 def send_cluster_commands(
2737 self, stack, raise_on_error=True, allow_redirections=True
2738 ):
2739 """
2740 Sends commands according to current execution strategy.
2742 See: ClusterPipeline.send_cluster_commands()
2743 """
2744 pass
2746 @abstractmethod
2747 def reset(self):
2748 """
2749 Resets current execution strategy.
2751 See: ClusterPipeline.reset()
2752 """
2753 pass
2755 @abstractmethod
2756 def exists(self, *keys):
2757 pass
2759 @abstractmethod
2760 def eval(self):
2761 pass
2763 @abstractmethod
2764 def multi(self):
2765 """
2766 Starts transactional context.
2768 See: ClusterPipeline.multi()
2769 """
2770 pass
2772 @abstractmethod
2773 def load_scripts(self):
2774 pass
2776 @abstractmethod
2777 def watch(self, *names):
2778 pass
2780 @abstractmethod
2781 def unwatch(self):
2782 """
2783 Unwatches all previously specified keys
2785 See: ClusterPipeline.unwatch()
2786 """
2787 pass
2789 @abstractmethod
2790 def script_load_for_pipeline(self, *args, **kwargs):
2791 pass
2793 @abstractmethod
2794 def delete(self, *names):
2795 """
2796        Delete a key specified by ``names``
2798 See: ClusterPipeline.delete()
2799 """
2800 pass
2802 @abstractmethod
2803 def unlink(self, *names):
2804 """
2805        Unlink a key specified by ``names``
2807 See: ClusterPipeline.unlink()
2808 """
2809 pass
2811 @abstractmethod
2812 def discard(self):
2813 pass
2816class AbstractStrategy(ExecutionStrategy):
2817 def __init__(
2818 self,
2819 pipe: ClusterPipeline,
2820 ):
2821 self._command_queue: List[PipelineCommand] = []
2822 self._pipe = pipe
2823 self._nodes_manager = self._pipe.nodes_manager
2825 @property
2826 def command_queue(self):
2827 return self._command_queue
2829 @command_queue.setter
2830 def command_queue(self, queue: List[PipelineCommand]):
2831 self._command_queue = queue
2833 @abstractmethod
2834 def execute_command(self, *args, **kwargs):
2835 pass
2837 def pipeline_execute_command(self, *args, **options):
2838 self._command_queue.append(
2839 PipelineCommand(args, options, len(self._command_queue))
2840 )
2841 return self._pipe
2843 @abstractmethod
2844 def execute(self, raise_on_error: bool = True) -> List[Any]:
2845 pass
2847 @abstractmethod
2848 def send_cluster_commands(
2849 self, stack, raise_on_error=True, allow_redirections=True
2850 ):
2851 pass
2853 @abstractmethod
2854 def reset(self):
2855 pass
2857 def exists(self, *keys):
2858 return self.execute_command("EXISTS", *keys)
2860 def eval(self):
2861 """ """
2862 raise RedisClusterException("method eval() is not implemented")
2864 def load_scripts(self):
2865 """ """
2866 raise RedisClusterException("method load_scripts() is not implemented")
2868 def script_load_for_pipeline(self, *args, **kwargs):
2869 """ """
2870 raise RedisClusterException(
2871 "method script_load_for_pipeline() is not implemented"
2872 )
2874 def annotate_exception(self, exception, number, command):
2875 """
2876 Provides extra context to the exception prior to it being handled
2877 """
2878 cmd = " ".join(map(safe_str, command))
2879 msg = (
2880 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
2881 f"caused error: {exception.args[0]}"
2882 )
2883 exception.args = (msg,) + exception.args[1:]
2886class PipelineStrategy(AbstractStrategy):
2887 def __init__(self, pipe: ClusterPipeline):
2888 super().__init__(pipe)
2889 self.command_flags = pipe.command_flags
2891 def execute_command(self, *args, **kwargs):
2892 return self.pipeline_execute_command(*args, **kwargs)
2894 def _raise_first_error(self, stack):
2895 """
2896 Raise the first exception on the stack
2897 """
2898 for c in stack:
2899 r = c.result
2900 if isinstance(r, Exception):
2901 self.annotate_exception(r, c.position + 1, c.args)
2902 raise r
2904 def execute(self, raise_on_error: bool = True) -> List[Any]:
2905 stack = self._command_queue
2906 if not stack:
2907 return []
2909 try:
2910 return self.send_cluster_commands(stack, raise_on_error)
2911 finally:
2912 self.reset()
2914 def reset(self):
2915 """
2916 Reset back to empty pipeline.
2917 """
2918 self._command_queue = []
2920 def send_cluster_commands(
2921 self, stack, raise_on_error=True, allow_redirections=True
2922 ):
2923 """
2924 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.
2926        If one of the retryable exceptions has been thrown, we assume that:
2927 - connection_pool was disconnected
2928 - connection_pool was reset
2929 - refresh_table_asap set to True
2931        It will retry the number of times specified by
2932        the retries in the "self.retry" config option,
2933        which defaults to 3 unless manually configured.
2935        If the retries are exhausted, the command will
2936        raise a ClusterDownError.
2937 """
2938 if not stack:
2939 return []
2940 retry_attempts = self._pipe.retry.get_retries()
2941 while True:
2942 try:
2943 return self._send_cluster_commands(
2944 stack,
2945 raise_on_error=raise_on_error,
2946 allow_redirections=allow_redirections,
2947 )
2948 except RedisCluster.ERRORS_ALLOW_RETRY as e:
2949 if retry_attempts > 0:
2950 # Try again with the new cluster setup. All other errors
2951 # should be raised.
2952 retry_attempts -= 1
2953 pass
2954 else:
2955 raise e
2957 def _send_cluster_commands(
2958 self, stack, raise_on_error=True, allow_redirections=True
2959 ):
2960 """
2961 Send a bunch of cluster commands to the redis cluster.
2963 `allow_redirections` If the pipeline should follow
2964 `ASK` & `MOVED` responses automatically. If set
2965 to false it will raise RedisClusterException.
2966 """
2967 # the first time sending the commands we send all of
2968 # the commands that were queued up.
2969 # if we have to run through it again, we only retry
2970 # the commands that failed.
2971 attempt = sorted(stack, key=lambda x: x.position)
2972 is_default_node = False
2973        # build a mapping from node name to the commands we need to send to it
2974 nodes = {}
2976 # as we move through each command that still needs to be processed,
2977 # we figure out the slot number that command maps to, then from
2978 # the slot determine the node.
2979 for c in attempt:
2980 command_policies = self._pipe._policy_resolver.resolve(c.args[0].lower())
2982 while True:
2983 # refer to our internal node -> slot table that
2984 # tells us where a given command should route to.
2985 # (it might be possible we have a cached node that no longer
2986 # exists in the cluster, which is why we do this in a loop)
2987 passed_targets = c.options.pop("target_nodes", None)
2988 if passed_targets and not self._is_nodes_flag(passed_targets):
2989 target_nodes = self._parse_target_nodes(passed_targets)
2991 if not command_policies:
2992 command_policies = CommandPolicies()
2993 else:
2994 if not command_policies:
2995 command = c.args[0].upper()
2996 if (
2997 len(c.args) >= 2
2998 and f"{c.args[0]} {c.args[1]}".upper()
2999 in self._pipe.command_flags
3000 ):
3001 command = f"{c.args[0]} {c.args[1]}".upper()
3003                    # We can only resolve key properties if the command is not
3004                    # in the list of pre-defined request policies
3005 command_flag = self.command_flags.get(command)
3006 if not command_flag:
3007 # Fallback to default policy
3008 if not self._pipe.get_default_node():
3009 keys = None
3010 else:
3011 keys = self._pipe._get_command_keys(*c.args)
3012 if not keys or len(keys) == 0:
3013 command_policies = CommandPolicies()
3014 else:
3015 command_policies = CommandPolicies(
3016 request_policy=RequestPolicy.DEFAULT_KEYED,
3017 response_policy=ResponsePolicy.DEFAULT_KEYED,
3018 )
3019 else:
3020 if command_flag in self._pipe._command_flags_mapping:
3021 command_policies = CommandPolicies(
3022 request_policy=self._pipe._command_flags_mapping[
3023 command_flag
3024 ]
3025 )
3026 else:
3027 command_policies = CommandPolicies()
3029 target_nodes = self._determine_nodes(
3030 *c.args,
3031 request_policy=command_policies.request_policy,
3032 node_flag=passed_targets,
3033 )
3034 if not target_nodes:
3035 raise RedisClusterException(
3036 f"No targets were found to execute {c.args} command on"
3037 )
3038 c.command_policies = command_policies
3039 if len(target_nodes) > 1:
3040 raise RedisClusterException(
3041 f"Too many targets for command {c.args}"
3042 )
3044 node = target_nodes[0]
3045 if node == self._pipe.get_default_node():
3046 is_default_node = True
3048 # now that we know the name of the node
3049 # ( it's just a string in the form of host:port )
3050 # we can build a list of commands for each node.
3051 node_name = node.name
3052 if node_name not in nodes:
3053 redis_node = self._pipe.get_redis_connection(node)
3054 try:
3055 connection = get_connection(redis_node)
3056 except (ConnectionError, TimeoutError):
3057 for n in nodes.values():
3058 n.connection_pool.release(n.connection)
3059 # Connection retries are being handled in the node's
3060 # Retry object. Reinitialize the node -> slot table.
3061 self._nodes_manager.initialize()
3062 if is_default_node:
3063 self._pipe.replace_default_node()
3064 raise
3065 nodes[node_name] = NodeCommands(
3066 redis_node.parse_response,
3067 redis_node.connection_pool,
3068 connection,
3069 )
3070 nodes[node_name].append(c)
3071 break
3073 # send the commands in sequence.
3074 # we write to all the open sockets for each node first,
3075 # before reading anything
3076 # this allows us to flush all the requests out across the
3077 # network
3078 # so that we can read them from different sockets as they come back.
3079        # we don't multiplex on the sockets as they become available,
3080 # but that shouldn't make too much difference.
3081 try:
3082 node_commands = nodes.values()
3083 for n in node_commands:
3084 n.write()
3086 for n in node_commands:
3087 n.read()
3088 finally:
3089 # release all of the redis connections we allocated earlier
3090 # back into the connection pool.
3091 # we used to do this step as part of a try/finally block,
3092 # but it is really dangerous to
3093 # release connections back into the pool if for some
3094 # reason the socket has data still left in it
3095 # from a previous operation. The write and
3096 # read operations already have try/catch around them for
3097 # all known types of errors including connection
3098 # and socket level errors.
3099 # So if we hit an exception, something really bad
3100            # happened and putting any of
3101 # these connections back into the pool is a very bad idea.
3102 # the socket might have unread buffer still sitting in it,
3103 # and then the next time we read from it we pass the
3104 # buffered result back from a previous command and
3105 # every single request after to that connection will always get
3106 # a mismatched result.
3107 for n in nodes.values():
3108 n.connection_pool.release(n.connection)
3110 # if the response isn't an exception it is a
3111 # valid response from the node
3112 # we're all done with that command, YAY!
3113 # if we have more commands to attempt, we've run into problems.
3114 # collect all the commands we are allowed to retry.
3115 # (MOVED, ASK, or connection errors or timeout errors)
3116 attempt = sorted(
3117 (
3118 c
3119 for c in attempt
3120 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
3121 ),
3122 key=lambda x: x.position,
3123 )
3124 if attempt and allow_redirections:
3125 # RETRY MAGIC HAPPENS HERE!
3126 # send these remaining commands one at a time using `execute_command`
3127 # in the main client. This keeps our retry logic
3128 # in one place mostly,
3129 # and allows us to be more confident in correctness of behavior.
3130 # at this point any speed gains from pipelining have been lost
3131 # anyway, so we might as well make the best
3132 # attempt to get the correct behavior.
3133 #
3134 # The client command will handle retries for each
3135 # individual command sequentially as we pass each
3136 # one into `execute_command`. Any exceptions
3137 # that bubble out should only appear once all
3138 # retries have been exhausted.
3139 #
3140 # If a lot of commands have failed, we'll be setting the
3141 # flag to rebuild the slots table from scratch.
3142 # So MOVED errors should correct themselves fairly quickly.
3143 self._pipe.reinitialize_counter += 1
3144 if self._pipe._should_reinitialized():
3145 self._nodes_manager.initialize()
3146 if is_default_node:
3147 self._pipe.replace_default_node()
3148 for c in attempt:
3149 try:
3150 # send each command individually like we
3151 # do in the main client.
3152 c.result = self._pipe.parent_execute_command(*c.args, **c.options)
3153 except RedisError as e:
3154 c.result = e
3156 # turn the response back into a simple flat array that corresponds
3157 # to the sequence of commands issued in the stack in pipeline.execute()
3158 response = []
3159 for c in sorted(stack, key=lambda x: x.position):
3160 if c.args[0] in self._pipe.cluster_response_callbacks:
3161                # Remove the keys entry; it is only needed for caching.
3162 c.options.pop("keys", None)
3163 c.result = self._pipe._policies_callback_mapping[
3164 c.command_policies.response_policy
3165 ](
3166 self._pipe.cluster_response_callbacks[c.args[0]](
3167 c.result, **c.options
3168 )
3169 )
3170 response.append(c.result)
3172 if raise_on_error:
3173 self._raise_first_error(stack)
3175 return response
3177 def _is_nodes_flag(self, target_nodes):
3178 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags
3180 def _parse_target_nodes(self, target_nodes):
3181 if isinstance(target_nodes, list):
3182 nodes = target_nodes
3183 elif isinstance(target_nodes, ClusterNode):
3184 # Supports passing a single ClusterNode as a variable
3185 nodes = [target_nodes]
3186 elif isinstance(target_nodes, dict):
3187 # Supports dictionaries of the format {node_name: node}.
3188 # It enables to execute commands with multi nodes as follows:
3189 # rc.cluster_save_config(rc.get_primaries())
3190 nodes = target_nodes.values()
3191 else:
3192 raise TypeError(
3193 "target_nodes type can be one of the following: "
3194 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
3195 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
3196 f"The passed type is {type(target_nodes)}"
3197 )
3198 return nodes
3200 def _determine_nodes(
3201 self, *args, request_policy: RequestPolicy, **kwargs
3202 ) -> List["ClusterNode"]:
3203        # Determine which nodes the command should be executed on.
3204 # Returns a list of target nodes.
3205 command = args[0].upper()
3206 if (
3207 len(args) >= 2
3208 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
3209 ):
3210 command = f"{args[0]} {args[1]}".upper()
3212 nodes_flag = kwargs.pop("nodes_flag", None)
3213 if nodes_flag is not None:
3214 # nodes flag passed by the user
3215 command_flag = nodes_flag
3216 else:
3217 # get the nodes group for this command if it was predefined
3218 command_flag = self._pipe.command_flags.get(command)
3220 if command_flag in self._pipe._command_flags_mapping:
3221 request_policy = self._pipe._command_flags_mapping[command_flag]
3223 policy_callback = self._pipe._policies_callback_mapping[request_policy]
3225 if request_policy == RequestPolicy.DEFAULT_KEYED:
3226 nodes = policy_callback(command, *args)
3227 elif request_policy == RequestPolicy.MULTI_SHARD:
3228 nodes = policy_callback(*args, **kwargs)
3229 elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
3230 nodes = policy_callback(args[0])
3231 else:
3232 nodes = policy_callback()
3234 if args[0].lower() == "ft.aggregate":
3235 self._aggregate_nodes = nodes
3237 return nodes
3239 def multi(self):
3240 raise RedisClusterException(
3241 "method multi() is not supported outside of transactional context"
3242 )
3244 def discard(self):
3245 raise RedisClusterException(
3246 "method discard() is not supported outside of transactional context"
3247 )
3249 def watch(self, *names):
3250 raise RedisClusterException(
3251 "method watch() is not supported outside of transactional context"
3252 )
3254 def unwatch(self, *names):
3255 raise RedisClusterException(
3256 "method unwatch() is not supported outside of transactional context"
3257 )
3259 def delete(self, *names):
3260 if len(names) != 1:
3261 raise RedisClusterException(
3262 "deleting multiple keys is not implemented in pipeline command"
3263 )
3265 return self.execute_command("DEL", names[0])
3267 def unlink(self, *names):
3268 if len(names) != 1:
3269 raise RedisClusterException(
3270 "unlinking multiple keys is not implemented in pipeline command"
3271 )
3273 return self.execute_command("UNLINK", names[0])
3276class TransactionStrategy(AbstractStrategy):
3277 NO_SLOTS_COMMANDS = {"UNWATCH"}
3278 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
3279 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3280 SLOT_REDIRECT_ERRORS = (AskError, MovedError)
3281 CONNECTION_ERRORS = (
3282 ConnectionError,
3283 OSError,
3284 ClusterDownError,
3285 SlotNotCoveredError,
3286 )
3288 def __init__(self, pipe: ClusterPipeline):
3289 super().__init__(pipe)
3290 self._explicit_transaction = False
3291 self._watching = False
3292 self._pipeline_slots: Set[int] = set()
3293 self._transaction_connection: Optional[Connection] = None
3294 self._executing = False
3295 self._retry = copy(self._pipe.retry)
3296 self._retry.update_supported_errors(
3297 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
3298 )
3300 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
3301 """
3302 Find a connection for a pipeline transaction.
3304        For running an atomic transaction, watched keys ensure that contents have not been
3305 altered as long as the watch commands for those keys were sent over the same
3306 connection. So once we start watching a key, we fetch a connection to the
3307 node that owns that slot and reuse it.
3308 """
3309 if not self._pipeline_slots:
3310 raise RedisClusterException(
3311                "At least one command with a key is needed to identify a node"
3312 )
3314 node: ClusterNode = self._nodes_manager.get_node_from_slot(
3315 list(self._pipeline_slots)[0], False
3316 )
3317 redis_node: Redis = self._pipe.get_redis_connection(node)
3318 if self._transaction_connection:
3319 if not redis_node.connection_pool.owns_connection(
3320 self._transaction_connection
3321 ):
3322 previous_node = self._nodes_manager.find_connection_owner(
3323 self._transaction_connection
3324 )
3325 previous_node.connection_pool.release(self._transaction_connection)
3326 self._transaction_connection = None
3328 if not self._transaction_connection:
3329 self._transaction_connection = get_connection(redis_node)
3331 return redis_node, self._transaction_connection
3333 def execute_command(self, *args, **kwargs):
3334 slot_number: Optional[int] = None
3335 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
3336 slot_number = self._pipe.determine_slot(*args)
3338 if (
3339 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
3340 ) and not self._explicit_transaction:
3341 if args[0] == "WATCH":
3342 self._validate_watch()
3344 if slot_number is not None:
3345 if self._pipeline_slots and slot_number not in self._pipeline_slots:
3346 raise CrossSlotTransactionError(
3347 "Cannot watch or send commands on different slots"
3348 )
3350 self._pipeline_slots.add(slot_number)
3351 elif args[0] not in self.NO_SLOTS_COMMANDS:
3352 raise RedisClusterException(
3353                f"Cannot identify slot number for command: {args[0]}, "
3354 "it cannot be triggered in a transaction"
3355 )
3357 return self._immediate_execute_command(*args, **kwargs)
3358 else:
3359 if slot_number is not None:
3360 self._pipeline_slots.add(slot_number)
3362 return self.pipeline_execute_command(*args, **kwargs)
3364 def _validate_watch(self):
3365 if self._explicit_transaction:
3366 raise RedisError("Cannot issue a WATCH after a MULTI")
3368 self._watching = True
3370 def _immediate_execute_command(self, *args, **options):
3371 return self._retry.call_with_retry(
3372 lambda: self._get_connection_and_send_command(*args, **options),
3373 self._reinitialize_on_error,
3374 )
3376 def _get_connection_and_send_command(self, *args, **options):
3377 redis_node, connection = self._get_client_and_connection_for_transaction()
3378 return self._send_command_parse_response(
3379 connection, redis_node, args[0], *args, **options
3380 )
3382 def _send_command_parse_response(
3383 self, conn, redis_node: Redis, command_name, *args, **options
3384 ):
3385 """
3386 Send a command and parse the response
3387 """
3389 conn.send_command(*args)
3390 output = redis_node.parse_response(conn, command_name, **options)
3392 if command_name in self.UNWATCH_COMMANDS:
3393 self._watching = False
3394 return output
3396 def _reinitialize_on_error(self, error):
3397 if self._watching:
3398 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
3399 raise WatchError("Slot rebalancing occurred while watching keys")
3401 if (
3402 type(error) in self.SLOT_REDIRECT_ERRORS
3403 or type(error) in self.CONNECTION_ERRORS
3404 ):
3405 if self._transaction_connection:
3406 self._transaction_connection = None
3408 self._pipe.reinitialize_counter += 1
3409 if self._pipe._should_reinitialized():
3410 self._nodes_manager.initialize()
3411                    self._pipe.reinitialize_counter = 0
3412 else:
3413 if isinstance(error, AskError):
3414 self._nodes_manager.update_moved_exception(error)
3416 self._executing = False
3418 def _raise_first_error(self, responses, stack):
3419 """
3420 Raise the first exception on the stack
3421 """
3422 for r, cmd in zip(responses, stack):
3423 if isinstance(r, Exception):
3424 self.annotate_exception(r, cmd.position + 1, cmd.args)
3425 raise r
3427 def execute(self, raise_on_error: bool = True) -> List[Any]:
3428 stack = self._command_queue
3429 if not stack and (not self._watching or not self._pipeline_slots):
3430 return []
3432 return self._execute_transaction_with_retries(stack, raise_on_error)
3434 def _execute_transaction_with_retries(
3435 self, stack: List["PipelineCommand"], raise_on_error: bool
3436 ):
3437 return self._retry.call_with_retry(
3438 lambda: self._execute_transaction(stack, raise_on_error),
3439 self._reinitialize_on_error,
3440 )
3442 def _execute_transaction(
3443 self, stack: List["PipelineCommand"], raise_on_error: bool
3444 ):
3445 if len(self._pipeline_slots) > 1:
3446 raise CrossSlotTransactionError(
3447 "All keys involved in a cluster transaction must map to the same slot"
3448 )
3450 self._executing = True
3452 redis_node, connection = self._get_client_and_connection_for_transaction()
3454 stack = chain(
3455 [PipelineCommand(("MULTI",))],
3456 stack,
3457 [PipelineCommand(("EXEC",))],
3458 )
3459 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
3460 packed_commands = connection.pack_commands(commands)
3461 connection.send_packed_command(packed_commands)
3462 errors = []
3464 # parse off the response for MULTI
3465 # NOTE: we need to handle ResponseErrors here and continue
3466 # so that we read all the additional command messages from
3467 # the socket
3468 try:
3469 redis_node.parse_response(connection, "MULTI")
3470 except ResponseError as e:
3471 self.annotate_exception(e, 0, "MULTI")
3472 errors.append(e)
3473 except self.CONNECTION_ERRORS as cluster_error:
3474 self.annotate_exception(cluster_error, 0, "MULTI")
3475 raise
3477 # and all the other commands
3478 for i, command in enumerate(self._command_queue):
3479 if EMPTY_RESPONSE in command.options:
3480 errors.append((i, command.options[EMPTY_RESPONSE]))
3481 else:
3482 try:
3483 _ = redis_node.parse_response(connection, "_")
3484 except self.SLOT_REDIRECT_ERRORS as slot_error:
3485 self.annotate_exception(slot_error, i + 1, command.args)
3486 errors.append(slot_error)
3487 except self.CONNECTION_ERRORS as cluster_error:
3488 self.annotate_exception(cluster_error, i + 1, command.args)
3489 raise
3490 except ResponseError as e:
3491 self.annotate_exception(e, i + 1, command.args)
3492 errors.append(e)
3494 response = None
3495 # parse the EXEC.
3496 try:
3497 response = redis_node.parse_response(connection, "EXEC")
3498 except ExecAbortError:
3499 if errors:
3500 raise errors[0]
3501 raise
3503 self._executing = False
3505 # EXEC clears any watched keys
3506 self._watching = False
3508 if response is None:
3509 raise WatchError("Watched variable changed.")
3511 # put any parse errors into the response
3512 for i, e in errors:
3513 response.insert(i, e)
3515 if len(response) != len(self._command_queue):
3516 raise InvalidPipelineStack(
3517 "Unexpected response length for cluster pipeline EXEC."
3518 " Command stack was {} but response had length {}".format(
3519 [c.args[0] for c in self._command_queue], len(response)
3520 )
3521 )
3523 # find any errors in the response and raise if necessary
3524 if raise_on_error or len(errors) > 0:
3525 self._raise_first_error(
3526 response,
3527 self._command_queue,
3528 )
3530 # We have to run response callbacks manually
3531 data = []
3532 for r, cmd in zip(response, self._command_queue):
3533 if not isinstance(r, Exception):
3534 command_name = cmd.args[0]
3535 if command_name in self._pipe.cluster_response_callbacks:
3536 r = self._pipe.cluster_response_callbacks[command_name](
3537 r, **cmd.options
3538 )
3539 data.append(r)
3540 return data
3542 def reset(self):
3543 self._command_queue = []
3545 # make sure to reset the connection state in the event that we were
3546 # watching something
3547 if self._transaction_connection:
3548 try:
3549 if self._watching:
3550 # call this manually since our unwatch or
3551 # immediate_execute_command methods can call reset()
3552 self._transaction_connection.send_command("UNWATCH")
3553 self._transaction_connection.read_response()
3554 # we can safely return the connection to the pool here since we're
3555 # sure we're no longer WATCHing anything
3556 node = self._nodes_manager.find_connection_owner(
3557 self._transaction_connection
3558 )
3559 node.redis_connection.connection_pool.release(
3560 self._transaction_connection
3561 )
3562 self._transaction_connection = None
3563 except self.CONNECTION_ERRORS:
3564 # disconnect will also remove any previous WATCHes
3565 if self._transaction_connection:
3566 self._transaction_connection.disconnect()
3568 # clean up the other instance attributes
3569 self._watching = False
3570 self._explicit_transaction = False
3571 self._pipeline_slots = set()
3572 self._executing = False
3574 def send_cluster_commands(
3575 self, stack, raise_on_error=True, allow_redirections=True
3576 ):
3577 raise NotImplementedError(
3578 "send_cluster_commands cannot be executed in transactional context."
3579 )
3581 def multi(self):
3582 if self._explicit_transaction:
3583 raise RedisError("Cannot issue nested calls to MULTI")
3584 if self._command_queue:
3585 raise RedisError(
3586 "Commands without an initial WATCH have already been issued"
3587 )
3588 self._explicit_transaction = True
3590 def watch(self, *names):
3591 if self._explicit_transaction:
3592 raise RedisError("Cannot issue a WATCH after a MULTI")
3594 return self.execute_command("WATCH", *names)
3596 def unwatch(self):
3597 if self._watching:
3598 return self.execute_command("UNWATCH")
3600 return True
3602 def discard(self):
3603 self.reset()
3605 def delete(self, *names):
3606 return self.execute_command("DEL", *names)
3608 def unlink(self, *names):
3609 return self.execute_command("UNLINK", *names)
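# A hedged transaction sketch, assuming the cluster client exposes
# pipeline(transaction=True) to select this TransactionStrategy. All keys
# share the "{acct}" hash tag so they map to one slot; names are illustrative.
#
#     from redis.cluster import RedisCluster
#
#     rc = RedisCluster(host="127.0.0.1", port=7000)
#     with rc.pipeline(transaction=True) as pipe:
#         pipe.watch("{acct}:balance")            # immediate WATCH on the owner node
#         balance = int(pipe.get("{acct}:balance") or 0)  # also immediate while watching
#         pipe.multi()                            # start the queued MULTI/EXEC block
#         pipe.set("{acct}:balance", balance + 1)
#         pipe.execute()                          # WatchError if the key changed meanwhile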