import random
import socket
import sys
import threading
import time
from abc import ABC, abstractmethod
from collections import OrderedDict
from copy import copy
from enum import Enum
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from redis._parsers import CommandsParser, Encoder
from redis._parsers.helpers import parse_scan
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
from redis.commands import READ_COMMANDS, RedisClusterCommands
from redis.commands.helpers import list_or_args
from redis.connection import (
    Connection,
    ConnectionPool,
    parse_url,
)
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    AskError,
    AuthenticationError,
    ClusterDownError,
    ClusterError,
    ConnectionError,
    CrossSlotTransactionError,
    DataError,
    ExecAbortError,
    InvalidPipelineStack,
    MaxConnectionsError,
    MovedError,
    RedisClusterException,
    RedisError,
    ResponseError,
    SlotNotCoveredError,
    TimeoutError,
    TryAgainError,
    WatchError,
)
from redis.lock import Lock
from redis.retry import Retry
from redis.utils import (
    deprecated_args,
    dict_merge,
    list_keys_to_dict,
    merge_result,
    safe_str,
    str_if_bytes,
    truncate_text,
)


def get_node_name(host: str, port: Union[str, int]) -> str:
    return f"{host}:{port}"


@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    return redis_node.connection or redis_node.connection_pool.get_connection()


def parse_scan_result(command, res, **options):
    cursors = {}
    ret = []
    for node_name, response in res.items():
        cursor, r = parse_scan(response, **options)
        cursors[node_name] = cursor
        ret += r

    return cursors, ret
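
# Illustrative example (hypothetical per-node SCAN replies): each node keeps
# its own cursor while the returned keys are merged into one flat list.
#
#     res = {
#         "10.0.0.1:6379": [b"8", [b"k1", b"k2"]],
#         "10.0.0.2:6379": [b"0", [b"k3"]],
#     }
#     parse_scan_result("SCAN", res)
#     # -> ({"10.0.0.1:6379": 8, "10.0.0.2:6379": 0}, [b"k1", b"k2", b"k3"])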


def parse_pubsub_numsub(command, res, **options):
    numsub_d = OrderedDict()
    for numsub_tups in res.values():
        for channel, numsubbed in numsub_tups:
            try:
                numsub_d[channel] += numsubbed
            except KeyError:
                numsub_d[channel] = numsubbed

    ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
    return ret_numsub
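
# Illustrative example (hypothetical node replies): subscriber counts for the
# same channel are summed across nodes.
#
#     res = {
#         "10.0.0.1:6379": [(b"news", 2)],
#         "10.0.0.2:6379": [(b"news", 1), (b"sport", 4)],
#     }
#     parse_pubsub_numsub("PUBSUB NUMSUB", res)
#     # -> [(b"news", 3), (b"sport", 4)]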


def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        return str_if_bytes(args[0]) or current_host, args[1]

    slots = {}
    for slot in resp:
        start, end, primary = slot[:3]
        replicas = slot[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }

    return slots
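
# Illustrative example (addresses made up): the raw CLUSTER SLOTS reply is
# keyed by (start, end) slot ranges; an empty host in the reply falls back to
# the ``current_host`` option, and only (host, port) pairs are kept.
#
#     resp = [[0, 5460, [b"10.0.0.1", 6379, b"id1"], [b"10.0.0.2", 6379, b"id2"]]]
#     parse_cluster_slots(resp)
#     # -> {(0, 5460): {"primary": ("10.0.0.1", 6379),
#     #                 "replicas": [("10.0.0.2", 6379)]}}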


def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.
    """
    if isinstance(resp[0], dict):
        return resp
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], (x[1][i + 1])))
        nodes = x[3]
        for node in nodes:
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards


def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.
    """
    return resp.decode("utf-8")


PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"

REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
)
KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    connection_kwargs = {
        k: v
        for k, v in kwargs.items()
        if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
    }

    return connection_kwargs
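
# Illustrative example: keys that Redis() does not accept, plus the keys the
# cluster client manages itself ("host", "port", "retry"), are dropped.
#
#     cleanup_kwargs(host="10.0.0.1", port=6379, decode_responses=True, foo=1)
#     # -> {"decode_responses": True}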


class AbstractRedisCluster:
    RedisClusterRequestTTL = 16

    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "READWRITE",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: Optional["ClusterNode"] = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, choose a replica if the cluster contains different replicas
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)


class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.

        """
        return cls(url=url, **kwargs)
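
    # Illustrative usage (a minimal sketch; assumes a cluster node listening
    # on localhost:16379):
    #
    #     rc = RedisCluster.from_url("redis://localhost:16379/0")
    #     rc.set("foo", "bar")
    #     rc.get("foo")  # -> b"bar"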

    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
536 """
537 Initialize a new RedisCluster client.
539 :param startup_nodes:
540 List of nodes from which initial bootstrapping can be done
541 :param host:
542 Can be used to point to a startup node
543 :param port:
544 Can be used to point to a startup node
545 :param require_full_coverage:
546 When set to False (default value): the client will not require a
547 full coverage of the slots. However, if not all slots are covered,
548 and at least one node has 'cluster-require-full-coverage' set to
549 'yes,' the server will throw a ClusterDownError for some key-based
550 commands. See -
551 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
552 When set to True: all slots must be covered to construct the
553 cluster client. If not all slots are covered, RedisClusterException
554 will be thrown.
555 :param read_from_replicas:
556 @deprecated - please use load_balancing_strategy instead
557 Enable read from replicas in READONLY mode. You can read possibly
558 stale data.
559 When set to true, read commands will be assigned between the
560 primary and its replications in a Round-Robin manner.
561 :param load_balancing_strategy:
562 Enable read from replicas in READONLY mode and defines the load balancing
563 strategy that will be used for cluster node selection.
564 The data read from replicas is eventually consistent with the data in primary nodes.
565 :param dynamic_startup_nodes:
566 Set the RedisCluster's startup nodes to all of the discovered nodes.
567 If true (default value), the cluster's discovered nodes will be used to
568 determine the cluster nodes-slots mapping in the next topology refresh.
569 It will remove the initial passed startup nodes if their endpoints aren't
570 listed in the CLUSTER SLOTS output.
571 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
572 specific IP addresses, it is best to set it to false.
573 :param cluster_error_retry_attempts:
574 @deprecated - Please configure the 'retry' object instead
575 In case 'retry' object is set - this argument is ignored!
577 Number of times to retry before raising an error when
578 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
579 :class:`~.ClusterDownError` are encountered
580 :param retry:
581 A retry object that defines the retry strategy and the number of
582 retries for the cluster client.
583 In current implementation for the cluster client (starting form redis-py version 6.0.0)
584 the retry object is not yet fully utilized, instead it is used just to determine
585 the number of retries for the cluster client.
586 In the future releases the retry object will be used to handle the cluster client retries!
587 :param reinitialize_steps:
588 Specifies the number of MOVED errors that need to occur before
589 reinitializing the whole cluster topology. If a MOVED error occurs
590 and the cluster does not need to be reinitialized on this current
591 error handling, only the MOVED slot will be patched with the
592 redirected node.
593 To reinitialize the cluster on every MOVED error, set
594 reinitialize_steps to 1.
595 To avoid reinitializing the cluster on moved errors, set
596 reinitialize_steps to 0.
597 :param address_remap:
598 An optional callable which, when provided with an internal network
599 address of a node, e.g. a `(host, port)` tuple, will return the address
600 where the node is reachable. This can be used to map the addresses at
601 which the nodes _think_ they are, to addresses at which a client may
602 reach them, such as when they sit behind a proxy.
604 :**kwargs:
605 Extra arguments that will be sent into Redis instance when created
606 (See Official redis-py doc for supported kwargs - the only limitation
607 is that you can't provide 'retry' object as part of kwargs.
608 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
609 Some kwargs are not supported and will raise a
610 RedisClusterException:
611 - db (Redis do not support database SELECT in cluster mode)
612 """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # The 'retry' argument cannot be passed via kwargs in cluster mode:
            # the kwargs are forwarded to the lower-level connections to the
            # cluster nodes, where we provide a retry configuration with no
            # retries allowed. Retries should be handled at the cluster client
            # level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node/s
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the following:\n"
                "1. host and port, for example:\n"
                "   RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "   RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run
        # If the user passed an on_connect function we'll save it and run it
        # inside the RedisCluster.on_connect() function
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and protocol not in [3, "3"]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes
        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        self.commands_parser = CommandsParser(self)
        self._lock = threading.RLock()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def disconnect_connection_pools(self):
        for node in self.get_nodes():
            if node.redis_connection:
                try:
                    node.redis_connection.connection_pool.disconnect()
                except OSError:
                    # Client was already disconnected; do nothing
                    pass

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate, select a database, and send
        READONLY if it is set during object initialization.
        """
        connection.on_connect()

        if self.read_from_replicas or self.load_balancing_strategy:
            # Sending READONLY command to server to configure connection as
            # readonly. Since each cluster node may change its server type due
            # to a failover, we should establish a READONLY connection
            # regardless of the server type. If this is a primary connection,
            # READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

    def get_node(self, host=None, port=None, node_name=None):
        return self.nodes_manager.get_node(host, port, node_name)

    def get_primaries(self):
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

    def get_replicas(self):
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)

    def get_random_node(self):
        return random.choice(list(self.nodes_manager.nodes_cache.values()))

    def get_nodes(self):
        return list(self.nodes_manager.nodes_cache.values())

    def get_node_from_key(self, key, replica=False):
        """
        Get the node that holds the key's slot.
        If replica is set to True but the slot doesn't have any replicas, None
        is returned.
        """
        slot = self.keyslot(key)
        slot_cache = self.nodes_manager.slots_cache.get(slot)
        if slot_cache is None or len(slot_cache) == 0:
            raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
        if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        elif replica:
            node_idx = 1
        else:
            # primary
            node_idx = 0

        return slot_cache[node_idx]

    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node

    def set_default_node(self, node):
        """
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return True if the default node was set, else False
        """
        if node is None or self.get_node(node_name=node.name) is None:
            return False
        self.nodes_manager.default_node = node
        return True

    def set_retry(self, retry: Retry) -> None:
        self.retry = retry

    def monitor(self, target_node=None):
        """
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node was
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        The next_command() method returns one command from the monitor;
        the listen() method yields commands from the monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()

    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host and port, to get a pubsub
        instance connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
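
    # Illustrative usage (a minimal sketch; assumes `rc` is a connected
    # RedisCluster client):
    #
    #     p = rc.pubsub()  # binds to the default node unless node/host/port given
    #     p.subscribe("news")
    #     p.get_message(timeout=1.0)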

    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
            Pipelines do not work in cluster mode the same way they
            do in normal mode. Create a clone of this object so
            that simulating pipelines will work correctly. Each
            command will be called directly when used and
            when calling execute() will only return the result stack.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            read_from_replicas=self.read_from_replicas,
            load_balancing_strategy=self.load_balancing_strategy,
            reinitialize_steps=self.reinitialize_steps,
            retry=self.retry,
            lock=self._lock,
            transaction=transaction,
        )
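
    # Illustrative usage (a minimal sketch; assumes `rc` is a connected
    # RedisCluster client): commands are buffered and mapped to their slot
    # owners when execute() is called, which returns only the result stack.
    #
    #     with rc.pipeline() as pipe:
    #         pipe.set("foo", "1")
    #         pipe.incr("foo")
    #         results = pipe.execute()  # e.g. [True, 2]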

    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
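
    # Illustrative usage (a minimal sketch; key name assumed): the lock is a
    # regular Lua-based Lock whose key is hashed into the cluster like any
    # other key.
    #
    #     with rc.lock("my-lock", timeout=5, blocking_timeout=3):
    #         ...  # critical section; LockError is raised if acquisition times out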

    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.cluster_response_callbacks[command] = callback

    def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
        # Determine which nodes the command should be executed on.
        # Returns a list of target nodes.
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)
        if command_flag == self.__class__.RANDOM:
            # return a random node
            return [self.get_random_node()]
        elif command_flag == self.__class__.PRIMARIES:
            # return all primaries
            return self.get_primaries()
        elif command_flag == self.__class__.REPLICAS:
            # return all replicas
            return self.get_replicas()
        elif command_flag == self.__class__.ALL_NODES:
            # return all nodes
            return self.get_nodes()
        elif command_flag == self.__class__.DEFAULT_NODE:
            # return the cluster's default node
            return [self.nodes_manager.default_node]
        elif command in self.__class__.SEARCH_COMMANDS[0]:
            return [self.nodes_manager.default_node]
        else:
            # get the node that holds the key's slot
            slot = self.determine_slot(*args)
            node = self.nodes_manager.get_node_from_slot(
                slot,
                self.read_from_replicas and command in READ_COMMANDS,
                self.load_balancing_strategy if command in READ_COMMANDS else None,
            )
            return [node]

    def _should_reinitialized(self):
        # To reinitialize the cluster on every MOVED error,
        # set reinitialize_steps to 1.
        # To avoid reinitializing the cluster on moved errors, set
        # reinitialize_steps to 0.
        if self.reinitialize_steps == 0:
            return False
        else:
            return self.reinitialize_counter % self.reinitialize_steps == 0

    def keyslot(self, key):
        """
        Calculate keyslot for a given key.
        See Keys distribution model in https://redis.io/topics/cluster-spec
        """
        k = self.encoder.encode(key)
        return key_slot(k)
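
    # Illustrative example: the slot is CRC16(key) % 16384, and hash tags
    # (the part inside "{...}") are honored, so these two keys share a slot:
    #
    #     rc.keyslot("{user1000}.following") == rc.keyslot("{user1000}.followers")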

    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in it, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
         - issue: https://github.com/redis/redis/issues/9493
         - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)

    def determine_slot(self, *args) -> int:
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node,
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, which means the function
                # can be run on any node, so we can just return a random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
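
    # Illustrative example (a sketch; assumes a live cluster for key parsing):
    # multi-key commands are accepted only when every key hashes to one slot,
    # which hash tags can guarantee:
    #
    #     rc.determine_slot("MSET", "{user}:a", "1", "{user}:b", "2")  # one slot
    #     rc.determine_slot("MSET", "a", "1", "b", "2")  # raises RedisClusterException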

    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder

    def get_connection_kwargs(self):
        """
        Get the connections' keyword arguments
        """
        return self.nodes_manager.connection_kwargs

    def _is_nodes_flag(self, target_nodes):
        return isinstance(target_nodes, str) and target_nodes in self.node_flags

    def _parse_target_nodes(self, target_nodes):
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables executing commands on multiple nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def execute_command(self, *args, **kwargs):
        return self._internal_execute_command(*args, **kwargs)

    def _internal_execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the retries property of
        "self.retry", which defaults to 3 unless manually configured.

        If it reaches that number of attempts, the command raises the exception.

        Key argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True
        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args, **kwargs, nodes_flag=passed_targets
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)
                # Return the processed result
                return self._process_result(args[0], res, **kwargs)
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    continue
                else:
                    # raise the exception
                    raise e

    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove the keys entry; it is needed only for the cache.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )
                return response
            except AuthenticationError:
                raise
            except MaxConnectionsError:
                # MaxConnectionsError indicates client-side resource exhaustion
                # (too many connections in the pool), not a node failure.
                # Don't treat this as a node failure - just re-raise the error
                # without reinitializing the cluster.
                raise
            except (ConnectionError, TimeoutError) as e:
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # The 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                    moved = True
            except TryAgainError:
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except (ClusterDownError, SlotNotCoveredError):
                # ClusterDownError can occur during a failover; to let the
                # cluster self-heal, we will try to reinitialize the cluster
                # layout and retry executing the command.

                # SlotNotCoveredError can occur when the cluster is not fully
                # initialized, or it can be a temporary issue.
                # We will try to reinitialize the cluster topology
                # and retry executing the command.
                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise
            except ResponseError:
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")

    def close(self) -> None:
        try:
            with self._lock:
                if self.nodes_manager:
                    self.nodes_manager.close()
        except AttributeError:
            # RedisCluster's __init__ can fail before nodes_manager is set
            pass

    def _process_result(self, command, res, **kwargs):
        """
        Process the result of the executed command.
        Returns either a dict or a single value.

        :type command: str
        :type res: dict

        `res` should be in the following format:
            Dict<node_name, command_result>
        """
        if command in self.result_callbacks:
            return self.result_callbacks[command](command, res, **kwargs)
        elif len(res) == 1:
            # When we execute the command on a single node, we can
            # remove the dictionary and return a single response
            return list(res.values())[0]
        else:
            return res
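
    # Illustrative example (node names made up): a single-node result dict is
    # unwrapped, while commands with a result callback (e.g. KEYS) merge the
    # per-node values.
    #
    #     rc._process_result("GET", {"10.0.0.1:6379": b"bar"})
    #     # -> b"bar"
    #     rc._process_result("KEYS", {"10.0.0.1:6379": [b"a"], "10.0.0.2:6379": [b"b"]})
    #     # -> [b"a", b"b"]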

    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules
        and their namespaces to the redis client.

        ``funcname`` - A string containing the name of the function to create
        ``func`` - The function being added to this class.
        """
        setattr(self, funcname, func)

    def transaction(self, func, *watches, **kwargs):
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
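
    # Illustrative usage (a minimal sketch; key name assumed): the callable is
    # retried automatically whenever the watched key changes mid-transaction.
    #
    #     def refill(pipe):
    #         balance = int(pipe.get("{acct}:balance") or 0)
    #         pipe.multi()
    #         pipe.set("{acct}:balance", balance + 10)
    #
    #     rc.transaction(refill, "{acct}:balance")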


class ClusterNode:
    def __init__(self, host, port, server_type=None, redis_connection=None):
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        try:
            if self.redis_connection is not None:
                self.redis_connection.close()
        except Exception:
            # Ignore errors when closing the connection
            pass


class LoadBalancingStrategy(Enum):
    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        server_index = self.primary_to_idx.setdefault(primary, self.start_index)
        if replicas_only and server_index == 0:
            # skip the primary node index
            server_index = 1
        # Update the index for the next round
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index
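
    # Illustrative example: for a slot list of [primary, replica1, replica2]
    # (list_size=3), successive ROUND_ROBIN calls cycle through all three
    # indices, while ROUND_ROBIN_REPLICAS skips index 0 (the primary).
    #
    #     lb = LoadBalancer()
    #     [lb.get_server_index("10.0.0.1:6379", 3) for _ in range(4)]
    #     # -> [0, 1, 2, 0]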


class NodesManager:
    def __init__(
        self,
        startup_nodes,
        from_url=False,
        require_full_coverage=False,
        lock=None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
        self.nodes_cache: Dict[str, Redis] = {}
        self.slots_cache = {}
        self.startup_nodes = {}
        self.default_node = None
        self.populate_startup_nodes(startup_nodes)
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        self._cache = cache
        self._cache_config = cache_config
        self._cache_factory = cache_factory
        self._moved_exception = None
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.initialize()

    def get_node(self, host=None, port=None, node_name=None):
        """
        Get the requested node from the cluster's nodes.
        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            return None

    def update_moved_exception(self, exception):
        self._moved_exception = exception

    def _update_moved_slots(self):
        """
        Update the slot's node with the redirected one
        """
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            if redirected_node.server_type is not PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node
        if redirected_node in self.slots_cache[e.slot_id]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = self.slots_cache[e.slot_id][0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            self.slots_cache[e.slot_id].append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            self.slots_cache[e.slot_id].remove(redirected_node)
            # Override the old primary with the new one
            self.slots_cache[e.slot_id][0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        else:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replicas) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        # Reset moved_exception
        self._moved_exception = None

    @deprecated_args(
        args_to_warn=["server_type"],
        reason=(
            "In case you need to select some load balancing strategy "
            "that will use replicas, please set it through 'load_balancing_strategy'"
        ),
        version="5.3.0",
    )
    def get_node_from_slot(
        self,
        slot,
        read_from_replicas=False,
        load_balancing_strategy=None,
        server_type=None,
    ) -> ClusterNode:
        """
        Gets a node that serves this hash slot
        """
        if self._moved_exception:
            with self._lock:
                if self._moved_exception:
                    self._update_moved_slots()

        if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self._require_full_coverage}"'
            )

        if read_from_replicas is True and load_balancing_strategy is None:
            load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

        if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
            # get the server index using the strategy defined in load_balancing_strategy
            primary_name = self.slots_cache[slot][0].name
            node_idx = self.read_load_balancer.get_server_index(
                primary_name, len(self.slots_cache[slot]), load_balancing_strategy
            )
        elif (
            server_type is None
            or server_type == PRIMARY
            or len(self.slots_cache[slot]) == 1
        ):
            # return a primary
            node_idx = 0
        else:
            # return a replica
            # randomly choose one of the replicas
            node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

        return self.slots_cache[slot][node_idx]

    def get_nodes_by_server_type(self, server_type):
        """
        Get all nodes with the specified server type
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode
        """
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    def populate_startup_nodes(self, nodes):
        """
        Populate all startup nodes and filter out any duplicates
        """
        for n in nodes:
            self.startup_nodes[n.name] = n

    def check_slots_coverage(self, slots_cache):
        # Validate if all slots are covered or if we should try next
        # startup node
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in slots_cache:
                return False
        return True

    def create_redis_connections(self, nodes):
        """
        This function will create a redis connection to all nodes in :nodes:
        """
        connection_pools = []
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host, port=node.port, **self.connection_kwargs
                )
                connection_pools.append(node.redis_connection.connection_pool)

        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                connection_pools, ClientType.SYNC, self._credential_provider
            )
        )

    def create_redis_node(self, host, port, **kwargs):
        # We are configuring the connection pool not to retry
        # connections on lower level clients to avoid retrying
        # connections to nodes that are not reachable
        # and to avoid blocking the connection pool.
        # The only error that will have some handling in the lower
        # level clients is ConnectionError which will trigger disconnection
        # of the socket.
        # The retries will be handled on cluster client level
        # where we will have proper handling of the cluster topology
        node_retry_config = Retry(
            backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
        )

        if self.from_url:
            # Create a redis node with a custom connection pool
            kwargs.update({"host": host})
            kwargs.update({"port": port})
            kwargs.update({"cache": self._cache})
            kwargs.update({"retry": node_retry_config})
            r = Redis(connection_pool=self.connection_pool_class(**kwargs))
        else:
            r = Redis(
                host=host,
                port=port,
                cache=self._cache,
                retry=node_retry_config,
                **kwargs,
            )
        return r

    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
        node_name = get_node_name(host, port)
        # check if we already have this node in the tmp_nodes_cache
        target_node = tmp_nodes_cache.get(node_name)
        if target_node is None:
            # before creating a new cluster node, check if the cluster node already
            # exists in the current nodes cache and has a valid connection so we can
            # reuse it
            target_node = self.nodes_cache.get(node_name)
            if target_node is None or target_node.redis_connection is None:
                # create new cluster node for this cluster
                target_node = ClusterNode(host, port, role)
            if target_node.server_type != role:
                target_node.server_type = role
            # add this node to the nodes cache
            tmp_nodes_cache[target_node.name] = target_node

        return target_node
1702 def initialize(self):
1703 """
1704 Initializes the nodes cache, slots cache and redis connections.
1705 :startup_nodes:
1706 Responsible for discovering other nodes in the cluster
1707 """
1708 self.reset()
1709 tmp_nodes_cache = {}
1710 tmp_slots = {}
1711 disagreements = []
1712 startup_nodes_reachable = False
1713 fully_covered = False
1714 kwargs = self.connection_kwargs
1715 exception = None
1716 # Convert to tuple to prevent RuntimeError if self.startup_nodes
1717 # is modified during iteration
1718 for startup_node in tuple(self.startup_nodes.values()):
1719 try:
1720 if startup_node.redis_connection:
1721 r = startup_node.redis_connection
1722 else:
1723 # Create a new Redis connection
1724 r = self.create_redis_node(
1725 startup_node.host, startup_node.port, **kwargs
1726 )
1727 self.startup_nodes[startup_node.name].redis_connection = r
1728 # Make sure cluster mode is enabled on this node
1729 try:
1730 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1731 r.connection_pool.disconnect()
1732 except ResponseError:
1733 raise RedisClusterException(
1734 "Cluster mode is not enabled on this node"
1735 )
1736 startup_nodes_reachable = True
1737 except Exception as e:
1738 # Try the next startup node.
1739 # The exception is saved and raised only if we have no more nodes.
1740 exception = e
1741 continue
1743 # The CLUSTER SLOTS command results in the following output:
1744 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
1745 # where each node is described by the list: [IP, port, node_id]
1746 # Therefore, cluster_slots[0][2][0] will be the IP address of the
1747 # primary node of the first slot section.
1748 # If there's only one server in the cluster, its ``host`` is ''.
1749 # Fix it to the host of the startup node.
1750 if (
1751 len(cluster_slots) == 1
1752 and len(cluster_slots[0][2][0]) == 0
1753 and len(self.startup_nodes) == 1
1754 ):
1755 cluster_slots[0][2][0] = startup_node.host
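# Illustrative sketch of a parsed CLUSTER SLOTS reply (hypothetical
# addresses, not produced by this module):
#
#     [[0, 5460, ["127.0.0.1", 7000, "id1"], ["127.0.0.1", 7003, "id4"]],
#      [5461, 10922, ["127.0.0.1", 7001, "id2"]],
#      [10923, 16383, ["127.0.0.1", 7002, "id3"]]]
#
# so slot[2] below is the primary of each range and slot[3:] its replicas.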
1757 for slot in cluster_slots:
1758 primary_node = slot[2]
1759 host = str_if_bytes(primary_node[0])
1760 if host == "":
1761 host = startup_node.host
1762 port = int(primary_node[1])
1763 host, port = self.remap_host_port(host, port)
1765 nodes_for_slot = []
1767 target_node = self._get_or_create_cluster_node(
1768 host, port, PRIMARY, tmp_nodes_cache
1769 )
1770 nodes_for_slot.append(target_node)
1772 replica_nodes = slot[3:]
1773 for replica_node in replica_nodes:
1774 host = str_if_bytes(replica_node[0])
1775 port = int(replica_node[1])
1776 host, port = self.remap_host_port(host, port)
1777 target_replica_node = self._get_or_create_cluster_node(
1778 host, port, REPLICA, tmp_nodes_cache
1779 )
1780 nodes_for_slot.append(target_replica_node)
1782 for i in range(int(slot[0]), int(slot[1]) + 1):
1783 if i not in tmp_slots:
1784 tmp_slots[i] = nodes_for_slot
1785 else:
1786 # Validate that no two nodes disagree about
1787 # which node owns this slot
1788 tmp_slot = tmp_slots[i][0]
1789 if tmp_slot.name != target_node.name:
1790 disagreements.append(
1791 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
1792 )
1794 if len(disagreements) > 5:
1795 raise RedisClusterException(
1796 f"startup_nodes could not agree on a valid "
1797 f"slots cache: {', '.join(disagreements)}"
1798 )
1800 fully_covered = self.check_slots_coverage(tmp_slots)
1801 if fully_covered:
1802 # Don't need to continue to the next startup node if all
1803 # slots are covered
1804 break
1806 if not startup_nodes_reachable:
1807 raise RedisClusterException(
1808 f"Redis Cluster cannot be connected. Please provide at least "
1809 f"one reachable node: {str(exception)}"
1810 ) from exception
1812 if self._cache is None and self._cache_config is not None:
1813 if self._cache_factory is None:
1814 self._cache = CacheFactory(self._cache_config).get_cache()
1815 else:
1816 self._cache = self._cache_factory.get_cache()
1818 # Create Redis connections to all nodes
1819 self.create_redis_connections(list(tmp_nodes_cache.values()))
1821 # Check if the slots are not fully covered
1822 if not fully_covered and self._require_full_coverage:
1823 # Full coverage is required, but not all
1824 # slots are covered
1825 raise RedisClusterException(
1826 f"Not all slots are covered after querying all startup nodes. "
1827 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
1828 f"covered..."
1829 )
1831 # Set the tmp variables to the real variables
1832 self.nodes_cache = tmp_nodes_cache
1833 self.slots_cache = tmp_slots
1834 # Set the default node
1835 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
1836 if self._dynamic_startup_nodes:
1837 # Populate the startup nodes with all discovered nodes
1838 self.startup_nodes = tmp_nodes_cache
1839 # If initialize was called after a MovedError, clear it
1840 self._moved_exception = None
1842 def close(self) -> None:
1843 self.default_node = None
1844 for node in self.nodes_cache.values():
1845 if node.redis_connection:
1846 node.redis_connection.close()
1848 def reset(self):
1849 try:
1850 self.read_load_balancer.reset()
1851 except TypeError:
1852 # The read_load_balancer is None, do nothing
1853 pass
1855 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
1856 """
1857 Remap the host and port returned from the cluster to a different
1858 internal value. Useful if the client is not connecting directly
1859 to the cluster.
1860 """
1861 if self.address_remap:
1862 return self.address_remap((host, port))
1863 return host, port
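# Illustrative sketch (hypothetical addresses): when the cluster is
# reached through a tunnel or port-forward, an ``address_remap`` callable
# can translate the node addresses announced by the cluster:
#
#     def remap(address):
#         host, port = address
#         # nodes announce 10.0.0.x:7000-7002; we reach them locally
#         return "127.0.0.1", port + 10000
#
#     rc = RedisCluster(host="127.0.0.1", port=17000, address_remap=remap)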
1865 def find_connection_owner(self, connection: Connection) -> Optional[Redis]:
1866 node_name = get_node_name(connection.host, connection.port)
1867 for node in tuple(self.nodes_cache.values()):
1868 if node.redis_connection:
1869 conn_args = node.redis_connection.connection_pool.connection_kwargs
1870 if node_name == get_node_name(
1871 conn_args.get("host"), conn_args.get("port")
1872 ):
1873 return node
1876class ClusterPubSub(PubSub):
1877 """
1878 Wrapper for PubSub class.
1880 IMPORTANT: before using ClusterPubSub, read about the known limitations
1881 with pubsub in Cluster mode and learn how to work around them:
1882 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
1883 """
1885 def __init__(
1886 self,
1887 redis_cluster,
1888 node=None,
1889 host=None,
1890 port=None,
1891 push_handler_func=None,
1892 event_dispatcher: Optional["EventDispatcher"] = None,
1893 **kwargs,
1894 ):
1895 """
1896 When a pubsub instance is created without specifying a node, a single
1897 node will be transparently chosen for the pubsub connection on the
1898 first command execution. The node will be determined by:
1899 1. Hashing the channel name in the request to find its keyslot
1900 2. Selecting a node that handles the keyslot: If read_from_replicas is
1901 set to True or load_balancing_strategy is set, a replica can be selected.
1903 :type redis_cluster: RedisCluster
1904 :type node: ClusterNode
1905 :type host: str
1906 :type port: int
1907 """
1908 self.node = None
1909 self.set_pubsub_node(redis_cluster, node, host, port)
1910 connection_pool = (
1911 None
1912 if self.node is None
1913 else redis_cluster.get_redis_connection(self.node).connection_pool
1914 )
1915 self.cluster = redis_cluster
1916 self.node_pubsub_mapping = {}
1917 self._pubsubs_generator = self._pubsubs_generator()
1918 if event_dispatcher is None:
1919 self._event_dispatcher = EventDispatcher()
1920 else:
1921 self._event_dispatcher = event_dispatcher
1922 super().__init__(
1923 connection_pool=connection_pool,
1924 encoder=redis_cluster.encoder,
1925 push_handler_func=push_handler_func,
1926 event_dispatcher=self._event_dispatcher,
1927 **kwargs,
1928 )
1930 def set_pubsub_node(self, cluster, node=None, host=None, port=None):
1931 """
1932 The pubsub node will be set according to the passed node, host, and port.
1933 When none of node, host, or port is specified, the node is set
1934 to None and will be determined by the keyslot of the channel in the
1935 first command to be executed.
1936 RedisClusterException will be thrown if the passed node does not exist
1937 in the cluster.
1938 If host is passed without port, or vice versa, a DataError will be
1939 thrown.
1940 :type cluster: RedisCluster
1941 :type node: ClusterNode
1942 :type host: str
1943 :type port: int
1944 """
1945 if node is not None:
1946 # node is passed by the user
1947 self._raise_on_invalid_node(cluster, node, node.host, node.port)
1948 pubsub_node = node
1949 elif host is not None and port is not None:
1950 # host and port passed by the user
1951 node = cluster.get_node(host=host, port=port)
1952 self._raise_on_invalid_node(cluster, node, host, port)
1953 pubsub_node = node
1954 elif any([host, port]):
1955 # only 'host' or 'port' passed
1956 raise DataError("Passing a host requires passing a port, and vice versa")
1957 else:
1958 # nothing passed by the user. set node to None
1959 pubsub_node = None
1961 self.node = pubsub_node
1963 def get_pubsub_node(self):
1964 """
1965 Get the node that is being used as the pubsub connection
1966 """
1967 return self.node
1969 def _raise_on_invalid_node(self, redis_cluster, node, host, port):
1970 """
1971 Raise a RedisClusterException if the node is None or doesn't exist in
1972 the cluster.
1973 """
1974 if node is None or redis_cluster.get_node(node_name=node.name) is None:
1975 raise RedisClusterException(
1976 f"Node {host}:{port} doesn't exist in the cluster"
1977 )
1979 def execute_command(self, *args):
1980 """
1981 Execute a subscribe/unsubscribe command.
1983 Code taken from redis-py and tweaked to make it work within a cluster.
1984 """
1985 # NOTE: don't parse the response in this function -- it could pull a
1986 # legitimate message off the stack if the connection is already
1987 # subscribed to one or more channels
1989 if self.connection is None:
1990 if self.connection_pool is None:
1991 if len(args) > 1:
1992 # Hash the first channel and get one of the nodes holding
1993 # this slot
1994 channel = args[1]
1995 slot = self.cluster.keyslot(channel)
1996 node = self.cluster.nodes_manager.get_node_from_slot(
1997 slot,
1998 self.cluster.read_from_replicas,
1999 self.cluster.load_balancing_strategy,
2000 )
2001 else:
2002 # Get a random node
2003 node = self.cluster.get_random_node()
2004 self.node = node
2005 redis_connection = self.cluster.get_redis_connection(node)
2006 self.connection_pool = redis_connection.connection_pool
2007 self.connection = self.connection_pool.get_connection()
2008 # register a callback that re-subscribes to any channels we
2009 # were listening to when we were disconnected
2010 self.connection.register_connect_callback(self.on_connect)
2011 if self.push_handler_func is not None:
2012 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
2013 self._event_dispatcher.dispatch(
2014 AfterPubSubConnectionInstantiationEvent(
2015 self.connection, self.connection_pool, ClientType.SYNC, self._lock
2016 )
2017 )
2018 connection = self.connection
2019 self._execute(connection, connection.send_command, *args)
2021 def _get_node_pubsub(self, node):
2022 try:
2023 return self.node_pubsub_mapping[node.name]
2024 except KeyError:
2025 pubsub = node.redis_connection.pubsub(
2026 push_handler_func=self.push_handler_func
2027 )
2028 self.node_pubsub_mapping[node.name] = pubsub
2029 return pubsub
2031 def _sharded_message_generator(self):
2032 for _ in range(len(self.node_pubsub_mapping)):
2033 pubsub = next(self._pubsubs_generator)
2034 message = pubsub.get_message()
2035 if message is not None:
2036 return message
2037 return None
2039 def _pubsubs_generator(self):
2040 while True:
2041 yield from self.node_pubsub_mapping.values()
2043 def get_sharded_message(
2044 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
2045 ):
2046 if target_node:
2047 message = self.node_pubsub_mapping[target_node.name].get_message(
2048 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
2049 )
2050 else:
2051 message = self._sharded_message_generator()
2052 if message is None:
2053 return None
2054 elif str_if_bytes(message["type"]) == "sunsubscribe":
2055 if message["channel"] in self.pending_unsubscribe_shard_channels:
2056 self.pending_unsubscribe_shard_channels.remove(message["channel"])
2057 self.shard_channels.pop(message["channel"], None)
2058 node = self.cluster.get_node_from_key(message["channel"])
2059 if self.node_pubsub_mapping[node.name].subscribed is False:
2060 self.node_pubsub_mapping.pop(node.name)
2061 if not self.channels and not self.patterns and not self.shard_channels:
2062 # There are no subscriptions anymore, set subscribed_event flag
2063 # to false
2064 self.subscribed_event.clear()
2065 if self.ignore_subscribe_messages or ignore_subscribe_messages:
2066 return None
2067 return message
2069 def ssubscribe(self, *args, **kwargs):
2070 if args:
2071 args = list_or_args(args[0], args[1:])
2072 s_channels = dict.fromkeys(args)
2073 s_channels.update(kwargs)
2074 for s_channel, handler in s_channels.items():
2075 node = self.cluster.get_node_from_key(s_channel)
2076 pubsub = self._get_node_pubsub(node)
2077 if handler:
2078 pubsub.ssubscribe(**{s_channel: handler})
2079 else:
2080 pubsub.ssubscribe(s_channel)
2081 self.shard_channels.update(pubsub.shard_channels)
2082 self.pending_unsubscribe_shard_channels.difference_update(
2083 self._normalize_keys({s_channel: None})
2084 )
2085 if pubsub.subscribed and not self.subscribed:
2086 self.subscribed_event.set()
2087 self.health_check_response_counter = 0
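# Illustrative sketch of sharded pub/sub (assumes a connected cluster
# client ``rc`` and Redis >= 7.0):
#
#     p = rc.pubsub()
#     p.ssubscribe("shard-channel")            # routed by keyslot
#     rc.spublish("shard-channel", "hello")
#     msg = p.get_sharded_message(timeout=1.0)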
2089 def sunsubscribe(self, *args):
2090 if args:
2091 args = list_or_args(args[0], args[1:])
2092 else:
2093 args = self.shard_channels
2095 for s_channel in args:
2096 node = self.cluster.get_node_from_key(s_channel)
2097 p = self._get_node_pubsub(node)
2098 p.sunsubscribe(s_channel)
2099 self.pending_unsubscribe_shard_channels.update(
2100 p.pending_unsubscribe_shard_channels
2101 )
2103 def get_redis_connection(self):
2104 """
2105 Get the Redis connection of the pubsub connected node.
2106 """
2107 if self.node is not None:
2108 return self.node.redis_connection
2110 def disconnect(self):
2111 """
2112 Disconnect the pubsub connection.
2113 """
2114 if self.connection:
2115 self.connection.disconnect()
2116 for pubsub in self.node_pubsub_mapping.values():
2117 pubsub.connection.disconnect()
2120class ClusterPipeline(RedisCluster):
2121 """
2122 Support for Redis pipeline
2123 in cluster mode
2124 """
2126 ERRORS_ALLOW_RETRY = (
2127 ConnectionError,
2128 TimeoutError,
2129 MovedError,
2130 AskError,
2131 TryAgainError,
2132 )
2134 NO_SLOTS_COMMANDS = {"UNWATCH"}
2135 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
2136 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
2138 @deprecated_args(
2139 args_to_warn=[
2140 "cluster_error_retry_attempts",
2141 ],
2142 reason="Please configure the 'retry' object instead",
2143 version="6.0.0",
2144 )
2145 def __init__(
2146 self,
2147 nodes_manager: "NodesManager",
2148 commands_parser: "CommandsParser",
2149 result_callbacks: Optional[Dict[str, Callable]] = None,
2150 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
2151 startup_nodes: Optional[List["ClusterNode"]] = None,
2152 read_from_replicas: bool = False,
2153 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
2154 cluster_error_retry_attempts: int = 3,
2155 reinitialize_steps: int = 5,
2156 retry: Optional[Retry] = None,
2157 lock=None,
2158 transaction=False,
2159 **kwargs,
2160 ):
2161 """ """
2162 self.command_stack = []
2163 self.nodes_manager = nodes_manager
2164 self.commands_parser = commands_parser
2165 self.refresh_table_asap = False
2166 self.result_callbacks = (
2167 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
2168 )
2169 self.startup_nodes = startup_nodes if startup_nodes else []
2170 self.read_from_replicas = read_from_replicas
2171 self.load_balancing_strategy = load_balancing_strategy
2172 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
2173 self.cluster_response_callbacks = cluster_response_callbacks
2174 self.reinitialize_counter = 0
2175 self.reinitialize_steps = reinitialize_steps
2176 if retry is not None:
2177 self.retry = retry
2178 else:
2179 self.retry = Retry(
2180 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
2181 retries=cluster_error_retry_attempts,
2182 )
2184 self.encoder = Encoder(
2185 kwargs.get("encoding", "utf-8"),
2186 kwargs.get("encoding_errors", "strict"),
2187 kwargs.get("decode_responses", False),
2188 )
2189 if lock is None:
2190 lock = threading.RLock()
2191 self._lock = lock
2192 self.parent_execute_command = super().execute_command
2193 self._execution_strategy: ExecutionStrategy = (
2194 PipelineStrategy(self) if not transaction else TransactionStrategy(self)
2195 )
2197 def __repr__(self):
2198 """ """
2199 return f"{type(self).__name__}"
2201 def __enter__(self):
2202 """ """
2203 return self
2205 def __exit__(self, exc_type, exc_value, traceback):
2206 """ """
2207 self.reset()
2209 def __del__(self):
2210 try:
2211 self.reset()
2212 except Exception:
2213 pass
2215 def __len__(self):
2216 """ """
2217 return len(self._execution_strategy.command_queue)
2219 def __bool__(self):
2220 "Pipeline instances should always evaluate to True on Python 3+"
2221 return True
2223 def execute_command(self, *args, **kwargs):
2224 """
2225 Wrapper function for pipeline_execute_command
2226 """
2227 return self._execution_strategy.execute_command(*args, **kwargs)
2229 def pipeline_execute_command(self, *args, **options):
2230 """
2231 Stage a command to be executed when execute() is next called
2233 Returns the current Pipeline object back so commands can be
2234 chained together, such as:
2236 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
2238 At some other point, you can then run: pipe.execute(),
2239 which will execute all commands queued in the pipe.
2240 """
2241 return self._execution_strategy.execute_command(*args, **options)
2243 def annotate_exception(self, exception, number, command):
2244 """
2245 Provides extra context to the exception prior to it being handled
2246 """
2247 self._execution_strategy.annotate_exception(exception, number, command)
2249 def execute(self, raise_on_error: bool = True) -> List[Any]:
2250 """
2251 Execute all the commands in the current pipeline
2252 """
2254 try:
2255 return self._execution_strategy.execute(raise_on_error)
2256 finally:
2257 self.reset()
2259 def reset(self):
2260 """
2261 Reset back to empty pipeline.
2262 """
2263 self._execution_strategy.reset()
2265 def send_cluster_commands(
2266 self, stack, raise_on_error=True, allow_redirections=True
2267 ):
2268 return self._execution_strategy.send_cluster_commands(
2269 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
2270 )
2272 def exists(self, *keys):
2273 return self._execution_strategy.exists(*keys)
2275 def eval(self):
2276 """ """
2277 return self._execution_strategy.eval()
2279 def multi(self):
2280 """
2281 Start a transactional block of the pipeline after WATCH commands
2282 are issued. End the transactional block with `execute`.
2283 """
2284 self._execution_strategy.multi()
2286 def load_scripts(self):
2287 """ """
2288 self._execution_strategy.load_scripts()
2290 def discard(self):
2291 """ """
2292 self._execution_strategy.discard()
2294 def watch(self, *names):
2295 """Watches the values at keys ``names``"""
2296 self._execution_strategy.watch(*names)
2298 def unwatch(self):
2299 """Unwatches all previously specified keys"""
2300 self._execution_strategy.unwatch()
2302 def script_load_for_pipeline(self, *args, **kwargs):
2303 self._execution_strategy.script_load_for_pipeline(*args, **kwargs)
2305 def delete(self, *names):
2306 self._execution_strategy.delete(*names)
2308 def unlink(self, *names):
2309 self._execution_strategy.unlink(*names)
2312def block_pipeline_command(name: str) -> Callable[..., Any]:
2313 """
2314 Raises an error because some pipelined commands must
2315 be blocked when running in cluster mode
2316 """
2318 def inner(*args, **kwargs):
2319 raise RedisClusterException(
2320 f"ERROR: Calling pipelined function {name} is blocked "
2321 f"when running redis in cluster mode..."
2322 )
2324 return inner
2327# Blocked pipeline commands
2328PIPELINE_BLOCKED_COMMANDS = (
2329 "BGREWRITEAOF",
2330 "BGSAVE",
2331 "BITOP",
2332 "BRPOPLPUSH",
2333 "CLIENT GETNAME",
2334 "CLIENT KILL",
2335 "CLIENT LIST",
2336 "CLIENT SETNAME",
2337 "CLIENT",
2338 "CONFIG GET",
2339 "CONFIG RESETSTAT",
2340 "CONFIG REWRITE",
2341 "CONFIG SET",
2342 "CONFIG",
2343 "DBSIZE",
2344 "ECHO",
2345 "EVALSHA",
2346 "FLUSHALL",
2347 "FLUSHDB",
2348 "INFO",
2349 "KEYS",
2350 "LASTSAVE",
2351 "MGET",
2352 "MGET NONATOMIC",
2353 "MOVE",
2354 "MSET",
2355 "MSET NONATOMIC",
2356 "MSETNX",
2357 "PFCOUNT",
2358 "PFMERGE",
2359 "PING",
2360 "PUBLISH",
2361 "RANDOMKEY",
2362 "READONLY",
2363 "READWRITE",
2364 "RENAME",
2365 "RENAMENX",
2366 "RPOPLPUSH",
2367 "SAVE",
2368 "SCAN",
2369 "SCRIPT EXISTS",
2370 "SCRIPT FLUSH",
2371 "SCRIPT KILL",
2372 "SCRIPT LOAD",
2373 "SCRIPT",
2374 "SDIFF",
2375 "SDIFFSTORE",
2376 "SENTINEL GET MASTER ADDR BY NAME",
2377 "SENTINEL MASTER",
2378 "SENTINEL MASTERS",
2379 "SENTINEL MONITOR",
2380 "SENTINEL REMOVE",
2381 "SENTINEL SENTINELS",
2382 "SENTINEL SET",
2383 "SENTINEL SLAVES",
2384 "SENTINEL",
2385 "SHUTDOWN",
2386 "SINTER",
2387 "SINTERSTORE",
2388 "SLAVEOF",
2389 "SLOWLOG GET",
2390 "SLOWLOG LEN",
2391 "SLOWLOG RESET",
2392 "SLOWLOG",
2393 "SMOVE",
2394 "SORT",
2395 "SUNION",
2396 "SUNIONSTORE",
2397 "TIME",
2398)
2399for command in PIPELINE_BLOCKED_COMMANDS:
2400 command = command.replace(" ", "_").lower()
2402 setattr(ClusterPipeline, command, block_pipeline_command(command))
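# Illustrative sketch: calling a blocked command on a cluster pipeline
# raises immediately (assumes a connected cluster client ``rc``):
#
#     pipe = rc.pipeline()
#     pipe.keys()   # RedisClusterException: Calling pipelined function
#                   # keys is blocked when running redis in cluster mode...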
2405class PipelineCommand:
2406 """ """
2408 def __init__(self, args, options=None, position=None):
2409 self.args = args
2410 if options is None:
2411 options = {}
2412 self.options = options
2413 self.position = position
2414 self.result = None
2415 self.node = None
2416 self.asking = False
2419class NodeCommands:
2420 """ """
2422 def __init__(self, parse_response, connection_pool, connection):
2423 """ """
2424 self.parse_response = parse_response
2425 self.connection_pool = connection_pool
2426 self.connection = connection
2427 self.commands = []
2429 def append(self, c):
2430 """ """
2431 self.commands.append(c)
2433 def write(self):
2434 """
2435 Code borrowed from the Redis client so it can be fixed here
2436 """
2437 connection = self.connection
2438 commands = self.commands
2440 # We are going to clobber the commands with the write, so go ahead
2441 # and ensure that nothing is sitting there from a previous run.
2442 for c in commands:
2443 c.result = None
2445 # build up all commands into a single request to increase network perf
2446 # send all the commands and catch connection and timeout errors.
2447 try:
2448 connection.send_packed_command(
2449 connection.pack_commands([c.args for c in commands])
2450 )
2451 except (ConnectionError, TimeoutError) as e:
2452 for c in commands:
2453 c.result = e
2455 def read(self):
2456 """ """
2457 connection = self.connection
2458 for c in self.commands:
2459 # if there is a result on this command,
2460 # it means we ran into an exception
2461 # like a connection error. Trying to parse
2462 # a response on a connection that
2463 # is no longer open will result in a
2464 # connection error raised by redis-py.
2465 # but redis-py doesn't check in parse_response
2466 # that the sock object is
2467 # still set and if you try to
2468 # read from a closed connection, it will
2469 # result in an AttributeError because
2470 # it will do a readline() call on None.
2471 # This can have all kinds of nasty side-effects.
2472 # Treating this case as a connection error
2473 # is fine because it will dump
2474 # the connection object back into the
2475 # pool and on the next write, it will
2476 # explicitly open the connection and all will be well.
2477 if c.result is None:
2478 try:
2479 c.result = self.parse_response(connection, c.args[0], **c.options)
2480 except (ConnectionError, TimeoutError) as e:
2481 for c in self.commands:
2482 c.result = e
2483 return
2484 except RedisError:
2485 c.result = sys.exc_info()[1]
2488class ExecutionStrategy(ABC):
2489 @property
2490 @abstractmethod
2491 def command_queue(self):
2492 pass
2494 @abstractmethod
2495 def execute_command(self, *args, **kwargs):
2496 """
2497 Execution flow for current execution strategy.
2499 See: ClusterPipeline.execute_command()
2500 """
2501 pass
2503 @abstractmethod
2504 def annotate_exception(self, exception, number, command):
2505 """
2506 Annotate exception according to current execution strategy.
2508 See: ClusterPipeline.annotate_exception()
2509 """
2510 pass
2512 @abstractmethod
2513 def pipeline_execute_command(self, *args, **options):
2514 """
2515 Pipeline execution flow for current execution strategy.
2517 See: ClusterPipeline.pipeline_execute_command()
2518 """
2519 pass
2521 @abstractmethod
2522 def execute(self, raise_on_error: bool = True) -> List[Any]:
2523 """
2524 Executes current execution strategy.
2526 See: ClusterPipeline.execute()
2527 """
2528 pass
2530 @abstractmethod
2531 def send_cluster_commands(
2532 self, stack, raise_on_error=True, allow_redirections=True
2533 ):
2534 """
2535 Sends commands according to current execution strategy.
2537 See: ClusterPipeline.send_cluster_commands()
2538 """
2539 pass
2541 @abstractmethod
2542 def reset(self):
2543 """
2544 Resets current execution strategy.
2546 See: ClusterPipeline.reset()
2547 """
2548 pass
2550 @abstractmethod
2551 def exists(self, *keys):
2552 pass
2554 @abstractmethod
2555 def eval(self):
2556 pass
2558 @abstractmethod
2559 def multi(self):
2560 """
2561 Starts transactional context.
2563 See: ClusterPipeline.multi()
2564 """
2565 pass
2567 @abstractmethod
2568 def load_scripts(self):
2569 pass
2571 @abstractmethod
2572 def watch(self, *names):
2573 pass
2575 @abstractmethod
2576 def unwatch(self):
2577 """
2578 Unwatches all previously specified keys
2580 See: ClusterPipeline.unwatch()
2581 """
2582 pass
2584 @abstractmethod
2585 def script_load_for_pipeline(self, *args, **kwargs):
2586 pass
2588 @abstractmethod
2589 def delete(self, *names):
2590 """
2591 "Delete a key specified by ``names``"
2593 See: ClusterPipeline.delete()
2594 """
2595 pass
2597 @abstractmethod
2598 def unlink(self, *names):
2599 """
2600 "Unlink a key specified by ``names``"
2602 See: ClusterPipeline.unlink()
2603 """
2604 pass
2606 @abstractmethod
2607 def discard(self):
2608 pass
2611class AbstractStrategy(ExecutionStrategy):
2612 def __init__(
2613 self,
2614 pipe: ClusterPipeline,
2615 ):
2616 self._command_queue: List[PipelineCommand] = []
2617 self._pipe = pipe
2618 self._nodes_manager = self._pipe.nodes_manager
2620 @property
2621 def command_queue(self):
2622 return self._command_queue
2624 @command_queue.setter
2625 def command_queue(self, queue: List[PipelineCommand]):
2626 self._command_queue = queue
2628 @abstractmethod
2629 def execute_command(self, *args, **kwargs):
2630 pass
2632 def pipeline_execute_command(self, *args, **options):
2633 self._command_queue.append(
2634 PipelineCommand(args, options, len(self._command_queue))
2635 )
2636 return self._pipe
2638 @abstractmethod
2639 def execute(self, raise_on_error: bool = True) -> List[Any]:
2640 pass
2642 @abstractmethod
2643 def send_cluster_commands(
2644 self, stack, raise_on_error=True, allow_redirections=True
2645 ):
2646 pass
2648 @abstractmethod
2649 def reset(self):
2650 pass
2652 def exists(self, *keys):
2653 return self.execute_command("EXISTS", *keys)
2655 def eval(self):
2656 """ """
2657 raise RedisClusterException("method eval() is not implemented")
2659 def load_scripts(self):
2660 """ """
2661 raise RedisClusterException("method load_scripts() is not implemented")
2663 def script_load_for_pipeline(self, *args, **kwargs):
2664 """ """
2665 raise RedisClusterException(
2666 "method script_load_for_pipeline() is not implemented"
2667 )
2669 def annotate_exception(self, exception, number, command):
2670 """
2671 Provides extra context to the exception prior to it being handled
2672 """
2673 cmd = " ".join(map(safe_str, command))
2674 msg = (
2675 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
2676 f"caused error: {exception.args[0]}"
2677 )
2678 exception.args = (msg,) + exception.args[1:]
2681class PipelineStrategy(AbstractStrategy):
2682 def __init__(self, pipe: ClusterPipeline):
2683 super().__init__(pipe)
2684 self.command_flags = pipe.command_flags
2686 def execute_command(self, *args, **kwargs):
2687 return self.pipeline_execute_command(*args, **kwargs)
2689 def _raise_first_error(self, stack):
2690 """
2691 Raise the first exception on the stack
2692 """
2693 for c in stack:
2694 r = c.result
2695 if isinstance(r, Exception):
2696 self.annotate_exception(r, c.position + 1, c.args)
2697 raise r
2699 def execute(self, raise_on_error: bool = True) -> List[Any]:
2700 stack = self._command_queue
2701 if not stack:
2702 return []
2704 try:
2705 return self.send_cluster_commands(stack, raise_on_error)
2706 finally:
2707 self.reset()
2709 def reset(self):
2710 """
2711 Reset back to empty pipeline.
2712 """
2713 self._command_queue = []
2715 def send_cluster_commands(
2716 self, stack, raise_on_error=True, allow_redirections=True
2717 ):
2718 """
2719 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.
2721 If one of the retryable exceptions has been thrown we assume that:
2722 - connection_pool was disconnected
2723 - connection_pool was reset
2724 - refresh_table_asap set to True
2726 It will retry the number of times specified by
2727 the retries property of "self.retry",
2728 which defaults to 3 unless manually configured.
2730 If all retries are exhausted, the last
2731 retryable error is re-raised.
2732 """
2733 if not stack:
2734 return []
2735 retry_attempts = self._pipe.retry.get_retries()
2736 while True:
2737 try:
2738 return self._send_cluster_commands(
2739 stack,
2740 raise_on_error=raise_on_error,
2741 allow_redirections=allow_redirections,
2742 )
2743 except RedisCluster.ERRORS_ALLOW_RETRY as e:
2744 if retry_attempts > 0:
2745 # Try again with the new cluster setup. All other errors
2746 # should be raised.
2747 retry_attempts -= 1
2749 else:
2750 raise e
2752 def _send_cluster_commands(
2753 self, stack, raise_on_error=True, allow_redirections=True
2754 ):
2755 """
2756 Send a bunch of cluster commands to the redis cluster.
2758 `allow_redirections` If the pipeline should follow
2759 `ASK` & `MOVED` responses automatically. If set
2760 to False, it will raise RedisClusterException.
2761 """
2762 # the first time sending the commands we send all of
2763 # the commands that were queued up.
2764 # if we have to run through it again, we only retry
2765 # the commands that failed.
2766 attempt = sorted(stack, key=lambda x: x.position)
2767 is_default_node = False
2768 # build a mapping of node name -> NodeCommands for the nodes we need to talk to
2769 nodes = {}
2771 # as we move through each command that still needs to be processed,
2772 # we figure out the slot number that command maps to, then from
2773 # the slot determine the node.
2774 for c in attempt:
2775 while True:
2776 # refer to our internal node -> slot table that
2777 # tells us where a given command should route to.
2778 # (it might be possible we have a cached node that no longer
2779 # exists in the cluster, which is why we do this in a loop)
2780 passed_targets = c.options.pop("target_nodes", None)
2781 if passed_targets and not self._is_nodes_flag(passed_targets):
2782 target_nodes = self._parse_target_nodes(passed_targets)
2783 else:
2784 target_nodes = self._determine_nodes(
2785 *c.args, node_flag=passed_targets
2786 )
2787 if not target_nodes:
2788 raise RedisClusterException(
2789 f"No targets were found to execute {c.args} command on"
2790 )
2791 if len(target_nodes) > 1:
2792 raise RedisClusterException(
2793 f"Too many targets for command {c.args}"
2794 )
2796 node = target_nodes[0]
2797 if node == self._pipe.get_default_node():
2798 is_default_node = True
2800 # now that we know the name of the node
2801 # ( it's just a string in the form of host:port )
2802 # we can build a list of commands for each node.
2803 node_name = node.name
2804 if node_name not in nodes:
2805 redis_node = self._pipe.get_redis_connection(node)
2806 try:
2807 connection = get_connection(redis_node)
2808 except (ConnectionError, TimeoutError):
2809 for n in nodes.values():
2810 n.connection_pool.release(n.connection)
2811 # Connection retries are being handled in the node's
2812 # Retry object. Reinitialize the node -> slot table.
2813 self._nodes_manager.initialize()
2814 if is_default_node:
2815 self._pipe.replace_default_node()
2816 raise
2817 nodes[node_name] = NodeCommands(
2818 redis_node.parse_response,
2819 redis_node.connection_pool,
2820 connection,
2821 )
2822 nodes[node_name].append(c)
2823 break
2825 # send the commands in sequence.
2826 # we write to all the open sockets for each node first,
2827 # before reading anything
2828 # this allows us to flush all the requests out across the
2829 # network
2830 # so that we can read them from different sockets as they come back.
2831 # we don't multiplex on the sockets as they become available,
2832 # but that shouldn't make too much difference.
2833 try:
2834 node_commands = nodes.values()
2835 for n in node_commands:
2836 n.write()
2838 for n in node_commands:
2839 n.read()
2840 finally:
2841 # release all of the redis connections we allocated earlier
2842 # back into the connection pool.
2843 # we used to do this step as part of a try/finally block,
2844 # but it is really dangerous to
2845 # release connections back into the pool if for some
2846 # reason the socket has data still left in it
2847 # from a previous operation. The write and
2848 # read operations already have try/catch around them for
2849 # all known types of errors including connection
2850 # and socket level errors.
2851 # So if we hit an exception, something really bad
2852 # happened and putting any of
2853 # these connections back into the pool is a very bad idea.
2854 # the socket might have an unread buffer still sitting in it,
2855 # and then the next time we read from it we would pass the
2856 # buffered result back from a previous command, and
2857 # every request sent on that connection afterwards would get
2858 # a mismatched result.
2859 for n in nodes.values():
2860 n.connection_pool.release(n.connection)
2862 # if the response isn't an exception, it is a
2863 # valid response from the node and
2864 # we're all done with that command, YAY!
2865 # if we still have commands to attempt, we've run into problems:
2866 # collect all the commands we are allowed to retry.
2867 # (MOVED, ASK, or connection errors or timeout errors)
2868 attempt = sorted(
2869 (
2870 c
2871 for c in attempt
2872 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2873 ),
2874 key=lambda x: x.position,
2875 )
2876 if attempt and allow_redirections:
2877 # RETRY MAGIC HAPPENS HERE!
2878 # send these remaining commands one at a time using `execute_command`
2879 # in the main client. This keeps our retry logic
2880 # in one place mostly,
2881 # and allows us to be more confident in correctness of behavior.
2882 # at this point any speed gains from pipelining have been lost
2883 # anyway, so we might as well make the best
2884 # attempt to get the correct behavior.
2885 #
2886 # The client command will handle retries for each
2887 # individual command sequentially as we pass each
2888 # one into `execute_command`. Any exceptions
2889 # that bubble out should only appear once all
2890 # retries have been exhausted.
2891 #
2892 # If a lot of commands have failed, we'll be setting the
2893 # flag to rebuild the slots table from scratch.
2894 # So MOVED errors should correct themselves fairly quickly.
2895 self._pipe.reinitialize_counter += 1
2896 if self._pipe._should_reinitialized():
2897 self._nodes_manager.initialize()
2898 if is_default_node:
2899 self._pipe.replace_default_node()
2900 for c in attempt:
2901 try:
2902 # send each command individually like we
2903 # do in the main client.
2904 c.result = self._pipe.parent_execute_command(*c.args, **c.options)
2905 except RedisError as e:
2906 c.result = e
2908 # turn the response back into a simple flat array that corresponds
2909 # to the sequence of commands issued in the stack in pipeline.execute()
2910 response = []
2911 for c in sorted(stack, key=lambda x: x.position):
2912 if c.args[0] in self._pipe.cluster_response_callbacks:
2913 # Remove the "keys" entry; it is only needed for client-side caching.
2914 c.options.pop("keys", None)
2915 c.result = self._pipe.cluster_response_callbacks[c.args[0]](
2916 c.result, **c.options
2917 )
2918 response.append(c.result)
2920 if raise_on_error:
2921 self._raise_first_error(stack)
2923 return response
2925 def _is_nodes_flag(self, target_nodes):
2926 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags
2928 def _parse_target_nodes(self, target_nodes):
2929 if isinstance(target_nodes, list):
2930 nodes = target_nodes
2931 elif isinstance(target_nodes, ClusterNode):
2932 # Supports passing a single ClusterNode as a variable
2933 nodes = [target_nodes]
2934 elif isinstance(target_nodes, dict):
2935 # Supports dictionaries of the format {node_name: node}.
2936 # It enables executing commands on multiple nodes at once, e.g.:
2937 # rc.cluster_save_config(rc.get_primaries())
2938 nodes = target_nodes.values()
2939 else:
2940 raise TypeError(
2941 "target_nodes type can be one of the following: "
2942 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
2943 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
2944 f"The passed type is {type(target_nodes)}"
2945 )
2946 return nodes
2948 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
2949 # Determine which nodes the command should be executed on.
2950 # Returns a list of target nodes.
2951 command = args[0].upper()
2952 if (
2953 len(args) >= 2
2954 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
2955 ):
2956 command = f"{args[0]} {args[1]}".upper()
2958 nodes_flag = kwargs.pop("nodes_flag", None)
2959 if nodes_flag is not None:
2960 # nodes flag passed by the user
2961 command_flag = nodes_flag
2962 else:
2963 # get the nodes group for this command if it was predefined
2964 command_flag = self._pipe.command_flags.get(command)
2965 if command_flag == self._pipe.RANDOM:
2966 # return a random node
2967 return [self._pipe.get_random_node()]
2968 elif command_flag == self._pipe.PRIMARIES:
2969 # return all primaries
2970 return self._pipe.get_primaries()
2971 elif command_flag == self._pipe.REPLICAS:
2972 # return all replicas
2973 return self._pipe.get_replicas()
2974 elif command_flag == self._pipe.ALL_NODES:
2975 # return all nodes
2976 return self._pipe.get_nodes()
2977 elif command_flag == self._pipe.DEFAULT_NODE:
2978 # return the cluster's default node
2979 return [self._nodes_manager.default_node]
2980 elif command in self._pipe.SEARCH_COMMANDS[0]:
2981 return [self._nodes_manager.default_node]
2982 else:
2983 # get the node that holds the key's slot
2984 slot = self._pipe.determine_slot(*args)
2985 node = self._nodes_manager.get_node_from_slot(
2986 slot,
2987 self._pipe.read_from_replicas and command in READ_COMMANDS,
2988 self._pipe.load_balancing_strategy
2989 if command in READ_COMMANDS
2990 else None,
2991 )
2992 return [node]
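# Illustrative sketch of explicit routing (assumes a connected cluster
# client ``rc``): the same target_nodes values handled here can be
# passed to commands, e.g.
#
#     rc.ping(target_nodes=RedisCluster.ALL_NODES)
#     rc.keys(target_nodes=RedisCluster.PRIMARIES)
#     rc.ping(target_nodes=rc.get_random_node())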
2994 def multi(self):
2995 raise RedisClusterException(
2996 "method multi() is not supported outside of transactional context"
2997 )
2999 def discard(self):
3000 raise RedisClusterException(
3001 "method discard() is not supported outside of transactional context"
3002 )
3004 def watch(self, *names):
3005 raise RedisClusterException(
3006 "method watch() is not supported outside of transactional context"
3007 )
3009 def unwatch(self):
3010 raise RedisClusterException(
3011 "method unwatch() is not supported outside of transactional context"
3012 )
3014 def delete(self, *names):
3015 if len(names) != 1:
3016 raise RedisClusterException(
3017 "deleting multiple keys is not implemented in pipeline command"
3018 )
3020 return self.execute_command("DEL", names[0])
3022 def unlink(self, *names):
3023 if len(names) != 1:
3024 raise RedisClusterException(
3025 "unlinking multiple keys is not implemented in pipeline command"
3026 )
3028 return self.execute_command("UNLINK", names[0])
3031class TransactionStrategy(AbstractStrategy):
3032 NO_SLOTS_COMMANDS = {"UNWATCH"}
3033 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
3034 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3035 SLOT_REDIRECT_ERRORS = (AskError, MovedError)
3036 CONNECTION_ERRORS = (
3037 ConnectionError,
3038 OSError,
3039 ClusterDownError,
3040 SlotNotCoveredError,
3041 )
3043 def __init__(self, pipe: ClusterPipeline):
3044 super().__init__(pipe)
3045 self._explicit_transaction = False
3046 self._watching = False
3047 self._pipeline_slots: Set[int] = set()
3048 self._transaction_connection: Optional[Connection] = None
3049 self._executing = False
3050 self._retry = copy(self._pipe.retry)
3051 self._retry.update_supported_errors(
3052 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
3053 )
3055 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
3056 """
3057 Find a connection for a pipeline transaction.
3059 For an atomic transaction, watched keys ensure that their contents have not
3060 been altered, as long as the WATCH commands for those keys were sent over the
3061 same connection. So once we start watching a key, we fetch a connection to the
3062 node that owns that slot and reuse it.
3063 """
3064 if not self._pipeline_slots:
3065 raise RedisClusterException(
3066 "At least a command with a key is needed to identify a node"
3067 )
3069 node: ClusterNode = self._nodes_manager.get_node_from_slot(
3070 list(self._pipeline_slots)[0], False
3071 )
3072 redis_node: Redis = self._pipe.get_redis_connection(node)
3073 if self._transaction_connection:
3074 if not redis_node.connection_pool.owns_connection(
3075 self._transaction_connection
3076 ):
3077 previous_node = self._nodes_manager.find_connection_owner(
3078 self._transaction_connection
3079 )
3080 previous_node.connection_pool.release(self._transaction_connection)
3081 self._transaction_connection = None
3083 if not self._transaction_connection:
3084 self._transaction_connection = get_connection(redis_node)
3086 return redis_node, self._transaction_connection
3088 def execute_command(self, *args, **kwargs):
3089 slot_number: Optional[int] = None
3090 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
3091 slot_number = self._pipe.determine_slot(*args)
3093 if (
3094 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
3095 ) and not self._explicit_transaction:
3096 if args[0] == "WATCH":
3097 self._validate_watch()
3099 if slot_number is not None:
3100 if self._pipeline_slots and slot_number not in self._pipeline_slots:
3101 raise CrossSlotTransactionError(
3102 "Cannot watch or send commands on different slots"
3103 )
3105 self._pipeline_slots.add(slot_number)
3106 elif args[0] not in self.NO_SLOTS_COMMANDS:
3107 raise RedisClusterException(
3108 f"Cannot identify slot number for command: {args[0]},"
3109 "it cannot be triggered in a transaction"
3110 )
3112 return self._immediate_execute_command(*args, **kwargs)
3113 else:
3114 if slot_number is not None:
3115 self._pipeline_slots.add(slot_number)
3117 return self.pipeline_execute_command(*args, **kwargs)
3119 def _validate_watch(self):
3120 if self._explicit_transaction:
3121 raise RedisError("Cannot issue a WATCH after a MULTI")
3123 self._watching = True
3125 def _immediate_execute_command(self, *args, **options):
3126 return self._retry.call_with_retry(
3127 lambda: self._get_connection_and_send_command(*args, **options),
3128 self._reinitialize_on_error,
3129 )
3131 def _get_connection_and_send_command(self, *args, **options):
3132 redis_node, connection = self._get_client_and_connection_for_transaction()
3133 return self._send_command_parse_response(
3134 connection, redis_node, args[0], *args, **options
3135 )
3137 def _send_command_parse_response(
3138 self, conn, redis_node: Redis, command_name, *args, **options
3139 ):
3140 """
3141 Send a command and parse the response
3142 """
3144 conn.send_command(*args)
3145 output = redis_node.parse_response(conn, command_name, **options)
3147 if command_name in self.UNWATCH_COMMANDS:
3148 self._watching = False
3149 return output
3151 def _reinitialize_on_error(self, error):
3152 if self._watching:
3153 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
3154 raise WatchError("Slot rebalancing occurred while watching keys")
3156 if (
3157 type(error) in self.SLOT_REDIRECT_ERRORS
3158 or type(error) in self.CONNECTION_ERRORS
3159 ):
3160 if self._transaction_connection:
3161 self._transaction_connection = None
3163 self._pipe.reinitialize_counter += 1
3164 if self._pipe._should_reinitialized():
3165 self._nodes_manager.initialize()
3166 self._pipe.reinitialize_counter = 0  # reset the counter shared via the pipe
3167 else:
3168 if isinstance(error, AskError):
3169 self._nodes_manager.update_moved_exception(error)
3171 self._executing = False
3173 def _raise_first_error(self, responses, stack):
3174 """
3175 Raise the first exception on the stack
3176 """
3177 for r, cmd in zip(responses, stack):
3178 if isinstance(r, Exception):
3179 self.annotate_exception(r, cmd.position + 1, cmd.args)
3180 raise r
3182 def execute(self, raise_on_error: bool = True) -> List[Any]:
3183 stack = self._command_queue
3184 if not stack and (not self._watching or not self._pipeline_slots):
3185 return []
3187 return self._execute_transaction_with_retries(stack, raise_on_error)
3189 def _execute_transaction_with_retries(
3190 self, stack: List["PipelineCommand"], raise_on_error: bool
3191 ):
3192 return self._retry.call_with_retry(
3193 lambda: self._execute_transaction(stack, raise_on_error),
3194 self._reinitialize_on_error,
3195 )
3197 def _execute_transaction(
3198 self, stack: List["PipelineCommand"], raise_on_error: bool
3199 ):
3200 if len(self._pipeline_slots) > 1:
3201 raise CrossSlotTransactionError(
3202 "All keys involved in a cluster transaction must map to the same slot"
3203 )
3205 self._executing = True
3207 redis_node, connection = self._get_client_and_connection_for_transaction()
3209 stack = chain(
3210 [PipelineCommand(("MULTI",))],
3211 stack,
3212 [PipelineCommand(("EXEC",))],
3213 )
3214 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
3215 packed_commands = connection.pack_commands(commands)
3216 connection.send_packed_command(packed_commands)
3217 errors = []
3219 # parse off the response for MULTI
3220 # NOTE: we need to handle ResponseErrors here and continue
3221 # so that we read all the additional command messages from
3222 # the socket
3223 try:
3224 redis_node.parse_response(connection, "MULTI")
3225 except ResponseError as e:
3226 self.annotate_exception(e, 0, "MULTI")
3227 errors.append(e)
3228 except self.CONNECTION_ERRORS as cluster_error:
3229 self.annotate_exception(cluster_error, 0, "MULTI")
3230 raise
3232 # and all the other commands
3233 for i, command in enumerate(self._command_queue):
3234 if EMPTY_RESPONSE in command.options:
3235 errors.append((i, command.options[EMPTY_RESPONSE]))
3236 else:
3237 try:
3238 _ = redis_node.parse_response(connection, "_")
3239 except self.SLOT_REDIRECT_ERRORS as slot_error:
3240 self.annotate_exception(slot_error, i + 1, command.args)
3241 errors.append(slot_error)
3242 except self.CONNECTION_ERRORS as cluster_error:
3243 self.annotate_exception(cluster_error, i + 1, command.args)
3244 raise
3245 except ResponseError as e:
3246 self.annotate_exception(e, i + 1, command.args)
3247 errors.append(e)
3249 response = None
3250 # parse the EXEC.
3251 try:
3252 response = redis_node.parse_response(connection, "EXEC")
3253 except ExecAbortError:
3254 if errors:
3255 raise errors[0]
3256 raise
3258 self._executing = False
3260 # EXEC clears any watched keys
3261 self._watching = False
3263 if response is None:
3264 raise WatchError("Watched variable changed.")
3266 # put any parse errors into the response
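# (in practice only the (index, error) tuples recorded for
# EMPTY_RESPONSE commands reach this loop; queue-time parse errors
# abort at EXEC and are raised above)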
3267 for i, e in errors:
3268 response.insert(i, e)
3270 if len(response) != len(self._command_queue):
3271 raise InvalidPipelineStack(
3272 "Unexpected response length for cluster pipeline EXEC."
3273 " Command stack was {} but response had length {}".format(
3274 [c.args[0] for c in self._command_queue], len(response)
3275 )
3276 )
3278 # find any errors in the response and raise if necessary
3279 if raise_on_error or len(errors) > 0:
3280 self._raise_first_error(
3281 response,
3282 self._command_queue,
3283 )
3285 # We have to run response callbacks manually
3286 data = []
3287 for r, cmd in zip(response, self._command_queue):
3288 if not isinstance(r, Exception):
3289 command_name = cmd.args[0]
3290 if command_name in self._pipe.cluster_response_callbacks:
3291 r = self._pipe.cluster_response_callbacks[command_name](
3292 r, **cmd.options
3293 )
3294 data.append(r)
3295 return data
3297 def reset(self):
3298 self._command_queue = []
3300 # make sure to reset the connection state in the event that we were
3301 # watching something
3302 if self._transaction_connection:
3303 try:
3304 if self._watching:
3305 # call this manually since our unwatch or
3306 # immediate_execute_command methods can call reset()
3307 self._transaction_connection.send_command("UNWATCH")
3308 self._transaction_connection.read_response()
3309 # we can safely return the connection to the pool here since we're
3310 # sure we're no longer WATCHing anything
3311 node = self._nodes_manager.find_connection_owner(
3312 self._transaction_connection
3313 )
3314 node.redis_connection.connection_pool.release(
3315 self._transaction_connection
3316 )
3317 self._transaction_connection = None
3318 except self.CONNECTION_ERRORS:
3319 # disconnect will also remove any previous WATCHes
3320 if self._transaction_connection:
3321 self._transaction_connection.disconnect()
3323 # clean up the other instance attributes
3324 self._watching = False
3325 self._explicit_transaction = False
3326 self._pipeline_slots = set()
3327 self._executing = False
3329 def send_cluster_commands(
3330 self, stack, raise_on_error=True, allow_redirections=True
3331 ):
3332 raise NotImplementedError(
3333 "send_cluster_commands cannot be executed in transactional context."
3334 )
3336 def multi(self):
3337 if self._explicit_transaction:
3338 raise RedisError("Cannot issue nested calls to MULTI")
3339 if self._command_queue:
3340 raise RedisError(
3341 "Commands without an initial WATCH have already been issued"
3342 )
3343 self._explicit_transaction = True
3345 def watch(self, *names):
3346 if self._explicit_transaction:
3347 raise RedisError("Cannot issue a WATCH after a MULTI")
3349 return self.execute_command("WATCH", *names)
3351 def unwatch(self):
3352 if self._watching:
3353 return self.execute_command("UNWATCH")
3355 return True
3357 def discard(self):
3358 self.reset()
3360 def delete(self, *names):
3361 return self.execute_command("DEL", *names)
3363 def unlink(self, *names):
3364 return self.execute_command("UNLINK", *names)