Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%
import random
import socket
import sys
import threading
import time
from abc import ABC, abstractmethod
from collections import OrderedDict
from copy import copy
from enum import Enum
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from redis._parsers import CommandsParser, Encoder
from redis._parsers.helpers import parse_scan
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
from redis.commands import READ_COMMANDS, RedisClusterCommands
from redis.commands.helpers import list_or_args
from redis.connection import (
    Connection,
    ConnectionPool,
    parse_url,
)
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    AskError,
    AuthenticationError,
    ClusterDownError,
    ClusterError,
    ConnectionError,
    CrossSlotTransactionError,
    DataError,
    ExecAbortError,
    InvalidPipelineStack,
    MovedError,
    RedisClusterException,
    RedisError,
    ResponseError,
    SlotNotCoveredError,
    TimeoutError,
    TryAgainError,
    WatchError,
)
from redis.lock import Lock
from redis.retry import Retry
from redis.utils import (
    deprecated_args,
    dict_merge,
    list_keys_to_dict,
    merge_result,
    safe_str,
    str_if_bytes,
    truncate_text,
)


def get_node_name(host: str, port: Union[str, int]) -> str:
    return f"{host}:{port}"


@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    return redis_node.connection or redis_node.connection_pool.get_connection()


def parse_scan_result(command, res, **options):
    cursors = {}
    ret = []
    for node_name, response in res.items():
        cursor, r = parse_scan(response, **options)
        cursors[node_name] = cursor
        ret += r

    return cursors, ret


def parse_pubsub_numsub(command, res, **options):
    numsub_d = OrderedDict()
    for numsub_tups in res.values():
        for channel, numsubbed in numsub_tups:
            try:
                numsub_d[channel] += numsubbed
            except KeyError:
                numsub_d[channel] = numsubbed

    ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
    return ret_numsub


def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        return str_if_bytes(args[0]) or current_host, args[1]

    slots = {}
    for slot in resp:
        start, end, primary = slot[:3]
        replicas = slot[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }

    return slots
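

# Illustrative sketch (comment only, values made up for illustration): the
# mapping parse_cluster_slots builds from a raw CLUSTER SLOTS reply. Given a
# reply such as
#
#     [[0, 5460, [b"10.0.0.1", 6379, b"id1"], [b"10.0.0.2", 6379, b"id2"]]]
#
# the parsed result is keyed by the inclusive slot range:
#
#     {(0, 5460): {"primary": ("10.0.0.1", 6379),
#                  "replicas": [("10.0.0.2", 6379)]}}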


def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.
    """
    if isinstance(resp[0], dict):
        return resp
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], x[1][i + 1]))
        nodes = x[3]
        for node in nodes:
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards


def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.
    """
    return resp.decode("utf-8")
150PRIMARY = "primary"
151REPLICA = "replica"
152SLOT_ID = "slot-id"
154REDIS_ALLOWED_KEYS = (
155 "connection_class",
156 "connection_pool",
157 "connection_pool_class",
158 "client_name",
159 "credential_provider",
160 "db",
161 "decode_responses",
162 "encoding",
163 "encoding_errors",
164 "host",
165 "lib_name",
166 "lib_version",
167 "max_connections",
168 "nodes_flag",
169 "redis_connect_func",
170 "password",
171 "port",
172 "queue_class",
173 "retry",
174 "retry_on_timeout",
175 "protocol",
176 "socket_connect_timeout",
177 "socket_keepalive",
178 "socket_keepalive_options",
179 "socket_timeout",
180 "ssl",
181 "ssl_ca_certs",
182 "ssl_ca_data",
183 "ssl_certfile",
184 "ssl_cert_reqs",
185 "ssl_keyfile",
186 "ssl_password",
187 "ssl_check_hostname",
188 "unix_socket_path",
189 "username",
190 "cache",
191 "cache_config",
192)
193KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs.
    """
    connection_kwargs = {
        k: v
        for k, v in kwargs.items()
        if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
    }

    return connection_kwargs


class AbstractRedisCluster:
    RedisClusterRequestTTL = 16

    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: Optional["ClusterNode"] = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, choose a replica if the cluster contains different replicas
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)


class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Return a Redis client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates an SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - `unix://` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        return cls(url=url, **kwargs)
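
    # Illustrative usage sketch (comment only, not part of the library);
    # assumes a node of a cluster-mode-enabled deployment is reachable at
    # localhost:6379:
    #
    #     rc = RedisCluster.from_url("redis://localhost:6379/0")
    #     rc.set("foo", "bar")
    #     print(rc.get("foo"))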

    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
        """
        Initialize a new RedisCluster client.

        :param startup_nodes:
            List of nodes from which initial bootstrapping can be done
        :param host:
            Can be used to point to a startup node
        :param port:
            Can be used to point to a startup node
        :param require_full_coverage:
            When set to False: the client will not require full coverage of
            the slots. However, if not all slots are covered, and at least
            one node has 'cluster-require-full-coverage' set to 'yes', the
            server will throw a ClusterDownError for some key-based commands.
            See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True (default value): all slots must be covered to
            construct the cluster client. If not all slots are covered,
            RedisClusterException will be thrown.
        :param read_from_replicas:
            @deprecated - please use load_balancing_strategy instead
            Enable read from replicas in READONLY mode. You can read possibly
            stale data.
            When set to true, read commands will be distributed between the
            primary and its replicas in a round-robin manner.
        :param load_balancing_strategy:
            Enable read from replicas in READONLY mode and define the load balancing
            strategy that will be used for cluster node selection.
            The data read from replicas is eventually consistent with the data in primary nodes.
        :param dynamic_startup_nodes:
            Set the RedisCluster's startup nodes to all of the discovered nodes.
            If true (default value), the cluster's discovered nodes will be used to
            determine the cluster nodes-slots mapping in the next topology refresh.
            It will remove the initially passed startup nodes if their endpoints aren't
            listed in the CLUSTER SLOTS output.
            If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
            specific IP addresses, it is best to set it to false.
        :param cluster_error_retry_attempts:
            @deprecated - Please configure the 'retry' object instead.
            In case the 'retry' object is set, this argument is ignored!

            Number of times to retry before raising an error when
            :class:`~.TimeoutError`, :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
            :class:`~.ClusterDownError` are encountered
        :param retry:
            A retry object that defines the retry strategy and the number of
            retries for the cluster client.
            In the current implementation of the cluster client (starting from redis-py
            version 6.0.0) the retry object is not yet fully utilized; instead it is
            used just to determine the number of retries for the cluster client.
            In future releases the retry object will be used to handle the cluster client retries!
        :param reinitialize_steps:
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.
        :param address_remap:
            An optional callable which, when provided with an internal network
            address of a node, e.g. a `(host, port)` tuple, will return the address
            where the node is reachable. This can be used to map the addresses at
            which the nodes _think_ they are, to addresses at which a client may
            reach them, such as when they sit behind a proxy.

        :**kwargs:
            Extra arguments that will be passed to the Redis instance when created
            (see the official redis-py docs for supported kwargs - the only limitation
            is that you can't provide a 'retry' object as part of kwargs:
            [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]).
            Some kwargs are not supported and will raise a
            RedisClusterException:
            - db (Redis does not support database SELECT in cluster mode)
        """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # Argument 'retry' cannot be used in kwargs when in cluster mode:
            # the kwargs are set on the lower-level connections to the cluster
            # nodes, and there we provide a retry configuration with no retries
            # allowed. The retries should be handled at the cluster client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node(s)
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the following:\n"
                "1. host and port, for example:\n"
                "   RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "   RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments.
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run.
        # If the user passed an on_connect function, we save it and run it
        # inside the RedisCluster.on_connect() function.
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and protocol not in [3, "3"]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        self.commands_parser = CommandsParser(self)
        self._lock = threading.RLock()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def disconnect_connection_pools(self):
        for node in self.get_nodes():
            if node.redis_connection:
                try:
                    node.redis_connection.connection_pool.disconnect()
                except OSError:
                    # Client was already disconnected. Do nothing.
                    pass

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate, select a database, and send
        READONLY if it was configured during object initialization.
        """
        connection.on_connect()

        if self.read_from_replicas or self.load_balancing_strategy:
            # Send the READONLY command to the server to configure the
            # connection as readonly. Since each cluster node may change its
            # server type due to a failover, we should establish a READONLY
            # connection regardless of the server type. If this is a primary
            # connection, READONLY does not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

    def get_node(self, host=None, port=None, node_name=None):
        return self.nodes_manager.get_node(host, port, node_name)

    def get_primaries(self):
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

    def get_replicas(self):
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)

    def get_random_node(self):
        return random.choice(list(self.nodes_manager.nodes_cache.values()))

    def get_nodes(self):
        return list(self.nodes_manager.nodes_cache.values())

    def get_node_from_key(self, key, replica=False):
        """
        Get the node that holds the key's slot.
        If ``replica`` is set to True but the slot doesn't have any replicas,
        None is returned.
        """
        slot = self.keyslot(key)
        slot_cache = self.nodes_manager.slots_cache.get(slot)
        if slot_cache is None or len(slot_cache) == 0:
            raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
        if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        elif replica:
            node_idx = 1
        else:
            # primary
            node_idx = 0

        return slot_cache[node_idx]
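
    # Illustrative sketch (comment only): with the key "foo" living in a slot
    # served by one primary and one replica,
    #
    #     rc.get_node_from_key("foo")                 # -> the primary node
    #     rc.get_node_from_key("foo", replica=True)   # -> the replica, or None
    #                                                 #    if the slot has none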

    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node

    def set_default_node(self, node):
        """
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return True if the default node was set, else False
        """
        if node is None or self.get_node(node_name=node.name) is None:
            return False
        self.nodes_manager.default_node = node
        return True

    def set_retry(self, retry: Retry) -> None:
        self.retry = retry

    def monitor(self, target_node=None):
        """
        Return a Monitor object for the specified target node.
        The default cluster node will be selected if no target node is
        specified.
        Monitor is useful for issuing the MONITOR command to the redis server.
        The next_command() method returns one command from monitor;
        the listen() method yields commands from monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()

    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host & port, to get a pubsub instance
        connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
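
    # Illustrative usage sketch (comment only); a specific node can be
    # targeted with pubsub(node=...) or pubsub(host=..., port=...):
    #
    #     p = rc.pubsub()
    #     p.subscribe("my-channel")
    #     message = p.get_message(timeout=1.0)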

    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
        Pipelines do not work in cluster mode the same way they
        do in standalone mode. This creates a clone of this object so
        that simulating pipelines will work correctly. Each
        command will be called directly when used and,
        when calling execute(), only the result stack will be returned.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            read_from_replicas=self.read_from_replicas,
            load_balancing_strategy=self.load_balancing_strategy,
            reinitialize_steps=self.reinitialize_steps,
            retry=self.retry,
            lock=self._lock,
            transaction=transaction,
        )
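
    # Illustrative usage sketch (comment only): commands are buffered and
    # execute() returns the result stack in call order.
    #
    #     with rc.pipeline() as pipe:
    #         pipe.set("foo", "1")
    #         pipe.get("foo")
    #         results = pipe.execute()   # e.g. [True, b"1"]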

    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
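
    # Illustrative usage sketch (comment only): the lock is acquired on
    # entering the context manager and released on exit.
    #
    #     with rc.lock("resource-key", timeout=5, blocking_timeout=3):
    #         ...  # critical section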

    def set_response_callback(self, command, callback):
        """Set a custom response callback"""
        self.cluster_response_callbacks[command] = callback

    def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
        # Determine which nodes the command should be executed on.
        # Returns a list of target nodes.
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)
        if command_flag == self.__class__.RANDOM:
            # return a random node
            return [self.get_random_node()]
        elif command_flag == self.__class__.PRIMARIES:
            # return all primaries
            return self.get_primaries()
        elif command_flag == self.__class__.REPLICAS:
            # return all replicas
            return self.get_replicas()
        elif command_flag == self.__class__.ALL_NODES:
            # return all nodes
            return self.get_nodes()
        elif command_flag == self.__class__.DEFAULT_NODE:
            # return the cluster's default node
            return [self.nodes_manager.default_node]
        elif command in self.__class__.SEARCH_COMMANDS[0]:
            return [self.nodes_manager.default_node]
        else:
            # get the node that holds the key's slot
            slot = self.determine_slot(*args)
            node = self.nodes_manager.get_node_from_slot(
                slot,
                self.read_from_replicas and command in READ_COMMANDS,
                self.load_balancing_strategy if command in READ_COMMANDS else None,
            )
            return [node]

    def _should_reinitialized(self):
        # To reinitialize the cluster on every MOVED error,
        # set reinitialize_steps to 1.
        # To avoid reinitializing the cluster on moved errors, set
        # reinitialize_steps to 0.
        if self.reinitialize_steps == 0:
            return False
        else:
            return self.reinitialize_counter % self.reinitialize_steps == 0

    def keyslot(self, key):
        """
        Calculate keyslot for a given key.
        See Keys distribution model in https://redis.io/topics/cluster-spec
        """
        k = self.encoder.encode(key)
        return key_slot(k)
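
    # Illustrative sketch (comment only): keys that share a hash tag (the part
    # between '{' and '}') hash to the same slot, which is what makes
    # multi-key commands on them legal in cluster mode:
    #
    #     rc.keyslot("{user:1000}.following") == rc.keyslot("{user:1000}.followers")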

    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in it, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
         - issue: https://github.com/redis/redis/issues/9493
         - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)

    def determine_slot(self, *args) -> int:
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, that means the function
                # can be run on any node so we can just return a random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()

    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder

    def get_connection_kwargs(self):
        """
        Get the connections' keyword arguments
        """
        return self.nodes_manager.connection_kwargs

    def _is_nodes_flag(self, target_nodes):
        return isinstance(target_nodes, str) and target_nodes in self.node_flags

    def _parse_target_nodes(self, target_nodes):
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables executing commands on multiple nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def execute_command(self, *args, **kwargs):
        return self._internal_execute_command(*args, **kwargs)

    def _internal_execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the command as many times as specified by the retries
        property of ``self.retry``, which defaults to 3 unless manually
        configured.

        If the retry limit is reached, the command will raise the exception.

        The keyword argument :target_nodes: can be passed with the following
        types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True
        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args, **kwargs, nodes_flag=passed_targets
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)
                # Return the processed result
                return self._process_result(args[0], res, **kwargs)
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    continue
                else:
                    # raise the exception
                    raise e

    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove the 'keys' entry; it is only needed for client-side caching.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )
                return response
            except AuthenticationError:
                raise
            except (ConnectionError, TimeoutError) as e:
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # The 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                moved = True
            except TryAgainError:
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except (ClusterDownError, SlotNotCoveredError):
                # ClusterDownError can occur during a failover; to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command.

                # SlotNotCoveredError can occur when the cluster is not fully
                # initialized or can be a temporary issue.
                # We will try to reinitialize the cluster topology
                # and retry executing the command.

                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise
            except ResponseError:
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")

    def close(self) -> None:
        try:
            with self._lock:
                if self.nodes_manager:
                    self.nodes_manager.close()
        except AttributeError:
            # RedisCluster's __init__ can fail before nodes_manager is set
            pass

    def _process_result(self, command, res, **kwargs):
        """
        Process the result of the executed command.
        Returns either a dict or a single value.

        :type command: str
        :type res: dict

        `res` should be in the following format:
            Dict<node_name, command_result>
        """
        if command in self.result_callbacks:
            return self.result_callbacks[command](command, res, **kwargs)
        elif len(res) == 1:
            # When we execute the command on a single node, we can
            # remove the dictionary and return a single response
            return list(res.values())[0]
        else:
            return res

    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules,
        and their namespaces, to the redis client.

        ``funcname`` - A string containing the name of the attribute to create
        ``func`` - The function to add to this class
        """
        setattr(self, funcname, func)

    def transaction(self, func, *watches, **kwargs):
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
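
    # Illustrative usage sketch (comment only): atomically increment a counter
    # under WATCH. Note that in cluster mode all watched and written keys must
    # hash to the same slot (here via the hypothetical "{acct}" hash tag).
    #
    #     def incr_balance(pipe):
    #         balance = int(pipe.get("{acct}:balance") or 0)
    #         pipe.multi()
    #         pipe.set("{acct}:balance", balance + 1)
    #
    #     rc.transaction(incr_balance, "{acct}:balance")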


class ClusterNode:
    def __init__(self, host, port, server_type=None, redis_connection=None):
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        try:
            if self.redis_connection is not None:
                self.redis_connection.close()
        except Exception:
            # Ignore errors when closing the connection
            pass


class LoadBalancingStrategy(Enum):
    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        server_index = self.primary_to_idx.setdefault(primary, self.start_index)
        if replicas_only and server_index == 0:
            # skip the primary node index
            server_index = 1
        # Update the index for the next round
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index
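

# Illustrative sketch (comment only): for a slot's node list of
# [primary, replica1, replica2] (list_size=3), ROUND_ROBIN yields indices
# 0, 1, 2, 0, ... per primary, ROUND_ROBIN_REPLICAS yields 1, 2, 1, 2, ...,
# and RANDOM_REPLICA picks a random index in [1, list_size - 1].
#
#     lb = LoadBalancer()
#     lb.get_server_index("10.0.0.1:6379", 3)   # -> 0, then 1, then 2, ...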


class NodesManager:
    def __init__(
        self,
        startup_nodes,
        from_url=False,
        require_full_coverage=False,
        lock=None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
        self.nodes_cache: Dict[str, ClusterNode] = {}
        self.slots_cache = {}
        self.startup_nodes = {}
        self.default_node = None
        self.populate_startup_nodes(startup_nodes)
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        self._cache = cache
        self._cache_config = cache_config
        self._cache_factory = cache_factory
        self._moved_exception = None
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.initialize()

    def get_node(self, host=None, port=None, node_name=None):
        """
        Get the requested node from the cluster's nodes.
        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            return None

    def update_moved_exception(self, exception):
        self._moved_exception = exception

    def _update_moved_slots(self):
        """
        Update the slot's node with the redirected one
        """
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            if redirected_node.server_type != PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node
        if redirected_node in self.slots_cache[e.slot_id]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = self.slots_cache[e.slot_id][0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            self.slots_cache[e.slot_id].append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            self.slots_cache[e.slot_id].remove(redirected_node)
            # Override the old primary with the new one
            self.slots_cache[e.slot_id][0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        else:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replicas) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        # Reset moved_exception
        self._moved_exception = None

    @deprecated_args(
        args_to_warn=["server_type"],
        reason=(
            "If you need to select a load balancing strategy "
            "that uses replicas, please set it through 'load_balancing_strategy'"
        ),
        version="5.3.0",
    )
    def get_node_from_slot(
        self,
        slot,
        read_from_replicas=False,
        load_balancing_strategy=None,
        server_type=None,
    ) -> ClusterNode:
        """
        Gets a node that serves this hash slot
        """
        if self._moved_exception:
            with self._lock:
                if self._moved_exception:
                    self._update_moved_slots()

        if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self._require_full_coverage}"'
            )

        if read_from_replicas is True and load_balancing_strategy is None:
            load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

        if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
            # get the server index using the strategy defined in load_balancing_strategy
            primary_name = self.slots_cache[slot][0].name
            node_idx = self.read_load_balancer.get_server_index(
                primary_name, len(self.slots_cache[slot]), load_balancing_strategy
            )
        elif (
            server_type is None
            or server_type == PRIMARY
            or len(self.slots_cache[slot]) == 1
        ):
            # return a primary
            node_idx = 0
        else:
            # return a replica
            # randomly choose one of the replicas
            node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

        return self.slots_cache[slot][node_idx]

    def get_nodes_by_server_type(self, server_type):
        """
        Get all nodes with the specified server type.
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode
        """
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    def populate_startup_nodes(self, nodes):
        """
        Populate all startup nodes and filter out any duplicates
        """
        for n in nodes:
            self.startup_nodes[n.name] = n

    def check_slots_coverage(self, slots_cache):
        # Validate whether all slots are covered or if we should try the next
        # startup node
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in slots_cache:
                return False
        return True

    def create_redis_connections(self, nodes):
        """
        Create a Redis connection to every node in :nodes: that doesn't
        already have one.
        """
        connection_pools = []
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host, port=node.port, **self.connection_kwargs
                )
                connection_pools.append(node.redis_connection.connection_pool)

        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                connection_pools, ClientType.SYNC, self._credential_provider
            )
        )

    def create_redis_node(self, host, port, **kwargs):
        # We configure the connection pool not to retry
        # connections on the lower-level clients to avoid retrying
        # connections to nodes that are not reachable
        # and to avoid blocking the connection pool.
        # The only error that gets some handling in the lower-level
        # clients is ConnectionError, which triggers disconnection
        # of the socket.
        # The retries will be handled at the cluster client level,
        # where we have proper handling of the cluster topology.
        node_retry_config = Retry(
            backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
        )

        if self.from_url:
            # Create a redis node with a custom connection pool
            kwargs.update({"host": host})
            kwargs.update({"port": port})
            kwargs.update({"cache": self._cache})
            kwargs.update({"retry": node_retry_config})
            r = Redis(connection_pool=self.connection_pool_class(**kwargs))
        else:
            r = Redis(
                host=host,
                port=port,
                cache=self._cache,
                retry=node_retry_config,
                **kwargs,
            )
        return r

    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
        node_name = get_node_name(host, port)
        # check if we already have this node in the tmp_nodes_cache
        target_node = tmp_nodes_cache.get(node_name)
        if target_node is None:
            # before creating a new cluster node, check if the cluster node already
            # exists in the current nodes cache and has a valid connection so we can
            # reuse it
            target_node = self.nodes_cache.get(node_name)
            if target_node is None or target_node.redis_connection is None:
                # create new cluster node for this cluster
                target_node = ClusterNode(host, port, role)
            if target_node.server_type != role:
                target_node.server_type = role
            # add this node to the nodes cache
            tmp_nodes_cache[target_node.name] = target_node

        return target_node
1691 def initialize(self):
1692 """
1693 Initializes the nodes cache, slots cache and redis connections.
1694 :startup_nodes:
1695 Responsible for discovering other nodes in the cluster
1696 """
1697 self.reset()
1698 tmp_nodes_cache = {}
1699 tmp_slots = {}
1700 disagreements = []
1701 startup_nodes_reachable = False
1702 fully_covered = False
1703 kwargs = self.connection_kwargs
1704 exception = None
1705 # Convert to tuple to prevent RuntimeError if self.startup_nodes
1706 # is modified during iteration
1707 for startup_node in tuple(self.startup_nodes.values()):
1708 try:
1709 if startup_node.redis_connection:
1710 r = startup_node.redis_connection
1711 else:
1712 # Create a new Redis connection
1713 r = self.create_redis_node(
1714 startup_node.host, startup_node.port, **kwargs
1715 )
1716 self.startup_nodes[startup_node.name].redis_connection = r
1717 # Make sure cluster mode is enabled on this node
1718 try:
1719 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1720 r.connection_pool.disconnect()
1721 except ResponseError:
1722 raise RedisClusterException(
1723 "Cluster mode is not enabled on this node"
1724 )
1725 startup_nodes_reachable = True
1726 except Exception as e:
1727 # Try the next startup node.
1728 # The exception is saved and raised only if we have no more nodes.
1729 exception = e
1730 continue
1732 # CLUSTER SLOTS command results in the following output:
1733 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
1734 # where each node contains the following list: [IP, port, node_id]
1735 # Therefore, cluster_slots[0][2][0] will be the IP address of the
1736 # primary node of the first slot section.
1737 # If there's only one server in the cluster, its ``host`` is ''
1738 # Fix it to the host in startup_nodes
1739 if (
1740 len(cluster_slots) == 1
1741 and len(cluster_slots[0][2][0]) == 0
1742 and len(self.startup_nodes) == 1
1743 ):
1744 cluster_slots[0][2][0] = startup_node.host
1746 for slot in cluster_slots:
1747 primary_node = slot[2]
1748 host = str_if_bytes(primary_node[0])
1749 if host == "":
1750 host = startup_node.host
1751 port = int(primary_node[1])
1752 host, port = self.remap_host_port(host, port)
1754 nodes_for_slot = []
1756 target_node = self._get_or_create_cluster_node(
1757 host, port, PRIMARY, tmp_nodes_cache
1758 )
1759 nodes_for_slot.append(target_node)
1761 replica_nodes = slot[3:]
1762 for replica_node in replica_nodes:
1763 host = str_if_bytes(replica_node[0])
1764 port = int(replica_node[1])
1765 host, port = self.remap_host_port(host, port)
1766 target_replica_node = self._get_or_create_cluster_node(
1767 host, port, REPLICA, tmp_nodes_cache
1768 )
1769 nodes_for_slot.append(target_replica_node)
1771 for i in range(int(slot[0]), int(slot[1]) + 1):
1772 if i not in tmp_slots:
1773 tmp_slots[i] = nodes_for_slot
1774 else:
1775 # Validate that no two nodes disagree about this
1776 # slot's owner
1777 tmp_slot = tmp_slots[i][0]
1778 if tmp_slot.name != target_node.name:
1779 disagreements.append(
1780 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
1781 )
1783 if len(disagreements) > 5:
1784 raise RedisClusterException(
1785 f"startup_nodes could not agree on a valid "
1786 f"slots cache: {', '.join(disagreements)}"
1787 )
1789 fully_covered = self.check_slots_coverage(tmp_slots)
1790 if fully_covered:
1791 # Don't need to continue to the next startup node if all
1792 # slots are covered
1793 break
1795 if not startup_nodes_reachable:
1796 raise RedisClusterException(
1797 f"Redis Cluster cannot be connected. Please provide at least "
1798 f"one reachable node: {str(exception)}"
1799 ) from exception
1801 if self._cache is None and self._cache_config is not None:
1802 if self._cache_factory is None:
1803 self._cache = CacheFactory(self._cache_config).get_cache()
1804 else:
1805 self._cache = self._cache_factory.get_cache()
1807 # Create Redis connections to all nodes
1808 self.create_redis_connections(list(tmp_nodes_cache.values()))
1810 # Check if the slots are not fully covered
1811 if not fully_covered and self._require_full_coverage:
1812 # Despite the requirement that the slots be covered, there
1813 # isn't full coverage
1814 raise RedisClusterException(
1815 f"All slots are not covered after query all startup_nodes. "
1816 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
1817 f"covered..."
1818 )
1820 # Set the tmp variables to the real variables
1821 self.nodes_cache = tmp_nodes_cache
1822 self.slots_cache = tmp_slots
1823 # Set the default node
1824 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
1825 if self._dynamic_startup_nodes:
1826 # Populate the startup nodes with all discovered nodes
1827 self.startup_nodes = tmp_nodes_cache
1828 # If initialize was called after a MovedError, clear it
1829 self._moved_exception = None
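initialize() runs when the client first builds its topology view; a single reachable startup node is enough for discovery. A hedged construction sketch (addresses are placeholders):

    from redis.cluster import ClusterNode, RedisCluster

    startup_nodes = [
        ClusterNode("10.0.0.1", 7000),  # any one reachable node suffices
        ClusterNode("10.0.0.2", 7000),
    ]
    rc = RedisCluster(startup_nodes=startup_nodes, require_full_coverage=False)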
1831 def close(self) -> None:
1832 self.default_node = None
1833 for node in self.nodes_cache.values():
1834 if node.redis_connection:
1835 node.redis_connection.close()
1837 def reset(self):
1838 try:
1839 self.read_load_balancer.reset()
1840 except TypeError:
1841 # The read_load_balancer is None, do nothing
1842 pass
1844 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
1845 """
1846 Remap the host and port returned from the cluster to a different
1847 internal value. Useful if the client is not connecting directly
1848 to the cluster.
1849 """
1850 if self.address_remap:
1851 return self.address_remap((host, port))
1852 return host, port
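remap_host_port() delegates to the user-supplied address_remap callable, which is useful when the addresses the cluster announces are not directly reachable (SSH tunnels, port forwards). A sketch with a hypothetical port mapping:

    from redis.cluster import RedisCluster

    FORWARDS = {7000: 17000, 7001: 17001}  # hypothetical local port forwards

    def remap(address):
        host, port = address
        return "127.0.0.1", FORWARDS.get(port, port)

    rc = RedisCluster(host="127.0.0.1", port=17000, address_remap=remap)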
1854 def find_connection_owner(self, connection: Connection) -> Optional[Redis]:
1855 node_name = get_node_name(connection.host, connection.port)
1856 for node in tuple(self.nodes_cache.values()):
1857 if node.redis_connection:
1858 conn_args = node.redis_connection.connection_pool.connection_kwargs
1859 if node_name == get_node_name(
1860 conn_args.get("host"), conn_args.get("port")
1861 ):
1862 return node
1865class ClusterPubSub(PubSub):
1866 """
1867 Wrapper for PubSub class.
1869 IMPORTANT: before using ClusterPubSub, read about the known limitations
1870 with pubsub in Cluster mode and learn how to work around them:
1871 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
1872 """
1874 def __init__(
1875 self,
1876 redis_cluster,
1877 node=None,
1878 host=None,
1879 port=None,
1880 push_handler_func=None,
1881 event_dispatcher: Optional["EventDispatcher"] = None,
1882 **kwargs,
1883 ):
1884 """
1885 When a pubsub instance is created without specifying a node, a single
1886 node will be transparently chosen for the pubsub connection on the
1887 first command execution. The node will be determined by:
1888 1. Hashing the channel name in the request to find its keyslot
1889 2. Selecting a node that handles the keyslot: If read_from_replicas is
1890 set to true or load_balancing_strategy is set, a replica can be selected.
1892 :type redis_cluster: RedisCluster
1893 :type node: ClusterNode
1894 :type host: str
1895 :type port: int
1896 """
1897 self.node = None
1898 self.set_pubsub_node(redis_cluster, node, host, port)
1899 connection_pool = (
1900 None
1901 if self.node is None
1902 else redis_cluster.get_redis_connection(self.node).connection_pool
1903 )
1904 self.cluster = redis_cluster
1905 self.node_pubsub_mapping = {}
1906 self._pubsubs_generator = self._pubsubs_generator()
1907 if event_dispatcher is None:
1908 self._event_dispatcher = EventDispatcher()
1909 else:
1910 self._event_dispatcher = event_dispatcher
1911 super().__init__(
1912 connection_pool=connection_pool,
1913 encoder=redis_cluster.encoder,
1914 push_handler_func=push_handler_func,
1915 event_dispatcher=self._event_dispatcher,
1916 **kwargs,
1917 )
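As the docstring above explains, the pubsub node is chosen lazily from the first channel's keyslot. A minimal usage sketch (assumes rc is a connected RedisCluster):

    p = rc.pubsub()
    p.subscribe("news")                  # node picked via keyslot of "news"
    message = p.get_message(timeout=1.0)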
1919 def set_pubsub_node(self, cluster, node=None, host=None, port=None):
1920 """
1921 The pubsub node will be set according to the passed node, host and port
1922 When none of the node, host, or port are specified - the node is set
1923 to None and will be determined by the keyslot of the channel in the
1924 first command to be executed.
1925 RedisClusterException will be thrown if the passed node does not exist
1926 in the cluster.
1927 If host is passed without port, or vice versa, a DataError will be
1928 thrown.
1929 :type cluster: RedisCluster
1930 :type node: ClusterNode
1931 :type host: str
1932 :type port: int
1933 """
1934 if node is not None:
1935 # node is passed by the user
1936 self._raise_on_invalid_node(cluster, node, node.host, node.port)
1937 pubsub_node = node
1938 elif host is not None and port is not None:
1939 # host and port passed by the user
1940 node = cluster.get_node(host=host, port=port)
1941 self._raise_on_invalid_node(cluster, node, host, port)
1942 pubsub_node = node
1943 elif any([host, port]):
1944 # only 'host' or 'port' passed
1945 raise DataError("Passing a host requires passing a port, and vice versa")
1946 else:
1947 # nothing passed by the user. set node to None
1948 pubsub_node = None
1950 self.node = pubsub_node
1952 def get_pubsub_node(self):
1953 """
1954 Get the node that is being used as the pubsub connection
1955 """
1956 return self.node
1958 def _raise_on_invalid_node(self, redis_cluster, node, host, port):
1959 """
1960 Raise a RedisClusterException if the node is None or doesn't exist in
1961 the cluster.
1962 """
1963 if node is None or redis_cluster.get_node(node_name=node.name) is None:
1964 raise RedisClusterException(
1965 f"Node {host}:{port} doesn't exist in the cluster"
1966 )
1968 def execute_command(self, *args):
1969 """
1970 Execute a subscribe/unsubscribe command.
1972 Code taken from redis-py and tweaked to make it work within a cluster.
1973 """
1974 # NOTE: don't parse the response in this function -- it could pull a
1975 # legitimate message off the stack if the connection is already
1976 # subscribed to one or more channels
1978 if self.connection is None:
1979 if self.connection_pool is None:
1980 if len(args) > 1:
1981 # Hash the first channel and get one of the nodes holding
1982 # this slot
1983 channel = args[1]
1984 slot = self.cluster.keyslot(channel)
1985 node = self.cluster.nodes_manager.get_node_from_slot(
1986 slot,
1987 self.cluster.read_from_replicas,
1988 self.cluster.load_balancing_strategy,
1989 )
1990 else:
1991 # Get a random node
1992 node = self.cluster.get_random_node()
1993 self.node = node
1994 redis_connection = self.cluster.get_redis_connection(node)
1995 self.connection_pool = redis_connection.connection_pool
1996 self.connection = self.connection_pool.get_connection()
1997 # register a callback that re-subscribes to any channels we
1998 # were listening to when we were disconnected
1999 self.connection.register_connect_callback(self.on_connect)
2000 if self.push_handler_func is not None:
2001 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
2002 self._event_dispatcher.dispatch(
2003 AfterPubSubConnectionInstantiationEvent(
2004 self.connection, self.connection_pool, ClientType.SYNC, self._lock
2005 )
2006 )
2007 connection = self.connection
2008 self._execute(connection, connection.send_command, *args)
2010 def _get_node_pubsub(self, node):
2011 try:
2012 return self.node_pubsub_mapping[node.name]
2013 except KeyError:
2014 pubsub = node.redis_connection.pubsub(
2015 push_handler_func=self.push_handler_func
2016 )
2017 self.node_pubsub_mapping[node.name] = pubsub
2018 return pubsub
2020 def _sharded_message_generator(self):
2021 for _ in range(len(self.node_pubsub_mapping)):
2022 pubsub = next(self._pubsubs_generator)
2023 message = pubsub.get_message()
2024 if message is not None:
2025 return message
2026 return None
2028 def _pubsubs_generator(self):
2029 while True:
2030 yield from self.node_pubsub_mapping.values()
2032 def get_sharded_message(
2033 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
2034 ):
2035 if target_node:
2036 message = self.node_pubsub_mapping[target_node.name].get_message(
2037 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
2038 )
2039 else:
2040 message = self._sharded_message_generator()
2041 if message is None:
2042 return None
2043 elif str_if_bytes(message["type"]) == "sunsubscribe":
2044 if message["channel"] in self.pending_unsubscribe_shard_channels:
2045 self.pending_unsubscribe_shard_channels.remove(message["channel"])
2046 self.shard_channels.pop(message["channel"], None)
2047 node = self.cluster.get_node_from_key(message["channel"])
2048 if self.node_pubsub_mapping[node.name].subscribed is False:
2049 self.node_pubsub_mapping.pop(node.name)
2050 if not self.channels and not self.patterns and not self.shard_channels:
2051 # There are no subscriptions anymore, set subscribed_event flag
2052 # to false
2053 self.subscribed_event.clear()
2054 if self.ignore_subscribe_messages or ignore_subscribe_messages:
2055 return None
2056 return message
2058 def ssubscribe(self, *args, **kwargs):
2059 if args:
2060 args = list_or_args(args[0], args[1:])
2061 s_channels = dict.fromkeys(args)
2062 s_channels.update(kwargs)
2063 for s_channel, handler in s_channels.items():
2064 node = self.cluster.get_node_from_key(s_channel)
2065 pubsub = self._get_node_pubsub(node)
2066 if handler:
2067 pubsub.ssubscribe(**{s_channel: handler})
2068 else:
2069 pubsub.ssubscribe(s_channel)
2070 self.shard_channels.update(pubsub.shard_channels)
2071 self.pending_unsubscribe_shard_channels.difference_update(
2072 self._normalize_keys({s_channel: None})
2073 )
2074 if pubsub.subscribed and not self.subscribed:
2075 self.subscribed_event.set()
2076 self.health_check_response_counter = 0
2078 def sunsubscribe(self, *args):
2079 if args:
2080 args = list_or_args(args[0], args[1:])
2081 else:
2082 args = self.shard_channels
2084 for s_channel in args:
2085 node = self.cluster.get_node_from_key(s_channel)
2086 p = self._get_node_pubsub(node)
2087 p.sunsubscribe(s_channel)
2088 self.pending_unsubscribe_shard_channels.update(
2089 p.pending_unsubscribe_shard_channels
2090 )
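Sharded pubsub (ssubscribe/sunsubscribe above) keeps one PubSub per owning node in node_pubsub_mapping and routes each shard channel by its keyslot. A hedged usage sketch:

    p = rc.pubsub()
    p.ssubscribe("orders")                    # routed by the keyslot of "orders"
    msg = p.get_sharded_message(timeout=1.0)  # polls the per-node pubsubs
    p.sunsubscribe("orders")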
2092 def get_redis_connection(self):
2093 """
2094 Get the Redis connection of the pubsub connected node.
2095 """
2096 if self.node is not None:
2097 return self.node.redis_connection
2099 def disconnect(self):
2100 """
2101 Disconnect the pubsub connection.
2102 """
2103 if self.connection:
2104 self.connection.disconnect()
2105 for pubsub in self.node_pubsub_mapping.values():
2106 pubsub.connection.disconnect()
2109class ClusterPipeline(RedisCluster):
2110 """
2111 Support for Redis pipeline
2112 in cluster mode
2113 """
2115 ERRORS_ALLOW_RETRY = (
2116 ConnectionError,
2117 TimeoutError,
2118 MovedError,
2119 AskError,
2120 TryAgainError,
2121 )
2123 NO_SLOTS_COMMANDS = {"UNWATCH"}
2124 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
2125 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
2127 @deprecated_args(
2128 args_to_warn=[
2129 "cluster_error_retry_attempts",
2130 ],
2131 reason="Please configure the 'retry' object instead",
2132 version="6.0.0",
2133 )
2134 def __init__(
2135 self,
2136 nodes_manager: "NodesManager",
2137 commands_parser: "CommandsParser",
2138 result_callbacks: Optional[Dict[str, Callable]] = None,
2139 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
2140 startup_nodes: Optional[List["ClusterNode"]] = None,
2141 read_from_replicas: bool = False,
2142 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
2143 cluster_error_retry_attempts: int = 3,
2144 reinitialize_steps: int = 5,
2145 retry: Optional[Retry] = None,
2146 lock=None,
2147 transaction=False,
2148 **kwargs,
2149 ):
2150 """ """
2151 self.command_stack = []
2152 self.nodes_manager = nodes_manager
2153 self.commands_parser = commands_parser
2154 self.refresh_table_asap = False
2155 self.result_callbacks = (
2156 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
2157 )
2158 self.startup_nodes = startup_nodes if startup_nodes else []
2159 self.read_from_replicas = read_from_replicas
2160 self.load_balancing_strategy = load_balancing_strategy
2161 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
2162 self.cluster_response_callbacks = cluster_response_callbacks
2163 self.reinitialize_counter = 0
2164 self.reinitialize_steps = reinitialize_steps
2165 if retry is not None:
2166 self.retry = retry
2167 else:
2168 self.retry = Retry(
2169 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
2170 retries=cluster_error_retry_attempts,
2171 )
2173 self.encoder = Encoder(
2174 kwargs.get("encoding", "utf-8"),
2175 kwargs.get("encoding_errors", "strict"),
2176 kwargs.get("decode_responses", False),
2177 )
2178 if lock is None:
2179 lock = threading.RLock()
2180 self._lock = lock
2181 self.parent_execute_command = super().execute_command
2182 self._execution_strategy: ExecutionStrategy = (
2183 PipelineStrategy(self) if not transaction else TransactionStrategy(self)
2184 )
2186 def __repr__(self):
2187 """ """
2188 return f"{type(self).__name__}"
2190 def __enter__(self):
2191 """ """
2192 return self
2194 def __exit__(self, exc_type, exc_value, traceback):
2195 """ """
2196 self.reset()
2198 def __del__(self):
2199 try:
2200 self.reset()
2201 except Exception:
2202 pass
2204 def __len__(self):
2205 """ """
2206 return len(self._execution_strategy.command_queue)
2208 def __bool__(self):
2209 "Pipeline instances should always evaluate to True on Python 3+"
2210 return True
2212 def execute_command(self, *args, **kwargs):
2213 """
2214 Wrapper function for pipeline_execute_command
2215 """
2216 return self._execution_strategy.execute_command(*args, **kwargs)
2218 def pipeline_execute_command(self, *args, **options):
2219 """
2220 Stage a command to be executed when execute() is next called
2222 Returns the current Pipeline object back so commands can be
2223 chained together, such as:
2225 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
2227 At some other point, you can then run: pipe.execute(),
2228 which will execute all commands queued in the pipe.
2229 """
2230 return self._execution_strategy.execute_command(*args, **options)
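The chaining described above looks like this in practice (a sketch; rc is assumed to be a RedisCluster, and the non-transactional cluster pipeline may route each key to a different node):

    pipe = rc.pipeline()
    pipe.set("foo", "bar").incr("baz").decr("bang")
    results = pipe.execute()  # one result per queued command, in order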
2232 def annotate_exception(self, exception, number, command):
2233 """
2234 Provides extra context to the exception prior to it being handled
2235 """
2236 self._execution_strategy.annotate_exception(exception, number, command)
2238 def execute(self, raise_on_error: bool = True) -> List[Any]:
2239 """
2240 Execute all the commands in the current pipeline
2241 """
2243 try:
2244 return self._execution_strategy.execute(raise_on_error)
2245 finally:
2246 self.reset()
2248 def reset(self):
2249 """
2250 Reset back to empty pipeline.
2251 """
2252 self._execution_strategy.reset()
2254 def send_cluster_commands(
2255 self, stack, raise_on_error=True, allow_redirections=True
2256 ):
2257 return self._execution_strategy.send_cluster_commands(
2258 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
2259 )
2261 def exists(self, *keys):
2262 return self._execution_strategy.exists(*keys)
2264 def eval(self):
2265 """ """
2266 return self._execution_strategy.eval()
2268 def multi(self):
2269 """
2270 Start a transactional block of the pipeline after WATCH commands
2271 are issued. End the transactional block with `execute`.
2272 """
2273 self._execution_strategy.multi()
2275 def load_scripts(self):
2276 """ """
2277 self._execution_strategy.load_scripts()
2279 def discard(self):
2280 """ """
2281 self._execution_strategy.discard()
2283 def watch(self, *names):
2284 """Watches the values at keys ``names``"""
2285 self._execution_strategy.watch(*names)
2287 def unwatch(self):
2288 """Unwatches all previously specified keys"""
2289 self._execution_strategy.unwatch()
2291 def script_load_for_pipeline(self, *args, **kwargs):
2292 self._execution_strategy.script_load_for_pipeline(*args, **kwargs)
2294 def delete(self, *names):
2295 self._execution_strategy.delete(*names)
2297 def unlink(self, *names):
2298 self._execution_strategy.unlink(*names)
2301def block_pipeline_command(name: str) -> Callable[..., Any]:
2302 """
2303 Raises an error because some pipelined commands must
2304 be blocked when running in cluster mode
2305 """
2307 def inner(*args, **kwargs):
2308 raise RedisClusterException(
2309 f"ERROR: Calling pipelined function {name} is blocked "
2310 f"when running redis in cluster mode..."
2311 )
2313 return inner
2316# Blocked pipeline commands
2317PIPELINE_BLOCKED_COMMANDS = (
2318 "BGREWRITEAOF",
2319 "BGSAVE",
2320 "BITOP",
2321 "BRPOPLPUSH",
2322 "CLIENT GETNAME",
2323 "CLIENT KILL",
2324 "CLIENT LIST",
2325 "CLIENT SETNAME",
2326 "CLIENT",
2327 "CONFIG GET",
2328 "CONFIG RESETSTAT",
2329 "CONFIG REWRITE",
2330 "CONFIG SET",
2331 "CONFIG",
2332 "DBSIZE",
2333 "ECHO",
2334 "EVALSHA",
2335 "FLUSHALL",
2336 "FLUSHDB",
2337 "INFO",
2338 "KEYS",
2339 "LASTSAVE",
2340 "MGET",
2341 "MGET NONATOMIC",
2342 "MOVE",
2343 "MSET",
2344 "MSET NONATOMIC",
2345 "MSETNX",
2346 "PFCOUNT",
2347 "PFMERGE",
2348 "PING",
2349 "PUBLISH",
2350 "RANDOMKEY",
2351 "READONLY",
2352 "READWRITE",
2353 "RENAME",
2354 "RENAMENX",
2355 "RPOPLPUSH",
2356 "SAVE",
2357 "SCAN",
2358 "SCRIPT EXISTS",
2359 "SCRIPT FLUSH",
2360 "SCRIPT KILL",
2361 "SCRIPT LOAD",
2362 "SCRIPT",
2363 "SDIFF",
2364 "SDIFFSTORE",
2365 "SENTINEL GET MASTER ADDR BY NAME",
2366 "SENTINEL MASTER",
2367 "SENTINEL MASTERS",
2368 "SENTINEL MONITOR",
2369 "SENTINEL REMOVE",
2370 "SENTINEL SENTINELS",
2371 "SENTINEL SET",
2372 "SENTINEL SLAVES",
2373 "SENTINEL",
2374 "SHUTDOWN",
2375 "SINTER",
2376 "SINTERSTORE",
2377 "SLAVEOF",
2378 "SLOWLOG GET",
2379 "SLOWLOG LEN",
2380 "SLOWLOG RESET",
2381 "SLOWLOG",
2382 "SMOVE",
2383 "SORT",
2384 "SUNION",
2385 "SUNIONSTORE",
2386 "TIME",
2387)
2388for command in PIPELINE_BLOCKED_COMMANDS:
2389 command = command.replace(" ", "_").lower()
2391 setattr(ClusterPipeline, command, block_pipeline_command(command))
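The loop above swaps each blocked method on ClusterPipeline for block_pipeline_command's inner function, so a blocked call fails immediately rather than at execute(). For example:

    from redis.exceptions import RedisClusterException

    pipe = rc.pipeline()
    try:
        pipe.keys("*")  # KEYS is in PIPELINE_BLOCKED_COMMANDS
    except RedisClusterException as e:
        print(e)  # ERROR: Calling pipelined function keys is blocked ...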
2394class PipelineCommand:
2395 """ """
2397 def __init__(self, args, options=None, position=None):
2398 self.args = args
2399 if options is None:
2400 options = {}
2401 self.options = options
2402 self.position = position
2403 self.result = None
2404 self.node = None
2405 self.asking = False
2408class NodeCommands:
2409 """ """
2411 def __init__(self, parse_response, connection_pool, connection):
2412 """ """
2413 self.parse_response = parse_response
2414 self.connection_pool = connection_pool
2415 self.connection = connection
2416 self.commands = []
2418 def append(self, c):
2419 """ """
2420 self.commands.append(c)
2422 def write(self):
2423 """
2424 Code borrowed from Redis so it can be fixed
2425 """
2426 connection = self.connection
2427 commands = self.commands
2429 # We are going to clobber the commands with the write, so go ahead
2430 # and ensure that nothing is sitting there from a previous run.
2431 for c in commands:
2432 c.result = None
2434 # build up all commands into a single request to increase network perf
2435 # send all the commands and catch connection and timeout errors.
2436 try:
2437 connection.send_packed_command(
2438 connection.pack_commands([c.args for c in commands])
2439 )
2440 except (ConnectionError, TimeoutError) as e:
2441 for c in commands:
2442 c.result = e
2444 def read(self):
2445 """ """
2446 connection = self.connection
2447 for c in self.commands:
2448 # if there is a result on this command,
2449 # it means we ran into an exception
2450 # like a connection error. Trying to parse
2451 # a response on a connection that
2452 # is no longer open will result in a
2453 # connection error raised by redis-py.
2454 # But redis-py doesn't check in parse_response
2455 # that the sock object is
2456 # still set and if you try to
2457 # read from a closed connection, it will
2458 # result in an AttributeError because
2459 # it will do a readline() call on None.
2460 # This can have all kinds of nasty side-effects.
2461 # Treating this case as a connection error
2462 # is fine because it will dump
2463 # the connection object back into the
2464 # pool and on the next write, it will
2465 # explicitly open the connection and all will be well.
2466 if c.result is None:
2467 try:
2468 c.result = self.parse_response(connection, c.args[0], **c.options)
2469 except (ConnectionError, TimeoutError) as e:
2470 for c in self.commands:
2471 c.result = e
2472 return
2473 except RedisError:
2474 c.result = sys.exc_info()[1]
2477class ExecutionStrategy(ABC):
2478 @property
2479 @abstractmethod
2480 def command_queue(self):
2481 pass
2483 @abstractmethod
2484 def execute_command(self, *args, **kwargs):
2485 """
2486 Execution flow for current execution strategy.
2488 See: ClusterPipeline.execute_command()
2489 """
2490 pass
2492 @abstractmethod
2493 def annotate_exception(self, exception, number, command):
2494 """
2495 Annotate exception according to current execution strategy.
2497 See: ClusterPipeline.annotate_exception()
2498 """
2499 pass
2501 @abstractmethod
2502 def pipeline_execute_command(self, *args, **options):
2503 """
2504 Pipeline execution flow for current execution strategy.
2506 See: ClusterPipeline.pipeline_execute_command()
2507 """
2508 pass
2510 @abstractmethod
2511 def execute(self, raise_on_error: bool = True) -> List[Any]:
2512 """
2513 Executes current execution strategy.
2515 See: ClusterPipeline.execute()
2516 """
2517 pass
2519 @abstractmethod
2520 def send_cluster_commands(
2521 self, stack, raise_on_error=True, allow_redirections=True
2522 ):
2523 """
2524 Sends commands according to current execution strategy.
2526 See: ClusterPipeline.send_cluster_commands()
2527 """
2528 pass
2530 @abstractmethod
2531 def reset(self):
2532 """
2533 Resets current execution strategy.
2535 See: ClusterPipeline.reset()
2536 """
2537 pass
2539 @abstractmethod
2540 def exists(self, *keys):
2541 pass
2543 @abstractmethod
2544 def eval(self):
2545 pass
2547 @abstractmethod
2548 def multi(self):
2549 """
2550 Starts transactional context.
2552 See: ClusterPipeline.multi()
2553 """
2554 pass
2556 @abstractmethod
2557 def load_scripts(self):
2558 pass
2560 @abstractmethod
2561 def watch(self, *names):
2562 pass
2564 @abstractmethod
2565 def unwatch(self):
2566 """
2567 Unwatches all previously specified keys
2569 See: ClusterPipeline.unwatch()
2570 """
2571 pass
2573 @abstractmethod
2574 def script_load_for_pipeline(self, *args, **kwargs):
2575 pass
2577 @abstractmethod
2578 def delete(self, *names):
2579 """
2580 "Delete a key specified by ``names``"
2582 See: ClusterPipeline.delete()
2583 """
2584 pass
2586 @abstractmethod
2587 def unlink(self, *names):
2588 """
2589 "Unlink a key specified by ``names``"
2591 See: ClusterPipeline.unlink()
2592 """
2593 pass
2595 @abstractmethod
2596 def discard(self):
2597 pass
2600class AbstractStrategy(ExecutionStrategy):
2601 def __init__(
2602 self,
2603 pipe: ClusterPipeline,
2604 ):
2605 self._command_queue: List[PipelineCommand] = []
2606 self._pipe = pipe
2607 self._nodes_manager = self._pipe.nodes_manager
2609 @property
2610 def command_queue(self):
2611 return self._command_queue
2613 @command_queue.setter
2614 def command_queue(self, queue: List[PipelineCommand]):
2615 self._command_queue = queue
2617 @abstractmethod
2618 def execute_command(self, *args, **kwargs):
2619 pass
2621 def pipeline_execute_command(self, *args, **options):
2622 self._command_queue.append(
2623 PipelineCommand(args, options, len(self._command_queue))
2624 )
2625 return self._pipe
2627 @abstractmethod
2628 def execute(self, raise_on_error: bool = True) -> List[Any]:
2629 pass
2631 @abstractmethod
2632 def send_cluster_commands(
2633 self, stack, raise_on_error=True, allow_redirections=True
2634 ):
2635 pass
2637 @abstractmethod
2638 def reset(self):
2639 pass
2641 def exists(self, *keys):
2642 return self.execute_command("EXISTS", *keys)
2644 def eval(self):
2645 """ """
2646 raise RedisClusterException("method eval() is not implemented")
2648 def load_scripts(self):
2649 """ """
2650 raise RedisClusterException("method load_scripts() is not implemented")
2652 def script_load_for_pipeline(self, *args, **kwargs):
2653 """ """
2654 raise RedisClusterException(
2655 "method script_load_for_pipeline() is not implemented"
2656 )
2658 def annotate_exception(self, exception, number, command):
2659 """
2660 Provides extra context to the exception prior to it being handled
2661 """
2662 cmd = " ".join(map(safe_str, command))
2663 msg = (
2664 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
2665 f"caused error: {exception.args[0]}"
2666 )
2667 exception.args = (msg,) + exception.args[1:]
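annotate_exception() prefixes the failing command's position and text onto the error that execute() re-raises. An illustrative sketch (key names are placeholders; the wording comes from the format string above):

    pipe = rc.pipeline()
    pipe.lpush("mylist", "a")
    pipe.incr("mylist")  # WRONGTYPE error from the server
    try:
        pipe.execute()
    except Exception as e:
        # reads like: "Command # 2 (INCR mylist) of pipeline caused error: ..."
        print(e)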
2670class PipelineStrategy(AbstractStrategy):
2671 def __init__(self, pipe: ClusterPipeline):
2672 super().__init__(pipe)
2673 self.command_flags = pipe.command_flags
2675 def execute_command(self, *args, **kwargs):
2676 return self.pipeline_execute_command(*args, **kwargs)
2678 def _raise_first_error(self, stack):
2679 """
2680 Raise the first exception on the stack
2681 """
2682 for c in stack:
2683 r = c.result
2684 if isinstance(r, Exception):
2685 self.annotate_exception(r, c.position + 1, c.args)
2686 raise r
2688 def execute(self, raise_on_error: bool = True) -> List[Any]:
2689 stack = self._command_queue
2690 if not stack:
2691 return []
2693 try:
2694 return self.send_cluster_commands(stack, raise_on_error)
2695 finally:
2696 self.reset()
2698 def reset(self):
2699 """
2700 Reset back to empty pipeline.
2701 """
2702 self._command_queue = []
2704 def send_cluster_commands(
2705 self, stack, raise_on_error=True, allow_redirections=True
2706 ):
2707 """
2708 Wrapper around RedisCluster.ERRORS_ALLOW_RETRY error handling.
2710 If one of the retryable exceptions has been thrown we assume that:
2711 - connection_pool was disconnected
2712 - connection_pool was reset
2713 - refresh_table_asap was set to True
2715 It will retry the number of times specified by
2716 the retries in the "self.retry" config option,
2717 which defaults to 3 unless manually configured.
2719 Once the retries are exhausted, the last
2720 retryable error is raised.
2721 """
2722 if not stack:
2723 return []
2724 retry_attempts = self._pipe.retry.get_retries()
2725 while True:
2726 try:
2727 return self._send_cluster_commands(
2728 stack,
2729 raise_on_error=raise_on_error,
2730 allow_redirections=allow_redirections,
2731 )
2732 except RedisCluster.ERRORS_ALLOW_RETRY as e:
2733 if retry_attempts > 0:
2734 # Try again with the new cluster setup. All other errors
2735 # should be raised.
2736 retry_attempts -= 1
2738 else:
2739 raise e
2741 def _send_cluster_commands(
2742 self, stack, raise_on_error=True, allow_redirections=True
2743 ):
2744 """
2745 Send a bunch of cluster commands to the redis cluster.
2747 `allow_redirections` If the pipeline should follow
2748 `ASK` & `MOVED` responses automatically. If set
2749 to false it will raise RedisClusterException.
2750 """
2751 # the first time sending the commands we send all of
2752 # the commands that were queued up.
2753 # if we have to run through it again, we only retry
2754 # the commands that failed.
2755 attempt = sorted(stack, key=lambda x: x.position)
2756 is_default_node = False
2757 # build a map of node commands keyed by the node names we need to route to
2758 nodes = {}
2760 # as we move through each command that still needs to be processed,
2761 # we figure out the slot number that command maps to, then from
2762 # the slot determine the node.
2763 for c in attempt:
2764 while True:
2765 # refer to our internal node -> slot table that
2766 # tells us where a given command should route to.
2767 # (it might be possible we have a cached node that no longer
2768 # exists in the cluster, which is why we do this in a loop)
2769 passed_targets = c.options.pop("target_nodes", None)
2770 if passed_targets and not self._is_nodes_flag(passed_targets):
2771 target_nodes = self._parse_target_nodes(passed_targets)
2772 else:
2773 target_nodes = self._determine_nodes(
2774 *c.args, node_flag=passed_targets
2775 )
2776 if not target_nodes:
2777 raise RedisClusterException(
2778 f"No targets were found to execute {c.args} command on"
2779 )
2780 if len(target_nodes) > 1:
2781 raise RedisClusterException(
2782 f"Too many targets for command {c.args}"
2783 )
2785 node = target_nodes[0]
2786 if node == self._pipe.get_default_node():
2787 is_default_node = True
2789 # now that we know the name of the node
2790 # ( it's just a string in the form of host:port )
2791 # we can build a list of commands for each node.
2792 node_name = node.name
2793 if node_name not in nodes:
2794 redis_node = self._pipe.get_redis_connection(node)
2795 try:
2796 connection = get_connection(redis_node)
2797 except (ConnectionError, TimeoutError):
2798 for n in nodes.values():
2799 n.connection_pool.release(n.connection)
2800 # Connection retries are being handled in the node's
2801 # Retry object. Reinitialize the node -> slot table.
2802 self._nodes_manager.initialize()
2803 if is_default_node:
2804 self._pipe.replace_default_node()
2805 raise
2806 nodes[node_name] = NodeCommands(
2807 redis_node.parse_response,
2808 redis_node.connection_pool,
2809 connection,
2810 )
2811 nodes[node_name].append(c)
2812 break
2814 # send the commands in sequence.
2815 # we write to all the open sockets for each node first,
2816 # before reading anything
2817 # this allows us to flush all the requests out across the
2818 # network
2819 # so that we can read them from different sockets as they come back.
2820 # we don't multiplex on the sockets as they become available,
2821 # but that shouldn't make too much difference.
2822 try:
2823 node_commands = nodes.values()
2824 for n in node_commands:
2825 n.write()
2827 for n in node_commands:
2828 n.read()
2829 finally:
2830 # release all of the redis connections we allocated earlier
2831 # back into the connection pool.
2832 # we used to do this step as part of a try/finally block,
2833 # but it is really dangerous to
2834 # release connections back into the pool if for some
2835 # reason the socket has data still left in it
2836 # from a previous operation. The write and
2837 # read operations already have try/catch around them for
2838 # all known types of errors including connection
2839 # and socket level errors.
2840 # So if we hit an exception, something really bad
2841 # happened and putting any of
2842 # these connections back into the pool is a very bad idea.
2843 # the socket might have unread buffer still sitting in it,
2844 # and then the next time we read from it we pass the
2845 # buffered result back from a previous command and
2846 # every single request after to that connection will always get
2847 # a mismatched result.
2848 for n in nodes.values():
2849 n.connection_pool.release(n.connection)
2851 # if the response isn't an exception it is a
2852 # valid response from the node
2853 # we're all done with that command, YAY!
2854 # if we have more commands to attempt, we've run into problems.
2855 # collect all the commands we are allowed to retry.
2856 # (MOVED, ASK, or connection errors or timeout errors)
2857 attempt = sorted(
2858 (
2859 c
2860 for c in attempt
2861 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2862 ),
2863 key=lambda x: x.position,
2864 )
2865 if attempt and allow_redirections:
2866 # RETRY MAGIC HAPPENS HERE!
2867 # send these remaining commands one at a time using `execute_command`
2868 # in the main client. This keeps our retry logic
2869 # in one place mostly,
2870 # and allows us to be more confident in correctness of behavior.
2871 # at this point any speed gains from pipelining have been lost
2872 # anyway, so we might as well make the best
2873 # attempt to get the correct behavior.
2874 #
2875 # The client command will handle retries for each
2876 # individual command sequentially as we pass each
2877 # one into `execute_command`. Any exceptions
2878 # that bubble out should only appear once all
2879 # retries have been exhausted.
2880 #
2881 # If a lot of commands have failed, we'll be setting the
2882 # flag to rebuild the slots table from scratch.
2883 # So MOVED errors should correct themselves fairly quickly.
2884 self._pipe.reinitialize_counter += 1
2885 if self._pipe._should_reinitialized():
2886 self._nodes_manager.initialize()
2887 if is_default_node:
2888 self._pipe.replace_default_node()
2889 for c in attempt:
2890 try:
2891 # send each command individually like we
2892 # do in the main client.
2893 c.result = self._pipe.parent_execute_command(*c.args, **c.options)
2894 except RedisError as e:
2895 c.result = e
2897 # turn the response back into a simple flat array that corresponds
2898 # to the sequence of commands issued in the stack in pipeline.execute()
2899 response = []
2900 for c in sorted(stack, key=lambda x: x.position):
2901 if c.args[0] in self._pipe.cluster_response_callbacks:
2902 # Remove the 'keys' entry; it is only needed for caching.
2903 c.options.pop("keys", None)
2904 c.result = self._pipe.cluster_response_callbacks[c.args[0]](
2905 c.result, **c.options
2906 )
2907 response.append(c.result)
2909 if raise_on_error:
2910 self._raise_first_error(stack)
2912 return response
2914 def _is_nodes_flag(self, target_nodes):
2915 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags
2917 def _parse_target_nodes(self, target_nodes):
2918 if isinstance(target_nodes, list):
2919 nodes = target_nodes
2920 elif isinstance(target_nodes, ClusterNode):
2921 # Supports passing a single ClusterNode as a variable
2922 nodes = [target_nodes]
2923 elif isinstance(target_nodes, dict):
2924 # Supports dictionaries of the format {node_name: node}.
2925 # It enables executing commands on multiple nodes, e.g.:
2926 # rc.cluster_save_config(rc.get_primaries())
2927 nodes = target_nodes.values()
2928 else:
2929 raise TypeError(
2930 "target_nodes type can be one of the following: "
2931 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
2932 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
2933 f"The passed type is {type(target_nodes)}"
2934 )
2935 return nodes
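The accepted shapes above mirror how callers pass target_nodes to the main client. A hedged sketch:

    from redis.cluster import RedisCluster

    rc.ping(target_nodes=RedisCluster.ALL_NODES)  # node-flag string
    rc.ping(target_nodes=rc.get_random_node())    # a single ClusterNode
    rc.cluster_save_config(rc.get_primaries())    # a list of ClusterNodes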
2937 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
2938 # Determine which nodes the command should be executed on.
2939 # Returns a list of target nodes.
2940 command = args[0].upper()
2941 if (
2942 len(args) >= 2
2943 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
2944 ):
2945 command = f"{args[0]} {args[1]}".upper()
2947 nodes_flag = kwargs.pop("nodes_flag", None)
2948 if nodes_flag is not None:
2949 # nodes flag passed by the user
2950 command_flag = nodes_flag
2951 else:
2952 # get the nodes group for this command if it was predefined
2953 command_flag = self._pipe.command_flags.get(command)
2954 if command_flag == self._pipe.RANDOM:
2955 # return a random node
2956 return [self._pipe.get_random_node()]
2957 elif command_flag == self._pipe.PRIMARIES:
2958 # return all primaries
2959 return self._pipe.get_primaries()
2960 elif command_flag == self._pipe.REPLICAS:
2961 # return all replicas
2962 return self._pipe.get_replicas()
2963 elif command_flag == self._pipe.ALL_NODES:
2964 # return all nodes
2965 return self._pipe.get_nodes()
2966 elif command_flag == self._pipe.DEFAULT_NODE:
2967 # return the cluster's default node
2968 return [self._nodes_manager.default_node]
2969 elif command in self._pipe.SEARCH_COMMANDS[0]:
2970 return [self._nodes_manager.default_node]
2971 else:
2972 # get the node that holds the key's slot
2973 slot = self._pipe.determine_slot(*args)
2974 node = self._nodes_manager.get_node_from_slot(
2975 slot,
2976 self._pipe.read_from_replicas and command in READ_COMMANDS,
2977 self._pipe.load_balancing_strategy
2978 if command in READ_COMMANDS
2979 else None,
2980 )
2981 return [node]
2983 def multi(self):
2984 raise RedisClusterException(
2985 "method multi() is not supported outside of transactional context"
2986 )
2988 def discard(self):
2989 raise RedisClusterException(
2990 "method discard() is not supported outside of transactional context"
2991 )
2993 def watch(self, *names):
2994 raise RedisClusterException(
2995 "method watch() is not supported outside of transactional context"
2996 )
2998 def unwatch(self, *names):
2999 raise RedisClusterException(
3000 "method unwatch() is not supported outside of transactional context"
3001 )
3003 def delete(self, *names):
3004 if len(names) != 1:
3005 raise RedisClusterException(
3006 "deleting multiple keys is not implemented in pipeline command"
3007 )
3009 return self.execute_command("DEL", names[0])
3011 def unlink(self, *names):
3012 if len(names) != 1:
3013 raise RedisClusterException(
3014 "unlinking multiple keys is not implemented in pipeline command"
3015 )
3017 return self.execute_command("UNLINK", names[0])
3020class TransactionStrategy(AbstractStrategy):
3021 NO_SLOTS_COMMANDS = {"UNWATCH"}
3022 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
3023 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3024 SLOT_REDIRECT_ERRORS = (AskError, MovedError)
3025 CONNECTION_ERRORS = (
3026 ConnectionError,
3027 OSError,
3028 ClusterDownError,
3029 SlotNotCoveredError,
3030 )
3032 def __init__(self, pipe: ClusterPipeline):
3033 super().__init__(pipe)
3034 self._explicit_transaction = False
3035 self._watching = False
3036 self._pipeline_slots: Set[int] = set()
3037 self._transaction_connection: Optional[Connection] = None
3038 self._executing = False
3039 self._retry = copy(self._pipe.retry)
3040 self._retry.update_supported_errors(
3041 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
3042 )
3044 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
3045 """
3046 Find a connection for a pipeline transaction.
3048 For running an atomic transaction, watched keys ensure that contents have not been
3049 altered as long as the watch commands for those keys were sent over the same
3050 connection. So once we start watching a key, we fetch a connection to the
3051 node that owns that slot and reuse it.
3052 """
3053 if not self._pipeline_slots:
3054 raise RedisClusterException(
3055 "At least a command with a key is needed to identify a node"
3056 )
3058 node: ClusterNode = self._nodes_manager.get_node_from_slot(
3059 list(self._pipeline_slots)[0], False
3060 )
3061 redis_node: Redis = self._pipe.get_redis_connection(node)
3062 if self._transaction_connection:
3063 if not redis_node.connection_pool.owns_connection(
3064 self._transaction_connection
3065 ):
3066 previous_node = self._nodes_manager.find_connection_owner(
3067 self._transaction_connection
3068 )
3069 previous_node.connection_pool.release(self._transaction_connection)
3070 self._transaction_connection = None
3072 if not self._transaction_connection:
3073 self._transaction_connection = get_connection(redis_node)
3075 return redis_node, self._transaction_connection
3077 def execute_command(self, *args, **kwargs):
3078 slot_number: Optional[int] = None
3079 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
3080 slot_number = self._pipe.determine_slot(*args)
3082 if (
3083 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
3084 ) and not self._explicit_transaction:
3085 if args[0] == "WATCH":
3086 self._validate_watch()
3088 if slot_number is not None:
3089 if self._pipeline_slots and slot_number not in self._pipeline_slots:
3090 raise CrossSlotTransactionError(
3091 "Cannot watch or send commands on different slots"
3092 )
3094 self._pipeline_slots.add(slot_number)
3095 elif args[0] not in self.NO_SLOTS_COMMANDS:
3096 raise RedisClusterException(
3097 f"Cannot identify slot number for command: {args[0]},"
3098 "it cannot be triggered in a transaction"
3099 )
3101 return self._immediate_execute_command(*args, **kwargs)
3102 else:
3103 if slot_number is not None:
3104 self._pipeline_slots.add(slot_number)
3106 return self.pipeline_execute_command(*args, **kwargs)
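Because every key in a cluster transaction must map to one slot, hash tags are the usual way to satisfy the check above. A sketch (assumes this redis-py version exposes cluster transactions via pipeline(transaction=True)):

    # "{user:1}" hash-tags both keys into the same slot, so no
    # CrossSlotTransactionError is raised.
    pipe = rc.pipeline(transaction=True)
    pipe.incrby("{user:1}:balance", 10)
    pipe.lpush("{user:1}:events", "deposit")
    pipe.execute()  # wrapped in MULTI/EXEC on the slot's primary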
3108 def _validate_watch(self):
3109 if self._explicit_transaction:
3110 raise RedisError("Cannot issue a WATCH after a MULTI")
3112 self._watching = True
3114 def _immediate_execute_command(self, *args, **options):
3115 return self._retry.call_with_retry(
3116 lambda: self._get_connection_and_send_command(*args, **options),
3117 self._reinitialize_on_error,
3118 )
3120 def _get_connection_and_send_command(self, *args, **options):
3121 redis_node, connection = self._get_client_and_connection_for_transaction()
3122 return self._send_command_parse_response(
3123 connection, redis_node, args[0], *args, **options
3124 )
3126 def _send_command_parse_response(
3127 self, conn, redis_node: Redis, command_name, *args, **options
3128 ):
3129 """
3130 Send a command and parse the response
3131 """
3133 conn.send_command(*args)
3134 output = redis_node.parse_response(conn, command_name, **options)
3136 if command_name in self.UNWATCH_COMMANDS:
3137 self._watching = False
3138 return output
3140 def _reinitialize_on_error(self, error):
3141 if self._watching:
3142 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
3143 raise WatchError("Slot rebalancing occurred while watching keys")
3145 if (
3146 type(error) in self.SLOT_REDIRECT_ERRORS
3147 or type(error) in self.CONNECTION_ERRORS
3148 ):
3149 if self._transaction_connection:
3150 self._transaction_connection = None
3152 self._pipe.reinitialize_counter += 1
3153 if self._pipe._should_reinitialized():
3154 self._nodes_manager.initialize()
3155 self._pipe.reinitialize_counter = 0
3156 else:
3157 self._nodes_manager.update_moved_exception(error)
3159 self._executing = False
3161 def _raise_first_error(self, responses, stack):
3162 """
3163 Raise the first exception on the stack
3164 """
3165 for r, cmd in zip(responses, stack):
3166 if isinstance(r, Exception):
3167 self.annotate_exception(r, cmd.position + 1, cmd.args)
3168 raise r
3170 def execute(self, raise_on_error: bool = True) -> List[Any]:
3171 stack = self._command_queue
3172 if not stack and (not self._watching or not self._pipeline_slots):
3173 return []
3175 return self._execute_transaction_with_retries(stack, raise_on_error)
3177 def _execute_transaction_with_retries(
3178 self, stack: List["PipelineCommand"], raise_on_error: bool
3179 ):
3180 return self._retry.call_with_retry(
3181 lambda: self._execute_transaction(stack, raise_on_error),
3182 self._reinitialize_on_error,
3183 )
3185 def _execute_transaction(
3186 self, stack: List["PipelineCommand"], raise_on_error: bool
3187 ):
3188 if len(self._pipeline_slots) > 1:
3189 raise CrossSlotTransactionError(
3190 "All keys involved in a cluster transaction must map to the same slot"
3191 )
3193 self._executing = True
3195 redis_node, connection = self._get_client_and_connection_for_transaction()
3197 stack = chain(
3198 [PipelineCommand(("MULTI",))],
3199 stack,
3200 [PipelineCommand(("EXEC",))],
3201 )
3202 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
3203 packed_commands = connection.pack_commands(commands)
3204 connection.send_packed_command(packed_commands)
3205 errors = []
3207 # parse off the response for MULTI
3208 # NOTE: we need to handle ResponseErrors here and continue
3209 # so that we read all the additional command messages from
3210 # the socket
3211 try:
3212 redis_node.parse_response(connection, "MULTI")
3213 except ResponseError as e:
3214 self.annotate_exception(e, 0, "MULTI")
3215 errors.append(e)
3216 except self.CONNECTION_ERRORS as cluster_error:
3217 self.annotate_exception(cluster_error, 0, "MULTI")
3218 raise
3220 # and all the other commands
3221 for i, command in enumerate(self._command_queue):
3222 if EMPTY_RESPONSE in command.options:
3223 errors.append((i, command.options[EMPTY_RESPONSE]))
3224 else:
3225 try:
3226 _ = redis_node.parse_response(connection, "_")
3227 except self.SLOT_REDIRECT_ERRORS as slot_error:
3228 self.annotate_exception(slot_error, i + 1, command.args)
3229 errors.append(slot_error)
3230 except self.CONNECTION_ERRORS as cluster_error:
3231 self.annotate_exception(cluster_error, i + 1, command.args)
3232 raise
3233 except ResponseError as e:
3234 self.annotate_exception(e, i + 1, command.args)
3235 errors.append(e)
3237 response = None
3238 # parse the EXEC.
3239 try:
3240 response = redis_node.parse_response(connection, "EXEC")
3241 except ExecAbortError:
3242 if errors:
3243 raise errors[0]
3244 raise
3246 self._executing = False
3248 # EXEC clears any watched keys
3249 self._watching = False
3251 if response is None:
3252 raise WatchError("Watched variable changed.")
3254 # put any parse errors into the response
3255 for i, e in errors:
3256 response.insert(i, e)
3258 if len(response) != len(self._command_queue):
3259 raise InvalidPipelineStack(
3260 "Unexpected response length for cluster pipeline EXEC."
3261 " Command stack was {} but response had length {}".format(
3262 [c.args[0] for c in self._command_queue], len(response)
3263 )
3264 )
3266 # find any errors in the response and raise if necessary
3267 if raise_on_error or len(errors) > 0:
3268 self._raise_first_error(
3269 response,
3270 self._command_queue,
3271 )
3273 # We have to run response callbacks manually
3274 data = []
3275 for r, cmd in zip(response, self._command_queue):
3276 if not isinstance(r, Exception):
3277 command_name = cmd.args[0]
3278 if command_name in self._pipe.cluster_response_callbacks:
3279 r = self._pipe.cluster_response_callbacks[command_name](
3280 r, **cmd.options
3281 )
3282 data.append(r)
3283 return data
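A None response from EXEC surfaces as WatchError above, which callers typically handle with the standard optimistic-locking retry loop. A hedged sketch (the key name is a placeholder; assumes immediate-mode reads after WATCH, as implemented in execute_command above):

    from redis.exceptions import WatchError

    while True:
        pipe = rc.pipeline(transaction=True)
        try:
            pipe.watch("{acct:1}:balance")        # immediate execution begins
            balance = int(pipe.get("{acct:1}:balance") or 0)
            pipe.multi()                          # commands queue from here on
            pipe.set("{acct:1}:balance", balance + 1)
            pipe.execute()                        # WatchError if the key changed
            break
        except WatchError:
            continue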
3285 def reset(self):
3286 self._command_queue = []
3288 # make sure to reset the connection state in the event that we were
3289 # watching something
3290 if self._transaction_connection:
3291 try:
3292 if self._watching:
3293 # call this manually since our unwatch or
3294 # immediate_execute_command methods can call reset()
3295 self._transaction_connection.send_command("UNWATCH")
3296 self._transaction_connection.read_response()
3297 # we can safely return the connection to the pool here since we're
3298 # sure we're no longer WATCHing anything
3299 node = self._nodes_manager.find_connection_owner(
3300 self._transaction_connection
3301 )
3302 node.redis_connection.connection_pool.release(
3303 self._transaction_connection
3304 )
3305 self._transaction_connection = None
3306 except self.CONNECTION_ERRORS:
3307 # disconnect will also remove any previous WATCHes
3308 if self._transaction_connection:
3309 self._transaction_connection.disconnect()
3311 # clean up the other instance attributes
3312 self._watching = False
3313 self._explicit_transaction = False
3314 self._pipeline_slots = set()
3315 self._executing = False
3317 def send_cluster_commands(
3318 self, stack, raise_on_error=True, allow_redirections=True
3319 ):
3320 raise NotImplementedError(
3321 "send_cluster_commands cannot be executed in transactional context."
3322 )
3324 def multi(self):
3325 if self._explicit_transaction:
3326 raise RedisError("Cannot issue nested calls to MULTI")
3327 if self._command_queue:
3328 raise RedisError(
3329 "Commands without an initial WATCH have already been issued"
3330 )
3331 self._explicit_transaction = True
3333 def watch(self, *names):
3334 if self._explicit_transaction:
3335 raise RedisError("Cannot issue a WATCH after a MULTI")
3337 return self.execute_command("WATCH", *names)
3339 def unwatch(self):
3340 if self._watching:
3341 return self.execute_command("UNWATCH")
3343 return True
3345 def discard(self):
3346 self.reset()
3348 def delete(self, *names):
3349 return self.execute_command("DEL", *names)
3351 def unlink(self, *names):
3352 return self.execute_command("UNLINK", *names)