Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/cluster.py: 19% (842 statements)

import random
import socket
import sys
import threading
import time
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from redis.backoff import default_backoff
from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands
from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
    AskError,
    AuthenticationError,
    ClusterCrossSlotError,
    ClusterDownError,
    ClusterError,
    ConnectionError,
    DataError,
    MasterDownError,
    MovedError,
    RedisClusterException,
    RedisError,
    ResponseError,
    SlotNotCoveredError,
    TimeoutError,
    TryAgainError,
)
from redis.lock import Lock
from redis.retry import Retry
from redis.utils import (
    dict_merge,
    list_keys_to_dict,
    merge_result,
    safe_str,
    str_if_bytes,
)


def get_node_name(host: str, port: Union[str, int]) -> str:
    return f"{host}:{port}"


def get_connection(redis_node, *args, **options):
    return redis_node.connection or redis_node.connection_pool.get_connection(
        args[0], **options
    )


def parse_scan_result(command, res, **options):
    cursors = {}
    ret = []
    for node_name, response in res.items():
        cursor, r = parse_scan(response, **options)
        cursors[node_name] = cursor
        ret += r

    return cursors, ret


def parse_pubsub_numsub(command, res, **options):
    numsub_d = OrderedDict()
    for numsub_tups in res.values():
        for channel, numsubbed in numsub_tups:
            try:
                numsub_d[channel] += numsubbed
            except KeyError:
                numsub_d[channel] = numsubbed

    ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
    return ret_numsub


def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    current_host = options.get("current_host", "")

    def fix_server(*args: Any) -> Tuple[str, Any]:
        return str_if_bytes(args[0]) or current_host, args[1]

    slots = {}
    for slot in resp:
        start, end, primary = slot[:3]
        replicas = slot[3:]
        slots[start, end] = {
            "primary": fix_server(*primary),
            "replicas": [fix_server(*replica) for replica in replicas],
        }

    return slots
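

# Example (editor's illustrative sketch, not part of redis-py): a single-range
# CLUSTER SLOTS reply maps to a dict keyed by (start, end); values here are
# hypothetical.
#
#     resp = [[0, 5460, [b"127.0.0.1", 7000, b"id1"], [b"127.0.0.1", 7003, b"id4"]]]
#     parse_cluster_slots(resp)
#     # -> {(0, 5460): {"primary": ("127.0.0.1", 7000),
#     #                 "replicas": [("127.0.0.1", 7003)]}}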


def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.
    """
    shards = []
    for x in resp:
        shard = {"slots": [], "nodes": []}
        for i in range(0, len(x[1]), 2):
            shard["slots"].append((x[1][i], x[1][i + 1]))
        nodes = x[3]
        for node in nodes:
            dict_node = {}
            for i in range(0, len(node), 2):
                dict_node[node[i]] = node[i + 1]
            shard["nodes"].append(dict_node)
        shards.append(shard)

    return shards
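

# Example (editor's illustrative sketch): slot ranges arrive as flat start/end
# pairs and nodes as flat key-value lists; the field values are hypothetical.
#
#     resp = [["slots", [0, 5460], "nodes", [["id", "abc", "port", 6379]]]]
#     parse_cluster_shards(resp)
#     # -> [{"slots": [(0, 5460)], "nodes": [{"id": "abc", "port": 6379}]}]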


def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.
    """
    return resp.decode("utf-8")


PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"

REDIS_ALLOWED_KEYS = (
    "charset",
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "errors",
    "host",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_keyfile",
    "ssl_password",
    "unix_socket_path",
    "username",
)
KWARGS_DISABLED_KEYS = ("host", "port")


def cleanup_kwargs(**kwargs):
    """
    Remove unsupported or disabled keys from kwargs
    """
    connection_kwargs = {
        k: v
        for k, v in kwargs.items()
        if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
    }

    return connection_kwargs
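

# Example (illustrative): disabled keys ("host"/"port") and unknown keys are
# dropped, allowed keys pass through.
#
#     cleanup_kwargs(host="localhost", port=6379, ssl=True, foo="bar")
#     # -> {"ssl": True}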


class ClusterParser(DefaultParser):
    EXCEPTION_CLASSES = dict_merge(
        DefaultParser.EXCEPTION_CLASSES,
        {
            "ASK": AskError,
            "TRYAGAIN": TryAgainError,
            "MOVED": MovedError,
            "CLUSTERDOWN": ClusterDownError,
            "CROSSSLOT": ClusterCrossSlotError,
            "MASTERDOWN": MasterDownError,
        },
    )


class AbstractRedisCluster:
    RedisClusterRequestTTL = 16

    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "READWRITE",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "GRAPH.CONFIG",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(["KEYS", "PUBSUB CHANNELS"], merge_result),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError)

    def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
        else:
            curr_node = self.get_default_node()
            primaries = [node for node in self.get_primaries() if node != curr_node]
            if primaries:
                # Choose a primary if the cluster contains different primaries
                self.nodes_manager.default_node = random.choice(primaries)
            else:
                # Otherwise, choose a replica if the cluster contains replicas
                replicas = [node for node in self.get_replicas() if node != curr_node]
                if replicas:
                    self.nodes_manager.default_node = random.choice(replicas)
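
    # Example (editor's illustrative sketch): manually rotating the default
    # node, e.g. after it becomes unresponsive; localhost:6379 is an assumed
    # endpoint.
    #
    #     rc = RedisCluster(host="localhost", port=6379)
    #     rc.replace_default_node()  # picks another primary when available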


class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://``: creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.

        """
        return cls(url=url, **kwargs)
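
    # Example (illustrative sketch): the URL form is equivalent to passing
    # host/port directly; localhost:6379 is an assumed endpoint.
    #
    #     rc = RedisCluster.from_url("redis://localhost:6379/0")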

    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = False,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        **kwargs,
    ):
        """
        Initialize a new RedisCluster client.

        :param startup_nodes:
            List of nodes from which initial bootstrapping can be done
        :param host:
            Can be used to point to a startup node
        :param port:
            Can be used to point to a startup node
        :param require_full_coverage:
            When set to False (default value): the client will not require a
            full coverage of the slots. However, if not all slots are covered,
            and at least one node has 'cluster-require-full-coverage' set to
            'yes', the server will throw a ClusterDownError for some key-based
            commands. See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True: all slots must be covered to construct the
            cluster client. If not all slots are covered, RedisClusterException
            will be thrown.
        :param read_from_replicas:
            Enable read from replicas in READONLY mode. You can read possibly
            stale data.
            When set to true, read commands will be distributed between the
            primary and its replicas in a round-robin manner.
        :param dynamic_startup_nodes:
            Set the RedisCluster's startup nodes to all of the discovered nodes.
            If true (default value), the cluster's discovered nodes will be used to
            determine the cluster nodes-slots mapping in the next topology refresh.
            It will remove the initial passed startup nodes if their endpoints aren't
            listed in the CLUSTER SLOTS output.
            If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
            specific IP addresses, it is best to set it to false.
        :param cluster_error_retry_attempts:
            Number of times to retry before raising an error when
            :class:`~.TimeoutError` or :class:`~.ConnectionError` or
            :class:`~.ClusterDownError` are encountered
        :param reinitialize_steps:
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.
        :param address_remap:
            An optional callable which, when provided with an internal network
            address of a node, e.g. a `(host, port)` tuple, will return the address
            where the node is reachable. This can be used to map the addresses at
            which the nodes _think_ they are, to addresses at which a client may
            reach them, such as when they sit behind a proxy.

        :**kwargs:
            Extra arguments that will be passed to the Redis instance when created
            (See Official redis-py doc for supported kwargs
            [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
            Some kwargs are not supported and will raise a
            RedisClusterException:
            - db (Redis does not support database SELECT in cluster mode)
        """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' cannot be used in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )

        # Get the startup node(s)
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the following:\n"
                "1. host and port, for example:\n"
                " RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )
        # Update the connection arguments.
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run.
        # If the user passed an on_connect function, save it and run it
        # inside the RedisCluster.on_connect() function
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
            kwargs.update({"retry": self.retry})
        else:
            kwargs.update({"retry": Retry(default_backoff(), 0)})

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        self.cluster_error_retry_attempts = cluster_error_retry_attempts
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
        self.commands_parser = CommandsParser(self)
        self._lock = threading.Lock()
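
    # Example (illustrative sketch): equivalent ways to bootstrap a client,
    # assuming a cluster node reachable at localhost:6379.
    #
    #     rc = RedisCluster(host="localhost", port=6379)
    #     rc = RedisCluster(startup_nodes=[ClusterNode("localhost", 6379)])
    #     rc = RedisCluster.from_url("redis://localhost:6379/0")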

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        self.close()

    def disconnect_connection_pools(self):
        for node in self.get_nodes():
            if node.redis_connection:
                try:
                    node.redis_connection.connection_pool.disconnect()
                except OSError:
                    # Client was already disconnected. do nothing
                    pass

    def on_connect(self, connection):
        """
        Initialize the connection, authenticate, and send READONLY if it was
        enabled during object initialization.
        """
        connection.set_parser(ClusterParser)
        connection.on_connect()

        if self.read_from_replicas:
            # Sending READONLY command to server to configure connection as
            # readonly. Since each cluster node may change its server type due
            # to a failover, we should establish a READONLY connection
            # regardless of the server type. If this is a primary connection,
            # READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            self.user_on_connect_func(connection)

    def get_redis_connection(self, node):
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection

    def get_node(self, host=None, port=None, node_name=None):
        return self.nodes_manager.get_node(host, port, node_name)

    def get_primaries(self):
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)

    def get_replicas(self):
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)

    def get_random_node(self):
        return random.choice(list(self.nodes_manager.nodes_cache.values()))

    def get_nodes(self):
        return list(self.nodes_manager.nodes_cache.values())

    def get_node_from_key(self, key, replica=False):
        """
        Get the node that holds the key's slot.
        If replica is set to True but the slot doesn't have any replicas, None
        is returned.
        """
        slot = self.keyslot(key)
        slot_cache = self.nodes_manager.slots_cache.get(slot)
        if slot_cache is None or len(slot_cache) == 0:
            raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
        if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
            return None
        elif replica:
            node_idx = 1
        else:
            # primary
            node_idx = 0

        return slot_cache[node_idx]

    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node

    def set_default_node(self, node):
        """
        Set the default node of the cluster.
        :param node: 'ClusterNode'
        :return True if the default node was set, else False
        """
        if node is None or self.get_node(node_name=node.name) is None:
            return False
        self.nodes_manager.default_node = node
        return True

    def get_retry(self) -> Optional["Retry"]:
        return self.retry

    def set_retry(self, retry: "Retry") -> None:
        self.retry = retry
        for node in self.get_nodes():
            node.redis_connection.set_retry(retry)

    def monitor(self, target_node=None):
        """
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node is
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        next_command() method returns one command from monitor
        listen() method yields commands from monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()

    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host&port, to get a pubsub instance
        connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)

    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
            Pipelines do not work in cluster mode the same way they
            do in normal mode. Create a clone of this object so
            that simulating pipelines will work correctly. Each
            command is queued when called, and execute() sends the
            queued commands and returns the stack of results.
        """
        if shard_hint:
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        if transaction:
            raise RedisClusterException("transaction is deprecated in cluster mode")

        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            cluster_error_retry_attempts=self.cluster_error_retry_attempts,
            read_from_replicas=self.read_from_replicas,
            reinitialize_steps=self.reinitialize_steps,
            lock=self._lock,
        )
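
    # Example (illustrative sketch): batching commands; execute() sends the
    # queued commands and returns their results as a list.
    #
    #     with rc.pipeline() as pipe:
    #         pipe.set("foo", "bar")
    #         pipe.get("foo")
    #         results = pipe.execute()  # e.g. [True, b"bar"]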

    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
        )
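
    # Example (illustrative sketch): a cluster-wide lock used as a context
    # manager; failure to acquire raises redis.exceptions.LockError.
    #
    #     with rc.lock("resource-lock", timeout=10, blocking_timeout=5):
    #         ...  # critical section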

    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.cluster_response_callbacks[command] = callback

    def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
        # Determine which nodes the command should be executed on.
        # Returns a list of target nodes.
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)
        if command_flag == self.__class__.RANDOM:
            # return a random node
            return [self.get_random_node()]
        elif command_flag == self.__class__.PRIMARIES:
            # return all primaries
            return self.get_primaries()
        elif command_flag == self.__class__.REPLICAS:
            # return all replicas
            return self.get_replicas()
        elif command_flag == self.__class__.ALL_NODES:
            # return all nodes
            return self.get_nodes()
        elif command_flag == self.__class__.DEFAULT_NODE:
            # return the cluster's default node
            return [self.nodes_manager.default_node]
        elif command in self.__class__.SEARCH_COMMANDS[0]:
            return [self.nodes_manager.default_node]
        else:
            # get the node that holds the key's slot
            slot = self.determine_slot(*args)
            node = self.nodes_manager.get_node_from_slot(
                slot, self.read_from_replicas and command in READ_COMMANDS
            )
            return [node]

    def _should_reinitialized(self):
        # To reinitialize the cluster on every MOVED error,
        # set reinitialize_steps to 1.
        # To avoid reinitializing the cluster on moved errors, set
        # reinitialize_steps to 0.
        if self.reinitialize_steps == 0:
            return False
        else:
            return self.reinitialize_counter % self.reinitialize_steps == 0

    def keyslot(self, key):
        """
        Calculate keyslot for a given key.
        See Keys distribution model in https://redis.io/topics/cluster-spec
        """
        k = self.encoder.encode(key)
        return key_slot(k)
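
    # Example (illustrative): keys hash onto one of 16384 slots; per the Redis
    # cluster spec, a "{...}" hash tag limits hashing to the tagged substring,
    # so related keys land in the same slot.
    #
    #     rc.keyslot("{user:1}:name") == rc.keyslot("{user:1}:email")  # True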

    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in it, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
         - issue: https://github.com/redis/redis/issues/9493
         - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)

    def determine_slot(self, *args):
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = args[2]
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, that means the function
                # can be run on any node so we can just return a random slot
                if command in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
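
    # Example (illustrative): multi-key commands succeed only when every key
    # maps to one slot, which hash tags guarantee; mixing untagged keys can
    # raise "...all keys must map to the same key slot".
    #
    #     rc.sunionstore("{t}dst", "{t}a", "{t}b")  # all keys share slot of "t"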

    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder

    def get_connection_kwargs(self):
        """
        Get the connections' keyword arguments
        """
        return self.nodes_manager.connection_kwargs

    def _is_nodes_flag(self, target_nodes):
        return isinstance(target_nodes, str) and target_nodes in self.node_flags

    def _parse_target_nodes(self, target_nodes):
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables executing commands on multiple nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def execute_command(self, *args, **kwargs):
        """
        Wrapper for ERRORS_ALLOW_RETRY error handling.

        It will try the number of times specified by the config option
        "self.cluster_error_retry_attempts" which defaults to 3 unless manually
        configured.

        If all attempts fail, the command raises the exception.

        Key argument :target_nodes: can be passed with the following types:
            nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
            ClusterNode
            list<ClusterNode>
            dict<Any, ClusterNode>
        """
        target_nodes_specified = False
        is_default_node = False
        target_nodes = None
        passed_targets = kwargs.pop("target_nodes", None)
        if passed_targets is not None and not self._is_nodes_flag(passed_targets):
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True
        # If an error that allows retrying was thrown, the nodes and slots
        # cache were reinitialized. We will retry executing the command with
        # the updated cluster setup only when the target nodes can be
        # determined again with the new cache tables. Therefore, when target
        # nodes were passed to this function, we cannot retry the command
        # execution since the nodes may not be valid anymore after the tables
        # were reinitialized. So in case of passed target nodes,
        # retry_attempts will be set to 0.
        retry_attempts = (
            0 if target_nodes_specified else self.cluster_error_retry_attempts
        )
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        for _ in range(execute_attempts):
            try:
                res = {}
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = self._determine_nodes(
                        *args, **kwargs, nodes_flag=passed_targets
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    is_default_node = True
                for node in target_nodes:
                    res[node.name] = self._execute_command(node, *args, **kwargs)
                # Return the processed result
                return self._process_result(args[0], res, **kwargs)
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    if is_default_node:
                        # Replace the default cluster node
                        self.replace_default_node()
                    # The nodes and slots cache were reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    continue
                else:
                    # raise the exception
                    raise e
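
    # Example (illustrative sketch): routing a command to explicit targets via
    # the target_nodes keyword.
    #
    #     rc.execute_command("INFO", target_nodes=RedisCluster.ALL_NODES)
    #     rc.execute_command("PING", target_nodes=rc.get_primaries())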

    def _execute_command(self, target_node, *args, **kwargs):
        """
        Send a command to a node in the cluster
        """
        command = args[0]
        redis_node = None
        connection = None
        redirect_addr = None
        asking = False
        moved = False
        ttl = int(self.RedisClusterRequestTTL)

        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    target_node = self.get_node(node_name=redirect_addr)
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = self.determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot, self.read_from_replicas and command in READ_COMMANDS
                    )
                    moved = False

                redis_node = self.get_redis_connection(target_node)
                connection = get_connection(redis_node, *args, **kwargs)
                if asking:
                    connection.send_command("ASKING")
                    redis_node.parse_response(connection, "ASKING", **kwargs)
                    asking = False

                connection.send_command(*args)
                response = redis_node.parse_response(connection, command, **kwargs)
                if command in self.cluster_response_callbacks:
                    response = self.cluster_response_callbacks[command](
                        response, **kwargs
                    )
                return response
            except AuthenticationError:
                raise
            except (ConnectionError, TimeoutError) as e:
                # Connection retries are being handled in the node's
                # Retry object.
                # ConnectionError can also be raised if we couldn't get a
                # connection from the pool before timing out, so check that
                # this is an actual connection before attempting to disconnect.
                if connection is not None:
                    connection.disconnect()

                # Remove the failed node from the startup nodes before we try
                # to reinitialize the cluster
                self.nodes_manager.startup_nodes.pop(target_node.name, None)
                # Reset the cluster node's connection
                target_node.redis_connection = None
                self.nodes_manager.initialize()
                raise e
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if self._should_reinitialized():
                    self.nodes_manager.initialize()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.update_moved_exception(e)
                moved = True
            except TryAgainError:
                if ttl < self.RedisClusterRequestTTL / 2:
                    time.sleep(0.05)
            except AskError as e:
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
            except ClusterDownError as e:
                # ClusterDownError can occur during a failover and to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command
                time.sleep(0.25)
                self.nodes_manager.initialize()
                raise e
            except ResponseError:
                raise
            except Exception as e:
                if connection:
                    connection.disconnect()
                raise e
            finally:
                if connection is not None:
                    redis_node.connection_pool.release(connection)

        raise ClusterError("TTL exhausted.")

    def close(self):
        try:
            with self._lock:
                if self.nodes_manager:
                    self.nodes_manager.close()
        except AttributeError:
            # RedisCluster's __init__ can fail before nodes_manager is set
            pass

    def _process_result(self, command, res, **kwargs):
        """
        Process the result of the executed command.
        The function returns a dict or a single value.

        :type command: str
        :type res: dict

        `res` should be in the following format:
            Dict<node_name, command_result>
        """
        if command in self.result_callbacks:
            return self.result_callbacks[command](command, res, **kwargs)
        elif len(res) == 1:
            # When we execute the command on a single node, we can
            # remove the dictionary and return a single response
            return list(res.values())[0]
        else:
            return res

    def load_external_module(self, funcname, func):
        """
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        ``funcname`` - A string containing the name of the function to create
        ``func`` - The function, being added to this class.
        """
        setattr(self, funcname, func)


class ClusterNode:
    def __init__(self, host, port, server_type=None, redis_connection=None):
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __del__(self):
        if self.redis_connection is not None:
            self.redis_connection.close()


class LoadBalancer:
    """
    Round-Robin Load Balancing
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(self, primary: str, list_size: int) -> int:
        server_index = self.primary_to_idx.setdefault(primary, self.start_index)
        # Update the index
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index

    def reset(self) -> None:
        self.primary_to_idx.clear()
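

# Example (illustrative): the balancer cycles an index per primary, so reads
# rotate over [primary, replica1, ..., replicaN].
#
#     lb = LoadBalancer()
#     [lb.get_server_index("node1:6379", 3) for _ in range(4)]  # [0, 1, 2, 0]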


class NodesManager:
    def __init__(
        self,
        startup_nodes,
        from_url=False,
        require_full_coverage=False,
        lock=None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        **kwargs,
    ):
        self.nodes_cache = {}
        self.slots_cache = {}
        self.startup_nodes = {}
        self.default_node = None
        self.populate_startup_nodes(startup_nodes)
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        self._moved_exception = None
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        if lock is None:
            lock = threading.Lock()
        self._lock = lock
        self.initialize()

    def get_node(self, host=None, port=None, node_name=None):
        """
        Get the requested node from the cluster's nodes.
        :return: ClusterNode if the node exists, else None
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            return None

    def update_moved_exception(self, exception):
        self._moved_exception = exception

    def _update_moved_slots(self):
        """
        Update the slot's node with the redirected one
        """
        e = self._moved_exception
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            if redirected_node.server_type is not PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node
        if redirected_node in self.slots_cache[e.slot_id]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = self.slots_cache[e.slot_id][0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            self.slots_cache[e.slot_id].append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            self.slots_cache[e.slot_id].remove(redirected_node)
            # Override the old primary with the new one
            self.slots_cache[e.slot_id][0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        else:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replicas) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        # Reset moved_exception
        self._moved_exception = None

    def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
        """
        Gets a node that serves this hash slot
        """
        if self._moved_exception:
            with self._lock:
                if self._moved_exception:
                    self._update_moved_slots()

        if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self._require_full_coverage}"'
            )

        if read_from_replicas is True:
            # get the server index in a Round-Robin manner
            primary_name = self.slots_cache[slot][0].name
            node_idx = self.read_load_balancer.get_server_index(
                primary_name, len(self.slots_cache[slot])
            )
        elif (
            server_type is None
            or server_type == PRIMARY
            or len(self.slots_cache[slot]) == 1
        ):
            # return a primary
            node_idx = 0
        else:
            # return a replica
            # randomly choose one of the replicas
            node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)

        return self.slots_cache[slot][node_idx]

    def get_nodes_by_server_type(self, server_type):
        """
        Get all nodes with the specified server type
        :param server_type: 'primary' or 'replica'
        :return: list of ClusterNode
        """
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    def populate_startup_nodes(self, nodes):
        """
        Populate all startup nodes and filter out any duplicates
        """
        for n in nodes:
            self.startup_nodes[n.name] = n

    def check_slots_coverage(self, slots_cache):
        # Validate if all slots are covered or if we should try next
        # startup node
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            if i not in slots_cache:
                return False
        return True

    def create_redis_connections(self, nodes):
        """
        This function will create a redis connection to all nodes in :nodes:
        """
        for node in nodes:
            if node.redis_connection is None:
                node.redis_connection = self.create_redis_node(
                    host=node.host, port=node.port, **self.connection_kwargs
                )

    def create_redis_node(self, host, port, **kwargs):
        if self.from_url:
            # Create a redis node with a custom connection pool
            kwargs.update({"host": host})
            kwargs.update({"port": port})
            r = Redis(connection_pool=self.connection_pool_class(**kwargs))
        else:
            r = Redis(host=host, port=port, **kwargs)
        return r

    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
        node_name = get_node_name(host, port)
        # check if we already have this node in the tmp_nodes_cache
        target_node = tmp_nodes_cache.get(node_name)
        if target_node is None:
            # before creating a new cluster node, check if the cluster node already
            # exists in the current nodes cache and has a valid connection so we can
            # reuse it
            target_node = self.nodes_cache.get(node_name)
            if target_node is None or target_node.redis_connection is None:
                # create new cluster node for this cluster
                target_node = ClusterNode(host, port, role)
            if target_node.server_type != role:
                target_node.server_type = role

        return target_node

    def initialize(self):
        """
        Initializes the nodes cache, slots cache and redis connections.
        :startup_nodes:
            Responsible for discovering other nodes in the cluster
        """
        self.reset()
        tmp_nodes_cache = {}
        tmp_slots = {}
        disagreements = []
        startup_nodes_reachable = False
        fully_covered = False
        kwargs = self.connection_kwargs
        exception = None
        for startup_node in self.startup_nodes.values():
            try:
                if startup_node.redis_connection:
                    r = startup_node.redis_connection
                else:
                    # Create a new Redis connection
                    r = self.create_redis_node(
                        startup_node.host, startup_node.port, **kwargs
                    )
                    self.startup_nodes[startup_node.name].redis_connection = r
                # Make sure cluster mode is enabled on this node
                if bool(r.info().get("cluster_enabled")) is False:
                    raise RedisClusterException(
                        "Cluster mode is not enabled on this node"
                    )
                cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
                startup_nodes_reachable = True
            except Exception as e:
                # Try the next startup node.
                # The exception is saved and raised only if we have no more nodes.
                exception = e
                continue

            # CLUSTER SLOTS command results in the following output:
            # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
            # where each node contains the following list: [IP, port, node_id]
            # Therefore, cluster_slots[0][2][0] will be the IP address of the
            # primary node of the first slot section.
            # If there's only one server in the cluster, its ``host`` is ''
            # Fix it to the host in startup_nodes
            if (
                len(cluster_slots) == 1
                and len(cluster_slots[0][2][0]) == 0
                and len(self.startup_nodes) == 1
            ):
                cluster_slots[0][2][0] = startup_node.host

            for slot in cluster_slots:
                primary_node = slot[2]
                host = str_if_bytes(primary_node[0])
                if host == "":
                    host = startup_node.host
                port = int(primary_node[1])
                host, port = self.remap_host_port(host, port)

                target_node = self._get_or_create_cluster_node(
                    host, port, PRIMARY, tmp_nodes_cache
                )
                # add this node to the nodes cache
                tmp_nodes_cache[target_node.name] = target_node

                for i in range(int(slot[0]), int(slot[1]) + 1):
                    if i not in tmp_slots:
                        tmp_slots[i] = []
                        tmp_slots[i].append(target_node)
                        replica_nodes = [slot[j] for j in range(3, len(slot))]

                        for replica_node in replica_nodes:
                            host = str_if_bytes(replica_node[0])
                            port = replica_node[1]
                            host, port = self.remap_host_port(host, port)

                            target_replica_node = self._get_or_create_cluster_node(
                                host, port, REPLICA, tmp_nodes_cache
                            )
                            tmp_slots[i].append(target_replica_node)
                            # add this node to the nodes cache
                            tmp_nodes_cache[
                                target_replica_node.name
                            ] = target_replica_node
                    else:
                        # Validate that 2 nodes want to use the same slot cache
                        # setup
                        tmp_slot = tmp_slots[i][0]
                        if tmp_slot.name != target_node.name:
                            disagreements.append(
                                f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                            )

                            if len(disagreements) > 5:
                                raise RedisClusterException(
                                    f"startup_nodes could not agree on a valid "
                                    f'slots cache: {", ".join(disagreements)}'
                                )

            fully_covered = self.check_slots_coverage(tmp_slots)
            if fully_covered:
                # Don't need to continue to the next startup node if all
                # slots are covered
                break

        if not startup_nodes_reachable:
            raise RedisClusterException(
                f"Redis Cluster cannot be connected. Please provide at least "
                f"one reachable node: {str(exception)}"
            ) from exception

        # Create Redis connections to all nodes
        self.create_redis_connections(list(tmp_nodes_cache.values()))

        # Check if the slots are not fully covered
        if not fully_covered and self._require_full_coverage:
            # Despite the requirement that the slots be covered, there
            # isn't a full coverage
            raise RedisClusterException(
                f"Not all slots are covered after querying all startup_nodes. "
                f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                f"covered..."
            )

        # Set the tmp variables to the real variables
        self.nodes_cache = tmp_nodes_cache
        self.slots_cache = tmp_slots
        # Set the default node
        self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
        if self._dynamic_startup_nodes:
            # Populate the startup nodes with all discovered nodes
            self.startup_nodes = tmp_nodes_cache
        # If initialize was called after a MovedError, clear it
        self._moved_exception = None

    def close(self):
        self.default_node = None
        for node in self.nodes_cache.values():
            if node.redis_connection:
                node.redis_connection.close()

    def reset(self):
        try:
            self.read_load_balancer.reset()
        except TypeError:
            # The read_load_balancer is None, do nothing
            pass

    def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
        """
        Remap the host and port returned from the cluster to a different
        internal value. Useful if the client is not connecting directly
        to the cluster.
        """
        if self.address_remap:
            return self.address_remap((host, port))
        return host, port
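
    # Example (editor's illustrative sketch): remapping announced addresses
    # when the cluster sits behind port forwarding. PORT_MAP and remap() are
    # hypothetical names; the callable receives the announced (host, port)
    # tuple.
    #
    #     PORT_MAP = {7000: 17000, 7001: 17001}
    #
    #     def remap(address):
    #         host, port = address
    #         return "127.0.0.1", PORT_MAP.get(port, port)
    #
    #     rc = RedisCluster(host="127.0.0.1", port=17000, address_remap=remap)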


class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
    """

    def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true, a replica can be selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        self.set_pubsub_node(redis_cluster, node, host, port)
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        super().__init__(
            **kwargs, connection_pool=connection_pool, encoder=redis_cluster.encoder
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        The pubsub node will be set according to the passed node, host and port
        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.
        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            pubsub_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, node, host, port)
            pubsub_node = node
        elif any([host, port]) is True:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            pubsub_node = None

        self.node = pubsub_node

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        if node is None or redis_cluster.get_node(node_name=node.name) is None:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )
1709 def execute_command(self, *args, **kwargs):
1710 """
1711 Execute a publish/subscribe command.
1713 Taken code from redis-py and tweak to make it work within a cluster.
1714 """
1715 # NOTE: don't parse the response in this function -- it could pull a
1716 # legitimate message off the stack if the connection is already
1717 # subscribed to one or more channels
1719 if self.connection is None:
1720 if self.connection_pool is None:
1721 if len(args) > 1:
1722 # Hash the first channel and get one of the nodes holding
1723 # this slot
1724 channel = args[1]
1725 slot = self.cluster.keyslot(channel)
1726 node = self.cluster.nodes_manager.get_node_from_slot(
1727 slot, self.cluster.read_from_replicas
1728 )
1729 else:
1730 # Get a random node
1731 node = self.cluster.get_random_node()
1732 self.node = node
1733 redis_connection = self.cluster.get_redis_connection(node)
1734 self.connection_pool = redis_connection.connection_pool
1735 self.connection = self.connection_pool.get_connection(
1736 "pubsub", self.shard_hint
1737 )
1738 # register a callback that re-subscribes to any channels we
1739 # were listening to when we were disconnected
1740 self.connection.register_connect_callback(self.on_connect)
1741 connection = self.connection
1742 self._execute(connection, connection.send_command, *args)
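# Sketch of the slot-based routing performed above, using helpers already
# imported at the top of this module ("channel-1" is an arbitrary channel):
from redis.crc import key_slot

slot = key_slot(b"channel-1")  # CRC16 of the channel name, modulo 16384
node = rc.nodes_manager.get_node_from_slot(slot, rc.read_from_replicas)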
1744 def get_redis_connection(self):
1745 """
1746 Get the Redis connection of the pubsub connected node.
1747 """
1748 if self.node is not None:
1749 return self.node.redis_connection
1752class ClusterPipeline(RedisCluster):
1753 """
1754 Support for Redis pipeline
1755 in cluster mode
1756 """
1758 ERRORS_ALLOW_RETRY = (
1759 ConnectionError,
1760 TimeoutError,
1761 MovedError,
1762 AskError,
1763 TryAgainError,
1764 )
1766 def __init__(
1767 self,
1768 nodes_manager: "NodesManager",
1769 commands_parser: "CommandsParser",
1770 result_callbacks: Optional[Dict[str, Callable]] = None,
1771 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
1772 startup_nodes: Optional[List["ClusterNode"]] = None,
1773 read_from_replicas: bool = False,
1774 cluster_error_retry_attempts: int = 3,
1775 reinitialize_steps: int = 5,
1776 lock=None,
1777 **kwargs,
1778 ):
1779 """ """
1780 self.command_stack = []
1781 self.nodes_manager = nodes_manager
1782 self.commands_parser = commands_parser
1783 self.refresh_table_asap = False
1784 self.result_callbacks = (
1785 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
1786 )
1787 self.startup_nodes = startup_nodes if startup_nodes else []
1788 self.read_from_replicas = read_from_replicas
1789 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
1790 self.cluster_response_callbacks = cluster_response_callbacks
1791 self.cluster_error_retry_attempts = cluster_error_retry_attempts
1792 self.reinitialize_counter = 0
1793 self.reinitialize_steps = reinitialize_steps
1794 self.encoder = Encoder(
1795 kwargs.get("encoding", "utf-8"),
1796 kwargs.get("encoding_errors", "strict"),
1797 kwargs.get("decode_responses", False),
1798 )
1799 if lock is None:
1800 lock = threading.Lock()
1801 self._lock = lock
1803 def __repr__(self):
1804 """ """
1805 return f"{type(self).__name__}"
1807 def __enter__(self):
1808 """ """
1809 return self
1811 def __exit__(self, exc_type, exc_value, traceback):
1812 """ """
1813 self.reset()
1815 def __del__(self):
1816 try:
1817 self.reset()
1818 except Exception:
1819 pass
1821 def __len__(self):
1822 """ """
1823 return len(self.command_stack)
1825 def __bool__(self):
1826 "Pipeline instances should always evaluate to True on Python 3+"
1827 return True
1829 def execute_command(self, *args, **kwargs):
1830 """
1831 Wrapper function for pipeline_execute_command
1832 """
1833 return self.pipeline_execute_command(*args, **kwargs)
1835 def pipeline_execute_command(self, *args, **options):
1836 """
1837 Appends the executed command to the pipeline's command stack
1838 """
1839 self.command_stack.append(
1840 PipelineCommand(args, options, len(self.command_stack))
1841 )
1842 return self
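# Because pipeline_execute_command returns self, calls can be chained before
# a single execute(). A minimal usage sketch (key and values are examples):
pipe = rc.pipeline()
pipe.set("foo", 1).incr("foo").get("foo")
results = pipe.execute()  # e.g. [True, 2, b"2"]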
1844 def raise_first_error(self, stack):
1845 """
1846 Raise the first exception on the stack
1847 """
1848 for c in stack:
1849 r = c.result
1850 if isinstance(r, Exception):
1851 self.annotate_exception(r, c.position + 1, c.args)
1852 raise r
1854 def annotate_exception(self, exception, number, command):
1855 """
1856 Provides extra context to the exception prior to it being handled
1857 """
1858 cmd = " ".join(map(safe_str, command))
1859 msg = (
1860 f"Command # {number} ({cmd}) of pipeline "
1861 f"caused error: {exception.args[0]}"
1862 )
1863 exception.args = (msg,) + exception.args[1:]
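# With the annotation above, a failure in, say, the third queued command
# surfaces with its position and arguments, roughly:
#
#   Command # 3 (GET foo bar) of pipeline caused error: wrong number of
#   arguments for 'get' command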
1865 def execute(self, raise_on_error=True):
1866 """
1867 Execute all the commands in the current pipeline
1868 """
1869 stack = self.command_stack
1870 try:
1871 return self.send_cluster_commands(stack, raise_on_error)
1872 finally:
1873 self.reset()
1875 def reset(self):
1876 """
1877 Reset back to empty pipeline.
1878 """
1879 self.command_stack = []
1881 self.scripts = set()
1883 # TODO: Implement
1884 # make sure to reset the connection state in the event that we were
1885 # watching something
1886 # if self.watching and self.connection:
1887 # try:
1888 # # call this manually since our unwatch or
1889 # # immediate_execute_command methods can call reset()
1890 # self.connection.send_command('UNWATCH')
1891 # self.connection.read_response()
1892 # except ConnectionError:
1893 # # disconnect will also remove any previous WATCHes
1894 # self.connection.disconnect()
1896 # clean up the other instance attributes
1897 self.watching = False
1898 self.explicit_transaction = False
1900 # TODO: Implement
1901 # we can safely return the connection to the pool here since we're
1902 # sure we're no longer WATCHing anything
1903 # if self.connection:
1904 # self.connection_pool.release(self.connection)
1905 # self.connection = None
1907 def send_cluster_commands(
1908 self, stack, raise_on_error=True, allow_redirections=True
1909 ):
1910 """
1911 Wrapper for CLUSTERDOWN error handling.
1913 If the cluster reports it is down, it is assumed that:
1914 - connection_pool was disconnected
1915 - connection_pool was reset
1916 - refresh_table_asap was set to True
1918 It will retry the number of times specified by
1919 the config option "self.cluster_error_retry_attempts",
1920 which defaults to 3 unless manually configured.
1922 If all attempts are exhausted, the last
1923 ClusterDownError or ConnectionError is raised.
1924 """
1925 if not stack:
1926 return []
1927 retry_attempts = self.cluster_error_retry_attempts
1928 while True:
1929 try:
1930 return self._send_cluster_commands(
1931 stack,
1932 raise_on_error=raise_on_error,
1933 allow_redirections=allow_redirections,
1934 )
1935 except (ClusterDownError, ConnectionError) as e:
1936 if retry_attempts > 0:
1937 # Try again with the new cluster setup. All other errors
1938 # should be raised.
1939 retry_attempts -= 1
1941 else:
1942 raise e
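# The retry budget used above comes from the client. A sketch of raising it
# when building the cluster client (the value 5 is an arbitrary example):
rc = RedisCluster(host="127.0.0.1", port=7000, cluster_error_retry_attempts=5)
pipe = rc.pipeline()  # the pipeline inherits the configured budget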
1944 def _send_cluster_commands(
1945 self, stack, raise_on_error=True, allow_redirections=True
1946 ):
1947 """
1948 Send a bunch of cluster commands to the redis cluster.
1950 `allow_redirections`: whether the pipeline should follow
1951 `ASK` & `MOVED` responses automatically. If set
1952 to False, a RedisClusterException is raised on redirection.
1953 """
1954 # the first time sending the commands we send all of
1955 # the commands that were queued up.
1956 # if we have to run through it again, we only retry
1957 # the commands that failed.
1958 attempt = sorted(stack, key=lambda x: x.position)
1959 is_default_node = False
1960 # build a list of node objects based on the node names we need to route to
1961 nodes = {}
1963 # as we move through each command that still needs to be processed,
1964 # we figure out the slot number that command maps to, then from
1965 # the slot determine the node.
1966 for c in attempt:
1967 while True:
1968 # refer to our internal node -> slot table that
1969 # tells us where a given command should route to.
1970 # (it might be possible we have a cached node that no longer
1971 # exists in the cluster, which is why we do this in a loop)
1972 passed_targets = c.options.pop("target_nodes", None)
1973 if passed_targets and not self._is_nodes_flag(passed_targets):
1974 target_nodes = self._parse_target_nodes(passed_targets)
1975 else:
1976 target_nodes = self._determine_nodes(
1977 *c.args, node_flag=passed_targets
1978 )
1979 if not target_nodes:
1980 raise RedisClusterException(
1981 f"No targets were found to execute {c.args} command on"
1982 )
1983 if len(target_nodes) > 1:
1984 raise RedisClusterException(
1985 f"Too many targets for command {c.args}"
1986 )
1988 node = target_nodes[0]
1989 if node == self.get_default_node():
1990 is_default_node = True
1992 # now that we know the name of the node
1993 # ( it's just a string in the form of host:port )
1994 # we can build a list of commands for each node.
1995 node_name = node.name
1996 if node_name not in nodes:
1997 redis_node = self.get_redis_connection(node)
1998 try:
1999 connection = get_connection(redis_node, c.args)
2000 except ConnectionError:
2001 # Connection retries are being handled in the node's
2002 # Retry object. Reinitialize the node -> slot table.
2003 self.nodes_manager.initialize()
2004 if is_default_node:
2005 self.replace_default_node()
2006 raise
2007 nodes[node_name] = NodeCommands(
2008 redis_node.parse_response,
2009 redis_node.connection_pool,
2010 connection,
2011 )
2012 nodes[node_name].append(c)
2013 break
2015 # send the commands in sequence.
2016 # we write to all the open sockets for each node first,
2017 # before reading anything.
2018 # this allows us to flush all the requests out across the
2019 # network essentially in parallel,
2020 # so that we can read them all in parallel as they come back.
2021 # we don't multiplex on the sockets as they become available,
2022 # but that shouldn't make too much difference.
2023 node_commands = nodes.values()
2024 for n in node_commands:
2025 n.write()
2027 for n in node_commands:
2028 n.read()
2030 # release all of the redis connections we allocated earlier
2031 # back into the connection pool.
2032 # we used to do this step as part of a try/finally block,
2033 # but it is really dangerous to
2034 # release connections back into the pool if for some
2035 # reason the socket has data still left in it
2036 # from a previous operation. The write and
2037 # read operations already have try/catch around them for
2038 # all known types of errors including connection
2039 # and socket level errors.
2040 # So if we hit an exception, something really bad
2041 # happened and putting any of
2042 # these connections back into the pool is a very bad idea.
2043 # the socket might have unread buffer still sitting in it,
2044 # and then the next time we read from it we pass the
2045 # buffered result back from a previous command and
2046 # every subsequent request to that connection will always get
2047 # a mismatched result.
2048 for n in nodes.values():
2049 n.connection_pool.release(n.connection)
2051 # if the response isn't an exception it is a
2052 # valid response from the node
2053 # we're all done with that command, YAY!
2054 # if we have more commands to attempt, we've run into problems.
2055 # collect all the commands we are allowed to retry.
2056 # (MOVED, ASK, or connection errors or timeout errors)
2057 attempt = sorted(
2058 (
2059 c
2060 for c in attempt
2061 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2062 ),
2063 key=lambda x: x.position,
2064 )
2065 if attempt and allow_redirections:
2066 # RETRY MAGIC HAPPENS HERE!
2067 # send these remaining commands one at a time using `execute_command`
2068 # in the main client. This keeps our retry logic
2069 # in one place mostly,
2070 # and allows us to be more confident in correctness of behavior.
2071 # at this point any speed gains from pipelining have been lost
2072 # anyway, so we might as well make the best
2073 # attempt to get the correct behavior.
2074 #
2075 # The client command will handle retries for each
2076 # individual command sequentially as we pass each
2077 # one into `execute_command`. Any exceptions
2078 # that bubble out should only appear once all
2079 # retries have been exhausted.
2080 #
2081 # If a lot of commands have failed, we'll be setting the
2082 # flag to rebuild the slots table from scratch.
2083 # So MOVED errors should correct themselves fairly quickly.
2084 self.reinitialize_counter += 1
2085 if self._should_reinitialized():
2086 self.nodes_manager.initialize()
2087 if is_default_node:
2088 self.replace_default_node()
2089 for c in attempt:
2090 try:
2091 # send each command individually like we
2092 # do in the main client.
2093 c.result = super().execute_command(*c.args, **c.options)
2094 except RedisError as e:
2095 c.result = e
2097 # turn the response back into a simple flat array that corresponds
2098 # to the sequence of commands issued in the stack in pipeline.execute()
2099 response = []
2100 for c in sorted(stack, key=lambda x: x.position):
2101 if c.args[0] in self.cluster_response_callbacks:
2102 c.result = self.cluster_response_callbacks[c.args[0]](
2103 c.result, **c.options
2104 )
2105 response.append(c.result)
2107 if raise_on_error:
2108 self.raise_first_error(stack)
2110 return response
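# Condensed sketch of the fan-out performed above: queue commands per node,
# flush every socket before reading any reply, then parse in order. Names
# mirror the NodeCommands helper defined later in this module.
for node_cmds in nodes.values():
    node_cmds.write()  # one packed request per node, sent eagerly
for node_cmds in nodes.values():
    node_cmds.read()   # replies parsed in submission order per node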
2112 def _fail_on_redirect(self, allow_redirections):
2113 """ """
2114 if not allow_redirections:
2115 raise RedisClusterException(
2116 "ASK & MOVED redirection not allowed in this pipeline"
2117 )
2119 def exists(self, *keys):
2120 return self.execute_command("EXISTS", *keys)
2122 def eval(self):
2123 """ """
2124 raise RedisClusterException("method eval() is not implemented")
2126 def multi(self):
2127 """ """
2128 raise RedisClusterException("method multi() is not implemented")
2130 def immediate_execute_command(self, *args, **options):
2131 """ """
2132 raise RedisClusterException(
2133 "method immediate_execute_command() is not implemented"
2134 )
2136 def _execute_transaction(self, *args, **kwargs):
2137 """ """
2138 raise RedisClusterException("method _execute_transaction() is not implemented")
2140 def load_scripts(self):
2141 """ """
2142 raise RedisClusterException("method load_scripts() is not implemented")
2144 def watch(self, *names):
2145 """ """
2146 raise RedisClusterException("method watch() is not implemented")
2148 def unwatch(self):
2149 """ """
2150 raise RedisClusterException("method unwatch() is not implemented")
2152 def script_load_for_pipeline(self, *args, **kwargs):
2153 """ """
2154 raise RedisClusterException(
2155 "method script_load_for_pipeline() is not implemented"
2156 )
2158 def delete(self, *names):
2159 """
2160 "Delete a key specified by ``names``"
2161 """
2162 if len(names) != 1:
2163 raise RedisClusterException(
2164 "deleting multiple keys is not implemented in pipeline command"
2165 )
2167 return self.execute_command("DEL", names[0])
2169 def unlink(self, *names):
2170 """
2171 "Unlink a key specified by ``names``"
2172 """
2173 if len(names) != 1:
2174 raise RedisClusterException(
2175 "unlinking multiple keys is not implemented in pipeline command"
2176 )
2178 return self.execute_command("UNLINK", names[0])
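# Usage sketch of the single-key restriction enforced above (keys are
# arbitrary examples; the bad call raises before anything is queued):
pipe = rc.pipeline()
pipe.delete("k1")        # fine: exactly one key
pipe.unlink("k2")        # fine: exactly one key
pipe.delete("k1", "k2")  # raises RedisClusterException immediately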
2181def block_pipeline_command(name: str) -> Callable[..., Any]:
2182 """
2183 Raises an error because some pipelined commands should
2184 be blocked when running in cluster mode.
2185 """
2187 def inner(*args, **kwargs):
2188 raise RedisClusterException(
2189 f"ERROR: Calling pipelined function {name} is blocked "
2190 f"when running redis in cluster mode..."
2191 )
2193 return inner
2196# Blocked pipeline commands
2197PIPELINE_BLOCKED_COMMANDS = (
2198 "BGREWRITEAOF",
2199 "BGSAVE",
2200 "BITOP",
2201 "BRPOPLPUSH",
2202 "CLIENT GETNAME",
2203 "CLIENT KILL",
2204 "CLIENT LIST",
2205 "CLIENT SETNAME",
2206 "CLIENT",
2207 "CONFIG GET",
2208 "CONFIG RESETSTAT",
2209 "CONFIG REWRITE",
2210 "CONFIG SET",
2211 "CONFIG",
2212 "DBSIZE",
2213 "ECHO",
2214 "EVALSHA",
2215 "FLUSHALL",
2216 "FLUSHDB",
2217 "INFO",
2218 "KEYS",
2219 "LASTSAVE",
2220 "MGET",
2221 "MGET NONATOMIC",
2222 "MOVE",
2223 "MSET",
2224 "MSET NONATOMIC",
2225 "MSETNX",
2226 "PFCOUNT",
2227 "PFMERGE",
2228 "PING",
2229 "PUBLISH",
2230 "RANDOMKEY",
2231 "READONLY",
2232 "READWRITE",
2233 "RENAME",
2234 "RENAMENX",
2235 "RPOPLPUSH",
2236 "SAVE",
2237 "SCAN",
2238 "SCRIPT EXISTS",
2239 "SCRIPT FLUSH",
2240 "SCRIPT KILL",
2241 "SCRIPT LOAD",
2242 "SCRIPT",
2243 "SDIFF",
2244 "SDIFFSTORE",
2245 "SENTINEL GET MASTER ADDR BY NAME",
2246 "SENTINEL MASTER",
2247 "SENTINEL MASTERS",
2248 "SENTINEL MONITOR",
2249 "SENTINEL REMOVE",
2250 "SENTINEL SENTINELS",
2251 "SENTINEL SET",
2252 "SENTINEL SLAVES",
2253 "SENTINEL",
2254 "SHUTDOWN",
2255 "SINTER",
2256 "SINTERSTORE",
2257 "SLAVEOF",
2258 "SLOWLOG GET",
2259 "SLOWLOG LEN",
2260 "SLOWLOG RESET",
2261 "SLOWLOG",
2262 "SMOVE",
2263 "SORT",
2264 "SUNION",
2265 "SUNIONSTORE",
2266 "TIME",
2267)
2268for command in PIPELINE_BLOCKED_COMMANDS:
2269 command = command.replace(" ", "_").lower()
2271 setattr(ClusterPipeline, command, block_pipeline_command(command))
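# After the setattr loop above, each blocked name resolves to the raising
# stub; spaces become underscores, e.g. "CONFIG SET" -> config_set. Sketch:
pipe = rc.pipeline()
pipe.keys()  # raises RedisClusterException: blocked in cluster mode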
2274class PipelineCommand:
2275 """ """
2277 def __init__(self, args, options=None, position=None):
2278 self.args = args
2279 if options is None:
2280 options = {}
2281 self.options = options
2282 self.position = position
2283 self.result = None
2284 self.node = None
2285 self.asking = False
2288class NodeCommands:
2289 """ """
2291 def __init__(self, parse_response, connection_pool, connection):
2292 """ """
2293 self.parse_response = parse_response
2294 self.connection_pool = connection_pool
2295 self.connection = connection
2296 self.commands = []
2298 def append(self, c):
2299 """ """
2300 self.commands.append(c)
2302 def write(self):
2303 """
2304 Code borrowed from redis-py so it can be adapted for cluster use.
2305 """
2306 connection = self.connection
2307 commands = self.commands
2309 # We are going to clobber the commands with the write, so go ahead
2310 # and ensure that nothing is sitting there from a previous run.
2311 for c in commands:
2312 c.result = None
2314 # build up all commands into a single request to increase network perf
2315 # send all the commands and catch connection and timeout errors.
2316 try:
2317 connection.send_packed_command(
2318 connection.pack_commands([c.args for c in commands])
2319 )
2320 except (ConnectionError, TimeoutError) as e:
2321 for c in commands:
2322 c.result = e
2324 def read(self):
2325 """ """
2326 connection = self.connection
2327 for c in self.commands:
2329 # if there is a result on this command,
2330 # it means we ran into an exception
2331 # like a connection error. Trying to parse
2332 # a response on a connection that
2333 # is no longer open will result in a
2334 # connection error raised by redis-py.
2335 # but redis-py doesn't check in parse_response
2336 # that the sock object is
2337 # still set and if you try to
2338 # read from a closed connection, it will
2339 # result in an AttributeError because
2340 # it will do a readline() call on None.
2341 # This can have all kinds of nasty side-effects.
2342 # Treating this case as a connection error
2343 # is fine because it will dump
2344 # the connection object back into the
2345 # pool and on the next write, it will
2346 # explicitly open the connection and all will be well.
2347 if c.result is None:
2348 try:
2349 c.result = self.parse_response(connection, c.args[0], **c.options)
2350 except (ConnectionError, TimeoutError) as e:
2351 for c in self.commands:
2352 c.result = e
2353 return
2354 except RedisError:
2355 c.result = sys.exc_info()[1]