Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2import random
3import socket
4import sys
5import threading
6import time
7from abc import ABC, abstractmethod
8from collections import OrderedDict
9from copy import copy
10from enum import Enum
11from itertools import chain
12from typing import (
13 Any,
14 Callable,
15 Dict,
16 List,
17 Literal,
18 Optional,
19 Set,
20 Tuple,
21 Union,
22)
24from redis._parsers import CommandsParser, Encoder
25from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
26from redis._parsers.helpers import parse_scan
27from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
28from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
29from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
30from redis.commands import READ_COMMANDS, RedisClusterCommands
31from redis.commands.helpers import list_or_args
32from redis.commands.policies import PolicyResolver, StaticPolicyResolver
33from redis.connection import (
34 Connection,
35 ConnectionPool,
36 parse_url,
37)
38from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
39from redis.event import (
40 AfterPooledConnectionsInstantiationEvent,
41 AfterPubSubConnectionInstantiationEvent,
42 ClientType,
43 EventDispatcher,
44)
45from redis.exceptions import (
46 AskError,
47 AuthenticationError,
48 ClusterDownError,
49 ClusterError,
50 ConnectionError,
51 CrossSlotTransactionError,
52 DataError,
53 ExecAbortError,
54 InvalidPipelineStack,
55 MaxConnectionsError,
56 MovedError,
57 RedisClusterException,
58 RedisError,
59 ResponseError,
60 SlotNotCoveredError,
61 TimeoutError,
62 TryAgainError,
63 WatchError,
64)
65from redis.lock import Lock
66from redis.maint_notifications import (
67 MaintNotificationsConfig,
68 OSSMaintNotificationsHandler,
69)
70from redis.observability.recorder import (
71 record_error_count,
72 record_operation_duration,
73)
74from redis.retry import Retry
75from redis.utils import (
76 check_protocol_version,
77 deprecated_args,
78 deprecated_function,
79 dict_merge,
80 list_keys_to_dict,
81 merge_result,
82 safe_str,
83 str_if_bytes,
84 truncate_text,
85)
# Module-level logger for cluster-client diagnostics.
logger = logging.getLogger(__name__)
def is_debug_log_enabled() -> bool:
    """Return True when this module's logger emits DEBUG records."""
    return logger.isEnabledFor(logging.DEBUG)
def get_node_name(host: str, port: Union[str, int]) -> str:
    """Build the canonical ``host:port`` identifier for a cluster node."""
    return "{}:{}".format(host, port)
@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    """Return the node's bound connection, or check one out of its pool."""
    conn = redis_node.connection
    if not conn:
        conn = redis_node.connection_pool.get_connection()
    return conn
def parse_scan_result(command, res, **options):
    """
    Merge per-node SCAN replies into a cursor-per-node mapping and one
    combined list of returned items.
    """
    cursors = {}
    merged = []
    for node_name, node_response in res.items():
        node_cursor, items = parse_scan(node_response, **options)
        cursors[node_name] = node_cursor
        merged.extend(items)
    return cursors, merged
def parse_pubsub_numsub(command, res, **options):
    """
    Sum subscriber counts per channel across all node responses, preserving
    first-seen channel order.
    """
    totals = OrderedDict()
    for channel_counts in res.values():
        for channel, subscribers in channel_counts:
            totals[channel] = totals.get(channel, 0) + subscribers
    return list(totals.items())
def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """
    Convert a CLUSTER SLOTS reply into ``{(start, end): {"primary": ...,
    "replicas": [...]}}``; empty advertised hosts fall back to the host the
    reply was fetched from (``current_host`` option).
    """
    fallback_host = options.get("current_host", "")

    def fix_server(*server: Any) -> Tuple[str, Any]:
        # An empty host in the reply means "same host you asked".
        return str_if_bytes(server[0]) or fallback_host, server[1]

    mapping = {}
    for entry in resp:
        start, end = entry[0], entry[1]
        primary = entry[2]
        replicas = entry[3:]
        mapping[(start, end)] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }
    return mapping
def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.

    RESP3 already yields dicts and is returned unchanged; the RESP2 form is a
    flat [label, value, ...] layout that gets converted to
    ``{"slots": [(start, end), ...], "nodes": [{...}, ...]}`` per shard.
    """
    if isinstance(resp[0], dict):
        return resp
    parsed = []
    for raw_shard in resp:
        slot_bounds = raw_shard[1]
        shard = {
            "slots": [
                (slot_bounds[i], slot_bounds[i + 1])
                for i in range(0, len(slot_bounds), 2)
            ],
            "nodes": [],
        }
        for raw_node in raw_shard[3]:
            shard["nodes"].append(
                {raw_node[i]: raw_node[i + 1] for i in range(0, len(raw_node), 2)}
            )
        parsed.append(shard)
    return parsed
def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.

    The server returns the shard id as raw bytes; decode it to a UTF-8 str.
    """
    return str(resp, "utf-8")
# Server-role / addressing markers used by the nodes manager and by the
# COMMAND_FLAGS routing table below.
PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"

# Keyword arguments that may be forwarded to the per-node Redis connections.
# Anything not listed here is stripped by cleanup_kwargs().
REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_ca_path",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
    "maint_notifications_config",
)
# Keys rejected even though standalone Redis accepts them: host/port come from
# the discovered cluster nodes, and retry is handled at the cluster-client
# level (see RedisCluster.__init__, which raises if 'retry' is in kwargs).
KWARGS_DISABLED_KEYS = ("host", "port", "retry")
def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    permitted = set(REDIS_ALLOWED_KEYS) - set(KWARGS_DISABLED_KEYS)
    return {key: value for key, value in kwargs.items() if key in permitted}
class MaintNotificationsAbstractRedisCluster:
    """
    Abstract class for handling maintenance notifications logic.
    This class is expected to be used as base class together with RedisCluster.

    This class is intended to be used with multiple inheritance!

    All logic related to maintenance notifications is encapsulated in this class.
    """

    def __init__(
        self,
        maint_notifications_config: Optional[MaintNotificationsConfig],
        **kwargs,
    ):
        # Initialize maintenance notifications
        # Maintenance notifications rely on RESP3 push messages, so protocol
        # version 3 gates the whole feature.
        is_protocol_supported = check_protocol_version(kwargs.get("protocol"), 3)
        if (
            maint_notifications_config
            and maint_notifications_config.enabled
            and not is_protocol_supported
        ):
            raise RedisError(
                "Maintenance notifications handlers on connection are only supported with RESP version 3"
            )
        # Default-enable the feature when the protocol allows it and the
        # caller did not pass an explicit configuration.
        if maint_notifications_config is None and is_protocol_supported:
            maint_notifications_config = MaintNotificationsConfig()

        self.maint_notifications_config = maint_notifications_config

        if self.maint_notifications_config and self.maint_notifications_config.enabled:
            self._oss_cluster_maint_notifications_handler = (
                OSSMaintNotificationsHandler(self, self.maint_notifications_config)
            )
            # Update connection kwargs for all future nodes connections
            self._update_connection_kwargs_for_maint_notifications(
                self._oss_cluster_maint_notifications_handler
            )
            # Update existing nodes connections - they are created as part of the RedisCluster constructor
            # NOTE(review): get_nodes() and nodes_manager are provided by the
            # RedisCluster side of the multiple-inheritance hierarchy; this
            # __init__ must run after NodesManager is set up.
            for node in self.get_nodes():
                if node.redis_connection is None:
                    continue
                node.redis_connection.connection_pool.update_maint_notifications_config(
                    self.maint_notifications_config,
                    oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
                )
        else:
            self._oss_cluster_maint_notifications_handler = None

    def _update_connection_kwargs_for_maint_notifications(
        self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
    ):
        """
        Update the connection kwargs for all future connections.
        """
        self.nodes_manager.connection_kwargs.update(
            {
                "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
            }
        )
class AbstractRedisCluster:
    """
    Shared cluster-client constants and helpers: command-to-node routing
    flags, per-command response/result callbacks and the set of retryable
    cluster errors.
    """

    # NOTE(review): name suggests a per-command redirect/retry budget (TTL);
    # the consuming loop is outside this chunk — confirm before relying on it.
    RedisClusterRequestTTL = 16

    # Node-selection flag names referenced by COMMAND_FLAGS below.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # Static routing table: command name -> node flag deciding which node(s)
    # the command is dispatched to.
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        # Commands that must be fanned out to every primary.
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        # Any single node may answer.
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        # Commands addressed by an explicit slot id argument.
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # RediSearch commands with special routing. Note this is a 1-tuple whose
    # single element is the command list (consumers index with [0]).
    # NOTE(review): "FT,PROFILE" below looks like a typo for "FT.PROFILE" —
    # confirm against the RediSearch command reference before changing.
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT,PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    # Parsers applied to single-node CLUSTER subcommand replies.
    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Reducers that merge multi-node responses ({node_name: reply}) into the
    # single value returned to the caller.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    # Errors that may be resolved by retrying (possibly after a topology
    # refresh) rather than failing the command immediately.
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, choose a replica if the cluster contains different replicas
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)
554class RedisCluster(
555 AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands
556):
557 @classmethod
558 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
559 """
560 Return a Redis client object configured from the given URL
562 For example::
564 redis://[[username]:[password]]@localhost:6379/0
565 rediss://[[username]:[password]]@localhost:6379/0
566 unix://[username@]/path/to/socket.sock?db=0[&password=password]
568 Three URL schemes are supported:
570 - `redis://` creates a TCP socket connection. See more at:
571 <https://www.iana.org/assignments/uri-schemes/prov/redis>
572 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
573 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
574 - ``unix://``: creates a Unix Domain Socket connection.
576 The username, password, hostname, path and all querystring values
577 are passed through urllib.parse.unquote in order to replace any
578 percent-encoded values with their corresponding characters.
580 There are several ways to specify a database number. The first value
581 found will be used:
583 1. A ``db`` querystring option, e.g. redis://localhost?db=0
584 2. If using the redis:// or rediss:// schemes, the path argument
585 of the url, e.g. redis://localhost/0
586 3. A ``db`` keyword argument to this function.
588 If none of these options are specified, the default db=0 is used.
590 All querystring options are cast to their appropriate Python types.
591 Boolean arguments can be specified with string values "True"/"False"
592 or "Yes"/"No". Values that cannot be properly cast cause a
593 ``ValueError`` to be raised. Once parsed, the querystring arguments
594 and keyword arguments are passed to the ``ConnectionPool``'s
595 class initializer. In the case of conflicting arguments, querystring
596 arguments always win.
598 """
599 return cls(url=url, **kwargs)
    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        # NOTE(review): this default instance is created once at class
        # definition time and shared by every client that does not pass its
        # own resolver — confirm StaticPolicyResolver is stateless.
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        **kwargs,
    ):
        """
        Initialize a new RedisCluster client.

        :param startup_nodes:
            List of nodes from which initial bootstrapping can be done
        :param host:
            Can be used to point to a startup node
        :param port:
            Can be used to point to a startup node
        :param require_full_coverage:
            When set to False: the client will not require a
            full coverage of the slots. However, if not all slots are covered,
            and at least one node has 'cluster-require-full-coverage' set to
            'yes,' the server will throw a ClusterDownError for some key-based
            commands. See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True (default value): all slots must be covered to
            construct the cluster client. If not all slots are covered,
            RedisClusterException will be thrown.
        :param read_from_replicas:
            @deprecated - please use load_balancing_strategy instead
            Enable read from replicas in READONLY mode. You can read possibly
            stale data.
            When set to true, read commands will be assigned between the
            primary and its replications in a Round-Robin manner.
        :param load_balancing_strategy:
            Enable read from replicas in READONLY mode and defines the load balancing
            strategy that will be used for cluster node selection.
            The data read from replicas is eventually consistent with the data in primary nodes.
        :param dynamic_startup_nodes:
            Set the RedisCluster's startup nodes to all of the discovered nodes.
            If true (default value), the cluster's discovered nodes will be used to
            determine the cluster nodes-slots mapping in the next topology refresh.
            It will remove the initial passed startup nodes if their endpoints aren't
            listed in the CLUSTER SLOTS output.
            If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
            specific IP addresses, it is best to set it to false.
        :param cluster_error_retry_attempts:
            @deprecated - Please configure the 'retry' object instead
            In case 'retry' object is set - this argument is ignored!

            Number of times to retry before raising an error when
            :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
            :class:`~.ClusterDownError` are encountered
        :param retry:
            A retry object that defines the retry strategy and the number of
            retries for the cluster client.
            In current implementation for the cluster client (starting form redis-py version 6.0.0)
            the retry object is not yet fully utilized, instead it is used just to determine
            the number of retries for the cluster client.
            In the future releases the retry object will be used to handle the cluster client retries!
        :param reinitialize_steps:
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.
        :param address_remap:
            An optional callable which, when provided with an internal network
            address of a node, e.g. a `(host, port)` tuple, will return the address
            where the node is reachable. This can be used to map the addresses at
            which the nodes _think_ they are, to addresses at which a client may
            reach them, such as when they sit behind a proxy.

        :param maint_notifications_config:
            Configures the nodes connections to support maintenance notifications - see
            `redis.maint_notifications.MaintNotificationsConfig` for details.
            Only supported with RESP3.
            If not provided and protocol is RESP3, the maintenance notifications
            will be enabled by default (logic is included in the NodesManager
            initialization).
        :**kwargs:
            Extra arguments that will be sent into Redis instance when created
            (See Official redis-py doc for supported kwargs - the only limitation
            is that you can't provide 'retry' object as part of kwargs.
            [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
            Some kwargs are not supported and will raise a
            RedisClusterException:
            - db (Redis do not support database SELECT in cluster mode)

        :raises RedisClusterException: on 'db'/'retry' kwargs, unsupported URL
            forms, or when no startup node can be derived.
        :raises RedisError: when caching or maintenance notifications are
            requested without RESP3.
        """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # Argument 'retry' is not possible to be used in kwargs when in cluster mode
            # the kwargs are set to the lower level connections to the cluster nodes
            # and there we provide retry configuration without retries allowed.
            # The retries should be handled on cluster client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node/s
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the followings:\n"
                "1. host and port, for example:\n"
                "    RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "    RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run
        # If the user passed on_connect function we'll save it and run it
        # inside the RedisCluster.on_connect() function
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            # Legacy path: cluster_error_retry_attempts only sets the retry
            # count; backoff shape is fixed here.
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        # RESP3-only features are validated before any connections are made.
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and not check_protocol_version(protocol, 3):
            raise RedisError("Client caching is only supported with RESP version 3")

        if maint_notifications_config and not check_protocol_version(protocol, 3):
            raise RedisError(
                "Maintenance notifications are only supported with RESP version 3"
            )
        if check_protocol_version(protocol, 3) and maint_notifications_config is None:
            maint_notifications_config = MaintNotificationsConfig()

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes

        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            maint_notifications_config=maint_notifications_config,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        # Maps each policy to the callable that yields the target node(s)
        # (or, for response policies, post-processes the result).
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver
        self.commands_parser = CommandsParser(self)
        # Node where FT.AGGREGATE command is executed.
        self._aggregate_nodes = None
        self._lock = threading.RLock()

        # Must run last: the maintenance-notifications mixin inspects the
        # nodes created above (see MaintNotificationsAbstractRedisCluster).
        MaintNotificationsAbstractRedisCluster.__init__(
            self, maint_notifications_config, **kwargs
        )
    def __enter__(self):
        """Enter the runtime context (``with`` statement support)."""
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context by closing the client."""
        self.close()
    def __del__(self):
        # Best-effort cleanup on garbage collection; failures (e.g. a
        # partially constructed instance, or interpreter shutdown) are
        # deliberately ignored.
        try:
            self.close()
        except Exception:
            pass
883 def disconnect_connection_pools(self):
884 for node in self.get_nodes():
885 if node.redis_connection:
886 try:
887 node.redis_connection.connection_pool.disconnect()
888 except OSError:
889 # Client was already disconnected. do nothing
890 pass
892 def on_connect(self, connection):
893 """
894 Initialize the connection, authenticate and select a database and send
895 READONLY if it is set during object initialization.
896 """
897 connection.on_connect()
899 if self.read_from_replicas or self.load_balancing_strategy:
900 # Sending READONLY command to server to configure connection as
901 # readonly. Since each cluster node may change its server type due
902 # to a failover, we should establish a READONLY connection
903 # regardless of the server type. If this is a primary connection,
904 # READONLY would not affect executing write commands.
905 connection.send_command("READONLY")
906 if str_if_bytes(connection.read_response()) != "OK":
907 raise ConnectionError("READONLY command failed")
909 if self.user_on_connect_func is not None:
910 self.user_on_connect_func(connection)
    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        """
        Return the Redis client bound to ``node``, creating it on first use.

        Double-checked locking: the re-check inside the lock prevents two
        racing threads from both creating the connection.
        """
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection
    def get_node(self, host=None, port=None, node_name=None):
        """Look up a cluster node by host/port or by name (delegates to the nodes manager)."""
        return self.nodes_manager.get_node(host, port, node_name)
    def get_primaries(self):
        """Return all known primary nodes."""
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
    def get_replicas(self):
        """Return all known replica nodes."""
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)
928 def get_random_node(self):
929 return random.choice(list(self.nodes_manager.nodes_cache.values()))
931 def get_random_primary_or_all_nodes(self, command_name):
932 """
933 Returns random primary or all nodes depends on READONLY mode.
934 """
935 if self.read_from_replicas and command_name in READ_COMMANDS:
936 return self.get_random_node()
938 return self.get_random_primary_node()
    def get_nodes(self):
        """Return a list of every node currently in the topology cache."""
        return list(self.nodes_manager.nodes_cache.values())
943 def get_node_from_key(self, key, replica=False):
944 """
945 Get the node that holds the key's slot.
946 If replica set to True but the slot doesn't have any replicas, None is
947 returned.
948 """
949 slot = self.keyslot(key)
950 slot_cache = self.nodes_manager.slots_cache.get(slot)
951 if slot_cache is None or len(slot_cache) == 0:
952 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
953 if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
954 return None
955 elif replica:
956 node_idx = 1
957 else:
958 # primary
959 node_idx = 0
961 return slot_cache[node_idx]
    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node
969 def get_nodes_from_slot(self, command: str, *args):
970 """
971 Returns a list of nodes that hold the specified keys' slots.
972 """
973 # get the node that holds the key's slot
974 slot = self.determine_slot(*args)
975 node = self.nodes_manager.get_node_from_slot(
976 slot,
977 self.read_from_replicas and command in READ_COMMANDS,
978 self.load_balancing_strategy if command in READ_COMMANDS else None,
979 )
980 return [node]
982 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
983 """
984 Splits the command with Multi-Shard policy, to the multiple commands
985 """
986 keys = self._get_command_keys(*args)
987 commands = []
989 for key in keys:
990 commands.append(
991 {
992 "args": (args[0], key),
993 "kwargs": kwargs,
994 }
995 )
997 return commands
999 def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
1000 """
1001 Returns a list of nodes for commands with a special policy.
1002 """
1003 if not self._aggregate_nodes:
1004 raise RedisClusterException(
1005 "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
1006 )
1008 return self._aggregate_nodes
1010 def get_random_primary_node(self) -> "ClusterNode":
1011 """
1012 Returns a random primary node
1013 """
1014 return random.choice(self.get_primaries())
1016 def _evaluate_all_succeeded(self, res):
1017 """
1018 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
1019 """
1020 first_successful_response = None
1022 if isinstance(res, dict):
1023 for key, value in res.items():
1024 if value:
1025 if first_successful_response is None:
1026 first_successful_response = {key: value}
1027 else:
1028 return {key: False}
1029 else:
1030 for response in res:
1031 if response:
1032 if first_successful_response is None:
1033 # Dynamically resolve type
1034 first_successful_response = type(response)(response)
1035 else:
1036 return type(response)(False)
1038 return first_successful_response
1040 def set_default_node(self, node):
1041 """
1042 Set the default node of the cluster.
1043 :param node: 'ClusterNode'
1044 :return True if the default node was set, else False
1045 """
1046 if node is None or self.get_node(node_name=node.name) is None:
1047 return False
1048 self.nodes_manager.default_node = node
1049 return True
    def set_retry(self, retry: Retry) -> None:
        """Replace the retry policy used when executing cluster commands."""
        self.retry = retry
1054 def monitor(self, target_node=None):
1055 """
1056 Returns a Monitor object for the specified target node.
1057 The default cluster node will be selected if no target node was
1058 specified.
1059 Monitor is useful for handling the MONITOR command to the redis server.
1060 next_command() method returns one command from monitor
1061 listen() method yields commands from monitor.
1062 """
1063 if target_node is None:
1064 target_node = self.get_default_node()
1065 if target_node.redis_connection is None:
1066 raise RedisClusterException(
1067 f"Cluster Node {target_node.name} has no redis_connection"
1068 )
1069 return target_node.redis_connection.monitor()
    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Return a ClusterPubSub instance bound to a specific node.

        The node may be given either as a ClusterNode (``node``) or as a
        ``host``/``port`` pair; remaining kwargs are forwarded to
        ClusterPubSub.
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
1078 def pipeline(self, transaction=None, shard_hint=None):
1079 """
1080 Cluster impl:
1081 Pipelines do not work in cluster mode the same way they
1082 do in normal mode. Create a clone of this object so
1083 that simulating pipelines will work correctly. Each
1084 command will be called directly when used and
1085 when calling execute() will only return the result stack.
1086 """
1087 if shard_hint:
1088 raise RedisClusterException("shard_hint is deprecated in cluster mode")
1090 return ClusterPipeline(
1091 nodes_manager=self.nodes_manager,
1092 commands_parser=self.commands_parser,
1093 startup_nodes=self.nodes_manager.startup_nodes,
1094 result_callbacks=self.result_callbacks,
1095 cluster_response_callbacks=self.cluster_response_callbacks,
1096 read_from_replicas=self.read_from_replicas,
1097 load_balancing_strategy=self.load_balancing_strategy,
1098 reinitialize_steps=self.reinitialize_steps,
1099 retry=self.retry,
1100 lock=self._lock,
1101 transaction=transaction,
1102 event_dispatcher=self._event_dispatcher,
1103 )
1105 def lock(
1106 self,
1107 name,
1108 timeout=None,
1109 sleep=0.1,
1110 blocking=True,
1111 blocking_timeout=None,
1112 lock_class=None,
1113 thread_local=True,
1114 raise_on_release_error: bool = True,
1115 ):
1116 """
1117 Return a new Lock object using key ``name`` that mimics
1118 the behavior of threading.Lock.
1120 If specified, ``timeout`` indicates a maximum life for the lock.
1121 By default, it will remain locked until release() is called.
1123 ``sleep`` indicates the amount of time to sleep per loop iteration
1124 when the lock is in blocking mode and another client is currently
1125 holding the lock.
1127 ``blocking`` indicates whether calling ``acquire`` should block until
1128 the lock has been acquired or to fail immediately, causing ``acquire``
1129 to return False and the lock not being acquired. Defaults to True.
1130 Note this value can be overridden by passing a ``blocking``
1131 argument to ``acquire``.
1133 ``blocking_timeout`` indicates the maximum amount of time in seconds to
1134 spend trying to acquire the lock. A value of ``None`` indicates
1135 continue trying forever. ``blocking_timeout`` can be specified as a
1136 float or integer, both representing the number of seconds to wait.
1138 ``lock_class`` forces the specified lock implementation. Note that as
1139 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
1140 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
1141 you have created your own custom lock class.
1143 ``thread_local`` indicates whether the lock token is placed in
1144 thread-local storage. By default, the token is placed in thread local
1145 storage so that a thread only sees its token, not a token set by
1146 another thread. Consider the following timeline:
1148 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
1149 thread-1 sets the token to "abc"
1150 time: 1, thread-2 blocks trying to acquire `my-lock` using the
1151 Lock instance.
1152 time: 5, thread-1 has not yet completed. redis expires the lock
1153 key.
1154 time: 5, thread-2 acquired `my-lock` now that it's available.
1155 thread-2 sets the token to "xyz"
1156 time: 6, thread-1 finishes its work and calls release(). if the
1157 token is *not* stored in thread local storage, then
1158 thread-1 would see the token value as "xyz" and would be
1159 able to successfully release the thread-2's lock.
1161 ``raise_on_release_error`` indicates whether to raise an exception when
1162 the lock is no longer owned when exiting the context manager. By default,
1163 this is True, meaning an exception will be raised. If False, the warning
1164 will be logged and the exception will be suppressed.
1166 In some use cases it's necessary to disable thread local storage. For
1167 example, if you have code where one thread acquires a lock and passes
1168 that lock instance to a worker thread to release later. If thread
1169 local storage isn't disabled in this case, the worker thread won't see
1170 the token set by the thread that acquired the lock. Our assumption
1171 is that these cases aren't common and as such default to using
1172 thread local storage."""
1173 if lock_class is None:
1174 lock_class = Lock
1175 return lock_class(
1176 self,
1177 name,
1178 timeout=timeout,
1179 sleep=sleep,
1180 blocking=blocking,
1181 blocking_timeout=blocking_timeout,
1182 thread_local=thread_local,
1183 raise_on_release_error=raise_on_release_error,
1184 )
1186 def set_response_callback(self, command, callback):
1187 """Set a custom Response Callback"""
1188 self.cluster_response_callbacks[command] = callback
1190 def _determine_nodes(
1191 self, *args, request_policy: RequestPolicy, **kwargs
1192 ) -> List["ClusterNode"]:
1193 """
1194 Determines a nodes the command should be executed on.
1195 """
1196 command = args[0].upper()
1197 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
1198 command = f"{args[0]} {args[1]}".upper()
1200 nodes_flag = kwargs.pop("nodes_flag", None)
1201 if nodes_flag is not None:
1202 # nodes flag passed by the user
1203 command_flag = nodes_flag
1204 else:
1205 # get the nodes group for this command if it was predefined
1206 command_flag = self.command_flags.get(command)
1208 if command_flag in self._command_flags_mapping:
1209 request_policy = self._command_flags_mapping[command_flag]
1211 policy_callback = self._policies_callback_mapping[request_policy]
1213 if request_policy == RequestPolicy.DEFAULT_KEYED:
1214 nodes = policy_callback(command, *args)
1215 elif request_policy == RequestPolicy.MULTI_SHARD:
1216 nodes = policy_callback(*args, **kwargs)
1217 elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
1218 nodes = policy_callback(args[0])
1219 else:
1220 nodes = policy_callback()
1222 if args[0].lower() == "ft.aggregate":
1223 self._aggregate_nodes = nodes
1225 return nodes
1227 def _should_reinitialized(self):
1228 # To reinitialize the cluster on every MOVED error,
1229 # set reinitialize_steps to 1.
1230 # To avoid reinitializing the cluster on moved errors, set
1231 # reinitialize_steps to 0.
1232 if self.reinitialize_steps == 0:
1233 return False
1234 else:
1235 return self.reinitialize_counter % self.reinitialize_steps == 0
1237 def keyslot(self, key):
1238 """
1239 Calculate keyslot for a given key.
1240 See Keys distribution model in https://redis.io/topics/cluster-spec
1241 """
1242 k = self.encoder.encode(key)
1243 return key_slot(k)
1245 def _get_command_keys(self, *args):
1246 """
1247 Get the keys in the command. If the command has no keys in in, None is
1248 returned.
1250 NOTE: Due to a bug in redis<7.0, this function does not work properly
1251 for EVAL or EVALSHA when the `numkeys` arg is 0.
1252 - issue: https://github.com/redis/redis/issues/9493
1253 - fix: https://github.com/redis/redis/pull/9733
1255 So, don't use this function with EVAL or EVALSHA.
1256 """
1257 redis_conn = self.get_default_node().redis_connection
1258 return self.commands_parser.get_keys(redis_conn, *args)
1260 def determine_slot(self, *args) -> Optional[int]:
1261 """
1262 Figure out what slot to use based on args.
1264 Raises a RedisClusterException if there's a missing key and we can't
1265 determine what slots to map the command to; or, if the keys don't
1266 all map to the same key slot.
1267 """
1268 command = args[0]
1269 if self.command_flags.get(command) == SLOT_ID:
1270 # The command contains the slot ID
1271 return args[1]
1273 # Get the keys in the command
1275 # CLIENT TRACKING is a special case.
1276 # It doesn't have any keys, it needs to be sent to the provided nodes
1277 # By default it will be sent to all nodes.
1278 if command.upper() == "CLIENT TRACKING":
1279 return None
1281 # EVAL and EVALSHA are common enough that it's wasteful to go to the
1282 # redis server to parse the keys. Besides, there is a bug in redis<7.0
1283 # where `self._get_command_keys()` fails anyway. So, we special case
1284 # EVAL/EVALSHA.
1285 if command.upper() in ("EVAL", "EVALSHA"):
1286 # command syntax: EVAL "script body" num_keys ...
1287 if len(args) <= 2:
1288 raise RedisClusterException(f"Invalid args in command: {args}")
1289 num_actual_keys = int(args[2])
1290 eval_keys = args[3 : 3 + num_actual_keys]
1291 # if there are 0 keys, that means the script can be run on any node
1292 # so we can just return a random slot
1293 if len(eval_keys) == 0:
1294 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
1295 keys = eval_keys
1296 else:
1297 keys = self._get_command_keys(*args)
1298 if keys is None or len(keys) == 0:
1299 # FCALL can call a function with 0 keys, that means the function
1300 # can be run on any node so we can just return a random slot
1301 if command.upper() in ("FCALL", "FCALL_RO"):
1302 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
1303 raise RedisClusterException(
1304 "No way to dispatch this command to Redis Cluster. "
1305 "Missing key.\nYou can execute the command by specifying "
1306 f"target nodes.\nCommand: {args}"
1307 )
1309 # single key command
1310 if len(keys) == 1:
1311 return self.keyslot(keys[0])
1313 # multi-key command; we need to make sure all keys are mapped to
1314 # the same slot
1315 slots = {self.keyslot(key) for key in keys}
1316 if len(slots) != 1:
1317 raise RedisClusterException(
1318 f"{command} - all keys must map to the same key slot"
1319 )
1321 return slots.pop()
1323 def get_encoder(self):
1324 """
1325 Get the connections' encoder
1326 """
1327 return self.encoder
1329 def get_connection_kwargs(self):
1330 """
1331 Get the connections' key-word arguments
1332 """
1333 return self.nodes_manager.connection_kwargs
1335 def _is_nodes_flag(self, target_nodes):
1336 return isinstance(target_nodes, str) and target_nodes in self.node_flags
1338 def _parse_target_nodes(self, target_nodes):
1339 if isinstance(target_nodes, list):
1340 nodes = target_nodes
1341 elif isinstance(target_nodes, ClusterNode):
1342 # Supports passing a single ClusterNode as a variable
1343 nodes = [target_nodes]
1344 elif isinstance(target_nodes, dict):
1345 # Supports dictionaries of the format {node_name: node}.
1346 # It enables to execute commands with multi nodes as follows:
1347 # rc.cluster_save_config(rc.get_primaries())
1348 nodes = target_nodes.values()
1349 else:
1350 raise TypeError(
1351 "target_nodes type can be one of the following: "
1352 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
1353 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
1354 f"The passed type is {type(target_nodes)}"
1355 )
1356 return nodes
    def execute_command(self, *args, **kwargs):
        """Public entry point; delegates to the retry-aware internal executor."""
        return self._internal_execute_command(*args, **kwargs)
    def _internal_execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the retries property from
        config option "self.retry" which defaults to 3 unless manually
        configured.

        If it reaches the number of times, the command will raise the exception

        Key argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>

        Returns the callback-processed, policy-aggregated response.
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        # Statically-known request/response policies for this command, if any.
        command_policies = self._policy_resolver.resolve(args[0].lower())

        # Explicit node objects (as opposed to a nodes flag) disable retries
        # below, since the nodes may be invalidated by a cache rebuild.
        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True

        if not command_policies and not target_nodes_specified:
            # Prefer the two-word command name (e.g. "CLUSTER INFO") when a
            # flag is predefined for it.
            command = args[0].upper()
            if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
                command = f"{args[0]} {args[1]}".upper()

            # We only could resolve key properties if command is not
            # in a list of pre-defined request policies
            command_flag = self.command_flags.get(command)
            if not command_flag:
                # Fallback to default policy
                if not self.get_default_node():
                    slot = None
                else:
                    slot = self.determine_slot(*args)
                if slot is None:
                    command_policies = CommandPolicies()
                else:
                    command_policies = CommandPolicies(
                        request_policy=RequestPolicy.DEFAULT_KEYED,
                        response_policy=ResponsePolicy.DEFAULT_KEYED,
                    )
            else:
                if command_flag in self._command_flags_mapping:
                    command_policies = CommandPolicies(
                        request_policy=self._command_flags_mapping[command_flag]
                    )
                else:
                    command_policies = CommandPolicies()
        elif not command_policies and target_nodes_specified:
            command_policies = CommandPolicies()

        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        failure_count = 0

        # Start timing for observability
        start_time = time.monotonic()

        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args,
                        request_policy=command_policies.request_policy,
                        nodes_flag=passed_targets,
                    )

                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)

                    # ONE_SUCCEEDED: stop after the first successful node.
                    if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                        break

                # Return the processed result
                return self._process_result(
                    args[0],
                    res,
                    response_policy=command_policies.response_policy,
                    **kwargs,
                )
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    failure_count += 1

                    if hasattr(e, "connection"):
                        self._record_command_metric(
                            command_name=args[0],
                            duration_seconds=time.monotonic() - start_time,
                            connection=e.connection,
                            error=e,
                        )

                        self._record_error_metric(
                            error=e,
                            connection=e.connection,
                            retry_attempts=failure_count,
                        )
                    continue
                else:
                    # raise the exception
                    if hasattr(e, "connection"):
                        self._record_error_metric(
                            error=e,
                            connection=e.connection,
                            retry_attempts=failure_count,
                            is_internal=False,
                        )
                    raise e
    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster

        Handles ASK/MOVED redirects, TRYAGAIN backoff, cluster
        reinitialization on connection failures, and metric recording.
        The redirect loop is bounded by ``self.RedisClusterRequestTTL``.
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        # Start timing for observability
        start_time = time.monotonic()

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    # Honor a pending ASK redirect from the previous attempt.
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove keys entry, it needs only for cache.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )

                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                )
                return response
            except AuthenticationError as e:
                # this is used to report the metrics based on host and port info
                e.connection = connection if connection is not None else target_node
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                raise
            except MaxConnectionsError as e:
                # MaxConnectionsError indicates client-side resource exhaustion
                # (too many connections in the pool), not a node failure.
                # Don't treat this as a node failure - just re-raise the error
                # without reinitializing the cluster.
                # The connection in the error is used to report the metrics based on host and port info
                # so we use the target node object which contains the host and port info
                # because we did not get the connection yet
                e.connection = target_node
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                raise
            except (ConnectionError, TimeoutError) as e:
                if is_debug_log_enabled():
                    socket_address = self._extracts_socket_address(connection)
                    args_log_str = truncate_text(" ".join(map(safe_str, args)))
                    logger.debug(
                        f"{type(e).__name__} received for command {args_log_str}, on node {target_node.name}, "
                        f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                    )
                # this is used to report the metrics based on host and port info
                e.connection = connection if connection else target_node

                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Instead of setting to None, properly handle the pool
                # Get the pool safely - redis_connection could be set to None
                # by another thread between the check and access
                redis_conn = target_node.redis_connection
                if redis_conn is not None:
                    pool = redis_conn.connection_pool
                    if pool is not None:
                        with pool._lock:
                            # take care for the active connections in the pool
                            pool.update_active_connections_for_reconnect()
                            # disconnect all free connections
                            pool.disconnect_free_connections()

                # Move the failed node to the end of the cached nodes list
                self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name)

                # DON'T set redis_connection = None - keep the pool for reuse
                self.nodes_manager.initialize()
                e.connection = connection
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                raise e
            except MovedError as e:
                if is_debug_log_enabled():
                    socket_address = self._extracts_socket_address(connection)
                    args_log_str = truncate_text(" ".join(map(safe_str, args)))
                    logger.debug(
                        f"MOVED error received for command {args_log_str}, on node {target_node.name}, "
                        f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                    )
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    # during this call all connections are closed or marked for disconnect,
                    # so we don't need to disconnect the changed node's connections
                    self.nodes_manager.initialize(
                        additional_startup_nodes_info=[(e.host, e.port)]
                    )
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.move_slot(e)
                moved = True
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                self._record_error_metric(
                    error=e,
                    connection=connection,
                )
            except TryAgainError as e:
                if is_debug_log_enabled():
                    socket_address = self._extracts_socket_address(connection)
                    args_log_str = truncate_text(" ".join(map(safe_str, args)))
                    logger.debug(
                        f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, "
                        f"and connection: {connection} using local socket address: {socket_address}"
                    )
                # Back off slightly once half of the redirect budget is spent.
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)

                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                self._record_error_metric(
                    error=e,
                    connection=connection,
                )
            except AskError as e:
                if is_debug_log_enabled():
                    socket_address = self._extracts_socket_address(connection)
                    args_log_str = truncate_text(" ".join(map(safe_str, args)))
                    logger.debug(
                        f"ASK error received for command {args_log_str}, on node {target_node.name}, "
                        f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                    )
                # Follow the one-shot redirect on the next loop iteration.
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True

                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                self._record_error_metric(
                    error=e,
                    connection=connection,
                )
            except (ClusterDownError, SlotNotCoveredError) as e:
                # ClusterDownError can occur during a failover and to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command

                # SlotNotCoveredError can occur when the cluster is not fully
                # initialized or can be temporary issue.
                # We will try to reinitialize the cluster topology
                # and retry executing the command

                time.sleep(0.25)
                self.nodes_manager.initialize()

                # if we have a connection, use it, otherwise use the target node
                # object which contains the host and port info
                # this is used to report the metrics based on host and port info
                e.connection = connection if connection else target_node
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                raise
            except ResponseError as e:
                # this is used to report the metrics based on host and port info
                e.connection = connection
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()

                # if we have a connection, use it, otherwise use the target node
                # object which contains the host and port info
                # this is used to report the metrics based on host and port info
                e.connection = connection if connection else target_node
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        e = ClusterError("TTL exhausted.")
        # In this case we should have an active connection.
        # If we are here, we have received many MOVED or ASK errors and finally exhausted the TTL.
        # This means that we used an active connection to read from the socket.
        # This is used to report metrics based on the host and port information.
        e.connection = connection
        self._record_command_metric(
            command_name=command,
            duration_seconds=time.monotonic() - start_time,
            connection=connection,
            error=e,
        )
        raise e
1773 def _record_command_metric(
1774 self,
1775 command_name: str,
1776 duration_seconds: float,
1777 connection: Connection,
1778 error=None,
1779 ):
1780 """
1781 Records operation duration metric directly.
1782 """
1783 record_operation_duration(
1784 command_name=command_name,
1785 duration_seconds=duration_seconds,
1786 server_address=connection.host,
1787 server_port=connection.port,
1788 db_namespace=str(connection.db),
1789 error=error,
1790 )
1792 def _record_error_metric(
1793 self,
1794 error: Exception,
1795 connection: Connection,
1796 is_internal: bool = True,
1797 retry_attempts: Optional[int] = None,
1798 ):
1799 """
1800 Records error count metric directly.
1801 """
1802 record_error_count(
1803 server_address=connection.host,
1804 server_port=connection.port,
1805 network_peer_address=connection.host,
1806 network_peer_port=connection.port,
1807 error_type=error,
1808 retry_attempts=retry_attempts if retry_attempts is not None else 0,
1809 is_internal=is_internal,
1810 )
1812 def _extracts_socket_address(
1813 self, connection: Optional[Connection]
1814 ) -> Optional[int]:
1815 if connection is None:
1816 return None
1817 try:
1818 socket_address = (
1819 connection._sock.getsockname() if connection._sock else None
1820 )
1821 socket_address = socket_address[1] if socket_address else None
1822 except (AttributeError, OSError):
1823 pass
1824 return socket_address
1826 def close(self) -> None:
1827 try:
1828 with self._lock:
1829 if self.nodes_manager:
1830 self.nodes_manager.close()
1831 except AttributeError:
1832 # RedisCluster's __init__ can fail before nodes_manager is set
1833 pass
1835 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
1836 """
1837 Process the result of the executed command.
1838 The function would return a dict or a single value.
1840 :type command: str
1841 :type res: dict
1843 `res` should be in the following format:
1844 Dict<node_name, command_result>
1845 """
1846 if command in self.result_callbacks:
1847 res = self.result_callbacks[command](command, res, **kwargs)
1848 elif len(res) == 1:
1849 # When we execute the command on a single node, we can
1850 # remove the dictionary and return a single response
1851 res = list(res.values())[0]
1853 return self._policies_callback_mapping[response_policy](res)
1855 def load_external_module(self, funcname, func):
1856 """
1857 This function can be used to add externally defined redis modules,
1858 and their namespaces to the redis client.
1860 ``funcname`` - A string containing the name of the function to create
1861 ``func`` - The function, being added to this class.
1862 """
1863 setattr(self, funcname, func)
1865 def transaction(self, func, *watches, **kwargs):
1866 """
1867 Convenience method for executing the callable `func` as a transaction
1868 while watching all keys specified in `watches`. The 'func' callable
1869 should expect a single argument which is a Pipeline object.
1870 """
1871 shard_hint = kwargs.pop("shard_hint", None)
1872 value_from_callable = kwargs.pop("value_from_callable", False)
1873 watch_delay = kwargs.pop("watch_delay", None)
1874 with self.pipeline(True, shard_hint) as pipe:
1875 while True:
1876 try:
1877 if watches:
1878 pipe.watch(*watches)
1879 func_value = func(pipe)
1880 exec_value = pipe.execute()
1881 return func_value if value_from_callable else exec_value
1882 except WatchError:
1883 if watch_delay is not None and watch_delay > 0:
1884 time.sleep(watch_delay)
1885 continue
class ClusterNode:
    """A single cluster node: its address, role, and Redis client object."""

    def __init__(self, host, port, server_type=None, redis_connection=None):
        # Normalize "localhost" to its resolved IP so node names built
        # from host:port stay consistent across the cluster view.
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        fields = (
            f"host={self.host}",
            f"port={self.port}",
            f"name={self.name}",
            f"server_type={self.server_type}",
            f"redis_connection={self.redis_connection}",
        )
        return "[" + ",".join(fields) + "]"

    def __eq__(self, obj):
        # Nodes are identified purely by their "host:port" name.
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __hash__(self):
        return hash(self.name)
class LoadBalancingStrategy(Enum):
    """How reads are spread across a slot's primary and replica nodes."""

    # Rotate across all nodes of the slot, primary included.
    ROUND_ROBIN = "round_robin"
    # Rotate across replicas only, skipping the primary (index 0).
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    # Pick a random replica for each read.
    RANDOM_REPLICA = "random_replica"
class LoadBalancer:
    """
    Round-Robin Load Balancing

    Keeps one rotation cursor per primary so that successive reads of the
    same slot walk its node list; access to the cursors is lock-protected.
    """

    def __init__(self, start_index: int = 0) -> None:
        # Maps a primary's name to the next server index to hand out.
        self.primary_to_idx: dict[str, int] = {}
        self.start_index: int = start_index
        self._lock: threading.Lock = threading.Lock()

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        """Return the index of the node to use within the slot's node list."""
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        replicas_only = (
            load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
        )
        return self._get_round_robin_index(primary, list_size, replicas_only)

    def reset(self) -> None:
        """Forget all per-primary rotation state."""
        with self._lock:
            self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        # Index 0 is the primary; replicas occupy 1..list_size-1.
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        with self._lock:
            index = self.primary_to_idx.setdefault(primary, self.start_index)
            if replicas_only and index == 0:
                # Never hand out the primary's slot in replicas-only mode.
                index = 1
            # Advance the cursor for the next call.
            self.primary_to_idx[primary] = (index + 1) % list_size
            return index
class NodesManager:
    """Tracks cluster topology: node cache, slot-to-nodes map and startup nodes.

    The mutable topology state (``nodes_cache``, ``slots_cache``,
    ``startup_nodes``, ``default_node``) is guarded by ``_lock``; full
    (re)initialization is deduplicated via ``_initialization_lock`` and an
    epoch counter.
    """

    def __init__(
        self,
        startup_nodes: list[ClusterNode],
        from_url=False,
        require_full_coverage=False,
        lock: Optional[threading.RLock] = None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        **kwargs,
    ):
        """Build the manager and immediately run a full topology discovery.

        ``**kwargs`` are the connection keyword arguments forwarded to every
        per-node Redis client this manager creates.
        """
        self.nodes_cache: dict[str, ClusterNode] = {}
        self.slots_cache: dict[int, list[ClusterNode]] = {}
        self.startup_nodes: dict[str, ClusterNode] = {n.name: n for n in startup_nodes}
        self.default_node: Optional[ClusterNode] = None
        # Incremented after each successful initialize(); used to skip
        # redundant re-initializations queued up by concurrent threads.
        self._epoch: int = 0
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        # Cache precedence: explicit cache > cache_factory > cache_config.
        self._cache: Optional[CacheInterface] = None
        if cache:
            self._cache = cache
        elif cache_factory is not None:
            self._cache = cache_factory.get_cache()
        elif cache_config is not None:
            self._cache = CacheFactory(cache_config).get_cache()
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()

        # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock
        if lock is None:
            self._lock = threading.RLock()
        else:
            self._lock = lock

        # initialize holds _initialization_lock to dedup multiple calls to reinitialize;
        # note that if we hold both _lock and _initialization_lock, we _must_ acquire
        # _initialization_lock first (ie: to have a consistent order) to avoid deadlock.
        self._initialization_lock: threading.RLock = threading.RLock()

        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.maint_notifications_config = maint_notifications_config

        # Eagerly discover the cluster topology (may raise if unreachable).
        self.initialize()

    def get_node(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        node_name: Optional[str] = None,
    ) -> Optional[ClusterNode]:
        """
        Get the requested node from the cluster's cached nodes, either by
        host+port or by node name.
        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            with self._lock:
                return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            with self._lock:
                return self.nodes_cache.get(node_name)
        else:
            # Neither addressing style supplied.
            return None

    def move_slot(self, e: Union[AskError, MovedError]):
        """
        Update the slot's node with the redirected one
        """
        with self._lock:
            redirected_node = self.get_node(host=e.host, port=e.port)
            if redirected_node is not None:
                # The node already exists
                if redirected_node.server_type is not PRIMARY:
                    # Update the node's server type
                    redirected_node.server_type = PRIMARY
            else:
                # This is a new node, we will add it to the nodes cache
                redirected_node = ClusterNode(e.host, e.port, PRIMARY)
                self.nodes_cache[redirected_node.name] = redirected_node

            slot_nodes = self.slots_cache[e.slot_id]
            if redirected_node not in slot_nodes:
                # The new slot owner is a new server, or a server from a different
                # shard. We need to remove all current nodes from the slot's list
                # (including replications) and add just the new node.
                self.slots_cache[e.slot_id] = [redirected_node]
            elif redirected_node is not slot_nodes[0]:
                # The MOVED error resulted from a failover, and the new slot owner
                # had previously been a replica.
                old_primary = slot_nodes[0]
                # Update the old primary to be a replica and add it to the end of
                # the slot's node list
                old_primary.server_type = REPLICA
                slot_nodes.append(old_primary)
                # Remove the old replica, which is now a primary, from the slot's
                # node list
                slot_nodes.remove(redirected_node)
                # Override the old primary with the new one
                slot_nodes[0] = redirected_node
                if self.default_node == old_primary:
                    # Update the default node with the new primary
                    self.default_node = redirected_node
            # else: circular MOVED to current primary -> no-op

    @deprecated_args(
        args_to_warn=["server_type"],
        reason=(
            "In case you need select some load balancing strategy "
            "that will use replicas, please set it through 'load_balancing_strategy'"
        ),
        version="5.3.0",
    )
    def get_node_from_slot(
        self,
        slot: int,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        server_type: Optional[Literal["primary", "replica"]] = None,
    ) -> ClusterNode:
        """
        Get a node that serves this hash slot, honoring the load balancing
        strategy (or the deprecated ``server_type`` hint).

        :raises SlotNotCoveredError: if no node is cached for the slot.
        """
        if read_from_replicas is True and load_balancing_strategy is None:
            # Backward compatibility: plain read_from_replicas implies
            # round-robin over the whole shard.
            load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

        with self._lock:
            if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
                raise SlotNotCoveredError(
                    f'Slot "{slot}" not covered by the cluster. '
                    + f'"require_full_coverage={self._require_full_coverage}"'
                )

            if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
                # get the server index using the strategy defined in load_balancing_strategy
                primary_name = self.slots_cache[slot][0].name
                node_idx = self.read_load_balancer.get_server_index(
                    primary_name, len(self.slots_cache[slot]), load_balancing_strategy
                )
            elif (
                server_type is None
                or server_type == PRIMARY
                or len(self.slots_cache[slot]) == 1
            ):
                # return a primary
                node_idx = 0
            else:
                # return a replica
                # randomly choose one of the replicas
                node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

            return self.slots_cache[slot][node_idx]

    def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]):
        """
        Get all nodes with the specified server type
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode
        """
        with self._lock:
            return [
                node
                for node in self.nodes_cache.values()
                if node.server_type == server_type
            ]

    @deprecated_function(
        reason="This method is not used anymore internally. The startup nodes are populated automatically.",
        version="7.0.2",
    )
    def populate_startup_nodes(self, nodes):
        """
        Populate all startup nodes and filters out any duplicates
        """
        with self._lock:
            for n in nodes:
                self.startup_nodes[n.name] = n

    def move_node_to_end_of_cached_nodes(self, node_name: str) -> None:
        """
        Move a failing node to the end of startup_nodes and nodes_cache so it's
        tried last during reinitialization and when selecting the default node.
        If the node is not in the respective list, nothing is done.
        """
        # Move in startup_nodes
        if node_name in self.startup_nodes and len(self.startup_nodes) > 1:
            node = self.startup_nodes.pop(node_name)
            self.startup_nodes[node_name] = node  # Re-insert at end

        # Move in nodes_cache - this affects get_nodes_by_server_type ordering
        # which is used to select the default_node during initialize()
        if node_name in self.nodes_cache and len(self.nodes_cache) > 1:
            node = self.nodes_cache.pop(node_name)
            self.nodes_cache[node_name] = node  # Re-insert at end

    def check_slots_coverage(self, slots_cache):
        """Return True only if every one of the 16384 slots has an entry."""
        # Validate if all slots are covered or if we should try next
        # startup node
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in slots_cache:
                return False
        return True

    def create_redis_connections(self, nodes):
        """
        This function will create a redis connection to all nodes in :nodes:
        """
        connection_pools = []
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host,
                    port=node.port,
                    maint_notifications_config=self.maint_notifications_config,
                    **self.connection_kwargs,
                )
            connection_pools.append(node.redis_connection.connection_pool)

        # Notify listeners (e.g. re-auth hooks) about the new pools.
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                connection_pools, ClientType.SYNC, self._credential_provider
            )
        )

    def create_redis_node(
        self,
        host,
        port,
        **kwargs,
    ):
        """Create a standalone Redis client for a single cluster node."""
        # We are configuring the connection pool not to retry
        # connections on lower level clients to avoid retrying
        # connections to nodes that are not reachable
        # and to avoid blocking the connection pool.
        # The only error that will have some handling in the lower
        # level clients is ConnectionError which will trigger disconnection
        # of the socket.
        # The retries will be handled on cluster client level
        # where we will have proper handling of the cluster topology
        node_retry_config = Retry(
            backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
        )

        if self.from_url:
            # Create a redis node with a custom connection pool
            kwargs.update({"host": host})
            kwargs.update({"port": port})
            kwargs.update({"cache": self._cache})
            kwargs.update({"retry": node_retry_config})
            r = Redis(connection_pool=self.connection_pool_class(**kwargs))
        else:
            r = Redis(
                host=host,
                port=port,
                cache=self._cache,
                retry=node_retry_config,
                **kwargs,
            )
        return r

    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
        """Fetch (or build) the ClusterNode for host:port into tmp_nodes_cache,
        reusing an existing redis connection from the live cache if available."""
        node_name = get_node_name(host, port)
        # check if we already have this node in the tmp_nodes_cache
        target_node = tmp_nodes_cache.get(node_name)
        if target_node is None:
            # before creating a new cluster node, check if the cluster node already
            # exists in the current nodes cache and has a valid connection so we can
            # reuse it
            redis_connection: Optional[Redis] = None
            with self._lock:
                previous_node = self.nodes_cache.get(node_name)
                if previous_node:
                    redis_connection = previous_node.redis_connection
            # don't update the old ClusterNode, so we don't update its role
            # outside of the lock
            target_node = ClusterNode(host, port, role, redis_connection)
            # add this node to the nodes cache
            tmp_nodes_cache[target_node.name] = target_node

        return target_node

    def _get_epoch(self) -> int:
        """
        Get the current epoch value. This method exists primarily to allow
        tests to mock the epoch fetch and control race condition timing.
        """
        with self._lock:
            return self._epoch

    def initialize(
        self,
        additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None,
        disconnect_startup_nodes_pools: bool = True,
    ):
        """
        Initializes the nodes cache, slots cache and redis connections.
        :startup_nodes:
            Responsible for discovering other nodes in the cluster
        :disconnect_startup_nodes_pools:
            Whether to disconnect the connection pool of the startup nodes
            after the initialization is complete. This is useful when the
            startup nodes are not part of the cluster and we want to avoid
            keeping the connection open.
        :additional_startup_nodes_info:
            Additional nodes to add temporarily to the startup nodes.
            The additional nodes will be used just in the process of extraction of the slots
            and nodes information from the cluster.
            This is useful when we want to add new nodes to the cluster
            and initialize the client
            with them.
            The format of the list is a list of tuples, where each tuple contains
            the host and port of the node.
        """
        self.reset()
        tmp_nodes_cache = {}
        tmp_slots = {}
        disagreements = []
        startup_nodes_reachable = False
        fully_covered = False
        kwargs = self.connection_kwargs
        exception = None
        # Snapshot the epoch before taking _initialization_lock so we can tell
        # whether another thread finished a refresh while we were waiting.
        epoch = self._get_epoch()
        if additional_startup_nodes_info is None:
            additional_startup_nodes_info = []

        with self._initialization_lock:
            with self._lock:
                if epoch != self._epoch:
                    # another thread has already re-initialized the nodes; don't
                    # bother running again
                    return

            with self._lock:
                startup_nodes = tuple(self.startup_nodes.values())

            additional_startup_nodes = [
                ClusterNode(host, port) for host, port in additional_startup_nodes_info
            ]
            if is_debug_log_enabled():
                logger.debug(
                    f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; "
                    f"and startup nodes: {[node.name for node in startup_nodes]}"
                )
            for startup_node in (*startup_nodes, *additional_startup_nodes):
                try:
                    if startup_node.redis_connection:
                        r = startup_node.redis_connection
                    else:
                        # Create a new Redis connection
                        if is_debug_log_enabled():
                            socket_timeout = kwargs.get("socket_timeout", "not set")
                            socket_connect_timeout = kwargs.get(
                                "socket_connect_timeout", "not set"
                            )
                            maint_enabled = (
                                self.maint_notifications_config.enabled
                                if self.maint_notifications_config
                                else False
                            )
                            logger.debug(
                                "Topology refresh: Creating new Redis connection to "
                                f"{startup_node.host}:{startup_node.port}; "
                                f"with socket_timeout: {socket_timeout}, and "
                                f"socket_connect_timeout: {socket_connect_timeout}, "
                                "and maint_notifications enabled: "
                                f"{maint_enabled}"
                            )
                        r = self.create_redis_node(
                            startup_node.host,
                            startup_node.port,
                            maint_notifications_config=self.maint_notifications_config,
                            **kwargs,
                        )
                        if startup_node in self.startup_nodes.values():
                            self.startup_nodes[startup_node.name].redis_connection = r
                        else:
                            startup_node.redis_connection = r
                    try:
                        # Make sure cluster mode is enabled on this node
                        cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
                        if disconnect_startup_nodes_pools:
                            with r.connection_pool._lock:
                                # take care to clear connections before we move on
                                # mark all active connections for reconnect - they will be
                                # reconnected on next use, but will allow current in flight commands to complete first
                                r.connection_pool.update_active_connections_for_reconnect()
                                # Needed to clear READONLY state when it is no longer applicable
                                r.connection_pool.disconnect_free_connections()
                    except ResponseError:
                        raise RedisClusterException(
                            "Cluster mode is not enabled on this node"
                        )
                    startup_nodes_reachable = True
                except Exception as e:
                    # Try the next startup node.
                    # The exception is saved and raised only if we have no more nodes.
                    exception = e
                    continue

                # CLUSTER SLOTS command results in the following output:
                # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
                # where each node contains the following list: [IP, port, node_id]
                # Therefore, cluster_slots[0][2][0] will be the IP address of the
                # primary node of the first slot section.
                # If there's only one server in the cluster, its ``host`` is ''
                # Fix it to the host in startup_nodes
                if (
                    len(cluster_slots) == 1
                    and len(cluster_slots[0][2][0]) == 0
                    and len(self.startup_nodes) == 1
                ):
                    cluster_slots[0][2][0] = startup_node.host

                for slot in cluster_slots:
                    primary_node = slot[2]
                    host = str_if_bytes(primary_node[0])
                    if host == "":
                        host = startup_node.host
                    port = int(primary_node[1])
                    host, port = self.remap_host_port(host, port)

                    nodes_for_slot = []

                    target_node = self._get_or_create_cluster_node(
                        host, port, PRIMARY, tmp_nodes_cache
                    )
                    nodes_for_slot.append(target_node)

                    replica_nodes = slot[3:]
                    for replica_node in replica_nodes:
                        host = str_if_bytes(replica_node[0])
                        port = int(replica_node[1])
                        host, port = self.remap_host_port(host, port)
                        target_replica_node = self._get_or_create_cluster_node(
                            host, port, REPLICA, tmp_nodes_cache
                        )
                        nodes_for_slot.append(target_replica_node)

                    for i in range(int(slot[0]), int(slot[1]) + 1):
                        if i not in tmp_slots:
                            tmp_slots[i] = nodes_for_slot
                        else:
                            # Validate that 2 nodes want to use the same slot cache
                            # setup
                            tmp_slot = tmp_slots[i][0]
                            if tmp_slot.name != target_node.name:
                                disagreements.append(
                                    f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                                )

                                if len(disagreements) > 5:
                                    raise RedisClusterException(
                                        f"startup_nodes could not agree on a valid "
                                        f"slots cache: {', '.join(disagreements)}"
                                    )

                fully_covered = self.check_slots_coverage(tmp_slots)
                if fully_covered:
                    # Don't need to continue to the next startup node if all
                    # slots are covered
                    break

            if not startup_nodes_reachable:
                raise RedisClusterException(
                    f"Redis Cluster cannot be connected. Please provide at least "
                    f"one reachable node: {str(exception)}"
                ) from exception

            # Create Redis connections to all nodes
            self.create_redis_connections(list(tmp_nodes_cache.values()))

            # Check if the slots are not fully covered
            if not fully_covered and self._require_full_coverage:
                # Despite the requirement that the slots be covered, there
                # isn't a full coverage
                raise RedisClusterException(
                    f"All slots are not covered after query all startup_nodes. "
                    f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                    f"covered..."
                )

            # Set the tmp variables to the real variables
            with self._lock:
                self.nodes_cache = tmp_nodes_cache
                self.slots_cache = tmp_slots
                # Set the default node
                self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
                if self._dynamic_startup_nodes:
                    # Populate the startup nodes with all discovered nodes
                    self.startup_nodes = tmp_nodes_cache
                # Increment the epoch to signal that initialization has completed
                self._epoch += 1

    def close(self) -> None:
        """Close every cached node's client and clear the default node."""
        with self._lock:
            self.default_node = None
            nodes = tuple(self.nodes_cache.values())
        for node in nodes:
            if node.redis_connection:
                node.redis_connection.close()

    def reset(self):
        """Reset the read load balancer's rotation state."""
        try:
            self.read_load_balancer.reset()
        except TypeError:
            # The read_load_balancer is None, do nothing
            # NOTE(review): calling .reset() on None raises AttributeError,
            # not TypeError, so this guard likely never fires — confirm intent.
            pass

    def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
        """
        Remap the host and port returned from the cluster to a different
        internal value. Useful if the client is not connecting directly
        to the cluster.
        """
        if self.address_remap:
            return self.address_remap((host, port))
        return host, port

    def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]:
        """Return the cached node whose connection pool matches ``connection``'s
        host/port, or None if no cached node owns it."""
        node_name = get_node_name(connection.host, connection.port)
        with self._lock:
            for node in tuple(self.nodes_cache.values()):
                if node.redis_connection:
                    conn_args = node.redis_connection.connection_pool.connection_kwargs
                    if node_name == get_node_name(
                        conn_args.get("host"), conn_args.get("port")
                    ):
                        return node
        return None
class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
    """

    def __init__(
        self,
        redis_cluster,
        node=None,
        host=None,
        port=None,
        push_handler_func=None,
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true or load_balancing_strategy is set, a replica can be selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        # Validates node/host/port combination and may set self.node.
        self.set_pubsub_node(redis_cluster, node, host, port)
        # Only bind a pool now if a node was explicitly chosen; otherwise the
        # pool is resolved lazily on the first command (see execute_command).
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        # Per-node PubSub instances used for sharded (SSUBSCRIBE) channels.
        self.node_pubsub_mapping = {}
        self._pubsubs_generator = self._pubsubs_generator()
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        super().__init__(
            connection_pool=connection_pool,
            encoder=redis_cluster.encoder,
            push_handler_func=push_handler_func,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        The pubsub node will be set according to the passed node, host and port
        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.
        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            pubsub_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, node, host, port)
            pubsub_node = node
        elif any([host, port]) is True:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            pubsub_node = None

        self.node = pubsub_node

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        if node is None or redis_cluster.get_node(node_name=node.name) is None:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )

    def execute_command(self, *args):
        """
        Execute a subscribe/unsubscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            if self.connection_pool is None:
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    channel = args[1]
                    slot = self.cluster.keyslot(channel)
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot,
                        self.cluster.read_from_replicas,
                        self.cluster.load_balancing_strategy,
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                redis_connection = self.cluster.get_redis_connection(node)
                self.connection_pool = redis_connection.connection_pool
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _get_node_pubsub(self, node):
        """Return (creating on first use) the per-node PubSub for ``node``."""
        try:
            return self.node_pubsub_mapping[node.name]
        except KeyError:
            pubsub = node.redis_connection.pubsub(
                push_handler_func=self.push_handler_func
            )
            self.node_pubsub_mapping[node.name] = pubsub
            return pubsub

    def _sharded_message_generator(self):
        """Poll each node's pubsub once; return the first message found, if any."""
        for _ in range(len(self.node_pubsub_mapping)):
            pubsub = next(self._pubsubs_generator)
            message = pubsub.get_message()
            if message is not None:
                return message
        return None

    def _pubsubs_generator(self):
        # Endless round-robin over the current per-node pubsubs; re-snapshots
        # the mapping each cycle so added/removed nodes are picked up.
        while True:
            current_nodes = list(self.node_pubsub_mapping.values())
            yield from current_nodes

    def get_sharded_message(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    ):
        """Fetch the next sharded-channel message, either from ``target_node``'s
        pubsub or by round-robin polling all per-node pubsubs."""
        if target_node:
            message = self.node_pubsub_mapping[target_node.name].get_message(
                ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
            )
        else:
            message = self._sharded_message_generator()
        if message is None:
            return None
        elif str_if_bytes(message["type"]) == "sunsubscribe":
            if message["channel"] in self.pending_unsubscribe_shard_channels:
                self.pending_unsubscribe_shard_channels.remove(message["channel"])
                self.shard_channels.pop(message["channel"], None)
                node = self.cluster.get_node_from_key(message["channel"])
                if self.node_pubsub_mapping[node.name].subscribed is False:
                    # Drop the per-node pubsub once it has no subscriptions left.
                    self.node_pubsub_mapping.pop(node.name)
        if not self.channels and not self.patterns and not self.shard_channels:
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()
        if self.ignore_subscribe_messages or ignore_subscribe_messages:
            return None
        return message

    def ssubscribe(self, *args, **kwargs):
        """Subscribe to sharded channels, routing each channel to the node
        owning its key slot (args are channel names, kwargs map channel->handler)."""
        if args:
            args = list_or_args(args[0], args[1:])
        s_channels = dict.fromkeys(args)
        s_channels.update(kwargs)
        for s_channel, handler in s_channels.items():
            node = self.cluster.get_node_from_key(s_channel)
            pubsub = self._get_node_pubsub(node)
            if handler:
                pubsub.ssubscribe(**{s_channel: handler})
            else:
                pubsub.ssubscribe(s_channel)
            self.shard_channels.update(pubsub.shard_channels)
            self.pending_unsubscribe_shard_channels.difference_update(
                self._normalize_keys({s_channel: None})
            )
            if pubsub.subscribed and not self.subscribed:
                self.subscribed_event.set()
                self.health_check_response_counter = 0

    def sunsubscribe(self, *args):
        """Unsubscribe from the given sharded channels (all if none given)."""
        if args:
            args = list_or_args(args[0], args[1:])
        else:
            args = self.shard_channels

        for s_channel in args:
            node = self.cluster.get_node_from_key(s_channel)
            p = self._get_node_pubsub(node)
            p.sunsubscribe(s_channel)
            self.pending_unsubscribe_shard_channels.update(
                p.pending_unsubscribe_shard_channels
            )

    def get_redis_connection(self):
        """
        Get the Redis connection of the pubsub connected node.
        """
        if self.node is not None:
            return self.node.redis_connection

    def disconnect(self):
        """
        Disconnect the pubsub connection.
        """
        if self.connection:
            self.connection.disconnect()
        for pubsub in self.node_pubsub_mapping.values():
            pubsub.connection.disconnect()
class ClusterPipeline(RedisCluster):
    """
    Support for Redis pipeline
    in cluster mode
    """

    # Errors that the pipeline may resolve by retrying (possibly after a
    # topology refresh / redirection).
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        MovedError,
        AskError,
        TryAgainError,
    )

    # Commands that are not routed by key slot.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands executed immediately rather than queued on the pipeline.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands that implicitly clear WATCH state.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        cluster_error_retry_attempts: int = 3,
        reinitialize_steps: int = 5,
        retry: Optional[Retry] = None,
        lock=None,
        transaction=False,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """Build a cluster pipeline bound to an existing NodesManager.

        ``transaction`` selects the execution strategy: a MULTI/EXEC
        transaction (TransactionStrategy) vs. plain pipelining
        (PipelineStrategy). ``retry`` supersedes the deprecated
        ``cluster_error_retry_attempts``.
        """
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.cluster_response_callbacks = cluster_response_callbacks
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if retry is not None:
            self.retry = retry
        else:
            # Default retry policy mirrors the deprecated attempt count.
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        # Keep a handle to RedisCluster.execute_command for strategies that
        # need to execute a command immediately (e.g. WATCH).
        self.parent_execute_command = super().execute_command
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self) if not transaction else TransactionStrategy(self)
        )

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        # Maps each request/response policy to the callable that resolves the
        # target node(s) (or post-processes the response) for that policy.
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver

        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
2877 def __repr__(self):
2878 """ """
2879 return f"{type(self).__name__}"
2881 def __enter__(self):
2882 """ """
2883 return self
2885 def __exit__(self, exc_type, exc_value, traceback):
2886 """ """
2887 self.reset()
2889 def __del__(self):
2890 try:
2891 self.reset()
2892 except Exception:
2893 pass
2895 def __len__(self):
2896 """ """
2897 return len(self._execution_strategy.command_queue)
2899 def __bool__(self):
2900 "Pipeline instances should always evaluate to True on Python 3+"
2901 return True
2903 def execute_command(self, *args, **kwargs):
2904 """
2905 Wrapper function for pipeline_execute_command
2906 """
2907 return self._execution_strategy.execute_command(*args, **kwargs)
2909 def pipeline_execute_command(self, *args, **options):
2910 """
2911 Stage a command to be executed when execute() is next called
2913 Returns the current Pipeline object back so commands can be
2914 chained together, such as:
2916 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
2918 At some other point, you can then run: pipe.execute(),
2919 which will execute all commands queued in the pipe.
2920 """
2921 return self._execution_strategy.execute_command(*args, **options)
2923 def annotate_exception(self, exception, number, command):
2924 """
2925 Provides extra context to the exception prior to it being handled
2926 """
2927 self._execution_strategy.annotate_exception(exception, number, command)
2929 def execute(self, raise_on_error: bool = True) -> List[Any]:
2930 """
2931 Execute all the commands in the current pipeline
2932 """
2934 try:
2935 return self._execution_strategy.execute(raise_on_error)
2936 finally:
2937 self.reset()
2939 def reset(self):
2940 """
2941 Reset back to empty pipeline.
2942 """
2943 self._execution_strategy.reset()
2945 def send_cluster_commands(
2946 self, stack, raise_on_error=True, allow_redirections=True
2947 ):
2948 return self._execution_strategy.send_cluster_commands(
2949 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
2950 )
2952 def exists(self, *keys):
2953 return self._execution_strategy.exists(*keys)
2955 def eval(self):
2956 """ """
2957 return self._execution_strategy.eval()
2959 def multi(self):
2960 """
2961 Start a transactional block of the pipeline after WATCH commands
2962 are issued. End the transactional block with `execute`.
2963 """
2964 self._execution_strategy.multi()
2966 def load_scripts(self):
2967 """ """
2968 self._execution_strategy.load_scripts()
2970 def discard(self):
2971 """ """
2972 self._execution_strategy.discard()
2974 def watch(self, *names):
2975 """Watches the values at keys ``names``"""
2976 self._execution_strategy.watch(*names)
2978 def unwatch(self):
2979 """Unwatches all previously specified keys"""
2980 self._execution_strategy.unwatch()
2982 def script_load_for_pipeline(self, *args, **kwargs):
2983 self._execution_strategy.script_load_for_pipeline(*args, **kwargs)
2985 def delete(self, *names):
2986 self._execution_strategy.delete(*names)
2988 def unlink(self, *names):
2989 self._execution_strategy.unlink(*names)
def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Build a stand-in method that refuses to run ``name`` inside a cluster
    pipeline, raising ``RedisClusterException`` when invoked.
    """

    def inner(*args, **kwargs):
        raise RedisClusterException(
            f"ERROR: Calling pipelined function {name} is blocked "
            f"when running redis in cluster mode..."
        )

    return inner
# Blocked pipeline commands
# Commands that must not run through a cluster pipeline: administrative or
# server-wide operations, multi-key commands that cannot be routed to a
# single slot, and Sentinel-only commands. Each entry is shadowed on
# ClusterPipeline below with a stub that raises RedisClusterException.
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSETEX",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)
# Shadow each blocked command on ClusterPipeline with a raising stub.
for command in PIPELINE_BLOCKED_COMMANDS:
    # Normalize "CLIENT KILL" -> "client_kill" so the attribute name matches
    # the method names generated by redis-py's command mixins.
    command = command.replace(" ", "_").lower()

    setattr(ClusterPipeline, command, block_pipeline_command(command))
class PipelineCommand:
    """A single queued command together with its routing and result state."""

    def __init__(self, args, options=None, position=None):
        """
        :param args: raw command tuple, e.g. ``("SET", "key", "value")``
        :param options: per-command execution options (defaults to ``{}``)
        :param position: index of this command within the pipeline
        """
        self.args = args
        self.options = {} if options is None else options
        self.position = position
        # Populated while the pipeline executes:
        self.result = None  # parsed response, or the exception that occurred
        self.node = None  # cluster node the command was routed to
        self.asking = False  # True while following an ASK redirect
        self.command_policies: Optional[CommandPolicies] = None
class NodeCommands:
    """Buffers the pipeline commands bound for a single cluster node."""

    def __init__(
        self, parse_response, connection_pool: ConnectionPool, connection: Connection
    ):
        """Remember the parse callable plus the pool/connection pair to use."""
        self.parse_response = parse_response
        self.connection_pool = connection_pool
        self.connection = connection
        self.commands = []

    def append(self, c):
        """Queue one pipeline command for this node."""
        self.commands.append(c)

    def write(self):
        """Pack every queued command into a single request and send it."""
        connection = self.connection
        commands = self.commands

        # Clear any stale results left over from a previous attempt before
        # this write clobbers them.
        for cmd in commands:
            cmd.result = None

        # One packed request for the whole batch keeps network overhead low;
        # connection/timeout failures are recorded on every command instead
        # of being raised here.
        try:
            packed = connection.pack_commands([cmd.args for cmd in commands])
            connection.send_packed_command(packed)
        except (ConnectionError, TimeoutError) as exc:
            for cmd in commands:
                cmd.result = exc

    def read(self):
        """Parse one response per queued command off the shared connection."""
        connection = self.connection
        for cmd in self.commands:
            # A pre-set result means write() already hit a connection error.
            # Parsing on a dead socket would surface as an AttributeError
            # (readline() on None) with nasty side effects, so we treat that
            # case as a connection error instead: the dirty connection goes
            # back to the pool and is explicitly re-opened on the next write.
            if cmd.result is None:
                try:
                    cmd.result = self.parse_response(
                        connection, cmd.args[0], **cmd.options
                    )
                except (ConnectionError, TimeoutError) as exc:
                    # The socket is gone; poison every command in the batch
                    # and stop reading.
                    for other in self.commands:
                        other.result = exc
                    return
                except RedisError:
                    cmd.result = sys.exc_info()[1]
class ExecutionStrategy(ABC):
    # Abstract interface implemented by PipelineStrategy (buffered fan-out)
    # and TransactionStrategy (WATCH/MULTI/EXEC on a pinned connection).
    @property
    @abstractmethod
    def command_queue(self):
        """Commands currently staged for the next execute() call."""
        pass

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        """
        Execution flow for current execution strategy.

        See: ClusterPipeline.execute_command()
        """
        pass

    @abstractmethod
    def annotate_exception(self, exception, number, command):
        """
        Annotate exception according to current execution strategy.

        See: ClusterPipeline.annotate_exception()
        """
        pass

    @abstractmethod
    def pipeline_execute_command(self, *args, **options):
        """
        Pipeline execution flow for current execution strategy.

        See: ClusterPipeline.pipeline_execute_command()
        """
        pass

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Executes current execution strategy.

        See: ClusterPipeline.execute()
        """
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Sends commands according to current execution strategy.

        See: ClusterPipeline.send_cluster_commands()
        """
        pass

    @abstractmethod
    def reset(self):
        """
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        """
        pass

    @abstractmethod
    def exists(self, *keys):
        """Stage an EXISTS check for ``keys``."""
        pass

    @abstractmethod
    def eval(self):
        """EVAL handling hook; cluster strategies reject it."""
        pass

    @abstractmethod
    def multi(self):
        """
        Starts transactional context.

        See: ClusterPipeline.multi()
        """
        pass

    @abstractmethod
    def load_scripts(self):
        """Script preloading hook; cluster strategies reject it."""
        pass

    @abstractmethod
    def watch(self, *names):
        """Watch keys ``names``; only valid in a transactional context."""
        pass

    @abstractmethod
    def unwatch(self):
        """
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        """
        pass

    @abstractmethod
    def script_load_for_pipeline(self, *args, **kwargs):
        """SCRIPT LOAD staging hook; cluster strategies reject it."""
        pass

    @abstractmethod
    def delete(self, *names):
        """
        "Delete a key specified by ``names``"

        See: ClusterPipeline.delete()
        """
        pass

    @abstractmethod
    def unlink(self, *names):
        """
        "Unlink a key specified by ``names``"

        See: ClusterPipeline.unlink()
        """
        pass

    @abstractmethod
    def discard(self):
        """Discard the transactional block; only valid in a transaction."""
        pass
class AbstractStrategy(ExecutionStrategy):
    """
    Shared plumbing for the concrete strategies: a staged command queue
    bound to its owning ClusterPipeline and nodes manager.
    """

    def __init__(
        self,
        pipe: ClusterPipeline,
    ):
        self._command_queue: List[PipelineCommand] = []
        self._pipe = pipe
        self._nodes_manager = self._pipe.nodes_manager

    @property
    def command_queue(self):
        """Commands staged for the next execute() call."""
        return self._command_queue

    @command_queue.setter
    def command_queue(self, queue: List[PipelineCommand]):
        self._command_queue = queue

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        pass

    def pipeline_execute_command(self, *args, **options):
        """Append a PipelineCommand and return the pipeline for chaining."""
        staged = PipelineCommand(args, options, len(self._command_queue))
        self._command_queue.append(staged)
        return self._pipe

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        pass

    @abstractmethod
    def reset(self):
        pass

    def exists(self, *keys):
        """Stage an EXISTS command for ``keys``."""
        return self.execute_command("EXISTS", *keys)

    def eval(self):
        """EVAL is not available through cluster pipelines."""
        raise RedisClusterException("method eval() is not implemented")

    def load_scripts(self):
        """Script preloading is not available through cluster pipelines."""
        raise RedisClusterException("method load_scripts() is not implemented")

    def script_load_for_pipeline(self, *args, **kwargs):
        """SCRIPT LOAD staging is not available through cluster pipelines."""
        raise RedisClusterException(
            "method script_load_for_pipeline() is not implemented"
        )

    def annotate_exception(self, exception, number, command):
        """Prefix ``exception`` with the failing command's pipeline position."""
        rendered = " ".join(safe_str(part) for part in command)
        msg = (
            f"Command # {number} ({truncate_text(rendered)}) of pipeline "
            f"caused error: {exception.args[0]}"
        )
        exception.args = (msg,) + exception.args[1:]
class PipelineStrategy(AbstractStrategy):
    """
    Non-transactional strategy: commands are buffered, grouped per target
    node, and sent as one packed request per node.
    """

    def __init__(self, pipe: ClusterPipeline):
        super().__init__(pipe)
        # Command name -> routing flag mapping, shared with the pipeline.
        self.command_flags = pipe.command_flags

    def execute_command(self, *args, **kwargs):
        # Pipelines never run anything immediately; everything is staged.
        return self.pipeline_execute_command(*args, **kwargs)

    def _raise_first_error(self, stack, start_time):
        """
        Raise the first exception on the stack
        """
        for c in stack:
            r = c.result
            if isinstance(r, Exception):
                self.annotate_exception(r, c.position + 1, c.args)

                record_operation_duration(
                    command_name="PIPELINE",
                    duration_seconds=time.monotonic() - start_time,
                    error=r,
                )

                raise r

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """Flush the queued commands to the cluster; always reset afterwards."""
        stack = self._command_queue
        if not stack:
            return []

        try:
            return self.send_cluster_commands(stack, raise_on_error)
        finally:
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._command_queue = []

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.

        If one of the retryable exceptions has been thrown we assume that:
         - connection_pool was disconnected
         - connection_pool was reset
         - refresh_table_asap set to True

        It will try the number of times specified by
        the retries in config option "self.retry"
        which defaults to 3 unless manually configured.

        If it reaches the number of times, the command will
        raises ClusterDownException.
        """
        if not stack:
            return []
        retry_attempts = self._pipe.retry.get_retries()
        while True:
            try:
                return self._send_cluster_commands(
                    stack,
                    raise_on_error=raise_on_error,
                    allow_redirections=allow_redirections,
                )
            except RedisCluster.ERRORS_ALLOW_RETRY as e:
                if retry_attempts > 0:
                    # Try again with the new cluster setup. All other errors
                    # should be raised.
                    retry_attempts -= 1
                    pass
                else:
                    raise e

    def _send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Send a bunch of cluster commands to the redis cluster.

        `allow_redirections` If the pipeline should follow
        `ASK` & `MOVED` responses automatically. If set
        to false it will raise RedisClusterException.
        """
        # the first time sending the commands we send all of
        # the commands that were queued up.
        # if we have to run through it again, we only retry
        # the commands that failed.
        attempt = sorted(stack, key=lambda x: x.position)
        is_default_node = False
        # build a list of node objects based on node names we need to
        nodes: dict[str, NodeCommands] = {}
        # Counters for how far write/read progressed; the finally block uses
        # them to decide which connections are dirty and must be dropped.
        nodes_written = 0
        nodes_read = 0

        try:
            # as we move through each command that still needs to be processed,
            # we figure out the slot number that command maps to, then from
            # the slot determine the node.
            for c in attempt:
                command_policies = self._pipe._policy_resolver.resolve(
                    c.args[0].lower()
                )
                # refer to our internal node -> slot table that
                # tells us where a given command should route to.
                # (it might be possible we have a cached node that no longer
                # exists in the cluster, which is why we do this in a loop)
                passed_targets = c.options.pop("target_nodes", None)
                if passed_targets and not self._is_nodes_flag(passed_targets):
                    target_nodes = self._parse_target_nodes(passed_targets)

                    if not command_policies:
                        command_policies = CommandPolicies()
                else:
                    if not command_policies:
                        command = c.args[0].upper()
                        if (
                            len(c.args) >= 2
                            and f"{c.args[0]} {c.args[1]}".upper()
                            in self._pipe.command_flags
                        ):
                            command = f"{c.args[0]} {c.args[1]}".upper()

                        # We only could resolve key properties if command is not
                        # in a list of pre-defined request policies
                        command_flag = self.command_flags.get(command)
                        if not command_flag:
                            # Fallback to default policy
                            if not self._pipe.get_default_node():
                                keys = None
                            else:
                                keys = self._pipe._get_command_keys(*c.args)
                            if not keys or len(keys) == 0:
                                command_policies = CommandPolicies()
                            else:
                                command_policies = CommandPolicies(
                                    request_policy=RequestPolicy.DEFAULT_KEYED,
                                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                                )
                        else:
                            if command_flag in self._pipe._command_flags_mapping:
                                command_policies = CommandPolicies(
                                    request_policy=self._pipe._command_flags_mapping[
                                        command_flag
                                    ]
                                )
                            else:
                                command_policies = CommandPolicies()

                    target_nodes = self._determine_nodes(
                        *c.args,
                        request_policy=command_policies.request_policy,
                        node_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {c.args} command on"
                        )
                c.command_policies = command_policies
                # A pipelined command must resolve to exactly one node.
                if len(target_nodes) > 1:
                    raise RedisClusterException(
                        f"Too many targets for command {c.args}"
                    )

                node = target_nodes[0]
                if node == self._pipe.get_default_node():
                    is_default_node = True

                # now that we know the name of the node
                # ( it's just a string in the form of host:port )
                # we can build a list of commands for each node.
                node_name = node.name
                if node_name not in nodes:
                    redis_node = self._pipe.get_redis_connection(node)
                    try:
                        connection = get_connection(redis_node)
                    except (ConnectionError, TimeoutError):
                        # Release any connections we've already acquired before clearing nodes
                        for n in nodes.values():
                            n.connection_pool.release(n.connection)
                        # Connection retries are being handled in the node's
                        # Retry object. Reinitialize the node -> slot table.
                        self._nodes_manager.initialize()
                        if is_default_node:
                            self._pipe.replace_default_node()
                        nodes = {}
                        raise
                    nodes[node_name] = NodeCommands(
                        redis_node.parse_response,
                        redis_node.connection_pool,
                        connection,
                    )
                nodes[node_name].append(c)

            # send the commands in sequence.
            # we write to all the open sockets for each node first,
            # before reading anything
            # this allows us to flush all the requests out across the
            # network
            # so that we can read them from different sockets as they come back.
            # we don't multiplex on the sockets as they come available,
            # but that shouldn't make too much difference.

            # Start timing for observability
            start_time = time.monotonic()

            node_commands = nodes.values()
            for n in node_commands:
                nodes_written += 1
                n.write()

            for n in node_commands:
                n.read()

                # Find the first error in this node's commands, if any
                node_error = None
                for cmd in n.commands:
                    if isinstance(cmd.result, Exception):
                        node_error = cmd.result
                        break

                record_operation_duration(
                    command_name="PIPELINE",
                    duration_seconds=time.monotonic() - start_time,
                    server_address=n.connection.host,
                    server_port=n.connection.port,
                    db_namespace=str(n.connection.db),
                    error=node_error,
                )
                nodes_read += 1
        finally:
            # release all the redis connections we allocated earlier
            # back into the connection pool.
            # if the connection is dirty (that is: we've written
            # commands to it, but haven't read the responses), we need
            # to close the connection before returning it to the pool.
            # otherwise, the next caller to use this connection will
            # read the response from _this_ request, not its own request.
            # disconnecting discards the dirty state & forces the next
            # caller to reconnect.
            # NOTE: dicts have a consistent ordering; we're iterating
            # through nodes.values() in the same order as we are when
            # reading / writing to the connections above, which is critical
            # for how we're using the nodes_written/nodes_read offsets.
            for i, n in enumerate(nodes.values()):
                if i < nodes_written and i >= nodes_read:
                    n.connection.disconnect()
                n.connection_pool.release(n.connection)

        # if the response isn't an exception it is a
        # valid response from the node
        # we're all done with that command, YAY!
        # if we have more commands to attempt, we've run into problems.
        # collect all the commands we are allowed to retry.
        # (MOVED, ASK, or connection errors or timeout errors)
        attempt = sorted(
            (
                c
                for c in attempt
                if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
            ),
            key=lambda x: x.position,
        )
        if attempt and allow_redirections:
            # RETRY MAGIC HAPPENS HERE!
            # send these remaining commands one at a time using `execute_command`
            # in the main client. This keeps our retry logic
            # in one place mostly,
            # and allows us to be more confident in correctness of behavior.
            # at this point any speed gains from pipelining have been lost
            # anyway, so we might as well make the best
            # attempt to get the correct behavior.
            #
            # The client command will handle retries for each
            # individual command sequentially as we pass each
            # one into `execute_command`. Any exceptions
            # that bubble out should only appear once all
            # retries have been exhausted.
            #
            # If a lot of commands have failed, we'll be setting the
            # flag to rebuild the slots table from scratch.
            # So MOVED errors should correct themselves fairly quickly.
            self._pipe.reinitialize_counter += 1
            if self._pipe._should_reinitialized():
                self._nodes_manager.initialize()
                if is_default_node:
                    self._pipe.replace_default_node()
            for c in attempt:
                try:
                    # send each command individually like we
                    # do in the main client.
                    c.result = self._pipe.parent_execute_command(*c.args, **c.options)
                except RedisError as e:
                    c.result = e

        # turn the response back into a simple flat array that corresponds
        # to the sequence of commands issued in the stack in pipeline.execute()
        response = []
        for c in sorted(stack, key=lambda x: x.position):
            if c.args[0] in self._pipe.cluster_response_callbacks:
                # Remove keys entry, it needs only for cache.
                c.options.pop("keys", None)
                c.result = self._pipe._policies_callback_mapping[
                    c.command_policies.response_policy
                ](
                    self._pipe.cluster_response_callbacks[c.args[0]](
                        c.result, **c.options
                    )
                )

            response.append(c.result)

        if raise_on_error:
            self._raise_first_error(stack, start_time)

        return response

    def _is_nodes_flag(self, target_nodes):
        # True when the caller passed a symbolic node flag such as PRIMARIES.
        return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags

    def _parse_target_nodes(self, target_nodes):
        """Normalize a caller-supplied node spec into a list of ClusterNodes."""
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables to execute commands with multi nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        # Determine which nodes should be executed the command on.
        # Returns a list of target nodes.
        command = args[0].upper()
        if (
            len(args) >= 2
            and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
        ):
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self._pipe.command_flags.get(command)

        # A predefined flag overrides the resolved request policy.
        if command_flag in self._pipe._command_flags_mapping:
            request_policy = self._pipe._command_flags_mapping[command_flag]

        policy_callback = self._pipe._policies_callback_mapping[request_policy]

        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()

        # Remember FT.AGGREGATE targets for cursor follow-ups.
        if args[0].lower() == "ft.aggregate":
            self._aggregate_nodes = nodes

        return nodes

    def multi(self):
        raise RedisClusterException(
            "method multi() is not supported outside of transactional context"
        )

    def discard(self):
        raise RedisClusterException(
            "method discard() is not supported outside of transactional context"
        )

    def watch(self, *names):
        raise RedisClusterException(
            "method watch() is not supported outside of transactional context"
        )

    def unwatch(self, *names):
        raise RedisClusterException(
            "method unwatch() is not supported outside of transactional context"
        )

    def delete(self, *names):
        # Cluster pipelines route each command to one slot, so only a
        # single-key DEL is supported here.
        if len(names) != 1:
            raise RedisClusterException(
                "deleting multiple keys is not implemented in pipeline command"
            )

        return self.execute_command("DEL", names[0])

    def unlink(self, *names):
        # Same single-key restriction as delete() above.
        if len(names) != 1:
            raise RedisClusterException(
                "unlinking multiple keys is not implemented in pipeline command"
            )

        return self.execute_command("UNLINK", names[0])
3784class TransactionStrategy(AbstractStrategy):
3785 NO_SLOTS_COMMANDS = {"UNWATCH"}
3786 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
3787 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3788 SLOT_REDIRECT_ERRORS = (AskError, MovedError)
3789 CONNECTION_ERRORS = (
3790 ConnectionError,
3791 OSError,
3792 ClusterDownError,
3793 SlotNotCoveredError,
3794 )
    def __init__(self, pipe: ClusterPipeline):
        """Initialize transaction-local state on top of the shared strategy base."""
        super().__init__(pipe)
        # True once the caller has issued an explicit MULTI.
        self._explicit_transaction = False
        # True while WATCH keys are active on the pinned connection.
        self._watching = False
        # Hash slots touched so far; a transaction must stay on one slot.
        self._pipeline_slots: Set[int] = set()
        # Connection pinned for the whole WATCH/MULTI/EXEC sequence.
        self._transaction_connection: Optional[Connection] = None
        self._executing = False
        # Private copy of the pipeline's retry object so widening the set of
        # retryable errors (slot redirects) does not leak to the shared one.
        self._retry = copy(self._pipe.retry)
        self._retry.update_supported_errors(
            RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
        )
    def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
        """
        Find a connection for a pipeline transaction.

        For running an atomic transaction, watch keys ensure that contents have not been
        altered as long as the watch commands for those keys were sent over the same
        connection. So once we start watching a key, we fetch a connection to the
        node that owns that slot and reuse it.
        """
        if not self._pipeline_slots:
            raise RedisClusterException(
                "At least a command with a key is needed to identify a node"
            )

        # All queued commands share one slot, so any element identifies the node.
        node: ClusterNode = self._nodes_manager.get_node_from_slot(
            list(self._pipeline_slots)[0], False
        )
        redis_node: Redis = self._pipe.get_redis_connection(node)
        if self._transaction_connection:
            # The slot may have migrated since the connection was pinned; if
            # the current node's pool no longer owns it, give it back to its
            # original owner and pin a fresh one below.
            if not redis_node.connection_pool.owns_connection(
                self._transaction_connection
            ):
                previous_node = self._nodes_manager.find_connection_owner(
                    self._transaction_connection
                )
                previous_node.connection_pool.release(self._transaction_connection)
                self._transaction_connection = None

        if not self._transaction_connection:
            self._transaction_connection = get_connection(redis_node)

        return redis_node, self._transaction_connection
3841 def execute_command(self, *args, **kwargs):
3842 slot_number: Optional[int] = None
3843 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
3844 slot_number = self._pipe.determine_slot(*args)
3846 if (
3847 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
3848 ) and not self._explicit_transaction:
3849 if args[0] == "WATCH":
3850 self._validate_watch()
3852 if slot_number is not None:
3853 if self._pipeline_slots and slot_number not in self._pipeline_slots:
3854 raise CrossSlotTransactionError(
3855 "Cannot watch or send commands on different slots"
3856 )
3858 self._pipeline_slots.add(slot_number)
3859 elif args[0] not in self.NO_SLOTS_COMMANDS:
3860 raise RedisClusterException(
3861 f"Cannot identify slot number for command: {args[0]},"
3862 "it cannot be triggered in a transaction"
3863 )
3865 return self._immediate_execute_command(*args, **kwargs)
3866 else:
3867 if slot_number is not None:
3868 self._pipeline_slots.add(slot_number)
3870 return self.pipeline_execute_command(*args, **kwargs)
    def _validate_watch(self):
        """Reject WATCH after MULTI; otherwise enter watching mode."""
        if self._explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")

        self._watching = True
    def _immediate_execute_command(self, *args, **options):
        """Run one command now on the pinned connection, with retry handling
        that reinitializes topology state on redirect/connection errors."""
        return self._retry.call_with_retry(
            lambda: self._get_connection_and_send_command(*args, **options),
            self._reinitialize_on_error,
            with_failure_count=True,
        )
    def _get_connection_and_send_command(self, *args, **options):
        """Resolve the pinned node/connection, send the command, and record
        an operation-duration metric for both success and failure paths."""
        redis_node, connection = self._get_client_and_connection_for_transaction()

        # Start timing for observability
        start_time = time.monotonic()

        try:
            response = self._send_command_parse_response(
                connection, redis_node, args[0], *args, **options
            )

            record_operation_duration(
                command_name=args[0],
                duration_seconds=time.monotonic() - start_time,
                server_address=connection.host,
                server_port=connection.port,
                db_namespace=str(connection.db),
            )

            return response
        except Exception as e:
            if connection:
                # this is used to report the metrics based on host and port info
                e.connection = connection
            record_operation_duration(
                command_name=args[0],
                duration_seconds=time.monotonic() - start_time,
                server_address=connection.host,
                server_port=connection.port,
                db_namespace=str(connection.db),
                error=e,
            )
            raise
    def _send_command_parse_response(
        self, conn, redis_node: Redis, command_name, *args, **options
    ):
        """
        Send a command and parse the response
        """

        conn.send_command(*args)
        output = redis_node.parse_response(conn, command_name, **options)

        # DISCARD/EXEC/UNWATCH all clear server-side watches, so the local
        # watching flag must follow suit.
        if command_name in self.UNWATCH_COMMANDS:
            self._watching = False
        return output
3933 def _reinitialize_on_error(self, error, failure_count):
3934 if hasattr(error, "connection"):
3935 record_error_count(
3936 server_address=error.connection.host,
3937 server_port=error.connection.port,
3938 network_peer_address=error.connection.host,
3939 network_peer_port=error.connection.port,
3940 error_type=error,
3941 retry_attempts=failure_count,
3942 is_internal=True,
3943 )
3945 if self._watching:
3946 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
3947 raise WatchError("Slot rebalancing occurred while watching keys")
3949 if (
3950 type(error) in self.SLOT_REDIRECT_ERRORS
3951 or type(error) in self.CONNECTION_ERRORS
3952 ):
3953 if self._transaction_connection:
3954 if is_debug_log_enabled():
3955 logger.debug(
3956 f"Operation failed, "
3957 f"with connection: {self._transaction_connection}, "
3958 f"details: {self._transaction_connection.extract_connection_details()}",
3959 )
3960 # Disconnect and release back to pool
3961 self._transaction_connection.disconnect()
3962 node = self._nodes_manager.find_connection_owner(
3963 self._transaction_connection
3964 )
3965 if node and node.redis_connection:
3966 node.redis_connection.connection_pool.release(
3967 self._transaction_connection
3968 )
3969 self._transaction_connection = None
3971 self._pipe.reinitialize_counter += 1
3972 if self._pipe._should_reinitialized():
3973 self._nodes_manager.initialize()
3974 self.reinitialize_counter = 0
3975 else:
3976 if isinstance(error, AskError):
3977 self._nodes_manager.move_slot(error)
3979 self._executing = False
3981 def _raise_first_error(self, responses, stack, start_time):
3982 """
3983 Raise the first exception on the stack
3984 """
3985 for r, cmd in zip(responses, stack):
3986 if isinstance(r, Exception):
3987 self.annotate_exception(r, cmd.position + 1, cmd.args)
3989 record_operation_duration(
3990 command_name="TRANSACTION",
3991 duration_seconds=time.monotonic() - start_time,
3992 server_address=self._transaction_connection.host,
3993 server_port=self._transaction_connection.port,
3994 db_namespace=str(self._transaction_connection.db),
3995 )
3997 raise r
3999 def execute(self, raise_on_error: bool = True) -> List[Any]:
4000 stack = self._command_queue
4001 if not stack and (not self._watching or not self._pipeline_slots):
4002 return []
4004 return self._execute_transaction_with_retries(stack, raise_on_error)
4006 def _execute_transaction_with_retries(
4007 self, stack: List["PipelineCommand"], raise_on_error: bool
4008 ):
4009 return self._retry.call_with_retry(
4010 lambda: self._execute_transaction(stack, raise_on_error),
4011 lambda error, failure_count: self._reinitialize_on_error(
4012 error, failure_count
4013 ),
4014 with_failure_count=True,
4015 )
    def _execute_transaction(
        self, stack: List["PipelineCommand"], raise_on_error: bool
    ):
        """
        Run the queued commands as a single MULTI/EXEC transaction on the
        node owning the pipeline's (single) slot.

        :param stack: queued PipelineCommand objects (without MULTI/EXEC)
        :param raise_on_error: raise the first failed command's error instead
            of only returning it in the response list
        :raises CrossSlotTransactionError: if keys span more than one slot
        :raises WatchError: if EXEC returned nil (a WATCHed key changed)
        :raises InvalidPipelineStack: if EXEC's reply count does not match
            the number of queued commands
        """
        if len(self._pipeline_slots) > 1:
            raise CrossSlotTransactionError(
                "All keys involved in a cluster transaction must map to the same slot"
            )

        self._executing = True

        redis_node, connection = self._get_client_and_connection_for_transaction()

        # Wrap the user commands in MULTI ... EXEC and send everything in a
        # single packed write.
        stack = chain(
            [PipelineCommand(("MULTI",))],
            stack,
            [PipelineCommand(("EXEC",))],
        )
        commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
        packed_commands = connection.pack_commands(commands)

        # Start timing for observability
        start_time = time.monotonic()

        connection.send_packed_command(packed_commands)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            redis_node.parse_response(connection, "MULTI")
        except ResponseError as e:
            self.annotate_exception(e, 0, "MULTI")
            errors.append(e)
        except self.CONNECTION_ERRORS as cluster_error:
            self.annotate_exception(cluster_error, 0, "MULTI")
            raise

        # and all the other commands
        for i, command in enumerate(self._command_queue):
            if EMPTY_RESPONSE in command.options:
                # This command produces no reply on the wire; remember the
                # placeholder value and its position so it can be inserted
                # into the EXEC response later.
                errors.append((i, command.options[EMPTY_RESPONSE]))
            else:
                try:
                    _ = redis_node.parse_response(connection, "_")
                except self.SLOT_REDIRECT_ERRORS as slot_error:
                    self.annotate_exception(slot_error, i + 1, command.args)
                    errors.append(slot_error)
                except self.CONNECTION_ERRORS as cluster_error:
                    self.annotate_exception(cluster_error, i + 1, command.args)
                    raise
                except ResponseError as e:
                    self.annotate_exception(e, i + 1, command.args)
                    errors.append(e)

        response = None
        # parse the EXEC.
        try:
            response = redis_node.parse_response(connection, "EXEC")
        except ExecAbortError:
            # EXEC aborted: surface the first queuing error if one was
            # collected, otherwise propagate the abort itself.
            if errors:
                raise errors[0]
            raise

        self._executing = False

        record_operation_duration(
            command_name="TRANSACTION",
            duration_seconds=time.monotonic() - start_time,
            server_address=connection.host,
            server_port=connection.port,
            db_namespace=str(connection.db),
        )

        # EXEC clears any watched keys
        self._watching = False

        if response is None:
            # A nil EXEC reply means a WATCHed key was modified.
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        # NOTE(review): only the (index, value) tuples appended for
        # EMPTY_RESPONSE commands unpack cleanly here; bare exceptions in
        # ``errors`` appear to always coincide with an ExecAbortError handled
        # above — confirm that invariant holds for all error kinds.
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(self._command_queue):
            raise InvalidPipelineStack(
                "Unexpected response length for cluster pipeline EXEC."
                " Command stack was {} but response had length {}".format(
                    [c.args[0] for c in self._command_queue], len(response)
                )
            )

        # find any errors in the response and raise if necessary
        if raise_on_error or len(errors) > 0:
            self._raise_first_error(
                response,
                self._command_queue,
                start_time,
            )

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, self._command_queue):
            if not isinstance(r, Exception):
                command_name = cmd.args[0]
                if command_name in self._pipe.cluster_response_callbacks:
                    r = self._pipe.cluster_response_callbacks[command_name](
                        r, **cmd.options
                    )
            data.append(r)
        return data
4130 def reset(self):
4131 self._command_queue = []
4133 # make sure to reset the connection state in the event that we were
4134 # watching something
4135 if self._transaction_connection:
4136 try:
4137 if self._watching:
4138 # call this manually since our unwatch or
4139 # immediate_execute_command methods can call reset()
4140 self._transaction_connection.send_command("UNWATCH")
4141 self._transaction_connection.read_response()
4142 # we can safely return the connection to the pool here since we're
4143 # sure we're no longer WATCHing anything
4144 node = self._nodes_manager.find_connection_owner(
4145 self._transaction_connection
4146 )
4147 if node and node.redis_connection:
4148 node.redis_connection.connection_pool.release(
4149 self._transaction_connection
4150 )
4151 self._transaction_connection = None
4152 except self.CONNECTION_ERRORS:
4153 # disconnect will also remove any previous WATCHes
4154 if self._transaction_connection:
4155 self._transaction_connection.disconnect()
4156 node = self._nodes_manager.find_connection_owner(
4157 self._transaction_connection
4158 )
4159 if node and node.redis_connection:
4160 node.redis_connection.connection_pool.release(
4161 self._transaction_connection
4162 )
4163 self._transaction_connection = None
4165 # clean up the other instance attributes
4166 self._watching = False
4167 self._explicit_transaction = False
4168 self._pipeline_slots = set()
4169 self._executing = False
4171 def send_cluster_commands(
4172 self, stack, raise_on_error=True, allow_redirections=True
4173 ):
4174 raise NotImplementedError(
4175 "send_cluster_commands cannot be executed in transactional context."
4176 )
4178 def multi(self):
4179 if self._explicit_transaction:
4180 raise RedisError("Cannot issue nested calls to MULTI")
4181 if self._command_queue:
4182 raise RedisError(
4183 "Commands without an initial WATCH have already been issued"
4184 )
4185 self._explicit_transaction = True
4187 def watch(self, *names):
4188 if self._explicit_transaction:
4189 raise RedisError("Cannot issue a WATCH after a MULTI")
4191 return self.execute_command("WATCH", *names)
4193 def unwatch(self):
4194 if self._watching:
4195 return self.execute_command("UNWATCH")
4197 return True
4199 def discard(self):
4200 self.reset()
4202 def delete(self, *names):
4203 return self.execute_command("DEL", *names)
4205 def unlink(self, *names):
4206 return self.execute_command("UNLINK", *names)