Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%

import random
import socket
import sys
import threading
import time
from abc import ABC, abstractmethod
from collections import OrderedDict
from copy import copy
from enum import Enum
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from redis._parsers import CommandsParser, Encoder
from redis._parsers.helpers import parse_scan
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
from redis.commands import READ_COMMANDS, RedisClusterCommands
from redis.commands.helpers import list_or_args
from redis.connection import (
    Connection,
    ConnectionPool,
    parse_url,
)
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    AskError,
    AuthenticationError,
    ClusterDownError,
    ClusterError,
    ConnectionError,
    CrossSlotTransactionError,
    DataError,
    ExecAbortError,
    InvalidPipelineStack,
    MaxConnectionsError,
    MovedError,
    RedisClusterException,
    RedisError,
    ResponseError,
    SlotNotCoveredError,
    TimeoutError,
    TryAgainError,
    WatchError,
)
from redis.lock import Lock
from redis.maint_notifications import MaintNotificationsConfig
from redis.retry import Retry
from redis.utils import (
    deprecated_args,
    dict_merge,
    list_keys_to_dict,
    merge_result,
    safe_str,
    str_if_bytes,
    truncate_text,
)


def get_node_name(host: str, port: Union[str, int]) -> str:
    return f"{host}:{port}"
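
# Illustrative doctest-style sketch (not part of the library): node names are
# plain "host:port" strings, which is how nodes are keyed throughout this module.
#
#     >>> get_node_name("127.0.0.1", 6379)
#     '127.0.0.1:6379'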


@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    return redis_node.connection or redis_node.connection_pool.get_connection()


def parse_scan_result(command, res, **options):
    cursors = {}
    ret = []
    for node_name, response in res.items():
        cursor, r = parse_scan(response, **options)
        cursors[node_name] = cursor
        ret += r

    return cursors, ret
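
# Illustrative sketch of the shapes involved (hypothetical node names and keys):
# in cluster mode SCAN fans out to multiple nodes, so the merged result keeps
# one cursor per node while the keys are concatenated.
#
#     res = {"10.0.0.1:6379": (0, [b"k1"]), "10.0.0.2:6379": (17, [b"k2"])}
#     cursors, keys = parse_scan_result("SCAN", res)
#     # cursors == {"10.0.0.1:6379": 0, "10.0.0.2:6379": 17}
#     # keys == [b"k1", b"k2"]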


def parse_pubsub_numsub(command, res, **options):
    numsub_d = OrderedDict()
    for numsub_tups in res.values():
        for channel, numsubbed in numsub_tups:
            try:
                numsub_d[channel] += numsubbed
            except KeyError:
                numsub_d[channel] = numsubbed

    ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
    return ret_numsub
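
# Illustrative sketch (hypothetical channel counts): per-node PUBSUB NUMSUB
# replies are summed per channel across the cluster.
#
#     res = {"node1": [(b"ch", 2)], "node2": [(b"ch", 3), (b"other", 1)]}
#     parse_pubsub_numsub("PUBSUB NUMSUB", res)
#     # -> [(b"ch", 5), (b"other", 1)]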


def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        return str_if_bytes(args[0]) or current_host, args[1]

    slots = {}
    for slot in resp:
        start, end, primary = slot[:3]
        replicas = slot[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }

    return slots
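
# Illustrative sketch (hypothetical addresses): a raw CLUSTER SLOTS entry of the
# form [start, end, primary, *replicas] becomes a mapping keyed by the
# (start, end) slot range.
#
#     resp = [[0, 5460, [b"10.0.0.1", 6379, b"id1"], [b"10.0.0.2", 6379, b"id2"]]]
#     parse_cluster_slots(resp)
#     # -> {(0, 5460): {"primary": ("10.0.0.1", 6379),
#     #                 "replicas": [("10.0.0.2", 6379)]}}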


def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.
    """
    if isinstance(resp[0], dict):
        return resp
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], x[1][i + 1]))
        nodes = x[3]
        for node in nodes:
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards


def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.
    """
    return resp.decode("utf-8")


PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"

REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
)
KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    connection_kwargs = {
        k: v
        for k, v in kwargs.items()
        if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
    }

    return connection_kwargs
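
# Illustrative sketch: only keys from REDIS_ALLOWED_KEYS survive, and the
# disabled keys ("host", "port", "retry") are always dropped. "foo" below is a
# hypothetical unsupported key.
#
#     cleanup_kwargs(host="10.0.0.1", port=6379, foo=1, socket_timeout=5)
#     # -> {"socket_timeout": 5}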


class AbstractRedisCluster:
    RedisClusterRequestTTL = 16

    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: Optional["ClusterNode"] = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and
        primaries will be prioritized. The default node will not be changed if
        there are no other nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the
                default node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, choose a replica if the cluster contains different replicas
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)
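
# Illustrative sketch: replace_default_node is typically invoked without
# arguments when the current default node fails, promoting another primary
# (or, failing that, a replica) to be the default. "rc" and "some_node" are
# hypothetical.
#
#     rc.replace_default_node()           # pick a different primary at random
#     rc.replace_default_node(some_node)  # or pin a specific ClusterNode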


class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Return a Redis client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL-wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        return cls(url=url, **kwargs)
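
    # Illustrative usage sketch (hypothetical address): from_url simply
    # forwards to the constructor with `url` set.
    #
    #     rc = RedisCluster.from_url("redis://10.0.0.1:6379/0")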

    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
        """
        Initialize a new RedisCluster client.

        :param startup_nodes:
            List of nodes from which initial bootstrapping can be done
        :param host:
            Can be used to point to a startup node
        :param port:
            Can be used to point to a startup node
        :param require_full_coverage:
            When set to False: the client will not require full coverage of
            the slots. However, if not all slots are covered, and at least
            one node has 'cluster-require-full-coverage' set to 'yes', the
            server will throw a ClusterDownError for some key-based commands.
            See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True (default value): all slots must be covered to
            construct the cluster client. If not all slots are covered,
            RedisClusterException will be thrown.
        :param read_from_replicas:
            @deprecated - please use load_balancing_strategy instead
            Enable read from replicas in READONLY mode. You can read possibly
            stale data.
            When set to true, read commands will be distributed between the
            primary and its replicas in a round-robin manner.
        :param load_balancing_strategy:
            Enable read from replicas in READONLY mode and defines the load
            balancing strategy that will be used for cluster node selection.
            The data read from replicas is eventually consistent with the data
            in primary nodes.
        :param dynamic_startup_nodes:
            Set the RedisCluster's startup nodes to all of the discovered nodes.
            If true (default value), the cluster's discovered nodes will be used to
            determine the cluster nodes-slots mapping in the next topology refresh.
            It will remove the initially passed startup nodes if their endpoints aren't
            listed in the CLUSTER SLOTS output.
            If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
            specific IP addresses, it is best to set it to false.
        :param cluster_error_retry_attempts:
            @deprecated - Please configure the 'retry' object instead.
            If a 'retry' object is set, this argument is ignored!

            Number of times to retry before raising an error when
            :class:`~.TimeoutError`, :class:`~.ConnectionError`,
            :class:`~.SlotNotCoveredError` or :class:`~.ClusterDownError`
            are encountered
        :param retry:
            A retry object that defines the retry strategy and the number of
            retries for the cluster client.
            In the current implementation for the cluster client (starting from
            redis-py version 6.0.0), the retry object is not yet fully utilized;
            instead it is used just to determine the number of retries for the
            cluster client.
            In future releases the retry object will be used to handle the
            cluster client retries!
        :param reinitialize_steps:
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.
        :param address_remap:
            An optional callable which, when provided with an internal network
            address of a node, e.g. a `(host, port)` tuple, will return the address
            where the node is reachable. This can be used to map the addresses at
            which the nodes _think_ they are, to addresses at which a client may
            reach them, such as when they sit behind a proxy.

        :**kwargs:
            Extra arguments that will be passed to the Redis instances when created
            (See the official redis-py docs for supported kwargs - the only limitation
            is that you can't provide a 'retry' object as part of kwargs.
            [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
            Some kwargs are not supported and will raise a
            RedisClusterException:
            - db (Redis does not support database SELECT in cluster mode)
        """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' cannot be used in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # The 'retry' argument cannot be used in kwargs when in cluster mode:
            # the kwargs are passed to the lower-level connections to the cluster
            # nodes, and there we provide a retry configuration with no retries
            # allowed. The retries should be handled on the cluster client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node(s)
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' cannot be used in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the following:\n"
                "1. host and port, for example:\n"
                "   RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "   RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments.
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run.
        # If the user passed an on_connect function, we'll save it and run it
        # inside the RedisCluster.on_connect() function
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and protocol not in [3, "3"]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes
        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        self.commands_parser = CommandsParser(self)
        self._lock = threading.RLock()
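
    # Illustrative construction sketch (hypothetical addresses): either a
    # single host/port or a list of startup nodes may be given; topology
    # discovery fills in the rest of the cluster.
    #
    #     rc = RedisCluster(host="10.0.0.1", port=6379)
    #     rc = RedisCluster(startup_nodes=[ClusterNode("10.0.0.1", 6379),
    #                                      ClusterNode("10.0.0.2", 6379)])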

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def disconnect_connection_pools(self):
        for node in self.get_nodes():
            if node.redis_connection:
                try:
                    node.redis_connection.connection_pool.disconnect()
                except OSError:
                    # Client was already disconnected; do nothing
                    pass

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate, select a database, and send
        READONLY if it was set during object initialization.
        """
        connection.on_connect()

        if self.read_from_replicas or self.load_balancing_strategy:
            # Send the READONLY command to the server to configure the
            # connection as readonly. Since each cluster node may change its
            # server type due to a failover, we should establish a READONLY
            # connection regardless of the server type. If this is a primary
            # connection, READONLY does not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

    def get_node(self, host=None, port=None, node_name=None):
        return self.nodes_manager.get_node(host, port, node_name)

    def get_primaries(self):
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

    def get_replicas(self):
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)

    def get_random_node(self):
        return random.choice(list(self.nodes_manager.nodes_cache.values()))

    def get_nodes(self):
        return list(self.nodes_manager.nodes_cache.values())

    def get_node_from_key(self, key, replica=False):
        """
        Get the node that holds the key's slot.
        If replica is set to True but the slot doesn't have any replicas, None
        is returned.
        """
        slot = self.keyslot(key)
        slot_cache = self.nodes_manager.slots_cache.get(slot)
        if slot_cache is None or len(slot_cache) == 0:
            raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
        if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        elif replica:
            node_idx = 1
        else:
            # primary
            node_idx = 0

        return slot_cache[node_idx]

    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node

    def set_default_node(self, node):
        """
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return: True if the default node was set, else False
        """
        if node is None or self.get_node(node_name=node.name) is None:
            return False
        self.nodes_manager.default_node = node
        return True

    def set_retry(self, retry: Retry) -> None:
        self.retry = retry

    def monitor(self, target_node=None):
        """
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node was
        specified.
        Monitor is useful for handling the MONITOR command to the Redis server.
        The next_command() method returns one command from monitor;
        listen() yields commands from monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()

    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host & port, to get a pubsub instance
        connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
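
    # Illustrative sketch (hypothetical channel name): a pubsub instance can be
    # bound to an explicit node; otherwise ClusterPubSub picks one per its own
    # rules.
    #
    #     p = rc.pubsub(node=rc.get_default_node())
    #     p.subscribe("channel")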

    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
        Pipelines do not work in cluster mode the same way they
        do in normal mode. Create a clone of this object so
        that simulating pipelines will work correctly. Each
        command will be called directly when used, and
        calling execute() will only return the result stack.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            read_from_replicas=self.read_from_replicas,
            load_balancing_strategy=self.load_balancing_strategy,
            reinitialize_steps=self.reinitialize_steps,
            retry=self.retry,
            lock=self._lock,
            transaction=transaction,
        )
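
    # Illustrative usage sketch (hypothetical keys): commands are buffered on
    # the clone and the result stack comes back from execute(), as in
    # non-cluster pipelines.
    #
    #     with rc.pipeline() as pipe:
    #         pipe.set("k1", "v1")
    #         pipe.get("k1")
    #         results = pipe.execute()  # e.g. [True, b"v1"]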

    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread-local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquires `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). If the
                     token is *not* stored in thread-local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread-local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread-local
        storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such we default to using
        thread-local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
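
    # Illustrative usage sketch (hypothetical key name): the lock behaves like
    # threading.Lock but is backed by a Redis key.
    #
    #     with rc.lock("my-lock", timeout=5, blocking_timeout=3):
    #         ...  # critical section; raises LockError if not acquired in 3s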

    def set_response_callback(self, command, callback):
        """Set a custom response callback"""
        self.cluster_response_callbacks[command] = callback

    def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
        # Determine which nodes the command should be executed on.
        # Returns a list of target nodes.
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)
        if command_flag == self.__class__.RANDOM:
            # return a random node
            return [self.get_random_node()]
        elif command_flag == self.__class__.PRIMARIES:
            # return all primaries
            return self.get_primaries()
        elif command_flag == self.__class__.REPLICAS:
            # return all replicas
            return self.get_replicas()
        elif command_flag == self.__class__.ALL_NODES:
            # return all nodes
            return self.get_nodes()
        elif command_flag == self.__class__.DEFAULT_NODE:
            # return the cluster's default node
            return [self.nodes_manager.default_node]
        elif command in self.__class__.SEARCH_COMMANDS[0]:
            return [self.nodes_manager.default_node]
        else:
            # get the node that holds the key's slot
            slot = self.determine_slot(*args)
            node = self.nodes_manager.get_node_from_slot(
                slot,
                self.read_from_replicas and command in READ_COMMANDS,
                self.load_balancing_strategy if command in READ_COMMANDS else None,
            )
            return [node]

    def _should_reinitialized(self):
        # To reinitialize the cluster on every MOVED error,
        # set reinitialize_steps to 1.
        # To avoid reinitializing the cluster on moved errors, set
        # reinitialize_steps to 0.
        if self.reinitialize_steps == 0:
            return False
        else:
            return self.reinitialize_counter % self.reinitialize_steps == 0

    def keyslot(self, key):
        """
        Calculate the keyslot for a given key.
        See the keys distribution model in https://redis.io/topics/cluster-spec
        """
        k = self.encoder.encode(key)
        return key_slot(k)
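
    # Illustrative doctest-style sketch: keys hash to one of the 16384 slots,
    # and hash tags ({...}) force related keys into the same slot.
    #
    #     >>> rc.keyslot("foo")
    #     12182
    #     >>> rc.keyslot("{user1000}.following") == rc.keyslot("{user1000}.followers")
    #     True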

    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in it, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
        - issue: https://github.com/redis/redis/issues/9493
        - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)

    def determine_slot(self, *args) -> int:
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, which means the
                # function can be run on any node, so we can just return a
                # random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
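
    # Illustrative sketch (hypothetical keys): multi-key commands must stay
    # within one slot, which is what hash tags are for.
    #
    #     rc.mget("{user1}:a", "{user1}:b")  # same slot -> OK
    #     rc.mget("a", "b")  # usually different slots -> RedisClusterException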

    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder

    def get_connection_kwargs(self):
        """
        Get the connections' keyword arguments
        """
        return self.nodes_manager.connection_kwargs

    def _is_nodes_flag(self, target_nodes):
        return isinstance(target_nodes, str) and target_nodes in self.node_flags

    def _parse_target_nodes(self, target_nodes):
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables executing commands on multiple nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def execute_command(self, *args, **kwargs):
        return self._internal_execute_command(*args, **kwargs)

    def _internal_execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the retries property of
        "self.retry", which defaults to 3 unless manually configured.

        If all attempts fail, the last exception will be raised.

        The keyword argument :target_nodes: can be passed with the following
        types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True
        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args, **kwargs, nodes_flag=passed_targets
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)
                # Return the processed result
                return self._process_result(args[0], res, **kwargs)
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    continue
                else:
                    # raise the exception
                    raise e

    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove the 'keys' entry; it is only needed for client-side caching.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )
                return response
            except AuthenticationError:
                raise
            except MaxConnectionsError:
                # MaxConnectionsError indicates client-side resource exhaustion
                # (too many connections in the pool), not a node failure.
                # Don't treat this as a node failure - just re-raise the error
                # without reinitializing the cluster.
                raise
            except (ConnectionError, TimeoutError) as e:
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # The 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                moved = True
            except TryAgainError:
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except (ClusterDownError, SlotNotCoveredError):
                # ClusterDownError can occur during a failover; to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command.

                # SlotNotCoveredError can occur when the cluster is not fully
                # initialized, or it can be a temporary issue.
                # We will try to reinitialize the cluster topology
                # and retry executing the command.

                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise
            except ResponseError:
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")

    def close(self) -> None:
        try:
            with self._lock:
                if self.nodes_manager:
                    self.nodes_manager.close()
        except AttributeError:
            # RedisCluster's __init__ can fail before nodes_manager is set
            pass

    def _process_result(self, command, res, **kwargs):
        """
        Process the result of the executed command.
        The function returns either a dict or a single value.

        :type command: str
        :type res: dict

        `res` should be in the following format:
            Dict<node_name, command_result>
        """
        if command in self.result_callbacks:
            return self.result_callbacks[command](command, res, **kwargs)
        elif len(res) == 1:
            # When we execute the command on a single node, we can
            # remove the dictionary and return a single response
            return list(res.values())[0]
        else:
            return res

    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules,
        and their namespaces, to the redis client.

        ``funcname`` - A string containing the name of the function to create
        ``func`` - The function being added to this class.
        """
        setattr(self, funcname, func)

    def transaction(self, func, *watches, **kwargs):
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
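
    # Illustrative usage sketch (hypothetical key): the callable receives the
    # pipeline and is retried automatically on WatchError.
    #
    #     def incr_balance(pipe):
    #         balance = int(pipe.get("{acct}:balance") or 0)
    #         pipe.multi()
    #         pipe.set("{acct}:balance", balance + 1)
    #
    #     rc.transaction(incr_balance, "{acct}:balance")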


class ClusterNode:
    def __init__(self, host, port, server_type=None, redis_connection=None):
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        try:
            if self.redis_connection is not None:
                self.redis_connection.close()
        except Exception:
            # Ignore errors when closing the connection
            pass


class LoadBalancingStrategy(Enum):
    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        server_index = self.primary_to_idx.setdefault(primary, self.start_index)
        if replicas_only and server_index == 0:
            # skip the primary node index
            server_index = 1
        # Update the index for the next round
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index
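
# Illustrative sketch (hypothetical primary name "p"): for a slot served by
# [primary, replica1, replica2] (list_size=3), ROUND_ROBIN cycles through all
# indices per primary, while ROUND_ROBIN_REPLICAS skips index 0 so that only
# replicas are returned.
#
#     lb = LoadBalancer()
#     [lb.get_server_index("p", 3) for _ in range(4)]
#     # -> [0, 1, 2, 0]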


class NodesManager:
    def __init__(
        self,
        startup_nodes,
        from_url=False,
        require_full_coverage=False,
        lock=None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
        self.nodes_cache: Dict[str, ClusterNode] = {}
        self.slots_cache = {}
        self.startup_nodes = {}
        self.default_node = None
        self.populate_startup_nodes(startup_nodes)
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        self._cache = cache
        self._cache_config = cache_config
        self._cache_factory = cache_factory
        self._moved_exception = None
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.initialize()

    def get_node(self, host=None, port=None, node_name=None):
        """
        Get the requested node from the cluster's nodes.
        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            return None

    def update_moved_exception(self, exception):
        self._moved_exception = exception

    def _update_moved_slots(self):
        """
        Update the slot's node with the redirected one
        """
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            if redirected_node.server_type is not PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node; we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node
        if redirected_node in self.slots_cache[e.slot_id]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = self.slots_cache[e.slot_id][0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            self.slots_cache[e.slot_id].append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            self.slots_cache[e.slot_id].remove(redirected_node)
            # Override the old primary with the new one
            self.slots_cache[e.slot_id][0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        else:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replicas) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        # Reset moved_exception
        self._moved_exception = None

    @deprecated_args(
        args_to_warn=["server_type"],
        reason=(
            "In case you need to select some load balancing strategy "
            "that will use replicas, please set it through 'load_balancing_strategy'"
        ),
        version="5.3.0",
    )
    def get_node_from_slot(
        self,
        slot,
        read_from_replicas=False,
        load_balancing_strategy=None,
        server_type=None,
    ) -> ClusterNode:
        """
        Gets a node that serves this hash slot
        """
        if self._moved_exception:
            with self._lock:
                if self._moved_exception:
                    self._update_moved_slots()

        if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self._require_full_coverage}"'
            )

        if read_from_replicas is True and load_balancing_strategy is None:
            load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

        if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
            # get the server index using the strategy defined in load_balancing_strategy
            primary_name = self.slots_cache[slot][0].name
            node_idx = self.read_load_balancer.get_server_index(
                primary_name, len(self.slots_cache[slot]), load_balancing_strategy
            )
        elif (
            server_type is None
            or server_type == PRIMARY
            or len(self.slots_cache[slot]) == 1
        ):
            # return a primary
            node_idx = 0
        else:
            # return a replica
            # randomly choose one of the replicas
            node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

        return self.slots_cache[slot][node_idx]

    def get_nodes_by_server_type(self, server_type):
        """
        Get all nodes with the specified server type
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode
        """
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    def populate_startup_nodes(self, nodes):
        """
        Populate all startup nodes and filter out any duplicates
        """
        for n in nodes:
            self.startup_nodes[n.name] = n

    def check_slots_coverage(self, slots_cache):
        # Validate whether all slots are covered or if we should try the next
        # startup node
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in slots_cache:
                return False
        return True

    def create_redis_connections(self, nodes):
        """
        This function will create a redis connection to all nodes in :nodes:
        """
        connection_pools = []
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host, port=node.port, **self.connection_kwargs
                )
                connection_pools.append(node.redis_connection.connection_pool)

        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                connection_pools, ClientType.SYNC, self._credential_provider
            )
        )

    def create_redis_node(self, host, port, **kwargs):
        # We are configuring the connection pool not to retry
        # connections on the lower-level clients to avoid retrying
        # connections to nodes that are not reachable
        # and to avoid blocking the connection pool.
        # The only error that will have some handling in the lower-level
        # clients is ConnectionError, which will trigger disconnection
        # of the socket.
        # The retries will be handled on the cluster client level,
        # where we will have proper handling of the cluster topology
        node_retry_config = Retry(
            backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
        )

        protocol = kwargs.get("protocol", None)
        if protocol in [3, "3"]:
            kwargs.update(
                {"maint_notifications_config": MaintNotificationsConfig(enabled=False)}
            )
        if self.from_url:
            # Create a redis node with a customized connection pool
            kwargs.update({"host": host})
            kwargs.update({"port": port})
            kwargs.update({"cache": self._cache})
            kwargs.update({"retry": node_retry_config})
            r = Redis(connection_pool=self.connection_pool_class(**kwargs))
        else:
            r = Redis(
                host=host,
                port=port,
                cache=self._cache,
                retry=node_retry_config,
                **kwargs,
            )
        return r
1689 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
1690 node_name = get_node_name(host, port)
1691 # check if we already have this node in the tmp_nodes_cache
1692 target_node = tmp_nodes_cache.get(node_name)
1693 if target_node is None:
1694 # before creating a new cluster node, check if the cluster node already
1695 # exists in the current nodes cache and has a valid connection so we can
1696 # reuse it
1697 target_node = self.nodes_cache.get(node_name)
1698 if target_node is None or target_node.redis_connection is None:
1699 # create new cluster node for this cluster
1700 target_node = ClusterNode(host, port, role)
1701 if target_node.server_type != role:
1702 target_node.server_type = role
1703 # add this node to the nodes cache
1704 tmp_nodes_cache[target_node.name] = target_node
1706 return target_node
1708 def initialize(self):
1709 """
1710 Initializes the nodes cache, slots cache and redis connections.
1711 :startup_nodes:
1712 Responsible for discovering other nodes in the cluster
1713 """
1714 self.reset()
1715 tmp_nodes_cache = {}
1716 tmp_slots = {}
1717 disagreements = []
1718 startup_nodes_reachable = False
1719 fully_covered = False
1720 kwargs = self.connection_kwargs
1721 exception = None
1722 # Convert to tuple to prevent RuntimeError if self.startup_nodes
1723 # is modified during iteration
1724 for startup_node in tuple(self.startup_nodes.values()):
1725 try:
1726 if startup_node.redis_connection:
1727 r = startup_node.redis_connection
1728 else:
1729 # Create a new Redis connection
1730 r = self.create_redis_node(
1731 startup_node.host, startup_node.port, **kwargs
1732 )
1733 self.startup_nodes[startup_node.name].redis_connection = r
1734 # Make sure cluster mode is enabled on this node
1735 try:
1736 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1737 r.connection_pool.disconnect()
1738 except ResponseError:
1739 raise RedisClusterException(
1740 "Cluster mode is not enabled on this node"
1741 )
1742 startup_nodes_reachable = True
1743 except Exception as e:
1744 # Try the next startup node.
1745 # The exception is saved and raised only if we have no more nodes.
1746 exception = e
1747 continue
1749 # CLUSTER SLOTS command results in the following output:
1750 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
1751 # where each node contains the following list: [IP, port, node_id]
1752 # Therefore, cluster_slots[0][2][0] will be the IP address of the
1753 # primary node of the first slot section.
1754 # If there's only one server in the cluster, its ``host`` is ''
1755 # Fix it to the host in startup_nodes
1756 if (
1757 len(cluster_slots) == 1
1758 and len(cluster_slots[0][2][0]) == 0
1759 and len(self.startup_nodes) == 1
1760 ):
1761 cluster_slots[0][2][0] = startup_node.host
1763 for slot in cluster_slots:
1764 primary_node = slot[2]
1765 host = str_if_bytes(primary_node[0])
1766 if host == "":
1767 host = startup_node.host
1768 port = int(primary_node[1])
1769 host, port = self.remap_host_port(host, port)
1771 nodes_for_slot = []
1773 target_node = self._get_or_create_cluster_node(
1774 host, port, PRIMARY, tmp_nodes_cache
1775 )
1776 nodes_for_slot.append(target_node)
1778 replica_nodes = slot[3:]
1779 for replica_node in replica_nodes:
1780 host = str_if_bytes(replica_node[0])
1781 port = int(replica_node[1])
1782 host, port = self.remap_host_port(host, port)
1783 target_replica_node = self._get_or_create_cluster_node(
1784 host, port, REPLICA, tmp_nodes_cache
1785 )
1786 nodes_for_slot.append(target_replica_node)
1788 for i in range(int(slot[0]), int(slot[1]) + 1):
1789 if i not in tmp_slots:
1790 tmp_slots[i] = nodes_for_slot
1791 else:
1792 # Detect a disagreement: two startup nodes reporting
1793 # different owners for the same slot
1794 tmp_slot = tmp_slots[i][0]
1795 if tmp_slot.name != target_node.name:
1796 disagreements.append(
1797 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
1798 )
1800 if len(disagreements) > 5:
1801 raise RedisClusterException(
1802 f"startup_nodes could not agree on a valid "
1803 f"slots cache: {', '.join(disagreements)}"
1804 )
1806 fully_covered = self.check_slots_coverage(tmp_slots)
1807 if fully_covered:
1808 # Don't need to continue to the next startup node if all
1809 # slots are covered
1810 break
1812 if not startup_nodes_reachable:
1813 raise RedisClusterException(
1814 f"Redis Cluster cannot be connected. Please provide at least "
1815 f"one reachable node: {str(exception)}"
1816 ) from exception
1818 if self._cache is None and self._cache_config is not None:
1819 if self._cache_factory is None:
1820 self._cache = CacheFactory(self._cache_config).get_cache()
1821 else:
1822 self._cache = self._cache_factory.get_cache()
1824 # Create Redis connections to all nodes
1825 self.create_redis_connections(list(tmp_nodes_cache.values()))
1827 # Check if the slots are not fully covered
1828 if not fully_covered and self._require_full_coverage:
1829 # Full slot coverage is required, but some slots
1830 # are left uncovered
1831 raise RedisClusterException(
1832 f"All slots are not covered after query all startup_nodes. "
1833 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
1834 f"covered..."
1835 )
1837 # Set the tmp variables to the real variables
1838 self.nodes_cache = tmp_nodes_cache
1839 self.slots_cache = tmp_slots
1840 # Set the default node
1841 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
1842 if self._dynamic_startup_nodes:
1843 # Populate the startup nodes with all discovered nodes
1844 self.startup_nodes = tmp_nodes_cache
1845 # If initialize was called after a MovedError, clear it
1846 self._moved_exception = None
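
# --- Illustrative only: the shape of a CLUSTER SLOTS reply that initialize()
# consumes. All addresses and node ids below are made up.
cluster_slots = [
    # [start_slot, end_slot, primary, replica1, ..., replicaN]
    [0, 5460, ["10.0.0.1", 7000, "id1"], ["10.0.0.4", 7003, "id4"]],
    [5461, 10922, ["10.0.0.2", 7001, "id2"]],
    [10923, 16383, ["10.0.0.3", 7002, "id3"]],
]
# Every slot in 0..16383 falls inside exactly one [start, end] range, so the
# resulting slots_cache maps each slot to a [primary, *replicas] node list and
# check_slots_coverage() returns True.
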
1848 def close(self) -> None:
1849 self.default_node = None
1850 for node in self.nodes_cache.values():
1851 if node.redis_connection:
1852 node.redis_connection.close()
1854 def reset(self):
1855 try:
1856 self.read_load_balancer.reset()
1857 except TypeError:
1858 # The read_load_balancer is None, do nothing
1859 pass
1861 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
1862 """
1863 Remap the host and port returned from the cluster to a different
1864 internal value. Useful if the client is not connecting directly
1865 to the cluster.
1866 """
1867 if self.address_remap:
1868 return self.address_remap((host, port))
1869 return host, port
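
# --- Hedged sketch of address remapping: a client that cannot reach the
# addresses announced by CLUSTER SLOTS (e.g. behind NAT or an SSH tunnel) can
# rewrite them with an ``address_remap`` callable. The +10000 port mapping is
# hypothetical.
from redis.cluster import RedisCluster

def remap(address):
    host, port = address
    # e.g. a local tunnel exposes node ports 7000-7002 as 17000-17002
    return ("127.0.0.1", port + 10000)

rc = RedisCluster(host="127.0.0.1", port=17000, address_remap=remap)
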
1871 def find_connection_owner(self, connection: Connection) -> Optional["ClusterNode"]:
1872 node_name = get_node_name(connection.host, connection.port)
1873 for node in tuple(self.nodes_cache.values()):
1874 if node.redis_connection:
1875 conn_args = node.redis_connection.connection_pool.connection_kwargs
1876 if node_name == get_node_name(
1877 conn_args.get("host"), conn_args.get("port")
1878 ):
1879 return node
1882class ClusterPubSub(PubSub):
1883 """
1884 Wrapper for PubSub class.
1886 IMPORTANT: before using ClusterPubSub, read about the known limitations
1887 with pubsub in Cluster mode and learn how to workaround them:
1888 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
1889 """
1891 def __init__(
1892 self,
1893 redis_cluster,
1894 node=None,
1895 host=None,
1896 port=None,
1897 push_handler_func=None,
1898 event_dispatcher: Optional["EventDispatcher"] = None,
1899 **kwargs,
1900 ):
1901 """
1902 When a pubsub instance is created without specifying a node, a single
1903 node will be transparently chosen for the pubsub connection on the
1904 first command execution. The node will be determined by:
1905 1. Hashing the channel name in the request to find its keyslot
1906 2. Selecting a node that handles the keyslot: If read_from_replicas is
1907 set to true or load_balancing_strategy is set, a replica can be selected.
1909 :type redis_cluster: RedisCluster
1910 :type node: ClusterNode
1911 :type host: str
1912 :type port: int
1913 """
1914 self.node = None
1915 self.set_pubsub_node(redis_cluster, node, host, port)
1916 connection_pool = (
1917 None
1918 if self.node is None
1919 else redis_cluster.get_redis_connection(self.node).connection_pool
1920 )
1921 self.cluster = redis_cluster
1922 self.node_pubsub_mapping = {}
1923 self._pubsubs_generator = self._pubsubs_generator()
1924 if event_dispatcher is None:
1925 self._event_dispatcher = EventDispatcher()
1926 else:
1927 self._event_dispatcher = event_dispatcher
1928 super().__init__(
1929 connection_pool=connection_pool,
1930 encoder=redis_cluster.encoder,
1931 push_handler_func=push_handler_func,
1932 event_dispatcher=self._event_dispatcher,
1933 **kwargs,
1934 )
1936 def set_pubsub_node(self, cluster, node=None, host=None, port=None):
1937 """
1938 The pubsub node will be set according to the passed node, host and port
1939 When none of node, host, or port is specified, the node is set
1940 to None and will be determined by the keyslot of the channel in the
1941 first command to be executed.
1942 RedisClusterException will be thrown if the passed node does not exist
1943 in the cluster.
1944 If host is passed without port, or vice versa, a DataError will be
1945 thrown.
1946 :type cluster: RedisCluster
1947 :type node: ClusterNode
1948 :type host: str
1949 :type port: int
1950 """
1951 if node is not None:
1952 # node is passed by the user
1953 self._raise_on_invalid_node(cluster, node, node.host, node.port)
1954 pubsub_node = node
1955 elif host is not None and port is not None:
1956 # host and port passed by the user
1957 node = cluster.get_node(host=host, port=port)
1958 self._raise_on_invalid_node(cluster, node, host, port)
1959 pubsub_node = node
1960 elif any([host, port]):
1961 # only 'host' or 'port' passed
1962 raise DataError("Passing a host requires passing a port, and vice versa")
1963 else:
1964 # nothing passed by the user. set node to None
1965 pubsub_node = None
1967 self.node = pubsub_node
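
# --- Usage sketch (endpoint is an assumption): the pubsub node can be pinned
# explicitly, or left unset so it is derived from the keyslot of the first
# subscribed channel.
from redis.cluster import RedisCluster

rc = RedisCluster(host="127.0.0.1", port=7000)
p1 = rc.pubsub()                            # node chosen on first command
p2 = rc.pubsub(host="10.0.0.2", port=7001)  # pinned by host and port
p3 = rc.pubsub(node=rc.get_default_node())  # pinned by ClusterNode
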
1969 def get_pubsub_node(self):
1970 """
1971 Get the node that is being used as the pubsub connection
1972 """
1973 return self.node
1975 def _raise_on_invalid_node(self, redis_cluster, node, host, port):
1976 """
1977 Raise a RedisClusterException if the node is None or doesn't exist in
1978 the cluster.
1979 """
1980 if node is None or redis_cluster.get_node(node_name=node.name) is None:
1981 raise RedisClusterException(
1982 f"Node {host}:{port} doesn't exist in the cluster"
1983 )
1985 def execute_command(self, *args):
1986 """
1987 Execute a subscribe/unsubscribe command.
1989 Code taken from redis-py and tweaked to make it work within a cluster.
1990 """
1991 # NOTE: don't parse the response in this function -- it could pull a
1992 # legitimate message off the stack if the connection is already
1993 # subscribed to one or more channels
1995 if self.connection is None:
1996 if self.connection_pool is None:
1997 if len(args) > 1:
1998 # Hash the first channel and get one of the nodes holding
1999 # this slot
2000 channel = args[1]
2001 slot = self.cluster.keyslot(channel)
2002 node = self.cluster.nodes_manager.get_node_from_slot(
2003 slot,
2004 self.cluster.read_from_replicas,
2005 self.cluster.load_balancing_strategy,
2006 )
2007 else:
2008 # Get a random node
2009 node = self.cluster.get_random_node()
2010 self.node = node
2011 redis_connection = self.cluster.get_redis_connection(node)
2012 self.connection_pool = redis_connection.connection_pool
2013 self.connection = self.connection_pool.get_connection()
2014 # register a callback that re-subscribes to any channels we
2015 # were listening to when we were disconnected
2016 self.connection.register_connect_callback(self.on_connect)
2017 if self.push_handler_func is not None:
2018 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
2019 self._event_dispatcher.dispatch(
2020 AfterPubSubConnectionInstantiationEvent(
2021 self.connection, self.connection_pool, ClientType.SYNC, self._lock
2022 )
2023 )
2024 connection = self.connection
2025 self._execute(connection, connection.send_command, *args)
2027 def _get_node_pubsub(self, node):
2028 try:
2029 return self.node_pubsub_mapping[node.name]
2030 except KeyError:
2031 pubsub = node.redis_connection.pubsub(
2032 push_handler_func=self.push_handler_func
2033 )
2034 self.node_pubsub_mapping[node.name] = pubsub
2035 return pubsub
2037 def _sharded_message_generator(self):
2038 for _ in range(len(self.node_pubsub_mapping)):
2039 pubsub = next(self._pubsubs_generator)
2040 message = pubsub.get_message()
2041 if message is not None:
2042 return message
2043 return None
2045 def _pubsubs_generator(self):
2046 while True:
2047 yield from self.node_pubsub_mapping.values()
2049 def get_sharded_message(
2050 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
2051 ):
2052 if target_node:
2053 message = self.node_pubsub_mapping[target_node.name].get_message(
2054 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
2055 )
2056 else:
2057 message = self._sharded_message_generator()
2058 if message is None:
2059 return None
2060 elif str_if_bytes(message["type"]) == "sunsubscribe":
2061 if message["channel"] in self.pending_unsubscribe_shard_channels:
2062 self.pending_unsubscribe_shard_channels.remove(message["channel"])
2063 self.shard_channels.pop(message["channel"], None)
2064 node = self.cluster.get_node_from_key(message["channel"])
2065 if self.node_pubsub_mapping[node.name].subscribed is False:
2066 self.node_pubsub_mapping.pop(node.name)
2067 if not self.channels and not self.patterns and not self.shard_channels:
2068 # There are no subscriptions anymore, set subscribed_event flag
2069 # to false
2070 self.subscribed_event.clear()
2071 if self.ignore_subscribe_messages or ignore_subscribe_messages:
2072 return None
2073 return message
2075 def ssubscribe(self, *args, **kwargs):
2076 if args:
2077 args = list_or_args(args[0], args[1:])
2078 s_channels = dict.fromkeys(args)
2079 s_channels.update(kwargs)
2080 for s_channel, handler in s_channels.items():
2081 node = self.cluster.get_node_from_key(s_channel)
2082 pubsub = self._get_node_pubsub(node)
2083 if handler:
2084 pubsub.ssubscribe(**{s_channel: handler})
2085 else:
2086 pubsub.ssubscribe(s_channel)
2087 self.shard_channels.update(pubsub.shard_channels)
2088 self.pending_unsubscribe_shard_channels.difference_update(
2089 self._normalize_keys({s_channel: None})
2090 )
2091 if pubsub.subscribed and not self.subscribed:
2092 self.subscribed_event.set()
2093 self.health_check_response_counter = 0
2095 def sunsubscribe(self, *args):
2096 if args:
2097 args = list_or_args(args[0], args[1:])
2098 else:
2099 args = self.shard_channels
2101 for s_channel in args:
2102 node = self.cluster.get_node_from_key(s_channel)
2103 p = self._get_node_pubsub(node)
2104 p.sunsubscribe(s_channel)
2105 self.pending_unsubscribe_shard_channels.update(
2106 p.pending_unsubscribe_shard_channels
2107 )
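
# --- Sharded pub/sub sketch (endpoint, channel name, and payload are
# assumptions): ssubscribe() opens a per-node PubSub on the shard channel's
# slot owner, and get_sharded_message() round-robins over those node pubsubs.
from redis.cluster import RedisCluster

rc = RedisCluster(host="127.0.0.1", port=7000)
p = rc.pubsub()
p.ssubscribe("orders")          # routed to the node owning slot("orders")
rc.spublish("orders", "hello")  # SPUBLISH lands on the same shard
while True:
    msg = p.get_sharded_message(ignore_subscribe_messages=True, timeout=1.0)
    if msg is not None:
        print(msg["channel"], msg["data"])
        break
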
2109 def get_redis_connection(self):
2110 """
2111 Get the Redis connection of the pubsub connected node.
2112 """
2113 if self.node is not None:
2114 return self.node.redis_connection
2116 def disconnect(self):
2117 """
2118 Disconnect the pubsub connection.
2119 """
2120 if self.connection:
2121 self.connection.disconnect()
2122 for pubsub in self.node_pubsub_mapping.values():
2123 pubsub.connection.disconnect()
2126class ClusterPipeline(RedisCluster):
2127 """
2128 Support for Redis pipeline
2129 in cluster mode
2130 """
2132 ERRORS_ALLOW_RETRY = (
2133 ConnectionError,
2134 TimeoutError,
2135 MovedError,
2136 AskError,
2137 TryAgainError,
2138 )
2140 NO_SLOTS_COMMANDS = {"UNWATCH"}
2141 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
2142 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
2144 @deprecated_args(
2145 args_to_warn=[
2146 "cluster_error_retry_attempts",
2147 ],
2148 reason="Please configure the 'retry' object instead",
2149 version="6.0.0",
2150 )
2151 def __init__(
2152 self,
2153 nodes_manager: "NodesManager",
2154 commands_parser: "CommandsParser",
2155 result_callbacks: Optional[Dict[str, Callable]] = None,
2156 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
2157 startup_nodes: Optional[List["ClusterNode"]] = None,
2158 read_from_replicas: bool = False,
2159 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
2160 cluster_error_retry_attempts: int = 3,
2161 reinitialize_steps: int = 5,
2162 retry: Optional[Retry] = None,
2163 lock=None,
2164 transaction=False,
2165 **kwargs,
2166 ):
2167 """ """
2168 self.command_stack = []
2169 self.nodes_manager = nodes_manager
2170 self.commands_parser = commands_parser
2171 self.refresh_table_asap = False
2172 self.result_callbacks = (
2173 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
2174 )
2175 self.startup_nodes = startup_nodes if startup_nodes else []
2176 self.read_from_replicas = read_from_replicas
2177 self.load_balancing_strategy = load_balancing_strategy
2178 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
2179 self.cluster_response_callbacks = cluster_response_callbacks
2180 self.reinitialize_counter = 0
2181 self.reinitialize_steps = reinitialize_steps
2182 if retry is not None:
2183 self.retry = retry
2184 else:
2185 self.retry = Retry(
2186 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
2187 retries=cluster_error_retry_attempts,
2188 )
2190 self.encoder = Encoder(
2191 kwargs.get("encoding", "utf-8"),
2192 kwargs.get("encoding_errors", "strict"),
2193 kwargs.get("decode_responses", False),
2194 )
2195 if lock is None:
2196 lock = threading.RLock()
2197 self._lock = lock
2198 self.parent_execute_command = super().execute_command
2199 self._execution_strategy: ExecutionStrategy = (
2200 PipelineStrategy(self) if not transaction else TransactionStrategy(self)
2201 )
2203 def __repr__(self):
2204 """ """
2205 return f"{type(self).__name__}"
2207 def __enter__(self):
2208 """ """
2209 return self
2211 def __exit__(self, exc_type, exc_value, traceback):
2212 """ """
2213 self.reset()
2215 def __del__(self):
2216 try:
2217 self.reset()
2218 except Exception:
2219 pass
2221 def __len__(self):
2222 """ """
2223 return len(self._execution_strategy.command_queue)
2225 def __bool__(self):
2226 "Pipeline instances should always evaluate to True on Python 3+"
2227 return True
2229 def execute_command(self, *args, **kwargs):
2230 """
2231 Wrapper function for pipeline_execute_command
2232 """
2233 return self._execution_strategy.execute_command(*args, **kwargs)
2235 def pipeline_execute_command(self, *args, **options):
2236 """
2237 Stage a command to be executed when execute() is next called
2239 Returns the current Pipeline object back so commands can be
2240 chained together, such as:
2242 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
2244 At some other point, you can then run: pipe.execute(),
2245 which will execute all commands queued in the pipe.
2246 """
2247 return self._execution_strategy.execute_command(*args, **options)
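
# --- Minimal queuing sketch (endpoint and keys are assumptions): commands are
# buffered and only routed when execute() is called; in cluster mode each
# command is then dispatched to the node that owns its key's slot.
from redis.cluster import RedisCluster

rc = RedisCluster(host="127.0.0.1", port=7000)
pipe = rc.pipeline()
pipe.set("foo", "bar").incr("baz").get("foo")
results = pipe.execute()  # e.g. [True, 1, b"bar"]
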
2249 def annotate_exception(self, exception, number, command):
2250 """
2251 Provides extra context to the exception prior to it being handled
2252 """
2253 self._execution_strategy.annotate_exception(exception, number, command)
2255 def execute(self, raise_on_error: bool = True) -> List[Any]:
2256 """
2257 Execute all the commands in the current pipeline
2258 """
2260 try:
2261 return self._execution_strategy.execute(raise_on_error)
2262 finally:
2263 self.reset()
2265 def reset(self):
2266 """
2267 Reset back to empty pipeline.
2268 """
2269 self._execution_strategy.reset()
2271 def send_cluster_commands(
2272 self, stack, raise_on_error=True, allow_redirections=True
2273 ):
2274 return self._execution_strategy.send_cluster_commands(
2275 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
2276 )
2278 def exists(self, *keys):
2279 return self._execution_strategy.exists(*keys)
2281 def eval(self):
2282 """ """
2283 return self._execution_strategy.eval()
2285 def multi(self):
2286 """
2287 Start a transactional block of the pipeline after WATCH commands
2288 are issued. End the transactional block with `execute`.
2289 """
2290 self._execution_strategy.multi()
2292 def load_scripts(self):
2293 """ """
2294 self._execution_strategy.load_scripts()
2296 def discard(self):
2297 """ """
2298 self._execution_strategy.discard()
2300 def watch(self, *names):
2301 """Watches the values at keys ``names``"""
2302 self._execution_strategy.watch(*names)
2304 def unwatch(self):
2305 """Unwatches all previously specified keys"""
2306 self._execution_strategy.unwatch()
2308 def script_load_for_pipeline(self, *args, **kwargs):
2309 self._execution_strategy.script_load_for_pipeline(*args, **kwargs)
2311 def delete(self, *names):
2312 self._execution_strategy.delete(*names)
2314 def unlink(self, *names):
2315 self._execution_strategy.unlink(*names)
2318def block_pipeline_command(name: str) -> Callable[..., Any]:
2319 """
2320 Raises an error because some pipelined commands should
2321 be blocked when running in cluster mode
2322 """
2324 def inner(*args, **kwargs):
2325 raise RedisClusterException(
2326 f"ERROR: Calling pipelined function {name} is blocked "
2327 f"when running redis in cluster mode..."
2328 )
2330 return inner
2333# Blocked pipeline commands
2334PIPELINE_BLOCKED_COMMANDS = (
2335 "BGREWRITEAOF",
2336 "BGSAVE",
2337 "BITOP",
2338 "BRPOPLPUSH",
2339 "CLIENT GETNAME",
2340 "CLIENT KILL",
2341 "CLIENT LIST",
2342 "CLIENT SETNAME",
2343 "CLIENT",
2344 "CONFIG GET",
2345 "CONFIG RESETSTAT",
2346 "CONFIG REWRITE",
2347 "CONFIG SET",
2348 "CONFIG",
2349 "DBSIZE",
2350 "ECHO",
2351 "EVALSHA",
2352 "FLUSHALL",
2353 "FLUSHDB",
2354 "INFO",
2355 "KEYS",
2356 "LASTSAVE",
2357 "MGET",
2358 "MGET NONATOMIC",
2359 "MOVE",
2360 "MSET",
2361 "MSET NONATOMIC",
2362 "MSETNX",
2363 "PFCOUNT",
2364 "PFMERGE",
2365 "PING",
2366 "PUBLISH",
2367 "RANDOMKEY",
2368 "READONLY",
2369 "READWRITE",
2370 "RENAME",
2371 "RENAMENX",
2372 "RPOPLPUSH",
2373 "SAVE",
2374 "SCAN",
2375 "SCRIPT EXISTS",
2376 "SCRIPT FLUSH",
2377 "SCRIPT KILL",
2378 "SCRIPT LOAD",
2379 "SCRIPT",
2380 "SDIFF",
2381 "SDIFFSTORE",
2382 "SENTINEL GET MASTER ADDR BY NAME",
2383 "SENTINEL MASTER",
2384 "SENTINEL MASTERS",
2385 "SENTINEL MONITOR",
2386 "SENTINEL REMOVE",
2387 "SENTINEL SENTINELS",
2388 "SENTINEL SET",
2389 "SENTINEL SLAVES",
2390 "SENTINEL",
2391 "SHUTDOWN",
2392 "SINTER",
2393 "SINTERSTORE",
2394 "SLAVEOF",
2395 "SLOWLOG GET",
2396 "SLOWLOG LEN",
2397 "SLOWLOG RESET",
2398 "SLOWLOG",
2399 "SMOVE",
2400 "SORT",
2401 "SUNION",
2402 "SUNIONSTORE",
2403 "TIME",
2404)
2405for command in PIPELINE_BLOCKED_COMMANDS:
2406 command = command.replace(" ", "_").lower()
2408 setattr(ClusterPipeline, command, block_pipeline_command(command))
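
# --- What the blocking above looks like in practice (endpoint is an
# assumption): the attributes installed on ClusterPipeline replace cross-slot
# and multi-node commands with a stub that raises instead of queuing.
from redis.cluster import RedisCluster
from redis.exceptions import RedisClusterException

rc = RedisCluster(host="127.0.0.1", port=7000)
pipe = rc.pipeline()
try:
    pipe.keys("*")  # KEYS is in PIPELINE_BLOCKED_COMMANDS
except RedisClusterException as exc:
    print(exc)  # ERROR: Calling pipelined function keys is blocked ...
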
2411class PipelineCommand:
2412 """ """
2414 def __init__(self, args, options=None, position=None):
2415 self.args = args
2416 if options is None:
2417 options = {}
2418 self.options = options
2419 self.position = position
2420 self.result = None
2421 self.node = None
2422 self.asking = False
2425class NodeCommands:
2426 """ """
2428 def __init__(self, parse_response, connection_pool, connection):
2429 """ """
2430 self.parse_response = parse_response
2431 self.connection_pool = connection_pool
2432 self.connection = connection
2433 self.commands = []
2435 def append(self, c):
2436 """ """
2437 self.commands.append(c)
2439 def write(self):
2440 """
2441 Code borrowed from redis-py's Pipeline so it can be fixed here
2442 """
2443 connection = self.connection
2444 commands = self.commands
2446 # We are going to clobber the commands with the write, so go ahead
2447 # and ensure that nothing is sitting there from a previous run.
2448 for c in commands:
2449 c.result = None
2451 # build up all commands into a single request to increase network perf
2452 # send all the commands and catch connection and timeout errors.
2453 try:
2454 connection.send_packed_command(
2455 connection.pack_commands([c.args for c in commands])
2456 )
2457 except (ConnectionError, TimeoutError) as e:
2458 for c in commands:
2459 c.result = e
2461 def read(self):
2462 """ """
2463 connection = self.connection
2464 for c in self.commands:
2465 # if there is a result on this command,
2466 # it means we ran into an exception
2467 # like a connection error. Trying to parse
2468 # a response on a connection that
2469 # is no longer open will result in a
2470 # connection error raised by redis-py.
2471 # but redis-py doesn't check in parse_response
2472 # that the sock object is
2473 # still set and if you try to
2474 # read from a closed connection, it will
2475 # result in an AttributeError because
2476 # it will do a readline() call on None.
2477 # This can have all kinds of nasty side-effects.
2478 # Treating this case as a connection error
2479 # is fine because it will dump
2480 # the connection object back into the
2481 # pool and on the next write, it will
2482 # explicitly open the connection and all will be well.
2483 if c.result is None:
2484 try:
2485 c.result = self.parse_response(connection, c.args[0], **c.options)
2486 except (ConnectionError, TimeoutError) as e:
2487 for c in self.commands:
2488 c.result = e
2489 return
2490 except RedisError:
2491 c.result = sys.exc_info()[1]
2494class ExecutionStrategy(ABC):
2495 @property
2496 @abstractmethod
2497 def command_queue(self):
2498 pass
2500 @abstractmethod
2501 def execute_command(self, *args, **kwargs):
2502 """
2503 Execution flow for current execution strategy.
2505 See: ClusterPipeline.execute_command()
2506 """
2507 pass
2509 @abstractmethod
2510 def annotate_exception(self, exception, number, command):
2511 """
2512 Annotate exception according to current execution strategy.
2514 See: ClusterPipeline.annotate_exception()
2515 """
2516 pass
2518 @abstractmethod
2519 def pipeline_execute_command(self, *args, **options):
2520 """
2521 Pipeline execution flow for current execution strategy.
2523 See: ClusterPipeline.pipeline_execute_command()
2524 """
2525 pass
2527 @abstractmethod
2528 def execute(self, raise_on_error: bool = True) -> List[Any]:
2529 """
2530 Executes current execution strategy.
2532 See: ClusterPipeline.execute()
2533 """
2534 pass
2536 @abstractmethod
2537 def send_cluster_commands(
2538 self, stack, raise_on_error=True, allow_redirections=True
2539 ):
2540 """
2541 Sends commands according to current execution strategy.
2543 See: ClusterPipeline.send_cluster_commands()
2544 """
2545 pass
2547 @abstractmethod
2548 def reset(self):
2549 """
2550 Resets current execution strategy.
2552 See: ClusterPipeline.reset()
2553 """
2554 pass
2556 @abstractmethod
2557 def exists(self, *keys):
2558 pass
2560 @abstractmethod
2561 def eval(self):
2562 pass
2564 @abstractmethod
2565 def multi(self):
2566 """
2567 Starts transactional context.
2569 See: ClusterPipeline.multi()
2570 """
2571 pass
2573 @abstractmethod
2574 def load_scripts(self):
2575 pass
2577 @abstractmethod
2578 def watch(self, *names):
2579 pass
2581 @abstractmethod
2582 def unwatch(self):
2583 """
2584 Unwatches all previously specified keys
2586 See: ClusterPipeline.unwatch()
2587 """
2588 pass
2590 @abstractmethod
2591 def script_load_for_pipeline(self, *args, **kwargs):
2592 pass
2594 @abstractmethod
2595 def delete(self, *names):
2596 """
2597 "Delete a key specified by ``names``"
2599 See: ClusterPipeline.delete()
2600 """
2601 pass
2603 @abstractmethod
2604 def unlink(self, *names):
2605 """
2606 "Unlink a key specified by ``names``"
2608 See: ClusterPipeline.unlink()
2609 """
2610 pass
2612 @abstractmethod
2613 def discard(self):
2614 pass
2617class AbstractStrategy(ExecutionStrategy):
2618 def __init__(
2619 self,
2620 pipe: ClusterPipeline,
2621 ):
2622 self._command_queue: List[PipelineCommand] = []
2623 self._pipe = pipe
2624 self._nodes_manager = self._pipe.nodes_manager
2626 @property
2627 def command_queue(self):
2628 return self._command_queue
2630 @command_queue.setter
2631 def command_queue(self, queue: List[PipelineCommand]):
2632 self._command_queue = queue
2634 @abstractmethod
2635 def execute_command(self, *args, **kwargs):
2636 pass
2638 def pipeline_execute_command(self, *args, **options):
2639 self._command_queue.append(
2640 PipelineCommand(args, options, len(self._command_queue))
2641 )
2642 return self._pipe
2644 @abstractmethod
2645 def execute(self, raise_on_error: bool = True) -> List[Any]:
2646 pass
2648 @abstractmethod
2649 def send_cluster_commands(
2650 self, stack, raise_on_error=True, allow_redirections=True
2651 ):
2652 pass
2654 @abstractmethod
2655 def reset(self):
2656 pass
2658 def exists(self, *keys):
2659 return self.execute_command("EXISTS", *keys)
2661 def eval(self):
2662 """ """
2663 raise RedisClusterException("method eval() is not implemented")
2665 def load_scripts(self):
2666 """ """
2667 raise RedisClusterException("method load_scripts() is not implemented")
2669 def script_load_for_pipeline(self, *args, **kwargs):
2670 """ """
2671 raise RedisClusterException(
2672 "method script_load_for_pipeline() is not implemented"
2673 )
2675 def annotate_exception(self, exception, number, command):
2676 """
2677 Provides extra context to the exception prior to it being handled
2678 """
2679 cmd = " ".join(map(safe_str, command))
2680 msg = (
2681 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
2682 f"caused error: {exception.args[0]}"
2683 )
2684 exception.args = (msg,) + exception.args[1:]
2687class PipelineStrategy(AbstractStrategy):
2688 def __init__(self, pipe: ClusterPipeline):
2689 super().__init__(pipe)
2690 self.command_flags = pipe.command_flags
2692 def execute_command(self, *args, **kwargs):
2693 return self.pipeline_execute_command(*args, **kwargs)
2695 def _raise_first_error(self, stack):
2696 """
2697 Raise the first exception on the stack
2698 """
2699 for c in stack:
2700 r = c.result
2701 if isinstance(r, Exception):
2702 self.annotate_exception(r, c.position + 1, c.args)
2703 raise r
2705 def execute(self, raise_on_error: bool = True) -> List[Any]:
2706 stack = self._command_queue
2707 if not stack:
2708 return []
2710 try:
2711 return self.send_cluster_commands(stack, raise_on_error)
2712 finally:
2713 self.reset()
2715 def reset(self):
2716 """
2717 Reset back to empty pipeline.
2718 """
2719 self._command_queue = []
2721 def send_cluster_commands(
2722 self, stack, raise_on_error=True, allow_redirections=True
2723 ):
2724 """
2725 Wrapper that handles RedisCluster.ERRORS_ALLOW_RETRY errors.
2727 If one of the retryable exceptions has been thrown we assume that:
2728 - connection_pool was disconnected
2729 - connection_pool was reset
2730 - refresh_table_asap set to True
2732 It will retry the number of times specified by
2733 the retries configured on "self.retry",
2734 which defaults to 3 unless manually configured.
2736 If all retries are exhausted, the command will
2737 raise a ClusterDownError.
2738 """
2739 if not stack:
2740 return []
2741 retry_attempts = self._pipe.retry.get_retries()
2742 while True:
2743 try:
2744 return self._send_cluster_commands(
2745 stack,
2746 raise_on_error=raise_on_error,
2747 allow_redirections=allow_redirections,
2748 )
2749 except RedisCluster.ERRORS_ALLOW_RETRY as e:
2750 if retry_attempts > 0:
2751 # Try again with the new cluster setup. All other errors
2752 # should be raised.
2753 retry_attempts -= 1
2755 else:
2756 raise e
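
# --- Sketch of configuring the retry budget used above (endpoint is an
# assumption): the pipeline inherits the cluster client's Retry object, which
# replaces the deprecated ``cluster_error_retry_attempts`` argument.
from redis.backoff import ExponentialWithJitterBackoff
from redis.cluster import RedisCluster
from redis.retry import Retry

rc = RedisCluster(
    host="127.0.0.1",
    port=7000,
    retry=Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=5),
)
pipe = rc.pipeline()  # ERRORS_ALLOW_RETRY errors now retried up to 5 times
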
2758 def _send_cluster_commands(
2759 self, stack, raise_on_error=True, allow_redirections=True
2760 ):
2761 """
2762 Send a bunch of cluster commands to the redis cluster.
2764 `allow_redirections` If the pipeline should follow
2765 `ASK` & `MOVED` responses automatically. If set
2766 to false it will raise RedisClusterException.
2767 """
2768 # the first time sending the commands we send all of
2769 # the commands that were queued up.
2770 # if we have to run through it again, we only retry
2771 # the commands that failed.
2772 attempt = sorted(stack, key=lambda x: x.position)
2773 is_default_node = False
2774 # build a dict of node objects, keyed by node name, that we need to send to
2775 nodes = {}
2777 # as we move through each command that still needs to be processed,
2778 # we figure out the slot number that command maps to, then from
2779 # the slot determine the node.
2780 for c in attempt:
2781 while True:
2782 # refer to our internal node -> slot table that
2783 # tells us where a given command should route to.
2784 # (it might be possible we have a cached node that no longer
2785 # exists in the cluster, which is why we do this in a loop)
2786 passed_targets = c.options.pop("target_nodes", None)
2787 if passed_targets and not self._is_nodes_flag(passed_targets):
2788 target_nodes = self._parse_target_nodes(passed_targets)
2789 else:
2790 target_nodes = self._determine_nodes(
2791 *c.args, node_flag=passed_targets
2792 )
2793 if not target_nodes:
2794 raise RedisClusterException(
2795 f"No targets were found to execute {c.args} command on"
2796 )
2797 if len(target_nodes) > 1:
2798 raise RedisClusterException(
2799 f"Too many targets for command {c.args}"
2800 )
2802 node = target_nodes[0]
2803 if node == self._pipe.get_default_node():
2804 is_default_node = True
2806 # now that we know the name of the node
2807 # ( it's just a string in the form of host:port )
2808 # we can build a list of commands for each node.
2809 node_name = node.name
2810 if node_name not in nodes:
2811 redis_node = self._pipe.get_redis_connection(node)
2812 try:
2813 connection = get_connection(redis_node)
2814 except (ConnectionError, TimeoutError):
2815 for n in nodes.values():
2816 n.connection_pool.release(n.connection)
2817 # Connection retries are being handled in the node's
2818 # Retry object. Reinitialize the node -> slot table.
2819 self._nodes_manager.initialize()
2820 if is_default_node:
2821 self._pipe.replace_default_node()
2822 raise
2823 nodes[node_name] = NodeCommands(
2824 redis_node.parse_response,
2825 redis_node.connection_pool,
2826 connection,
2827 )
2828 nodes[node_name].append(c)
2829 break
2831 # send the commands in sequence.
2832 # we write to all the open sockets for each node first,
2833 # before reading anything
2834 # this allows us to flush all the requests out across the
2835 # network
2836 # so that we can read them from different sockets as they come back.
2837 # we don't multiplex on the sockets as they become available,
2838 # but that shouldn't make too much difference.
2839 try:
2840 node_commands = nodes.values()
2841 for n in node_commands:
2842 n.write()
2844 for n in node_commands:
2845 n.read()
2846 finally:
2847 # release all of the redis connections we allocated earlier
2848 # back into the connection pool.
2849 # we used to do this step as part of a try/finally block,
2850 # but it is really dangerous to
2851 # release connections back into the pool if for some
2852 # reason the socket has data still left in it
2853 # from a previous operation. The write and
2854 # read operations already have try/catch around them for
2855 # all known types of errors including connection
2856 # and socket level errors.
2857 # So if we hit an exception, something really bad
2858 # happened and putting any of
2859 # these connections back into the pool is a very bad idea.
2860 # the socket might have unread buffer still sitting in it,
2861 # and then the next time we read from it we pass the
2862 # buffered result back from a previous command and
2863 # every subsequent request to that connection will always get
2864 # a mismatched result.
2865 for n in nodes.values():
2866 n.connection_pool.release(n.connection)
2868 # if the response isn't an exception it is a
2869 # valid response from the node
2870 # we're all done with that command, YAY!
2871 # if we have more commands to attempt, we've run into problems.
2872 # collect all the commands we are allowed to retry.
2873 # (MOVED, ASK, or connection errors or timeout errors)
2874 attempt = sorted(
2875 (
2876 c
2877 for c in attempt
2878 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2879 ),
2880 key=lambda x: x.position,
2881 )
2882 if attempt and allow_redirections:
2883 # RETRY MAGIC HAPPENS HERE!
2884 # send these remaining commands one at a time using `execute_command`
2885 # in the main client. This keeps our retry logic
2886 # in one place mostly,
2887 # and allows us to be more confident in correctness of behavior.
2888 # at this point any speed gains from pipelining have been lost
2889 # anyway, so we might as well make the best
2890 # attempt to get the correct behavior.
2891 #
2892 # The client command will handle retries for each
2893 # individual command sequentially as we pass each
2894 # one into `execute_command`. Any exceptions
2895 # that bubble out should only appear once all
2896 # retries have been exhausted.
2897 #
2898 # If a lot of commands have failed, we'll be setting the
2899 # flag to rebuild the slots table from scratch.
2900 # So MOVED errors should correct themselves fairly quickly.
2901 self._pipe.reinitialize_counter += 1
2902 if self._pipe._should_reinitialized():
2903 self._nodes_manager.initialize()
2904 if is_default_node:
2905 self._pipe.replace_default_node()
2906 for c in attempt:
2907 try:
2908 # send each command individually like we
2909 # do in the main client.
2910 c.result = self._pipe.parent_execute_command(*c.args, **c.options)
2911 except RedisError as e:
2912 c.result = e
2914 # turn the response back into a simple flat array that corresponds
2915 # to the sequence of commands issued in the stack in pipeline.execute()
2916 response = []
2917 for c in sorted(stack, key=lambda x: x.position):
2918 if c.args[0] in self._pipe.cluster_response_callbacks:
2919 # Remove the "keys" entry; it is only needed for caching.
2920 c.options.pop("keys", None)
2921 c.result = self._pipe.cluster_response_callbacks[c.args[0]](
2922 c.result, **c.options
2923 )
2924 response.append(c.result)
2926 if raise_on_error:
2927 self._raise_first_error(stack)
2929 return response
2931 def _is_nodes_flag(self, target_nodes):
2932 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags
2934 def _parse_target_nodes(self, target_nodes):
2935 if isinstance(target_nodes, list):
2936 nodes = target_nodes
2937 elif isinstance(target_nodes, ClusterNode):
2938 # Supports passing a single ClusterNode as a variable
2939 nodes = [target_nodes]
2940 elif isinstance(target_nodes, dict):
2941 # Supports dictionaries of the format {node_name: node}.
2942 # It enables executing commands on multiple nodes as follows:
2943 # rc.cluster_save_config(rc.get_primaries())
2944 nodes = target_nodes.values()
2945 else:
2946 raise TypeError(
2947 "target_nodes type can be one of the following: "
2948 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
2949 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
2950 f"The passed type is {type(target_nodes)}"
2951 )
2952 return nodes
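
# --- The ``target_nodes`` shapes accepted by the parser above (PING is just an
# example command; the endpoint is an assumption):
from redis.cluster import RedisCluster

rc = RedisCluster(host="127.0.0.1", port=7000)
rc.execute_command("PING", target_nodes=RedisCluster.ALL_NODES)  # node flag
rc.execute_command("PING", target_nodes=rc.get_default_node())   # one ClusterNode
rc.execute_command("PING", target_nodes=rc.get_primaries())      # list of nodes
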
2954 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
2955 # Determine which nodes the command should be executed on.
2956 # Returns a list of target nodes.
2957 command = args[0].upper()
2958 if (
2959 len(args) >= 2
2960 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
2961 ):
2962 command = f"{args[0]} {args[1]}".upper()
2964 nodes_flag = kwargs.pop("nodes_flag", None)
2965 if nodes_flag is not None:
2966 # nodes flag passed by the user
2967 command_flag = nodes_flag
2968 else:
2969 # get the nodes group for this command if it was predefined
2970 command_flag = self._pipe.command_flags.get(command)
2971 if command_flag == self._pipe.RANDOM:
2972 # return a random node
2973 return [self._pipe.get_random_node()]
2974 elif command_flag == self._pipe.PRIMARIES:
2975 # return all primaries
2976 return self._pipe.get_primaries()
2977 elif command_flag == self._pipe.REPLICAS:
2978 # return all replicas
2979 return self._pipe.get_replicas()
2980 elif command_flag == self._pipe.ALL_NODES:
2981 # return all nodes
2982 return self._pipe.get_nodes()
2983 elif command_flag == self._pipe.DEFAULT_NODE:
2984 # return the cluster's default node
2985 return [self._nodes_manager.default_node]
2986 elif command in self._pipe.SEARCH_COMMANDS[0]:
2987 return [self._nodes_manager.default_node]
2988 else:
2989 # get the node that holds the key's slot
2990 slot = self._pipe.determine_slot(*args)
2991 node = self._nodes_manager.get_node_from_slot(
2992 slot,
2993 self._pipe.read_from_replicas and command in READ_COMMANDS,
2994 self._pipe.load_balancing_strategy
2995 if command in READ_COMMANDS
2996 else None,
2997 )
2998 return [node]
3000 def multi(self):
3001 raise RedisClusterException(
3002 "method multi() is not supported outside of transactional context"
3003 )
3005 def discard(self):
3006 raise RedisClusterException(
3007 "method discard() is not supported outside of transactional context"
3008 )
3010 def watch(self, *names):
3011 raise RedisClusterException(
3012 "method watch() is not supported outside of transactional context"
3013 )
3015 def unwatch(self, *names):
3016 raise RedisClusterException(
3017 "method unwatch() is not supported outside of transactional context"
3018 )
3020 def delete(self, *names):
3021 if len(names) != 1:
3022 raise RedisClusterException(
3023 "deleting multiple keys is not implemented in pipeline command"
3024 )
3026 return self.execute_command("DEL", names[0])
3028 def unlink(self, *names):
3029 if len(names) != 1:
3030 raise RedisClusterException(
3031 "unlinking multiple keys is not implemented in pipeline command"
3032 )
3034 return self.execute_command("UNLINK", names[0])
3037class TransactionStrategy(AbstractStrategy):
3038 NO_SLOTS_COMMANDS = {"UNWATCH"}
3039 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
3040 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3041 SLOT_REDIRECT_ERRORS = (AskError, MovedError)
3042 CONNECTION_ERRORS = (
3043 ConnectionError,
3044 OSError,
3045 ClusterDownError,
3046 SlotNotCoveredError,
3047 )
3049 def __init__(self, pipe: ClusterPipeline):
3050 super().__init__(pipe)
3051 self._explicit_transaction = False
3052 self._watching = False
3053 self._pipeline_slots: Set[int] = set()
3054 self._transaction_connection: Optional[Connection] = None
3055 self._executing = False
3056 self._retry = copy(self._pipe.retry)
3057 self._retry.update_supported_errors(
3058 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
3059 )
3061 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
3062 """
3063 Find a connection for a pipeline transaction.
3065 For running an atomic transaction, watched keys ensure that contents have not been
3066 altered as long as the WATCH commands for those keys were sent over the same
3067 connection. So once we start watching a key, we fetch a connection to the
3068 node that owns that slot and reuse it.
3069 """
3070 if not self._pipeline_slots:
3071 raise RedisClusterException(
3072 "At least a command with a key is needed to identify a node"
3073 )
3075 node: ClusterNode = self._nodes_manager.get_node_from_slot(
3076 list(self._pipeline_slots)[0], False
3077 )
3078 redis_node: Redis = self._pipe.get_redis_connection(node)
3079 if self._transaction_connection:
3080 if not redis_node.connection_pool.owns_connection(
3081 self._transaction_connection
3082 ):
3083 previous_node = self._nodes_manager.find_connection_owner(
3084 self._transaction_connection
3085 )
3086 previous_node.redis_connection.connection_pool.release(self._transaction_connection)
3087 self._transaction_connection = None
3089 if not self._transaction_connection:
3090 self._transaction_connection = get_connection(redis_node)
3092 return redis_node, self._transaction_connection
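
# --- Optimistic-locking sketch in cluster mode (endpoint and key name are
# assumptions): WATCH pins a connection to the slot owner, and every key in
# the transaction must hash to that same slot.
from redis.cluster import RedisCluster
from redis.exceptions import WatchError

rc = RedisCluster(host="127.0.0.1", port=7000)
pipe = rc.pipeline(transaction=True)
while True:
    try:
        pipe.watch("counter")          # immediate command on the pinned connection
        current = pipe.get("counter")  # still immediate while WATCHing
        pipe.multi()                   # start buffering the transaction
        pipe.set("counter", int(current or 0) + 1)
        pipe.execute()                 # EXEC; a None reply raises WatchError
        break
    except WatchError:
        continue  # the watched slot changed underneath us; retry
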
3094 def execute_command(self, *args, **kwargs):
3095 slot_number: Optional[int] = None
3096 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
3097 slot_number = self._pipe.determine_slot(*args)
3099 if (
3100 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
3101 ) and not self._explicit_transaction:
3102 if args[0] == "WATCH":
3103 self._validate_watch()
3105 if slot_number is not None:
3106 if self._pipeline_slots and slot_number not in self._pipeline_slots:
3107 raise CrossSlotTransactionError(
3108 "Cannot watch or send commands on different slots"
3109 )
3111 self._pipeline_slots.add(slot_number)
3112 elif args[0] not in self.NO_SLOTS_COMMANDS:
3113 raise RedisClusterException(
3114 f"Cannot identify slot number for command: {args[0]},"
3115 "it cannot be triggered in a transaction"
3116 )
3118 return self._immediate_execute_command(*args, **kwargs)
3119 else:
3120 if slot_number is not None:
3121 self._pipeline_slots.add(slot_number)
3123 return self.pipeline_execute_command(*args, **kwargs)
3125 def _validate_watch(self):
3126 if self._explicit_transaction:
3127 raise RedisError("Cannot issue a WATCH after a MULTI")
3129 self._watching = True
3131 def _immediate_execute_command(self, *args, **options):
3132 return self._retry.call_with_retry(
3133 lambda: self._get_connection_and_send_command(*args, **options),
3134 self._reinitialize_on_error,
3135 )
3137 def _get_connection_and_send_command(self, *args, **options):
3138 redis_node, connection = self._get_client_and_connection_for_transaction()
3139 return self._send_command_parse_response(
3140 connection, redis_node, args[0], *args, **options
3141 )
3143 def _send_command_parse_response(
3144 self, conn, redis_node: Redis, command_name, *args, **options
3145 ):
3146 """
3147 Send a command and parse the response
3148 """
3150 conn.send_command(*args)
3151 output = redis_node.parse_response(conn, command_name, **options)
3153 if command_name in self.UNWATCH_COMMANDS:
3154 self._watching = False
3155 return output
3157 def _reinitialize_on_error(self, error):
3158 if self._watching:
3159 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
3160 raise WatchError("Slot rebalancing occurred while watching keys")
3162 if (
3163 type(error) in self.SLOT_REDIRECT_ERRORS
3164 or type(error) in self.CONNECTION_ERRORS
3165 ):
3166 if self._transaction_connection:
3167 self._transaction_connection = None
3169 self._pipe.reinitialize_counter += 1
3170 if self._pipe._should_reinitialized():
3171 self._nodes_manager.initialize()
3172 self._pipe.reinitialize_counter = 0
3173 else:
3174 if isinstance(error, AskError):
3175 self._nodes_manager.update_moved_exception(error)
3177 self._executing = False
3179 def _raise_first_error(self, responses, stack):
3180 """
3181 Raise the first exception on the stack
3182 """
3183 for r, cmd in zip(responses, stack):
3184 if isinstance(r, Exception):
3185 self.annotate_exception(r, cmd.position + 1, cmd.args)
3186 raise r
3188 def execute(self, raise_on_error: bool = True) -> List[Any]:
3189 stack = self._command_queue
3190 if not stack and (not self._watching or not self._pipeline_slots):
3191 return []
3193 return self._execute_transaction_with_retries(stack, raise_on_error)
3195 def _execute_transaction_with_retries(
3196 self, stack: List["PipelineCommand"], raise_on_error: bool
3197 ):
3198 return self._retry.call_with_retry(
3199 lambda: self._execute_transaction(stack, raise_on_error),
3200 self._reinitialize_on_error,
3201 )
3203 def _execute_transaction(
3204 self, stack: List["PipelineCommand"], raise_on_error: bool
3205 ):
3206 if len(self._pipeline_slots) > 1:
3207 raise CrossSlotTransactionError(
3208 "All keys involved in a cluster transaction must map to the same slot"
3209 )
3211 self._executing = True
3213 redis_node, connection = self._get_client_and_connection_for_transaction()
3215 stack = chain(
3216 [PipelineCommand(("MULTI",))],
3217 stack,
3218 [PipelineCommand(("EXEC",))],
3219 )
3220 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
3221 packed_commands = connection.pack_commands(commands)
3222 connection.send_packed_command(packed_commands)
3223 errors = []
3225 # parse off the response for MULTI
3226 # NOTE: we need to handle ResponseErrors here and continue
3227 # so that we read all the additional command messages from
3228 # the socket
3229 try:
3230 redis_node.parse_response(connection, "MULTI")
3231 except ResponseError as e:
3232 self.annotate_exception(e, 0, "MULTI")
3233 errors.append(e)
3234 except self.CONNECTION_ERRORS as cluster_error:
3235 self.annotate_exception(cluster_error, 0, "MULTI")
3236 raise
3238 # and all the other commands
3239 for i, command in enumerate(self._command_queue):
3240 if EMPTY_RESPONSE in command.options:
3241 errors.append((i, command.options[EMPTY_RESPONSE]))
3242 else:
3243 try:
3244 _ = redis_node.parse_response(connection, "_")
3245 except self.SLOT_REDIRECT_ERRORS as slot_error:
3246 self.annotate_exception(slot_error, i + 1, command.args)
3247 errors.append(slot_error)
3248 except self.CONNECTION_ERRORS as cluster_error:
3249 self.annotate_exception(cluster_error, i + 1, command.args)
3250 raise
3251 except ResponseError as e:
3252 self.annotate_exception(e, i + 1, command.args)
3253 errors.append(e)
3255 response = None
3256 # parse the EXEC.
3257 try:
3258 response = redis_node.parse_response(connection, "EXEC")
3259 except ExecAbortError:
3260 if errors:
3261 raise errors[0]
3262 raise
3264 self._executing = False
3266 # EXEC clears any watched keys
3267 self._watching = False
3269 if response is None:
3270 raise WatchError("Watched variable changed.")
3272 # put any parse errors into the response
3273 for i, e in errors:
3274 response.insert(i, e)
3276 if len(response) != len(self._command_queue):
3277 raise InvalidPipelineStack(
3278 "Unexpected response length for cluster pipeline EXEC."
3279 " Command stack was {} but response had length {}".format(
3280 [c.args[0] for c in self._command_queue], len(response)
3281 )
3282 )
3284 # find any errors in the response and raise if necessary
3285 if raise_on_error or len(errors) > 0:
3286 self._raise_first_error(
3287 response,
3288 self._command_queue,
3289 )
3291 # We have to run response callbacks manually
3292 data = []
3293 for r, cmd in zip(response, self._command_queue):
3294 if not isinstance(r, Exception):
3295 command_name = cmd.args[0]
3296 if command_name in self._pipe.cluster_response_callbacks:
3297 r = self._pipe.cluster_response_callbacks[command_name](
3298 r, **cmd.options
3299 )
3300 data.append(r)
3301 return data
3303 def reset(self):
3304 self._command_queue = []
3306 # make sure to reset the connection state in the event that we were
3307 # watching something
3308 if self._transaction_connection:
3309 try:
3310 if self._watching:
3311 # call this manually since our unwatch or
3312 # immediate_execute_command methods can call reset()
3313 self._transaction_connection.send_command("UNWATCH")
3314 self._transaction_connection.read_response()
3315 # we can safely return the connection to the pool here since we're
3316 # sure we're no longer WATCHing anything
3317 node = self._nodes_manager.find_connection_owner(
3318 self._transaction_connection
3319 )
3320 node.redis_connection.connection_pool.release(
3321 self._transaction_connection
3322 )
3323 self._transaction_connection = None
3324 except self.CONNECTION_ERRORS:
3325 # disconnect will also remove any previous WATCHes
3326 if self._transaction_connection:
3327 self._transaction_connection.disconnect()
3329 # clean up the other instance attributes
3330 self._watching = False
3331 self._explicit_transaction = False
3332 self._pipeline_slots = set()
3333 self._executing = False
3335 def send_cluster_commands(
3336 self, stack, raise_on_error=True, allow_redirections=True
3337 ):
3338 raise NotImplementedError(
3339 "send_cluster_commands cannot be executed in transactional context."
3340 )
3342 def multi(self):
3343 if self._explicit_transaction:
3344 raise RedisError("Cannot issue nested calls to MULTI")
3345 if self._command_queue:
3346 raise RedisError(
3347 "Commands without an initial WATCH have already been issued"
3348 )
3349 self._explicit_transaction = True
3351 def watch(self, *names):
3352 if self._explicit_transaction:
3353 raise RedisError("Cannot issue a WATCH after a MULTI")
3355 return self.execute_command("WATCH", *names)
3357 def unwatch(self):
3358 if self._watching:
3359 return self.execute_command("UNWATCH")
3361 return True
3363 def discard(self):
3364 self.reset()
3366 def delete(self, *names):
3367 return self.execute_command("DEL", *names)
3369 def unlink(self, *names):
3370 return self.execute_command("UNLINK", *names)