Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2import random
3import socket
4import sys
5import threading
6import time
7from abc import ABC, abstractmethod
8from collections import OrderedDict
9from copy import copy
10from enum import Enum
11from itertools import chain
12from typing import (
13 Any,
14 Callable,
15 Dict,
16 List,
17 Literal,
18 Optional,
19 Set,
20 Tuple,
21 Union,
22)
24from redis._parsers import CommandsParser, Encoder
25from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
26from redis._parsers.helpers import parse_scan
27from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
28from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
29from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
30from redis.commands import READ_COMMANDS, RedisClusterCommands
31from redis.commands.helpers import list_or_args
32from redis.commands.policies import PolicyResolver, StaticPolicyResolver
33from redis.connection import (
34 Connection,
35 ConnectionPool,
36 parse_url,
37)
38from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
39from redis.event import (
40 AfterPooledConnectionsInstantiationEvent,
41 AfterPubSubConnectionInstantiationEvent,
42 ClientType,
43 EventDispatcher,
44)
45from redis.exceptions import (
46 AskError,
47 AuthenticationError,
48 ClusterDownError,
49 ClusterError,
50 ConnectionError,
51 CrossSlotTransactionError,
52 DataError,
53 ExecAbortError,
54 InvalidPipelineStack,
55 MaxConnectionsError,
56 MovedError,
57 RedisClusterException,
58 RedisError,
59 ResponseError,
60 SlotNotCoveredError,
61 TimeoutError,
62 TryAgainError,
63 WatchError,
64)
65from redis.lock import Lock
66from redis.maint_notifications import (
67 MaintNotificationsConfig,
68 OSSMaintNotificationsHandler,
69)
70from redis.retry import Retry
71from redis.utils import (
72 check_protocol_version,
73 deprecated_args,
74 deprecated_function,
75 dict_merge,
76 list_keys_to_dict,
77 merge_result,
78 safe_str,
79 str_if_bytes,
80 truncate_text,
81)
logger = logging.getLogger(__name__)


def is_debug_log_enabled() -> bool:
    """Report whether this module's logger would emit DEBUG-level records."""
    debug_level = logging.DEBUG
    return logger.isEnabledFor(debug_level)
def get_node_name(host: str, port: Union[str, int]) -> str:
    """Build the ``host:port`` string used as a node's name."""
    return "{}:{}".format(host, port)
@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    """
    Return a connection for ``redis_node``.

    The client's own ``connection`` attribute is used when it is truthy;
    otherwise a connection is taken from the client's connection pool.
    Extra positional and keyword arguments are accepted but unused.
    """
    bound_connection = redis_node.connection
    if bound_connection:
        return bound_connection
    return redis_node.connection_pool.get_connection()
def parse_scan_result(command, res, **options):
    """
    Merge per-node SCAN replies.

    :param command: unused; part of the result-callback signature
    :param res: mapping of node name to that node's raw SCAN reply
    :return: ``(cursors, items)`` where ``cursors`` maps each node name to
        its cursor and ``items`` concatenates every node's results in
        iteration order
    """
    cursors = {}
    merged = []
    for node_name, response in res.items():
        cursors[node_name], page = parse_scan(response, **options)
        merged.extend(page)

    return cursors, merged
def parse_pubsub_numsub(command, res, **options):
    """
    Sum subscriber counts per channel across all node replies.

    :param command: unused; part of the result-callback signature
    :param res: mapping of node name to an iterable of
        ``(channel, subscriber_count)`` pairs
    :return: list of ``(channel, total)`` tuples in first-seen order
    """
    totals = OrderedDict()
    for node_pairs in res.values():
        for channel, count in node_pairs:
            if channel in totals:
                totals[channel] += count
            else:
                totals[channel] = count

    return list(totals.items())
def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """
    Convert a CLUSTER SLOTS reply into a dict keyed by ``(start, end)``.

    Each value holds the ``"primary"`` server and a list of ``"replicas"``,
    every server being a ``(host, port)`` tuple. A server whose host is
    empty falls back to ``options["current_host"]`` (default ``""``).
    """
    fallback_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        # Only the first two fields (host, port) are used; extras are ignored.
        host, port = args[0], args[1]
        return str_if_bytes(host) or fallback_host, port

    slots = {}
    for entry in resp:
        start, end, primary = entry[:3]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in entry[3:]],
        }

    return slots
def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.

    :param resp: the raw reply. If its first element is already a dict the
        reply is returned unchanged. Otherwise each shard is read
        positionally: index 1 is a flat ``[start, end, start, end, ...]``
        slot list and index 3 is a list of flat ``[field, value, ...]``
        node descriptions.
    :return: list of ``{"slots": [(start, end), ...], "nodes": [dict, ...]}``;
        an empty (or ``None``) reply yields ``[]``
    :raises IndexError: if a flat list has an odd number of elements
    """
    # An empty reply has no first element to inspect; nothing to parse.
    if not resp:
        return []
    if isinstance(resp[0], dict):
        return resp

    shards = []
    for raw_shard in resp:
        flat_slots = raw_shard[1]
        slot_ranges = [
            (flat_slots[i], flat_slots[i + 1]) for i in range(0, len(flat_slots), 2)
        ]
        nodes = [
            {node[i]: node[i + 1] for i in range(0, len(node), 2)}
            for node in raw_shard[3]
        ]
        shards.append({"slots": slot_ranges, "nodes": nodes})

    return shards
def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.

    :param resp: the shard id as ``bytes`` or as an already-decoded ``str``
    :return: the shard id as ``str``
    """
    # The reply may already be text (e.g. when responses are decoded
    # upstream); calling .decode on a str would raise AttributeError.
    if isinstance(resp, (bytes, bytearray)):
        return resp.decode("utf-8")
    return resp
# Server-type labels passed to NodesManager.get_nodes_by_server_type().
PRIMARY = "primary"
REPLICA = "replica"
# Command flag for commands listed under SLOT_ID in COMMAND_FLAGS.
SLOT_ID = "slot-id"
# Keyword arguments that may be forwarded to the per-node connections.
REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_ca_path",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
    "maint_notifications_config",
)
# Allowed keys that are nevertheless stripped by cleanup_kwargs().
KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Keep only the supported, non-disabled keyword arguments.

    A key survives when it is listed in REDIS_ALLOWED_KEYS and is not
    listed in KWARGS_DISABLED_KEYS. Returns a new dict; input order is kept.
    """
    connection_kwargs = {}
    for key, value in kwargs.items():
        if key in KWARGS_DISABLED_KEYS or key not in REDIS_ALLOWED_KEYS:
            continue
        connection_kwargs[key] = value

    return connection_kwargs
class MaintNotificationsAbstractRedisCluster:
    """
    Mixin holding the maintenance-notifications logic for cluster clients.

    Intended for multiple inheritance together with RedisCluster: it relies
    on ``self.get_nodes()`` and ``self.nodes_manager`` being provided by the
    class it is combined with.
    """

    def __init__(
        self,
        maint_notifications_config: Optional[MaintNotificationsConfig],
        **kwargs,
    ):
        """
        Validate the configuration and wire up the notifications handler.

        :param maint_notifications_config: configuration to use, or None.
            When None and ``kwargs["protocol"]`` passes the RESP3 check, a
            default ``MaintNotificationsConfig`` is created.
        :raises RedisError: if the configuration is enabled while the
            protocol does not pass the RESP3 check
        """
        config = maint_notifications_config
        resp3 = check_protocol_version(kwargs.get("protocol"), 3)

        if config and config.enabled and not resp3:
            raise RedisError(
                "Maintenance notifications handlers on connection are only supported with RESP version 3"
            )
        if config is None and resp3:
            config = MaintNotificationsConfig()

        self.maint_notifications_config = config

        if not (config and config.enabled):
            # Disabled or absent: no handler is installed.
            self._oss_cluster_maint_notifications_handler = None
            return

        handler = OSSMaintNotificationsHandler(self, config)
        self._oss_cluster_maint_notifications_handler = handler
        # Connections created from now on receive the handler via kwargs.
        self._update_connection_kwargs_for_maint_notifications(handler)
        # Nodes that already own a client get their pools updated in place.
        for node in self.get_nodes():
            client = node.redis_connection
            if client is None:
                continue
            client.connection_pool.update_maint_notifications_config(
                config,
                oss_cluster_maint_notifications_handler=handler,
            )

    def _update_connection_kwargs_for_maint_notifications(
        self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
    ):
        """
        Store the handler in the nodes manager's connection kwargs.
        """
        shared_kwargs = self.nodes_manager.connection_kwargs
        shared_kwargs.update(
            oss_cluster_maint_notifications_handler=oss_cluster_maint_notifications_handler
        )
class AbstractRedisCluster:
    """
    Shared constants and helpers for cluster clients.

    ``replace_default_node`` expects the concrete class to provide
    ``nodes_manager``, ``get_default_node()``, ``get_primaries()`` and
    ``get_replicas()``.
    """

    RedisClusterRequestTTL = 16

    # Node-selection flags.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # Maps a command name to the flag describing which node(s) it targets.
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READWRITE",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # NOTE: kept as a one-element tuple wrapping the list so the existing
    # shape of this public attribute does not change for its users.
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    # Parsers for the raw reply of individual cluster commands.
    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Reducers that combine a {node_name: reply} mapping into one result.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: Optional["ClusterNode"] = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
            return

        curr_node = self.get_default_node()
        primaries = [node for node in self.get_primaries() if node != curr_node]
        if primaries:
            # Choose a primary if the cluster contains different primaries
            self.nodes_manager.default_node = random.choice(primaries)
            return

        # No other primary is available: fall back to a replica, if any
        replicas = [node for node in self.get_replicas() if node != curr_node]
        if replicas:
            self.nodes_manager.default_node = random.choice(replicas)
550class RedisCluster(
551 AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands
552):
553 @classmethod
554 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
555 """
556 Return a Redis client object configured from the given URL
558 For example::
560 redis://[[username]:[password]]@localhost:6379/0
561 rediss://[[username]:[password]]@localhost:6379/0
562 unix://[username@]/path/to/socket.sock?db=0[&password=password]
564 Three URL schemes are supported:
566 - `redis://` creates a TCP socket connection. See more at:
567 <https://www.iana.org/assignments/uri-schemes/prov/redis>
568 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
569 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
570 - ``unix://``: creates a Unix Domain Socket connection.
572 The username, password, hostname, path and all querystring values
573 are passed through urllib.parse.unquote in order to replace any
574 percent-encoded values with their corresponding characters.
576 There are several ways to specify a database number. The first value
577 found will be used:
579 1. A ``db`` querystring option, e.g. redis://localhost?db=0
580 2. If using the redis:// or rediss:// schemes, the path argument
581 of the url, e.g. redis://localhost/0
582 3. A ``db`` keyword argument to this function.
584 If none of these options are specified, the default db=0 is used.
586 All querystring options are cast to their appropriate Python types.
587 Boolean arguments can be specified with string values "True"/"False"
588 or "Yes"/"No". Values that cannot be properly cast cause a
589 ``ValueError`` to be raised. Once parsed, the querystring arguments
590 and keyword arguments are passed to the ``ConnectionPool``'s
591 class initializer. In the case of conflicting arguments, querystring
592 arguments always win.
594 """
595 return cls(url=url, **kwargs)
597 @deprecated_args(
598 args_to_warn=["read_from_replicas"],
599 reason="Please configure the 'load_balancing_strategy' instead",
600 version="5.3.0",
601 )
602 @deprecated_args(
603 args_to_warn=[
604 "cluster_error_retry_attempts",
605 ],
606 reason="Please configure the 'retry' object instead",
607 version="6.0.0",
608 )
609 def __init__(
610 self,
611 host: Optional[str] = None,
612 port: int = 6379,
613 startup_nodes: Optional[List["ClusterNode"]] = None,
614 cluster_error_retry_attempts: int = 3,
615 retry: Optional["Retry"] = None,
616 require_full_coverage: bool = True,
617 reinitialize_steps: int = 5,
618 read_from_replicas: bool = False,
619 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
620 dynamic_startup_nodes: bool = True,
621 url: Optional[str] = None,
622 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
623 cache: Optional[CacheInterface] = None,
624 cache_config: Optional[CacheConfig] = None,
625 event_dispatcher: Optional[EventDispatcher] = None,
626 policy_resolver: PolicyResolver = StaticPolicyResolver(),
627 maint_notifications_config: Optional[MaintNotificationsConfig] = None,
628 **kwargs,
629 ):
630 """
631 Initialize a new RedisCluster client.
633 :param startup_nodes:
634 List of nodes from which initial bootstrapping can be done
635 :param host:
636 Can be used to point to a startup node
637 :param port:
638 Can be used to point to a startup node
639 :param require_full_coverage:
640 When set to False (default value): the client will not require a
641 full coverage of the slots. However, if not all slots are covered,
642 and at least one node has 'cluster-require-full-coverage' set to
643 'yes,' the server will throw a ClusterDownError for some key-based
644 commands. See -
645 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
646 When set to True: all slots must be covered to construct the
647 cluster client. If not all slots are covered, RedisClusterException
648 will be thrown.
649 :param read_from_replicas:
650 @deprecated - please use load_balancing_strategy instead
651 Enable read from replicas in READONLY mode. You can read possibly
652 stale data.
653 When set to true, read commands will be assigned between the
654 primary and its replications in a Round-Robin manner.
655 :param load_balancing_strategy:
656 Enable read from replicas in READONLY mode and defines the load balancing
657 strategy that will be used for cluster node selection.
658 The data read from replicas is eventually consistent with the data in primary nodes.
659 :param dynamic_startup_nodes:
660 Set the RedisCluster's startup nodes to all of the discovered nodes.
661 If true (default value), the cluster's discovered nodes will be used to
662 determine the cluster nodes-slots mapping in the next topology refresh.
663 It will remove the initial passed startup nodes if their endpoints aren't
664 listed in the CLUSTER SLOTS output.
665 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
666 specific IP addresses, it is best to set it to false.
667 :param cluster_error_retry_attempts:
668 @deprecated - Please configure the 'retry' object instead
669 In case 'retry' object is set - this argument is ignored!
671 Number of times to retry before raising an error when
672 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
673 :class:`~.ClusterDownError` are encountered
674 :param retry:
675 A retry object that defines the retry strategy and the number of
676 retries for the cluster client.
677 In current implementation for the cluster client (starting form redis-py version 6.0.0)
678 the retry object is not yet fully utilized, instead it is used just to determine
679 the number of retries for the cluster client.
680 In the future releases the retry object will be used to handle the cluster client retries!
681 :param reinitialize_steps:
682 Specifies the number of MOVED errors that need to occur before
683 reinitializing the whole cluster topology. If a MOVED error occurs
684 and the cluster does not need to be reinitialized on this current
685 error handling, only the MOVED slot will be patched with the
686 redirected node.
687 To reinitialize the cluster on every MOVED error, set
688 reinitialize_steps to 1.
689 To avoid reinitializing the cluster on moved errors, set
690 reinitialize_steps to 0.
691 :param address_remap:
692 An optional callable which, when provided with an internal network
693 address of a node, e.g. a `(host, port)` tuple, will return the address
694 where the node is reachable. This can be used to map the addresses at
695 which the nodes _think_ they are, to addresses at which a client may
696 reach them, such as when they sit behind a proxy.
698 :param maint_notifications_config:
699 Configures the nodes connections to support maintenance notifications - see
700 `redis.maint_notifications.MaintNotificationsConfig` for details.
701 Only supported with RESP3.
702 If not provided and protocol is RESP3, the maintenance notifications
703 will be enabled by default (logic is included in the NodesManager
704 initialization).
705 :**kwargs:
706 Extra arguments that will be sent into Redis instance when created
707 (See Official redis-py doc for supported kwargs - the only limitation
708 is that you can't provide 'retry' object as part of kwargs.
709 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
710 Some kwargs are not supported and will raise a
711 RedisClusterException:
712 - db (Redis do not support database SELECT in cluster mode)
714 """
715 if startup_nodes is None:
716 startup_nodes = []
718 if "db" in kwargs:
719 # Argument 'db' is not possible to use in cluster mode
720 raise RedisClusterException(
721 "Argument 'db' is not possible to use in cluster mode"
722 )
724 if "retry" in kwargs:
725 # Argument 'retry' is not possible to be used in kwargs when in cluster mode
726 # the kwargs are set to the lower level connections to the cluster nodes
727 # and there we provide retry configuration without retries allowed.
728 # The retries should be handled on cluster client level.
729 raise RedisClusterException(
730 "The 'retry' argument cannot be used in kwargs when running in cluster mode."
731 )
733 # Get the startup node/s
734 from_url = False
735 if url is not None:
736 from_url = True
737 url_options = parse_url(url)
738 if "path" in url_options:
739 raise RedisClusterException(
740 "RedisCluster does not currently support Unix Domain "
741 "Socket connections"
742 )
743 if "db" in url_options and url_options["db"] != 0:
744 # Argument 'db' is not possible to use in cluster mode
745 raise RedisClusterException(
746 "A ``db`` querystring option can only be 0 in cluster mode"
747 )
748 kwargs.update(url_options)
749 host = kwargs.get("host")
750 port = kwargs.get("port", port)
751 startup_nodes.append(ClusterNode(host, port))
752 elif host is not None and port is not None:
753 startup_nodes.append(ClusterNode(host, port))
754 elif len(startup_nodes) == 0:
755 # No startup node was provided
756 raise RedisClusterException(
757 "RedisCluster requires at least one node to discover the "
758 "cluster. Please provide one of the followings:\n"
759 "1. host and port, for example:\n"
760 " RedisCluster(host='localhost', port=6379)\n"
761 "2. list of startup nodes, for example:\n"
762 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
763 " ClusterNode('localhost', 6378)])"
764 )
765 # Update the connection arguments
766 # Whenever a new connection is established, RedisCluster's on_connect
767 # method should be run
768 # If the user passed on_connect function we'll save it and run it
769 # inside the RedisCluster.on_connect() function
770 self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
771 kwargs.update({"redis_connect_func": self.on_connect})
772 kwargs = cleanup_kwargs(**kwargs)
773 if retry:
774 self.retry = retry
775 else:
776 self.retry = Retry(
777 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
778 retries=cluster_error_retry_attempts,
779 )
781 self.encoder = Encoder(
782 kwargs.get("encoding", "utf-8"),
783 kwargs.get("encoding_errors", "strict"),
784 kwargs.get("decode_responses", False),
785 )
786 protocol = kwargs.get("protocol", None)
787 if (cache_config or cache) and not check_protocol_version(protocol, 3):
788 raise RedisError("Client caching is only supported with RESP version 3")
790 if maint_notifications_config and not check_protocol_version(protocol, 3):
791 raise RedisError(
792 "Maintenance notifications are only supported with RESP version 3"
793 )
794 if check_protocol_version(protocol, 3) and maint_notifications_config is None:
795 maint_notifications_config = MaintNotificationsConfig()
797 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
798 self.node_flags = self.__class__.NODE_FLAGS.copy()
799 self.read_from_replicas = read_from_replicas
800 self.load_balancing_strategy = load_balancing_strategy
801 self.reinitialize_counter = 0
802 self.reinitialize_steps = reinitialize_steps
803 if event_dispatcher is None:
804 self._event_dispatcher = EventDispatcher()
805 else:
806 self._event_dispatcher = event_dispatcher
807 self.startup_nodes = startup_nodes
809 self.nodes_manager = NodesManager(
810 startup_nodes=startup_nodes,
811 from_url=from_url,
812 require_full_coverage=require_full_coverage,
813 dynamic_startup_nodes=dynamic_startup_nodes,
814 address_remap=address_remap,
815 cache=cache,
816 cache_config=cache_config,
817 event_dispatcher=self._event_dispatcher,
818 maint_notifications_config=maint_notifications_config,
819 **kwargs,
820 )
822 self.cluster_response_callbacks = CaseInsensitiveDict(
823 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
824 )
825 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
827 # For backward compatibility, mapping from existing policies to new one
828 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
829 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
830 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
831 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
832 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
833 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
834 SLOT_ID: RequestPolicy.DEFAULT_KEYED,
835 }
837 self._policies_callback_mapping: dict[
838 Union[RequestPolicy, ResponsePolicy], Callable
839 ] = {
840 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
841 self.get_random_primary_or_all_nodes(command_name)
842 ],
843 RequestPolicy.DEFAULT_KEYED: lambda command,
844 *args: self.get_nodes_from_slot(command, *args),
845 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
846 RequestPolicy.ALL_SHARDS: self.get_primaries,
847 RequestPolicy.ALL_NODES: self.get_nodes,
848 RequestPolicy.ALL_REPLICAS: self.get_replicas,
849 RequestPolicy.MULTI_SHARD: lambda *args,
850 **kwargs: self._split_multi_shard_command(*args, **kwargs),
851 RequestPolicy.SPECIAL: self.get_special_nodes,
852 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
853 ResponsePolicy.DEFAULT_KEYED: lambda res: res,
854 }
856 self._policy_resolver = policy_resolver
857 self.commands_parser = CommandsParser(self)
859 # Node where FT.AGGREGATE command is executed.
860 self._aggregate_nodes = None
861 self._lock = threading.RLock()
863 MaintNotificationsAbstractRedisCluster.__init__(
864 self, maint_notifications_config, **kwargs
865 )
    def __enter__(self):
        """Enter the ``with`` block; the client itself is the context value."""
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the ``with`` block by calling ``close()``; exceptions are not suppressed."""
        self.close()
    def __del__(self):
        """Best-effort ``close()`` when the client is garbage collected."""
        try:
            self.close()
        except Exception:
            # Deliberately swallowed: a finalizer must not raise.
            pass
879 def disconnect_connection_pools(self):
880 for node in self.get_nodes():
881 if node.redis_connection:
882 try:
883 node.redis_connection.connection_pool.disconnect()
884 except OSError:
885 # Client was already disconnected. do nothing
886 pass
888 def on_connect(self, connection):
889 """
890 Initialize the connection, authenticate and select a database and send
891 READONLY if it is set during object initialization.
892 """
893 connection.on_connect()
895 if self.read_from_replicas or self.load_balancing_strategy:
896 # Sending READONLY command to server to configure connection as
897 # readonly. Since each cluster node may change its server type due
898 # to a failover, we should establish a READONLY connection
899 # regardless of the server type. If this is a primary connection,
900 # READONLY would not affect executing write commands.
901 connection.send_command("READONLY")
902 if str_if_bytes(connection.read_response()) != "OK":
903 raise ConnectionError("READONLY command failed")
905 if self.user_on_connect_func is not None:
906 self.user_on_connect_func(connection)
    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        """
        Return the Redis client attached to ``node``, creating it if missing.
        """
        if not node.redis_connection:
            with self._lock:
                # Re-check under the lock: the client may have been created
                # between the first check and acquiring the lock.
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection
915 def get_node(self, host=None, port=None, node_name=None):
916 return self.nodes_manager.get_node(host, port, node_name)
918 def get_primaries(self):
919 return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
921 def get_replicas(self):
922 return self.nodes_manager.get_nodes_by_server_type(REPLICA)
924 def get_random_node(self):
925 return random.choice(list(self.nodes_manager.nodes_cache.values()))
927 def get_random_primary_or_all_nodes(self, command_name):
928 """
929 Returns random primary or all nodes depends on READONLY mode.
930 """
931 if self.read_from_replicas and command_name in READ_COMMANDS:
932 return self.get_random_node()
934 return self.get_random_primary_node()
936 def get_nodes(self):
937 return list(self.nodes_manager.nodes_cache.values())
939 def get_node_from_key(self, key, replica=False):
940 """
941 Get the node that holds the key's slot.
942 If replica set to True but the slot doesn't have any replicas, None is
943 returned.
944 """
945 slot = self.keyslot(key)
946 slot_cache = self.nodes_manager.slots_cache.get(slot)
947 if slot_cache is None or len(slot_cache) == 0:
948 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
949 if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
950 return None
951 elif replica:
952 node_idx = 1
953 else:
954 # primary
955 node_idx = 0
957 return slot_cache[node_idx]
959 def get_default_node(self):
960 """
961 Get the cluster's default node
962 """
963 return self.nodes_manager.default_node
965 def get_nodes_from_slot(self, command: str, *args):
966 """
967 Returns a list of nodes that hold the specified keys' slots.
968 """
969 # get the node that holds the key's slot
970 slot = self.determine_slot(*args)
971 node = self.nodes_manager.get_node_from_slot(
972 slot,
973 self.read_from_replicas and command in READ_COMMANDS,
974 self.load_balancing_strategy if command in READ_COMMANDS else None,
975 )
976 return [node]
978 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
979 """
980 Splits the command with Multi-Shard policy, to the multiple commands
981 """
982 keys = self._get_command_keys(*args)
983 commands = []
985 for key in keys:
986 commands.append(
987 {
988 "args": (args[0], key),
989 "kwargs": kwargs,
990 }
991 )
993 return commands
995 def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
996 """
997 Returns a list of nodes for commands with a special policy.
998 """
999 if not self._aggregate_nodes:
1000 raise RedisClusterException(
1001 "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
1002 )
1004 return self._aggregate_nodes
1006 def get_random_primary_node(self) -> "ClusterNode":
1007 """
1008 Returns a random primary node
1009 """
1010 return random.choice(self.get_primaries())
1012 def _evaluate_all_succeeded(self, res):
1013 """
1014 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
1015 """
1016 first_successful_response = None
1018 if isinstance(res, dict):
1019 for key, value in res.items():
1020 if value:
1021 if first_successful_response is None:
1022 first_successful_response = {key: value}
1023 else:
1024 return {key: False}
1025 else:
1026 for response in res:
1027 if response:
1028 if first_successful_response is None:
1029 # Dynamically resolve type
1030 first_successful_response = type(response)(response)
1031 else:
1032 return type(response)(False)
1034 return first_successful_response
def set_default_node(self, node):
    """
    Make ``node`` the cluster's default node.

    :param node: 'ClusterNode'
    :return True when the node is known to the cluster and was stored as
        the default node, otherwise False
    """
    known = node is not None and self.get_node(node_name=node.name) is not None
    if not known:
        return False

    self.nodes_manager.default_node = node
    return True
def set_retry(self, retry: Retry) -> None:
    """
    Store ``retry`` as this client's retry object.
    """
    self.retry = retry
def monitor(self, target_node=None):
    """
    Build a Monitor object bound to ``target_node``.

    When no target node is given, the cluster's default node is used.
    The Monitor wraps the MONITOR command: next_command() returns one
    command, listen() yields commands.

    Raises RedisClusterException if the chosen node has no
    redis_connection.
    """
    node = self.get_default_node() if target_node is None else target_node
    node_client = node.redis_connection
    if node_client is None:
        raise RedisClusterException(
            f"Cluster Node {node.name} has no redis_connection"
        )
    return node_client.monitor()
def pubsub(self, node=None, host=None, port=None, **kwargs):
    """
    Create a ClusterPubSub for this client. The target may be given as a
    ClusterNode or as a host & port pair; extra keyword arguments are
    forwarded unchanged.
    """
    target = {"node": node, "host": host, "port": port}
    return ClusterPubSub(self, **target, **kwargs)
def pipeline(self, transaction=None, shard_hint=None):
    """
    Cluster impl:
        Pipelines behave differently in cluster mode than in normal
        mode. A ClusterPipeline is created from this client's state so
        that pipelines can be simulated: each command is called directly
        when used, and execute() only returns the result stack.

    Raises RedisClusterException when ``shard_hint`` is supplied.
    """
    if shard_hint:
        raise RedisClusterException("shard_hint is deprecated in cluster mode")

    manager = self.nodes_manager
    pipeline_options = {
        "nodes_manager": manager,
        "commands_parser": self.commands_parser,
        "startup_nodes": manager.startup_nodes,
        "result_callbacks": self.result_callbacks,
        "cluster_response_callbacks": self.cluster_response_callbacks,
        "read_from_replicas": self.read_from_replicas,
        "load_balancing_strategy": self.load_balancing_strategy,
        "reinitialize_steps": self.reinitialize_steps,
        "retry": self.retry,
        "lock": self._lock,
        "transaction": transaction,
    }
    return ClusterPipeline(**pipeline_options)
def lock(
    self,
    name,
    timeout=None,
    sleep=0.1,
    blocking=True,
    blocking_timeout=None,
    lock_class=None,
    thread_local=True,
    raise_on_release_error: bool = True,
):
    """
    Create a Lock object for key ``name`` whose behavior mimics
    threading.Lock.

    ``timeout`` is the maximum life of the lock, if given. By default the
    lock stays held until release() is called.

    ``sleep`` is how long to sleep per loop iteration while the lock is in
    blocking mode and another client holds it.

    ``blocking`` chooses whether ``acquire`` blocks until the lock is
    obtained or fails immediately, returning False without acquiring the
    lock. Defaults to True. A ``blocking`` argument passed to ``acquire``
    overrides this value.

    ``blocking_timeout`` is the maximum number of seconds to spend trying
    to acquire the lock, as a float or an integer. ``None`` means keep
    trying forever.

    ``lock_class`` forces a specific lock implementation. As of redis-py
    3.0 the only implemented lock class is ``Lock`` (a Lua-based lock), so
    this parameter is normally only needed for a custom lock class.

    ``thread_local`` chooses whether the lock token lives in thread-local
    storage. By default it does, so a thread only sees its own token and
    never one set by another thread. Consider this timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release the thread-2's lock.

    ``raise_on_release_error`` chooses whether an exception is raised when
    the lock is no longer owned on leaving the context manager. The
    default, True, raises. With False the warning is logged and the
    exception is suppressed.

    Sometimes thread-local storage has to be disabled, for example when
    one thread acquires a lock and hands the lock instance to a worker
    thread to release later. With thread-local storage enabled, the worker
    thread would not see the token set by the acquiring thread. Such cases
    are assumed to be uncommon, hence the thread-local default."""
    factory = Lock if lock_class is None else lock_class
    lock_options = {
        "timeout": timeout,
        "sleep": sleep,
        "blocking": blocking,
        "blocking_timeout": blocking_timeout,
        "thread_local": thread_local,
        "raise_on_release_error": raise_on_release_error,
    }
    return factory(self, name, **lock_options)
def set_response_callback(self, command, callback):
    """Register ``callback`` as the custom Response Callback of ``command``"""
    callbacks = self.cluster_response_callbacks
    callbacks[command] = callback
def _determine_nodes(
    self, *args, request_policy: RequestPolicy, **kwargs
) -> List["ClusterNode"]:
    """
    Determine the nodes the command should be executed on.

    A ``nodes_flag`` keyword (passed by the user) or a predefined command
    flag overrides ``request_policy`` when it maps to a request policy.
    The nodes chosen for FT.AGGREGATE are remembered on the client.
    """
    command = args[0].upper()
    if len(args) >= 2:
        # Two-word commands are looked up under their combined name
        two_word_command = f"{args[0]} {args[1]}".upper()
        if two_word_command in self.command_flags:
            command = two_word_command

    nodes_flag = kwargs.pop("nodes_flag", None)
    # A user-provided flag wins over the flag predefined for the command
    command_flag = (
        nodes_flag if nodes_flag is not None else self.command_flags.get(command)
    )

    if command_flag in self._command_flags_mapping:
        request_policy = self._command_flags_mapping[command_flag]

    resolve_nodes = self._policies_callback_mapping[request_policy]

    # Each policy callback expects a different argument shape
    if request_policy == RequestPolicy.DEFAULT_KEYED:
        nodes = resolve_nodes(command, *args)
    elif request_policy == RequestPolicy.MULTI_SHARD:
        nodes = resolve_nodes(*args, **kwargs)
    elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
        nodes = resolve_nodes(args[0])
    else:
        nodes = resolve_nodes()

    if args[0].lower() == "ft.aggregate":
        self._aggregate_nodes = nodes

    return nodes
1222 def _should_reinitialized(self):
1223 # To reinitialize the cluster on every MOVED error,
1224 # set reinitialize_steps to 1.
1225 # To avoid reinitializing the cluster on moved errors, set
1226 # reinitialize_steps to 0.
1227 if self.reinitialize_steps == 0:
1228 return False
1229 else:
1230 return self.reinitialize_counter % self.reinitialize_steps == 0
def keyslot(self, key):
    """
    Compute the key slot of ``key`` after encoding it with the client's
    encoder.
    See Keys distribution model in https://redis.io/topics/cluster-spec
    """
    return key_slot(self.encoder.encode(key))
1240 def _get_command_keys(self, *args):
1241 """
1242 Get the keys in the command. If the command has no keys in in, None is
1243 returned.
1245 NOTE: Due to a bug in redis<7.0, this function does not work properly
1246 for EVAL or EVALSHA when the `numkeys` arg is 0.
1247 - issue: https://github.com/redis/redis/issues/9493
1248 - fix: https://github.com/redis/redis/pull/9733
1250 So, don't use this function with EVAL or EVALSHA.
1251 """
1252 redis_conn = self.get_default_node().redis_connection
1253 return self.commands_parser.get_keys(redis_conn, *args)
def determine_slot(self, *args) -> Optional[int]:
    """
    Work out which slot the command in ``args`` belongs to.

    Raises a RedisClusterException if a key is missing and the slots to
    map the command to can't be determined; or, if the keys don't all map
    to the same key slot.
    """
    command = args[0]
    if self.command_flags.get(command) == SLOT_ID:
        # The slot ID is carried by the command itself
        return args[1]

    upper_command = command.upper()

    # CLIENT TRACKING is a special case.
    # It has no keys and must be sent to the provided nodes.
    # By default it will be sent to all nodes.
    if upper_command == "CLIENT TRACKING":
        return None

    if upper_command in ("EVAL", "EVALSHA"):
        # EVAL and EVALSHA are common enough that asking the redis server
        # to parse the keys is wasteful. Besides, a bug in redis<7.0 makes
        # `self._get_command_keys()` fail anyway, so they are special-cased.
        # command syntax: EVAL "script body" num_keys ...
        if len(args) <= 2:
            raise RedisClusterException(f"Invalid args in command: {args}")
        key_count = int(args[2])
        keys = args[3 : 3 + key_count]
        if len(keys) == 0:
            # no keys: the script can run on any node, so any slot will do
            return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
    else:
        keys = self._get_command_keys(*args)
        if keys is None or len(keys) == 0:
            if upper_command in ("FCALL", "FCALL_RO"):
                # FCALL may call a function with 0 keys; such a function
                # can run on any node, so any slot will do
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            raise RedisClusterException(
                "No way to dispatch this command to Redis Cluster. "
                "Missing key.\nYou can execute the command by specifying "
                f"target nodes.\nCommand: {args}"
            )

    if len(keys) == 1:
        # single key command
        return self.keyslot(keys[0])

    # multi-key command; every key has to land in the same slot
    distinct_slots = {self.keyslot(key) for key in keys}
    if len(distinct_slots) != 1:
        raise RedisClusterException(
            f"{command} - all keys must map to the same key slot"
        )

    return distinct_slots.pop()
def get_encoder(self):
    """
    Return the encoder stored on this client.
    """
    encoder = self.encoder
    return encoder
def get_connection_kwargs(self):
    """
    Return the connection key-word arguments held by the nodes manager.
    """
    manager = self.nodes_manager
    return manager.connection_kwargs
1330 def _is_nodes_flag(self, target_nodes):
1331 return isinstance(target_nodes, str) and target_nodes in self.node_flags
1333 def _parse_target_nodes(self, target_nodes):
1334 if isinstance(target_nodes, list):
1335 nodes = target_nodes
1336 elif isinstance(target_nodes, ClusterNode):
1337 # Supports passing a single ClusterNode as a variable
1338 nodes = [target_nodes]
1339 elif isinstance(target_nodes, dict):
1340 # Supports dictionaries of the format {node_name: node}.
1341 # It enables to execute commands with multi nodes as follows:
1342 # rc.cluster_save_config(rc.get_primaries())
1343 nodes = target_nodes.values()
1344 else:
1345 raise TypeError(
1346 "target_nodes type can be one of the following: "
1347 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
1348 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
1349 f"The passed type is {type(target_nodes)}"
1350 )
1351 return nodes
def execute_command(self, *args, **kwargs):
    """
    Execute a command by forwarding all arguments to
    ``_internal_execute_command`` and returning its result.
    """
    result = self._internal_execute_command(*args, **kwargs)
    return result
def _internal_execute_command(self, *args, **kwargs):
    """
    Wrapper for ERRORS_ALLOW_RETRY error handling.

    Resolves the command policies and the target nodes, runs the command
    on every target node through ``_execute_command`` and returns the
    outcome of ``_process_result``.

    It will try the number of times specified by the retries property from
    config option "self.retry" which defaults to 3 unless manually
    configured.

    If it reaches the number of times, the command will raise the exception

    Key argument :target_nodes: can be passed with the following types:
        nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
        ClusterNode
        list<ClusterNode>
        dict<Any, ClusterNode>
    """
    target_nodes_specified = False
    is_default_node = False
    target_nodes = None
    # 'target_nodes' is consumed here and never forwarded to the node call
    passed_targets = kwargs.pop("target_nodes", None)
    command_policies = self._policy_resolver.resolve(args[0].lower())

    # Explicit nodes (anything that is not a node flag string) are parsed
    # once, up front
    if passed_targets is not None and not self._is_nodes_flag(passed_targets):
        target_nodes = self._parse_target_nodes(passed_targets)
        target_nodes_specified = True

    if not command_policies and not target_nodes_specified:
        # The resolver gave no policies: derive them from the command flags
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        # We only could resolve key properties if command is not
        # in a list of pre-defined request policies
        command_flag = self.command_flags.get(command)
        if not command_flag:
            # Fallback to default policy
            if not self.get_default_node():
                slot = None
            else:
                slot = self.determine_slot(*args)
            if slot is None:
                command_policies = CommandPolicies()
            else:
                # A slot could be determined, so the command is keyed
                command_policies = CommandPolicies(
                    request_policy=RequestPolicy.DEFAULT_KEYED,
                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                )
        else:
            if command_flag in self._command_flags_mapping:
                command_policies = CommandPolicies(
                    request_policy=self._command_flags_mapping[command_flag]
                )
            else:
                command_policies = CommandPolicies()
    elif not command_policies and target_nodes_specified:
        command_policies = CommandPolicies()

    # If an error that allows retrying was thrown, the nodes and slots
    # cache were reinitialized. We will retry executing the command with
    # the updated cluster setup only when the target nodes can be
    # determined again with the new cache tables. Therefore, when target
    # nodes were passed to this function, we cannot retry the command
    # execution since the nodes may not be valid anymore after the tables
    # were reinitialized. So in case of passed target nodes,
    # retry_attempts will be set to 0.
    retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
    # Add one for the first execution
    execute_attempts = 1 + retry_attempts
    for _ in range(execute_attempts):
        try:
            # Per-attempt results, keyed by node name
            res = {}
            if not target_nodes_specified:
                # Determine the nodes to execute the command on
                target_nodes = self._determine_nodes(
                    *args,
                    request_policy=command_policies.request_policy,
                    nodes_flag=passed_targets,
                )

                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )
            # Remember that the default node was the only target, so it can
            # be replaced if this attempt fails with a retryable error
            if (
                len(target_nodes) == 1
                and target_nodes[0] == self.get_default_node()
            ):
                is_default_node = True
            for node in target_nodes:
                res[node.name] = self._execute_command(node, *args, **kwargs)

                # With ONE_SUCCEEDED the first node that answers is enough
                if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                    break

            # Return the processed result
            return self._process_result(
                args[0],
                res,
                response_policy=command_policies.response_policy,
                **kwargs,
            )
        except Exception as e:
            # Only exact types listed in ERRORS_ALLOW_RETRY are retried
            # (subclasses do not match)
            if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                if is_default_node:
                    # Replace the default cluster node
                    self.replace_default_node()
                # The nodes and slots cache were reinitialized.
                # Try again with the new cluster setup.
                retry_attempts -= 1
                continue
            else:
                # raise the exception
                raise e
def _execute_command(self, target_node, *args, **kwargs):
    """
    Send a command to a node in the cluster.

    The command is attempted up to ``RedisClusterRequestTTL`` times.
    MOVED, ASK and TRYAGAIN replies are handled by retrying (against the
    refreshed or redirected node); other errors are re-raised, some after
    reinitializing the nodes manager.

    :param target_node: node to send the command to first
    :return: the parsed response, passed through the command's cluster
        response callback when one is registered
    :raises ClusterError: when all attempts were used up
    """
    command = args[0]
    redis_node = None
    redirect_addr = None
    asking = False
    moved = False
    ttl = int(self.RedisClusterRequestTTL)

    while ttl > 0:
        ttl -= 1
        # Start every attempt without a connection. The previous attempt's
        # connection was already released in its `finally`; keeping the
        # reference would make a failure that happens before
        # get_connection() below disconnect and release that stale
        # connection a second time.
        connection = None
        try:
            if asking:
                target_node = self.get_node(node_name=redirect_addr)
            elif moved:
                # MOVED occurred and the slots cache was updated,
                # refresh the target node
                slot = self.determine_slot(*args)
                target_node = self.nodes_manager.get_node_from_slot(
                    slot,
                    self.read_from_replicas and command in READ_COMMANDS,
                    self.load_balancing_strategy
                    if command in READ_COMMANDS
                    else None,
                )
                moved = False

            redis_node = self.get_redis_connection(target_node)
            connection = get_connection(redis_node)
            if asking:
                # An ASK redirect must be preceded by ASKING on the same
                # connection
                connection.send_command("ASKING")
                redis_node.parse_response(connection, "ASKING", **kwargs)
                asking = False
            connection.send_command(*args, **kwargs)
            response = redis_node.parse_response(connection, command, **kwargs)

            # Remove keys entry, it needs only for cache.
            kwargs.pop("keys", None)

            if command in self.cluster_response_callbacks:
                response = self.cluster_response_callbacks[command](
                    response, **kwargs
                )
            return response
        except AuthenticationError:
            raise
        except MaxConnectionsError:
            # MaxConnectionsError indicates client-side resource exhaustion
            # (too many connections in the pool), not a node failure.
            # Don't treat this as a node failure - just re-raise the error
            # without reinitializing the cluster.
            raise
        except (ConnectionError, TimeoutError) as e:
            # ConnectionError can also be raised if we couldn't get a
            # connection from the pool before timing out, so check that
            # this is an actual connection before attempting to disconnect.
            if connection is not None:
                connection.disconnect()

            # Instead of setting to None, properly handle the pool
            # Get the pool safely - redis_connection could be set to None
            # by another thread between the check and access
            redis_conn = target_node.redis_connection
            if redis_conn is not None:
                pool = redis_conn.connection_pool
                if pool is not None:
                    with pool._lock:
                        # take care for the active connections in the pool
                        pool.update_active_connections_for_reconnect()
                        # disconnect all free connections
                        pool.disconnect_free_connections()

            # Move the failed node to the end of the cached nodes list
            self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name)

            # DON'T set redis_connection = None - keep the pool for reuse
            self.nodes_manager.initialize()
            raise e
        except MovedError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"MOVED error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            # First, we will try to patch the slots/nodes cache with the
            # redirected node output and try again. If MovedError exceeds
            # 'reinitialize_steps' number of times, we will force
            # reinitializing the tables, and then try again.
            # 'reinitialize_steps' counter will increase faster when
            # the same client object is shared between multiple threads. To
            # reduce the frequency you can set this variable in the
            # RedisCluster constructor.
            self.reinitialize_counter += 1
            if self._should_reinitialized():
                # during this call all connections are closed or marked for disconnect,
                # so we don't need to disconnect the changed node's connections
                self.nodes_manager.initialize(
                    additional_startup_nodes_info=[(e.host, e.port)]
                )
                # Reset the counter
                self.reinitialize_counter = 0
            else:
                self.nodes_manager.move_slot(e)
            moved = True
        except TryAgainError:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}"
                )
            # Back off briefly once more than half of the attempts are used
            if ttl < self.RedisClusterRequestTTL / 2:
                time.sleep(0.05)
        except AskError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"ASK error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            # The next attempt goes to the node named in the redirect
            redirect_addr = get_node_name(host=e.host, port=e.port)
            asking = True
        except (ClusterDownError, SlotNotCoveredError):
            # ClusterDownError can occur during a failover and to get
            # self-healed, we will try to reinitialize the cluster layout
            # and retry executing the command

            # SlotNotCoveredError can occur when the cluster is not fully
            # initialized or can be temporary issue.
            # We will try to reinitialize the cluster topology
            # and retry executing the command

            time.sleep(0.25)
            self.nodes_manager.initialize()
            raise
        except ResponseError:
            raise
        except Exception as e:
            if connection:
                connection.disconnect()
            raise e
        finally:
            # Hand this attempt's connection (if one was obtained) back to
            # the pool it came from
            if connection is not None:
                redis_node.connection_pool.release(connection)

    raise ClusterError("TTL exhausted.")
1624 def _extracts_socket_address(
1625 self, connection: Optional[Connection]
1626 ) -> Optional[int]:
1627 if connection is None:
1628 return None
1629 try:
1630 socket_address = (
1631 connection._sock.getsockname() if connection._sock else None
1632 )
1633 socket_address = socket_address[1] if socket_address else None
1634 except (AttributeError, OSError):
1635 pass
1636 return socket_address
def close(self) -> None:
    """
    Close the nodes manager, if there is one, while holding the client
    lock. Missing attributes are tolerated.
    """
    try:
        with self._lock:
            manager = self.nodes_manager
            if manager:
                manager.close()
    except AttributeError:
        # RedisCluster's __init__ can fail before nodes_manager is set
        pass
def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
    """
    Post-process the per-node results of an executed command.
    The outcome is a dict or a single value, further passed through the
    callback mapped to ``response_policy``.

    :type command: str
    :type res: dict

    `res` should be in the following format:
        Dict<node_name, command_result>
    """
    if command in self.result_callbacks:
        result_callback = self.result_callbacks[command]
        res = result_callback(command, res, **kwargs)
    elif len(res) == 1:
        # A command executed on a single node does not need the
        # dictionary wrapper; unwrap the lone response
        res = next(iter(res.values()))

    apply_policy = self._policies_callback_mapping[response_policy]
    return apply_policy(res)
def load_external_module(self, funcname, func):
    """
    Attach an externally defined redis module function (and its
    namespace) to this redis client.

    ``funcname`` - A string with the attribute name to create
    ``func`` - The function to store under that name on this instance.
    """
    setattr(self, funcname, func)
def transaction(self, func, *watches, **kwargs):
    """
    Convenience method that runs the callable `func` as a transaction
    while watching every key listed in `watches`. `func` receives a single
    argument, a Pipeline object.

    The attempt is repeated for as long as a WatchError is raised,
    sleeping ``watch_delay`` seconds in between when it is positive.
    Returns the value of `func` when ``value_from_callable`` is set,
    otherwise the result of the pipeline's execute().
    """
    shard_hint = kwargs.pop("shard_hint", None)
    value_from_callable = kwargs.pop("value_from_callable", False)
    watch_delay = kwargs.pop("watch_delay", None)

    with self.pipeline(True, shard_hint) as pipe:
        while True:
            try:
                if watches:
                    pipe.watch(*watches)
                callable_result = func(pipe)
                pipeline_result = pipe.execute()
            except WatchError:
                if watch_delay is not None and watch_delay > 0:
                    time.sleep(watch_delay)
                continue
            else:
                return callable_result if value_from_callable else pipeline_result
class ClusterNode:
    """
    A single node of the cluster: its address, derived name, server type
    and (optionally) its redis connection.

    Nodes compare equal, and hash alike, when their names match.
    """

    def __init__(self, host, port, server_type=None, redis_connection=None):
        # "localhost" is resolved so the node is stored under its address
        resolved_host = socket.gethostbyname(host) if host == "localhost" else host

        self.host = resolved_host
        self.port = port
        self.name = get_node_name(resolved_host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        fields = (
            f"host={self.host}",
            f"port={self.port}",
            f"name={self.name}",
            f"server_type={self.server_type}",
            f"redis_connection={self.redis_connection}",
        )
        return "[" + ",".join(fields) + "]"

    def __eq__(self, obj):
        if not isinstance(obj, ClusterNode):
            return False
        return obj.name == self.name

    def __hash__(self):
        return hash(self.name)
class LoadBalancingStrategy(Enum):
    """
    Strategies for choosing which node of a slot serves a read.
    """

    # Cycle through every node of the slot
    ROUND_ROBIN = "round_robin"
    # Cycle through the slot's nodes while skipping index 0 (the primary)
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    # Pick a random index other than 0
    RANDOM_REPLICA = "random_replica"
class LoadBalancer:
    """
    Chooses the index of the node that should serve a read, per primary.

    Keeps one rotating index per primary name for the round-robin
    strategies; the random strategy keeps no state.
    """

    def __init__(self, start_index: int = 0) -> None:
        # Next index to hand out, keyed by primary name
        self.primary_to_idx: dict[str, int] = {}
        # Index given to a primary the first time it is seen
        self.start_index: int = start_index
        # Guards primary_to_idx
        self._lock: threading.Lock = threading.Lock()

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        """
        Return the index (within a node list of ``list_size`` entries) to
        use next for ``primary`` under ``load_balancing_strategy``.
        """
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)

        replicas_only = (
            load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
        )
        return self._get_round_robin_index(primary, list_size, replicas_only)

    def reset(self) -> None:
        """Forget every stored per-primary index."""
        with self._lock:
            self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        # Index 0 is never produced
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        with self._lock:
            current = self.primary_to_idx.setdefault(primary, self.start_index)
            if replicas_only and current == 0:
                # skip the primary node index
                current = 1
            # Store the index to hand out on the next round
            self.primary_to_idx[primary] = (current + 1) % list_size
            return current
1778class NodesManager:
def __init__(
    self,
    startup_nodes: list[ClusterNode],
    from_url=False,
    require_full_coverage=False,
    lock: Optional[threading.RLock] = None,
    dynamic_startup_nodes=True,
    connection_pool_class=ConnectionPool,
    address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
    cache: Optional[CacheInterface] = None,
    cache_config: Optional[CacheConfig] = None,
    cache_factory: Optional[CacheFactoryInterface] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
    maint_notifications_config: Optional[MaintNotificationsConfig] = None,
    **kwargs,
):
    """
    Store the configuration, set up empty node/slot caches, the cache,
    the locks and the event dispatcher, then call ``initialize()``.

    Remaining keyword arguments are kept as ``connection_kwargs``.
    """
    self.nodes_cache: dict[str, ClusterNode] = {}
    self.slots_cache: dict[int, list[ClusterNode]] = {}
    self.startup_nodes: dict[str, ClusterNode] = {
        node.name: node for node in startup_nodes
    }
    self.default_node: Optional[ClusterNode] = None
    self._epoch: int = 0
    self.from_url = from_url
    self._require_full_coverage = require_full_coverage
    self._dynamic_startup_nodes = dynamic_startup_nodes
    self.connection_pool_class = connection_pool_class
    self.address_remap = address_remap

    # Cache precedence: explicit cache, then cache factory, then a cache
    # built from the cache config; otherwise no cache
    self._cache: Optional[CacheInterface] = None
    if cache:
        self._cache = cache
    elif cache_factory is not None:
        self._cache = cache_factory.get_cache()
    elif cache_config is not None:
        self._cache = CacheFactory(cache_config).get_cache()

    self.connection_kwargs = kwargs
    self.read_load_balancer = LoadBalancer()

    # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock
    self._lock = threading.RLock() if lock is None else lock

    # initialize holds _initialization_lock to dedup multiple calls to reinitialize;
    # note that if we hold both _lock and _initialization_lock, we _must_ acquire
    # _initialization_lock first (ie: to have a consistent order) to avoid deadlock.
    self._initialization_lock: threading.RLock = threading.RLock()

    self._event_dispatcher = (
        EventDispatcher() if event_dispatcher is None else event_dispatcher
    )
    self._credential_provider = self.connection_kwargs.get(
        "credential_provider", None
    )
    self.maint_notifications_config = maint_notifications_config

    self.initialize()
def get_node(
    self,
    host: Optional[str] = None,
    port: Optional[int] = None,
    node_name: Optional[str] = None,
) -> Optional[ClusterNode]:
    """
    Look up a node in the nodes cache, either by host and port or by
    node name (host and port take precedence).

    :return: ClusterNode if the node exists, else None
    """
    if host and port:
        # the user passed host and port
        if host == "localhost":
            host = socket.gethostbyname(host)
        lookup_name = get_node_name(host=host, port=port)
    elif node_name:
        lookup_name = node_name
    else:
        return None

    with self._lock:
        return self.nodes_cache.get(lookup_name)
def move_slot(self, e: Union[AskError, MovedError]):
    """
    Point the slot named in the redirection error at the redirected node.
    """
    with self._lock:
        new_owner = self.get_node(host=e.host, port=e.port)
        if new_owner is None:
            # Unknown node: create it as a primary and cache it
            new_owner = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[new_owner.name] = new_owner
        elif new_owner.server_type is not PRIMARY:
            # Known node: make sure it is marked as a primary
            new_owner.server_type = PRIMARY

        slot_nodes = self.slots_cache[e.slot_id]
        if new_owner not in slot_nodes:
            # The new slot owner is a new server, or a server from a different
            # shard. Drop all current nodes of the slot (replicas included)
            # and keep just the new node.
            self.slots_cache[e.slot_id] = [new_owner]
            return

        if new_owner is slot_nodes[0]:
            # circular MOVED to current primary -> no-op
            return

        # The MOVED error resulted from a failover, and the new slot owner
        # had previously been a replica.
        old_primary = slot_nodes[0]
        # Demote the old primary and put it at the end of the slot's
        # node list
        old_primary.server_type = REPLICA
        slot_nodes.append(old_primary)
        # Take the promoted replica out of its old position
        slot_nodes.remove(new_owner)
        # Put the new primary in the first position
        slot_nodes[0] = new_owner
        if self.default_node == old_primary:
            # The default node follows the new primary
            self.default_node = new_owner
@deprecated_args(
    args_to_warn=["server_type"],
    reason=(
        "In case you need select some load balancing strategy "
        "that will use replicas, please set it through 'load_balancing_strategy'"
    ),
    version="5.3.0",
)
def get_node_from_slot(
    self,
    slot: int,
    read_from_replicas: bool = False,
    load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
    server_type: Optional[Literal["primary", "replica"]] = None,
) -> ClusterNode:
    """
    Return a node that serves the given hash slot.

    Raises SlotNotCoveredError when the slot has no nodes in the cache.
    """

    # Reading from replicas without an explicit strategy means round robin
    if read_from_replicas is True and load_balancing_strategy is None:
        load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

    with self._lock:
        slot_nodes = self.slots_cache.get(slot)
        if slot_nodes is None or len(slot_nodes) == 0:
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                + f'"require_full_coverage={self._require_full_coverage}"'
            )

        node_count = len(slot_nodes)
        if node_count > 1 and load_balancing_strategy:
            # let the load balancer pick the index for this primary
            node_idx = self.read_load_balancer.get_server_index(
                slot_nodes[0].name, node_count, load_balancing_strategy
            )
        elif server_type is None or server_type == PRIMARY or node_count == 1:
            # return a primary
            node_idx = 0
        else:
            # return a replica
            # randomly choose one of the replicas
            node_idx = random.randint(1, node_count - 1)

        return slot_nodes[node_idx]
def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]):
    """
    Collect every cached node whose ``server_type`` equals the requested one.

    :param server_type: 'primary' or 'replica'
    :return: list of ClusterNode, in ``nodes_cache`` iteration order
    """
    matching = []
    with self._lock:
        for cached_node in self.nodes_cache.values():
            if cached_node.server_type == server_type:
                matching.append(cached_node)
    return matching
@deprecated_function(
    reason="This method is not used anymore internally. The startup nodes are populated automatically.",
    version="7.0.2",
)
def populate_startup_nodes(self, nodes):
    """
    Register the given nodes as startup nodes, keyed by node name.

    A node whose name is already present replaces the existing entry, so
    duplicates collapse to a single startup node.
    """
    with self._lock:
        self.startup_nodes.update((node.name, node) for node in nodes)
def move_node_to_end_of_cached_nodes(self, node_name: str) -> None:
    """
    Push a failing node to the back of ``startup_nodes`` and ``nodes_cache``.

    Re-inserting the entry moves it to the end of each mapping, so the node
    is tried last during reinitialization and when the default node is
    selected (the ``nodes_cache`` order drives ``get_nodes_by_server_type``).
    A mapping that does not contain the node, or that holds a single entry,
    is left untouched.
    """
    for cache in (self.startup_nodes, self.nodes_cache):
        if len(cache) > 1 and node_name in cache:
            # pop + re-insert places the entry at the end of the mapping
            cache[node_name] = cache.pop(node_name)
def check_slots_coverage(self, slots_cache):
    """
    Tell whether every hash slot of the cluster is present in ``slots_cache``.

    :param slots_cache: mapping (or other container) of covered slot ids
    :return: True when all slots ``0..REDIS_CLUSTER_HASH_SLOTS - 1`` are
        present, False as soon as one is missing
    """
    return all(
        slot_id in slots_cache for slot_id in range(REDIS_CLUSTER_HASH_SLOTS)
    )
def create_redis_connections(self, nodes):
    """
    Make sure every node in ``nodes`` has a Redis client attached.

    Nodes that already carry a ``redis_connection`` are skipped. The
    connection pools of the newly created clients are then announced through
    an ``AfterPooledConnectionsInstantiationEvent``.
    """
    created_pools = []
    for node in nodes:
        if node.redis_connection is not None:
            continue
        node.redis_connection = self.create_redis_node(
            host=node.host,
            port=node.port,
            maint_notifications_config=self.maint_notifications_config,
            **self.connection_kwargs,
        )
        created_pools.append(node.redis_connection.connection_pool)

    self._event_dispatcher.dispatch(
        AfterPooledConnectionsInstantiationEvent(
            created_pools, ClientType.SYNC, self._credential_provider
        )
    )
def create_redis_node(
    self,
    host,
    port,
    **kwargs,
):
    """
    Build the ``Redis`` client used to talk to a single cluster node.

    :param host: node host
    :param port: node port
    :param kwargs: extra connection arguments forwarded to the client (or to
        the custom connection pool class when the manager was built from a URL)
    :return: the new ``Redis`` instance
    """
    # Node-level clients must not retry on their own: retrying unreachable
    # nodes would only block the connection pool. The single error they
    # still handle is ConnectionError, which triggers a socket disconnect.
    # Actual retries happen at the cluster client level, where the cluster
    # topology can be taken into account.
    node_retry_config = Retry(
        backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
    )

    if not self.from_url:
        return Redis(
            host=host,
            port=port,
            cache=self._cache,
            retry=node_retry_config,
            **kwargs,
        )

    # from_url: create the node on top of a custom connection pool
    kwargs.update(
        host=host,
        port=port,
        cache=self._cache,
        retry=node_retry_config,
    )
    return Redis(connection_pool=self.connection_pool_class(**kwargs))
def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
    """
    Return the node for ``host:port`` from ``tmp_nodes_cache``, creating and
    registering it there when it is not present yet.

    A newly created node reuses the Redis connection of the node with the
    same name in the current ``nodes_cache``, if there is one.
    """
    node_name = get_node_name(host, port)
    already_discovered = tmp_nodes_cache.get(node_name)
    if already_discovered is not None:
        return already_discovered

    # Look for an existing connection to reuse before building the node
    redis_connection: Optional[Redis] = None
    with self._lock:
        previous_node = self.nodes_cache.get(node_name)
        if previous_node:
            redis_connection = previous_node.redis_connection

    # A fresh ClusterNode is built instead of mutating the previous one, so
    # the old node's role is never changed outside of the lock.
    target_node = ClusterNode(host, port, role, redis_connection)
    tmp_nodes_cache[target_node.name] = target_node
    return target_node
def _get_epoch(self) -> int:
    """
    Read the current epoch under the manager lock.

    Kept as a separate method mainly so tests can mock the epoch fetch and
    control race condition timing.
    """
    with self._lock:
        current_epoch = self._epoch
    return current_epoch
def initialize(
    self,
    additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None,
    disconnect_startup_nodes_pools: bool = True,
):
    """
    Rebuild the nodes cache, the slots cache and the per-node Redis
    connections by asking the startup nodes for ``CLUSTER SLOTS``.

    Startup nodes are queried one after another until one reply set covers
    every slot. If another thread completed an initialization while this
    call was waiting for the initialization lock (detected through the
    epoch), the call returns without doing anything.

    :additional_startup_nodes_info:
        Extra ``(host, port)`` tuples used, in addition to the startup
        nodes, only while extracting the slots and nodes information from
        the cluster. Useful when new nodes were added to the cluster and
        the client should be initialized with them.
    :disconnect_startup_nodes_pools:
        Whether to mark the queried startup node's active connections for
        reconnect and disconnect its free connections once the topology
        was read. Useful when the startup nodes are not part of the cluster
        and the connection should not be kept open.
    :raises RedisClusterException:
        when no startup node is reachable, when the nodes disagree on more
        than five slots, or when full coverage is required but not reached.
    """
    self.reset()
    discovered_nodes = {}
    discovered_slots = {}
    disagreements = []
    startup_nodes_reachable = False
    fully_covered = False
    kwargs = self.connection_kwargs
    last_error = None
    epoch_at_entry = self._get_epoch()
    if additional_startup_nodes_info is None:
        additional_startup_nodes_info = []

    with self._initialization_lock:
        with self._lock:
            if epoch_at_entry != self._epoch:
                # Somebody else re-initialized the nodes while we were
                # waiting; running again would be wasted work.
                return

        with self._lock:
            startup_nodes = tuple(self.startup_nodes.values())

        additional_startup_nodes = [
            ClusterNode(host, port) for host, port in additional_startup_nodes_info
        ]
        if is_debug_log_enabled():
            logger.debug(
                f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; "
                f"and startup nodes: {[node.name for node in startup_nodes]}"
            )

        for startup_node in (*startup_nodes, *additional_startup_nodes):
            try:
                if startup_node.redis_connection:
                    node_client = startup_node.redis_connection
                else:
                    # No client yet for this node: create one
                    if is_debug_log_enabled():
                        socket_timeout = kwargs.get("socket_timeout", "not set")
                        socket_connect_timeout = kwargs.get(
                            "socket_connect_timeout", "not set"
                        )
                        maint_enabled = (
                            self.maint_notifications_config.enabled
                            if self.maint_notifications_config
                            else False
                        )
                        logger.debug(
                            "Topology refresh: Creating new Redis connection to "
                            f"{startup_node.host}:{startup_node.port}; "
                            f"with socket_timeout: {socket_timeout}, and "
                            f"socket_connect_timeout: {socket_connect_timeout}, "
                            "and maint_notifications enabled: "
                            f"{maint_enabled}"
                        )
                    node_client = self.create_redis_node(
                        startup_node.host,
                        startup_node.port,
                        maint_notifications_config=self.maint_notifications_config,
                        **kwargs,
                    )
                    if startup_node in self.startup_nodes.values():
                        self.startup_nodes[
                            startup_node.name
                        ].redis_connection = node_client
                    else:
                        startup_node.redis_connection = node_client
                try:
                    # A ResponseError here means cluster mode is disabled
                    cluster_slots = str_if_bytes(
                        node_client.execute_command("CLUSTER SLOTS")
                    )
                    if disconnect_startup_nodes_pools:
                        with node_client.connection_pool._lock:
                            # Clear connections before moving on: active
                            # ones are only marked for reconnect, so commands
                            # in flight can still complete first.
                            node_client.connection_pool.update_active_connections_for_reconnect()
                            # Needed to clear READONLY state when it is no
                            # longer applicable
                            node_client.connection_pool.disconnect_free_connections()
                except ResponseError:
                    raise RedisClusterException(
                        "Cluster mode is not enabled on this node"
                    )
                startup_nodes_reachable = True
            except Exception as err:
                # Remember the failure and move on to the next startup node;
                # it is raised only if no node turns out to be reachable.
                last_error = err
                continue

            # CLUSTER SLOTS replies with
            # [[from_slot, to_slot, primary, replica1, ..., replicaN], ...]
            # where every node is [IP, port, node_id], hence
            # cluster_slots[0][2][0] is the IP of the first section's primary.
            # A single-server cluster reports an empty host; use the startup
            # node's host instead.
            if (
                len(cluster_slots) == 1
                and len(cluster_slots[0][2][0]) == 0
                and len(self.startup_nodes) == 1
            ):
                cluster_slots[0][2][0] = startup_node.host

            for slot_section in cluster_slots:
                primary_info = slot_section[2]
                host = str_if_bytes(primary_info[0])
                if host == "":
                    host = startup_node.host
                port = int(primary_info[1])
                host, port = self.remap_host_port(host, port)

                target_node = self._get_or_create_cluster_node(
                    host, port, PRIMARY, discovered_nodes
                )
                # Primary first, replicas after it
                section_nodes = [target_node]

                for replica_info in slot_section[3:]:
                    host = str_if_bytes(replica_info[0])
                    port = int(replica_info[1])
                    host, port = self.remap_host_port(host, port)
                    section_nodes.append(
                        self._get_or_create_cluster_node(
                            host, port, REPLICA, discovered_nodes
                        )
                    )

                first_slot = int(slot_section[0])
                last_slot = int(slot_section[1])
                for slot_id in range(first_slot, last_slot + 1):
                    if slot_id not in discovered_slots:
                        discovered_slots[slot_id] = section_nodes
                        continue

                    # The slot was already reported by an earlier startup
                    # node; both reports must name the same primary.
                    recorded_primary = discovered_slots[slot_id][0]
                    if recorded_primary.name != target_node.name:
                        disagreements.append(
                            f"{recorded_primary.name} vs {target_node.name} on slot: {slot_id}"
                        )

                        if len(disagreements) > 5:
                            raise RedisClusterException(
                                f"startup_nodes could not agree on a valid "
                                f"slots cache: {', '.join(disagreements)}"
                            )

            fully_covered = self.check_slots_coverage(discovered_slots)
            if fully_covered:
                # All slots are known; no need to ask further startup nodes
                break

        if not startup_nodes_reachable:
            raise RedisClusterException(
                f"Redis Cluster cannot be connected. Please provide at least "
                f"one reachable node: {str(last_error)}"
            ) from last_error

        # Attach Redis connections to every discovered node
        self.create_redis_connections(list(discovered_nodes.values()))

        if not fully_covered and self._require_full_coverage:
            # Full coverage is required but the startup nodes did not
            # provide it
            raise RedisClusterException(
                f"All slots are not covered after query all startup_nodes. "
                f"{len(discovered_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                f"covered..."
            )

        # Publish the freshly built state
        with self._lock:
            self.nodes_cache = discovered_nodes
            self.slots_cache = discovered_slots
            self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
            if self._dynamic_startup_nodes:
                # Every discovered node becomes a startup node
                self.startup_nodes = discovered_nodes
            # Bump the epoch to signal that an initialization has completed
            self._epoch += 1
def close(self) -> None:
    """
    Clear the default node and close the Redis connection of every cached
    node that has one.

    The node list is snapshotted under the lock; the connections themselves
    are closed after the lock has been released.
    """
    with self._lock:
        self.default_node = None
        cached_nodes = tuple(self.nodes_cache.values())

    for cached_node in cached_nodes:
        if not cached_node.redis_connection:
            continue
        cached_node.redis_connection.close()
def reset(self):
    """
    Reset the read load balancer, if one is configured.

    When ``read_load_balancer`` is None there is nothing to reset and the
    call is a no-op. The check is explicit because calling ``.reset()`` on
    None raises AttributeError, which the previous ``except TypeError``
    guard did not catch.
    """
    balancer = self.read_load_balancer
    if balancer is None:
        return
    balancer.reset()
def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
    """
    Translate an address reported by the cluster into the address the client
    should actually use.

    When ``address_remap`` is set it is called with the ``(host, port)``
    tuple and its result is returned; otherwise the address is returned
    unchanged. Useful if the client is not connecting directly to the
    cluster.
    """
    remap = self.address_remap
    if not remap:
        return host, port
    return remap((host, port))
def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]:
    """
    Find the cached node whose connection pool targets the same host and
    port as ``connection``.

    :return: the matching ClusterNode, or None when no cached node with a
        Redis connection matches
    """
    node_name = get_node_name(connection.host, connection.port)
    with self._lock:
        for candidate in tuple(self.nodes_cache.values()):
            if not candidate.redis_connection:
                continue
            pool_kwargs = candidate.redis_connection.connection_pool.connection_kwargs
            pool_node_name = get_node_name(
                pool_kwargs.get("host"), pool_kwargs.get("port")
            )
            if node_name == pool_node_name:
                return candidate
    return None
class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html

    Regular subscriptions go through a single node connection. Sharded
    subscriptions (``ssubscribe``/``sunsubscribe``) use one per-node pubsub
    object, kept in ``node_pubsub_mapping`` keyed by node name.
    """

    def __init__(
        self,
        redis_cluster,
        node=None,
        host=None,
        port=None,
        push_handler_func=None,
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true or load_balancing_strategy is set, a replica can be selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        self.set_pubsub_node(redis_cluster, node, host, port)
        # Without a node there is no pool yet; it is resolved lazily in
        # execute_command().
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        self.node_pubsub_mapping = {}
        # The instance attribute deliberately replaces the method of the same
        # name with the generator it returns.
        self._pubsubs_generator = self._pubsubs_generator()
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        super().__init__(
            connection_pool=connection_pool,
            encoder=redis_cluster.encoder,
            push_handler_func=push_handler_func,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        The pubsub node will be set according to the passed node, host and port
        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.
        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            pubsub_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, node, host, port)
            pubsub_node = node
        elif any([host, port]) is True:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            pubsub_node = None

        self.node = pubsub_node

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        if node is None or redis_cluster.get_node(node_name=node.name) is None:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )

    def execute_command(self, *args):
        """
        Execute a subscribe/unsubscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            if self.connection_pool is None:
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    channel = args[1]
                    slot = self.cluster.keyslot(channel)
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot,
                        self.cluster.read_from_replicas,
                        self.cluster.load_balancing_strategy,
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                redis_connection = self.cluster.get_redis_connection(node)
                self.connection_pool = redis_connection.connection_pool
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _get_node_pubsub(self, node):
        """
        Return the per-node pubsub object for ``node``, creating and caching
        it in ``node_pubsub_mapping`` on first use.
        """
        try:
            return self.node_pubsub_mapping[node.name]
        except KeyError:
            pubsub = node.redis_connection.pubsub(
                push_handler_func=self.push_handler_func
            )
            self.node_pubsub_mapping[node.name] = pubsub
            return pubsub

    def _sharded_message_generator(self):
        """
        Poll the per-node pubsubs, at most once per currently mapped node,
        and return the first message found (None if there is none).
        """
        for _ in range(len(self.node_pubsub_mapping)):
            pubsub = next(self._pubsubs_generator)
            message = pubsub.get_message()
            if message is not None:
                return message
        return None

    def _pubsubs_generator(self):
        """
        Endlessly cycle over the per-node pubsubs, taking a fresh snapshot of
        ``node_pubsub_mapping`` at the start of every cycle.
        """
        while True:
            current_nodes = list(self.node_pubsub_mapping.values())
            yield from current_nodes

    def get_sharded_message(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    ):
        """
        Fetch the next sharded pubsub message.

        :param ignore_subscribe_messages: when truthy (or when the instance
            level flag is set) None is returned instead of the message
        :param timeout: forwarded to ``get_message`` of the target node's
            pubsub; only used when ``target_node`` is given
        :param target_node: read from this node's pubsub only; otherwise the
            per-node pubsubs are polled in turn
        :return: the message dict, or None
        """
        if target_node:
            message = self.node_pubsub_mapping[target_node.name].get_message(
                ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
            )
        else:
            message = self._sharded_message_generator()
        if message is None:
            return None
        elif str_if_bytes(message["type"]) == "sunsubscribe":
            if message["channel"] in self.pending_unsubscribe_shard_channels:
                self.pending_unsubscribe_shard_channels.remove(message["channel"])
                self.shard_channels.pop(message["channel"], None)
                node = self.cluster.get_node_from_key(message["channel"])
                # Drop the per-node pubsub once it has no subscription left
                if self.node_pubsub_mapping[node.name].subscribed is False:
                    self.node_pubsub_mapping.pop(node.name)
        if not self.channels and not self.patterns and not self.shard_channels:
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()
        # NOTE(review): as written this returns None for every message type
        # (not only subscribe/unsubscribe confirmations) whenever one of the
        # ignore flags is set - confirm this is intended.
        if self.ignore_subscribe_messages or ignore_subscribe_messages:
            return None
        return message

    def ssubscribe(self, *args, **kwargs):
        """
        Subscribe to shard channels. Channels given as keyword arguments map
        the channel name to a message handler. Every channel is subscribed on
        the pubsub of the node returned by ``cluster.get_node_from_key``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        s_channels = dict.fromkeys(args)
        s_channels.update(kwargs)
        for s_channel, handler in s_channels.items():
            node = self.cluster.get_node_from_key(s_channel)
            pubsub = self._get_node_pubsub(node)
            if handler:
                pubsub.ssubscribe(**{s_channel: handler})
            else:
                pubsub.ssubscribe(s_channel)
            # Mirror the node-level subscription state on this object
            self.shard_channels.update(pubsub.shard_channels)
            self.pending_unsubscribe_shard_channels.difference_update(
                self._normalize_keys({s_channel: None})
            )
            if pubsub.subscribed and not self.subscribed:
                self.subscribed_event.set()
                self.health_check_response_counter = 0

    def sunsubscribe(self, *args):
        """
        Unsubscribe from the given shard channels, or from every shard
        channel in ``shard_channels`` when none is given.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        else:
            args = self.shard_channels

        for s_channel in args:
            node = self.cluster.get_node_from_key(s_channel)
            p = self._get_node_pubsub(node)
            p.sunsubscribe(s_channel)
            self.pending_unsubscribe_shard_channels.update(
                p.pending_unsubscribe_shard_channels
            )

    def get_redis_connection(self):
        """
        Get the Redis connection of the pubsub connected node.

        Returns None when no pubsub node has been set yet.
        """
        if self.node is not None:
            return self.node.redis_connection

    def disconnect(self):
        """
        Disconnect the pubsub connection and the connections of all per-node
        pubsubs used for sharded channels.
        """
        if self.connection:
            self.connection.disconnect()
        for pubsub in self.node_pubsub_mapping.values():
            # A per-node pubsub may have no connection attached; skip it
            # instead of failing with AttributeError on None.
            if pubsub.connection:
                pubsub.connection.disconnect()
class ClusterPipeline(RedisCluster):
    """
    Support for Redis pipeline
    in cluster mode

    The pipeline delegates its work to an execution strategy: a
    ``PipelineStrategy`` by default, or a ``TransactionStrategy`` when it is
    created with ``transaction=True``.
    """

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        MovedError,
        AskError,
        TryAgainError,
    )

    NO_SLOTS_COMMANDS = {"UNWATCH"}
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        cluster_error_retry_attempts: int = 3,
        reinitialize_steps: int = 5,
        retry: Optional[Retry] = None,
        lock=None,
        transaction=False,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        **kwargs,
    ):
        """
        Build a pipeline on top of an existing nodes manager and commands
        parser.

        :param result_callbacks: defaults to a copy of the class level
            ``RESULT_CALLBACKS``
        :param startup_nodes: defaults to an empty list
        :param cluster_error_retry_attempts: deprecated; only used as the
            retry count when no ``retry`` object is given
        :param retry: retry policy; when omitted an exponential-with-jitter
            backoff (base=1, cap=10) is used
        :param lock: lock to use; a new ``threading.RLock`` when omitted
        :param transaction: selects ``TransactionStrategy`` instead of
            ``PipelineStrategy``
        :param kwargs: ``encoding``, ``encoding_errors`` and
            ``decode_responses`` are read to configure the encoder
        """
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.cluster_response_callbacks = cluster_response_callbacks
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if retry is not None:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        # Keep a handle on the non-pipelined implementation, since
        # execute_command is overridden below.
        self.parent_execute_command = super().execute_command
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self) if not transaction else TransactionStrategy(self)
        )

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        # Callable to run for each request/response policy
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command, *args: (
                self.get_nodes_from_slot(command, *args)
            ),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args, **kwargs: (
                self._split_multi_shard_command(*args, **kwargs)
            ),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver

    def __repr__(self):
        """Return the class name."""
        return f"{type(self).__name__}"

    def __enter__(self):
        """Enter the context manager; returns the pipeline itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Reset the pipeline when leaving the context manager."""
        self.reset()

    def __del__(self):
        # Best effort cleanup: errors raised while resetting during object
        # finalization are deliberately ignored.
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self):
        """Number of commands in the execution strategy's command queue."""
        return len(self._execution_strategy.command_queue)

    def __bool__(self):
        "Pipeline instances should always evaluate to True on Python 3+"
        return True

    def execute_command(self, *args, **kwargs):
        """
        Wrapper function for pipeline_execute_command
        """
        return self._execution_strategy.execute_command(*args, **kwargs)

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        return self._execution_strategy.execute_command(*args, **options)

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        self._execution_strategy.annotate_exception(exception, number, command)

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Execute all the commands in the current pipeline

        The pipeline is reset afterwards, whether execution succeeded or not.
        """

        try:
            return self._execution_strategy.execute(raise_on_error)
        finally:
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._execution_strategy.reset()

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """Send the given command stack through the execution strategy."""
        return self._execution_strategy.send_cluster_commands(
            stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
        )

    def exists(self, *keys):
        """Delegate ``exists`` to the execution strategy and return its result."""
        return self._execution_strategy.exists(*keys)

    def eval(self):
        """Delegate ``eval`` to the execution strategy and return its result."""
        return self._execution_strategy.eval()

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        self._execution_strategy.multi()

    def load_scripts(self):
        """Delegate ``load_scripts`` to the execution strategy."""
        self._execution_strategy.load_scripts()

    def discard(self):
        """Delegate ``discard`` to the execution strategy."""
        self._execution_strategy.discard()

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        self._execution_strategy.watch(*names)

    def unwatch(self):
        """Unwatches all previously specified keys"""
        self._execution_strategy.unwatch()

    def script_load_for_pipeline(self, *args, **kwargs):
        """Delegate ``script_load_for_pipeline`` to the execution strategy."""
        self._execution_strategy.script_load_for_pipeline(*args, **kwargs)

    def delete(self, *names):
        """
        Delegate ``delete`` to the execution strategy.

        The strategy's result is returned, like ``exists`` does, so the call
        can be chained with further commands (presumably the strategy returns
        the pipeline for staged commands - verify against the strategy).
        """
        return self._execution_strategy.delete(*names)

    def unlink(self, *names):
        """
        Delegate ``unlink`` to the execution strategy.

        The strategy's result is returned, like ``exists`` does, so the call
        can be chained with further commands (presumably the strategy returns
        the pipeline for staged commands - verify against the strategy).
        """
        return self._execution_strategy.unlink(*names)
def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Build a stand-in for a pipelined command that is not allowed in
    cluster mode.

    :param name: command name used in the error message
    :return: a callable that accepts any arguments and always raises
        RedisClusterException
    """

    def inner(*args, **kwargs):
        message = (
            f"ERROR: Calling pipelined function {name} is blocked "
            f"when running redis in cluster mode..."
        )
        raise RedisClusterException(message)

    return inner
# Commands that must not be used through a pipeline in cluster mode
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSETEX",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)
# Replace each blocked command's method on ClusterPipeline with a stub that
# raises. The method name is the command lower-cased, with spaces turned into
# underscores (e.g. "CLIENT KILL" -> "client_kill").
for command in PIPELINE_BLOCKED_COMMANDS:
    command = command.lower().replace(" ", "_")
    setattr(ClusterPipeline, command, block_pipeline_command(command))
class PipelineCommand:
    """
    A single command staged in a cluster pipeline, together with its
    execution state.
    """

    def __init__(self, args, options=None, position=None):
        """
        :param args: the command and its arguments
        :param options: command options; a new empty dict when None
        :param position: position value stored on the command as given
        """
        self.args = args
        self.options = {} if options is None else options
        self.position = position
        # Execution state, all unset until the command is processed
        self.result = None
        self.node = None
        self.asking = False
        self.command_policies: Optional[CommandPolicies] = None
class NodeCommands:
    """
    The batch of pipeline commands bound for a single node, together with
    the connection they are written to and read from.
    """

    def __init__(self, parse_response, connection_pool, connection):
        """
        Keep the reply parser, the pool the connection came from and the
        connection itself; start with an empty command batch.
        """
        self.commands = []
        self.connection = connection
        self.connection_pool = connection_pool
        self.parse_response = parse_response

    def append(self, c):
        """
        Add command ``c`` to this node's batch.
        """
        self.commands.append(c)

    def write(self):
        """
        Send the whole batch to the node as one packed request.

        A connection or timeout error is not raised; instead it is stored
        as the result of every command in the batch.
        """
        conn = self.connection
        batch = self.commands

        # Results left over from an earlier run must not survive this one.
        for cmd in batch:
            cmd.result = None

        # Pack everything into a single request to reduce network overhead.
        try:
            packed = conn.pack_commands([cmd.args for cmd in batch])
            conn.send_packed_command(packed)
        except (ConnectionError, TimeoutError) as err:
            for cmd in batch:
                cmd.result = err

    def read(self):
        """
        Read one reply per command that does not have a result yet.

        A command that already carries a result (set by ``write`` after a
        failure) is skipped, so nothing is parsed from a connection that is
        known to be broken. A connection or timeout error while reading is
        stored on every command of the batch and reading stops; any other
        RedisError is stored on the command that caused it.
        """
        conn = self.connection
        for cmd in self.commands:
            if cmd.result is not None:
                # Already failed during write(); do not touch the socket.
                continue
            try:
                cmd.result = self.parse_response(conn, cmd.args[0], **cmd.options)
            except (ConnectionError, TimeoutError) as err:
                for pending in self.commands:
                    pending.result = err
                return
            except RedisError as err:
                cmd.result = err
class ExecutionStrategy(ABC):
    """
    Interface every cluster pipeline execution strategy has to implement.
    """

    @property
    @abstractmethod
    def command_queue(self):
        """The strategy's command queue."""

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        """
        Execution flow for current execution strategy.

        See: ClusterPipeline.execute_command()
        """

    @abstractmethod
    def annotate_exception(self, exception, number, command):
        """
        Annotate exception according to current execution strategy.

        See: ClusterPipeline.annotate_exception()
        """

    @abstractmethod
    def pipeline_execute_command(self, *args, **options):
        """
        Pipeline execution flow for current execution strategy.

        See: ClusterPipeline.pipeline_execute_command()
        """

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Executes current execution strategy.

        See: ClusterPipeline.execute()
        """

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Sends commands according to current execution strategy.

        See: ClusterPipeline.send_cluster_commands()
        """

    @abstractmethod
    def reset(self):
        """
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        """

    @abstractmethod
    def exists(self, *keys):
        """Strategy-specific handling of ``exists`` for ``keys``."""

    @abstractmethod
    def eval(self):
        """Strategy-specific handling of ``eval``."""

    @abstractmethod
    def multi(self):
        """
        Starts transactional context.

        See: ClusterPipeline.multi()
        """

    @abstractmethod
    def load_scripts(self):
        """Strategy-specific handling of ``load_scripts``."""

    @abstractmethod
    def watch(self, *names):
        """Strategy-specific handling of ``watch`` for ``names``."""

    @abstractmethod
    def unwatch(self):
        """
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        """

    @abstractmethod
    def script_load_for_pipeline(self, *args, **kwargs):
        """Strategy-specific handling of ``script_load_for_pipeline``."""

    @abstractmethod
    def delete(self, *names):
        """
        "Delete a key specified by ``names``"

        See: ClusterPipeline.delete()
        """

    @abstractmethod
    def unlink(self, *names):
        """
        "Unlink a key specified by ``names``"

        See: ClusterPipeline.unlink()
        """

    @abstractmethod
    def discard(self):
        """Strategy-specific handling of ``discard``."""
class AbstractStrategy(ExecutionStrategy):
    """
    Shared base of the concrete execution strategies.

    Holds the command queue and references to the owning pipeline and its
    nodes manager, implements queueing and exception annotation, and raises
    RedisClusterException for the script related operations.
    """

    def __init__(
        self,
        pipe: ClusterPipeline,
    ):
        self._command_queue: List[PipelineCommand] = []
        self._pipe = pipe
        self._nodes_manager = self._pipe.nodes_manager

    @property
    def command_queue(self):
        """The list of queued PipelineCommand objects."""
        return self._command_queue

    @command_queue.setter
    def command_queue(self, queue: List[PipelineCommand]):
        self._command_queue = queue

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        pass

    def pipeline_execute_command(self, *args, **options):
        """
        Queue a command and return the owning pipeline.

        The command's position is its index in the queue at the time it is
        appended.
        """
        self._command_queue.append(
            PipelineCommand(args, options, len(self._command_queue))
        )
        return self._pipe

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        pass

    @abstractmethod
    def reset(self):
        pass

    def exists(self, *keys):
        """Run ``EXISTS`` for ``keys`` through ``execute_command``."""
        return self.execute_command("EXISTS", *keys)

    def eval(self):
        """Always raises RedisClusterException: not implemented."""
        raise RedisClusterException("method eval() is not implemented")

    def load_scripts(self):
        """Always raises RedisClusterException: not implemented."""
        raise RedisClusterException("method load_scripts() is not implemented")

    def script_load_for_pipeline(self, *args, **kwargs):
        """Always raises RedisClusterException: not implemented."""
        raise RedisClusterException(
            "method script_load_for_pipeline() is not implemented"
        )

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled

        The first element of ``exception.args`` is replaced by a message that
        names the command number and the (truncated) command text; any further
        args are kept. ``command`` is an iterable of command parts.
        """
        cmd = " ".join(map(safe_str, command))
        # An exception created without arguments has empty ``args``; indexing
        # it would raise IndexError and hide the error being annotated.
        detail = exception.args[0] if exception.args else exception
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {detail}"
        )
        exception.args = (msg,) + exception.args[1:]
class PipelineStrategy(AbstractStrategy):
    """
    Non-transactional strategy: commands are queued, grouped per target node,
    written to every node and then read back.
    """

    def __init__(self, pipe: ClusterPipeline):
        super().__init__(pipe)
        self.command_flags = pipe.command_flags

    def execute_command(self, *args, **kwargs):
        """Queue the command; nothing is sent until ``execute``."""
        return self.pipeline_execute_command(*args, **kwargs)

    def _raise_first_error(self, stack):
        """
        Raise the first exception on the stack
        """
        for c in stack:
            r = c.result
            if isinstance(r, Exception):
                self.annotate_exception(r, c.position + 1, c.args)
                raise r

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Send the queued commands and return their results.

        Returns an empty list when nothing is queued. The queue is reset
        afterwards, whether or not sending raised.
        """
        stack = self._command_queue
        if not stack:
            return []

        try:
            return self.send_cluster_commands(stack, raise_on_error)
        finally:
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._command_queue = []

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.

        Calls ``_send_cluster_commands`` and, when it raises one of the
        retryable errors, calls it again, up to the number of retries
        reported by ``self._pipe.retry.get_retries()``. Once the retries are
        used up the last error is re-raised. Returns an empty list for an
        empty ``stack``.
        """
        if not stack:
            return []
        retry_attempts = self._pipe.retry.get_retries()
        while True:
            try:
                return self._send_cluster_commands(
                    stack,
                    raise_on_error=raise_on_error,
                    allow_redirections=allow_redirections,
                )
            except RedisCluster.ERRORS_ALLOW_RETRY:
                if retry_attempts <= 0:
                    raise
                # Try again with the new cluster setup. All other errors
                # are raised by the call above.
                retry_attempts -= 1

    @staticmethod
    def _release_node_connections(nodes):
        """Return the connection of every NodeCommands in ``nodes`` to its pool."""
        for n in nodes.values():
            n.connection_pool.release(n.connection)

    def _send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Send a bunch of cluster commands to the redis cluster.

        `allow_redirections` If the pipeline should follow
        `ASK` & `MOVED` responses automatically. When it is false, commands
        that failed with a retryable error keep that error as their result.

        Returns the results in queue order. With ``raise_on_error`` the first
        result that is an exception is raised instead.
        """
        # the first time sending the commands we send all of
        # the commands that were queued up.
        # if we have to run through it again, we only retry
        # the commands that failed.
        attempt = sorted(stack, key=lambda x: x.position)
        is_default_node = False
        # node name -> NodeCommands holding that node's connection and batch
        nodes = {}

        # as we move through each command that still needs to be processed,
        # we figure out the slot number that command maps to, then from
        # the slot determine the node.
        try:
            for c in attempt:
                command_policies = self._pipe._policy_resolver.resolve(
                    c.args[0].lower()
                )

                # refer to our internal node -> slot table that
                # tells us where a given command should route to.
                passed_targets = c.options.pop("target_nodes", None)
                if passed_targets and not self._is_nodes_flag(passed_targets):
                    target_nodes = self._parse_target_nodes(passed_targets)

                    if not command_policies:
                        command_policies = CommandPolicies()
                else:
                    if not command_policies:
                        command = c.args[0].upper()
                        if (
                            len(c.args) >= 2
                            and f"{c.args[0]} {c.args[1]}".upper()
                            in self._pipe.command_flags
                        ):
                            command = f"{c.args[0]} {c.args[1]}".upper()

                        # We only could resolve key properties if command is not
                        # in a list of pre-defined request policies
                        command_flag = self.command_flags.get(command)
                        if not command_flag:
                            # Fallback to default policy
                            if not self._pipe.get_default_node():
                                keys = None
                            else:
                                keys = self._pipe._get_command_keys(*c.args)
                            if not keys:
                                command_policies = CommandPolicies()
                            else:
                                command_policies = CommandPolicies(
                                    request_policy=RequestPolicy.DEFAULT_KEYED,
                                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                                )
                        else:
                            if command_flag in self._pipe._command_flags_mapping:
                                command_policies = CommandPolicies(
                                    request_policy=self._pipe._command_flags_mapping[
                                        command_flag
                                    ]
                                )
                            else:
                                command_policies = CommandPolicies()

                    # NOTE(review): the flag is passed as ``node_flag`` but
                    # _determine_nodes() pops ``nodes_flag``, so a node flag
                    # given via ``target_nodes`` is not consumed there.
                    # Left unchanged because fixing it alters routing;
                    # confirm the intended behavior.
                    target_nodes = self._determine_nodes(
                        *c.args,
                        request_policy=command_policies.request_policy,
                        node_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {c.args} command on"
                        )
                c.command_policies = command_policies
                if len(target_nodes) > 1:
                    raise RedisClusterException(
                        f"Too many targets for command {c.args}"
                    )

                node = target_nodes[0]
                if node == self._pipe.get_default_node():
                    is_default_node = True

                # now that we know the name of the node
                # ( it's just a string in the form of host:port )
                # we can build a list of commands for each node.
                node_name = node.name
                if node_name not in nodes:
                    redis_node = self._pipe.get_redis_connection(node)
                    try:
                        connection = get_connection(redis_node)
                    except (ConnectionError, TimeoutError):
                        self._release_node_connections(nodes)
                        # Already released; keep the outer handler from
                        # releasing the same connections a second time.
                        nodes.clear()
                        # Connection retries are being handled in the node's
                        # Retry object. Reinitialize the node -> slot table.
                        self._nodes_manager.initialize()
                        if is_default_node:
                            self._pipe.replace_default_node()
                        raise
                    nodes[node_name] = NodeCommands(
                        redis_node.parse_response,
                        redis_node.connection_pool,
                        connection,
                    )
                nodes[node_name].append(c)
        except BaseException:
            # Routing failed part-way (for example "Too many targets").
            # Nothing has been written yet, so hand back the connections
            # that were already checked out instead of leaking them.
            self._release_node_connections(nodes)
            raise

        # send the commands in sequence.
        # we write to all the open sockets for each node first,
        # before reading anything
        # this allows us to flush all the requests out across the
        # network
        # so that we can read them from different sockets as they come back.
        # we don't multiplex on the sockets as they come available,
        # but that shouldn't make too much difference.
        try:
            node_commands = nodes.values()
            for n in node_commands:
                n.write()

            for n in node_commands:
                n.read()
        finally:
            # release all of the redis connections we allocated earlier
            # back into the connection pool.
            # NodeCommands.write() and read() store connection and timeout
            # errors on the commands instead of raising them.
            # NOTE(review): an exception of another kind escaping read()
            # could leave unread data on a socket that is released here;
            # the original comment warned about exactly that case.
            self._release_node_connections(nodes)

        # if the response isn't an exception it is a
        # valid response from the node
        # we're all done with that command, YAY!
        # if we have more commands to attempt, we've run into problems.
        # collect all the commands we are allowed to retry.
        # (MOVED, ASK, or connection errors or timeout errors)
        attempt = sorted(
            (
                c
                for c in attempt
                if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
            ),
            key=lambda x: x.position,
        )
        if attempt and allow_redirections:
            # RETRY MAGIC HAPPENS HERE!
            # send these remaining commands one at a time using `execute_command`
            # in the main client. This keeps our retry logic
            # in one place mostly,
            # and allows us to be more confident in correctness of behavior.
            # at this point any speed gains from pipelining have been lost
            # anyway, so we might as well make the best
            # attempt to get the correct behavior.
            #
            # The client command will handle retries for each
            # individual command sequentially as we pass each
            # one into `execute_command`. Any exceptions
            # that bubble out should only appear once all
            # retries have been exhausted.
            #
            # If a lot of commands have failed, we'll be setting the
            # flag to rebuild the slots table from scratch.
            # So MOVED errors should correct themselves fairly quickly.
            self._pipe.reinitialize_counter += 1
            if self._pipe._should_reinitialized():
                self._nodes_manager.initialize()
                if is_default_node:
                    self._pipe.replace_default_node()
            for c in attempt:
                try:
                    # send each command individually like we
                    # do in the main client.
                    c.result = self._pipe.parent_execute_command(*c.args, **c.options)
                except RedisError as e:
                    c.result = e

        # turn the response back into a simple flat array that corresponds
        # to the sequence of commands issued in the stack in pipeline.execute()
        response = []
        for c in sorted(stack, key=lambda x: x.position):
            if c.args[0] in self._pipe.cluster_response_callbacks:
                # Remove keys entry, it needs only for cache.
                c.options.pop("keys", None)
                c.result = self._pipe._policies_callback_mapping[
                    c.command_policies.response_policy
                ](
                    self._pipe.cluster_response_callbacks[c.args[0]](
                        c.result, **c.options
                    )
                )
            response.append(c.result)

        if raise_on_error:
            self._raise_first_error(stack)

        return response

    def _is_nodes_flag(self, target_nodes):
        """True when ``target_nodes`` is a string listed in the pipe's node flags."""
        return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags

    def _parse_target_nodes(self, target_nodes):
        """
        Normalize explicitly passed target nodes to a list.

        Accepts a list (returned as is), a single ClusterNode, or a dict whose
        values are the nodes. Raises TypeError for anything else.
        """
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables to execute commands with multi nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            # A list, not the dict view: the caller indexes the result.
            nodes = list(target_nodes.values())
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        """
        Determine which nodes the command should be executed on.

        A ``nodes_flag`` keyword, or else the command's entry in the pipe's
        command flags, overrides ``request_policy`` when it has a mapping in
        ``_command_flags_mapping``. The callback registered for the resulting
        policy produces the node list that is returned.
        """
        command = args[0].upper()
        if (
            len(args) >= 2
            and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
        ):
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self._pipe.command_flags.get(command)

        if command_flag in self._pipe._command_flags_mapping:
            request_policy = self._pipe._command_flags_mapping[command_flag]

        policy_callback = self._pipe._policies_callback_mapping[request_policy]

        # The callbacks take different arguments depending on the policy.
        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()

        if args[0].lower() == "ft.aggregate":
            self._aggregate_nodes = nodes

        return nodes

    def multi(self):
        """Always raises: not supported outside of transactional context."""
        raise RedisClusterException(
            "method multi() is not supported outside of transactional context"
        )

    def discard(self):
        """Always raises: not supported outside of transactional context."""
        raise RedisClusterException(
            "method discard() is not supported outside of transactional context"
        )

    def watch(self, *names):
        """Always raises: not supported outside of transactional context."""
        raise RedisClusterException(
            "method watch() is not supported outside of transactional context"
        )

    def unwatch(self, *names):
        """Always raises: not supported outside of transactional context."""
        raise RedisClusterException(
            "method unwatch() is not supported outside of transactional context"
        )

    def delete(self, *names):
        """Queue ``DEL`` for exactly one key; any other count raises."""
        if len(names) != 1:
            raise RedisClusterException(
                "deleting multiple keys is not implemented in pipeline command"
            )

        return self.execute_command("DEL", names[0])

    def unlink(self, *names):
        """Queue ``UNLINK`` for exactly one key; any other count raises."""
        if len(names) != 1:
            raise RedisClusterException(
                "unlinking multiple keys is not implemented in pipeline command"
            )

        return self.execute_command("UNLINK", names[0])
class TransactionStrategy(AbstractStrategy):
    """
    Transactional strategy: WATCH/UNWATCH (and commands issued while
    watching) are sent immediately over one dedicated connection, everything
    else is queued and sent wrapped in MULTI/EXEC by ``execute``.
    """

    NO_SLOTS_COMMANDS = {"UNWATCH"}
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    CONNECTION_ERRORS = (
        ConnectionError,
        OSError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def __init__(self, pipe: ClusterPipeline):
        super().__init__(pipe)
        self._explicit_transaction = False
        self._watching = False
        self._pipeline_slots: Set[int] = set()
        self._transaction_connection: Optional[Connection] = None
        self._executing = False
        # Private copy of the pipe's retry that also retries on redirects.
        self._retry = copy(self._pipe.retry)
        self._retry.update_supported_errors(
            RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
        )

    def _release_transaction_connection(self, disconnect: bool = False) -> None:
        """
        Hand the transaction connection back to its owner's pool and forget it.

        With ``disconnect`` the connection is disconnected first. Does nothing
        when no transaction connection is held. The connection is forgotten
        even when no owning node is found.
        """
        connection = self._transaction_connection
        if not connection:
            return
        if disconnect:
            connection.disconnect()
        node = self._nodes_manager.find_connection_owner(connection)
        if node and node.redis_connection:
            node.redis_connection.connection_pool.release(connection)
        self._transaction_connection = None

    def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
        """
        Find a connection for a pipeline transaction.

        For running an atomic transaction, watch keys ensure that contents have not been
        altered as long as the watch commands for those keys were sent over the same
        connection. So once we start watching a key, we fetch a connection to the
        node that owns that slot and reuse it.

        Raises RedisClusterException when no slot has been recorded yet.
        """
        if not self._pipeline_slots:
            raise RedisClusterException(
                "At least a command with a key is needed to identify a node"
            )

        node: ClusterNode = self._nodes_manager.get_node_from_slot(
            list(self._pipeline_slots)[0], False
        )
        redis_node: Redis = self._pipe.get_redis_connection(node)
        if self._transaction_connection:
            if not redis_node.connection_pool.owns_connection(
                self._transaction_connection
            ):
                # The held connection belongs to another node's pool: give it
                # back through the same guarded path used elsewhere in this
                # class (owner may be missing; the pool hangs off
                # ``redis_connection``).
                self._release_transaction_connection()

        if not self._transaction_connection:
            self._transaction_connection = get_connection(redis_node)

        return redis_node, self._transaction_connection

    def execute_command(self, *args, **kwargs):
        """
        Send the command immediately or queue it.

        WATCH/UNWATCH, and any command issued while watching, are sent
        immediately unless an explicit transaction (``multi``) is open; all
        other commands are queued. The command's slot, when it has one, is
        recorded. On the immediate path a slot that differs from the recorded
        ones raises CrossSlotTransactionError.
        """
        slot_number: Optional[int] = None
        if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
            slot_number = self._pipe.determine_slot(*args)

        if (
            self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
        ) and not self._explicit_transaction:
            if args[0] == "WATCH":
                self._validate_watch()

            if slot_number is not None:
                if self._pipeline_slots and slot_number not in self._pipeline_slots:
                    raise CrossSlotTransactionError(
                        "Cannot watch or send commands on different slots"
                    )

                self._pipeline_slots.add(slot_number)
            elif args[0] not in self.NO_SLOTS_COMMANDS:
                raise RedisClusterException(
                    f"Cannot identify slot number for command: {args[0]},"
                    "it cannot be triggered in a transaction"
                )

            return self._immediate_execute_command(*args, **kwargs)
        else:
            if slot_number is not None:
                self._pipeline_slots.add(slot_number)

            return self.pipeline_execute_command(*args, **kwargs)

    def _validate_watch(self):
        """Mark the strategy as watching; WATCH after MULTI raises RedisError."""
        if self._explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")

        self._watching = True

    def _immediate_execute_command(self, *args, **options):
        """Send one command now, retried through ``self._retry``."""
        return self._retry.call_with_retry(
            lambda: self._get_connection_and_send_command(*args, **options),
            self._reinitialize_on_error,
        )

    def _get_connection_and_send_command(self, *args, **options):
        redis_node, connection = self._get_client_and_connection_for_transaction()
        return self._send_command_parse_response(
            connection, redis_node, args[0], *args, **options
        )

    def _send_command_parse_response(
        self, conn, redis_node: Redis, command_name, *args, **options
    ):
        """
        Send a command and parse the response
        """

        conn.send_command(*args)
        output = redis_node.parse_response(conn, command_name, **options)

        # These commands end the watch on the server side as well.
        if command_name in self.UNWATCH_COMMANDS:
            self._watching = False
        return output

    def _reinitialize_on_error(self, error):
        """
        Failure callback passed to ``self._retry.call_with_retry``.

        A slot redirect that arrives while watching and executing raises
        WatchError. For redirect and connection errors the transaction
        connection is disconnected and released, and the nodes table is either
        reinitialized (when the pipe's counter says so) or, for an AskError,
        updated through ``move_slot``.
        """
        if self._watching:
            if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
                raise WatchError("Slot rebalancing occurred while watching keys")

        if (
            type(error) in self.SLOT_REDIRECT_ERRORS
            or type(error) in self.CONNECTION_ERRORS
        ):
            # Disconnect and release back to pool
            self._release_transaction_connection(disconnect=True)

            self._pipe.reinitialize_counter += 1
            if self._pipe._should_reinitialized():
                self._nodes_manager.initialize()
                # Reset the counter that was incremented above; it lives on
                # the pipe, not on this strategy.
                self._pipe.reinitialize_counter = 0
            else:
                if isinstance(error, AskError):
                    self._nodes_manager.move_slot(error)

        self._executing = False

    def _raise_first_error(self, responses, stack):
        """
        Raise the first exception on the stack
        """
        for r, cmd in zip(responses, stack):
            if isinstance(r, Exception):
                self.annotate_exception(r, cmd.position + 1, cmd.args)
                raise r

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Run the queued commands as one MULTI/EXEC transaction.

        Returns an empty list without contacting the server when nothing is
        queued and either nothing is being watched or no slot is recorded.
        """
        stack = self._command_queue
        if not stack and (not self._watching or not self._pipeline_slots):
            return []

        return self._execute_transaction_with_retries(stack, raise_on_error)

    def _execute_transaction_with_retries(
        self, stack: List["PipelineCommand"], raise_on_error: bool
    ):
        return self._retry.call_with_retry(
            lambda: self._execute_transaction(stack, raise_on_error),
            self._reinitialize_on_error,
        )

    def _execute_transaction(
        self, stack: List["PipelineCommand"], raise_on_error: bool
    ):
        """
        Send MULTI, the queued commands and EXEC in one packed request and
        return the per-command results after running the response callbacks.

        Raises CrossSlotTransactionError when more than one slot is involved,
        WatchError when EXEC returns ``None``, InvalidPipelineStack when the
        number of results does not match the queue, and the first error
        recorded while queueing when there is one.
        """
        if len(self._pipeline_slots) > 1:
            raise CrossSlotTransactionError(
                "All keys involved in a cluster transaction must map to the same slot"
            )

        self._executing = True

        redis_node, connection = self._get_client_and_connection_for_transaction()

        stack = chain(
            [PipelineCommand(("MULTI",))],
            stack,
            [PipelineCommand(("EXEC",))],
        )
        # Commands carrying EMPTY_RESPONSE are not sent at all.
        commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
        packed_commands = connection.pack_commands(commands)
        connection.send_packed_command(packed_commands)

        # Kept apart on purpose: exceptions seen while queueing are raised,
        # (index, value) placeholders are inserted into the EXEC response.
        # Mixing both in one list broke both the ``raise`` and the unpacking.
        queue_errors = []
        empty_responses = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            redis_node.parse_response(connection, "MULTI")
        except ResponseError as e:
            # A tuple, so the annotation reads "MULTI" and not "M U L T I".
            self.annotate_exception(e, 0, ("MULTI",))
            queue_errors.append(e)
        except self.CONNECTION_ERRORS as cluster_error:
            self.annotate_exception(cluster_error, 0, ("MULTI",))
            raise

        # and all the other commands
        for i, command in enumerate(self._command_queue):
            if EMPTY_RESPONSE in command.options:
                empty_responses.append((i, command.options[EMPTY_RESPONSE]))
            else:
                try:
                    _ = redis_node.parse_response(connection, "_")
                except self.SLOT_REDIRECT_ERRORS as slot_error:
                    self.annotate_exception(slot_error, i + 1, command.args)
                    queue_errors.append(slot_error)
                except self.CONNECTION_ERRORS as cluster_error:
                    self.annotate_exception(cluster_error, i + 1, command.args)
                    raise
                except ResponseError as e:
                    self.annotate_exception(e, i + 1, command.args)
                    queue_errors.append(e)

        response = None
        # parse the EXEC.
        try:
            response = redis_node.parse_response(connection, "EXEC")
        except ExecAbortError:
            # Prefer the error that caused the abort over the abort itself.
            if queue_errors:
                raise queue_errors[0]
            raise

        self._executing = False

        # EXEC clears any watched keys
        self._watching = False

        if response is None:
            raise WatchError("Watched variable changed.")

        # EXEC went through although an error was recorded earlier (for
        # instance on MULTI itself): surface that error.
        if queue_errors:
            raise queue_errors[0]

        # put the placeholders of the unsent commands into the response
        for i, value in empty_responses:
            response.insert(i, value)

        if len(response) != len(self._command_queue):
            raise InvalidPipelineStack(
                "Unexpected response length for cluster pipeline EXEC."
                " Command stack was {} but response had length {}".format(
                    [c.args[0] for c in self._command_queue], len(response)
                )
            )

        # find any errors in the response and raise if necessary
        if raise_on_error or empty_responses:
            self._raise_first_error(
                response,
                self._command_queue,
            )

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, self._command_queue):
            if not isinstance(r, Exception):
                command_name = cmd.args[0]
                if command_name in self._pipe.cluster_response_callbacks:
                    r = self._pipe.cluster_response_callbacks[command_name](
                        r, **cmd.options
                    )
            data.append(r)
        return data

    def reset(self):
        """
        Empty the queue, stop watching and release the transaction connection.

        While watching, UNWATCH is sent before the connection goes back to
        its pool; if that fails with a connection error the connection is
        disconnected and then released.
        """
        self._command_queue = []

        # make sure to reset the connection state in the event that we were
        # watching something
        if self._transaction_connection:
            try:
                if self._watching:
                    # call this manually since our unwatch or
                    # immediate_execute_command methods can call reset()
                    self._transaction_connection.send_command("UNWATCH")
                    self._transaction_connection.read_response()
                # we can safely return the connection to the pool here since we're
                # sure we're no longer WATCHing anything
                self._release_transaction_connection()
            except self.CONNECTION_ERRORS:
                # disconnect will also remove any previous WATCHes
                self._release_transaction_connection(disconnect=True)

        # clean up the other instance attributes
        self._watching = False
        self._explicit_transaction = False
        self._pipeline_slots = set()
        self._executing = False

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """Not available in transactional context; always raises."""
        raise NotImplementedError(
            "send_cluster_commands cannot be executed in transactional context."
        )

    def multi(self):
        """
        Open an explicit transaction.

        Raises RedisError when one is already open or when commands have
        already been queued.
        """
        if self._explicit_transaction:
            raise RedisError("Cannot issue nested calls to MULTI")
        if self._command_queue:
            raise RedisError(
                "Commands without an initial WATCH have already been issued"
            )
        self._explicit_transaction = True

    def watch(self, *names):
        """Send ``WATCH`` for ``names``; raises RedisError after ``multi``."""
        if self._explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")

        return self.execute_command("WATCH", *names)

    def unwatch(self):
        """Send ``UNWATCH`` when watching; otherwise just return True."""
        if self._watching:
            return self.execute_command("UNWATCH")

        return True

    def discard(self):
        """Drop the queued commands and transaction state via ``reset``."""
        self.reset()

    def delete(self, *names):
        """Run ``DEL`` for ``names`` through ``execute_command``."""
        return self.execute_command("DEL", *names)

    def unlink(self, *names):
        """Run ``UNLINK`` for ``names`` through ``execute_command``."""
        return self.execute_command("UNLINK", *names)