Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2import random
3import socket
4import sys
5import threading
6import time
7import weakref
8from abc import ABC, abstractmethod
9from collections import OrderedDict, defaultdict
10from concurrent.futures import Future, ThreadPoolExecutor
11from copy import copy
12from enum import Enum
13from itertools import chain
14from types import MethodType
15from typing import (
16 TYPE_CHECKING,
17 Any,
18 Callable,
19 Dict,
20 List,
21 Literal,
22 Optional,
23 Set,
24 Tuple,
25 Type,
26 Union,
27)
29if TYPE_CHECKING:
30 from redis.keyspace_notifications import ClusterKeyspaceNotifications
32from redis._defaults import DEFAULT_RETRY_BASE, DEFAULT_RETRY_CAP, DEFAULT_RETRY_COUNT
33from redis._parsers import CommandsParser, Encoder
34from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
35from redis._parsers.helpers import parse_scan
36from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
37from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
38from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
39from redis.commands import READ_COMMANDS, RedisClusterCommands
40from redis.commands.helpers import list_or_args, parse_pubsub_subscriptions
41from redis.commands.policies import PolicyResolver, StaticPolicyResolver
42from redis.connection import (
43 Connection,
44 ConnectionPool,
45 parse_url,
46)
47from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
48from redis.event import (
49 AfterPooledConnectionsInstantiationEvent,
50 AfterPubSubConnectionInstantiationEvent,
51 AfterSlotsCacheRefreshEvent,
52 ClientType,
53 EventDispatcher,
54 EventListenerInterface,
55)
56from redis.exceptions import (
57 AskError,
58 AuthenticationError,
59 ClusterDownError,
60 ClusterError,
61 ConnectionError,
62 CrossSlotTransactionError,
63 DataError,
64 ExecAbortError,
65 InvalidPipelineStack,
66 MaxConnectionsError,
67 MovedError,
68 RedisClusterException,
69 RedisError,
70 ResponseError,
71 SlotNotCoveredError,
72 TimeoutError,
73 TryAgainError,
74 WatchError,
75)
76from redis.lock import Lock
77from redis.maint_notifications import (
78 MaintNotificationsConfig,
79 OSSMaintNotificationsHandler,
80)
81from redis.observability.recorder import (
82 record_error_count,
83 record_operation_duration,
84)
85from redis.retry import Retry
86from redis.typing import ChannelT, PubSubHandler, Subscription
87from redis.utils import (
88 check_protocol_version,
89 deprecated_args,
90 deprecated_function,
91 dict_merge,
92 list_keys_to_dict,
93 merge_result,
94 safe_str,
95 str_if_bytes,
96 truncate_text,
97)
logger = logging.getLogger(__name__)


def is_debug_log_enabled():
    """Report whether this module's logger would currently emit DEBUG records."""
    debug_level = logging.DEBUG
    return logger.isEnabledFor(debug_level)
def get_node_name(host: str, port: Union[str, int]) -> str:
    """Build the ``"host:port"`` identifier used to name a cluster node."""
    return "{}:{}".format(host, port)
@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    """
    Return a connection for ``redis_node``.

    The node's own ``connection`` attribute is used when it is truthy;
    otherwise a connection is taken from ``redis_node.connection_pool``.
    Extra positional and keyword arguments are accepted but ignored.
    """
    dedicated = redis_node.connection
    if dedicated:
        return dedicated
    return redis_node.connection_pool.get_connection()
def parse_scan_result(command, res, **options):
    """
    Merge per-node SCAN replies.

    :param command: command name (unused; part of the callback signature).
    :param res: mapping of node name -> raw SCAN reply from that node.
    :param options: forwarded unchanged to ``parse_scan``.
    :return: ``(cursors, keys)``. ``cursors`` maps each node name to the
        cursor parsed from its reply. ``keys`` concatenates every node's keys
        in ``res`` iteration order.
    """
    node_cursors = {}
    merged_keys = []
    for name, raw_reply in res.items():
        node_cursors[name], keys = parse_scan(raw_reply, **options)
        merged_keys.extend(keys)
    return node_cursors, merged_keys
def parse_pubsub_numsub(command, res, **options):
    """
    Aggregate PUBSUB NUMSUB / SHARDNUMSUB replies collected from several nodes.

    :param command: command name (unused; part of the callback signature).
    :param res: mapping of node -> iterable of ``(channel, count)`` pairs.
    :return: list of ``(channel, total_count)`` pairs, one per channel.
        Channels are ordered by first appearance across the node replies.
    """
    totals = OrderedDict()
    for per_node in res.values():
        for channel, count in per_node:
            if channel in totals:
                totals[channel] = totals[channel] + count
            else:
                totals[channel] = count
    return list(totals.items())
def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """
    Convert a CLUSTER SLOTS reply into a slot-range lookup table.

    Each entry of ``resp`` is unpacked as ``start, end, primary, *replicas``.
    Each node entry contributes its first two fields as ``(host, port)``.
    A falsy host is replaced by the ``current_host`` option (default ``""``).

    :return: ``{(start, end): {"primary": (host, port),
        "replicas": [(host, port), ...]}}``
    """
    fallback_host = options.get("current_host", "")

    def _endpoint(*fields: Any) -> Tuple[str, Any]:
        return str_if_bytes(fields[0]) or fallback_host, fields[1]

    table = {}
    for entry in resp:
        start, end, primary = entry[:3]
        table[start, end] = {
            "primary": _endpoint(*primary),
            "replicas": [_endpoint(*replica) for replica in entry[3:]],
        }
    return table
def parse_cluster_shards(resp, **options):
    """
    Parse a CLUSTER SHARDS response.

    An empty (falsy) reply, or a reply whose entries are already dicts, is
    returned unchanged. The empty check avoids an ``IndexError`` on
    ``resp[0]``.

    Otherwise each shard entry is treated as a flat array. Index 1 holds the
    slot bounds ``[start, end, start, end, ...]``. Index 3 holds the list of
    nodes, each a flat ``[key, value, key, value, ...]`` array. (Presumably
    indices 0 and 2 are the ``slots``/``nodes`` labels — not read here.)

    :return: list of ``{"slots": [(start, end), ...], "nodes": [dict, ...]}``.
        Node attribute keys are kept exactly as received.
    """
    if not resp or isinstance(resp[0], dict):
        return resp

    shards = []
    for shard_resp in resp:
        bounds = shard_resp[1]
        nodes = shard_resp[3]
        shards.append(
            {
                "slots": [
                    (bounds[i], bounds[i + 1]) for i in range(0, len(bounds), 2)
                ],
                "nodes": [
                    {node[i]: node[i + 1] for i in range(0, len(node), 2)}
                    for node in nodes
                ],
            }
        )
    return shards
def parse_cluster_shards_with_str_keys(resp, **options):
    """
    Parse CLUSTER SHARDS so that the top-level shard keys are strings.

    - A falsy reply is returned as-is.
    - A reply of non-dict entries is delegated to ``parse_cluster_shards``,
      which already emits ``"slots"``/``"nodes"`` string keys.
    - A reply of dict entries is normalized. Each shard's slots and nodes are
      read from either the bytes or the str key (``[]`` when neither exists).
      List slot entries become tuples. Dict nodes are shallow-copied but their
      own keys are left exactly as delivered.
    """
    if not resp:
        return resp
    if not isinstance(resp[0], dict):
        return parse_cluster_shards(resp, **options)

    normalized = []
    for shard_resp in resp:
        raw_slots = shard_resp.get(b"slots", shard_resp.get("slots", []))
        raw_nodes = shard_resp.get(b"nodes", shard_resp.get("nodes", []))

        slot_entries = []
        for entry in raw_slots:
            slot_entries.append(tuple(entry) if isinstance(entry, list) else entry)

        node_entries = []
        for node in raw_nodes:
            node_entries.append(dict(node) if isinstance(node, dict) else node)

        normalized.append({"slots": slot_entries, "nodes": node_entries})
    return normalized
def parse_cluster_shards_unified(resp, **options):
    """
    Parse CLUSTER SHARDS into one shape for both reply formats.

    In the result, top-level shard keys (``"slots"``, ``"nodes"``) and node
    attribute keys are all strings (``str_if_bytes`` is applied to node keys).

    - A falsy reply is returned as-is.
    - Dict entries: slots/nodes are read from the bytes or str key (``[]`` if
      absent). Slots are passed through unchanged. Dict nodes get their keys
      converted.
    - Non-dict entries: index 1 holds flat slot bounds and index 3 holds flat
      ``[key, value, ...]`` node arrays. These become ``(start, end)`` tuples
      and dicts.
    """
    if not resp:
        return resp

    result = []
    if not isinstance(resp[0], dict):
        for shard_resp in resp:
            bounds = shard_resp[1]
            node_arrays = shard_resp[3]
            result.append(
                {
                    "slots": [
                        (bounds[i], bounds[i + 1])
                        for i in range(0, len(bounds), 2)
                    ],
                    "nodes": [
                        {
                            str_if_bytes(arr[i]): arr[i + 1]
                            for i in range(0, len(arr), 2)
                        }
                        for arr in node_arrays
                    ],
                }
            )
        return result

    for shard_resp in resp:
        slots = shard_resp.get(b"slots", shard_resp.get("slots", []))
        nodes = shard_resp.get(b"nodes", shard_resp.get("nodes", []))
        converted_nodes = []
        for node in nodes:
            if isinstance(node, dict):
                node = {str_if_bytes(key): value for key, value in node.items()}
            converted_nodes.append(node)
        result.append({"slots": slots, "nodes": converted_nodes})
    return result
def parse_cluster_myshardid(resp, **options):
    """
    Parse a CLUSTER MYSHARDID response into a ``str`` shard id.

    Non-``str`` replies (bytes-like) are decoded as UTF-8. A reply that is
    already a ``str`` is returned unchanged instead of failing with
    ``AttributeError`` on ``.decode``. This presumably happens when the
    connection decodes responses itself.
    """
    if isinstance(resp, str):
        return resp
    return resp.decode("utf-8")
# Server-type labels for cluster nodes.
PRIMARY = "primary"
REPLICA = "replica"
# Command flag for commands whose target node is derived from a slot argument
# (e.g. CLUSTER COUNTKEYSINSLOT, CLUSTER SETSLOT).
SLOT_ID = "slot-id"

# Keyword arguments that cleanup_kwargs() lets through to the per-node
# connections; anything not listed here is dropped.
REDIS_ALLOWED_KEYS = (
    # connection / pool construction
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    # encoding
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    # client identification
    "driver_info",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    # protocol / response shape
    "protocol",
    "legacy_responses",
    # socket options
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_read_size",
    "socket_timeout",
    # TLS
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_ca_path",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    # client-side caching and maintenance notifications
    "cache",
    "cache_config",
    "maint_notifications_config",
)
# Allowed keys that are nevertheless stripped by cleanup_kwargs(): the
# per-node host/port come from the cluster topology, and retries are handled
# at the cluster-client level.
KWARGS_DISABLED_KEYS = ("host", "port", "retry")
def cleanup_kwargs(**kwargs):
    """
    Filter keyword arguments down to those forwarded to node connections.

    A key survives only if it is in ``REDIS_ALLOWED_KEYS`` and not in
    ``KWARGS_DISABLED_KEYS``. Surviving keys keep their insertion order.

    :return: a new dict; the caller's mapping is not modified.
    """
    kept = {}
    for name, value in kwargs.items():
        if name not in REDIS_ALLOWED_KEYS:
            continue
        if name in KWARGS_DISABLED_KEYS:
            continue
        kept[name] = value
    return kept
class MaintNotificationsAbstractRedisCluster:
    """
    Mixin that holds the maintenance-notifications setup of a cluster client.

    It is meant to be combined with ``RedisCluster`` through multiple
    inheritance. It relies on the host class providing ``get_nodes()`` and a
    ``nodes_manager`` with a ``connection_kwargs`` dict, so ``__init__`` must
    run after the nodes manager has been created.
    """

    def __init__(
        self,
        maint_notifications_config: Optional[MaintNotificationsConfig],
        **kwargs,
    ):
        """
        Validate and apply the maintenance-notifications configuration.

        :param maint_notifications_config: user-supplied config, or ``None``.
            When ``None`` and ``kwargs["protocol"]`` selects RESP3, a default
            ``MaintNotificationsConfig()`` is used.
        :param kwargs: connection kwargs; only ``protocol`` is read.
        :raises RedisError: if an enabled config is combined with a non-RESP3
            protocol.
        """
        resp3 = check_protocol_version(kwargs.get("protocol"), 3)
        config = maint_notifications_config

        if config and config.enabled and not resp3:
            raise RedisError(
                "Maintenance notifications handlers on connection are only supported with RESP version 3"
            )
        if config is None and resp3:
            config = MaintNotificationsConfig()
        self.maint_notifications_config = config

        if not (
            self.maint_notifications_config and self.maint_notifications_config.enabled
        ):
            self._oss_cluster_maint_notifications_handler = None
            return

        handler = OSSMaintNotificationsHandler(self, self.maint_notifications_config)
        self._oss_cluster_maint_notifications_handler = handler
        # Connections created from now on pick the handler up from the kwargs.
        self._update_connection_kwargs_for_maint_notifications(handler)
        # Pools already built during RedisCluster construction are updated in place.
        for node in self.get_nodes():
            if node.redis_connection is None:
                continue
            node.redis_connection.connection_pool.update_maint_notifications_config(
                self.maint_notifications_config,
                oss_cluster_maint_notifications_handler=handler,
            )

    def _update_connection_kwargs_for_maint_notifications(
        self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
    ):
        """Store the handler in the nodes manager's connection kwargs for future connections."""
        self.nodes_manager.connection_kwargs.update(
            oss_cluster_maint_notifications_handler=oss_cluster_maint_notifications_handler
        )
class AbstractRedisCluster:
    """
    Routing tables and shared helpers for cluster clients.

    Class-level data:

    - ``COMMAND_FLAGS`` maps a command name to the node flag it is routed with.
    - ``CLUSTER_COMMANDS_RESPONSE_CALLBACKS`` parses cluster-topology replies.
    - ``RESULT_CALLBACKS`` merges per-node replies, which arrive as a
      ``{node: reply}`` dict.
    - ``ERRORS_ALLOW_RETRY`` lists the error types eligible for retry.

    ``replace_default_node`` expects the subclass to provide
    ``nodes_manager``, ``get_default_node()``, ``get_primaries()`` and
    ``get_replicas()``.
    """

    RedisClusterRequestTTL = 16

    # Node-flag values a command can be routed with.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # Command name -> node flag. Each command is listed exactly once.
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READWRITE",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # NOTE(review): this is a one-element tuple wrapping the list (note the
    # trailing comma). Code outside this view may index it, so the shape is
    # deliberately kept.
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            # Was "FT,PROFILE" (comma typo), which could never match.
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Callbacks that fold a ``{node: reply}`` dict into a single result.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        # True only if every node succeeded; non-dict replies pass through.
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        # Summed across nodes; non-dict replies pass through.
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
        """Replace the default cluster node.

        When ``target_node`` is not passed, a random node other than the
        current default is chosen, preferring primaries over replicas. The
        default node is left unchanged if there are no other nodes.

        Args:
            target_node (ClusterNode, optional): Target node to replace the
                default node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
            return

        curr_node = self.get_default_node()
        primaries = [node for node in self.get_primaries() if node != curr_node]
        if primaries:
            # Choose a primary if the cluster contains different primaries
            self.nodes_manager.default_node = random.choice(primaries)
            return

        # Otherwise, choose a replica if the cluster contains other replicas
        replicas = [node for node in self.get_replicas() if node != curr_node]
        if replicas:
            self.nodes_manager.default_node = random.choice(replicas)
638class RedisCluster(
639 AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands
640):
641 # Type discrimination marker for @overload self-type pattern
642 _is_async_client: Literal[False] = False
644 @classmethod
645 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
646 """
647 Return a Redis client object configured from the given URL
649 For example::
651 redis://[[username]:[password]]@localhost:6379/0
652 rediss://[[username]:[password]]@localhost:6379/0
653 unix://[username@]/path/to/socket.sock?db=0[&password=password]
655 Three URL schemes are supported:
657 - `redis://` creates a TCP socket connection. See more at:
658 <https://www.iana.org/assignments/uri-schemes/prov/redis>
659 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
660 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
661 - ``unix://``: creates a Unix Domain Socket connection.
663 The username, password, hostname, path and all querystring values
664 are passed through urllib.parse.unquote in order to replace any
665 percent-encoded values with their corresponding characters.
667 There are several ways to specify a database number. The first value
668 found will be used:
670 1. A ``db`` querystring option, e.g. redis://localhost?db=0
671 2. If using the redis:// or rediss:// schemes, the path argument
672 of the url, e.g. redis://localhost/0
673 3. A ``db`` keyword argument to this function.
675 If none of these options are specified, the default db=0 is used.
677 All querystring options are cast to their appropriate Python types.
678 Boolean arguments can be specified with string values "True"/"False"
679 or "Yes"/"No". Values that cannot be properly cast cause a
680 ``ValueError`` to be raised. Once parsed, the querystring arguments
681 and keyword arguments are passed to the ``ConnectionPool``'s
682 class initializer. In the case of conflicting arguments, querystring
683 arguments always win.
685 """
686 return cls(url=url, **kwargs)
688 @deprecated_args(
689 args_to_warn=["read_from_replicas"],
690 reason="Please configure the 'load_balancing_strategy' instead",
691 version="5.3.0",
692 )
693 @deprecated_args(
694 args_to_warn=[
695 "cluster_error_retry_attempts",
696 ],
697 reason="Please configure the 'retry' object instead",
698 version="6.0.0",
699 )
700 def __init__(
701 self,
702 host: Optional[str] = None,
703 port: int = 6379,
704 startup_nodes: Optional[List["ClusterNode"]] = None,
705 cluster_error_retry_attempts: int = DEFAULT_RETRY_COUNT,
706 retry: Optional["Retry"] = None,
707 require_full_coverage: bool = True,
708 reinitialize_steps: int = 5,
709 read_from_replicas: bool = False,
710 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
711 dynamic_startup_nodes: bool = True,
712 url: Optional[str] = None,
713 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
714 cache: Optional[CacheInterface] = None,
715 cache_config: Optional[CacheConfig] = None,
716 event_dispatcher: Optional[EventDispatcher] = None,
717 policy_resolver: PolicyResolver = StaticPolicyResolver(),
718 maint_notifications_config: Optional[MaintNotificationsConfig] = None,
719 **kwargs,
720 ):
721 """
722 Initialize a new RedisCluster client.
724 :param startup_nodes:
725 List of nodes from which initial bootstrapping can be done
726 :param host:
727 Can be used to point to a startup node
728 :param port:
729 Can be used to point to a startup node
730 :param require_full_coverage:
731 When set to False (default value): the client will not require a
732 full coverage of the slots. However, if not all slots are covered,
733 and at least one node has 'cluster-require-full-coverage' set to
734 'yes,' the server will throw a ClusterDownError for some key-based
735 commands. See -
736 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
737 When set to True: all slots must be covered to construct the
738 cluster client. If not all slots are covered, RedisClusterException
739 will be thrown.
740 :param read_from_replicas:
741 @deprecated - please use load_balancing_strategy instead
742 Enable read from replicas in READONLY mode. You can read possibly
743 stale data.
744 When set to true, read commands will be assigned between the
745 primary and its replications in a Round-Robin manner.
746 :param load_balancing_strategy:
747 Enable read from replicas in READONLY mode and defines the load balancing
748 strategy that will be used for cluster node selection.
749 The data read from replicas is eventually consistent with the data in primary nodes.
750 :param dynamic_startup_nodes:
751 Set the RedisCluster's startup nodes to all of the discovered nodes.
752 If true (default value), the cluster's discovered nodes will be used to
753 determine the cluster nodes-slots mapping in the next topology refresh.
754 It will remove the initial passed startup nodes if their endpoints aren't
755 listed in the CLUSTER SLOTS output.
756 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
757 specific IP addresses, it is best to set it to false.
758 :param cluster_error_retry_attempts:
759 @deprecated - Please configure the 'retry' object instead
760 In case 'retry' object is set - this argument is ignored!
762 Number of times to retry before raising an error when
763 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
764 :class:`~.ClusterDownError` are encountered
765 :param retry:
766 A retry object that defines the retry strategy and the number of
767 retries for the cluster client.
768 In current implementation for the cluster client (starting form redis-py version 6.0.0)
769 the retry object is not yet fully utilized, instead it is used just to determine
770 the number of retries for the cluster client.
771 In the future releases the retry object will be used to handle the cluster client retries!
772 :param reinitialize_steps:
773 Specifies the number of MOVED errors that need to occur before
774 reinitializing the whole cluster topology. If a MOVED error occurs
775 and the cluster does not need to be reinitialized on this current
776 error handling, only the MOVED slot will be patched with the
777 redirected node.
778 To reinitialize the cluster on every MOVED error, set
779 reinitialize_steps to 1.
780 To avoid reinitializing the cluster on moved errors, set
781 reinitialize_steps to 0.
782 :param address_remap:
783 An optional callable which, when provided with an internal network
784 address of a node, e.g. a `(host, port)` tuple, will return the address
785 where the node is reachable. This can be used to map the addresses at
786 which the nodes _think_ they are, to addresses at which a client may
787 reach them, such as when they sit behind a proxy.
789 :param maint_notifications_config:
790 Configures the nodes connections to support maintenance notifications - see
791 `redis.maint_notifications.MaintNotificationsConfig` for details.
792 Only supported with RESP3.
793 If not provided and protocol is RESP3, the maintenance notifications
794 will be enabled by default (logic is included in the NodesManager
795 initialization).
796 :**kwargs:
797 Extra arguments that will be sent into Redis instance when created
798 (See Official redis-py doc for supported kwargs - the only limitation
799 is that you can't provide 'retry' object as part of kwargs.
800 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
801 Some kwargs are not supported and will raise a
802 RedisClusterException:
803 - db (Redis do not support database SELECT in cluster mode)
805 """
806 if startup_nodes is None:
807 startup_nodes = []
809 if "db" in kwargs:
810 # Argument 'db' is not possible to use in cluster mode
811 raise RedisClusterException(
812 "Argument 'db' is not possible to use in cluster mode"
813 )
815 if "retry" in kwargs:
816 # Argument 'retry' is not possible to be used in kwargs when in cluster mode
817 # the kwargs are set to the lower level connections to the cluster nodes
818 # and there we provide retry configuration without retries allowed.
819 # The retries should be handled on cluster client level.
820 raise RedisClusterException(
821 "The 'retry' argument cannot be used in kwargs when running in cluster mode."
822 )
824 # Get the startup node/s
825 from_url = False
826 if url is not None:
827 from_url = True
828 url_options = parse_url(url)
829 if "path" in url_options:
830 raise RedisClusterException(
831 "RedisCluster does not currently support Unix Domain "
832 "Socket connections"
833 )
834 if "db" in url_options and url_options["db"] != 0:
835 # Argument 'db' is not possible to use in cluster mode
836 raise RedisClusterException(
837 "A ``db`` querystring option can only be 0 in cluster mode"
838 )
839 kwargs.update(url_options)
840 host = kwargs.get("host")
841 port = kwargs.get("port", port)
842 startup_nodes.append(ClusterNode(host, port))
843 elif host is not None and port is not None:
844 startup_nodes.append(ClusterNode(host, port))
845 elif len(startup_nodes) == 0:
846 # No startup node was provided
847 raise RedisClusterException(
848 "RedisCluster requires at least one node to discover the "
849 "cluster. Please provide one of the followings:\n"
850 "1. host and port, for example:\n"
851 " RedisCluster(host='localhost', port=6379)\n"
852 "2. list of startup nodes, for example:\n"
853 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
854 " ClusterNode('localhost', 6378)])"
855 )
856 # Update the connection arguments
857 # Whenever a new connection is established, RedisCluster's on_connect
858 # method should be run
859 # If the user passed on_connect function we'll save it and run it
860 # inside the RedisCluster.on_connect() function
861 self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
862 kwargs.update({"redis_connect_func": self.on_connect})
863 kwargs = cleanup_kwargs(**kwargs)
864 if retry:
865 self.retry = retry
866 else:
867 self.retry = Retry(
868 backoff=ExponentialWithJitterBackoff(
869 base=DEFAULT_RETRY_BASE, cap=DEFAULT_RETRY_CAP
870 ),
871 retries=cluster_error_retry_attempts,
872 )
874 self.encoder = Encoder(
875 kwargs.get("encoding", "utf-8"),
876 kwargs.get("encoding_errors", "strict"),
877 kwargs.get("decode_responses", False),
878 )
879 protocol = kwargs.get("protocol", None)
880 if (cache_config or cache) and not check_protocol_version(protocol, 3):
881 raise RedisError("Client caching is only supported with RESP version 3")
883 if maint_notifications_config and not check_protocol_version(protocol, 3):
884 raise RedisError(
885 "Maintenance notifications are only supported with RESP version 3"
886 )
887 if check_protocol_version(protocol, 3) and maint_notifications_config is None:
888 maint_notifications_config = MaintNotificationsConfig()
890 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
891 self.node_flags = self.__class__.NODE_FLAGS.copy()
892 self.read_from_replicas = read_from_replicas
893 self.load_balancing_strategy = load_balancing_strategy
894 self.reinitialize_counter = 0
895 self.reinitialize_steps = reinitialize_steps
896 if event_dispatcher is None:
897 self._event_dispatcher = EventDispatcher()
898 else:
899 self._event_dispatcher = event_dispatcher
900 self.startup_nodes = startup_nodes
902 self.nodes_manager = NodesManager(
903 startup_nodes=startup_nodes,
904 from_url=from_url,
905 require_full_coverage=require_full_coverage,
906 dynamic_startup_nodes=dynamic_startup_nodes,
907 address_remap=address_remap,
908 cache=cache,
909 cache_config=cache_config,
910 event_dispatcher=self._event_dispatcher,
911 maint_notifications_config=maint_notifications_config,
912 **kwargs,
913 )
915 cluster_response_callbacks = dict(
916 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
917 )
918 legacy_responses = kwargs.get("legacy_responses", True)
919 protocol = kwargs.get("protocol")
920 if not legacy_responses:
921 cluster_response_callbacks["CLUSTER SHARDS"] = parse_cluster_shards_unified
922 elif protocol is None:
923 cluster_response_callbacks["CLUSTER SHARDS"] = (
924 parse_cluster_shards_with_str_keys
925 )
926 self.cluster_response_callbacks = CaseInsensitiveDict(
927 cluster_response_callbacks
928 )
929 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
931 # For backward compatibility, mapping from existing policies to new one
932 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
933 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
934 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
935 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
936 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
937 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
938 SLOT_ID: RequestPolicy.DEFAULT_KEYED,
939 }
941 self._policies_callback_mapping: dict[
942 Union[RequestPolicy, ResponsePolicy], Callable
943 ] = {
944 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
945 self.get_random_primary_or_all_nodes(command_name)
946 ],
947 RequestPolicy.DEFAULT_KEYED: lambda command,
948 *args: self.get_nodes_from_slot(command, *args),
949 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
950 RequestPolicy.ALL_SHARDS: self.get_primaries,
951 RequestPolicy.ALL_NODES: self.get_nodes,
952 RequestPolicy.ALL_REPLICAS: self.get_replicas,
953 RequestPolicy.MULTI_SHARD: lambda *args,
954 **kwargs: self._split_multi_shard_command(*args, **kwargs),
955 RequestPolicy.SPECIAL: self.get_special_nodes,
956 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
957 ResponsePolicy.DEFAULT_KEYED: lambda res: res,
958 }
960 self._policy_resolver = policy_resolver
961 self.commands_parser = CommandsParser(self)
963 # Node where FT.AGGREGATE command is executed.
964 self._aggregate_nodes = None
965 self._lock = threading.RLock()
967 MaintNotificationsAbstractRedisCluster.__init__(
968 self, maint_notifications_config, **kwargs
969 )
971 def __enter__(self):
972 return self
974 def __exit__(self, exc_type, exc_value, traceback):
975 self.close()
977 def __del__(self):
978 try:
979 self.close()
980 except Exception:
981 pass
983 def disconnect_connection_pools(self):
984 for node in self.get_nodes():
985 if node.redis_connection:
986 try:
987 node.redis_connection.connection_pool.disconnect()
988 except OSError:
989 # Client was already disconnected. do nothing
990 pass
992 def on_connect(self, connection):
993 """
994 Initialize the connection, authenticate and select a database and send
995 READONLY if it is set during object initialization.
996 """
997 connection.on_connect()
999 if self.read_from_replicas or self.load_balancing_strategy:
1000 # Sending READONLY command to server to configure connection as
1001 # readonly. Since each cluster node may change its server type due
1002 # to a failover, we should establish a READONLY connection
1003 # regardless of the server type. If this is a primary connection,
1004 # READONLY would not affect executing write commands.
1005 connection.send_command("READONLY")
1006 if str_if_bytes(connection.read_response()) != "OK":
1007 raise ConnectionError("READONLY command failed")
1009 if self.user_on_connect_func is not None:
1010 self.user_on_connect_func(connection)
1012 def get_redis_connection(self, node: "ClusterNode") -> Redis:
1013 if not node.redis_connection:
1014 with self._lock:
1015 if not node.redis_connection:
1016 self.nodes_manager.create_redis_connections([node])
1017 return node.redis_connection
1019 def get_node(self, host=None, port=None, node_name=None):
1020 return self.nodes_manager.get_node(host, port, node_name)
1022 def get_primaries(self):
1023 return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
1025 def get_replicas(self):
1026 return self.nodes_manager.get_nodes_by_server_type(REPLICA)
1028 def get_random_node(self):
1029 return random.choice(list(self.nodes_manager.nodes_cache.values()))
1031 def get_random_primary_or_all_nodes(self, command_name):
1032 """
1033 Returns random primary or all nodes depends on READONLY mode.
1034 """
1035 if self.read_from_replicas and command_name in READ_COMMANDS:
1036 return self.get_random_node()
1038 return self.get_random_primary_node()
1040 def get_nodes(self):
1041 return list(self.nodes_manager.nodes_cache.values())
1043 def get_node_from_key(self, key, replica=False):
1044 """
1045 Get the node that holds the key's slot.
1046 If replica set to True but the slot doesn't have any replicas, None is
1047 returned.
1048 """
1049 slot = self.keyslot(key)
1050 slot_cache = self.nodes_manager.slots_cache.get(slot)
1051 if slot_cache is None or len(slot_cache) == 0:
1052 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
1053 if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
1054 return None
1055 elif replica:
1056 node_idx = 1
1057 else:
1058 # primary
1059 node_idx = 0
1061 return slot_cache[node_idx]
1063 def get_default_node(self):
1064 """
1065 Get the cluster's default node
1066 """
1067 return self.nodes_manager.default_node
1069 def get_nodes_from_slot(self, command: str, *args):
1070 """
1071 Returns a list of nodes that hold the specified keys' slots.
1072 """
1073 # get the node that holds the key's slot
1074 slot = self.determine_slot(*args)
1075 node = self.nodes_manager.get_node_from_slot(
1076 slot,
1077 self.read_from_replicas and command in READ_COMMANDS,
1078 self.load_balancing_strategy if command in READ_COMMANDS else None,
1079 )
1080 return [node]
1082 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
1083 """
1084 Splits the command with Multi-Shard policy, to the multiple commands
1085 """
1086 keys = self._get_command_keys(*args)
1087 commands = []
1089 for key in keys:
1090 commands.append(
1091 {
1092 "args": (args[0], key),
1093 "kwargs": kwargs,
1094 }
1095 )
1097 return commands
1099 def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
1100 """
1101 Returns a list of nodes for commands with a special policy.
1102 """
1103 if not self._aggregate_nodes:
1104 raise RedisClusterException(
1105 "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
1106 )
1108 return self._aggregate_nodes
1110 def get_random_primary_node(self) -> "ClusterNode":
1111 """
1112 Returns a random primary node
1113 """
1114 return random.choice(self.get_primaries())
1116 def _evaluate_all_succeeded(self, res):
1117 """
1118 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
1119 """
1120 first_successful_response = None
1122 if isinstance(res, dict):
1123 for key, value in res.items():
1124 if value:
1125 if first_successful_response is None:
1126 first_successful_response = {key: value}
1127 else:
1128 return {key: False}
1129 else:
1130 for response in res:
1131 if response:
1132 if first_successful_response is None:
1133 # Dynamically resolve type
1134 first_successful_response = type(response)(response)
1135 else:
1136 return type(response)(False)
1138 return first_successful_response
1140 def set_default_node(self, node):
1141 """
1142 Set the default node of the cluster.
1143 :param node: 'ClusterNode'
1144 :return True if the default node was set, else False
1145 """
1146 if node is None or self.get_node(node_name=node.name) is None:
1147 return False
1148 self.nodes_manager.default_node = node
1149 return True
1151 def set_retry(self, retry: Retry) -> None:
1152 self.retry = retry
1154 def monitor(self, target_node=None):
1155 """
1156 Returns a Monitor object for the specified target node.
1157 The default cluster node will be selected if no target node was
1158 specified.
1159 Monitor is useful for handling the MONITOR command to the redis server.
1160 next_command() method returns one command from monitor
1161 listen() method yields commands from monitor.
1162 """
1163 if target_node is None:
1164 target_node = self.get_default_node()
1165 if target_node.redis_connection is None:
1166 raise RedisClusterException(
1167 f"Cluster Node {target_node.name} has no redis_connection"
1168 )
1169 return target_node.redis_connection.monitor()
1171 def pubsub(self, node=None, host=None, port=None, **kwargs):
1172 """
1173 Allows passing a ClusterNode, or host&port, to get a pubsub instance
1174 connected to the specified node
1175 """
1176 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
1178 def keyspace_notifications(
1179 self,
1180 key_prefix: Union[str, bytes, None] = None,
1181 ignore_subscribe_messages: bool = True,
1182 ) -> "ClusterKeyspaceNotifications":
1183 """
1184 Return a :class:`~redis.keyspace_notifications.ClusterKeyspaceNotifications`
1185 object for subscribing to keyspace and keyevent notifications across
1186 all primary nodes in the cluster.
1188 Note: Keyspace notifications must be enabled on all Redis cluster nodes
1189 via the ``notify-keyspace-events`` configuration option.
1191 Args:
1192 key_prefix: Optional prefix to filter and strip from keys in
1193 notifications.
1194 ignore_subscribe_messages: If True, subscribe/unsubscribe
1195 confirmations are not returned by
1196 get_message/listen.
1197 """
1198 from redis.keyspace_notifications import ClusterKeyspaceNotifications
1200 return ClusterKeyspaceNotifications(
1201 self,
1202 key_prefix=key_prefix,
1203 ignore_subscribe_messages=ignore_subscribe_messages,
1204 )
1206 def pipeline(self, transaction=None, shard_hint=None):
1207 """
1208 Cluster impl:
1209 Pipelines do not work in cluster mode the same way they
1210 do in normal mode. Create a clone of this object so
1211 that simulating pipelines will work correctly. Each
1212 command will be called directly when used and
1213 when calling execute() will only return the result stack.
1214 """
1215 if shard_hint:
1216 raise RedisClusterException("shard_hint is deprecated in cluster mode")
1218 return ClusterPipeline(
1219 nodes_manager=self.nodes_manager,
1220 commands_parser=self.commands_parser,
1221 startup_nodes=self.nodes_manager.startup_nodes,
1222 result_callbacks=self.result_callbacks,
1223 cluster_response_callbacks=self.cluster_response_callbacks,
1224 read_from_replicas=self.read_from_replicas,
1225 load_balancing_strategy=self.load_balancing_strategy,
1226 reinitialize_steps=self.reinitialize_steps,
1227 retry=self.retry,
1228 lock=self._lock,
1229 transaction=transaction,
1230 event_dispatcher=self._event_dispatcher,
1231 )
1233 def lock(
1234 self,
1235 name,
1236 timeout=None,
1237 sleep=0.1,
1238 blocking=True,
1239 blocking_timeout=None,
1240 lock_class=None,
1241 thread_local=True,
1242 raise_on_release_error: bool = True,
1243 ):
1244 """
1245 Return a new Lock object using key ``name`` that mimics
1246 the behavior of threading.Lock.
1248 If specified, ``timeout`` indicates a maximum life for the lock.
1249 By default, it will remain locked until release() is called.
1251 ``sleep`` indicates the amount of time to sleep per loop iteration
1252 when the lock is in blocking mode and another client is currently
1253 holding the lock.
1255 ``blocking`` indicates whether calling ``acquire`` should block until
1256 the lock has been acquired or to fail immediately, causing ``acquire``
1257 to return False and the lock not being acquired. Defaults to True.
1258 Note this value can be overridden by passing a ``blocking``
1259 argument to ``acquire``.
1261 ``blocking_timeout`` indicates the maximum amount of time in seconds to
1262 spend trying to acquire the lock. A value of ``None`` indicates
1263 continue trying forever. ``blocking_timeout`` can be specified as a
1264 float or integer, both representing the number of seconds to wait.
1266 ``lock_class`` forces the specified lock implementation. Note that as
1267 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
1268 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
1269 you have created your own custom lock class.
1271 ``thread_local`` indicates whether the lock token is placed in
1272 thread-local storage. By default, the token is placed in thread local
1273 storage so that a thread only sees its token, not a token set by
1274 another thread. Consider the following timeline:
1276 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
1277 thread-1 sets the token to "abc"
1278 time: 1, thread-2 blocks trying to acquire `my-lock` using the
1279 Lock instance.
1280 time: 5, thread-1 has not yet completed. redis expires the lock
1281 key.
1282 time: 5, thread-2 acquired `my-lock` now that it's available.
1283 thread-2 sets the token to "xyz"
1284 time: 6, thread-1 finishes its work and calls release(). if the
1285 token is *not* stored in thread local storage, then
1286 thread-1 would see the token value as "xyz" and would be
1287 able to successfully release the thread-2's lock.
1289 ``raise_on_release_error`` indicates whether to raise an exception when
1290 the lock is no longer owned when exiting the context manager. By default,
1291 this is True, meaning an exception will be raised. If False, the warning
1292 will be logged and the exception will be suppressed.
1294 In some use cases it's necessary to disable thread local storage. For
1295 example, if you have code where one thread acquires a lock and passes
1296 that lock instance to a worker thread to release later. If thread
1297 local storage isn't disabled in this case, the worker thread won't see
1298 the token set by the thread that acquired the lock. Our assumption
1299 is that these cases aren't common and as such default to using
1300 thread local storage."""
1301 if lock_class is None:
1302 lock_class = Lock
1303 return lock_class(
1304 self,
1305 name,
1306 timeout=timeout,
1307 sleep=sleep,
1308 blocking=blocking,
1309 blocking_timeout=blocking_timeout,
1310 thread_local=thread_local,
1311 raise_on_release_error=raise_on_release_error,
1312 )
1314 def set_response_callback(self, command, callback):
1315 """Set a custom Response Callback"""
1316 self.cluster_response_callbacks[command] = callback
1318 def _determine_nodes(
1319 self, *args, request_policy: RequestPolicy, **kwargs
1320 ) -> List["ClusterNode"]:
1321 """
1322 Determines a nodes the command should be executed on.
1323 """
1324 command = args[0].upper()
1325 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
1326 command = f"{args[0]} {args[1]}".upper()
1328 nodes_flag = kwargs.pop("nodes_flag", None)
1329 if nodes_flag is not None:
1330 # nodes flag passed by the user
1331 command_flag = nodes_flag
1332 else:
1333 # get the nodes group for this command if it was predefined
1334 command_flag = self.command_flags.get(command)
1336 if command_flag in self._command_flags_mapping:
1337 request_policy = self._command_flags_mapping[command_flag]
1339 policy_callback = self._policies_callback_mapping[request_policy]
1341 if request_policy == RequestPolicy.DEFAULT_KEYED:
1342 nodes = policy_callback(command, *args)
1343 elif request_policy == RequestPolicy.MULTI_SHARD:
1344 nodes = policy_callback(*args, **kwargs)
1345 elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
1346 nodes = policy_callback(args[0])
1347 else:
1348 nodes = policy_callback()
1350 if args[0].lower() == "ft.aggregate":
1351 self._aggregate_nodes = nodes
1353 return nodes
1355 def _should_reinitialized(self):
1356 # To reinitialize the cluster on every MOVED error,
1357 # set reinitialize_steps to 1.
1358 # To avoid reinitializing the cluster on moved errors, set
1359 # reinitialize_steps to 0.
1360 if self.reinitialize_steps == 0:
1361 return False
1362 else:
1363 return self.reinitialize_counter % self.reinitialize_steps == 0
1365 def keyslot(self, key):
1366 """
1367 Calculate keyslot for a given key.
1368 See Keys distribution model in https://redis.io/topics/cluster-spec
1369 """
1370 k = self.encoder.encode(key)
1371 return key_slot(k)
1373 def _get_command_keys(self, *args):
1374 """
1375 Get the keys in the command. If the command has no keys in in, None is
1376 returned.
1378 NOTE: Due to a bug in redis<7.0, this function does not work properly
1379 for EVAL or EVALSHA when the `numkeys` arg is 0.
1380 - issue: https://github.com/redis/redis/issues/9493
1381 - fix: https://github.com/redis/redis/pull/9733
1383 So, don't use this function with EVAL or EVALSHA.
1384 """
1385 redis_conn = self.get_default_node().redis_connection
1386 return self.commands_parser.get_keys(redis_conn, *args)
1388 def determine_slot(self, *args) -> Optional[int]:
1389 """
1390 Figure out what slot to use based on args.
1392 Raises a RedisClusterException if there's a missing key and we can't
1393 determine what slots to map the command to; or, if the keys don't
1394 all map to the same key slot.
1395 """
1396 command = args[0]
1397 if self.command_flags.get(command) == SLOT_ID:
1398 # The command contains the slot ID
1399 return args[1]
1401 # Get the keys in the command
1403 # CLIENT TRACKING is a special case.
1404 # It doesn't have any keys, it needs to be sent to the provided nodes
1405 # By default it will be sent to all nodes.
1406 if command.upper() == "CLIENT TRACKING":
1407 return None
1409 # EVAL and EVALSHA are common enough that it's wasteful to go to the
1410 # redis server to parse the keys. Besides, there is a bug in redis<7.0
1411 # where `self._get_command_keys()` fails anyway. So, we special case
1412 # EVAL/EVALSHA.
1413 if command.upper() in ("EVAL", "EVALSHA"):
1414 # command syntax: EVAL "script body" num_keys ...
1415 if len(args) <= 2:
1416 raise RedisClusterException(f"Invalid args in command: {args}")
1417 num_actual_keys = int(args[2])
1418 eval_keys = args[3 : 3 + num_actual_keys]
1419 # if there are 0 keys, that means the script can be run on any node
1420 # so we can just return a random slot
1421 if len(eval_keys) == 0:
1422 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
1423 keys = eval_keys
1424 else:
1425 keys = self._get_command_keys(*args)
1426 if keys is None or len(keys) == 0:
1427 # FCALL can call a function with 0 keys, that means the function
1428 # can be run on any node so we can just return a random slot
1429 if command.upper() in ("FCALL", "FCALL_RO"):
1430 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
1431 raise RedisClusterException(
1432 "No way to dispatch this command to Redis Cluster. "
1433 "Missing key.\nYou can execute the command by specifying "
1434 f"target nodes.\nCommand: {args}"
1435 )
1437 # single key command
1438 if len(keys) == 1:
1439 return self.keyslot(keys[0])
1441 # multi-key command; we need to make sure all keys are mapped to
1442 # the same slot
1443 slots = {self.keyslot(key) for key in keys}
1444 if len(slots) != 1:
1445 raise RedisClusterException(
1446 f"{command} - all keys must map to the same key slot"
1447 )
1449 return slots.pop()
1451 def get_encoder(self):
1452 """
1453 Get the connections' encoder
1454 """
1455 return self.encoder
1457 def get_connection_kwargs(self):
1458 """
1459 Get the connections' key-word arguments
1460 """
1461 return self.nodes_manager.connection_kwargs
1463 def _is_nodes_flag(self, target_nodes):
1464 return isinstance(target_nodes, str) and target_nodes in self.node_flags
1466 def _parse_target_nodes(self, target_nodes):
1467 if isinstance(target_nodes, list):
1468 nodes = target_nodes
1469 elif isinstance(target_nodes, ClusterNode):
1470 # Supports passing a single ClusterNode as a variable
1471 nodes = [target_nodes]
1472 elif isinstance(target_nodes, dict):
1473 # Supports dictionaries of the format {node_name: node}.
1474 # It enables to execute commands with multi nodes as follows:
1475 # rc.cluster_save_config(rc.get_primaries())
1476 nodes = target_nodes.values()
1477 else:
1478 raise TypeError(
1479 "target_nodes type can be one of the following: "
1480 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
1481 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
1482 f"The passed type is {type(target_nodes)}"
1483 )
1484 return nodes
1486 def execute_command(self, *args, **kwargs):
1487 return self._internal_execute_command(*args, **kwargs)
1489 def _internal_execute_command(self, *args, **kwargs):
1490 """
1491 Wrapper for ERRORS_ALLOW_RETRY error handling.
1493 It will try the number of times specified by the retries property from
1494 config option "self.retry" which defaults to 10 unless manually
1495 configured.
1497 If it reaches the number of times, the command will raise the exception
1499 Key argument :target_nodes: can be passed with the following types:
1500 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
1501 ClusterNode
1502 list<ClusterNode>
1503 dict<Any, ClusterNode>
1504 """
1505 target_nodes_specified = False
1506 is_default_node = False
1507 target_nodes = None
1508 passed_targets = kwargs.pop("target_nodes", None)
1509 command_policies = self._policy_resolver.resolve(args[0].lower())
1511 if passed_targets is not None and not self._is_nodes_flag(passed_targets):
1512 target_nodes = self._parse_target_nodes(passed_targets)
1513 target_nodes_specified = True
1515 if not command_policies and not target_nodes_specified:
1516 command = args[0].upper()
1517 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
1518 command = f"{args[0]} {args[1]}".upper()
1520 # We only could resolve key properties if command is not
1521 # in a list of pre-defined request policies
1522 command_flag = self.command_flags.get(command)
1523 if not command_flag:
1524 # Fallback to default policy
1525 if not self.get_default_node():
1526 slot = None
1527 else:
1528 slot = self.determine_slot(*args)
1529 if slot is None:
1530 command_policies = CommandPolicies()
1531 else:
1532 command_policies = CommandPolicies(
1533 request_policy=RequestPolicy.DEFAULT_KEYED,
1534 response_policy=ResponsePolicy.DEFAULT_KEYED,
1535 )
1536 else:
1537 if command_flag in self._command_flags_mapping:
1538 command_policies = CommandPolicies(
1539 request_policy=self._command_flags_mapping[command_flag]
1540 )
1541 else:
1542 command_policies = CommandPolicies()
1543 elif not command_policies and target_nodes_specified:
1544 command_policies = CommandPolicies()
1546 # If an error that allows retrying was thrown, the nodes and slots
1547 # cache were reinitialized. We will retry executing the command with
1548 # the updated cluster setup only when the target nodes can be
1549 # determined again with the new cache tables. Therefore, when target
1550 # nodes were passed to this function, we cannot retry the command
1551 # execution since the nodes may not be valid anymore after the tables
1552 # were reinitialized. So in case of passed target nodes,
1553 # retry_attempts will be set to 0.
1554 retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
1555 # Add one for the first execution
1556 execute_attempts = 1 + retry_attempts
1557 failure_count = 0
1559 # Start timing for observability
1560 start_time = time.monotonic()
1562 for _ in range(execute_attempts):
1563 try:
1564 res = {}
1565 if not target_nodes_specified:
1566 # Determine the nodes to execute the command on
1567 target_nodes = self._determine_nodes(
1568 *args,
1569 request_policy=command_policies.request_policy,
1570 nodes_flag=passed_targets,
1571 )
1573 if not target_nodes:
1574 raise RedisClusterException(
1575 f"No targets were found to execute {args} command on"
1576 )
1577 if (
1578 len(target_nodes) == 1
1579 and target_nodes[0] == self.get_default_node()
1580 ):
1581 is_default_node = True
1582 for node in target_nodes:
1583 res[node.name] = self._execute_command(node, *args, **kwargs)
1585 if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
1586 break
1588 # Return the processed result
1589 return self._process_result(
1590 args[0],
1591 res,
1592 response_policy=command_policies.response_policy,
1593 **kwargs,
1594 )
1595 except Exception as e:
1596 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
1597 if is_default_node:
1598 # Replace the default cluster node
1599 self.replace_default_node()
1600 # The nodes and slots cache were reinitialized.
1601 # Try again with the new cluster setup.
1602 retry_attempts -= 1
1603 failure_count += 1
1605 if hasattr(e, "connection"):
1606 self._record_command_metric(
1607 command_name=args[0],
1608 duration_seconds=time.monotonic() - start_time,
1609 connection=e.connection,
1610 error=e,
1611 )
1613 self._record_error_metric(
1614 error=e,
1615 connection=e.connection,
1616 retry_attempts=failure_count,
1617 )
1618 continue
1619 else:
1620 # raise the exception
1621 if hasattr(e, "connection"):
1622 self._record_error_metric(
1623 error=e,
1624 connection=e.connection,
1625 retry_attempts=failure_count,
1626 is_internal=False,
1627 )
1628 raise e
1630 def _execute_command(self, target_node, *args, **kwargs):
1631 """
1632 Send a command to a node in the cluster
1633 """
1634 command = args[0]
1635 redis_node = None
1636 connection = None
1637 redirect_addr = None
1638 asking = False
1639 moved = False
1640 ttl = int(self.RedisClusterRequestTTL)
1642 # Start timing for observability
1643 start_time = time.monotonic()
1645 while ttl > 0:
1646 ttl -= 1
1647 try:
1648 if asking:
1649 target_node = self.get_node(node_name=redirect_addr)
1650 elif moved:
1651 # MOVED occurred and the slots cache was updated,
1652 # refresh the target node
1653 slot = self.determine_slot(*args)
1654 target_node = self.nodes_manager.get_node_from_slot(
1655 slot,
1656 self.read_from_replicas and command in READ_COMMANDS,
1657 self.load_balancing_strategy
1658 if command in READ_COMMANDS
1659 else None,
1660 )
1661 moved = False
1663 redis_node = self.get_redis_connection(target_node)
1664 connection = get_connection(redis_node)
1665 if asking:
1666 connection.send_command("ASKING")
1667 redis_node.parse_response(connection, "ASKING", **kwargs)
1668 asking = False
1669 connection.send_command(*args, **kwargs)
1670 response = redis_node.parse_response(connection, command, **kwargs)
1672 # Remove keys entry, it needs only for cache.
1673 kwargs.pop("keys", None)
1675 if command in self.cluster_response_callbacks:
1676 response = self.cluster_response_callbacks[command](
1677 response, **kwargs
1678 )
1680 self._record_command_metric(
1681 command_name=command,
1682 duration_seconds=time.monotonic() - start_time,
1683 connection=connection,
1684 )
1685 return response
1686 except AuthenticationError as e:
1687 e.connection = connection if connection is not None else target_node
1688 self._record_command_metric(
1689 command_name=command,
1690 duration_seconds=time.monotonic() - start_time,
1691 connection=e.connection,
1692 error=e,
1693 )
1694 raise
1695 except MaxConnectionsError as e:
1696 # MaxConnectionsError indicates client-side resource exhaustion
1697 # (too many connections in the pool), not a node failure.
1698 # Don't treat this as a node failure - just re-raise the error
1699 # without reinitializing the cluster.
1700 # The connection in the error is used to report the metrics based on host and port info
1701 # so we use the target node object which contains the host and port info
1702 # because we did not get the connection yet
1703 e.connection = target_node
1704 self._record_command_metric(
1705 command_name=command,
1706 duration_seconds=time.monotonic() - start_time,
1707 connection=e.connection,
1708 error=e,
1709 )
1710 raise
1711 except (ConnectionError, TimeoutError) as e:
1712 if is_debug_log_enabled():
1713 socket_address = self._extracts_socket_address(connection)
1714 args_log_str = truncate_text(" ".join(map(safe_str, args)))
1715 logger.debug(
1716 f"{type(e).__name__} received for command {args_log_str}, on node {target_node.name}, "
1717 f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
1718 )
1719 # this is used to report the metrics based on host and port info
1720 e.connection = connection if connection else target_node
1722 # ConnectionError can also be raised if we couldn't get a
1723 # connection from the pool before timing out, so check that
1724 # this is an actual connection before attempting to disconnect.
1725 if connection is not None:
1726 connection.disconnect()
1728 # Instead of setting to None, properly handle the pool
1729 # Get the pool safely - redis_connection could be set to None
1730 # by another thread between the check and access
1731 redis_conn = target_node.redis_connection
1732 if redis_conn is not None:
1733 pool = redis_conn.connection_pool
1734 if pool is not None:
1735 with pool._lock:
1736 # take care for the active connections in the pool
1737 pool.update_active_connections_for_reconnect()
1738 # disconnect all free connections
1739 pool.disconnect_free_connections()
1741 # Move the failed node to the end of the cached nodes list
1742 self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name)
1744 # DON'T set redis_connection = None - keep the pool for reuse
1745 # provide the name of the failed node so we can try it last
1746 self.nodes_manager.initialize(last_failed_node_name=target_node.name)
1747 self._record_command_metric(
1748 command_name=command,
1749 duration_seconds=time.monotonic() - start_time,
1750 connection=e.connection,
1751 error=e,
1752 )
1753 raise e
1754 except MovedError as e:
1755 if is_debug_log_enabled():
1756 socket_address = self._extracts_socket_address(connection)
1757 args_log_str = truncate_text(" ".join(map(safe_str, args)))
1758 logger.debug(
1759 f"MOVED error received for command {args_log_str}, on node {target_node.name}, "
1760 f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
1761 )
1762 # First, we will try to patch the slots/nodes cache with the
1763 # redirected node output and try again. If MovedError exceeds
1764 # 'reinitialize_steps' number of times, we will force
1765 # reinitializing the tables, and then try again.
1766 # 'reinitialize_steps' counter will increase faster when
1767 # the same client object is shared between multiple threads. To
1768 # reduce the frequency you can set this variable in the
1769 # RedisCluster constructor.
1770 self.reinitialize_counter += 1
1771 if self._should_reinitialized():
1772 # during this call all connections are closed or marked for disconnect,
1773 # so we don't need to disconnect the changed node's connections
1774 self.nodes_manager.initialize(
1775 additional_startup_nodes_info=[(e.host, e.port)]
1776 )
1777 # Reset the counter
1778 self.reinitialize_counter = 0
1779 else:
1780 self.nodes_manager.move_slot(e)
1781 moved = True
1782 self._record_command_metric(
1783 command_name=command,
1784 duration_seconds=time.monotonic() - start_time,
1785 connection=connection,
1786 error=e,
1787 )
1788 self._record_error_metric(
1789 error=e,
1790 connection=connection,
1791 )
1792 except TryAgainError as e:
1793 if is_debug_log_enabled():
1794 socket_address = self._extracts_socket_address(connection)
1795 args_log_str = truncate_text(" ".join(map(safe_str, args)))
1796 logger.debug(
1797 f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, "
1798 f"and connection: {connection} using local socket address: {socket_address}"
1799 )
1800 if ttl < self.RedisClusterRequestTTL / 2:
1801 time.sleep(0.05)
1803 self._record_command_metric(
1804 command_name=command,
1805 duration_seconds=time.monotonic() - start_time,
1806 connection=connection,
1807 error=e,
1808 )
1809 self._record_error_metric(
1810 error=e,
1811 connection=connection,
1812 )
1813 except AskError as e:
1814 if is_debug_log_enabled():
1815 socket_address = self._extracts_socket_address(connection)
1816 args_log_str = truncate_text(" ".join(map(safe_str, args)))
1817 logger.debug(
1818 f"ASK error received for command {args_log_str}, on node {target_node.name}, "
1819 f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
1820 )
1821 redirect_addr = get_node_name(host=e.host, port=e.port)
1822 asking = True
1824 self._record_command_metric(
1825 command_name=command,
1826 duration_seconds=time.monotonic() - start_time,
1827 connection=connection,
1828 error=e,
1829 )
1830 self._record_error_metric(
1831 error=e,
1832 connection=connection,
1833 )
1834 except (ClusterDownError, SlotNotCoveredError) as e:
1835 # ClusterDownError can occur during a failover and to get
1836 # self-healed, we will try to reinitialize the cluster layout
1837 # and retry executing the command
1839 # SlotNotCoveredError can occur when the cluster is not fully
1840 # initialized or can be temporary issue.
1841 # We will try to reinitialize the cluster topology
1842 # and retry executing the command
1844 time.sleep(0.25)
1845 self.nodes_manager.initialize()
1847 # if we have a connection, use it, otherwise use the target node
1848 # object which contains the host and port info
1849 # this is used to report the metrics based on host and port info
1850 e.connection = connection if connection else target_node
1851 self._record_command_metric(
1852 command_name=command,
1853 duration_seconds=time.monotonic() - start_time,
1854 connection=e.connection,
1855 error=e,
1856 )
1857 raise
1858 except ResponseError as e:
1859 # this is used to report the metrics based on host and port info
1860 # ResponseError typically happens after get_connection() succeeds,
1861 # so connection should be available
1862 e.connection = connection if connection else target_node
1863 self._record_command_metric(
1864 command_name=command,
1865 duration_seconds=time.monotonic() - start_time,
1866 connection=e.connection,
1867 error=e,
1868 )
1869 raise
1870 except Exception as e:
1871 if connection:
1872 connection.disconnect()
1874 # if we have a connection, use it, otherwise use the target node
1875 # object which contains the host and port info
1876 # this is used to report the metrics based on host and port info
1877 e.connection = connection if connection else target_node
1878 self._record_command_metric(
1879 command_name=command,
1880 duration_seconds=time.monotonic() - start_time,
1881 connection=e.connection,
1882 error=e,
1883 )
1884 raise e
1885 finally:
1886 if connection is not None:
1887 redis_node.connection_pool.release(connection)
1889 e = ClusterError("TTL exhausted.")
1890 # In this case we should have an active connection.
1891 # If we are here, we have received many MOVED or ASK errors and finally exhausted the TTL.
1892 # This means that we used an active connection to read from the socket.
1893 # This is used to report metrics based on the host and port information.
1894 e.connection = connection
1895 self._record_command_metric(
1896 command_name=command,
1897 duration_seconds=time.monotonic() - start_time,
1898 connection=connection,
1899 error=e,
1900 )
1901 raise e
1903 def _record_command_metric(
1904 self,
1905 command_name: str,
1906 duration_seconds: float,
1907 connection: Connection,
1908 error=None,
1909 ):
1910 """
1911 Records operation duration metric directly.
1912 """
1913 host = connection.host if connection else "unknown"
1914 port = connection.port if connection else 0
1915 db = str(connection.db) if connection and hasattr(connection, "db") else "0"
1917 record_operation_duration(
1918 command_name=command_name,
1919 duration_seconds=duration_seconds,
1920 server_address=host,
1921 server_port=port,
1922 db_namespace=db,
1923 error=error,
1924 )
1926 def _record_error_metric(
1927 self,
1928 error: Exception,
1929 connection: Connection,
1930 is_internal: bool = True,
1931 retry_attempts: Optional[int] = None,
1932 ):
1933 """
1934 Records error count metric directly.
1935 """
1936 record_error_count(
1937 server_address=connection.host,
1938 server_port=connection.port,
1939 network_peer_address=connection.host,
1940 network_peer_port=connection.port,
1941 error_type=error,
1942 retry_attempts=retry_attempts if retry_attempts is not None else 0,
1943 is_internal=is_internal,
1944 )
1946 def _extracts_socket_address(
1947 self, connection: Optional[Connection]
1948 ) -> Optional[int]:
1949 if connection is None:
1950 return None
1951 try:
1952 socket_address = (
1953 connection._sock.getsockname() if connection._sock else None
1954 )
1955 socket_address = socket_address[1] if socket_address else None
1956 except (AttributeError, OSError):
1957 pass
1958 return socket_address
1960 def close(self) -> None:
1961 try:
1962 with self._lock:
1963 if self.nodes_manager:
1964 self.nodes_manager.close()
1965 except AttributeError:
1966 # RedisCluster's __init__ can fail before nodes_manager is set
1967 pass
1969 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
1970 """
1971 Process the result of the executed command.
1972 The function would return a dict or a single value.
1974 :type command: str
1975 :type res: dict
1977 `res` should be in the following format:
1978 Dict<node_name, command_result>
1979 """
1980 if command in self.result_callbacks:
1981 res = self.result_callbacks[command](command, res, **kwargs)
1982 elif len(res) == 1:
1983 # When we execute the command on a single node, we can
1984 # remove the dictionary and return a single response
1985 res = list(res.values())[0]
1987 return self._policies_callback_mapping[response_policy](res)
1989 def load_external_module(self, funcname, func):
1990 """
1991 This function can be used to add externally defined redis modules,
1992 and their namespaces to the redis client.
1994 ``funcname`` - A string containing the name of the function to create
1995 ``func`` - The function, being added to this class.
1996 """
1997 setattr(self, funcname, func)
1999 def transaction(self, func, *watches, **kwargs):
2000 """
2001 Convenience method for executing the callable `func` as a transaction
2002 while watching all keys specified in `watches`. The 'func' callable
2003 should expect a single argument which is a Pipeline object.
2004 """
2005 shard_hint = kwargs.pop("shard_hint", None)
2006 value_from_callable = kwargs.pop("value_from_callable", False)
2007 watch_delay = kwargs.pop("watch_delay", None)
2008 with self.pipeline(True, shard_hint) as pipe:
2009 while True:
2010 try:
2011 if watches:
2012 pipe.watch(*watches)
2013 func_value = func(pipe)
2014 exec_value = pipe.execute()
2015 return func_value if value_from_callable else exec_value
2016 except WatchError:
2017 if watch_delay is not None and watch_delay > 0:
2018 time.sleep(watch_delay)
2019 continue
class ClusterNode:
    """
    A single cluster node, identified by its ``name`` (built from host and port).

    Equality and hashing depend only on ``name``. Two ClusterNode objects for
    the same address therefore compare equal even when their ``server_type``
    or ``redis_connection`` differ.
    """

    def __init__(self, host, port, server_type=None, redis_connection=None):
        # Resolve "localhost" to an IP address so that nodes created from
        # "localhost" and from its IP address end up with the same name.
        self.host = socket.gethostbyname(host) if host == "localhost" else host
        self.port = port
        self.name = get_node_name(self.host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        if not isinstance(obj, ClusterNode):
            return False
        return obj.name == self.name

    def __hash__(self):
        # Must stay consistent with __eq__, which compares names only.
        return hash(self.name)
class LoadBalancingStrategy(Enum):
    """
    How LoadBalancer picks an index into a slot's node list.

    Index 0 of the list is the primary; indexes 1.. are replicas.
    """

    ROUND_ROBIN = "round_robin"  # cycle through all nodes, primary included
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"  # cycle through indexes >= 1 only
    RANDOM = "random"  # uniformly random index over all nodes
    RANDOM_REPLICA = "random_replica"  # uniformly random index >= 1


class LoadBalancer:
    """
    Round-Robin Load Balancing

    Chooses a node index for reads according to a LoadBalancingStrategy.
    Round-robin positions are tracked per primary name and guarded by a lock.
    The replica-only strategies need ``list_size > 1``.
    """

    def __init__(self, start_index: int = 0) -> None:
        # primary name -> index the next round-robin call for it will try
        self.primary_to_idx: dict[str, int] = {}
        # position a primary starts from the first time it is seen
        self.start_index: int = start_index
        self._lock: threading.Lock = threading.Lock()

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        """
        Return an index in ``range(list_size)`` for the node list of ``primary``.

        :param primary: name of the primary node; keys the round-robin state.
        :param list_size: number of nodes (primary + replicas) to choose from.
        :param load_balancing_strategy: selection strategy to apply.
        """
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_server_index(
                list_size,
                replicas_only=True,
            )
        elif load_balancing_strategy == LoadBalancingStrategy.RANDOM:
            return self._get_random_server_index(
                list_size,
                replicas_only=False,
            )
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        """Forget every recorded round-robin position."""
        with self._lock:
            self.primary_to_idx.clear()

    def _get_random_server_index(self, list_size: int, replicas_only: bool) -> int:
        """
        Return a random index in ``[0, list_size)``, or in ``[1, list_size)``
        when ``replicas_only``. ``random.randint`` raises ValueError if that
        range is empty.
        """
        return random.randint(1 if replicas_only else 0, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        """
        Return the next round-robin index for ``primary`` and advance it.

        When ``replicas_only`` is set, index 0 (the primary) is skipped.
        """
        with self._lock:
            # Wrap the stored position into range. ``start_index`` may exceed
            # ``list_size``, and a position recorded while this primary's node
            # list was longer may no longer be valid; returning either as-is
            # would make the caller index past the end of its node list.
            server_index = (
                self.primary_to_idx.setdefault(primary, self.start_index) % list_size
            )
            if replicas_only and server_index == 0:
                # skip the primary node index
                server_index = 1
            # Update the index for the next round
            self.primary_to_idx[primary] = (server_index + 1) % list_size
            return server_index
class NodesManager:
    """
    Client-side view of the cluster topology.

    Tracks the known nodes (``nodes_cache``), the slot -> nodes mapping
    (``slots_cache``; index 0 of each list is the primary), the startup nodes
    used for discovery, and the default node. These are protected by
    ``_lock``. Topology refreshes (``initialize``) are serialized by
    ``_initialization_lock``. When both locks are needed,
    ``_initialization_lock`` must be acquired first.
    """

    def __init__(
        self,
        startup_nodes: list[ClusterNode],
        from_url=False,
        require_full_coverage=False,
        lock: Optional[threading.RLock] = None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        **kwargs,
    ):
        """
        Store the configuration and run the initial topology discovery.

        :param startup_nodes: nodes used to discover the cluster topology.
        :param from_url: when true, node clients are built on an explicit
            ``connection_pool_class`` pool (see ``create_redis_node``).
        :param require_full_coverage: when true, ``initialize`` raises unless
            every hash slot is covered.
        :param lock: lock protecting the caches; a new RLock is created when
            omitted.
        :param dynamic_startup_nodes: when true, the startup nodes are replaced
            by all discovered nodes after each successful ``initialize``.
        :param connection_pool_class: pool class used when ``from_url`` is true.
        :param address_remap: optional callable mapping ``(host, port)`` as
            reported by the cluster to the address the client should use.
        :param cache, cache_config, cache_factory: client-side cache settings;
            the first one given (checked in that order) is used.
        :param event_dispatcher: dispatcher for connection/topology events;
            a new EventDispatcher is created when omitted.
        :param maint_notifications_config: forwarded to every node client.
        :param kwargs: connection keyword arguments for every node client.
        """
        self.nodes_cache: dict[str, ClusterNode] = {}
        self.slots_cache: dict[int, list[ClusterNode]] = {}
        self.startup_nodes: dict[str, ClusterNode] = {n.name: n for n in startup_nodes}
        self.default_node: Optional[ClusterNode] = None
        # Incremented by every completed initialize(); lets a concurrent
        # caller notice that a refresh already happened and skip its own.
        self._epoch: int = 0
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        self._cache: Optional[CacheInterface] = None
        if cache:
            self._cache = cache
        elif cache_factory is not None:
            self._cache = cache_factory.get_cache()
        elif cache_config is not None:
            self._cache = CacheFactory(cache_config).get_cache()
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()

        # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock
        if lock is None:
            self._lock = threading.RLock()
        else:
            self._lock = lock

        # initialize holds _initialization_lock to dedup multiple calls to reinitialize;
        # note that if we hold both _lock and _initialization_lock, we _must_ acquire
        # _initialization_lock first (ie: to have a consistent order) to avoid deadlock.
        self._initialization_lock: threading.RLock = threading.RLock()

        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.maint_notifications_config = maint_notifications_config

        self.initialize()

    def get_node(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        node_name: Optional[str] = None,
    ) -> Optional[ClusterNode]:
        """
        Get the requested node from the cluster's nodes.

        The node is looked up by ``host`` and ``port`` when both are given
        ("localhost" is resolved to an IP first), otherwise by ``node_name``.
        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            with self._lock:
                return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            with self._lock:
                return self.nodes_cache.get(node_name)
        else:
            return None

    def move_slot(self, e: Union[AskError, MovedError]):
        """
        Update the slot's node with the redirected one.

        The redirect target ``(e.host, e.port)`` becomes the primary for
        ``e.slot_id``. It is added to ``nodes_cache`` if unknown. When the
        cached assignment actually changed, AfterSlotsCacheRefreshEvent is
        dispatched after the lock is released.
        """
        node_changed = False
        with self._lock:
            redirected_node = self.get_node(host=e.host, port=e.port)
            if redirected_node is not None:
                # The node already exists
                if redirected_node.server_type != PRIMARY:
                    # Update the node's server type
                    redirected_node.server_type = PRIMARY
            else:
                # This is a new node, we will add it to the nodes cache
                redirected_node = ClusterNode(e.host, e.port, PRIMARY)
                self.nodes_cache[redirected_node.name] = redirected_node

            # The slot may be absent from the cache (e.g. a partially covered
            # cluster, or a topology refresh that raced with this redirect).
            # Treat that like a brand-new owner instead of raising KeyError.
            slot_nodes = self.slots_cache.get(e.slot_id)
            if not slot_nodes or redirected_node not in slot_nodes:
                # The new slot owner is a new server, or a server from a different
                # shard. We need to remove all current nodes from the slot's list
                # (including replications) and add just the new node.
                self.slots_cache[e.slot_id] = [redirected_node]
                node_changed = True
            elif redirected_node is not slot_nodes[0]:
                # The MOVED error resulted from a failover, and the new slot owner
                # had previously been a replica.
                old_primary = slot_nodes[0]
                # Update the old primary to be a replica and add it to the end of
                # the slot's node list
                old_primary.server_type = REPLICA
                slot_nodes.append(old_primary)
                # Remove the old replica, which is now a primary, from the slot's
                # node list
                slot_nodes.remove(redirected_node)
                # Override the old primary with the new one
                slot_nodes[0] = redirected_node
                if self.default_node == old_primary:
                    # Update the default node with the new primary
                    self.default_node = redirected_node
                node_changed = True
            # else: circular MOVED to current primary -> no-op
        # Dispatch outside the lock so listeners can acquire their own locks
        # without risk of deadlock. Skipped on the no-op branch to avoid
        # needless reconciliation walks under MOVED storms. A listener must
        # not break slots-cache refresh; log and continue so a single buggy
        # listener cannot starve the rest.
        if node_changed:
            try:
                self._event_dispatcher.dispatch(AfterSlotsCacheRefreshEvent())
            except Exception as exc:
                # Don't shadow the method parameter ``e``: ``except as`` binds
                # the listener exception in the function scope and ``del``s
                # the name on block exit (PEP 3134), which would also wipe
                # out the original AskError/MovedError parameter.
                logger.exception(
                    "listener raised during slots-cache refresh: %s: %s",
                    type(exc).__name__,
                    exc,
                )

    @deprecated_args(
        args_to_warn=["server_type"],
        reason=(
            "In case you need select some load balancing strategy "
            "that will use replicas, please set it through 'load_balancing_strategy'"
        ),
        version="5.3.0",
    )
    def get_node_from_slot(
        self,
        slot: int,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        server_type: Optional[Literal["primary", "replica"]] = None,
    ) -> ClusterNode:
        """
        Gets a node that serves this hash slot.

        With a load balancing strategy (ROUND_ROBIN is implied when
        ``read_from_replicas`` is true) and more than one node for the slot,
        the node is chosen by ``read_load_balancer``. Otherwise the primary is
        returned, unless the deprecated ``server_type`` asks for a replica,
        in which case a random replica is returned.

        :raises SlotNotCoveredError: if no node serves ``slot``.
        """

        if read_from_replicas is True and load_balancing_strategy is None:
            load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

        with self._lock:
            if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
                raise SlotNotCoveredError(
                    f'Slot "{slot}" not covered by the cluster. '
                    + f'"require_full_coverage={self._require_full_coverage}"'
                )

            if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
                # get the server index using the strategy defined in load_balancing_strategy
                primary_name = self.slots_cache[slot][0].name
                node_idx = self.read_load_balancer.get_server_index(
                    primary_name, len(self.slots_cache[slot]), load_balancing_strategy
                )
            elif (
                server_type is None
                or server_type == PRIMARY
                or len(self.slots_cache[slot]) == 1
            ):
                # return a primary
                node_idx = 0
            else:
                # return a replica
                # randomly choose one of the replicas
                node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

            return self.slots_cache[slot][node_idx]

    def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]):
        """
        Get all nodes with the specified server type
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode, in ``nodes_cache`` order
        """
        with self._lock:
            return [
                node
                for node in self.nodes_cache.values()
                if node.server_type == server_type
            ]

    @deprecated_function(
        reason="This method is not used anymore internally. The startup nodes are populated automatically.",
        version="7.0.2",
    )
    def populate_startup_nodes(self, nodes):
        """
        Populate all startup nodes and filters out any duplicates
        """
        with self._lock:
            for n in nodes:
                self.startup_nodes[n.name] = n

    def move_node_to_end_of_cached_nodes(self, node_name: str) -> None:
        """
        Move a failing node to the end of startup_nodes and nodes_cache so it's
        tried last during reinitialization and when selecting the default node.
        If the node is not in the respective list, nothing is done.
        """
        # Both dicts are protected by _lock. Mutating them without it races
        # with readers that iterate them under the lock (e.g.
        # get_nodes_by_server_type), which can fail with "dictionary changed
        # size during iteration".
        with self._lock:
            # Move in startup_nodes
            if node_name in self.startup_nodes and len(self.startup_nodes) > 1:
                node = self.startup_nodes.pop(node_name)
                self.startup_nodes[node_name] = node  # Re-insert at end

            # Move in nodes_cache - this affects get_nodes_by_server_type ordering
            # which is used to select the default_node during initialize()
            if node_name in self.nodes_cache and len(self.nodes_cache) > 1:
                node = self.nodes_cache.pop(node_name)
                self.nodes_cache[node_name] = node  # Re-insert at end

    def check_slots_coverage(self, slots_cache):
        """
        Return True if every hash slot in ``range(REDIS_CLUSTER_HASH_SLOTS)``
        has an entry in ``slots_cache``.
        """
        # Used to decide whether the next startup node must also be queried.
        return all(slot in slots_cache for slot in range(REDIS_CLUSTER_HASH_SLOTS))

    def create_redis_connections(self, nodes):
        """
        This function will create a redis connection to all nodes in :nodes:

        Nodes that already have a ``redis_connection`` keep it. Afterwards, an
        AfterPooledConnectionsInstantiationEvent with all their pools is
        dispatched.
        """
        connection_pools = []
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host,
                    port=node.port,
                    maint_notifications_config=self.maint_notifications_config,
                    **self.connection_kwargs,
                )
            connection_pools.append(node.redis_connection.connection_pool)

        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                connection_pools, ClientType.SYNC, self._credential_provider
            )
        )

    def create_redis_node(
        self,
        host,
        port,
        **kwargs,
    ):
        """
        Create a Redis client for ``host:port`` with node-level retries disabled.
        """
        # We are configuring the connection pool not to retry
        # connections on lower level clients to avoid retrying
        # connections to nodes that are not reachable
        # and to avoid blocking the connection pool.
        # The only error that will have some handling in the lower
        # level clients is ConnectionError which will trigger disconnection
        # of the socket.
        # The retries will be handled on cluster client level
        # where we will have proper handling of the cluster topology
        node_retry_config = Retry(
            backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
        )

        if self.from_url:
            # Create a redis node with a custom connection pool
            kwargs.update({"host": host})
            kwargs.update({"port": port})
            kwargs.update({"cache": self._cache})
            kwargs.update({"retry": node_retry_config})
            r = Redis(connection_pool=self.connection_pool_class(**kwargs))
        else:
            r = Redis(
                host=host,
                port=port,
                cache=self._cache,
                retry=node_retry_config,
                **kwargs,
            )
        return r

    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
        """
        Return the node for ``host:port`` from ``tmp_nodes_cache``. If it is
        absent, create it with ``role`` and add it to ``tmp_nodes_cache``.

        A newly created node reuses the redis client of the same-named node
        in the current ``nodes_cache``, if there is one.
        """
        node_name = get_node_name(host, port)
        # check if we already have this node in the tmp_nodes_cache
        target_node = tmp_nodes_cache.get(node_name)
        if target_node is None:
            # before creating a new cluster node, check if the cluster node already
            # exists in the current nodes cache and has a valid connection so we can
            # reuse it
            redis_connection: Optional[Redis] = None
            with self._lock:
                previous_node = self.nodes_cache.get(node_name)
                if previous_node:
                    redis_connection = previous_node.redis_connection
            # don't update the old ClusterNode, so we don't update its role
            # outside of the lock
            target_node = ClusterNode(host, port, role, redis_connection)
            # add this node to the nodes cache
            tmp_nodes_cache[target_node.name] = target_node

        return target_node

    def _get_epoch(self) -> int:
        """
        Get the current epoch value. This method exists primarily to allow
        tests to mock the epoch fetch and control race condition timing.
        """
        with self._lock:
            return self._epoch

    def initialize(
        self,
        additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None,
        disconnect_startup_nodes_pools: bool = True,
        last_failed_node_name: Optional[str] = None,
    ):
        """
        Initializes the nodes cache, slots cache and redis connections.

        Startup nodes (shuffled), then the additional nodes, then the last
        failed node are queried with ``CLUSTER SLOTS`` one at a time until
        every hash slot is covered. If another thread completed an
        initialization after this call read the epoch, this call returns
        without doing anything.

        :disconnect_startup_nodes_pools:
            Whether to mark the queried startup node's active pool
            connections for reconnect and disconnect its free connections
            right after ``CLUSTER SLOTS`` succeeds. This is useful when the
            startup nodes are not part of the cluster and we want to avoid
            keeping the connection open.
        :additional_startup_nodes_info:
            Additional nodes to add temporarily to the startup nodes.
            The additional nodes will be used just in the process of extraction of the slots
            and nodes information from the cluster.
            This is useful when we want to add new nodes to the cluster
            and initialize the client
            with them.
            The format of the list is a list of tuples, where each tuple contains
            the host and port of the node.
        :last_failed_node_name:
            Name of the node that just failed and should be tried only after
            other startup and additional startup nodes during this refresh.
        :raises RedisClusterException: when no startup node is reachable, when
            the nodes report more than 5 slot-ownership disagreements, or when
            ``require_full_coverage`` is set and not every slot is covered.
        """
        self.reset()
        tmp_nodes_cache = {}
        tmp_slots = {}
        disagreements = []
        startup_nodes_reachable = False
        fully_covered = False
        kwargs = self.connection_kwargs
        exception = None
        epoch = self._get_epoch()
        if additional_startup_nodes_info is None:
            additional_startup_nodes_info = []

        with self._initialization_lock:
            with self._lock:
                if epoch != self._epoch:
                    # another thread has already re-initialized the nodes; don't
                    # bother running again
                    return

            with self._lock:
                startup_nodes = list(self.startup_nodes.values())
            deferred_failed_nodes = []
            if last_failed_node_name is not None:
                for index, node in enumerate(startup_nodes):
                    if node.name == last_failed_node_name:
                        deferred_failed_nodes.append(startup_nodes.pop(index))
                        break
            if len(startup_nodes) > 1:
                # Vary which startup node is queried first so clients do not
                # all reinitialize through the same node.
                random.shuffle(startup_nodes)

            additional_startup_nodes = [
                ClusterNode(host, port) for host, port in additional_startup_nodes_info
            ]
            if last_failed_node_name is not None:
                for index, node in enumerate(additional_startup_nodes):
                    if node.name == last_failed_node_name:
                        if not deferred_failed_nodes:
                            deferred_failed_nodes.append(node)
                        additional_startup_nodes.pop(index)
                        break
            if is_debug_log_enabled():
                logger.debug(
                    f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; "
                    f"and startup nodes: {[node.name for node in startup_nodes]}"
                )

            for startup_node in chain(
                startup_nodes,
                additional_startup_nodes,
                deferred_failed_nodes,
            ):
                try:
                    if startup_node.redis_connection:
                        r = startup_node.redis_connection

                    else:
                        # Create a new Redis connection
                        if is_debug_log_enabled():
                            socket_timeout = kwargs.get("socket_timeout", "not set")
                            socket_connect_timeout = kwargs.get(
                                "socket_connect_timeout", "not set"
                            )
                            maint_enabled = (
                                self.maint_notifications_config.enabled
                                if self.maint_notifications_config
                                else False
                            )
                            logger.debug(
                                "Topology refresh: Creating new Redis connection to "
                                f"{startup_node.host}:{startup_node.port}; "
                                f"with socket_timeout: {socket_timeout}, and "
                                f"socket_connect_timeout: {socket_connect_timeout}, "
                                "and maint_notifications enabled: "
                                f"{maint_enabled}"
                            )
                        r = self.create_redis_node(
                            startup_node.host,
                            startup_node.port,
                            maint_notifications_config=self.maint_notifications_config,
                            **kwargs,
                        )
                        # Keep the client on the cached startup node so a later
                        # refresh can reuse it. startup_nodes is protected by
                        # _lock, so read and mutate it under the lock.
                        with self._lock:
                            if startup_node in self.startup_nodes.values():
                                self.startup_nodes[
                                    startup_node.name
                                ].redis_connection = r
                            else:
                                startup_node.redis_connection = r
                    try:
                        # Make sure cluster mode is enabled on this node
                        cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
                        if disconnect_startup_nodes_pools:
                            with r.connection_pool._lock:
                                # take care to clear connections before we move on
                                # mark all active connections for reconnect - they will be
                                # reconnected on next use, but will allow current in flight commands to complete first
                                r.connection_pool.update_active_connections_for_reconnect()
                                # Needed to clear READONLY state when it is no longer applicable
                                r.connection_pool.disconnect_free_connections()
                    except ResponseError:
                        raise RedisClusterException(
                            "Cluster mode is not enabled on this node"
                        )
                    startup_nodes_reachable = True
                except Exception as e:
                    # Try the next startup node.
                    # The exception is saved and raised only if we have no more nodes.
                    exception = e
                    continue

                # CLUSTER SLOTS command results in the following output:
                # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
                # where each node contains the following list: [IP, port, node_id]
                # Therefore, cluster_slots[0][2][0] will be the IP address of the
                # primary node of the first slot section.
                # If there's only one server in the cluster, its ``host`` is ''
                # Fix it to the host in startup_nodes
                if (
                    len(cluster_slots) == 1
                    and len(cluster_slots[0][2][0]) == 0
                    and len(self.startup_nodes) == 1
                ):
                    cluster_slots[0][2][0] = startup_node.host

                for slot in cluster_slots:
                    primary_node = slot[2]
                    host = str_if_bytes(primary_node[0])
                    if host == "":
                        host = startup_node.host
                    port = int(primary_node[1])
                    host, port = self.remap_host_port(host, port)

                    nodes_for_slot = []

                    target_node = self._get_or_create_cluster_node(
                        host, port, PRIMARY, tmp_nodes_cache
                    )
                    nodes_for_slot.append(target_node)

                    replica_nodes = slot[3:]
                    for replica_node in replica_nodes:
                        host = str_if_bytes(replica_node[0])
                        port = int(replica_node[1])
                        host, port = self.remap_host_port(host, port)
                        target_replica_node = self._get_or_create_cluster_node(
                            host, port, REPLICA, tmp_nodes_cache
                        )
                        nodes_for_slot.append(target_replica_node)

                    # Every slot in the range shares the same list object.
                    for i in range(int(slot[0]), int(slot[1]) + 1):
                        if i not in tmp_slots:
                            tmp_slots[i] = nodes_for_slot
                        else:
                            # Validate that 2 nodes want to use the same slot cache
                            # setup
                            tmp_slot = tmp_slots[i][0]
                            if tmp_slot.name != target_node.name:
                                disagreements.append(
                                    f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                                )

                                if len(disagreements) > 5:
                                    raise RedisClusterException(
                                        f"startup_nodes could not agree on a valid "
                                        f"slots cache: {', '.join(disagreements)}"
                                    )

                fully_covered = self.check_slots_coverage(tmp_slots)
                if fully_covered:
                    # Don't need to continue to the next startup node if all
                    # slots are covered
                    break

            if not startup_nodes_reachable:
                raise RedisClusterException(
                    f"Redis Cluster cannot be connected. Please provide at least "
                    f"one reachable node: {str(exception)}"
                ) from exception

            # Create Redis connections to all nodes
            self.create_redis_connections(list(tmp_nodes_cache.values()))

            # Check if the slots are not fully covered
            if not fully_covered and self._require_full_coverage:
                # Despite the requirement that the slots be covered, there
                # isn't a full coverage
                raise RedisClusterException(
                    f"All slots are not covered after query all startup_nodes. "
                    f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                    f"covered..."
                )

            # Set the tmp variables to the real variables
            with self._lock:
                self.nodes_cache = tmp_nodes_cache
                self.slots_cache = tmp_slots
                # Set the default node
                self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
                if self._dynamic_startup_nodes:
                    # Populate the startup nodes with all discovered nodes
                    self.startup_nodes = tmp_nodes_cache
                # Increment the epoch to signal that initialization has completed
                self._epoch += 1
            # Dispatch so listeners (e.g. ClusterPubSub) can reconcile per-node
            # state after slot ownership may have changed. A listener must not
            # break slots-cache refresh; log and continue so a single buggy
            # listener cannot starve the rest.
            try:
                self._event_dispatcher.dispatch(AfterSlotsCacheRefreshEvent())
            except Exception as e:
                logger.exception(
                    "listener raised during slots-cache refresh: %s: %s",
                    type(e).__name__,
                    e,
                )

    def close(self) -> None:
        """
        Clear the default node and close the client of every cached node.
        """
        with self._lock:
            self.default_node = None
            nodes = tuple(self.nodes_cache.values())
        # Close outside the lock; ``nodes`` is a snapshot.
        for node in nodes:
            if node.redis_connection:
                node.redis_connection.close()

    def reset(self):
        """
        Reset the read load balancer's round-robin state, if there is one.
        """
        # read_load_balancer may be None; there is nothing to reset then.
        # (Calling ``.reset()`` on None raises AttributeError, not TypeError,
        # so this must be an explicit check.)
        if self.read_load_balancer is not None:
            self.read_load_balancer.reset()

    def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
        """
        Remap the host and port returned from the cluster to a different
        internal value. Useful if the client is not connecting directly
        to the cluster.
        """
        if self.address_remap:
            return self.address_remap((host, port))
        return host, port

    def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]:
        """
        Return the cached node whose client pool targets the same host and
        port as ``connection``, or None if there is none.
        """
        node_name = get_node_name(connection.host, connection.port)
        with self._lock:
            for node in tuple(self.nodes_cache.values()):
                if node.redis_connection:
                    conn_args = node.redis_connection.connection_pool.connection_kwargs
                    if node_name == get_node_name(
                        conn_args.get("host"), conn_args.get("port")
                    ):
                        return node
        return None
2718def _unregister_slots_cache_listener(
2719 dispatcher_ref: "weakref.ref[EventDispatcher]",
2720 listener: EventListenerInterface,
2721 event_type: Type[object],
2722) -> None:
2723 # Module-level finalizer callback. Kept free of strong references to the
2724 # owning ClusterPubSub so attaching it via weakref.finalize does not
2725 # extend the pubsub's lifetime.
2726 dispatcher = dispatcher_ref()
2727 if dispatcher is not None:
2728 dispatcher.unregister_listeners({event_type: [listener]})
class ClusterPubSubSlotsCacheListener(EventListenerInterface):
    """
    Forward AfterSlotsCacheRefreshEvent notifications to a ClusterPubSub.

    Only a weak reference to the pubsub is stored, so this listener never
    keeps it alive. The dispatcher's strong reference to this listener is
    removed by a ``weakref.finalize`` that ``ClusterPubSub.__init__``
    attaches to the pubsub.
    """

    def __init__(self, pubsub: "ClusterPubSub") -> None:
        self._pubsub_ref: "weakref.ref[ClusterPubSub]" = weakref.ref(pubsub)

    def listen(self, event: object) -> None:
        """
        Call ``on_slots_changed()`` on the pubsub if it is still alive.

        Exceptions raised by the pubsub are logged and suppressed, so one
        misbehaving pubsub cannot break the slots-cache refresh for others.
        """
        target = self._pubsub_ref()
        if target is not None:
            try:
                target.on_slots_changed()
            except Exception as exc:
                logger.exception(
                    "pubsub %r raised during slots-cache change: %s: %s",
                    target,
                    type(exc).__name__,
                    exc,
                )
        # Otherwise the pubsub was collected but its finalizer has not yet
        # unregistered this listener; ignoring the event is safe.
2763class ClusterPubSub(PubSub):
2764 """
2765 Wrapper for PubSub class.
2767 IMPORTANT: before using ClusterPubSub, read about the known limitations
2768 with pubsub in Cluster mode and learn how to workaround them:
2769 https://redis.readthedocs.io/en/stable/clustering.html#known-pubsub-limitations
2770 """
2772 def __init__(
2773 self,
2774 redis_cluster,
2775 node=None,
2776 host=None,
2777 port=None,
2778 push_handler_func=None,
2779 event_dispatcher: Optional["EventDispatcher"] = None,
2780 **kwargs,
2781 ):
2782 """
2783 When a pubsub instance is created without specifying a node, a single
2784 node will be transparently chosen for the pubsub connection on the
2785 first command execution. The node will be determined by:
2786 1. Hashing the channel name in the request to find its keyslot
2787 2. Selecting a node that handles the keyslot: If read_from_replicas is
2788 set to true or load_balancing_strategy is set, a replica can be selected.
2790 :type redis_cluster: RedisCluster
2791 :type node: ClusterNode
2792 :type host: str
2793 :type port: int
2794 """
2795 self.node = None
2796 self.set_pubsub_node(redis_cluster, node, host, port)
2797 connection_pool = (
2798 None
2799 if self.node is None
2800 else redis_cluster.get_redis_connection(self.node).connection_pool
2801 )
2802 self.cluster = redis_cluster
2803 self.node_pubsub_mapping = {}
2804 # Reverse index: shard channel (normalized) -> owning node.name. Used to
2805 # route sunsubscribe calls and reconcile subscriptions after slot
2806 # migration / failover.
2807 self._shard_channel_to_node: dict = {}
2808 # Dedicated lock for shard-subscription bookkeeping. Distinct from
2809 # PubSub.self._lock (which serializes wire I/O on the cluster-level
2810 # connection used by aclose / send_command / regular subscribe) so
2811 # that reconciliation cannot starve those unrelated paths during
2812 # long per-channel migrations.
2813 self._shard_state_lock: threading.RLock = threading.RLock()
2814 # Worker executor for off-loading slot-migration reconciliation from
2815 # the dispatch call site (mirrors async's asyncio.create_task model so
2816 # the thread that triggered MovedError / topology refresh is not
2817 # blocked on per-channel sunsubscribe / ssubscribe network I/O).
2818 # Lazy-created on first on_slots_changed() to avoid a persistent
2819 # worker thread for pubsubs that never see a slot migration.
2820 # Initialized before super().__init__() because PubSub.__init__ calls
2821 # self.reset(), which resolves to ClusterPubSub.reset() and reads
2822 # these attributes.
2823 self._reconcile_executor: Optional[ThreadPoolExecutor] = None
2824 # In-flight reconciliation futures; tracked so reset() can cancel
2825 # pending work and so exceptions surface via a done-callback.
2826 self._reconcile_futures: Set[Future] = set()
2827 self._pubsubs_generator = self._pubsubs_generator()
2828 if event_dispatcher is None:
2829 self._event_dispatcher = EventDispatcher()
2830 else:
2831 self._event_dispatcher = event_dispatcher
2832 super().__init__(
2833 connection_pool=connection_pool,
2834 encoder=redis_cluster.encoder,
2835 push_handler_func=push_handler_func,
2836 event_dispatcher=self._event_dispatcher,
2837 **kwargs,
2838 )
2839 # Subscribe to slots-cache change notifications so shard subscriptions
2840 # can be reconciled automatically after topology refreshes.
2841 nm_dispatcher = redis_cluster.nodes_manager._event_dispatcher
2842 self._slots_cache_listener = ClusterPubSubSlotsCacheListener(self)
2843 nm_dispatcher.register_listeners(
2844 {AfterSlotsCacheRefreshEvent: [self._slots_cache_listener]}
2845 )
2846 # Deterministic GC-time cleanup so short-lived pubsubs do not leak
2847 # listeners in the dispatcher when no slots-refresh event ever fires.
2848 weakref.finalize(
2849 self,
2850 _unregister_slots_cache_listener,
2851 weakref.ref(nm_dispatcher),
2852 self._slots_cache_listener,
2853 AfterSlotsCacheRefreshEvent,
2854 )
def set_pubsub_node(self, cluster, node=None, host=None, port=None):
    """
    Choose the node this pubsub talks to and store it in ``self.node``.

    Resolution order:

    * an explicit ``node`` is validated against ``cluster`` and used;
    * otherwise, when both ``host`` and ``port`` are given, the matching
      node is looked up in ``cluster``, validated, and used;
    * otherwise, if either ``host`` or ``port`` is truthy (only one half of
      an address was supplied), ``DataError`` is raised;
    * otherwise ``self.node`` is set to ``None`` and the node is chosen
      later from the keyslot of the first channel used.

    ``RedisClusterException`` is raised by ``_raise_on_invalid_node`` when
    the requested node is not part of the cluster.

    :type cluster: RedisCluster
    :type node: ClusterNode
    :type host: str
    :type port: int
    """
    if node is not None:
        # Caller handed us a ClusterNode directly.
        self._raise_on_invalid_node(cluster, node, node.host, node.port)
        self.node = node
        return

    if host is not None and port is not None:
        # Caller gave an address; resolve it to a ClusterNode first.
        resolved = cluster.get_node(host=host, port=port)
        self._raise_on_invalid_node(cluster, resolved, host, port)
        self.node = resolved
        return

    if host or port:
        # Only half of an address was supplied.
        raise DataError("Passing a host requires passing a port, and vice versa")

    # Nothing supplied: defer node selection to the first command.
    self.node = None
def get_pubsub_node(self):
    """
    Return the ClusterNode used for the pubsub connection, or ``None`` when
    no node has been chosen yet.
    """
    chosen = self.node
    return chosen
def _raise_on_invalid_node(self, redis_cluster, node, host, port):
    """
    Check that ``node`` is registered in ``redis_cluster``.

    :raises RedisClusterException: if ``node`` is ``None`` or the cluster has
        no node with ``node.name``. ``host``/``port`` are used only to build
        the error message.
    """
    known = node is not None and (
        redis_cluster.get_node(node_name=node.name) is not None
    )
    if not known:
        raise RedisClusterException(
            f"Node {host}:{port} doesn't exist in the cluster"
        )
def execute_command(self, *args):
    """
    Send a subscribe/unsubscribe style command on the cluster-level pubsub
    connection, creating that connection on first use.

    Adapted from redis-py's ``PubSub.execute_command`` for cluster mode. When
    no connection pool is bound yet, the node is picked from the hash slot of
    the first channel argument, or at random when the command has no channel
    argument.
    """
    # NOTE: the reply is intentionally not read here -- once subscribed, the
    # next frame on the socket may be a genuine published message rather than
    # this command's confirmation.
    if self.connection is None:
        if self.connection_pool is None:
            if len(args) <= 1:
                # No channel to hash on: any node will do.
                target = self.cluster.get_random_node()
            else:
                # Route by the first channel's hash slot.
                target = self.cluster.nodes_manager.get_node_from_slot(
                    self.cluster.keyslot(args[1]),
                    self.cluster.read_from_replicas,
                    self.cluster.load_balancing_strategy,
                )
            self.node = target
            self.connection_pool = self.cluster.get_redis_connection(
                target
            ).connection_pool
        fresh = self.connection_pool.get_connection()
        self.connection = fresh
        # Re-subscribe to everything we were listening to whenever this
        # connection (re)connects.
        fresh.register_connect_callback(self.on_connect)
        if self.push_handler_func is not None:
            fresh._parser.set_pubsub_push_handler(self.push_handler_func)
        self._event_dispatcher.dispatch(
            AfterPubSubConnectionInstantiationEvent(
                fresh, self.connection_pool, ClientType.SYNC, self._lock
            )
        )
    active = self.connection
    self._execute(active, active.send_command, *args)
def _resubscribe_shard_channels(self) -> None:
    """
    Replay the tracked shard subscriptions, issuing one SSUBSCRIBE per hash
    slot.

    A single node can own several slot ranges, and Redis rejects a
    multi-channel SSUBSCRIBE whose channels hash to different slots
    (CROSSSLOT). The tracked channels are therefore bucketed by slot, in
    first-seen order, and each bucket is replayed separately.
    """
    buckets: dict = {}
    for channel, handler in self.shard_channels.items():
        slot = key_slot(self.encoder.encode(channel))
        buckets.setdefault(slot, {})[channel] = handler
    for per_slot in buckets.values():
        self._resubscribe(per_slot, self.ssubscribe)
def _get_node_pubsub(self, node):
    """
    Return the per-node pubsub for ``node``.

    On first use it is created from the node's Redis client and cached in
    ``node_pubsub_mapping`` under ``node.name``.
    """
    name = node.name
    if name in self.node_pubsub_mapping:
        return self.node_pubsub_mapping[name]
    node_pubsub = self.cluster.get_redis_connection(node).pubsub(
        push_handler_func=self.push_handler_func,
    )
    # Graft the slot-aware replay onto this per-node pubsub, so that
    # reconnect-time SSUBSCRIBE is split per hash slot instead of tripping
    # CROSSSLOT when the node owns several slots.
    node_pubsub._resubscribe_shard_channels = MethodType(
        ClusterPubSub._resubscribe_shard_channels, node_pubsub
    )
    self.node_pubsub_mapping[name] = node_pubsub
    return node_pubsub
def _find_node_name_for_pubsub(self, pubsub):
    """
    Return the key in ``node_pubsub_mapping`` whose value is ``pubsub``
    itself (identity, not equality), or ``None`` if there is none.
    """
    return next(
        (
            name
            for name, candidate in self.node_pubsub_mapping.items()
            if candidate is pubsub
        ),
        None,
    )
def _sharded_message_generator(self, timeout=0.0):
    """
    Poll the per-node shard pubsubs round-robin and return the first message.

    At most ``len(self.node_pubsub_mapping)`` pubsubs are polled per call.
    ``timeout`` is forwarded to each per-node ``get_message``.

    :return: ``(pubsub, message)`` for the pubsub that produced a message, or
        ``(None, None)`` when none did.
    """
    for _ in range(len(self.node_pubsub_mapping)):
        pubsub = next(self._pubsubs_generator, None)
        if pubsub is None:
            # _pubsubs_generator finishes for good the first time it observes
            # an empty node_pubsub_mapping. Another thread (e.g. the
            # reconciliation worker running reinitialize_shard_subscriptions)
            # can empty the mapping between the length check above and this
            # next(). Previously that leaked StopIteration to the caller and
            # left every later poll broken. Rebuild the generator and retry
            # once. type(self) bypasses the instance attribute that shadows
            # the generator method (assigned in __init__).
            self._pubsubs_generator = type(self)._pubsubs_generator(self)
            pubsub = next(self._pubsubs_generator, None)
            if pubsub is None:
                return None, None
        # Don't pass ignore_subscribe_messages here; get_sharded_message
        # filters after processing subscription state changes.
        message = pubsub.get_message(
            ignore_subscribe_messages=False, timeout=timeout
        )
        if message is not None:
            return pubsub, message
    return None, None
def _pubsubs_generator(self):
    """
    Cycle endlessly over the per-node pubsubs in ``node_pubsub_mapping``.

    Each pass iterates a snapshot of the mapping's values, so changes are
    only picked up at the start of the next pass. The generator finishes
    permanently the first time it finds the mapping empty.
    """
    snapshot = list(self.node_pubsub_mapping.values())
    while snapshot:
        yield from snapshot
        snapshot = list(self.node_pubsub_mapping.values())
def get_sharded_message(
    self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
):
    """
    Poll the per-node shard pubsubs for the next message.

    :param ignore_subscribe_messages: when true, ``ssubscribe`` /
        ``sunsubscribe`` confirmations yield ``None``. The instance-level
        ``ignore_subscribe_messages`` has the same effect.
    :param timeout: forwarded to the per-node ``get_message`` call(s).
    :param target_node: when given, only that node's pubsub is polled;
        otherwise the per-node pubsubs are polled round-robin.
    :return: the message dict, or ``None`` when nothing is available or the
        message is a suppressed subscription confirmation.
    """
    if target_node:
        # .get() rather than indexing: reset() and the sunsubscribe cleanup
        # below both remove mapping entries, so a caller polling one node can
        # race them. A missing pubsub simply means "no message".
        pubsub = self.node_pubsub_mapping.get(target_node.name)
        if pubsub is None:
            message = None
        else:
            # Fetch confirmations too; filtering happens at the end, after
            # subscription bookkeeping has been updated.
            message = pubsub.get_message(
                ignore_subscribe_messages=False, timeout=timeout
            )
    else:
        pubsub, message = self._sharded_message_generator(timeout=timeout)

    if message is None:
        return None

    message_type = str_if_bytes(message["type"])
    if message_type == "sunsubscribe":
        # Only SUNSUBSCRIBE confirmations mutate cluster-level shard state, so
        # only they take the lock. Data messages and the (possibly blocking)
        # poll above stay lock-free, so reconciliation is not stalled.
        with self._shard_state_lock:
            channel = message["channel"]
            if channel in self.pending_unsubscribe_shard_channels:
                # User-initiated SUNSUBSCRIBE: stop tracking the channel.
                self.pending_unsubscribe_shard_channels.remove(channel)
                self.shard_channels.pop(channel, None)
                self._shard_channel_to_node.pop(channel, None)
            # Whether the SUNSUBSCRIBE came from the user or from
            # slot-migration reconciliation (_migrate_shard_channel, which
            # does not mark the channel pending), drop the delivering per-node
            # pubsub once it holds no subscriptions, releasing its connection.
            # It is located by identity rather than via the current slot map
            # because, after a migration, the channel's owner is no longer the
            # node that received the original SSUBSCRIBE.
            if pubsub is not None and not pubsub.subscribed:
                stale_name = self._find_node_name_for_pubsub(pubsub)
                if stale_name is not None:
                    try:
                        pubsub.reset()
                    except Exception:
                        pass
                    self.node_pubsub_mapping.pop(stale_name, None)
            # Mirrors PubSub.handle_message: only an unsubscribe can shrink
            # shard_channels here, so the "nothing left" check lives here.
            if not (self.channels or self.patterns or self.shard_channels):
                self.subscribed_event.clear()

    # Suppress only subscription confirmations, never data (smessage).
    if message_type in ("ssubscribe", "sunsubscribe") and (
        self.ignore_subscribe_messages or ignore_subscribe_messages
    ):
        return None
    return message
def ssubscribe(
    self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
) -> None:
    """
    Subscribe to shard channels.

    Channels supplied as keyword arguments expect a channel name as the key
    and a callable as the value. ``Subscription`` objects can also be
    supplied positionally with an optional handler.

    Each channel is subscribed on the per-node pubsub of the node that owns
    its slot. If the channel is already tracked against a different node
    (e.g. after a slot migration), it is moved to the current owner instead,
    and the newly supplied handler replaces the old one.
    """
    requested = parse_pubsub_subscriptions(args, kwargs)
    # Hold the shard-state lock so the reverse index, shard_channels and
    # node_pubsub_mapping are not mutated concurrently by the reconciliation
    # worker.
    with self._shard_state_lock:
        for channel, handler in requested.items():
            owner = self.cluster.get_node_from_key(channel)
            if not owner:
                continue
            normalized = self._normalize_keys({channel: None})
            key = next(iter(normalized))
            previous_owner = self._shard_channel_to_node.get(key)
            if previous_owner and previous_owner != owner.name:
                # Lazy re-route onto the current owner. Like the dict.update()
                # in PubSub.ssubscribe(), the handler given now (None
                # included) replaces any previously registered one.
                self._migrate_shard_channel(key, handler, previous_owner, owner)
                continue
            node_pubsub = self._get_node_pubsub(owner)
            node_pubsub.ssubscribe(
                Subscription(channel, handler) if handler else channel
            )
            self.shard_channels.update(node_pubsub.shard_channels)
            self._shard_channel_to_node[key] = owner.name
            self.pending_unsubscribe_shard_channels.difference_update(normalized)
            if node_pubsub.subscribed and not self.subscribed:
                self.subscribed_event.set()
                self.health_check_response_counter = 0
def sunsubscribe(self, *args):
    """
    Unsubscribe from shard channels; all tracked ones when called without
    arguments.

    Each SUNSUBSCRIBE goes to the per-node pubsub recorded in the reverse
    index. That node may no longer own the slot after a migration. When the
    index has no live pubsub for the channel, the current slot owner's pubsub
    is used, and the channel is skipped if that node has no pubsub. The
    per-node pubsub's pending-unsubscribe set is merged into this object's.
    """
    channels = list_or_args(args[0], args[1:]) if args else list(self.shard_channels)

    # The reverse index and node_pubsub_mapping must not change between the
    # lookup and the per-node SUNSUBSCRIBE below.
    with self._shard_state_lock:
        for channel in channels:
            key = next(iter(self._normalize_keys({channel: None})))
            holder = self._shard_channel_to_node.get(key)
            if holder and holder in self.node_pubsub_mapping:
                node_pubsub = self.node_pubsub_mapping[holder]
            else:
                owner = self.cluster.get_node_from_key(channel)
                if not owner or owner.name not in self.node_pubsub_mapping:
                    continue
                node_pubsub = self.node_pubsub_mapping[owner.name]
            node_pubsub.sunsubscribe(channel)
            self.pending_unsubscribe_shard_channels.update(
                node_pubsub.pending_unsubscribe_shard_channels
            )
def reinitialize_shard_subscriptions(self):
    """
    Reconcile per-node shard subscriptions with the cluster's current slot
    ownership.

    Every tracked shard channel whose owning node has changed (e.g. after
    CLUSTER SETSLOT or a failover) is unsubscribed on the old node's pubsub
    and resubscribed, with its handler, on the new owner's pubsub. Per-node
    pubsubs left without any subscription are then reset and dropped.

    :raises SlotNotCoveredError: when at least one channel's slot is not
        currently covered. Every coverable channel has still been handled.
    :raises ConnectionError | TimeoutError | OSError: the first migration
        error, raised only when migrations failed and none succeeded.
    """
    uncovered: list = []
    any_migrated = False
    first_failure: Optional[BaseException] = None
    with self._shard_state_lock:
        for channel, handler in list(self.shard_channels.items()):
            try:
                owner = self.cluster.get_node_from_key(channel)
            except SlotNotCoveredError:
                # Slot transiently uncovered (mid-migration / partial refresh).
                # Skip it so coverable siblings still reconcile this pass; it
                # is reported below and retried on the next slots-cache
                # change notification.
                uncovered.append(channel)
                continue
            previous = self._shard_channel_to_node.get(channel)
            if previous == owner.name:
                continue
            try:
                self._migrate_shard_channel(channel, handler, previous, owner)
            except (ConnectionError, TimeoutError, OSError) as err:
                # Transient connectivity failure. Keep going with the other
                # channels; the reverse index was not advanced for this one,
                # so the next slots-cache notification retries it.
                logger.warning(
                    "shard channel %r migration deferred: %s: %s",
                    channel,
                    type(err).__name__,
                    err,
                )
                if first_failure is None:
                    first_failure = err
            else:
                any_migrated = True
        # Release per-node pubsubs that no longer hold any subscription.
        for name, node_pubsub in list(self.node_pubsub_mapping.items()):
            if node_pubsub.subscribed:
                continue
            try:
                node_pubsub.reset()
            except Exception:
                pass
            self.node_pubsub_mapping.pop(name, None)
    if uncovered:
        raise SlotNotCoveredError(
            f"{len(uncovered)} shard channel(s) left unreconciled; "
            f"slot(s) not covered by the cluster: {uncovered!r}"
        )
    if first_failure is not None and not any_migrated:
        # Nothing made progress this pass: surface one representative error
        # (usually the root cause); the per-channel warnings above carry the
        # full detail.
        raise first_failure
def _migrate_shard_channel(self, channel, handler, old_name, new_node):
    """
    Move one shard-channel subscription from the per-node pubsub of
    ``old_name`` onto the per-node pubsub of ``new_node``.

    :param channel: the tracked shard channel name.
    :param handler: handler registered for the channel; when falsy the
        channel is subscribed without a handler.
    :param old_name: node name currently recorded for the channel (may be
        ``None``/empty, in which case there is nothing to detach).
    :param new_node: ClusterNode that should now hold the subscription.

    SUNSUBSCRIBE failures on the old node (ConnectionError, TimeoutError,
    OSError) are swallowed. Errors from SSUBSCRIBE on the new node
    propagate, and in that case the reverse index is not advanced.
    """
    # Detach from the old per-node pubsub, best-effort: the old node may
    # already be unreachable during migration / failover.
    if old_name and old_name in self.node_pubsub_mapping:
        old_pubsub = self.node_pubsub_mapping[old_name]
        try:
            old_pubsub.sunsubscribe(channel)
        except (ConnectionError, TimeoutError, OSError):
            # redis-py's Connection has already called ``disconnect()``
            # before raising (see Connection.read_response /
            # send_packed_command with ``disconnect_on_error=True``),
            # so ``old_pubsub``'s dedicated socket is gone. Two cases:
            #
            # 1. The old node is no longer in the cluster topology
            #    (e.g. removed by failover / topology refresh): no
            #    reconnect target exists, so ``old_pubsub.subscribed``
            #    would stay True forever and the end-of-pass GC block
            #    would skip it. Drop it eagerly so the round-robin
            #    generator does not keep yielding a dead pubsub that
            #    produces periodic errors from ``get_sharded_message``.
            # 2. The old node is still known (transiently slow /
            #    unreachable): ``PubSub._execute`` auto-reconnects and
            #    ``on_connect`` re-subscribes to remaining channels,
            #    so other subscriptions on the same pubsub recover
            #    naturally. Leave it alone.
            if self.cluster.get_node(node_name=old_name) is None:
                try:
                    old_pubsub.reset()
                except Exception:
                    pass
                self.node_pubsub_mapping.pop(old_name, None)
    # Attach to the new owner's per-node pubsub, preserving the handler. The
    # channel is passed through unchanged; a handler travels inside a
    # Subscription object.
    new_pubsub = self._get_node_pubsub(new_node)
    if handler:
        new_pubsub.ssubscribe(Subscription(channel, handler))
    else:
        new_pubsub.ssubscribe(channel)
    self.shard_channels.update(new_pubsub.shard_channels)
    # Normalize once; the same one-key mapping feeds both the reverse index
    # and the pending-unsubscribe cleanup.
    normalized = self._normalize_keys({channel: None})
    self._shard_channel_to_node[next(iter(normalized))] = new_node.name
    self.pending_unsubscribe_shard_channels.difference_update(normalized)
    if new_pubsub.subscribed and not self.subscribed:
        self.subscribed_event.set()
        self.health_check_response_counter = 0
def on_slots_changed(self):
    """
    Observer hook invoked by NodesManager after a slots-cache refresh.

    Schedules ``reinitialize_shard_subscriptions`` on a lazily created
    single-worker thread pool. The caller (typically MovedError handling or
    the topology-refresh path) is therefore not blocked on per-channel
    SUNSUBSCRIBE / SSUBSCRIBE network I/O. Does nothing when no shard
    channels are tracked.
    """
    if not self.shard_channels:
        return
    # Lazy executor creation and submission are serialized against
    # concurrent callers (the dispatcher invokes listeners outside its own
    # lock) and against reset(), which tears the executor down. Without this,
    # two racing callers could each build an executor and orphan one worker
    # thread, or submit() could hit an executor reset() just shut down.
    with self._shard_state_lock:
        executor = self._reconcile_executor
        if executor is None:
            executor = ThreadPoolExecutor(
                max_workers=1,
                thread_name_prefix="redis-cluster-pubsub-reconcile",
            )
            self._reconcile_executor = executor
        pending = executor.submit(self.reinitialize_shard_subscriptions)
        self._reconcile_futures.add(pending)
        pending.add_done_callback(self._discard_reconcile_future)
        # Consume the task's exception (e.g. SlotNotCoveredError) so it is
        # logged rather than silently lost.
        pending.add_done_callback(self._log_reconcile_future_exception)
def _discard_reconcile_future(self, future: "Future") -> None:
    """
    Done-callback: forget a finished reconciliation future.

    Runs on whichever thread completes the future. It takes the same lock
    that guards the set's add()/clear() sites, so the mutation is not
    relying on the GIL (which free-threaded builds lack).
    """
    tracked = self._reconcile_futures
    with self._shard_state_lock:
        tracked.discard(future)
3307 @staticmethod
3308 def _log_reconcile_future_exception(future: "Future") -> None:
3309 if future.cancelled():
3310 return
3311 exc = future.exception()
3312 if exc is not None:
3313 logger.error(
3314 "shard subscription reconciliation failed: %r", exc, exc_info=exc
3315 )
def reset(self) -> None:
    """
    Drop all subscriptions and release the connections this pubsub owns.

    Steps, in order:

    1. Cancel queued slot-migration reconciliation work.
    2. Reset and forget every per-node shard pubsub.
    3. Rebuild the round-robin generator.
    4. Delegate to ``PubSub.reset()``.
    5. Clear the shard-channel reverse index.

    Everything runs under ``_shard_state_lock``, the lock that ssubscribe,
    sunsubscribe, get_sharded_message and reinitialize_shard_subscriptions
    hold while mutating shard state. A concurrent mutation therefore cannot
    interleave with ``PubSub.reset()`` rebinding ``shard_channels`` and
    ``pending_unsubscribe_shard_channels``.
    """
    with self._shard_state_lock:
        # cancel_futures drops queued work. A task that is already running is
        # serialized against us by this lock; wait=False only skips joining
        # the worker thread.
        executor = self._reconcile_executor
        if executor is not None:
            executor.shutdown(wait=False, cancel_futures=True)
            self._reconcile_executor = None
        self._reconcile_futures.clear()

        # Reset each per-node pubsub so it releases its dedicated connection
        # and cannot replay stale shard channels via PubSub.on_connect on a
        # later reconnect. This is best-effort: reset() also serves as a
        # fallback path from __del__, so one failing pubsub must not abort
        # the rest of the teardown.
        for node_pubsub in self.node_pubsub_mapping.values():
            try:
                node_pubsub.reset()
            except Exception:
                pass
        # Forget them so the round-robin cannot yield a dead pubsub between
        # teardown and re-subscription.
        self.node_pubsub_mapping.clear()

        # A generator suspended inside ``yield from`` still holds its own
        # snapshot of the old pubsubs; clearing the mapping does not reach
        # it. Start a fresh generator. type(self) bypasses the instance
        # attribute that shadows the generator method (set in __init__).
        self._pubsubs_generator = type(self)._pubsubs_generator(self)

        super().reset()
        self._shard_channel_to_node = {}
def get_redis_connection(self):
    """
    Return the Redis client of the pubsub node, or ``None`` when no node has
    been chosen.
    """
    node = self.node
    return None if node is None else node.redis_connection
def disconnect(self):
    """
    Disconnect the cluster-level pubsub connection (if any), then the
    connection of every per-node shard pubsub (if any).
    """
    candidates = [self.connection]
    candidates.extend(p.connection for p in self.node_pubsub_mapping.values())
    for conn in candidates:
        if conn:
            conn.disconnect()
class ClusterPipeline(RedisCluster):
    """
    Support for Redis pipelines in cluster mode.

    Commands are staged locally and sent when :meth:`execute` is called.
    Staging, routing and execution are delegated to an execution strategy:
    ``PipelineStrategy`` by default, or ``TransactionStrategy`` when the
    pipeline is created with ``transaction=True``.
    """

    # Exception types listed as retry-eligible for pipelined commands;
    # presumably consulted by the execution strategies (defined elsewhere).
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        MovedError,
        AskError,
        TryAgainError,
    )

    NO_SLOTS_COMMANDS = {"UNWATCH"}
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        cluster_error_retry_attempts: int = DEFAULT_RETRY_COUNT,
        reinitialize_steps: int = 5,
        retry: Optional[Retry] = None,
        lock=None,
        transaction=False,
        policy_resolver: Optional[PolicyResolver] = None,
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        Create a cluster pipeline that shares an existing ``nodes_manager``.

        This initializer does not call ``RedisCluster.__init__``; it copies
        the routing state it needs from its arguments.

        :param transaction: when true, use ``TransactionStrategy``;
            otherwise ``PipelineStrategy``.
        :param retry: retry policy. Defaults to exponential-with-jitter
            backoff with ``cluster_error_retry_attempts`` retries.
        :param lock: stored as ``self._lock``; a new ``threading.RLock``
            when omitted.
        :param policy_resolver: resolver for command request/response
            policies. When omitted, a fresh ``StaticPolicyResolver`` is
            created for this pipeline. Previously a single instance, built
            once at import time as the default value, was shared by every
            pipeline.
        :param event_dispatcher: defaults to a new ``EventDispatcher``.
        :param kwargs: only ``encoding``, ``encoding_errors`` and
            ``decode_responses`` are read here, to build the encoder.
        """
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.cluster_response_callbacks = cluster_response_callbacks
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if retry is not None:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(
                    base=DEFAULT_RETRY_BASE, cap=DEFAULT_RETRY_CAP
                ),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        self.parent_execute_command = super().execute_command
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self) if not transaction else TransactionStrategy(self)
        )

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command, *args: self.get_nodes_from_slot(
                command, *args
            ),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args, **kwargs: self._split_multi_shard_command(
                *args, **kwargs
            ),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        # Build the default per instance rather than in the signature, so
        # pipelines never share a resolver object created at import time.
        if policy_resolver is None:
            policy_resolver = StaticPolicyResolver()
        self._policy_resolver = policy_resolver

        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

    def __repr__(self):
        """Return just the class name."""
        return f"{type(self).__name__}"

    def __enter__(self):
        """Support ``with`` blocks; the pipeline itself is the context value."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Reset the pipeline when leaving a ``with`` block."""
        self.reset()

    def __del__(self):
        # Best-effort cleanup at garbage collection; never raise from here.
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self):
        """Number of commands currently queued in the execution strategy."""
        return len(self._execution_strategy.command_queue)

    def __bool__(self):
        "Pipeline instances should always evaluate to True on Python 3+"
        return True

    def execute_command(self, *args, **kwargs):
        """
        Wrapper function for pipeline_execute_command
        """
        return self._execution_strategy.execute_command(*args, **kwargs)

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        return self._execution_strategy.execute_command(*args, **options)

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        self._execution_strategy.annotate_exception(exception, number, command)

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Execute all the commands in the current pipeline.

        The pipeline is reset afterwards, whether or not execution raised.
        """

        try:
            return self._execution_strategy.execute(raise_on_error)
        finally:
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._execution_strategy.reset()

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """Delegate sending ``stack`` to the execution strategy."""
        return self._execution_strategy.send_cluster_commands(
            stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
        )

    def exists(self, *keys):
        """Delegate ``EXISTS`` to the execution strategy."""
        return self._execution_strategy.exists(*keys)

    def eval(self):
        """Delegate ``eval`` to the execution strategy."""
        return self._execution_strategy.eval()

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        self._execution_strategy.multi()

    def load_scripts(self):
        """Delegate script loading to the execution strategy."""
        self._execution_strategy.load_scripts()

    def discard(self):
        """Delegate ``DISCARD`` to the execution strategy."""
        self._execution_strategy.discard()

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        self._execution_strategy.watch(*names)

    def unwatch(self):
        """Unwatches all previously specified keys"""
        self._execution_strategy.unwatch()

    def script_load_for_pipeline(self, *args, **kwargs):
        """Delegate pipeline script loading to the execution strategy."""
        self._execution_strategy.script_load_for_pipeline(*args, **kwargs)

    def delete(self, *names):
        """Delegate ``DEL`` of ``names`` to the execution strategy."""
        self._execution_strategy.delete(*names)

    def unlink(self, *names):
        """Delegate ``UNLINK`` of ``names`` to the execution strategy."""
        self._execution_strategy.unlink(*names)
def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Build a stand-in for a pipeline method that cluster mode does not support.

    The returned callable accepts any arguments and always raises
    ``RedisClusterException`` with a message naming ``name``.
    """

    def inner(*args, **kwargs):
        message = (
            f"ERROR: Calling pipelined function {name} is blocked "
            "when running redis in cluster mode..."
        )
        raise RedisClusterException(message)

    return inner
# Commands blocked on ClusterPipeline. Each name is turned into a method name
# (spaces -> underscores, lower-cased, e.g. "CLIENT KILL" -> "client_kill"),
# and that attribute is replaced by a stub that raises RedisClusterException.
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSETEX",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)
for command in PIPELINE_BLOCKED_COMMANDS:
    command = "_".join(command.split(" ")).lower()
    setattr(ClusterPipeline, command, block_pipeline_command(command))
class PipelineCommand:
    """
    One command queued on a cluster pipeline.

    Holds the raw command arguments, its per-command options and its position
    in the queue. The remaining attributes start empty and are filled in
    while the pipeline executes.
    """

    def __init__(self, args, options=None, position=None):
        self.args = args
        # A caller-supplied dict is kept as-is (not copied).
        self.options = {} if options is None else options
        self.position = position
        # Reply value or exception once the command has run.
        self.result = None
        self.node = None
        self.asking = False
        self.command_policies: Optional[CommandPolicies] = None
class NodeCommands:
    """
    Batch of pipeline commands sent over one connection to one cluster node.
    """

    def __init__(
        self, parse_response, connection_pool: ConnectionPool, connection: Connection
    ):
        """
        :param parse_response: callable used to read each reply, invoked as
            ``parse_response(connection, command_name, **options)``.
        :param connection_pool: pool the connection was taken from.
        :param connection: connection the batch is written to and read from.
        """
        self.parse_response = parse_response
        self.connection_pool = connection_pool
        self.connection = connection
        self.commands = []

    def append(self, c):
        """Add a command to this node's batch."""
        self.commands.append(c)

    def write(self):
        """
        Send every queued command to the node in a single packed request.

        Any ``result`` left over from a previous attempt is cleared first. If
        sending fails with a connection or timeout error, the error is stored
        as the ``result`` of every command in the batch instead of being
        raised.
        """
        connection = self.connection
        commands = self.commands

        # We are going to clobber the commands with the write, so go ahead
        # and ensure that nothing is sitting there from a previous run.
        for c in commands:
            c.result = None

        # build up all commands into a single request to increase network perf
        # send all the commands and catch connection and timeout errors.
        try:
            connection.send_packed_command(
                connection.pack_commands([c.args for c in commands])
            )
        except (ConnectionError, TimeoutError) as e:
            for c in commands:
                c.result = e

    def read(self):
        """
        Read and parse one reply per queued command, in queue order.

        A command that already has a result (an error stored by ``write``) is
        skipped. A connection or timeout error while reading is stored on
        every command of the batch and reading stops. Any other
        ``RedisError`` is stored on that command only, and reading continues.
        """
        connection = self.connection
        for c in self.commands:
            # if there is a result on this command,
            # it means we ran into an exception
            # like a connection error. Trying to parse
            # a response on a connection that
            # is no longer open will result in a
            # connection error raised by redis-py.
            # but redis-py doesn't check in parse_response
            # that the sock object is
            # still set and if you try to
            # read from a closed connection, it will
            # result in an AttributeError because
            # it will do a readline() call on None.
            # This can have all kinds of nasty side-effects.
            # Treating this case as a connection error
            # is fine because it will dump
            # the connection object back into the
            # pool and on the next write, it will
            # explicitly open the connection and all will be well.
            if c.result is None:
                try:
                    c.result = self.parse_response(connection, c.args[0], **c.options)
                except (ConnectionError, TimeoutError) as e:
                    # The stream is unusable from here on; fail the whole batch.
                    for other in self.commands:
                        other.result = e
                    return
                except RedisError as e:
                    c.result = e
class ExecutionStrategy(ABC):
    """
    Contract for the objects a ``ClusterPipeline`` delegates its work to.

    Every member is abstract; a concrete strategy supplies the queueing,
    routing and execution behaviour.
    """

    @property
    @abstractmethod
    def command_queue(self):
        """Commands queued on the pipeline so far."""

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        """
        Handle a command issued on the pipeline under this strategy.

        See: ClusterPipeline.execute_command()
        """

    @abstractmethod
    def annotate_exception(self, exception, number, command):
        """
        Attach command context to ``exception`` before it is surfaced.

        See: ClusterPipeline.annotate_exception()
        """

    @abstractmethod
    def pipeline_execute_command(self, *args, **options):
        """
        Pipeline-side handling of a command for this strategy.

        See: ClusterPipeline.pipeline_execute_command()
        """

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Run everything queued under this strategy and return the results.

        See: ClusterPipeline.execute()
        """

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Send ``stack`` to the cluster the way this strategy requires.

        See: ClusterPipeline.send_cluster_commands()
        """

    @abstractmethod
    def reset(self):
        """
        Return the strategy to its initial, empty state.

        See: ClusterPipeline.reset()
        """

    @abstractmethod
    def exists(self, *keys):
        """Handle ``EXISTS`` for ``keys``."""

    @abstractmethod
    def eval(self):
        """Handle ``eval``."""

    @abstractmethod
    def multi(self):
        """
        Enter the transactional context.

        See: ClusterPipeline.multi()
        """

    @abstractmethod
    def load_scripts(self):
        """Handle ``load_scripts``."""

    @abstractmethod
    def watch(self, *names):
        """Handle ``WATCH`` for ``names``."""

    @abstractmethod
    def unwatch(self):
        """
        Stop watching every previously watched key.

        See: ClusterPipeline.unwatch()
        """

    @abstractmethod
    def script_load_for_pipeline(self, *args, **kwargs):
        """Handle ``script_load_for_pipeline``."""

    @abstractmethod
    def delete(self, *names):
        """
        Delete the key(s) given in ``names``.

        See: ClusterPipeline.delete()
        """

    @abstractmethod
    def unlink(self, *names):
        """
        Unlink the key(s) given in ``names``.

        See: ClusterPipeline.unlink()
        """

    @abstractmethod
    def discard(self):
        """Handle ``discard``."""
class AbstractStrategy(ExecutionStrategy):
    """
    Shared base for the concrete cluster pipeline strategies.

    Owns the command queue and implements the members every strategy handles
    the same way. Scripting helpers are explicitly unsupported.
    """

    def __init__(
        self,
        pipe: ClusterPipeline,
    ):
        self._command_queue: List[PipelineCommand] = []
        self._pipe = pipe
        self._nodes_manager = self._pipe.nodes_manager

    @property
    def command_queue(self):
        return self._command_queue

    @command_queue.setter
    def command_queue(self, queue: List[PipelineCommand]):
        self._command_queue = queue

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        pass

    def pipeline_execute_command(self, *args, **options):
        """
        Append the command to the queue and return the pipeline for chaining.

        The command's position is its index in the queue at the time it is
        added.
        """
        self._command_queue.append(
            PipelineCommand(args, options, len(self._command_queue))
        )
        return self._pipe

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        pass

    @abstractmethod
    def reset(self):
        pass

    def exists(self, *keys):
        """Issue ``EXISTS keys...`` through this strategy's ``execute_command``."""
        return self.execute_command("EXISTS", *keys)

    def eval(self):
        """Not supported on cluster pipelines."""
        raise RedisClusterException("method eval() is not implemented")

    def load_scripts(self):
        """Not supported on cluster pipelines."""
        raise RedisClusterException("method load_scripts() is not implemented")

    def script_load_for_pipeline(self, *args, **kwargs):
        """Not supported on cluster pipelines."""
        raise RedisClusterException(
            "method script_load_for_pipeline() is not implemented"
        )

    def annotate_exception(self, exception, number, command):
        """
        Prefix the exception's message with the failing command and its position.

        :param exception: exception annotated in place (its ``args`` are rewritten).
        :param number: position reported in the message (callers pass the
            command's 1-based position, or 0 for MULTI).
        :param command: the command's arguments. A bare ``str``/``bytes`` name
            such as ``"MULTI"`` is treated as a single argument.
        """
        if isinstance(command, (str, bytes)):
            # Joining a bare string would split it into characters ("M U L T I").
            command = (command,)
        cmd = " ".join(map(safe_str, command))
        # Exceptions raised without arguments have no message to prefix.
        detail = exception.args[0] if exception.args else type(exception).__name__
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {detail}"
        )
        exception.args = (msg,) + exception.args[1:]
class PipelineStrategy(AbstractStrategy):
    """
    Non-transactional execution strategy for ``ClusterPipeline``.

    Commands are queued until ``execute``. They are then grouped by the node
    that should run them and written to every node before any reply is read.
    Commands that failed with a retryable cluster error are then re-sent one
    at a time through the pipeline's regular command path.
    """

    def __init__(self, pipe: ClusterPipeline):
        super().__init__(pipe)
        self.command_flags = pipe.command_flags

    def execute_command(self, *args, **kwargs):
        """Queue the command; nothing is sent until ``execute`` is called."""
        return self.pipeline_execute_command(*args, **kwargs)

    def _raise_first_error(self, stack, start_time):
        """
        Raise the first exception found among the commands' results.

        The exception is annotated with its command's 1-based position, and a
        failed PIPELINE duration is recorded before it is raised.
        """
        for c in stack:
            r = c.result
            if isinstance(r, Exception):
                self.annotate_exception(r, c.position + 1, c.args)

                record_operation_duration(
                    command_name="PIPELINE",
                    duration_seconds=time.monotonic() - start_time,
                    error=r,
                )

                raise r

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Send all queued commands and return their results in queue order.

        The queue is cleared afterwards, whether or not sending succeeded.
        """
        stack = self._command_queue
        if not stack:
            return []

        try:
            return self.send_cluster_commands(stack, raise_on_error)
        finally:
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._command_queue = []

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.

        If one of the retryable exceptions has been thrown we assume that:
         - connection_pool was disconnected
         - connection_pool was reset
         - refresh_table_asap set to True

        The whole stack is re-sent up to ``self._pipe.retry.get_retries()``
        additional times. Once the retries are exhausted, the last retryable
        error is re-raised. Any other error propagates immediately.
        """
        if not stack:
            return []
        retry_attempts = self._pipe.retry.get_retries()
        while True:
            try:
                return self._send_cluster_commands(
                    stack,
                    raise_on_error=raise_on_error,
                    allow_redirections=allow_redirections,
                )
            except RedisCluster.ERRORS_ALLOW_RETRY as e:
                if retry_attempts > 0:
                    # Try again with the new cluster setup. All other errors
                    # should be raised.
                    retry_attempts -= 1
                else:
                    raise e

    def _send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Send a bunch of cluster commands to the redis cluster.

        Commands are grouped per target node, written to all nodes, and their
        replies are read back node by node.

        `allow_redirections` If the pipeline should follow `ASK` & `MOVED`
        responses (and other retryable errors) by re-sending the failed
        commands individually. If set to false, those commands keep their
        error as result, which is raised when `raise_on_error` is set.
        """
        # the first time sending the commands we send all of
        # the commands that were queued up.
        # if we have to run through it again, we only retry
        # the commands that failed.
        attempt = sorted(stack, key=lambda x: x.position)
        is_default_node = False
        # node name -> the commands (and connection) destined for that node
        nodes: dict[str, NodeCommands] = {}
        nodes_written = 0
        nodes_read = 0

        try:
            # as we move through each command that still needs to be processed,
            # we figure out the slot number that command maps to, then from
            # the slot determine the node.
            for c in attempt:
                command_policies = self._pipe._policy_resolver.resolve(
                    c.args[0].lower()
                )
                # refer to our internal node -> slot table that
                # tells us where a given command should route to.
                # (it might be possible we have a cached node that no longer
                # exists in the cluster, which is why we do this in a loop)
                passed_targets = c.options.pop("target_nodes", None)
                if passed_targets and not self._is_nodes_flag(passed_targets):
                    target_nodes = self._parse_target_nodes(passed_targets)

                    if not command_policies:
                        command_policies = CommandPolicies()
                else:
                    if not command_policies:
                        command = c.args[0].upper()
                        if (
                            len(c.args) >= 2
                            and f"{c.args[0]} {c.args[1]}".upper()
                            in self._pipe.command_flags
                        ):
                            command = f"{c.args[0]} {c.args[1]}".upper()

                        # We only could resolve key properties if command is not
                        # in a list of pre-defined request policies
                        command_flag = self.command_flags.get(command)
                        if not command_flag:
                            # Fallback to default policy
                            if not self._pipe.get_default_node():
                                keys = None
                            else:
                                keys = self._pipe._get_command_keys(*c.args)
                            if not keys or len(keys) == 0:
                                command_policies = CommandPolicies()
                            else:
                                command_policies = CommandPolicies(
                                    request_policy=RequestPolicy.DEFAULT_KEYED,
                                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                                )
                        else:
                            if command_flag in self._pipe._command_flags_mapping:
                                command_policies = CommandPolicies(
                                    request_policy=self._pipe._command_flags_mapping[
                                        command_flag
                                    ]
                                )
                            else:
                                command_policies = CommandPolicies()

                    # `_determine_nodes` reads a user-supplied node flag from
                    # the `nodes_flag` keyword; any other spelling is ignored
                    # and would leak into the policy callback's kwargs.
                    target_nodes = self._determine_nodes(
                        *c.args,
                        request_policy=command_policies.request_policy,
                        nodes_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {c.args} command on"
                        )
                c.command_policies = command_policies
                if len(target_nodes) > 1:
                    raise RedisClusterException(
                        f"Too many targets for command {c.args}"
                    )

                node = target_nodes[0]
                if node == self._pipe.get_default_node():
                    is_default_node = True

                # now that we know the name of the node
                # ( it's just a string in the form of host:port )
                # we can build a list of commands for each node.
                node_name = node.name
                if node_name not in nodes:
                    redis_node = self._pipe.get_redis_connection(node)
                    try:
                        connection = get_connection(redis_node)
                    except (ConnectionError, TimeoutError):
                        # Release any connections we've already acquired before clearing nodes
                        for n in nodes.values():
                            n.connection_pool.release(n.connection)
                        # Connection retries are being handled in the node's
                        # Retry object. Reinitialize the node -> slot table.
                        self._nodes_manager.initialize()
                        if is_default_node:
                            self._pipe.replace_default_node()
                        # Already released above; keep `finally` from
                        # releasing them a second time.
                        nodes = {}
                        raise
                    nodes[node_name] = NodeCommands(
                        redis_node.parse_response,
                        redis_node.connection_pool,
                        connection,
                    )
                nodes[node_name].append(c)

            # send the commands in sequence.
            # we write to all the open sockets for each node first,
            # before reading anything
            # this allows us to flush all the requests out across the
            # network
            # so that we can read them from different sockets as they come back.
            # we don't multiplex on the sockets as they come available,
            # but that shouldn't make too much difference.

            # Start timing for observability
            start_time = time.monotonic()

            node_commands = nodes.values()
            for n in node_commands:
                nodes_written += 1
                n.write()

            for n in node_commands:
                n.read()

                # Find the first error in this node's commands, if any
                node_error = None
                for cmd in n.commands:
                    if isinstance(cmd.result, Exception):
                        node_error = cmd.result
                        break

                record_operation_duration(
                    command_name="PIPELINE",
                    duration_seconds=time.monotonic() - start_time,
                    server_address=n.connection.host,
                    server_port=n.connection.port,
                    db_namespace=str(n.connection.db),
                    error=node_error,
                )
                nodes_read += 1
        finally:
            # release all the redis connections we allocated earlier
            # back into the connection pool.
            # if the connection is dirty (that is: we've written
            # commands to it, but haven't read the responses), we need
            # to close the connection before returning it to the pool.
            # otherwise, the next caller to use this connection will
            # read the response from _this_ request, not its own request.
            # disconnecting discards the dirty state & forces the next
            # caller to reconnect.
            # NOTE: dicts have a consistent ordering; we're iterating
            # through nodes.values() in the same order as we are when
            # reading / writing to the connections above, which is critical
            # for how we're using the nodes_written/nodes_read offsets.
            for i, n in enumerate(nodes.values()):
                if i < nodes_written and i >= nodes_read:
                    n.connection.disconnect()
                n.connection_pool.release(n.connection)

        # if the response isn't an exception it is a
        # valid response from the node
        # we're all done with that command, YAY!
        # if we have more commands to attempt, we've run into problems.
        # collect all the commands we are allowed to retry.
        # (MOVED, ASK, or connection errors or timeout errors)
        attempt = sorted(
            (
                c
                for c in attempt
                if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
            ),
            key=lambda x: x.position,
        )
        if attempt and allow_redirections:
            # RETRY MAGIC HAPPENS HERE!
            # send these remaining commands one at a time using `execute_command`
            # in the main client. This keeps our retry logic
            # in one place mostly,
            # and allows us to be more confident in correctness of behavior.
            # at this point any speed gains from pipelining have been lost
            # anyway, so we might as well make the best
            # attempt to get the correct behavior.
            #
            # The client command will handle retries for each
            # individual command sequentially as we pass each
            # one into `execute_command`. Any exceptions
            # that bubble out should only appear once all
            # retries have been exhausted.
            #
            # If a lot of commands have failed, we'll be setting the
            # flag to rebuild the slots table from scratch.
            # So MOVED errors should correct themselves fairly quickly.
            self._pipe.reinitialize_counter += 1
            if self._pipe._should_reinitialized():
                self._nodes_manager.initialize()
                if is_default_node:
                    self._pipe.replace_default_node()
            for c in attempt:
                try:
                    # send each command individually like we
                    # do in the main client.
                    c.result = self._pipe.parent_execute_command(*c.args, **c.options)
                except RedisError as e:
                    c.result = e

        # turn the response back into a simple flat array that corresponds
        # to the sequence of commands issued in the stack in pipeline.execute()
        response = []
        for c in sorted(stack, key=lambda x: x.position):
            if c.args[0] in self._pipe.cluster_response_callbacks:
                # Remove keys entry, it needs only for cache.
                c.options.pop("keys", None)
                c.result = self._pipe._policies_callback_mapping[
                    c.command_policies.response_policy
                ](
                    self._pipe.cluster_response_callbacks[c.args[0]](
                        c.result, **c.options
                    )
                )
            response.append(c.result)

        if raise_on_error:
            self._raise_first_error(stack, start_time)

        return response

    def _is_nodes_flag(self, target_nodes):
        """Return True when ``target_nodes`` is one of the pipe's node-flag strings."""
        return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags

    def _parse_target_nodes(self, target_nodes):
        """
        Normalise an explicit ``target_nodes`` option into a list of nodes.

        :raises TypeError: for unsupported types.
        """
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}, e.g.
            # rc.get_primaries(). Materialised as a list because the caller
            # indexes the result (a dict view is not subscriptable).
            nodes = list(target_nodes.values())
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        """
        Determine which nodes the command should be executed on.

        The request policy passed in is overridden when a node flag applies.
        The flag is either a user-supplied ``nodes_flag`` keyword or the
        command's predefined flag from ``command_flags``. Remaining keyword
        arguments are forwarded only to the MULTI_SHARD policy callback.
        Results for ``FT.AGGREGATE`` are also stored in ``self._aggregate_nodes``.
        """
        command = args[0].upper()
        if (
            len(args) >= 2
            and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
        ):
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self._pipe.command_flags.get(command)

        if command_flag in self._pipe._command_flags_mapping:
            request_policy = self._pipe._command_flags_mapping[command_flag]

        policy_callback = self._pipe._policies_callback_mapping[request_policy]

        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()

        if args[0].lower() == "ft.aggregate":
            self._aggregate_nodes = nodes

        return nodes

    def multi(self):
        raise RedisClusterException(
            "method multi() is not supported outside of transactional context"
        )

    def discard(self):
        raise RedisClusterException(
            "method discard() is not supported outside of transactional context"
        )

    def watch(self, *names):
        raise RedisClusterException(
            "method watch() is not supported outside of transactional context"
        )

    def unwatch(self, *names):
        raise RedisClusterException(
            "method unwatch() is not supported outside of transactional context"
        )

    def delete(self, *names):
        """Queue ``DEL`` for a single key; multiple keys are rejected."""
        if len(names) != 1:
            raise RedisClusterException(
                "deleting multiple keys is not implemented in pipeline command"
            )

        return self.execute_command("DEL", names[0])

    def unlink(self, *names):
        """Queue ``UNLINK`` for a single key; multiple keys are rejected."""
        if len(names) != 1:
            raise RedisClusterException(
                "unlinking multiple keys is not implemented in pipeline command"
            )

        return self.execute_command("UNLINK", names[0])
class TransactionStrategy(AbstractStrategy):
    """
    Cluster pipeline strategy that runs queued commands as a MULTI/EXEC
    transaction on a single node; all keys must map to the same slot.
    """

    # Commands that may be sent immediately without a slot being derived.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands sent to the server right away (outside an explicit MULTI)
    # instead of being queued.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands whose reply clears the ``_watching`` flag.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    # Redirection replies; treated as retryable and, during EXEC while
    # watching, turned into a WatchError.
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    # Errors (matched by exact type) after which the pinned transaction
    # connection is dropped and a topology refresh may be triggered.
    CONNECTION_ERRORS = (
        ConnectionError,
        OSError,
        ClusterDownError,
        SlotNotCoveredError,
    )
def __init__(self, pipe: ClusterPipeline):
    """
    Set up transaction state on top of the shared strategy state.

    Works on a copy of the pipeline's retry object, extended so that slot
    redirection errors are also retried.
    """
    super().__init__(pipe)
    # Transaction bookkeeping flags.
    self._explicit_transaction = False
    self._watching = False
    self._executing = False
    # Slots touched so far and the connection pinned to their owner.
    self._pipeline_slots: Set[int] = set()
    self._transaction_connection: Optional[Connection] = None
    retryable_errors = RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
    self._retry = copy(self._pipe.retry)
    self._retry.update_supported_errors(retryable_errors)
def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
    """
    Find a connection for a pipeline transaction.

    For running an atomic transaction, watch keys ensure that contents have not been
    altered as long as the watch commands for those keys were sent over the same
    connection. So once we start watching a key, we fetch a connection to the
    node that owns that slot and reuse it.

    If a connection is already pinned but belongs to a different node than
    the one that now owns the slot, it is returned to its owner's pool. If no
    owner can be found, it is disconnected instead. A fresh connection is
    then taken.

    :returns: the Redis client of the slot owner and the pinned connection.
    :raises RedisClusterException: when no keyed command has been issued yet.
    """
    if not self._pipeline_slots:
        raise RedisClusterException(
            "At least a command with a key is needed to identify a node"
        )

    node: ClusterNode = self._nodes_manager.get_node_from_slot(
        list(self._pipeline_slots)[0], False
    )
    redis_node: Redis = self._pipe.get_redis_connection(node)
    if self._transaction_connection:
        if not redis_node.connection_pool.owns_connection(
            self._transaction_connection
        ):
            # find_connection_owner yields a node whose pool is reached via
            # ``redis_connection`` (as in reset/_reinitialize_on_error), and
            # it may find no owner at all.
            previous_node = self._nodes_manager.find_connection_owner(
                self._transaction_connection
            )
            if previous_node and previous_node.redis_connection:
                previous_node.redis_connection.connection_pool.release(
                    self._transaction_connection
                )
            else:
                # No pool to hand it back to: close it rather than leak it.
                self._transaction_connection.disconnect()
            self._transaction_connection = None

    if not self._transaction_connection:
        self._transaction_connection = get_connection(redis_node)

    return redis_node, self._transaction_connection
def execute_command(self, *args, **kwargs):
    """
    Route a command issued while the transaction strategy is active.

    Outside an explicit MULTI block, WATCH/UNWATCH (and any command issued
    while keys are being watched) are sent to the server immediately, after
    checking that they stay on the slot already used by this transaction.
    Everything else is queued until ``execute``. The slot of each keyed
    command is recorded so the transaction can be pinned to one node.

    :raises CrossSlotTransactionError: when an immediate command targets a
        different slot than earlier ones.
    :raises RedisClusterException: when no slot can be derived for an
        immediate command that needs one.
    """
    command_name = args[0]
    if command_name in ClusterPipeline.NO_SLOTS_COMMANDS:
        slot_number: Optional[int] = None
    else:
        slot_number = self._pipe.determine_slot(*args)

    run_immediately = not self._explicit_transaction and (
        self._watching or command_name in self.IMMEDIATE_EXECUTE_COMMANDS
    )
    if not run_immediately:
        # Queued for MULTI/EXEC; only remember which slot it touches.
        if slot_number is not None:
            self._pipeline_slots.add(slot_number)
        return self.pipeline_execute_command(*args, **kwargs)

    if command_name == "WATCH":
        self._validate_watch()

    if slot_number is None:
        if command_name not in self.NO_SLOTS_COMMANDS:
            raise RedisClusterException(
                f"Cannot identify slot number for command: {command_name},"
                "it cannot be triggered in a transaction"
            )
    else:
        if self._pipeline_slots and slot_number not in self._pipeline_slots:
            raise CrossSlotTransactionError(
                "Cannot watch or send commands on different slots"
            )
        self._pipeline_slots.add(slot_number)

    return self._immediate_execute_command(*args, **kwargs)
4491 def _validate_watch(self):
4492 if self._explicit_transaction:
4493 raise RedisError("Cannot issue a WATCH after a MULTI")
4495 self._watching = True
4497 def _immediate_execute_command(self, *args, **options):
4498 return self._retry.call_with_retry(
4499 lambda: self._get_connection_and_send_command(*args, **options),
4500 self._reinitialize_on_error,
4501 with_failure_count=True,
4502 )
def _get_connection_and_send_command(self, *args, **options):
    """
    Send one command on the transaction connection and record its duration.

    On failure, the connection is attached to the exception as
    ``e.connection`` (``_reinitialize_on_error`` uses it for error metrics).
    The failed duration is then recorded and the exception is re-raised.
    """
    redis_node, conn = self._get_client_and_connection_for_transaction()
    command_name = args[0]

    # Start timing for observability
    started_at = time.monotonic()

    def record(**extra):
        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - started_at,
            server_address=conn.host,
            server_port=conn.port,
            db_namespace=str(conn.db),
            **extra,
        )

    try:
        reply = self._send_command_parse_response(
            conn, redis_node, command_name, *args, **options
        )
        record()
        return reply
    except Exception as e:
        if conn:
            # this is used to report the metrics based on host and port info
            e.connection = conn
            record(error=e)
        raise
def _send_command_parse_response(
    self, conn, redis_node: Redis, command_name, *args, **options
):
    """
    Send a command on ``conn`` and return the reply parsed by ``redis_node``.

    Replies to DISCARD/EXEC/UNWATCH clear the ``_watching`` flag.
    """
    conn.send_command(*args)
    reply = redis_node.parse_response(conn, command_name, **options)
    if command_name in self.UNWATCH_COMMANDS:
        self._watching = False
    return reply
def _reinitialize_on_error(self, error, failure_count):
    """
    Retry hook: recover transaction state after a failed attempt.

    Records an error metric when the error carries the connection it
    happened on. While keys are watched, a slot redirection during EXEC
    cannot be recovered and becomes a ``WatchError``. For redirection and
    connection errors (matched by exact type), the pinned transaction
    connection is disconnected and released. The pipeline's reinitialize
    counter is then bumped, and the node manager is reinitialized when
    ``_should_reinitialized()`` says so. For any other error, an
    ``AskError`` instance causes a slot move. ``_executing`` is cleared at
    the end.

    :param error: the exception raised by the failed attempt.
    :param failure_count: number of failed attempts so far.
    :raises WatchError: on slot redirection while executing a watched transaction.
    """
    if hasattr(error, "connection"):
        record_error_count(
            server_address=error.connection.host,
            server_port=error.connection.port,
            network_peer_address=error.connection.host,
            network_peer_port=error.connection.port,
            error_type=error,
            retry_attempts=failure_count,
            is_internal=True,
        )

    if self._watching:
        if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
            raise WatchError("Slot rebalancing occurred while watching keys")

    if (
        type(error) in self.SLOT_REDIRECT_ERRORS
        or type(error) in self.CONNECTION_ERRORS
    ):
        if self._transaction_connection:
            if is_debug_log_enabled():
                logger.debug(
                    f"Operation failed, "
                    f"with connection: {self._transaction_connection}, "
                    f"details: {self._transaction_connection.extract_connection_details()}",
                )
            # Disconnect and release back to pool
            self._transaction_connection.disconnect()
            node = self._nodes_manager.find_connection_owner(
                self._transaction_connection
            )
            if node and node.redis_connection:
                node.redis_connection.connection_pool.release(
                    self._transaction_connection
                )
            self._transaction_connection = None

        self._pipe.reinitialize_counter += 1
        if self._pipe._should_reinitialized():
            self._nodes_manager.initialize()
            # The counter lives on the pipeline (incremented above), so reset
            # it there rather than on this strategy object.
            self._pipe.reinitialize_counter = 0
    else:
        if isinstance(error, AskError):
            self._nodes_manager.move_slot(error)

    self._executing = False
def _raise_first_error(self, responses, stack, start_time):
    """
    Raise the first exception found in ``responses``.

    ``responses`` and ``stack`` are walked in lockstep. The exception is
    annotated with its command's 1-based position, and a failed TRANSACTION
    duration (including the error) is recorded against the transaction
    connection. The exception is then raised.
    """
    for r, cmd in zip(responses, stack):
        if isinstance(r, Exception):
            self.annotate_exception(r, cmd.position + 1, cmd.args)

            record_operation_duration(
                command_name="TRANSACTION",
                duration_seconds=time.monotonic() - start_time,
                server_address=self._transaction_connection.host,
                server_port=self._transaction_connection.port,
                db_namespace=str(self._transaction_connection.db),
                error=r,
            )

            raise r
def execute(self, raise_on_error: bool = True) -> List[Any]:
    """
    Run the queued transaction (with retries) and return its results.

    Returns an empty list without contacting the server when nothing is
    queued, unless keys are being watched on a known slot.
    """
    queued = self._command_queue
    nothing_to_run = not queued and not (self._watching and self._pipeline_slots)
    if nothing_to_run:
        return []
    return self._execute_transaction_with_retries(queued, raise_on_error)
4625 def _execute_transaction_with_retries(
4626 self, stack: List["PipelineCommand"], raise_on_error: bool
4627 ):
4628 return self._retry.call_with_retry(
4629 lambda: self._execute_transaction(stack, raise_on_error),
4630 lambda error, failure_count: self._reinitialize_on_error(
4631 error, failure_count
4632 ),
4633 with_failure_count=True,
4634 )
def _execute_transaction(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    """
    Run the queued commands as one MULTI ... EXEC block on a single node.

    Everything is packed and sent in one write, then the replies for MULTI,
    each queued command and EXEC are read back in order. Commands whose
    options contain ``EMPTY_RESPONSE`` are not sent; their placeholder value
    is put back into the result at the command's position.

    :param stack: the queued commands (replies are matched against
        ``self._command_queue``).
    :param raise_on_error: raise the first error found in the EXEC reply
        instead of returning it in the result list.
    :returns: one result per queued command; response callbacks are applied
        to non-error results.
    :raises CrossSlotTransactionError: when the queued keys span several slots.
    :raises WatchError: when EXEC returns nil (a watched key changed).
    :raises InvalidPipelineStack: when the EXEC reply length does not match
        the queue.
    """
    if len(self._pipeline_slots) > 1:
        raise CrossSlotTransactionError(
            "All keys involved in a cluster transaction must map to the same slot"
        )

    self._executing = True

    redis_node, connection = self._get_client_and_connection_for_transaction()

    stack = chain(
        [PipelineCommand(("MULTI",))],
        stack,
        [PipelineCommand(("EXEC",))],
    )
    commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
    packed_commands = connection.pack_commands(commands)

    # Start timing for observability
    start_time = time.monotonic()

    connection.send_packed_command(packed_commands)

    # Error returned for MULTI itself; it has no position in the EXEC reply.
    multi_error: Optional[Exception] = None
    # (queue position, value) pairs, always stored as tuples: either an
    # error raised while queueing that command, or the EMPTY_RESPONSE
    # placeholder of a command that was never sent.
    errors: List[Tuple[int, Any]] = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        redis_node.parse_response(connection, "MULTI")
    except ResponseError as e:
        self.annotate_exception(e, 0, ("MULTI",))
        multi_error = e
    except self.CONNECTION_ERRORS as cluster_error:
        self.annotate_exception(cluster_error, 0, ("MULTI",))
        raise

    # and all the other commands
    for i, command in enumerate(self._command_queue):
        if EMPTY_RESPONSE in command.options:
            errors.append((i, command.options[EMPTY_RESPONSE]))
        else:
            try:
                _ = redis_node.parse_response(connection, "_")
            except self.SLOT_REDIRECT_ERRORS as slot_error:
                self.annotate_exception(slot_error, i + 1, command.args)
                errors.append((i, slot_error))
            except self.CONNECTION_ERRORS as cluster_error:
                self.annotate_exception(cluster_error, i + 1, command.args)
                raise
            except ResponseError as e:
                self.annotate_exception(e, i + 1, command.args)
                errors.append((i, e))

    # Real errors hit while queueing, in the order they occurred.
    queue_errors = [value for _, value in errors if isinstance(value, Exception)]
    if multi_error is not None:
        queue_errors.insert(0, multi_error)

    response = None
    # parse the EXEC.
    try:
        response = redis_node.parse_response(connection, "EXEC")
    except ExecAbortError:
        # Surface the error that caused the abort instead of the generic
        # EXECABORT reply (never an EMPTY_RESPONSE placeholder).
        if queue_errors:
            raise queue_errors[0]
        raise

    self._executing = False

    record_operation_duration(
        command_name="TRANSACTION",
        duration_seconds=time.monotonic() - start_time,
        server_address=connection.host,
        server_port=connection.port,
        db_namespace=str(connection.db),
    )

    # EXEC clears any watched keys
    self._watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors / placeholders back at their queue positions
    for i, value in errors:
        response.insert(i, value)

    if len(response) != len(self._command_queue):
        raise InvalidPipelineStack(
            "Unexpected response length for cluster pipeline EXEC."
            " Command stack was {} but response had length {}".format(
                [c.args[0] for c in self._command_queue], len(response)
            )
        )

    # find any errors in the response and raise if necessary. Errors seen
    # while queueing always raise; EMPTY_RESPONSE placeholders alone do not
    # override raise_on_error=False.
    if raise_on_error or queue_errors:
        self._raise_first_error(
            response,
            self._command_queue,
            start_time,
        )

    # We have to run response callbacks manually
    data = []
    for r, cmd in zip(response, self._command_queue):
        if not isinstance(r, Exception):
            command_name = cmd.args[0]
            if command_name in self._pipe.cluster_response_callbacks:
                r = self._pipe.cluster_response_callbacks[command_name](
                    r, **cmd.options
                )
        data.append(r)
    return data
def reset(self):
    """Return the transaction strategy to a clean, reusable state.

    Clears the queued commands. If a dedicated transaction connection is
    held, it first makes sure no WATCH survives on that connection:

    * while keys are being watched, ``UNWATCH`` is sent and its reply read;
    * if that round-trip fails with one of ``self.CONNECTION_ERRORS``, the
      connection is disconnected instead.

    The connection is then released to the pool of the node that owns it.
    If no owning node (or no client on that node) can be found, the
    connection is disconnected rather than silently dropped, so that its
    socket is not leaked. Finally ``_watching``, ``_explicit_transaction``,
    ``_pipeline_slots`` and ``_executing`` are reset.

    Exceptions other than ``self.CONNECTION_ERRORS`` propagate to the caller.
    This includes anything raised while releasing the connection.
    """
    self._command_queue = []

    # make sure to reset the connection state in the event that we were
    # watching something
    connection = self._transaction_connection
    if connection:
        dropped = False
        # Only the UNWATCH round-trip is guarded. The connection is released
        # exactly once below, so a failure here can never release it twice.
        try:
            if self._watching:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                connection.send_command("UNWATCH")
                connection.read_response()
        except self.CONNECTION_ERRORS:
            # disconnect will also remove any previous WATCHes
            connection.disconnect()
            dropped = True

        # Nothing is WATCHed on the connection any more. Either no WATCH was
        # active, UNWATCH succeeded, or the connection was closed. So it is
        # safe to hand it back to its pool.
        node = self._nodes_manager.find_connection_owner(connection)
        if node and node.redis_connection:
            node.redis_connection.connection_pool.release(connection)
        elif not dropped:
            # No pool to return it to: close it instead of leaking the socket.
            connection.disconnect()
        self._transaction_connection = None

    # clean up the other instance attributes
    self._watching = False
    self._explicit_transaction = False
    self._pipeline_slots = set()
    self._executing = False
def send_cluster_commands(
    self, stack, raise_on_error=True, allow_redirections=True
):
    """Reject batched cluster dispatch inside a transaction.

    The arguments are accepted but ignored.

    Raises:
        NotImplementedError: always.
    """
    message = (
        "send_cluster_commands cannot be executed in transactional context."
    )
    raise NotImplementedError(message)
def multi(self):
    """Mark the start of an explicit MULTI block.

    Raises:
        RedisError: if MULTI was already issued, or if commands were queued
            before MULTI was called.
    """
    problem = None
    if self._explicit_transaction:
        problem = "Cannot issue nested calls to MULTI"
    elif self._command_queue:
        problem = "Commands without an initial WATCH have already been issued"

    if problem is not None:
        raise RedisError(problem)
    self._explicit_transaction = True
def watch(self, *names):
    """Issue WATCH for ``names`` through ``execute_command``.

    Returns:
        Whatever ``execute_command`` returns for the WATCH.

    Raises:
        RedisError: if an explicit MULTI has already been issued.
    """
    if not self._explicit_transaction:
        return self.execute_command("WATCH", *names)
    raise RedisError("Cannot issue a WATCH after a MULTI")
def unwatch(self):
    """Send UNWATCH if keys are currently watched.

    Returns:
        ``True`` when nothing is being watched. Otherwise, the result of
        ``execute_command("UNWATCH")``.
    """
    if not self._watching:
        return True
    return self.execute_command("UNWATCH")
def discard(self):
    """Abandon the current transaction by delegating to ``self.reset()``."""
    self.reset()
    return None
def delete(self, *names):
    """Issue DEL for every key in ``names`` through ``execute_command``."""
    command_args = ("DEL",) + names
    return self.execute_command(*command_args)
def unlink(self, *names):
    """Issue UNLINK for every key in ``names`` through ``execute_command``."""
    command_args = ("UNLINK",) + names
    return self.execute_command(*command_args)