Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/cluster.py: 18%
927 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1import random
2import socket
3import sys
4import threading
5import time
6from collections import OrderedDict
7from typing import Any, Callable, Dict, List, Optional, Tuple, Union
9from redis._parsers import CommandsParser, Encoder
10from redis._parsers.helpers import parse_scan
11from redis.backoff import default_backoff
12from redis.client import CaseInsensitiveDict, PubSub, Redis
13from redis.commands import READ_COMMANDS, RedisClusterCommands
14from redis.commands.helpers import list_or_args
15from redis.connection import ConnectionPool, DefaultParser, parse_url
16from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
17from redis.exceptions import (
18 AskError,
19 AuthenticationError,
20 ClusterCrossSlotError,
21 ClusterDownError,
22 ClusterError,
23 ConnectionError,
24 DataError,
25 MasterDownError,
26 MovedError,
27 RedisClusterException,
28 RedisError,
29 ResponseError,
30 SlotNotCoveredError,
31 TimeoutError,
32 TryAgainError,
33)
34from redis.lock import Lock
35from redis.retry import Retry
36from redis.utils import (
37 HIREDIS_AVAILABLE,
38 dict_merge,
39 list_keys_to_dict,
40 merge_result,
41 safe_str,
42 str_if_bytes,
43)
def get_node_name(host: str, port: Union[str, int]) -> str:
    """Build the canonical ``"<host>:<port>"`` identifier for a cluster node."""
    return "{}:{}".format(host, port)
def get_connection(redis_node, *args, **options):
    """Return the node's dedicated connection, or lease one from its pool.

    ``args[0]`` is the command name forwarded to the pool when a connection
    has to be checked out.
    """
    conn = redis_node.connection
    if conn:
        return conn
    return redis_node.connection_pool.get_connection(args[0], **options)
def parse_scan_result(command, res, **options):
    """Combine per-node SCAN replies into ``({node_name: cursor}, keys)``.

    ``res`` maps node names to raw SCAN responses; each is parsed with
    ``parse_scan`` and the key lists are concatenated.
    """
    cursors = {}
    merged_keys = []
    for node_name, response in res.items():
        cursor, keys = parse_scan(response, **options)
        cursors[node_name] = cursor
        merged_keys.extend(keys)

    return cursors, merged_keys
def parse_pubsub_numsub(command, res, **options):
    """Aggregate PUBSUB NUMSUB replies from several nodes.

    Sums subscriber counts per channel across all node responses and
    returns a list of ``(channel, total_subscribers)`` tuples.
    """
    totals = OrderedDict()
    for node_reply in res.values():
        for channel, count in node_reply:
            totals[channel] = totals.get(channel, 0) + count

    return list(totals.items())
def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """Convert a CLUSTER SLOTS reply into ``{(start, end): {...}}``.

    Each value holds the slot range's ``"primary"`` as a ``(host, port)``
    tuple and a ``"replicas"`` list of the same shape.
    """
    current_host = options.get("current_host", "")

    def normalize(host: Any, port: Any, *_: Any) -> Tuple[str, Any]:
        # An empty host means "the node that answered"; substitute the
        # address the reply came from. Extra trailing fields (node id in
        # newer servers) are ignored.
        return str_if_bytes(host) or current_host, port

    slots = {}
    for slot_entry in resp:
        start, end = slot_entry[0], slot_entry[1]
        slots[start, end] = {
            "primary": normalize(*slot_entry[2]),
            "replicas": [normalize(*replica) for replica in slot_entry[3:]],
        }

    return slots
def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.

    RESP3 servers already reply with a list of dicts, which is returned
    untouched; RESP2 flat key/value arrays are converted into
    ``{"slots": [(start, end), ...], "nodes": [{...}, ...]}`` entries.
    """
    if isinstance(resp[0], dict):
        return resp
    shards = []
    for raw_shard in resp:
        slot_pairs = raw_shard[1]
        slots = [
            (slot_pairs[i], slot_pairs[i + 1])
            for i in range(0, len(slot_pairs), 2)
        ]
        # Each node is a flat [key, value, key, value, ...] array.
        nodes = [dict(zip(raw[::2], raw[1::2])) for raw in raw_shard[3]]
        shards.append({"slots": slots, "nodes": nodes})

    return shards
def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.

    The server replies with the shard id as raw bytes; return it as text.
    """
    return str(resp, "utf-8")
129PRIMARY = "primary"
130REPLICA = "replica"
131SLOT_ID = "slot-id"
133REDIS_ALLOWED_KEYS = (
134 "charset",
135 "connection_class",
136 "connection_pool",
137 "connection_pool_class",
138 "client_name",
139 "credential_provider",
140 "db",
141 "decode_responses",
142 "encoding",
143 "encoding_errors",
144 "errors",
145 "host",
146 "lib_name",
147 "lib_version",
148 "max_connections",
149 "nodes_flag",
150 "redis_connect_func",
151 "password",
152 "port",
153 "queue_class",
154 "retry",
155 "retry_on_timeout",
156 "protocol",
157 "socket_connect_timeout",
158 "socket_keepalive",
159 "socket_keepalive_options",
160 "socket_timeout",
161 "ssl",
162 "ssl_ca_certs",
163 "ssl_ca_data",
164 "ssl_certfile",
165 "ssl_cert_reqs",
166 "ssl_keyfile",
167 "ssl_password",
168 "unix_socket_path",
169 "username",
170 "cache_enabled",
171 "client_cache",
172 "cache_max_size",
173 "cache_ttl",
174 "cache_policy",
175 "cache_blacklist",
176 "cache_whitelist",
177)
178KWARGS_DISABLED_KEYS = ("host", "port")
def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    allowed = set(REDIS_ALLOWED_KEYS).difference(KWARGS_DISABLED_KEYS)
    return {key: value for key, value in kwargs.items() if key in allowed}
class ClusterParser(DefaultParser):
    # Extend the standard parser's error map with cluster-specific error
    # prefixes so replies such as "MOVED 3999 host:port" or "ASK ..." are
    # raised as typed exceptions the client can react to (redirect, retry,
    # or refresh topology).
    EXCEPTION_CLASSES = dict_merge(
        DefaultParser.EXCEPTION_CLASSES,
        {
            "ASK": AskError,
            "TRYAGAIN": TryAgainError,
            "MOVED": MovedError,
            "CLUSTERDOWN": ClusterDownError,
            "CROSSSLOT": ClusterCrossSlotError,
            "MASTERDOWN": MasterDownError,
        },
    )
class AbstractRedisCluster:
    """Shared routing tables and helpers for cluster clients.

    Holds the command-to-node routing map (``COMMAND_FLAGS``), the response
    and result aggregation callbacks, the retryable-error list, and the
    default-node replacement helper used by client and pipeline alike.
    """

    # Maximum number of internal redirect/retry hops for a single request.
    RedisClusterRequestTTL = 16

    # Node-flag values callers may pass as ``target_nodes``.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # Commands that cannot be routed by key slot are pinned to a node group.
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "GRAPH.CONFIG",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "REDISGEARS_2.REFRESHCLUSTER",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        # These commands take a slot id (not a key) as their argument.
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # RediSearch commands are always routed to the default node.
    # NOTE: this is a one-element tuple wrapping the list; consumers index
    # SEARCH_COMMANDS[0].
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",  # fixed: was "FT,PROFILE" (comma typo), which never matched
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    # Per-command parsers applied to a single node's raw reply.
    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Aggregators that fold a {node_name: reply} dict into one result.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    # Errors that trigger a full retry (with possible topology refresh).
    ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError)

    def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, fall back to a replica that differs from the
                # current default (fixed comment: the old one wrongly said
                # "choose a primary" again)
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)
452class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Return a Redis client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://``: creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        # All URL validation (unix-socket rejection, db!=0 rejection) happens
        # inside __init__ via the ``url`` keyword.
        return cls(url=url, **kwargs)
497 def __init__(
498 self,
499 host: Optional[str] = None,
500 port: int = 6379,
501 startup_nodes: Optional[List["ClusterNode"]] = None,
502 cluster_error_retry_attempts: int = 3,
503 retry: Optional["Retry"] = None,
504 require_full_coverage: bool = False,
505 reinitialize_steps: int = 5,
506 read_from_replicas: bool = False,
507 dynamic_startup_nodes: bool = True,
508 url: Optional[str] = None,
509 address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None,
510 **kwargs,
511 ):
512 """
513 Initialize a new RedisCluster client.
515 :param startup_nodes:
516 List of nodes from which initial bootstrapping can be done
517 :param host:
518 Can be used to point to a startup node
519 :param port:
520 Can be used to point to a startup node
521 :param require_full_coverage:
522 When set to False (default value): the client will not require a
523 full coverage of the slots. However, if not all slots are covered,
524 and at least one node has 'cluster-require-full-coverage' set to
525 'yes,' the server will throw a ClusterDownError for some key-based
526 commands. See -
527 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
528 When set to True: all slots must be covered to construct the
529 cluster client. If not all slots are covered, RedisClusterException
530 will be thrown.
531 :param read_from_replicas:
532 Enable read from replicas in READONLY mode. You can read possibly
533 stale data.
534 When set to true, read commands will be assigned between the
535 primary and its replications in a Round-Robin manner.
536 :param dynamic_startup_nodes:
537 Set the RedisCluster's startup nodes to all of the discovered nodes.
538 If true (default value), the cluster's discovered nodes will be used to
539 determine the cluster nodes-slots mapping in the next topology refresh.
540 It will remove the initial passed startup nodes if their endpoints aren't
541 listed in the CLUSTER SLOTS output.
542 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
543 specific IP addresses, it is best to set it to false.
544 :param cluster_error_retry_attempts:
545 Number of times to retry before raising an error when
546 :class:`~.TimeoutError` or :class:`~.ConnectionError` or
547 :class:`~.ClusterDownError` are encountered
548 :param reinitialize_steps:
549 Specifies the number of MOVED errors that need to occur before
550 reinitializing the whole cluster topology. If a MOVED error occurs
551 and the cluster does not need to be reinitialized on this current
552 error handling, only the MOVED slot will be patched with the
553 redirected node.
554 To reinitialize the cluster on every MOVED error, set
555 reinitialize_steps to 1.
556 To avoid reinitializing the cluster on moved errors, set
557 reinitialize_steps to 0.
558 :param address_remap:
559 An optional callable which, when provided with an internal network
560 address of a node, e.g. a `(host, port)` tuple, will return the address
561 where the node is reachable. This can be used to map the addresses at
562 which the nodes _think_ they are, to addresses at which a client may
563 reach them, such as when they sit behind a proxy.
565 :**kwargs:
566 Extra arguments that will be sent into Redis instance when created
567 (See Official redis-py doc for supported kwargs
568 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
569 Some kwargs are not supported and will raise a
570 RedisClusterException:
571 - db (Redis do not support database SELECT in cluster mode)
572 """
573 if startup_nodes is None:
574 startup_nodes = []
576 if "db" in kwargs:
577 # Argument 'db' is not possible to use in cluster mode
578 raise RedisClusterException(
579 "Argument 'db' is not possible to use in cluster mode"
580 )
582 # Get the startup node/s
583 from_url = False
584 if url is not None:
585 from_url = True
586 url_options = parse_url(url)
587 if "path" in url_options:
588 raise RedisClusterException(
589 "RedisCluster does not currently support Unix Domain "
590 "Socket connections"
591 )
592 if "db" in url_options and url_options["db"] != 0:
593 # Argument 'db' is not possible to use in cluster mode
594 raise RedisClusterException(
595 "A ``db`` querystring option can only be 0 in cluster mode"
596 )
597 kwargs.update(url_options)
598 host = kwargs.get("host")
599 port = kwargs.get("port", port)
600 startup_nodes.append(ClusterNode(host, port))
601 elif host is not None and port is not None:
602 startup_nodes.append(ClusterNode(host, port))
603 elif len(startup_nodes) == 0:
604 # No startup node was provided
605 raise RedisClusterException(
606 "RedisCluster requires at least one node to discover the "
607 "cluster. Please provide one of the followings:\n"
608 "1. host and port, for example:\n"
609 " RedisCluster(host='localhost', port=6379)\n"
610 "2. list of startup nodes, for example:\n"
611 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
612 " ClusterNode('localhost', 6378)])"
613 )
614 # Update the connection arguments
615 # Whenever a new connection is established, RedisCluster's on_connect
616 # method should be run
617 # If the user passed on_connect function we'll save it and run it
618 # inside the RedisCluster.on_connect() function
619 self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
620 kwargs.update({"redis_connect_func": self.on_connect})
621 kwargs = cleanup_kwargs(**kwargs)
622 if retry:
623 self.retry = retry
624 kwargs.update({"retry": self.retry})
625 else:
626 kwargs.update({"retry": Retry(default_backoff(), 0)})
628 self.encoder = Encoder(
629 kwargs.get("encoding", "utf-8"),
630 kwargs.get("encoding_errors", "strict"),
631 kwargs.get("decode_responses", False),
632 )
633 self.cluster_error_retry_attempts = cluster_error_retry_attempts
634 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
635 self.node_flags = self.__class__.NODE_FLAGS.copy()
636 self.read_from_replicas = read_from_replicas
637 self.reinitialize_counter = 0
638 self.reinitialize_steps = reinitialize_steps
639 self.nodes_manager = NodesManager(
640 startup_nodes=startup_nodes,
641 from_url=from_url,
642 require_full_coverage=require_full_coverage,
643 dynamic_startup_nodes=dynamic_startup_nodes,
644 address_remap=address_remap,
645 **kwargs,
646 )
648 self.cluster_response_callbacks = CaseInsensitiveDict(
649 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
650 )
651 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
652 self.commands_parser = CommandsParser(self)
653 self._lock = threading.Lock()
    def __enter__(self):
        """Enter a ``with`` block; the client itself is the context value."""
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client when leaving a ``with`` block."""
        self.close()
    def __del__(self):
        """Best-effort cleanup when the client is garbage-collected."""
        self.close()
664 def disconnect_connection_pools(self):
665 for node in self.get_nodes():
666 if node.redis_connection:
667 try:
668 node.redis_connection.connection_pool.disconnect()
669 except OSError:
670 # Client was already disconnected. do nothing
671 pass
    def on_connect(self, connection):
        """
        Initialize the connection, authenticate and select a database and send
        READONLY if it is set during object initialization.

        Installed as every connection's ``redis_connect_func`` (see __init__),
        so it runs each time a new node connection is established.
        """
        # Swap in the cluster-aware parser so MOVED/ASK/... become typed errors.
        connection.set_parser(ClusterParser)
        connection.on_connect()

        if self.read_from_replicas:
            # Sending READONLY command to server to configure connection as
            # readonly. Since each cluster node may change its server type due
            # to a failover, we should establish a READONLY connection
            # regardless of the server type. If this is a primary connection,
            # READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            # Run the caller-supplied hook after cluster-specific setup.
            self.user_on_connect_func(connection)
    def get_redis_connection(self, node):
        """Return the node's Redis client, creating it lazily on first use."""
        if not node.redis_connection:
            # Double-checked locking: re-test under the lock so only one
            # thread creates the connection.
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection
    def get_node(self, host=None, port=None, node_name=None):
        """Look up a cached node by host/port or by its "host:port" name."""
        return self.nodes_manager.get_node(host, port, node_name)
    def get_primaries(self):
        """Return all primary nodes known to the nodes manager."""
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
    def get_replicas(self):
        """Return all replica nodes known to the nodes manager."""
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)
710 def get_random_node(self):
711 return random.choice(list(self.nodes_manager.nodes_cache.values()))
    def get_nodes(self):
        """Return every node (primaries and replicas) currently cached."""
        return list(self.nodes_manager.nodes_cache.values())
716 def get_node_from_key(self, key, replica=False):
717 """
718 Get the node that holds the key's slot.
719 If replica set to True but the slot doesn't have any replicas, None is
720 returned.
721 """
722 slot = self.keyslot(key)
723 slot_cache = self.nodes_manager.slots_cache.get(slot)
724 if slot_cache is None or len(slot_cache) == 0:
725 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
726 if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
727 return None
728 elif replica:
729 node_idx = 1
730 else:
731 # primary
732 node_idx = 0
734 return slot_cache[node_idx]
    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node
742 def set_default_node(self, node):
743 """
744 Set the default node of the cluster.
745 :param node: 'ClusterNode'
746 :return True if the default node was set, else False
747 """
748 if node is None or self.get_node(node_name=node.name) is None:
749 return False
750 self.nodes_manager.default_node = node
751 return True
    def get_retry(self) -> Optional["Retry"]:
        """Return the retry policy configured for this client."""
        return self.retry
756 def set_retry(self, retry: "Retry") -> None:
757 self.retry = retry
758 for node in self.get_nodes():
759 node.redis_connection.set_retry(retry)
    def monitor(self, target_node=None):
        """
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node was
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        next_command() method returns one command from monitor
        listen() method yields commands from monitor.

        :raises RedisClusterException: if the target node has no established
            redis_connection to monitor through.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()
    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host&port, to get a pubsub instance
        connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
        Pipelines do not work in cluster mode the same way they
        do in normal mode. Create a clone of this object so
        that simulating pipelines will work correctly. Each
        command will be called directly when used and
        when calling execute() will only return the result stack.

        :raises RedisClusterException: if ``shard_hint`` or ``transaction``
            is passed — neither is supported in cluster mode.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        if transaction:
            raise RedisClusterException("transaction is deprecated in cluster mode")

        # The pipeline shares this client's topology, callbacks and lock so
        # both views of the cluster stay consistent.
        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            cluster_error_retry_attempts=self.cluster_error_retry_attempts,
            read_from_replicas=self.read_from_replicas,
            reinitialize_steps=self.reinitialize_steps,
            lock=self._lock,
        )
    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                token is *not* stored in thread local storage, then
                thread-1 would see the token value as "xyz" and would be
                able to successfully release the thread-2's lock.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            # Default to the Lua-based Lock implementation.
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
        )
    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.cluster_response_callbacks[command] = callback
    def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
        """Resolve which node(s) the command in ``args`` should run on.

        Resolution order: an explicit ``nodes_flag`` kwarg, then the
        command's predefined flag in COMMAND_FLAGS, then the search-command
        list, and finally key-slot routing via determine_slot.
        Always returns a list of target nodes.
        """
        command = args[0].upper()
        # Two-word commands (e.g. "CLUSTER INFO") are flagged under their
        # combined name; prefer that when it exists.
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)
        if command_flag == self.__class__.RANDOM:
            # return a random node
            return [self.get_random_node()]
        elif command_flag == self.__class__.PRIMARIES:
            # return all primaries
            return self.get_primaries()
        elif command_flag == self.__class__.REPLICAS:
            # return all replicas
            return self.get_replicas()
        elif command_flag == self.__class__.ALL_NODES:
            # return all nodes
            return self.get_nodes()
        elif command_flag == self.__class__.DEFAULT_NODE:
            # return the cluster's default node
            return [self.nodes_manager.default_node]
        elif command in self.__class__.SEARCH_COMMANDS[0]:
            # RediSearch commands are pinned to the default node
            return [self.nodes_manager.default_node]
        else:
            # get the node that holds the key's slot
            slot = self.determine_slot(*args)
            node = self.nodes_manager.get_node_from_slot(
                slot, self.read_from_replicas and command in READ_COMMANDS
            )
            return [node]
929 def _should_reinitialized(self):
930 # To reinitialize the cluster on every MOVED error,
931 # set reinitialize_steps to 1.
932 # To avoid reinitializing the cluster on moved errors, set
933 # reinitialize_steps to 0.
934 if self.reinitialize_steps == 0:
935 return False
936 else:
937 return self.reinitialize_counter % self.reinitialize_steps == 0
    def keyslot(self, key):
        """
        Calculate keyslot for a given key.
        See Keys distribution model in https://redis.io/topics/cluster-spec
        """
        # Encode with the client's configured encoding before hashing.
        k = self.encoder.encode(key)
        return key_slot(k)
    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in in, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
         - issue: https://github.com/redis/redis/issues/9493
         - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        # COMMAND GETKEYS is asked over the default node's connection.
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)
    def determine_slot(self, *args):
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, that means the function
                # can be run on any node so we can just return a random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
1019 def get_encoder(self):
1020 """
1021 Get the connections' encoder
1022 """
1023 return self.encoder
1025 def get_connection_kwargs(self):
1026 """
1027 Get the connections' key-word arguments
1028 """
1029 return self.nodes_manager.connection_kwargs
1031 def _is_nodes_flag(self, target_nodes):
1032 return isinstance(target_nodes, str) and target_nodes in self.node_flags
1034 def _parse_target_nodes(self, target_nodes):
1035 if isinstance(target_nodes, list):
1036 nodes = target_nodes
1037 elif isinstance(target_nodes, ClusterNode):
1038 # Supports passing a single ClusterNode as a variable
1039 nodes = [target_nodes]
1040 elif isinstance(target_nodes, dict):
1041 # Supports dictionaries of the format {node_name: node}.
1042 # It enables to execute commands with multi nodes as follows:
1043 # rc.cluster_save_config(rc.get_primaries())
1044 nodes = target_nodes.values()
1045 else:
1046 raise TypeError(
1047 "target_nodes type can be one of the following: "
1048 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
1049 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
1050 f"The passed type is {type(target_nodes)}"
1051 )
1052 return nodes
    def execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the config option
        "self.cluster_error_retry_attempts" which defaults to 3 unless manually
        configured.

        If it reaches the number of times, the command will raise the exception

        Key argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        # Concrete node objects (as opposed to a nodes-flag string) pin the
        # command to exactly those nodes and disable retries (see below).
        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True
        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = (
            0 if target_nodes_specified else self.cluster_error_retry_attempts
        )
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args, **kwargs, nodes_flag=passed_targets
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    # Remember this so a retryable failure can rotate the
                    # default node before the next attempt.
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)
                # Return the processed result
                return self._process_result(args[0], res, **kwargs)
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    continue
                else:
                    # raise the exception
                    raise e
    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster.

        Handles cluster redirections (MOVED/ASK), transient errors
        (TRYAGAIN/CLUSTERDOWN) and connection failures, retrying up to
        ``self.RedisClusterRequestTTL`` times before raising ClusterError.
        """
        keys = kwargs.pop("keys", None)  # only used for client-side caching
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    # Follow the ASK redirect recorded by a previous attempt
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot, self.read_from_replicas and command in READ_COMMANDS
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node, *args, **kwargs)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False
                response_from_cache = connection._get_from_local_cache(args)
                if response_from_cache is not None:
                    # Client-side cache hit: skip the network round-trip
                    return response_from_cache
                else:
                    connection.send_command(*args)
                    response = redis_node.parse_response(connection, command, **kwargs)
                    if command in self.cluster_response_callbacks:
                        response = self.cluster_response_callbacks[command](
                            response, **kwargs
                        )
                    connection._add_to_local_cache(args, response, keys)
                    return response
            except AuthenticationError:
                # Credentials problems are never retryable here
                raise
            except (ConnectionError, TimeoutError) as e:
                # Connection retries are being handled in the node's
                # Retry object.
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                moved = True
            except TryAgainError:
                # Back off briefly once half the TTL budget has been spent
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                # Retry against the node named in the ASK redirect
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except ClusterDownError as e:
                # ClusterDownError can occur during a failover and to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command
                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise e
            except ResponseError:
                # Command-level errors are not retryable; surface them as-is
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")
1229 def close(self):
1230 try:
1231 with self._lock:
1232 if self.nodes_manager:
1233 self.nodes_manager.close()
1234 except AttributeError:
1235 # RedisCluster's __init__ can fail before nodes_manager is set
1236 pass
1238 def _process_result(self, command, res, **kwargs):
1239 """
1240 Process the result of the executed command.
1241 The function would return a dict or a single value.
1243 :type command: str
1244 :type res: dict
1246 `res` should be in the following format:
1247 Dict<node_name, command_result>
1248 """
1249 if command in self.result_callbacks:
1250 return self.result_callbacks[command](command, res, **kwargs)
1251 elif len(res) == 1:
1252 # When we execute the command on a single node, we can
1253 # remove the dictionary and return a single response
1254 return list(res.values())[0]
1255 else:
1256 return res
1258 def load_external_module(self, funcname, func):
1259 """
1260 This function can be used to add externally defined redis modules,
1261 and their namespaces to the redis client.
1263 ``funcname`` - A string containing the name of the function to create
1264 ``func`` - The function, being added to this class.
1265 """
1266 setattr(self, funcname, func)
class ClusterNode:
    """
    A single node (host:port) of a Redis cluster, optionally carrying an
    open Redis client and the node's role (primary / replica).
    """

    def __init__(self, host, port, server_type=None, redis_connection=None):
        # Resolve "localhost" eagerly so node names compare by address
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        # Two nodes are equal when their "host:port" names match
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        # Best-effort close of the node's connection on garbage collection
        if self.redis_connection is not None:
            self.redis_connection.close()
class LoadBalancer:
    """
    Round-Robin Load Balancing

    Hands out rotating indexes into the list of servers (primary + replicas)
    that cover a slot, keyed by the primary's name.
    """

    def __init__(self, start_index: int = 0) -> None:
        # primary name -> next index to hand out for that primary's slot list
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(self, primary: str, list_size: int) -> int:
        idx = self.primary_to_idx.setdefault(primary, self.start_index)
        # Advance the cursor for the next call, wrapping at list_size
        self.primary_to_idx[primary] = (idx + 1) % list_size
        return idx

    def reset(self) -> None:
        self.primary_to_idx.clear()
class NodesManager:
    """
    Maintains the cluster topology for a RedisCluster client: the mapping of
    hash slots to nodes (``slots_cache``), the set of known nodes
    (``nodes_cache``) and a Redis connection per node.  Consumes MOVED
    redirections to patch the slot table, and supports round-robin replica
    selection for read commands.
    """

    def __init__(
        self,
        startup_nodes,
        from_url=False,
        require_full_coverage=False,
        lock=None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        **kwargs,
    ):
        # node name ("host:port") -> ClusterNode for every discovered node
        self.nodes_cache = {}
        # slot number -> [primary, replica, ...] list of ClusterNodes
        self.slots_cache = {}
        # node name -> ClusterNode used to (re)discover the topology
        self.startup_nodes = {}
        self.default_node = None
        self.populate_startup_nodes(startup_nodes)
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        # Last unconsumed MovedError; applied lazily in get_node_from_slot()
        self._moved_exception = None
        # Remaining kwargs are forwarded to every node's Redis client
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        if lock is None:
            lock = threading.Lock()
        self._lock = lock
        self.initialize()

    def get_node(self, host=None, port=None, node_name=None):
        """
        Get the requested node from the cluster's nodes.
        nodes.
        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            # neither a (host, port) pair nor a node name was provided
            return None

    def update_moved_exception(self, exception):
        # Record a MOVED redirection; the slots cache is patched lazily on
        # the next get_node_from_slot() call.
        self._moved_exception = exception

    def _update_moved_slots(self):
        """
        Update the slot's node with the redirected one
        """
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            if redirected_node.server_type is not PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node
        if redirected_node in self.slots_cache[e.slot_id]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = self.slots_cache[e.slot_id][0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            self.slots_cache[e.slot_id].append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            self.slots_cache[e.slot_id].remove(redirected_node)
            # Override the old primary with the new one
            self.slots_cache[e.slot_id][0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        else:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replications) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        # Reset moved_exception
        self._moved_exception = None

    def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
        """
        Gets a node that servers this hash slot
        """
        if self._moved_exception:
            # Apply a pending MOVED redirection under the lock; the flag is
            # re-checked inside so concurrent callers patch the table once
            with self._lock:
                if self._moved_exception:
                    self._update_moved_slots()

        if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self._require_full_coverage}"'
            )

        if read_from_replicas is True:
            # get the server index in a Round-Robin manner
            primary_name = self.slots_cache[slot][0].name
            node_idx = self.read_load_balancer.get_server_index(
                primary_name, len(self.slots_cache[slot])
            )
        elif (
            server_type is None
            or server_type == PRIMARY
            or len(self.slots_cache[slot]) == 1
        ):
            # return a primary
            node_idx = 0
        else:
            # return a replica
            # randomly choose one of the replicas
            node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

        return self.slots_cache[slot][node_idx]

    def get_nodes_by_server_type(self, server_type):
        """
        Get all nodes with the specified server type
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode
        """
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    def populate_startup_nodes(self, nodes):
        """
        Populate all startup nodes and filters out any duplicates
        """
        for n in nodes:
            self.startup_nodes[n.name] = n

    def check_slots_coverage(self, slots_cache):
        # Validate if all slots are covered or if we should try next
        # startup node
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in slots_cache:
                return False
        return True

    def create_redis_connections(self, nodes):
        """
        This function will create a redis connection to all nodes in :nodes:
        """
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host, port=node.port, **self.connection_kwargs
                )

    def create_redis_node(self, host, port, **kwargs):
        # Build a standalone Redis client for a single cluster node.
        if self.from_url:
            # Create a redis node with a costumed connection pool
            kwargs.update({"host": host})
            kwargs.update({"port": port})
            r = Redis(connection_pool=self.connection_pool_class(**kwargs))
        else:
            r = Redis(host=host, port=port, **kwargs)
        return r

    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
        # Return the ClusterNode for (host, port), reusing an existing node
        # (and its live connection) whenever possible.
        node_name = get_node_name(host, port)
        # check if we already have this node in the tmp_nodes_cache
        target_node = tmp_nodes_cache.get(node_name)
        if target_node is None:
            # before creating a new cluster node, check if the cluster node already
            # exists in the current nodes cache and has a valid connection so we can
            # reuse it
            target_node = self.nodes_cache.get(node_name)
            if target_node is None or target_node.redis_connection is None:
                # create new cluster node for this cluster
                target_node = ClusterNode(host, port, role)
            if target_node.server_type != role:
                target_node.server_type = role

        return target_node

    def initialize(self):
        """
        Initializes the nodes cache, slots cache and redis connections.
        :startup_nodes:
            Responsible for discovering other nodes in the cluster
        """
        self.reset()
        tmp_nodes_cache = {}
        tmp_slots = {}
        disagreements = []
        startup_nodes_reachable = False
        fully_covered = False
        kwargs = self.connection_kwargs
        exception = None
        for startup_node in self.startup_nodes.values():
            try:
                if startup_node.redis_connection:
                    r = startup_node.redis_connection
                else:
                    # Create a new Redis connection
                    r = self.create_redis_node(
                        startup_node.host, startup_node.port, **kwargs
                    )
                    self.startup_nodes[startup_node.name].redis_connection = r
                # Make sure cluster mode is enabled on this node
                try:
                    cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
                except ResponseError:
                    raise RedisClusterException(
                        "Cluster mode is not enabled on this node"
                    )
                startup_nodes_reachable = True
            except Exception as e:
                # Try the next startup node.
                # The exception is saved and raised only if we have no more nodes.
                exception = e
                continue

            # CLUSTER SLOTS command results in the following output:
            # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
            # where each node contains the following list: [IP, port, node_id]
            # Therefore, cluster_slots[0][2][0] will be the IP address of the
            # primary node of the first slot section.
            # If there's only one server in the cluster, its ``host`` is ''
            # Fix it to the host in startup_nodes
            if (
                len(cluster_slots) == 1
                and len(cluster_slots[0][2][0]) == 0
                and len(self.startup_nodes) == 1
            ):
                cluster_slots[0][2][0] = startup_node.host

            for slot in cluster_slots:
                primary_node = slot[2]
                host = str_if_bytes(primary_node[0])
                if host == "":
                    host = startup_node.host
                port = int(primary_node[1])
                host, port = self.remap_host_port(host, port)

                target_node = self._get_or_create_cluster_node(
                    host, port, PRIMARY, tmp_nodes_cache
                )
                # add this node to the nodes cache
                tmp_nodes_cache[target_node.name] = target_node

                for i in range(int(slot[0]), int(slot[1]) + 1):
                    if i not in tmp_slots:
                        tmp_slots[i] = []
                        tmp_slots[i].append(target_node)
                        replica_nodes = [slot[j] for j in range(3, len(slot))]

                        for replica_node in replica_nodes:
                            host = str_if_bytes(replica_node[0])
                            port = replica_node[1]
                            host, port = self.remap_host_port(host, port)

                            target_replica_node = self._get_or_create_cluster_node(
                                host, port, REPLICA, tmp_nodes_cache
                            )
                            tmp_slots[i].append(target_replica_node)
                            # add this node to the nodes cache
                            tmp_nodes_cache[target_replica_node.name] = (
                                target_replica_node
                            )
                    else:
                        # Validate that 2 nodes want to use the same slot cache
                        # setup
                        tmp_slot = tmp_slots[i][0]
                        if tmp_slot.name != target_node.name:
                            disagreements.append(
                                f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                            )

                            if len(disagreements) > 5:
                                raise RedisClusterException(
                                    f"startup_nodes could not agree on a valid "
                                    f'slots cache: {", ".join(disagreements)}'
                                )

            fully_covered = self.check_slots_coverage(tmp_slots)
            if fully_covered:
                # Don't need to continue to the next startup node if all
                # slots are covered
                break

        if not startup_nodes_reachable:
            raise RedisClusterException(
                f"Redis Cluster cannot be connected. Please provide at least "
                f"one reachable node: {str(exception)}"
            ) from exception

        # Create Redis connections to all nodes
        self.create_redis_connections(list(tmp_nodes_cache.values()))

        # Check if the slots are not fully covered
        if not fully_covered and self._require_full_coverage:
            # Despite the requirement that the slots be covered, there
            # isn't a full coverage
            raise RedisClusterException(
                f"All slots are not covered after query all startup_nodes. "
                f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                f"covered..."
            )

        # Set the tmp variables to the real variables
        self.nodes_cache = tmp_nodes_cache
        self.slots_cache = tmp_slots
        # Set the default node
        self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
        if self._dynamic_startup_nodes:
            # Populate the startup nodes with all discovered nodes
            self.startup_nodes = tmp_nodes_cache
        # If initialize was called after a MovedError, clear it
        self._moved_exception = None

    def close(self):
        # Drop the default node and close every cached node connection.
        self.default_node = None
        for node in self.nodes_cache.values():
            if node.redis_connection:
                node.redis_connection.close()

    def reset(self):
        try:
            self.read_load_balancer.reset()
        except TypeError:
            # The read_load_balancer is None, do nothing
            pass

    def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
        """
        Remap the host and port returned from the cluster to a different
        internal value. Useful if the client is not connecting directly
        to the cluster.
        """
        if self.address_remap:
            return self.address_remap((host, port))
        return host, port
class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
    """

    def __init__(
        self,
        redis_cluster,
        node=None,
        host=None,
        port=None,
        push_handler_func=None,
        **kwargs,
    ):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true, a replica can be selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        self.set_pubsub_node(redis_cluster, node, host, port)
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        # node name -> per-node PubSub used for sharded (SSUBSCRIBE) channels
        self.node_pubsub_mapping = {}
        # NOTE: rebinds the method name to a generator object that cycles
        # round-robin over node_pubsub_mapping's values
        self._pubsubs_generator = self._pubsubs_generator()
        super().__init__(
            connection_pool=connection_pool,
            encoder=redis_cluster.encoder,
            push_handler_func=push_handler_func,
            **kwargs,
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        The pubsub node will be set according to the passed node, host and port
        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.
        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            pubsub_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, node, host, port)
            pubsub_node = node
        elif any([host, port]) is True:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            pubsub_node = None

        self.node = pubsub_node

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        if node is None or redis_cluster.get_node(node_name=node.name) is None:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )

    def execute_command(self, *args):
        """
        Execute a subscribe/unsubscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            if self.connection_pool is None:
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    channel = args[1]
                    slot = self.cluster.keyslot(channel)
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot, self.cluster.read_from_replicas
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                redis_connection = self.cluster.get_redis_connection(node)
                self.connection_pool = redis_connection.connection_pool
            self.connection = self.connection_pool.get_connection(
                "pubsub", self.shard_hint
            )
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _get_node_pubsub(self, node):
        # Lazily create (and cache) the per-node PubSub for sharded channels.
        try:
            return self.node_pubsub_mapping[node.name]
        except KeyError:
            pubsub = node.redis_connection.pubsub(
                push_handler_func=self.push_handler_func
            )
            self.node_pubsub_mapping[node.name] = pubsub
            return pubsub

    def _sharded_message_generator(self):
        # Poll each node's pubsub once (round-robin); return the first
        # available message, or None when no node currently has one.
        for _ in range(len(self.node_pubsub_mapping)):
            pubsub = next(self._pubsubs_generator)
            message = pubsub.get_message()
            if message is not None:
                return message
        return None

    def _pubsubs_generator(self):
        # Endless round-robin over the per-node pubsub objects.
        while True:
            for pubsub in self.node_pubsub_mapping.values():
                yield pubsub

    def get_sharded_message(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    ):
        # Fetch one sharded-channel message, either from a specific node's
        # pubsub or round-robin across all of them; also does the
        # bookkeeping when a sunsubscribe completes.
        if target_node:
            message = self.node_pubsub_mapping[target_node.name].get_message(
                ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
            )
        else:
            message = self._sharded_message_generator()
        if message is None:
            return None
        elif str_if_bytes(message["type"]) == "sunsubscribe":
            if message["channel"] in self.pending_unsubscribe_shard_channels:
                self.pending_unsubscribe_shard_channels.remove(message["channel"])
                self.shard_channels.pop(message["channel"], None)
                node = self.cluster.get_node_from_key(message["channel"])
                if self.node_pubsub_mapping[node.name].subscribed is False:
                    self.node_pubsub_mapping.pop(node.name)
        if not self.channels and not self.patterns and not self.shard_channels:
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()
        if self.ignore_subscribe_messages or ignore_subscribe_messages:
            return None
        return message

    def ssubscribe(self, *args, **kwargs):
        # Subscribe to sharded channels; each channel is routed to the node
        # owning its keyslot via a dedicated per-node pubsub.
        if args:
            args = list_or_args(args[0], args[1:])
        s_channels = dict.fromkeys(args)
        s_channels.update(kwargs)
        for s_channel, handler in s_channels.items():
            node = self.cluster.get_node_from_key(s_channel)
            pubsub = self._get_node_pubsub(node)
            if handler:
                pubsub.ssubscribe(**{s_channel: handler})
            else:
                pubsub.ssubscribe(s_channel)
            self.shard_channels.update(pubsub.shard_channels)
            self.pending_unsubscribe_shard_channels.difference_update(
                self._normalize_keys({s_channel: None})
            )
            if pubsub.subscribed and not self.subscribed:
                self.subscribed_event.set()
                self.health_check_response_counter = 0

    def sunsubscribe(self, *args):
        # Unsubscribe the given sharded channels, or all currently
        # subscribed shard channels when called with no arguments.
        if args:
            args = list_or_args(args[0], args[1:])
        else:
            args = self.shard_channels

        for s_channel in args:
            node = self.cluster.get_node_from_key(s_channel)
            p = self._get_node_pubsub(node)
            p.sunsubscribe(s_channel)
            self.pending_unsubscribe_shard_channels.update(
                p.pending_unsubscribe_shard_channels
            )

    def get_redis_connection(self):
        """
        Get the Redis connection of the pubsub connected node.
        """
        if self.node is not None:
            return self.node.redis_connection

    def disconnect(self):
        """
        Disconnect the pubsub connection.
        """
        if self.connection:
            self.connection.disconnect()
        for pubsub in self.node_pubsub_mapping.values():
            pubsub.connection.disconnect()
1897class ClusterPipeline(RedisCluster):
1898 """
1899 Support for Redis pipeline
1900 in cluster mode
1901 """
1903 ERRORS_ALLOW_RETRY = (
1904 ConnectionError,
1905 TimeoutError,
1906 MovedError,
1907 AskError,
1908 TryAgainError,
1909 )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        cluster_error_retry_attempts: int = 3,
        reinitialize_steps: int = 5,
        lock=None,
        **kwargs,
    ):
        """
        Create a cluster pipeline that buffers commands locally and reuses
        the parent client's topology manager and command parser.
        """
        # Commands queued via execute_command(), flushed by execute()
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.cluster_response_callbacks = cluster_response_callbacks
        self.cluster_error_retry_attempts = cluster_error_retry_attempts
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        # Standalone encoder so queued commands can be encoded without a
        # live connection
        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.Lock()
        self._lock = lock
1948 def __repr__(self):
1949 """ """
1950 return f"{type(self).__name__}"
1952 def __enter__(self):
1953 """ """
1954 return self
1956 def __exit__(self, exc_type, exc_value, traceback):
1957 """ """
1958 self.reset()
1960 def __del__(self):
1961 try:
1962 self.reset()
1963 except Exception:
1964 pass
1966 def __len__(self):
1967 """ """
1968 return len(self.command_stack)
    def __bool__(self):
        """Pipeline instances always evaluate to True, even when empty."""
        return True
1974 def execute_command(self, *args, **kwargs):
1975 """
1976 Wrapper function for pipeline_execute_command
1977 """
1978 kwargs.pop("keys", None) # the keys are used only for client side caching
1979 return self.pipeline_execute_command(*args, **kwargs)
1981 def pipeline_execute_command(self, *args, **options):
1982 """
1983 Appends the executed command to the pipeline's command stack
1984 """
1985 self.command_stack.append(
1986 PipelineCommand(args, options, len(self.command_stack))
1987 )
1988 return self
1990 def raise_first_error(self, stack):
1991 """
1992 Raise the first exception on the stack
1993 """
1994 for c in stack:
1995 r = c.result
1996 if isinstance(r, Exception):
1997 self.annotate_exception(r, c.position + 1, c.args)
1998 raise r
2000 def annotate_exception(self, exception, number, command):
2001 """
2002 Provides extra context to the exception prior to it being handled
2003 """
2004 cmd = " ".join(map(safe_str, command))
2005 msg = (
2006 f"Command # {number} ({cmd}) of pipeline "
2007 f"caused error: {exception.args[0]}"
2008 )
2009 exception.args = (msg,) + exception.args[1:]
2011 def execute(self, raise_on_error=True):
2012 """
2013 Execute all the commands in the current pipeline
2014 """
2015 stack = self.command_stack
2016 try:
2017 return self.send_cluster_commands(stack, raise_on_error)
2018 finally:
2019 self.reset()
2021 def reset(self):
2022 """
2023 Reset back to empty pipeline.
2024 """
2025 self.command_stack = []
2027 self.scripts = set()
2029 # TODO: Implement
2030 # make sure to reset the connection state in the event that we were
2031 # watching something
2032 # if self.watching and self.connection:
2033 # try:
2034 # # call this manually since our unwatch or
2035 # # immediate_execute_command methods can call reset()
2036 # self.connection.send_command('UNWATCH')
2037 # self.connection.read_response()
2038 # except ConnectionError:
2039 # # disconnect will also remove any previous WATCHes
2040 # self.connection.disconnect()
2042 # clean up the other instance attributes
2043 self.watching = False
2044 self.explicit_transaction = False
2046 # TODO: Implement
2047 # we can safely return the connection to the pool here since we're
2048 # sure we're no longer WATCHing anything
2049 # if self.connection:
2050 # self.connection_pool.release(self.connection)
2051 # self.connection = None
2053 def send_cluster_commands(
2054 self, stack, raise_on_error=True, allow_redirections=True
2055 ):
2056 """
2057 Wrapper for CLUSTERDOWN error handling.
2059 If the cluster reports it is down it is assumed that:
2060 - connection_pool was disconnected
2061 - connection_pool was reseted
2062 - refereh_table_asap set to True
2064 It will try the number of times specified by
2065 the config option "self.cluster_error_retry_attempts"
2066 which defaults to 3 unless manually configured.
2068 If it reaches the number of times, the command will
2069 raises ClusterDownException.
2070 """
2071 if not stack:
2072 return []
2073 retry_attempts = self.cluster_error_retry_attempts
2074 while True:
2075 try:
2076 return self._send_cluster_commands(
2077 stack,
2078 raise_on_error=raise_on_error,
2079 allow_redirections=allow_redirections,
2080 )
2081 except (ClusterDownError, ConnectionError) as e:
2082 if retry_attempts > 0:
2083 # Try again with the new cluster setup. All other errors
2084 # should be raised.
2085 retry_attempts -= 1
2086 pass
2087 else:
2088 raise e
2090 def _send_cluster_commands(
2091 self, stack, raise_on_error=True, allow_redirections=True
2092 ):
2093 """
2094 Send a bunch of cluster commands to the redis cluster.
2096 `allow_redirections` If the pipeline should follow
2097 `ASK` & `MOVED` responses automatically. If set
2098 to false it will raise RedisClusterException.
2099 """
2100 # the first time sending the commands we send all of
2101 # the commands that were queued up.
2102 # if we have to run through it again, we only retry
2103 # the commands that failed.
2104 attempt = sorted(stack, key=lambda x: x.position)
2105 is_default_node = False
2106 # build a list of node objects based on node names we need to
2107 nodes = {}
2109 # as we move through each command that still needs to be processed,
2110 # we figure out the slot number that command maps to, then from
2111 # the slot determine the node.
2112 for c in attempt:
2113 while True:
2114 # refer to our internal node -> slot table that
2115 # tells us where a given command should route to.
2116 # (it might be possible we have a cached node that no longer
2117 # exists in the cluster, which is why we do this in a loop)
2118 passed_targets = c.options.pop("target_nodes", None)
2119 if passed_targets and not self._is_nodes_flag(passed_targets):
2120 target_nodes = self._parse_target_nodes(passed_targets)
2121 else:
2122 target_nodes = self._determine_nodes(
2123 *c.args, node_flag=passed_targets
2124 )
2125 if not target_nodes:
2126 raise RedisClusterException(
2127 f"No targets were found to execute {c.args} command on"
2128 )
2129 if len(target_nodes) > 1:
2130 raise RedisClusterException(
2131 f"Too many targets for command {c.args}"
2132 )
2134 node = target_nodes[0]
2135 if node == self.get_default_node():
2136 is_default_node = True
2138 # now that we know the name of the node
2139 # ( it's just a string in the form of host:port )
2140 # we can build a list of commands for each node.
2141 node_name = node.name
2142 if node_name not in nodes:
2143 redis_node = self.get_redis_connection(node)
2144 try:
2145 connection = get_connection(redis_node, c.args)
2146 except ConnectionError:
2147 for n in nodes.values():
2148 n.connection_pool.release(n.connection)
2149 # Connection retries are being handled in the node's
2150 # Retry object. Reinitialize the node -> slot table.
2151 self.nodes_manager.initialize()
2152 if is_default_node:
2153 self.replace_default_node()
2154 raise
2155 nodes[node_name] = NodeCommands(
2156 redis_node.parse_response,
2157 redis_node.connection_pool,
2158 connection,
2159 )
2160 nodes[node_name].append(c)
2161 break
2163 # send the commands in sequence.
2164 # we write to all the open sockets for each node first,
2165 # before reading anything
2166 # this allows us to flush all the requests out across the
2167 # network essentially in parallel
2168 # so that we can read them all in parallel as they come back.
2169 # we dont' multiplex on the sockets as they come available,
2170 # but that shouldn't make too much difference.
2171 node_commands = nodes.values()
2172 try:
2173 node_commands = nodes.values()
2174 for n in node_commands:
2175 n.write()
2177 for n in node_commands:
2178 n.read()
2179 finally:
2180 # release all of the redis connections we allocated earlier
2181 # back into the connection pool.
2182 # we used to do this step as part of a try/finally block,
2183 # but it is really dangerous to
2184 # release connections back into the pool if for some
2185 # reason the socket has data still left in it
2186 # from a previous operation. The write and
2187 # read operations already have try/catch around them for
2188 # all known types of errors including connection
2189 # and socket level errors.
2190 # So if we hit an exception, something really bad
2191 # happened and putting any oF
2192 # these connections back into the pool is a very bad idea.
2193 # the socket might have unread buffer still sitting in it,
2194 # and then the next time we read from it we pass the
2195 # buffered result back from a previous command and
2196 # every single request after to that connection will always get
2197 # a mismatched result.
2198 for n in nodes.values():
2199 n.connection_pool.release(n.connection)
2201 # if the response isn't an exception it is a
2202 # valid response from the node
2203 # we're all done with that command, YAY!
2204 # if we have more commands to attempt, we've run into problems.
2205 # collect all the commands we are allowed to retry.
2206 # (MOVED, ASK, or connection errors or timeout errors)
2207 attempt = sorted(
2208 (
2209 c
2210 for c in attempt
2211 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2212 ),
2213 key=lambda x: x.position,
2214 )
2215 if attempt and allow_redirections:
2216 # RETRY MAGIC HAPPENS HERE!
2217 # send these remaining commands one at a time using `execute_command`
2218 # in the main client. This keeps our retry logic
2219 # in one place mostly,
2220 # and allows us to be more confident in correctness of behavior.
2221 # at this point any speed gains from pipelining have been lost
2222 # anyway, so we might as well make the best
2223 # attempt to get the correct behavior.
2224 #
2225 # The client command will handle retries for each
2226 # individual command sequentially as we pass each
2227 # one into `execute_command`. Any exceptions
2228 # that bubble out should only appear once all
2229 # retries have been exhausted.
2230 #
2231 # If a lot of commands have failed, we'll be setting the
2232 # flag to rebuild the slots table from scratch.
2233 # So MOVED errors should correct themselves fairly quickly.
2234 self.reinitialize_counter += 1
2235 if self._should_reinitialized():
2236 self.nodes_manager.initialize()
2237 if is_default_node:
2238 self.replace_default_node()
2239 for c in attempt:
2240 try:
2241 # send each command individually like we
2242 # do in the main client.
2243 c.result = super().execute_command(*c.args, **c.options)
2244 except RedisError as e:
2245 c.result = e
2247 # turn the response back into a simple flat array that corresponds
2248 # to the sequence of commands issued in the stack in pipeline.execute()
2249 response = []
2250 for c in sorted(stack, key=lambda x: x.position):
2251 if c.args[0] in self.cluster_response_callbacks:
2252 c.result = self.cluster_response_callbacks[c.args[0]](
2253 c.result, **c.options
2254 )
2255 response.append(c.result)
2257 if raise_on_error:
2258 self.raise_first_error(stack)
2260 return response
2262 def _fail_on_redirect(self, allow_redirections):
2263 """ """
2264 if not allow_redirections:
2265 raise RedisClusterException(
2266 "ASK & MOVED redirection not allowed in this pipeline"
2267 )
    def exists(self, *keys):
        """Queue an EXISTS command for the given keys."""
        return self.execute_command("EXISTS", *keys)
    def eval(self):
        """Not supported in cluster pipelines; always raises."""
        raise RedisClusterException("method eval() is not implemented")
    def multi(self):
        """Not supported: transactions (MULTI) are not implemented in cluster pipelines."""
        raise RedisClusterException("method multi() is not implemented")
    def immediate_execute_command(self, *args, **options):
        """Not supported in cluster pipelines; always raises."""
        raise RedisClusterException(
            "method immediate_execute_command() is not implemented"
        )
    def _execute_transaction(self, *args, **kwargs):
        """Not supported: transactions are not implemented in cluster pipelines."""
        raise RedisClusterException("method _execute_transaction() is not implemented")
    def load_scripts(self):
        """Not supported in cluster pipelines; always raises."""
        raise RedisClusterException("method load_scripts() is not implemented")
    def watch(self, *names):
        """Not supported: WATCH is not implemented in cluster pipelines."""
        raise RedisClusterException("method watch() is not implemented")
    def unwatch(self):
        """Not supported: UNWATCH is not implemented in cluster pipelines."""
        raise RedisClusterException("method unwatch() is not implemented")
    def script_load_for_pipeline(self, *args, **kwargs):
        """Not supported in cluster pipelines; always raises."""
        raise RedisClusterException(
            "method script_load_for_pipeline() is not implemented"
        )
2308 def delete(self, *names):
2309 """
2310 "Delete a key specified by ``names``"
2311 """
2312 if len(names) != 1:
2313 raise RedisClusterException(
2314 "deleting multiple keys is not implemented in pipeline command"
2315 )
2317 return self.execute_command("DEL", names[0])
2319 def unlink(self, *names):
2320 """
2321 "Unlink a key specified by ``names``"
2322 """
2323 if len(names) != 1:
2324 raise RedisClusterException(
2325 "unlinking multiple keys is not implemented in pipeline command"
2326 )
2328 return self.execute_command("UNLINK", names[0])
def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Return a stand-in callable that raises, used to block commands that
    must not be issued through a cluster-mode pipeline.
    """

    def _blocked(*args, **kwargs):
        raise RedisClusterException(
            f"ERROR: Calling pipelined function {name} is blocked "
            f"when running redis in cluster mode..."
        )

    return _blocked
# Commands that must not run through a cluster pipeline; each one is
# replaced on ClusterPipeline below with a raising stub.
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)
for command in PIPELINE_BLOCKED_COMMANDS:
    # "CLIENT KILL" -> "client_kill", matching python method naming.
    command = command.replace(" ", "_").lower()

    setattr(ClusterPipeline, command, block_pipeline_command(command))
class PipelineCommand:
    """A single queued pipeline command and its execution state."""

    def __init__(self, args, options=None, position=None):
        self.args = args  # raw command tuple, e.g. ("SET", "key", "value")
        self.options = {} if options is None else options
        self.position = position  # index within the pipeline's stack
        self.result = None  # response or Exception once executed
        self.node = None  # node the command was routed to
        self.asking = False  # True while following an ASK redirect
class NodeCommands:
    """Batches the pipeline commands destined for a single cluster node."""

    def __init__(self, parse_response, connection_pool, connection):
        """Bind the node's response parser, its pool and a live connection."""
        self.parse_response = parse_response
        self.connection_pool = connection_pool
        self.connection = connection
        self.commands = []

    def append(self, c):
        """Add a PipelineCommand to this node's batch."""
        self.commands.append(c)

    def write(self):
        """
        Pack every queued command into one buffer and send it in a single
        network write. (Code borrowed from Redis so it can be fixed.)
        """
        conn = self.connection
        batch = self.commands

        # The write clobbers results, so clear anything left over from a
        # previous run first.
        for cmd in batch:
            cmd.result = None

        # On connection/timeout failure, mark every command with the error.
        try:
            conn.send_packed_command(
                conn.pack_commands([cmd.args for cmd in batch])
            )
        except (ConnectionError, TimeoutError) as exc:
            for cmd in batch:
                cmd.result = exc

    def read(self):
        """Read one response per queued command off the connection."""
        conn = self.connection
        for cmd in self.commands:
            # A pre-set result means write() already recorded an error
            # (e.g. a connection error). Parsing a response on a closed
            # socket would surface confusing secondary failures (the
            # parser does not re-check that the socket is still open), so
            # skip such commands. Treating this as a connection error is
            # fine: the connection goes back to the pool and the next
            # write will explicitly reopen it.
            if cmd.result is not None:
                continue
            try:
                cmd.result = self.parse_response(conn, cmd.args[0], **cmd.options)
            except (ConnectionError, TimeoutError) as exc:
                # The whole socket is suspect: fail every command and stop.
                for other in self.commands:
                    other.result = exc
                return
            except RedisError as exc:
                cmd.result = exc