Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 19%
import logging
import random
import socket
import sys
import threading
import time
from abc import ABC, abstractmethod
from collections import OrderedDict
from copy import copy
from enum import Enum
from itertools import chain
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Literal,
    Optional,
    Set,
    Tuple,
    Union,
)

if TYPE_CHECKING:
    from redis.keyspace_notifications import ClusterKeyspaceNotifications

from redis._parsers import CommandsParser, Encoder
from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
from redis._parsers.helpers import parse_scan
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
from redis.commands import READ_COMMANDS, RedisClusterCommands
from redis.commands.helpers import list_or_args
from redis.commands.policies import PolicyResolver, StaticPolicyResolver
from redis.connection import (
    Connection,
    ConnectionPool,
    parse_url,
)
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    AskError,
    AuthenticationError,
    ClusterDownError,
    ClusterError,
    ConnectionError,
    CrossSlotTransactionError,
    DataError,
    ExecAbortError,
    InvalidPipelineStack,
    MaxConnectionsError,
    MovedError,
    RedisClusterException,
    RedisError,
    ResponseError,
    SlotNotCoveredError,
    TimeoutError,
    TryAgainError,
    WatchError,
)
from redis.lock import Lock
from redis.maint_notifications import (
    MaintNotificationsConfig,
    OSSMaintNotificationsHandler,
)
from redis.observability.recorder import (
    record_error_count,
    record_operation_duration,
)
from redis.retry import Retry
from redis.utils import (
    check_protocol_version,
    deprecated_args,
    deprecated_function,
    dict_merge,
    list_keys_to_dict,
    merge_result,
    safe_str,
    str_if_bytes,
    truncate_text,
)

logger = logging.getLogger(__name__)


def is_debug_log_enabled():
    return logger.isEnabledFor(logging.DEBUG)


def get_node_name(host: str, port: Union[str, int]) -> str:
    return f"{host}:{port}"


@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    return redis_node.connection or redis_node.connection_pool.get_connection()


def parse_scan_result(command, res, **options):
    cursors = {}
    ret = []
    for node_name, response in res.items():
        cursor, r = parse_scan(response, **options)
        cursors[node_name] = cursor
        ret += r

    return cursors, ret
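
# Illustrative sketch (not part of the library): parse_scan_result merges the
# per-node raw SCAN replies collected by the cluster client into one flat key
# list, while keeping one cursor per node so iteration can resume on each node
# independently. The node names and values below are made up for illustration.
#
#   res = {
#       "127.0.0.1:7000": [b"17", [b"k1", b"k2"]],
#       "127.0.0.1:7001": [b"0", [b"k3"]],
#   }
#   cursors, keys = parse_scan_result("SCAN", res)
#   # cursors == {"127.0.0.1:7000": 17, "127.0.0.1:7001": 0}
#   # keys == [b"k1", b"k2", b"k3"]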


def parse_pubsub_numsub(command, res, **options):
    numsub_d = OrderedDict()
    for numsub_tups in res.values():
        for channel, numsubbed in numsub_tups:
            try:
                numsub_d[channel] += numsubbed
            except KeyError:
                numsub_d[channel] = numsubbed

    ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
    return ret_numsub
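
# Illustrative sketch (not part of the library): PUBSUB NUMSUB is fanned out to
# every node, and parse_pubsub_numsub sums the per-node subscriber counts per
# channel. Hypothetical input and output:
#
#   res = {
#       "node1:7000": [(b"news", 2)],
#       "node2:7001": [(b"news", 1), (b"chat", 4)],
#   }
#   parse_pubsub_numsub("PUBSUB NUMSUB", res)
#   # -> [(b"news", 3), (b"chat", 4)]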


def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        return str_if_bytes(args[0]) or current_host, args[1]

    slots = {}
    for slot in resp:
        start, end, primary = slot[:3]
        replicas = slot[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }

    return slots
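
# Illustrative sketch (not part of the library): a raw CLUSTER SLOTS reply is a
# list of [start, end, primary, *replicas] entries, where each server entry is
# [host, port, node-id]. parse_cluster_slots keys the result by the
# (start, end) slot range and keeps (host, port) pairs. Hypothetical reply:
#
#   resp = [[0, 5460, [b"10.0.0.1", 7000, b"id1"], [b"10.0.0.2", 7001, b"id2"]]]
#   parse_cluster_slots(resp)
#   # -> {(0, 5460): {"primary": ("10.0.0.1", 7000),
#   #                 "replicas": [("10.0.0.2", 7001)]}}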


def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.
    """
    if isinstance(resp[0], dict):
        return resp
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], (x[1][i + 1])))
        nodes = x[3]
        for node in nodes:
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards


def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.
    """
    return resp.decode("utf-8")


PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"

REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_ca_path",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
    "maint_notifications_config",
)
KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    connection_kwargs = {
        k: v
        for k, v in kwargs.items()
        if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
    }

    return connection_kwargs
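
# Illustrative sketch (not part of the library): cleanup_kwargs drops anything
# that is not a recognized Redis connection option, as well as options that are
# managed by the cluster client itself (host, port, retry).
#
#   cleanup_kwargs(host="localhost", retry=None, socket_timeout=5, bogus=1)
#   # -> {"socket_timeout": 5}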


class MaintNotificationsAbstractRedisCluster:
    """
    Abstract class for handling maintenance notifications logic.
    This class is expected to be used as a base class together with RedisCluster.

    This class is intended to be used with multiple inheritance!

    All logic related to maintenance notifications is encapsulated in this class.
    """

    def __init__(
        self,
        maint_notifications_config: Optional[MaintNotificationsConfig],
        **kwargs,
    ):
        # Initialize maintenance notifications
        is_protocol_supported = check_protocol_version(kwargs.get("protocol"), 3)

        if (
            maint_notifications_config
            and maint_notifications_config.enabled
            and not is_protocol_supported
        ):
            raise RedisError(
                "Maintenance notifications handlers on connection are only supported with RESP version 3"
            )
        if maint_notifications_config is None and is_protocol_supported:
            maint_notifications_config = MaintNotificationsConfig()

        self.maint_notifications_config = maint_notifications_config

        if self.maint_notifications_config and self.maint_notifications_config.enabled:
            self._oss_cluster_maint_notifications_handler = (
                OSSMaintNotificationsHandler(self, self.maint_notifications_config)
            )
            # Update connection kwargs for all future node connections
            self._update_connection_kwargs_for_maint_notifications(
                self._oss_cluster_maint_notifications_handler
            )
            # Update existing node connections - they are created as part of the RedisCluster constructor
            for node in self.get_nodes():
                if node.redis_connection is None:
                    continue
                node.redis_connection.connection_pool.update_maint_notifications_config(
                    self.maint_notifications_config,
                    oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
                )
        else:
            self._oss_cluster_maint_notifications_handler = None

    def _update_connection_kwargs_for_maint_notifications(
        self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
    ):
        """
        Update the connection kwargs for all future connections.
        """
        self.nodes_manager.connection_kwargs.update(
            {
                "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
            }
        )


class AbstractRedisCluster:
    RedisClusterRequestTTL = 16

    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
450 "FT,PROFILE",
451 "FT.ALTER",
452 "FT.DROPINDEX",
453 "FT.ALIASADD",
454 "FT.ALIASUPDATE",
455 "FT.ALIASDEL",
456 "FT.TAGVALS",
457 "FT.SUGADD",
458 "FT.SUGGET",
459 "FT.SUGDEL",
460 "FT.SUGLEN",
461 "FT.SYNUPDATE",
462 "FT.SYNDUMP",
463 "FT.SPELLCHECK",
464 "FT.DICTADD",
465 "FT.DICTDEL",
466 "FT.DICTDUMP",
467 "FT.INFO",
468 "FT._LIST",
469 "FT.CONFIG",
470 "FT.ADD",
471 "FT.DEL",
472 "FT.DROP",
473 "FT.GET",
474 "FT.MGET",
475 "FT.SYNADD",
476 ],
477 )
479 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
480 "CLUSTER SLOTS": parse_cluster_slots,
481 "CLUSTER SHARDS": parse_cluster_shards,
482 "CLUSTER MYSHARDID": parse_cluster_myshardid,
483 }
485 RESULT_CALLBACKS = dict_merge(
486 list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
487 list_keys_to_dict(
488 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
489 ),
490 list_keys_to_dict(
491 ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
492 ),
493 list_keys_to_dict(
494 [
495 "PING",
496 "CONFIG SET",
497 "CONFIG REWRITE",
498 "CONFIG RESETSTAT",
499 "CLIENT SETNAME",
500 "BGSAVE",
501 "SLOWLOG RESET",
502 "SAVE",
503 "MEMORY PURGE",
504 "CLIENT PAUSE",
505 "CLIENT UNPAUSE",
506 ],
507 lambda command, res: all(res.values()) if isinstance(res, dict) else res,
508 ),
509 list_keys_to_dict(
510 ["DBSIZE", "WAIT"],
511 lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
512 ),
513 list_keys_to_dict(
514 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
515 ),
516 list_keys_to_dict(["SCAN"], parse_scan_result),
517 list_keys_to_dict(
518 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
519 ),
520 list_keys_to_dict(
521 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
522 ),
523 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
524 )
526 ERRORS_ALLOW_RETRY = (
527 ConnectionError,
528 TimeoutError,
529 ClusterDownError,
530 SlotNotCoveredError,
531 )

    def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, choose a replica if the cluster contains replicas
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)


class RedisCluster(
    AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands
):
    # Type discrimination marker for @overload self-type pattern
    _is_async_client: Literal[False] = False

    @classmethod
    def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
        """
        Return a Redis client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.

        """
        return cls(url=url, **kwargs)
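
    # Illustrative usage sketch (not part of the library); assumes a cluster
    # node is reachable on localhost:6379:
    #
    #   rc = RedisCluster.from_url("redis://localhost:6379/0")
    #   rc.set("foo", "bar")
    #   rc.get("foo")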

    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        **kwargs,
    ):
641 """
642 Initialize a new RedisCluster client.
644 :param startup_nodes:
645 List of nodes from which initial bootstrapping can be done
646 :param host:
647 Can be used to point to a startup node
648 :param port:
649 Can be used to point to a startup node
650 :param require_full_coverage:
651 When set to False (default value): the client will not require a
652 full coverage of the slots. However, if not all slots are covered,
653 and at least one node has 'cluster-require-full-coverage' set to
654 'yes,' the server will throw a ClusterDownError for some key-based
655 commands. See -
656 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
657 When set to True: all slots must be covered to construct the
658 cluster client. If not all slots are covered, RedisClusterException
659 will be thrown.
660 :param read_from_replicas:
661 @deprecated - please use load_balancing_strategy instead
662 Enable read from replicas in READONLY mode. You can read possibly
663 stale data.
664 When set to true, read commands will be assigned between the
665 primary and its replications in a Round-Robin manner.
666 :param load_balancing_strategy:
667 Enable read from replicas in READONLY mode and defines the load balancing
668 strategy that will be used for cluster node selection.
669 The data read from replicas is eventually consistent with the data in primary nodes.
670 :param dynamic_startup_nodes:
671 Set the RedisCluster's startup nodes to all of the discovered nodes.
672 If true (default value), the cluster's discovered nodes will be used to
673 determine the cluster nodes-slots mapping in the next topology refresh.
674 It will remove the initial passed startup nodes if their endpoints aren't
675 listed in the CLUSTER SLOTS output.
676 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
677 specific IP addresses, it is best to set it to false.
678 :param cluster_error_retry_attempts:
679 @deprecated - Please configure the 'retry' object instead
680 In case 'retry' object is set - this argument is ignored!
682 Number of times to retry before raising an error when
683 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
684 :class:`~.ClusterDownError` are encountered
685 :param retry:
686 A retry object that defines the retry strategy and the number of
687 retries for the cluster client.
688 In current implementation for the cluster client (starting form redis-py version 6.0.0)
689 the retry object is not yet fully utilized, instead it is used just to determine
690 the number of retries for the cluster client.
691 In the future releases the retry object will be used to handle the cluster client retries!
692 :param reinitialize_steps:
693 Specifies the number of MOVED errors that need to occur before
694 reinitializing the whole cluster topology. If a MOVED error occurs
695 and the cluster does not need to be reinitialized on this current
696 error handling, only the MOVED slot will be patched with the
697 redirected node.
698 To reinitialize the cluster on every MOVED error, set
699 reinitialize_steps to 1.
700 To avoid reinitializing the cluster on moved errors, set
701 reinitialize_steps to 0.
702 :param address_remap:
703 An optional callable which, when provided with an internal network
704 address of a node, e.g. a `(host, port)` tuple, will return the address
705 where the node is reachable. This can be used to map the addresses at
706 which the nodes _think_ they are, to addresses at which a client may
707 reach them, such as when they sit behind a proxy.
709 :param maint_notifications_config:
710 Configures the nodes connections to support maintenance notifications - see
711 `redis.maint_notifications.MaintNotificationsConfig` for details.
712 Only supported with RESP3.
713 If not provided and protocol is RESP3, the maintenance notifications
714 will be enabled by default (logic is included in the NodesManager
715 initialization).
716 :**kwargs:
717 Extra arguments that will be sent into Redis instance when created
718 (See Official redis-py doc for supported kwargs - the only limitation
719 is that you can't provide 'retry' object as part of kwargs.
720 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
721 Some kwargs are not supported and will raise a
722 RedisClusterException:
723 - db (Redis do not support database SELECT in cluster mode)
725 """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # The 'retry' argument cannot be used in kwargs when in cluster mode:
            # the kwargs are passed to the lower-level connections to the cluster nodes,
            # and there we provide a retry configuration with no retries allowed.
            # The retries should be handled on the cluster client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node/s
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the following:\n"
                "1. host and port, for example:\n"
                "   RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "   RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run
        # If the user passed an on_connect function we'll save it and run it
        # inside the RedisCluster.on_connect() function
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and not check_protocol_version(protocol, 3):
            raise RedisError("Client caching is only supported with RESP version 3")

        if maint_notifications_config and not check_protocol_version(protocol, 3):
            raise RedisError(
                "Maintenance notifications are only supported with RESP version 3"
            )
        if check_protocol_version(protocol, 3) and maint_notifications_config is None:
            maint_notifications_config = MaintNotificationsConfig()

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes

        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            maint_notifications_config=maint_notifications_config,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        # For backward compatibility, mapping from the existing policies to the new ones
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver
        self.commands_parser = CommandsParser(self)

        # Node where the FT.AGGREGATE command is executed.
        self._aggregate_nodes = None
        self._lock = threading.RLock()

        MaintNotificationsAbstractRedisCluster.__init__(
            self, maint_notifications_config, **kwargs
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def disconnect_connection_pools(self):
        for node in self.get_nodes():
            if node.redis_connection:
                try:
                    node.redis_connection.connection_pool.disconnect()
                except OSError:
                    # Client was already disconnected; do nothing
                    pass

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate and select a database, and send
        READONLY if it is set during object initialization.
        """
        connection.on_connect()

        if self.read_from_replicas or self.load_balancing_strategy:
            # Sending READONLY command to server to configure connection as
            # readonly. Since each cluster node may change its server type due
            # to a failover, we should establish a READONLY connection
            # regardless of the server type. If this is a primary connection,
            # READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

    def get_node(self, host=None, port=None, node_name=None):
        return self.nodes_manager.get_node(host, port, node_name)

    def get_primaries(self):
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

    def get_replicas(self):
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)

    def get_random_node(self):
        return random.choice(list(self.nodes_manager.nodes_cache.values()))

    def get_random_primary_or_all_nodes(self, command_name):
        """
        Return a random node (possibly a replica) when READONLY reads apply to
        this command; otherwise return a random primary node.
        """
        if self.read_from_replicas and command_name in READ_COMMANDS:
            return self.get_random_node()

        return self.get_random_primary_node()

    def get_nodes(self):
        return list(self.nodes_manager.nodes_cache.values())

    def get_node_from_key(self, key, replica=False):
        """
        Get the node that holds the key's slot.
        If replica is set to True but the slot doesn't have any replicas, None is
        returned.
        """
        slot = self.keyslot(key)
        slot_cache = self.nodes_manager.slots_cache.get(slot)
        if slot_cache is None or len(slot_cache) == 0:
            raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
        if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        elif replica:
            node_idx = 1
        else:
            # primary
            node_idx = 0

        return slot_cache[node_idx]
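
    # Illustrative usage sketch (not part of the library): look up which node
    # owns a key's hash slot, optionally asking for a replica. The key name is
    # made up for illustration.
    #
    #   primary = rc.get_node_from_key("user:1000")
    #   replica = rc.get_node_from_key("user:1000", replica=True)  # may be None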

    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node

    def get_nodes_from_slot(self, command: str, *args):
        """
        Returns a list of nodes that hold the specified keys' slots.
        """
        # get the node that holds the key's slot
        slot = self.determine_slot(*args)
        node = self.nodes_manager.get_node_from_slot(
            slot,
            self.read_from_replicas and command in READ_COMMANDS,
            self.load_balancing_strategy if command in READ_COMMANDS else None,
        )
        return [node]

    def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
        """
        Split a command with the Multi-Shard policy into multiple single-key commands.
        """
        keys = self._get_command_keys(*args)
        commands = []

        for key in keys:
            commands.append(
                {
                    "args": (args[0], key),
                    "kwargs": kwargs,
                }
            )

        return commands
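
    # Illustrative sketch (not part of the library): a multi-shard command such
    # as MGET is split into one single-key command per key, each of which can
    # then be routed to the slot owner of that key.
    #
    #   rc._split_multi_shard_command("MGET", "k1", "k2")
    #   # -> [{"args": ("MGET", "k1"), "kwargs": {}},
    #   #     {"args": ("MGET", "k2"), "kwargs": {}}]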

    def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
        """
        Returns a list of nodes for commands with a special policy.
        """
        if not self._aggregate_nodes:
            raise RedisClusterException(
                "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
            )

        return self._aggregate_nodes

    def get_random_primary_node(self) -> "ClusterNode":
        """
        Returns a random primary node
        """
        return random.choice(self.get_primaries())

    def _evaluate_all_succeeded(self, res):
        """
        Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
        """
        first_successful_response = None

        if isinstance(res, dict):
            for key, value in res.items():
                if value:
                    if first_successful_response is None:
                        first_successful_response = {key: value}
                else:
                    return {key: False}
        else:
            for response in res:
                if response:
                    if first_successful_response is None:
                        # Dynamically resolve type
                        first_successful_response = type(response)(response)
                else:
                    return type(response)(False)

        return first_successful_response

    def set_default_node(self, node):
        """
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return True if the default node was set, else False
        """
        if node is None or self.get_node(node_name=node.name) is None:
            return False
        self.nodes_manager.default_node = node
        return True

    def set_retry(self, retry: Retry) -> None:
        self.retry = retry

    def monitor(self, target_node=None):
        """
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node was
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        next_command() method returns one command from monitor
        listen() method yields commands from monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()

    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host&port, to get a pubsub instance
        connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
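
    # Illustrative usage sketch (not part of the library): the returned
    # ClusterPubSub behaves like a regular PubSub bound to one cluster node.
    # The channel name is made up for illustration.
    #
    #   p = rc.pubsub()  # or rc.pubsub(node=rc.get_default_node())
    #   p.subscribe("news")
    #   message = p.get_message()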

    def keyspace_notifications(
        self,
        key_prefix: Union[str, bytes, None] = None,
        ignore_subscribe_messages: bool = True,
    ) -> "ClusterKeyspaceNotifications":
        """
        Return a :class:`~redis.keyspace_notifications.ClusterKeyspaceNotifications`
        object for subscribing to keyspace and keyevent notifications across
        all primary nodes in the cluster.

        Note: Keyspace notifications must be enabled on all Redis cluster nodes
        via the ``notify-keyspace-events`` configuration option.

        Args:
            key_prefix: Optional prefix to filter and strip from keys in
                notifications.
            ignore_subscribe_messages: If True, subscribe/unsubscribe
                confirmations are not returned by
                get_message/listen.
        """
        from redis.keyspace_notifications import ClusterKeyspaceNotifications

        return ClusterKeyspaceNotifications(
            self,
            key_prefix=key_prefix,
            ignore_subscribe_messages=ignore_subscribe_messages,
        )

    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
        Pipelines do not work in cluster mode the same way they
        do in normal mode. Create a clone of this object so
        that simulating pipelines will work correctly. Each
        command will be called directly when used and
        when calling execute() will only return the result stack.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            read_from_replicas=self.read_from_replicas,
            load_balancing_strategy=self.load_balancing_strategy,
            reinitialize_steps=self.reinitialize_steps,
            retry=self.retry,
            lock=self._lock,
            transaction=transaction,
            event_dispatcher=self._event_dispatcher,
        )
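
    # Illustrative usage sketch (not part of the library): commands are
    # buffered client-side and dispatched per node on execute(). The hash tag
    # keeps both keys in the same slot.
    #
    #   with rc.pipeline() as pipe:
    #       pipe.set("{tag}:a", 1)
    #       pipe.get("{tag}:a")
    #       results = pipe.execute()  # e.g. [True, b"1"]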

    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
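
    # Illustrative usage sketch (not part of the library): the lock key lives
    # in a single hash slot, so the usual Lock semantics apply per key.
    #
    #   with rc.lock("my-lock", timeout=5, blocking_timeout=3):
    #       ...  # critical section; raises LockError if not acquired in 3s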

    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.cluster_response_callbacks[command] = callback

    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        """
        Determine the nodes the command should be executed on.
        """
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)

        if command_flag in self._command_flags_mapping:
            request_policy = self._command_flags_mapping[command_flag]

        policy_callback = self._policies_callback_mapping[request_policy]

        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()

        if args[0].lower() == "ft.aggregate":
            self._aggregate_nodes = nodes

        return nodes

    def _should_reinitialized(self):
        # To reinitialize the cluster on every MOVED error,
        # set reinitialize_steps to 1.
        # To avoid reinitializing the cluster on moved errors, set
        # reinitialize_steps to 0.
        if self.reinitialize_steps == 0:
            return False
        else:
            return self.reinitialize_counter % self.reinitialize_steps == 0

    def keyslot(self, key):
        """
        Calculate keyslot for a given key.
        See Keys distribution model in https://redis.io/topics/cluster-spec
        """
        k = self.encoder.encode(key)
        return key_slot(k)
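
    # Illustrative usage sketch (not part of the library): when a hash tag
    # (the part between the first "{" and "}") is present, only the tag is
    # hashed, so keys sharing a tag land in the same slot.
    #
    #   rc.keyslot("{user:1000}.following") == rc.keyslot("{user:1000}.followers")
    #   # -> True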

    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in it, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
        - issue: https://github.com/redis/redis/issues/9493
        - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)

    def determine_slot(self, *args) -> Optional[int]:
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # CLIENT TRACKING is a special case.
        # It doesn't have any keys, it needs to be sent to the provided nodes
        # By default it will be sent to all nodes.
        if command.upper() == "CLIENT TRACKING":
            return None

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, that means the function
                # can be run on any node so we can just return a random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
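
    # Illustrative sketch (not part of the library; assumes a connected
    # client, since key extraction may consult the server): multi-key commands
    # are only routable when every key hashes to the same slot, which hash
    # tags guarantee.
    #
    #   rc.determine_slot("MSET", "{u1}:a", "x", "{u1}:b", "y")  # one slot id
    #   rc.determine_slot("MSET", "u1:a", "x", "u2:b", "y")
    #   # -> raises RedisClusterException (keys map to different slots)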

    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder

    def get_connection_kwargs(self):
        """
        Get the connections' keyword arguments
        """
        return self.nodes_manager.connection_kwargs

    def _is_nodes_flag(self, target_nodes):
        return isinstance(target_nodes, str) and target_nodes in self.node_flags

    def _parse_target_nodes(self, target_nodes):
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables executing commands on multiple nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes
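
    # Illustrative usage sketch (not part of the library): callers can steer a
    # command with the target_nodes kwarg using a node flag, a single
    # ClusterNode, a list of nodes, or a dict of nodes.
    #
    #   rc.ping(target_nodes=RedisCluster.ALL_NODES)
    #   rc.ping(target_nodes=rc.get_primaries())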

    def execute_command(self, *args, **kwargs):
        return self._internal_execute_command(*args, **kwargs)

    def _internal_execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the retries property of
        the "self.retry" config option, which defaults to 3 unless manually
        configured.

        If that number of attempts is exhausted, the command will raise the exception.

        Key argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        command_policies = self._policy_resolver.resolve(args[0].lower())

        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True

        if not command_policies and not target_nodes_specified:
            command = args[0].upper()
            if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
                command = f"{args[0]} {args[1]}".upper()

            # We can only resolve key properties if the command is not
            # in the list of pre-defined request policies
            command_flag = self.command_flags.get(command)
            if not command_flag:
                # Fallback to default policy
                if not self.get_default_node():
                    slot = None
                else:
                    slot = self.determine_slot(*args)
                if slot is None:
                    command_policies = CommandPolicies()
                else:
                    command_policies = CommandPolicies(
                        request_policy=RequestPolicy.DEFAULT_KEYED,
                        response_policy=ResponsePolicy.DEFAULT_KEYED,
                    )
            else:
                if command_flag in self._command_flags_mapping:
                    command_policies = CommandPolicies(
                        request_policy=self._command_flags_mapping[command_flag]
                    )
                else:
                    command_policies = CommandPolicies()
        elif not command_policies and target_nodes_specified:
            command_policies = CommandPolicies()

        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        failure_count = 0

        # Start timing for observability
        start_time = time.monotonic()

        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args,
                        request_policy=command_policies.request_policy,
                        nodes_flag=passed_targets,
                    )

                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)

                    if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                        break

                # Return the processed result
                return self._process_result(
                    args[0],
                    res,
                    response_policy=command_policies.response_policy,
                    **kwargs,
                )
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    failure_count += 1

                    if hasattr(e, "connection"):
                        self._record_command_metric(
                            command_name=args[0],
                            duration_seconds=time.monotonic() - start_time,
                            connection=e.connection,
                            error=e,
                        )

                        self._record_error_metric(
                            error=e,
                            connection=e.connection,
                            retry_attempts=failure_count,
                        )
                    continue
                else:
                    # raise the exception
                    if hasattr(e, "connection"):
                        self._record_error_metric(
                            error=e,
                            connection=e.connection,
                            retry_attempts=failure_count,
                            is_internal=False,
                        )
                    raise e

    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        # Start timing for observability
        start_time = time.monotonic()

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove the keys entry; it is only needed for caching.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )

                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                )
                return response
            except AuthenticationError as e:
                e.connection = connection if connection is not None else target_node
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=e.connection,
                    error=e,
                )
                raise
            except MaxConnectionsError as e:
                # MaxConnectionsError indicates client-side resource exhaustion
                # (too many connections in the pool), not a node failure.
                # Don't treat this as a node failure - just re-raise the error
                # without reinitializing the cluster.
                # The connection in the error is used to report the metrics based on
                # host and port info, so we use the target node object, which contains
                # the host and port info, because we did not get a connection yet.
                e.connection = target_node
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=e.connection,
                    error=e,
                )
                raise
            except (ConnectionError, TimeoutError) as e:
                if is_debug_log_enabled():
                    socket_address = self._extracts_socket_address(connection)
                    args_log_str = truncate_text(" ".join(map(safe_str, args)))
                    logger.debug(
                        f"{type(e).__name__} received for command {args_log_str}, on node {target_node.name}, "
                        f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                    )
                # this is used to report the metrics based on host and port info
                e.connection = connection if connection else target_node

                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Instead of setting to None, properly handle the pool.
                # Get the pool safely - redis_connection could be set to None
                # by another thread between the check and access.
                redis_conn = target_node.redis_connection
                if redis_conn is not None:
                    pool = redis_conn.connection_pool
                    if pool is not None:
                        with pool._lock:
                            # take care of the active connections in the pool
                            pool.update_active_connections_for_reconnect()
                            # disconnect all free connections
                            pool.disconnect_free_connections()

                # Move the failed node to the end of the cached nodes list
                self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name)

                # DON'T set redis_connection = None - keep the pool for reuse
                self.nodes_manager.initialize()
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=e.connection,
                    error=e,
                )
                raise e
            except MovedError as e:
                if is_debug_log_enabled():
                    socket_address = self._extracts_socket_address(connection)
                    args_log_str = truncate_text(" ".join(map(safe_str, args)))
                    logger.debug(
                        f"MOVED error received for command {args_log_str}, on node {target_node.name}, "
                        f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                    )
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    # during this call all connections are closed or marked for disconnect,
                    # so we don't need to disconnect the changed node's connections
                    self.nodes_manager.initialize(
                        additional_startup_nodes_info=[(e.host, e.port)]
                    )
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.move_slot(e)
                moved = True
                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                self._record_error_metric(
                    error=e,
                    connection=connection,
                )
            except TryAgainError as e:
                if is_debug_log_enabled():
                    socket_address = self._extracts_socket_address(connection)
                    args_log_str = truncate_text(" ".join(map(safe_str, args)))
                    logger.debug(
                        f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, "
                        f"and connection: {connection} using local socket address: {socket_address}"
                    )
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)

                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                self._record_error_metric(
                    error=e,
                    connection=connection,
                )
            except AskError as e:
                if is_debug_log_enabled():
                    socket_address = self._extracts_socket_address(connection)
                    args_log_str = truncate_text(" ".join(map(safe_str, args)))
                    logger.debug(
                        f"ASK error received for command {args_log_str}, on node {target_node.name}, "
                        f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                    )
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True

                self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=connection,
                    error=e,
                )
                self._record_error_metric(
                    error=e,
                    connection=connection,
                )
1740 except (ClusterDownError, SlotNotCoveredError) as e:
1741 # ClusterDownError can occur during a failover; to let the cluster
1742 # self-heal, we try to reinitialize the cluster layout
1743 # and retry executing the command.
1745 # SlotNotCoveredError can occur when the cluster is not fully
1746 # initialized, or it can be a temporary issue.
1747 # We try to reinitialize the cluster topology
1748 # and retry executing the command.
1750 time.sleep(0.25)
1751 self.nodes_manager.initialize()
1753 # if we have a connection, use it, otherwise use the target node
1754 # object which contains the host and port info
1755 # this is used to report the metrics based on host and port info
1756 e.connection = connection if connection else target_node
1757 self._record_command_metric(
1758 command_name=command,
1759 duration_seconds=time.monotonic() - start_time,
1760 connection=e.connection,
1761 error=e,
1762 )
1763 raise
1764 except ResponseError as e:
1765 # this is used to report the metrics based on host and port info
1766 # ResponseError typically happens after get_connection() succeeds,
1767 # so connection should be available
1768 e.connection = connection if connection else target_node
1769 self._record_command_metric(
1770 command_name=command,
1771 duration_seconds=time.monotonic() - start_time,
1772 connection=e.connection,
1773 error=e,
1774 )
1775 raise
1776 except Exception as e:
1777 if connection:
1778 connection.disconnect()
1780 # if we have a connection, use it, otherwise use the target node
1781 # object which contains the host and port info
1782 # this is used to report the metrics based on host and port info
1783 e.connection = connection if connection else target_node
1784 self._record_command_metric(
1785 command_name=command,
1786 duration_seconds=time.monotonic() - start_time,
1787 connection=e.connection,
1788 error=e,
1789 )
1790 raise e
1791 finally:
1792 if connection is not None:
1793 redis_node.connection_pool.release(connection)
1795 e = ClusterError("TTL exhausted.")
1796 # If we are here, we received MOVED or ASK errors repeatedly and
1797 # finally exhausted the TTL, which means we were reading from an
1798 # active connection. Attach it to the error so that metrics can be
1799 # reported based on its host and port information.
1800 e.connection = connection
1801 self._record_command_metric(
1802 command_name=command,
1803 duration_seconds=time.monotonic() - start_time,
1804 connection=connection,
1805 error=e,
1806 )
1807 raise e
1809 def _record_command_metric(
1810 self,
1811 command_name: str,
1812 duration_seconds: float,
1813 connection: Connection,
1814 error=None,
1815 ):
1816 """
1817 Records operation duration metric directly.
1818 """
1819 host = connection.host if connection else "unknown"
1820 port = connection.port if connection else 0
1821 db = str(connection.db) if connection and hasattr(connection, "db") else "0"
1823 record_operation_duration(
1824 command_name=command_name,
1825 duration_seconds=duration_seconds,
1826 server_address=host,
1827 server_port=port,
1828 db_namespace=db,
1829 error=error,
1830 )
1832 def _record_error_metric(
1833 self,
1834 error: Exception,
1835 connection: Connection,
1836 is_internal: bool = True,
1837 retry_attempts: Optional[int] = None,
1838 ):
1839 """
1840 Records error count metric directly.
1841 """
1842 record_error_count(
1843 server_address=connection.host,
1844 server_port=connection.port,
1845 network_peer_address=connection.host,
1846 network_peer_port=connection.port,
1847 error_type=error,
1848 retry_attempts=retry_attempts if retry_attempts is not None else 0,
1849 is_internal=is_internal,
1850 )
1852 def _extracts_socket_address(
1853 self, connection: Optional[Connection]
1854 ) -> Optional[int]:
1855 if connection is None:
1856 return None
1857 try:
1858 socket_address = (
1859 connection._sock.getsockname() if connection._sock else None
1860 )
1861 socket_address = socket_address[1] if socket_address else None
1862 except (AttributeError, OSError):
1863 socket_address = None
1864 return socket_address
1866 def close(self) -> None:
1867 try:
1868 with self._lock:
1869 if self.nodes_manager:
1870 self.nodes_manager.close()
1871 except AttributeError:
1872 # RedisCluster's __init__ can fail before nodes_manager is set
1873 pass
1875 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
1876 """
1877 Process the result of the executed command.
1878 The function returns a dict or a single value.
1880 :type command: str
1881 :type res: dict
1883 `res` should be in the following format:
1884 Dict<node_name, command_result>
1885 """
1886 if command in self.result_callbacks:
1887 res = self.result_callbacks[command](command, res, **kwargs)
1888 elif len(res) == 1:
1889 # When we execute the command on a single node, we can
1890 # remove the dictionary and return a single response
1891 res = list(res.values())[0]
1893 return self._policies_callback_mapping[response_policy](res)
1895 def load_external_module(self, funcname, func):
1896 """
1897 This function can be used to add externally defined redis modules
1898 and their namespaces to the redis client.
1900 ``funcname`` - A string containing the name of the function to create
1901 ``func`` - The function being added to this class.
1902 """
1903 setattr(self, funcname, func)
1905 def transaction(self, func, *watches, **kwargs):
1906 """
1907 Convenience method for executing the callable `func` as a transaction
1908 while watching all keys specified in `watches`. The 'func' callable
1909 should expect a single argument which is a Pipeline object.
1910 """
1911 shard_hint = kwargs.pop("shard_hint", None)
1912 value_from_callable = kwargs.pop("value_from_callable", False)
1913 watch_delay = kwargs.pop("watch_delay", None)
1914 with self.pipeline(True, shard_hint) as pipe:
1915 while True:
1916 try:
1917 if watches:
1918 pipe.watch(*watches)
1919 func_value = func(pipe)
1920 exec_value = pipe.execute()
1921 return func_value if value_from_callable else exec_value
1922 except WatchError:
1923 if watch_delay is not None and watch_delay > 0:
1924 time.sleep(watch_delay)
1925 continue
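
# Example (illustrative sketch, not part of the module): how transaction()
# above is typically used. The endpoint and key name are assumptions for
# the example; in cluster mode, all keys watched in a single transaction
# must hash to the same slot (hence the "{user:1}" hash tag).
def _example_transaction_usage():
    rc = RedisCluster(host="localhost", port=7000)  # hypothetical endpoint

    def increment_balance(pipe):
        # runs under WATCH; transaction() re-invokes it on WatchError
        balance = int(pipe.get("{user:1}:balance") or 0)
        pipe.multi()
        pipe.set("{user:1}:balance", balance + 1)

    return rc.transaction(increment_balance, "{user:1}:balance")
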
1928class ClusterNode:
1929 def __init__(self, host, port, server_type=None, redis_connection=None):
1930 if host == "localhost":
1931 host = socket.gethostbyname(host)
1933 self.host = host
1934 self.port = port
1935 self.name = get_node_name(host, port)
1936 self.server_type = server_type
1937 self.redis_connection = redis_connection
1939 def __repr__(self):
1940 return (
1941 f"[host={self.host},"
1942 f"port={self.port},"
1943 f"name={self.name},"
1944 f"server_type={self.server_type},"
1945 f"redis_connection={self.redis_connection}]"
1946 )
1948 def __eq__(self, obj):
1949 return isinstance(obj, ClusterNode) and obj.name == self.name
1951 def __hash__(self):
1952 return hash(self.name)
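
# Example (illustrative, not part of the module): ClusterNode identity is
# derived from its "host:port" name, so two instances describing the same
# address compare equal and hash alike, which is what lets them serve as
# keys in nodes_cache and startup_nodes. The address is an assumption.
def _example_cluster_node_identity():
    a = ClusterNode("10.0.0.1", 7000, server_type="primary")
    b = ClusterNode("10.0.0.1", 7000)
    assert a == b and hash(a) == hash(b)
    assert a.name == "10.0.0.1:7000"
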
1955class LoadBalancingStrategy(Enum):
1956 ROUND_ROBIN = "round_robin"
1957 ROUND_ROBIN_REPLICAS = "round_robin_replicas"
1958 RANDOM_REPLICA = "random_replica"
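
# Example (illustrative, not part of the module): selecting one of the
# strategies above when constructing the client. The endpoint is an
# assumption. With ROUND_ROBIN_REPLICAS, read commands rotate over the
# replicas only, while the primary still serves writes.
def _example_replica_read_strategy():
    return RedisCluster(
        host="localhost",  # hypothetical endpoint
        port=7000,
        load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
    )
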
1961class LoadBalancer:
1962 """
1963 Round-Robin Load Balancing
1964 """
1966 def __init__(self, start_index: int = 0) -> None:
1967 self.primary_to_idx: dict[str, int] = {}
1968 self.start_index: int = start_index
1969 self._lock: threading.Lock = threading.Lock()
1971 def get_server_index(
1972 self,
1973 primary: str,
1974 list_size: int,
1975 load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
1976 ) -> int:
1977 if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
1978 return self._get_random_replica_index(list_size)
1979 else:
1980 return self._get_round_robin_index(
1981 primary,
1982 list_size,
1983 load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
1984 )
1986 def reset(self) -> None:
1987 with self._lock:
1988 self.primary_to_idx.clear()
1990 def _get_random_replica_index(self, list_size: int) -> int:
1991 return random.randint(1, list_size - 1)
1993 def _get_round_robin_index(
1994 self, primary: str, list_size: int, replicas_only: bool
1995 ) -> int:
1996 with self._lock:
1997 server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1998 if replicas_only and server_index == 0:
1999 # skip the primary node index
2000 server_index = 1
2001 # Update the index for the next round
2002 self.primary_to_idx[primary] = (server_index + 1) % list_size
2003 return server_index
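
# Example (illustrative, not part of the module): how LoadBalancer walks a
# slot's node list, where index 0 is the primary and 1..N-1 are replicas.
# The primary name below is an assumption.
def _example_load_balancer_rotation():
    lb = LoadBalancer()
    # a slot served by one primary and two replicas -> list_size == 3
    indices = [
        lb.get_server_index("10.0.0.1:7000", 3, LoadBalancingStrategy.ROUND_ROBIN)
        for _ in range(4)
    ]
    assert indices == [0, 1, 2, 0]  # wraps around; the primary is included
    lb.reset()
    replica_idx = lb.get_server_index(
        "10.0.0.1:7000", 3, LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
    )
    assert replica_idx == 1  # index 0 (the primary) is skipped
    return indices
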
2006class NodesManager:
2007 def __init__(
2008 self,
2009 startup_nodes: list[ClusterNode],
2010 from_url=False,
2011 require_full_coverage=False,
2012 lock: Optional[threading.RLock] = None,
2013 dynamic_startup_nodes=True,
2014 connection_pool_class=ConnectionPool,
2015 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
2016 cache: Optional[CacheInterface] = None,
2017 cache_config: Optional[CacheConfig] = None,
2018 cache_factory: Optional[CacheFactoryInterface] = None,
2019 event_dispatcher: Optional[EventDispatcher] = None,
2020 maint_notifications_config: Optional[MaintNotificationsConfig] = None,
2021 **kwargs,
2022 ):
2023 self.nodes_cache: dict[str, ClusterNode] = {}
2024 self.slots_cache: dict[int, list[ClusterNode]] = {}
2025 self.startup_nodes: dict[str, ClusterNode] = {n.name: n for n in startup_nodes}
2026 self.default_node: Optional[ClusterNode] = None
2027 self._epoch: int = 0
2028 self.from_url = from_url
2029 self._require_full_coverage = require_full_coverage
2030 self._dynamic_startup_nodes = dynamic_startup_nodes
2031 self.connection_pool_class = connection_pool_class
2032 self.address_remap = address_remap
2033 self._cache: Optional[CacheInterface] = None
2034 if cache:
2035 self._cache = cache
2036 elif cache_factory is not None:
2037 self._cache = cache_factory.get_cache()
2038 elif cache_config is not None:
2039 self._cache = CacheFactory(cache_config).get_cache()
2040 self.connection_kwargs = kwargs
2041 self.read_load_balancer = LoadBalancer()
2043 # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock
2044 if lock is None:
2045 self._lock = threading.RLock()
2046 else:
2047 self._lock = lock
2049 # initialize holds _initialization_lock to dedup multiple calls to reinitialize;
2050 # note that if we hold both _lock and _initialization_lock, we _must_ acquire
2051 # _initialization_lock first (ie: to have a consistent order) to avoid deadlock.
2052 self._initialization_lock: threading.RLock = threading.RLock()
2054 if event_dispatcher is None:
2055 self._event_dispatcher = EventDispatcher()
2056 else:
2057 self._event_dispatcher = event_dispatcher
2058 self._credential_provider = self.connection_kwargs.get(
2059 "credential_provider", None
2060 )
2061 self.maint_notifications_config = maint_notifications_config
2063 self.initialize()
2065 def get_node(
2066 self,
2067 host: Optional[str] = None,
2068 port: Optional[int] = None,
2069 node_name: Optional[str] = None,
2070 ) -> Optional[ClusterNode]:
2071 """
2072 Get the requested node from the cluster's nodes.
2074 :return: ClusterNode if the node exists, else None
2075 """
2076 if host and port:
2077 # the user passed host and port
2078 if host == "localhost":
2079 host = socket.gethostbyname(host)
2080 with self._lock:
2081 return self.nodes_cache.get(get_node_name(host=host, port=port))
2082 elif node_name:
2083 with self._lock:
2084 return self.nodes_cache.get(node_name)
2085 else:
2086 return None
2088 def move_slot(self, e: Union[AskError, MovedError]):
2089 """
2090 Update the slot's node with the redirected one
2091 """
2092 with self._lock:
2093 redirected_node = self.get_node(host=e.host, port=e.port)
2094 if redirected_node is not None:
2095 # The node already exists
2096 if redirected_node.server_type is not PRIMARY:
2097 # Update the node's server type
2098 redirected_node.server_type = PRIMARY
2099 else:
2100 # This is a new node, we will add it to the nodes cache
2101 redirected_node = ClusterNode(e.host, e.port, PRIMARY)
2102 self.nodes_cache[redirected_node.name] = redirected_node
2104 slot_nodes = self.slots_cache[e.slot_id]
2105 if redirected_node not in slot_nodes:
2106 # The new slot owner is a new server, or a server from a different
2107 # shard. We need to remove all current nodes from the slot's list
2108 # (including replications) and add just the new node.
2109 self.slots_cache[e.slot_id] = [redirected_node]
2110 elif redirected_node is not slot_nodes[0]:
2111 # The MOVED error resulted from a failover, and the new slot owner
2112 # had previously been a replica.
2113 old_primary = slot_nodes[0]
2114 # Update the old primary to be a replica and add it to the end of
2115 # the slot's node list
2116 old_primary.server_type = REPLICA
2117 slot_nodes.append(old_primary)
2118 # Remove the old replica, which is now a primary, from the slot's
2119 # node list
2120 slot_nodes.remove(redirected_node)
2121 # Override the old primary with the new one
2122 slot_nodes[0] = redirected_node
2123 if self.default_node == old_primary:
2124 # Update the default node with the new primary
2125 self.default_node = redirected_node
2126 # else: circular MOVED to current primary -> no-op
2128 @deprecated_args(
2129 args_to_warn=["server_type"],
2130 reason=(
2131 "In case you need select some load balancing strategy "
2132 "that will use replicas, please set it through 'load_balancing_strategy'"
2133 ),
2134 version="5.3.0",
2135 )
2136 def get_node_from_slot(
2137 self,
2138 slot: int,
2139 read_from_replicas: bool = False,
2140 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
2141 server_type: Optional[Literal["primary", "replica"]] = None,
2142 ) -> ClusterNode:
2143 """
2144 Gets a node that servers this hash slot
2145 """
2147 if read_from_replicas is True and load_balancing_strategy is None:
2148 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
2150 with self._lock:
2151 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
2152 raise SlotNotCoveredError(
2153 f'Slot "{slot}" not covered by the cluster. '
2154 + f'"require_full_coverage={self._require_full_coverage}"'
2155 )
2157 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
2158 # get the server index using the strategy defined in load_balancing_strategy
2159 primary_name = self.slots_cache[slot][0].name
2160 node_idx = self.read_load_balancer.get_server_index(
2161 primary_name, len(self.slots_cache[slot]), load_balancing_strategy
2162 )
2163 elif (
2164 server_type is None
2165 or server_type == PRIMARY
2166 or len(self.slots_cache[slot]) == 1
2167 ):
2168 # return a primary
2169 node_idx = 0
2170 else:
2171 # return a replica
2172 # randomly choose one of the replicas
2173 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)
2175 return self.slots_cache[slot][node_idx]
2177 def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]):
2178 """
2179 Get all nodes with the specified server type
2180 :param server_type: 'primary' or 'replica'
2181 :return: list of ClusterNode
2182 """
2183 with self._lock:
2184 return [
2185 node
2186 for node in self.nodes_cache.values()
2187 if node.server_type == server_type
2188 ]
2190 @deprecated_function(
2191 reason="This method is not used anymore internally. The startup nodes are populated automatically.",
2192 version="7.0.2",
2193 )
2194 def populate_startup_nodes(self, nodes):
2195 """
2196 Populate all startup nodes and filter out any duplicates
2197 """
2198 with self._lock:
2199 for n in nodes:
2200 self.startup_nodes[n.name] = n
2202 def move_node_to_end_of_cached_nodes(self, node_name: str) -> None:
2203 """
2204 Move a failing node to the end of startup_nodes and nodes_cache so it's
2205 tried last during reinitialization and when selecting the default node.
2206 If the node is not in the respective list, nothing is done.
2207 """
2208 # Move in startup_nodes
2209 if node_name in self.startup_nodes and len(self.startup_nodes) > 1:
2210 node = self.startup_nodes.pop(node_name)
2211 self.startup_nodes[node_name] = node # Re-insert at end
2213 # Move in nodes_cache - this affects get_nodes_by_server_type ordering
2214 # which is used to select the default_node during initialize()
2215 if node_name in self.nodes_cache and len(self.nodes_cache) > 1:
2216 node = self.nodes_cache.pop(node_name)
2217 self.nodes_cache[node_name] = node # Re-insert at end
2219 def check_slots_coverage(self, slots_cache):
2220 # Validate if all slots are covered or if we should try next
2221 # startup node
2222 for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
2223 if i not in slots_cache:
2224 return False
2225 return True
2227 def create_redis_connections(self, nodes):
2228 """
2229 Create a Redis connection to every node in ``nodes``.
2230 """
2231 connection_pools = []
2232 for node in nodes:
2233 if node.redis_connection is None:
2234 node.redis_connection = self.create_redis_node(
2235 host=node.host,
2236 port=node.port,
2237 maint_notifications_config=self.maint_notifications_config,
2238 **self.connection_kwargs,
2239 )
2240 connection_pools.append(node.redis_connection.connection_pool)
2242 self._event_dispatcher.dispatch(
2243 AfterPooledConnectionsInstantiationEvent(
2244 connection_pools, ClientType.SYNC, self._credential_provider
2245 )
2246 )
2248 def create_redis_node(
2249 self,
2250 host,
2251 port,
2252 **kwargs,
2253 ):
2254 # We are configuring the connection pool not to retry
2255 # connections on lower level clients to avoid retrying
2256 # connections to nodes that are not reachable
2257 # and to avoid blocking the connection pool.
2258 # The only error that will have some handling in the lower
2259 # level clients is ConnectionError which will trigger disconnection
2260 # of the socket.
2261 # Retries are handled at the cluster client level,
2262 # where we have proper handling of the cluster topology.
2263 node_retry_config = Retry(
2264 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
2265 )
2267 if self.from_url:
2268 # Create a redis node with a custom connection pool
2269 kwargs.update({"host": host})
2270 kwargs.update({"port": port})
2271 kwargs.update({"cache": self._cache})
2272 kwargs.update({"retry": node_retry_config})
2273 r = Redis(connection_pool=self.connection_pool_class(**kwargs))
2274 else:
2275 r = Redis(
2276 host=host,
2277 port=port,
2278 cache=self._cache,
2279 retry=node_retry_config,
2280 **kwargs,
2281 )
2282 return r
2284 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
2285 node_name = get_node_name(host, port)
2286 # check if we already have this node in the tmp_nodes_cache
2287 target_node = tmp_nodes_cache.get(node_name)
2288 if target_node is None:
2289 # before creating a new cluster node, check if the cluster node already
2290 # exists in the current nodes cache and has a valid connection so we can
2291 # reuse it
2292 redis_connection: Optional[Redis] = None
2293 with self._lock:
2294 previous_node = self.nodes_cache.get(node_name)
2295 if previous_node:
2296 redis_connection = previous_node.redis_connection
2297 # don't update the old ClusterNode, so we don't update its role
2298 # outside of the lock
2299 target_node = ClusterNode(host, port, role, redis_connection)
2300 # add this node to the nodes cache
2301 tmp_nodes_cache[target_node.name] = target_node
2303 return target_node
2305 def _get_epoch(self) -> int:
2306 """
2307 Get the current epoch value. This method exists primarily to allow
2308 tests to mock the epoch fetch and control race condition timing.
2309 """
2310 with self._lock:
2311 return self._epoch
2313 def initialize(
2314 self,
2315 additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None,
2316 disconnect_startup_nodes_pools: bool = True,
2317 ):
2318 """
2319 Initializes the nodes cache, slots cache and redis connections.
2320 :startup_nodes:
2321 Responsible for discovering other nodes in the cluster
2322 :disconnect_startup_nodes_pools:
2323 Whether to disconnect the connection pool of the startup nodes
2324 after the initialization is complete. This is useful when the
2325 startup nodes are not part of the cluster and we want to avoid
2326 keeping the connection open.
2327 :additional_startup_nodes_info:
2328 Additional nodes to add temporarily to the startup nodes.
2329 The additional nodes are used only while extracting the slot
2330 and node information from the cluster.
2331 This is useful when we want to add new nodes to the cluster
2332 and initialize the client with them.
2334 The format is a list of tuples, where each tuple contains
2335 the host and port of a node.
2336 """
2337 self.reset()
2338 tmp_nodes_cache = {}
2339 tmp_slots = {}
2340 disagreements = []
2341 startup_nodes_reachable = False
2342 fully_covered = False
2343 kwargs = self.connection_kwargs
2344 exception = None
2345 epoch = self._get_epoch()
2346 if additional_startup_nodes_info is None:
2347 additional_startup_nodes_info = []
2349 with self._initialization_lock:
2350 with self._lock:
2351 if epoch != self._epoch:
2352 # another thread has already re-initialized the nodes; don't
2353 # bother running again
2354 return
2356 with self._lock:
2357 startup_nodes = tuple(self.startup_nodes.values())
2359 additional_startup_nodes = [
2360 ClusterNode(host, port) for host, port in additional_startup_nodes_info
2361 ]
2362 if is_debug_log_enabled():
2363 logger.debug(
2364 f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; "
2365 f"and startup nodes: {[node.name for node in startup_nodes]}"
2366 )
2368 for startup_node in (*startup_nodes, *additional_startup_nodes):
2369 try:
2370 if startup_node.redis_connection:
2371 r = startup_node.redis_connection
2373 else:
2374 # Create a new Redis connection
2375 if is_debug_log_enabled():
2376 socket_timeout = kwargs.get("socket_timeout", "not set")
2377 socket_connect_timeout = kwargs.get(
2378 "socket_connect_timeout", "not set"
2379 )
2380 maint_enabled = (
2381 self.maint_notifications_config.enabled
2382 if self.maint_notifications_config
2383 else False
2384 )
2385 logger.debug(
2386 "Topology refresh: Creating new Redis connection to "
2387 f"{startup_node.host}:{startup_node.port}; "
2388 f"with socket_timeout: {socket_timeout}, and "
2389 f"socket_connect_timeout: {socket_connect_timeout}, "
2390 "and maint_notifications enabled: "
2391 f"{maint_enabled}"
2392 )
2393 r = self.create_redis_node(
2394 startup_node.host,
2395 startup_node.port,
2396 maint_notifications_config=self.maint_notifications_config,
2397 **kwargs,
2398 )
2399 if startup_node in self.startup_nodes.values():
2400 self.startup_nodes[startup_node.name].redis_connection = r
2401 else:
2402 startup_node.redis_connection = r
2403 try:
2404 # Make sure cluster mode is enabled on this node
2405 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
2406 if disconnect_startup_nodes_pools:
2407 with r.connection_pool._lock:
2408 # take care to clear connections before we move on
2409 # mark all active connections for reconnect - they will be
2410 # reconnected on next use, but will allow current in flight commands to complete first
2411 r.connection_pool.update_active_connections_for_reconnect()
2412 # Needed to clear READONLY state when it is no longer applicable
2413 r.connection_pool.disconnect_free_connections()
2414 except ResponseError:
2415 raise RedisClusterException(
2416 "Cluster mode is not enabled on this node"
2417 )
2418 startup_nodes_reachable = True
2419 except Exception as e:
2420 # Try the next startup node.
2421 # The exception is saved and raised only if we have no more nodes.
2422 exception = e
2423 continue
2425 # CLUSTER SLOTS command results in the following output:
2426 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
2427 # where each node contains the following list: [IP, port, node_id]
2428 # Therefore, cluster_slots[0][2][0] will be the IP address of the
2429 # primary node of the first slot section.
2430 # If there's only one server in the cluster, its ``host`` is ''
2431 # Fix it to the host in startup_nodes
2432 if (
2433 len(cluster_slots) == 1
2434 and len(cluster_slots[0][2][0]) == 0
2435 and len(self.startup_nodes) == 1
2436 ):
2437 cluster_slots[0][2][0] = startup_node.host
2439 for slot in cluster_slots:
2440 primary_node = slot[2]
2441 host = str_if_bytes(primary_node[0])
2442 if host == "":
2443 host = startup_node.host
2444 port = int(primary_node[1])
2445 host, port = self.remap_host_port(host, port)
2447 nodes_for_slot = []
2449 target_node = self._get_or_create_cluster_node(
2450 host, port, PRIMARY, tmp_nodes_cache
2451 )
2452 nodes_for_slot.append(target_node)
2454 replica_nodes = slot[3:]
2455 for replica_node in replica_nodes:
2456 host = str_if_bytes(replica_node[0])
2457 port = int(replica_node[1])
2458 host, port = self.remap_host_port(host, port)
2459 target_replica_node = self._get_or_create_cluster_node(
2460 host, port, REPLICA, tmp_nodes_cache
2461 )
2462 nodes_for_slot.append(target_replica_node)
2464 for i in range(int(slot[0]), int(slot[1]) + 1):
2465 if i not in tmp_slots:
2466 tmp_slots[i] = nodes_for_slot
2467 else:
2468 # Check whether two nodes disagree about the owner of
2469 # this slot
2470 tmp_slot = tmp_slots[i][0]
2471 if tmp_slot.name != target_node.name:
2472 disagreements.append(
2473 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
2474 )
2476 if len(disagreements) > 5:
2477 raise RedisClusterException(
2478 f"startup_nodes could not agree on a valid "
2479 f"slots cache: {', '.join(disagreements)}"
2480 )
2482 fully_covered = self.check_slots_coverage(tmp_slots)
2483 if fully_covered:
2484 # Don't need to continue to the next startup node if all
2485 # slots are covered
2486 break
2488 if not startup_nodes_reachable:
2489 raise RedisClusterException(
2490 f"Redis Cluster cannot be connected. Please provide at least "
2491 f"one reachable node: {str(exception)}"
2492 ) from exception
2494 # Create Redis connections to all nodes
2495 self.create_redis_connections(list(tmp_nodes_cache.values()))
2497 # Check if the slots are not fully covered
2498 if not fully_covered and self._require_full_coverage:
2499 # Full coverage was required, but the slots are not
2500 # fully covered
2501 raise RedisClusterException(
2502 f"All slots are not covered after query all startup_nodes. "
2503 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
2504 f"covered..."
2505 )
2507 # Set the tmp variables to the real variables
2508 with self._lock:
2509 self.nodes_cache = tmp_nodes_cache
2510 self.slots_cache = tmp_slots
2511 # Set the default node
2512 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
2513 if self._dynamic_startup_nodes:
2514 # Populate the startup nodes with all discovered nodes
2515 self.startup_nodes = tmp_nodes_cache
2516 # Increment the epoch to signal that initialization has completed
2517 self._epoch += 1
2519 def close(self) -> None:
2520 with self._lock:
2521 self.default_node = None
2522 nodes = tuple(self.nodes_cache.values())
2523 for node in nodes:
2524 if node.redis_connection:
2525 node.redis_connection.close()
2527 def reset(self):
2528 try:
2529 self.read_load_balancer.reset()
2530 except TypeError:
2531 # The read_load_balancer is None, do nothing
2532 pass
2534 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
2535 """
2536 Remap the host and port returned from the cluster to a different
2537 internal value. Useful if the client is not connecting directly
2538 to the cluster.
2539 """
2540 if self.address_remap:
2541 return self.address_remap((host, port))
2542 return host, port
2544 def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]:
2545 node_name = get_node_name(connection.host, connection.port)
2546 with self._lock:
2547 for node in tuple(self.nodes_cache.values()):
2548 if node.redis_connection:
2549 conn_args = node.redis_connection.connection_pool.connection_kwargs
2550 if node_name == get_node_name(
2551 conn_args.get("host"), conn_args.get("port")
2552 ):
2553 return node
2554 return None
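
# Example (illustrative, not part of the module): an address_remap callable
# as consumed by remap_host_port() above. Useful when the cluster announces
# internal addresses (e.g. from inside Docker) that the client cannot reach
# directly. The concrete addresses below are assumptions.
def _example_address_remap(address):
    host, port = address
    # map announced container addresses to local port-forwards
    internal_to_local = {
        ("172.18.0.2", 6379): ("127.0.0.1", 16379),
        ("172.18.0.3", 6379): ("127.0.0.1", 16380),
    }
    return internal_to_local.get((host, port), (host, port))

# It would be passed through the client, which forwards it to NodesManager:
#     RedisCluster(host="127.0.0.1", port=16379,
#                  address_remap=_example_address_remap)
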
2557class ClusterPubSub(PubSub):
2558 """
2559 Wrapper for PubSub class.
2561 IMPORTANT: before using ClusterPubSub, read about the known limitations
2562 with pubsub in Cluster mode and learn how to work around them:
2563 https://redis.readthedocs.io/en/stable/clustering.html#known-pubsub-limitations
2564 """
2566 def __init__(
2567 self,
2568 redis_cluster,
2569 node=None,
2570 host=None,
2571 port=None,
2572 push_handler_func=None,
2573 event_dispatcher: Optional["EventDispatcher"] = None,
2574 **kwargs,
2575 ):
2576 """
2577 When a pubsub instance is created without specifying a node, a single
2578 node will be transparently chosen for the pubsub connection on the
2579 first command execution. The node will be determined by:
2580 1. Hashing the channel name in the request to find its keyslot
2581 2. Selecting a node that handles the keyslot: If read_from_replicas is
2582 set to true or load_balancing_strategy is set, a replica can be selected.
2584 :type redis_cluster: RedisCluster
2585 :type node: ClusterNode
2586 :type host: str
2587 :type port: int
2588 """
2589 self.node = None
2590 self.set_pubsub_node(redis_cluster, node, host, port)
2591 connection_pool = (
2592 None
2593 if self.node is None
2594 else redis_cluster.get_redis_connection(self.node).connection_pool
2595 )
2596 self.cluster = redis_cluster
2597 self.node_pubsub_mapping = {}
2598 self._pubsubs_generator = self._pubsubs_generator()
2599 if event_dispatcher is None:
2600 self._event_dispatcher = EventDispatcher()
2601 else:
2602 self._event_dispatcher = event_dispatcher
2603 super().__init__(
2604 connection_pool=connection_pool,
2605 encoder=redis_cluster.encoder,
2606 push_handler_func=push_handler_func,
2607 event_dispatcher=self._event_dispatcher,
2608 **kwargs,
2609 )
2611 def set_pubsub_node(self, cluster, node=None, host=None, port=None):
2612 """
2613 The pubsub node will be set according to the passed node, host and port
2614 When none of node, host, or port is specified, the node is set
2615 to None and will be determined by the keyslot of the channel in the
2616 first command to be executed.
2617 RedisClusterException will be thrown if the passed node does not exist
2618 in the cluster.
2619 If host is passed without port, or vice versa, a DataError will be
2620 thrown.
2621 :type cluster: RedisCluster
2622 :type node: ClusterNode
2623 :type host: str
2624 :type port: int
2625 """
2626 if node is not None:
2627 # node is passed by the user
2628 self._raise_on_invalid_node(cluster, node, node.host, node.port)
2629 pubsub_node = node
2630 elif host is not None and port is not None:
2631 # host and port passed by the user
2632 node = cluster.get_node(host=host, port=port)
2633 self._raise_on_invalid_node(cluster, node, host, port)
2634 pubsub_node = node
2635 elif any([host, port]):
2636 # only 'host' or 'port' passed
2637 raise DataError("Passing a host requires passing a port, and vice versa")
2638 else:
2639 # nothing passed by the user. set node to None
2640 pubsub_node = None
2642 self.node = pubsub_node
2644 def get_pubsub_node(self):
2645 """
2646 Get the node that is being used as the pubsub connection
2647 """
2648 return self.node
2650 def _raise_on_invalid_node(self, redis_cluster, node, host, port):
2651 """
2652 Raise a RedisClusterException if the node is None or doesn't exist in
2653 the cluster.
2654 """
2655 if node is None or redis_cluster.get_node(node_name=node.name) is None:
2656 raise RedisClusterException(
2657 f"Node {host}:{port} doesn't exist in the cluster"
2658 )
2660 def execute_command(self, *args):
2661 """
2662 Execute a subscribe/unsubscribe command.
2664 Code taken from redis-py and tweaked to make it work within a cluster.
2665 """
2666 # NOTE: don't parse the response in this function -- it could pull a
2667 # legitimate message off the stack if the connection is already
2668 # subscribed to one or more channels
2670 if self.connection is None:
2671 if self.connection_pool is None:
2672 if len(args) > 1:
2673 # Hash the first channel and get one of the nodes holding
2674 # this slot
2675 channel = args[1]
2676 slot = self.cluster.keyslot(channel)
2677 node = self.cluster.nodes_manager.get_node_from_slot(
2678 slot,
2679 self.cluster.read_from_replicas,
2680 self.cluster.load_balancing_strategy,
2681 )
2682 else:
2683 # Get a random node
2684 node = self.cluster.get_random_node()
2685 self.node = node
2686 redis_connection = self.cluster.get_redis_connection(node)
2687 self.connection_pool = redis_connection.connection_pool
2688 self.connection = self.connection_pool.get_connection()
2689 # register a callback that re-subscribes to any channels we
2690 # were listening to when we were disconnected
2691 self.connection.register_connect_callback(self.on_connect)
2692 if self.push_handler_func is not None:
2693 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
2694 self._event_dispatcher.dispatch(
2695 AfterPubSubConnectionInstantiationEvent(
2696 self.connection, self.connection_pool, ClientType.SYNC, self._lock
2697 )
2698 )
2699 connection = self.connection
2700 self._execute(connection, connection.send_command, *args)
2702 def _get_node_pubsub(self, node):
2703 try:
2704 return self.node_pubsub_mapping[node.name]
2705 except KeyError:
2706 pubsub = node.redis_connection.pubsub(
2707 push_handler_func=self.push_handler_func
2708 )
2709 self.node_pubsub_mapping[node.name] = pubsub
2710 return pubsub
2712 def _sharded_message_generator(self, timeout=0.0):
2713 for _ in range(len(self.node_pubsub_mapping)):
2714 pubsub = next(self._pubsubs_generator)
2715 # Don't pass ignore_subscribe_messages here - let get_sharded_message
2716 # handle the filtering after processing subscription state changes
2717 message = pubsub.get_message(
2718 ignore_subscribe_messages=False, timeout=timeout
2719 )
2720 if message is not None:
2721 return message
2722 return None
2724 def _pubsubs_generator(self):
2725 while True:
2726 current_nodes = list(self.node_pubsub_mapping.values())
2727 if not current_nodes:
2728 return # Avoid infinite loop when no subscriptions exist
2729 yield from current_nodes
2731 def get_sharded_message(
2732 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
2733 ):
2734 if target_node:
2735 # Don't pass ignore_subscribe_messages here - let get_sharded_message
2736 # handle the filtering after processing subscription state changes
2737 message = self.node_pubsub_mapping[target_node.name].get_message(
2738 ignore_subscribe_messages=False, timeout=timeout
2739 )
2740 else:
2741 message = self._sharded_message_generator(timeout=timeout)
2742 if message is None:
2743 return None
2744 elif str_if_bytes(message["type"]) == "sunsubscribe":
2745 if message["channel"] in self.pending_unsubscribe_shard_channels:
2746 self.pending_unsubscribe_shard_channels.remove(message["channel"])
2747 self.shard_channels.pop(message["channel"], None)
2748 node = self.cluster.get_node_from_key(message["channel"])
2749 if self.node_pubsub_mapping[node.name].subscribed is False:
2750 self.node_pubsub_mapping.pop(node.name)
2751 if not self.channels and not self.patterns and not self.shard_channels:
2752 # There are no subscriptions anymore, set subscribed_event flag
2753 # to false
2754 self.subscribed_event.clear()
2755 # Only suppress subscribe/unsubscribe messages, not data messages (smessage)
2756 if str_if_bytes(message["type"]) in ("ssubscribe", "sunsubscribe"):
2757 if self.ignore_subscribe_messages or ignore_subscribe_messages:
2758 return None
2759 return message
2761 def ssubscribe(self, *args, **kwargs):
2762 if args:
2763 args = list_or_args(args[0], args[1:])
2764 s_channels = dict.fromkeys(args)
2765 s_channels.update(kwargs)
2766 for s_channel, handler in s_channels.items():
2767 node = self.cluster.get_node_from_key(s_channel)
2768 pubsub = self._get_node_pubsub(node)
2769 if handler:
2770 pubsub.ssubscribe(**{s_channel: handler})
2771 else:
2772 pubsub.ssubscribe(s_channel)
2773 self.shard_channels.update(pubsub.shard_channels)
2774 self.pending_unsubscribe_shard_channels.difference_update(
2775 self._normalize_keys({s_channel: None})
2776 )
2777 if pubsub.subscribed and not self.subscribed:
2778 self.subscribed_event.set()
2779 self.health_check_response_counter = 0
2781 def sunsubscribe(self, *args):
2782 if args:
2783 args = list_or_args(args[0], args[1:])
2784 else:
2785 args = self.shard_channels
2787 for s_channel in args:
2788 node = self.cluster.get_node_from_key(s_channel)
2789 p = self._get_node_pubsub(node)
2790 p.sunsubscribe(s_channel)
2791 self.pending_unsubscribe_shard_channels.update(
2792 p.pending_unsubscribe_shard_channels
2793 )
2795 def get_redis_connection(self):
2796 """
2797 Get the Redis connection of the pubsub connected node.
2798 """
2799 if self.node is not None:
2800 return self.node.redis_connection
2802 def disconnect(self):
2803 """
2804 Disconnect the pubsub connection.
2805 """
2806 if self.connection:
2807 self.connection.disconnect()
2808 for pubsub in self.node_pubsub_mapping.values():
2809 if pubsub.connection:
2810 pubsub.connection.disconnect()
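
# Example (illustrative, not part of the module): sharded pub/sub with
# ClusterPubSub. Channels are routed to nodes by key slot, which is why
# ssubscribe() above resolves one pubsub object per node. The endpoint
# and channel name are assumptions, and SPUBLISH requires Redis >= 7.0.
def _example_sharded_pubsub():
    rc = RedisCluster(host="localhost", port=7000)  # hypothetical endpoint
    pubsub = rc.pubsub()
    pubsub.ssubscribe("orders")  # hashed to a slot, then to that slot's node
    rc.spublish("orders", "hello")
    # poll the per-node pubsub objects round-robin for the next message
    return pubsub.get_sharded_message(
        ignore_subscribe_messages=True, timeout=1.0
    )
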
2812class ClusterPipeline(RedisCluster):
2813 """
2814 Support for Redis pipeline
2815 in cluster mode
2816 """
2818 ERRORS_ALLOW_RETRY = (
2819 ConnectionError,
2820 TimeoutError,
2821 MovedError,
2822 AskError,
2823 TryAgainError,
2824 )
2826 NO_SLOTS_COMMANDS = {"UNWATCH"}
2827 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
2828 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
2830 @deprecated_args(
2831 args_to_warn=[
2832 "cluster_error_retry_attempts",
2833 ],
2834 reason="Please configure the 'retry' object instead",
2835 version="6.0.0",
2836 )
2837 def __init__(
2838 self,
2839 nodes_manager: "NodesManager",
2840 commands_parser: "CommandsParser",
2841 result_callbacks: Optional[Dict[str, Callable]] = None,
2842 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
2843 startup_nodes: Optional[List["ClusterNode"]] = None,
2844 read_from_replicas: bool = False,
2845 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
2846 cluster_error_retry_attempts: int = 3,
2847 reinitialize_steps: int = 5,
2848 retry: Optional[Retry] = None,
2849 lock=None,
2850 transaction=False,
2851 policy_resolver: PolicyResolver = StaticPolicyResolver(),
2852 event_dispatcher: Optional["EventDispatcher"] = None,
2853 **kwargs,
2854 ):
2855 """ """
2856 self.command_stack = []
2857 self.nodes_manager = nodes_manager
2858 self.commands_parser = commands_parser
2859 self.refresh_table_asap = False
2860 self.result_callbacks = (
2861 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
2862 )
2863 self.startup_nodes = startup_nodes if startup_nodes else []
2864 self.read_from_replicas = read_from_replicas
2865 self.load_balancing_strategy = load_balancing_strategy
2866 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
2867 self.cluster_response_callbacks = cluster_response_callbacks
2868 self.reinitialize_counter = 0
2869 self.reinitialize_steps = reinitialize_steps
2870 if retry is not None:
2871 self.retry = retry
2872 else:
2873 self.retry = Retry(
2874 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
2875 retries=cluster_error_retry_attempts,
2876 )
2878 self.encoder = Encoder(
2879 kwargs.get("encoding", "utf-8"),
2880 kwargs.get("encoding_errors", "strict"),
2881 kwargs.get("decode_responses", False),
2882 )
2883 if lock is None:
2884 lock = threading.RLock()
2885 self._lock = lock
2886 self.parent_execute_command = super().execute_command
2887 self._execution_strategy: ExecutionStrategy = (
2888 PipelineStrategy(self) if not transaction else TransactionStrategy(self)
2889 )
2891 # For backward compatibility, mapping from existing policies to new one
2892 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
2893 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
2894 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
2895 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
2896 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
2897 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
2898 SLOT_ID: RequestPolicy.DEFAULT_KEYED,
2899 }
2901 self._policies_callback_mapping: dict[
2902 Union[RequestPolicy, ResponsePolicy], Callable
2903 ] = {
2904 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
2905 self.get_random_primary_or_all_nodes(command_name)
2906 ],
2907 RequestPolicy.DEFAULT_KEYED: lambda command,
2908 *args: self.get_nodes_from_slot(command, *args),
2909 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
2910 RequestPolicy.ALL_SHARDS: self.get_primaries,
2911 RequestPolicy.ALL_NODES: self.get_nodes,
2912 RequestPolicy.ALL_REPLICAS: self.get_replicas,
2913 RequestPolicy.MULTI_SHARD: lambda *args,
2914 **kwargs: self._split_multi_shard_command(*args, **kwargs),
2915 RequestPolicy.SPECIAL: self.get_special_nodes,
2916 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
2917 ResponsePolicy.DEFAULT_KEYED: lambda res: res,
2918 }
2920 self._policy_resolver = policy_resolver
2922 if event_dispatcher is None:
2923 self._event_dispatcher = EventDispatcher()
2924 else:
2925 self._event_dispatcher = event_dispatcher
2927 def __repr__(self):
2928 """ """
2929 return f"{type(self).__name__}"
2931 def __enter__(self):
2932 """ """
2933 return self
2935 def __exit__(self, exc_type, exc_value, traceback):
2936 """ """
2937 self.reset()
2939 def __del__(self):
2940 try:
2941 self.reset()
2942 except Exception:
2943 pass
2945 def __len__(self):
2946 """ """
2947 return len(self._execution_strategy.command_queue)
2949 def __bool__(self):
2950 "Pipeline instances should always evaluate to True on Python 3+"
2951 return True
2953 def execute_command(self, *args, **kwargs):
2954 """
2955 Wrapper function for pipeline_execute_command
2956 """
2957 return self._execution_strategy.execute_command(*args, **kwargs)
2959 def pipeline_execute_command(self, *args, **options):
2960 """
2961 Stage a command to be executed when execute() is next called
2963 Returns the current Pipeline object back so commands can be
2964 chained together, such as:
2966 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
2968 At some other point, you can then run: pipe.execute(),
2969 which will execute all commands queued in the pipe.
2970 """
2971 return self._execution_strategy.execute_command(*args, **options)
2973 def annotate_exception(self, exception, number, command):
2974 """
2975 Provides extra context to the exception prior to it being handled
2976 """
2977 self._execution_strategy.annotate_exception(exception, number, command)
2979 def execute(self, raise_on_error: bool = True) -> List[Any]:
2980 """
2981 Execute all the commands in the current pipeline
2982 """
2984 try:
2985 return self._execution_strategy.execute(raise_on_error)
2986 finally:
2987 self.reset()
2989 def reset(self):
2990 """
2991 Reset back to empty pipeline.
2992 """
2993 self._execution_strategy.reset()
2995 def send_cluster_commands(
2996 self, stack, raise_on_error=True, allow_redirections=True
2997 ):
2998 return self._execution_strategy.send_cluster_commands(
2999 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
3000 )
3002 def exists(self, *keys):
3003 return self._execution_strategy.exists(*keys)
3005 def eval(self):
3006 """ """
3007 return self._execution_strategy.eval()
3009 def multi(self):
3010 """
3011 Start a transactional block of the pipeline after WATCH commands
3012 are issued. End the transactional block with `execute`.
3013 """
3014 self._execution_strategy.multi()
3016 def load_scripts(self):
3017 """ """
3018 self._execution_strategy.load_scripts()
3020 def discard(self):
3021 """ """
3022 self._execution_strategy.discard()
3024 def watch(self, *names):
3025 """Watches the values at keys ``names``"""
3026 self._execution_strategy.watch(*names)
3028 def unwatch(self):
3029 """Unwatches all previously specified keys"""
3030 self._execution_strategy.unwatch()
3032 def script_load_for_pipeline(self, *args, **kwargs):
3033 self._execution_strategy.script_load_for_pipeline(*args, **kwargs)
3035 def delete(self, *names):
3036 self._execution_strategy.delete(*names)
3038 def unlink(self, *names):
3039 self._execution_strategy.unlink(*names)
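
# Example (illustrative, not part of the module): queueing commands on a
# cluster pipeline and executing them. The endpoint and key names are
# assumptions; the hash tag keeps the keys in one slot, although a cluster
# pipeline may also fan commands out across several nodes.
def _example_cluster_pipeline():
    rc = RedisCluster(host="localhost", port=7000)  # hypothetical endpoint
    with rc.pipeline() as pipe:
        pipe.set("{user:1}:name", "alice")
        pipe.incr("{user:1}:visits")
        pipe.get("{user:1}:name")
        results = pipe.execute()  # one round trip per node involved
    return results
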
3042def block_pipeline_command(name: str) -> Callable[..., Any]:
3043 """
3044 Raises an error because some pipelined commands should
3045 be blocked when running in cluster mode
3046 """
3048 def inner(*args, **kwargs):
3049 raise RedisClusterException(
3050 f"ERROR: Calling pipelined function {name} is blocked "
3051 f"when running redis in cluster mode..."
3052 )
3054 return inner
3057# Blocked pipeline commands
3058PIPELINE_BLOCKED_COMMANDS = (
3059 "BGREWRITEAOF",
3060 "BGSAVE",
3061 "BITOP",
3062 "BRPOPLPUSH",
3063 "CLIENT GETNAME",
3064 "CLIENT KILL",
3065 "CLIENT LIST",
3066 "CLIENT SETNAME",
3067 "CLIENT",
3068 "CONFIG GET",
3069 "CONFIG RESETSTAT",
3070 "CONFIG REWRITE",
3071 "CONFIG SET",
3072 "CONFIG",
3073 "DBSIZE",
3074 "ECHO",
3075 "EVALSHA",
3076 "FLUSHALL",
3077 "FLUSHDB",
3078 "INFO",
3079 "KEYS",
3080 "LASTSAVE",
3081 "MGET",
3082 "MGET NONATOMIC",
3083 "MOVE",
3084 "MSET",
3085 "MSETEX",
3086 "MSET NONATOMIC",
3087 "MSETNX",
3088 "PFCOUNT",
3089 "PFMERGE",
3090 "PING",
3091 "PUBLISH",
3092 "RANDOMKEY",
3093 "READONLY",
3094 "READWRITE",
3095 "RENAME",
3096 "RENAMENX",
3097 "RPOPLPUSH",
3098 "SAVE",
3099 "SCAN",
3100 "SCRIPT EXISTS",
3101 "SCRIPT FLUSH",
3102 "SCRIPT KILL",
3103 "SCRIPT LOAD",
3104 "SCRIPT",
3105 "SDIFF",
3106 "SDIFFSTORE",
3107 "SENTINEL GET MASTER ADDR BY NAME",
3108 "SENTINEL MASTER",
3109 "SENTINEL MASTERS",
3110 "SENTINEL MONITOR",
3111 "SENTINEL REMOVE",
3112 "SENTINEL SENTINELS",
3113 "SENTINEL SET",
3114 "SENTINEL SLAVES",
3115 "SENTINEL",
3116 "SHUTDOWN",
3117 "SINTER",
3118 "SINTERSTORE",
3119 "SLAVEOF",
3120 "SLOWLOG GET",
3121 "SLOWLOG LEN",
3122 "SLOWLOG RESET",
3123 "SLOWLOG",
3124 "SMOVE",
3125 "SORT",
3126 "SUNION",
3127 "SUNIONSTORE",
3128 "TIME",
3129)
3130for command in PIPELINE_BLOCKED_COMMANDS:
3131 command = command.replace(" ", "_").lower()
3133 setattr(ClusterPipeline, command, block_pipeline_command(command))
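
# Example (illustrative, not part of the module): what the setattr loop
# above produces. Calling a blocked, keyless command on a cluster pipeline
# raises immediately instead of being queued.
def _example_blocked_pipeline_command(pipe):
    try:
        pipe.dbsize()  # DBSIZE is in PIPELINE_BLOCKED_COMMANDS
    except RedisClusterException as err:
        # "ERROR: Calling pipelined function dbsize is blocked ..."
        return str(err)
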
3136class PipelineCommand:
3137 """ """
3139 def __init__(self, args, options=None, position=None):
3140 self.args = args
3141 if options is None:
3142 options = {}
3143 self.options = options
3144 self.position = position
3145 self.result = None
3146 self.node = None
3147 self.asking = False
3148 self.command_policies: Optional[CommandPolicies] = None
3151class NodeCommands:
3152 """ """
3154 def __init__(
3155 self, parse_response, connection_pool: ConnectionPool, connection: Connection
3156 ):
3157 """ """
3158 self.parse_response = parse_response
3159 self.connection_pool = connection_pool
3160 self.connection = connection
3161 self.commands = []
3163 def append(self, c):
3164 """ """
3165 self.commands.append(c)
3167 def write(self):
3168 """
3169 Code borrowed from the Redis client so it can be adapted here
3170 """
3171 connection = self.connection
3172 commands = self.commands
3174 # We are going to clobber the commands with the write, so go ahead
3175 # and ensure that nothing is sitting there from a previous run.
3176 for c in commands:
3177 c.result = None
3179 # build up all commands into a single request to increase network perf
3180 # send all the commands and catch connection and timeout errors.
3181 try:
3182 connection.send_packed_command(
3183 connection.pack_commands([c.args for c in commands])
3184 )
3185 except (ConnectionError, TimeoutError) as e:
3186 for c in commands:
3187 c.result = e
3189 def read(self):
3190 """ """
3191 connection = self.connection
3192 for c in self.commands:
3193 # if there is a result on this command, it means we ran into an
3194 # exception like a connection error. Trying to parse a response
3195 # on a connection that is no longer open will result in a
3196 # connection error raised by redis-py.
3197 # but redis-py doesn't check in parse_response that the sock
3198 # object is still set, and if you try to read from a closed
3199 # connection it will result in an AttributeError because it will
3200 # do a readline() call on None.
3201 # This can have all kinds of nasty side-effects.
3202 # Treating this case as a connection error is fine because it
3203 # will dump the connection object back into the pool and on the
3204 # next write, it will explicitly open the connection and all
3205 # will be well.
3211 if c.result is None:
3212 try:
3213 c.result = self.parse_response(connection, c.args[0], **c.options)
3214 except (ConnectionError, TimeoutError) as e:
3215 for c in self.commands:
3216 c.result = e
3217 return
3218 except RedisError:
3219 c.result = sys.exc_info()[1]
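
# Example (illustrative, not part of the module): the write-then-read
# pattern NodeCommands implements. All commands destined for one node are
# packed into a single network write, then the replies are read back in
# order. The redis_node/connection arguments are assumed to come from a
# node's connection pool, as in _send_cluster_commands below.
def _example_node_commands_flow(redis_node, connection):
    nc = NodeCommands(
        redis_node.parse_response, redis_node.connection_pool, connection
    )
    nc.append(PipelineCommand(("SET", "{k}:a", "1"), position=0))
    nc.append(PipelineCommand(("GET", "{k}:a"), position=1))
    nc.write()  # one packed send for both commands
    nc.read()   # parse the two replies in order
    return [c.result for c in nc.commands]
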
3222class ExecutionStrategy(ABC):
3223 @property
3224 @abstractmethod
3225 def command_queue(self):
3226 pass
3228 @abstractmethod
3229 def execute_command(self, *args, **kwargs):
3230 """
3231 Execution flow for current execution strategy.
3233 See: ClusterPipeline.execute_command()
3234 """
3235 pass
3237 @abstractmethod
3238 def annotate_exception(self, exception, number, command):
3239 """
3240 Annotate exception according to current execution strategy.
3242 See: ClusterPipeline.annotate_exception()
3243 """
3244 pass
3246 @abstractmethod
3247 def pipeline_execute_command(self, *args, **options):
3248 """
3249 Pipeline execution flow for current execution strategy.
3251 See: ClusterPipeline.pipeline_execute_command()
3252 """
3253 pass
3255 @abstractmethod
3256 def execute(self, raise_on_error: bool = True) -> List[Any]:
3257 """
3258 Executes current execution strategy.
3260 See: ClusterPipeline.execute()
3261 """
3262 pass
3264 @abstractmethod
3265 def send_cluster_commands(
3266 self, stack, raise_on_error=True, allow_redirections=True
3267 ):
3268 """
3269 Sends commands according to current execution strategy.
3271 See: ClusterPipeline.send_cluster_commands()
3272 """
3273 pass
3275 @abstractmethod
3276 def reset(self):
3277 """
3278 Resets current execution strategy.
3280 See: ClusterPipeline.reset()
3281 """
3282 pass
3284 @abstractmethod
3285 def exists(self, *keys):
3286 pass
3288 @abstractmethod
3289 def eval(self):
3290 pass
3292 @abstractmethod
3293 def multi(self):
3294 """
3295 Starts transactional context.
3297 See: ClusterPipeline.multi()
3298 """
3299 pass
3301 @abstractmethod
3302 def load_scripts(self):
3303 pass
3305 @abstractmethod
3306 def watch(self, *names):
3307 pass
3309 @abstractmethod
3310 def unwatch(self):
3311 """
3312 Unwatches all previously specified keys
3314 See: ClusterPipeline.unwatch()
3315 """
3316 pass
3318 @abstractmethod
3319 def script_load_for_pipeline(self, *args, **kwargs):
3320 pass
3322 @abstractmethod
3323 def delete(self, *names):
3324 """
3325 "Delete a key specified by ``names``"
3327 See: ClusterPipeline.delete()
3328 """
3329 pass
3331 @abstractmethod
3332 def unlink(self, *names):
3333 """
3334 "Unlink a key specified by ``names``"
3336 See: ClusterPipeline.unlink()
3337 """
3338 pass
3340 @abstractmethod
3341 def discard(self):
3342 pass
3345class AbstractStrategy(ExecutionStrategy):
3346 def __init__(
3347 self,
3348 pipe: ClusterPipeline,
3349 ):
3350 self._command_queue: List[PipelineCommand] = []
3351 self._pipe = pipe
3352 self._nodes_manager = self._pipe.nodes_manager
3354 @property
3355 def command_queue(self):
3356 return self._command_queue
3358 @command_queue.setter
3359 def command_queue(self, queue: List[PipelineCommand]):
3360 self._command_queue = queue
3362 @abstractmethod
3363 def execute_command(self, *args, **kwargs):
3364 pass
3366 def pipeline_execute_command(self, *args, **options):
3367 self._command_queue.append(
3368 PipelineCommand(args, options, len(self._command_queue))
3369 )
3370 return self._pipe
3372 @abstractmethod
3373 def execute(self, raise_on_error: bool = True) -> List[Any]:
3374 pass
3376 @abstractmethod
3377 def send_cluster_commands(
3378 self, stack, raise_on_error=True, allow_redirections=True
3379 ):
3380 pass
3382 @abstractmethod
3383 def reset(self):
3384 pass
3386 def exists(self, *keys):
3387 return self.execute_command("EXISTS", *keys)
3389 def eval(self):
3390 """ """
3391 raise RedisClusterException("method eval() is not implemented")
3393 def load_scripts(self):
3394 """ """
3395 raise RedisClusterException("method load_scripts() is not implemented")
3397 def script_load_for_pipeline(self, *args, **kwargs):
3398 """ """
3399 raise RedisClusterException(
3400 "method script_load_for_pipeline() is not implemented"
3401 )
3403 def annotate_exception(self, exception, number, command):
3404 """
3405 Provides extra context to the exception prior to it being handled
3406 """
3407 cmd = " ".join(map(safe_str, command))
3408 msg = (
3409 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
3410 f"caused error: {exception.args[0]}"
3411 )
3412 exception.args = (msg,) + exception.args[1:]
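
# Example (illustrative, not part of the module): the context that
# annotate_exception() above adds to an error raised for a queued command.
# Calling the method unbound with None as self works here because the
# implementation does not touch self; that is purely for demonstration.
def _example_annotate_exception():
    err = RedisError("wrong number of arguments")
    AbstractStrategy.annotate_exception(None, err, 1, ("SET", "foo"))
    # err.args[0] is now:
    # "Command # 1 (SET foo) of pipeline caused error: wrong number of arguments"
    return err.args[0]
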
3415class PipelineStrategy(AbstractStrategy):
3416 def __init__(self, pipe: ClusterPipeline):
3417 super().__init__(pipe)
3418 self.command_flags = pipe.command_flags
3420 def execute_command(self, *args, **kwargs):
3421 return self.pipeline_execute_command(*args, **kwargs)
3423 def _raise_first_error(self, stack, start_time):
3424 """
3425 Raise the first exception on the stack
3426 """
3427 for c in stack:
3428 r = c.result
3429 if isinstance(r, Exception):
3430 self.annotate_exception(r, c.position + 1, c.args)
3432 record_operation_duration(
3433 command_name="PIPELINE",
3434 duration_seconds=time.monotonic() - start_time,
3435 error=r,
3436 )
3438 raise r
3440 def execute(self, raise_on_error: bool = True) -> List[Any]:
3441 stack = self._command_queue
3442 if not stack:
3443 return []
3445 try:
3446 return self.send_cluster_commands(stack, raise_on_error)
3447 finally:
3448 self.reset()
3450 def reset(self):
3451 """
3452 Reset back to empty pipeline.
3453 """
3454 self._command_queue = []
3456 def send_cluster_commands(
3457 self, stack, raise_on_error=True, allow_redirections=True
3458 ):
3459 """
3460 Wrapper handling RedisCluster.ERRORS_ALLOW_RETRY errors.
3462 If one of the retryable exceptions has been thrown we assume that:
3463 - connection_pool was disconnected
3464 - connection_pool was reset
3465 - refresh_table_asap set to True
3467 It will retry as many times as specified by
3468 the retries of the "self.retry" config option,
3469 which defaults to 3 unless manually configured.
3471 If the retries are exhausted, the last retryable
3472 error is re-raised.
3473 """
3474 if not stack:
3475 return []
3476 retry_attempts = self._pipe.retry.get_retries()
3477 while True:
3478 try:
3479 return self._send_cluster_commands(
3480 stack,
3481 raise_on_error=raise_on_error,
3482 allow_redirections=allow_redirections,
3483 )
3484 except RedisCluster.ERRORS_ALLOW_RETRY as e:
3485 if retry_attempts > 0:
3486 # Try again with the new cluster setup. All other errors
3487 # should be raised.
3488 retry_attempts -= 1
3490 else:
3491 raise e
3493 def _send_cluster_commands(
3494 self, stack, raise_on_error=True, allow_redirections=True
3495 ):
3496 """
3497 Send a bunch of cluster commands to the redis cluster.
3499 `allow_redirections` If the pipeline should follow
3500 `ASK` & `MOVED` responses automatically. If set
3501 to false it will raise RedisClusterException.
3502 """
3503 # the first time sending the commands we send all of
3504 # the commands that were queued up.
3505 # if we have to run through it again, we only retry
3506 # the commands that failed.
3507 attempt = sorted(stack, key=lambda x: x.position)
3508 is_default_node = False
3509 # build a mapping from node names to the commands destined for each node
3510 nodes: dict[str, NodeCommands] = {}
3511 nodes_written = 0
3512 nodes_read = 0
3514 try:
3515 # as we move through each command that still needs to be processed,
3516 # we figure out the slot number that command maps to, then from
3517 # the slot determine the node.
3518 for c in attempt:
3519 command_policies = self._pipe._policy_resolver.resolve(
3520 c.args[0].lower()
3521 )
3522 # refer to our internal node -> slot table that
3523 # tells us where a given command should route to.
3524 # (it's possible we have a cached node that no longer
3525 # exists in the cluster, which is why we do this in a loop)
3526 passed_targets = c.options.pop("target_nodes", None)
3527 if passed_targets and not self._is_nodes_flag(passed_targets):
3528 target_nodes = self._parse_target_nodes(passed_targets)
3530 if not command_policies:
3531 command_policies = CommandPolicies()
3532 else:
3533 if not command_policies:
3534 command = c.args[0].upper()
3535 if (
3536 len(c.args) >= 2
3537 and f"{c.args[0]} {c.args[1]}".upper()
3538 in self._pipe.command_flags
3539 ):
3540 command = f"{c.args[0]} {c.args[1]}".upper()
3542 # We can only resolve key properties if the command is not
3543 # in the list of pre-defined request policies
3544 command_flag = self.command_flags.get(command)
3545 if not command_flag:
3546 # Fallback to default policy
3547 if not self._pipe.get_default_node():
3548 keys = None
3549 else:
3550 keys = self._pipe._get_command_keys(*c.args)
3551 if not keys:
3552 command_policies = CommandPolicies()
3553 else:
3554 command_policies = CommandPolicies(
3555 request_policy=RequestPolicy.DEFAULT_KEYED,
3556 response_policy=ResponsePolicy.DEFAULT_KEYED,
3557 )
3558 else:
3559 if command_flag in self._pipe._command_flags_mapping:
3560 command_policies = CommandPolicies(
3561 request_policy=self._pipe._command_flags_mapping[
3562 command_flag
3563 ]
3564 )
3565 else:
3566 command_policies = CommandPolicies()
3568 target_nodes = self._determine_nodes(
3569 *c.args,
3570 request_policy=command_policies.request_policy,
3571 nodes_flag=passed_targets,
3572 )
3573 if not target_nodes:
3574 raise RedisClusterException(
3575 f"No targets were found to execute {c.args} command on"
3576 )
3577 c.command_policies = command_policies
3578 if len(target_nodes) > 1:
3579 raise RedisClusterException(
3580 f"Too many targets for command {c.args}"
3581 )
3583 node = target_nodes[0]
3584 if node == self._pipe.get_default_node():
3585 is_default_node = True
3587 # now that we know the name of the node
3588 # (it's just a string in the form of host:port)
3589 # we can build a list of commands for each node.
3590 node_name = node.name
3591 if node_name not in nodes:
3592 redis_node = self._pipe.get_redis_connection(node)
3593 try:
3594 connection = get_connection(redis_node)
3595 except (ConnectionError, TimeoutError):
3596 # Release any connections we've already acquired before clearing nodes
3597 for n in nodes.values():
3598 n.connection_pool.release(n.connection)
3599 # Connection retries are being handled in the node's
3600 # Retry object. Reinitialize the node -> slot table.
3601 self._nodes_manager.initialize()
3602 if is_default_node:
3603 self._pipe.replace_default_node()
3604 nodes = {}
3605 raise
3606 nodes[node_name] = NodeCommands(
3607 redis_node.parse_response,
3608 redis_node.connection_pool,
3609 connection,
3610 )
3611 nodes[node_name].append(c)
3613 # send the commands in sequence.
3614 # we write to all the open sockets for each node first,
3615 # before reading anything
3616 # this allows us to flush all the requests out across the
3617 # network
3618 # so that we can read them from different sockets as they come back.
3619 # we don't multiplex on the sockets as they become available,
3620 # but that shouldn't make too much difference.
3622 # Start timing for observability
3623 start_time = time.monotonic()
3625 node_commands = nodes.values()
3626 for n in node_commands:
3627 nodes_written += 1
3628 n.write()
3630 for n in node_commands:
3631 n.read()
3633 # Find the first error in this node's commands, if any
3634 node_error = None
3635 for cmd in n.commands:
3636 if isinstance(cmd.result, Exception):
3637 node_error = cmd.result
3638 break
3640 record_operation_duration(
3641 command_name="PIPELINE",
3642 duration_seconds=time.monotonic() - start_time,
3643 server_address=n.connection.host,
3644 server_port=n.connection.port,
3645 db_namespace=str(n.connection.db),
3646 error=node_error,
3647 )
3648 nodes_read += 1
3649 finally:
3650 # release all the redis connections we allocated earlier
3651 # back into the connection pool.
3652 # if the connection is dirty (that is: we've written
3653 # commands to it, but haven't read the responses), we need
3654 # to close the connection before returning it to the pool.
3655 # otherwise, the next caller to use this connection will
3656 # read the response from _this_ request, not its own request.
3657 # disconnecting discards the dirty state & forces the next
3658 # caller to reconnect.
3659 # NOTE: dicts preserve insertion order (Python 3.7+); we're
3660 # iterating through nodes.values() in the same order as when
3661 # reading / writing to the connections above, which is critical
3662 # for how we're using the nodes_written/nodes_read offsets.
3663 for i, n in enumerate(nodes.values()):
3664 if i < nodes_written and i >= nodes_read:
3665 n.connection.disconnect()
3666 n.connection_pool.release(n.connection)
3668 # if the response isn't an exception it is a
3669 # valid response from the node
3670 # we're all done with that command, YAY!
3671 # if we have more commands to attempt, we've run into problems.
3672 # collect all the commands we are allowed to retry.
3673 # (MOVED, ASK, connection, or timeout errors)
3674 attempt = sorted(
3675 (
3676 c
3677 for c in attempt
3678 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
3679 ),
3680 key=lambda x: x.position,
3681 )
3682 if attempt and allow_redirections:
3683 # RETRY MAGIC HAPPENS HERE!
3684 # send these remaining commands one at a time using `execute_command`
3685 # in the main client. This keeps our retry logic
3686 # mostly in one place,
3687 # and allows us to be more confident in the correctness of behavior.
3688 # at this point any speed gains from pipelining have been lost
3689 # anyway, so we might as well make the best
3690 # attempt to get the correct behavior.
3691 #
3692 # The client command will handle retries for each
3693 # individual command sequentially as we pass each
3694 # one into `execute_command`. Any exceptions
3695 # that bubble out should only appear once all
3696 # retries have been exhausted.
3697 #
3698 # If a lot of commands have failed, we'll be setting the
3699 # flag to rebuild the slots table from scratch.
3700 # So MOVED errors should correct themselves fairly quickly.
3701 self._pipe.reinitialize_counter += 1
3702 if self._pipe._should_reinitialized():
3703 self._nodes_manager.initialize()
3704 if is_default_node:
3705 self._pipe.replace_default_node()
3706 for c in attempt:
3707 try:
3708 # send each command individually like we
3709 # do in the main client.
3710 c.result = self._pipe.parent_execute_command(*c.args, **c.options)
3711 except RedisError as e:
3712 c.result = e
3714 # turn the response back into a simple flat array that corresponds
3715 # to the sequence of commands issued in the stack in pipeline.execute()
3716 response = []
3717 for c in sorted(stack, key=lambda x: x.position):
3718 if c.args[0] in self._pipe.cluster_response_callbacks:
3719 # Remove the keys entry; it's only needed for the cache.
3720 c.options.pop("keys", None)
3721 c.result = self._pipe._policies_callback_mapping[
3722 c.command_policies.response_policy
3723 ](
3724 self._pipe.cluster_response_callbacks[c.args[0]](
3725 c.result, **c.options
3726 )
3727 )
3728 response.append(c.result)
3730 if raise_on_error:
3731 self._raise_first_error(stack, start_time)
3733 return response
3735 def _is_nodes_flag(self, target_nodes):
3736 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags
3738 def _parse_target_nodes(self, target_nodes):
3739 if isinstance(target_nodes, list):
3740 nodes = target_nodes
3741 elif isinstance(target_nodes, ClusterNode):
3742 # Supports passing a single ClusterNode as a variable
3743 nodes = [target_nodes]
3744 elif isinstance(target_nodes, dict):
3745 # Supports dictionaries of the format {node_name: node}.
3746 # It enables executing commands on multiple nodes, e.g.:
3747 # rc.cluster_save_config(rc.get_primaries())
3748 nodes = target_nodes.values()
3749 else:
3750 raise TypeError(
3751 "target_nodes type can be one of the following: "
3752 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
3753 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
3754 f"The passed type is {type(target_nodes)}"
3755 )
3756 return nodes
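# Hedged usage sketch of the accepted target_nodes forms; KEYS is just an
# illustrative command and the host/port are placeholders, while the
# accessors are redis-py's public cluster API:
#
#   rc.keys(target_nodes=RedisCluster.ALL_NODES)           # node flag (str)
#   rc.keys(target_nodes=rc.get_node("127.0.0.1", 7000))   # single ClusterNode
#   rc.keys(target_nodes=rc.get_primaries())               # list of ClusterNodes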
3758 def _determine_nodes(
3759 self, *args, request_policy: RequestPolicy, **kwargs
3760 ) -> List["ClusterNode"]:
3761 # Determine which nodes the command should be executed on.
3762 # Returns a list of target nodes.
3763 command = args[0].upper()
3764 if (
3765 len(args) >= 2
3766 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
3767 ):
3768 command = f"{args[0]} {args[1]}".upper()
3770 nodes_flag = kwargs.pop("nodes_flag", None)
3771 if nodes_flag is not None:
3772 # nodes flag passed by the user
3773 command_flag = nodes_flag
3774 else:
3775 # get the nodes group for this command if it was predefined
3776 command_flag = self._pipe.command_flags.get(command)
3778 if command_flag in self._pipe._command_flags_mapping:
3779 request_policy = self._pipe._command_flags_mapping[command_flag]
3781 policy_callback = self._pipe._policies_callback_mapping[request_policy]
3783 if request_policy == RequestPolicy.DEFAULT_KEYED:
3784 nodes = policy_callback(command, *args)
3785 elif request_policy == RequestPolicy.MULTI_SHARD:
3786 nodes = policy_callback(*args, **kwargs)
3787 elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
3788 nodes = policy_callback(args[0])
3789 else:
3790 nodes = policy_callback()
3792 if args[0].lower() == "ft.aggregate":
3793 self._aggregate_nodes = nodes
3795 return nodes
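# For orientation, an informal summary of the dispatch above (the exact
# node sets come from the policy callbacks registered on the client):
#   DEFAULT_KEYED   -> callback gets the command and its args and routes
#                      by the slot of the command's keys
#   MULTI_SHARD     -> callback fans the command out across the shards
#                      covering all key slots
#   DEFAULT_KEYLESS -> callback gets only the command name
#   anything else   -> a no-argument callback (flag-driven node sets such
#                      as all primaries, all nodes, or a random node)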
3797 def multi(self):
3798 raise RedisClusterException(
3799 "method multi() is not supported outside of transactional context"
3800 )
3802 def discard(self):
3803 raise RedisClusterException(
3804 "method discard() is not supported outside of transactional context"
3805 )
3807 def watch(self, *names):
3808 raise RedisClusterException(
3809 "method watch() is not supported outside of transactional context"
3810 )
3812 def unwatch(self, *names):
3813 raise RedisClusterException(
3814 "method unwatch() is not supported outside of transactional context"
3815 )
3817 def delete(self, *names):
3818 if len(names) != 1:
3819 raise RedisClusterException(
3820 "deleting multiple keys is not implemented in pipeline command"
3821 )
3823 return self.execute_command("DEL", names[0])
3825 def unlink(self, *names):
3826 if len(names) != 1:
3827 raise RedisClusterException(
3828 "unlinking multiple keys is not implemented in pipeline command"
3829 )
3831 return self.execute_command("UNLINK", names[0])
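# Hedged sketch of the single-key restriction enforced above (key names
# are placeholders):
#
#   pipe = rc.pipeline()
#   pipe.delete("k1")         # queued as DEL k1 -- allowed
#   pipe.delete("k1", "k2")   # raises RedisClusterException immediately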
3834class TransactionStrategy(AbstractStrategy):
3835 NO_SLOTS_COMMANDS = {"UNWATCH"}
3836 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
3837 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3838 SLOT_REDIRECT_ERRORS = (AskError, MovedError)
3839 CONNECTION_ERRORS = (
3840 ConnectionError,
3841 OSError,
3842 ClusterDownError,
3843 SlotNotCoveredError,
3844 )
3846 def __init__(self, pipe: ClusterPipeline):
3847 super().__init__(pipe)
3848 self._explicit_transaction = False
3849 self._watching = False
3850 self._pipeline_slots: Set[int] = set()
3851 self._transaction_connection: Optional[Connection] = None
3852 self._executing = False
3853 self._retry = copy(self._pipe.retry)
3854 self._retry.update_supported_errors(
3855 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
3856 )
3858 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
3859 """
3860 Find a connection for a pipeline transaction.
3862 To run an atomic transaction, watched keys can only guarantee that contents
3863 have not been altered if the WATCH commands for those keys were sent over the
3864 same connection. So once we start watching a key, we fetch a connection to the
3865 node that owns that slot and reuse it.
3866 """
3867 if not self._pipeline_slots:
3868 raise RedisClusterException(
3869 "At least a command with a key is needed to identify a node"
3870 )
3872 node: ClusterNode = self._nodes_manager.get_node_from_slot(
3873 list(self._pipeline_slots)[0], False
3874 )
3875 redis_node: Redis = self._pipe.get_redis_connection(node)
3876 if self._transaction_connection:
3877 if not redis_node.connection_pool.owns_connection(
3878 self._transaction_connection
3879 ):
3880 previous_node = self._nodes_manager.find_connection_owner(
3881 self._transaction_connection
3882 )
3883 previous_node.connection_pool.release(self._transaction_connection)
3884 self._transaction_connection = None
3886 if not self._transaction_connection:
3887 self._transaction_connection = get_connection(redis_node)
3889 return redis_node, self._transaction_connection
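# Hedged sketch of the optimistic-locking flow this connection pinning
# supports, assuming redis-py's pipeline(transaction=True) entry point
# selects this strategy; the key name is a placeholder:
#
#   pipe = rc.pipeline(transaction=True)
#   pipe.watch("balance")                  # pins the slot owner's connection
#   balance = int(pipe.get("balance") or 0)
#   pipe.multi()
#   pipe.set("balance", balance + 1)
#   pipe.execute()                         # WatchError if "balance" changed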
3891 def execute_command(self, *args, **kwargs):
3892 slot_number: Optional[int] = None
3893 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
3894 slot_number = self._pipe.determine_slot(*args)
3896 if (
3897 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
3898 ) and not self._explicit_transaction:
3899 if args[0] == "WATCH":
3900 self._validate_watch()
3902 if slot_number is not None:
3903 if self._pipeline_slots and slot_number not in self._pipeline_slots:
3904 raise CrossSlotTransactionError(
3905 "Cannot watch or send commands on different slots"
3906 )
3908 self._pipeline_slots.add(slot_number)
3909 elif args[0] not in self.NO_SLOTS_COMMANDS:
3910 raise RedisClusterException(
3911 f"Cannot identify slot number for command: {args[0]},"
3912 "it cannot be triggered in a transaction"
3913 )
3915 return self._immediate_execute_command(*args, **kwargs)
3916 else:
3917 if slot_number is not None:
3918 self._pipeline_slots.add(slot_number)
3920 return self.pipeline_execute_command(*args, **kwargs)
3922 def _validate_watch(self):
3923 if self._explicit_transaction:
3924 raise RedisError("Cannot issue a WATCH after a MULTI")
3926 self._watching = True
3928 def _immediate_execute_command(self, *args, **options):
3929 return self._retry.call_with_retry(
3930 lambda: self._get_connection_and_send_command(*args, **options),
3931 self._reinitialize_on_error,
3932 with_failure_count=True,
3933 )
3935 def _get_connection_and_send_command(self, *args, **options):
3936 redis_node, connection = self._get_client_and_connection_for_transaction()
3938 # Start timing for observability
3939 start_time = time.monotonic()
3941 try:
3942 response = self._send_command_parse_response(
3943 connection, redis_node, args[0], *args, **options
3944 )
3946 record_operation_duration(
3947 command_name=args[0],
3948 duration_seconds=time.monotonic() - start_time,
3949 server_address=connection.host,
3950 server_port=connection.port,
3951 db_namespace=str(connection.db),
3952 )
3954 return response
3955 except Exception as e:
3956 if connection:
3957 # this is used to report the metrics based on host and port info
3958 e.connection = connection
3959 record_operation_duration(
3960 command_name=args[0],
3961 duration_seconds=time.monotonic() - start_time,
3962 server_address=connection.host,
3963 server_port=connection.port,
3964 db_namespace=str(connection.db),
3965 error=e,
3966 )
3967 raise
3969 def _send_command_parse_response(
3970 self, conn, redis_node: Redis, command_name, *args, **options
3971 ):
3972 """
3973 Send a command and parse the response
3974 """
3976 conn.send_command(*args)
3977 output = redis_node.parse_response(conn, command_name, **options)
3979 if command_name in self.UNWATCH_COMMANDS:
3980 self._watching = False
3981 return output
3983 def _reinitialize_on_error(self, error, failure_count):
3984 if hasattr(error, "connection"):
3985 record_error_count(
3986 server_address=error.connection.host,
3987 server_port=error.connection.port,
3988 network_peer_address=error.connection.host,
3989 network_peer_port=error.connection.port,
3990 error_type=error,
3991 retry_attempts=failure_count,
3992 is_internal=True,
3993 )
3995 if self._watching:
3996 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
3997 raise WatchError("Slot rebalancing occurred while watching keys")
3999 if (
4000 type(error) in self.SLOT_REDIRECT_ERRORS
4001 or type(error) in self.CONNECTION_ERRORS
4002 ):
4003 if self._transaction_connection:
4004 if is_debug_log_enabled():
4005 logger.debug(
4006 f"Operation failed, "
4007 f"with connection: {self._transaction_connection}, "
4008 f"details: {self._transaction_connection.extract_connection_details()}",
4009 )
4010 # Disconnect and release back to pool
4011 self._transaction_connection.disconnect()
4012 node = self._nodes_manager.find_connection_owner(
4013 self._transaction_connection
4014 )
4015 if node and node.redis_connection:
4016 node.redis_connection.connection_pool.release(
4017 self._transaction_connection
4018 )
4019 self._transaction_connection = None
4021 self._pipe.reinitialize_counter += 1
4022 if self._pipe._should_reinitialized():
4023 self._nodes_manager.initialize()
4024 self._pipe.reinitialize_counter = 0
4025 else:
4026 if isinstance(error, AskError):
4027 self._nodes_manager.move_slot(error)
4029 self._executing = False
4031 def _raise_first_error(self, responses, stack, start_time):
4032 """
4033 Raise the first exception on the stack
4034 """
4035 for r, cmd in zip(responses, stack):
4036 if isinstance(r, Exception):
4037 self.annotate_exception(r, cmd.position + 1, cmd.args)
4039 record_operation_duration(
4040 command_name="TRANSACTION",
4041 duration_seconds=time.monotonic() - start_time,
4042 server_address=self._transaction_connection.host,
4043 server_port=self._transaction_connection.port,
4044 db_namespace=str(self._transaction_connection.db),
4045 )
4047 raise r
4049 def execute(self, raise_on_error: bool = True) -> List[Any]:
4050 stack = self._command_queue
4051 if not stack and (not self._watching or not self._pipeline_slots):
4052 return []
4054 return self._execute_transaction_with_retries(stack, raise_on_error)
4056 def _execute_transaction_with_retries(
4057 self, stack: List["PipelineCommand"], raise_on_error: bool
4058 ):
4059 return self._retry.call_with_retry(
4060 lambda: self._execute_transaction(stack, raise_on_error),
4061 lambda error, failure_count: self._reinitialize_on_error(
4062 error, failure_count
4063 ),
4064 with_failure_count=True,
4065 )
4067 def _execute_transaction(
4068 self, stack: List["PipelineCommand"], raise_on_error: bool
4069 ):
4070 if len(self._pipeline_slots) > 1:
4071 raise CrossSlotTransactionError(
4072 "All keys involved in a cluster transaction must map to the same slot"
4073 )
4075 self._executing = True
4077 redis_node, connection = self._get_client_and_connection_for_transaction()
4079 stack = chain(
4080 [PipelineCommand(("MULTI",))],
4081 stack,
4082 [PipelineCommand(("EXEC",))],
4083 )
4084 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
4085 packed_commands = connection.pack_commands(commands)
4087 # Start timing for observability
4088 start_time = time.monotonic()
4090 connection.send_packed_command(packed_commands)
4091 errors = []
4093 # parse off the response for MULTI
4094 # NOTE: we need to handle ResponseErrors here and continue
4095 # so that we read all the additional command messages from
4096 # the socket
4097 try:
4098 redis_node.parse_response(connection, "MULTI")
4099 except ResponseError as e:
4100 self.annotate_exception(e, 0, "MULTI")
4101 errors.append(e)
4102 except self.CONNECTION_ERRORS as cluster_error:
4103 self.annotate_exception(cluster_error, 0, "MULTI")
4104 raise
4106 # and all the other commands
4107 for i, command in enumerate(self._command_queue):
4108 if EMPTY_RESPONSE in command.options:
4109 errors.append((i, command.options[EMPTY_RESPONSE]))
4110 else:
4111 try:
4112 _ = redis_node.parse_response(connection, "_")
4113 except self.SLOT_REDIRECT_ERRORS as slot_error:
4114 self.annotate_exception(slot_error, i + 1, command.args)
4115 errors.append(slot_error)
4116 except self.CONNECTION_ERRORS as cluster_error:
4117 self.annotate_exception(cluster_error, i + 1, command.args)
4118 raise
4119 except ResponseError as e:
4120 self.annotate_exception(e, i + 1, command.args)
4121 errors.append(e)
4123 response = None
4124 # parse the EXEC.
4125 try:
4126 response = redis_node.parse_response(connection, "EXEC")
4127 except ExecAbortError:
4128 if errors:
4129 raise errors[0]
4130 raise
4132 self._executing = False
4134 record_operation_duration(
4135 command_name="TRANSACTION",
4136 duration_seconds=time.monotonic() - start_time,
4137 server_address=connection.host,
4138 server_port=connection.port,
4139 db_namespace=str(connection.db),
4140 )
4142 # EXEC clears any watched keys
4143 self._watching = False
4145 if response is None:
4146 raise WatchError("Watched variable changed.")
4148 # put any parse errors into the response
4149 for i, e in errors:
4150 response.insert(i, e)
4152 if len(response) != len(self._command_queue):
4153 raise InvalidPipelineStack(
4154 "Unexpected response length for cluster pipeline EXEC."
4155 " Command stack was {} but response had length {}".format(
4156 [c.args[0] for c in self._command_queue], len(response)
4157 )
4158 )
4160 # find any errors in the response and raise if necessary
4161 if raise_on_error or len(errors) > 0:
4162 self._raise_first_error(
4163 response,
4164 self._command_queue,
4165 start_time,
4166 )
4168 # We have to run response callbacks manually
4169 data = []
4170 for r, cmd in zip(response, self._command_queue):
4171 if not isinstance(r, Exception):
4172 command_name = cmd.args[0]
4173 if command_name in self._pipe.cluster_response_callbacks:
4174 r = self._pipe.cluster_response_callbacks[command_name](
4175 r, **cmd.options
4176 )
4177 data.append(r)
4178 return data
4180 def reset(self):
4181 self._command_queue = []
4183 # make sure to reset the connection state in the event that we were
4184 # watching something
4185 if self._transaction_connection:
4186 try:
4187 if self._watching:
4188 # call this manually since our unwatch or
4189 # immediate_execute_command methods can call reset()
4190 self._transaction_connection.send_command("UNWATCH")
4191 self._transaction_connection.read_response()
4192 # we can safely return the connection to the pool here since we're
4193 # sure we're no longer WATCHing anything
4194 node = self._nodes_manager.find_connection_owner(
4195 self._transaction_connection
4196 )
4197 if node and node.redis_connection:
4198 node.redis_connection.connection_pool.release(
4199 self._transaction_connection
4200 )
4201 self._transaction_connection = None
4202 except self.CONNECTION_ERRORS:
4203 # disconnect will also remove any previous WATCHes
4204 if self._transaction_connection:
4205 self._transaction_connection.disconnect()
4206 node = self._nodes_manager.find_connection_owner(
4207 self._transaction_connection
4208 )
4209 if node and node.redis_connection:
4210 node.redis_connection.connection_pool.release(
4211 self._transaction_connection
4212 )
4213 self._transaction_connection = None
4215 # clean up the other instance attributes
4216 self._watching = False
4217 self._explicit_transaction = False
4218 self._pipeline_slots = set()
4219 self._executing = False
4221 def send_cluster_commands(
4222 self, stack, raise_on_error=True, allow_redirections=True
4223 ):
4224 raise NotImplementedError(
4225 "send_cluster_commands cannot be executed in transactional context."
4226 )
4228 def multi(self):
4229 if self._explicit_transaction:
4230 raise RedisError("Cannot issue nested calls to MULTI")
4231 if self._command_queue:
4232 raise RedisError(
4233 "Commands without an initial WATCH have already been issued"
4234 )
4235 self._explicit_transaction = True
4237 def watch(self, *names):
4238 if self._explicit_transaction:
4239 raise RedisError("Cannot issue a WATCH after a MULTI")
4241 return self.execute_command("WATCH", *names)
4243 def unwatch(self):
4244 if self._watching:
4245 return self.execute_command("UNWATCH")
4247 return True
4249 def discard(self):
4250 self.reset()
4252 def delete(self, *names):
4253 return self.execute_command("DEL", *names)
4255 def unlink(self, *names):
4256 return self.execute_command("UNLINK", *names)
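# Hedged contrast with PipelineStrategy.delete()/unlink() above: inside a
# transaction, multi-key DEL/UNLINK are accepted here because every key in
# the transaction must already map to a single slot (cross-slot usage
# raises CrossSlotTransactionError), e.g. with hash tags:
#
#   pipe = rc.pipeline(transaction=True)
#   pipe.multi()
#   pipe.delete("{user:1}:a", "{user:1}:b")
#   pipe.execute()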