Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import random
2import socket
3import sys
4import threading
5import time
6from abc import ABC, abstractmethod
7from collections import OrderedDict
8from copy import copy
9from enum import Enum
10from itertools import chain
11from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
13from redis._parsers import CommandsParser, Encoder
14from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
15from redis._parsers.helpers import parse_scan
16from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
17from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
18from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
19from redis.commands import READ_COMMANDS, RedisClusterCommands
20from redis.commands.helpers import list_or_args
21from redis.commands.policies import PolicyResolver, StaticPolicyResolver
22from redis.connection import (
23 Connection,
24 ConnectionPool,
25 parse_url,
26)
27from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
28from redis.event import (
29 AfterPooledConnectionsInstantiationEvent,
30 AfterPubSubConnectionInstantiationEvent,
31 ClientType,
32 EventDispatcher,
33)
34from redis.exceptions import (
35 AskError,
36 AuthenticationError,
37 ClusterDownError,
38 ClusterError,
39 ConnectionError,
40 CrossSlotTransactionError,
41 DataError,
42 ExecAbortError,
43 InvalidPipelineStack,
44 MaxConnectionsError,
45 MovedError,
46 RedisClusterException,
47 RedisError,
48 ResponseError,
49 SlotNotCoveredError,
50 TimeoutError,
51 TryAgainError,
52 WatchError,
53)
54from redis.lock import Lock
55from redis.maint_notifications import MaintNotificationsConfig
56from redis.retry import Retry
57from redis.utils import (
58 deprecated_args,
59 dict_merge,
60 list_keys_to_dict,
61 merge_result,
62 safe_str,
63 str_if_bytes,
64 truncate_text,
65)
def get_node_name(host: str, port: Union[str, int]) -> str:
    """Build the ``host:port`` string used as a cluster node's name."""
    return "{}:{}".format(host, port)
@deprecated_args(
    allowed_args=["redis_node"],
    reason="Use get_connection(redis_node) instead",
    version="5.3.0",
)
def get_connection(redis_node: Redis, *args, **options) -> Connection:
    """
    Return a connection for ``redis_node``.

    The client's own ``connection`` attribute is preferred when it is truthy;
    otherwise a connection is taken from the client's connection pool.
    Extra positional and keyword arguments are accepted but unused.
    """
    dedicated = redis_node.connection
    if dedicated:
        return dedicated
    return redis_node.connection_pool.get_connection()
def parse_scan_result(command, res, **options):
    """
    Combine per-node SCAN replies into a single result.

    :param command: the command name (unused)
    :param res: mapping of node name -> raw SCAN reply from that node
    :param options: forwarded unchanged to ``parse_scan``
    :return: ``(cursors, keys)`` where ``cursors`` maps node name to that
        node's cursor and ``keys`` is the concatenation of every node's items
    """
    cursor_by_node = {}
    merged_items = []
    for node_name, raw_reply in res.items():
        node_cursor, node_items = parse_scan(raw_reply, **options)
        cursor_by_node[node_name] = node_cursor
        merged_items.extend(node_items)

    return cursor_by_node, merged_items
def parse_pubsub_numsub(command, res, **options):
    """
    Sum per-channel subscriber counts across all node replies.

    :param command: the command name (unused)
    :param res: mapping whose values are iterables of ``(channel, count)`` pairs
    :return: list of ``(channel, total)`` tuples, ordered by first appearance
    """
    totals = OrderedDict()
    for node_reply in res.values():
        for channel, count in node_reply:
            if channel in totals:
                totals[channel] += count
            else:
                totals[channel] = count

    return list(totals.items())
def parse_cluster_slots(
    resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """
    Convert a CLUSTER SLOTS reply into a ``(start, end) -> servers`` mapping.

    Each entry of ``resp`` is read as ``[start, end, primary, *replicas]``.
    A server whose host comes back empty is given the ``current_host`` option
    (default ``""``) instead.
    """
    fallback_host = options.get("current_host", "")

    def normalize(*server: Any) -> Tuple[str, Any]:
        # An empty host falls back to the host the reply was obtained from.
        return str_if_bytes(server[0]) or fallback_host, server[1]

    slot_map = {}
    for entry in resp:
        first_slot, last_slot, primary = entry[:3]
        slot_map[first_slot, last_slot] = {
            "primary": normalize(*primary),
            "replicas": [normalize(*replica) for replica in entry[3:]],
        }

    return slot_map
def parse_cluster_shards(resp, **options):
    """
    Parse CLUSTER SHARDS response.

    :param resp: the raw reply. When its first element is already a dict the
        reply is returned untouched. Otherwise every shard entry is read
        positionally: index 1 is a flat ``[start, end, start, end, ...]`` slot
        list and index 3 is a list of flat ``[field, value, ...]`` node lists.
    :param options: unused
    :return: a list of ``{"slots": [(start, end), ...], "nodes": [dict, ...]}``
        dicts; an empty list for an empty reply
    """
    # An empty reply has no first element to inspect; nothing to parse.
    if not resp:
        return []
    if isinstance(resp[0], dict):
        return resp

    shards = []
    for raw_shard in resp:
        slot_bounds = raw_shard[1]
        shards.append(
            {
                # Pair up consecutive (start, end) values.
                "slots": [
                    (slot_bounds[i], slot_bounds[i + 1])
                    for i in range(0, len(slot_bounds), 2)
                ],
                # Fold each flat field/value list into a dict.
                "nodes": [
                    {raw_node[i]: raw_node[i + 1] for i in range(0, len(raw_node), 2)}
                    for raw_node in raw_shard[3]
                ],
            }
        )

    return shards
def parse_cluster_myshardid(resp, **options):
    """
    Parse CLUSTER MYSHARDID response.

    :param resp: the shard id, either as raw bytes or as an already decoded
        string
    :param options: unused
    :return: the shard id as ``str``
    """
    # A reply that was already decoded to ``str`` has no ``decode`` method,
    # so only bytes-like replies are decoded.
    if isinstance(resp, (bytes, bytearray)):
        return resp.decode("utf-8")
    return resp
# Server-type labels passed to NodesManager.get_nodes_by_server_type().
PRIMARY = "primary"
REPLICA = "replica"
# Routing flag value used in COMMAND_FLAGS for commands routed by slot id.
SLOT_ID = "slot-id"

# Keyword arguments that cleanup_kwargs() lets through; every other key is
# dropped.
REDIS_ALLOWED_KEYS = (
    "connection_class",
    "connection_pool",
    "connection_pool_class",
    "client_name",
    "credential_provider",
    "db",
    "decode_responses",
    "encoding",
    "encoding_errors",
    "host",
    "lib_name",
    "lib_version",
    "max_connections",
    "nodes_flag",
    "redis_connect_func",
    "password",
    "port",
    "timeout",
    "queue_class",
    "retry",
    "retry_on_timeout",
    "protocol",
    "socket_connect_timeout",
    "socket_keepalive",
    "socket_keepalive_options",
    "socket_timeout",
    "ssl",
    "ssl_ca_certs",
    "ssl_ca_data",
    "ssl_certfile",
    "ssl_cert_reqs",
    "ssl_include_verify_flags",
    "ssl_exclude_verify_flags",
    "ssl_keyfile",
    "ssl_password",
    "ssl_check_hostname",
    "unix_socket_path",
    "username",
    "cache",
    "cache_config",
)
# Keys that cleanup_kwargs() removes even though they are in the allowed list.
KWARGS_DISABLED_KEYS = ("host", "port", "retry")
def cleanup_kwargs(**kwargs):
    """
    Return a copy of ``kwargs`` without unsupported or disabled keys.

    A key survives only when it is listed in ``REDIS_ALLOWED_KEYS`` and is not
    listed in ``KWARGS_DISABLED_KEYS``.
    """
    kept = {}
    for name, value in kwargs.items():
        if name not in REDIS_ALLOWED_KEYS:
            continue
        if name in KWARGS_DISABLED_KEYS:
            continue
        kept[name] = value

    return kept
class AbstractRedisCluster:
    """
    Shared constants and helpers for cluster clients.

    Holds the node-selection flags, the per-command default routing table
    (``COMMAND_FLAGS``), the response/result callback tables and the set of
    errors that may be retried. ``replace_default_node`` expects the concrete
    class to provide ``nodes_manager``, ``get_default_node``,
    ``get_primaries`` and ``get_replicas``.
    """

    RedisClusterRequestTTL = 16

    # Node-selection flags.
    PRIMARIES = "primaries"
    REPLICAS = "replicas"
    ALL_NODES = "all"
    RANDOM = "random"
    DEFAULT_NODE = "default-node"

    NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}

    # Command name -> routing flag.
    COMMAND_FLAGS = dict_merge(
        list_keys_to_dict(
            [
                "ACL CAT",
                "ACL DELUSER",
                "ACL DRYRUN",
                "ACL GENPASS",
                "ACL GETUSER",
                "ACL HELP",
                "ACL LIST",
                "ACL LOG",
                "ACL LOAD",
                "ACL SAVE",
                "ACL SETUSER",
                "ACL USERS",
                "ACL WHOAMI",
                "AUTH",
                "CLIENT LIST",
                "CLIENT SETINFO",
                "CLIENT SETNAME",
                "CLIENT GETNAME",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "TIME",
                "PUBSUB CHANNELS",
                "PUBSUB NUMPAT",
                "PUBSUB NUMSUB",
                "PUBSUB SHARDCHANNELS",
                "PUBSUB SHARDNUMSUB",
                "PING",
                "INFO",
                "SHUTDOWN",
                "KEYS",
                "DBSIZE",
                "BGSAVE",
                "SLOWLOG GET",
                "SLOWLOG LEN",
                "SLOWLOG RESET",
                "WAIT",
                "WAITAOF",
                "SAVE",
                "MEMORY PURGE",
                "MEMORY MALLOC-STATS",
                "MEMORY STATS",
                "LASTSAVE",
                "CLIENT TRACKINGINFO",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
                "CLIENT UNBLOCK",
                "CLIENT ID",
                "CLIENT REPLY",
                "CLIENT GETREDIR",
                "CLIENT INFO",
                "CLIENT KILL",
                "READONLY",
                "CLUSTER INFO",
                "CLUSTER MEET",
                "CLUSTER MYSHARDID",
                "CLUSTER NODES",
                "CLUSTER REPLICAS",
                "CLUSTER RESET",
                "CLUSTER SET-CONFIG-EPOCH",
                "CLUSTER SLOTS",
                "CLUSTER SHARDS",
                "CLUSTER COUNT-FAILURE-REPORTS",
                "CLUSTER KEYSLOT",
                "COMMAND",
                "COMMAND COUNT",
                "COMMAND LIST",
                "COMMAND GETKEYS",
                "CONFIG GET",
                "DEBUG",
                "RANDOMKEY",
                "READONLY",
                "READWRITE",
                "TIME",
                "TFUNCTION LOAD",
                "TFUNCTION DELETE",
                "TFUNCTION LIST",
                "TFCALL",
                "TFCALLASYNC",
                "LATENCY HISTORY",
                "LATENCY LATEST",
                "LATENCY RESET",
                "MODULE LIST",
                "MODULE LOAD",
                "MODULE UNLOAD",
                "MODULE LOADEX",
            ],
            DEFAULT_NODE,
        ),
        list_keys_to_dict(
            [
                "FLUSHALL",
                "FLUSHDB",
                "FUNCTION DELETE",
                "FUNCTION FLUSH",
                "FUNCTION LIST",
                "FUNCTION LOAD",
                "FUNCTION RESTORE",
                "SCAN",
                "SCRIPT EXISTS",
                "SCRIPT FLUSH",
                "SCRIPT LOAD",
            ],
            PRIMARIES,
        ),
        list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
        list_keys_to_dict(
            [
                "CLUSTER COUNTKEYSINSLOT",
                "CLUSTER DELSLOTS",
                "CLUSTER DELSLOTSRANGE",
                "CLUSTER GETKEYSINSLOT",
                "CLUSTER SETSLOT",
            ],
            SLOT_ID,
        ),
    )

    # NOTE(review): the trailing comma makes this a 1-tuple wrapping the list,
    # so ``"FT.SEARCH" in SEARCH_COMMANDS`` is False. The shape is kept as-is
    # for compatibility - confirm whether a flat sequence was intended.
    SEARCH_COMMANDS = (
        [
            "FT.CREATE",
            "FT.SEARCH",
            "FT.AGGREGATE",
            "FT.EXPLAIN",
            "FT.EXPLAINCLI",
            "FT.PROFILE",
            "FT.ALTER",
            "FT.DROPINDEX",
            "FT.ALIASADD",
            "FT.ALIASUPDATE",
            "FT.ALIASDEL",
            "FT.TAGVALS",
            "FT.SUGADD",
            "FT.SUGGET",
            "FT.SUGDEL",
            "FT.SUGLEN",
            "FT.SYNUPDATE",
            "FT.SYNDUMP",
            "FT.SPELLCHECK",
            "FT.DICTADD",
            "FT.DICTDEL",
            "FT.DICTDUMP",
            "FT.INFO",
            "FT._LIST",
            "FT.CONFIG",
            "FT.ADD",
            "FT.DEL",
            "FT.DROP",
            "FT.GET",
            "FT.MGET",
            "FT.SYNADD",
        ],
    )

    # Parsers applied to individual replies of cluster-specific commands.
    CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
        "CLUSTER SLOTS": parse_cluster_slots,
        "CLUSTER SHARDS": parse_cluster_shards,
        "CLUSTER MYSHARDID": parse_cluster_myshardid,
    }

    # Reducers called as ``callback(command, res)``; the dict-typed ``res``
    # they handle is keyed per node.
    RESULT_CALLBACKS = dict_merge(
        list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
        list_keys_to_dict(
            ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
        ),
        list_keys_to_dict(
            ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
        ),
        list_keys_to_dict(
            [
                "PING",
                "CONFIG SET",
                "CONFIG REWRITE",
                "CONFIG RESETSTAT",
                "CLIENT SETNAME",
                "BGSAVE",
                "SLOWLOG RESET",
                "SAVE",
                "MEMORY PURGE",
                "CLIENT PAUSE",
                "CLIENT UNPAUSE",
            ],
            lambda command, res: all(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["DBSIZE", "WAIT"],
            lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
        ),
        list_keys_to_dict(
            ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
        ),
        list_keys_to_dict(["SCAN"], parse_scan_result),
        list_keys_to_dict(
            ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
        ),
        list_keys_to_dict(
            ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
        ),
        list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
    )

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        ClusterDownError,
        SlotNotCoveredError,
    )

    def replace_default_node(self, target_node: Optional["ClusterNode"] = None) -> None:
        """Replace the default cluster node.
        A random cluster node will be chosen if target_node isn't passed, and primaries
        will be prioritized. The default node will not be changed if there are no other
        nodes in the cluster.

        Args:
            target_node (ClusterNode, optional): Target node to replace the default
                node. Defaults to None.
        """
        if target_node:
            self.nodes_manager.default_node = target_node
            return

        curr_node = self.get_default_node()
        primaries = [node for node in self.get_primaries() if node != curr_node]
        if primaries:
            # Choose a primary if the cluster contains different primaries
            self.nodes_manager.default_node = random.choice(primaries)
            return

        # Otherwise, choose a replica if the cluster contains different replicas
        replicas = [node for node in self.get_replicas() if node != curr_node]
        if replicas:
            self.nodes_manager.default_node = random.choice(replicas)
463class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
464 @classmethod
465 def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
466 """
467 Return a Redis client object configured from the given URL
469 For example::
471 redis://[[username]:[password]]@localhost:6379/0
472 rediss://[[username]:[password]]@localhost:6379/0
473 unix://[username@]/path/to/socket.sock?db=0[&password=password]
475 Three URL schemes are supported:
477 - `redis://` creates a TCP socket connection. See more at:
478 <https://www.iana.org/assignments/uri-schemes/prov/redis>
479 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
480 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
481 - ``unix://``: creates a Unix Domain Socket connection.
483 The username, password, hostname, path and all querystring values
484 are passed through urllib.parse.unquote in order to replace any
485 percent-encoded values with their corresponding characters.
487 There are several ways to specify a database number. The first value
488 found will be used:
490 1. A ``db`` querystring option, e.g. redis://localhost?db=0
491 2. If using the redis:// or rediss:// schemes, the path argument
492 of the url, e.g. redis://localhost/0
493 3. A ``db`` keyword argument to this function.
495 If none of these options are specified, the default db=0 is used.
497 All querystring options are cast to their appropriate Python types.
498 Boolean arguments can be specified with string values "True"/"False"
499 or "Yes"/"No". Values that cannot be properly cast cause a
500 ``ValueError`` to be raised. Once parsed, the querystring arguments
501 and keyword arguments are passed to the ``ConnectionPool``'s
502 class initializer. In the case of conflicting arguments, querystring
503 arguments always win.
505 """
506 return cls(url=url, **kwargs)
508 @deprecated_args(
509 args_to_warn=["read_from_replicas"],
510 reason="Please configure the 'load_balancing_strategy' instead",
511 version="5.3.0",
512 )
513 @deprecated_args(
514 args_to_warn=[
515 "cluster_error_retry_attempts",
516 ],
517 reason="Please configure the 'retry' object instead",
518 version="6.0.0",
519 )
520 def __init__(
521 self,
522 host: Optional[str] = None,
523 port: int = 6379,
524 startup_nodes: Optional[List["ClusterNode"]] = None,
525 cluster_error_retry_attempts: int = 3,
526 retry: Optional["Retry"] = None,
527 require_full_coverage: bool = True,
528 reinitialize_steps: int = 5,
529 read_from_replicas: bool = False,
530 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
531 dynamic_startup_nodes: bool = True,
532 url: Optional[str] = None,
533 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
534 cache: Optional[CacheInterface] = None,
535 cache_config: Optional[CacheConfig] = None,
536 event_dispatcher: Optional[EventDispatcher] = None,
537 policy_resolver: PolicyResolver = StaticPolicyResolver(),
538 **kwargs,
539 ):
540 """
541 Initialize a new RedisCluster client.
543 :param startup_nodes:
544 List of nodes from which initial bootstrapping can be done
545 :param host:
546 Can be used to point to a startup node
547 :param port:
548 Can be used to point to a startup node
549 :param require_full_coverage:
550 When set to False (default value): the client will not require a
551 full coverage of the slots. However, if not all slots are covered,
552 and at least one node has 'cluster-require-full-coverage' set to
553 'yes,' the server will throw a ClusterDownError for some key-based
554 commands. See -
555 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
556 When set to True: all slots must be covered to construct the
557 cluster client. If not all slots are covered, RedisClusterException
558 will be thrown.
559 :param read_from_replicas:
560 @deprecated - please use load_balancing_strategy instead
561 Enable read from replicas in READONLY mode. You can read possibly
562 stale data.
563 When set to true, read commands will be assigned between the
564 primary and its replications in a Round-Robin manner.
565 :param load_balancing_strategy:
566 Enable read from replicas in READONLY mode and defines the load balancing
567 strategy that will be used for cluster node selection.
568 The data read from replicas is eventually consistent with the data in primary nodes.
569 :param dynamic_startup_nodes:
570 Set the RedisCluster's startup nodes to all of the discovered nodes.
571 If true (default value), the cluster's discovered nodes will be used to
572 determine the cluster nodes-slots mapping in the next topology refresh.
573 It will remove the initial passed startup nodes if their endpoints aren't
574 listed in the CLUSTER SLOTS output.
575 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
576 specific IP addresses, it is best to set it to false.
577 :param cluster_error_retry_attempts:
578 @deprecated - Please configure the 'retry' object instead
579 In case 'retry' object is set - this argument is ignored!
581 Number of times to retry before raising an error when
582 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
583 :class:`~.ClusterDownError` are encountered
584 :param retry:
585 A retry object that defines the retry strategy and the number of
586 retries for the cluster client.
587 In current implementation for the cluster client (starting form redis-py version 6.0.0)
588 the retry object is not yet fully utilized, instead it is used just to determine
589 the number of retries for the cluster client.
590 In the future releases the retry object will be used to handle the cluster client retries!
591 :param reinitialize_steps:
592 Specifies the number of MOVED errors that need to occur before
593 reinitializing the whole cluster topology. If a MOVED error occurs
594 and the cluster does not need to be reinitialized on this current
595 error handling, only the MOVED slot will be patched with the
596 redirected node.
597 To reinitialize the cluster on every MOVED error, set
598 reinitialize_steps to 1.
599 To avoid reinitializing the cluster on moved errors, set
600 reinitialize_steps to 0.
601 :param address_remap:
602 An optional callable which, when provided with an internal network
603 address of a node, e.g. a `(host, port)` tuple, will return the address
604 where the node is reachable. This can be used to map the addresses at
605 which the nodes _think_ they are, to addresses at which a client may
606 reach them, such as when they sit behind a proxy.
608 :**kwargs:
609 Extra arguments that will be sent into Redis instance when created
610 (See Official redis-py doc for supported kwargs - the only limitation
611 is that you can't provide 'retry' object as part of kwargs.
612 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
613 Some kwargs are not supported and will raise a
614 RedisClusterException:
615 - db (Redis do not support database SELECT in cluster mode)
616 """
617 if startup_nodes is None:
618 startup_nodes = []
620 if "db" in kwargs:
621 # Argument 'db' is not possible to use in cluster mode
622 raise RedisClusterException(
623 "Argument 'db' is not possible to use in cluster mode"
624 )
626 if "retry" in kwargs:
627 # Argument 'retry' is not possible to be used in kwargs when in cluster mode
628 # the kwargs are set to the lower level connections to the cluster nodes
629 # and there we provide retry configuration without retries allowed.
630 # The retries should be handled on cluster client level.
631 raise RedisClusterException(
632 "The 'retry' argument cannot be used in kwargs when running in cluster mode."
633 )
635 # Get the startup node/s
636 from_url = False
637 if url is not None:
638 from_url = True
639 url_options = parse_url(url)
640 if "path" in url_options:
641 raise RedisClusterException(
642 "RedisCluster does not currently support Unix Domain "
643 "Socket connections"
644 )
645 if "db" in url_options and url_options["db"] != 0:
646 # Argument 'db' is not possible to use in cluster mode
647 raise RedisClusterException(
648 "A ``db`` querystring option can only be 0 in cluster mode"
649 )
650 kwargs.update(url_options)
651 host = kwargs.get("host")
652 port = kwargs.get("port", port)
653 startup_nodes.append(ClusterNode(host, port))
654 elif host is not None and port is not None:
655 startup_nodes.append(ClusterNode(host, port))
656 elif len(startup_nodes) == 0:
657 # No startup node was provided
658 raise RedisClusterException(
659 "RedisCluster requires at least one node to discover the "
660 "cluster. Please provide one of the followings:\n"
661 "1. host and port, for example:\n"
662 " RedisCluster(host='localhost', port=6379)\n"
663 "2. list of startup nodes, for example:\n"
664 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
665 " ClusterNode('localhost', 6378)])"
666 )
667 # Update the connection arguments
668 # Whenever a new connection is established, RedisCluster's on_connect
669 # method should be run
670 # If the user passed on_connect function we'll save it and run it
671 # inside the RedisCluster.on_connect() function
672 self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
673 kwargs.update({"redis_connect_func": self.on_connect})
674 kwargs = cleanup_kwargs(**kwargs)
675 if retry:
676 self.retry = retry
677 else:
678 self.retry = Retry(
679 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
680 retries=cluster_error_retry_attempts,
681 )
683 self.encoder = Encoder(
684 kwargs.get("encoding", "utf-8"),
685 kwargs.get("encoding_errors", "strict"),
686 kwargs.get("decode_responses", False),
687 )
688 protocol = kwargs.get("protocol", None)
689 if (cache_config or cache) and protocol not in [3, "3"]:
690 raise RedisError("Client caching is only supported with RESP version 3")
692 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
693 self.node_flags = self.__class__.NODE_FLAGS.copy()
694 self.read_from_replicas = read_from_replicas
695 self.load_balancing_strategy = load_balancing_strategy
696 self.reinitialize_counter = 0
697 self.reinitialize_steps = reinitialize_steps
698 if event_dispatcher is None:
699 self._event_dispatcher = EventDispatcher()
700 else:
701 self._event_dispatcher = event_dispatcher
702 self.startup_nodes = startup_nodes
703 self.nodes_manager = NodesManager(
704 startup_nodes=startup_nodes,
705 from_url=from_url,
706 require_full_coverage=require_full_coverage,
707 dynamic_startup_nodes=dynamic_startup_nodes,
708 address_remap=address_remap,
709 cache=cache,
710 cache_config=cache_config,
711 event_dispatcher=self._event_dispatcher,
712 **kwargs,
713 )
715 self.cluster_response_callbacks = CaseInsensitiveDict(
716 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
717 )
718 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
720 # For backward compatibility, mapping from existing policies to new one
721 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
722 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
723 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
724 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
725 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
726 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
727 SLOT_ID: RequestPolicy.DEFAULT_KEYED,
728 }
730 self._policies_callback_mapping: dict[
731 Union[RequestPolicy, ResponsePolicy], Callable
732 ] = {
733 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
734 self.get_random_primary_or_all_nodes(command_name)
735 ],
736 RequestPolicy.DEFAULT_KEYED: lambda command,
737 *args: self.get_nodes_from_slot(command, *args),
738 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
739 RequestPolicy.ALL_SHARDS: self.get_primaries,
740 RequestPolicy.ALL_NODES: self.get_nodes,
741 RequestPolicy.ALL_REPLICAS: self.get_replicas,
742 RequestPolicy.MULTI_SHARD: lambda *args,
743 **kwargs: self._split_multi_shard_command(*args, **kwargs),
744 RequestPolicy.SPECIAL: self.get_special_nodes,
745 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
746 ResponsePolicy.DEFAULT_KEYED: lambda res: res,
747 }
749 self._policy_resolver = policy_resolver
750 self.commands_parser = CommandsParser(self)
752 # Node where FT.AGGREGATE command is executed.
753 self._aggregate_nodes = None
754 self._lock = threading.RLock()
756 def __enter__(self):
757 return self
759 def __exit__(self, exc_type, exc_value, traceback):
760 self.close()
762 def __del__(self):
763 try:
764 self.close()
765 except Exception:
766 pass
768 def disconnect_connection_pools(self):
769 for node in self.get_nodes():
770 if node.redis_connection:
771 try:
772 node.redis_connection.connection_pool.disconnect()
773 except OSError:
774 # Client was already disconnected. do nothing
775 pass
777 def on_connect(self, connection):
778 """
779 Initialize the connection, authenticate and select a database and send
780 READONLY if it is set during object initialization.
781 """
782 connection.on_connect()
784 if self.read_from_replicas or self.load_balancing_strategy:
785 # Sending READONLY command to server to configure connection as
786 # readonly. Since each cluster node may change its server type due
787 # to a failover, we should establish a READONLY connection
788 # regardless of the server type. If this is a primary connection,
789 # READONLY would not affect executing write commands.
790 connection.send_command("READONLY")
791 if str_if_bytes(connection.read_response()) != "OK":
792 raise ConnectionError("READONLY command failed")
794 if self.user_on_connect_func is not None:
795 self.user_on_connect_func(connection)
    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        """
        Return the client attached to ``node``, creating it first if needed.

        :param node: the cluster node whose ``redis_connection`` is wanted
        :return: ``node.redis_connection``
        """
        if not node.redis_connection:
            with self._lock:
                # Check again while holding the lock: the connection may have
                # been created between the first check and acquiring the lock.
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection
804 def get_node(self, host=None, port=None, node_name=None):
805 return self.nodes_manager.get_node(host, port, node_name)
807 def get_primaries(self):
808 return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
810 def get_replicas(self):
811 return self.nodes_manager.get_nodes_by_server_type(REPLICA)
813 def get_random_node(self):
814 return random.choice(list(self.nodes_manager.nodes_cache.values()))
816 def get_random_primary_or_all_nodes(self, command_name):
817 """
818 Returns random primary or all nodes depends on READONLY mode.
819 """
820 if self.read_from_replicas and command_name in READ_COMMANDS:
821 return self.get_random_node()
823 return self.get_random_primary_node()
825 def get_nodes(self):
826 return list(self.nodes_manager.nodes_cache.values())
828 def get_node_from_key(self, key, replica=False):
829 """
830 Get the node that holds the key's slot.
831 If replica set to True but the slot doesn't have any replicas, None is
832 returned.
833 """
834 slot = self.keyslot(key)
835 slot_cache = self.nodes_manager.slots_cache.get(slot)
836 if slot_cache is None or len(slot_cache) == 0:
837 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
838 if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
839 return None
840 elif replica:
841 node_idx = 1
842 else:
843 # primary
844 node_idx = 0
846 return slot_cache[node_idx]
848 def get_default_node(self):
849 """
850 Get the cluster's default node
851 """
852 return self.nodes_manager.default_node
854 def get_nodes_from_slot(self, command: str, *args):
855 """
856 Returns a list of nodes that hold the specified keys' slots.
857 """
858 # get the node that holds the key's slot
859 slot = self.determine_slot(*args)
860 node = self.nodes_manager.get_node_from_slot(
861 slot,
862 self.read_from_replicas and command in READ_COMMANDS,
863 self.load_balancing_strategy if command in READ_COMMANDS else None,
864 )
865 return [node]
867 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
868 """
869 Splits the command with Multi-Shard policy, to the multiple commands
870 """
871 keys = self._get_command_keys(*args)
872 commands = []
874 for key in keys:
875 commands.append(
876 {
877 "args": (args[0], key),
878 "kwargs": kwargs,
879 }
880 )
882 return commands
884 def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
885 """
886 Returns a list of nodes for commands with a special policy.
887 """
888 if not self._aggregate_nodes:
889 raise RedisClusterException(
890 "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
891 )
893 return self._aggregate_nodes
895 def get_random_primary_node(self) -> "ClusterNode":
896 """
897 Returns a random primary node
898 """
899 return random.choice(self.get_primaries())
901 def _evaluate_all_succeeded(self, res):
902 """
903 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
904 """
905 first_successful_response = None
907 if isinstance(res, dict):
908 for key, value in res.items():
909 if value:
910 if first_successful_response is None:
911 first_successful_response = {key: value}
912 else:
913 return {key: False}
914 else:
915 for response in res:
916 if response:
917 if first_successful_response is None:
918 # Dynamically resolve type
919 first_successful_response = type(response)(response)
920 else:
921 return type(response)(False)
923 return first_successful_response
925 def set_default_node(self, node):
926 """
927 Set the default node of the cluster.
928 :param node: 'ClusterNode'
929 :return True if the default node was set, else False
930 """
931 if node is None or self.get_node(node_name=node.name) is None:
932 return False
933 self.nodes_manager.default_node = node
934 return True
936 def set_retry(self, retry: Retry) -> None:
937 self.retry = retry
939 def monitor(self, target_node=None):
940 """
941 Returns a Monitor object for the specified target node.
942 The default cluster node will be selected if no target node was
943 specified.
944 Monitor is useful for handling the MONITOR command to the redis server.
945 next_command() method returns one command from monitor
946 listen() method yields commands from monitor.
947 """
948 if target_node is None:
949 target_node = self.get_default_node()
950 if target_node.redis_connection is None:
951 raise RedisClusterException(
952 f"Cluster Node {target_node.name} has no redis_connection"
953 )
954 return target_node.redis_connection.monitor()
956 def pubsub(self, node=None, host=None, port=None, **kwargs):
957 """
958 Allows passing a ClusterNode, or host&port, to get a pubsub instance
959 connected to the specified node
960 """
961 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
963 def pipeline(self, transaction=None, shard_hint=None):
964 """
965 Cluster impl:
966 Pipelines do not work in cluster mode the same way they
967 do in normal mode. Create a clone of this object so
968 that simulating pipelines will work correctly. Each
969 command will be called directly when used and
970 when calling execute() will only return the result stack.
971 """
972 if shard_hint:
973 raise RedisClusterException("shard_hint is deprecated in cluster mode")
975 return ClusterPipeline(
976 nodes_manager=self.nodes_manager,
977 commands_parser=self.commands_parser,
978 startup_nodes=self.nodes_manager.startup_nodes,
979 result_callbacks=self.result_callbacks,
980 cluster_response_callbacks=self.cluster_response_callbacks,
981 read_from_replicas=self.read_from_replicas,
982 load_balancing_strategy=self.load_balancing_strategy,
983 reinitialize_steps=self.reinitialize_steps,
984 retry=self.retry,
985 lock=self._lock,
986 transaction=transaction,
987 )
def lock(
    self,
    name,
    timeout=None,
    sleep=0.1,
    blocking=True,
    blocking_timeout=None,
    lock_class=None,
    thread_local=True,
    raise_on_release_error: bool = True,
):
    """
    Create a Lock object bound to key ``name``; it mimics the behavior
    of threading.Lock.

    ``timeout`` is the maximum life of the lock. With the default of
    ``None`` the lock stays held until release() is called.

    ``sleep`` is the amount of time to sleep per loop iteration while the
    lock is in blocking mode and another client currently holds it.

    ``blocking`` selects whether ``acquire`` blocks until the lock has
    been acquired, or fails immediately, in which case ``acquire`` returns
    False and the lock is not acquired. Defaults to True. The value can be
    overridden by passing a ``blocking`` argument to ``acquire``.

    ``blocking_timeout`` is the maximum number of seconds (float or
    integer) to spend trying to acquire the lock. ``None`` means keep
    trying forever.

    ``lock_class`` forces a specific lock implementation. As of
    redis-py 3.0 the only lock class implemented is ``Lock`` (a Lua-based
    lock), so this is only needed for a custom lock class. When it is
    ``None``, ``Lock`` is used.

    ``thread_local`` indicates whether the lock token is placed in
    thread-local storage. By default it is, so that a thread only sees
    its own token and never a token set by another thread. Consider the
    following timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release the thread-2's lock.

    Some use cases need thread local storage disabled, for example when
    one thread acquires a lock and hands the lock instance to a worker
    thread that releases it later. If thread local storage stays enabled
    in that case, the worker thread won't see the token set by the
    acquiring thread. These cases are assumed to be uncommon, hence the
    default of using thread local storage.

    ``raise_on_release_error`` indicates whether to raise an exception
    when the lock is no longer owned when exiting the context manager.
    The default of True raises; with False a warning is logged and the
    exception is suppressed.
    """
    lock_options = {
        "timeout": timeout,
        "sleep": sleep,
        "blocking": blocking,
        "blocking_timeout": blocking_timeout,
        "thread_local": thread_local,
        "raise_on_release_error": raise_on_release_error,
    }
    # Fall back to the built-in implementation when none was requested.
    factory = Lock if lock_class is None else lock_class
    return factory(self, name, **lock_options)
def set_response_callback(self, command, callback):
    """
    Register ``callback`` as the custom response callback for ``command``.

    The callback is stored in ``self.cluster_response_callbacks`` under
    the ``command`` key, replacing any callback previously stored there.
    """
    self.cluster_response_callbacks[command] = callback
def _determine_nodes(
    self, *args, request_policy: RequestPolicy, **kwargs
) -> List["ClusterNode"]:
    """
    Determine the nodes the command should be executed on.

    ``args`` is the full command (name first). ``request_policy`` is the
    policy to route with; it is overridden when the command (or a
    ``nodes_flag`` keyword passed by the user) maps to a policy in
    ``self._command_flags_mapping``.
    """
    command = args[0].upper()
    if len(args) >= 2:
        # Two-word commands are looked up by their combined name.
        two_word_command = f"{args[0]} {args[1]}".upper()
        if two_word_command in self.command_flags:
            command = two_word_command

    # A nodes flag passed by the user wins over the flag predefined for
    # the command.
    user_flag = kwargs.pop("nodes_flag", None)
    command_flag = (
        user_flag if user_flag is not None else self.command_flags.get(command)
    )

    if command_flag in self._command_flags_mapping:
        request_policy = self._command_flags_mapping[command_flag]

    resolve_nodes = self._policies_callback_mapping[request_policy]

    # Each policy callback takes a different argument shape.
    if request_policy == RequestPolicy.DEFAULT_KEYED:
        nodes = resolve_nodes(command, *args)
    elif request_policy == RequestPolicy.MULTI_SHARD:
        nodes = resolve_nodes(*args, **kwargs)
    elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
        nodes = resolve_nodes(args[0])
    else:
        nodes = resolve_nodes()

    # Remember which nodes served the last FT.AGGREGATE command.
    if args[0].lower() == "ft.aggregate":
        self._aggregate_nodes = nodes

    return nodes
1111 def _should_reinitialized(self):
1112 # To reinitialize the cluster on every MOVED error,
1113 # set reinitialize_steps to 1.
1114 # To avoid reinitializing the cluster on moved errors, set
1115 # reinitialize_steps to 0.
1116 if self.reinitialize_steps == 0:
1117 return False
1118 else:
1119 return self.reinitialize_counter % self.reinitialize_steps == 0
def keyslot(self, key):
    """
    Compute the hash slot of ``key``.

    The key is first encoded with this client's encoder and then passed
    to ``key_slot``.
    See Keys distribution model in https://redis.io/topics/cluster-spec
    """
    return key_slot(self.encoder.encode(key))
1129 def _get_command_keys(self, *args):
1130 """
1131 Get the keys in the command. If the command has no keys in in, None is
1132 returned.
1134 NOTE: Due to a bug in redis<7.0, this function does not work properly
1135 for EVAL or EVALSHA when the `numkeys` arg is 0.
1136 - issue: https://github.com/redis/redis/issues/9493
1137 - fix: https://github.com/redis/redis/pull/9733
1139 So, don't use this function with EVAL or EVALSHA.
1140 """
1141 redis_conn = self.get_default_node().redis_connection
1142 return self.commands_parser.get_keys(redis_conn, *args)
def determine_slot(self, *args) -> Optional[int]:
    """
    Work out which hash slot the command in ``args`` belongs to.

    Returns None for CLIENT TRACKING, which has no keys. Returns a
    random slot for EVAL/EVALSHA/FCALL/FCALL_RO invoked with zero keys.

    Raises a RedisClusterException if there's a missing key and we can't
    determine what slots to map the command to; or, if the keys don't
    all map to the same key slot.
    """
    command = args[0]
    if self.command_flags.get(command) == SLOT_ID:
        # The command itself carries the slot ID as its first argument.
        return args[1]

    command_name = command.upper()

    # CLIENT TRACKING is a special case: it has no keys and needs to be
    # sent to the provided nodes (all nodes by default).
    if command_name == "CLIENT TRACKING":
        return None

    if command_name in ("EVAL", "EVALSHA"):
        # EVAL and EVALSHA are common enough that asking the redis server
        # to parse the keys is wasteful. Besides, a bug in redis<7.0 makes
        # `self._get_command_keys()` fail for them anyway.
        # command syntax: EVAL "script body" num_keys ...
        if len(args) <= 2:
            raise RedisClusterException(f"Invalid args in command: {args}")
        key_count = int(args[2])
        keys = args[3 : 3 + key_count]
        if len(keys) == 0:
            # A script without keys can run on any node: pick a random slot.
            return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
    else:
        keys = self._get_command_keys(*args)
        if keys is None or len(keys) == 0:
            # FCALL may call a function with 0 keys; such a function can
            # run on any node, so a random slot will do.
            if command_name in ("FCALL", "FCALL_RO"):
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            raise RedisClusterException(
                "No way to dispatch this command to Redis Cluster. "
                "Missing key.\nYou can execute the command by specifying "
                f"target nodes.\nCommand: {args}"
            )

    if len(keys) == 1:
        # Single key command.
        return self.keyslot(keys[0])

    # Multi-key command: every key has to hash to the same slot.
    distinct_slots = {self.keyslot(key) for key in keys}
    if len(distinct_slots) == 1:
        return distinct_slots.pop()
    raise RedisClusterException(
        f"{command} - all keys must map to the same key slot"
    )
def get_encoder(self):
    """
    Return the encoder used by this client's connections
    (``self.encoder``).
    """
    return self.encoder
def get_connection_kwargs(self):
    """
    Return the connections' keyword arguments, as held by the nodes
    manager (``self.nodes_manager.connection_kwargs``).
    """
    manager = self.nodes_manager
    return manager.connection_kwargs
1219 def _is_nodes_flag(self, target_nodes):
1220 return isinstance(target_nodes, str) and target_nodes in self.node_flags
1222 def _parse_target_nodes(self, target_nodes):
1223 if isinstance(target_nodes, list):
1224 nodes = target_nodes
1225 elif isinstance(target_nodes, ClusterNode):
1226 # Supports passing a single ClusterNode as a variable
1227 nodes = [target_nodes]
1228 elif isinstance(target_nodes, dict):
1229 # Supports dictionaries of the format {node_name: node}.
1230 # It enables to execute commands with multi nodes as follows:
1231 # rc.cluster_save_config(rc.get_primaries())
1232 nodes = target_nodes.values()
1233 else:
1234 raise TypeError(
1235 "target_nodes type can be one of the following: "
1236 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
1237 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
1238 f"The passed type is {type(target_nodes)}"
1239 )
1240 return nodes
def execute_command(self, *args, **kwargs):
    """
    Execute a command on the cluster.

    All positional and keyword arguments are forwarded unchanged to
    ``_internal_execute_command`` and its result is returned.
    """
    return self._internal_execute_command(*args, **kwargs)
def _internal_execute_command(self, *args, **kwargs):
    """
    Wrapper for ERRORS_ALLOW_RETRY error handling.

    It will try the number of times specified by the retries property from
    config option "self.retry" which defaults to 3 unless manually
    configured.

    If it reaches the number of times, the command will raise the exception

    Key argument :target_nodes: can be passed with the following types:
        nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
        ClusterNode
        list<ClusterNode>
        dict<Any, ClusterNode>
    """
    target_nodes_specified = False
    is_default_node = False
    target_nodes = None
    passed_targets = kwargs.pop("target_nodes", None)
    command_policies = self._policy_resolver.resolve(args[0].lower())

    if passed_targets is not None and not self._is_nodes_flag(passed_targets):
        target_nodes = self._parse_target_nodes(passed_targets)
        target_nodes_specified = True

    if not command_policies and not target_nodes_specified:
        command = args[0].upper()
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        # We only could resolve key properties if command is not
        # in a list of pre-defined request policies
        command_flag = self.command_flags.get(command)
        if not command_flag:
            # Fallback to default policy
            if not self.get_default_node():
                slot = None
            else:
                slot = self.determine_slot(*args)
            # Compare against None explicitly: 0 is a valid hash slot and
            # must still be routed as a keyed command.
            if slot is None:
                command_policies = CommandPolicies()
            else:
                command_policies = CommandPolicies(
                    request_policy=RequestPolicy.DEFAULT_KEYED,
                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                )
        else:
            if command_flag in self._command_flags_mapping:
                command_policies = CommandPolicies(
                    request_policy=self._command_flags_mapping[command_flag]
                )
            else:
                command_policies = CommandPolicies()
    elif not command_policies and target_nodes_specified:
        command_policies = CommandPolicies()

    # If an error that allows retrying was thrown, the nodes and slots
    # cache were reinitialized. We will retry executing the command with
    # the updated cluster setup only when the target nodes can be
    # determined again with the new cache tables. Therefore, when target
    # nodes were passed to this function, we cannot retry the command
    # execution since the nodes may not be valid anymore after the tables
    # were reinitialized. So in case of passed target nodes,
    # retry_attempts will be set to 0.
    retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
    # Add one for the first execution
    execute_attempts = 1 + retry_attempts
    for _ in range(execute_attempts):
        try:
            res = {}
            if not target_nodes_specified:
                # Determine the nodes to execute the command on
                target_nodes = self._determine_nodes(
                    *args,
                    request_policy=command_policies.request_policy,
                    nodes_flag=passed_targets,
                )
                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )
            if (
                len(target_nodes) == 1
                and target_nodes[0] == self.get_default_node()
            ):
                is_default_node = True
            for node in target_nodes:
                res[node.name] = self._execute_command(node, *args, **kwargs)

                # One successful reply is enough for this response policy.
                if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                    break

            # Return the processed result
            return self._process_result(
                args[0],
                res,
                response_policy=command_policies.response_policy,
                **kwargs,
            )
        except Exception as e:
            if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                if is_default_node:
                    # Replace the default cluster node
                    self.replace_default_node()
                # The nodes and slots cache were reinitialized.
                # Try again with the new cluster setup.
                retry_attempts -= 1
                continue
            else:
                # raise the exception
                raise e
def _execute_command(self, target_node, *args, **kwargs):
    """
    Send a command to a node in the cluster.

    The command is attempted up to ``RedisClusterRequestTTL`` times.
    MOVED, ASK and TRYAGAIN replies are handled inside the loop and lead
    to another attempt; other errors are re-raised (connection, timeout,
    cluster-down and slot-not-covered errors reinitialize the nodes
    manager first).

    :raises ClusterError: when all attempts are used up.
    """
    command = args[0]
    redis_node = None
    connection = None
    redirect_addr = None
    asking = False
    moved = False
    ttl = int(self.RedisClusterRequestTTL)

    while ttl > 0:
        ttl -= 1
        # Forget the connection of the previous attempt. It was already
        # released in that attempt's ``finally``; keeping the reference
        # would make a failure that happens before a new connection is
        # acquired disconnect and release it a second time.
        connection = None
        try:
            if asking:
                target_node = self.get_node(node_name=redirect_addr)
            elif moved:
                # MOVED occurred and the slots cache was updated,
                # refresh the target node
                slot = self.determine_slot(*args)
                target_node = self.nodes_manager.get_node_from_slot(
                    slot,
                    self.read_from_replicas and command in READ_COMMANDS,
                    self.load_balancing_strategy
                    if command in READ_COMMANDS
                    else None,
                )
                moved = False

            redis_node = self.get_redis_connection(target_node)
            connection = get_connection(redis_node)
            if asking:
                connection.send_command("ASKING")
                redis_node.parse_response(connection, "ASKING", **kwargs)
                asking = False
            connection.send_command(*args, **kwargs)
            response = redis_node.parse_response(connection, command, **kwargs)

            # Remove keys entry, it needs only for cache.
            kwargs.pop("keys", None)

            if command in self.cluster_response_callbacks:
                response = self.cluster_response_callbacks[command](
                    response, **kwargs
                )
            return response
        except AuthenticationError:
            raise
        except MaxConnectionsError:
            # MaxConnectionsError indicates client-side resource exhaustion
            # (too many connections in the pool), not a node failure.
            # Don't treat this as a node failure - just re-raise the error
            # without reinitializing the cluster.
            raise
        except (ConnectionError, TimeoutError) as e:
            # ConnectionError can also be raised if we couldn't get a
            # connection from the pool before timing out, so check that
            # this is an actual connection before attempting to disconnect.
            if connection is not None:
                connection.disconnect()

            # Remove the failed node from the startup nodes before we try
            # to reinitialize the cluster
            self.nodes_manager.startup_nodes.pop(target_node.name, None)
            # Reset the cluster node's connection
            target_node.redis_connection = None
            self.nodes_manager.initialize()
            raise e
        except MovedError as e:
            # First, we will try to patch the slots/nodes cache with the
            # redirected node output and try again. If MovedError exceeds
            # 'reinitialize_steps' number of times, we will force
            # reinitializing the tables, and then try again.
            # 'reinitialize_steps' counter will increase faster when
            # the same client object is shared between multiple threads. To
            # reduce the frequency you can set this variable in the
            # RedisCluster constructor.
            self.reinitialize_counter += 1
            if self._should_reinitialized():
                self.nodes_manager.initialize()
                # Reset the counter
                self.reinitialize_counter = 0
            else:
                self.nodes_manager.update_moved_exception(e)
            moved = True
        except TryAgainError:
            # Back off briefly once more than half of the attempts are used.
            if ttl < self.RedisClusterRequestTTL / 2:
                time.sleep(0.05)
        except AskError as e:
            redirect_addr = get_node_name(host=e.host, port=e.port)
            asking = True
        except (ClusterDownError, SlotNotCoveredError):
            # ClusterDownError can occur during a failover and to get
            # self-healed, we will try to reinitialize the cluster layout
            # and retry executing the command

            # SlotNotCoveredError can occur when the cluster is not fully
            # initialized or can be temporary issue.
            # We will try to reinitialize the cluster topology
            # and retry executing the command

            time.sleep(0.25)
            self.nodes_manager.initialize()
            raise
        except ResponseError:
            raise
        except Exception as e:
            if connection:
                connection.disconnect()
            raise e
        finally:
            if connection is not None:
                redis_node.connection_pool.release(connection)

    raise ClusterError("TTL exhausted.")
def close(self) -> None:
    """
    Close the nodes manager, if there is one, while holding
    ``self._lock``.

    Any AttributeError raised along the way is swallowed, because
    RedisCluster's __init__ can fail before nodes_manager is set.
    """
    try:
        with self._lock:
            manager = self.nodes_manager
            if manager:
                manager.close()
    except AttributeError:
        # Nothing to close on a partially initialized client.
        pass
1484 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
1485 """
1486 Process the result of the executed command.
1487 The function would return a dict or a single value.
1489 :type command: str
1490 :type res: dict
1492 `res` should be in the following format:
1493 Dict<node_name, command_result>
1494 """
1495 if command in self.result_callbacks:
1496 res = self.result_callbacks[command](command, res, **kwargs)
1497 elif len(res) == 1:
1498 # When we execute the command on a single node, we can
1499 # remove the dictionary and return a single response
1500 res = list(res.values())[0]
1502 return self._policies_callback_mapping[response_policy](res)
def load_external_module(self, funcname, func):
    """
    Attach an externally defined redis module function, and its
    namespace, to this redis client instance.

    ``funcname`` - A string containing the name of the attribute to create
    ``func`` - The function, being set on this instance under that name.
    """
    setattr(self, funcname, func)
def transaction(self, func, *watches, **kwargs):
    """
    Convenience method that runs the callable `func` as a transaction
    while watching every key given in `watches`. `func` receives a single
    argument, the Pipeline object.

    Recognized keyword arguments: ``shard_hint`` (forwarded to
    ``pipeline``), ``value_from_callable`` (return the value of `func`
    instead of the result of ``execute()``) and ``watch_delay`` (seconds
    to sleep before retrying after a WatchError). The transaction is
    retried until it completes without a WatchError.
    """
    shard_hint = kwargs.pop("shard_hint", None)
    value_from_callable = kwargs.pop("value_from_callable", False)
    watch_delay = kwargs.pop("watch_delay", None)

    with self.pipeline(True, shard_hint) as pipe:
        while True:
            try:
                if watches:
                    pipe.watch(*watches)
                func_value = func(pipe)
                exec_value = pipe.execute()
            except WatchError:
                # A watched key changed: optionally wait, then start over.
                if watch_delay is not None and watch_delay > 0:
                    time.sleep(watch_delay)
                continue
            if value_from_callable:
                return func_value
            return exec_value
class ClusterNode:
    """
    A single node of the cluster, identified by its name (derived from
    host and port through ``get_node_name``).

    Two nodes compare equal, and hash alike, when their names are equal.
    """

    def __init__(self, host, port, server_type=None, redis_connection=None):
        # "localhost" is resolved to an address so that the node name is
        # built from the resolved host.
        if host == "localhost":
            host = socket.gethostbyname(host)

        self.host = host
        self.port = port
        self.name = get_node_name(host, port)
        self.server_type = server_type
        self.redis_connection = redis_connection

    def __repr__(self):
        return (
            f"[host={self.host},"
            f"port={self.port},"
            f"name={self.name},"
            f"server_type={self.server_type},"
            f"redis_connection={self.redis_connection}]"
        )

    def __eq__(self, obj):
        return isinstance(obj, ClusterNode) and obj.name == self.name

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable. Hash on
        # the same attribute that equality compares, so nodes can be used
        # in sets and as dict keys. The name must not change while the
        # node is stored in a hashed container.
        return hash(self.name)

    def __del__(self):
        try:
            if self.redis_connection is not None:
                self.redis_connection.close()
        except Exception:
            # Ignore errors when closing the connection
            pass
class LoadBalancingStrategy(Enum):
    """Strategies used to pick a node out of a slot's node list."""

    ROUND_ROBIN = "round_robin"
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
    """
    Round-Robin Load Balancing

    Keeps, per primary name, the index of the next server to use inside
    that primary's node list (index 0 being the primary itself).
    """

    def __init__(self, start_index: int = 0) -> None:
        self.primary_to_idx = {}
        self.start_index = start_index

    def get_server_index(
        self,
        primary: str,
        list_size: int,
        load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
    ) -> int:
        """
        Return the index of the server to use for ``primary``.

        ``list_size`` is the length of the node list the index refers to.
        When the list holds at most one node there is nothing to balance
        and index 0 is returned for every strategy.
        """
        if list_size <= 1:
            # Only the primary is available: no replica index exists, so
            # neither a random replica nor index 1 would be valid.
            return 0
        if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
            return self._get_random_replica_index(list_size)
        else:
            return self._get_round_robin_index(
                primary,
                list_size,
                load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )

    def reset(self) -> None:
        """Forget the stored index of every primary."""
        self.primary_to_idx.clear()

    def _get_random_replica_index(self, list_size: int) -> int:
        # Index 0 is the primary, replicas occupy 1..list_size - 1.
        return random.randint(1, list_size - 1)

    def _get_round_robin_index(
        self, primary: str, list_size: int, replicas_only: bool
    ) -> int:
        # The stored index may stem from a longer node list (or from a
        # large start_index); wrap it so it always fits the current list.
        server_index = (
            self.primary_to_idx.setdefault(primary, self.start_index) % list_size
        )
        if replicas_only and server_index == 0:
            # skip the primary node index
            server_index = 1
        # Update the index for the next round
        self.primary_to_idx[primary] = (server_index + 1) % list_size
        return server_index
1617class NodesManager:
def __init__(
    self,
    startup_nodes,
    from_url=False,
    require_full_coverage=False,
    lock=None,
    dynamic_startup_nodes=True,
    connection_pool_class=ConnectionPool,
    address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
    cache: Optional[CacheInterface] = None,
    cache_config: Optional[CacheConfig] = None,
    cache_factory: Optional[CacheFactoryInterface] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
    **kwargs,
):
    """
    Set up the manager's state and immediately call ``initialize()``.

    ``startup_nodes`` are registered through ``populate_startup_nodes``.
    Remaining keyword arguments are kept as ``connection_kwargs`` and
    used when creating redis connections to the nodes. When ``lock`` or
    ``event_dispatcher`` is None, a new ``threading.RLock`` or
    ``EventDispatcher`` is created respectively.
    """
    # Caches, filled by initialize().
    self.nodes_cache: Dict[str, "ClusterNode"] = {}
    self.slots_cache = {}
    self.startup_nodes = {}
    self.default_node = None
    self.populate_startup_nodes(startup_nodes)

    # Plain configuration.
    self.from_url = from_url
    self._require_full_coverage = require_full_coverage
    self._dynamic_startup_nodes = dynamic_startup_nodes
    self.connection_pool_class = connection_pool_class
    self.address_remap = address_remap
    self._cache = cache
    self._cache_config = cache_config
    self._cache_factory = cache_factory

    # Pending MOVED redirection, applied on the next slot lookup.
    self._moved_exception = None
    self.connection_kwargs = kwargs
    self.read_load_balancer = LoadBalancer()

    self._lock = threading.RLock() if lock is None else lock
    self._event_dispatcher = (
        EventDispatcher() if event_dispatcher is None else event_dispatcher
    )
    self._credential_provider = self.connection_kwargs.get(
        "credential_provider", None
    )
    self.initialize()
def get_node(self, host=None, port=None, node_name=None):
    """
    Look up the requested node in the cluster's nodes cache, either by
    ``host`` and ``port`` or by ``node_name``.

    :return: ClusterNode if the node exists, else None
    """
    if host and port:
        # The user passed host and port; "localhost" is resolved to an
        # address before building the node name.
        resolved_host = socket.gethostbyname(host) if host == "localhost" else host
        return self.nodes_cache.get(get_node_name(host=resolved_host, port=port))
    if node_name:
        return self.nodes_cache.get(node_name)
    return None
def update_moved_exception(self, exception):
    """
    Remember ``exception`` as the pending MOVED redirection.

    It is stored in ``self._moved_exception``, which
    ``get_node_from_slot`` checks before resolving a slot.
    """
    self._moved_exception = exception
def _update_moved_slots(self):
    """
    Update the slot's node with the redirected one

    Applies the pending ``self._moved_exception``: the node it points to
    becomes the primary of the moved slot, and the pending exception is
    cleared afterwards.
    """
    e = self._moved_exception
    redirected_node = self.get_node(host=e.host, port=e.port)
    if redirected_node is not None:
        # The node already exists
        if redirected_node.server_type is not PRIMARY:
            # Update the node's server type
            redirected_node.server_type = PRIMARY
    else:
        # This is a new node, we will add it to the nodes cache
        redirected_node = ClusterNode(e.host, e.port, PRIMARY)
        self.nodes_cache[redirected_node.name] = redirected_node

    slot_nodes = self.slots_cache[e.slot_id]
    if slot_nodes and slot_nodes[0] == redirected_node:
        # The redirected node already is the primary of this slot, e.g.
        # because the same redirection was applied before. Treating it as
        # a failover again would demote it to a replica and corrupt the
        # slot's node list, so leave the list untouched.
        pass
    elif redirected_node in slot_nodes:
        # The MOVED error resulted from a failover, and the new slot owner
        # had previously been a replica.
        old_primary = slot_nodes[0]
        # Update the old primary to be a replica and add it to the end of
        # the slot's node list
        old_primary.server_type = REPLICA
        slot_nodes.append(old_primary)
        # Remove the old replica, which is now a primary, from the slot's
        # node list
        slot_nodes.remove(redirected_node)
        # Override the old primary with the new one
        slot_nodes[0] = redirected_node
        if self.default_node == old_primary:
            # Update the default node with the new primary
            self.default_node = redirected_node
    else:
        # The new slot owner is a new server, or a server from a different
        # shard. We need to remove all current nodes from the slot's list
        # (including replications) and add just the new node.
        self.slots_cache[e.slot_id] = [redirected_node]
    # Reset moved_exception
    self._moved_exception = None
@deprecated_args(
    args_to_warn=["server_type"],
    reason=(
        "In case you need select some load balancing strategy "
        "that will use replicas, please set it through 'load_balancing_strategy'"
    ),
    version="5.3.0",
)
def get_node_from_slot(
    self,
    slot,
    read_from_replicas=False,
    load_balancing_strategy=None,
    server_type=None,
) -> ClusterNode:
    """
    Get a node that serves this hash slot.

    A pending MOVED redirection is applied first. With more than one
    node in the slot and a load balancing strategy in effect
    (``read_from_replicas=True`` implies ROUND_ROBIN when no strategy is
    given), the read load balancer picks the node. Otherwise the primary
    is returned, unless the deprecated ``server_type`` asks for a replica,
    in which case a random replica is returned.

    :raises SlotNotCoveredError: when no node is known for ``slot``.
    """
    if self._moved_exception:
        with self._lock:
            # Check again under the lock before applying the redirection.
            if self._moved_exception:
                self._update_moved_slots()

    slot_nodes = self.slots_cache.get(slot)
    if not slot_nodes:
        raise SlotNotCoveredError(
            f'Slot "{slot}" not covered by the cluster. '
            f'"require_full_coverage={self._require_full_coverage}"'
        )

    if read_from_replicas is True and load_balancing_strategy is None:
        load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

    node_count = len(slot_nodes)
    if node_count > 1 and load_balancing_strategy:
        # get the server index using the strategy defined in
        # load_balancing_strategy
        node_idx = self.read_load_balancer.get_server_index(
            slot_nodes[0].name, node_count, load_balancing_strategy
        )
    elif server_type is None or server_type == PRIMARY or node_count == 1:
        # return a primary
        node_idx = 0
    else:
        # return a replica, randomly chosen among the replicas
        node_idx = random.randint(1, node_count - 1)

    return slot_nodes[node_idx]
def get_nodes_by_server_type(self, server_type):
    """
    Collect every cached node whose server type equals ``server_type``.

    :param server_type: 'primary' or 'replica'
    :return: list of ClusterNode
    """
    matching_nodes = []
    for cached_node in self.nodes_cache.values():
        if cached_node.server_type == server_type:
            matching_nodes.append(cached_node)
    return matching_nodes
def populate_startup_nodes(self, nodes):
    """
    Register all ``nodes`` as startup nodes, keyed by node name.

    Duplicates are filtered out: a node whose name is already present
    replaces the earlier entry.
    """
    self.startup_nodes.update((node.name, node) for node in nodes)
def check_slots_coverage(self, slots_cache):
    """
    Validate that every hash slot (0 .. REDIS_CLUSTER_HASH_SLOTS - 1) is
    present in ``slots_cache``.

    :return: True when all slots are covered, False otherwise (in which
        case the caller may try the next startup node).
    """
    return all(
        slot_id in slots_cache for slot_id in range(REDIS_CLUSTER_HASH_SLOTS)
    )
def create_redis_connections(self, nodes):
    """
    Create a redis connection for every node in :nodes: that does not
    have one yet.

    Afterwards an ``AfterPooledConnectionsInstantiationEvent`` carrying
    the connection pools of the newly created connections is dispatched.
    """
    new_pools = []
    for node in nodes:
        if node.redis_connection is not None:
            # This node already has a connection; keep it.
            continue
        node.redis_connection = self.create_redis_node(
            host=node.host, port=node.port, **self.connection_kwargs
        )
        new_pools.append(node.redis_connection.connection_pool)

    event = AfterPooledConnectionsInstantiationEvent(
        new_pools, ClientType.SYNC, self._credential_provider
    )
    self._event_dispatcher.dispatch(event)
def create_redis_node(self, host, port, **kwargs):
    """
    Build the ``Redis`` client used to talk to the node at ``host`` and
    ``port``.

    When the manager was created from a URL, the client is built around a
    connection pool of ``self.connection_pool_class``; otherwise the
    arguments are handed to ``Redis`` directly.
    """
    # We are configuring the connection pool not to retry
    # connections on lower level clients to avoid retrying
    # connections to nodes that are not reachable
    # and to avoid blocking the connection pool.
    # The only error that will have some handling in the lower
    # level clients is ConnectionError which will trigger disconnection
    # of the socket.
    # The retries will be handled on cluster client level
    # where we will have proper handling of the cluster topology
    node_retry_config = Retry(
        backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
    )

    if kwargs.get("protocol", None) in (3, "3"):
        # Maintenance notifications are switched off for protocol 3.
        kwargs["maint_notifications_config"] = MaintNotificationsConfig(
            enabled=False
        )

    if not self.from_url:
        return Redis(
            host=host,
            port=port,
            cache=self._cache,
            retry=node_retry_config,
            **kwargs,
        )

    # Create a redis node with a customized connection pool
    kwargs.update(
        host=host,
        port=port,
        cache=self._cache,
        retry=node_retry_config,
    )
    return Redis(connection_pool=self.connection_pool_class(**kwargs))
def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
    """
    Return the cluster node for ``host`` and ``port``, registering it in
    ``tmp_nodes_cache``.

    A node already present in ``tmp_nodes_cache`` is returned as-is.
    Otherwise a node from the current nodes cache is reused when it has a
    redis connection, else a new ClusterNode is created; its server type
    is set to ``role`` before it is added to ``tmp_nodes_cache``.
    """
    node_name = get_node_name(host, port)

    # Already handled during this pass: nothing more to do.
    known_node = tmp_nodes_cache.get(node_name)
    if known_node is not None:
        return known_node

    # Before creating a new cluster node, check if the cluster node
    # already exists in the current nodes cache and has a valid
    # connection so we can reuse it.
    node = self.nodes_cache.get(node_name)
    if node is None or node.redis_connection is None:
        node = ClusterNode(host, port, role)
    if node.server_type != role:
        node.server_type = role

    tmp_nodes_cache[node.name] = node
    return node
1871 def initialize(self):
1872 """
1873 Initializes the nodes cache, slots cache and redis connections.
1874 :startup_nodes:
1875 Responsible for discovering other nodes in the cluster
1876 """
1877 self.reset()
1878 tmp_nodes_cache = {}
1879 tmp_slots = {}
1880 disagreements = []
1881 startup_nodes_reachable = False
1882 fully_covered = False
1883 kwargs = self.connection_kwargs
1884 exception = None
1885 # Convert to tuple to prevent RuntimeError if self.startup_nodes
1886 # is modified during iteration
1887 for startup_node in tuple(self.startup_nodes.values()):
1888 try:
1889 if startup_node.redis_connection:
1890 r = startup_node.redis_connection
1891 else:
1892 # Create a new Redis connection
1893 r = self.create_redis_node(
1894 startup_node.host, startup_node.port, **kwargs
1895 )
1896 self.startup_nodes[startup_node.name].redis_connection = r
1897 # Make sure cluster mode is enabled on this node
1898 try:
1899 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1900 r.connection_pool.disconnect()
1901 except ResponseError:
1902 raise RedisClusterException(
1903 "Cluster mode is not enabled on this node"
1904 )
1905 startup_nodes_reachable = True
1906 except Exception as e:
1907 # Try the next startup node.
1908 # The exception is saved and raised only if we have no more nodes.
1909 exception = e
1910 continue
1912 # CLUSTER SLOTS command results in the following output:
1913 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
1914 # where each node contains the following list: [IP, port, node_id]
1915 # Therefore, cluster_slots[0][2][0] will be the IP address of the
1916 # primary node of the first slot section.
1917 # If there's only one server in the cluster, its ``host`` is ''
1918 # Fix it to the host in startup_nodes
1919 if (
1920 len(cluster_slots) == 1
1921 and len(cluster_slots[0][2][0]) == 0
1922 and len(self.startup_nodes) == 1
1923 ):
1924 cluster_slots[0][2][0] = startup_node.host
1926 for slot in cluster_slots:
1927 primary_node = slot[2]
1928 host = str_if_bytes(primary_node[0])
1929 if host == "":
1930 host = startup_node.host
1931 port = int(primary_node[1])
1932 host, port = self.remap_host_port(host, port)
1934 nodes_for_slot = []
1936 target_node = self._get_or_create_cluster_node(
1937 host, port, PRIMARY, tmp_nodes_cache
1938 )
1939 nodes_for_slot.append(target_node)
1941 replica_nodes = slot[3:]
1942 for replica_node in replica_nodes:
1943 host = str_if_bytes(replica_node[0])
1944 port = int(replica_node[1])
1945 host, port = self.remap_host_port(host, port)
1946 target_replica_node = self._get_or_create_cluster_node(
1947 host, port, REPLICA, tmp_nodes_cache
1948 )
1949 nodes_for_slot.append(target_replica_node)
1951 for i in range(int(slot[0]), int(slot[1]) + 1):
1952 if i not in tmp_slots:
1953 tmp_slots[i] = nodes_for_slot
1954 else:
1955 # Validate that 2 nodes want to use the same slot cache
1956 # setup
1957 tmp_slot = tmp_slots[i][0]
1958 if tmp_slot.name != target_node.name:
1959 disagreements.append(
1960 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
1961 )
1963 if len(disagreements) > 5:
1964 raise RedisClusterException(
1965 f"startup_nodes could not agree on a valid "
1966 f"slots cache: {', '.join(disagreements)}"
1967 )
1969 fully_covered = self.check_slots_coverage(tmp_slots)
1970 if fully_covered:
1971 # Don't need to continue to the next startup node if all
1972 # slots are covered
1973 break
1975 if not startup_nodes_reachable:
1976 raise RedisClusterException(
1977 f"Redis Cluster cannot be connected. Please provide at least "
1978 f"one reachable node: {str(exception)}"
1979 ) from exception
1981 if self._cache is None and self._cache_config is not None:
1982 if self._cache_factory is None:
1983 self._cache = CacheFactory(self._cache_config).get_cache()
1984 else:
1985 self._cache = self._cache_factory.get_cache()
1987 # Create Redis connections to all nodes
1988 self.create_redis_connections(list(tmp_nodes_cache.values()))
1990 # Check if the slots are not fully covered
1991 if not fully_covered and self._require_full_coverage:
1992 # Despite the requirement that the slots be covered, there
1993 # isn't a full coverage
1994 raise RedisClusterException(
1995 f"All slots are not covered after query all startup_nodes. "
1996 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
1997 f"covered..."
1998 )
2000 # Set the tmp variables to the real variables
2001 self.nodes_cache = tmp_nodes_cache
2002 self.slots_cache = tmp_slots
2003 # Set the default node
2004 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
2005 if self._dynamic_startup_nodes:
2006 # Populate the startup nodes with all discovered nodes
2007 self.startup_nodes = tmp_nodes_cache
2008 # If initialize was called after a MovedError, clear it
2009 self._moved_exception = None
def close(self) -> None:
    """
    Drop the default node and close the Redis client of every cached node
    that has one.
    """
    self.default_node = None
    clients = (cached.redis_connection for cached in self.nodes_cache.values())
    for client in clients:
        # Nodes without an instantiated client are skipped.
        if client:
            client.close()
def reset(self):
    """
    Reset the read load balancer, if one is configured.

    When ``read_load_balancer`` is None this is a no-op.
    """
    balancer = self.read_load_balancer
    if balancer is None:
        # ``None.reset()`` raises AttributeError, which the TypeError
        # handler below never caught; check for None explicitly instead.
        return
    try:
        balancer.reset()
    except TypeError:
        # Preserved from the previous implementation: a TypeError raised
        # by the balancer's reset() is deliberately ignored.
        pass
def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
    """
    Translate a ``(host, port)`` pair reported by the cluster through the
    configured ``address_remap`` callable.

    The callable receives the pair as a single tuple argument and its
    result is returned as-is. When no callable is configured the pair is
    returned unchanged. Useful if the client is not connecting directly
    to the cluster.
    """
    remap = self.address_remap
    if not remap:
        return host, port
    return remap((host, port))
def find_connection_owner(self, connection: Connection) -> Optional["ClusterNode"]:
    """
    Find the cached node whose Redis client targets the same address as
    ``connection``.

    The node name built from ``connection.host``/``connection.port`` is
    compared with the name built from the ``host``/``port`` entries of each
    node's connection pool kwargs.

    :return: the matching entry of ``nodes_cache`` (a node object, not a
        ``Redis`` client), or None when no node matches.
    """
    wanted_name = get_node_name(connection.host, connection.port)
    # Iterate over a snapshot of the cache values.
    for node in tuple(self.nodes_cache.values()):
        if not node.redis_connection:
            # No client has been created for this node.
            continue
        pool_kwargs = node.redis_connection.connection_pool.connection_kwargs
        candidate = get_node_name(pool_kwargs.get("host"), pool_kwargs.get("port"))
        if wanted_name == candidate:
            return node
    return None
class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
    """

    def __init__(
        self,
        redis_cluster,
        node=None,
        host=None,
        port=None,
        push_handler_func=None,
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true or load_balancing_strategy is set, a replica can be
            selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        self.set_pubsub_node(redis_cluster, node, host, port)
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        # Maps node name -> per-node PubSub used for sharded channels.
        self.node_pubsub_mapping = {}
        # NOTE: replaces the bound method with the generator it returns.
        self._pubsubs_generator = self._pubsubs_generator()
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        super().__init__(
            connection_pool=connection_pool,
            encoder=redis_cluster.encoder,
            push_handler_func=push_handler_func,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        The pubsub node will be set according to the passed node, host and port
        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.

        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            pubsub_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, node, host, port)
            pubsub_node = node
        elif host or port:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            pubsub_node = None

        self.node = pubsub_node

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        if node is None or redis_cluster.get_node(node_name=node.name) is None:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )

    def execute_command(self, *args):
        """
        Execute a subscribe/unsubscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            if self.connection_pool is None:
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    channel = args[1]
                    slot = self.cluster.keyslot(channel)
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot,
                        self.cluster.read_from_replicas,
                        self.cluster.load_balancing_strategy,
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                redis_connection = self.cluster.get_redis_connection(node)
                self.connection_pool = redis_connection.connection_pool
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _get_node_pubsub(self, node):
        """
        Return the PubSub bound to ``node``, creating and caching it in
        ``node_pubsub_mapping`` on first use.
        """
        try:
            return self.node_pubsub_mapping[node.name]
        except KeyError:
            pubsub = node.redis_connection.pubsub(
                push_handler_func=self.push_handler_func
            )
            self.node_pubsub_mapping[node.name] = pubsub
            return pubsub

    def _sharded_message_generator(self):
        """
        Poll the per-node pubsubs in round-robin order, at most once per
        currently mapped node, and return the first message found (or None).
        """
        for _ in range(len(self.node_pubsub_mapping)):
            pubsub = next(self._pubsubs_generator)
            message = pubsub.get_message()
            if message is not None:
                return message
        return None

    def _pubsubs_generator(self):
        """
        Endlessly cycle over the per-node pubsubs.

        Each pass iterates over a snapshot of the mapping values: the mapping
        is resized by ``_get_node_pubsub`` and ``get_sharded_message`` while
        this generator is suspended, and resuming iteration over a live dict
        view after a resize raises RuntimeError and kills the generator.
        Callers must not advance it while the mapping is empty (see
        ``_sharded_message_generator``), as there would be nothing to yield.
        """
        while True:
            yield from list(self.node_pubsub_mapping.values())

    def get_sharded_message(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    ):
        """
        Fetch the next sharded pubsub message.

        When ``target_node`` is given only that node's pubsub is queried
        (honouring ``ignore_subscribe_messages`` and ``timeout``); otherwise
        the per-node pubsubs are polled round-robin. ``sunsubscribe``
        confirmations update the local bookkeeping and are suppressed when
        subscribe messages are being ignored.
        """
        if target_node:
            message = self.node_pubsub_mapping[target_node.name].get_message(
                ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
            )
        else:
            message = self._sharded_message_generator()
        if message is None:
            return None
        elif str_if_bytes(message["type"]) == "sunsubscribe":
            if message["channel"] in self.pending_unsubscribe_shard_channels:
                self.pending_unsubscribe_shard_channels.remove(message["channel"])
                self.shard_channels.pop(message["channel"], None)
                node = self.cluster.get_node_from_key(message["channel"])
                # Use .get(): the node currently owning the channel's key may
                # have no pubsub registered in the mapping.
                node_pubsub = self.node_pubsub_mapping.get(node.name)
                if node_pubsub is not None and node_pubsub.subscribed is False:
                    self.node_pubsub_mapping.pop(node.name)
            if not self.channels and not self.patterns and not self.shard_channels:
                # There are no subscriptions anymore, set subscribed_event flag
                # to false
                self.subscribed_event.clear()
            if self.ignore_subscribe_messages or ignore_subscribe_messages:
                return None
        return message

    def ssubscribe(self, *args, **kwargs):
        """
        Subscribe to shard channels, routing each channel to the pubsub of
        the node returned by ``cluster.get_node_from_key``. Channels passed
        as keyword arguments map the channel name to a message handler.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        s_channels = dict.fromkeys(args)
        s_channels.update(kwargs)
        for s_channel, handler in s_channels.items():
            node = self.cluster.get_node_from_key(s_channel)
            pubsub = self._get_node_pubsub(node)
            if handler:
                pubsub.ssubscribe(**{s_channel: handler})
            else:
                pubsub.ssubscribe(s_channel)
            self.shard_channels.update(pubsub.shard_channels)
            self.pending_unsubscribe_shard_channels.difference_update(
                self._normalize_keys({s_channel: None})
            )
            if pubsub.subscribed and not self.subscribed:
                self.subscribed_event.set()
                self.health_check_response_counter = 0

    def sunsubscribe(self, *args):
        """
        Unsubscribe from the given shard channels, or from every channel in
        ``shard_channels`` when none are given.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        else:
            args = self.shard_channels

        for s_channel in args:
            node = self.cluster.get_node_from_key(s_channel)
            p = self._get_node_pubsub(node)
            p.sunsubscribe(s_channel)
            self.pending_unsubscribe_shard_channels.update(
                p.pending_unsubscribe_shard_channels
            )

    def get_redis_connection(self):
        """
        Get the Redis connection of the pubsub connected node.

        Returns None when no node has been selected yet.
        """
        if self.node is not None:
            return self.node.redis_connection
        return None

    def disconnect(self):
        """
        Disconnect the pubsub connection.
        """
        if self.connection:
            self.connection.disconnect()
        for pubsub in self.node_pubsub_mapping.values():
            # A mapped pubsub may currently hold no connection; skip it
            # instead of failing on ``None.disconnect()``.
            if pubsub.connection is not None:
                pubsub.connection.disconnect()
class ClusterPipeline(RedisCluster):
    """
    Support for Redis pipeline
    in cluster mode
    """

    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        MovedError,
        AskError,
        TryAgainError,
    )

    NO_SLOTS_COMMANDS = {"UNWATCH"}
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        cluster_error_retry_attempts: int = 3,
        reinitialize_steps: int = 5,
        retry: Optional[Retry] = None,
        lock=None,
        transaction=False,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        **kwargs,
    ):
        """
        Set up the pipeline state and select the execution strategy.

        :param retry: Retry object to use; when omitted, one is built with an
            exponential-with-jitter backoff and
            ``cluster_error_retry_attempts`` retries.
        :param lock: lock stored on the pipeline; a new ``threading.RLock``
            is created when omitted.
        :param transaction: when true a ``TransactionStrategy`` is used,
            otherwise a ``PipelineStrategy``.
        :param kwargs: only ``encoding``, ``encoding_errors`` and
            ``decode_responses`` are read here, to build the encoder.
        """
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.cluster_response_callbacks = cluster_response_callbacks
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if retry is not None:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        # Keep a handle on RedisCluster.execute_command, since this class
        # overrides execute_command to delegate to the strategy.
        self.parent_execute_command = super().execute_command
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self) if not transaction else TransactionStrategy(self)
        )

        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }

        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command, *args: (
                self.get_nodes_from_slot(command, *args)
            ),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args, **kwargs: (
                self._split_multi_shard_command(*args, **kwargs)
            ),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }

        self._policy_resolver = policy_resolver

    def __repr__(self):
        """Return the class name."""
        return f"{type(self).__name__}"

    def __enter__(self):
        """Enter the context manager; returns the pipeline itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Reset the pipeline when leaving the context manager."""
        self.reset()

    def __del__(self):
        # Best-effort cleanup: any error raised by reset() here is
        # deliberately ignored.
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self):
        """Return the number of commands in the strategy's command queue."""
        return len(self._execution_strategy.command_queue)

    def __bool__(self):
        "Pipeline instances should always evaluate to True on Python 3+"
        return True

    def execute_command(self, *args, **kwargs):
        """
        Wrapper function for pipeline_execute_command
        """
        return self._execution_strategy.execute_command(*args, **kwargs)

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        return self._execution_strategy.execute_command(*args, **options)

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        self._execution_strategy.annotate_exception(exception, number, command)

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Execute all the commands in the current pipeline
        """

        try:
            return self._execution_strategy.execute(raise_on_error)
        finally:
            # The pipeline is reset whether or not execution succeeded.
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._execution_strategy.reset()

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """Delegate sending of ``stack`` to the execution strategy."""
        return self._execution_strategy.send_cluster_commands(
            stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
        )

    def exists(self, *keys):
        """Delegate EXISTS handling to the execution strategy."""
        return self._execution_strategy.exists(*keys)

    def eval(self):
        """Delegate to the execution strategy's ``eval``."""
        return self._execution_strategy.eval()

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        self._execution_strategy.multi()

    def load_scripts(self):
        """Delegate to the execution strategy's ``load_scripts``."""
        self._execution_strategy.load_scripts()

    def discard(self):
        """Delegate to the execution strategy's ``discard``."""
        self._execution_strategy.discard()

    def watch(self, *names):
        """
        Watches the values at keys ``names``

        Returns whatever the execution strategy returns.
        """
        return self._execution_strategy.watch(*names)

    def unwatch(self):
        """
        Unwatches all previously specified keys

        Returns whatever the execution strategy returns.
        """
        return self._execution_strategy.unwatch()

    def script_load_for_pipeline(self, *args, **kwargs):
        """Delegate to the execution strategy's ``script_load_for_pipeline``."""
        self._execution_strategy.script_load_for_pipeline(*args, **kwargs)

    def delete(self, *names):
        """
        Delegate deletion of ``names`` to the execution strategy.

        The strategy's result is returned (like ``exists`` does) rather than
        dropped, so a call such as ``pipe.delete(key)`` can be chained when
        the strategy hands back the pipeline.
        """
        return self._execution_strategy.delete(*names)

    def unlink(self, *names):
        """
        Delegate unlinking of ``names`` to the execution strategy.

        The strategy's result is returned (like ``exists`` does) rather than
        dropped, so the call can be chained when the strategy hands back the
        pipeline.
        """
        return self._execution_strategy.unlink(*names)
def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Build a stand-in for the pipelined command ``name`` that refuses to run.

    The returned callable accepts any positional and keyword arguments and
    always raises RedisClusterException, because some pipelined commands
    should be blocked when running in cluster-mode.
    """
    message = (
        f"ERROR: Calling pipelined function {name} is blocked "
        f"when running redis in cluster mode..."
    )

    def _blocked(*_args, **_kwargs):
        raise RedisClusterException(message)

    return _blocked
# Commands that must not be used through a cluster pipeline. Each entry is
# turned into a ClusterPipeline method that raises when called.
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSETEX",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)
for command in PIPELINE_BLOCKED_COMMANDS:
    # "CLIENT KILL" -> "client_kill": the attribute name on ClusterPipeline.
    command = "_".join(command.split(" ")).lower()
    setattr(ClusterPipeline, command, block_pipeline_command(command))
class PipelineCommand:
    """
    Record for a single queued pipeline command: its arguments and options,
    its position in the queue, and the per-execution state (result, node,
    asking flag, command policies) that starts out unset.
    """

    def __init__(self, args, options=None, position=None):
        self.args = args
        # A fresh dict is created per instance when no options are given.
        self.options = {} if options is None else options
        self.position = position
        self.result = None
        self.node = None
        self.asking = False
        self.command_policies: Optional[CommandPolicies] = None
class NodeCommands:
    """
    The commands queued for one node, together with the connection used to
    send them and the callable used to parse their responses.
    """

    def __init__(self, parse_response, connection_pool, connection):
        """Store the parser, pool and connection; start with no commands."""
        self.parse_response = parse_response
        self.connection_pool = connection_pool
        self.connection = connection
        self.commands = []

    def append(self, c):
        """Queue command ``c`` for this node."""
        self.commands.append(c)

    def write(self):
        """
        Code borrowed from Redis so it can be fixed
        """
        conn = self.connection
        queued = self.commands

        # We are going to clobber the commands with the write, so go ahead
        # and ensure that nothing is sitting there from a previous run.
        for cmd in queued:
            cmd.result = None

        # build up all commands into a single request to increase network perf
        # send all the commands and catch connection and timeout errors.
        try:
            packed = conn.pack_commands([cmd.args for cmd in queued])
            conn.send_packed_command(packed)
        except (ConnectionError, TimeoutError) as err:
            # Every command of the batch records the same failure.
            for cmd in queued:
                cmd.result = err

    def read(self):
        """
        Parse the response of every command that has no result yet.

        A command that already carries a result hit an error during
        write(); parsing a response for it on a connection that is no
        longer open would fail (redis-py does not check that the socket is
        still set and would call readline() on None), so it is skipped and
        keeps its error. The connection is returned to the pool and
        explicitly reopened on the next write.
        """
        conn = self.connection
        for cmd in self.commands:
            if cmd.result is not None:
                continue
            try:
                cmd.result = self.parse_response(conn, cmd.args[0], **cmd.options)
            except (ConnectionError, TimeoutError) as err:
                # The connection is unusable: mark every command of the
                # batch with the error and stop reading.
                for queued in self.commands:
                    queued.result = err
                return
            except RedisError as err:
                # Command-level error: store it as this command's result
                # and keep reading the remaining responses.
                cmd.result = err
class ExecutionStrategy(ABC):
    """
    Interface of the objects ClusterPipeline delegates its operations to.
    """

    @property
    @abstractmethod
    def command_queue(self):
        """Commands currently queued by the strategy."""

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        """
        Execution flow for current execution strategy.

        See: ClusterPipeline.execute_command()
        """

    @abstractmethod
    def annotate_exception(self, exception, number, command):
        """
        Annotate exception according to current execution strategy.

        See: ClusterPipeline.annotate_exception()
        """

    @abstractmethod
    def pipeline_execute_command(self, *args, **options):
        """
        Pipeline execution flow for current execution strategy.

        See: ClusterPipeline.pipeline_execute_command()
        """

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Executes current execution strategy.

        See: ClusterPipeline.execute()
        """

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Sends commands according to current execution strategy.

        See: ClusterPipeline.send_cluster_commands()
        """

    @abstractmethod
    def reset(self):
        """
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        """

    @abstractmethod
    def exists(self, *keys):
        """See: ClusterPipeline.exists()"""

    @abstractmethod
    def eval(self):
        """See: ClusterPipeline.eval()"""

    @abstractmethod
    def multi(self):
        """
        Starts transactional context.

        See: ClusterPipeline.multi()
        """

    @abstractmethod
    def load_scripts(self):
        """See: ClusterPipeline.load_scripts()"""

    @abstractmethod
    def watch(self, *names):
        """See: ClusterPipeline.watch()"""

    @abstractmethod
    def unwatch(self):
        """
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        """

    @abstractmethod
    def script_load_for_pipeline(self, *args, **kwargs):
        """See: ClusterPipeline.script_load_for_pipeline()"""

    @abstractmethod
    def delete(self, *names):
        """
        Delete a key specified by ``names``

        See: ClusterPipeline.delete()
        """

    @abstractmethod
    def unlink(self, *names):
        """
        Unlink a key specified by ``names``

        See: ClusterPipeline.unlink()
        """

    @abstractmethod
    def discard(self):
        """See: ClusterPipeline.discard()"""
class AbstractStrategy(ExecutionStrategy):
    """
    Shared base for the pipeline execution strategies: owns the command
    queue and implements the operations common to the concrete strategies.
    """

    def __init__(
        self,
        pipe: ClusterPipeline,
    ):
        """
        :param pipe: the pipeline this strategy serves; its ``nodes_manager``
            is kept for direct access.
        """
        self._command_queue: List[PipelineCommand] = []
        self._pipe = pipe
        self._nodes_manager = self._pipe.nodes_manager

    @property
    def command_queue(self):
        """The list of queued PipelineCommand objects."""
        return self._command_queue

    @command_queue.setter
    def command_queue(self, queue: List[PipelineCommand]):
        self._command_queue = queue

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        pass

    def pipeline_execute_command(self, *args, **options):
        """
        Queue a command and return the pipeline so calls can be chained.

        The command's position is its index in the queue at the time it is
        added.
        """
        self._command_queue.append(
            PipelineCommand(args, options, len(self._command_queue))
        )
        return self._pipe

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        pass

    @abstractmethod
    def reset(self):
        pass

    def exists(self, *keys):
        """Run EXISTS for ``keys`` through ``execute_command``."""
        return self.execute_command("EXISTS", *keys)

    def eval(self):
        """Not supported; always raises RedisClusterException."""
        raise RedisClusterException("method eval() is not implemented")

    def load_scripts(self):
        """Not supported; always raises RedisClusterException."""
        raise RedisClusterException("method load_scripts() is not implemented")

    def script_load_for_pipeline(self, *args, **kwargs):
        """Not supported; always raises RedisClusterException."""
        raise RedisClusterException(
            "method script_load_for_pipeline() is not implemented"
        )

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled

        The first element of ``exception.args`` is replaced in place by a
        message naming the command number and the (truncated) command text;
        any further args are kept.
        """
        cmd = " ".join(map(safe_str, command))
        # An exception raised without arguments has empty ``args``; indexing
        # it would raise IndexError and hide the original error, so fall
        # back to the exception's class name in that case.
        detail = exception.args[0] if exception.args else type(exception).__name__
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {detail}"
        )
        exception.args = (msg,) + exception.args[1:]
2884class PipelineStrategy(AbstractStrategy):
2885 def __init__(self, pipe: ClusterPipeline):
2886 super().__init__(pipe)
2887 self.command_flags = pipe.command_flags
2889 def execute_command(self, *args, **kwargs):
2890 return self.pipeline_execute_command(*args, **kwargs)
2892 def _raise_first_error(self, stack):
2893 """
2894 Raise the first exception on the stack
2895 """
2896 for c in stack:
2897 r = c.result
2898 if isinstance(r, Exception):
2899 self.annotate_exception(r, c.position + 1, c.args)
2900 raise r
2902 def execute(self, raise_on_error: bool = True) -> List[Any]:
2903 stack = self._command_queue
2904 if not stack:
2905 return []
2907 try:
2908 return self.send_cluster_commands(stack, raise_on_error)
2909 finally:
2910 self.reset()
2912 def reset(self):
2913 """
2914 Reset back to empty pipeline.
2915 """
2916 self._command_queue = []
2918 def send_cluster_commands(
2919 self, stack, raise_on_error=True, allow_redirections=True
2920 ):
2921 """
2922 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.
2924 If one of the retryable exceptions has been thrown we assume that:
2925 - connection_pool was disconnected
2926 - connection_pool was reset
2927 - refresh_table_asap set to True
2929 It will try the number of times specified by
2930 the retries in config option "self.retry"
2931 which defaults to 3 unless manually configured.
2933 If it reaches the number of times, the command will
2934 raises ClusterDownException.
2935 """
2936 if not stack:
2937 return []
2938 retry_attempts = self._pipe.retry.get_retries()
2939 while True:
2940 try:
2941 return self._send_cluster_commands(
2942 stack,
2943 raise_on_error=raise_on_error,
2944 allow_redirections=allow_redirections,
2945 )
2946 except RedisCluster.ERRORS_ALLOW_RETRY as e:
2947 if retry_attempts > 0:
2948 # Try again with the new cluster setup. All other errors
2949 # should be raised.
2950 retry_attempts -= 1
2951 pass
2952 else:
2953 raise e
2955 def _send_cluster_commands(
2956 self, stack, raise_on_error=True, allow_redirections=True
2957 ):
2958 """
2959 Send a bunch of cluster commands to the redis cluster.
2961 `allow_redirections` If the pipeline should follow
2962 `ASK` & `MOVED` responses automatically. If set
2963 to false it will raise RedisClusterException.
2964 """
2965 # the first time sending the commands we send all of
2966 # the commands that were queued up.
2967 # if we have to run through it again, we only retry
2968 # the commands that failed.
2969 attempt = sorted(stack, key=lambda x: x.position)
2970 is_default_node = False
2971 # build a list of node objects based on node names we need to
2972 nodes = {}
2974 # as we move through each command that still needs to be processed,
2975 # we figure out the slot number that command maps to, then from
2976 # the slot determine the node.
2977 for c in attempt:
2978 command_policies = self._pipe._policy_resolver.resolve(c.args[0].lower())
2980 while True:
2981 # refer to our internal node -> slot table that
2982 # tells us where a given command should route to.
2983 # (it might be possible we have a cached node that no longer
2984 # exists in the cluster, which is why we do this in a loop)
2985 passed_targets = c.options.pop("target_nodes", None)
2986 if passed_targets and not self._is_nodes_flag(passed_targets):
2987 target_nodes = self._parse_target_nodes(passed_targets)
2989 if not command_policies:
2990 command_policies = CommandPolicies()
2991 else:
2992 if not command_policies:
2993 command = c.args[0].upper()
2994 if (
2995 len(c.args) >= 2
2996 and f"{c.args[0]} {c.args[1]}".upper()
2997 in self._pipe.command_flags
2998 ):
2999 command = f"{c.args[0]} {c.args[1]}".upper()
3001 # We only could resolve key properties if command is not
3002 # in a list of pre-defined request policies
3003 command_flag = self.command_flags.get(command)
3004 if not command_flag:
3005 # Fallback to default policy
3006 if not self._pipe.get_default_node():
3007 keys = None
3008 else:
3009 keys = self._pipe._get_command_keys(*c.args)
3010 if not keys or len(keys) == 0:
3011 command_policies = CommandPolicies()
3012 else:
3013 command_policies = CommandPolicies(
3014 request_policy=RequestPolicy.DEFAULT_KEYED,
3015 response_policy=ResponsePolicy.DEFAULT_KEYED,
3016 )
3017 else:
3018 if command_flag in self._pipe._command_flags_mapping:
3019 command_policies = CommandPolicies(
3020 request_policy=self._pipe._command_flags_mapping[
3021 command_flag
3022 ]
3023 )
3024 else:
3025 command_policies = CommandPolicies()
3027 target_nodes = self._determine_nodes(
3028 *c.args,
3029 request_policy=command_policies.request_policy,
3030 node_flag=passed_targets,
3031 )
3032 if not target_nodes:
3033 raise RedisClusterException(
3034 f"No targets were found to execute {c.args} command on"
3035 )
3036 c.command_policies = command_policies
3037 if len(target_nodes) > 1:
3038 raise RedisClusterException(
3039 f"Too many targets for command {c.args}"
3040 )
3042 node = target_nodes[0]
3043 if node == self._pipe.get_default_node():
3044 is_default_node = True
3046 # now that we know the name of the node
3047 # ( it's just a string in the form of host:port )
3048 # we can build a list of commands for each node.
3049 node_name = node.name
3050 if node_name not in nodes:
3051 redis_node = self._pipe.get_redis_connection(node)
3052 try:
3053 connection = get_connection(redis_node)
3054 except (ConnectionError, TimeoutError):
3055 for n in nodes.values():
3056 n.connection_pool.release(n.connection)
3057 # Connection retries are being handled in the node's
3058 # Retry object. Reinitialize the node -> slot table.
3059 self._nodes_manager.initialize()
3060 if is_default_node:
3061 self._pipe.replace_default_node()
3062 raise
3063 nodes[node_name] = NodeCommands(
3064 redis_node.parse_response,
3065 redis_node.connection_pool,
3066 connection,
3067 )
3068 nodes[node_name].append(c)
3069 break
3071 # send the commands in sequence.
3072 # we write to all the open sockets for each node first,
3073 # before reading anything
3074 # this allows us to flush all the requests out across the
3075 # network
3076 # so that we can read them from different sockets as they come back.
3077 # we dont' multiplex on the sockets as they come available,
3078 # but that shouldn't make too much difference.
3079 try:
3080 node_commands = nodes.values()
3081 for n in node_commands:
3082 n.write()
3084 for n in node_commands:
3085 n.read()
3086 finally:
3087 # release all of the redis connections we allocated earlier
3088 # back into the connection pool.
3089 # we used to do this step as part of a try/finally block,
3090 # but it is really dangerous to
3091 # release connections back into the pool if for some
3092 # reason the socket has data still left in it
3093 # from a previous operation. The write and
3094 # read operations already have try/catch around them for
3095 # all known types of errors including connection
3096 # and socket level errors.
3097 # So if we hit an exception, something really bad
3098 # happened and putting any oF
3099 # these connections back into the pool is a very bad idea.
3100 # the socket might have unread buffer still sitting in it,
3101 # and then the next time we read from it we pass the
3102 # buffered result back from a previous command and
3103 # every single request after to that connection will always get
3104 # a mismatched result.
3105 for n in nodes.values():
3106 n.connection_pool.release(n.connection)
3108 # if the response isn't an exception it is a
3109 # valid response from the node
3110 # we're all done with that command, YAY!
3111 # if we have more commands to attempt, we've run into problems.
3112 # collect all the commands we are allowed to retry.
3113 # (MOVED, ASK, or connection errors or timeout errors)
3114 attempt = sorted(
3115 (
3116 c
3117 for c in attempt
3118 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
3119 ),
3120 key=lambda x: x.position,
3121 )
3122 if attempt and allow_redirections:
3123 # RETRY MAGIC HAPPENS HERE!
3124 # send these remaining commands one at a time using `execute_command`
3125 # in the main client. This keeps our retry logic
3126 # in one place mostly,
3127 # and allows us to be more confident in correctness of behavior.
3128 # at this point any speed gains from pipelining have been lost
3129 # anyway, so we might as well make the best
3130 # attempt to get the correct behavior.
3131 #
3132 # The client command will handle retries for each
3133 # individual command sequentially as we pass each
3134 # one into `execute_command`. Any exceptions
3135 # that bubble out should only appear once all
3136 # retries have been exhausted.
3137 #
3138 # If a lot of commands have failed, we'll be setting the
3139 # flag to rebuild the slots table from scratch.
3140 # So MOVED errors should correct themselves fairly quickly.
3141 self._pipe.reinitialize_counter += 1
3142 if self._pipe._should_reinitialized():
3143 self._nodes_manager.initialize()
3144 if is_default_node:
3145 self._pipe.replace_default_node()
3146 for c in attempt:
3147 try:
3148 # send each command individually like we
3149 # do in the main client.
3150 c.result = self._pipe.parent_execute_command(*c.args, **c.options)
3151 except RedisError as e:
3152 c.result = e
3154 # turn the response back into a simple flat array that corresponds
3155 # to the sequence of commands issued in the stack in pipeline.execute()
3156 response = []
3157 for c in sorted(stack, key=lambda x: x.position):
3158 if c.args[0] in self._pipe.cluster_response_callbacks:
3159 # Remove keys entry, it needs only for cache.
3160 c.options.pop("keys", None)
3161 c.result = self._pipe._policies_callback_mapping[
3162 c.command_policies.response_policy
3163 ](
3164 self._pipe.cluster_response_callbacks[c.args[0]](
3165 c.result, **c.options
3166 )
3167 )
3168 response.append(c.result)
3170 if raise_on_error:
3171 self._raise_first_error(stack)
3173 return response
def _is_nodes_flag(self, target_nodes):
    """
    Tell whether ``target_nodes`` is one of the pipeline's node-flag strings.

    Anything that is not a ``str`` is reported as ``False`` without being
    looked up in ``self._pipe.node_flags``.
    """
    if not isinstance(target_nodes, str):
        return False
    return target_nodes in self._pipe.node_flags
def _parse_target_nodes(self, target_nodes):
    """
    Normalize a user supplied ``target_nodes`` value into a list of nodes.

    Accepted inputs:
      * a ``list`` of nodes - returned unchanged;
      * a ``dict`` of the format ``{node_name: node}`` - its values are
        returned as a ``list`` (e.g. the result of ``rc.get_primaries()``
        style mappings);
      * a single ``ClusterNode`` - wrapped in a one element list.

    Raises:
        TypeError: if ``target_nodes`` is none of the supported types.
    """
    if isinstance(target_nodes, list):
        return target_nodes
    if isinstance(target_nodes, dict):
        # Materialize the view: callers take len() of the result and index
        # into it, which a ``dict_values`` object does not support.
        return list(target_nodes.values())
    if isinstance(target_nodes, ClusterNode):
        # Supports passing a single ClusterNode as a variable
        return [target_nodes]
    raise TypeError(
        "target_nodes type can be one of the following: "
        "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES), "
        "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
        f"The passed type is {type(target_nodes)}"
    )
def _determine_nodes(
    self, *args, request_policy: RequestPolicy, **kwargs
) -> List["ClusterNode"]:
    """
    Work out which nodes a command should be executed on.

    The command name is ``args[0]`` upper-cased, or the two-word form
    ``"<args[0]> <args[1]>"`` when that form is registered in
    ``self._pipe.command_flags``. A flag (either the ``nodes_flag`` keyword
    or the one registered for the command) that appears in
    ``self._pipe._command_flags_mapping`` overrides ``request_policy``.
    The callback registered for the final policy in
    ``self._pipe._policies_callback_mapping`` produces the result.

    Returns the value produced by the policy callback.
    """
    pipe = self._pipe

    command = args[0].upper()
    if len(args) >= 2:
        two_word_command = f"{args[0]} {args[1]}".upper()
        if two_word_command in pipe.command_flags:
            command = two_word_command

    # NOTE(review): the visible pipeline send path passes the user's flag as
    # ``node_flag=...`` while this method pops ``nodes_flag`` - confirm which
    # spelling is intended.
    nodes_flag = kwargs.pop("nodes_flag", None)
    # A flag passed by the user wins over the one predefined for the command.
    command_flag = (
        nodes_flag if nodes_flag is not None else pipe.command_flags.get(command)
    )

    if command_flag in pipe._command_flags_mapping:
        request_policy = pipe._command_flags_mapping[command_flag]

    policy_callback = pipe._policies_callback_mapping[request_policy]

    # Each policy family expects a different call shape.
    if request_policy == RequestPolicy.DEFAULT_KEYED:
        nodes = policy_callback(command, *args)
    elif request_policy == RequestPolicy.MULTI_SHARD:
        nodes = policy_callback(*args, **kwargs)
    elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
        nodes = policy_callback(args[0])
    else:
        nodes = policy_callback()

    # Remember where FT.AGGREGATE was routed.
    if args[0].lower() == "ft.aggregate":
        self._aggregate_nodes = nodes

    return nodes
def multi(self):
    """Always raise: ``multi()`` is rejected outside of transactional context."""
    message = "method multi() is not supported outside of transactional context"
    raise RedisClusterException(message)
def discard(self):
    """Always raise: ``discard()`` is rejected outside of transactional context."""
    message = "method discard() is not supported outside of transactional context"
    raise RedisClusterException(message)
def watch(self, *names):
    """Always raise: ``watch()`` is rejected outside of transactional context."""
    message = "method watch() is not supported outside of transactional context"
    raise RedisClusterException(message)
def unwatch(self, *names):
    """Always raise: ``unwatch()`` is rejected outside of transactional context."""
    message = "method unwatch() is not supported outside of transactional context"
    raise RedisClusterException(message)
def delete(self, *names):
    """
    Issue ``DEL`` for exactly one key through ``self.execute_command``.

    Raises:
        RedisClusterException: if zero or more than one name is given.
    """
    if len(names) == 1:
        (key,) = names
        return self.execute_command("DEL", key)

    raise RedisClusterException(
        "deleting multiple keys is not implemented in pipeline command"
    )
def unlink(self, *names):
    """
    Issue ``UNLINK`` for exactly one key through ``self.execute_command``.

    Raises:
        RedisClusterException: if zero or more than one name is given.
    """
    if len(names) == 1:
        (key,) = names
        return self.execute_command("UNLINK", key)

    raise RedisClusterException(
        "unlinking multiple keys is not implemented in pipeline command"
    )
class TransactionStrategy(AbstractStrategy):
    # Commands allowed on the immediate-execution path of execute_command()
    # even when no slot number could be determined for them.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands execute_command() sends right away (instead of handing them to
    # pipeline_execute_command()) as long as no explicit MULTI is active.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # After any of these commands _send_command_parse_response() clears the
    # ``_watching`` flag.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    # Redirect errors: registered as retryable in __init__ and inspected by
    # _reinitialize_on_error().
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    # Errors handled as connection/cluster failures: re-raised from
    # _execute_transaction(), inspected by _reinitialize_on_error() and
    # caught in reset().
    CONNECTION_ERRORS = (
        ConnectionError,
        OSError,
        ClusterDownError,
        SlotNotCoveredError,
    )
def __init__(self, pipe: ClusterPipeline):
    """
    Set up transaction state on top of the base strategy.

    A copy of the pipeline's retry object is kept so that the errors added
    here (``RedisCluster.ERRORS_ALLOW_RETRY`` plus the slot redirect errors)
    are registered on the copy rather than on ``pipe.retry`` itself.
    """
    super().__init__(pipe)

    # Transaction bookkeeping, all starting in the "idle" state.
    self._explicit_transaction = False
    self._watching = False
    self._pipeline_slots: Set[int] = set()
    self._transaction_connection: Optional[Connection] = None
    self._executing = False

    retryable_errors = RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
    self._retry = copy(self._pipe.retry)
    self._retry.update_supported_errors(retryable_errors)
def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
    """
    Return the client and the pinned connection used for this transaction.

    For running an atomic transaction, watch keys ensure that contents have
    not been altered as long as the watch commands for those keys were sent
    over the same connection. So once a connection has been fetched for the
    node owning the tracked slot, it is stored on the strategy and reused.

    If the stored connection is not owned by the pool of the node that
    currently serves the slot, it is released and a fresh one is fetched.

    Raises:
        RedisClusterException: if no slot has been recorded yet.
    """
    if not self._pipeline_slots:
        raise RedisClusterException(
            "At least a command with a key is needed to identify a node"
        )

    slot = next(iter(self._pipeline_slots))
    node: ClusterNode = self._nodes_manager.get_node_from_slot(slot, False)
    redis_node: Redis = self._pipe.get_redis_connection(node)

    held = self._transaction_connection
    if held and not redis_node.connection_pool.owns_connection(held):
        # The slot is now served by another node: hand the old connection
        # back to whoever owns it.
        # NOTE(review): reset() releases through
        # ``node.redis_connection.connection_pool`` for the same
        # find_connection_owner() result, while this path uses
        # ``previous_node.connection_pool`` - confirm which one is right.
        previous_node = self._nodes_manager.find_connection_owner(held)
        previous_node.connection_pool.release(held)
        self._transaction_connection = None

    if not self._transaction_connection:
        self._transaction_connection = get_connection(redis_node)

    return redis_node, self._transaction_connection
def execute_command(self, *args, **kwargs):
    """
    Either run a command immediately or hand it to the pipeline queue path.

    A command runs immediately when no explicit MULTI is active and either a
    WATCH is in progress or the command is one of
    ``IMMEDIATE_EXECUTE_COMMANDS``. Every other command goes through
    ``pipeline_execute_command``. In both cases the command's slot (when one
    was determined) is recorded in ``self._pipeline_slots``.

    Raises:
        CrossSlotTransactionError: an immediate command maps to a slot
            different from the ones already recorded.
        RedisClusterException: an immediate command has no slot and is not
            listed in ``NO_SLOTS_COMMANDS``.
    """
    slot_number: Optional[int] = None
    if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
        slot_number = self._pipe.determine_slot(*args)

    run_immediately = (
        self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
    ) and not self._explicit_transaction

    if not run_immediately:
        if slot_number is not None:
            self._pipeline_slots.add(slot_number)
        return self.pipeline_execute_command(*args, **kwargs)

    if args[0] == "WATCH":
        self._validate_watch()

    if slot_number is None:
        if args[0] not in self.NO_SLOTS_COMMANDS:
            raise RedisClusterException(
                f"Cannot identify slot number for command: {args[0]},"
                "it cannot be triggered in a transaction"
            )
    else:
        known_slots = self._pipeline_slots
        if known_slots and slot_number not in known_slots:
            raise CrossSlotTransactionError(
                "Cannot watch or send commands on different slots"
            )
        known_slots.add(slot_number)

    return self._immediate_execute_command(*args, **kwargs)
def _validate_watch(self):
    """
    Mark the strategy as watching.

    Raises:
        RedisError: if an explicit MULTI has already been started.
    """
    if not self._explicit_transaction:
        self._watching = True
        return

    raise RedisError("Cannot issue a WATCH after a MULTI")
def _immediate_execute_command(self, *args, **options):
    """
    Send a single command right away, going through the strategy's retry.

    ``self._reinitialize_on_error`` is passed to the retry as the failure
    callback.
    """

    def send_once():
        return self._get_connection_and_send_command(*args, **options)

    return self._retry.call_with_retry(send_once, self._reinitialize_on_error)
def _get_connection_and_send_command(self, *args, **options):
    """
    Fetch the transaction's client/connection pair and send ``args`` on it.

    ``args[0]`` is forwarded as the command name used for response parsing.
    """
    client_and_connection = self._get_client_and_connection_for_transaction()
    redis_node, connection = client_and_connection
    command_name = args[0]
    return self._send_command_parse_response(
        connection, redis_node, command_name, *args, **options
    )
def _send_command_parse_response(
    self, conn, redis_node: Redis, command_name, *args, **options
):
    """
    Send ``args`` over ``conn`` and return the response parsed by ``redis_node``.

    When ``command_name`` is one of ``UNWATCH_COMMANDS`` the ``_watching``
    flag is cleared after the response has been parsed.
    """
    conn.send_command(*args)
    parsed = redis_node.parse_response(conn, command_name, **options)

    ends_watch = command_name in self.UNWATCH_COMMANDS
    if ends_watch:
        self._watching = False

    return parsed
def _reinitialize_on_error(self, error):
    """
    Failure callback handed to the retry object.

    * While watching, a slot redirect error that arrives during EXEC
      processing (``self._executing``) is turned into a ``WatchError``.
    * For slot redirect and connection errors (matched on the exact error
      type) the pinned transaction connection is dropped and the pipeline's
      reinitialize counter is bumped. When the pipeline decides a rebuild is
      due, the nodes manager is re-initialized and the pipeline's counter is
      reset; otherwise redirect errors that are ``AskError`` instances are
      forwarded to ``update_moved_exception``.
    * Finally ``self._executing`` is cleared.

    Raises:
        WatchError: slot rebalancing happened while watching keys.
    """
    error_type = type(error)
    is_redirect = error_type in self.SLOT_REDIRECT_ERRORS

    if self._watching and is_redirect and self._executing:
        raise WatchError("Slot rebalancing occurred while watching keys")

    if is_redirect or error_type in self.CONNECTION_ERRORS:
        if self._transaction_connection:
            self._transaction_connection = None

        self._pipe.reinitialize_counter += 1
        if self._pipe._should_reinitialized():
            self._nodes_manager.initialize()
            # The counter that is incremented above lives on the pipeline,
            # so that is the one which has to be reset (resetting an
            # attribute on the strategy would leave it untouched).
            self._pipe.reinitialize_counter = 0
        elif isinstance(error, AskError):
            self._nodes_manager.update_moved_exception(error)

    self._executing = False
def _raise_first_error(self, responses, stack):
    """
    Raise the first response that is an exception, if there is one.

    ``responses`` and ``stack`` are walked in lockstep; the exception is
    annotated with the 1-based position and the arguments of its command
    before being raised. Returns ``None`` when no response is an exception.
    """
    for response, command in zip(responses, stack):
        if not isinstance(response, Exception):
            continue
        self.annotate_exception(response, command.position + 1, command.args)
        raise response
def execute(self, raise_on_error: bool = True) -> List[Any]:
    """
    Run the queued commands as a transaction.

    Returns an empty list without doing anything when the queue is empty,
    unless a WATCH is active and at least one slot has been recorded.
    """
    stack = self._command_queue
    if stack or (self._watching and self._pipeline_slots):
        return self._execute_transaction_with_retries(stack, raise_on_error)
    return []
def _execute_transaction_with_retries(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    """
    Run ``_execute_transaction`` through the strategy's retry object.

    ``self._reinitialize_on_error`` is passed to the retry as the failure
    callback.
    """

    def run_transaction():
        return self._execute_transaction(stack, raise_on_error)

    return self._retry.call_with_retry(run_transaction, self._reinitialize_on_error)
def _execute_transaction(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    """
    Send ``MULTI`` + the queued commands + ``EXEC`` and collect the results.

    All commands are packed and written in one go over the transaction's
    pinned connection, then the replies are read back one by one.

    Args:
        stack: the queued pipeline commands.
        raise_on_error: when true, the first exception found in the EXEC
            response is raised instead of being returned in the result.

    Returns:
        A list with one entry per queued command, each passed through the
        matching ``cluster_response_callbacks`` entry when one exists.

    Raises:
        CrossSlotTransactionError: more than one slot was recorded.
        WatchError: EXEC returned ``None``.
        InvalidPipelineStack: the EXEC response length does not match the
            number of queued commands.
        ExecAbortError: EXEC was aborted and no queue-time error was
            collected (otherwise the first collected error is raised).
        Any of ``CONNECTION_ERRORS`` hit while reading replies (re-raised).
    """
    if len(self._pipeline_slots) > 1:
        raise CrossSlotTransactionError(
            "All keys involved in a cluster transaction must map to the same slot"
        )

    self._executing = True

    redis_node, connection = self._get_client_and_connection_for_transaction()

    stack = chain(
        [PipelineCommand(("MULTI",))],
        stack,
        [PipelineCommand(("EXEC",))],
    )
    commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
    packed_commands = connection.pack_commands(commands)
    connection.send_packed_command(packed_commands)

    # Every entry is an ``(index, value)`` pair. Keeping one single shape is
    # what allows the entries to be unpacked uniformly further down.
    errors = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        redis_node.parse_response(connection, "MULTI")
    except ResponseError as e:
        self.annotate_exception(e, 0, "MULTI")
        errors.append((0, e))
    except self.CONNECTION_ERRORS as cluster_error:
        self.annotate_exception(cluster_error, 0, "MULTI")
        raise

    # and all the other commands
    for i, command in enumerate(self._command_queue):
        if EMPTY_RESPONSE in command.options:
            # Nothing was sent for this command; remember its placeholder.
            errors.append((i, command.options[EMPTY_RESPONSE]))
        else:
            try:
                _ = redis_node.parse_response(connection, "_")
            except self.SLOT_REDIRECT_ERRORS as slot_error:
                self.annotate_exception(slot_error, i + 1, command.args)
                errors.append((i, slot_error))
            except self.CONNECTION_ERRORS as cluster_error:
                self.annotate_exception(cluster_error, i + 1, command.args)
                raise
            except ResponseError as e:
                self.annotate_exception(e, i + 1, command.args)
                errors.append((i, e))

    response = None
    # parse the EXEC.
    try:
        response = redis_node.parse_response(connection, "EXEC")
    except ExecAbortError:
        # Surface the first real exception collected while queueing;
        # EMPTY_RESPONSE placeholders are not exceptions and are skipped.
        first_error = next(
            (err for _, err in errors if isinstance(err, BaseException)),
            None,
        )
        if first_error is not None:
            raise first_error
        raise

    self._executing = False

    # EXEC clears any watched keys
    self._watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response
    for i, e in errors:
        response.insert(i, e)

    if len(response) != len(self._command_queue):
        raise InvalidPipelineStack(
            "Unexpected response length for cluster pipeline EXEC."
            " Command stack was {} but response had length {}".format(
                [c.args[0] for c in self._command_queue], len(response)
            )
        )

    # find any errors in the response and raise if necessary
    if raise_on_error or errors:
        self._raise_first_error(
            response,
            self._command_queue,
        )

    # We have to run response callbacks manually
    callbacks = self._pipe.cluster_response_callbacks
    data = []
    for r, cmd in zip(response, self._command_queue):
        if not isinstance(r, Exception):
            command_name = cmd.args[0]
            if command_name in callbacks:
                r = callbacks[command_name](r, **cmd.options)
        data.append(r)
    return data
def reset(self):
    """
    Drop the queued commands and return the strategy to its idle state.

    If a transaction connection is held, an ``UNWATCH`` is sent on it when a
    WATCH is active and the connection is released to the pool of the node
    reported by ``find_connection_owner``. The state flags and the recorded
    slots are cleared in every case.
    """
    self._command_queue = []

    # make sure to reset the connection state in the event that we were
    # watching something
    if self._transaction_connection:
        try:
            if self._watching:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self._transaction_connection.send_command("UNWATCH")
                self._transaction_connection.read_response()
            # we can safely return the connection to the pool here since we're
            # sure we're no longer WATCHing anything
            node = self._nodes_manager.find_connection_owner(
                self._transaction_connection
            )
            node.redis_connection.connection_pool.release(
                self._transaction_connection
            )
            self._transaction_connection = None
        except self.CONNECTION_ERRORS:
            # disconnect will also remove any previous WATCHes
            # NOTE(review): on this path the connection is only disconnected;
            # it is neither released to a pool nor cleared from
            # ``_transaction_connection`` - confirm this is intended.
            if self._transaction_connection:
                self._transaction_connection.disconnect()

    # clean up the other instance attributes
    self._watching = False
    self._explicit_transaction = False
    self._pipeline_slots = set()
    self._executing = False
def send_cluster_commands(
    self, stack, raise_on_error=True, allow_redirections=True
):
    """Always raise ``NotImplementedError``: not available in transactional context."""
    message = "send_cluster_commands cannot be executed in transactional context."
    raise NotImplementedError(message)
def multi(self):
    """
    Start an explicit transaction block.

    Raises:
        RedisError: if a MULTI is already active, or if commands have
            already been queued.
    """
    problem = None
    if self._explicit_transaction:
        problem = "Cannot issue nested calls to MULTI"
    elif self._command_queue:
        problem = "Commands without an initial WATCH have already been issued"

    if problem is not None:
        raise RedisError(problem)

    self._explicit_transaction = True
def watch(self, *names):
    """
    Issue ``WATCH`` for ``names`` through ``self.execute_command``.

    Raises:
        RedisError: if an explicit MULTI has already been started.
    """
    if not self._explicit_transaction:
        return self.execute_command("WATCH", *names)

    raise RedisError("Cannot issue a WATCH after a MULTI")
def unwatch(self):
    """
    Issue ``UNWATCH`` when a WATCH is active.

    Returns the result of ``self.execute_command("UNWATCH")`` in that case,
    and ``True`` when nothing is being watched.
    """
    return self.execute_command("UNWATCH") if self._watching else True
def discard(self):
    """Abandon the current transaction by delegating to ``self.reset()``."""
    self.reset()
def delete(self, *names):
    """Issue ``DEL`` for all given ``names`` through ``self.execute_command``."""
    command = ("DEL",) + names
    return self.execute_command(*command)
def unlink(self, *names):
    """Issue ``UNLINK`` for all given ``names`` through ``self.execute_command``."""
    command = ("UNLINK",) + names
    return self.execute_command(*command)