Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cluster.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import random
2import socket
3import sys
4import threading
5import time
6from abc import ABC, abstractmethod
7from collections import OrderedDict
8from copy import copy
9from enum import Enum
10from itertools import chain
11from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
13from redis._parsers import CommandsParser, Encoder
14from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
15from redis._parsers.helpers import parse_scan
16from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
17from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
18from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
19from redis.commands import READ_COMMANDS, RedisClusterCommands
20from redis.commands.helpers import list_or_args
21from redis.commands.policies import PolicyResolver, StaticPolicyResolver
22from redis.connection import (
23 Connection,
24 ConnectionPool,
25 parse_url,
26)
27from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
28from redis.event import (
29 AfterPooledConnectionsInstantiationEvent,
30 AfterPubSubConnectionInstantiationEvent,
31 ClientType,
32 EventDispatcher,
33)
34from redis.exceptions import (
35 AskError,
36 AuthenticationError,
37 ClusterDownError,
38 ClusterError,
39 ConnectionError,
40 CrossSlotTransactionError,
41 DataError,
42 ExecAbortError,
43 InvalidPipelineStack,
44 MaxConnectionsError,
45 MovedError,
46 RedisClusterException,
47 RedisError,
48 ResponseError,
49 SlotNotCoveredError,
50 TimeoutError,
51 TryAgainError,
52 WatchError,
53)
54from redis.lock import Lock
55from redis.maint_notifications import MaintNotificationsConfig
56from redis.retry import Retry
57from redis.utils import (
58 deprecated_args,
59 dict_merge,
60 list_keys_to_dict,
61 merge_result,
62 safe_str,
63 str_if_bytes,
64 truncate_text,
65)
68def get_node_name(host: str, port: Union[str, int]) -> str:
69 return f"{host}:{port}"
72@deprecated_args(
73 allowed_args=["redis_node"],
74 reason="Use get_connection(redis_node) instead",
75 version="5.3.0",
76)
77def get_connection(redis_node: Redis, *args, **options) -> Connection:
78 return redis_node.connection or redis_node.connection_pool.get_connection()
81def parse_scan_result(command, res, **options):
82 cursors = {}
83 ret = []
84 for node_name, response in res.items():
85 cursor, r = parse_scan(response, **options)
86 cursors[node_name] = cursor
87 ret += r
89 return cursors, ret
92def parse_pubsub_numsub(command, res, **options):
93 numsub_d = OrderedDict()
94 for numsub_tups in res.values():
95 for channel, numsubbed in numsub_tups:
96 try:
97 numsub_d[channel] += numsubbed
98 except KeyError:
99 numsub_d[channel] = numsubbed
101 ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
102 return ret_numsub
105def parse_cluster_slots(
106 resp: Any, **options: Any
107) -> Dict[Tuple[int, int], Dict[str, Any]]:
108 current_host = options.get("current_host", "")
110 def fix_server(*args: Any) -> Tuple[str, Any]:
111 return str_if_bytes(args[0]) or current_host, args[1]
113 slots = {}
114 for slot in resp:
115 start, end, primary = slot[:3]
116 replicas = slot[3:]
117 slots[start, end] = {
118 "primary": fix_server(*primary),
119 "replicas": [fix_server(*replica) for replica in replicas],
120 }
122 return slots
125def parse_cluster_shards(resp, **options):
126 """
127 Parse CLUSTER SHARDS response.
128 """
129 if isinstance(resp[0], dict):
130 return resp
131 shards = []
132 for x in resp:
133 shard = {"slots": [], "nodes": []}
134 for i in range(0, len(x[1]), 2):
135 shard["slots"].append((x[1][i], (x[1][i + 1])))
136 nodes = x[3]
137 for node in nodes:
138 dict_node = {}
139 for i in range(0, len(node), 2):
140 dict_node[node[i]] = node[i + 1]
141 shard["nodes"].append(dict_node)
142 shards.append(shard)
144 return shards
147def parse_cluster_myshardid(resp, **options):
148 """
149 Parse CLUSTER MYSHARDID response.
150 """
151 return resp.decode("utf-8")
154PRIMARY = "primary"
155REPLICA = "replica"
156SLOT_ID = "slot-id"
158REDIS_ALLOWED_KEYS = (
159 "connection_class",
160 "connection_pool",
161 "connection_pool_class",
162 "client_name",
163 "credential_provider",
164 "db",
165 "decode_responses",
166 "encoding",
167 "encoding_errors",
168 "host",
169 "lib_name",
170 "lib_version",
171 "max_connections",
172 "nodes_flag",
173 "redis_connect_func",
174 "password",
175 "port",
176 "timeout",
177 "queue_class",
178 "retry",
179 "retry_on_timeout",
180 "protocol",
181 "socket_connect_timeout",
182 "socket_keepalive",
183 "socket_keepalive_options",
184 "socket_timeout",
185 "ssl",
186 "ssl_ca_certs",
187 "ssl_ca_data",
188 "ssl_certfile",
189 "ssl_cert_reqs",
190 "ssl_include_verify_flags",
191 "ssl_exclude_verify_flags",
192 "ssl_keyfile",
193 "ssl_password",
194 "ssl_check_hostname",
195 "unix_socket_path",
196 "username",
197 "cache",
198 "cache_config",
199)
200KWARGS_DISABLED_KEYS = ("host", "port", "retry")
203def cleanup_kwargs(**kwargs):
204 """
205 Remove unsupported or disabled keys from kwargs
206 """
207 connection_kwargs = {
208 k: v
209 for k, v in kwargs.items()
210 if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
211 }
213 return connection_kwargs
216class AbstractRedisCluster:
217 RedisClusterRequestTTL = 16
219 PRIMARIES = "primaries"
220 REPLICAS = "replicas"
221 ALL_NODES = "all"
222 RANDOM = "random"
223 DEFAULT_NODE = "default-node"
225 NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}
227 COMMAND_FLAGS = dict_merge(
228 list_keys_to_dict(
229 [
230 "ACL CAT",
231 "ACL DELUSER",
232 "ACL DRYRUN",
233 "ACL GENPASS",
234 "ACL GETUSER",
235 "ACL HELP",
236 "ACL LIST",
237 "ACL LOG",
238 "ACL LOAD",
239 "ACL SAVE",
240 "ACL SETUSER",
241 "ACL USERS",
242 "ACL WHOAMI",
243 "AUTH",
244 "CLIENT LIST",
245 "CLIENT SETINFO",
246 "CLIENT SETNAME",
247 "CLIENT GETNAME",
248 "CONFIG SET",
249 "CONFIG REWRITE",
250 "CONFIG RESETSTAT",
251 "TIME",
252 "PUBSUB CHANNELS",
253 "PUBSUB NUMPAT",
254 "PUBSUB NUMSUB",
255 "PUBSUB SHARDCHANNELS",
256 "PUBSUB SHARDNUMSUB",
257 "PING",
258 "INFO",
259 "SHUTDOWN",
260 "KEYS",
261 "DBSIZE",
262 "BGSAVE",
263 "SLOWLOG GET",
264 "SLOWLOG LEN",
265 "SLOWLOG RESET",
266 "WAIT",
267 "WAITAOF",
268 "SAVE",
269 "MEMORY PURGE",
270 "MEMORY MALLOC-STATS",
271 "MEMORY STATS",
272 "LASTSAVE",
273 "CLIENT TRACKINGINFO",
274 "CLIENT PAUSE",
275 "CLIENT UNPAUSE",
276 "CLIENT UNBLOCK",
277 "CLIENT ID",
278 "CLIENT REPLY",
279 "CLIENT GETREDIR",
280 "CLIENT INFO",
281 "CLIENT KILL",
282 "READONLY",
283 "CLUSTER INFO",
284 "CLUSTER MEET",
285 "CLUSTER MYSHARDID",
286 "CLUSTER NODES",
287 "CLUSTER REPLICAS",
288 "CLUSTER RESET",
289 "CLUSTER SET-CONFIG-EPOCH",
290 "CLUSTER SLOTS",
291 "CLUSTER SHARDS",
292 "CLUSTER COUNT-FAILURE-REPORTS",
293 "CLUSTER KEYSLOT",
294 "COMMAND",
295 "COMMAND COUNT",
296 "COMMAND LIST",
297 "COMMAND GETKEYS",
298 "CONFIG GET",
299 "DEBUG",
300 "RANDOMKEY",
301 "READONLY",
302 "READWRITE",
303 "TIME",
304 "TFUNCTION LOAD",
305 "TFUNCTION DELETE",
306 "TFUNCTION LIST",
307 "TFCALL",
308 "TFCALLASYNC",
309 "LATENCY HISTORY",
310 "LATENCY LATEST",
311 "LATENCY RESET",
312 "MODULE LIST",
313 "MODULE LOAD",
314 "MODULE UNLOAD",
315 "MODULE LOADEX",
316 ],
317 DEFAULT_NODE,
318 ),
319 list_keys_to_dict(
320 [
321 "FLUSHALL",
322 "FLUSHDB",
323 "FUNCTION DELETE",
324 "FUNCTION FLUSH",
325 "FUNCTION LIST",
326 "FUNCTION LOAD",
327 "FUNCTION RESTORE",
328 "SCAN",
329 "SCRIPT EXISTS",
330 "SCRIPT FLUSH",
331 "SCRIPT LOAD",
332 ],
333 PRIMARIES,
334 ),
335 list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
336 list_keys_to_dict(
337 [
338 "CLUSTER COUNTKEYSINSLOT",
339 "CLUSTER DELSLOTS",
340 "CLUSTER DELSLOTSRANGE",
341 "CLUSTER GETKEYSINSLOT",
342 "CLUSTER SETSLOT",
343 ],
344 SLOT_ID,
345 ),
346 )
348 SEARCH_COMMANDS = (
349 [
350 "FT.CREATE",
351 "FT.SEARCH",
352 "FT.AGGREGATE",
353 "FT.EXPLAIN",
354 "FT.EXPLAINCLI",
355 "FT,PROFILE",
356 "FT.ALTER",
357 "FT.DROPINDEX",
358 "FT.ALIASADD",
359 "FT.ALIASUPDATE",
360 "FT.ALIASDEL",
361 "FT.TAGVALS",
362 "FT.SUGADD",
363 "FT.SUGGET",
364 "FT.SUGDEL",
365 "FT.SUGLEN",
366 "FT.SYNUPDATE",
367 "FT.SYNDUMP",
368 "FT.SPELLCHECK",
369 "FT.DICTADD",
370 "FT.DICTDEL",
371 "FT.DICTDUMP",
372 "FT.INFO",
373 "FT._LIST",
374 "FT.CONFIG",
375 "FT.ADD",
376 "FT.DEL",
377 "FT.DROP",
378 "FT.GET",
379 "FT.MGET",
380 "FT.SYNADD",
381 ],
382 )
384 CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
385 "CLUSTER SLOTS": parse_cluster_slots,
386 "CLUSTER SHARDS": parse_cluster_shards,
387 "CLUSTER MYSHARDID": parse_cluster_myshardid,
388 }
390 RESULT_CALLBACKS = dict_merge(
391 list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
392 list_keys_to_dict(
393 ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
394 ),
395 list_keys_to_dict(
396 ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
397 ),
398 list_keys_to_dict(
399 [
400 "PING",
401 "CONFIG SET",
402 "CONFIG REWRITE",
403 "CONFIG RESETSTAT",
404 "CLIENT SETNAME",
405 "BGSAVE",
406 "SLOWLOG RESET",
407 "SAVE",
408 "MEMORY PURGE",
409 "CLIENT PAUSE",
410 "CLIENT UNPAUSE",
411 ],
412 lambda command, res: all(res.values()) if isinstance(res, dict) else res,
413 ),
414 list_keys_to_dict(
415 ["DBSIZE", "WAIT"],
416 lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
417 ),
418 list_keys_to_dict(
419 ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
420 ),
421 list_keys_to_dict(["SCAN"], parse_scan_result),
422 list_keys_to_dict(
423 ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
424 ),
425 list_keys_to_dict(
426 ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
427 ),
428 list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
429 )
431 ERRORS_ALLOW_RETRY = (
432 ConnectionError,
433 TimeoutError,
434 ClusterDownError,
435 SlotNotCoveredError,
436 )
438 def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
439 """Replace the default cluster node.
440 A random cluster node will be chosen if target_node isn't passed, and primaries
441 will be prioritized. The default node will not be changed if there are no other
442 nodes in the cluster.
444 Args:
445 target_node (ClusterNode, optional): Target node to replace the default
446 node. Defaults to None.
447 """
448 if target_node:
449 self.nodes_manager.default_node = target_node
450 else:
451 curr_node = self.get_default_node()
452 primaries = [node for node in self.get_primaries() if node != curr_node]
453 if primaries:
454 # Choose a primary if the cluster contains different primaries
455 self.nodes_manager.default_node = random.choice(primaries)
456 else:
457 # Otherwise, choose a primary if the cluster contains different primaries
458 replicas = [node for node in self.get_replicas() if node != curr_node]
459 if replicas:
460 self.nodes_manager.default_node = random.choice(replicas)
463class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
464 @classmethod
465 def from_url(cls, url, **kwargs):
466 """
467 Return a Redis client object configured from the given URL
469 For example::
471 redis://[[username]:[password]]@localhost:6379/0
472 rediss://[[username]:[password]]@localhost:6379/0
473 unix://[username@]/path/to/socket.sock?db=0[&password=password]
475 Three URL schemes are supported:
477 - `redis://` creates a TCP socket connection. See more at:
478 <https://www.iana.org/assignments/uri-schemes/prov/redis>
479 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
480 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
481 - ``unix://``: creates a Unix Domain Socket connection.
483 The username, password, hostname, path and all querystring values
484 are passed through urllib.parse.unquote in order to replace any
485 percent-encoded values with their corresponding characters.
487 There are several ways to specify a database number. The first value
488 found will be used:
490 1. A ``db`` querystring option, e.g. redis://localhost?db=0
491 2. If using the redis:// or rediss:// schemes, the path argument
492 of the url, e.g. redis://localhost/0
493 3. A ``db`` keyword argument to this function.
495 If none of these options are specified, the default db=0 is used.
497 All querystring options are cast to their appropriate Python types.
498 Boolean arguments can be specified with string values "True"/"False"
499 or "Yes"/"No". Values that cannot be properly cast cause a
500 ``ValueError`` to be raised. Once parsed, the querystring arguments
501 and keyword arguments are passed to the ``ConnectionPool``'s
502 class initializer. In the case of conflicting arguments, querystring
503 arguments always win.
505 """
506 return cls(url=url, **kwargs)
508 @deprecated_args(
509 args_to_warn=["read_from_replicas"],
510 reason="Please configure the 'load_balancing_strategy' instead",
511 version="5.3.0",
512 )
513 @deprecated_args(
514 args_to_warn=[
515 "cluster_error_retry_attempts",
516 ],
517 reason="Please configure the 'retry' object instead",
518 version="6.0.0",
519 )
520 def __init__(
521 self,
522 host: Optional[str] = None,
523 port: int = 6379,
524 startup_nodes: Optional[List["ClusterNode"]] = None,
525 cluster_error_retry_attempts: int = 3,
526 retry: Optional["Retry"] = None,
527 require_full_coverage: bool = True,
528 reinitialize_steps: int = 5,
529 read_from_replicas: bool = False,
530 load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
531 dynamic_startup_nodes: bool = True,
532 url: Optional[str] = None,
533 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
534 cache: Optional[CacheInterface] = None,
535 cache_config: Optional[CacheConfig] = None,
536 event_dispatcher: Optional[EventDispatcher] = None,
537 policy_resolver: PolicyResolver = StaticPolicyResolver(),
538 **kwargs,
539 ):
540 """
541 Initialize a new RedisCluster client.
543 :param startup_nodes:
544 List of nodes from which initial bootstrapping can be done
545 :param host:
546 Can be used to point to a startup node
547 :param port:
548 Can be used to point to a startup node
549 :param require_full_coverage:
550 When set to False (default value): the client will not require a
551 full coverage of the slots. However, if not all slots are covered,
552 and at least one node has 'cluster-require-full-coverage' set to
553 'yes,' the server will throw a ClusterDownError for some key-based
554 commands. See -
555 https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
556 When set to True: all slots must be covered to construct the
557 cluster client. If not all slots are covered, RedisClusterException
558 will be thrown.
559 :param read_from_replicas:
560 @deprecated - please use load_balancing_strategy instead
561 Enable read from replicas in READONLY mode. You can read possibly
562 stale data.
563 When set to true, read commands will be assigned between the
564 primary and its replications in a Round-Robin manner.
565 :param load_balancing_strategy:
566 Enable read from replicas in READONLY mode and defines the load balancing
567 strategy that will be used for cluster node selection.
568 The data read from replicas is eventually consistent with the data in primary nodes.
569 :param dynamic_startup_nodes:
570 Set the RedisCluster's startup nodes to all of the discovered nodes.
571 If true (default value), the cluster's discovered nodes will be used to
572 determine the cluster nodes-slots mapping in the next topology refresh.
573 It will remove the initial passed startup nodes if their endpoints aren't
574 listed in the CLUSTER SLOTS output.
575 If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
576 specific IP addresses, it is best to set it to false.
577 :param cluster_error_retry_attempts:
578 @deprecated - Please configure the 'retry' object instead
579 In case 'retry' object is set - this argument is ignored!
581 Number of times to retry before raising an error when
582 :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
583 :class:`~.ClusterDownError` are encountered
584 :param retry:
585 A retry object that defines the retry strategy and the number of
586 retries for the cluster client.
587 In current implementation for the cluster client (starting form redis-py version 6.0.0)
588 the retry object is not yet fully utilized, instead it is used just to determine
589 the number of retries for the cluster client.
590 In the future releases the retry object will be used to handle the cluster client retries!
591 :param reinitialize_steps:
592 Specifies the number of MOVED errors that need to occur before
593 reinitializing the whole cluster topology. If a MOVED error occurs
594 and the cluster does not need to be reinitialized on this current
595 error handling, only the MOVED slot will be patched with the
596 redirected node.
597 To reinitialize the cluster on every MOVED error, set
598 reinitialize_steps to 1.
599 To avoid reinitializing the cluster on moved errors, set
600 reinitialize_steps to 0.
601 :param address_remap:
602 An optional callable which, when provided with an internal network
603 address of a node, e.g. a `(host, port)` tuple, will return the address
604 where the node is reachable. This can be used to map the addresses at
605 which the nodes _think_ they are, to addresses at which a client may
606 reach them, such as when they sit behind a proxy.
608 :**kwargs:
609 Extra arguments that will be sent into Redis instance when created
610 (See Official redis-py doc for supported kwargs - the only limitation
611 is that you can't provide 'retry' object as part of kwargs.
612 [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
613 Some kwargs are not supported and will raise a
614 RedisClusterException:
615 - db (Redis do not support database SELECT in cluster mode)
616 """
617 if startup_nodes is None:
618 startup_nodes = []
620 if "db" in kwargs:
621 # Argument 'db' is not possible to use in cluster mode
622 raise RedisClusterException(
623 "Argument 'db' is not possible to use in cluster mode"
624 )
626 if "retry" in kwargs:
627 # Argument 'retry' is not possible to be used in kwargs when in cluster mode
628 # the kwargs are set to the lower level connections to the cluster nodes
629 # and there we provide retry configuration without retries allowed.
630 # The retries should be handled on cluster client level.
631 raise RedisClusterException(
632 "The 'retry' argument cannot be used in kwargs when running in cluster mode."
633 )
635 # Get the startup node/s
636 from_url = False
637 if url is not None:
638 from_url = True
639 url_options = parse_url(url)
640 if "path" in url_options:
641 raise RedisClusterException(
642 "RedisCluster does not currently support Unix Domain "
643 "Socket connections"
644 )
645 if "db" in url_options and url_options["db"] != 0:
646 # Argument 'db' is not possible to use in cluster mode
647 raise RedisClusterException(
648 "A ``db`` querystring option can only be 0 in cluster mode"
649 )
650 kwargs.update(url_options)
651 host = kwargs.get("host")
652 port = kwargs.get("port", port)
653 startup_nodes.append(ClusterNode(host, port))
654 elif host is not None and port is not None:
655 startup_nodes.append(ClusterNode(host, port))
656 elif len(startup_nodes) == 0:
657 # No startup node was provided
658 raise RedisClusterException(
659 "RedisCluster requires at least one node to discover the "
660 "cluster. Please provide one of the followings:\n"
661 "1. host and port, for example:\n"
662 " RedisCluster(host='localhost', port=6379)\n"
663 "2. list of startup nodes, for example:\n"
664 " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
665 " ClusterNode('localhost', 6378)])"
666 )
667 # Update the connection arguments
668 # Whenever a new connection is established, RedisCluster's on_connect
669 # method should be run
670 # If the user passed on_connect function we'll save it and run it
671 # inside the RedisCluster.on_connect() function
672 self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
673 kwargs.update({"redis_connect_func": self.on_connect})
674 kwargs = cleanup_kwargs(**kwargs)
675 if retry:
676 self.retry = retry
677 else:
678 self.retry = Retry(
679 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
680 retries=cluster_error_retry_attempts,
681 )
683 self.encoder = Encoder(
684 kwargs.get("encoding", "utf-8"),
685 kwargs.get("encoding_errors", "strict"),
686 kwargs.get("decode_responses", False),
687 )
688 protocol = kwargs.get("protocol", None)
689 if (cache_config or cache) and protocol not in [3, "3"]:
690 raise RedisError("Client caching is only supported with RESP version 3")
692 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
693 self.node_flags = self.__class__.NODE_FLAGS.copy()
694 self.read_from_replicas = read_from_replicas
695 self.load_balancing_strategy = load_balancing_strategy
696 self.reinitialize_counter = 0
697 self.reinitialize_steps = reinitialize_steps
698 if event_dispatcher is None:
699 self._event_dispatcher = EventDispatcher()
700 else:
701 self._event_dispatcher = event_dispatcher
702 self.startup_nodes = startup_nodes
703 self.nodes_manager = NodesManager(
704 startup_nodes=startup_nodes,
705 from_url=from_url,
706 require_full_coverage=require_full_coverage,
707 dynamic_startup_nodes=dynamic_startup_nodes,
708 address_remap=address_remap,
709 cache=cache,
710 cache_config=cache_config,
711 event_dispatcher=self._event_dispatcher,
712 **kwargs,
713 )
715 self.cluster_response_callbacks = CaseInsensitiveDict(
716 self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
717 )
718 self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
720 # For backward compatibility, mapping from existing policies to new one
721 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
722 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
723 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
724 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
725 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
726 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
727 SLOT_ID: RequestPolicy.DEFAULT_KEYED,
728 }
730 self._policies_callback_mapping: dict[
731 Union[RequestPolicy, ResponsePolicy], Callable
732 ] = {
733 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
734 self.get_random_primary_or_all_nodes(command_name)
735 ],
736 RequestPolicy.DEFAULT_KEYED: lambda command,
737 *args: self.get_nodes_from_slot(command, *args),
738 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
739 RequestPolicy.ALL_SHARDS: self.get_primaries,
740 RequestPolicy.ALL_NODES: self.get_nodes,
741 RequestPolicy.ALL_REPLICAS: self.get_replicas,
742 RequestPolicy.MULTI_SHARD: lambda *args,
743 **kwargs: self._split_multi_shard_command(*args, **kwargs),
744 RequestPolicy.SPECIAL: self.get_special_nodes,
745 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
746 ResponsePolicy.DEFAULT_KEYED: lambda res: res,
747 }
749 self._policy_resolver = policy_resolver
750 self.commands_parser = CommandsParser(self)
752 # Node where FT.AGGREGATE command is executed.
753 self._aggregate_nodes = None
754 self._lock = threading.RLock()
756 def __enter__(self):
757 return self
759 def __exit__(self, exc_type, exc_value, traceback):
760 self.close()
762 def __del__(self):
763 try:
764 self.close()
765 except Exception:
766 pass
768 def disconnect_connection_pools(self):
769 for node in self.get_nodes():
770 if node.redis_connection:
771 try:
772 node.redis_connection.connection_pool.disconnect()
773 except OSError:
774 # Client was already disconnected. do nothing
775 pass
777 def on_connect(self, connection):
778 """
779 Initialize the connection, authenticate and select a database and send
780 READONLY if it is set during object initialization.
781 """
782 connection.on_connect()
784 if self.read_from_replicas or self.load_balancing_strategy:
785 # Sending READONLY command to server to configure connection as
786 # readonly. Since each cluster node may change its server type due
787 # to a failover, we should establish a READONLY connection
788 # regardless of the server type. If this is a primary connection,
789 # READONLY would not affect executing write commands.
790 connection.send_command("READONLY")
791 if str_if_bytes(connection.read_response()) != "OK":
792 raise ConnectionError("READONLY command failed")
794 if self.user_on_connect_func is not None:
795 self.user_on_connect_func(connection)
797 def get_redis_connection(self, node: "ClusterNode") -> Redis:
798 if not node.redis_connection:
799 with self._lock:
800 if not node.redis_connection:
801 self.nodes_manager.create_redis_connections([node])
802 return node.redis_connection
804 def get_node(self, host=None, port=None, node_name=None):
805 return self.nodes_manager.get_node(host, port, node_name)
807 def get_primaries(self):
808 return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
810 def get_replicas(self):
811 return self.nodes_manager.get_nodes_by_server_type(REPLICA)
813 def get_random_node(self):
814 return random.choice(list(self.nodes_manager.nodes_cache.values()))
816 def get_random_primary_or_all_nodes(self, command_name):
817 """
818 Returns random primary or all nodes depends on READONLY mode.
819 """
820 if self.read_from_replicas and command_name in READ_COMMANDS:
821 return self.get_random_node()
823 return self.get_random_primary_node()
825 def get_nodes(self):
826 return list(self.nodes_manager.nodes_cache.values())
828 def get_node_from_key(self, key, replica=False):
829 """
830 Get the node that holds the key's slot.
831 If replica set to True but the slot doesn't have any replicas, None is
832 returned.
833 """
834 slot = self.keyslot(key)
835 slot_cache = self.nodes_manager.slots_cache.get(slot)
836 if slot_cache is None or len(slot_cache) == 0:
837 raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
838 if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
839 return None
840 elif replica:
841 node_idx = 1
842 else:
843 # primary
844 node_idx = 0
846 return slot_cache[node_idx]
848 def get_default_node(self):
849 """
850 Get the cluster's default node
851 """
852 return self.nodes_manager.default_node
854 def get_nodes_from_slot(self, command: str, *args):
855 """
856 Returns a list of nodes that hold the specified keys' slots.
857 """
858 # get the node that holds the key's slot
859 slot = self.determine_slot(*args)
860 node = self.nodes_manager.get_node_from_slot(
861 slot,
862 self.read_from_replicas and command in READ_COMMANDS,
863 self.load_balancing_strategy if command in READ_COMMANDS else None,
864 )
865 return [node]
867 def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
868 """
869 Splits the command with Multi-Shard policy, to the multiple commands
870 """
871 keys = self._get_command_keys(*args)
872 commands = []
874 for key in keys:
875 commands.append(
876 {
877 "args": (args[0], key),
878 "kwargs": kwargs,
879 }
880 )
882 return commands
884 def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
885 """
886 Returns a list of nodes for commands with a special policy.
887 """
888 if not self._aggregate_nodes:
889 raise RedisClusterException(
890 "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
891 )
893 return self._aggregate_nodes
895 def get_random_primary_node(self) -> "ClusterNode":
896 """
897 Returns a random primary node
898 """
899 return random.choice(self.get_primaries())
901 def _evaluate_all_succeeded(self, res):
902 """
903 Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED
904 """
905 first_successful_response = None
907 if isinstance(res, dict):
908 for key, value in res.items():
909 if value:
910 if first_successful_response is None:
911 first_successful_response = {key: value}
912 else:
913 return {key: False}
914 else:
915 for response in res:
916 if response:
917 if first_successful_response is None:
918 # Dynamically resolve type
919 first_successful_response = type(response)(response)
920 else:
921 return type(response)(False)
923 return first_successful_response
925 def set_default_node(self, node):
926 """
927 Set the default node of the cluster.
928 :param node: 'ClusterNode'
929 :return True if the default node was set, else False
930 """
931 if node is None or self.get_node(node_name=node.name) is None:
932 return False
933 self.nodes_manager.default_node = node
934 return True
936 def set_retry(self, retry: Retry) -> None:
937 self.retry = retry
939 def monitor(self, target_node=None):
940 """
941 Returns a Monitor object for the specified target node.
942 The default cluster node will be selected if no target node was
943 specified.
944 Monitor is useful for handling the MONITOR command to the redis server.
945 next_command() method returns one command from monitor
946 listen() method yields commands from monitor.
947 """
948 if target_node is None:
949 target_node = self.get_default_node()
950 if target_node.redis_connection is None:
951 raise RedisClusterException(
952 f"Cluster Node {target_node.name} has no redis_connection"
953 )
954 return target_node.redis_connection.monitor()
956 def pubsub(self, node=None, host=None, port=None, **kwargs):
957 """
958 Allows passing a ClusterNode, or host&port, to get a pubsub instance
959 connected to the specified node
960 """
961 return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
963 def pipeline(self, transaction=None, shard_hint=None):
964 """
965 Cluster impl:
966 Pipelines do not work in cluster mode the same way they
967 do in normal mode. Create a clone of this object so
968 that simulating pipelines will work correctly. Each
969 command will be called directly when used and
970 when calling execute() will only return the result stack.
971 """
972 if shard_hint:
973 raise RedisClusterException("shard_hint is deprecated in cluster mode")
975 return ClusterPipeline(
976 nodes_manager=self.nodes_manager,
977 commands_parser=self.commands_parser,
978 startup_nodes=self.nodes_manager.startup_nodes,
979 result_callbacks=self.result_callbacks,
980 cluster_response_callbacks=self.cluster_response_callbacks,
981 read_from_replicas=self.read_from_replicas,
982 load_balancing_strategy=self.load_balancing_strategy,
983 reinitialize_steps=self.reinitialize_steps,
984 retry=self.retry,
985 lock=self._lock,
986 transaction=transaction,
987 )
989 def lock(
990 self,
991 name,
992 timeout=None,
993 sleep=0.1,
994 blocking=True,
995 blocking_timeout=None,
996 lock_class=None,
997 thread_local=True,
998 raise_on_release_error: bool = True,
999 ):
1000 """
1001 Return a new Lock object using key ``name`` that mimics
1002 the behavior of threading.Lock.
1004 If specified, ``timeout`` indicates a maximum life for the lock.
1005 By default, it will remain locked until release() is called.
1007 ``sleep`` indicates the amount of time to sleep per loop iteration
1008 when the lock is in blocking mode and another client is currently
1009 holding the lock.
1011 ``blocking`` indicates whether calling ``acquire`` should block until
1012 the lock has been acquired or to fail immediately, causing ``acquire``
1013 to return False and the lock not being acquired. Defaults to True.
1014 Note this value can be overridden by passing a ``blocking``
1015 argument to ``acquire``.
1017 ``blocking_timeout`` indicates the maximum amount of time in seconds to
1018 spend trying to acquire the lock. A value of ``None`` indicates
1019 continue trying forever. ``blocking_timeout`` can be specified as a
1020 float or integer, both representing the number of seconds to wait.
1022 ``lock_class`` forces the specified lock implementation. Note that as
1023 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
1024 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
1025 you have created your own custom lock class.
1027 ``thread_local`` indicates whether the lock token is placed in
1028 thread-local storage. By default, the token is placed in thread local
1029 storage so that a thread only sees its token, not a token set by
1030 another thread. Consider the following timeline:
1032 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
1033 thread-1 sets the token to "abc"
1034 time: 1, thread-2 blocks trying to acquire `my-lock` using the
1035 Lock instance.
1036 time: 5, thread-1 has not yet completed. redis expires the lock
1037 key.
1038 time: 5, thread-2 acquired `my-lock` now that it's available.
1039 thread-2 sets the token to "xyz"
1040 time: 6, thread-1 finishes its work and calls release(). if the
1041 token is *not* stored in thread local storage, then
1042 thread-1 would see the token value as "xyz" and would be
1043 able to successfully release the thread-2's lock.
1045 ``raise_on_release_error`` indicates whether to raise an exception when
1046 the lock is no longer owned when exiting the context manager. By default,
1047 this is True, meaning an exception will be raised. If False, the warning
1048 will be logged and the exception will be suppressed.
1050 In some use cases it's necessary to disable thread local storage. For
1051 example, if you have code where one thread acquires a lock and passes
1052 that lock instance to a worker thread to release later. If thread
1053 local storage isn't disabled in this case, the worker thread won't see
1054 the token set by the thread that acquired the lock. Our assumption
1055 is that these cases aren't common and as such default to using
1056 thread local storage."""
1057 if lock_class is None:
1058 lock_class = Lock
1059 return lock_class(
1060 self,
1061 name,
1062 timeout=timeout,
1063 sleep=sleep,
1064 blocking=blocking,
1065 blocking_timeout=blocking_timeout,
1066 thread_local=thread_local,
1067 raise_on_release_error=raise_on_release_error,
1068 )
1070 def set_response_callback(self, command, callback):
1071 """Set a custom Response Callback"""
1072 self.cluster_response_callbacks[command] = callback
1074 def _determine_nodes(
1075 self, *args, request_policy: RequestPolicy, **kwargs
1076 ) -> List["ClusterNode"]:
1077 """
1078 Determines a nodes the command should be executed on.
1079 """
1080 command = args[0].upper()
1081 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
1082 command = f"{args[0]} {args[1]}".upper()
1084 nodes_flag = kwargs.pop("nodes_flag", None)
1085 if nodes_flag is not None:
1086 # nodes flag passed by the user
1087 command_flag = nodes_flag
1088 else:
1089 # get the nodes group for this command if it was predefined
1090 command_flag = self.command_flags.get(command)
1092 if command_flag in self._command_flags_mapping:
1093 request_policy = self._command_flags_mapping[command_flag]
1095 policy_callback = self._policies_callback_mapping[request_policy]
1097 if request_policy == RequestPolicy.DEFAULT_KEYED:
1098 nodes = policy_callback(command, *args)
1099 elif request_policy == RequestPolicy.MULTI_SHARD:
1100 nodes = policy_callback(*args, **kwargs)
1101 elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
1102 nodes = policy_callback(args[0])
1103 else:
1104 nodes = policy_callback()
1106 if args[0].lower() == "ft.aggregate":
1107 self._aggregate_nodes = nodes
1109 return nodes
1111 def _should_reinitialized(self):
1112 # To reinitialize the cluster on every MOVED error,
1113 # set reinitialize_steps to 1.
1114 # To avoid reinitializing the cluster on moved errors, set
1115 # reinitialize_steps to 0.
1116 if self.reinitialize_steps == 0:
1117 return False
1118 else:
1119 return self.reinitialize_counter % self.reinitialize_steps == 0
1121 def keyslot(self, key):
1122 """
1123 Calculate keyslot for a given key.
1124 See Keys distribution model in https://redis.io/topics/cluster-spec
1125 """
1126 k = self.encoder.encode(key)
1127 return key_slot(k)
1129 def _get_command_keys(self, *args):
1130 """
1131 Get the keys in the command. If the command has no keys in in, None is
1132 returned.
1134 NOTE: Due to a bug in redis<7.0, this function does not work properly
1135 for EVAL or EVALSHA when the `numkeys` arg is 0.
1136 - issue: https://github.com/redis/redis/issues/9493
1137 - fix: https://github.com/redis/redis/pull/9733
1139 So, don't use this function with EVAL or EVALSHA.
1140 """
1141 redis_conn = self.get_default_node().redis_connection
1142 return self.commands_parser.get_keys(redis_conn, *args)
1144 def determine_slot(self, *args) -> int:
1145 """
1146 Figure out what slot to use based on args.
1148 Raises a RedisClusterException if there's a missing key and we can't
1149 determine what slots to map the command to; or, if the keys don't
1150 all map to the same key slot.
1151 """
1152 command = args[0]
1153 if self.command_flags.get(command) == SLOT_ID:
1154 # The command contains the slot ID
1155 return args[1]
1157 # Get the keys in the command
1159 # EVAL and EVALSHA are common enough that it's wasteful to go to the
1160 # redis server to parse the keys. Besides, there is a bug in redis<7.0
1161 # where `self._get_command_keys()` fails anyway. So, we special case
1162 # EVAL/EVALSHA.
1163 if command.upper() in ("EVAL", "EVALSHA"):
1164 # command syntax: EVAL "script body" num_keys ...
1165 if len(args) <= 2:
1166 raise RedisClusterException(f"Invalid args in command: {args}")
1167 num_actual_keys = int(args[2])
1168 eval_keys = args[3 : 3 + num_actual_keys]
1169 # if there are 0 keys, that means the script can be run on any node
1170 # so we can just return a random slot
1171 if len(eval_keys) == 0:
1172 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
1173 keys = eval_keys
1174 else:
1175 keys = self._get_command_keys(*args)
1176 if keys is None or len(keys) == 0:
1177 # FCALL can call a function with 0 keys, that means the function
1178 # can be run on any node so we can just return a random slot
1179 if command.upper() in ("FCALL", "FCALL_RO"):
1180 return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
1181 raise RedisClusterException(
1182 "No way to dispatch this command to Redis Cluster. "
1183 "Missing key.\nYou can execute the command by specifying "
1184 f"target nodes.\nCommand: {args}"
1185 )
1187 # single key command
1188 if len(keys) == 1:
1189 return self.keyslot(keys[0])
1191 # multi-key command; we need to make sure all keys are mapped to
1192 # the same slot
1193 slots = {self.keyslot(key) for key in keys}
1194 if len(slots) != 1:
1195 raise RedisClusterException(
1196 f"{command} - all keys must map to the same key slot"
1197 )
1199 return slots.pop()
1201 def get_encoder(self):
1202 """
1203 Get the connections' encoder
1204 """
1205 return self.encoder
1207 def get_connection_kwargs(self):
1208 """
1209 Get the connections' key-word arguments
1210 """
1211 return self.nodes_manager.connection_kwargs
1213 def _is_nodes_flag(self, target_nodes):
1214 return isinstance(target_nodes, str) and target_nodes in self.node_flags
1216 def _parse_target_nodes(self, target_nodes):
1217 if isinstance(target_nodes, list):
1218 nodes = target_nodes
1219 elif isinstance(target_nodes, ClusterNode):
1220 # Supports passing a single ClusterNode as a variable
1221 nodes = [target_nodes]
1222 elif isinstance(target_nodes, dict):
1223 # Supports dictionaries of the format {node_name: node}.
1224 # It enables to execute commands with multi nodes as follows:
1225 # rc.cluster_save_config(rc.get_primaries())
1226 nodes = target_nodes.values()
1227 else:
1228 raise TypeError(
1229 "target_nodes type can be one of the following: "
1230 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
1231 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
1232 f"The passed type is {type(target_nodes)}"
1233 )
1234 return nodes
1236 def execute_command(self, *args, **kwargs):
1237 return self._internal_execute_command(*args, **kwargs)
1239 def _internal_execute_command(self, *args, **kwargs):
1240 """
1241 Wrapper for ERRORS_ALLOW_RETRY error handling.
1243 It will try the number of times specified by the retries property from
1244 config option "self.retry" which defaults to 3 unless manually
1245 configured.
1247 If it reaches the number of times, the command will raise the exception
1249 Key argument :target_nodes: can be passed with the following types:
1250 nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
1251 ClusterNode
1252 list<ClusterNode>
1253 dict<Any, ClusterNode>
1254 """
1255 target_nodes_specified = False
1256 is_default_node = False
1257 target_nodes = None
1258 passed_targets = kwargs.pop("target_nodes", None)
1259 command_policies = self._policy_resolver.resolve(args[0].lower())
1261 if passed_targets is not None and not self._is_nodes_flag(passed_targets):
1262 target_nodes = self._parse_target_nodes(passed_targets)
1263 target_nodes_specified = True
1265 if not command_policies and not target_nodes_specified:
1266 command = args[0].upper()
1267 if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
1268 command = f"{args[0]} {args[1]}".upper()
1270 # We only could resolve key properties if command is not
1271 # in a list of pre-defined request policies
1272 command_flag = self.command_flags.get(command)
1273 if not command_flag:
1274 # Fallback to default policy
1275 if not self.get_default_node():
1276 slot = None
1277 else:
1278 slot = self.determine_slot(*args)
1279 if not slot:
1280 command_policies = CommandPolicies()
1281 else:
1282 command_policies = CommandPolicies(
1283 request_policy=RequestPolicy.DEFAULT_KEYED,
1284 response_policy=ResponsePolicy.DEFAULT_KEYED,
1285 )
1286 else:
1287 if command_flag in self._command_flags_mapping:
1288 command_policies = CommandPolicies(
1289 request_policy=self._command_flags_mapping[command_flag]
1290 )
1291 else:
1292 command_policies = CommandPolicies()
1293 elif not command_policies and target_nodes_specified:
1294 command_policies = CommandPolicies()
1296 # If an error that allows retrying was thrown, the nodes and slots
1297 # cache were reinitialized. We will retry executing the command with
1298 # the updated cluster setup only when the target nodes can be
1299 # determined again with the new cache tables. Therefore, when target
1300 # nodes were passed to this function, we cannot retry the command
1301 # execution since the nodes may not be valid anymore after the tables
1302 # were reinitialized. So in case of passed target nodes,
1303 # retry_attempts will be set to 0.
1304 retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
1305 # Add one for the first execution
1306 execute_attempts = 1 + retry_attempts
1307 for _ in range(execute_attempts):
1308 try:
1309 res = {}
1310 if not target_nodes_specified:
1311 # Determine the nodes to execute the command on
1312 target_nodes = self._determine_nodes(
1313 *args,
1314 request_policy=command_policies.request_policy,
1315 nodes_flag=passed_targets,
1316 )
1317 if not target_nodes:
1318 raise RedisClusterException(
1319 f"No targets were found to execute {args} command on"
1320 )
1321 if (
1322 len(target_nodes) == 1
1323 and target_nodes[0] == self.get_default_node()
1324 ):
1325 is_default_node = True
1326 for node in target_nodes:
1327 res[node.name] = self._execute_command(node, *args, **kwargs)
1329 if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
1330 break
1332 # Return the processed result
1333 return self._process_result(
1334 args[0],
1335 res,
1336 response_policy=command_policies.response_policy,
1337 **kwargs,
1338 )
1339 except Exception as e:
1340 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
1341 if is_default_node:
1342 # Replace the default cluster node
1343 self.replace_default_node()
1344 # The nodes and slots cache were reinitialized.
1345 # Try again with the new cluster setup.
1346 retry_attempts -= 1
1347 continue
1348 else:
1349 # raise the exception
1350 raise e
1352 def _execute_command(self, target_node, *args, **kwargs):
1353 """
1354 Send a command to a node in the cluster
1355 """
1356 command = args[0]
1357 redis_node = None
1358 connection = None
1359 redirect_addr = None
1360 asking = False
1361 moved = False
1362 ttl = int(self.RedisClusterRequestTTL)
1364 while ttl > 0:
1365 ttl -= 1
1366 try:
1367 if asking:
1368 target_node = self.get_node(node_name=redirect_addr)
1369 elif moved:
1370 # MOVED occurred and the slots cache was updated,
1371 # refresh the target node
1372 slot = self.determine_slot(*args)
1373 target_node = self.nodes_manager.get_node_from_slot(
1374 slot,
1375 self.read_from_replicas and command in READ_COMMANDS,
1376 self.load_balancing_strategy
1377 if command in READ_COMMANDS
1378 else None,
1379 )
1380 moved = False
1382 redis_node = self.get_redis_connection(target_node)
1383 connection = get_connection(redis_node)
1384 if asking:
1385 connection.send_command("ASKING")
1386 redis_node.parse_response(connection, "ASKING", **kwargs)
1387 asking = False
1388 connection.send_command(*args, **kwargs)
1389 response = redis_node.parse_response(connection, command, **kwargs)
1391 # Remove keys entry, it needs only for cache.
1392 kwargs.pop("keys", None)
1394 if command in self.cluster_response_callbacks:
1395 response = self.cluster_response_callbacks[command](
1396 response, **kwargs
1397 )
1398 return response
1399 except AuthenticationError:
1400 raise
1401 except MaxConnectionsError:
1402 # MaxConnectionsError indicates client-side resource exhaustion
1403 # (too many connections in the pool), not a node failure.
1404 # Don't treat this as a node failure - just re-raise the error
1405 # without reinitializing the cluster.
1406 raise
1407 except (ConnectionError, TimeoutError) as e:
1408 # ConnectionError can also be raised if we couldn't get a
1409 # connection from the pool before timing out, so check that
1410 # this is an actual connection before attempting to disconnect.
1411 if connection is not None:
1412 connection.disconnect()
1414 # Remove the failed node from the startup nodes before we try
1415 # to reinitialize the cluster
1416 self.nodes_manager.startup_nodes.pop(target_node.name, None)
1417 # Reset the cluster node's connection
1418 target_node.redis_connection = None
1419 self.nodes_manager.initialize()
1420 raise e
1421 except MovedError as e:
1422 # First, we will try to patch the slots/nodes cache with the
1423 # redirected node output and try again. If MovedError exceeds
1424 # 'reinitialize_steps' number of times, we will force
1425 # reinitializing the tables, and then try again.
1426 # 'reinitialize_steps' counter will increase faster when
1427 # the same client object is shared between multiple threads. To
1428 # reduce the frequency you can set this variable in the
1429 # RedisCluster constructor.
1430 self.reinitialize_counter += 1
1431 if self._should_reinitialized():
1432 self.nodes_manager.initialize()
1433 # Reset the counter
1434 self.reinitialize_counter = 0
1435 else:
1436 self.nodes_manager.update_moved_exception(e)
1437 moved = True
1438 except TryAgainError:
1439 if ttl < self.RedisClusterRequestTTL / 2:
1440 time.sleep(0.05)
1441 except AskError as e:
1442 redirect_addr = get_node_name(host=e.host, port=e.port)
1443 asking = True
1444 except (ClusterDownError, SlotNotCoveredError):
1445 # ClusterDownError can occur during a failover and to get
1446 # self-healed, we will try to reinitialize the cluster layout
1447 # and retry executing the command
1449 # SlotNotCoveredError can occur when the cluster is not fully
1450 # initialized or can be temporary issue.
1451 # We will try to reinitialize the cluster topology
1452 # and retry executing the command
1454 time.sleep(0.25)
1455 self.nodes_manager.initialize()
1456 raise
1457 except ResponseError:
1458 raise
1459 except Exception as e:
1460 if connection:
1461 connection.disconnect()
1462 raise e
1463 finally:
1464 if connection is not None:
1465 redis_node.connection_pool.release(connection)
1467 raise ClusterError("TTL exhausted.")
1469 def close(self) -> None:
1470 try:
1471 with self._lock:
1472 if self.nodes_manager:
1473 self.nodes_manager.close()
1474 except AttributeError:
1475 # RedisCluster's __init__ can fail before nodes_manager is set
1476 pass
1478 def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
1479 """
1480 Process the result of the executed command.
1481 The function would return a dict or a single value.
1483 :type command: str
1484 :type res: dict
1486 `res` should be in the following format:
1487 Dict<node_name, command_result>
1488 """
1489 if command in self.result_callbacks:
1490 res = self.result_callbacks[command](command, res, **kwargs)
1491 elif len(res) == 1:
1492 # When we execute the command on a single node, we can
1493 # remove the dictionary and return a single response
1494 res = list(res.values())[0]
1496 return self._policies_callback_mapping[response_policy](res)
1498 def load_external_module(self, funcname, func):
1499 """
1500 This function can be used to add externally defined redis modules,
1501 and their namespaces to the redis client.
1503 ``funcname`` - A string containing the name of the function to create
1504 ``func`` - The function, being added to this class.
1505 """
1506 setattr(self, funcname, func)
1508 def transaction(self, func, *watches, **kwargs):
1509 """
1510 Convenience method for executing the callable `func` as a transaction
1511 while watching all keys specified in `watches`. The 'func' callable
1512 should expect a single argument which is a Pipeline object.
1513 """
1514 shard_hint = kwargs.pop("shard_hint", None)
1515 value_from_callable = kwargs.pop("value_from_callable", False)
1516 watch_delay = kwargs.pop("watch_delay", None)
1517 with self.pipeline(True, shard_hint) as pipe:
1518 while True:
1519 try:
1520 if watches:
1521 pipe.watch(*watches)
1522 func_value = func(pipe)
1523 exec_value = pipe.execute()
1524 return func_value if value_from_callable else exec_value
1525 except WatchError:
1526 if watch_delay is not None and watch_delay > 0:
1527 time.sleep(watch_delay)
1528 continue
1531class ClusterNode:
1532 def __init__(self, host, port, server_type=None, redis_connection=None):
1533 if host == "localhost":
1534 host = socket.gethostbyname(host)
1536 self.host = host
1537 self.port = port
1538 self.name = get_node_name(host, port)
1539 self.server_type = server_type
1540 self.redis_connection = redis_connection
1542 def __repr__(self):
1543 return (
1544 f"[host={self.host},"
1545 f"port={self.port},"
1546 f"name={self.name},"
1547 f"server_type={self.server_type},"
1548 f"redis_connection={self.redis_connection}]"
1549 )
1551 def __eq__(self, obj):
1552 return isinstance(obj, ClusterNode) and obj.name == self.name
1554 def __del__(self):
1555 try:
1556 if self.redis_connection is not None:
1557 self.redis_connection.close()
1558 except Exception:
1559 # Ignore errors when closing the connection
1560 pass
1563class LoadBalancingStrategy(Enum):
1564 ROUND_ROBIN = "round_robin"
1565 ROUND_ROBIN_REPLICAS = "round_robin_replicas"
1566 RANDOM_REPLICA = "random_replica"
1569class LoadBalancer:
1570 """
1571 Round-Robin Load Balancing
1572 """
1574 def __init__(self, start_index: int = 0) -> None:
1575 self.primary_to_idx = {}
1576 self.start_index = start_index
1578 def get_server_index(
1579 self,
1580 primary: str,
1581 list_size: int,
1582 load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
1583 ) -> int:
1584 if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
1585 return self._get_random_replica_index(list_size)
1586 else:
1587 return self._get_round_robin_index(
1588 primary,
1589 list_size,
1590 load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
1591 )
1593 def reset(self) -> None:
1594 self.primary_to_idx.clear()
1596 def _get_random_replica_index(self, list_size: int) -> int:
1597 return random.randint(1, list_size - 1)
1599 def _get_round_robin_index(
1600 self, primary: str, list_size: int, replicas_only: bool
1601 ) -> int:
1602 server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1603 if replicas_only and server_index == 0:
1604 # skip the primary node index
1605 server_index = 1
1606 # Update the index for the next round
1607 self.primary_to_idx[primary] = (server_index + 1) % list_size
1608 return server_index
1611class NodesManager:
1612 def __init__(
1613 self,
1614 startup_nodes,
1615 from_url=False,
1616 require_full_coverage=False,
1617 lock=None,
1618 dynamic_startup_nodes=True,
1619 connection_pool_class=ConnectionPool,
1620 address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
1621 cache: Optional[CacheInterface] = None,
1622 cache_config: Optional[CacheConfig] = None,
1623 cache_factory: Optional[CacheFactoryInterface] = None,
1624 event_dispatcher: Optional[EventDispatcher] = None,
1625 **kwargs,
1626 ):
1627 self.nodes_cache: Dict[str, Redis] = {}
1628 self.slots_cache = {}
1629 self.startup_nodes = {}
1630 self.default_node = None
1631 self.populate_startup_nodes(startup_nodes)
1632 self.from_url = from_url
1633 self._require_full_coverage = require_full_coverage
1634 self._dynamic_startup_nodes = dynamic_startup_nodes
1635 self.connection_pool_class = connection_pool_class
1636 self.address_remap = address_remap
1637 self._cache = cache
1638 self._cache_config = cache_config
1639 self._cache_factory = cache_factory
1640 self._moved_exception = None
1641 self.connection_kwargs = kwargs
1642 self.read_load_balancer = LoadBalancer()
1643 if lock is None:
1644 lock = threading.RLock()
1645 self._lock = lock
1646 if event_dispatcher is None:
1647 self._event_dispatcher = EventDispatcher()
1648 else:
1649 self._event_dispatcher = event_dispatcher
1650 self._credential_provider = self.connection_kwargs.get(
1651 "credential_provider", None
1652 )
1653 self.initialize()
1655 def get_node(self, host=None, port=None, node_name=None):
1656 """
1657 Get the requested node from the cluster's nodes.
1658 nodes.
1659 :return: ClusterNode if the node exists, else None
1660 """
1661 if host and port:
1662 # the user passed host and port
1663 if host == "localhost":
1664 host = socket.gethostbyname(host)
1665 return self.nodes_cache.get(get_node_name(host=host, port=port))
1666 elif node_name:
1667 return self.nodes_cache.get(node_name)
1668 else:
1669 return None
1671 def update_moved_exception(self, exception):
1672 self._moved_exception = exception
1674 def _update_moved_slots(self):
1675 """
1676 Update the slot's node with the redirected one
1677 """
1678 e = self._moved_exception
1679 redirected_node = self.get_node(host=e.host, port=e.port)
1680 if redirected_node is not None:
1681 # The node already exists
1682 if redirected_node.server_type is not PRIMARY:
1683 # Update the node's server type
1684 redirected_node.server_type = PRIMARY
1685 else:
1686 # This is a new node, we will add it to the nodes cache
1687 redirected_node = ClusterNode(e.host, e.port, PRIMARY)
1688 self.nodes_cache[redirected_node.name] = redirected_node
1689 if redirected_node in self.slots_cache[e.slot_id]:
1690 # The MOVED error resulted from a failover, and the new slot owner
1691 # had previously been a replica.
1692 old_primary = self.slots_cache[e.slot_id][0]
1693 # Update the old primary to be a replica and add it to the end of
1694 # the slot's node list
1695 old_primary.server_type = REPLICA
1696 self.slots_cache[e.slot_id].append(old_primary)
1697 # Remove the old replica, which is now a primary, from the slot's
1698 # node list
1699 self.slots_cache[e.slot_id].remove(redirected_node)
1700 # Override the old primary with the new one
1701 self.slots_cache[e.slot_id][0] = redirected_node
1702 if self.default_node == old_primary:
1703 # Update the default node with the new primary
1704 self.default_node = redirected_node
1705 else:
1706 # The new slot owner is a new server, or a server from a different
1707 # shard. We need to remove all current nodes from the slot's list
1708 # (including replications) and add just the new node.
1709 self.slots_cache[e.slot_id] = [redirected_node]
1710 # Reset moved_exception
1711 self._moved_exception = None
1713 @deprecated_args(
1714 args_to_warn=["server_type"],
1715 reason=(
1716 "In case you need select some load balancing strategy "
1717 "that will use replicas, please set it through 'load_balancing_strategy'"
1718 ),
1719 version="5.3.0",
1720 )
1721 def get_node_from_slot(
1722 self,
1723 slot,
1724 read_from_replicas=False,
1725 load_balancing_strategy=None,
1726 server_type=None,
1727 ) -> ClusterNode:
1728 """
1729 Gets a node that servers this hash slot
1730 """
1731 if self._moved_exception:
1732 with self._lock:
1733 if self._moved_exception:
1734 self._update_moved_slots()
1736 if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
1737 raise SlotNotCoveredError(
1738 f'Slot "{slot}" not covered by the cluster. '
1739 f'"require_full_coverage={self._require_full_coverage}"'
1740 )
1742 if read_from_replicas is True and load_balancing_strategy is None:
1743 load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
1745 if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
1746 # get the server index using the strategy defined in load_balancing_strategy
1747 primary_name = self.slots_cache[slot][0].name
1748 node_idx = self.read_load_balancer.get_server_index(
1749 primary_name, len(self.slots_cache[slot]), load_balancing_strategy
1750 )
1751 elif (
1752 server_type is None
1753 or server_type == PRIMARY
1754 or len(self.slots_cache[slot]) == 1
1755 ):
1756 # return a primary
1757 node_idx = 0
1758 else:
1759 # return a replica
1760 # randomly choose one of the replicas
1761 node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)
1763 return self.slots_cache[slot][node_idx]
1765 def get_nodes_by_server_type(self, server_type):
1766 """
1767 Get all nodes with the specified server type
1768 :param server_type: 'primary' or 'replica'
1769 :return: list of ClusterNode
1770 """
1771 return [
1772 node
1773 for node in self.nodes_cache.values()
1774 if node.server_type == server_type
1775 ]
1777 def populate_startup_nodes(self, nodes):
1778 """
1779 Populate all startup nodes and filters out any duplicates
1780 """
1781 for n in nodes:
1782 self.startup_nodes[n.name] = n
1784 def check_slots_coverage(self, slots_cache):
1785 # Validate if all slots are covered or if we should try next
1786 # startup node
1787 for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
1788 if i not in slots_cache:
1789 return False
1790 return True
1792 def create_redis_connections(self, nodes):
1793 """
1794 This function will create a redis connection to all nodes in :nodes:
1795 """
1796 connection_pools = []
1797 for node in nodes:
1798 if node.redis_connection is None:
1799 node.redis_connection = self.create_redis_node(
1800 host=node.host, port=node.port, **self.connection_kwargs
1801 )
1802 connection_pools.append(node.redis_connection.connection_pool)
1804 self._event_dispatcher.dispatch(
1805 AfterPooledConnectionsInstantiationEvent(
1806 connection_pools, ClientType.SYNC, self._credential_provider
1807 )
1808 )
1810 def create_redis_node(self, host, port, **kwargs):
1811 # We are configuring the connection pool not to retry
1812 # connections on lower level clients to avoid retrying
1813 # connections to nodes that are not reachable
1814 # and to avoid blocking the connection pool.
1815 # The only error that will have some handling in the lower
1816 # level clients is ConnectionError which will trigger disconnection
1817 # of the socket.
1818 # The retries will be handled on cluster client level
1819 # where we will have proper handling of the cluster topology
1820 node_retry_config = Retry(
1821 backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
1822 )
1824 protocol = kwargs.get("protocol", None)
1825 if protocol in [3, "3"]:
1826 kwargs.update(
1827 {"maint_notifications_config": MaintNotificationsConfig(enabled=False)}
1828 )
1829 if self.from_url:
1830 # Create a redis node with a costumed connection pool
1831 kwargs.update({"host": host})
1832 kwargs.update({"port": port})
1833 kwargs.update({"cache": self._cache})
1834 kwargs.update({"retry": node_retry_config})
1835 r = Redis(connection_pool=self.connection_pool_class(**kwargs))
1836 else:
1837 r = Redis(
1838 host=host,
1839 port=port,
1840 cache=self._cache,
1841 retry=node_retry_config,
1842 **kwargs,
1843 )
1844 return r
1846 def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
1847 node_name = get_node_name(host, port)
1848 # check if we already have this node in the tmp_nodes_cache
1849 target_node = tmp_nodes_cache.get(node_name)
1850 if target_node is None:
1851 # before creating a new cluster node, check if the cluster node already
1852 # exists in the current nodes cache and has a valid connection so we can
1853 # reuse it
1854 target_node = self.nodes_cache.get(node_name)
1855 if target_node is None or target_node.redis_connection is None:
1856 # create new cluster node for this cluster
1857 target_node = ClusterNode(host, port, role)
1858 if target_node.server_type != role:
1859 target_node.server_type = role
1860 # add this node to the nodes cache
1861 tmp_nodes_cache[target_node.name] = target_node
1863 return target_node
1865 def initialize(self):
1866 """
1867 Initializes the nodes cache, slots cache and redis connections.
1868 :startup_nodes:
1869 Responsible for discovering other nodes in the cluster
1870 """
1871 self.reset()
1872 tmp_nodes_cache = {}
1873 tmp_slots = {}
1874 disagreements = []
1875 startup_nodes_reachable = False
1876 fully_covered = False
1877 kwargs = self.connection_kwargs
1878 exception = None
1879 # Convert to tuple to prevent RuntimeError if self.startup_nodes
1880 # is modified during iteration
1881 for startup_node in tuple(self.startup_nodes.values()):
1882 try:
1883 if startup_node.redis_connection:
1884 r = startup_node.redis_connection
1885 else:
1886 # Create a new Redis connection
1887 r = self.create_redis_node(
1888 startup_node.host, startup_node.port, **kwargs
1889 )
1890 self.startup_nodes[startup_node.name].redis_connection = r
1891 # Make sure cluster mode is enabled on this node
1892 try:
1893 cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1894 r.connection_pool.disconnect()
1895 except ResponseError:
1896 raise RedisClusterException(
1897 "Cluster mode is not enabled on this node"
1898 )
1899 startup_nodes_reachable = True
1900 except Exception as e:
1901 # Try the next startup node.
1902 # The exception is saved and raised only if we have no more nodes.
1903 exception = e
1904 continue
1906 # CLUSTER SLOTS command results in the following output:
1907 # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
1908 # where each node contains the following list: [IP, port, node_id]
1909 # Therefore, cluster_slots[0][2][0] will be the IP address of the
1910 # primary node of the first slot section.
1911 # If there's only one server in the cluster, its ``host`` is ''
1912 # Fix it to the host in startup_nodes
1913 if (
1914 len(cluster_slots) == 1
1915 and len(cluster_slots[0][2][0]) == 0
1916 and len(self.startup_nodes) == 1
1917 ):
1918 cluster_slots[0][2][0] = startup_node.host
1920 for slot in cluster_slots:
1921 primary_node = slot[2]
1922 host = str_if_bytes(primary_node[0])
1923 if host == "":
1924 host = startup_node.host
1925 port = int(primary_node[1])
1926 host, port = self.remap_host_port(host, port)
1928 nodes_for_slot = []
1930 target_node = self._get_or_create_cluster_node(
1931 host, port, PRIMARY, tmp_nodes_cache
1932 )
1933 nodes_for_slot.append(target_node)
1935 replica_nodes = slot[3:]
1936 for replica_node in replica_nodes:
1937 host = str_if_bytes(replica_node[0])
1938 port = int(replica_node[1])
1939 host, port = self.remap_host_port(host, port)
1940 target_replica_node = self._get_or_create_cluster_node(
1941 host, port, REPLICA, tmp_nodes_cache
1942 )
1943 nodes_for_slot.append(target_replica_node)
1945 for i in range(int(slot[0]), int(slot[1]) + 1):
1946 if i not in tmp_slots:
1947 tmp_slots[i] = nodes_for_slot
1948 else:
1949 # Validate that 2 nodes want to use the same slot cache
1950 # setup
1951 tmp_slot = tmp_slots[i][0]
1952 if tmp_slot.name != target_node.name:
1953 disagreements.append(
1954 f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
1955 )
1957 if len(disagreements) > 5:
1958 raise RedisClusterException(
1959 f"startup_nodes could not agree on a valid "
1960 f"slots cache: {', '.join(disagreements)}"
1961 )
1963 fully_covered = self.check_slots_coverage(tmp_slots)
1964 if fully_covered:
1965 # Don't need to continue to the next startup node if all
1966 # slots are covered
1967 break
1969 if not startup_nodes_reachable:
1970 raise RedisClusterException(
1971 f"Redis Cluster cannot be connected. Please provide at least "
1972 f"one reachable node: {str(exception)}"
1973 ) from exception
1975 if self._cache is None and self._cache_config is not None:
1976 if self._cache_factory is None:
1977 self._cache = CacheFactory(self._cache_config).get_cache()
1978 else:
1979 self._cache = self._cache_factory.get_cache()
1981 # Create Redis connections to all nodes
1982 self.create_redis_connections(list(tmp_nodes_cache.values()))
1984 # Check if the slots are not fully covered
1985 if not fully_covered and self._require_full_coverage:
1986 # Despite the requirement that the slots be covered, there
1987 # isn't a full coverage
1988 raise RedisClusterException(
1989 f"All slots are not covered after query all startup_nodes. "
1990 f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
1991 f"covered..."
1992 )
1994 # Set the tmp variables to the real variables
1995 self.nodes_cache = tmp_nodes_cache
1996 self.slots_cache = tmp_slots
1997 # Set the default node
1998 self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
1999 if self._dynamic_startup_nodes:
2000 # Populate the startup nodes with all discovered nodes
2001 self.startup_nodes = tmp_nodes_cache
2002 # If initialize was called after a MovedError, clear it
2003 self._moved_exception = None
2005 def close(self) -> None:
2006 self.default_node = None
2007 for node in self.nodes_cache.values():
2008 if node.redis_connection:
2009 node.redis_connection.close()
2011 def reset(self):
2012 try:
2013 self.read_load_balancer.reset()
2014 except TypeError:
2015 # The read_load_balancer is None, do nothing
2016 pass
2018 def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
2019 """
2020 Remap the host and port returned from the cluster to a different
2021 internal value. Useful if the client is not connecting directly
2022 to the cluster.
2023 """
2024 if self.address_remap:
2025 return self.address_remap((host, port))
2026 return host, port
2028 def find_connection_owner(self, connection: Connection) -> Optional[Redis]:
2029 node_name = get_node_name(connection.host, connection.port)
2030 for node in tuple(self.nodes_cache.values()):
2031 if node.redis_connection:
2032 conn_args = node.redis_connection.connection_pool.connection_kwargs
2033 if node_name == get_node_name(
2034 conn_args.get("host"), conn_args.get("port")
2035 ):
2036 return node
2039class ClusterPubSub(PubSub):
2040 """
2041 Wrapper for PubSub class.
2043 IMPORTANT: before using ClusterPubSub, read about the known limitations
2044 with pubsub in Cluster mode and learn how to workaround them:
2045 https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
2046 """
2048 def __init__(
2049 self,
2050 redis_cluster,
2051 node=None,
2052 host=None,
2053 port=None,
2054 push_handler_func=None,
2055 event_dispatcher: Optional["EventDispatcher"] = None,
2056 **kwargs,
2057 ):
2058 """
2059 When a pubsub instance is created without specifying a node, a single
2060 node will be transparently chosen for the pubsub connection on the
2061 first command execution. The node will be determined by:
2062 1. Hashing the channel name in the request to find its keyslot
2063 2. Selecting a node that handles the keyslot: If read_from_replicas is
2064 set to true or load_balancing_strategy is set, a replica can be selected.
2066 :type redis_cluster: RedisCluster
2067 :type node: ClusterNode
2068 :type host: str
2069 :type port: int
2070 """
2071 self.node = None
2072 self.set_pubsub_node(redis_cluster, node, host, port)
2073 connection_pool = (
2074 None
2075 if self.node is None
2076 else redis_cluster.get_redis_connection(self.node).connection_pool
2077 )
2078 self.cluster = redis_cluster
2079 self.node_pubsub_mapping = {}
2080 self._pubsubs_generator = self._pubsubs_generator()
2081 if event_dispatcher is None:
2082 self._event_dispatcher = EventDispatcher()
2083 else:
2084 self._event_dispatcher = event_dispatcher
2085 super().__init__(
2086 connection_pool=connection_pool,
2087 encoder=redis_cluster.encoder,
2088 push_handler_func=push_handler_func,
2089 event_dispatcher=self._event_dispatcher,
2090 **kwargs,
2091 )
2093 def set_pubsub_node(self, cluster, node=None, host=None, port=None):
2094 """
2095 The pubsub node will be set according to the passed node, host and port
2096 When none of the node, host, or port are specified - the node is set
2097 to None and will be determined by the keyslot of the channel in the
2098 first command to be executed.
2099 RedisClusterException will be thrown if the passed node does not exist
2100 in the cluster.
2101 If host is passed without port, or vice versa, a DataError will be
2102 thrown.
2103 :type cluster: RedisCluster
2104 :type node: ClusterNode
2105 :type host: str
2106 :type port: int
2107 """
2108 if node is not None:
2109 # node is passed by the user
2110 self._raise_on_invalid_node(cluster, node, node.host, node.port)
2111 pubsub_node = node
2112 elif host is not None and port is not None:
2113 # host and port passed by the user
2114 node = cluster.get_node(host=host, port=port)
2115 self._raise_on_invalid_node(cluster, node, host, port)
2116 pubsub_node = node
2117 elif any([host, port]) is True:
2118 # only 'host' or 'port' passed
2119 raise DataError("Passing a host requires passing a port, and vice versa")
2120 else:
2121 # nothing passed by the user. set node to None
2122 pubsub_node = None
2124 self.node = pubsub_node
2126 def get_pubsub_node(self):
2127 """
2128 Get the node that is being used as the pubsub connection
2129 """
2130 return self.node
2132 def _raise_on_invalid_node(self, redis_cluster, node, host, port):
2133 """
2134 Raise a RedisClusterException if the node is None or doesn't exist in
2135 the cluster.
2136 """
2137 if node is None or redis_cluster.get_node(node_name=node.name) is None:
2138 raise RedisClusterException(
2139 f"Node {host}:{port} doesn't exist in the cluster"
2140 )
2142 def execute_command(self, *args):
2143 """
2144 Execute a subscribe/unsubscribe command.
2146 Taken code from redis-py and tweak to make it work within a cluster.
2147 """
2148 # NOTE: don't parse the response in this function -- it could pull a
2149 # legitimate message off the stack if the connection is already
2150 # subscribed to one or more channels
2152 if self.connection is None:
2153 if self.connection_pool is None:
2154 if len(args) > 1:
2155 # Hash the first channel and get one of the nodes holding
2156 # this slot
2157 channel = args[1]
2158 slot = self.cluster.keyslot(channel)
2159 node = self.cluster.nodes_manager.get_node_from_slot(
2160 slot,
2161 self.cluster.read_from_replicas,
2162 self.cluster.load_balancing_strategy,
2163 )
2164 else:
2165 # Get a random node
2166 node = self.cluster.get_random_node()
2167 self.node = node
2168 redis_connection = self.cluster.get_redis_connection(node)
2169 self.connection_pool = redis_connection.connection_pool
2170 self.connection = self.connection_pool.get_connection()
2171 # register a callback that re-subscribes to any channels we
2172 # were listening to when we were disconnected
2173 self.connection.register_connect_callback(self.on_connect)
2174 if self.push_handler_func is not None:
2175 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
2176 self._event_dispatcher.dispatch(
2177 AfterPubSubConnectionInstantiationEvent(
2178 self.connection, self.connection_pool, ClientType.SYNC, self._lock
2179 )
2180 )
2181 connection = self.connection
2182 self._execute(connection, connection.send_command, *args)
2184 def _get_node_pubsub(self, node):
2185 try:
2186 return self.node_pubsub_mapping[node.name]
2187 except KeyError:
2188 pubsub = node.redis_connection.pubsub(
2189 push_handler_func=self.push_handler_func
2190 )
2191 self.node_pubsub_mapping[node.name] = pubsub
2192 return pubsub
2194 def _sharded_message_generator(self):
2195 for _ in range(len(self.node_pubsub_mapping)):
2196 pubsub = next(self._pubsubs_generator)
2197 message = pubsub.get_message()
2198 if message is not None:
2199 return message
2200 return None
2202 def _pubsubs_generator(self):
2203 while True:
2204 yield from self.node_pubsub_mapping.values()
2206 def get_sharded_message(
2207 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
2208 ):
2209 if target_node:
2210 message = self.node_pubsub_mapping[target_node.name].get_message(
2211 ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
2212 )
2213 else:
2214 message = self._sharded_message_generator()
2215 if message is None:
2216 return None
2217 elif str_if_bytes(message["type"]) == "sunsubscribe":
2218 if message["channel"] in self.pending_unsubscribe_shard_channels:
2219 self.pending_unsubscribe_shard_channels.remove(message["channel"])
2220 self.shard_channels.pop(message["channel"], None)
2221 node = self.cluster.get_node_from_key(message["channel"])
2222 if self.node_pubsub_mapping[node.name].subscribed is False:
2223 self.node_pubsub_mapping.pop(node.name)
2224 if not self.channels and not self.patterns and not self.shard_channels:
2225 # There are no subscriptions anymore, set subscribed_event flag
2226 # to false
2227 self.subscribed_event.clear()
2228 if self.ignore_subscribe_messages or ignore_subscribe_messages:
2229 return None
2230 return message
2232 def ssubscribe(self, *args, **kwargs):
2233 if args:
2234 args = list_or_args(args[0], args[1:])
2235 s_channels = dict.fromkeys(args)
2236 s_channels.update(kwargs)
2237 for s_channel, handler in s_channels.items():
2238 node = self.cluster.get_node_from_key(s_channel)
2239 pubsub = self._get_node_pubsub(node)
2240 if handler:
2241 pubsub.ssubscribe(**{s_channel: handler})
2242 else:
2243 pubsub.ssubscribe(s_channel)
2244 self.shard_channels.update(pubsub.shard_channels)
2245 self.pending_unsubscribe_shard_channels.difference_update(
2246 self._normalize_keys({s_channel: None})
2247 )
2248 if pubsub.subscribed and not self.subscribed:
2249 self.subscribed_event.set()
2250 self.health_check_response_counter = 0
2252 def sunsubscribe(self, *args):
2253 if args:
2254 args = list_or_args(args[0], args[1:])
2255 else:
2256 args = self.shard_channels
2258 for s_channel in args:
2259 node = self.cluster.get_node_from_key(s_channel)
2260 p = self._get_node_pubsub(node)
2261 p.sunsubscribe(s_channel)
2262 self.pending_unsubscribe_shard_channels.update(
2263 p.pending_unsubscribe_shard_channels
2264 )
2266 def get_redis_connection(self):
2267 """
2268 Get the Redis connection of the pubsub connected node.
2269 """
2270 if self.node is not None:
2271 return self.node.redis_connection
2273 def disconnect(self):
2274 """
2275 Disconnect the pubsub connection.
2276 """
2277 if self.connection:
2278 self.connection.disconnect()
2279 for pubsub in self.node_pubsub_mapping.values():
2280 pubsub.connection.disconnect()
2283class ClusterPipeline(RedisCluster):
2284 """
2285 Support for Redis pipeline
2286 in cluster mode
2287 """
2289 ERRORS_ALLOW_RETRY = (
2290 ConnectionError,
2291 TimeoutError,
2292 MovedError,
2293 AskError,
2294 TryAgainError,
2295 )
2297 NO_SLOTS_COMMANDS = {"UNWATCH"}
2298 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
2299 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
2301 @deprecated_args(
2302 args_to_warn=[
2303 "cluster_error_retry_attempts",
2304 ],
2305 reason="Please configure the 'retry' object instead",
2306 version="6.0.0",
2307 )
2308 def __init__(
2309 self,
2310 nodes_manager: "NodesManager",
2311 commands_parser: "CommandsParser",
2312 result_callbacks: Optional[Dict[str, Callable]] = None,
2313 cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
2314 startup_nodes: Optional[List["ClusterNode"]] = None,
2315 read_from_replicas: bool = False,
2316 load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
2317 cluster_error_retry_attempts: int = 3,
2318 reinitialize_steps: int = 5,
2319 retry: Optional[Retry] = None,
2320 lock=None,
2321 transaction=False,
2322 policy_resolver: PolicyResolver = StaticPolicyResolver(),
2323 **kwargs,
2324 ):
2325 """ """
2326 self.command_stack = []
2327 self.nodes_manager = nodes_manager
2328 self.commands_parser = commands_parser
2329 self.refresh_table_asap = False
2330 self.result_callbacks = (
2331 result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
2332 )
2333 self.startup_nodes = startup_nodes if startup_nodes else []
2334 self.read_from_replicas = read_from_replicas
2335 self.load_balancing_strategy = load_balancing_strategy
2336 self.command_flags = self.__class__.COMMAND_FLAGS.copy()
2337 self.cluster_response_callbacks = cluster_response_callbacks
2338 self.reinitialize_counter = 0
2339 self.reinitialize_steps = reinitialize_steps
2340 if retry is not None:
2341 self.retry = retry
2342 else:
2343 self.retry = Retry(
2344 backoff=ExponentialWithJitterBackoff(base=1, cap=10),
2345 retries=cluster_error_retry_attempts,
2346 )
2348 self.encoder = Encoder(
2349 kwargs.get("encoding", "utf-8"),
2350 kwargs.get("encoding_errors", "strict"),
2351 kwargs.get("decode_responses", False),
2352 )
2353 if lock is None:
2354 lock = threading.RLock()
2355 self._lock = lock
2356 self.parent_execute_command = super().execute_command
2357 self._execution_strategy: ExecutionStrategy = (
2358 PipelineStrategy(self) if not transaction else TransactionStrategy(self)
2359 )
2361 # For backward compatibility, mapping from existing policies to new one
2362 self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
2363 self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
2364 self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
2365 self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
2366 self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
2367 self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
2368 SLOT_ID: RequestPolicy.DEFAULT_KEYED,
2369 }
2371 self._policies_callback_mapping: dict[
2372 Union[RequestPolicy, ResponsePolicy], Callable
2373 ] = {
2374 RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
2375 self.get_random_primary_or_all_nodes(command_name)
2376 ],
2377 RequestPolicy.DEFAULT_KEYED: lambda command,
2378 *args: self.get_nodes_from_slot(command, *args),
2379 RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
2380 RequestPolicy.ALL_SHARDS: self.get_primaries,
2381 RequestPolicy.ALL_NODES: self.get_nodes,
2382 RequestPolicy.ALL_REPLICAS: self.get_replicas,
2383 RequestPolicy.MULTI_SHARD: lambda *args,
2384 **kwargs: self._split_multi_shard_command(*args, **kwargs),
2385 RequestPolicy.SPECIAL: self.get_special_nodes,
2386 ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
2387 ResponsePolicy.DEFAULT_KEYED: lambda res: res,
2388 }
2390 self._policy_resolver = policy_resolver
2392 def __repr__(self):
2393 """ """
2394 return f"{type(self).__name__}"
2396 def __enter__(self):
2397 """ """
2398 return self
2400 def __exit__(self, exc_type, exc_value, traceback):
2401 """ """
2402 self.reset()
2404 def __del__(self):
2405 try:
2406 self.reset()
2407 except Exception:
2408 pass
2410 def __len__(self):
2411 """ """
2412 return len(self._execution_strategy.command_queue)
2414 def __bool__(self):
2415 "Pipeline instances should always evaluate to True on Python 3+"
2416 return True
2418 def execute_command(self, *args, **kwargs):
2419 """
2420 Wrapper function for pipeline_execute_command
2421 """
2422 return self._execution_strategy.execute_command(*args, **kwargs)
2424 def pipeline_execute_command(self, *args, **options):
2425 """
2426 Stage a command to be executed when execute() is next called
2428 Returns the current Pipeline object back so commands can be
2429 chained together, such as:
2431 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
2433 At some other point, you can then run: pipe.execute(),
2434 which will execute all commands queued in the pipe.
2435 """
2436 return self._execution_strategy.execute_command(*args, **options)
2438 def annotate_exception(self, exception, number, command):
2439 """
2440 Provides extra context to the exception prior to it being handled
2441 """
2442 self._execution_strategy.annotate_exception(exception, number, command)
2444 def execute(self, raise_on_error: bool = True) -> List[Any]:
2445 """
2446 Execute all the commands in the current pipeline
2447 """
2449 try:
2450 return self._execution_strategy.execute(raise_on_error)
2451 finally:
2452 self.reset()
2454 def reset(self):
2455 """
2456 Reset back to empty pipeline.
2457 """
2458 self._execution_strategy.reset()
2460 def send_cluster_commands(
2461 self, stack, raise_on_error=True, allow_redirections=True
2462 ):
2463 return self._execution_strategy.send_cluster_commands(
2464 stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
2465 )
2467 def exists(self, *keys):
2468 return self._execution_strategy.exists(*keys)
2470 def eval(self):
2471 """ """
2472 return self._execution_strategy.eval()
2474 def multi(self):
2475 """
2476 Start a transactional block of the pipeline after WATCH commands
2477 are issued. End the transactional block with `execute`.
2478 """
2479 self._execution_strategy.multi()
2481 def load_scripts(self):
2482 """ """
2483 self._execution_strategy.load_scripts()
2485 def discard(self):
2486 """ """
2487 self._execution_strategy.discard()
2489 def watch(self, *names):
2490 """Watches the values at keys ``names``"""
2491 self._execution_strategy.watch(*names)
2493 def unwatch(self):
2494 """Unwatches all previously specified keys"""
2495 self._execution_strategy.unwatch()
2497 def script_load_for_pipeline(self, *args, **kwargs):
2498 self._execution_strategy.script_load_for_pipeline(*args, **kwargs)
2500 def delete(self, *names):
2501 self._execution_strategy.delete(*names)
2503 def unlink(self, *names):
2504 self._execution_strategy.unlink(*names)
2507def block_pipeline_command(name: str) -> Callable[..., Any]:
2508 """
2509 Prints error because some pipelined commands should
2510 be blocked when running in cluster-mode
2511 """
2513 def inner(*args, **kwargs):
2514 raise RedisClusterException(
2515 f"ERROR: Calling pipelined function {name} is blocked "
2516 f"when running redis in cluster mode..."
2517 )
2519 return inner
2522# Blocked pipeline commands
2523PIPELINE_BLOCKED_COMMANDS = (
2524 "BGREWRITEAOF",
2525 "BGSAVE",
2526 "BITOP",
2527 "BRPOPLPUSH",
2528 "CLIENT GETNAME",
2529 "CLIENT KILL",
2530 "CLIENT LIST",
2531 "CLIENT SETNAME",
2532 "CLIENT",
2533 "CONFIG GET",
2534 "CONFIG RESETSTAT",
2535 "CONFIG REWRITE",
2536 "CONFIG SET",
2537 "CONFIG",
2538 "DBSIZE",
2539 "ECHO",
2540 "EVALSHA",
2541 "FLUSHALL",
2542 "FLUSHDB",
2543 "INFO",
2544 "KEYS",
2545 "LASTSAVE",
2546 "MGET",
2547 "MGET NONATOMIC",
2548 "MOVE",
2549 "MSET",
2550 "MSETEX",
2551 "MSET NONATOMIC",
2552 "MSETNX",
2553 "PFCOUNT",
2554 "PFMERGE",
2555 "PING",
2556 "PUBLISH",
2557 "RANDOMKEY",
2558 "READONLY",
2559 "READWRITE",
2560 "RENAME",
2561 "RENAMENX",
2562 "RPOPLPUSH",
2563 "SAVE",
2564 "SCAN",
2565 "SCRIPT EXISTS",
2566 "SCRIPT FLUSH",
2567 "SCRIPT KILL",
2568 "SCRIPT LOAD",
2569 "SCRIPT",
2570 "SDIFF",
2571 "SDIFFSTORE",
2572 "SENTINEL GET MASTER ADDR BY NAME",
2573 "SENTINEL MASTER",
2574 "SENTINEL MASTERS",
2575 "SENTINEL MONITOR",
2576 "SENTINEL REMOVE",
2577 "SENTINEL SENTINELS",
2578 "SENTINEL SET",
2579 "SENTINEL SLAVES",
2580 "SENTINEL",
2581 "SHUTDOWN",
2582 "SINTER",
2583 "SINTERSTORE",
2584 "SLAVEOF",
2585 "SLOWLOG GET",
2586 "SLOWLOG LEN",
2587 "SLOWLOG RESET",
2588 "SLOWLOG",
2589 "SMOVE",
2590 "SORT",
2591 "SUNION",
2592 "SUNIONSTORE",
2593 "TIME",
2594)
2595for command in PIPELINE_BLOCKED_COMMANDS:
2596 command = command.replace(" ", "_").lower()
2598 setattr(ClusterPipeline, command, block_pipeline_command(command))
2601class PipelineCommand:
2602 """ """
2604 def __init__(self, args, options=None, position=None):
2605 self.args = args
2606 if options is None:
2607 options = {}
2608 self.options = options
2609 self.position = position
2610 self.result = None
2611 self.node = None
2612 self.asking = False
2613 self.command_policies: Optional[CommandPolicies] = None
2616class NodeCommands:
2617 """ """
2619 def __init__(self, parse_response, connection_pool, connection):
2620 """ """
2621 self.parse_response = parse_response
2622 self.connection_pool = connection_pool
2623 self.connection = connection
2624 self.commands = []
2626 def append(self, c):
2627 """ """
2628 self.commands.append(c)
2630 def write(self):
2631 """
2632 Code borrowed from Redis so it can be fixed
2633 """
2634 connection = self.connection
2635 commands = self.commands
2637 # We are going to clobber the commands with the write, so go ahead
2638 # and ensure that nothing is sitting there from a previous run.
2639 for c in commands:
2640 c.result = None
2642 # build up all commands into a single request to increase network perf
2643 # send all the commands and catch connection and timeout errors.
2644 try:
2645 connection.send_packed_command(
2646 connection.pack_commands([c.args for c in commands])
2647 )
2648 except (ConnectionError, TimeoutError) as e:
2649 for c in commands:
2650 c.result = e
2652 def read(self):
2653 """ """
2654 connection = self.connection
2655 for c in self.commands:
2656 # if there is a result on this command,
2657 # it means we ran into an exception
2658 # like a connection error. Trying to parse
2659 # a response on a connection that
2660 # is no longer open will result in a
2661 # connection error raised by redis-py.
2662 # but redis-py doesn't check in parse_response
2663 # that the sock object is
2664 # still set and if you try to
2665 # read from a closed connection, it will
2666 # result in an AttributeError because
2667 # it will do a readline() call on None.
2668 # This can have all kinds of nasty side-effects.
2669 # Treating this case as a connection error
2670 # is fine because it will dump
2671 # the connection object back into the
2672 # pool and on the next write, it will
2673 # explicitly open the connection and all will be well.
2674 if c.result is None:
2675 try:
2676 c.result = self.parse_response(connection, c.args[0], **c.options)
2677 except (ConnectionError, TimeoutError) as e:
2678 for c in self.commands:
2679 c.result = e
2680 return
2681 except RedisError:
2682 c.result = sys.exc_info()[1]
2685class ExecutionStrategy(ABC):
2686 @property
2687 @abstractmethod
2688 def command_queue(self):
2689 pass
2691 @abstractmethod
2692 def execute_command(self, *args, **kwargs):
2693 """
2694 Execution flow for current execution strategy.
2696 See: ClusterPipeline.execute_command()
2697 """
2698 pass
2700 @abstractmethod
2701 def annotate_exception(self, exception, number, command):
2702 """
2703 Annotate exception according to current execution strategy.
2705 See: ClusterPipeline.annotate_exception()
2706 """
2707 pass
2709 @abstractmethod
2710 def pipeline_execute_command(self, *args, **options):
2711 """
2712 Pipeline execution flow for current execution strategy.
2714 See: ClusterPipeline.pipeline_execute_command()
2715 """
2716 pass
2718 @abstractmethod
2719 def execute(self, raise_on_error: bool = True) -> List[Any]:
2720 """
2721 Executes current execution strategy.
2723 See: ClusterPipeline.execute()
2724 """
2725 pass
2727 @abstractmethod
2728 def send_cluster_commands(
2729 self, stack, raise_on_error=True, allow_redirections=True
2730 ):
2731 """
2732 Sends commands according to current execution strategy.
2734 See: ClusterPipeline.send_cluster_commands()
2735 """
2736 pass
2738 @abstractmethod
2739 def reset(self):
2740 """
2741 Resets current execution strategy.
2743 See: ClusterPipeline.reset()
2744 """
2745 pass
2747 @abstractmethod
2748 def exists(self, *keys):
2749 pass
2751 @abstractmethod
2752 def eval(self):
2753 pass
2755 @abstractmethod
2756 def multi(self):
2757 """
2758 Starts transactional context.
2760 See: ClusterPipeline.multi()
2761 """
2762 pass
2764 @abstractmethod
2765 def load_scripts(self):
2766 pass
2768 @abstractmethod
2769 def watch(self, *names):
2770 pass
2772 @abstractmethod
2773 def unwatch(self):
2774 """
2775 Unwatches all previously specified keys
2777 See: ClusterPipeline.unwatch()
2778 """
2779 pass
2781 @abstractmethod
2782 def script_load_for_pipeline(self, *args, **kwargs):
2783 pass
2785 @abstractmethod
2786 def delete(self, *names):
2787 """
2788 "Delete a key specified by ``names``"
2790 See: ClusterPipeline.delete()
2791 """
2792 pass
2794 @abstractmethod
2795 def unlink(self, *names):
2796 """
2797 "Unlink a key specified by ``names``"
2799 See: ClusterPipeline.unlink()
2800 """
2801 pass
2803 @abstractmethod
2804 def discard(self):
2805 pass
2808class AbstractStrategy(ExecutionStrategy):
2809 def __init__(
2810 self,
2811 pipe: ClusterPipeline,
2812 ):
2813 self._command_queue: List[PipelineCommand] = []
2814 self._pipe = pipe
2815 self._nodes_manager = self._pipe.nodes_manager
2817 @property
2818 def command_queue(self):
2819 return self._command_queue
2821 @command_queue.setter
2822 def command_queue(self, queue: List[PipelineCommand]):
2823 self._command_queue = queue
2825 @abstractmethod
2826 def execute_command(self, *args, **kwargs):
2827 pass
2829 def pipeline_execute_command(self, *args, **options):
2830 self._command_queue.append(
2831 PipelineCommand(args, options, len(self._command_queue))
2832 )
2833 return self._pipe
2835 @abstractmethod
2836 def execute(self, raise_on_error: bool = True) -> List[Any]:
2837 pass
2839 @abstractmethod
2840 def send_cluster_commands(
2841 self, stack, raise_on_error=True, allow_redirections=True
2842 ):
2843 pass
2845 @abstractmethod
2846 def reset(self):
2847 pass
2849 def exists(self, *keys):
2850 return self.execute_command("EXISTS", *keys)
2852 def eval(self):
2853 """ """
2854 raise RedisClusterException("method eval() is not implemented")
2856 def load_scripts(self):
2857 """ """
2858 raise RedisClusterException("method load_scripts() is not implemented")
2860 def script_load_for_pipeline(self, *args, **kwargs):
2861 """ """
2862 raise RedisClusterException(
2863 "method script_load_for_pipeline() is not implemented"
2864 )
2866 def annotate_exception(self, exception, number, command):
2867 """
2868 Provides extra context to the exception prior to it being handled
2869 """
2870 cmd = " ".join(map(safe_str, command))
2871 msg = (
2872 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
2873 f"caused error: {exception.args[0]}"
2874 )
2875 exception.args = (msg,) + exception.args[1:]
2878class PipelineStrategy(AbstractStrategy):
2879 def __init__(self, pipe: ClusterPipeline):
2880 super().__init__(pipe)
2881 self.command_flags = pipe.command_flags
2883 def execute_command(self, *args, **kwargs):
2884 return self.pipeline_execute_command(*args, **kwargs)
2886 def _raise_first_error(self, stack):
2887 """
2888 Raise the first exception on the stack
2889 """
2890 for c in stack:
2891 r = c.result
2892 if isinstance(r, Exception):
2893 self.annotate_exception(r, c.position + 1, c.args)
2894 raise r
2896 def execute(self, raise_on_error: bool = True) -> List[Any]:
2897 stack = self._command_queue
2898 if not stack:
2899 return []
2901 try:
2902 return self.send_cluster_commands(stack, raise_on_error)
2903 finally:
2904 self.reset()
2906 def reset(self):
2907 """
2908 Reset back to empty pipeline.
2909 """
2910 self._command_queue = []
2912 def send_cluster_commands(
2913 self, stack, raise_on_error=True, allow_redirections=True
2914 ):
2915 """
2916 Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.
2918 If one of the retryable exceptions has been thrown we assume that:
2919 - connection_pool was disconnected
2920 - connection_pool was reset
2921 - refresh_table_asap set to True
2923 It will try the number of times specified by
2924 the retries in config option "self.retry"
2925 which defaults to 3 unless manually configured.
2927 If it reaches the number of times, the command will
2928 raises ClusterDownException.
2929 """
2930 if not stack:
2931 return []
2932 retry_attempts = self._pipe.retry.get_retries()
2933 while True:
2934 try:
2935 return self._send_cluster_commands(
2936 stack,
2937 raise_on_error=raise_on_error,
2938 allow_redirections=allow_redirections,
2939 )
2940 except RedisCluster.ERRORS_ALLOW_RETRY as e:
2941 if retry_attempts > 0:
2942 # Try again with the new cluster setup. All other errors
2943 # should be raised.
2944 retry_attempts -= 1
2945 pass
2946 else:
2947 raise e
2949 def _send_cluster_commands(
2950 self, stack, raise_on_error=True, allow_redirections=True
2951 ):
2952 """
2953 Send a bunch of cluster commands to the redis cluster.
2955 `allow_redirections` If the pipeline should follow
2956 `ASK` & `MOVED` responses automatically. If set
2957 to false it will raise RedisClusterException.
2958 """
2959 # the first time sending the commands we send all of
2960 # the commands that were queued up.
2961 # if we have to run through it again, we only retry
2962 # the commands that failed.
2963 attempt = sorted(stack, key=lambda x: x.position)
2964 is_default_node = False
2965 # build a list of node objects based on node names we need to
2966 nodes = {}
2968 # as we move through each command that still needs to be processed,
2969 # we figure out the slot number that command maps to, then from
2970 # the slot determine the node.
2971 for c in attempt:
2972 command_policies = self._pipe._policy_resolver.resolve(c.args[0].lower())
2974 while True:
2975 # refer to our internal node -> slot table that
2976 # tells us where a given command should route to.
2977 # (it might be possible we have a cached node that no longer
2978 # exists in the cluster, which is why we do this in a loop)
2979 passed_targets = c.options.pop("target_nodes", None)
2980 if passed_targets and not self._is_nodes_flag(passed_targets):
2981 target_nodes = self._parse_target_nodes(passed_targets)
2983 if not command_policies:
2984 command_policies = CommandPolicies()
2985 else:
2986 if not command_policies:
2987 command = c.args[0].upper()
2988 if (
2989 len(c.args) >= 2
2990 and f"{c.args[0]} {c.args[1]}".upper()
2991 in self._pipe.command_flags
2992 ):
2993 command = f"{c.args[0]} {c.args[1]}".upper()
2995 # We only could resolve key properties if command is not
2996 # in a list of pre-defined request policies
2997 command_flag = self.command_flags.get(command)
2998 if not command_flag:
2999 # Fallback to default policy
3000 if not self._pipe.get_default_node():
3001 keys = None
3002 else:
3003 keys = self._pipe._get_command_keys(*c.args)
3004 if not keys or len(keys) == 0:
3005 command_policies = CommandPolicies()
3006 else:
3007 command_policies = CommandPolicies(
3008 request_policy=RequestPolicy.DEFAULT_KEYED,
3009 response_policy=ResponsePolicy.DEFAULT_KEYED,
3010 )
3011 else:
3012 if command_flag in self._pipe._command_flags_mapping:
3013 command_policies = CommandPolicies(
3014 request_policy=self._pipe._command_flags_mapping[
3015 command_flag
3016 ]
3017 )
3018 else:
3019 command_policies = CommandPolicies()
3021 target_nodes = self._determine_nodes(
3022 *c.args,
3023 request_policy=command_policies.request_policy,
3024 node_flag=passed_targets,
3025 )
3026 if not target_nodes:
3027 raise RedisClusterException(
3028 f"No targets were found to execute {c.args} command on"
3029 )
3030 c.command_policies = command_policies
3031 if len(target_nodes) > 1:
3032 raise RedisClusterException(
3033 f"Too many targets for command {c.args}"
3034 )
3036 node = target_nodes[0]
3037 if node == self._pipe.get_default_node():
3038 is_default_node = True
3040 # now that we know the name of the node
3041 # ( it's just a string in the form of host:port )
3042 # we can build a list of commands for each node.
3043 node_name = node.name
3044 if node_name not in nodes:
3045 redis_node = self._pipe.get_redis_connection(node)
3046 try:
3047 connection = get_connection(redis_node)
3048 except (ConnectionError, TimeoutError):
3049 for n in nodes.values():
3050 n.connection_pool.release(n.connection)
3051 # Connection retries are being handled in the node's
3052 # Retry object. Reinitialize the node -> slot table.
3053 self._nodes_manager.initialize()
3054 if is_default_node:
3055 self._pipe.replace_default_node()
3056 raise
3057 nodes[node_name] = NodeCommands(
3058 redis_node.parse_response,
3059 redis_node.connection_pool,
3060 connection,
3061 )
3062 nodes[node_name].append(c)
3063 break
3065 # send the commands in sequence.
3066 # we write to all the open sockets for each node first,
3067 # before reading anything
3068 # this allows us to flush all the requests out across the
3069 # network
3070 # so that we can read them from different sockets as they come back.
3071 # we dont' multiplex on the sockets as they come available,
3072 # but that shouldn't make too much difference.
3073 try:
3074 node_commands = nodes.values()
3075 for n in node_commands:
3076 n.write()
3078 for n in node_commands:
3079 n.read()
3080 finally:
3081 # release all of the redis connections we allocated earlier
3082 # back into the connection pool.
3083 # we used to do this step as part of a try/finally block,
3084 # but it is really dangerous to
3085 # release connections back into the pool if for some
3086 # reason the socket has data still left in it
3087 # from a previous operation. The write and
3088 # read operations already have try/catch around them for
3089 # all known types of errors including connection
3090 # and socket level errors.
3091 # So if we hit an exception, something really bad
3092 # happened and putting any oF
3093 # these connections back into the pool is a very bad idea.
3094 # the socket might have unread buffer still sitting in it,
3095 # and then the next time we read from it we pass the
3096 # buffered result back from a previous command and
3097 # every single request after to that connection will always get
3098 # a mismatched result.
3099 for n in nodes.values():
3100 n.connection_pool.release(n.connection)
3102 # if the response isn't an exception it is a
3103 # valid response from the node
3104 # we're all done with that command, YAY!
3105 # if we have more commands to attempt, we've run into problems.
3106 # collect all the commands we are allowed to retry.
3107 # (MOVED, ASK, or connection errors or timeout errors)
3108 attempt = sorted(
3109 (
3110 c
3111 for c in attempt
3112 if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
3113 ),
3114 key=lambda x: x.position,
3115 )
3116 if attempt and allow_redirections:
3117 # RETRY MAGIC HAPPENS HERE!
3118 # send these remaining commands one at a time using `execute_command`
3119 # in the main client. This keeps our retry logic
3120 # in one place mostly,
3121 # and allows us to be more confident in correctness of behavior.
3122 # at this point any speed gains from pipelining have been lost
3123 # anyway, so we might as well make the best
3124 # attempt to get the correct behavior.
3125 #
3126 # The client command will handle retries for each
3127 # individual command sequentially as we pass each
3128 # one into `execute_command`. Any exceptions
3129 # that bubble out should only appear once all
3130 # retries have been exhausted.
3131 #
3132 # If a lot of commands have failed, we'll be setting the
3133 # flag to rebuild the slots table from scratch.
3134 # So MOVED errors should correct themselves fairly quickly.
3135 self._pipe.reinitialize_counter += 1
3136 if self._pipe._should_reinitialized():
3137 self._nodes_manager.initialize()
3138 if is_default_node:
3139 self._pipe.replace_default_node()
3140 for c in attempt:
3141 try:
3142 # send each command individually like we
3143 # do in the main client.
3144 c.result = self._pipe.parent_execute_command(*c.args, **c.options)
3145 except RedisError as e:
3146 c.result = e
3148 # turn the response back into a simple flat array that corresponds
3149 # to the sequence of commands issued in the stack in pipeline.execute()
3150 response = []
3151 for c in sorted(stack, key=lambda x: x.position):
3152 if c.args[0] in self._pipe.cluster_response_callbacks:
3153 # Remove keys entry, it needs only for cache.
3154 c.options.pop("keys", None)
3155 c.result = self._pipe._policies_callback_mapping[
3156 c.command_policies.response_policy
3157 ](
3158 self._pipe.cluster_response_callbacks[c.args[0]](
3159 c.result, **c.options
3160 )
3161 )
3162 response.append(c.result)
3164 if raise_on_error:
3165 self._raise_first_error(stack)
3167 return response
3169 def _is_nodes_flag(self, target_nodes):
3170 return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags
3172 def _parse_target_nodes(self, target_nodes):
3173 if isinstance(target_nodes, list):
3174 nodes = target_nodes
3175 elif isinstance(target_nodes, ClusterNode):
3176 # Supports passing a single ClusterNode as a variable
3177 nodes = [target_nodes]
3178 elif isinstance(target_nodes, dict):
3179 # Supports dictionaries of the format {node_name: node}.
3180 # It enables to execute commands with multi nodes as follows:
3181 # rc.cluster_save_config(rc.get_primaries())
3182 nodes = target_nodes.values()
3183 else:
3184 raise TypeError(
3185 "target_nodes type can be one of the following: "
3186 "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
3187 "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
3188 f"The passed type is {type(target_nodes)}"
3189 )
3190 return nodes
3192 def _determine_nodes(
3193 self, *args, request_policy: RequestPolicy, **kwargs
3194 ) -> List["ClusterNode"]:
3195 # Determine which nodes should be executed the command on.
3196 # Returns a list of target nodes.
3197 command = args[0].upper()
3198 if (
3199 len(args) >= 2
3200 and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
3201 ):
3202 command = f"{args[0]} {args[1]}".upper()
3204 nodes_flag = kwargs.pop("nodes_flag", None)
3205 if nodes_flag is not None:
3206 # nodes flag passed by the user
3207 command_flag = nodes_flag
3208 else:
3209 # get the nodes group for this command if it was predefined
3210 command_flag = self._pipe.command_flags.get(command)
3212 if command_flag in self._pipe._command_flags_mapping:
3213 request_policy = self._pipe._command_flags_mapping[command_flag]
3215 policy_callback = self._pipe._policies_callback_mapping[request_policy]
3217 if request_policy == RequestPolicy.DEFAULT_KEYED:
3218 nodes = policy_callback(command, *args)
3219 elif request_policy == RequestPolicy.MULTI_SHARD:
3220 nodes = policy_callback(*args, **kwargs)
3221 elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
3222 nodes = policy_callback(args[0])
3223 else:
3224 nodes = policy_callback()
3226 if args[0].lower() == "ft.aggregate":
3227 self._aggregate_nodes = nodes
3229 return nodes
3231 def multi(self):
3232 raise RedisClusterException(
3233 "method multi() is not supported outside of transactional context"
3234 )
3236 def discard(self):
3237 raise RedisClusterException(
3238 "method discard() is not supported outside of transactional context"
3239 )
3241 def watch(self, *names):
3242 raise RedisClusterException(
3243 "method watch() is not supported outside of transactional context"
3244 )
3246 def unwatch(self, *names):
3247 raise RedisClusterException(
3248 "method unwatch() is not supported outside of transactional context"
3249 )
3251 def delete(self, *names):
3252 if len(names) != 1:
3253 raise RedisClusterException(
3254 "deleting multiple keys is not implemented in pipeline command"
3255 )
3257 return self.execute_command("DEL", names[0])
3259 def unlink(self, *names):
3260 if len(names) != 1:
3261 raise RedisClusterException(
3262 "unlinking multiple keys is not implemented in pipeline command"
3263 )
3265 return self.execute_command("UNLINK", names[0])
3268class TransactionStrategy(AbstractStrategy):
3269 NO_SLOTS_COMMANDS = {"UNWATCH"}
3270 IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
3271 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3272 SLOT_REDIRECT_ERRORS = (AskError, MovedError)
3273 CONNECTION_ERRORS = (
3274 ConnectionError,
3275 OSError,
3276 ClusterDownError,
3277 SlotNotCoveredError,
3278 )
3280 def __init__(self, pipe: ClusterPipeline):
3281 super().__init__(pipe)
3282 self._explicit_transaction = False
3283 self._watching = False
3284 self._pipeline_slots: Set[int] = set()
3285 self._transaction_connection: Optional[Connection] = None
3286 self._executing = False
3287 self._retry = copy(self._pipe.retry)
3288 self._retry.update_supported_errors(
3289 RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
3290 )
3292 def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
3293 """
3294 Find a connection for a pipeline transaction.
3296 For running an atomic transaction, watch keys ensure that contents have not been
3297 altered as long as the watch commands for those keys were sent over the same
3298 connection. So once we start watching a key, we fetch a connection to the
3299 node that owns that slot and reuse it.
3300 """
3301 if not self._pipeline_slots:
3302 raise RedisClusterException(
3303 "At least a command with a key is needed to identify a node"
3304 )
3306 node: ClusterNode = self._nodes_manager.get_node_from_slot(
3307 list(self._pipeline_slots)[0], False
3308 )
3309 redis_node: Redis = self._pipe.get_redis_connection(node)
3310 if self._transaction_connection:
3311 if not redis_node.connection_pool.owns_connection(
3312 self._transaction_connection
3313 ):
3314 previous_node = self._nodes_manager.find_connection_owner(
3315 self._transaction_connection
3316 )
3317 previous_node.connection_pool.release(self._transaction_connection)
3318 self._transaction_connection = None
3320 if not self._transaction_connection:
3321 self._transaction_connection = get_connection(redis_node)
3323 return redis_node, self._transaction_connection
3325 def execute_command(self, *args, **kwargs):
3326 slot_number: Optional[int] = None
3327 if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
3328 slot_number = self._pipe.determine_slot(*args)
3330 if (
3331 self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
3332 ) and not self._explicit_transaction:
3333 if args[0] == "WATCH":
3334 self._validate_watch()
3336 if slot_number is not None:
3337 if self._pipeline_slots and slot_number not in self._pipeline_slots:
3338 raise CrossSlotTransactionError(
3339 "Cannot watch or send commands on different slots"
3340 )
3342 self._pipeline_slots.add(slot_number)
3343 elif args[0] not in self.NO_SLOTS_COMMANDS:
3344 raise RedisClusterException(
3345 f"Cannot identify slot number for command: {args[0]},"
3346 "it cannot be triggered in a transaction"
3347 )
3349 return self._immediate_execute_command(*args, **kwargs)
3350 else:
3351 if slot_number is not None:
3352 self._pipeline_slots.add(slot_number)
3354 return self.pipeline_execute_command(*args, **kwargs)
3356 def _validate_watch(self):
3357 if self._explicit_transaction:
3358 raise RedisError("Cannot issue a WATCH after a MULTI")
3360 self._watching = True
3362 def _immediate_execute_command(self, *args, **options):
3363 return self._retry.call_with_retry(
3364 lambda: self._get_connection_and_send_command(*args, **options),
3365 self._reinitialize_on_error,
3366 )
3368 def _get_connection_and_send_command(self, *args, **options):
3369 redis_node, connection = self._get_client_and_connection_for_transaction()
3370 return self._send_command_parse_response(
3371 connection, redis_node, args[0], *args, **options
3372 )
3374 def _send_command_parse_response(
3375 self, conn, redis_node: Redis, command_name, *args, **options
3376 ):
3377 """
3378 Send a command and parse the response
3379 """
3381 conn.send_command(*args)
3382 output = redis_node.parse_response(conn, command_name, **options)
3384 if command_name in self.UNWATCH_COMMANDS:
3385 self._watching = False
3386 return output
3388 def _reinitialize_on_error(self, error):
3389 if self._watching:
3390 if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
3391 raise WatchError("Slot rebalancing occurred while watching keys")
3393 if (
3394 type(error) in self.SLOT_REDIRECT_ERRORS
3395 or type(error) in self.CONNECTION_ERRORS
3396 ):
3397 if self._transaction_connection:
3398 self._transaction_connection = None
3400 self._pipe.reinitialize_counter += 1
3401 if self._pipe._should_reinitialized():
3402 self._nodes_manager.initialize()
3403 self.reinitialize_counter = 0
3404 else:
3405 if isinstance(error, AskError):
3406 self._nodes_manager.update_moved_exception(error)
3408 self._executing = False
3410 def _raise_first_error(self, responses, stack):
3411 """
3412 Raise the first exception on the stack
3413 """
3414 for r, cmd in zip(responses, stack):
3415 if isinstance(r, Exception):
3416 self.annotate_exception(r, cmd.position + 1, cmd.args)
3417 raise r
3419 def execute(self, raise_on_error: bool = True) -> List[Any]:
3420 stack = self._command_queue
3421 if not stack and (not self._watching or not self._pipeline_slots):
3422 return []
3424 return self._execute_transaction_with_retries(stack, raise_on_error)
3426 def _execute_transaction_with_retries(
3427 self, stack: List["PipelineCommand"], raise_on_error: bool
3428 ):
3429 return self._retry.call_with_retry(
3430 lambda: self._execute_transaction(stack, raise_on_error),
3431 self._reinitialize_on_error,
3432 )
3434 def _execute_transaction(
3435 self, stack: List["PipelineCommand"], raise_on_error: bool
3436 ):
3437 if len(self._pipeline_slots) > 1:
3438 raise CrossSlotTransactionError(
3439 "All keys involved in a cluster transaction must map to the same slot"
3440 )
3442 self._executing = True
3444 redis_node, connection = self._get_client_and_connection_for_transaction()
3446 stack = chain(
3447 [PipelineCommand(("MULTI",))],
3448 stack,
3449 [PipelineCommand(("EXEC",))],
3450 )
3451 commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
3452 packed_commands = connection.pack_commands(commands)
3453 connection.send_packed_command(packed_commands)
3454 errors = []
3456 # parse off the response for MULTI
3457 # NOTE: we need to handle ResponseErrors here and continue
3458 # so that we read all the additional command messages from
3459 # the socket
3460 try:
3461 redis_node.parse_response(connection, "MULTI")
3462 except ResponseError as e:
3463 self.annotate_exception(e, 0, "MULTI")
3464 errors.append(e)
3465 except self.CONNECTION_ERRORS as cluster_error:
3466 self.annotate_exception(cluster_error, 0, "MULTI")
3467 raise
3469 # and all the other commands
3470 for i, command in enumerate(self._command_queue):
3471 if EMPTY_RESPONSE in command.options:
3472 errors.append((i, command.options[EMPTY_RESPONSE]))
3473 else:
3474 try:
3475 _ = redis_node.parse_response(connection, "_")
3476 except self.SLOT_REDIRECT_ERRORS as slot_error:
3477 self.annotate_exception(slot_error, i + 1, command.args)
3478 errors.append(slot_error)
3479 except self.CONNECTION_ERRORS as cluster_error:
3480 self.annotate_exception(cluster_error, i + 1, command.args)
3481 raise
3482 except ResponseError as e:
3483 self.annotate_exception(e, i + 1, command.args)
3484 errors.append(e)
3486 response = None
3487 # parse the EXEC.
3488 try:
3489 response = redis_node.parse_response(connection, "EXEC")
3490 except ExecAbortError:
3491 if errors:
3492 raise errors[0]
3493 raise
3495 self._executing = False
3497 # EXEC clears any watched keys
3498 self._watching = False
3500 if response is None:
3501 raise WatchError("Watched variable changed.")
3503 # put any parse errors into the response
3504 for i, e in errors:
3505 response.insert(i, e)
3507 if len(response) != len(self._command_queue):
3508 raise InvalidPipelineStack(
3509 "Unexpected response length for cluster pipeline EXEC."
3510 " Command stack was {} but response had length {}".format(
3511 [c.args[0] for c in self._command_queue], len(response)
3512 )
3513 )
3515 # find any errors in the response and raise if necessary
3516 if raise_on_error or len(errors) > 0:
3517 self._raise_first_error(
3518 response,
3519 self._command_queue,
3520 )
3522 # We have to run response callbacks manually
3523 data = []
3524 for r, cmd in zip(response, self._command_queue):
3525 if not isinstance(r, Exception):
3526 command_name = cmd.args[0]
3527 if command_name in self._pipe.cluster_response_callbacks:
3528 r = self._pipe.cluster_response_callbacks[command_name](
3529 r, **cmd.options
3530 )
3531 data.append(r)
3532 return data
3534 def reset(self):
3535 self._command_queue = []
3537 # make sure to reset the connection state in the event that we were
3538 # watching something
3539 if self._transaction_connection:
3540 try:
3541 if self._watching:
3542 # call this manually since our unwatch or
3543 # immediate_execute_command methods can call reset()
3544 self._transaction_connection.send_command("UNWATCH")
3545 self._transaction_connection.read_response()
3546 # we can safely return the connection to the pool here since we're
3547 # sure we're no longer WATCHing anything
3548 node = self._nodes_manager.find_connection_owner(
3549 self._transaction_connection
3550 )
3551 node.redis_connection.connection_pool.release(
3552 self._transaction_connection
3553 )
3554 self._transaction_connection = None
3555 except self.CONNECTION_ERRORS:
3556 # disconnect will also remove any previous WATCHes
3557 if self._transaction_connection:
3558 self._transaction_connection.disconnect()
3560 # clean up the other instance attributes
3561 self._watching = False
3562 self._explicit_transaction = False
3563 self._pipeline_slots = set()
3564 self._executing = False
3566 def send_cluster_commands(
3567 self, stack, raise_on_error=True, allow_redirections=True
3568 ):
3569 raise NotImplementedError(
3570 "send_cluster_commands cannot be executed in transactional context."
3571 )
3573 def multi(self):
3574 if self._explicit_transaction:
3575 raise RedisError("Cannot issue nested calls to MULTI")
3576 if self._command_queue:
3577 raise RedisError(
3578 "Commands without an initial WATCH have already been issued"
3579 )
3580 self._explicit_transaction = True
3582 def watch(self, *names):
3583 if self._explicit_transaction:
3584 raise RedisError("Cannot issue a WATCH after a MULTI")
3586 return self.execute_command("WATCH", *names)
3588 def unwatch(self):
3589 if self._watching:
3590 return self.execute_command("UNWATCH")
3592 return True
3594 def discard(self):
3595 self.reset()
3597 def delete(self, *names):
3598 return self.execute_command("DEL", *names)
3600 def unlink(self, *names):
3601 return self.execute_command("UNLINK", *names)