Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/cluster.py: 19%
833 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:09 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:09 +0000
1import random
2import socket
3import sys
4import threading
5import time
6from collections import OrderedDict
7from typing import Any, Callable, Dict, List, Optional, Tuple, Union
9from redis.backoff import default_backoff
10from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
11from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands
12from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
13from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
14from redis.exceptions import (
15 AskError,
16 AuthenticationError,
17 ClusterCrossSlotError,
18 ClusterDownError,
19 ClusterError,
20 ConnectionError,
21 DataError,
22 MasterDownError,
23 MovedError,
24 RedisClusterException,
25 RedisError,
26 ResponseError,
27 SlotNotCoveredError,
28 TimeoutError,
29 TryAgainError,
30)
31from redis.lock import Lock
32from redis.retry import Retry
33from redis.utils import (
34 dict_merge,
35 list_keys_to_dict,
36 merge_result,
37 safe_str,
38 str_if_bytes,
39)
42def get_node_name(host: str, port: Union[str, int]) -> str:
43 return f"{host}:{port}"
46def get_connection(redis_node, *args, **options):
47 return redis_node.connection or redis_node.connection_pool.get_connection(
48 args[0], **options
49 )
52def parse_scan_result(command, res, **options):
53 cursors = {}
54 ret = []
55 for node_name, response in res.items():
56 cursor, r = parse_scan(response, **options)
57 cursors[node_name] = cursor
58 ret += r
60 return cursors, ret
63def parse_pubsub_numsub(command, res, **options):
64 numsub_d = OrderedDict()
65 for numsub_tups in res.values():
66 for channel, numsubbed in numsub_tups:
67 try:
68 numsub_d[channel] += numsubbed
69 except KeyError:
70 numsub_d[channel] = numsubbed
72 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
73 return ret_numsub
76def parse_cluster_slots(
77 resp: Any, **options: Any
78) -> Dict[Tuple[int, int], Dict[str, Any]]:
79 current_host = options.get("current_host", "")
81 def fix_server(*args: Any) -> Tuple[str, Any]:
82 return str_if_bytes(args[0]) or current_host, args[1]
84 slots = {}
85 for slot in resp:
86 start, end, primary = slot[:3]
87 replicas = slot[3:]
88 slots[start, end] = {
89 "primary": fix_server(*primary),
90 "replicas": [fix_server(*replica) for replica in replicas],
91 }
93 return slots
96def parse_cluster_shards(resp, **options):
97 """
98 Parse CLUSTER SHARDS response.
99 """
100 shards = []
101 for x in resp:
102 shard = {"slots": [], "nodes": []}
103 for i in range(0, len(x[1]), 2):
104 shard["slots"].append((x[1][i], (x[1][i + 1])))
105 nodes = x[3]
106 for node in nodes:
107 dict_node = {}
108 for i in range(0, len(node), 2):
109 dict_node[node[i]] = node[i + 1]
110 shard["nodes"].append(dict_node)
111 shards.append(shard)
113 return shards
116PRIMARY = "primary"
117REPLICA = "replica"
118SLOT_ID = "slot-id"
120REDIS_ALLOWED_KEYS = (
121 "charset",
122 "connection_class",
123 "connection_pool",
124 "connection_pool_class",
125 "client_name",
126 "credential_provider",
127 "db",
128 "decode_responses",
129 "encoding",
130 "encoding_errors",
131 "errors",
132 "host",
133 "max_connections",
134 "nodes_flag",
135 "redis_connect_func",
136 "password",
137 "port",
138 "queue_class",
139 "retry",
140 "retry_on_timeout",
141 "socket_connect_timeout",
142 "socket_keepalive",
143 "socket_keepalive_options",
144 "socket_timeout",
145 "ssl",
146 "ssl_ca_certs",
147 "ssl_ca_data",
148 "ssl_certfile",
149 "ssl_cert_reqs",
150 "ssl_keyfile",
151 "ssl_password",
152 "unix_socket_path",
153 "username",
154)
155KWARGS_DISABLED_KEYS = ("host", "port")
158def cleanup_kwargs(**kwargs):
159 """
160 Remove unsupported or disabled keys from kwargs
161 """
162 connection_kwargs = {
163 k: v
164 for k, v in kwargs.items()
165 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
166 }
168 return connection_kwargs
171class ClusterParser(DefaultParser):
172 EXCEPTION_CLASSES = dict_merge(
173 DefaultParser.EXCEPTION_CLASSES,
174 {
175 "ASK": AskError,
176 "TRYAGAIN": TryAgainError,
177 "MOVED": MovedError,
178 "CLUSTERDOWN": ClusterDownError,
179 "CROSSSLOT": ClusterCrossSlotError,
180 "MASTERDOWN": MasterDownError,
181 },
182 )
185class AbstractRedisCluster:
186 RedisClusterRequestTTL = 16
188 PRIMARIES = "primaries"
189 REPLICAS = "replicas"
190 ALL_NODES = "all"
191 RANDOM = "random"
192 DEFAULT_NODE = "default-node"
194 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}
196 COMMAND_FLAGS = dict_merge(
197 list_keys_to_dict(
198 [
199 "ACL CAT",
200 "ACL DELUSER",
201 "ACL DRYRUN",
202 "ACL GENPASS",
203 "ACL GETUSER",
204 "ACL HELP",
205 "ACL LIST",
206 "ACL LOG",
207 "ACL LOAD",
208 "ACL SAVE",
209 "ACL SETUSER",
210 "ACL USERS",
211 "ACL WHOAMI",
212 "AUTH",
213 "CLIENT LIST",
214 "CLIENT SETNAME",
215 "CLIENT GETNAME",
216 "CONFIG SET",
217 "CONFIG REWRITE",
218 "CONFIG RESETSTAT",
219 "TIME",
220 "PUBSUB CHANNELS",
221 "PUBSUB NUMPAT",
222 "PUBSUB NUMSUB",
223 "PING",
224 "INFO",
225 "SHUTDOWN",
226 "KEYS",
227 "DBSIZE",
228 "BGSAVE",
229 "SLOWLOG GET",
230 "SLOWLOG LEN",
231 "SLOWLOG RESET",
232 "WAIT",
233 "SAVE",
234 "MEMORY PURGE",
235 "MEMORY MALLOC-STATS",
236 "MEMORY STATS",
237 "LASTSAVE",
238 "CLIENT TRACKINGINFO",
239 "CLIENT PAUSE",
240 "CLIENT UNPAUSE",
241 "CLIENT UNBLOCK",
242 "CLIENT ID",
243 "CLIENT REPLY",
244 "CLIENT GETREDIR",
245 "CLIENT INFO",
246 "CLIENT KILL",
247 "READONLY",
248 "READWRITE",
249 "CLUSTER INFO",
250 "CLUSTER MEET",
251 "CLUSTER NODES",
252 "CLUSTER REPLICAS",
253 "CLUSTER RESET",
254 "CLUSTER SET-CONFIG-EPOCH",
255 "CLUSTER SLOTS",
256 "CLUSTER SHARDS",
257 "CLUSTER COUNT-FAILURE-REPORTS",
258 "CLUSTER KEYSLOT",
259 "COMMAND",
260 "COMMAND COUNT",
261 "COMMAND LIST",
262 "COMMAND GETKEYS",
263 "CONFIG GET",
264 "DEBUG",
265 "RANDOMKEY",
266 "READONLY",
267 "READWRITE",
268 "TIME",
269 "GRAPH.CONFIG",
270 "LATENCY HISTORY",
271 "LATENCY LATEST",
272 "LATENCY RESET",
273 ],
274 DEFAULT_NODE,
275 ),
276 list_keys_to_dict(
277 [
278 "FLUSHALL",
279 "FLUSHDB",
280 "FUNCTION DELETE",
281 "FUNCTION FLUSH",
282 "FUNCTION LIST",
283 "FUNCTION LOAD",
284 "FUNCTION RESTORE",
285 "SCAN",
286 "SCRIPT EXISTS",
287 "SCRIPT FLUSH",
288 "SCRIPT LOAD",
289 ],
290 PRIMARIES,
291 ),
292 list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
293 list_keys_to_dict(
294 [
295 "CLUSTER COUNTKEYSINSLOT",
296 "CLUSTER DELSLOTS",
297 "CLUSTER DELSLOTSRANGE",
298 "CLUSTER GETKEYSINSLOT",
299 "CLUSTER SETSLOT",
300 ],
301 SLOT_ID,
302 ),
303 )
305 SEARCH_COMMANDS = (
306 [
307 "FT.CREATE",
308 "FT.SEARCH",
309 "FT.AGGREGATE",
310 "FT.EXPLAIN",
311 "FT.EXPLAINCLI",
312 "FT,PROFILE",
313 "FT.ALTER",
314 "FT.DROPINDEX",
315 "FT.ALIASADD",
316 "FT.ALIASUPDATE",
317 "FT.ALIASDEL",
318 "FT.TAGVALS",
319 "FT.SUGADD",
320 "FT.SUGGET",
321 "FT.SUGDEL",
322 "FT.SUGLEN",
323 "FT.SYNUPDATE",
324 "FT.SYNDUMP",
325 "FT.SPELLCHECK",
326 "FT.DICTADD",
327 "FT.DICTDEL",
328 "FT.DICTDUMP",
329 "FT.INFO",
330 "FT._LIST",
331 "FT.CONFIG",
332 "FT.ADD",
333 "FT.DEL",
334 "FT.DROP",
335 "FT.GET",
336 "FT.MGET",
337 "FT.SYNADD",
338 ],
339 )
341 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
342 "CLUSTER SLOTS": parse_cluster_slots,
343 "CLUSTER SHARDS": parse_cluster_shards,
344 }
346 RESULT_CALLBACKS = dict_merge(
347 list_keys_to_dict(["PUBSUB NUMSUB"], parse_pubsub_numsub),
348 list_keys_to_dict(
349 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
350 ),
351 list_keys_to_dict(["KEYS", "PUBSUB CHANNELS"], merge_result),
352 list_keys_to_dict(
353 [
354 "PING",
355 "CONFIG SET",
356 "CONFIG REWRITE",
357 "CONFIG RESETSTAT",
358 "CLIENT SETNAME",
359 "BGSAVE",
360 "SLOWLOG RESET",
361 "SAVE",
362 "MEMORY PURGE",
363 "CLIENT PAUSE",
364 "CLIENT UNPAUSE",
365 ],
366 lambda command, res: all(res.values()) if isinstance(res, dict) else res,
367 ),
368 list_keys_to_dict(
369 ["DBSIZE", "WAIT"],
370 lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
371 ),
372 list_keys_to_dict(
373 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
374 ),
375 list_keys_to_dict(["SCAN"], parse_scan_result),
376 list_keys_to_dict(
377 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
378 ),
379 list_keys_to_dict(
380 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
381 ),
382 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
383 )
385 ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError)
387 def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
388 """Replace the default cluster node.
389 A random cluster node will be chosen if target_node isn't passed, and primaries
390 will be prioritized. The default node will not be changed if there are no other
391 nodes in the cluster.
393 Args:
394 target_node (ClusterNode, optional): Target node to replace the default
395 node. Defaults to None.
396 """
397 if target_node:
398 self.nodes_manager.default_node = target_node
399 else:
400 curr_node = self.get_default_node()
401 primaries = [node for node in self.get_primaries() if node != curr_node]
402 if primaries:
403 # Choose a primary if the cluster contains different primaries
404 self.nodes_manager.default_node = random.choice(primaries)
405 else:
406 # Otherwise, hoose a primary if the cluster contains different primaries
407 replicas = [node for node in self.get_replicas() if node != curr_node]
408 if replicas:
409 self.nodes_manager.default_node = random.choice(replicas)
412class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
413 @classmethod
414 def from_url(cls, url, **kwargs):
415 """
416 Return a Redis client object configured from the given URL
418 For example::
420 redis://[[username]:[password]]@localhost:6379/0
421 rediss://[[username]:[password]]@localhost:6379/0
422 unix://[username@]/path/to/socket.sock?db=0[&password=password]
424 Three URL schemes are supported:
426 - `redis://` creates a TCP socket connection. See more at:
427 <https://www.iana.org/assignments/uri-schemes/prov/redis>
428 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
429 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
430 - ``unix://``: creates a Unix Domain Socket connection.
432 The username, password, hostname, path and all querystring values
433 are passed through urllib.parse.unquote in order to replace any
434 percent-encoded values with their corresponding characters.
436 There are several ways to specify a database number. The first value
437 found will be used:
439 1. A ``db`` querystring option, e.g. redis://localhost?db=0
440 2. If using the redis:// or rediss:// schemes, the path argument
441 of the url, e.g. redis://localhost/0
442 3. A ``db`` keyword argument to this function.
444 If none of these options are specified, the default db=0 is used.
446 All querystring options are cast to their appropriate Python types.
447 Boolean arguments can be specified with string values "True"/"False"
448 or "Yes"/"No". Values that cannot be properly cast cause a
449 ``ValueError`` to be raised. Once parsed, the querystring arguments
450 and keyword arguments are passed to the ``ConnectionPool``'s
451 class initializer. In the case of conflicting arguments, querystring
452 arguments always win.
454 """
455 return cls(url=url, **kwargs)
457 def __init__(
458 self,
459 host: Optional[str] = None,
460 port: int = 6379,
461 startup_nodes: Optional[List["ClusterNode"]] = None,
462 cluster_error_retry_attempts: int = 3,
463 retry: Optional["Retry"] = None,
464 require_full_coverage: bool = False,
465 reinitialize_steps: int = 5,
466 read_from_replicas: bool = False,
467 dynamic_startup_nodes: bool = True,
468 url: Optional[str] = None,
469 **kwargs,
470 ):
471 """
472 Initialize a new RedisCluster client.
474 :param startup_nodes:
475 List of nodes from which initial bootstrapping can be done
476 :param host:
477 Can be used to point to a startup node
478 :param port:
479 Can be used to point to a startup node
480 :param require_full_coverage:
481 When set to False (default value): the client will not require a
482 full coverage of the slots. However, if not all slots are covered,
483 and at least one node has 'cluster-require-full-coverage' set to
484 'yes,' the server will throw a ClusterDownError for some key-based
485 commands. See -
486 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
487 When set to True: all slots must be covered to construct the
488 cluster client. If not all slots are covered, RedisClusterException
489 will be thrown.
490 :param read_from_replicas:
491 Enable read from replicas in READONLY mode. You can read possibly
492 stale data.
493 When set to true, read commands will be assigned between the
494 primary and its replications in a Round-Robin manner.
495 :param dynamic_startup_nodes:
496 Set the RedisCluster's startup nodes to all of the discovered nodes.
497 If true (default value), the cluster's discovered nodes will be used to
498 determine the cluster nodes-slots mapping in the next topology refresh.
499 It will remove the initial passed startup nodes if their endpoints aren't
500 listed in the CLUSTER SLOTS output.
501 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
502 specific IP addresses, it is best to set it to false.
503 :param cluster_error_retry_attempts:
504 Number of times to retry before raising an error when
505 :class:`~.TimeoutError` or :class:`~.ConnectionError` or
506 :class:`~.ClusterDownError` are encountered
507 :param reinitialize_steps:
508 Specifies the number of MOVED errors that need to occur before
509 reinitializing the whole cluster topology. If a MOVED error occurs
510 and the cluster does not need to be reinitialized on this current
511 error handling, only the MOVED slot will be patched with the
512 redirected node.
513 To reinitialize the cluster on every MOVED error, set
514 reinitialize_steps to 1.
515 To avoid reinitializing the cluster on moved errors, set
516 reinitialize_steps to 0.
518 :**kwargs:
519 Extra arguments that will be sent into Redis instance when created
520 (See Official redis-py doc for supported kwargs
521 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
522 Some kwargs are not supported and will raise a
523 RedisClusterException:
524 - db (Redis do not support database SELECT in cluster mode)
525 """
526 if startup_nodes is None:
527 startup_nodes = []
529 if "db" in kwargs:
530 # Argument 'db' is not possible to use in cluster mode
531 raise RedisClusterException(
532 "Argument 'db' is not possible to use in cluster mode"
533 )
535 # Get the startup node/s
536 from_url = False
537 if url is not None:
538 from_url = True
539 url_options = parse_url(url)
540 if "path" in url_options:
541 raise RedisClusterException(
542 "RedisCluster does not currently support Unix Domain "
543 "Socket connections"
544 )
545 if "db" in url_options and url_options["db"] != 0:
546 # Argument 'db' is not possible to use in cluster mode
547 raise RedisClusterException(
548 "A ``db`` querystring option can only be 0 in cluster mode"
549 )
550 kwargs.update(url_options)
551 host = kwargs.get("host")
552 port = kwargs.get("port", port)
553 startup_nodes.append(ClusterNode(host, port))
554 elif host is not None and port is not None:
555 startup_nodes.append(ClusterNode(host, port))
556 elif len(startup_nodes) == 0:
557 # No startup node was provided
558 raise RedisClusterException(
559 "RedisCluster requires at least one node to discover the "
560 "cluster. Please provide one of the followings:\n"
561 "1. host and port, for example:\n"
562 " RedisCluster(host='localhost', port=6379)\n"
563 "2. list of startup nodes, for example:\n"
564 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
565 " ClusterNode('localhost', 6378)])"
566 )
567 # Update the connection arguments
568 # Whenever a new connection is established, RedisCluster's on_connect
569 # method should be run
570 # If the user passed on_connect function we'll save it and run it
571 # inside the RedisCluster.on_connect() function
572 self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
573 kwargs.update({"redis_connect_func": self.on_connect})
574 kwargs = cleanup_kwargs(**kwargs)
575 if retry:
576 self.retry = retry
577 kwargs.update({"retry": self.retry})
578 else:
579 kwargs.update({"retry": Retry(default_backoff(), 0)})
581 self.encoder = Encoder(
582 kwargs.get("encoding", "utf-8"),
583 kwargs.get("encoding_errors", "strict"),
584 kwargs.get("decode_responses", False),
585 )
586 self.cluster_error_retry_attempts = cluster_error_retry_attempts
587 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
588 self.node_flags = self.__class__.NODE_FLAGS.copy()
589 self.read_from_replicas = read_from_replicas
590 self.reinitialize_counter = 0
591 self.reinitialize_steps = reinitialize_steps
592 self.nodes_manager = NodesManager(
593 startup_nodes=startup_nodes,
594 from_url=from_url,
595 require_full_coverage=require_full_coverage,
596 dynamic_startup_nodes=dynamic_startup_nodes,
597 **kwargs,
598 )
600 self.cluster_response_callbacks = CaseInsensitiveDict(
601 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
602 )
603 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
604 self.commands_parser = CommandsParser(self)
605 self._lock = threading.Lock()
607 def __enter__(self):
608 return self
610 def __exit__(self, exc_type, exc_value, traceback):
611 self.close()
613 def __del__(self):
614 self.close()
616 def disconnect_connection_pools(self):
617 for node in self.get_nodes():
618 if node.redis_connection:
619 try:
620 node.redis_connection.connection_pool.disconnect()
621 except OSError:
622 # Client was already disconnected. do nothing
623 pass
625 def on_connect(self, connection):
626 """
627 Initialize the connection, authenticate and select a database and send
628 READONLY if it is set during object initialization.
629 """
630 connection.set_parser(ClusterParser)
631 connection.on_connect()
633 if self.read_from_replicas:
634 # Sending READONLY command to server to configure connection as
635 # readonly. Since each cluster node may change its server type due
636 # to a failover, we should establish a READONLY connection
637 # regardless of the server type. If this is a primary connection,
638 # READONLY would not affect executing write commands.
639 connection.send_command("READONLY")
640 if str_if_bytes(connection.read_response()) != "OK":
641 raise ConnectionError("READONLY command failed")
643 if self.user_on_connect_func is not None:
644 self.user_on_connect_func(connection)
646 def get_redis_connection(self, node):
647 if not node.redis_connection:
648 with self._lock:
649 if not node.redis_connection:
650 self.nodes_manager.create_redis_connections([node])
651 return node.redis_connection
653 def get_node(self, host=None, port=None, node_name=None):
654 return self.nodes_manager.get_node(host, port, node_name)
656 def get_primaries(self):
657 return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
659 def get_replicas(self):
660 return self.nodes_manager.get_nodes_by_server_type(REPLICA)
662 def get_random_node(self):
663 return random.choice(list(self.nodes_manager.nodes_cache.values()))
665 def get_nodes(self):
666 return list(self.nodes_manager.nodes_cache.values())
668 def get_node_from_key(self, key, replica=False):
669 """
670 Get the node that holds the key's slot.
671 If replica set to True but the slot doesn't have any replicas, None is
672 returned.
673 """
674 slot = self.keyslot(key)
675 slot_cache = self.nodes_manager.slots_cache.get(slot)
676 if slot_cache is None or len(slot_cache) == 0:
677 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
678 if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
679 return None
680 elif replica:
681 node_idx = 1
682 else:
683 # primary
684 node_idx = 0
686 return slot_cache[node_idx]
688 def get_default_node(self):
689 """
690 Get the cluster's default node
691 """
692 return self.nodes_manager.default_node
694 def set_default_node(self, node):
695 """
696 Set the default node of the cluster.
697 :param node: 'ClusterNode'
698 :return True if the default node was set, else False
699 """
700 if node is None or self.get_node(node_name=node.name) is None:
701 return False
702 self.nodes_manager.default_node = node
703 return True
705 def get_retry(self) -> Optional["Retry"]:
706 return self.retry
708 def set_retry(self, retry: "Retry") -> None:
709 self.retry = retry
710 for node in self.get_nodes():
711 node.redis_connection.set_retry(retry)
713 def monitor(self, target_node=None):
714 """
715 Returns a Monitor object for the specified target node.
716 The default cluster node will be selected if no target node was
717 specified.
718 Monitor is useful for handling the MONITOR command to the redis server.
719 next_command() method returns one command from monitor
720 listen() method yields commands from monitor.
721 """
722 if target_node is None:
723 target_node = self.get_default_node()
724 if target_node.redis_connection is None:
725 raise RedisClusterException(
726 f"Cluster Node {target_node.name} has no redis_connection"
727 )
728 return target_node.redis_connection.monitor()
730 def pubsub(self, node=None, host=None, port=None, **kwargs):
731 """
732 Allows passing a ClusterNode, or host&port, to get a pubsub instance
733 connected to the specified node
734 """
735 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
737 def pipeline(self, transaction=None, shard_hint=None):
738 """
739 Cluster impl:
740 Pipelines do not work in cluster mode the same way they
741 do in normal mode. Create a clone of this object so
742 that simulating pipelines will work correctly. Each
743 command will be called directly when used and
744 when calling execute() will only return the result stack.
745 """
746 if shard_hint:
747 raise RedisClusterException("shard_hint is deprecated in cluster mode")
749 if transaction:
750 raise RedisClusterException("transaction is deprecated in cluster mode")
752 return ClusterPipeline(
753 nodes_manager=self.nodes_manager,
754 commands_parser=self.commands_parser,
755 startup_nodes=self.nodes_manager.startup_nodes,
756 result_callbacks=self.result_callbacks,
757 cluster_response_callbacks=self.cluster_response_callbacks,
758 cluster_error_retry_attempts=self.cluster_error_retry_attempts,
759 read_from_replicas=self.read_from_replicas,
760 reinitialize_steps=self.reinitialize_steps,
761 lock=self._lock,
762 )
764 def lock(
765 self,
766 name,
767 timeout=None,
768 sleep=0.1,
769 blocking=True,
770 blocking_timeout=None,
771 lock_class=None,
772 thread_local=True,
773 ):
774 """
775 Return a new Lock object using key ``name`` that mimics
776 the behavior of threading.Lock.
778 If specified, ``timeout`` indicates a maximum life for the lock.
779 By default, it will remain locked until release() is called.
781 ``sleep`` indicates the amount of time to sleep per loop iteration
782 when the lock is in blocking mode and another client is currently
783 holding the lock.
785 ``blocking`` indicates whether calling ``acquire`` should block until
786 the lock has been acquired or to fail immediately, causing ``acquire``
787 to return False and the lock not being acquired. Defaults to True.
788 Note this value can be overridden by passing a ``blocking``
789 argument to ``acquire``.
791 ``blocking_timeout`` indicates the maximum amount of time in seconds to
792 spend trying to acquire the lock. A value of ``None`` indicates
793 continue trying forever. ``blocking_timeout`` can be specified as a
794 float or integer, both representing the number of seconds to wait.
796 ``lock_class`` forces the specified lock implementation. Note that as
797 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
798 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
799 you have created your own custom lock class.
801 ``thread_local`` indicates whether the lock token is placed in
802 thread-local storage. By default, the token is placed in thread local
803 storage so that a thread only sees its token, not a token set by
804 another thread. Consider the following timeline:
806 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
807 thread-1 sets the token to "abc"
808 time: 1, thread-2 blocks trying to acquire `my-lock` using the
809 Lock instance.
810 time: 5, thread-1 has not yet completed. redis expires the lock
811 key.
812 time: 5, thread-2 acquired `my-lock` now that it's available.
813 thread-2 sets the token to "xyz"
814 time: 6, thread-1 finishes its work and calls release(). if the
815 token is *not* stored in thread local storage, then
816 thread-1 would see the token value as "xyz" and would be
817 able to successfully release the thread-2's lock.
819 In some use cases it's necessary to disable thread local storage. For
820 example, if you have code where one thread acquires a lock and passes
821 that lock instance to a worker thread to release later. If thread
822 local storage isn't disabled in this case, the worker thread won't see
823 the token set by the thread that acquired the lock. Our assumption
824 is that these cases aren't common and as such default to using
825 thread local storage."""
826 if lock_class is None:
827 lock_class = Lock
828 return lock_class(
829 self,
830 name,
831 timeout=timeout,
832 sleep=sleep,
833 blocking=blocking,
834 blocking_timeout=blocking_timeout,
835 thread_local=thread_local,
836 )
838 def set_response_callback(self, command, callback):
839 """Set a custom Response Callback"""
840 self.cluster_response_callbacks[command] = callback
842 def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
843 # Determine which nodes should be executed the command on.
844 # Returns a list of target nodes.
845 command = args[0].upper()
846 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
847 command = f"{args[0]} {args[1]}".upper()
849 nodes_flag = kwargs.pop("nodes_flag", None)
850 if nodes_flag is not None:
851 # nodes flag passed by the user
852 command_flag = nodes_flag
853 else:
854 # get the nodes group for this command if it was predefined
855 command_flag = self.command_flags.get(command)
856 if command_flag == self.__class__.RANDOM:
857 # return a random node
858 return [self.get_random_node()]
859 elif command_flag == self.__class__.PRIMARIES:
860 # return all primaries
861 return self.get_primaries()
862 elif command_flag == self.__class__.REPLICAS:
863 # return all replicas
864 return self.get_replicas()
865 elif command_flag == self.__class__.ALL_NODES:
866 # return all nodes
867 return self.get_nodes()
868 elif command_flag == self.__class__.DEFAULT_NODE:
869 # return the cluster's default node
870 return [self.nodes_manager.default_node]
871 elif command in self.__class__.SEARCH_COMMANDS[0]:
872 return [self.nodes_manager.default_node]
873 else:
874 # get the node that holds the key's slot
875 slot = self.determine_slot(*args)
876 node = self.nodes_manager.get_node_from_slot(
877 slot, self.read_from_replicas and command in READ_COMMANDS
878 )
879 return [node]
881 def _should_reinitialized(self):
882 # To reinitialize the cluster on every MOVED error,
883 # set reinitialize_steps to 1.
884 # To avoid reinitializing the cluster on moved errors, set
885 # reinitialize_steps to 0.
886 if self.reinitialize_steps == 0:
887 return False
888 else:
889 return self.reinitialize_counter % self.reinitialize_steps == 0
891 def keyslot(self, key):
892 """
893 Calculate keyslot for a given key.
894 See Keys distribution model in https://redis.io/topics/cluster-spec
895 """
896 k = self.encoder.encode(key)
897 return key_slot(k)
899 def _get_command_keys(self, *args):
900 """
901 Get the keys in the command. If the command has no keys in in, None is
902 returned.
904 NOTE: Due to a bug in redis<7.0, this function does not work properly
905 for EVAL or EVALSHA when the `numkeys` arg is 0.
906 - issue: https://github.com/redis/redis/issues/9493
907 - fix: https://github.com/redis/redis/pull/9733
909 So, don't use this function with EVAL or EVALSHA.
910 """
911 redis_conn = self.get_default_node().redis_connection
912 return self.commands_parser.get_keys(redis_conn, *args)
914 def determine_slot(self, *args):
915 """
916 Figure out what slot to use based on args.
918 Raises a RedisClusterException if there's a missing key and we can't
919 determine what slots to map the command to; or, if the keys don't
920 all map to the same key slot.
921 """
922 command = args[0]
923 if self.command_flags.get(command) == SLOT_ID:
924 # The command contains the slot ID
925 return args[1]
927 # Get the keys in the command
929 # EVAL and EVALSHA are common enough that it's wasteful to go to the
930 # redis server to parse the keys. Besides, there is a bug in redis<7.0
931 # where `self._get_command_keys()` fails anyway. So, we special case
932 # EVAL/EVALSHA.
933 if command in ("EVAL", "EVALSHA"):
934 # command syntax: EVAL "script body" num_keys ...
935 if len(args) <= 2:
936 raise RedisClusterException(f"Invalid args in command: {args}")
937 num_actual_keys = args[2]
938 eval_keys = args[3 : 3 + num_actual_keys]
939 # if there are 0 keys, that means the script can be run on any node
940 # so we can just return a random slot
941 if len(eval_keys) == 0:
942 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
943 keys = eval_keys
944 else:
945 keys = self._get_command_keys(*args)
946 if keys is None or len(keys) == 0:
947 # FCALL can call a function with 0 keys, that means the function
948 # can be run on any node so we can just return a random slot
949 if command in ("FCALL", "FCALL_RO"):
950 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
951 raise RedisClusterException(
952 "No way to dispatch this command to Redis Cluster. "
953 "Missing key.\nYou can execute the command by specifying "
954 f"target nodes.\nCommand: {args}"
955 )
957 # single key command
958 if len(keys) == 1:
959 return self.keyslot(keys[0])
961 # multi-key command; we need to make sure all keys are mapped to
962 # the same slot
963 slots = {self.keyslot(key) for key in keys}
964 if len(slots) != 1:
965 raise RedisClusterException(
966 f"{command} - all keys must map to the same key slot"
967 )
969 return slots.pop()
971 def get_encoder(self):
972 """
973 Get the connections' encoder
974 """
975 return self.encoder
977 def get_connection_kwargs(self):
978 """
979 Get the connections' key-word arguments
980 """
981 return self.nodes_manager.connection_kwargs
983 def _is_nodes_flag(self, target_nodes):
984 return isinstance(target_nodes, str) and target_nodes in self.node_flags
986 def _parse_target_nodes(self, target_nodes):
987 if isinstance(target_nodes, list):
988 nodes = target_nodes
989 elif isinstance(target_nodes, ClusterNode):
990 # Supports passing a single ClusterNode as a variable
991 nodes = [target_nodes]
992 elif isinstance(target_nodes, dict):
993 # Supports dictionaries of the format {node_name: node}.
994 # It enables to execute commands with multi nodes as follows:
995 # rc.cluster_save_config(rc.get_primaries())
996 nodes = target_nodes.values()
997 else:
998 raise TypeError(
999 "target_nodes type can be one of the following: "
1000 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
1001 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
1002 f"The passed type is {type(target_nodes)}"
1003 )
1004 return nodes
1006 def execute_command(self, *args, **kwargs):
1007 """
1008 Wrapper for ERRORS_ALLOW_RETRY error handling.
1010 It will try the number of times specified by the config option
1011 "self.cluster_error_retry_attempts" which defaults to 3 unless manually
1012 configured.
1014 If it reaches the number of times, the command will raise the exception
1016 Key argument :target_nodes: can be passed with the following types:
1017 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
1018 ClusterNode
1019 list<ClusterNode>
1020 dict<Any, ClusterNode>
1021 """
1022 target_nodes_specified = False
1023 is_default_node = False
1024 target_nodes = None
1025 passed_targets = kwargs.pop("target_nodes", None)
1026 if passed_targets is not None and not self._is_nodes_flag(passed_targets):
1027 target_nodes = self._parse_target_nodes(passed_targets)
1028 target_nodes_specified = True
1029 # If an error that allows retrying was thrown, the nodes and slots
1030 # cache were reinitialized. We will retry executing the command with
1031 # the updated cluster setup only when the target nodes can be
1032 # determined again with the new cache tables. Therefore, when target
1033 # nodes were passed to this function, we cannot retry the command
1034 # execution since the nodes may not be valid anymore after the tables
1035 # were reinitialized. So in case of passed target nodes,
1036 # retry_attempts will be set to 0.
1037 retry_attempts = (
1038 0 if target_nodes_specified else self.cluster_error_retry_attempts
1039 )
1040 # Add one for the first execution
1041 execute_attempts = 1 + retry_attempts
1042 for _ in range(execute_attempts):
1043 try:
1044 res = {}
1045 if not target_nodes_specified:
1046 # Determine the nodes to execute the command on
1047 target_nodes = self._determine_nodes(
1048 *args, **kwargs, nodes_flag=passed_targets
1049 )
1050 if not target_nodes:
1051 raise RedisClusterException(
1052 f"No targets were found to execute {args} command on"
1053 )
1054 if (
1055 len(target_nodes) == 1
1056 and target_nodes[0] == self.get_default_node()
1057 ):
1058 is_default_node = True
1059 for node in target_nodes:
1060 res[node.name] = self._execute_command(node, *args, **kwargs)
1061 # Return the processed result
1062 return self._process_result(args[0], res, **kwargs)
1063 except Exception as e:
1064 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
1065 if is_default_node:
1066 # Replace the default cluster node
1067 self.replace_default_node()
1068 # The nodes and slots cache were reinitialized.
1069 # Try again with the new cluster setup.
1070 retry_attempts -= 1
1071 continue
1072 else:
1073 # raise the exception
1074 raise e
1076 def _execute_command(self, target_node, *args, **kwargs):
1077 """
1078 Send a command to a node in the cluster
1079 """
1080 command = args[0]
1081 redis_node = None
1082 connection = None
1083 redirect_addr = None
1084 asking = False
1085 moved = False
1086 ttl = int(self.RedisClusterRequestTTL)
1088 while ttl > 0:
1089 ttl -= 1
1090 try:
1091 if asking:
1092 target_node = self.get_node(node_name=redirect_addr)
1093 elif moved:
1094 # MOVED occurred and the slots cache was updated,
1095 # refresh the target node
1096 slot = self.determine_slot(*args)
1097 target_node = self.nodes_manager.get_node_from_slot(
1098 slot, self.read_from_replicas and command in READ_COMMANDS
1099 )
1100 moved = False
1102 redis_node = self.get_redis_connection(target_node)
1103 connection = get_connection(redis_node, *args, **kwargs)
1104 if asking:
1105 connection.send_command("ASKING")
1106 redis_node.parse_response(connection, "ASKING", **kwargs)
1107 asking = False
1109 connection.send_command(*args)
1110 response = redis_node.parse_response(connection, command, **kwargs)
1111 if command in self.cluster_response_callbacks:
1112 response = self.cluster_response_callbacks[command](
1113 response, **kwargs
1114 )
1115 return response
1116 except AuthenticationError:
1117 raise
1118 except (ConnectionError, TimeoutError) as e:
1119 # Connection retries are being handled in the node's
1120 # Retry object.
1121 # ConnectionError can also be raised if we couldn't get a
1122 # connection from the pool before timing out, so check that
1123 # this is an actual connection before attempting to disconnect.
1124 if connection is not None:
1125 connection.disconnect()
1127 # Remove the failed node from the startup nodes before we try
1128 # to reinitialize the cluster
1129 self.nodes_manager.startup_nodes.pop(target_node.name, None)
1130 # Reset the cluster node's connection
1131 target_node.redis_connection = None
1132 self.nodes_manager.initialize()
1133 raise e
1134 except MovedError as e:
1135 # First, we will try to patch the slots/nodes cache with the
1136 # redirected node output and try again. If MovedError exceeds
1137 # 'reinitialize_steps' number of times, we will force
1138 # reinitializing the tables, and then try again.
1139 # 'reinitialize_steps' counter will increase faster when
1140 # the same client object is shared between multiple threads. To
1141 # reduce the frequency you can set this variable in the
1142 # RedisCluster constructor.
1143 self.reinitialize_counter += 1
1144 if self._should_reinitialized():
1145 self.nodes_manager.initialize()
1146 # Reset the counter
1147 self.reinitialize_counter = 0
1148 else:
1149 self.nodes_manager.update_moved_exception(e)
1150 moved = True
1151 except TryAgainError:
1152 if ttl < self.RedisClusterRequestTTL / 2:
1153 time.sleep(0.05)
1154 except AskError as e:
1155 redirect_addr = get_node_name(host=e.host, port=e.port)
1156 asking = True
1157 except ClusterDownError as e:
1158 # ClusterDownError can occur during a failover and to get
1159 # self-healed, we will try to reinitialize the cluster layout
1160 # and retry executing the command
1161 time.sleep(0.25)
1162 self.nodes_manager.initialize()
1163 raise e
1164 except ResponseError:
1165 raise
1166 except Exception as e:
1167 if connection:
1168 connection.disconnect()
1169 raise e
1170 finally:
1171 if connection is not None:
1172 redis_node.connection_pool.release(connection)
1174 raise ClusterError("TTL exhausted.")
1176 def close(self):
1177 try:
1178 with self._lock:
1179 if self.nodes_manager:
1180 self.nodes_manager.close()
1181 except AttributeError:
1182 # RedisCluster's __init__ can fail before nodes_manager is set
1183 pass
1185 def _process_result(self, command, res, **kwargs):
1186 """
1187 Process the result of the executed command.
1188 The function would return a dict or a single value.
1190 :type command: str
1191 :type res: dict
1193 `res` should be in the following format:
1194 Dict<node_name, command_result>
1195 """
1196 if command in self.result_callbacks:
1197 return self.result_callbacks[command](command, res, **kwargs)
1198 elif len(res) == 1:
1199 # When we execute the command on a single node, we can
1200 # remove the dictionary and return a single response
1201 return list(res.values())[0]
1202 else:
1203 return res
1205 def load_external_module(self, funcname, func):
1206 """
1207 This function can be used to add externally defined redis modules,
1208 and their namespaces to the redis client.
1210 ``funcname`` - A string containing the name of the function to create
1211 ``func`` - The function, being added to this class.
1212 """
1213 setattr(self, funcname, func)
1216class ClusterNode:
1217 def __init__(self, host, port, server_type=None, redis_connection=None):
1218 if host == "localhost":
1219 host = socket.gethostbyname(host)
1221 self.host = host
1222 self.port = port
1223 self.name = get_node_name(host, port)
1224 self.server_type = server_type
1225 self.redis_connection = redis_connection
1227 def __repr__(self):
1228 return (
1229 f"[host={self.host},"
1230 f"port={self.port},"
1231 f"name={self.name},"
1232 f"server_type={self.server_type},"
1233 f"redis_connection={self.redis_connection}]"
1234 )
1236 def __eq__(self, obj):
1237 return isinstance(obj, ClusterNode) and obj.name == self.name
1239 def __del__(self):
1240 if self.redis_connection is not None:
1241 self.redis_connection.close()
1244class LoadBalancer:
1245 """
1246 Round-Robin Load Balancing
1247 """
1249 def __init__(self, start_index: int = 0) -> None:
1250 self.primary_to_idx = {}
1251 self.start_index = start_index
1253 def get_server_index(self, primary: str, list_size: int) -> int:
1254 server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1255 # Update the index
1256 self.primary_to_idx[primary] = (server_index + 1) % list_size
1257 return server_index
1259 def reset(self) -> None:
1260 self.primary_to_idx.clear()
1263class NodesManager:
1264 def __init__(
1265 self,
1266 startup_nodes,
1267 from_url=False,
1268 require_full_coverage=False,
1269 lock=None,
1270 dynamic_startup_nodes=True,
1271 connection_pool_class=ConnectionPool,
1272 **kwargs,
1273 ):
1274 self.nodes_cache = {}
1275 self.slots_cache = {}
1276 self.startup_nodes = {}
1277 self.default_node = None
1278 self.populate_startup_nodes(startup_nodes)
1279 self.from_url = from_url
1280 self._require_full_coverage = require_full_coverage
1281 self._dynamic_startup_nodes = dynamic_startup_nodes
1282 self.connection_pool_class = connection_pool_class
1283 self._moved_exception = None
1284 self.connection_kwargs = kwargs
1285 self.read_load_balancer = LoadBalancer()
1286 if lock is None:
1287 lock = threading.Lock()
1288 self._lock = lock
1289 self.initialize()
1291 def get_node(self, host=None, port=None, node_name=None):
1292 """
1293 Get the requested node from the cluster's nodes.
1294 nodes.
1295 :return: ClusterNode if the node exists, else None
1296 """
1297 if host and port:
1298 # the user passed host and port
1299 if host == "localhost":
1300 host = socket.gethostbyname(host)
1301 return self.nodes_cache.get(get_node_name(host=host, port=port))
1302 elif node_name:
1303 return self.nodes_cache.get(node_name)
1304 else:
1305 return None
1307 def update_moved_exception(self, exception):
1308 self._moved_exception = exception
1310 def _update_moved_slots(self):
1311 """
1312 Update the slot's node with the redirected one
1313 """
1314 e = self._moved_exception
1315 redirected_node = self.get_node(host=e.host, port=e.port)
1316 if redirected_node is not None:
1317 # The node already exists
1318 if redirected_node.server_type is not PRIMARY:
1319 # Update the node's server type
1320 redirected_node.server_type = PRIMARY
1321 else:
1322 # This is a new node, we will add it to the nodes cache
1323 redirected_node = ClusterNode(e.host, e.port, PRIMARY)
1324 self.nodes_cache[redirected_node.name] = redirected_node
1325 if redirected_node in self.slots_cache[e.slot_id]:
1326 # The MOVED error resulted from a failover, and the new slot owner
1327 # had previously been a replica.
1328 old_primary = self.slots_cache[e.slot_id][0]
1329 # Update the old primary to be a replica and add it to the end of
1330 # the slot's node list
1331 old_primary.server_type = REPLICA
1332 self.slots_cache[e.slot_id].append(old_primary)
1333 # Remove the old replica, which is now a primary, from the slot's
1334 # node list
1335 self.slots_cache[e.slot_id].remove(redirected_node)
1336 # Override the old primary with the new one
1337 self.slots_cache[e.slot_id][0] = redirected_node
1338 if self.default_node == old_primary:
1339 # Update the default node with the new primary
1340 self.default_node = redirected_node
1341 else:
1342 # The new slot owner is a new server, or a server from a different
1343 # shard. We need to remove all current nodes from the slot's list
1344 # (including replications) and add just the new node.
1345 self.slots_cache[e.slot_id] = [redirected_node]
1346 # Reset moved_exception
1347 self._moved_exception = None
1349 def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
1350 """
1351 Gets a node that servers this hash slot
1352 """
1353 if self._moved_exception:
1354 with self._lock:
1355 if self._moved_exception:
1356 self._update_moved_slots()
1358 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
1359 raise SlotNotCoveredError(
1360 f'Slot "{slot}" not covered by the cluster. '
1361 f'"require_full_coverage={self._require_full_coverage}"'
1362 )
1364 if read_from_replicas is True:
1365 # get the server index in a Round-Robin manner
1366 primary_name = self.slots_cache[slot][0].name
1367 node_idx = self.read_load_balancer.get_server_index(
1368 primary_name, len(self.slots_cache[slot])
1369 )
1370 elif (
1371 server_type is None
1372 or server_type == PRIMARY
1373 or len(self.slots_cache[slot]) == 1
1374 ):
1375 # return a primary
1376 node_idx = 0
1377 else:
1378 # return a replica
1379 # randomly choose one of the replicas
1380 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)
1382 return self.slots_cache[slot][node_idx]
1384 def get_nodes_by_server_type(self, server_type):
1385 """
1386 Get all nodes with the specified server type
1387 :param server_type: 'primary' or 'replica'
1388 :return: list of ClusterNode
1389 """
1390 return [
1391 node
1392 for node in self.nodes_cache.values()
1393 if node.server_type == server_type
1394 ]
1396 def populate_startup_nodes(self, nodes):
1397 """
1398 Populate all startup nodes and filters out any duplicates
1399 """
1400 for n in nodes:
1401 self.startup_nodes[n.name] = n
1403 def check_slots_coverage(self, slots_cache):
1404 # Validate if all slots are covered or if we should try next
1405 # startup node
1406 for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
1407 if i not in slots_cache:
1408 return False
1409 return True
1411 def create_redis_connections(self, nodes):
1412 """
1413 This function will create a redis connection to all nodes in :nodes:
1414 """
1415 for node in nodes:
1416 if node.redis_connection is None:
1417 node.redis_connection = self.create_redis_node(
1418 host=node.host, port=node.port, **self.connection_kwargs
1419 )
1421 def create_redis_node(self, host, port, **kwargs):
1422 if self.from_url:
1423 # Create a redis node with a costumed connection pool
1424 kwargs.update({"host": host})
1425 kwargs.update({"port": port})
1426 r = Redis(connection_pool=self.connection_pool_class(**kwargs))
1427 else:
1428 r = Redis(host=host, port=port, **kwargs)
1429 return r
1431 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
1432 node_name = get_node_name(host, port)
1433 # check if we already have this node in the tmp_nodes_cache
1434 target_node = tmp_nodes_cache.get(node_name)
1435 if target_node is None:
1436 # before creating a new cluster node, check if the cluster node already
1437 # exists in the current nodes cache and has a valid connection so we can
1438 # reuse it
1439 target_node = self.nodes_cache.get(node_name)
1440 if target_node is None or target_node.redis_connection is None:
1441 # create new cluster node for this cluster
1442 target_node = ClusterNode(host, port, role)
1443 if target_node.server_type != role:
1444 target_node.server_type = role
1446 return target_node
1448 def initialize(self):
1449 """
1450 Initializes the nodes cache, slots cache and redis connections.
1451 :startup_nodes:
1452 Responsible for discovering other nodes in the cluster
1453 """
1454 self.reset()
1455 tmp_nodes_cache = {}
1456 tmp_slots = {}
1457 disagreements = []
1458 startup_nodes_reachable = False
1459 fully_covered = False
1460 kwargs = self.connection_kwargs
1461 exception = None
1462 for startup_node in self.startup_nodes.values():
1463 try:
1464 if startup_node.redis_connection:
1465 r = startup_node.redis_connection
1466 else:
1467 # Create a new Redis connection
1468 r = self.create_redis_node(
1469 startup_node.host, startup_node.port, **kwargs
1470 )
1471 self.startup_nodes[startup_node.name].redis_connection = r
1472 # Make sure cluster mode is enabled on this node
1473 if bool(r.info().get("cluster_enabled")) is False:
1474 raise RedisClusterException(
1475 "Cluster mode is not enabled on this node"
1476 )
1477 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1478 startup_nodes_reachable = True
1479 except Exception as e:
1480 # Try the next startup node.
1481 # The exception is saved and raised only if we have no more nodes.
1482 exception = e
1483 continue
1485 # CLUSTER SLOTS command results in the following output:
1486 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
1487 # where each node contains the following list: [IP, port, node_id]
1488 # Therefore, cluster_slots[0][2][0] will be the IP address of the
1489 # primary node of the first slot section.
1490 # If there's only one server in the cluster, its ``host`` is ''
1491 # Fix it to the host in startup_nodes
1492 if (
1493 len(cluster_slots) == 1
1494 and len(cluster_slots[0][2][0]) == 0
1495 and len(self.startup_nodes) == 1
1496 ):
1497 cluster_slots[0][2][0] = startup_node.host
1499 for slot in cluster_slots:
1500 primary_node = slot[2]
1501 host = str_if_bytes(primary_node[0])
1502 if host == "":
1503 host = startup_node.host
1504 port = int(primary_node[1])
1506 target_node = self._get_or_create_cluster_node(
1507 host, port, PRIMARY, tmp_nodes_cache
1508 )
1509 # add this node to the nodes cache
1510 tmp_nodes_cache[target_node.name] = target_node
1512 for i in range(int(slot[0]), int(slot[1]) + 1):
1513 if i not in tmp_slots:
1514 tmp_slots[i] = []
1515 tmp_slots[i].append(target_node)
1516 replica_nodes = [slot[j] for j in range(3, len(slot))]
1518 for replica_node in replica_nodes:
1519 host = str_if_bytes(replica_node[0])
1520 port = replica_node[1]
1522 target_replica_node = self._get_or_create_cluster_node(
1523 host, port, REPLICA, tmp_nodes_cache
1524 )
1525 tmp_slots[i].append(target_replica_node)
1526 # add this node to the nodes cache
1527 tmp_nodes_cache[
1528 target_replica_node.name
1529 ] = target_replica_node
1530 else:
1531 # Validate that 2 nodes want to use the same slot cache
1532 # setup
1533 tmp_slot = tmp_slots[i][0]
1534 if tmp_slot.name != target_node.name:
1535 disagreements.append(
1536 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
1537 )
1539 if len(disagreements) > 5:
1540 raise RedisClusterException(
1541 f"startup_nodes could not agree on a valid "
1542 f'slots cache: {", ".join(disagreements)}'
1543 )
1545 fully_covered = self.check_slots_coverage(tmp_slots)
1546 if fully_covered:
1547 # Don't need to continue to the next startup node if all
1548 # slots are covered
1549 break
1551 if not startup_nodes_reachable:
1552 raise RedisClusterException(
1553 f"Redis Cluster cannot be connected. Please provide at least "
1554 f"one reachable node: {str(exception)}"
1555 ) from exception
1557 # Create Redis connections to all nodes
1558 self.create_redis_connections(list(tmp_nodes_cache.values()))
1560 # Check if the slots are not fully covered
1561 if not fully_covered and self._require_full_coverage:
1562 # Despite the requirement that the slots be covered, there
1563 # isn't a full coverage
1564 raise RedisClusterException(
1565 f"All slots are not covered after query all startup_nodes. "
1566 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
1567 f"covered..."
1568 )
1570 # Set the tmp variables to the real variables
1571 self.nodes_cache = tmp_nodes_cache
1572 self.slots_cache = tmp_slots
1573 # Set the default node
1574 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
1575 if self._dynamic_startup_nodes:
1576 # Populate the startup nodes with all discovered nodes
1577 self.startup_nodes = tmp_nodes_cache
1578 # If initialize was called after a MovedError, clear it
1579 self._moved_exception = None
1581 def close(self):
1582 self.default_node = None
1583 for node in self.nodes_cache.values():
1584 if node.redis_connection:
1585 node.redis_connection.close()
1587 def reset(self):
1588 try:
1589 self.read_load_balancer.reset()
1590 except TypeError:
1591 # The read_load_balancer is None, do nothing
1592 pass
1595class ClusterPubSub(PubSub):
1596 """
1597 Wrapper for PubSub class.
1599 IMPORTANT: before using ClusterPubSub, read about the known limitations
1600 with pubsub in Cluster mode and learn how to workaround them:
1601 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
1602 """
1604 def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs):
1605 """
1606 When a pubsub instance is created without specifying a node, a single
1607 node will be transparently chosen for the pubsub connection on the
1608 first command execution. The node will be determined by:
1609 1. Hashing the channel name in the request to find its keyslot
1610 2. Selecting a node that handles the keyslot: If read_from_replicas is
1611 set to true, a replica can be selected.
1613 :type redis_cluster: RedisCluster
1614 :type node: ClusterNode
1615 :type host: str
1616 :type port: int
1617 """
1618 self.node = None
1619 self.set_pubsub_node(redis_cluster, node, host, port)
1620 connection_pool = (
1621 None
1622 if self.node is None
1623 else redis_cluster.get_redis_connection(self.node).connection_pool
1624 )
1625 self.cluster = redis_cluster
1626 super().__init__(
1627 **kwargs, connection_pool=connection_pool, encoder=redis_cluster.encoder
1628 )
1630 def set_pubsub_node(self, cluster, node=None, host=None, port=None):
1631 """
1632 The pubsub node will be set according to the passed node, host and port
1633 When none of the node, host, or port are specified - the node is set
1634 to None and will be determined by the keyslot of the channel in the
1635 first command to be executed.
1636 RedisClusterException will be thrown if the passed node does not exist
1637 in the cluster.
1638 If host is passed without port, or vice versa, a DataError will be
1639 thrown.
1640 :type cluster: RedisCluster
1641 :type node: ClusterNode
1642 :type host: str
1643 :type port: int
1644 """
1645 if node is not None:
1646 # node is passed by the user
1647 self._raise_on_invalid_node(cluster, node, node.host, node.port)
1648 pubsub_node = node
1649 elif host is not None and port is not None:
1650 # host and port passed by the user
1651 node = cluster.get_node(host=host, port=port)
1652 self._raise_on_invalid_node(cluster, node, host, port)
1653 pubsub_node = node
1654 elif any([host, port]) is True:
1655 # only 'host' or 'port' passed
1656 raise DataError("Passing a host requires passing a port, and vice versa")
1657 else:
1658 # nothing passed by the user. set node to None
1659 pubsub_node = None
1661 self.node = pubsub_node
1663 def get_pubsub_node(self):
1664 """
1665 Get the node that is being used as the pubsub connection
1666 """
1667 return self.node
1669 def _raise_on_invalid_node(self, redis_cluster, node, host, port):
1670 """
1671 Raise a RedisClusterException if the node is None or doesn't exist in
1672 the cluster.
1673 """
1674 if node is None or redis_cluster.get_node(node_name=node.name) is None:
1675 raise RedisClusterException(
1676 f"Node {host}:{port} doesn't exist in the cluster"
1677 )
1679 def execute_command(self, *args, **kwargs):
1680 """
1681 Execute a publish/subscribe command.
1683 Taken code from redis-py and tweak to make it work within a cluster.
1684 """
1685 # NOTE: don't parse the response in this function -- it could pull a
1686 # legitimate message off the stack if the connection is already
1687 # subscribed to one or more channels
1689 if self.connection is None:
1690 if self.connection_pool is None:
1691 if len(args) > 1:
1692 # Hash the first channel and get one of the nodes holding
1693 # this slot
1694 channel = args[1]
1695 slot = self.cluster.keyslot(channel)
1696 node = self.cluster.nodes_manager.get_node_from_slot(
1697 slot, self.cluster.read_from_replicas
1698 )
1699 else:
1700 # Get a random node
1701 node = self.cluster.get_random_node()
1702 self.node = node
1703 redis_connection = self.cluster.get_redis_connection(node)
1704 self.connection_pool = redis_connection.connection_pool
1705 self.connection = self.connection_pool.get_connection(
1706 "pubsub", self.shard_hint
1707 )
1708 # register a callback that re-subscribes to any channels we
1709 # were listening to when we were disconnected
1710 self.connection.register_connect_callback(self.on_connect)
1711 connection = self.connection
1712 self._execute(connection, connection.send_command, *args)
1714 def get_redis_connection(self):
1715 """
1716 Get the Redis connection of the pubsub connected node.
1717 """
1718 if self.node is not None:
1719 return self.node.redis_connection
1722class ClusterPipeline(RedisCluster):
1723 """
1724 Support for Redis pipeline
1725 in cluster mode
1726 """
1728 ERRORS_ALLOW_RETRY = (
1729 ConnectionError,
1730 TimeoutError,
1731 MovedError,
1732 AskError,
1733 TryAgainError,
1734 )
1736 def __init__(
1737 self,
1738 nodes_manager: "NodesManager",
1739 commands_parser: "CommandsParser",
1740 result_callbacks: Optional[Dict[str, Callable]] = None,
1741 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
1742 startup_nodes: Optional[List["ClusterNode"]] = None,
1743 read_from_replicas: bool = False,
1744 cluster_error_retry_attempts: int = 3,
1745 reinitialize_steps: int = 5,
1746 lock=None,
1747 **kwargs,
1748 ):
1749 """ """
1750 self.command_stack = []
1751 self.nodes_manager = nodes_manager
1752 self.commands_parser = commands_parser
1753 self.refresh_table_asap = False
1754 self.result_callbacks = (
1755 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
1756 )
1757 self.startup_nodes = startup_nodes if startup_nodes else []
1758 self.read_from_replicas = read_from_replicas
1759 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
1760 self.cluster_response_callbacks = cluster_response_callbacks
1761 self.cluster_error_retry_attempts = cluster_error_retry_attempts
1762 self.reinitialize_counter = 0
1763 self.reinitialize_steps = reinitialize_steps
1764 self.encoder = Encoder(
1765 kwargs.get("encoding", "utf-8"),
1766 kwargs.get("encoding_errors", "strict"),
1767 kwargs.get("decode_responses", False),
1768 )
1769 if lock is None:
1770 lock = threading.Lock()
1771 self._lock = lock
1773 def __repr__(self):
1774 """ """
1775 return f"{type(self).__name__}"
1777 def __enter__(self):
1778 """ """
1779 return self
1781 def __exit__(self, exc_type, exc_value, traceback):
1782 """ """
1783 self.reset()
1785 def __del__(self):
1786 try:
1787 self.reset()
1788 except Exception:
1789 pass
1791 def __len__(self):
1792 """ """
1793 return len(self.command_stack)
1795 def __bool__(self):
1796 "Pipeline instances should always evaluate to True on Python 3+"
1797 return True
1799 def execute_command(self, *args, **kwargs):
1800 """
1801 Wrapper function for pipeline_execute_command
1802 """
1803 return self.pipeline_execute_command(*args, **kwargs)
1805 def pipeline_execute_command(self, *args, **options):
1806 """
1807 Appends the executed command to the pipeline's command stack
1808 """
1809 self.command_stack.append(
1810 PipelineCommand(args, options, len(self.command_stack))
1811 )
1812 return self
1814 def raise_first_error(self, stack):
1815 """
1816 Raise the first exception on the stack
1817 """
1818 for c in stack:
1819 r = c.result
1820 if isinstance(r, Exception):
1821 self.annotate_exception(r, c.position + 1, c.args)
1822 raise r
1824 def annotate_exception(self, exception, number, command):
1825 """
1826 Provides extra context to the exception prior to it being handled
1827 """
1828 cmd = " ".join(map(safe_str, command))
1829 msg = (
1830 f"Command # {number} ({cmd}) of pipeline "
1831 f"caused error: {exception.args[0]}"
1832 )
1833 exception.args = (msg,) + exception.args[1:]
1835 def execute(self, raise_on_error=True):
1836 """
1837 Execute all the commands in the current pipeline
1838 """
1839 stack = self.command_stack
1840 try:
1841 return self.send_cluster_commands(stack, raise_on_error)
1842 finally:
1843 self.reset()
1845 def reset(self):
1846 """
1847 Reset back to empty pipeline.
1848 """
1849 self.command_stack = []
1851 self.scripts = set()
1853 # TODO: Implement
1854 # make sure to reset the connection state in the event that we were
1855 # watching something
1856 # if self.watching and self.connection:
1857 # try:
1858 # # call this manually since our unwatch or
1859 # # immediate_execute_command methods can call reset()
1860 # self.connection.send_command('UNWATCH')
1861 # self.connection.read_response()
1862 # except ConnectionError:
1863 # # disconnect will also remove any previous WATCHes
1864 # self.connection.disconnect()
1866 # clean up the other instance attributes
1867 self.watching = False
1868 self.explicit_transaction = False
1870 # TODO: Implement
1871 # we can safely return the connection to the pool here since we're
1872 # sure we're no longer WATCHing anything
1873 # if self.connection:
1874 # self.connection_pool.release(self.connection)
1875 # self.connection = None
1877 def send_cluster_commands(
1878 self, stack, raise_on_error=True, allow_redirections=True
1879 ):
1880 """
1881 Wrapper for CLUSTERDOWN error handling.
1883 If the cluster reports it is down it is assumed that:
1884 - connection_pool was disconnected
1885 - connection_pool was reseted
1886 - refereh_table_asap set to True
1888 It will try the number of times specified by
1889 the config option "self.cluster_error_retry_attempts"
1890 which defaults to 3 unless manually configured.
1892 If it reaches the number of times, the command will
1893 raises ClusterDownException.
1894 """
1895 if not stack:
1896 return []
1897 retry_attempts = self.cluster_error_retry_attempts
1898 while True:
1899 try:
1900 return self._send_cluster_commands(
1901 stack,
1902 raise_on_error=raise_on_error,
1903 allow_redirections=allow_redirections,
1904 )
1905 except (ClusterDownError, ConnectionError) as e:
1906 if retry_attempts > 0:
1907 # Try again with the new cluster setup. All other errors
1908 # should be raised.
1909 retry_attempts -= 1
1910 pass
1911 else:
1912 raise e
1914 def _send_cluster_commands(
1915 self, stack, raise_on_error=True, allow_redirections=True
1916 ):
1917 """
1918 Send a bunch of cluster commands to the redis cluster.
1920 `allow_redirections` If the pipeline should follow
1921 `ASK` & `MOVED` responses automatically. If set
1922 to false it will raise RedisClusterException.
1923 """
1924 # the first time sending the commands we send all of
1925 # the commands that were queued up.
1926 # if we have to run through it again, we only retry
1927 # the commands that failed.
1928 attempt = sorted(stack, key=lambda x: x.position)
1929 is_default_node = False
1930 # build a list of node objects based on node names we need to
1931 nodes = {}
1933 # as we move through each command that still needs to be processed,
1934 # we figure out the slot number that command maps to, then from
1935 # the slot determine the node.
1936 for c in attempt:
1937 while True:
1938 # refer to our internal node -> slot table that
1939 # tells us where a given command should route to.
1940 # (it might be possible we have a cached node that no longer
1941 # exists in the cluster, which is why we do this in a loop)
1942 passed_targets = c.options.pop("target_nodes", None)
1943 if passed_targets and not self._is_nodes_flag(passed_targets):
1944 target_nodes = self._parse_target_nodes(passed_targets)
1945 else:
1946 target_nodes = self._determine_nodes(
1947 *c.args, node_flag=passed_targets
1948 )
1949 if not target_nodes:
1950 raise RedisClusterException(
1951 f"No targets were found to execute {c.args} command on"
1952 )
1953 if len(target_nodes) > 1:
1954 raise RedisClusterException(
1955 f"Too many targets for command {c.args}"
1956 )
1958 node = target_nodes[0]
1959 if node == self.get_default_node():
1960 is_default_node = True
1962 # now that we know the name of the node
1963 # ( it's just a string in the form of host:port )
1964 # we can build a list of commands for each node.
1965 node_name = node.name
1966 if node_name not in nodes:
1967 redis_node = self.get_redis_connection(node)
1968 try:
1969 connection = get_connection(redis_node, c.args)
1970 except ConnectionError:
1971 # Connection retries are being handled in the node's
1972 # Retry object. Reinitialize the node -> slot table.
1973 self.nodes_manager.initialize()
1974 if is_default_node:
1975 self.replace_default_node()
1976 raise
1977 nodes[node_name] = NodeCommands(
1978 redis_node.parse_response,
1979 redis_node.connection_pool,
1980 connection,
1981 )
1982 nodes[node_name].append(c)
1983 break
1985 # send the commands in sequence.
1986 # we write to all the open sockets for each node first,
1987 # before reading anything
1988 # this allows us to flush all the requests out across the
1989 # network essentially in parallel
1990 # so that we can read them all in parallel as they come back.
1991 # we dont' multiplex on the sockets as they come available,
1992 # but that shouldn't make too much difference.
1993 node_commands = nodes.values()
1994 for n in node_commands:
1995 n.write()
1997 for n in node_commands:
1998 n.read()
2000 # release all of the redis connections we allocated earlier
2001 # back into the connection pool.
2002 # we used to do this step as part of a try/finally block,
2003 # but it is really dangerous to
2004 # release connections back into the pool if for some
2005 # reason the socket has data still left in it
2006 # from a previous operation. The write and
2007 # read operations already have try/catch around them for
2008 # all known types of errors including connection
2009 # and socket level errors.
2010 # So if we hit an exception, something really bad
2011 # happened and putting any oF
2012 # these connections back into the pool is a very bad idea.
2013 # the socket might have unread buffer still sitting in it,
2014 # and then the next time we read from it we pass the
2015 # buffered result back from a previous command and
2016 # every single request after to that connection will always get
2017 # a mismatched result.
2018 for n in nodes.values():
2019 n.connection_pool.release(n.connection)
2021 # if the response isn't an exception it is a
2022 # valid response from the node
2023 # we're all done with that command, YAY!
2024 # if we have more commands to attempt, we've run into problems.
2025 # collect all the commands we are allowed to retry.
2026 # (MOVED, ASK, or connection errors or timeout errors)
2027 attempt = sorted(
2028 (
2029 c
2030 for c in attempt
2031 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2032 ),
2033 key=lambda x: x.position,
2034 )
2035 if attempt and allow_redirections:
2036 # RETRY MAGIC HAPPENS HERE!
2037 # send these remaing commands one at a time using `execute_command`
2038 # in the main client. This keeps our retry logic
2039 # in one place mostly,
2040 # and allows us to be more confident in correctness of behavior.
2041 # at this point any speed gains from pipelining have been lost
2042 # anyway, so we might as well make the best
2043 # attempt to get the correct behavior.
2044 #
2045 # The client command will handle retries for each
2046 # individual command sequentially as we pass each
2047 # one into `execute_command`. Any exceptions
2048 # that bubble out should only appear once all
2049 # retries have been exhausted.
2050 #
2051 # If a lot of commands have failed, we'll be setting the
2052 # flag to rebuild the slots table from scratch.
2053 # So MOVED errors should correct themselves fairly quickly.
2054 self.reinitialize_counter += 1
2055 if self._should_reinitialized():
2056 self.nodes_manager.initialize()
2057 if is_default_node:
2058 self.replace_default_node()
2059 for c in attempt:
2060 try:
2061 # send each command individually like we
2062 # do in the main client.
2063 c.result = super().execute_command(*c.args, **c.options)
2064 except RedisError as e:
2065 c.result = e
2067 # turn the response back into a simple flat array that corresponds
2068 # to the sequence of commands issued in the stack in pipeline.execute()
2069 response = []
2070 for c in sorted(stack, key=lambda x: x.position):
2071 if c.args[0] in self.cluster_response_callbacks:
2072 c.result = self.cluster_response_callbacks[c.args[0]](
2073 c.result, **c.options
2074 )
2075 response.append(c.result)
2077 if raise_on_error:
2078 self.raise_first_error(stack)
2080 return response
2082 def _fail_on_redirect(self, allow_redirections):
2083 """ """
2084 if not allow_redirections:
2085 raise RedisClusterException(
2086 "ASK & MOVED redirection not allowed in this pipeline"
2087 )
2089 def exists(self, *keys):
2090 return self.execute_command("EXISTS", *keys)
2092 def eval(self):
2093 """ """
2094 raise RedisClusterException("method eval() is not implemented")
2096 def multi(self):
2097 """ """
2098 raise RedisClusterException("method multi() is not implemented")
2100 def immediate_execute_command(self, *args, **options):
2101 """ """
2102 raise RedisClusterException(
2103 "method immediate_execute_command() is not implemented"
2104 )
2106 def _execute_transaction(self, *args, **kwargs):
2107 """ """
2108 raise RedisClusterException("method _execute_transaction() is not implemented")
2110 def load_scripts(self):
2111 """ """
2112 raise RedisClusterException("method load_scripts() is not implemented")
2114 def watch(self, *names):
2115 """ """
2116 raise RedisClusterException("method watch() is not implemented")
2118 def unwatch(self):
2119 """ """
2120 raise RedisClusterException("method unwatch() is not implemented")
2122 def script_load_for_pipeline(self, *args, **kwargs):
2123 """ """
2124 raise RedisClusterException(
2125 "method script_load_for_pipeline() is not implemented"
2126 )
2128 def delete(self, *names):
2129 """
2130 "Delete a key specified by ``names``"
2131 """
2132 if len(names) != 1:
2133 raise RedisClusterException(
2134 "deleting multiple keys is not implemented in pipeline command"
2135 )
2137 return self.execute_command("DEL", names[0])
2139 def unlink(self, *names):
2140 """
2141 "Unlink a key specified by ``names``"
2142 """
2143 if len(names) != 1:
2144 raise RedisClusterException(
2145 "unlinking multiple keys is not implemented in pipeline command"
2146 )
2148 return self.execute_command("UNLINK", names[0])
2151def block_pipeline_command(name: str) -> Callable[..., Any]:
2152 """
2153 Prints error because some pipelined commands should
2154 be blocked when running in cluster-mode
2155 """
2157 def inner(*args, **kwargs):
2158 raise RedisClusterException(
2159 f"ERROR: Calling pipelined function {name} is blocked "
2160 f"when running redis in cluster mode..."
2161 )
2163 return inner
2166# Blocked pipeline commands
2167PIPELINE_BLOCKED_COMMANDS = (
2168 "BGREWRITEAOF",
2169 "BGSAVE",
2170 "BITOP",
2171 "BRPOPLPUSH",
2172 "CLIENT GETNAME",
2173 "CLIENT KILL",
2174 "CLIENT LIST",
2175 "CLIENT SETNAME",
2176 "CLIENT",
2177 "CONFIG GET",
2178 "CONFIG RESETSTAT",
2179 "CONFIG REWRITE",
2180 "CONFIG SET",
2181 "CONFIG",
2182 "DBSIZE",
2183 "ECHO",
2184 "EVALSHA",
2185 "FLUSHALL",
2186 "FLUSHDB",
2187 "INFO",
2188 "KEYS",
2189 "LASTSAVE",
2190 "MGET",
2191 "MGET NONATOMIC",
2192 "MOVE",
2193 "MSET",
2194 "MSET NONATOMIC",
2195 "MSETNX",
2196 "PFCOUNT",
2197 "PFMERGE",
2198 "PING",
2199 "PUBLISH",
2200 "RANDOMKEY",
2201 "READONLY",
2202 "READWRITE",
2203 "RENAME",
2204 "RENAMENX",
2205 "RPOPLPUSH",
2206 "SAVE",
2207 "SCAN",
2208 "SCRIPT EXISTS",
2209 "SCRIPT FLUSH",
2210 "SCRIPT KILL",
2211 "SCRIPT LOAD",
2212 "SCRIPT",
2213 "SDIFF",
2214 "SDIFFSTORE",
2215 "SENTINEL GET MASTER ADDR BY NAME",
2216 "SENTINEL MASTER",
2217 "SENTINEL MASTERS",
2218 "SENTINEL MONITOR",
2219 "SENTINEL REMOVE",
2220 "SENTINEL SENTINELS",
2221 "SENTINEL SET",
2222 "SENTINEL SLAVES",
2223 "SENTINEL",
2224 "SHUTDOWN",
2225 "SINTER",
2226 "SINTERSTORE",
2227 "SLAVEOF",
2228 "SLOWLOG GET",
2229 "SLOWLOG LEN",
2230 "SLOWLOG RESET",
2231 "SLOWLOG",
2232 "SMOVE",
2233 "SORT",
2234 "SUNION",
2235 "SUNIONSTORE",
2236 "TIME",
2237)
2238for command in PIPELINE_BLOCKED_COMMANDS:
2239 command = command.replace(" ", "_").lower()
2241 setattr(ClusterPipeline, command, block_pipeline_command(command))
2244class PipelineCommand:
2245 """ """
2247 def __init__(self, args, options=None, position=None):
2248 self.args = args
2249 if options is None:
2250 options = {}
2251 self.options = options
2252 self.position = position
2253 self.result = None
2254 self.node = None
2255 self.asking = False
2258class NodeCommands:
2259 """ """
2261 def __init__(self, parse_response, connection_pool, connection):
2262 """ """
2263 self.parse_response = parse_response
2264 self.connection_pool = connection_pool
2265 self.connection = connection
2266 self.commands = []
2268 def append(self, c):
2269 """ """
2270 self.commands.append(c)
2272 def write(self):
2273 """
2274 Code borrowed from Redis so it can be fixed
2275 """
2276 connection = self.connection
2277 commands = self.commands
2279 # We are going to clobber the commands with the write, so go ahead
2280 # and ensure that nothing is sitting there from a previous run.
2281 for c in commands:
2282 c.result = None
2284 # build up all commands into a single request to increase network perf
2285 # send all the commands and catch connection and timeout errors.
2286 try:
2287 connection.send_packed_command(
2288 connection.pack_commands([c.args for c in commands])
2289 )
2290 except (ConnectionError, TimeoutError) as e:
2291 for c in commands:
2292 c.result = e
2294 def read(self):
2295 """ """
2296 connection = self.connection
2297 for c in self.commands:
2299 # if there is a result on this command,
2300 # it means we ran into an exception
2301 # like a connection error. Trying to parse
2302 # a response on a connection that
2303 # is no longer open will result in a
2304 # connection error raised by redis-py.
2305 # but redis-py doesn't check in parse_response
2306 # that the sock object is
2307 # still set and if you try to
2308 # read from a closed connection, it will
2309 # result in an AttributeError because
2310 # it will do a readline() call on None.
2311 # This can have all kinds of nasty side-effects.
2312 # Treating this case as a connection error
2313 # is fine because it will dump
2314 # the connection object back into the
2315 # pool and on the next write, it will
2316 # explicitly open the connection and all will be well.
2317 if c.result is None:
2318 try:
2319 c.result = self.parse_response(connection, c.args[0], **c.options)
2320 except (ConnectionError, TimeoutError) as e:
2321 for c in self.commands:
2322 c.result = e
2323 return
2324 except RedisError:
2325 c.result = sys.exc_info()[1]