Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/asyncio/cluster.py: 17%
638 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1import asyncio
2import collections
3import random
4import socket
5import ssl
6import warnings
7from typing import (
8 Any,
9 Callable,
10 Deque,
11 Dict,
12 Generator,
13 List,
14 Mapping,
15 Optional,
16 Tuple,
17 Type,
18 TypeVar,
19 Union,
20)
22from redis._cache import (
23 DEFAULT_BLACKLIST,
24 DEFAULT_EVICTION_POLICY,
25 DEFAULT_WHITELIST,
26 AbstractCache,
27)
28from redis._parsers import AsyncCommandsParser, Encoder
29from redis._parsers.helpers import (
30 _RedisCallbacks,
31 _RedisCallbacksRESP2,
32 _RedisCallbacksRESP3,
33)
34from redis.asyncio.client import ResponseCallbackT
35from redis.asyncio.connection import Connection, DefaultParser, SSLConnection, parse_url
36from redis.asyncio.lock import Lock
37from redis.asyncio.retry import Retry
38from redis.backoff import default_backoff
39from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
40from redis.cluster import (
41 PIPELINE_BLOCKED_COMMANDS,
42 PRIMARY,
43 REPLICA,
44 SLOT_ID,
45 AbstractRedisCluster,
46 LoadBalancer,
47 block_pipeline_command,
48 get_node_name,
49 parse_cluster_slots,
50)
51from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands
52from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
53from redis.credentials import CredentialProvider
54from redis.exceptions import (
55 AskError,
56 BusyLoadingError,
57 ClusterCrossSlotError,
58 ClusterDownError,
59 ClusterError,
60 ConnectionError,
61 DataError,
62 MasterDownError,
63 MaxConnectionsError,
64 MovedError,
65 RedisClusterException,
66 ResponseError,
67 SlotNotCoveredError,
68 TimeoutError,
69 TryAgainError,
70)
71from redis.typing import AnyKeyT, EncodableT, KeyT
72from redis.utils import (
73 deprecated_function,
74 dict_merge,
75 get_lib_version,
76 safe_str,
77 str_if_bytes,
78)
# Constrained type variable for a "target nodes" value: a string, a single
# ClusterNode, a list of ClusterNode objects, or a dict whose values are
# ClusterNode objects.
TargetNodesT = TypeVar(
    "TargetNodesT",
    str,
    "ClusterNode",
    List["ClusterNode"],
    Dict[Any, "ClusterNode"],
)
class ClusterParser(DefaultParser):
    """
    Parser whose exception table also covers cluster error keys.

    ``EXCEPTION_CLASSES`` is the default parser's table merged with entries
    for the ASK, CLUSTERDOWN, CROSSSLOT, MASTERDOWN, MOVED and TRYAGAIN keys.
    """

    EXCEPTION_CLASSES = dict_merge(
        DefaultParser.EXCEPTION_CLASSES,
        dict(
            ASK=AskError,
            CLUSTERDOWN=ClusterDownError,
            CROSSSLOT=ClusterCrossSlotError,
            MASTERDOWN=MasterDownError,
            MOVED=MovedError,
            TRYAGAIN=TryAgainError,
        ),
    )
99class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands):
100 """
101 Create a new RedisCluster client.
103 Pass one of parameters:
105 - `host` & `port`
106 - `startup_nodes`
108 | Use ``await`` :meth:`initialize` to find cluster nodes & create connections.
109 | Use ``await`` :meth:`close` to disconnect connections & close client.
111 Many commands support the target_nodes kwarg. It can be one of the
112 :attr:`NODE_FLAGS`:
114 - :attr:`PRIMARIES`
115 - :attr:`REPLICAS`
116 - :attr:`ALL_NODES`
117 - :attr:`RANDOM`
118 - :attr:`DEFAULT_NODE`
120 Note: This client is not thread/process/fork safe.
122 :param host:
123 | Can be used to point to a startup node
124 :param port:
125 | Port used if **host** is provided
126 :param startup_nodes:
127 | List of :class:`~.ClusterNode` to be used as startup nodes
128 :param require_full_coverage:
129 | When set to ``False``: the client will not require a full coverage of
130 the slots. However, if not all slots are covered, and at least one node
131 has ``cluster-require-full-coverage`` set to ``yes``, the server will throw
132 a :class:`~.ClusterDownError` for some key-based commands.
133 | When set to ``True``: all slots must be covered to construct the cluster
134 client. If not all slots are covered, :class:`~.RedisClusterException` will be
135 thrown.
136 | See:
137 https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters
138 :param read_from_replicas:
139 | Enable read from replicas in READONLY mode. You can read possibly stale data.
140 When set to true, read commands will be assigned between the primary and
141 its replicas in a Round-Robin manner.
142 :param reinitialize_steps:
143 | Specifies the number of MOVED errors that need to occur before reinitializing
144 the whole cluster topology. If a MOVED error occurs and the cluster does not
145 need to be reinitialized on this current error handling, only the MOVED slot
146 will be patched with the redirected node.
147 To reinitialize the cluster on every MOVED error, set reinitialize_steps to 1.
148 To avoid reinitializing the cluster on moved errors, set reinitialize_steps to
149 0.
150 :param cluster_error_retry_attempts:
151 | Number of times to retry before raising an error when :class:`~.TimeoutError`
152 or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
153 :param connection_error_retry_attempts:
154 | Number of times to retry before reinitializing when :class:`~.TimeoutError`
155 or :class:`~.ConnectionError` are encountered.
156 The default backoff strategy will be set if Retry object is not passed (see
157 default_backoff in backoff.py). To change it, pass a custom Retry object
158 using the "retry" keyword.
159 :param max_connections:
160 | Maximum number of connections per node. If there are no free connections & the
161 maximum number of connections are already created, a
162 :class:`~.MaxConnectionsError` is raised. This error may be retried as defined
163 by :attr:`connection_error_retry_attempts`
164 :param address_remap:
165 | An optional callable which, when provided with an internal network
166 address of a node, e.g. a `(host, port)` tuple, will return the address
167 where the node is reachable. This can be used to map the addresses at
168 which the nodes _think_ they are, to addresses at which a client may
169 reach them, such as when they sit behind a proxy.
171 | Rest of the arguments will be passed to the
172 :class:`~redis.asyncio.connection.Connection` instances when created
174 :raises RedisClusterException:
175 if any arguments are invalid or unknown. Eg:
177 - `db` != 0 or None
178 - `path` argument for unix socket connection
179 - none of the `host`/`port` & `startup_nodes` were provided
181 """
@classmethod
def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
    """
    Build a cluster client from a connection URL.

    Example URLs::

        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0

    Two URL schemes are supported:

    - `redis://` creates a TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/redis>
    - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/rediss>

    The username, password, hostname, path and all querystring values are
    passed through ``urllib.parse.unquote`` so that percent-encoded values are
    replaced with the characters they stand for.

    Querystring options are cast to their appropriate Python types. Boolean
    options accept the strings "True"/"False" or "Yes"/"No"; a value that
    cannot be cast raises ``ValueError``. The parsed options and the keyword
    arguments are then forwarded to the constructor. When both supply the same
    argument, the value from the URL wins.
    """
    url_options = parse_url(url)
    # URL-derived options overwrite any caller-supplied keyword of the same name.
    kwargs.update(url_options)

    # The constructor takes an ``ssl`` flag rather than a connection class.
    connection_class = kwargs.pop("connection_class", None)
    if connection_class is SSLConnection:
        kwargs["ssl"] = True

    return cls(**kwargs)
# Instance attributes assigned by ``__init__``. ``retry`` is included because
# ``__init__`` sets ``self.retry`` and ``get_retry``/``set_retry`` read and
# write it; it was previously missing from this tuple.
__slots__ = (
    "_initialize",
    "_lock",
    "cluster_error_retry_attempts",
    "command_flags",
    "commands_parser",
    "connection_error_retry_attempts",
    "connection_kwargs",
    "encoder",
    "node_flags",
    "nodes_manager",
    "read_from_replicas",
    "reinitialize_counter",
    "reinitialize_steps",
    "response_callbacks",
    "result_callbacks",
    "retry",
)
def __init__(
    self,
    host: Optional[str] = None,
    port: Union[str, int] = 6379,
    # Cluster related kwargs
    startup_nodes: Optional[List["ClusterNode"]] = None,
    require_full_coverage: bool = True,
    read_from_replicas: bool = False,
    reinitialize_steps: int = 5,
    cluster_error_retry_attempts: int = 3,
    connection_error_retry_attempts: int = 3,
    max_connections: int = 2**31,
    # Client related kwargs
    db: Union[str, int] = 0,
    path: Optional[str] = None,
    credential_provider: Optional[CredentialProvider] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    client_name: Optional[str] = None,
    lib_name: Optional[str] = "redis-py",
    lib_version: Optional[str] = get_lib_version(),
    # Encoding related kwargs
    encoding: str = "utf-8",
    encoding_errors: str = "strict",
    decode_responses: bool = False,
    # Connection related kwargs
    health_check_interval: float = 0,
    socket_connect_timeout: Optional[float] = None,
    socket_keepalive: bool = False,
    socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
    socket_timeout: Optional[float] = None,
    retry: Optional["Retry"] = None,
    retry_on_error: Optional[List[Type[Exception]]] = None,
    # SSL related kwargs
    ssl: bool = False,
    ssl_ca_certs: Optional[str] = None,
    ssl_ca_data: Optional[str] = None,
    ssl_cert_reqs: str = "required",
    ssl_certfile: Optional[str] = None,
    ssl_check_hostname: bool = False,
    ssl_keyfile: Optional[str] = None,
    ssl_min_version: Optional[ssl.TLSVersion] = None,
    protocol: Optional[int] = 2,
    address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None,
    cache_enabled: bool = False,
    client_cache: Optional[AbstractCache] = None,
    cache_max_size: int = 100,
    cache_ttl: int = 0,
    cache_policy: str = DEFAULT_EVICTION_POLICY,
    cache_blacklist: List[str] = DEFAULT_BLACKLIST,
    cache_whitelist: List[str] = DEFAULT_WHITELIST,
) -> None:
    """
    Validate the arguments and assemble the client's configuration.

    Collects the keyword arguments used to build the startup
    :class:`~.ClusterNode` objects and the ``NodesManager``, sets up the retry
    object and response callbacks, and leaves the client flagged as needing
    :meth:`initialize` (``self._initialize`` is ``True``).

    :raises RedisClusterException:
        if ``db`` is truthy, ``path`` is truthy, or neither ``host``/``port``
        nor ``startup_nodes`` were provided.
    """
    # --- argument validation -------------------------------------------
    if db:
        raise RedisClusterException(
            "Argument 'db' must be 0 or None in cluster mode"
        )

    if path:
        raise RedisClusterException(
            "Unix domain socket is not supported in cluster mode"
        )

    has_host_and_port = bool(host) and bool(port)
    if not has_host_and_port and not startup_nodes:
        raise RedisClusterException(
            "RedisCluster requires at least one node to discover the cluster.\n"
            "Please provide one of the following or use RedisCluster.from_url:\n"
            ' - host and port: RedisCluster(host="localhost", port=6379)\n'
            " - startup_nodes: RedisCluster(startup_nodes=["
            'ClusterNode("localhost", 6379), ClusterNode("localhost", 6380)])'
        )

    # --- keyword arguments shared by nodes and the nodes manager -------
    conn_kwargs: Dict[str, Any] = dict(
        max_connections=max_connections,
        connection_class=Connection,
        parser_class=ClusterParser,
        # Client related kwargs
        credential_provider=credential_provider,
        username=username,
        password=password,
        client_name=client_name,
        lib_name=lib_name,
        lib_version=lib_version,
        # Encoding related kwargs
        encoding=encoding,
        encoding_errors=encoding_errors,
        decode_responses=decode_responses,
        # Connection related kwargs
        health_check_interval=health_check_interval,
        socket_connect_timeout=socket_connect_timeout,
        socket_keepalive=socket_keepalive,
        socket_keepalive_options=socket_keepalive_options,
        socket_timeout=socket_timeout,
        retry=retry,
        protocol=protocol,
        # Client cache related kwargs
        cache_enabled=cache_enabled,
        client_cache=client_cache,
        cache_max_size=cache_max_size,
        cache_ttl=cache_ttl,
        cache_policy=cache_policy,
        cache_blacklist=cache_blacklist,
        cache_whitelist=cache_whitelist,
    )

    if ssl:
        # SSL swaps the connection class and adds the SSL-only options.
        conn_kwargs.update(
            connection_class=SSLConnection,
            ssl_ca_certs=ssl_ca_certs,
            ssl_ca_data=ssl_ca_data,
            ssl_cert_reqs=ssl_cert_reqs,
            ssl_certfile=ssl_certfile,
            ssl_check_hostname=ssl_check_hostname,
            ssl_keyfile=ssl_keyfile,
            ssl_min_version=ssl_min_version,
        )

    if read_from_replicas:
        # Use our own on_connect hook so each connection is put in READONLY mode.
        conn_kwargs["redis_connect_func"] = self.on_connect

    # --- retry configuration -------------------------------------------
    self.retry = retry
    wants_retry = retry or retry_on_error or connection_error_retry_attempts > 0
    if wants_retry:
        # One retry object is shared by all cluster nodes.
        if not retry:
            self.retry = Retry(default_backoff(), connection_error_retry_attempts)
        # Fall back to the default retryable errors when none were given.
        supported_errors = retry_on_error or [ConnectionError, TimeoutError]
        self.retry.update_supported_errors(supported_errors)
        conn_kwargs["retry"] = self.retry

    # --- response callbacks (RESP3 additions only for protocol 3) ------
    callbacks = _RedisCallbacks.copy()
    if protocol in ("3", 3):
        callbacks.update(_RedisCallbacksRESP3)
    else:
        callbacks.update(_RedisCallbacksRESP2)
    conn_kwargs["response_callbacks"] = callbacks
    self.connection_kwargs = conn_kwargs

    # --- startup nodes --------------------------------------------------
    # Re-create the caller's nodes so they carry this client's kwargs.
    seed_nodes = [
        ClusterNode(node.host, node.port, **self.connection_kwargs)
        for node in (startup_nodes or ())
    ]
    if host and port:
        seed_nodes.append(ClusterNode(host, port, **self.connection_kwargs))

    self.nodes_manager = NodesManager(
        seed_nodes,
        require_full_coverage,
        conn_kwargs,
        address_remap=address_remap,
    )

    # --- remaining client state ----------------------------------------
    self.encoder = Encoder(encoding, encoding_errors, decode_responses)
    self.read_from_replicas = read_from_replicas
    self.reinitialize_steps = reinitialize_steps
    self.cluster_error_retry_attempts = cluster_error_retry_attempts
    self.connection_error_retry_attempts = connection_error_retry_attempts
    self.reinitialize_counter = 0
    self.commands_parser = AsyncCommandsParser()
    self.node_flags = self.__class__.NODE_FLAGS.copy()
    self.command_flags = self.__class__.COMMAND_FLAGS.copy()
    self.response_callbacks = conn_kwargs["response_callbacks"]
    self.result_callbacks = self.__class__.RESULT_CALLBACKS.copy()
    # CLUSTER SLOTS results arrive keyed by node name; parse the first value.
    self.result_callbacks["CLUSTER SLOTS"] = (
        lambda cmd, res, **kwargs: parse_cluster_slots(
            list(res.values())[0], **kwargs
        )
    )

    # True means "initialize() still has to run".
    self._initialize = True
    self._lock: Optional[asyncio.Lock] = None
async def initialize(self) -> "RedisCluster":
    """
    Discover the cluster from the startup nodes, unless already initialized.

    Returns the client itself. If initialization fails, the nodes manager's
    connections (including those of the startup nodes) are closed and the
    error is re-raised.
    """
    if not self._initialize:
        return self

    if not self._lock:
        self._lock = asyncio.Lock()

    async with self._lock:
        # Check again: another task may have finished while we waited.
        if not self._initialize:
            return self
        try:
            await self.nodes_manager.initialize()
            await self.commands_parser.initialize(self.nodes_manager.default_node)
            self._initialize = False
        except BaseException:
            await self.nodes_manager.aclose()
            await self.nodes_manager.aclose("startup_nodes")
            raise

    return self
async def aclose(self) -> None:
    """Close the nodes manager's connections if the client is initialized."""
    if self._initialize:
        # Never initialized, or already closed: nothing to do.
        return

    if not self._lock:
        self._lock = asyncio.Lock()

    async with self._lock:
        # Check again: another task may have closed the client while we waited.
        if self._initialize:
            return
        self._initialize = True
        await self.nodes_manager.aclose()
        await self.nodes_manager.aclose("startup_nodes")
@deprecated_function(version="5.0.0", reason="Use aclose() instead", name="close")
async def close(self) -> None:
    """Deprecated backwards-compatible alias; simply awaits :meth:`aclose`."""
    await self.aclose()
async def __aenter__(self) -> "RedisCluster":
    """Enter ``async with``: initialize the client and return it."""
    client = await self.initialize()
    return client
async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
    """Leave ``async with``: close the client. Exceptions are not suppressed."""
    await self.aclose()
def __await__(self) -> Generator[Any, None, "RedisCluster"]:
    """Make ``await client`` equivalent to ``await client.initialize()``."""
    init_coro = self.initialize()
    return init_coro.__await__()
# Message used both in the ResourceWarning and in the loop exception context.
_DEL_MESSAGE = "Unclosed RedisCluster client"

def __del__(
    self,
    _warn: Any = warnings.warn,
    _grl: Any = asyncio.get_running_loop,
) -> None:
    """
    Warn when an initialized client is garbage collected without being closed.

    ``_warn`` and ``_grl`` are bound as default arguments; presumably so they
    stay reachable while the interpreter is shutting down (not confirmed here).
    """
    # ``_initialize`` may be missing if ``__init__`` raised before setting it.
    was_left_open = hasattr(self, "_initialize") and not self._initialize
    if not was_left_open:
        return

    _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self)
    try:
        context = {"client": self, "message": self._DEL_MESSAGE}
        running_loop = _grl()
        running_loop.call_exception_handler(context)
    except RuntimeError:
        # No running event loop to report to; the warning above is enough.
        pass
async def on_connect(self, connection: Connection) -> None:
    """
    Run the connection's own ``on_connect`` and then switch it to READONLY.

    :raises ConnectionError: if the server's reply to READONLY is not "OK".
    """
    await connection.on_connect()

    # READONLY is sent regardless of the node's current role, because a
    # failover can change a node's server type at any time. On a primary
    # connection READONLY does not affect executing write commands.
    await connection.send_command("READONLY")
    reply = str_if_bytes(await connection.read_response())
    if reply != "OK":
        raise ConnectionError("READONLY command failed")
def get_nodes(self) -> List["ClusterNode"]:
    """Return a new list holding every node in the nodes manager's cache."""
    cached_nodes = self.nodes_manager.nodes_cache.values()
    return list(cached_nodes)
def get_primaries(self) -> List["ClusterNode"]:
    """Return the cluster nodes whose server type is PRIMARY."""
    manager = self.nodes_manager
    return manager.get_nodes_by_server_type(PRIMARY)
def get_replicas(self) -> List["ClusterNode"]:
    """Return the cluster nodes whose server type is REPLICA."""
    manager = self.nodes_manager
    return manager.get_nodes_by_server_type(REPLICA)
def get_random_node(self) -> "ClusterNode":
    """Pick one node at random from the nodes manager's cache."""
    candidates = list(self.nodes_manager.nodes_cache.values())
    return random.choice(candidates)
def get_default_node(self) -> "ClusterNode":
    """Return the nodes manager's current default node."""
    manager = self.nodes_manager
    return manager.default_node
def set_default_node(self, node: "ClusterNode") -> None:
    """
    Make ``node`` the client's default node.

    :raises DataError: if ``node`` is falsy or is not known to the cluster.
    """
    # Look the node up by name only when one was actually passed.
    is_known = bool(node) and bool(self.get_node(node_name=node.name))
    if not is_known:
        raise DataError("The requested node does not exist in the cluster.")

    self.nodes_manager.default_node = node
def get_node(
    self,
    host: Optional[str] = None,
    port: Optional[int] = None,
    node_name: Optional[str] = None,
) -> Optional["ClusterNode"]:
    """Look a node up by (host, port) or by node_name via the nodes manager."""
    manager = self.nodes_manager
    return manager.get_node(host, port, node_name)
def get_node_from_key(
    self, key: str, replica: bool = False
) -> Optional["ClusterNode"]:
    """
    Return the cluster node responsible for ``key``.

    :param key:
        | Key whose slot is looked up
    :param replica:
        | When true, return the node at index 1 of the slot's node list
          instead of the node at index 0. ``None`` is returned if the slot's
          list has fewer than two nodes.

    :raises SlotNotCoveredError: if no node is cached for the key's slot.
    """
    slot = self.keyslot(key)
    slot_nodes = self.nodes_manager.slots_cache.get(slot)
    if not slot_nodes:
        raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')

    if not replica:
        return slot_nodes[0]

    # A replica was requested but the slot has only one node.
    if len(slot_nodes) < 2:
        return None
    return slot_nodes[1]
def keyslot(self, key: EncodableT) -> int:
    """
    Compute the hash slot of ``key``.

    The key is encoded with the client's encoder before hashing.
    See: https://redis.io/docs/manual/scaling/#redis-cluster-data-sharding
    """
    encoded_key = self.encoder.encode(key)
    return key_slot(encoded_key)
def get_encoder(self) -> Encoder:
    """Return the encoder object this client was built with."""
    encoder = self.encoder
    return encoder
def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
    """
    Return the kwargs passed to :class:`~redis.asyncio.connection.Connection`.

    The client's own dict is returned, not a copy.
    """
    connection_kwargs = self.connection_kwargs
    return connection_kwargs
def get_retry(self) -> Optional["Retry"]:
    """Return the client's retry object, or ``None`` if none is set."""
    current_retry = self.retry
    return current_retry
def set_retry(self, retry: "Retry") -> None:
    """Install ``retry`` on the client, on every node and on each node's connections."""
    self.retry = retry
    for cluster_node in self.get_nodes():
        # Stored in the node's connection kwargs as well as on each existing connection.
        cluster_node.connection_kwargs["retry"] = retry
        for connection in cluster_node._connections:
            connection.retry = retry
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
    """Register ``callback`` as the response callback for ``command``."""
    callbacks = self.response_callbacks
    callbacks[command] = callback
async def _determine_nodes(
    self, command: str, *args: Any, node_flag: Optional[str] = None
) -> List["ClusterNode"]:
    """
    Work out which nodes ``command`` should be executed on.

    An explicit ``node_flag`` takes precedence; otherwise the flag predefined
    for the command (if any) is used. When no known flag applies, the node is
    chosen from the slot of the command's keys.
    """
    flag = node_flag or self.command_flags.get(command)
    flags = self.__class__
    manager = self.nodes_manager

    if flag in self.node_flags:
        if flag == flags.DEFAULT_NODE:
            # the cluster's default node
            return [manager.default_node]
        if flag == flags.PRIMARIES:
            # every primary
            return manager.get_nodes_by_server_type(PRIMARY)
        if flag == flags.REPLICAS:
            # every replica
            return manager.get_nodes_by_server_type(REPLICA)
        if flag == flags.ALL_NODES:
            # every known node
            return list(manager.nodes_cache.values())
        if flag == flags.RANDOM:
            # a single randomly chosen node
            return [random.choice(list(manager.nodes_cache.values()))]

    # No flag matched: route by the slot that holds the command's keys.
    slot = await self._determine_slot(command, *args)
    use_replica = self.read_from_replicas and command in READ_COMMANDS
    return [manager.get_node_from_slot(slot, use_replica)]
async def _determine_slot(self, command: str, *args: Any) -> int:
    """
    Return the hash slot that ``command`` with ``args`` must be sent to.

    :raises RedisClusterException:
        if EVAL/EVALSHA has fewer than two arguments, if no key can be found
        for a command other than FCALL/FCALL_RO, or if the keys map to more
        than one slot.
    """
    if self.command_flags.get(command) == SLOT_ID:
        # The first argument of the command is the slot ID itself.
        return int(args[0])

    command_name = command.upper()

    # EVAL and EVALSHA are common enough that asking the redis server to
    # parse the keys is wasteful. Besides, redis<7.0 has a bug where
    # `self._get_command_keys()` fails anyway, so they are special-cased.
    # - issue: https://github.com/redis/redis/issues/9493
    # - fix: https://github.com/redis/redis/pull/9733
    if command_name in ("EVAL", "EVALSHA"):
        # command syntax: EVAL "script body" num_keys ...
        if len(args) < 2:
            raise RedisClusterException(
                f"Invalid args in command: {command, *args}"
            )
        num_keys = int(args[1])
        keys = args[2 : 2 + num_keys]
        if not keys:
            # A script with 0 keys can run on any node: use a random slot.
            return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
    else:
        keys = await self.commands_parser.get_keys(command, *args)
        if not keys:
            if command_name in ("FCALL", "FCALL_RO"):
                # A function called with 0 keys can run on any node.
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            raise RedisClusterException(
                "No way to dispatch this command to Redis Cluster. "
                "Missing key.\nYou can execute the command by specifying "
                f"target nodes.\nCommand: {args}"
            )

    if len(keys) == 1:
        # single key command
        return self.keyslot(keys[0])

    # multi-key command: every key has to hash to the same slot
    distinct_slots = {self.keyslot(key) for key in keys}
    if len(distinct_slots) != 1:
        raise RedisClusterException(
            f"{command} - all keys must map to the same key slot"
        )

    return distinct_slots.pop()
def _is_node_flag(self, target_nodes: Any) -> bool:
    """Tell whether ``target_nodes`` is a string naming one of the node flags."""
    if not isinstance(target_nodes, str):
        return False
    return target_nodes in self.node_flags
def _parse_target_nodes(self, target_nodes: Any) -> List["ClusterNode"]:
    """
    Normalize ``target_nodes`` into a list of nodes.

    A list is returned as-is, a single ClusterNode is wrapped in a list, and
    a dict contributes its values.

    :raises TypeError: for any other type.
    """
    if isinstance(target_nodes, list):
        return target_nodes

    if isinstance(target_nodes, ClusterNode):
        # A single ClusterNode may be passed directly.
        return [target_nodes]

    if isinstance(target_nodes, dict):
        # Dictionaries of the format {node_name: node} are supported, which
        # allows executing commands on multiple nodes, e.g.:
        # rc.cluster_save_config(rc.get_primaries())
        return list(target_nodes.values())

    raise TypeError(
        "target_nodes type can be one of the following: "
        "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
        "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
        f"The passed type is {type(target_nodes)}"
    )
async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
    """
    Execute a raw command on the appropriate cluster node or target_nodes.

    Errors listed in ``ERRORS_ALLOW_RETRY`` are retried up to
    :attr:`cluster_error_retry_attempts` times before being raised. No
    retries happen when explicit nodes are passed as ``target_nodes``.

    :param args:
        | Raw command args
    :param kwargs:

        - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
          or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
        - Rest of the kwargs are passed to the Redis connection

    :raises RedisClusterException: if target_nodes is not provided & the command
        can't be mapped to a slot
    """
    command = args[0]

    passed_targets = kwargs.pop("target_nodes", None)
    explicit_targets = bool(passed_targets) and not self._is_node_flag(
        passed_targets
    )
    if explicit_targets:
        # Caller chose the nodes: run once, without cluster-level retries.
        target_nodes = self._parse_target_nodes(passed_targets)
        retry_attempts = 0
    else:
        target_nodes = []
        retry_attempts = self.cluster_error_retry_attempts

    # One initial execution plus the allowed retries.
    for _ in range(1 + retry_attempts):
        if self._initialize:
            await self.initialize()
            if (
                len(target_nodes) == 1
                and target_nodes[0] == self.get_default_node()
            ):
                # The single target was the default node: pick another default.
                self.replace_default_node()
        try:
            if not explicit_targets:
                # Resolve the nodes from the flag or from the command's keys.
                target_nodes = await self._determine_nodes(
                    *args, node_flag=passed_targets
                )
                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )

            if len(target_nodes) == 1:
                only_node = target_nodes[0]
                ret = await self._execute_command(only_node, *args, **kwargs)
                if command in self.result_callbacks:
                    return self.result_callbacks[command](
                        command, {only_node.name: ret}, **kwargs
                    )
                return ret

            # Several nodes: run concurrently and key the results by node name.
            node_names = [node.name for node in target_nodes]
            replies = await asyncio.gather(
                *(
                    asyncio.create_task(
                        self._execute_command(node, *args, **kwargs)
                    )
                    for node in target_nodes
                )
            )
            by_node = dict(zip(node_names, replies))
            if command in self.result_callbacks:
                return self.result_callbacks[command](command, by_node, **kwargs)
            return by_node
        except Exception as e:
            can_retry = (
                retry_attempts > 0
                and type(e) in self.__class__.ERRORS_ALLOW_RETRY
            )
            if not can_retry:
                raise e
            # The nodes and slots cache should be reinitialized by now;
            # try again with the new cluster setup.
            retry_attempts -= 1
async def _execute_command(
    self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any
) -> Any:
    """
    Run one command on ``target_node``, following MOVED and ASK redirects.

    At most ``RedisClusterRequestTTL`` attempts are made.

    :raises ClusterError: when all attempts are used up without a result.
    """
    ask_pending = False
    slot_moved = False
    redirect_addr = None
    attempts_left = self.RedisClusterRequestTTL

    while attempts_left > 0:
        attempts_left -= 1
        try:
            if ask_pending:
                # Follow the ASK redirect: send ASKING to the redirected node.
                target_node = self.get_node(node_name=redirect_addr)
                await target_node.execute_command("ASKING")
                ask_pending = False
            elif slot_moved:
                # MOVED occurred and the slots cache was updated,
                # so look the target node up again.
                slot = await self._determine_slot(*args)
                target_node = self.nodes_manager.get_node_from_slot(
                    slot, self.read_from_replicas and args[0] in READ_COMMANDS
                )
                slot_moved = False

            return await target_node.execute_command(*args, **kwargs)
        except (BusyLoadingError, MaxConnectionsError):
            # Re-raised untouched, ahead of the broader connection handler
            # below (presumably these subclass ConnectionError).
            raise
        except (ConnectionError, TimeoutError):
            # Connection retries are handled by the node's Retry object.
            # Drop the failed node from the startup nodes, then close the
            # client so the node/slots setup is rebuilt on the next attempt.
            self.nodes_manager.startup_nodes.pop(target_node.name, None)
            await self.aclose()
            raise
        except ClusterDownError:
            # ClusterDownError can occur during a failover. Close the client
            # so the cluster layout is reinitialized, then let the caller retry.
            await self.aclose()
            await asyncio.sleep(0.25)
            raise
        except MovedError as e:
            # Normally only the slots/nodes cache is patched with the
            # redirected node. Every 'reinitialize_steps'-th MOVED error the
            # client is closed instead, forcing the tables to be rebuilt.
            # The counter grows faster when one client object is shared
            # widely; 'reinitialize_steps' is set in the RedisCluster
            # constructor.
            self.reinitialize_counter += 1
            full_reinit_due = (
                self.reinitialize_steps
                and self.reinitialize_counter % self.reinitialize_steps == 0
            )
            if full_reinit_due:
                await self.aclose()
                # Reset the counter
                self.reinitialize_counter = 0
            else:
                self.nodes_manager._moved_exception = e
            slot_moved = True
        except AskError as e:
            redirect_addr = get_node_name(host=e.host, port=e.port)
            ask_pending = True
        except TryAgainError:
            # Back off briefly once more than half of the attempts are used.
            if attempts_left < self.RedisClusterRequestTTL / 2:
                await asyncio.sleep(0.05)

    raise ClusterError("TTL exhausted.")
def pipeline(
    self, transaction: Optional[Any] = None, shard_hint: Optional[Any] = None
) -> "ClusterPipeline":
    """
    Create & return a new :class:`~.ClusterPipeline` bound to this client.

    The cluster pipeline supports neither ``transaction`` nor ``shard_hint``;
    both parameters must be left falsy.

    :raises RedisClusterException: if transaction or shard_hint are truthy values
    """
    # shard_hint is checked first, then transaction.
    if shard_hint:
        raise RedisClusterException("shard_hint is deprecated in cluster mode")

    if transaction:
        raise RedisClusterException("transaction is deprecated in cluster mode")

    cluster_pipeline = ClusterPipeline(self)
    return cluster_pipeline
def lock(
    self,
    name: KeyT,
    timeout: Optional[float] = None,
    sleep: float = 0.1,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
    lock_class: Optional[Type[Lock]] = None,
    thread_local: bool = True,
) -> Lock:
    """
    Return a new Lock object using key ``name`` that mimics
    the behavior of threading.Lock.

    ``timeout`` is the maximum life of the lock. When it is not given, the
    lock stays held until release() is called.

    ``sleep`` is the amount of time to sleep per loop iteration while the
    lock is in blocking mode and another client currently holds it.

    ``blocking`` chooses whether ``acquire`` blocks until the lock has been
    acquired, or fails immediately, returning False without acquiring the
    lock. Defaults to True. A ``blocking`` argument passed to ``acquire``
    overrides this value.

    ``blocking_timeout`` is the maximum number of seconds (float or integer)
    to spend trying to acquire the lock. ``None`` means keep trying forever.

    ``lock_class`` forces the specified lock implementation. As of
    redis-py 3.0 the only lock class implemented is ``Lock`` (a Lua-based
    lock), so this parameter is only needed for a custom lock class.

    ``thread_local`` indicates whether the lock token is placed in
    thread-local storage. By default it is, so that a thread only sees its
    own token and not a token set by another thread. Consider the
    following timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release the thread-2's lock.

    Some use cases need thread local storage disabled, for example when one
    thread acquires a lock and hands the lock instance to a worker thread
    that releases it later. With thread local storage enabled, the worker
    thread would not see the token set by the acquiring thread. Such cases
    are assumed to be uncommon, hence the default of using thread local
    storage."""
    # Fall back to the built-in Lock when no custom class was requested.
    chosen_class = Lock if lock_class is None else lock_class
    return chosen_class(
        self,
        name,
        timeout=timeout,
        sleep=sleep,
        blocking=blocking,
        blocking_timeout=blocking_timeout,
        thread_local=thread_local,
    )
class ClusterNode:
    """
    Create a new ClusterNode.

    Each ClusterNode manages multiple :class:`~redis.asyncio.connection.Connection`
    objects for the (host, port).

    Connections are created lazily by :meth:`acquire_connection` (at most
    ``max_connections`` of them) and idle ones wait in a FIFO queue for reuse.

    :param host:
        | Node host; ``"localhost"`` is resolved with ``socket.gethostbyname``
    :param port:
        | Node port
    :param server_type:
        | Role of the node (other code in this module assigns ``PRIMARY`` or
          ``REPLICA``)
    :param max_connections:
        | Upper bound on the number of connections created for this node
    :param connection_class:
        | Class instantiated for every new connection
    :param connection_kwargs:
        | Keyword arguments forwarded to ``connection_class``; a
          ``response_callbacks`` entry is removed from them and kept on the node
    """

    __slots__ = (
        "_connections",
        "_free",
        "connection_class",
        "connection_kwargs",
        "host",
        "max_connections",
        "name",
        "port",
        "response_callbacks",
        "server_type",
    )

    def __init__(
        self,
        host: str,
        port: Union[str, int],
        server_type: Optional[str] = None,
        *,
        max_connections: int = 2**31,
        connection_class: Type[Connection] = Connection,
        **connection_kwargs: Any,
    ) -> None:
        if host == "localhost":
            host = socket.gethostbyname(host)

        connection_kwargs["host"] = host
        connection_kwargs["port"] = port
        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type

        self.max_connections = max_connections
        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        # ``connection_kwargs`` is the same dict object stored above, so the
        # callbacks are stripped from what is later passed to the connections.
        self.response_callbacks = connection_kwargs.pop("response_callbacks", {})

        # Every connection ever created for this node.
        self._connections: List[Connection] = []
        # Connections that are currently idle and can be handed out again.
        self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)

    def __repr__(self) -> str:
        return (
            f"[host={self.host}, port={self.port}, "
            f"name={self.name}, server_type={self.server_type}]"
        )

    def __eq__(self, obj: Any) -> bool:
        """Two nodes are equal when they share the same ``name``."""
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __hash__(self) -> int:
        # Defining ``__eq__`` alone would make nodes unhashable; hash on the
        # same attribute that equality compares.
        return hash(self.name)

    _DEL_MESSAGE = "Unclosed ClusterNode object"

    def __del__(
        self,
        _warn: Any = warnings.warn,
        _grl: Any = asyncio.get_running_loop,
    ) -> None:
        """
        Emit a ``ResourceWarning`` if the node is collected while one of its
        connections is still connected.

        ``_warn`` and ``_grl`` are bound as defaults so they remain reachable
        when this runs (presumably to survive interpreter shutdown).
        """
        for connection in self._connections:
            if connection.is_connected:
                _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self)

                try:
                    context = {"client": self, "message": self._DEL_MESSAGE}
                    _grl().call_exception_handler(context)
                except RuntimeError:
                    # No running event loop: the warning above is all we can do.
                    pass
                # One report per node is enough.
                break

    async def disconnect(self) -> None:
        """
        Disconnect all connections of this node concurrently.

        Every disconnect is awaited to completion; afterwards the first
        exception that occurred (if any) is raised.
        """
        ret = await asyncio.gather(
            *(
                asyncio.create_task(connection.disconnect())
                for connection in self._connections
            ),
            return_exceptions=True,
        )
        exc = next((res for res in ret if isinstance(res, Exception)), None)
        if exc:
            raise exc

    def acquire_connection(self) -> Connection:
        """
        Return an idle connection, creating a new one if none is idle.

        :raises MaxConnectionsError: if ``max_connections`` connections already
            exist and none of them is idle
        """
        try:
            return self._free.popleft()
        except IndexError:
            if len(self._connections) < self.max_connections:
                connection = self.connection_class(**self.connection_kwargs)
                self._connections.append(connection)
                return connection

            raise MaxConnectionsError()

    async def parse_response(
        self, connection: Connection, command: str, **kwargs: Any
    ) -> Any:
        """
        Read one reply from ``connection`` and post-process it.

        ``NEVER_DECODE`` in ``kwargs`` makes the reply be read with decoding
        disabled. ``EMPTY_RESPONSE`` in ``kwargs`` supplies the value returned
        instead of raising when the server answers with a ``ResponseError``.
        Both markers are removed before the remaining ``kwargs`` are passed to
        the response callback registered for ``command`` (if there is one).
        """
        try:
            if NEVER_DECODE in kwargs:
                response = await connection.read_response(disable_decoding=True)
                kwargs.pop(NEVER_DECODE)
            else:
                response = await connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in kwargs:
                return kwargs[EMPTY_RESPONSE]
            raise

        if EMPTY_RESPONSE in kwargs:
            kwargs.pop(EMPTY_RESPONSE)

        # Return response
        if command in self.response_callbacks:
            return self.response_callbacks[command](response, **kwargs)

        return response

    async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
        """
        Execute a single command on this node and return its parsed reply.

        A ``keys`` keyword argument is consumed here and only used when storing
        the reply in the connection's local cache. A cached reply, when
        available, is returned without talking to the server.
        """
        # Acquire connection
        connection = self.acquire_connection()
        try:
            keys = kwargs.pop("keys", None)

            response_from_cache = await connection._get_from_local_cache(args)
            if response_from_cache is not None:
                return response_from_cache

            # Execute command
            await connection.send_packed_command(connection.pack_command(*args), False)

            # Read response
            response = await self.parse_response(connection, args[0], **kwargs)
            connection._add_to_local_cache(args, response, keys)
            return response
        finally:
            # Release connection on every path, including a failed cache lookup
            # or send, so the connection is never lost from the pool.
            self._free.append(connection)

    async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
        """
        Send all ``commands`` in one batch and store each reply on its command.

        A reply that raises an ``Exception`` is stored as that command's
        ``result`` instead of propagating.

        :returns: ``True`` if at least one command resulted in an exception
        """
        # Acquire connection
        connection = self.acquire_connection()
        try:
            # Execute command
            await connection.send_packed_command(
                connection.pack_commands(cmd.args for cmd in commands), False
            )

            # Read responses
            ret = False
            for cmd in commands:
                try:
                    cmd.result = await self.parse_response(
                        connection, cmd.args[0], **cmd.kwargs
                    )
                except Exception as e:
                    cmd.result = e
                    ret = True

            return ret
        finally:
            # Release connection even when sending fails, mirroring
            # ``execute_command``.
            self._free.append(connection)
class NodesManager:
    """
    Track the cluster topology: the known nodes and which nodes serve each slot.

    ``nodes_cache`` maps node name -> node, ``slots_cache`` maps slot number ->
    list of nodes where index 0 is the slot's primary and the remaining entries
    are its replicas.
    """

    __slots__ = (
        "_moved_exception",
        "connection_kwargs",
        "default_node",
        "nodes_cache",
        "read_load_balancer",
        "require_full_coverage",
        "slots_cache",
        "startup_nodes",
        "address_remap",
    )

    def __init__(
        self,
        startup_nodes: List["ClusterNode"],
        require_full_coverage: bool,
        connection_kwargs: Dict[str, Any],
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
    ) -> None:
        """
        :param startup_nodes: nodes queried for the topology in :meth:`initialize`
        :param require_full_coverage: make :meth:`initialize` fail when not every
            hash slot is covered
        :param connection_kwargs: keyword arguments used for every node created
            by this manager
        :param address_remap: optional callable that receives a single
            ``(host, port)`` tuple and returns the ``(host, port)`` to use
        """
        self.startup_nodes = {node.name: node for node in startup_nodes}
        self.require_full_coverage = require_full_coverage
        self.connection_kwargs = connection_kwargs
        self.address_remap = address_remap

        self.default_node: Optional["ClusterNode"] = None
        self.nodes_cache: Dict[str, "ClusterNode"] = {}
        self.slots_cache: Dict[int, List["ClusterNode"]] = {}
        self.read_load_balancer = LoadBalancer()
        # Last MOVED redirection that still has to be applied to ``slots_cache``.
        self._moved_exception: Optional[MovedError] = None

    def get_node(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        node_name: Optional[str] = None,
    ) -> Optional["ClusterNode"]:
        """
        Look up a cached node by ``host`` and ``port`` or by ``node_name``.

        :returns: the node, or ``None`` if it is not in ``nodes_cache``
        :raises DataError: if neither a host/port pair nor a node name is given
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            raise DataError(
                "get_node requires one of the following: "
                "1. node name "
                "2. host and port"
            )

    def set_nodes(
        self,
        old: Dict[str, "ClusterNode"],
        new: Dict[str, "ClusterNode"],
        remove_old: bool = False,
    ) -> None:
        """
        Merge ``new`` into ``old`` in place.

        Nodes replaced by a different object of the same name are disconnected
        in a background task; with ``remove_old`` the same happens to nodes of
        ``old`` whose name is missing from ``new``, which are also removed.
        """
        if remove_old:
            # Iterate over a copy of the keys because entries are popped.
            for name in list(old.keys()):
                if name not in new:
                    task = asyncio.create_task(old.pop(name).disconnect())  # noqa

        for name, node in new.items():
            if name in old:
                if old[name] is node:
                    continue
                task = asyncio.create_task(old[name].disconnect())  # noqa
            old[name] = node

    def _update_moved_slots(self) -> None:
        """Apply the pending MOVED redirection to the node and slot caches."""
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node:
            # The node already exists
            if redirected_node.server_type != PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(
                e.host, e.port, PRIMARY, **self.connection_kwargs
            )
            self.set_nodes(self.nodes_cache, {redirected_node.name: redirected_node})

        # The slot may be absent from the cache (e.g. it was not covered when
        # the topology was loaded), so do not index the dict directly.
        slot_nodes = self.slots_cache.get(e.slot_id)
        if not slot_nodes or redirected_node not in slot_nodes:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replications) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        elif slot_nodes[0] != redirected_node:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = slot_nodes[0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            slot_nodes.append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            slot_nodes.remove(redirected_node)
            # Override the old primary with the new one
            slot_nodes[0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        # Otherwise the redirected node already is the slot's primary (a stale
        # redirection): rotating the list would demote it and drop a replica,
        # so leave the slot untouched.

        # Reset moved_exception
        self._moved_exception = None

    def get_node_from_slot(
        self, slot: int, read_from_replicas: bool = False
    ) -> "ClusterNode":
        """
        Return the node that should serve ``slot``.

        A pending MOVED redirection is applied first. With
        ``read_from_replicas`` the node is picked among the slot's primary and
        replicas by the read load balancer, otherwise the primary is returned.

        :raises SlotNotCoveredError: if no node is known for ``slot``
        """
        if self._moved_exception:
            self._update_moved_slots()

        try:
            if read_from_replicas:
                # get the server index in a Round-Robin manner
                primary_name = self.slots_cache[slot][0].name
                node_idx = self.read_load_balancer.get_server_index(
                    primary_name, len(self.slots_cache[slot])
                )
                return self.slots_cache[slot][node_idx]
            return self.slots_cache[slot][0]
        except (IndexError, KeyError, TypeError):
            # ``slots_cache`` is a dict, so an unknown slot surfaces as KeyError.
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self.require_full_coverage}"'
            )

    def get_nodes_by_server_type(self, server_type: str) -> List["ClusterNode"]:
        """Return all cached nodes whose ``server_type`` equals ``server_type``."""
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    async def initialize(self) -> None:
        """
        (Re)build the node and slot caches from ``CLUSTER SLOTS``.

        Startup nodes are queried one after another until one of them yields a
        mapping that covers every hash slot (or all of them have been tried).

        :raises RedisClusterException: if no startup node is reachable, if the
            startup nodes disagree on slot ownership too often, or if
            ``require_full_coverage`` is set and some slots stay uncovered
        """
        self.read_load_balancer.reset()
        tmp_nodes_cache: Dict[str, "ClusterNode"] = {}
        tmp_slots: Dict[int, List["ClusterNode"]] = {}
        disagreements = []
        startup_nodes_reachable = False
        fully_covered = False
        exception = None
        for startup_node in self.startup_nodes.values():
            try:
                # Make sure cluster mode is enabled on this node
                try:
                    cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
                except ResponseError:
                    raise RedisClusterException(
                        "Cluster mode is not enabled on this node"
                    )
                startup_nodes_reachable = True
            except Exception as e:
                # Try the next startup node.
                # The exception is saved and raised only if we have no more nodes.
                exception = e
                continue

            # CLUSTER SLOTS command results in the following output:
            # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
            # where each node contains the following list: [IP, port, node_id]
            # Therefore, cluster_slots[0][2][0] will be the IP address of the
            # primary node of the first slot section.
            # If there's only one server in the cluster, its ``host`` is ''
            # Fix it to the host in startup_nodes
            if (
                len(cluster_slots) == 1
                and not cluster_slots[0][2][0]
                and len(self.startup_nodes) == 1
            ):
                cluster_slots[0][2][0] = startup_node.host

            for slot in cluster_slots:
                # Normalise the node descriptions (entries from index 2 on).
                for i in range(2, len(slot)):
                    slot[i] = [str_if_bytes(val) for val in slot[i]]
                primary_node = slot[2]
                host = primary_node[0]
                if host == "":
                    host = startup_node.host
                port = int(primary_node[1])
                host, port = self.remap_host_port(host, port)

                target_node = tmp_nodes_cache.get(get_node_name(host, port))
                if not target_node:
                    target_node = ClusterNode(
                        host, port, PRIMARY, **self.connection_kwargs
                    )
                # add this node to the nodes cache
                tmp_nodes_cache[target_node.name] = target_node

                for i in range(int(slot[0]), int(slot[1]) + 1):
                    if i not in tmp_slots:
                        # First time this slot is seen: primary first, then
                        # its replicas.
                        tmp_slots[i] = []
                        tmp_slots[i].append(target_node)
                        replica_nodes = [slot[j] for j in range(3, len(slot))]

                        for replica_node in replica_nodes:
                            host = replica_node[0]
                            # Same normalisation as for the primary's port.
                            port = int(replica_node[1])
                            host, port = self.remap_host_port(host, port)

                            target_replica_node = tmp_nodes_cache.get(
                                get_node_name(host, port)
                            )
                            if not target_replica_node:
                                target_replica_node = ClusterNode(
                                    host, port, REPLICA, **self.connection_kwargs
                                )
                            tmp_slots[i].append(target_replica_node)
                            # add this node to the nodes cache
                            tmp_nodes_cache[target_replica_node.name] = (
                                target_replica_node
                            )
                    else:
                        # Validate that 2 nodes want to use the same slot cache
                        # setup
                        tmp_slot = tmp_slots[i][0]
                        if tmp_slot.name != target_node.name:
                            disagreements.append(
                                f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                            )

                            if len(disagreements) > 5:
                                raise RedisClusterException(
                                    f"startup_nodes could not agree on a valid "
                                    f'slots cache: {", ".join(disagreements)}'
                                )

            # Validate if all slots are covered or if we should try next startup node
            fully_covered = True
            for i in range(REDIS_CLUSTER_HASH_SLOTS):
                if i not in tmp_slots:
                    fully_covered = False
                    break
            if fully_covered:
                break

        if not startup_nodes_reachable:
            raise RedisClusterException(
                f"Redis Cluster cannot be connected. Please provide at least "
                f"one reachable node: {str(exception)}"
            ) from exception

        # Check if the slots are not fully covered
        if not fully_covered and self.require_full_coverage:
            # Despite the requirement that the slots be covered, there
            # isn't a full coverage
            raise RedisClusterException(
                f"All slots are not covered after query all startup_nodes. "
                f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                f"covered..."
            )

        # Set the tmp variables to the real variables
        self.slots_cache = tmp_slots
        self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
        # Populate the startup nodes with all discovered nodes
        self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)

        # Set the default node
        self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
        # If initialize was called after a MovedError, clear it
        self._moved_exception = None

    async def aclose(self, attr: str = "nodes_cache") -> None:
        """
        Clear the default node and disconnect every node stored in the mapping
        named by ``attr`` (``nodes_cache`` by default).
        """
        self.default_node = None
        await asyncio.gather(
            *(
                asyncio.create_task(node.disconnect())
                for node in getattr(self, attr).values()
            )
        )

    def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
        """
        Remap the host and port returned from the cluster to a different
        internal value.  Useful if the client is not connecting directly
        to the cluster.

        ``address_remap`` is called with one ``(host, port)`` tuple; without it
        the arguments are returned unchanged.
        """
        if self.address_remap:
            return self.address_remap((host, port))
        return host, port
class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands):
    """
    Create a new ClusterPipeline object.

    Usage::

        result = await (
            rc.pipeline()
            .set("A", 1)
            .get("A")
            .hset("K", "F", "V")
            .hgetall("K")
            .mset_nonatomic({"A": 2, "B": 3})
            .get("A")
            .get("B")
            .delete("A", "B", "K")
            .execute()
        )
        # result = [True, "1", 1, {"F": "V"}, True, True, "2", "3", 1, 1, 1]

    Note: For commands `DELETE`, `EXISTS`, `TOUCH`, `UNLINK`, `mset_nonatomic`, which
    are split across multiple nodes, you'll get multiple results for them in the array.

    Retryable errors:
        - :class:`~.ClusterDownError`
        - :class:`~.ConnectionError`
        - :class:`~.TimeoutError`

    Redirection errors:
        - :class:`~.TryAgainError`
        - :class:`~.MovedError`
        - :class:`~.AskError`

    :param client:
        | Existing :class:`~.RedisCluster` client
    """

    __slots__ = ("_command_stack", "_client")

    def __init__(self, client: RedisCluster) -> None:
        self._client = client

        # Commands queued so far, in the order they were added.
        self._command_stack: List["PipelineCommand"] = []

    async def initialize(self) -> "ClusterPipeline":
        """Initialize the client if it still needs it and clear the queue."""
        if self._client._initialize:
            await self._client.initialize()
        self._command_stack = []
        return self

    async def __aenter__(self) -> "ClusterPipeline":
        return await self.initialize()

    async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
        self._command_stack = []

    def __await__(self) -> Generator[Any, None, "ClusterPipeline"]:
        return self.initialize().__await__()

    def __enter__(self) -> "ClusterPipeline":
        self._command_stack = []
        return self

    def __exit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
        self._command_stack = []

    def __bool__(self) -> bool:
        "Pipeline instances should always evaluate to True on Python 3+"
        # Needed because ``__len__`` is defined: an empty pipeline would
        # otherwise be falsy.
        return True

    def __len__(self) -> int:
        """Return the number of queued commands."""
        return len(self._command_stack)

    def execute_command(
        self, *args: Union[KeyT, EncodableT], **kwargs: Any
    ) -> "ClusterPipeline":
        """
        Append a raw command to the pipeline.

        :param args:
            | Raw command args
        :param kwargs:

            - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
              or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
            - Rest of the kwargs are passed to the Redis connection
        """
        kwargs.pop("keys", None)  # the keys are used only for client side caching
        self._command_stack.append(
            PipelineCommand(len(self._command_stack), *args, **kwargs)
        )
        return self

    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        """
        Execute the pipeline.

        It will retry the commands as specified by :attr:`cluster_error_retry_attempts`
        & then raise an exception. At least one attempt is always made, even if
        that setting is zero or negative.

        The queued commands are cleared when this method returns or raises.

        :param raise_on_error:
            | Raise the first error if there are any errors
        :param allow_redirections:
            | Whether to retry each failed command individually in case of redirection
              errors

        :raises RedisClusterException: if target_nodes is not provided & the command
            can't be mapped to a slot
        """
        if not self._command_stack:
            return []

        # Looping over an empty range would skip execution entirely and fall
        # through to ``raise exception`` with nothing bound, so always try once.
        attempts = max(1, self._client.cluster_error_retry_attempts)
        exception: Optional[BaseException] = None

        try:
            for _ in range(attempts):
                if self._client._initialize:
                    await self._client.initialize()

                try:
                    return await self._execute(
                        self._client,
                        self._command_stack,
                        raise_on_error=raise_on_error,
                        allow_redirections=allow_redirections,
                    )
                except BaseException as e:
                    if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                        # Try again with the new cluster setup.
                        exception = e
                        await self._client.aclose()
                        await asyncio.sleep(0.25)
                    else:
                        # All other errors should be raised.
                        raise

            # If it fails the configured number of times then raise an exception
            raise exception
        finally:
            self._command_stack = []

    async def _execute(
        self,
        client: "RedisCluster",
        stack: List["PipelineCommand"],
        raise_on_error: bool = True,
        allow_redirections: bool = True,
    ) -> List[Any]:
        """
        Run the pending commands of ``stack`` once, grouped per target node.

        :returns: the ``result`` of every command in ``stack``, in stack order
        :raises RedisClusterException: if a command resolves to no node or to
            more than one node
        """
        # Only commands without a (truthy) result, or whose last result was an
        # exception, are sent.
        todo = [
            cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception)
        ]

        # node name -> (node, commands to send to that node)
        nodes = {}
        for cmd in todo:
            passed_targets = cmd.kwargs.pop("target_nodes", None)
            if passed_targets and not client._is_node_flag(passed_targets):
                target_nodes = client._parse_target_nodes(passed_targets)
            else:
                target_nodes = await client._determine_nodes(
                    *cmd.args, node_flag=passed_targets
                )
                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {cmd.args} command on"
                    )
            if len(target_nodes) > 1:
                raise RedisClusterException(f"Too many targets for command {cmd.args}")
            node = target_nodes[0]
            if node.name not in nodes:
                nodes[node.name] = (node, [])
            nodes[node.name][1].append(cmd)

        # One pipeline round-trip per node, all running concurrently.
        errors = await asyncio.gather(
            *(
                asyncio.create_task(node[0].execute_pipeline(node[1]))
                for node in nodes.values()
            )
        )

        if any(errors):
            if allow_redirections:
                # send each errored command individually
                for cmd in todo:
                    if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
                        try:
                            cmd.result = await client.execute_command(
                                *cmd.args, **cmd.kwargs
                            )
                        except Exception as e:
                            cmd.result = e

            if raise_on_error:
                for cmd in todo:
                    result = cmd.result
                    if isinstance(result, Exception):
                        command = " ".join(map(safe_str, cmd.args))
                        msg = (
                            f"Command # {cmd.position + 1} ({command}) of pipeline "
                            f"caused error: {result.args}"
                        )
                        # Prefix the error with the failing command, keeping any
                        # extra exception args.
                        result.args = (msg,) + result.args[1:]
                        raise result

            default_node = nodes.get(client.get_default_node().name)
            if default_node is not None:
                # This pipeline execution used the default node, check if we need
                # to replace it.
                # Note: when the error is raised we'll reset the default node in the
                # caller function.
                for cmd in default_node[1]:
                    # Check if it has a command that failed with a relevant
                    # exception
                    if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY:
                        client.replace_default_node()
                        break

        return [cmd.result for cmd in stack]

    def _split_command_across_slots(
        self, command: str, *keys: KeyT
    ) -> "ClusterPipeline":
        """Queue ``command`` once per group of ``keys`` sharing a hash slot."""
        for slot_keys in self._client._partition_keys_by_slot(keys).values():
            self.execute_command(command, *slot_keys)

        return self

    def mset_nonatomic(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> "ClusterPipeline":
        """
        Queue one ``MSET`` per hash slot for the key/value pairs of ``mapping``.

        The pairs are not set atomically as a whole, since each slot gets its
        own command.
        """
        encoder = self._client.encoder

        # slot -> flat [key, value, key, value, ...] argument list
        slots_pairs = {}
        for pair in mapping.items():
            slot = key_slot(encoder.encode(pair[0]))
            slots_pairs.setdefault(slot, []).extend(pair)

        for pairs in slots_pairs.values():
            self.execute_command("MSET", *pairs)

        return self
# Install a replacement attribute on ClusterPipeline for every command listed
# in PIPELINE_BLOCKED_COMMANDS, using the method-style name (lower case,
# spaces turned into underscores). ``mset_nonatomic`` is left alone because
# ClusterPipeline defines its own implementation.
for command in PIPELINE_BLOCKED_COMMANDS:
    command = command.lower().replace(" ", "_")
    if command != "mset_nonatomic":
        setattr(ClusterPipeline, command, block_pipeline_command(command))
class PipelineCommand:
    """
    One command queued in a pipeline.

    Holds the positional and keyword arguments the command was queued with,
    its position in the queue, and a ``result`` slot that starts out as
    ``None``.
    """

    def __init__(self, position: int, *args: Any, **kwargs: Any) -> None:
        self.args = args
        self.kwargs = kwargs
        self.position = position
        # Placeholder until the command has been executed.
        self.result: Union[Any, Exception] = None

    def __repr__(self) -> str:
        return "[{}] {} ({})".format(self.position, self.args, self.kwargs)