# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%
import random
import socket
import sys
import threading
import time
from abc import ABC, abstractmethod
from collections import OrderedDict
from copy import copy
from enum import Enum
from itertools import chain
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from redis._parsers import CommandsParser, Encoder
from redis._parsers.helpers import parse_scan
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
from redis.commands import READ_COMMANDS, RedisClusterCommands
from redis.commands.helpers import list_or_args
from redis.connection import (
    Connection,
    ConnectionPool,
    parse_url,
)
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    AskError,
    AuthenticationError,
    ClusterDownError,
    ClusterError,
    ConnectionError,
    CrossSlotTransactionError,
    DataError,
    ExecAbortError,
    InvalidPipelineStack,
    MaxConnectionsError,
    MovedError,
    RedisClusterException,
    RedisError,
    ResponseError,
    SlotNotCoveredError,
    TimeoutError,
    TryAgainError,
    WatchError,
)
from redis.lock import Lock
from redis.retry import Retry
from redis.utils import (
    deprecated_args,
    dict_merge,
    list_keys_to_dict,
    merge_result,
    safe_str,
    str_if_bytes,
    truncate_text,
)


def get_node_name(host: str, port: Union[str, int]) -> str:
    return f"{host}:{port}"


@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    return redis_node.connection or redis_node.connection_pool.get_connection()


def parse_scan_result(command, res, **options):
    cursors = {}
    ret = []
    for node_name, response in res.items():
        cursor, r = parse_scan(response, **options)
        cursors[node_name] = cursor
        ret += r

    return cursors, ret
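
# Illustrative sketch (not part of the library; node names and keys below are
# made up): parse_scan_result merges the per-node SCAN replies collected by the
# cluster client into one cursor map plus a flat key list.
#
#     res = {
#         "127.0.0.1:7000": [b"0", [b"k1", b"k2"]],
#         "127.0.0.1:7001": [b"42", [b"k3"]],
#     }
#     cursors, keys = parse_scan_result("SCAN", res)
#     # cursors == {"127.0.0.1:7000": 0, "127.0.0.1:7001": 42}
#     # keys == [b"k1", b"k2", b"k3"]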


def parse_pubsub_numsub(command, res, **options):
    numsub_d = OrderedDict()
    for numsub_tups in res.values():
        for channel, numsubbed in numsub_tups:
            try:
                numsub_d[channel] += numsubbed
            except KeyError:
                numsub_d[channel] = numsubbed

    ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
    return ret_numsub


def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        return str_if_bytes(args[0]) or current_host, args[1]

    slots = {}
    for slot in resp:
        start, end, primary = slot[:3]
        replicas = slot[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }

    return slots
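
# Illustrative sketch (not part of the library; the addresses are made up): a
# raw CLUSTER SLOTS reply is a list of [start, end, primary, *replicas] entries
# in which each server is a [host, port, node_id] list. parse_cluster_slots
# keeps only (host, port) per server:
#
#     resp = [[0, 5460, [b"127.0.0.1", 7000, b"id1"], [b"127.0.0.1", 7003, b"id2"]]]
#     parse_cluster_slots(resp)
#     # {(0, 5460): {"primary": ("127.0.0.1", 7000),
#     #              "replicas": [("127.0.0.1", 7003)]}}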


def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.
    """
    if isinstance(resp[0], dict):
        return resp
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], (x[1][i + 1])))
        nodes = x[3]
        for node in nodes:
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards


def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.
    """
    return resp.decode("utf-8")


PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"

REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
)
KWARGS_DISABLED_KEYS = ("host", "port", "retry")


def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    connection_kwargs = {
        k: v
        for k, v in kwargs.items()
        if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
    }

    return connection_kwargs
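
# Illustrative sketch (not part of the library): keys outside REDIS_ALLOWED_KEYS
# and any key listed in KWARGS_DISABLED_KEYS are dropped before the kwargs are
# handed to the node-level connections.
#
#     cleanup_kwargs(host="localhost", ssl=True, unknown_option=1)
#     # {'ssl': True}  ('host' is disabled; 'unknown_option' is not allowed)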


class AbstractRedisCluster:
    RedisClusterRequestTTL = 16

    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, fall back to a replica if the cluster contains
                # different replicas
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)


class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL-wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        return cls(url=url, **kwargs)
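
    # Illustrative usage sketch (not part of the library; the URL is an
    # assumption for the example):
    #
    #     from redis.cluster import RedisCluster
    #
    #     rc = RedisCluster.from_url("redis://localhost:7000/0")
    #     rc.set("foo", "bar")
    #     rc.get("foo")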

    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
534 """
535 Initialize a new RedisCluster client.
537 :param startup_nodes:
538 List of nodes from which initial bootstrapping can be done
539 :param host:
540 Can be used to point to a startup node
541 :param port:
542 Can be used to point to a startup node
543 :param require_full_coverage:
544 When set to False (default value): the client will not require a
545 full coverage of the slots. However, if not all slots are covered,
546 and at least one node has 'cluster-require-full-coverage' set to
547 'yes,' the server will throw a ClusterDownError for some key-based
548 commands. See -
549 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
550 When set to True: all slots must be covered to construct the
551 cluster client. If not all slots are covered, RedisClusterException
552 will be thrown.
553 :param read_from_replicas:
554 @deprecated - please use load_balancing_strategy instead
555 Enable read from replicas in READONLY mode. You can read possibly
556 stale data.
557 When set to true, read commands will be assigned between the
558 primary and its replications in a Round-Robin manner.
559 :param load_balancing_strategy:
560 Enable read from replicas in READONLY mode and defines the load balancing
561 strategy that will be used for cluster node selection.
562 The data read from replicas is eventually consistent with the data in primary nodes.
563 :param dynamic_startup_nodes:
564 Set the RedisCluster's startup nodes to all of the discovered nodes.
565 If true (default value), the cluster's discovered nodes will be used to
566 determine the cluster nodes-slots mapping in the next topology refresh.
567 It will remove the initial passed startup nodes if their endpoints aren't
568 listed in the CLUSTER SLOTS output.
569 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
570 specific IP addresses, it is best to set it to false.
571 :param cluster_error_retry_attempts:
572 @deprecated - Please configure the 'retry' object instead
573 In case 'retry' object is set - this argument is ignored!
575 Number of times to retry before raising an error when
576 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
577 :class:`~.ClusterDownError` are encountered
578 :param retry:
579 A retry object that defines the retry strategy and the number of
580 retries for the cluster client.
581 In current implementation for the cluster client (starting form redis-py version 6.0.0)
582 the retry object is not yet fully utilized, instead it is used just to determine
583 the number of retries for the cluster client.
584 In the future releases the retry object will be used to handle the cluster client retries!
585 :param reinitialize_steps:
586 Specifies the number of MOVED errors that need to occur before
587 reinitializing the whole cluster topology. If a MOVED error occurs
588 and the cluster does not need to be reinitialized on this current
589 error handling, only the MOVED slot will be patched with the
590 redirected node.
591 To reinitialize the cluster on every MOVED error, set
592 reinitialize_steps to 1.
593 To avoid reinitializing the cluster on moved errors, set
594 reinitialize_steps to 0.
595 :param address_remap:
596 An optional callable which, when provided with an internal network
597 address of a node, e.g. a `(host, port)` tuple, will return the address
598 where the node is reachable. This can be used to map the addresses at
599 which the nodes _think_ they are, to addresses at which a client may
600 reach them, such as when they sit behind a proxy.
602 :**kwargs:
603 Extra arguments that will be sent into Redis instance when created
604 (See Official redis-py doc for supported kwargs - the only limitation
605 is that you can't provide 'retry' object as part of kwargs.
606 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
607 Some kwargs are not supported and will raise a
608 RedisClusterException:
609 - db (Redis do not support database SELECT in cluster mode)
610 """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        if "retry" in kwargs:
            # Argument 'retry' cannot be used in kwargs when in cluster mode:
            # the kwargs are passed to the lower level connections to the
            # cluster nodes, and there we provide a retry configuration with
            # no retries allowed. The retries should be handled on the cluster
            # client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node(s)
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the following:\n"
                "1. host and port, for example:\n"
                "   RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                "   RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments.
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run.
        # If the user passed an on_connect function, we'll save it and run it
        # inside the RedisCluster.on_connect() function.
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        if (cache_config or cache) and protocol not in [3, "3"]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)

        self.commands_parser = CommandsParser(self)
        self._lock = threading.RLock()
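
    # Illustrative usage sketch (not part of the library; hosts and ports are
    # assumptions for the example):
    #
    #     from redis.cluster import ClusterNode, RedisCluster
    #
    #     nodes = [ClusterNode("localhost", 7000), ClusterNode("localhost", 7001)]
    #     rc = RedisCluster(startup_nodes=nodes, require_full_coverage=True)
    #
    #     # Commands can also be routed explicitly with the target_nodes kwarg:
    #     rc.ping(target_nodes=RedisCluster.ALL_NODES)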

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def disconnect_connection_pools(self):
        for node in self.get_nodes():
            if node.redis_connection:
                try:
                    node.redis_connection.connection_pool.disconnect()
                except OSError:
                    # Client was already disconnected. Do nothing.
                    pass

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate, select a database, and send
        READONLY if it was set during object initialization.
        """
        connection.on_connect()

        if self.read_from_replicas or self.load_balancing_strategy:
            # Send the READONLY command to the server to configure the
            # connection as readonly. Since each cluster node may change its
            # server type due to a failover, we should establish a READONLY
            # connection regardless of the server type. If this is a primary
            # connection, READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

    def get_node(self, host=None, port=None, node_name=None):
        return self.nodes_manager.get_node(host, port, node_name)

    def get_primaries(self):
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

    def get_replicas(self):
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)

    def get_random_node(self):
        return random.choice(list(self.nodes_manager.nodes_cache.values()))

    def get_nodes(self):
        return list(self.nodes_manager.nodes_cache.values())

    def get_node_from_key(self, key, replica=False):
        """
        Get the node that holds the key's slot.
        If replica is set to True but the slot doesn't have any replicas, None
        is returned.
        """
        slot = self.keyslot(key)
        slot_cache = self.nodes_manager.slots_cache.get(slot)
        if slot_cache is None or len(slot_cache) == 0:
            raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
        if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        elif replica:
            node_idx = 1
        else:
            # primary
            node_idx = 0

        return slot_cache[node_idx]

    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node

    def set_default_node(self, node):
        """
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return True if the default node was set, else False
        """
        if node is None or self.get_node(node_name=node.name) is None:
            return False
        self.nodes_manager.default_node = node
        return True

    def set_retry(self, retry: Retry) -> None:
        self.retry = retry

    def monitor(self, target_node=None):
        """
        Return a Monitor object for the specified target node.
        The default cluster node will be selected if no target node is
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        The next_command() method returns one command from the monitor, and
        the listen() method yields commands from the monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()

    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or a host and port, to get a pubsub
        instance connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)

    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
        Pipelines do not work in cluster mode the same way they
        do in normal mode. Create a clone of this object so
        that simulating pipelines will work correctly. Each
        command will be called directly when used, and
        calling execute() will only return the result stack.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            read_from_replicas=self.read_from_replicas,
            load_balancing_strategy=self.load_balancing_strategy,
            reinitialize_steps=self.reinitialize_steps,
            retry=self.retry,
            lock=self._lock,
            transaction=transaction,
        )
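
    # Illustrative usage sketch (not part of the library; assumes a client `rc`
    # as constructed above): commands are buffered client-side and routed
    # per-slot when execute() is called.
    #
    #     with rc.pipeline() as pipe:
    #         pipe.set("a", 1)
    #         pipe.get("a")
    #         results = pipe.execute()  # e.g. [True, b"1"] without decode_responses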

    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
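
    # Illustrative usage sketch (not part of the library; assumes a client `rc`):
    # the lock key hashes to a single slot, so normal single-key semantics apply.
    #
    #     with rc.lock("resource-lock", timeout=5, blocking_timeout=3):
    #         ...  # critical section; raises LockError if not acquired in time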

    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.cluster_response_callbacks[command] = callback

    def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
        # Determine which nodes the command should be executed on.
        # Returns a list of target nodes.
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)
        if command_flag == self.__class__.RANDOM:
            # return a random node
            return [self.get_random_node()]
        elif command_flag == self.__class__.PRIMARIES:
            # return all primaries
            return self.get_primaries()
        elif command_flag == self.__class__.REPLICAS:
            # return all replicas
            return self.get_replicas()
        elif command_flag == self.__class__.ALL_NODES:
            # return all nodes
            return self.get_nodes()
        elif command_flag == self.__class__.DEFAULT_NODE:
            # return the cluster's default node
            return [self.nodes_manager.default_node]
        elif command in self.__class__.SEARCH_COMMANDS[0]:
            return [self.nodes_manager.default_node]
        else:
            # get the node that holds the key's slot
            slot = self.determine_slot(*args)
            node = self.nodes_manager.get_node_from_slot(
                slot,
                self.read_from_replicas and command in READ_COMMANDS,
                self.load_balancing_strategy if command in READ_COMMANDS else None,
            )
            return [node]

    def _should_reinitialized(self):
        # To reinitialize the cluster on every MOVED error,
        # set reinitialize_steps to 1.
        # To avoid reinitializing the cluster on moved errors, set
        # reinitialize_steps to 0.
        if self.reinitialize_steps == 0:
            return False
        else:
            return self.reinitialize_counter % self.reinitialize_steps == 0

    def keyslot(self, key):
        """
        Calculate keyslot for a given key.
        See Keys distribution model in https://redis.io/topics/cluster-spec
        """
        k = self.encoder.encode(key)
        return key_slot(k)
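
    # Illustrative sketch (not part of the library): keys sharing a hash tag
    # (the part between '{' and '}') map to the same slot, which is what makes
    # multi-key commands possible in cluster mode.
    #
    #     rc.keyslot("user:{42}:name") == rc.keyslot("user:{42}:email")  # True
    #     rc.keyslot("foo")  # an integer in range(0, 16384)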

    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in it, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
        - issue: https://github.com/redis/redis/issues/9493
        - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)

    def determine_slot(self, *args) -> int:
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, which means the
                # function can be run on any node, so we can just return a
                # random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
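
    # Illustrative sketch (not part of the library): a multi-key command only
    # resolves to a slot when all of its keys agree, so hash tags are needed
    # for cross-key operations.
    #
    #     rc.mset({"{tag}:a": 1, "{tag}:b": 2})  # OK - both keys in one slot
    #     rc.mset({"a": 1, "b": 2})              # raises RedisClusterException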

    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder

    def get_connection_kwargs(self):
        """
        Get the connections' key-word arguments
        """
        return self.nodes_manager.connection_kwargs

    def _is_nodes_flag(self, target_nodes):
        return isinstance(target_nodes, str) and target_nodes in self.node_flags

    def _parse_target_nodes(self, target_nodes):
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables executing commands on multiple nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def execute_command(self, *args, **kwargs):
        return self._internal_execute_command(*args, **kwargs)

    def _internal_execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the command the number of times specified by the retries
        property of "self.retry", which defaults to 3 unless manually
        configured.

        If the command fails that many times, the exception is raised.

        Key argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True
        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args, **kwargs, nodes_flag=passed_targets
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)
                # Return the processed result
                return self._process_result(args[0], res, **kwargs)
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    continue
                else:
                    # raise the exception
                    raise e

    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and command in READ_COMMANDS,
                        self.load_balancing_strategy
                        if command in READ_COMMANDS
                        else None,
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                connection.send_command(*args, **kwargs)
                response = redis_node.parse_response(connection, command, **kwargs)

                # Remove the keys entry; it is only needed for caching.
                kwargs.pop("keys", None)

                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )
                return response
            except AuthenticationError:
                raise
            except MaxConnectionsError:
                # MaxConnectionsError indicates client-side resource exhaustion
                # (too many connections in the pool), not a node failure.
                # Don't treat this as a node failure - just re-raise the error
                # without reinitializing the cluster.
                raise
            except (ConnectionError, TimeoutError) as e:
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # The 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                moved = True
            except TryAgainError:
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except (ClusterDownError, SlotNotCoveredError):
                # ClusterDownError can occur during a failover; to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command.

                # SlotNotCoveredError can occur when the cluster is not fully
                # initialized or can be a temporary issue.
                # We will try to reinitialize the cluster topology
                # and retry executing the command.

                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise
            except ResponseError:
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")

    def close(self) -> None:
        try:
            with self._lock:
                if self.nodes_manager:
                    self.nodes_manager.close()
        except AttributeError:
            # RedisCluster's __init__ can fail before nodes_manager is set
            pass

    def _process_result(self, command, res, **kwargs):
        """
        Process the result of the executed command.
        The function returns a dict or a single value.

        :type command: str
        :type res: dict

        `res` should be in the following format:
            Dict<node_name, command_result>
        """
        if command in self.result_callbacks:
            return self.result_callbacks[command](command, res, **kwargs)
        elif len(res) == 1:
            # When we execute the command on a single node, we can
            # remove the dictionary and return a single response
            return list(res.values())[0]
        else:
            return res

    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules
        and their namespaces to the redis client.

        ``funcname`` - A string containing the name of the function to create
        ``func`` - The function being added to this class.
        """
        setattr(self, funcname, func)

    def transaction(self, func, *watches, **kwargs):
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
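
    # Illustrative usage sketch (not part of the library; assumes a client `rc`):
    # an optimistic check-and-set that is retried on WatchError until it commits.
    #
    #     def increment(pipe):
    #         current = int(pipe.get("counter") or 0)
    #         pipe.multi()
    #         pipe.set("counter", current + 1)
    #
    #     rc.transaction(increment, "counter")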


class ClusterNode:
    def __init__(self, host, port, server_type=None, redis_connection=None):
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        try:
            if self.redis_connection is not None:
                self.redis_connection.close()
        except Exception:
            # Ignore errors when closing the connection
            pass


class LoadBalancingStrategy(Enum):
    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        server_index = self.primary_to_idx.setdefault(primary, self.start_index)
        if replicas_only and server_index == 0:
            # skip the primary node index
            server_index = 1
        # Update the index for the next round
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index
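
# Illustrative sketch (not part of the library): for a slot served by a primary
# and two replicas (list_size=3), ROUND_ROBIN cycles through all three indices,
# while ROUND_ROBIN_REPLICAS skips index 0 (the primary).
#
#     lb = LoadBalancer()
#     [lb.get_server_index("p", 3) for _ in range(4)]
#     # -> [0, 1, 2, 0]
#     lb.reset()
#     [lb.get_server_index("p", 3, LoadBalancingStrategy.ROUND_ROBIN_REPLICAS)
#      for _ in range(4)]
#     # -> [1, 2, 1, 2]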


class NodesManager:
    def __init__(
        self,
        startup_nodes,
        from_url=False,
        require_full_coverage=False,
        lock=None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        **kwargs,
    ):
        self.nodes_cache: Dict[str, Redis] = {}
        self.slots_cache = {}
        self.startup_nodes = {}
        self.default_node = None
        self.populate_startup_nodes(startup_nodes)
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        self._cache = cache
        self._cache_config = cache_config
        self._cache_factory = cache_factory
        self._moved_exception = None
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.initialize()

    def get_node(self, host=None, port=None, node_name=None):
        """
        Get the requested node from the cluster's nodes.
        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            return None

    def update_moved_exception(self, exception):
        self._moved_exception = exception

    def _update_moved_slots(self):
        """
        Update the slot's node with the redirected one
        """
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            if redirected_node.server_type is not PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node
        if redirected_node in self.slots_cache[e.slot_id]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = self.slots_cache[e.slot_id][0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            self.slots_cache[e.slot_id].append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            self.slots_cache[e.slot_id].remove(redirected_node)
            # Override the old primary with the new one
            self.slots_cache[e.slot_id][0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        else:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replicas) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        # Reset moved_exception
        self._moved_exception = None

    @deprecated_args(
        args_to_warn=["server_type"],
        reason=(
            "In case you need to select a load balancing strategy "
            "that will use replicas, please set it through 'load_balancing_strategy'"
        ),
        version="5.3.0",
    )
    def get_node_from_slot(
        self,
        slot,
        read_from_replicas=False,
        load_balancing_strategy=None,
        server_type=None,
    ) -> ClusterNode:
        """
        Gets a node that serves this hash slot
        """
        if self._moved_exception:
            with self._lock:
                if self._moved_exception:
                    self._update_moved_slots()

        if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self._require_full_coverage}"'
            )

        if read_from_replicas is True and load_balancing_strategy is None:
            load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

        if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
            # get the server index using the strategy defined in load_balancing_strategy
            primary_name = self.slots_cache[slot][0].name
            node_idx = self.read_load_balancer.get_server_index(
                primary_name, len(self.slots_cache[slot]), load_balancing_strategy
            )
        elif (
            server_type is None
            or server_type == PRIMARY
            or len(self.slots_cache[slot]) == 1
        ):
            # return a primary
            node_idx = 0
        else:
            # return a replica
            # randomly choose one of the replicas
            node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

        return self.slots_cache[slot][node_idx]

    def get_nodes_by_server_type(self, server_type):
        """
        Get all nodes with the specified server type
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode
        """
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    def populate_startup_nodes(self, nodes):
        """
        Populate all startup nodes, filtering out any duplicates
        """
        for n in nodes:
            self.startup_nodes[n.name] = n

    def check_slots_coverage(self, slots_cache):
        # Validate if all slots are covered or if we should try the next
        # startup node
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in slots_cache:
                return False
        return True

    def create_redis_connections(self, nodes):
        """
        This function will create a redis connection to all nodes in :nodes:
        """
        connection_pools = []
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host, port=node.port, **self.connection_kwargs
                )
                connection_pools.append(node.redis_connection.connection_pool)

        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                connection_pools, ClientType.SYNC, self._credential_provider
            )
        )

    def create_redis_node(self, host, port, **kwargs):
        # We are configuring the connection pool not to retry
        # connections on lower level clients to avoid retrying
        # connections to nodes that are not reachable
        # and to avoid blocking the connection pool.
        # The only error that will have some handling in the lower
        # level clients is ConnectionError, which will trigger disconnection
        # of the socket.
        # The retries will be handled on the cluster client level,
        # where we will have proper handling of the cluster topology.
        node_retry_config = Retry(
            backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
        )

        if self.from_url:
            # Create a redis node with a customized connection pool
            kwargs.update({"host": host})
            kwargs.update({"port": port})
            kwargs.update({"cache": self._cache})
            kwargs.update({"retry": node_retry_config})
            r = Redis(connection_pool=self.connection_pool_class(**kwargs))
        else:
            r = Redis(
                host=host,
                port=port,
                cache=self._cache,
                retry=node_retry_config,
                **kwargs,
            )
        return r

    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
        node_name = get_node_name(host, port)
        # check if we already have this node in the tmp_nodes_cache
        target_node = tmp_nodes_cache.get(node_name)
        if target_node is None:
            # before creating a new cluster node, check if the cluster node already
            # exists in the current nodes cache and has a valid connection so we can
            # reuse it
            target_node = self.nodes_cache.get(node_name)
            if target_node is None or target_node.redis_connection is None:
                # create new cluster node for this cluster
                target_node = ClusterNode(host, port, role)
            if target_node.server_type != role:
                target_node.server_type = role
            # add this node to the nodes cache
            tmp_nodes_cache[target_node.name] = target_node

        return target_node
1699 def initialize(self):
1700 """
1701 Initializes the nodes cache, slots cache and redis connections.
1702 :startup_nodes:
1703 Responsible for discovering other nodes in the cluster
1704 """
1705 self.reset()
1706 tmp_nodes_cache = {}
1707 tmp_slots = {}
1708 disagreements = []
1709 startup_nodes_reachable = False
1710 fully_covered = False
1711 kwargs = self.connection_kwargs
1712 exception = None
1713 # Convert to tuple to prevent RuntimeError if self.startup_nodes
1714 # is modified during iteration
1715 for startup_node in tuple(self.startup_nodes.values()):
1716 try:
1717 if startup_node.redis_connection:
1718 r = startup_node.redis_connection
1719 else:
1720 # Create a new Redis connection
1721 r = self.create_redis_node(
1722 startup_node.host, startup_node.port, **kwargs
1723 )
1724 self.startup_nodes[startup_node.name].redis_connection = r
1725 # Make sure cluster mode is enabled on this node
1726 try:
1727 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1728 r.connection_pool.disconnect()
1729 except ResponseError:
1730 raise RedisClusterException(
1731 "Cluster mode is not enabled on this node"
1732 )
1733 startup_nodes_reachable = True
1734 except Exception as e:
1735 # Try the next startup node.
1736 # The exception is saved and raised only if we have no more nodes.
1737 exception = e
1738 continue
1740 # The CLUSTER SLOTS command returns a list of slot sections of the form
1741 # [from_slot, to_slot, master, replica1, ..., replicaN],
1742 # where each node entry is the list [IP, port, node_id].
1743 # Therefore, cluster_slots[0][2][0] will be the IP address of the
1744 # primary node of the first slot section.
1745 # If there's only one server in the cluster, its ``host`` is ''
1746 # Fix it to the host in startup_nodes
1747 if (
1748 len(cluster_slots) == 1
1749 and len(cluster_slots[0][2][0]) == 0
1750 and len(self.startup_nodes) == 1
1751 ):
1752 cluster_slots[0][2][0] = startup_node.host
1754 for slot in cluster_slots:
1755 primary_node = slot[2]
1756 host = str_if_bytes(primary_node[0])
1757 if host == "":
1758 host = startup_node.host
1759 port = int(primary_node[1])
1760 host, port = self.remap_host_port(host, port)
1762 nodes_for_slot = []
1764 target_node = self._get_or_create_cluster_node(
1765 host, port, PRIMARY, tmp_nodes_cache
1766 )
1767 nodes_for_slot.append(target_node)
1769 replica_nodes = slot[3:]
1770 for replica_node in replica_nodes:
1771 host = str_if_bytes(replica_node[0])
1772 port = int(replica_node[1])
1773 host, port = self.remap_host_port(host, port)
1774 target_replica_node = self._get_or_create_cluster_node(
1775 host, port, REPLICA, tmp_nodes_cache
1776 )
1777 nodes_for_slot.append(target_replica_node)
1779 for i in range(int(slot[0]), int(slot[1]) + 1):
1780 if i not in tmp_slots:
1781 tmp_slots[i] = nodes_for_slot
1782 else:
1783 # Validate that no two nodes disagree about which node
1784 # owns this slot
1785 tmp_slot = tmp_slots[i][0]
1786 if tmp_slot.name != target_node.name:
1787 disagreements.append(
1788 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
1789 )
1791 if len(disagreements) > 5:
1792 raise RedisClusterException(
1793 f"startup_nodes could not agree on a valid "
1794 f"slots cache: {', '.join(disagreements)}"
1795 )
1797 fully_covered = self.check_slots_coverage(tmp_slots)
1798 if fully_covered:
1799 # Don't need to continue to the next startup node if all
1800 # slots are covered
1801 break
1803 if not startup_nodes_reachable:
1804 raise RedisClusterException(
1805 f"Redis Cluster cannot be connected. Please provide at least "
1806 f"one reachable node: {str(exception)}"
1807 ) from exception
1809 if self._cache is None and self._cache_config is not None:
1810 if self._cache_factory is None:
1811 self._cache = CacheFactory(self._cache_config).get_cache()
1812 else:
1813 self._cache = self._cache_factory.get_cache()
1815 # Create Redis connections to all nodes
1816 self.create_redis_connections(list(tmp_nodes_cache.values()))
1818 # Check if the slots are not fully covered
1819 if not fully_covered and self._require_full_coverage:
1820 # Despite the requirement that the slots be covered, there
1821 # isn't full coverage
1822 raise RedisClusterException(
1823 f"All slots are not covered after query all startup_nodes. "
1824 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
1825 f"covered..."
1826 )
1828 # Set the tmp variables to the real variables
1829 self.nodes_cache = tmp_nodes_cache
1830 self.slots_cache = tmp_slots
1831 # Set the default node
1832 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
1833 if self._dynamic_startup_nodes:
1834 # Populate the startup nodes with all discovered nodes
1835 self.startup_nodes = tmp_nodes_cache
1836 # If initialize was called after a MovedError, clear it
1837 self._moved_exception = None
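# A minimal sketch of how this initialization is driven from the client
# side (illustrative, not part of the original module; localhost:7000 is
# a hypothetical address):
#
#     rc = RedisCluster(
#         host="localhost",
#         port=7000,
#         require_full_coverage=True,   # raise unless all 16384 slots are covered
#         dynamic_startup_nodes=True,   # replace startup nodes with discovered ones
#     )
#     # initialize() runs during construction and again after MOVED errors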
1839 def close(self) -> None:
1840 self.default_node = None
1841 for node in self.nodes_cache.values():
1842 if node.redis_connection:
1843 node.redis_connection.close()
1845 def reset(self):
1846 # The read_load_balancer may be None; reset it only if present
1847 if self.read_load_balancer is not None:
1848 self.read_load_balancer.reset()
1852 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
1853 """
1854 Remap the host and port returned from the cluster to a different
1855 internal value. Useful if the client is not connecting directly
1856 to the cluster.
1857 """
1858 if self.address_remap:
1859 return self.address_remap((host, port))
1860 return host, port
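# A sketch of an address_remap callable (illustrative, not part of the
# original module), e.g. when reaching the cluster through local port
# forwards; the +10000 offset is a hypothetical choice:
#
#     def remap(address):
#         host, port = address
#         return "127.0.0.1", port + 10000
#
#     rc = RedisCluster(host="localhost", port=7000, address_remap=remap)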
1862 def find_connection_owner(self, connection: Connection) -> Optional[Redis]:
1863 node_name = get_node_name(connection.host, connection.port)
1864 for node in tuple(self.nodes_cache.values()):
1865 if node.redis_connection:
1866 conn_args = node.redis_connection.connection_pool.connection_kwargs
1867 if node_name == get_node_name(
1868 conn_args.get("host"), conn_args.get("port")
1869 ):
1870 return node
1873class ClusterPubSub(PubSub):
1874 """
1875 Wrapper for PubSub class.
1877 IMPORTANT: before using ClusterPubSub, read about the known limitations
1878 with pubsub in Cluster mode and learn how to work around them:
1879 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
1880 """
1882 def __init__(
1883 self,
1884 redis_cluster,
1885 node=None,
1886 host=None,
1887 port=None,
1888 push_handler_func=None,
1889 event_dispatcher: Optional["EventDispatcher"] = None,
1890 **kwargs,
1891 ):
1892 """
1893 When a pubsub instance is created without specifying a node, a single
1894 node will be transparently chosen for the pubsub connection on the
1895 first command execution. The node will be determined by:
1896 1. Hashing the channel name in the request to find its keyslot
1897 2. Selecting a node that handles the keyslot: If read_from_replicas is
1898 set to true or load_balancing_strategy is set, a replica can be selected.
1900 :type redis_cluster: RedisCluster
1901 :type node: ClusterNode
1902 :type host: str
1903 :type port: int
1904 """
1905 self.node = None
1906 self.set_pubsub_node(redis_cluster, node, host, port)
1907 connection_pool = (
1908 None
1909 if self.node is None
1910 else redis_cluster.get_redis_connection(self.node).connection_pool
1911 )
1912 self.cluster = redis_cluster
1913 self.node_pubsub_mapping = {}
1914 self._pubsubs_generator = self._pubsubs_generator()
1915 if event_dispatcher is None:
1916 self._event_dispatcher = EventDispatcher()
1917 else:
1918 self._event_dispatcher = event_dispatcher
1919 super().__init__(
1920 connection_pool=connection_pool,
1921 encoder=redis_cluster.encoder,
1922 push_handler_func=push_handler_func,
1923 event_dispatcher=self._event_dispatcher,
1924 **kwargs,
1925 )
1927 def set_pubsub_node(self, cluster, node=None, host=None, port=None):
1928 """
1929 The pubsub node will be set according to the passed node, host and port.
1930 When none of node, host, or port is specified - the node is set
1931 to None and will be determined by the keyslot of the channel in the
1932 first command to be executed.
1933 RedisClusterException will be thrown if the passed node does not exist
1934 in the cluster.
1935 If host is passed without port, or vice versa, a DataError will be
1936 thrown.
1937 :type cluster: RedisCluster
1938 :type node: ClusterNode
1939 :type host: str
1940 :type port: int
1941 """
1942 if node is not None:
1943 # node is passed by the user
1944 self._raise_on_invalid_node(cluster, node, node.host, node.port)
1945 pubsub_node = node
1946 elif host is not None and port is not None:
1947 # host and port passed by the user
1948 node = cluster.get_node(host=host, port=port)
1949 self._raise_on_invalid_node(cluster, node, host, port)
1950 pubsub_node = node
1951 elif any([host, port]):
1952 # only 'host' or 'port' passed
1953 raise DataError("Passing a host requires passing a port, and vice versa")
1954 else:
1955 # nothing passed by the user. set node to None
1956 pubsub_node = None
1958 self.node = pubsub_node
1960 def get_pubsub_node(self):
1961 """
1962 Get the node that is being used as the pubsub connection
1963 """
1964 return self.node
1966 def _raise_on_invalid_node(self, redis_cluster, node, host, port):
1967 """
1968 Raise a RedisClusterException if the node is None or doesn't exist in
1969 the cluster.
1970 """
1971 if node is None or redis_cluster.get_node(node_name=node.name) is None:
1972 raise RedisClusterException(
1973 f"Node {host}:{port} doesn't exist in the cluster"
1974 )
1976 def execute_command(self, *args):
1977 """
1978 Execute a subscribe/unsubscribe command.
1980 Code taken from redis-py and tweaked to work within a cluster.
1981 """
1982 # NOTE: don't parse the response in this function -- it could pull a
1983 # legitimate message off the stack if the connection is already
1984 # subscribed to one or more channels
1986 if self.connection is None:
1987 if self.connection_pool is None:
1988 if len(args) > 1:
1989 # Hash the first channel and get one of the nodes holding
1990 # this slot
1991 channel = args[1]
1992 slot = self.cluster.keyslot(channel)
1993 node = self.cluster.nodes_manager.get_node_from_slot(
1994 slot,
1995 self.cluster.read_from_replicas,
1996 self.cluster.load_balancing_strategy,
1997 )
1998 else:
1999 # Get a random node
2000 node = self.cluster.get_random_node()
2001 self.node = node
2002 redis_connection = self.cluster.get_redis_connection(node)
2003 self.connection_pool = redis_connection.connection_pool
2004 self.connection = self.connection_pool.get_connection()
2005 # register a callback that re-subscribes to any channels we
2006 # were listening to when we were disconnected
2007 self.connection.register_connect_callback(self.on_connect)
2008 if self.push_handler_func is not None:
2009 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
2010 self._event_dispatcher.dispatch(
2011 AfterPubSubConnectionInstantiationEvent(
2012 self.connection, self.connection_pool, ClientType.SYNC, self._lock
2013 )
2014 )
2015 connection = self.connection
2016 self._execute(connection, connection.send_command, *args)
2018 def _get_node_pubsub(self, node):
2019 try:
2020 return self.node_pubsub_mapping[node.name]
2021 except KeyError:
2022 pubsub = node.redis_connection.pubsub(
2023 push_handler_func=self.push_handler_func
2024 )
2025 self.node_pubsub_mapping[node.name] = pubsub
2026 return pubsub
2028 def _sharded_message_generator(self):
2029 for _ in range(len(self.node_pubsub_mapping)):
2030 pubsub = next(self._pubsubs_generator)
2031 message = pubsub.get_message()
2032 if message is not None:
2033 return message
2034 return None
2036 def _pubsubs_generator(self):
2037 while True:
2038 yield from self.node_pubsub_mapping.values()
2040 def get_sharded_message(
2041 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
2042 ):
2043 if target_node:
2044 message = self.node_pubsub_mapping[target_node.name].get_message(
2045 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
2046 )
2047 else:
2048 message = self._sharded_message_generator()
2049 if message is None:
2050 return None
2051 elif str_if_bytes(message["type"]) == "sunsubscribe":
2052 if message["channel"] in self.pending_unsubscribe_shard_channels:
2053 self.pending_unsubscribe_shard_channels.remove(message["channel"])
2054 self.shard_channels.pop(message["channel"], None)
2055 node = self.cluster.get_node_from_key(message["channel"])
2056 if self.node_pubsub_mapping[node.name].subscribed is False:
2057 self.node_pubsub_mapping.pop(node.name)
2058 if not self.channels and not self.patterns and not self.shard_channels:
2059 # There are no subscriptions anymore, set subscribed_event flag
2060 # to false
2061 self.subscribed_event.clear()
2062 if self.ignore_subscribe_messages or ignore_subscribe_messages:
2063 return None
2064 return message
2066 def ssubscribe(self, *args, **kwargs):
2067 if args:
2068 args = list_or_args(args[0], args[1:])
2069 s_channels = dict.fromkeys(args)
2070 s_channels.update(kwargs)
2071 for s_channel, handler in s_channels.items():
2072 node = self.cluster.get_node_from_key(s_channel)
2073 pubsub = self._get_node_pubsub(node)
2074 if handler:
2075 pubsub.ssubscribe(**{s_channel: handler})
2076 else:
2077 pubsub.ssubscribe(s_channel)
2078 self.shard_channels.update(pubsub.shard_channels)
2079 self.pending_unsubscribe_shard_channels.difference_update(
2080 self._normalize_keys({s_channel: None})
2081 )
2082 if pubsub.subscribed and not self.subscribed:
2083 self.subscribed_event.set()
2084 self.health_check_response_counter = 0
2086 def sunsubscribe(self, *args):
2087 if args:
2088 args = list_or_args(args[0], args[1:])
2089 else:
2090 args = self.shard_channels
2092 for s_channel in args:
2093 node = self.cluster.get_node_from_key(s_channel)
2094 p = self._get_node_pubsub(node)
2095 p.sunsubscribe(s_channel)
2096 self.pending_unsubscribe_shard_channels.update(
2097 p.pending_unsubscribe_shard_channels
2098 )
2100 def get_redis_connection(self):
2101 """
2102 Get the Redis connection of the pubsub connected node.
2103 """
2104 if self.node is not None:
2105 return self.node.redis_connection
2107 def disconnect(self):
2108 """
2109 Disconnect the pubsub connection.
2110 """
2111 if self.connection:
2112 self.connection.disconnect()
2113 for pubsub in self.node_pubsub_mapping.values():
2114 pubsub.connection.disconnect()
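# A minimal usage sketch for ClusterPubSub (illustrative, not part of the
# original module; assumes a cluster reachable at localhost:7000, a
# hypothetical address).
def _cluster_pubsub_example():
    rc = RedisCluster(host="localhost", port=7000)
    p = rc.pubsub()
    # regular pub/sub: the node is chosen by the first channel's keyslot
    p.subscribe("news")
    message = p.get_message(timeout=1.0)
    # sharded pub/sub: each shard channel routes to the node owning its slot
    p.ssubscribe("shard-news")
    sharded_message = p.get_sharded_message(timeout=1.0)
    return message, sharded_message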
2117class ClusterPipeline(RedisCluster):
2118 """
2119 Support for Redis pipeline
2120 in cluster mode
2121 """
2123 ERRORS_ALLOW_RETRY = (
2124 ConnectionError,
2125 TimeoutError,
2126 MovedError,
2127 AskError,
2128 TryAgainError,
2129 )
2131 NO_SLOTS_COMMANDS = {"UNWATCH"}
2132 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
2133 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
2135 @deprecated_args(
2136 args_to_warn=[
2137 "cluster_error_retry_attempts",
2138 ],
2139 reason="Please configure the 'retry' object instead",
2140 version="6.0.0",
2141 )
2142 def __init__(
2143 self,
2144 nodes_manager: "NodesManager",
2145 commands_parser: "CommandsParser",
2146 result_callbacks: Optional[Dict[str, Callable]] = None,
2147 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
2148 startup_nodes: Optional[List["ClusterNode"]] = None,
2149 read_from_replicas: bool = False,
2150 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
2151 cluster_error_retry_attempts: int = 3,
2152 reinitialize_steps: int = 5,
2153 retry: Optional[Retry] = None,
2154 lock=None,
2155 transaction=False,
2156 **kwargs,
2157 ):
2158 """ """
2159 self.command_stack = []
2160 self.nodes_manager = nodes_manager
2161 self.commands_parser = commands_parser
2162 self.refresh_table_asap = False
2163 self.result_callbacks = (
2164 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
2165 )
2166 self.startup_nodes = startup_nodes if startup_nodes else []
2167 self.read_from_replicas = read_from_replicas
2168 self.load_balancing_strategy = load_balancing_strategy
2169 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
2170 self.cluster_response_callbacks = cluster_response_callbacks
2171 self.reinitialize_counter = 0
2172 self.reinitialize_steps = reinitialize_steps
2173 if retry is not None:
2174 self.retry = retry
2175 else:
2176 self.retry = Retry(
2177 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
2178 retries=cluster_error_retry_attempts,
2179 )
2181 self.encoder = Encoder(
2182 kwargs.get("encoding", "utf-8"),
2183 kwargs.get("encoding_errors", "strict"),
2184 kwargs.get("decode_responses", False),
2185 )
2186 if lock is None:
2187 lock = threading.RLock()
2188 self._lock = lock
2189 self.parent_execute_command = super().execute_command
2190 self._execution_strategy: ExecutionStrategy = (
2191 PipelineStrategy(self) if not transaction else TransactionStrategy(self)
2192 )
2194 def __repr__(self):
2195 """ """
2196 return f"{type(self).__name__}"
2198 def __enter__(self):
2199 """ """
2200 return self
2202 def __exit__(self, exc_type, exc_value, traceback):
2203 """ """
2204 self.reset()
2206 def __del__(self):
2207 try:
2208 self.reset()
2209 except Exception:
2210 pass
2212 def __len__(self):
2213 """ """
2214 return len(self._execution_strategy.command_queue)
2216 def __bool__(self):
2217 "Pipeline instances should always evaluate to True on Python 3+"
2218 return True
2220 def execute_command(self, *args, **kwargs):
2221 """
2222 Wrapper function for pipeline_execute_command
2223 """
2224 return self._execution_strategy.execute_command(*args, **kwargs)
2226 def pipeline_execute_command(self, *args, **options):
2227 """
2228 Stage a command to be executed when execute() is next called
2230 Returns the current Pipeline object back so commands can be
2231 chained together, such as:
2233 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
2235 At some other point, you can then run: pipe.execute(),
2236 which will execute all commands queued in the pipe.
2237 """
2238 return self._execution_strategy.execute_command(*args, **options)
2240 def annotate_exception(self, exception, number, command):
2241 """
2242 Provides extra context to the exception prior to it being handled
2243 """
2244 self._execution_strategy.annotate_exception(exception, number, command)
2246 def execute(self, raise_on_error: bool = True) -> List[Any]:
2247 """
2248 Execute all the commands in the current pipeline
2249 """
2251 try:
2252 return self._execution_strategy.execute(raise_on_error)
2253 finally:
2254 self.reset()
2256 def reset(self):
2257 """
2258 Reset back to empty pipeline.
2259 """
2260 self._execution_strategy.reset()
2262 def send_cluster_commands(
2263 self, stack, raise_on_error=True, allow_redirections=True
2264 ):
2265 return self._execution_strategy.send_cluster_commands(
2266 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
2267 )
2269 def exists(self, *keys):
2270 return self._execution_strategy.exists(*keys)
2272 def eval(self):
2273 """ """
2274 return self._execution_strategy.eval()
2276 def multi(self):
2277 """
2278 Start a transactional block of the pipeline after WATCH commands
2279 are issued. End the transactional block with `execute`.
2280 """
2281 self._execution_strategy.multi()
2283 def load_scripts(self):
2284 """ """
2285 self._execution_strategy.load_scripts()
2287 def discard(self):
2288 """ """
2289 self._execution_strategy.discard()
2291 def watch(self, *names):
2292 """Watches the values at keys ``names``"""
2293 self._execution_strategy.watch(*names)
2295 def unwatch(self):
2296 """Unwatches all previously specified keys"""
2297 self._execution_strategy.unwatch()
2299 def script_load_for_pipeline(self, *args, **kwargs):
2300 self._execution_strategy.script_load_for_pipeline(*args, **kwargs)
2302 def delete(self, *names):
2303 self._execution_strategy.delete(*names)
2305 def unlink(self, *names):
2306 self._execution_strategy.unlink(*names)
2309def block_pipeline_command(name: str) -> Callable[..., Any]:
2310 """
2311 Raises an error because some pipelined commands should
2312 be blocked when running in cluster-mode
2313 """
2315 def inner(*args, **kwargs):
2316 raise RedisClusterException(
2317 f"ERROR: Calling pipelined function {name} is blocked "
2318 f"when running redis in cluster mode..."
2319 )
2321 return inner
2324# Blocked pipeline commands
2325PIPELINE_BLOCKED_COMMANDS = (
2326 "BGREWRITEAOF",
2327 "BGSAVE",
2328 "BITOP",
2329 "BRPOPLPUSH",
2330 "CLIENT GETNAME",
2331 "CLIENT KILL",
2332 "CLIENT LIST",
2333 "CLIENT SETNAME",
2334 "CLIENT",
2335 "CONFIG GET",
2336 "CONFIG RESETSTAT",
2337 "CONFIG REWRITE",
2338 "CONFIG SET",
2339 "CONFIG",
2340 "DBSIZE",
2341 "ECHO",
2342 "EVALSHA",
2343 "FLUSHALL",
2344 "FLUSHDB",
2345 "INFO",
2346 "KEYS",
2347 "LASTSAVE",
2348 "MGET",
2349 "MGET NONATOMIC",
2350 "MOVE",
2351 "MSET",
2352 "MSET NONATOMIC",
2353 "MSETNX",
2354 "PFCOUNT",
2355 "PFMERGE",
2356 "PING",
2357 "PUBLISH",
2358 "RANDOMKEY",
2359 "READONLY",
2360 "READWRITE",
2361 "RENAME",
2362 "RENAMENX",
2363 "RPOPLPUSH",
2364 "SAVE",
2365 "SCAN",
2366 "SCRIPT EXISTS",
2367 "SCRIPT FLUSH",
2368 "SCRIPT KILL",
2369 "SCRIPT LOAD",
2370 "SCRIPT",
2371 "SDIFF",
2372 "SDIFFSTORE",
2373 "SENTINEL GET MASTER ADDR BY NAME",
2374 "SENTINEL MASTER",
2375 "SENTINEL MASTERS",
2376 "SENTINEL MONITOR",
2377 "SENTINEL REMOVE",
2378 "SENTINEL SENTINELS",
2379 "SENTINEL SET",
2380 "SENTINEL SLAVES",
2381 "SENTINEL",
2382 "SHUTDOWN",
2383 "SINTER",
2384 "SINTERSTORE",
2385 "SLAVEOF",
2386 "SLOWLOG GET",
2387 "SLOWLOG LEN",
2388 "SLOWLOG RESET",
2389 "SLOWLOG",
2390 "SMOVE",
2391 "SORT",
2392 "SUNION",
2393 "SUNIONSTORE",
2394 "TIME",
2395)
2396for command in PIPELINE_BLOCKED_COMMANDS:
2397 command = command.replace(" ", "_").lower()
2399 setattr(ClusterPipeline, command, block_pipeline_command(command))
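# An illustrative sketch (not part of the original module): each blocked
# name becomes a ClusterPipeline method that raises immediately.
# localhost:7000 is a hypothetical address.
def _blocked_pipeline_command_example():
    rc = RedisCluster(host="localhost", port=7000)
    pipe = rc.pipeline()
    try:
        pipe.keys()  # "KEYS" is in PIPELINE_BLOCKED_COMMANDS
    except RedisClusterException as e:
        return str(e)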
2402class PipelineCommand:
2403 """ """
2405 def __init__(self, args, options=None, position=None):
2406 self.args = args
2407 if options is None:
2408 options = {}
2409 self.options = options
2410 self.position = position
2411 self.result = None
2412 self.node = None
2413 self.asking = False
2416class NodeCommands:
2417 """ """
2419 def __init__(self, parse_response, connection_pool, connection):
2420 """ """
2421 self.parse_response = parse_response
2422 self.connection_pool = connection_pool
2423 self.connection = connection
2424 self.commands = []
2426 def append(self, c):
2427 """ """
2428 self.commands.append(c)
2430 def write(self):
2431 """
2432 Code borrowed from Redis so it can be fixed
2433 """
2434 connection = self.connection
2435 commands = self.commands
2437 # We are going to clobber the commands with the write, so go ahead
2438 # and ensure that nothing is sitting there from a previous run.
2439 for c in commands:
2440 c.result = None
2442 # build up all commands into a single request to increase network perf
2443 # send all the commands and catch connection and timeout errors.
2444 try:
2445 connection.send_packed_command(
2446 connection.pack_commands([c.args for c in commands])
2447 )
2448 except (ConnectionError, TimeoutError) as e:
2449 for c in commands:
2450 c.result = e
2452 def read(self):
2453 """ """
2454 connection = self.connection
2455 for c in self.commands:
2456 # if there is a result on this command,
2457 # it means we ran into an exception
2458 # like a connection error. Trying to parse
2459 # a response on a connection that
2460 # is no longer open will result in a
2461 # connection error raised by redis-py.
2462 # but redis-py doesn't check in parse_response
2463 # that the sock object is
2464 # still set and if you try to
2465 # read from a closed connection, it will
2466 # result in an AttributeError because
2467 # it will do a readline() call on None.
2468 # This can have all kinds of nasty side-effects.
2469 # Treating this case as a connection error
2470 # is fine because it will dump
2471 # the connection object back into the
2472 # pool and on the next write, it will
2473 # explicitly open the connection and all will be well.
2474 if c.result is None:
2475 try:
2476 c.result = self.parse_response(connection, c.args[0], **c.options)
2477 except (ConnectionError, TimeoutError) as e:
2478 for c in self.commands:
2479 c.result = e
2480 return
2481 except RedisError as e:
2482 c.result = e
2485class ExecutionStrategy(ABC):
2486 @property
2487 @abstractmethod
2488 def command_queue(self):
2489 pass
2491 @abstractmethod
2492 def execute_command(self, *args, **kwargs):
2493 """
2494 Execution flow for current execution strategy.
2496 See: ClusterPipeline.execute_command()
2497 """
2498 pass
2500 @abstractmethod
2501 def annotate_exception(self, exception, number, command):
2502 """
2503 Annotate exception according to current execution strategy.
2505 See: ClusterPipeline.annotate_exception()
2506 """
2507 pass
2509 @abstractmethod
2510 def pipeline_execute_command(self, *args, **options):
2511 """
2512 Pipeline execution flow for current execution strategy.
2514 See: ClusterPipeline.pipeline_execute_command()
2515 """
2516 pass
2518 @abstractmethod
2519 def execute(self, raise_on_error: bool = True) -> List[Any]:
2520 """
2521 Executes current execution strategy.
2523 See: ClusterPipeline.execute()
2524 """
2525 pass
2527 @abstractmethod
2528 def send_cluster_commands(
2529 self, stack, raise_on_error=True, allow_redirections=True
2530 ):
2531 """
2532 Sends commands according to current execution strategy.
2534 See: ClusterPipeline.send_cluster_commands()
2535 """
2536 pass
2538 @abstractmethod
2539 def reset(self):
2540 """
2541 Resets current execution strategy.
2543 See: ClusterPipeline.reset()
2544 """
2545 pass
2547 @abstractmethod
2548 def exists(self, *keys):
2549 pass
2551 @abstractmethod
2552 def eval(self):
2553 pass
2555 @abstractmethod
2556 def multi(self):
2557 """
2558 Starts transactional context.
2560 See: ClusterPipeline.multi()
2561 """
2562 pass
2564 @abstractmethod
2565 def load_scripts(self):
2566 pass
2568 @abstractmethod
2569 def watch(self, *names):
2570 pass
2572 @abstractmethod
2573 def unwatch(self):
2574 """
2575 Unwatches all previously specified keys
2577 See: ClusterPipeline.unwatch()
2578 """
2579 pass
2581 @abstractmethod
2582 def script_load_for_pipeline(self, *args, **kwargs):
2583 pass
2585 @abstractmethod
2586 def delete(self, *names):
2587 """
2588 "Delete a key specified by ``names``"
2590 See: ClusterPipeline.delete()
2591 """
2592 pass
2594 @abstractmethod
2595 def unlink(self, *names):
2596 """
2597 "Unlink a key specified by ``names``"
2599 See: ClusterPipeline.unlink()
2600 """
2601 pass
2603 @abstractmethod
2604 def discard(self):
2605 pass
2608class AbstractStrategy(ExecutionStrategy):
2609 def __init__(
2610 self,
2611 pipe: ClusterPipeline,
2612 ):
2613 self._command_queue: List[PipelineCommand] = []
2614 self._pipe = pipe
2615 self._nodes_manager = self._pipe.nodes_manager
2617 @property
2618 def command_queue(self):
2619 return self._command_queue
2621 @command_queue.setter
2622 def command_queue(self, queue: List[PipelineCommand]):
2623 self._command_queue = queue
2625 @abstractmethod
2626 def execute_command(self, *args, **kwargs):
2627 pass
2629 def pipeline_execute_command(self, *args, **options):
2630 self._command_queue.append(
2631 PipelineCommand(args, options, len(self._command_queue))
2632 )
2633 return self._pipe
2635 @abstractmethod
2636 def execute(self, raise_on_error: bool = True) -> List[Any]:
2637 pass
2639 @abstractmethod
2640 def send_cluster_commands(
2641 self, stack, raise_on_error=True, allow_redirections=True
2642 ):
2643 pass
2645 @abstractmethod
2646 def reset(self):
2647 pass
2649 def exists(self, *keys):
2650 return self.execute_command("EXISTS", *keys)
2652 def eval(self):
2653 """ """
2654 raise RedisClusterException("method eval() is not implemented")
2656 def load_scripts(self):
2657 """ """
2658 raise RedisClusterException("method load_scripts() is not implemented")
2660 def script_load_for_pipeline(self, *args, **kwargs):
2661 """ """
2662 raise RedisClusterException(
2663 "method script_load_for_pipeline() is not implemented"
2664 )
2666 def annotate_exception(self, exception, number, command):
2667 """
2668 Provides extra context to the exception prior to it being handled
2669 """
2670 cmd = " ".join(map(safe_str, command))
2671 msg = (
2672 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
2673 f"caused error: {exception.args[0]}"
2674 )
2675 exception.args = (msg,) + exception.args[1:]
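# A sketch of the resulting annotation for a hypothetical failing second
# command (illustrative, not part of the original module):
#
#     Command # 2 (SET foo bar) of pipeline caused error: <original error text>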
2678class PipelineStrategy(AbstractStrategy):
2679 def __init__(self, pipe: ClusterPipeline):
2680 super().__init__(pipe)
2681 self.command_flags = pipe.command_flags
2683 def execute_command(self, *args, **kwargs):
2684 return self.pipeline_execute_command(*args, **kwargs)
2686 def _raise_first_error(self, stack):
2687 """
2688 Raise the first exception on the stack
2689 """
2690 for c in stack:
2691 r = c.result
2692 if isinstance(r, Exception):
2693 self.annotate_exception(r, c.position + 1, c.args)
2694 raise r
2696 def execute(self, raise_on_error: bool = True) -> List[Any]:
2697 stack = self._command_queue
2698 if not stack:
2699 return []
2701 try:
2702 return self.send_cluster_commands(stack, raise_on_error)
2703 finally:
2704 self.reset()
2706 def reset(self):
2707 """
2708 Reset back to empty pipeline.
2709 """
2710 self._command_queue = []
2712 def send_cluster_commands(
2713 self, stack, raise_on_error=True, allow_redirections=True
2714 ):
2715 """
2716 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.
2718 If one of the retryable exceptions has been thrown we assume that:
2719 - connection_pool was disconnected
2720 - connection_pool was reset
2721 - refresh_table_asap set to True
2723 It will retry the number of times specified by
2724 the retries property of the "self.retry" object,
2725 which defaults to 3 unless manually configured.
2727 If the retries are exhausted, the last error is raised.
2729 """
2730 if not stack:
2731 return []
2732 retry_attempts = self._pipe.retry.get_retries()
2733 while True:
2734 try:
2735 return self._send_cluster_commands(
2736 stack,
2737 raise_on_error=raise_on_error,
2738 allow_redirections=allow_redirections,
2739 )
2740 except RedisCluster.ERRORS_ALLOW_RETRY as e:
2741 if retry_attempts > 0:
2742 # Try again with the new cluster setup. All other errors
2743 # should be raised.
2744 retry_attempts -= 1
2746 else:
2747 raise e
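# A sketch of configuring the retry budget consumed above (illustrative,
# not part of the original module; localhost:7000 is a hypothetical
# address, and the pipeline is assumed to be built with the client's
# retry object):
#
#     from redis.backoff import ExponentialWithJitterBackoff
#     from redis.retry import Retry
#
#     rc = RedisCluster(
#         host="localhost",
#         port=7000,
#         retry=Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10),
#                     retries=5),
#     )
#     pipe = rc.pipeline()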
2749 def _send_cluster_commands(
2750 self, stack, raise_on_error=True, allow_redirections=True
2751 ):
2752 """
2753 Send a bunch of cluster commands to the redis cluster.
2755 `allow_redirections` If the pipeline should follow
2756 `ASK` & `MOVED` responses automatically. If set
2757 to false it will raise RedisClusterException.
2758 """
2759 # the first time sending the commands we send all of
2760 # the commands that were queued up.
2761 # if we have to run through it again, we only retry
2762 # the commands that failed.
2763 attempt = sorted(stack, key=lambda x: x.position)
2764 is_default_node = False
2765 # build a mapping from node name to the commands we need to send to that node
2766 nodes = {}
2768 # as we move through each command that still needs to be processed,
2769 # we figure out the slot number that command maps to, then from
2770 # the slot determine the node.
2771 for c in attempt:
2772 while True:
2773 # refer to our internal node -> slot table that
2774 # tells us where a given command should route to.
2775 # (it might be possible we have a cached node that no longer
2776 # exists in the cluster, which is why we do this in a loop)
2777 passed_targets = c.options.pop("target_nodes", None)
2778 if passed_targets and not self._is_nodes_flag(passed_targets):
2779 target_nodes = self._parse_target_nodes(passed_targets)
2780 else:
2781 target_nodes = self._determine_nodes(
2782 *c.args, node_flag=passed_targets
2783 )
2784 if not target_nodes:
2785 raise RedisClusterException(
2786 f"No targets were found to execute {c.args} command on"
2787 )
2788 if len(target_nodes) > 1:
2789 raise RedisClusterException(
2790 f"Too many targets for command {c.args}"
2791 )
2793 node = target_nodes[0]
2794 if node == self._pipe.get_default_node():
2795 is_default_node = True
2797 # now that we know the name of the node
2798 # ( it's just a string in the form of host:port )
2799 # we can build a list of commands for each node.
2800 node_name = node.name
2801 if node_name not in nodes:
2802 redis_node = self._pipe.get_redis_connection(node)
2803 try:
2804 connection = get_connection(redis_node)
2805 except (ConnectionError, TimeoutError):
2806 for n in nodes.values():
2807 n.connection_pool.release(n.connection)
2808 # Connection retries are being handled in the node's
2809 # Retry object. Reinitialize the node -> slot table.
2810 self._nodes_manager.initialize()
2811 if is_default_node:
2812 self._pipe.replace_default_node()
2813 raise
2814 nodes[node_name] = NodeCommands(
2815 redis_node.parse_response,
2816 redis_node.connection_pool,
2817 connection,
2818 )
2819 nodes[node_name].append(c)
2820 break
2822 # send the commands in sequence.
2823 # we write to all the open sockets for each node first,
2824 # before reading anything
2825 # this allows us to flush all the requests out across the
2826 # network
2827 # so that we can read them from different sockets as they come back.
2828 # we don't multiplex on the sockets as they become available,
2829 # but that shouldn't make too much difference.
2830 try:
2831 node_commands = nodes.values()
2832 for n in node_commands:
2833 n.write()
2835 for n in node_commands:
2836 n.read()
2837 finally:
2838 # release all of the redis connections we allocated earlier
2839 # back into the connection pool.
2840 # we used to do this step as part of a try/finally block,
2841 # but it is really dangerous to
2842 # release connections back into the pool if for some
2843 # reason the socket has data still left in it
2844 # from a previous operation. The write and
2845 # read operations already have try/catch around them for
2846 # all known types of errors including connection
2847 # and socket level errors.
2848 # So if we hit an exception, something really bad
2849 # happened and putting any of
2850 # these connections back into the pool is a very bad idea.
2851 # the socket might have unread buffer still sitting in it,
2852 # and then the next time we read from it we pass the
2853 # buffered result back from a previous command and
2854 # every subsequent request to that connection will always get
2855 # a mismatched result.
2856 for n in nodes.values():
2857 n.connection_pool.release(n.connection)
2859 # if the response isn't an exception, it is a
2860 # valid response from the node and we're all
2861 # done with that command, YAY!
2862 # if we have more commands to attempt, we've run into problems.
2863 # collect all the commands we are allowed to retry.
2864 # (MOVED, ASK, or connection errors or timeout errors)
2865 attempt = sorted(
2866 (
2867 c
2868 for c in attempt
2869 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2870 ),
2871 key=lambda x: x.position,
2872 )
2873 if attempt and allow_redirections:
2874 # RETRY MAGIC HAPPENS HERE!
2875 # send these remaining commands one at a time using `execute_command`
2876 # in the main client. This keeps our retry logic
2877 # in one place mostly,
2878 # and allows us to be more confident in correctness of behavior.
2879 # at this point any speed gains from pipelining have been lost
2880 # anyway, so we might as well make the best
2881 # attempt to get the correct behavior.
2882 #
2883 # The client command will handle retries for each
2884 # individual command sequentially as we pass each
2885 # one into `execute_command`. Any exceptions
2886 # that bubble out should only appear once all
2887 # retries have been exhausted.
2888 #
2889 # If a lot of commands have failed, we'll be setting the
2890 # flag to rebuild the slots table from scratch.
2891 # So MOVED errors should correct themselves fairly quickly.
2892 self._pipe.reinitialize_counter += 1
2893 if self._pipe._should_reinitialized():
2894 self._nodes_manager.initialize()
2895 if is_default_node:
2896 self._pipe.replace_default_node()
2897 for c in attempt:
2898 try:
2899 # send each command individually like we
2900 # do in the main client.
2901 c.result = self._pipe.parent_execute_command(*c.args, **c.options)
2902 except RedisError as e:
2903 c.result = e
2905 # turn the response back into a simple flat array that corresponds
2906 # to the sequence of commands issued in the stack in pipeline.execute()
2907 response = []
2908 for c in sorted(stack, key=lambda x: x.position):
2909 if c.args[0] in self._pipe.cluster_response_callbacks:
2910 # Remove keys entry, it needs only for cache.
2911 c.options.pop("keys", None)
2912 c.result = self._pipe.cluster_response_callbacks[c.args[0]](
2913 c.result, **c.options
2914 )
2915 response.append(c.result)
2917 if raise_on_error:
2918 self._raise_first_error(stack)
2920 return response
2922 def _is_nodes_flag(self, target_nodes):
2923 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags
2925 def _parse_target_nodes(self, target_nodes):
2926 if isinstance(target_nodes, list):
2927 nodes = target_nodes
2928 elif isinstance(target_nodes, ClusterNode):
2929 # Supports passing a single ClusterNode as a variable
2930 nodes = [target_nodes]
2931 elif isinstance(target_nodes, dict):
2932 # Supports dictionaries of the format {node_name: node}.
2933 # It enables executing commands on multiple nodes as follows:
2934 # rc.cluster_save_config(rc.get_primaries())
2935 nodes = target_nodes.values()
2936 else:
2937 raise TypeError(
2938 "target_nodes type can be one of the following: "
2939 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
2940 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
2941 f"The passed type is {type(target_nodes)}"
2942 )
2943 return nodes
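# A sketch of the accepted target_nodes forms on the main client
# (illustrative, not part of the original module):
#
#     rc.ping(target_nodes=RedisCluster.ALL_NODES)              # node flag
#     rc.ping(target_nodes=rc.get_primaries())                  # list of ClusterNode
#     rc.ping(target_nodes={n.name: n for n in rc.get_primaries()})  # dict form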
2945 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
2946 # Determine which nodes the command should be executed on.
2947 # Returns a list of target nodes.
2948 command = args[0].upper()
2949 if (
2950 len(args) >= 2
2951 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
2952 ):
2953 command = f"{args[0]} {args[1]}".upper()
2955 nodes_flag = kwargs.pop("nodes_flag", None)
2956 if nodes_flag is not None:
2957 # nodes flag passed by the user
2958 command_flag = nodes_flag
2959 else:
2960 # get the nodes group for this command if it was predefined
2961 command_flag = self._pipe.command_flags.get(command)
2962 if command_flag == self._pipe.RANDOM:
2963 # return a random node
2964 return [self._pipe.get_random_node()]
2965 elif command_flag == self._pipe.PRIMARIES:
2966 # return all primaries
2967 return self._pipe.get_primaries()
2968 elif command_flag == self._pipe.REPLICAS:
2969 # return all replicas
2970 return self._pipe.get_replicas()
2971 elif command_flag == self._pipe.ALL_NODES:
2972 # return all nodes
2973 return self._pipe.get_nodes()
2974 elif command_flag == self._pipe.DEFAULT_NODE:
2975 # return the cluster's default node
2976 return [self._nodes_manager.default_node]
2977 elif command in self._pipe.SEARCH_COMMANDS[0]:
2978 return [self._nodes_manager.default_node]
2979 else:
2980 # get the node that holds the key's slot
2981 slot = self._pipe.determine_slot(*args)
2982 node = self._nodes_manager.get_node_from_slot(
2983 slot,
2984 self._pipe.read_from_replicas and command in READ_COMMANDS,
2985 self._pipe.load_balancing_strategy
2986 if command in READ_COMMANDS
2987 else None,
2988 )
2989 return [node]
2991 def multi(self):
2992 raise RedisClusterException(
2993 "method multi() is not supported outside of transactional context"
2994 )
2996 def discard(self):
2997 raise RedisClusterException(
2998 "method discard() is not supported outside of transactional context"
2999 )
3001 def watch(self, *names):
3002 raise RedisClusterException(
3003 "method watch() is not supported outside of transactional context"
3004 )
3006 def unwatch(self, *names):
3007 raise RedisClusterException(
3008 "method unwatch() is not supported outside of transactional context"
3009 )
3011 def delete(self, *names):
3012 if len(names) != 1:
3013 raise RedisClusterException(
3014 "deleting multiple keys is not implemented in pipeline command"
3015 )
3017 return self.execute_command("DEL", names[0])
3019 def unlink(self, *names):
3020 if len(names) != 1:
3021 raise RedisClusterException(
3022 "unlinking multiple keys is not implemented in pipeline command"
3023 )
3025 return self.execute_command("UNLINK", names[0])
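# An illustrative usage sketch for the non-transactional strategy (not
# part of the original module; localhost:7000 is a hypothetical address).
def _pipeline_strategy_example():
    rc = RedisCluster(host="localhost", port=7000)
    with rc.pipeline() as pipe:
        pipe.set("foo", "bar")
        pipe.get("foo")
        # commands are grouped per node and written in one round trip per
        # node; MOVED/ASK redirections are retried as described above
        return pipe.execute()  # e.g. [True, b"bar"]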
3028class TransactionStrategy(AbstractStrategy):
3029 NO_SLOTS_COMMANDS = {"UNWATCH"}
3030 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
3031 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3032 SLOT_REDIRECT_ERRORS = (AskError, MovedError)
3033 CONNECTION_ERRORS = (
3034 ConnectionError,
3035 OSError,
3036 ClusterDownError,
3037 SlotNotCoveredError,
3038 )
3040 def __init__(self, pipe: ClusterPipeline):
3041 super().__init__(pipe)
3042 self._explicit_transaction = False
3043 self._watching = False
3044 self._pipeline_slots: Set[int] = set()
3045 self._transaction_connection: Optional[Connection] = None
3046 self._executing = False
3047 self._retry = copy(self._pipe.retry)
3048 self._retry.update_supported_errors(
3049 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
3050 )
3052 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
3053 """
3054 Find a connection for a pipeline transaction.
3056 For an atomic transaction, watched keys guarantee that contents have not been
3057 altered only as long as the WATCH commands for those keys were sent over the same
3058 connection. So once we start watching a key, we fetch a connection to the
3059 node that owns that slot and reuse it.
3060 """
3061 if not self._pipeline_slots:
3062 raise RedisClusterException(
3063 "At least a command with a key is needed to identify a node"
3064 )
3066 node: ClusterNode = self._nodes_manager.get_node_from_slot(
3067 list(self._pipeline_slots)[0], False
3068 )
3069 redis_node: Redis = self._pipe.get_redis_connection(node)
3070 if self._transaction_connection:
3071 if not redis_node.connection_pool.owns_connection(
3072 self._transaction_connection
3073 ):
3074 previous_node = self._nodes_manager.find_connection_owner(
3075 self._transaction_connection
3076 )
3077 previous_node.connection_pool.release(self._transaction_connection)
3078 self._transaction_connection = None
3080 if not self._transaction_connection:
3081 self._transaction_connection = get_connection(redis_node)
3083 return redis_node, self._transaction_connection
3085 def execute_command(self, *args, **kwargs):
3086 slot_number: Optional[int] = None
3087 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
3088 slot_number = self._pipe.determine_slot(*args)
3090 if (
3091 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
3092 ) and not self._explicit_transaction:
3093 if args[0] == "WATCH":
3094 self._validate_watch()
3096 if slot_number is not None:
3097 if self._pipeline_slots and slot_number not in self._pipeline_slots:
3098 raise CrossSlotTransactionError(
3099 "Cannot watch or send commands on different slots"
3100 )
3102 self._pipeline_slots.add(slot_number)
3103 elif args[0] not in self.NO_SLOTS_COMMANDS:
3104 raise RedisClusterException(
3105 f"Cannot identify slot number for command: {args[0]},"
3106 "it cannot be triggered in a transaction"
3107 )
3109 return self._immediate_execute_command(*args, **kwargs)
3110 else:
3111 if slot_number is not None:
3112 self._pipeline_slots.add(slot_number)
3114 return self.pipeline_execute_command(*args, **kwargs)
3116 def _validate_watch(self):
3117 if self._explicit_transaction:
3118 raise RedisError("Cannot issue a WATCH after a MULTI")
3120 self._watching = True
3122 def _immediate_execute_command(self, *args, **options):
3123 return self._retry.call_with_retry(
3124 lambda: self._get_connection_and_send_command(*args, **options),
3125 self._reinitialize_on_error,
3126 )
3128 def _get_connection_and_send_command(self, *args, **options):
3129 redis_node, connection = self._get_client_and_connection_for_transaction()
3130 return self._send_command_parse_response(
3131 connection, redis_node, args[0], *args, **options
3132 )
3134 def _send_command_parse_response(
3135 self, conn, redis_node: Redis, command_name, *args, **options
3136 ):
3137 """
3138 Send a command and parse the response
3139 """
3141 conn.send_command(*args)
3142 output = redis_node.parse_response(conn, command_name, **options)
3144 if command_name in self.UNWATCH_COMMANDS:
3145 self._watching = False
3146 return output
3148 def _reinitialize_on_error(self, error):
3149 if self._watching:
3150 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
3151 raise WatchError("Slot rebalancing occurred while watching keys")
3153 if (
3154 type(error) in self.SLOT_REDIRECT_ERRORS
3155 or type(error) in self.CONNECTION_ERRORS
3156 ):
3157 if self._transaction_connection:
3158 self._transaction_connection = None
3160 self._pipe.reinitialize_counter += 1
3161 if self._pipe._should_reinitialized():
3162 self._nodes_manager.initialize()
3163 self._pipe.reinitialize_counter = 0
3164 else:
3165 self._nodes_manager.update_moved_exception(error)
3167 self._executing = False
3169 def _raise_first_error(self, responses, stack):
3170 """
3171 Raise the first exception on the stack
3172 """
3173 for r, cmd in zip(responses, stack):
3174 if isinstance(r, Exception):
3175 self.annotate_exception(r, cmd.position + 1, cmd.args)
3176 raise r
3178 def execute(self, raise_on_error: bool = True) -> List[Any]:
3179 stack = self._command_queue
3180 if not stack and (not self._watching or not self._pipeline_slots):
3181 return []
3183 return self._execute_transaction_with_retries(stack, raise_on_error)
3185 def _execute_transaction_with_retries(
3186 self, stack: List["PipelineCommand"], raise_on_error: bool
3187 ):
3188 return self._retry.call_with_retry(
3189 lambda: self._execute_transaction(stack, raise_on_error),
3190 self._reinitialize_on_error,
3191 )
3193 def _execute_transaction(
3194 self, stack: List["PipelineCommand"], raise_on_error: bool
3195 ):
3196 if len(self._pipeline_slots) > 1:
3197 raise CrossSlotTransactionError(
3198 "All keys involved in a cluster transaction must map to the same slot"
3199 )
3201 self._executing = True
3203 redis_node, connection = self._get_client_and_connection_for_transaction()
3205 stack = chain(
3206 [PipelineCommand(("MULTI",))],
3207 stack,
3208 [PipelineCommand(("EXEC",))],
3209 )
3210 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
3211 packed_commands = connection.pack_commands(commands)
3212 connection.send_packed_command(packed_commands)
3213 errors = []
3215 # parse off the response for MULTI
3216 # NOTE: we need to handle ResponseErrors here and continue
3217 # so that we read all the additional command messages from
3218 # the socket
3219 try:
3220 redis_node.parse_response(connection, "MULTI")
3221 except ResponseError as e:
3222 self.annotate_exception(e, 0, "MULTI")
3223 errors.append(e)
3224 except self.CONNECTION_ERRORS as cluster_error:
3225 self.annotate_exception(cluster_error, 0, "MULTI")
3226 raise
3228 # and all the other commands
3229 for i, command in enumerate(self._command_queue):
3230 if EMPTY_RESPONSE in command.options:
3231 errors.append((i, command.options[EMPTY_RESPONSE]))
3232 else:
3233 try:
3234 _ = redis_node.parse_response(connection, "_")
3235 except self.SLOT_REDIRECT_ERRORS as slot_error:
3236 self.annotate_exception(slot_error, i + 1, command.args)
3237 errors.append(slot_error)
3238 except self.CONNECTION_ERRORS as cluster_error:
3239 self.annotate_exception(cluster_error, i + 1, command.args)
3240 raise
3241 except ResponseError as e:
3242 self.annotate_exception(e, i + 1, command.args)
3243 errors.append(e)
3245 response = None
3246 # parse the EXEC.
3247 try:
3248 response = redis_node.parse_response(connection, "EXEC")
3249 except ExecAbortError:
3250 if errors:
3251 raise errors[0]
3252 raise
3254 self._executing = False
3256 # EXEC clears any watched keys
3257 self._watching = False
3259 if response is None:
3260 raise WatchError("Watched variable changed.")
3262 # insert the queued (index, value) entries into the response; bare errors collected above were already raised via ExecAbortError
3263 for i, e in errors:
3264 response.insert(i, e)
3266 if len(response) != len(self._command_queue):
3267 raise InvalidPipelineStack(
3268 "Unexpected response length for cluster pipeline EXEC."
3269 " Command stack was {} but response had length {}".format(
3270 [c.args[0] for c in self._command_queue], len(response)
3271 )
3272 )
3274 # find any errors in the response and raise if necessary
3275 if raise_on_error or len(errors) > 0:
3276 self._raise_first_error(
3277 response,
3278 self._command_queue,
3279 )
3281 # We have to run response callbacks manually
3282 data = []
3283 for r, cmd in zip(response, self._command_queue):
3284 if not isinstance(r, Exception):
3285 command_name = cmd.args[0]
3286 if command_name in self._pipe.cluster_response_callbacks:
3287 r = self._pipe.cluster_response_callbacks[command_name](
3288 r, **cmd.options
3289 )
3290 data.append(r)
3291 return data
3293 def reset(self):
3294 self._command_queue = []
3296 # make sure to reset the connection state in the event that we were
3297 # watching something
3298 if self._transaction_connection:
3299 try:
3300 if self._watching:
3301 # call this manually since our unwatch or
3302 # immediate_execute_command methods can call reset()
3303 self._transaction_connection.send_command("UNWATCH")
3304 self._transaction_connection.read_response()
3305 # we can safely return the connection to the pool here since we're
3306 # sure we're no longer WATCHing anything
3307 node = self._nodes_manager.find_connection_owner(
3308 self._transaction_connection
3309 )
3310 node.redis_connection.connection_pool.release(
3311 self._transaction_connection
3312 )
3313 self._transaction_connection = None
3314 except self.CONNECTION_ERRORS:
3315 # disconnect will also remove any previous WATCHes
3316 if self._transaction_connection:
3317 self._transaction_connection.disconnect()
3319 # clean up the other instance attributes
3320 self._watching = False
3321 self._explicit_transaction = False
3322 self._pipeline_slots = set()
3323 self._executing = False
3325 def send_cluster_commands(
3326 self, stack, raise_on_error=True, allow_redirections=True
3327 ):
3328 raise NotImplementedError(
3329 "send_cluster_commands cannot be executed in transactional context."
3330 )
3332 def multi(self):
3333 if self._explicit_transaction:
3334 raise RedisError("Cannot issue nested calls to MULTI")
3335 if self._command_queue:
3336 raise RedisError(
3337 "Commands without an initial WATCH have already been issued"
3338 )
3339 self._explicit_transaction = True
3341 def watch(self, *names):
3342 if self._explicit_transaction:
3343 raise RedisError("Cannot issue a WATCH after a MULTI")
3345 return self.execute_command("WATCH", *names)
3347 def unwatch(self):
3348 if self._watching:
3349 return self.execute_command("UNWATCH")
3351 return True
3353 def discard(self):
3354 self.reset()
3356 def delete(self, *names):
3357 return self.execute_command("DEL", *names)
3359 def unlink(self, *names):
3360 return self.execute_command("UNLINK", *names)
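# An illustrative usage sketch for the transactional strategy (not part
# of the original module; localhost:7000 is a hypothetical address, and
# pipeline(transaction=True) is assumed to be supported by this client
# version). All keys share one hash tag so they map to a single slot.
def _transaction_strategy_example():
    rc = RedisCluster(host="localhost", port=7000)
    with rc.pipeline(transaction=True) as pipe:
        while True:
            try:
                pipe.watch("{user:1}:counter")
                # while WATCHing, reads on the pipeline execute immediately
                current = pipe.get("{user:1}:counter") or 0
                pipe.multi()
                pipe.set("{user:1}:counter", int(current) + 1)
                return pipe.execute()
            except WatchError:
                continue  # the watched key changed underneath us; retry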