Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 18%
1import logging
2import random
3import socket
4import sys
5import threading
6import time
7import weakref
8from abc import ABC, abstractmethod
9from collections import OrderedDict, defaultdict
10from concurrent.futures import Future, ThreadPoolExecutor
11from copy import copy
12from enum import Enum
13from itertools import chain
14from types import MethodType
15from typing import (
16 TYPE_CHECKING,
17 Any,
18 Callable,
19 Dict,
20 List,
21 Literal,
22 Optional,
23 Set,
24 Tuple,
25 Type,
26 Union,
27)
29if TYPE_CHECKING:
30 from redis.keyspace_notifications import ClusterKeyspaceNotifications
32from redis._parsers import CommandsParser, Encoder
33from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
34from redis._parsers.helpers import parse_scan
35from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
36from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
37from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
38from redis.commands import READ_COMMANDS, RedisClusterCommands
39from redis.commands.helpers import list_or_args
40from redis.commands.policies import PolicyResolver, StaticPolicyResolver
41from redis.connection import (
42 Connection,
43 ConnectionPool,
44 parse_url,
45)
46from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
47from redis.event import (
48 AfterPooledConnectionsInstantiationEvent,
49 AfterPubSubConnectionInstantiationEvent,
50 AfterSlotsCacheRefreshEvent,
51 ClientType,
52 EventDispatcher,
53 EventListenerInterface,
54)
55from redis.exceptions import (
56 AskError,
57 AuthenticationError,
58 ClusterDownError,
59 ClusterError,
60 ConnectionError,
61 CrossSlotTransactionError,
62 DataError,
63 ExecAbortError,
64 InvalidPipelineStack,
65 MaxConnectionsError,
66 MovedError,
67 RedisClusterException,
68 RedisError,
69 ResponseError,
70 SlotNotCoveredError,
71 TimeoutError,
72 TryAgainError,
73 WatchError,
74)
75from redis.lock import Lock
76from redis.maint_notifications import (
77 MaintNotificationsConfig,
78 OSSMaintNotificationsHandler,
79)
80from redis.observability.recorder import (
81 record_error_count,
82 record_operation_duration,
83)
84from redis.retry import Retry
85from redis.utils import (
86 DEFAULT_RESP_VERSION,
87 check_protocol_version,
88 deprecated_args,
89 deprecated_function,
90 dict_merge,
91 list_keys_to_dict,
92 merge_result,
93 safe_str,
94 str_if_bytes,
95 truncate_text,
96)
98logger = logging.getLogger(__name__)
101def is_debug_log_enabled():
102 return logger.isEnabledFor(logging.DEBUG)
105def get_node_name(host: str, port: Union[str, int]) -> str:
106 return f"{host}:{port}"
109@deprecated_args(
110 allowed_args=["redis_node"],
111 reason="Use get_connection(redis_node) instead",
112 version="5.3.0",
113)
114def get_connection(redis_node: Redis, *args, **options) -> Connection:
115 return redis_node.connection or redis_node.connection_pool.get_connection()
118def parse_scan_result(command, res, **options):
119 cursors = {}
120 ret = []
121 for node_name, response in res.items():
122 cursor, r = parse_scan(response, **options)
123 cursors[node_name] = cursor
124 ret += r
126 return cursors, ret
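# Illustrative sketch (not part of the library source): parse_scan_result merges
# per-node SCAN replies into one cursor map plus a flat key list, e.g. roughly
#     cursors == {"127.0.0.1:7000": 0, "127.0.0.1:7001": 42}
#     ret == [b"k1", b"k2", b"k3"]
# where the node names and keys above are hypothetical.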
129def parse_pubsub_numsub(command, res, **options):
130 numsub_d = OrderedDict()
131 for numsub_tups in res.values():
132 for channel, numsubbed in numsub_tups:
133 try:
134 numsub_d[channel] += numsubbed
135 except KeyError:
136 numsub_d[channel] = numsubbed
138 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
139 return ret_numsub
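# Illustrative sketch (hypothetical values): per-node PUBSUB NUMSUB replies such as
#     {"node-a": [(b"news", 2)], "node-b": [(b"news", 3), (b"chat", 1)]}
# are summed per channel, yielding [(b"news", 5), (b"chat", 1)].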
142def parse_cluster_slots(
143 resp: Any, **options: Any
144) -> Dict[Tuple[int, int], Dict[str, Any]]:
145 current_host = options.get("current_host", "")
147 def fix_server(*args: Any) -> Tuple[str, Any]:
148 return str_if_bytes(args[0]) or current_host, args[1]
150 slots = {}
151 for slot in resp:
152 start, end, primary = slot[:3]
153 replicas = slot[3:]
154 slots[start, end] = {
155 "primary": fix_server(*primary),
156 "replicas": [fix_server(*replica) for replica in replicas],
157 }
159 return slots
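# Illustrative sketch (hypothetical addresses): a CLUSTER SLOTS reply like
#     [[0, 5460, [b"10.0.0.1", 7000, b"id1"], [b"10.0.0.2", 7000, b"id2"]]]
# is converted to
#     {(0, 5460): {"primary": ("10.0.0.1", 7000),
#                  "replicas": [("10.0.0.2", 7000)]}}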
162def parse_cluster_shards(resp, **options):
163 """
164 Parse CLUSTER SHARDS response.
166 Normalises the output so that all dictionary keys are strings regardless
167 of protocol version (RESP2 returns bytes keys for node attributes,
168 RESP3 returns bytes keys at every level).
169 """
170 if isinstance(resp[0], dict):
171 # RESP3 – native dicts with bytes keys; decode them.
172 shards = []
173 for item in resp:
174 shard = {
175 "slots": item.get("slots") or item.get(b"slots", []),
176 "nodes": [],
177 }
178 raw_nodes = item.get("nodes") or item.get(b"nodes", [])
179 for node in raw_nodes:
180 if isinstance(node, dict):
181 shard["nodes"].append({str_if_bytes(k): v for k, v in node.items()})
182 else:
183 shard["nodes"].append(node)
184 shards.append(shard)
185 return shards
187 # RESP2 – flat list structure; build dicts with string keys.
188 shards = []
189 for x in resp:
190 shard = {"slots": [], "nodes": []}
191 for i in range(0, len(x[1]), 2):
192 shard["slots"].append((x[1][i], (x[1][i + 1])))
193 nodes = x[3]
194 for node in nodes:
195 dict_node = {}
196 for i in range(0, len(node), 2):
197 dict_node[str_if_bytes(node[i])] = node[i + 1]
198 shard["nodes"].append(dict_node)
199 shards.append(shard)
201 return shards
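# Illustrative sketch: whichever branch runs, node attribute keys end up as
# plain strings, e.g. a node entry becomes roughly
#     {"id": ..., "endpoint": ..., "port": 7000, "role": ...}
# (keys and values shown here are hypothetical; values may still be bytes,
# depending on decode_responses).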
204def parse_cluster_myshardid(resp, **options):
205 """
206 Parse CLUSTER MYSHARDID response.
207 """
208 return resp.decode("utf-8")
211PRIMARY = "primary"
212REPLICA = "replica"
213SLOT_ID = "slot-id"
215REDIS_ALLOWED_KEYS = (
216 "connection_class",
217 "connection_pool",
218 "connection_pool_class",
219 "client_name",
220 "credential_provider",
221 "db",
222 "decode_responses",
223 "encoding",
224 "encoding_errors",
225 "host",
226 "lib_name",
227 "lib_version",
228 "max_connections",
229 "nodes_flag",
230 "redis_connect_func",
231 "password",
232 "port",
233 "timeout",
234 "queue_class",
235 "retry",
236 "retry_on_timeout",
237 "protocol",
238 "socket_connect_timeout",
239 "socket_keepalive",
240 "socket_keepalive_options",
241 "socket_timeout",
242 "ssl",
243 "ssl_ca_certs",
244 "ssl_ca_data",
245 "ssl_ca_path",
246 "ssl_certfile",
247 "ssl_cert_reqs",
248 "ssl_include_verify_flags",
249 "ssl_exclude_verify_flags",
250 "ssl_keyfile",
251 "ssl_password",
252 "ssl_check_hostname",
253 "unix_socket_path",
254 "username",
255 "cache",
256 "cache_config",
257 "maint_notifications_config",
258)
259KWARGS_DISABLED_KEYS = ("host", "port", "retry")
262def cleanup_kwargs(**kwargs):
263 """
264 Remove unsupported or disabled keys from kwargs
265 """
266 connection_kwargs = {
267 k: v
268 for k, v in kwargs.items()
269 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
270 }
272 return connection_kwargs
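# Illustrative sketch: cleanup_kwargs(ssl=True, retry=None, unknown_opt=1)
# would return {"ssl": True} - 'retry' is in KWARGS_DISABLED_KEYS and
# 'unknown_opt' is not in REDIS_ALLOWED_KEYS.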
275class MaintNotificationsAbstractRedisCluster:
276 """
277 Abstract class for handling maintenance notifications logic.
278 This class is expected to be used as a base class together with RedisCluster.
280 This class is intended to be used with multiple inheritance!
282 All logic related to maintenance notifications is encapsulated in this class.
283 """
285 def __init__(
286 self,
287 maint_notifications_config: Optional[MaintNotificationsConfig],
288 **kwargs,
289 ):
290 # Initialize maintenance notifications
291 is_protocol_supported = check_protocol_version(
292 kwargs.get("protocol", DEFAULT_RESP_VERSION), 3
293 )
295 if (
296 maint_notifications_config
297 and maint_notifications_config.enabled
298 and not is_protocol_supported
299 ):
300 raise RedisError(
301 "Maintenance notifications handlers on connection are only supported with RESP version 3"
302 )
303 if maint_notifications_config is None and is_protocol_supported:
304 maint_notifications_config = MaintNotificationsConfig()
306 self.maint_notifications_config = maint_notifications_config
308 if self.maint_notifications_config and self.maint_notifications_config.enabled:
309 self._oss_cluster_maint_notifications_handler = (
310 OSSMaintNotificationsHandler(self, self.maint_notifications_config)
311 )
312 # Update connection kwargs for all future nodes connections
313 self._update_connection_kwargs_for_maint_notifications(
314 self._oss_cluster_maint_notifications_handler
315 )
316 # Update existing nodes connections - they are created as part of the RedisCluster constructor
317 for node in self.get_nodes():
318 if node.redis_connection is None:
319 continue
320 node.redis_connection.connection_pool.update_maint_notifications_config(
321 self.maint_notifications_config,
322 oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
323 )
324 else:
325 self._oss_cluster_maint_notifications_handler = None
327 def _update_connection_kwargs_for_maint_notifications(
328 self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
329 ):
330 """
331 Update the connection kwargs for all future connections.
332 """
333 self.nodes_manager.connection_kwargs.update(
334 {
335 "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
336 }
337 )
340class AbstractRedisCluster:
341 RedisClusterRequestTTL = 16
343 PRIMARIES = "primaries"
344 REPLICAS = "replicas"
345 ALL_NODES = "all"
346 RANDOM = "random"
347 DEFAULT_NODE = "default-node"
349 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}
351 COMMAND_FLAGS = dict_merge(
352 list_keys_to_dict(
353 [
354 "ACL CAT",
355 "ACL DELUSER",
356 "ACL DRYRUN",
357 "ACL GENPASS",
358 "ACL GETUSER",
359 "ACL HELP",
360 "ACL LIST",
361 "ACL LOG",
362 "ACL LOAD",
363 "ACL SAVE",
364 "ACL SETUSER",
365 "ACL USERS",
366 "ACL WHOAMI",
367 "AUTH",
368 "CLIENT LIST",
369 "CLIENT SETINFO",
370 "CLIENT SETNAME",
371 "CLIENT GETNAME",
372 "CONFIG SET",
373 "CONFIG REWRITE",
374 "CONFIG RESETSTAT",
375 "TIME",
376 "PUBSUB CHANNELS",
377 "PUBSUB NUMPAT",
378 "PUBSUB NUMSUB",
379 "PUBSUB SHARDCHANNELS",
380 "PUBSUB SHARDNUMSUB",
381 "PING",
382 "INFO",
383 "SHUTDOWN",
384 "KEYS",
385 "DBSIZE",
386 "BGSAVE",
387 "SLOWLOG GET",
388 "SLOWLOG LEN",
389 "SLOWLOG RESET",
390 "WAIT",
391 "WAITAOF",
392 "SAVE",
393 "MEMORY PURGE",
394 "MEMORY MALLOC-STATS",
395 "MEMORY STATS",
396 "LASTSAVE",
397 "CLIENT TRACKINGINFO",
398 "CLIENT PAUSE",
399 "CLIENT UNPAUSE",
400 "CLIENT UNBLOCK",
401 "CLIENT ID",
402 "CLIENT REPLY",
403 "CLIENT GETREDIR",
404 "CLIENT INFO",
405 "CLIENT KILL",
406 "READONLY",
407 "CLUSTER INFO",
408 "CLUSTER MEET",
409 "CLUSTER MYSHARDID",
410 "CLUSTER NODES",
411 "CLUSTER REPLICAS",
412 "CLUSTER RESET",
413 "CLUSTER SET-CONFIG-EPOCH",
414 "CLUSTER SLOTS",
415 "CLUSTER SHARDS",
416 "CLUSTER COUNT-FAILURE-REPORTS",
417 "CLUSTER KEYSLOT",
418 "COMMAND",
419 "COMMAND COUNT",
420 "COMMAND LIST",
421 "COMMAND GETKEYS",
422 "CONFIG GET",
423 "DEBUG",
424 "RANDOMKEY",
425 "READONLY",
426 "READWRITE",
427 "TIME",
428 "TFUNCTION LOAD",
429 "TFUNCTION DELETE",
430 "TFUNCTION LIST",
431 "TFCALL",
432 "TFCALLASYNC",
433 "LATENCY HISTORY",
434 "LATENCY LATEST",
435 "LATENCY RESET",
436 "MODULE LIST",
437 "MODULE LOAD",
438 "MODULE UNLOAD",
439 "MODULE LOADEX",
440 ],
441 DEFAULT_NODE,
442 ),
443 list_keys_to_dict(
444 [
445 "FLUSHALL",
446 "FLUSHDB",
447 "FUNCTION DELETE",
448 "FUNCTION FLUSH",
449 "FUNCTION LIST",
450 "FUNCTION LOAD",
451 "FUNCTION RESTORE",
452 "SCAN",
453 "SCRIPT EXISTS",
454 "SCRIPT FLUSH",
455 "SCRIPT LOAD",
456 ],
457 PRIMARIES,
458 ),
459 list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
460 list_keys_to_dict(
461 [
462 "CLUSTER COUNTKEYSINSLOT",
463 "CLUSTER DELSLOTS",
464 "CLUSTER DELSLOTSRANGE",
465 "CLUSTER GETKEYSINSLOT",
466 "CLUSTER SETSLOT",
467 ],
468 SLOT_ID,
469 ),
470 )
472 SEARCH_COMMANDS = (
473 [
474 "FT.CREATE",
475 "FT.SEARCH",
476 "FT.AGGREGATE",
477 "FT.EXPLAIN",
478 "FT.EXPLAINCLI",
479 "FT,PROFILE",
480 "FT.ALTER",
481 "FT.DROPINDEX",
482 "FT.ALIASADD",
483 "FT.ALIASUPDATE",
484 "FT.ALIASDEL",
485 "FT.TAGVALS",
486 "FT.SUGADD",
487 "FT.SUGGET",
488 "FT.SUGDEL",
489 "FT.SUGLEN",
490 "FT.SYNUPDATE",
491 "FT.SYNDUMP",
492 "FT.SPELLCHECK",
493 "FT.DICTADD",
494 "FT.DICTDEL",
495 "FT.DICTDUMP",
496 "FT.INFO",
497 "FT._LIST",
498 "FT.CONFIG",
499 "FT.ADD",
500 "FT.DEL",
501 "FT.DROP",
502 "FT.GET",
503 "FT.MGET",
504 "FT.SYNADD",
505 ],
506 )
508 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
509 "CLUSTER SLOTS": parse_cluster_slots,
510 "CLUSTER SHARDS": parse_cluster_shards,
511 "CLUSTER MYSHARDID": parse_cluster_myshardid,
512 }
514 RESULT_CALLBACKS = dict_merge(
515 list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
516 list_keys_to_dict(
517 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
518 ),
519 list_keys_to_dict(
520 ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
521 ),
522 list_keys_to_dict(
523 [
524 "PING",
525 "CONFIG SET",
526 "CONFIG REWRITE",
527 "CONFIG RESETSTAT",
528 "CLIENT SETNAME",
529 "BGSAVE",
530 "SLOWLOG RESET",
531 "SAVE",
532 "MEMORY PURGE",
533 "CLIENT PAUSE",
534 "CLIENT UNPAUSE",
535 ],
536 lambda command, res: all(res.values()) if isinstance(res, dict) else res,
537 ),
538 list_keys_to_dict(
539 ["DBSIZE", "WAIT"],
540 lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
541 ),
542 list_keys_to_dict(
543 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
544 ),
545 list_keys_to_dict(["SCAN"], parse_scan_result),
546 list_keys_to_dict(
547 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
548 ),
549 list_keys_to_dict(
550 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
551 ),
552 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
553 )
555 ERRORS_ALLOW_RETRY = (
556 ConnectionError,
557 TimeoutError,
558 ClusterDownError,
559 SlotNotCoveredError,
560 )
562 def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
563 """Replace the default cluster node.
564 A random cluster node will be chosen if target_node isn't passed, and primaries
565 will be prioritized. The default node will not be changed if there are no other
566 nodes in the cluster.
568 Args:
569 target_node (ClusterNode, optional): Target node to replace the default
570 node. Defaults to None.
571 """
572 if target_node:
573 self.nodes_manager.default_node = target_node
574 else:
575 curr_node = self.get_default_node()
576 primaries = [node for node in self.get_primaries() if node != curr_node]
577 if primaries:
578 # Choose a primary if the cluster contains different primaries
579 self.nodes_manager.default_node = random.choice(primaries)
580 else:
581 # Otherwise, choose a replica if the cluster has any
582 replicas = [node for node in self.get_replicas() if node != curr_node]
583 if replicas:
584 self.nodes_manager.default_node = random.choice(replicas)
587class RedisCluster(
588 AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands
589):
590 # Type discrimination marker for @overload self-type pattern
591 _is_async_client: Literal[False] = False
593 @classmethod
594 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
595 """
596 Return a Redis client object configured from the given URL
598 For example::
600 redis://[[username]:[password]]@localhost:6379/0
601 rediss://[[username]:[password]]@localhost:6379/0
602 unix://[username@]/path/to/socket.sock?db=0[&password=password]
604 Three URL schemes are supported:
606 - ``redis://`` creates a TCP socket connection. See more at:
607 <https://www.iana.org/assignments/uri-schemes/prov/redis>
608 - ``rediss://`` creates an SSL-wrapped TCP socket connection. See more at:
609 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
610 - ``unix://`` creates a Unix Domain Socket connection.
612 The username, password, hostname, path and all querystring values
613 are passed through urllib.parse.unquote in order to replace any
614 percent-encoded values with their corresponding characters.
616 There are several ways to specify a database number. The first value
617 found will be used:
619 1. A ``db`` querystring option, e.g. redis://localhost?db=0
620 2. If using the redis:// or rediss:// schemes, the path argument
621 of the url, e.g. redis://localhost/0
622 3. A ``db`` keyword argument to this function.
624 If none of these options are specified, the default db=0 is used.
626 All querystring options are cast to their appropriate Python types.
627 Boolean arguments can be specified with string values "True"/"False"
628 or "Yes"/"No". Values that cannot be properly cast cause a
629 ``ValueError`` to be raised. Once parsed, the querystring arguments
630 and keyword arguments are passed to the ``ConnectionPool``'s
631 class initializer. In the case of conflicting arguments, querystring
632 arguments always win.
634 """
635 return cls(url=url, **kwargs)
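# Usage sketch (assumes a cluster node is reachable at the hypothetical URL):
#     rc = RedisCluster.from_url("redis://localhost:6379/0")
#     rc.ping()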
637 @deprecated_args(
638 args_to_warn=["read_from_replicas"],
639 reason="Please configure the 'load_balancing_strategy' instead",
640 version="5.3.0",
641 )
642 @deprecated_args(
643 args_to_warn=[
644 "cluster_error_retry_attempts",
645 ],
646 reason="Please configure the 'retry' object instead",
647 version="6.0.0",
648 )
649 def __init__(
650 self,
651 host: Optional[str] = None,
652 port: int = 6379,
653 startup_nodes: Optional[List["ClusterNode"]] = None,
654 cluster_error_retry_attempts: int = 3,
655 retry: Optional["Retry"] = None,
656 require_full_coverage: bool = True,
657 reinitialize_steps: int = 5,
658 read_from_replicas: bool = False,
659 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
660 dynamic_startup_nodes: bool = True,
661 url: Optional[str] = None,
662 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
663 cache: Optional[CacheInterface] = None,
664 cache_config: Optional[CacheConfig] = None,
665 event_dispatcher: Optional[EventDispatcher] = None,
666 policy_resolver: PolicyResolver = StaticPolicyResolver(),
667 maint_notifications_config: Optional[MaintNotificationsConfig] = None,
668 **kwargs,
669 ):
670 """
671 Initialize a new RedisCluster client.
673 :param startup_nodes:
674 List of nodes from which initial bootstrapping can be done
675 :param host:
676 Can be used to point to a startup node
677 :param port:
678 Can be used to point to a startup node
679 :param require_full_coverage:
680 When set to False: the client will not require a
681 full coverage of the slots. However, if not all slots are covered,
682 and at least one node has 'cluster-require-full-coverage' set to
683 'yes,' the server will throw a ClusterDownError for some key-based
684 commands. See -
685 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
686 When set to True (default value): all slots must be covered to construct the
687 cluster client. If not all slots are covered, RedisClusterException
688 will be thrown.
689 :param read_from_replicas:
690 @deprecated - please use load_balancing_strategy instead
691 Enable read from replicas in READONLY mode. You can read possibly
692 stale data.
693 When set to true, read commands will be distributed between the
694 primary and its replicas in a round-robin manner.
695 :param load_balancing_strategy:
696 Enable read from replicas in READONLY mode and defines the load balancing
697 strategy that will be used for cluster node selection.
698 The data read from replicas is eventually consistent with the data in primary nodes.
699 :param dynamic_startup_nodes:
700 Set the RedisCluster's startup nodes to all of the discovered nodes.
701 If true (default value), the cluster's discovered nodes will be used to
702 determine the cluster nodes-slots mapping in the next topology refresh.
703 It will remove the initial passed startup nodes if their endpoints aren't
704 listed in the CLUSTER SLOTS output.
705 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
706 specific IP addresses, it is best to set it to false.
707 :param cluster_error_retry_attempts:
708 @deprecated - Please configure the 'retry' object instead
709 If a 'retry' object is set, this argument is ignored.
711 Number of times to retry before raising an error when
712 :class:`~.TimeoutError`, :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
713 :class:`~.ClusterDownError` are encountered
714 :param retry:
715 A retry object that defines the retry strategy and the number of
716 retries for the cluster client.
717 In the current implementation of the cluster client (starting from redis-py version 6.0.0),
718 the retry object is not yet fully utilized; it is only used to determine
719 the number of retries for the cluster client.
720 In future releases the retry object will be used to handle the cluster client retries.
721 :param reinitialize_steps:
722 Specifies the number of MOVED errors that need to occur before
723 reinitializing the whole cluster topology. If a MOVED error occurs
724 and the cluster does not need to be reinitialized on this current
725 error handling, only the MOVED slot will be patched with the
726 redirected node.
727 To reinitialize the cluster on every MOVED error, set
728 reinitialize_steps to 1.
729 To avoid reinitializing the cluster on moved errors, set
730 reinitialize_steps to 0.
731 :param address_remap:
732 An optional callable which, when provided with an internal network
733 address of a node, e.g. a `(host, port)` tuple, will return the address
734 where the node is reachable. This can be used to map the addresses at
735 which the nodes _think_ they are, to addresses at which a client may
736 reach them, such as when they sit behind a proxy.
738 :param maint_notifications_config:
739 Configures the nodes connections to support maintenance notifications - see
740 `redis.maint_notifications.MaintNotificationsConfig` for details.
741 Only supported with RESP3.
742 If not provided and protocol is RESP3, the maintenance notifications
743 will be enabled by default (logic is included in the NodesManager
744 initialization).
745 :**kwargs:
746 Extra arguments that will be passed to the Redis instances when they are created
747 (see the official redis-py docs for supported kwargs - the only limitation
748 is that you can't provide a 'retry' object as part of kwargs.
749 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
750 Some kwargs are not supported and will raise a
751 RedisClusterException:
752 - db (Redis does not support database SELECT in cluster mode)
754 """
755 if startup_nodes is None:
756 startup_nodes = []
758 if "db" in kwargs:
759 # Argument 'db' is not possible to use in cluster mode
760 raise RedisClusterException(
761 "Argument 'db' is not possible to use in cluster mode"
762 )
764 if "retry" in kwargs:
765 # The 'retry' argument cannot be passed via kwargs in cluster mode:
766 # the kwargs are forwarded to the lower-level connections to the cluster nodes,
767 # and there we provide a retry configuration with no retries allowed.
768 # Retries should be handled at the cluster client level.
769 raise RedisClusterException(
770 "The 'retry' argument cannot be used in kwargs when running in cluster mode."
771 )
773 # Get the startup node/s
774 from_url = False
775 if url is not None:
776 from_url = True
777 url_options = parse_url(url)
778 if "path" in url_options:
779 raise RedisClusterException(
780 "RedisCluster does not currently support Unix Domain "
781 "Socket connections"
782 )
783 if "db" in url_options and url_options["db"] != 0:
784 # Argument 'db' is not possible to use in cluster mode
785 raise RedisClusterException(
786 "A ``db`` querystring option can only be 0 in cluster mode"
787 )
788 kwargs.update(url_options)
789 host = kwargs.get("host")
790 port = kwargs.get("port", port)
791 startup_nodes.append(ClusterNode(host, port))
792 elif host is not None and port is not None:
793 startup_nodes.append(ClusterNode(host, port))
794 elif len(startup_nodes) == 0:
795 # No startup node was provided
796 raise RedisClusterException(
797 "RedisCluster requires at least one node to discover the "
798 "cluster. Please provide one of the followings:\n"
799 "1. host and port, for example:\n"
800 " RedisCluster(host='localhost', port=6379)\n"
801 "2. list of startup nodes, for example:\n"
802 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
803 " ClusterNode('localhost', 6378)])"
804 )
805 # Update the connection arguments
806 # Whenever a new connection is established, RedisCluster's on_connect
807 # method should be run
808 # If the user passed an on_connect function, we'll save it and run it
809 # inside the RedisCluster.on_connect() function
810 self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
811 kwargs.update({"redis_connect_func": self.on_connect})
812 kwargs = cleanup_kwargs(**kwargs)
813 if retry:
814 self.retry = retry
815 else:
816 self.retry = Retry(
817 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
818 retries=cluster_error_retry_attempts,
819 )
821 self.encoder = Encoder(
822 kwargs.get("encoding", "utf-8"),
823 kwargs.get("encoding_errors", "strict"),
824 kwargs.get("decode_responses", False),
825 )
826 protocol = kwargs.get("protocol", DEFAULT_RESP_VERSION)
827 if (cache_config or cache) and not check_protocol_version(protocol, 3):
828 raise RedisError("Client caching is only supported with RESP version 3")
830 if maint_notifications_config and not check_protocol_version(protocol, 3):
831 raise RedisError(
832 "Maintenance notifications are only supported with RESP version 3"
833 )
834 if check_protocol_version(protocol, 3) and maint_notifications_config is None:
835 maint_notifications_config = MaintNotificationsConfig()
837 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
838 self.node_flags = self.__class__.NODE_FLAGS.copy()
839 self.read_from_replicas = read_from_replicas
840 self.load_balancing_strategy = load_balancing_strategy
841 self.reinitialize_counter = 0
842 self.reinitialize_steps = reinitialize_steps
843 if event_dispatcher is None:
844 self._event_dispatcher = EventDispatcher()
845 else:
846 self._event_dispatcher = event_dispatcher
847 self.startup_nodes = startup_nodes
849 self.nodes_manager = NodesManager(
850 startup_nodes=startup_nodes,
851 from_url=from_url,
852 require_full_coverage=require_full_coverage,
853 dynamic_startup_nodes=dynamic_startup_nodes,
854 address_remap=address_remap,
855 cache=cache,
856 cache_config=cache_config,
857 event_dispatcher=self._event_dispatcher,
858 maint_notifications_config=maint_notifications_config,
859 **kwargs,
860 )
862 self.cluster_response_callbacks = CaseInsensitiveDict(
863 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
864 )
865 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
867 # For backward compatibility, map the existing node flags to the new policies
868 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
869 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
870 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
871 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
872 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
873 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
874 SLOT_ID: RequestPolicy.DEFAULT_KEYED,
875 }
877 self._policies_callback_mapping: dict[
878 Union[RequestPolicy, ResponsePolicy], Callable
879 ] = {
880 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
881 self.get_random_primary_or_all_nodes(command_name)
882 ],
883 RequestPolicy.DEFAULT_KEYED: lambda command,
884 *args: self.get_nodes_from_slot(command, *args),
885 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
886 RequestPolicy.ALL_SHARDS: self.get_primaries,
887 RequestPolicy.ALL_NODES: self.get_nodes,
888 RequestPolicy.ALL_REPLICAS: self.get_replicas,
889 RequestPolicy.MULTI_SHARD: lambda *args,
890 **kwargs: self._split_multi_shard_command(*args, **kwargs),
891 RequestPolicy.SPECIAL: self.get_special_nodes,
892 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
893 ResponsePolicy.DEFAULT_KEYED: lambda res: res,
894 }
896 self._policy_resolver = policy_resolver
897 self.commands_parser = CommandsParser(self)
899 # Nodes where the FT.AGGREGATE command was executed.
900 self._aggregate_nodes = None
901 self._lock = threading.RLock()
903 MaintNotificationsAbstractRedisCluster.__init__(
904 self, maint_notifications_config, **kwargs
905 )
907 def __enter__(self):
908 return self
910 def __exit__(self, exc_type, exc_value, traceback):
911 self.close()
913 def __del__(self):
914 try:
915 self.close()
916 except Exception:
917 pass
919 def disconnect_connection_pools(self):
920 for node in self.get_nodes():
921 if node.redis_connection:
922 try:
923 node.redis_connection.connection_pool.disconnect()
924 except OSError:
925 # Client was already disconnected. do nothing
926 pass
928 def on_connect(self, connection):
929 """
930 Initialize the connection, authenticate, select a database, and send
931 READONLY if it was configured during object initialization.
932 """
933 connection.on_connect()
935 if self.read_from_replicas or self.load_balancing_strategy:
936 # Sending READONLY command to server to configure connection as
937 # readonly. Since each cluster node may change its server type due
938 # to a failover, we should establish a READONLY connection
939 # regardless of the server type. If this is a primary connection,
940 # READONLY would not affect executing write commands.
941 connection.send_command("READONLY")
942 if str_if_bytes(connection.read_response()) != "OK":
943 raise ConnectionError("READONLY command failed")
945 if self.user_on_connect_func is not None:
946 self.user_on_connect_func(connection)
948 def get_redis_connection(self, node: "ClusterNode") -> Redis:
949 if not node.redis_connection:
950 with self._lock:
951 if not node.redis_connection:
952 self.nodes_manager.create_redis_connections([node])
953 return node.redis_connection
955 def get_node(self, host=None, port=None, node_name=None):
956 return self.nodes_manager.get_node(host, port, node_name)
958 def get_primaries(self):
959 return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
961 def get_replicas(self):
962 return self.nodes_manager.get_nodes_by_server_type(REPLICA)
964 def get_random_node(self):
965 return random.choice(list(self.nodes_manager.nodes_cache.values()))
967 def get_random_primary_or_all_nodes(self, command_name):
968 """
969 Returns a random node (primary or replica) for read commands in READONLY mode, otherwise a random primary node.
970 """
971 if self.read_from_replicas and command_name in READ_COMMANDS:
972 return self.get_random_node()
974 return self.get_random_primary_node()
976 def get_nodes(self):
977 return list(self.nodes_manager.nodes_cache.values())
979 def get_node_from_key(self, key, replica=False):
980 """
981 Get the node that holds the key's slot.
982 If replica is set to True but the slot doesn't have any replicas, None is
983 returned.
984 """
985 slot = self.keyslot(key)
986 slot_cache = self.nodes_manager.slots_cache.get(slot)
987 if slot_cache is None or len(slot_cache) == 0:
988 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
989 if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
990 return None
991 elif replica:
992 node_idx = 1
993 else:
994 # primary
995 node_idx = 0
997 return slot_cache[node_idx]
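# Usage sketch (hypothetical key): rc.get_node_from_key("user:1000") returns the
# primary ClusterNode owning that key's slot; rc.get_node_from_key("user:1000",
# replica=True) returns a replica, or None if the slot has no replica.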
999 def get_default_node(self):
1000 """
1001 Get the cluster's default node
1002 """
1003 return self.nodes_manager.default_node
1005 def get_nodes_from_slot(self, command: str, *args):
1006 """
1007 Returns a single-element list with the node that holds the specified keys' slot.
1008 """
1009 # get the node that holds the key's slot
1010 slot = self.determine_slot(*args)
1011 node = self.nodes_manager.get_node_from_slot(
1012 slot,
1013 self.read_from_replicas and command in READ_COMMANDS,
1014 self.load_balancing_strategy if command in READ_COMMANDS else None,
1015 )
1016 return [node]
1018 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
1019 """
1020 Splits a command with the Multi-Shard policy into multiple single-key commands.
1021 """
1022 keys = self._get_command_keys(*args)
1023 commands = []
1025 for key in keys:
1026 commands.append(
1027 {
1028 "args": (args[0], key),
1029 "kwargs": kwargs,
1030 }
1031 )
1033 return commands
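# Illustrative sketch (hypothetical keys): splitting ("MGET", "k1", "k2") with the
# Multi-Shard policy yields
#     [{"args": ("MGET", "k1"), "kwargs": {}},
#      {"args": ("MGET", "k2"), "kwargs": {}}]
# so each single-key command can be routed to its own slot owner.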
1035 def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
1036 """
1037 Returns a list of nodes for commands with a special policy.
1038 """
1039 if not self._aggregate_nodes:
1040 raise RedisClusterException(
1041 "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
1042 )
1044 return self._aggregate_nodes
1046 def get_random_primary_node(self) -> "ClusterNode":
1047 """
1048 Returns a random primary node
1049 """
1050 return random.choice(self.get_primaries())
1052 def _evaluate_all_succeeded(self, res):
1053 """
1054 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
1055 """
1056 first_successful_response = None
1058 if isinstance(res, dict):
1059 for key, value in res.items():
1060 if value:
1061 if first_successful_response is None:
1062 first_successful_response = {key: value}
1063 else:
1064 return {key: False}
1065 else:
1066 for response in res:
1067 if response:
1068 if first_successful_response is None:
1069 # Dynamically resolve type
1070 first_successful_response = type(response)(response)
1071 else:
1072 return type(response)(False)
1074 return first_successful_response
1076 def set_default_node(self, node):
1077 """
1078 Set the default node of the cluster.
1079 :param node: 'ClusterNode'
1080 :return True if the default node was set, else False
1081 """
1082 if node is None or self.get_node(node_name=node.name) is None:
1083 return False
1084 self.nodes_manager.default_node = node
1085 return True
1087 def set_retry(self, retry: Retry) -> None:
1088 self.retry = retry
1090 def monitor(self, target_node=None):
1091 """
1092 Returns a Monitor object for the specified target node.
1093 The default cluster node will be selected if no target node was
1094 specified.
1095 Monitor is useful for handling the MONITOR command to the redis server.
1096 next_command() method returns one command from monitor
1097 listen() method yields commands from monitor.
1098 """
1099 if target_node is None:
1100 target_node = self.get_default_node()
1101 if target_node.redis_connection is None:
1102 raise RedisClusterException(
1103 f"Cluster Node {target_node.name} has no redis_connection"
1104 )
1105 return target_node.redis_connection.monitor()
1107 def pubsub(self, node=None, host=None, port=None, **kwargs):
1108 """
1109 Allows passing a ClusterNode, or host&port, to get a pubsub instance
1110 connected to the specified node
1111 """
1112 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
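# Usage sketch (hypothetical channel name):
#     p = rc.pubsub()
#     p.subscribe("events")
#     message = p.get_message(timeout=1.0)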
1114 def keyspace_notifications(
1115 self,
1116 key_prefix: Union[str, bytes, None] = None,
1117 ignore_subscribe_messages: bool = True,
1118 ) -> "ClusterKeyspaceNotifications":
1119 """
1120 Return a :class:`~redis.keyspace_notifications.ClusterKeyspaceNotifications`
1121 object for subscribing to keyspace and keyevent notifications across
1122 all primary nodes in the cluster.
1124 Note: Keyspace notifications must be enabled on all Redis cluster nodes
1125 via the ``notify-keyspace-events`` configuration option.
1127 Args:
1128 key_prefix: Optional prefix to filter and strip from keys in
1129 notifications.
1130 ignore_subscribe_messages: If True, subscribe/unsubscribe
1131 confirmations are not returned by
1132 get_message/listen.
1133 """
1134 from redis.keyspace_notifications import ClusterKeyspaceNotifications
1136 return ClusterKeyspaceNotifications(
1137 self,
1138 key_prefix=key_prefix,
1139 ignore_subscribe_messages=ignore_subscribe_messages,
1140 )
1142 def pipeline(self, transaction=None, shard_hint=None):
1143 """
1144 Cluster impl:
1145 Pipelines do not work in cluster mode the same way they
1146 do in normal mode. Create a clone of this object so
1147 that simulating pipelines will work correctly. Each
1148 command will be called directly when used, and
1149 calling execute() will only return the result stack.
1150 """
1151 if shard_hint:
1152 raise RedisClusterException("shard_hint is deprecated in cluster mode")
1154 return ClusterPipeline(
1155 nodes_manager=self.nodes_manager,
1156 commands_parser=self.commands_parser,
1157 startup_nodes=self.nodes_manager.startup_nodes,
1158 result_callbacks=self.result_callbacks,
1159 cluster_response_callbacks=self.cluster_response_callbacks,
1160 read_from_replicas=self.read_from_replicas,
1161 load_balancing_strategy=self.load_balancing_strategy,
1162 reinitialize_steps=self.reinitialize_steps,
1163 retry=self.retry,
1164 lock=self._lock,
1165 transaction=transaction,
1166 event_dispatcher=self._event_dispatcher,
1167 )
1169 def lock(
1170 self,
1171 name,
1172 timeout=None,
1173 sleep=0.1,
1174 blocking=True,
1175 blocking_timeout=None,
1176 lock_class=None,
1177 thread_local=True,
1178 raise_on_release_error: bool = True,
1179 ):
1180 """
1181 Return a new Lock object using key ``name`` that mimics
1182 the behavior of threading.Lock.
1184 If specified, ``timeout`` indicates a maximum life for the lock.
1185 By default, it will remain locked until release() is called.
1187 ``sleep`` indicates the amount of time to sleep per loop iteration
1188 when the lock is in blocking mode and another client is currently
1189 holding the lock.
1191 ``blocking`` indicates whether calling ``acquire`` should block until
1192 the lock has been acquired or to fail immediately, causing ``acquire``
1193 to return False and the lock not being acquired. Defaults to True.
1194 Note this value can be overridden by passing a ``blocking``
1195 argument to ``acquire``.
1197 ``blocking_timeout`` indicates the maximum amount of time in seconds to
1198 spend trying to acquire the lock. A value of ``None`` indicates
1199 continue trying forever. ``blocking_timeout`` can be specified as a
1200 float or integer, both representing the number of seconds to wait.
1202 ``lock_class`` forces the specified lock implementation. Note that as
1203 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
1204 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
1205 you have created your own custom lock class.
1207 ``thread_local`` indicates whether the lock token is placed in
1208 thread-local storage. By default, the token is placed in thread local
1209 storage so that a thread only sees its token, not a token set by
1210 another thread. Consider the following timeline:
1212 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
1213 thread-1 sets the token to "abc"
1214 time: 1, thread-2 blocks trying to acquire `my-lock` using the
1215 Lock instance.
1216 time: 5, thread-1 has not yet completed. redis expires the lock
1217 key.
1218 time: 5, thread-2 acquired `my-lock` now that it's available.
1219 thread-2 sets the token to "xyz"
1220 time: 6, thread-1 finishes its work and calls release(). if the
1221 token is *not* stored in thread local storage, then
1222 thread-1 would see the token value as "xyz" and would be
1223 able to successfully release thread-2's lock.
1225 ``raise_on_release_error`` indicates whether to raise an exception when
1226 the lock is no longer owned when exiting the context manager. By default,
1227 this is True, meaning an exception will be raised. If False, the warning
1228 will be logged and the exception will be suppressed.
1230 In some use cases it's necessary to disable thread local storage. For
1231 example, if you have code where one thread acquires a lock and passes
1232 that lock instance to a worker thread to release later. If thread
1233 local storage isn't disabled in this case, the worker thread won't see
1234 the token set by the thread that acquired the lock. Our assumption
1235 is that these cases aren't common and as such default to using
1236 thread local storage."""
1237 if lock_class is None:
1238 lock_class = Lock
1239 return lock_class(
1240 self,
1241 name,
1242 timeout=timeout,
1243 sleep=sleep,
1244 blocking=blocking,
1245 blocking_timeout=blocking_timeout,
1246 thread_local=thread_local,
1247 raise_on_release_error=raise_on_release_error,
1248 )
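# Usage sketch (hypothetical lock name): the returned Lock is a context manager,
# so a typical call looks like
#     with rc.lock("resource-lock", timeout=10, blocking_timeout=5):
#         ...  # critical section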
1250 def set_response_callback(self, command, callback):
1251 """Set a custom Response Callback"""
1252 self.cluster_response_callbacks[command] = callback
1254 def _determine_nodes(
1255 self, *args, request_policy: RequestPolicy, **kwargs
1256 ) -> List["ClusterNode"]:
1257 """
1258 Determines the nodes the command should be executed on.
1259 """
1260 command = args[0].upper()
1261 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
1262 command = f"{args[0]} {args[1]}".upper()
1264 nodes_flag = kwargs.pop("nodes_flag", None)
1265 if nodes_flag is not None:
1266 # nodes flag passed by the user
1267 command_flag = nodes_flag
1268 else:
1269 # get the nodes group for this command if it was predefined
1270 command_flag = self.command_flags.get(command)
1272 if command_flag in self._command_flags_mapping:
1273 request_policy = self._command_flags_mapping[command_flag]
1275 policy_callback = self._policies_callback_mapping[request_policy]
1277 if request_policy == RequestPolicy.DEFAULT_KEYED:
1278 nodes = policy_callback(command, *args)
1279 elif request_policy == RequestPolicy.MULTI_SHARD:
1280 nodes = policy_callback(*args, **kwargs)
1281 elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
1282 nodes = policy_callback(args[0])
1283 else:
1284 nodes = policy_callback()
1286 if args[0].lower() == "ft.aggregate":
1287 self._aggregate_nodes = nodes
1289 return nodes
1291 def _should_reinitialized(self):
1292 # To reinitialize the cluster on every MOVED error,
1293 # set reinitialize_steps to 1.
1294 # To avoid reinitializing the cluster on moved errors, set
1295 # reinitialize_steps to 0.
1296 if self.reinitialize_steps == 0:
1297 return False
1298 else:
1299 return self.reinitialize_counter % self.reinitialize_steps == 0
1301 def keyslot(self, key):
1302 """
1303 Calculate keyslot for a given key.
1304 See Keys distribution model in https://redis.io/topics/cluster-spec
1305 """
1306 k = self.encoder.encode(key)
1307 return key_slot(k)
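# Usage sketch: rc.keyslot("foo") returns the CRC16-based hash slot (0-16383)
# for the key; for example, "foo" maps to slot 12182, matching CLUSTER KEYSLOT.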
1309 def _get_command_keys(self, *args):
1310 """
1311 Get the keys in the command. If the command has no keys in it, None is
1312 returned.
1314 NOTE: Due to a bug in redis<7.0, this function does not work properly
1315 for EVAL or EVALSHA when the `numkeys` arg is 0.
1316 - issue: https://github.com/redis/redis/issues/9493
1317 - fix: https://github.com/redis/redis/pull/9733
1319 So, don't use this function with EVAL or EVALSHA.
1320 """
1321 redis_conn = self.get_default_node().redis_connection
1322 return self.commands_parser.get_keys(redis_conn, *args)
1324 def determine_slot(self, *args) -> Optional[int]:
1325 """
1326 Figure out what slot to use based on args.
1328 Raises a RedisClusterException if there's a missing key and we can't
1329 determine what slots to map the command to; or, if the keys don't
1330 all map to the same key slot.
1331 """
1332 command = args[0]
1333 if self.command_flags.get(command) == SLOT_ID:
1334 # The command contains the slot ID
1335 return args[1]
1337 # Get the keys in the command
1339 # CLIENT TRACKING is a special case.
1340 # It doesn't have any keys, it needs to be sent to the provided nodes
1341 # By default it will be sent to all nodes.
1342 if command.upper() == "CLIENT TRACKING":
1343 return None
1345 # EVAL and EVALSHA are common enough that it's wasteful to go to the
1346 # redis server to parse the keys. Besides, there is a bug in redis<7.0
1347 # where `self._get_command_keys()` fails anyway. So, we special case
1348 # EVAL/EVALSHA.
1349 if command.upper() in ("EVAL", "EVALSHA"):
1350 # command syntax: EVAL "script body" num_keys ...
1351 if len(args) <= 2:
1352 raise RedisClusterException(f"Invalid args in command: {args}")
1353 num_actual_keys = int(args[2])
1354 eval_keys = args[3 : 3 + num_actual_keys]
1355 # if there are 0 keys, that means the script can be run on any node
1356 # so we can just return a random slot
1357 if len(eval_keys) == 0:
1358 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
1359 keys = eval_keys
1360 else:
1361 keys = self._get_command_keys(*args)
1362 if keys is None or len(keys) == 0:
1363 # FCALL can call a function with 0 keys, that means the function
1364 # can be run on any node so we can just return a random slot
1365 if command.upper() in ("FCALL", "FCALL_RO"):
1366 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
1367 raise RedisClusterException(
1368 "No way to dispatch this command to Redis Cluster. "
1369 "Missing key.\nYou can execute the command by specifying "
1370 f"target nodes.\nCommand: {args}"
1371 )
1373 # single key command
1374 if len(keys) == 1:
1375 return self.keyslot(keys[0])
1377 # multi-key command; we need to make sure all keys are mapped to
1378 # the same slot
1379 slots = {self.keyslot(key) for key in keys}
1380 if len(slots) != 1:
1381 raise RedisClusterException(
1382 f"{command} - all keys must map to the same key slot"
1383 )
1385 return slots.pop()
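# Illustrative sketch (hypothetical keys): determine_slot("MGET", "{user}:a",
# "{user}:b") succeeds because the "{user}" hash tag forces both keys into the
# same slot, while keys without a shared hash tag would raise
# RedisClusterException for multi-key commands.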
1387 def get_encoder(self):
1388 """
1389 Get the connections' encoder
1390 """
1391 return self.encoder
1393 def get_connection_kwargs(self):
1394 """
1395 Get the connections' keyword arguments
1396 """
1397 return self.nodes_manager.connection_kwargs
1399 def _is_nodes_flag(self, target_nodes):
1400 return isinstance(target_nodes, str) and target_nodes in self.node_flags
1402 def _parse_target_nodes(self, target_nodes):
1403 if isinstance(target_nodes, list):
1404 nodes = target_nodes
1405 elif isinstance(target_nodes, ClusterNode):
1406 # Supports passing a single ClusterNode as a variable
1407 nodes = [target_nodes]
1408 elif isinstance(target_nodes, dict):
1409 # Supports dictionaries of the format {node_name: node}.
1410 # It enables to execute commands with multi nodes as follows:
1411 # rc.cluster_save_config(rc.get_primaries())
1412 nodes = target_nodes.values()
1413 else:
1414 raise TypeError(
1415 "target_nodes type can be one of the following: "
1416 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
1417 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
1418 f"The passed type is {type(target_nodes)}"
1419 )
1420 return nodes
1422 def execute_command(self, *args, **kwargs):
1423 return self._internal_execute_command(*args, **kwargs)
1425 def _internal_execute_command(self, *args, **kwargs):
1426 """
1427 Wrapper for ERRORS_ALLOW_RETRY error handling.
1429 It will retry the number of times specified by the retries property of
1430 the "self.retry" config option, which defaults to 3 unless manually
1431 configured.
1433 If the retry limit is reached, the command will raise the exception.
1435 Key argument :target_nodes: can be passed with the following types:
1436 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
1437 ClusterNode
1438 list<ClusterNode>
1439 dict<Any, ClusterNode>
1440 """
1441 target_nodes_specified = False
1442 is_default_node = False
1443 target_nodes = None
1444 passed_targets = kwargs.pop("target_nodes", None)
1445 command_policies = self._policy_resolver.resolve(args[0].lower())
1447 if passed_targets is not None and not self._is_nodes_flag(passed_targets):
1448 target_nodes = self._parse_target_nodes(passed_targets)
1449 target_nodes_specified = True
1451 if not command_policies and not target_nodes_specified:
1452 command = args[0].upper()
1453 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
1454 command = f"{args[0]} {args[1]}".upper()
1456 # We can only resolve key properties if the command is not
1457 # in the list of pre-defined request policies
1458 command_flag = self.command_flags.get(command)
1459 if not command_flag:
1460 # Fallback to default policy
1461 if not self.get_default_node():
1462 slot = None
1463 else:
1464 slot = self.determine_slot(*args)
1465 if slot is None:
1466 command_policies = CommandPolicies()
1467 else:
1468 command_policies = CommandPolicies(
1469 request_policy=RequestPolicy.DEFAULT_KEYED,
1470 response_policy=ResponsePolicy.DEFAULT_KEYED,
1471 )
1472 else:
1473 if command_flag in self._command_flags_mapping:
1474 command_policies = CommandPolicies(
1475 request_policy=self._command_flags_mapping[command_flag]
1476 )
1477 else:
1478 command_policies = CommandPolicies()
1479 elif not command_policies and target_nodes_specified:
1480 command_policies = CommandPolicies()
1482 # If an error that allows retrying was thrown, the nodes and slots
1483 # cache were reinitialized. We will retry executing the command with
1484 # the updated cluster setup only when the target nodes can be
1485 # determined again with the new cache tables. Therefore, when target
1486 # nodes were passed to this function, we cannot retry the command
1487 # execution since the nodes may not be valid anymore after the tables
1488 # were reinitialized. So in case of passed target nodes,
1489 # retry_attempts will be set to 0.
1490 retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
1491 # Add one for the first execution
1492 execute_attempts = 1 + retry_attempts
1493 failure_count = 0
1495 # Start timing for observability
1496 start_time = time.monotonic()
1498 for _ in range(execute_attempts):
1499 try:
1500 res = {}
1501 if not target_nodes_specified:
1502 # Determine the nodes to execute the command on
1503 target_nodes = self._determine_nodes(
1504 *args,
1505 request_policy=command_policies.request_policy,
1506 nodes_flag=passed_targets,
1507 )
1509 if not target_nodes:
1510 raise RedisClusterException(
1511 f"No targets were found to execute {args} command on"
1512 )
1513 if (
1514 len(target_nodes) == 1
1515 and target_nodes[0] == self.get_default_node()
1516 ):
1517 is_default_node = True
1518 for node in target_nodes:
1519 res[node.name] = self._execute_command(node, *args, **kwargs)
1521 if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
1522 break
1524 # Return the processed result
1525 return self._process_result(
1526 args[0],
1527 res,
1528 response_policy=command_policies.response_policy,
1529 **kwargs,
1530 )
1531 except Exception as e:
1532 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
1533 if is_default_node:
1534 # Replace the default cluster node
1535 self.replace_default_node()
1536 # The nodes and slots cache were reinitialized.
1537 # Try again with the new cluster setup.
1538 retry_attempts -= 1
1539 failure_count += 1
1541 if hasattr(e, "connection"):
1542 self._record_command_metric(
1543 command_name=args[0],
1544 duration_seconds=time.monotonic() - start_time,
1545 connection=e.connection,
1546 error=e,
1547 )
1549 self._record_error_metric(
1550 error=e,
1551 connection=e.connection,
1552 retry_attempts=failure_count,
1553 )
1554 continue
1555 else:
1556 # raise the exception
1557 if hasattr(e, "connection"):
1558 self._record_error_metric(
1559 error=e,
1560 connection=e.connection,
1561 retry_attempts=failure_count,
1562 is_internal=False,
1563 )
1564 raise e
1566 def _execute_command(self, target_node, *args, **kwargs):
1567 """
1568 Send a command to a node in the cluster
1569 """
1570 command = args[0]
1571 redis_node = None
1572 connection = None
1573 redirect_addr = None
1574 asking = False
1575 moved = False
1576 ttl = int(self.RedisClusterRequestTTL)
1578 # Start timing for observability
1579 start_time = time.monotonic()
1581 while ttl > 0:
1582 ttl -= 1
1583 try:
1584 if asking:
1585 target_node = self.get_node(node_name=redirect_addr)
1586 elif moved:
1587 # MOVED occurred and the slots cache was updated,
1588 # refresh the target node
1589 slot = self.determine_slot(*args)
1590 target_node = self.nodes_manager.get_node_from_slot(
1591 slot,
1592 self.read_from_replicas and command in READ_COMMANDS,
1593 self.load_balancing_strategy
1594 if command in READ_COMMANDS
1595 else None,
1596 )
1597 moved = False
1599 redis_node = self.get_redis_connection(target_node)
1600 connection = get_connection(redis_node)
1601 if asking:
1602 connection.send_command("ASKING")
1603 redis_node.parse_response(connection, "ASKING", **kwargs)
1604 asking = False
1605 connection.send_command(*args, **kwargs)
1606 response = redis_node.parse_response(connection, command, **kwargs)
1608 # Remove the 'keys' entry; it is only needed for caching.
1609 kwargs.pop("keys", None)
1611 if command in self.cluster_response_callbacks:
1612 response = self.cluster_response_callbacks[command](
1613 response, **kwargs
1614 )
1616 self._record_command_metric(
1617 command_name=command,
1618 duration_seconds=time.monotonic() - start_time,
1619 connection=connection,
1620 )
1621 return response
1622 except AuthenticationError as e:
1623 e.connection = connection if connection is not None else target_node
1624 self._record_command_metric(
1625 command_name=command,
1626 duration_seconds=time.monotonic() - start_time,
1627 connection=e.connection,
1628 error=e,
1629 )
1630 raise
1631 except MaxConnectionsError as e:
1632 # MaxConnectionsError indicates client-side resource exhaustion
1633 # (too many connections in the pool), not a node failure.
1634 # Don't treat this as a node failure - just re-raise the error
1635 # without reinitializing the cluster.
1636 # The connection in the error is used to report the metrics based on host and port info
1637 # so we use the target node object which contains the host and port info
1638 # because we did not get the connection yet
1639 e.connection = target_node
1640 self._record_command_metric(
1641 command_name=command,
1642 duration_seconds=time.monotonic() - start_time,
1643 connection=e.connection,
1644 error=e,
1645 )
1646 raise
1647 except (ConnectionError, TimeoutError) as e:
1648 if is_debug_log_enabled():
1649 socket_address = self._extracts_socket_address(connection)
1650 args_log_str = truncate_text(" ".join(map(safe_str, args)))
1651 logger.debug(
1652 f"{type(e).__name__} received for command {args_log_str}, on node {target_node.name}, "
1653 f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
1654 )
1655 # this is used to report the metrics based on host and port info
1656 e.connection = connection if connection else target_node
1658 # ConnectionError can also be raised if we couldn't get a
1659 # connection from the pool before timing out, so check that
1660 # this is an actual connection before attempting to disconnect.
1661 if connection is not None:
1662 connection.disconnect()
1664 # Instead of setting to None, properly handle the pool
1665 # Get the pool safely - redis_connection could be set to None
1666 # by another thread between the check and access
1667 redis_conn = target_node.redis_connection
1668 if redis_conn is not None:
1669 pool = redis_conn.connection_pool
1670 if pool is not None:
1671 with pool._lock:
1672 # take care for the active connections in the pool
1673 pool.update_active_connections_for_reconnect()
1674 # disconnect all free connections
1675 pool.disconnect_free_connections()
1677 # Move the failed node to the end of the cached nodes list
1678 self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name)
1680 # DON'T set redis_connection = None - keep the pool for reuse
1681 self.nodes_manager.initialize()
1682 self._record_command_metric(
1683 command_name=command,
1684 duration_seconds=time.monotonic() - start_time,
1685 connection=e.connection,
1686 error=e,
1687 )
1688 raise e
1689 except MovedError as e:
1690 if is_debug_log_enabled():
1691 socket_address = self._extracts_socket_address(connection)
1692 args_log_str = truncate_text(" ".join(map(safe_str, args)))
1693 logger.debug(
1694 f"MOVED error received for command {args_log_str}, on node {target_node.name}, "
1695 f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
1696 )
1697 # First, we will try to patch the slots/nodes cache with the
1698 # redirected node output and try again. If MovedError exceeds
1699 # 'reinitialize_steps' number of times, we will force
1700 # reinitializing the tables, and then try again.
1701 # 'reinitialize_steps' counter will increase faster when
1702 # the same client object is shared between multiple threads. To
1703 # reduce the frequency you can set this variable in the
1704 # RedisCluster constructor.
1705 self.reinitialize_counter += 1
1706 if self._should_reinitialized():
1707 # during this call all connections are closed or marked for disconnect,
1708 # so we don't need to disconnect the changed node's connections
1709 self.nodes_manager.initialize(
1710 additional_startup_nodes_info=[(e.host, e.port)]
1711 )
1712 # Reset the counter
1713 self.reinitialize_counter = 0
1714 else:
1715 self.nodes_manager.move_slot(e)
1716 moved = True
1717 self._record_command_metric(
1718 command_name=command,
1719 duration_seconds=time.monotonic() - start_time,
1720 connection=connection,
1721 error=e,
1722 )
1723 self._record_error_metric(
1724 error=e,
1725 connection=connection,
1726 )
1727 except TryAgainError as e:
1728 if is_debug_log_enabled():
1729 socket_address = self._extracts_socket_address(connection)
1730 args_log_str = truncate_text(" ".join(map(safe_str, args)))
1731 logger.debug(
1732 f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, "
1733 f"and connection: {connection} using local socket address: {socket_address}"
1734 )
1735 if ttl < self.RedisClusterRequestTTL / 2:
1736 time.sleep(0.05)
1738 self._record_command_metric(
1739 command_name=command,
1740 duration_seconds=time.monotonic() - start_time,
1741 connection=connection,
1742 error=e,
1743 )
1744 self._record_error_metric(
1745 error=e,
1746 connection=connection,
1747 )
1748 except AskError as e:
1749 if is_debug_log_enabled():
1750 socket_address = self._extracts_socket_address(connection)
1751 args_log_str = truncate_text(" ".join(map(safe_str, args)))
1752 logger.debug(
1753 f"ASK error received for command {args_log_str}, on node {target_node.name}, "
1754 f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
1755 )
1756 redirect_addr = get_node_name(host=e.host, port=e.port)
1757 asking = True
1759 self._record_command_metric(
1760 command_name=command,
1761 duration_seconds=time.monotonic() - start_time,
1762 connection=connection,
1763 error=e,
1764 )
1765 self._record_error_metric(
1766 error=e,
1767 connection=connection,
1768 )
1769 except (ClusterDownError, SlotNotCoveredError) as e:
1770 # ClusterDownError can occur during a failover; to let the cluster
1771 # self-heal, we will try to reinitialize the cluster layout
1772 # and retry executing the command.
1774 # SlotNotCoveredError can occur when the cluster is not fully
1775 # initialized, or it can be a temporary issue.
1776 # We will try to reinitialize the cluster topology
1777 # and retry executing the command.
1779 time.sleep(0.25)
1780 self.nodes_manager.initialize()
1782 # if we have a connection, use it, otherwise use the target node
1783 # object which contains the host and port info
1784 # this is used to report the metrics based on host and port info
1785 e.connection = connection if connection else target_node
1786 self._record_command_metric(
1787 command_name=command,
1788 duration_seconds=time.monotonic() - start_time,
1789 connection=e.connection,
1790 error=e,
1791 )
1792 raise
1793 except ResponseError as e:
1794 # this is used to report the metrics based on host and port info
1795 # ResponseError typically happens after get_connection() succeeds,
1796 # so connection should be available
1797 e.connection = connection if connection else target_node
1798 self._record_command_metric(
1799 command_name=command,
1800 duration_seconds=time.monotonic() - start_time,
1801 connection=e.connection,
1802 error=e,
1803 )
1804 raise
1805 except Exception as e:
1806 if connection:
1807 connection.disconnect()
1809 # if we have a connection, use it, otherwise use the target node
1810 # object which contains the host and port info
1811 # this is used to report the metrics based on host and port info
1812 e.connection = connection if connection else target_node
1813 self._record_command_metric(
1814 command_name=command,
1815 duration_seconds=time.monotonic() - start_time,
1816 connection=e.connection,
1817 error=e,
1818 )
1819 raise e
1820 finally:
1821 if connection is not None:
1822 redis_node.connection_pool.release(connection)
1824 e = ClusterError("TTL exhausted.")
1825 # In this case we should have an active connection.
1826 # If we are here, we have received many MOVED or ASK errors and finally exhausted the TTL.
1827 # This means that we used an active connection to read from the socket.
1828 # This is used to report metrics based on the host and port information.
1829 e.connection = connection
1830 self._record_command_metric(
1831 command_name=command,
1832 duration_seconds=time.monotonic() - start_time,
1833 connection=connection,
1834 error=e,
1835 )
1836 raise e
1838 def _record_command_metric(
1839 self,
1840 command_name: str,
1841 duration_seconds: float,
1842 connection: Connection,
1843 error=None,
1844 ):
1845 """
1846 Records operation duration metric directly.
1847 """
1848 host = connection.host if connection else "unknown"
1849 port = connection.port if connection else 0
1850 db = str(connection.db) if connection and hasattr(connection, "db") else "0"
1852 record_operation_duration(
1853 command_name=command_name,
1854 duration_seconds=duration_seconds,
1855 server_address=host,
1856 server_port=port,
1857 db_namespace=db,
1858 error=error,
1859 )
1861 def _record_error_metric(
1862 self,
1863 error: Exception,
1864 connection: Connection,
1865 is_internal: bool = True,
1866 retry_attempts: Optional[int] = None,
1867 ):
1868 """
1869 Records error count metric directly.
1870 """
1871 record_error_count(
1872 server_address=connection.host,
1873 server_port=connection.port,
1874 network_peer_address=connection.host,
1875 network_peer_port=connection.port,
1876 error_type=error,
1877 retry_attempts=retry_attempts if retry_attempts is not None else 0,
1878 is_internal=is_internal,
1879 )
1881 def _extracts_socket_address(
1882 self, connection: Optional[Connection]
1883 ) -> Optional[int]:
1884 if connection is None:
1885 return None
1886 socket_address = None
1887 try:
1888 sock = getattr(connection, "_sock", None)
1889 if sock is not None:
1890 # local (host, port); keep only the port, as before
1891 socket_address = sock.getsockname()[1]
1892 except (AttributeError, OSError):
1893 pass
1894 return socket_address
1895 def close(self) -> None:
1896 try:
1897 with self._lock:
1898 if self.nodes_manager:
1899 self.nodes_manager.close()
1900 except AttributeError:
1901 # RedisCluster's __init__ can fail before nodes_manager is set
1902 pass
1904 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
1905 """
1906 Process the result of the executed command.
1907 The function returns either a dict or a single value.
1909 :type command: str
1910 :type res: dict
1912 `res` should be in the following format:
1913 Dict<node_name, command_result>
1914 """
1915 if command in self.result_callbacks:
1916 res = self.result_callbacks[command](command, res, **kwargs)
1917 elif len(res) == 1:
1918 # When we execute the command on a single node, we can
1919 # remove the dictionary and return a single response
1920 res = list(res.values())[0]
1922 return self._policies_callback_mapping[response_policy](res)
1924 def load_external_module(self, funcname, func):
1925 """
1926 This function can be used to add externally defined redis modules
1927 (and their namespaces) to the redis client.
1929 ``funcname`` - A string containing the name of the function to create
1930 ``func`` - The function, being added to this class.
1931 """
1932 setattr(self, funcname, func)
1934 def transaction(self, func, *watches, **kwargs):
1935 """
1936 Convenience method for executing the callable `func` as a transaction
1937 while watching all keys specified in `watches`. The 'func' callable
1938 should expect a single argument which is a Pipeline object.
1939 """
1940 shard_hint = kwargs.pop("shard_hint", None)
1941 value_from_callable = kwargs.pop("value_from_callable", False)
1942 watch_delay = kwargs.pop("watch_delay", None)
1943 with self.pipeline(True, shard_hint) as pipe:
1944 while True:
1945 try:
1946 if watches:
1947 pipe.watch(*watches)
1948 func_value = func(pipe)
1949 exec_value = pipe.execute()
1950 return func_value if value_from_callable else exec_value
1951 except WatchError:
1952 if watch_delay is not None and watch_delay > 0:
1953 time.sleep(watch_delay)
1954 continue
1957class ClusterNode:
1958 def __init__(self, host, port, server_type=None, redis_connection=None):
1959 if host == "localhost":
1960 host = socket.gethostbyname(host)
1962 self.host = host
1963 self.port = port
1964 self.name = get_node_name(host, port)
1965 self.server_type = server_type
1966 self.redis_connection = redis_connection
1968 def __repr__(self):
1969 return (
1970 f"[host={self.host},"
1971 f"port={self.port},"
1972 f"name={self.name},"
1973 f"server_type={self.server_type},"
1974 f"redis_connection={self.redis_connection}]"
1975 )
1977 def __eq__(self, obj):
1978 return isinstance(obj, ClusterNode) and obj.name == self.name
1980 def __hash__(self):
1981 return hash(self.name)
1984class LoadBalancingStrategy(Enum):
1985 ROUND_ROBIN = "round_robin"
1986 ROUND_ROBIN_REPLICAS = "round_robin_replicas"
1987 RANDOM = "random"
1988 RANDOM_REPLICA = "random_replica"
1991class LoadBalancer:
1992 """
1993 Load balancing across a slot's nodes (round-robin by default; random strategies are also supported)
1994 """
1996 def __init__(self, start_index: int = 0) -> None:
1997 self.primary_to_idx: dict[str, int] = {}
1998 self.start_index: int = start_index
1999 self._lock: threading.Lock = threading.Lock()
2001 def get_server_index(
2002 self,
2003 primary: str,
2004 list_size: int,
2005 load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
2006 ) -> int:
2007 if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
2008 return self._get_random_server_index(
2009 list_size,
2010 replicas_only=True,
2011 )
2012 elif load_balancing_strategy == LoadBalancingStrategy.RANDOM:
2013 return self._get_random_server_index(
2014 list_size,
2015 replicas_only=False,
2016 )
2017 else:
2018 return self._get_round_robin_index(
2019 primary,
2020 list_size,
2021 load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
2022 )
2024 def reset(self) -> None:
2025 with self._lock:
2026 self.primary_to_idx.clear()
2028 def _get_random_server_index(self, list_size: int, replicas_only: bool) -> int:
2029 return random.randint(1 if replicas_only else 0, list_size - 1)
2031 def _get_round_robin_index(
2032 self, primary: str, list_size: int, replicas_only: bool
2033 ) -> int:
2034 with self._lock:
2035 server_index = self.primary_to_idx.setdefault(primary, self.start_index)
2036 if replicas_only and server_index == 0:
2037 # skip the primary node index
2038 server_index = 1
2039 # Update the index for the next round
2040 self.primary_to_idx[primary] = (server_index + 1) % list_size
2041 return server_index
2044class NodesManager:
2045 def __init__(
2046 self,
2047 startup_nodes: list[ClusterNode],
2048 from_url=False,
2049 require_full_coverage=False,
2050 lock: Optional[threading.RLock] = None,
2051 dynamic_startup_nodes=True,
2052 connection_pool_class=ConnectionPool,
2053 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
2054 cache: Optional[CacheInterface] = None,
2055 cache_config: Optional[CacheConfig] = None,
2056 cache_factory: Optional[CacheFactoryInterface] = None,
2057 event_dispatcher: Optional[EventDispatcher] = None,
2058 maint_notifications_config: Optional[MaintNotificationsConfig] = None,
2059 **kwargs,
2060 ):
2061 self.nodes_cache: dict[str, ClusterNode] = {}
2062 self.slots_cache: dict[int, list[ClusterNode]] = {}
2063 self.startup_nodes: dict[str, ClusterNode] = {n.name: n for n in startup_nodes}
2064 self.default_node: Optional[ClusterNode] = None
2065 self._epoch: int = 0
2066 self.from_url = from_url
2067 self._require_full_coverage = require_full_coverage
2068 self._dynamic_startup_nodes = dynamic_startup_nodes
2069 self.connection_pool_class = connection_pool_class
2070 self.address_remap = address_remap
2071 self._cache: Optional[CacheInterface] = None
2072 if cache:
2073 self._cache = cache
2074 elif cache_factory is not None:
2075 self._cache = cache_factory.get_cache()
2076 elif cache_config is not None:
2077 self._cache = CacheFactory(cache_config).get_cache()
2078 self.connection_kwargs = kwargs
2079 self.read_load_balancer = LoadBalancer()
2081 # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock
2082 if lock is None:
2083 self._lock = threading.RLock()
2084 else:
2085 self._lock = lock
2087 # initialize holds _initialization_lock to dedup multiple calls to reinitialize;
2088 # note that if we hold both _lock and _initialization_lock, we _must_ acquire
2089 # _initialization_lock first (ie: to have a consistent order) to avoid deadlock.
2090 self._initialization_lock: threading.RLock = threading.RLock()
2092 if event_dispatcher is None:
2093 self._event_dispatcher = EventDispatcher()
2094 else:
2095 self._event_dispatcher = event_dispatcher
2096 self._credential_provider = self.connection_kwargs.get(
2097 "credential_provider", None
2098 )
2099 self.maint_notifications_config = maint_notifications_config
2101 self.initialize()
2103 def get_node(
2104 self,
2105 host: Optional[str] = None,
2106 port: Optional[int] = None,
2107 node_name: Optional[str] = None,
2108 ) -> Optional[ClusterNode]:
2109 """
2110 Get the requested node from the cluster's nodes, either by host and
2111 port or by node name.
2112 :return: ClusterNode if the node exists, else None
2113 """
2114 if host and port:
2115 # the user passed host and port
2116 if host == "localhost":
2117 host = socket.gethostbyname(host)
2118 with self._lock:
2119 return self.nodes_cache.get(get_node_name(host=host, port=port))
2120 elif node_name:
2121 with self._lock:
2122 return self.nodes_cache.get(node_name)
2123 else:
2124 return None
2126 def move_slot(self, e: Union[AskError, MovedError]):
2127 """
2128 Update the slot's node with the redirected one
2129 """
2130 node_changed = False
2131 with self._lock:
2132 redirected_node = self.get_node(host=e.host, port=e.port)
2133 if redirected_node is not None:
2134 # The node already exists
2135 if redirected_node.server_type is not PRIMARY:
2136 # Update the node's server type
2137 redirected_node.server_type = PRIMARY
2138 else:
2139 # This is a new node, we will add it to the nodes cache
2140 redirected_node = ClusterNode(e.host, e.port, PRIMARY)
2141 self.nodes_cache[redirected_node.name] = redirected_node
2143 slot_nodes = self.slots_cache[e.slot_id]
2144 if redirected_node not in slot_nodes:
2145 # The new slot owner is a new server, or a server from a different
2146 # shard. We need to remove all current nodes from the slot's list
2147 # (including replications) and add just the new node.
2148 self.slots_cache[e.slot_id] = [redirected_node]
2149 node_changed = True
2150 elif redirected_node is not slot_nodes[0]:
2151 # The MOVED error resulted from a failover, and the new slot owner
2152 # had previously been a replica.
2153 old_primary = slot_nodes[0]
2154 # Update the old primary to be a replica and add it to the end of
2155 # the slot's node list
2156 old_primary.server_type = REPLICA
2157 slot_nodes.append(old_primary)
2158 # Remove the old replica, which is now a primary, from the slot's
2159 # node list
2160 slot_nodes.remove(redirected_node)
2161 # Override the old primary with the new one
2162 slot_nodes[0] = redirected_node
2163 if self.default_node == old_primary:
2164 # Update the default node with the new primary
2165 self.default_node = redirected_node
2166 node_changed = True
2167 # else: circular MOVED to current primary -> no-op
2168 # Dispatch outside the lock so listeners can acquire their own locks
2169 # without risk of deadlock. Skipped on the no-op branch to avoid
2170 # needless reconciliation walks under MOVED storms. A listener must
2171 # not break slots-cache refresh; log and continue so a single buggy
2172 # listener cannot starve the rest.
2173 if node_changed:
2174 try:
2175 self._event_dispatcher.dispatch(AfterSlotsCacheRefreshEvent())
2176 except Exception as exc:
2177 # Don't shadow the method parameter ``e``: ``except as`` binds
2178 # the listener exception in the function scope and ``del``s
2179 # the name on block exit (PEP 3134), which would also wipe
2180 # out the original AskError/MovedError parameter.
2181 logger.exception(
2182 "listener raised during slots-cache refresh: %s: %s",
2183 type(exc).__name__,
2184 exc,
2185 )
2187 @deprecated_args(
2188 args_to_warn=["server_type"],
2189 reason=(
2190 "In case you need select some load balancing strategy "
2191 "that will use replicas, please set it through 'load_balancing_strategy'"
2192 ),
2193 version="5.3.0",
2194 )
2195 def get_node_from_slot(
2196 self,
2197 slot: int,
2198 read_from_replicas: bool = False,
2199 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
2200 server_type: Optional[Literal["primary", "replica"]] = None,
2201 ) -> ClusterNode:
2202 """
2203 Gets a node that serves this hash slot
2204 """
2206 if read_from_replicas is True and load_balancing_strategy is None:
2207 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
2209 with self._lock:
2210 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
2211 raise SlotNotCoveredError(
2212 f'Slot "{slot}" not covered by the cluster. '
2213 + f'"require_full_coverage={self._require_full_coverage}"'
2214 )
2216 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
2217 # get the server index using the strategy defined in load_balancing_strategy
2218 primary_name = self.slots_cache[slot][0].name
2219 node_idx = self.read_load_balancer.get_server_index(
2220 primary_name, len(self.slots_cache[slot]), load_balancing_strategy
2221 )
2222 elif (
2223 server_type is None
2224 or server_type == PRIMARY
2225 or len(self.slots_cache[slot]) == 1
2226 ):
2227 # return a primary
2228 node_idx = 0
2229 else:
2230 # return a replica
2231 # randomly choose one of the replicas
2232 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)
2234 return self.slots_cache[slot][node_idx]
2236 def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]):
2237 """
2238 Get all nodes with the specified server type
2239 :param server_type: 'primary' or 'replica'
2240 :return: list of ClusterNode
2241 """
2242 with self._lock:
2243 return [
2244 node
2245 for node in self.nodes_cache.values()
2246 if node.server_type == server_type
2247 ]
2249 @deprecated_function(
2250 reason="This method is not used anymore internally. The startup nodes are populated automatically.",
2251 version="7.0.2",
2252 )
2253 def populate_startup_nodes(self, nodes):
2254 """
2255 Populate all startup nodes and filter out any duplicates
2256 """
2257 with self._lock:
2258 for n in nodes:
2259 self.startup_nodes[n.name] = n
2261 def move_node_to_end_of_cached_nodes(self, node_name: str) -> None:
2262 """
2263 Move a failing node to the end of startup_nodes and nodes_cache so it's
2264 tried last during reinitialization and when selecting the default node.
2265 If the node is not in the respective list, nothing is done.
2266 """
2267 # Move in startup_nodes
2268 if node_name in self.startup_nodes and len(self.startup_nodes) > 1:
2269 node = self.startup_nodes.pop(node_name)
2270 self.startup_nodes[node_name] = node # Re-insert at end
2272 # Move in nodes_cache - this affects get_nodes_by_server_type ordering
2273 # which is used to select the default_node during initialize()
2274 if node_name in self.nodes_cache and len(self.nodes_cache) > 1:
2275 node = self.nodes_cache.pop(node_name)
2276 self.nodes_cache[node_name] = node # Re-insert at end
2278 def check_slots_coverage(self, slots_cache):
2279 # Check whether all slots are covered; if not, we should try the
2280 # next startup node
2281 for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
2282 if i not in slots_cache:
2283 return False
2284 return True
2286 def create_redis_connections(self, nodes):
2287 """
2288 Create a Redis connection for every node in ``nodes`` that does not already have one.
2289 """
2290 connection_pools = []
2291 for node in nodes:
2292 if node.redis_connection is None:
2293 node.redis_connection = self.create_redis_node(
2294 host=node.host,
2295 port=node.port,
2296 maint_notifications_config=self.maint_notifications_config,
2297 **self.connection_kwargs,
2298 )
2299 connection_pools.append(node.redis_connection.connection_pool)
2301 self._event_dispatcher.dispatch(
2302 AfterPooledConnectionsInstantiationEvent(
2303 connection_pools, ClientType.SYNC, self._credential_provider
2304 )
2305 )
2307 def create_redis_node(
2308 self,
2309 host,
2310 port,
2311 **kwargs,
2312 ):
2313 # We are configuring the connection pool not to retry
2314 # connections on lower level clients to avoid retrying
2315 # connections to nodes that are not reachable
2316 # and to avoid blocking the connection pool.
2317 # The only error that will have some handling in the lower
2318 # level clients is ConnectionError which will trigger disconnection
2319 # of the socket.
2320 # The retries will be handled on cluster client level
2321 # where we will have proper handling of the cluster topology
2322 node_retry_config = Retry(
2323 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
2324 )
2326 if self.from_url:
2327 # Create a redis node with a custom connection pool
2328 kwargs.update({"host": host})
2329 kwargs.update({"port": port})
2330 kwargs.update({"cache": self._cache})
2331 kwargs.update({"retry": node_retry_config})
2332 r = Redis(connection_pool=self.connection_pool_class(**kwargs))
2333 else:
2334 r = Redis(
2335 host=host,
2336 port=port,
2337 cache=self._cache,
2338 retry=node_retry_config,
2339 **kwargs,
2340 )
2341 return r
2343 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
2344 node_name = get_node_name(host, port)
2345 # check if we already have this node in the tmp_nodes_cache
2346 target_node = tmp_nodes_cache.get(node_name)
2347 if target_node is None:
2348 # before creating a new cluster node, check if the cluster node already
2349 # exists in the current nodes cache and has a valid connection so we can
2350 # reuse it
2351 redis_connection: Optional[Redis] = None
2352 with self._lock:
2353 previous_node = self.nodes_cache.get(node_name)
2354 if previous_node:
2355 redis_connection = previous_node.redis_connection
2356 # don't update the old ClusterNode, so we don't update its role
2357 # outside of the lock
2358 target_node = ClusterNode(host, port, role, redis_connection)
2359 # add this node to the nodes cache
2360 tmp_nodes_cache[target_node.name] = target_node
2362 return target_node
2364 def _get_epoch(self) -> int:
2365 """
2366 Get the current epoch value. This method exists primarily to allow
2367 tests to mock the epoch fetch and control race condition timing.
2368 """
2369 with self._lock:
2370 return self._epoch
2372 def initialize(
2373 self,
2374 additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None,
2375 disconnect_startup_nodes_pools: bool = True,
2376 ):
2377 """
2378 Initializes the nodes cache, slots cache and redis connections.
2379 The instance's startup nodes are used to discover the other nodes
2380 in the cluster.
2381 :disconnect_startup_nodes_pools:
2382 Whether to disconnect the startup nodes' connection pools after
2383 initialization completes. This is useful when the startup nodes
2384 are not part of the cluster and we want to avoid keeping their
2385 connections open.
2386 :additional_startup_nodes_info:
2387 Extra nodes added temporarily to the startup nodes and used only
2388 while extracting the slots and nodes information from the
2389 cluster.
2390 This is useful when new nodes are being added to the cluster
2391 and the client should be initialized
2392 against them.
2393 The expected format is a list of tuples, where each tuple contains
2394 the host and port of a node.
2395 """
2396 self.reset()
2397 tmp_nodes_cache = {}
2398 tmp_slots = {}
2399 disagreements = []
2400 startup_nodes_reachable = False
2401 fully_covered = False
2402 kwargs = self.connection_kwargs
2403 exception = None
2404 epoch = self._get_epoch()
2405 if additional_startup_nodes_info is None:
2406 additional_startup_nodes_info = []
2408 with self._initialization_lock:
2409 with self._lock:
2410 if epoch != self._epoch:
2411 # another thread has already re-initialized the nodes; don't
2412 # bother running again
2413 return
2415 with self._lock:
2416 startup_nodes = tuple(self.startup_nodes.values())
2418 additional_startup_nodes = [
2419 ClusterNode(host, port) for host, port in additional_startup_nodes_info
2420 ]
2421 if is_debug_log_enabled():
2422 logger.debug(
2423 f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; "
2424 f"and startup nodes: {[node.name for node in startup_nodes]}"
2425 )
2427 for startup_node in (*startup_nodes, *additional_startup_nodes):
2428 try:
2429 if startup_node.redis_connection:
2430 r = startup_node.redis_connection
2432 else:
2433 # Create a new Redis connection
2434 if is_debug_log_enabled():
2435 socket_timeout = kwargs.get("socket_timeout", "not set")
2436 socket_connect_timeout = kwargs.get(
2437 "socket_connect_timeout", "not set"
2438 )
2439 maint_enabled = (
2440 self.maint_notifications_config.enabled
2441 if self.maint_notifications_config
2442 else False
2443 )
2444 logger.debug(
2445 "Topology refresh: Creating new Redis connection to "
2446 f"{startup_node.host}:{startup_node.port}; "
2447 f"with socket_timeout: {socket_timeout}, and "
2448 f"socket_connect_timeout: {socket_connect_timeout}, "
2449 "and maint_notifications enabled: "
2450 f"{maint_enabled}"
2451 )
2452 r = self.create_redis_node(
2453 startup_node.host,
2454 startup_node.port,
2455 maint_notifications_config=self.maint_notifications_config,
2456 **kwargs,
2457 )
2458 if startup_node in self.startup_nodes.values():
2459 self.startup_nodes[startup_node.name].redis_connection = r
2460 else:
2461 startup_node.redis_connection = r
2462 try:
2463 # Make sure cluster mode is enabled on this node
2464 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
2465 if disconnect_startup_nodes_pools:
2466 with r.connection_pool._lock:
2467 # make sure connections are cleaned up before we move on:
2468 # mark all active connections for reconnect - they will be
2469 # reconnected on next use, allowing in-flight commands to complete first
2470 r.connection_pool.update_active_connections_for_reconnect()
2471 # Needed to clear READONLY state when it is no longer applicable
2472 r.connection_pool.disconnect_free_connections()
2473 except ResponseError:
2474 raise RedisClusterException(
2475 "Cluster mode is not enabled on this node"
2476 )
2477 startup_nodes_reachable = True
2478 except Exception as e:
2479 # Try the next startup node.
2480 # The exception is saved and raised only if we have no more nodes.
2481 exception = e
2482 continue
2484 # CLUSTER SLOTS command results in the following output:
2485 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
2486 # where each node contains the following list: [IP, port, node_id]
2487 # Therefore, cluster_slots[0][2][0] will be the IP address of the
2488 # primary node of the first slot section.
2489 # If there's only one server in the cluster, its ``host`` is ''
2490 # Fix it to the host in startup_nodes
2491 if (
2492 len(cluster_slots) == 1
2493 and len(cluster_slots[0][2][0]) == 0
2494 and len(self.startup_nodes) == 1
2495 ):
2496 cluster_slots[0][2][0] = startup_node.host
2498 for slot in cluster_slots:
2499 primary_node = slot[2]
2500 host = str_if_bytes(primary_node[0])
2501 if host == "":
2502 host = startup_node.host
2503 port = int(primary_node[1])
2504 host, port = self.remap_host_port(host, port)
2506 nodes_for_slot = []
2508 target_node = self._get_or_create_cluster_node(
2509 host, port, PRIMARY, tmp_nodes_cache
2510 )
2511 nodes_for_slot.append(target_node)
2513 replica_nodes = slot[3:]
2514 for replica_node in replica_nodes:
2515 host = str_if_bytes(replica_node[0])
2516 port = int(replica_node[1])
2517 host, port = self.remap_host_port(host, port)
2518 target_replica_node = self._get_or_create_cluster_node(
2519 host, port, REPLICA, tmp_nodes_cache
2520 )
2521 nodes_for_slot.append(target_replica_node)
2523 for i in range(int(slot[0]), int(slot[1]) + 1):
2524 if i not in tmp_slots:
2525 tmp_slots[i] = nodes_for_slot
2526 else:
2527 # Detect whether two startup nodes disagree about which
2528 # node owns this slot
2529 tmp_slot = tmp_slots[i][0]
2530 if tmp_slot.name != target_node.name:
2531 disagreements.append(
2532 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
2533 )
2535 if len(disagreements) > 5:
2536 raise RedisClusterException(
2537 f"startup_nodes could not agree on a valid "
2538 f"slots cache: {', '.join(disagreements)}"
2539 )
2541 fully_covered = self.check_slots_coverage(tmp_slots)
2542 if fully_covered:
2543 # Don't need to continue to the next startup node if all
2544 # slots are covered
2545 break
2547 if not startup_nodes_reachable:
2548 raise RedisClusterException(
2549 f"Redis Cluster cannot be connected. Please provide at least "
2550 f"one reachable node: {str(exception)}"
2551 ) from exception
2553 # Create Redis connections to all nodes
2554 self.create_redis_connections(list(tmp_nodes_cache.values()))
2556 # Check whether the slots are fully covered
2557 if not fully_covered and self._require_full_coverage:
2558 # Full coverage was required, but the discovered slots do not
2559 # cover the whole keyspace
2560 raise RedisClusterException(
2561 f"Not all slots are covered after querying all startup nodes. "
2562 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
2563 f"covered..."
2564 )
2566 # Set the tmp variables to the real variables
2567 with self._lock:
2568 self.nodes_cache = tmp_nodes_cache
2569 self.slots_cache = tmp_slots
2570 # Set the default node
2571 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
2572 if self._dynamic_startup_nodes:
2573 # Populate the startup nodes with all discovered nodes
2574 self.startup_nodes = tmp_nodes_cache
2575 # Increment the epoch to signal that initialization has completed
2576 self._epoch += 1
2577 # Dispatch so listeners (e.g. ClusterPubSub) can reconcile per-node
2578 # state after slot ownership may have changed. A listener must not
2579 # break slots-cache refresh; log and continue so a single buggy
2580 # listener cannot starve the rest.
2581 try:
2582 self._event_dispatcher.dispatch(AfterSlotsCacheRefreshEvent())
2583 except Exception as e:
2584 logger.exception(
2585 "listener raised during slots-cache refresh: %s: %s",
2586 type(e).__name__,
2587 e,
2588 )
2590 def close(self) -> None:
2591 with self._lock:
2592 self.default_node = None
2593 nodes = tuple(self.nodes_cache.values())
2594 for node in nodes:
2595 if node.redis_connection:
2596 node.redis_connection.close()
2598 def reset(self):
2599 try:
2600 self.read_load_balancer.reset()
2601 except (AttributeError, TypeError):
2602 # The read_load_balancer is None, do nothing
2603 pass
2605 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
2606 """
2607 Remap the host and port returned from the cluster to a different
2608 internal value. Useful if the client is not connecting directly
2609 to the cluster.
2610 """
2611 if self.address_remap:
2612 return self.address_remap((host, port))
2613 return host, port
2615 def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]:
2616 node_name = get_node_name(connection.host, connection.port)
2617 with self._lock:
2618 for node in tuple(self.nodes_cache.values()):
2619 if node.redis_connection:
2620 conn_args = node.redis_connection.connection_pool.connection_kwargs
2621 if node_name == get_node_name(
2622 conn_args.get("host"), conn_args.get("port")
2623 ):
2624 return node
2625 return None
2628def _unregister_slots_cache_listener(
2629 dispatcher_ref: "weakref.ref[EventDispatcher]",
2630 listener: EventListenerInterface,
2631 event_type: Type[object],
2632) -> None:
2633 # Module-level finalizer callback. Kept free of strong references to the
2634 # owning ClusterPubSub so attaching it via weakref.finalize does not
2635 # extend the pubsub's lifetime.
2636 dispatcher = dispatcher_ref()
2637 if dispatcher is not None:
2638 dispatcher.unregister_listeners({event_type: [listener]})
2641class ClusterPubSubSlotsCacheListener(EventListenerInterface):
2642 """
2643 Listener that forwards AfterSlotsCacheRefreshEvent to a ClusterPubSub.
2645 Holds a weak reference to the pubsub so it does not keep the instance
2646 alive. Deterministic cleanup of the dispatcher's strong reference to this
2647 listener is performed by a ``weakref.finalize`` attached to the owning
2648 ClusterPubSub in ``ClusterPubSub.__init__``.
2649 """
2651 def __init__(self, pubsub: "ClusterPubSub") -> None:
2652 self._pubsub_ref: "weakref.ref[ClusterPubSub]" = weakref.ref(pubsub)
2654 def listen(self, event: object) -> None:
2655 pubsub = self._pubsub_ref()
2656 if pubsub is None:
2657 # Race window between pubsub GC and the finalizer running; safe
2658 # no-op, finalizer will remove this listener shortly.
2659 return
2660 try:
2661 pubsub.on_slots_changed()
2662 except Exception as e:
2663 # Listeners must not break slots-cache refresh; log and continue so
2664 # a single buggy pubsub cannot starve the rest.
2665 logger.exception(
2666 "pubsub %r raised during slots-cache change: %s: %s",
2667 pubsub,
2668 type(e).__name__,
2669 e,
2670 )
2673class ClusterPubSub(PubSub):
2674 """
2675 Wrapper for PubSub class.
2677 IMPORTANT: before using ClusterPubSub, read about the known limitations
2678 with pubsub in Cluster mode and learn how to work around them:
2679 https://redis.readthedocs.io/en/stable/clustering.html#known-pubsub-limitations
2680 """
2682 def __init__(
2683 self,
2684 redis_cluster,
2685 node=None,
2686 host=None,
2687 port=None,
2688 push_handler_func=None,
2689 event_dispatcher: Optional["EventDispatcher"] = None,
2690 **kwargs,
2691 ):
2692 """
2693 When a pubsub instance is created without specifying a node, a single
2694 node will be transparently chosen for the pubsub connection on the
2695 first command execution. The node will be determined by:
2696 1. Hashing the channel name in the request to find its keyslot
2697 2. Selecting a node that handles the keyslot: If read_from_replicas is
2698 set to true or load_balancing_strategy is set, a replica can be selected.
2700 :type redis_cluster: RedisCluster
2701 :type node: ClusterNode
2702 :type host: str
2703 :type port: int
2704 """
2705 self.node = None
2706 self.set_pubsub_node(redis_cluster, node, host, port)
2707 connection_pool = (
2708 None
2709 if self.node is None
2710 else redis_cluster.get_redis_connection(self.node).connection_pool
2711 )
2712 self.cluster = redis_cluster
2713 self.node_pubsub_mapping = {}
2714 # Reverse index: shard channel (normalized) -> owning node.name. Used to
2715 # route sunsubscribe calls and reconcile subscriptions after slot
2716 # migration / failover.
2717 self._shard_channel_to_node: dict = {}
2718 # Dedicated lock for shard-subscription bookkeeping. Distinct from
2719 # PubSub.self._lock (which serializes wire I/O on the cluster-level
2720 # connection used by aclose / send_command / regular subscribe) so
2721 # that reconciliation cannot starve those unrelated paths during
2722 # long per-channel migrations.
2723 self._shard_state_lock: threading.RLock = threading.RLock()
2724 # Worker executor for off-loading slot-migration reconciliation from
2725 # the dispatch call site (mirrors async's asyncio.create_task model so
2726 # the thread that triggered MovedError / topology refresh is not
2727 # blocked on per-channel sunsubscribe / ssubscribe network I/O).
2728 # Lazy-created on first on_slots_changed() to avoid a persistent
2729 # worker thread for pubsubs that never see a slot migration.
2730 # Initialized before super().__init__() because PubSub.__init__ calls
2731 # self.reset(), which resolves to ClusterPubSub.reset() and reads
2732 # these attributes.
2733 self._reconcile_executor: Optional[ThreadPoolExecutor] = None
2734 # In-flight reconciliation futures; tracked so reset() can cancel
2735 # pending work and so exceptions surface via a done-callback.
2736 self._reconcile_futures: Set[Future] = set()
2737 self._pubsubs_generator = self._pubsubs_generator()
2738 if event_dispatcher is None:
2739 self._event_dispatcher = EventDispatcher()
2740 else:
2741 self._event_dispatcher = event_dispatcher
2742 super().__init__(
2743 connection_pool=connection_pool,
2744 encoder=redis_cluster.encoder,
2745 push_handler_func=push_handler_func,
2746 event_dispatcher=self._event_dispatcher,
2747 **kwargs,
2748 )
2749 # Subscribe to slots-cache change notifications so shard subscriptions
2750 # can be reconciled automatically after topology refreshes.
2751 nm_dispatcher = redis_cluster.nodes_manager._event_dispatcher
2752 self._slots_cache_listener = ClusterPubSubSlotsCacheListener(self)
2753 nm_dispatcher.register_listeners(
2754 {AfterSlotsCacheRefreshEvent: [self._slots_cache_listener]}
2755 )
2756 # Deterministic GC-time cleanup so short-lived pubsubs do not leak
2757 # listeners in the dispatcher when no slots-refresh event ever fires.
2758 weakref.finalize(
2759 self,
2760 _unregister_slots_cache_listener,
2761 weakref.ref(nm_dispatcher),
2762 self._slots_cache_listener,
2763 AfterSlotsCacheRefreshEvent,
2764 )
2766 def set_pubsub_node(self, cluster, node=None, host=None, port=None):
2767 """
2768 The pubsub node will be set according to the passed node, host and port.
2769 When none of node, host, or port is specified, the node is set
2770 to None and will be determined by the keyslot of the channel in the
2771 first command to be executed.
2772 RedisClusterException will be thrown if the passed node does not exist
2773 in the cluster.
2774 If host is passed without port, or vice versa, a DataError will be
2775 thrown.
2776 :type cluster: RedisCluster
2777 :type node: ClusterNode
2778 :type host: str
2779 :type port: int
2780 """
2781 if node is not None:
2782 # node is passed by the user
2783 self._raise_on_invalid_node(cluster, node, node.host, node.port)
2784 pubsub_node = node
2785 elif host is not None and port is not None:
2786 # host and port passed by the user
2787 node = cluster.get_node(host=host, port=port)
2788 self._raise_on_invalid_node(cluster, node, host, port)
2789 pubsub_node = node
2790 elif any([host, port]):
2791 # only 'host' or 'port' passed
2792 raise DataError("Passing a host requires passing a port, and vice versa")
2793 else:
2794 # nothing passed by the user. set node to None
2795 pubsub_node = None
2797 self.node = pubsub_node
2799 def get_pubsub_node(self):
2800 """
2801 Get the node that is being used as the pubsub connection
2802 """
2803 return self.node
2805 def _raise_on_invalid_node(self, redis_cluster, node, host, port):
2806 """
2807 Raise a RedisClusterException if the node is None or doesn't exist in
2808 the cluster.
2809 """
2810 if node is None or redis_cluster.get_node(node_name=node.name) is None:
2811 raise RedisClusterException(
2812 f"Node {host}:{port} doesn't exist in the cluster"
2813 )
2815 def execute_command(self, *args):
2816 """
2817 Execute a subscribe/unsubscribe command.
2819 Code taken from redis-py and tweaked to make it work within a cluster.
2820 """
2821 # NOTE: don't parse the response in this function -- it could pull a
2822 # legitimate message off the stack if the connection is already
2823 # subscribed to one or more channels
2825 if self.connection is None:
2826 if self.connection_pool is None:
2827 if len(args) > 1:
2828 # Hash the first channel and get one of the nodes holding
2829 # this slot
2830 channel = args[1]
2831 slot = self.cluster.keyslot(channel)
2832 node = self.cluster.nodes_manager.get_node_from_slot(
2833 slot,
2834 self.cluster.read_from_replicas,
2835 self.cluster.load_balancing_strategy,
2836 )
2837 else:
2838 # Get a random node
2839 node = self.cluster.get_random_node()
2840 self.node = node
2841 redis_connection = self.cluster.get_redis_connection(node)
2842 self.connection_pool = redis_connection.connection_pool
2843 self.connection = self.connection_pool.get_connection()
2844 # register a callback that re-subscribes to any channels we
2845 # were listening to when we were disconnected
2846 self.connection.register_connect_callback(self.on_connect)
2847 if self.push_handler_func is not None:
2848 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
2849 self._event_dispatcher.dispatch(
2850 AfterPubSubConnectionInstantiationEvent(
2851 self.connection, self.connection_pool, ClientType.SYNC, self._lock
2852 )
2853 )
2854 connection = self.connection
2855 self._execute(connection, connection.send_command, *args)
2857 def _resubscribe_shard_channels(self) -> None:
2858 # A single node can own multiple slot ranges, so a batched
2859 # ``SSUBSCRIBE`` covering every tracked channel would be rejected by
2860 # Redis with a ``CROSSSLOT`` error. Group by hash slot and emit one
2861 # ``SSUBSCRIBE`` per slot.
2862 by_slot: defaultdict[int, dict] = defaultdict(dict)
2863 for k, v in self.shard_channels.items():
2864 by_slot[key_slot(self.encoder.encode(k))][k] = v
2865 for subscriptions in by_slot.values():
2866 self._resubscribe(subscriptions, self.ssubscribe)
2868 def _get_node_pubsub(self, node):
2869 try:
2870 return self.node_pubsub_mapping[node.name]
2871 except KeyError:
2872 redis_connection = self.cluster.get_redis_connection(node)
2873 pubsub = redis_connection.pubsub(
2874 push_handler_func=self.push_handler_func,
2875 )
2876 # Replay shard subscriptions on reconnect with slot-aware grouping
2877 # so that channels spanning multiple slots owned by this node do
2878 # not trigger a CROSSSLOT error.
2879 pubsub._resubscribe_shard_channels = MethodType(
2880 ClusterPubSub._resubscribe_shard_channels, pubsub
2881 )
2882 self.node_pubsub_mapping[node.name] = pubsub
2883 return pubsub
2885 def _find_node_name_for_pubsub(self, pubsub):
2886 for node_name, node_pubsub in self.node_pubsub_mapping.items():
2887 if node_pubsub is pubsub:
2888 return node_name
2889 return None
2891 def _sharded_message_generator(self, timeout=0.0):
2892 for _ in range(len(self.node_pubsub_mapping)):
2893 pubsub = next(self._pubsubs_generator)
2894 # Don't pass ignore_subscribe_messages here - let get_sharded_message
2895 # handle the filtering after processing subscription state changes
2896 message = pubsub.get_message(
2897 ignore_subscribe_messages=False, timeout=timeout
2898 )
2899 if message is not None:
2900 return pubsub, message
2901 return None, None
2903 def _pubsubs_generator(self):
2904 while True:
2905 current_nodes = list(self.node_pubsub_mapping.values())
2906 if not current_nodes:
2907 return # Avoid infinite loop when no subscriptions exist
2908 yield from current_nodes
2910 def get_sharded_message(
2911 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
2912 ):
2913 if target_node:
2914 # Use .get(): migration-driven cleanup in the sunsubscribe branch
2915 # below and reset() both remove entries from node_pubsub_mapping,
2916 # so a caller polling with target_node may race the cleanup. Match
2917 # the async counterpart's None-handling rather than raising
2918 # KeyError. None pubsub falls through to "no message available".
2919 pubsub = self.node_pubsub_mapping.get(target_node.name)
2920 if pubsub is not None:
2921 # Don't pass ignore_subscribe_messages here - let get_sharded_message
2922 # handle the filtering after processing subscription state changes
2923 message = pubsub.get_message(
2924 ignore_subscribe_messages=False, timeout=timeout
2925 )
2926 else:
2927 message = None
2928 else:
2929 pubsub, message = self._sharded_message_generator(timeout=timeout)
2930 if message is None:
2931 return None
2932 # Only sunsubscribe mutates cluster-level shard state; bypassing the
2933 # lock on the data-message hot path keeps smessage delivery from
2934 # competing with the reconciliation worker for _shard_state_lock.
2935 if str_if_bytes(message["type"]) == "sunsubscribe":
2936 # Serialize state mutation against reinitialize_shard_subscriptions
2937 # (worker thread). The blocking get_message above intentionally
2938 # runs outside the lock so reconciliation is not stalled by long
2939 # polls.
2940 with self._shard_state_lock:
2941 if message["channel"] in self.pending_unsubscribe_shard_channels:
2942 # User-initiated sunsubscribe: drop from cluster-level tracking.
2943 self.pending_unsubscribe_shard_channels.remove(message["channel"])
2944 self.shard_channels.pop(message["channel"], None)
2945 self._shard_channel_to_node.pop(message["channel"], None)
2946 # Drop the per-node pubsub that delivered the confirmation once
2947 # it no longer holds any shard subscriptions, regardless of
2948 # whether the sunsubscribe was user-initiated or driven by
2949 # slot-migration reconciliation (_migrate_shard_channel, which
2950 # intentionally does not add the channel to
2951 # pending_unsubscribe_shard_channels). This releases the
2952 # dedicated connection that would otherwise linger.
2953 # Identifying the receiving pubsub directly (rather than via
2954 # the cluster's current slot map) is required after slot
2955 # migration, where the channel's owner is no longer the node
2956 # that received our original SSUBSCRIBE.
2957 if pubsub is not None and not pubsub.subscribed:
2958 name = self._find_node_name_for_pubsub(pubsub)
2959 if name is not None:
2960 try:
2961 pubsub.reset()
2962 except Exception:
2963 pass
2964 self.node_pubsub_mapping.pop(name, None)
2965 # Mirror PubSub.handle_message: the empty-check belongs in the
2966 # unsubscribe branch since that is the only path that can
2967 # reduce shard_channels here.
2968 if not self.channels and not self.patterns and not self.shard_channels:
2969 self.subscribed_event.clear()
2970 # Only suppress subscribe/unsubscribe messages, not data messages (smessage)
2971 if str_if_bytes(message["type"]) in ("ssubscribe", "sunsubscribe"):
2972 if self.ignore_subscribe_messages or ignore_subscribe_messages:
2973 return None
2974 return message
2976 def ssubscribe(self, *args, **kwargs):
2977 if args:
2978 args = list_or_args(args[0], args[1:])
2979 s_channels = dict.fromkeys(args)
2980 s_channels.update(kwargs)
2981 # Serialize against reinitialize_shard_subscriptions (worker thread)
2982 # so the reverse index, shard_channels, and node_pubsub_mapping are
2983 # not mutated concurrently.
2984 with self._shard_state_lock:
2985 for s_channel, handler in s_channels.items():
2986 node = self.cluster.get_node_from_key(s_channel)
2987 if not node:
2988 continue
2989 # Lazy re-route: if this channel is already tracked against a
2990 # different node (e.g. after a slot migration), migrate it now
2991 # so the caller's intent is applied on the current owner.
2992 normalized_key = next(iter(self._normalize_keys({s_channel: None})))
2993 old_name = self._shard_channel_to_node.get(normalized_key)
2994 if old_name and old_name != node.name:
2995 # Match PubSub.ssubscribe() dict.update() semantics: the
2996 # caller's newly supplied handler (including None) always
2997 # overrides any previously registered handler.
2998 self._migrate_shard_channel(
2999 normalized_key,
3000 handler,
3001 old_name,
3002 node,
3003 )
3004 continue
3005 pubsub = self._get_node_pubsub(node)
3006 if handler:
3007 pubsub.ssubscribe(**{s_channel: handler})
3008 else:
3009 pubsub.ssubscribe(s_channel)
3010 self.shard_channels.update(pubsub.shard_channels)
3011 self._shard_channel_to_node[normalized_key] = node.name
3012 self.pending_unsubscribe_shard_channels.difference_update(
3013 self._normalize_keys({s_channel: None})
3014 )
3015 if pubsub.subscribed and not self.subscribed:
3016 self.subscribed_event.set()
3017 self.health_check_response_counter = 0
3019 def sunsubscribe(self, *args):
3020 if args:
3021 args = list_or_args(args[0], args[1:])
3022 else:
3023 args = list(self.shard_channels)
3025 # Serialize against reinitialize_shard_subscriptions: the reverse
3026 # index and node_pubsub_mapping must not change between the lookup
3027 # and the per-node sunsubscribe call below.
3028 with self._shard_state_lock:
3029 for s_channel in args:
3030 normalized_key = next(iter(self._normalize_keys({s_channel: None})))
3031 # Route via the reverse index so we unsubscribe on the node
3032 # that actually holds the subscription. After a slot migration
3033 # the cluster's current owner may no longer be that node.
3034 name = self._shard_channel_to_node.get(normalized_key)
3035 if name and name in self.node_pubsub_mapping:
3036 p = self.node_pubsub_mapping[name]
3037 else:
3038 node = self.cluster.get_node_from_key(s_channel)
3039 if not node or node.name not in self.node_pubsub_mapping:
3040 continue
3041 p = self.node_pubsub_mapping[node.name]
3042 p.sunsubscribe(s_channel)
3043 self.pending_unsubscribe_shard_channels.update(
3044 p.pending_unsubscribe_shard_channels
3045 )
3047 def reinitialize_shard_subscriptions(self):
3048 """
3049 Reconcile per-node shard subscriptions against the cluster's current
3050 slot ownership map. For each tracked shard channel whose owning node
3051 has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on
3052 the old node's pubsub and ssubscribe on the new owner's pubsub,
3053 preserving any registered handler.
3054 """
3055 uncovered: list = []
3056 made_progress = False
3057 first_migrate_error: Optional[BaseException] = None
3058 with self._shard_state_lock:
3059 for channel, handler in list(self.shard_channels.items()):
3060 try:
3061 new_node = self.cluster.get_node_from_key(channel)
3062 except SlotNotCoveredError:
3063 # Slot is transiently uncovered (mid-migration / partial
3064 # topology refresh). Defer this channel so coverable
3065 # siblings still reconcile this pass; we surface the
3066 # error below so the caller (and logs) know not every
3067 # channel was reconciled. Retry happens on the next
3068 # slots-cache change notification.
3069 uncovered.append(channel)
3070 continue
3071 old_name = self._shard_channel_to_node.get(channel)
3072 if old_name == new_node.name:
3073 continue
3074 try:
3075 self._migrate_shard_channel(channel, handler, old_name, new_node)
3076 made_progress = True
3077 except (ConnectionError, TimeoutError, OSError) as e:
3078 # Transient connectivity error while subscribing on the
3079 # new owner (or unsubscribing on the old owner if its
3080 # handler chose to re-raise). Do not abort reconciliation
3081 # for sibling channels: _shard_channel_to_node was not
3082 # advanced for this channel, so the next slots-cache
3083 # change notification will retry it.
3084 logger.warning(
3085 "shard channel %r migration deferred: %s: %s",
3086 channel,
3087 type(e).__name__,
3088 e,
3089 )
3090 if first_migrate_error is None:
3091 first_migrate_error = e
3092 continue
3093 # Garbage-collect per-node pubsubs that no longer hold any
3094 # subscription so their connections are released.
3095 for name, pubsub in list(self.node_pubsub_mapping.items()):
3096 if not pubsub.subscribed:
3097 try:
3098 pubsub.reset()
3099 except Exception:
3100 pass
3101 self.node_pubsub_mapping.pop(name, None)
3102 if uncovered:
3103 # Surface the uncovered channels so the caller (and observer
3104 # notification path) knows reconciliation was incomplete. All
3105 # coverable siblings have already been migrated above.
3106 raise SlotNotCoveredError(
3107 f"{len(uncovered)} shard channel(s) left unreconciled; "
3108 f"slot(s) not covered by the cluster: {uncovered!r}"
3109 )
3110 if first_migrate_error is not None and not made_progress:
3111 # Every migration attempted in this pass failed transiently and
3112 # nothing else made progress. Re-raise the first caught error
3113 # (typically the root cause; later failures are often downstream
3114 # symptoms of the same unreachable node) so the worker's done-
3115 # callback surfaces a single representative failure through the
3116 # same logger channel used for SlotNotCoveredError. Per-channel
3117 # WARNINGs above preserve the full forensic detail.
3118 raise first_migrate_error
3120 def _migrate_shard_channel(self, channel, handler, old_name, new_node):
3121 # Detach from the old per-node pubsub, best-effort: the old node may
3122 # already be unreachable during migration / failover.
3123 if old_name and old_name in self.node_pubsub_mapping:
3124 old_pubsub = self.node_pubsub_mapping[old_name]
3125 try:
3126 old_pubsub.sunsubscribe(channel)
3127 except (ConnectionError, TimeoutError, OSError):
3128 # redis-py's Connection has already called ``disconnect()``
3129 # before raising (see Connection.read_response /
3130 # send_packed_command with ``disconnect_on_error=True``),
3131 # so ``old_pubsub``'s dedicated socket is gone. Two cases:
3132 #
3133 # 1. The old node is no longer in the cluster topology
3134 # (e.g. removed by failover / topology refresh): no
3135 # reconnect target exists, so ``old_pubsub.subscribed``
3136 # would stay True forever and the end-of-pass GC block
3137 # would skip it. Drop it eagerly so the round-robin
3138 # generator does not keep yielding a dead pubsub that
3139 # produces periodic errors from ``get_sharded_message``.
3140 # 2. The old node is still known (transiently slow /
3141 # unreachable): ``PubSub._execute`` auto-reconnects and
3142 # ``on_connect`` re-subscribes to remaining channels,
3143 # so other subscriptions on the same pubsub recover
3144 # naturally. Leave it alone.
3145 if self.cluster.get_node(node_name=old_name) is None:
3146 try:
3147 old_pubsub.reset()
3148 except Exception:
3149 pass
3150 self.node_pubsub_mapping.pop(old_name, None)
3151 # Attach to the new per-node pubsub, preserving the handler. Decode to
3152 # a text key only when we must pass it as a kwarg (handler present).
3153 new_pubsub = self._get_node_pubsub(new_node)
3154 if handler:
3155 decoded = (
3156 self.encoder.decode(channel, force=True)
3157 if isinstance(channel, (bytes, bytearray))
3158 else channel
3159 )
3160 new_pubsub.ssubscribe(**{decoded: handler})
3161 else:
3162 new_pubsub.ssubscribe(channel)
3163 self.shard_channels.update(new_pubsub.shard_channels)
3164 normalized_key = next(iter(self._normalize_keys({channel: None})))
3165 self._shard_channel_to_node[normalized_key] = new_node.name
3166 self.pending_unsubscribe_shard_channels.difference_update(
3167 self._normalize_keys({channel: None})
3168 )
3169 if new_pubsub.subscribed and not self.subscribed:
3170 self.subscribed_event.set()
3171 self.health_check_response_counter = 0
3173 def on_slots_changed(self):
3174 # Observer hook invoked by NodesManager after a slots-cache refresh.
3175 # Schedule reconciliation on a dedicated worker thread so the caller
3176 # (typically MovedError handling in _execute_command or the topology
3177 # refresh thread in initialize()) is not blocked on the network I/O
3178 # performed by reinitialize_shard_subscriptions. Mirrors the async
3179 # path's asyncio.create_task model. No-op when there are no shard
3180 # subscriptions to reconcile.
3181 if not self.shard_channels:
3182 return
3183 # Serialize lazy executor creation and submission against concurrent
3184 # on_slots_changed calls (EventDispatcher releases its lock before
3185 # invoking listeners, so two MovedError-handling threads can land
3186 # here at once) and against reset() which tears the executor down.
3187 # Without this, two threads could each create a ThreadPoolExecutor
3188 # and one would be orphaned (leaking its worker thread); a reset()
3189 # interleaved between create and submit() could also raise
3190 # RuntimeError("cannot schedule new futures after shutdown").
3191 with self._shard_state_lock:
3192 if self._reconcile_executor is None:
3193 self._reconcile_executor = ThreadPoolExecutor(
3194 max_workers=1,
3195 thread_name_prefix="redis-cluster-pubsub-reconcile",
3196 )
3197 future = self._reconcile_executor.submit(
3198 self.reinitialize_shard_subscriptions
3199 )
3200 self._reconcile_futures.add(future)
3201 future.add_done_callback(self._discard_reconcile_future)
3202 # Consume the future's exception (if any) so it is not silently lost.
3203 # reinitialize_shard_subscriptions surfaces SlotNotCoveredError when
3204 # a slot is still transiently uncovered; route it through the same
3205 # logger channel as the async path for consistent observability.
3206 future.add_done_callback(self._log_reconcile_future_exception)
3208 def _discard_reconcile_future(self, future: "Future") -> None:
3209 # Done-callback fires on the worker thread. Take _shard_state_lock so
3210 # the discard observes the same mutual-exclusion discipline as the
3211 # add() / clear() sites; without it the set mutation is correct only
3212 # because of CPython's GIL and would race under free-threaded builds.
3213 with self._shard_state_lock:
3214 self._reconcile_futures.discard(future)
3216 @staticmethod
3217 def _log_reconcile_future_exception(future: "Future") -> None:
3218 if future.cancelled():
3219 return
3220 exc = future.exception()
3221 if exc is not None:
3222 logger.error(
3223 "shard subscription reconciliation failed: %r", exc, exc_info=exc
3224 )
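# A standalone sketch (not part of this module) of the scheduling pattern used by
# on_slots_changed above: a single-worker executor plus done-callbacks, so queued
# reconciliation work is tracked and its exception, if any, is logged rather than
# silently lost. reconcile_fn and pending are hypothetical names.
#
#     from concurrent.futures import Future, ThreadPoolExecutor
#
#     executor = ThreadPoolExecutor(max_workers=1)
#     pending: set = set()
#
#     def log_failure(f: "Future") -> None:
#         if not f.cancelled() and f.exception() is not None:
#             logger.error("reconciliation failed: %r", f.exception())
#
#     future = executor.submit(reconcile_fn)
#     pending.add(future)
#     future.add_done_callback(pending.discard)
#     future.add_done_callback(log_failure)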
3226 def reset(self) -> None:
3227 # Hold _shard_state_lock across the entire teardown so it observes
3228 # the same mutual-exclusion discipline as ssubscribe / sunsubscribe /
3229 # get_sharded_message / reinitialize_shard_subscriptions, which all
3230 # mutate shard_channels, _shard_channel_to_node, and
3231 # node_pubsub_mapping under this lock. Without it, super().reset()
3232 # rebinds shard_channels and pending_unsubscribe_shard_channels in
3233 # parallel with a concurrent user-thread mutation, silently dropping
3234 # subscription intent. cancel_futures drops queued reconciliation
3235 # work; the currently-running task (if any) is already serialized
3236 # against us by this same lock - shutdown(wait=False) avoids waiting
3237 # on the worker thread's join, not on its critical section.
3238 with self._shard_state_lock:
3239 if self._reconcile_executor is not None:
3240 self._reconcile_executor.shutdown(wait=False, cancel_futures=True)
3241 self._reconcile_executor = None
3242 self._reconcile_futures.clear()
3243 # Tear down per-node pubsubs (parity with async aclose) so they
3244 # don't leak their dedicated connections and don't replay stale
3245 # shard_channels via PubSub.on_connect on a subsequent reconnect.
3246 # Errors are swallowed because reset() is also a fallback path
3247 # from __del__; we cannot let one buggy per-node pubsub mask the
3248 # rest of the teardown.
3249 for pubsub in self.node_pubsub_mapping.values():
3250 try:
3251 pubsub.reset()
3252 except Exception:
3253 pass
3254 # Drop the now-dead per-node pubsubs from the mapping so the
3255 # round-robin in _pubsubs_generator / _sharded_message_generator
3256 # cannot yield them between teardown and re-subscription.
3257 self.node_pubsub_mapping.clear()
3258 # _pubsubs_generator captures node_pubsub_mapping.values() into
3259 # a local list inside ``yield from``; clearing the mapping does
3260 # not reach references already held by that captured snapshot,
3261 # so a generator suspended mid-yield-from would still surface
3262 # the now-reset() per-node pubsubs after re-subscription.
3263 # Recreate it to drop the captured list. type(self) bypasses
3264 # the instance-level self-shadow established at __init__
3265 # (self._pubsubs_generator = self._pubsubs_generator()).
3266 self._pubsubs_generator = type(self)._pubsubs_generator(self)
3267 super().reset()
3268 self._shard_channel_to_node = {}
3270 def get_redis_connection(self):
3271 """
3272 Get the Redis connection of the pubsub connected node.
3273 """
3274 if self.node is not None:
3275 return self.node.redis_connection
3277 def disconnect(self):
3278 """
3279 Disconnect the pubsub connection.
3280 """
3281 if self.connection:
3282 self.connection.disconnect()
3283 for pubsub in self.node_pubsub_mapping.values():
3284 if pubsub.connection:
3285 pubsub.connection.disconnect()
3288class ClusterPipeline(RedisCluster):
3289 """
3290 Support for Redis pipeline
3291 in cluster mode
3292 """
3294 ERRORS_ALLOW_RETRY = (
3295 ConnectionError,
3296 TimeoutError,
3297 MovedError,
3298 AskError,
3299 TryAgainError,
3300 )
3302 NO_SLOTS_COMMANDS = {"UNWATCH"}
3303 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
3304 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3306 @deprecated_args(
3307 args_to_warn=[
3308 "cluster_error_retry_attempts",
3309 ],
3310 reason="Please configure the 'retry' object instead",
3311 version="6.0.0",
3312 )
3313 def __init__(
3314 self,
3315 nodes_manager: "NodesManager",
3316 commands_parser: "CommandsParser",
3317 result_callbacks: Optional[Dict[str, Callable]] = None,
3318 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
3319 startup_nodes: Optional[List["ClusterNode"]] = None,
3320 read_from_replicas: bool = False,
3321 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
3322 cluster_error_retry_attempts: int = 3,
3323 reinitialize_steps: int = 5,
3324 retry: Optional[Retry] = None,
3325 lock=None,
3326 transaction=False,
3327 policy_resolver: PolicyResolver = StaticPolicyResolver(),
3328 event_dispatcher: Optional["EventDispatcher"] = None,
3329 **kwargs,
3330 ):
3331 """ """
3332 self.command_stack = []
3333 self.nodes_manager = nodes_manager
3334 self.commands_parser = commands_parser
3335 self.refresh_table_asap = False
3336 self.result_callbacks = (
3337 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
3338 )
3339 self.startup_nodes = startup_nodes if startup_nodes else []
3340 self.read_from_replicas = read_from_replicas
3341 self.load_balancing_strategy = load_balancing_strategy
3342 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
3343 self.cluster_response_callbacks = cluster_response_callbacks
3344 self.reinitialize_counter = 0
3345 self.reinitialize_steps = reinitialize_steps
3346 if retry is not None:
3347 self.retry = retry
3348 else:
3349 self.retry = Retry(
3350 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
3351 retries=cluster_error_retry_attempts,
3352 )
3354 self.encoder = Encoder(
3355 kwargs.get("encoding", "utf-8"),
3356 kwargs.get("encoding_errors", "strict"),
3357 kwargs.get("decode_responses", False),
3358 )
3359 if lock is None:
3360 lock = threading.RLock()
3361 self._lock = lock
3362 self.parent_execute_command = super().execute_command
3363 self._execution_strategy: ExecutionStrategy = (
3364 PipelineStrategy(self) if not transaction else TransactionStrategy(self)
3365 )
3367 # For backward compatibility, mapping from existing policies to new one
3368 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
3369 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
3370 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
3371 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
3372 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
3373 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
3374 SLOT_ID: RequestPolicy.DEFAULT_KEYED,
3375 }
3377 self._policies_callback_mapping: dict[
3378 Union[RequestPolicy, ResponsePolicy], Callable
3379 ] = {
3380 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
3381 self.get_random_primary_or_all_nodes(command_name)
3382 ],
3383 RequestPolicy.DEFAULT_KEYED: lambda command,
3384 *args: self.get_nodes_from_slot(command, *args),
3385 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
3386 RequestPolicy.ALL_SHARDS: self.get_primaries,
3387 RequestPolicy.ALL_NODES: self.get_nodes,
3388 RequestPolicy.ALL_REPLICAS: self.get_replicas,
3389 RequestPolicy.MULTI_SHARD: lambda *args,
3390 **kwargs: self._split_multi_shard_command(*args, **kwargs),
3391 RequestPolicy.SPECIAL: self.get_special_nodes,
3392 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
3393 ResponsePolicy.DEFAULT_KEYED: lambda res: res,
3394 }
3396 self._policy_resolver = policy_resolver
3398 if event_dispatcher is None:
3399 self._event_dispatcher = EventDispatcher()
3400 else:
3401 self._event_dispatcher = event_dispatcher
3403 def __repr__(self):
3404 """ """
3405 return f"{type(self).__name__}"
3407 def __enter__(self):
3408 """ """
3409 return self
3411 def __exit__(self, exc_type, exc_value, traceback):
3412 """ """
3413 self.reset()
3415 def __del__(self):
3416 try:
3417 self.reset()
3418 except Exception:
3419 pass
3421 def __len__(self):
3422 """ """
3423 return len(self._execution_strategy.command_queue)
3425 def __bool__(self):
3426 "Pipeline instances should always evaluate to True on Python 3+"
3427 return True
3429 def execute_command(self, *args, **kwargs):
3430 """
3431 Delegate the command to the pipeline's current execution strategy
3432 """
3433 return self._execution_strategy.execute_command(*args, **kwargs)
3435 def pipeline_execute_command(self, *args, **options):
3436 """
3437 Stage a command to be executed when execute() is next called
3439 Returns the current Pipeline object back so commands can be
3440 chained together, such as:
3442 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
3444 At some other point, you can then run: pipe.execute(),
3445 which will execute all commands queued in the pipe.
3446 """
3447 return self._execution_strategy.execute_command(*args, **options)
3449 def annotate_exception(self, exception, number, command):
3450 """
3451 Provides extra context to the exception prior to it being handled
3452 """
3453 self._execution_strategy.annotate_exception(exception, number, command)
3455 def execute(self, raise_on_error: bool = True) -> List[Any]:
3456 """
3457 Execute all the commands in the current pipeline
3458 """
3460 try:
3461 return self._execution_strategy.execute(raise_on_error)
3462 finally:
3463 self.reset()
3465 def reset(self):
3466 """
3467 Reset back to empty pipeline.
3468 """
3469 self._execution_strategy.reset()
3471 def send_cluster_commands(
3472 self, stack, raise_on_error=True, allow_redirections=True
3473 ):
3474 return self._execution_strategy.send_cluster_commands(
3475 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
3476 )
3478 def exists(self, *keys):
3479 return self._execution_strategy.exists(*keys)
3481 def eval(self):
3482 """ """
3483 return self._execution_strategy.eval()
3485 def multi(self):
3486 """
3487 Start a transactional block of the pipeline after WATCH commands
3488 are issued. End the transactional block with `execute`.
3489 """
3490 self._execution_strategy.multi()
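# A minimal transactional sketch (illustrative only; assumes a pipeline created
# with transaction support and a hypothetical "{user}:balance" key):
#
#     pipe.watch("{user}:balance")           # executes immediately
#     balance = pipe.get("{user}:balance")   # immediate while WATCHing
#     pipe.multi()                           # start queueing
#     pipe.incrby("{user}:balance", 10)
#     pipe.execute()  # raises WatchError if the watched key changed meanwhile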
3492 def load_scripts(self):
3493 """ """
3494 self._execution_strategy.load_scripts()
3496 def discard(self):
3497 """ """
3498 self._execution_strategy.discard()
3500 def watch(self, *names):
3501 """Watches the values at keys ``names``"""
3502 self._execution_strategy.watch(*names)
3504 def unwatch(self):
3505 """Unwatches all previously specified keys"""
3506 self._execution_strategy.unwatch()
3508 def script_load_for_pipeline(self, *args, **kwargs):
3509 self._execution_strategy.script_load_for_pipeline(*args, **kwargs)
3511 def delete(self, *names):
3512 self._execution_strategy.delete(*names)
3514 def unlink(self, *names):
3515 self._execution_strategy.unlink(*names)
3518def block_pipeline_command(name: str) -> Callable[..., Any]:
3519 """
3520 Returns a stub that raises an error, because some pipelined commands
3521 must be blocked when running in cluster mode
3522 """
3524 def inner(*args, **kwargs):
3525 raise RedisClusterException(
3526 f"ERROR: Calling pipelined function {name} is blocked "
3527 f"when running redis in cluster mode..."
3528 )
3530 return inner
3533# Blocked pipeline commands
3534PIPELINE_BLOCKED_COMMANDS = (
3535 "BGREWRITEAOF",
3536 "BGSAVE",
3537 "BITOP",
3538 "BRPOPLPUSH",
3539 "CLIENT GETNAME",
3540 "CLIENT KILL",
3541 "CLIENT LIST",
3542 "CLIENT SETNAME",
3543 "CLIENT",
3544 "CONFIG GET",
3545 "CONFIG RESETSTAT",
3546 "CONFIG REWRITE",
3547 "CONFIG SET",
3548 "CONFIG",
3549 "DBSIZE",
3550 "ECHO",
3551 "EVALSHA",
3552 "FLUSHALL",
3553 "FLUSHDB",
3554 "INFO",
3555 "KEYS",
3556 "LASTSAVE",
3557 "MGET",
3558 "MGET NONATOMIC",
3559 "MOVE",
3560 "MSET",
3561 "MSETEX",
3562 "MSET NONATOMIC",
3563 "MSETNX",
3564 "PFCOUNT",
3565 "PFMERGE",
3566 "PING",
3567 "PUBLISH",
3568 "RANDOMKEY",
3569 "READONLY",
3570 "READWRITE",
3571 "RENAME",
3572 "RENAMENX",
3573 "RPOPLPUSH",
3574 "SAVE",
3575 "SCAN",
3576 "SCRIPT EXISTS",
3577 "SCRIPT FLUSH",
3578 "SCRIPT KILL",
3579 "SCRIPT LOAD",
3580 "SCRIPT",
3581 "SDIFF",
3582 "SDIFFSTORE",
3583 "SENTINEL GET MASTER ADDR BY NAME",
3584 "SENTINEL MASTER",
3585 "SENTINEL MASTERS",
3586 "SENTINEL MONITOR",
3587 "SENTINEL REMOVE",
3588 "SENTINEL SENTINELS",
3589 "SENTINEL SET",
3590 "SENTINEL SLAVES",
3591 "SENTINEL",
3592 "SHUTDOWN",
3593 "SINTER",
3594 "SINTERSTORE",
3595 "SLAVEOF",
3596 "SLOWLOG GET",
3597 "SLOWLOG LEN",
3598 "SLOWLOG RESET",
3599 "SLOWLOG",
3600 "SMOVE",
3601 "SORT",
3602 "SUNION",
3603 "SUNIONSTORE",
3604 "TIME",
3605)
3606for command in PIPELINE_BLOCKED_COMMANDS:
3607 command = command.replace(" ", "_").lower()
3609 setattr(ClusterPipeline, command, block_pipeline_command(command))
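# After the loop above, calling a blocked command on a ClusterPipeline raises
# immediately instead of being queued; a hedged sketch (illustrative only):
#
#     pipe = rc.pipeline()
#     pipe.keys()  # RedisClusterException: Calling pipelined function keys is blocked ...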
3612class PipelineCommand:
3613 """ """
3615 def __init__(self, args, options=None, position=None):
3616 self.args = args
3617 if options is None:
3618 options = {}
3619 self.options = options
3620 self.position = position
3621 self.result = None
3622 self.node = None
3623 self.asking = False
3624 self.command_policies: Optional[CommandPolicies] = None
3627class NodeCommands:
3628 """ """
3630 def __init__(
3631 self, parse_response, connection_pool: ConnectionPool, connection: Connection
3632 ):
3633 """ """
3634 self.parse_response = parse_response
3635 self.connection_pool = connection_pool
3636 self.connection = connection
3637 self.commands = []
3639 def append(self, c):
3640 """ """
3641 self.commands.append(c)
3643 def write(self):
3644 """
3645 Pack and send all buffered commands for this node in a single request (adapted from the non-cluster pipeline).
3646 """
3647 connection = self.connection
3648 commands = self.commands
3650 # We are going to clobber the commands with the write, so go ahead
3651 # and ensure that nothing is sitting there from a previous run.
3652 for c in commands:
3653 c.result = None
3655 # build up all commands into a single request to increase network perf
3656 # send all the commands and catch connection and timeout errors.
3657 try:
3658 connection.send_packed_command(
3659 connection.pack_commands([c.args for c in commands])
3660 )
3661 except (ConnectionError, TimeoutError) as e:
3662 for c in commands:
3663 c.result = e
3665 def read(self):
3666 """ """
3667 connection = self.connection
3668 for c in self.commands:
3669 # if there is already a result on this command,
3670 # it means we ran into an exception
3671 # (such as a connection error) while writing. Trying to
3672 # parse a response on a connection that
3673 # is no longer open would normally result in a
3674 # connection error raised by redis-py,
3675 # but redis-py doesn't check in parse_response
3676 # that the sock object is
3677 # still set, so trying to
3678 # read from a closed connection
3679 # results in an AttributeError because
3680 # it ends up calling readline() on None.
3681 # This can have all kinds of nasty side-effects.
3682 # Treating this case as a connection error
3683 # is fine because it will dump
3684 # the connection object back into the
3685 # pool and on the next write, it will
3686 # explicitly open the connection and all will be well.
3687 if c.result is None:
3688 try:
3689 c.result = self.parse_response(connection, c.args[0], **c.options)
3690 except (ConnectionError, TimeoutError) as e:
3691 for c in self.commands:
3692 c.result = e
3693 return
3694 except RedisError:
3695 c.result = sys.exc_info()[1]
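# The pipeline strategy below writes to every NodeCommands first and only then
# reads, so requests to different nodes are in flight concurrently; a sketch
# (illustrative only; node_commands is a hypothetical dict of NodeCommands):
#
#     for nc in node_commands.values():
#         nc.write()
#     for nc in node_commands.values():
#         nc.read()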
3698class ExecutionStrategy(ABC):
3699 @property
3700 @abstractmethod
3701 def command_queue(self):
3702 pass
3704 @abstractmethod
3705 def execute_command(self, *args, **kwargs):
3706 """
3707 Execution flow for current execution strategy.
3709 See: ClusterPipeline.execute_command()
3710 """
3711 pass
3713 @abstractmethod
3714 def annotate_exception(self, exception, number, command):
3715 """
3716 Annotate exception according to current execution strategy.
3718 See: ClusterPipeline.annotate_exception()
3719 """
3720 pass
3722 @abstractmethod
3723 def pipeline_execute_command(self, *args, **options):
3724 """
3725 Pipeline execution flow for current execution strategy.
3727 See: ClusterPipeline.pipeline_execute_command()
3728 """
3729 pass
3731 @abstractmethod
3732 def execute(self, raise_on_error: bool = True) -> List[Any]:
3733 """
3734 Executes current execution strategy.
3736 See: ClusterPipeline.execute()
3737 """
3738 pass
3740 @abstractmethod
3741 def send_cluster_commands(
3742 self, stack, raise_on_error=True, allow_redirections=True
3743 ):
3744 """
3745 Sends commands according to current execution strategy.
3747 See: ClusterPipeline.send_cluster_commands()
3748 """
3749 pass
3751 @abstractmethod
3752 def reset(self):
3753 """
3754 Resets current execution strategy.
3756 See: ClusterPipeline.reset()
3757 """
3758 pass
3760 @abstractmethod
3761 def exists(self, *keys):
3762 pass
3764 @abstractmethod
3765 def eval(self):
3766 pass
3768 @abstractmethod
3769 def multi(self):
3770 """
3771 Starts transactional context.
3773 See: ClusterPipeline.multi()
3774 """
3775 pass
3777 @abstractmethod
3778 def load_scripts(self):
3779 pass
3781 @abstractmethod
3782 def watch(self, *names):
3783 pass
3785 @abstractmethod
3786 def unwatch(self):
3787 """
3788 Unwatches all previously specified keys
3790 See: ClusterPipeline.unwatch()
3791 """
3792 pass
3794 @abstractmethod
3795 def script_load_for_pipeline(self, *args, **kwargs):
3796 pass
3798 @abstractmethod
3799 def delete(self, *names):
3800 """
3801 "Delete a key specified by ``names``"
3803 See: ClusterPipeline.delete()
3804 """
3805 pass
3807 @abstractmethod
3808 def unlink(self, *names):
3809 """
3810 "Unlink a key specified by ``names``"
3812 See: ClusterPipeline.unlink()
3813 """
3814 pass
3816 @abstractmethod
3817 def discard(self):
3818 pass
3821class AbstractStrategy(ExecutionStrategy):
3822 def __init__(
3823 self,
3824 pipe: ClusterPipeline,
3825 ):
3826 self._command_queue: List[PipelineCommand] = []
3827 self._pipe = pipe
3828 self._nodes_manager = self._pipe.nodes_manager
3830 @property
3831 def command_queue(self):
3832 return self._command_queue
3834 @command_queue.setter
3835 def command_queue(self, queue: List[PipelineCommand]):
3836 self._command_queue = queue
3838 @abstractmethod
3839 def execute_command(self, *args, **kwargs):
3840 pass
3842 def pipeline_execute_command(self, *args, **options):
3843 self._command_queue.append(
3844 PipelineCommand(args, options, len(self._command_queue))
3845 )
3846 return self._pipe
3848 @abstractmethod
3849 def execute(self, raise_on_error: bool = True) -> List[Any]:
3850 pass
3852 @abstractmethod
3853 def send_cluster_commands(
3854 self, stack, raise_on_error=True, allow_redirections=True
3855 ):
3856 pass
3858 @abstractmethod
3859 def reset(self):
3860 pass
3862 def exists(self, *keys):
3863 return self.execute_command("EXISTS", *keys)
3865 def eval(self):
3866 """ """
3867 raise RedisClusterException("method eval() is not implemented")
3869 def load_scripts(self):
3870 """ """
3871 raise RedisClusterException("method load_scripts() is not implemented")
3873 def script_load_for_pipeline(self, *args, **kwargs):
3874 """ """
3875 raise RedisClusterException(
3876 "method script_load_for_pipeline() is not implemented"
3877 )
3879 def annotate_exception(self, exception, number, command):
3880 """
3881 Provides extra context to the exception prior to it being handled
3882 """
3883 cmd = " ".join(map(safe_str, command))
3884 msg = (
3885 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
3886 f"caused error: {exception.args[0]}"
3887 )
3888 exception.args = (msg,) + exception.args[1:]
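# Example of the message shape produced above (values are hypothetical):
#
#     Command # 2 (SET {user}:name alice) of pipeline caused error: <original error text>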
3891class PipelineStrategy(AbstractStrategy):
3892 def __init__(self, pipe: ClusterPipeline):
3893 super().__init__(pipe)
3894 self.command_flags = pipe.command_flags
3896 def execute_command(self, *args, **kwargs):
3897 return self.pipeline_execute_command(*args, **kwargs)
3899 def _raise_first_error(self, stack, start_time):
3900 """
3901 Raise the first exception on the stack
3902 """
3903 for c in stack:
3904 r = c.result
3905 if isinstance(r, Exception):
3906 self.annotate_exception(r, c.position + 1, c.args)
3908 record_operation_duration(
3909 command_name="PIPELINE",
3910 duration_seconds=time.monotonic() - start_time,
3911 error=r,
3912 )
3914 raise r
3916 def execute(self, raise_on_error: bool = True) -> List[Any]:
3917 stack = self._command_queue
3918 if not stack:
3919 return []
3921 try:
3922 return self.send_cluster_commands(stack, raise_on_error)
3923 finally:
3924 self.reset()
3926 def reset(self):
3927 """
3928 Reset back to empty pipeline.
3929 """
3930 self._command_queue = []
3932 def send_cluster_commands(
3933 self, stack, raise_on_error=True, allow_redirections=True
3934 ):
3935 """
3936 Wrapper that handles RedisCluster.ERRORS_ALLOW_RETRY errors.
3938 If one of the retryable exceptions has been thrown we assume that:
3939 - connection_pool was disconnected
3940 - connection_pool was reset
3941 - refresh_table_asap was set to True
3943 The stack is retried up to the number of retries configured on
3944 "self.retry", which defaults to 3 unless manually configured.
3947 Once the retries are exhausted, the last retryable error is
3948 re-raised to the caller.
3949 """
3950 if not stack:
3951 return []
3952 retry_attempts = self._pipe.retry.get_retries()
3953 while True:
3954 try:
3955 return self._send_cluster_commands(
3956 stack,
3957 raise_on_error=raise_on_error,
3958 allow_redirections=allow_redirections,
3959 )
3960 except RedisCluster.ERRORS_ALLOW_RETRY as e:
3961 if retry_attempts > 0:
3962 # Try again with the new cluster setup. All other errors
3963 # should be raised.
3964 retry_attempts -= 1
3966 else:
3967 raise e
3969 def _send_cluster_commands(
3970 self, stack, raise_on_error=True, allow_redirections=True
3971 ):
3972 """
3973 Send a bunch of cluster commands to the redis cluster.
3975 `allow_redirections`: whether the pipeline should follow
3976 `ASK` & `MOVED` responses automatically. If set
3977 to False, a RedisClusterException is raised instead.
3978 """
3979 # the first time sending the commands we send all of
3980 # the commands that were queued up.
3981 # if we have to run through it again, we only retry
3982 # the commands that failed.
3983 attempt = sorted(stack, key=lambda x: x.position)
3984 is_default_node = False
3985 # build a mapping from node name to the NodeCommands we need to send to that node
3986 nodes: dict[str, NodeCommands] = {}
3987 nodes_written = 0
3988 nodes_read = 0
3990 try:
3991 # as we move through each command that still needs to be processed,
3992 # we figure out the slot number that command maps to, then from
3993 # the slot determine the node.
3994 for c in attempt:
3995 command_policies = self._pipe._policy_resolver.resolve(
3996 c.args[0].lower()
3997 )
3998 # refer to our internal node -> slot table that
3999 # tells us where a given command should route to.
4000 # (it might be possible we have a cached node that no longer
4001 # exists in the cluster, which is why we do this in a loop)
4002 passed_targets = c.options.pop("target_nodes", None)
4003 if passed_targets and not self._is_nodes_flag(passed_targets):
4004 target_nodes = self._parse_target_nodes(passed_targets)
4006 if not command_policies:
4007 command_policies = CommandPolicies()
4008 else:
4009 if not command_policies:
4010 command = c.args[0].upper()
4011 if (
4012 len(c.args) >= 2
4013 and f"{c.args[0]} {c.args[1]}".upper()
4014 in self._pipe.command_flags
4015 ):
4016 command = f"{c.args[0]} {c.args[1]}".upper()
4018 # We can only resolve key properties if the command is not
4019 # in the list of pre-defined request policies
4020 command_flag = self.command_flags.get(command)
4021 if not command_flag:
4022 # Fallback to default policy
4023 if not self._pipe.get_default_node():
4024 keys = None
4025 else:
4026 keys = self._pipe._get_command_keys(*c.args)
4027 if not keys or len(keys) == 0:
4028 command_policies = CommandPolicies()
4029 else:
4030 command_policies = CommandPolicies(
4031 request_policy=RequestPolicy.DEFAULT_KEYED,
4032 response_policy=ResponsePolicy.DEFAULT_KEYED,
4033 )
4034 else:
4035 if command_flag in self._pipe._command_flags_mapping:
4036 command_policies = CommandPolicies(
4037 request_policy=self._pipe._command_flags_mapping[
4038 command_flag
4039 ]
4040 )
4041 else:
4042 command_policies = CommandPolicies()
4044 target_nodes = self._determine_nodes(
4045 *c.args,
4046 request_policy=command_policies.request_policy,
4047 node_flag=passed_targets,
4048 )
4049 if not target_nodes:
4050 raise RedisClusterException(
4051 f"No targets were found to execute {c.args} command on"
4052 )
4053 c.command_policies = command_policies
4054 if len(target_nodes) > 1:
4055 raise RedisClusterException(
4056 f"Too many targets for command {c.args}"
4057 )
4059 node = target_nodes[0]
4060 if node == self._pipe.get_default_node():
4061 is_default_node = True
4063 # now that we know the name of the node
4064 # ( it's just a string in the form of host:port )
4065 # we can build a list of commands for each node.
4066 node_name = node.name
4067 if node_name not in nodes:
4068 redis_node = self._pipe.get_redis_connection(node)
4069 try:
4070 connection = get_connection(redis_node)
4071 except (ConnectionError, TimeoutError):
4072 # Release any connections we've already acquired before clearing nodes
4073 for n in nodes.values():
4074 n.connection_pool.release(n.connection)
4075 # Connection retries are being handled in the node's
4076 # Retry object. Reinitialize the node -> slot table.
4077 self._nodes_manager.initialize()
4078 if is_default_node:
4079 self._pipe.replace_default_node()
4080 nodes = {}
4081 raise
4082 nodes[node_name] = NodeCommands(
4083 redis_node.parse_response,
4084 redis_node.connection_pool,
4085 connection,
4086 )
4087 nodes[node_name].append(c)
4089 # send the commands in sequence.
4090 # we write to all the open sockets for each node first,
4091 # before reading anything
4092 # this allows us to flush all the requests out across the
4093 # network
4094 # so that we can read them from different sockets as they come back.
4095 # we don't multiplex on the sockets as they come available,
4096 # but that shouldn't make too much difference.
4098 # Start timing for observability
4099 start_time = time.monotonic()
4101 node_commands = nodes.values()
4102 for n in node_commands:
4103 nodes_written += 1
4104 n.write()
4106 for n in node_commands:
4107 n.read()
4109 # Find the first error in this node's commands, if any
4110 node_error = None
4111 for cmd in n.commands:
4112 if isinstance(cmd.result, Exception):
4113 node_error = cmd.result
4114 break
4116 record_operation_duration(
4117 command_name="PIPELINE",
4118 duration_seconds=time.monotonic() - start_time,
4119 server_address=n.connection.host,
4120 server_port=n.connection.port,
4121 db_namespace=str(n.connection.db),
4122 error=node_error,
4123 )
4124 nodes_read += 1
4125 finally:
4126 # release all the redis connections we allocated earlier
4127 # back into the connection pool.
4128 # if the connection is dirty (that is: we've written
4129 # commands to it, but haven't read the responses), we need
4130 # to close the connection before returning it to the pool.
4131 # otherwise, the next caller to use this connection will
4132 # read the response from _this_ request, not its own request.
4133 # disconnecting discards the dirty state & forces the next
4134 # caller to reconnect.
4135 # NOTE: dicts have a consistent ordering; we're iterating
4136 # through nodes.values() in the same order as we are when
4137 # reading / writing to the connections above, which is critical
4138 # for how we're using the nodes_written/nodes_read offsets.
4139 for i, n in enumerate(nodes.values()):
4140 if i < nodes_written and i >= nodes_read:
4141 n.connection.disconnect()
4142 n.connection_pool.release(n.connection)
4144 # if the response isn't an exception it is a
4145 # valid response from the node
4146 # we're all done with that command, YAY!
4147 # if we have more commands to attempt, we've run into problems.
4148 # collect all the commands we are allowed to retry.
4149 # (MOVED, ASK, connection, or timeout errors)
4150 attempt = sorted(
4151 (
4152 c
4153 for c in attempt
4154 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
4155 ),
4156 key=lambda x: x.position,
4157 )
4158 if attempt and allow_redirections:
4159 # RETRY MAGIC HAPPENS HERE!
4160 # send these remaining commands one at a time using `execute_command`
4161 # in the main client. This keeps our retry logic
4162 # in one place mostly,
4163 # and allows us to be more confident in correctness of behavior.
4164 # at this point any speed gains from pipelining have been lost
4165 # anyway, so we might as well make the best
4166 # attempt to get the correct behavior.
4167 #
4168 # The client command will handle retries for each
4169 # individual command sequentially as we pass each
4170 # one into `execute_command`. Any exceptions
4171 # that bubble out should only appear once all
4172 # retries have been exhausted.
4173 #
4174 # If a lot of commands have failed, we'll be setting the
4175 # flag to rebuild the slots table from scratch.
4176 # So MOVED errors should correct themselves fairly quickly.
4177 self._pipe.reinitialize_counter += 1
4178 if self._pipe._should_reinitialized():
4179 self._nodes_manager.initialize()
4180 if is_default_node:
4181 self._pipe.replace_default_node()
4182 for c in attempt:
4183 try:
4184 # send each command individually like we
4185 # do in the main client.
4186 c.result = self._pipe.parent_execute_command(*c.args, **c.options)
4187 except RedisError as e:
4188 c.result = e
4190 # turn the response back into a simple flat array that corresponds
4191 # to the sequence of commands issued in the stack in pipeline.execute()
4192 response = []
4193 for c in sorted(stack, key=lambda x: x.position):
4194 if c.args[0] in self._pipe.cluster_response_callbacks:
4195 # Remove keys entry, it needs only for cache.
4196 c.options.pop("keys", None)
4197 c.result = self._pipe._policies_callback_mapping[
4198 c.command_policies.response_policy
4199 ](
4200 self._pipe.cluster_response_callbacks[c.args[0]](
4201 c.result, **c.options
4202 )
4203 )
4204 response.append(c.result)
4206 if raise_on_error:
4207 self._raise_first_error(stack, start_time)
4209 return response
4211 def _is_nodes_flag(self, target_nodes):
4212 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags
4214 def _parse_target_nodes(self, target_nodes):
4215 if isinstance(target_nodes, list):
4216 nodes = target_nodes
4217 elif isinstance(target_nodes, ClusterNode):
4218 # Supports passing a single ClusterNode as a variable
4219 nodes = [target_nodes]
4220 elif isinstance(target_nodes, dict):
4221 # Supports dictionaries of the format {node_name: node}.
4222 # It enables to execute commands with multi nodes as follows:
4223 # rc.cluster_save_config(rc.get_primaries())
4224 nodes = target_nodes.values()
4225 else:
4226 raise TypeError(
4227 "target_nodes type can be one of the following: "
4228 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
4229 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
4230 f"The passed type is {type(target_nodes)}"
4231 )
4232 return nodes
4234 def _determine_nodes(
4235 self, *args, request_policy: RequestPolicy, **kwargs
4236 ) -> List["ClusterNode"]:
4237 # Determine which nodes the command should be executed on.
4238 # Returns a list of target nodes.
4239 command = args[0].upper()
4240 if (
4241 len(args) >= 2
4242 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
4243 ):
4244 command = f"{args[0]} {args[1]}".upper()
4246 nodes_flag = kwargs.pop("nodes_flag", None)
4247 if nodes_flag is not None:
4248 # nodes flag passed by the user
4249 command_flag = nodes_flag
4250 else:
4251 # get the nodes group for this command if it was predefined
4252 command_flag = self._pipe.command_flags.get(command)
4254 if command_flag in self._pipe._command_flags_mapping:
4255 request_policy = self._pipe._command_flags_mapping[command_flag]
4257 policy_callback = self._pipe._policies_callback_mapping[request_policy]
4259 if request_policy == RequestPolicy.DEFAULT_KEYED:
4260 nodes = policy_callback(command, *args)
4261 elif request_policy == RequestPolicy.MULTI_SHARD:
4262 nodes = policy_callback(*args, **kwargs)
4263 elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
4264 nodes = policy_callback(args[0])
4265 else:
4266 nodes = policy_callback()
4268 if args[0].lower() == "ft.aggregate":
4269 self._aggregate_nodes = nodes
4271 return nodes
4273 def multi(self):
4274 raise RedisClusterException(
4275 "method multi() is not supported outside of transactional context"
4276 )
4278 def discard(self):
4279 raise RedisClusterException(
4280 "method discard() is not supported outside of transactional context"
4281 )
4283 def watch(self, *names):
4284 raise RedisClusterException(
4285 "method watch() is not supported outside of transactional context"
4286 )
4288 def unwatch(self, *names):
4289 raise RedisClusterException(
4290 "method unwatch() is not supported outside of transactional context"
4291 )
4293 def delete(self, *names):
4294 if len(names) != 1:
4295 raise RedisClusterException(
4296 "deleting multiple keys is not implemented in pipeline command"
4297 )
4299 return self.execute_command("DEL", names[0])
4301 def unlink(self, *names):
4302 if len(names) != 1:
4303 raise RedisClusterException(
4304 "unlinking multiple keys is not implemented in pipeline command"
4305 )
4307 return self.execute_command("UNLINK", names[0])
4310class TransactionStrategy(AbstractStrategy):
4311 NO_SLOTS_COMMANDS = {"UNWATCH"}
4312 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
4313 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
4314 SLOT_REDIRECT_ERRORS = (AskError, MovedError)
4315 CONNECTION_ERRORS = (
4316 ConnectionError,
4317 OSError,
4318 ClusterDownError,
4319 SlotNotCoveredError,
4320 )
4322 def __init__(self, pipe: ClusterPipeline):
4323 super().__init__(pipe)
4324 self._explicit_transaction = False
4325 self._watching = False
4326 self._pipeline_slots: Set[int] = set()
4327 self._transaction_connection: Optional[Connection] = None
4328 self._executing = False
4329 self._retry = copy(self._pipe.retry)
4330 self._retry.update_supported_errors(
4331 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
4332 )
4334 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
4335 """
4336 Find a connection for a pipeline transaction.
4338 For running an atomic transaction, watched keys only guarantee the contents have not
4339 been altered if the WATCH commands for those keys were sent over the same
4340 connection. So once we start watching a key, we fetch a connection to the
4341 node that owns that slot and reuse it.
4342 """
4343 if not self._pipeline_slots:
4344 raise RedisClusterException(
4345 "At least a command with a key is needed to identify a node"
4346 )
4348 node: ClusterNode = self._nodes_manager.get_node_from_slot(
4349 list(self._pipeline_slots)[0], False
4350 )
4351 redis_node: Redis = self._pipe.get_redis_connection(node)
4352 if self._transaction_connection:
4353 if not redis_node.connection_pool.owns_connection(
4354 self._transaction_connection
4355 ):
4356 previous_node = self._nodes_manager.find_connection_owner(
4357 self._transaction_connection
4358 )
4359 previous_node.connection_pool.release(self._transaction_connection)
4360 self._transaction_connection = None
4362 if not self._transaction_connection:
4363 self._transaction_connection = get_connection(redis_node)
4365 return redis_node, self._transaction_connection
4367 def execute_command(self, *args, **kwargs):
4368 slot_number: Optional[int] = None
4369 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
4370 slot_number = self._pipe.determine_slot(*args)
4372 if (
4373 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
4374 ) and not self._explicit_transaction:
4375 if args[0] == "WATCH":
4376 self._validate_watch()
4378 if slot_number is not None:
4379 if self._pipeline_slots and slot_number not in self._pipeline_slots:
4380 raise CrossSlotTransactionError(
4381 "Cannot watch or send commands on different slots"
4382 )
4384 self._pipeline_slots.add(slot_number)
4385 elif args[0] not in self.NO_SLOTS_COMMANDS:
4386 raise RedisClusterException(
4387 f"Cannot identify slot number for command: {args[0]},"
4388 "it cannot be triggered in a transaction"
4389 )
4391 return self._immediate_execute_command(*args, **kwargs)
4392 else:
4393 if slot_number is not None:
4394 self._pipeline_slots.add(slot_number)
4396 return self.pipeline_execute_command(*args, **kwargs)
4398 def _validate_watch(self):
4399 if self._explicit_transaction:
4400 raise RedisError("Cannot issue a WATCH after a MULTI")
4402 self._watching = True
4404 def _immediate_execute_command(self, *args, **options):
4405 return self._retry.call_with_retry(
4406 lambda: self._get_connection_and_send_command(*args, **options),
4407 self._reinitialize_on_error,
4408 with_failure_count=True,
4409 )
4411 def _get_connection_and_send_command(self, *args, **options):
4412 redis_node, connection = self._get_client_and_connection_for_transaction()
4414 # Start timing for observability
4415 start_time = time.monotonic()
4417 try:
4418 response = self._send_command_parse_response(
4419 connection, redis_node, args[0], *args, **options
4420 )
4422 record_operation_duration(
4423 command_name=args[0],
4424 duration_seconds=time.monotonic() - start_time,
4425 server_address=connection.host,
4426 server_port=connection.port,
4427 db_namespace=str(connection.db),
4428 )
4430 return response
4431 except Exception as e:
4432 if connection:
4433 # this is used to report the metrics based on host and port info
4434 e.connection = connection
4435 record_operation_duration(
4436 command_name=args[0],
4437 duration_seconds=time.monotonic() - start_time,
4438 server_address=connection.host,
4439 server_port=connection.port,
4440 db_namespace=str(connection.db),
4441 error=e,
4442 )
4443 raise
4445 def _send_command_parse_response(
4446 self, conn, redis_node: Redis, command_name, *args, **options
4447 ):
4448 """
4449 Send a command and parse the response
4450 """
4452 conn.send_command(*args)
4453 output = redis_node.parse_response(conn, command_name, **options)
4455 if command_name in self.UNWATCH_COMMANDS:
4456 self._watching = False
4457 return output
4459 def _reinitialize_on_error(self, error, failure_count):
4460 if hasattr(error, "connection"):
4461 record_error_count(
4462 server_address=error.connection.host,
4463 server_port=error.connection.port,
4464 network_peer_address=error.connection.host,
4465 network_peer_port=error.connection.port,
4466 error_type=error,
4467 retry_attempts=failure_count,
4468 is_internal=True,
4469 )
4471 if self._watching:
4472 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
4473 raise WatchError("Slot rebalancing occurred while watching keys")
4475 if (
4476 type(error) in self.SLOT_REDIRECT_ERRORS
4477 or type(error) in self.CONNECTION_ERRORS
4478 ):
4479 if self._transaction_connection:
4480 if is_debug_log_enabled():
4481 logger.debug(
4482 f"Operation failed, "
4483 f"with connection: {self._transaction_connection}, "
4484 f"details: {self._transaction_connection.extract_connection_details()}",
4485 )
4486 # Disconnect and release back to pool
4487 self._transaction_connection.disconnect()
4488 node = self._nodes_manager.find_connection_owner(
4489 self._transaction_connection
4490 )
4491 if node and node.redis_connection:
4492 node.redis_connection.connection_pool.release(
4493 self._transaction_connection
4494 )
4495 self._transaction_connection = None
4497 self._pipe.reinitialize_counter += 1
4498 if self._pipe._should_reinitialized():
4499 self._nodes_manager.initialize()
4500 self._pipe.reinitialize_counter = 0
4501 else:
4502 if isinstance(error, AskError):
4503 self._nodes_manager.move_slot(error)
4505 self._executing = False
4507 def _raise_first_error(self, responses, stack, start_time):
4508 """
4509 Raise the first exception on the stack
4510 """
4511 for r, cmd in zip(responses, stack):
4512 if isinstance(r, Exception):
4513 self.annotate_exception(r, cmd.position + 1, cmd.args)
4515 record_operation_duration(
4516 command_name="TRANSACTION",
4517 duration_seconds=time.monotonic() - start_time,
4518 server_address=self._transaction_connection.host,
4519 server_port=self._transaction_connection.port,
4520 db_namespace=str(self._transaction_connection.db),
4521 )
4523 raise r
4525 def execute(self, raise_on_error: bool = True) -> List[Any]:
4526 stack = self._command_queue
4527 if not stack and (not self._watching or not self._pipeline_slots):
4528 return []
4530 return self._execute_transaction_with_retries(stack, raise_on_error)
4532 def _execute_transaction_with_retries(
4533 self, stack: List["PipelineCommand"], raise_on_error: bool
4534 ):
4535 return self._retry.call_with_retry(
4536 lambda: self._execute_transaction(stack, raise_on_error),
4537 lambda error, failure_count: self._reinitialize_on_error(
4538 error, failure_count
4539 ),
4540 with_failure_count=True,
4541 )
4543 def _execute_transaction(
4544 self, stack: List["PipelineCommand"], raise_on_error: bool
4545 ):
4546 if len(self._pipeline_slots) > 1:
4547 raise CrossSlotTransactionError(
4548 "All keys involved in a cluster transaction must map to the same slot"
4549 )
4551 self._executing = True
4553 redis_node, connection = self._get_client_and_connection_for_transaction()
4555 stack = chain(
4556 [PipelineCommand(("MULTI",))],
4557 stack,
4558 [PipelineCommand(("EXEC",))],
4559 )
4560 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
4561 packed_commands = connection.pack_commands(commands)
4563 # Start timing for observability
4564 start_time = time.monotonic()
4566 connection.send_packed_command(packed_commands)
4567 errors = []
4569 # parse off the response for MULTI
4570 # NOTE: we need to handle ResponseErrors here and continue
4571 # so that we read all the additional command messages from
4572 # the socket
4573 try:
4574 redis_node.parse_response(connection, "MULTI")
4575 except ResponseError as e:
4576 self.annotate_exception(e, 0, "MULTI")
4577 errors.append(e)
4578 except self.CONNECTION_ERRORS as cluster_error:
4579 self.annotate_exception(cluster_error, 0, "MULTI")
4580 raise
4582 # and all the other commands
4583 for i, command in enumerate(self._command_queue):
4584 if EMPTY_RESPONSE in command.options:
4585 errors.append((i, command.options[EMPTY_RESPONSE]))
4586 else:
4587 try:
4588 _ = redis_node.parse_response(connection, "_")
4589 except self.SLOT_REDIRECT_ERRORS as slot_error:
4590 self.annotate_exception(slot_error, i + 1, command.args)
4591 errors.append(slot_error)
4592 except self.CONNECTION_ERRORS as cluster_error:
4593 self.annotate_exception(cluster_error, i + 1, command.args)
4594 raise
4595 except ResponseError as e:
4596 self.annotate_exception(e, i + 1, command.args)
4597 errors.append(e)
4599 response = None
4600 # parse the EXEC.
4601 try:
4602 response = redis_node.parse_response(connection, "EXEC")
4603 except ExecAbortError:
4604 if errors:
4605 raise errors[0]
4606 raise
4608 self._executing = False
4610 record_operation_duration(
4611 command_name="TRANSACTION",
4612 duration_seconds=time.monotonic() - start_time,
4613 server_address=connection.host,
4614 server_port=connection.port,
4615 db_namespace=str(connection.db),
4616 )
4618 # EXEC clears any watched keys
4619 self._watching = False
4621 if response is None:
4622 raise WatchError("Watched variable changed.")
4624 # put any parse errors into the response
4625 for i, e in errors:
4626 response.insert(i, e)
4628 if len(response) != len(self._command_queue):
4629 raise InvalidPipelineStack(
4630 "Unexpected response length for cluster pipeline EXEC."
4631 " Command stack was {} but response had length {}".format(
4632 [c.args[0] for c in self._command_queue], len(response)
4633 )
4634 )
4636 # find any errors in the response and raise if necessary
4637 if raise_on_error or len(errors) > 0:
4638 self._raise_first_error(
4639 response,
4640 self._command_queue,
4641 start_time,
4642 )
4644 # We have to run response callbacks manually
4645 data = []
4646 for r, cmd in zip(response, self._command_queue):
4647 if not isinstance(r, Exception):
4648 command_name = cmd.args[0]
4649 if command_name in self._pipe.cluster_response_callbacks:
4650 r = self._pipe.cluster_response_callbacks[command_name](
4651 r, **cmd.options
4652 )
4653 data.append(r)
4654 return data
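# Roughly, the wire exchange assembled above looks like this (illustrative only):
#
#     MULTI            -> +OK
#     SET {k}:a 1      -> +QUEUED
#     INCR {k}:b       -> +QUEUED
#     EXEC             -> array of per-command replies, or nil if a WATCHed key changed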
4656 def reset(self):
4657 self._command_queue = []
4659 # make sure to reset the connection state in the event that we were
4660 # watching something
4661 if self._transaction_connection:
4662 try:
4663 if self._watching:
4664 # call this manually since our unwatch or
4665 # immediate_execute_command methods can call reset()
4666 self._transaction_connection.send_command("UNWATCH")
4667 self._transaction_connection.read_response()
4668 # we can safely return the connection to the pool here since we're
4669 # sure we're no longer WATCHing anything
4670 node = self._nodes_manager.find_connection_owner(
4671 self._transaction_connection
4672 )
4673 if node and node.redis_connection:
4674 node.redis_connection.connection_pool.release(
4675 self._transaction_connection
4676 )
4677 self._transaction_connection = None
4678 except self.CONNECTION_ERRORS:
4679 # disconnect will also remove any previous WATCHes
4680 if self._transaction_connection:
4681 self._transaction_connection.disconnect()
4682 node = self._nodes_manager.find_connection_owner(
4683 self._transaction_connection
4684 )
4685 if node and node.redis_connection:
4686 node.redis_connection.connection_pool.release(
4687 self._transaction_connection
4688 )
4689 self._transaction_connection = None
4691 # clean up the other instance attributes
4692 self._watching = False
4693 self._explicit_transaction = False
4694 self._pipeline_slots = set()
4695 self._executing = False
4697 def send_cluster_commands(
4698 self, stack, raise_on_error=True, allow_redirections=True
4699 ):
4700 raise NotImplementedError(
4701 "send_cluster_commands cannot be executed in transactional context."
4702 )
4704 def multi(self):
4705 if self._explicit_transaction:
4706 raise RedisError("Cannot issue nested calls to MULTI")
4707 if self._command_queue:
4708 raise RedisError(
4709 "Commands without an initial WATCH have already been issued"
4710 )
4711 self._explicit_transaction = True
4713 def watch(self, *names):
4714 if self._explicit_transaction:
4715 raise RedisError("Cannot issue a WATCH after a MULTI")
4717 return self.execute_command("WATCH", *names)
4719 def unwatch(self):
4720 if self._watching:
4721 return self.execute_command("UNWATCH")
4723 return True
4725 def discard(self):
4726 self.reset()
4728 def delete(self, *names):
4729 return self.execute_command("DEL", *names)
4731 def unlink(self, *names):
4732 return self.execute_command("UNLINK", *names)