Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import re
3import threading
4import time
5from itertools import chain
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Callable,
10 Dict,
11 List,
12 Mapping,
13 Optional,
14 Set,
15 Type,
16 Union,
17)
19from redis._parsers.encoders import Encoder
20from redis._parsers.helpers import (
21 _RedisCallbacks,
22 _RedisCallbacksRESP2,
23 _RedisCallbacksRESP3,
24 bool_ok,
25)
26from redis.backoff import ExponentialWithJitterBackoff
27from redis.cache import CacheConfig, CacheInterface
28from redis.commands import (
29 CoreCommands,
30 RedisModuleCommands,
31 SentinelCommands,
32 list_or_args,
33)
34from redis.commands.core import Script
35from redis.connection import (
36 AbstractConnection,
37 Connection,
38 ConnectionPool,
39 SSLConnection,
40 UnixDomainSocketConnection,
41)
42from redis.credentials import CredentialProvider
43from redis.event import (
44 AfterPooledConnectionsInstantiationEvent,
45 AfterPubSubConnectionInstantiationEvent,
46 AfterSingleConnectionInstantiationEvent,
47 ClientType,
48 EventDispatcher,
49)
50from redis.exceptions import (
51 ConnectionError,
52 ExecAbortError,
53 PubSubError,
54 RedisError,
55 ResponseError,
56 WatchError,
57)
58from redis.lock import Lock
59from redis.retry import Retry
60from redis.utils import (
61 _set_info_logger,
62 deprecated_args,
63 get_lib_version,
64 safe_str,
65 str_if_bytes,
66 truncate_text,
67)
69if TYPE_CHECKING:
70 import ssl
72 import OpenSSL
# Empty bytes literal.
SYM_EMPTY = b""
# Option key: when present in a command's options, Redis.parse_response
# returns the value stored under it if reading the reply raises ResponseError.
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (ie. dump) are binary, and just meant to never be decoded
# Option key: when present, Redis.parse_response reads the reply with
# disable_decoding=True.
NEVER_DECODE = "NEVER_DECODE"
class CaseInsensitiveDict(dict):
    """Case insensitive dict implementation. Assumes string keys only.

    Every key is upper-cased before it is stored or looked up, so ``"get"``,
    ``"Get"`` and ``"GET"`` all address the same entry.
    """

    def __init__(self, data: Dict[str, str]) -> None:
        """Populate the dict from ``data`` (anything exposing ``.items()``)."""
        super().__init__()
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def pop(self, k, *args):
        """Remove and return the entry for ``k``, matched case-insensitively.

        Without this override ``dict.pop`` would look up the raw key and miss
        entries that were stored upper-cased.
        """
        return super().pop(k.upper(), *args)

    def setdefault(self, k, default=None):
        """Return the entry for ``k``, inserting ``default`` if it is absent.

        ``dict.setdefault`` does not route through ``__setitem__``, so the key
        has to be normalized here to avoid storing a mixed-case duplicate.
        """
        return super().setdefault(k.upper(), default)

    def update(self, data):
        """Merge ``data`` into this dict, upper-casing its keys first."""
        data = CaseInsensitiveDict(data)
        super().update(data)
class AbstractRedis:
    """Empty base class: it declares no attributes or methods of its own."""

    pass
112class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
113 """
114 Implementation of the Redis protocol.
116 This abstract class provides a Python interface to all Redis commands
117 and an implementation of the Redis protocol.
119 Pipelines derive from this, implementing how
120 the commands are sent and received to the Redis server. Based on
121 configuration, an instance will either use a ConnectionPool, or
122 Connection object to talk to redis.
124 It is not safe to pass PubSub or Pipeline objects between threads.
125 """
127 @classmethod
128 def from_url(cls, url: str, **kwargs) -> "Redis":
129 """
130 Return a Redis client object configured from the given URL
132 For example::
134 redis://[[username]:[password]]@localhost:6379/0
135 rediss://[[username]:[password]]@localhost:6379/0
136 unix://[username@]/path/to/socket.sock?db=0[&password=password]
138 Three URL schemes are supported:
140 - `redis://` creates a TCP socket connection. See more at:
141 <https://www.iana.org/assignments/uri-schemes/prov/redis>
142 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
143 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
144 - ``unix://``: creates a Unix Domain Socket connection.
146 The username, password, hostname, path and all querystring values
147 are passed through urllib.parse.unquote in order to replace any
148 percent-encoded values with their corresponding characters.
150 There are several ways to specify a database number. The first value
151 found will be used:
153 1. A ``db`` querystring option, e.g. redis://localhost?db=0
154 2. If using the redis:// or rediss:// schemes, the path argument
155 of the url, e.g. redis://localhost/0
156 3. A ``db`` keyword argument to this function.
158 If none of these options are specified, the default db=0 is used.
160 All querystring options are cast to their appropriate Python types.
161 Boolean arguments can be specified with string values "True"/"False"
162 or "Yes"/"No". Values that cannot be properly cast cause a
163 ``ValueError`` to be raised. Once parsed, the querystring arguments
164 and keyword arguments are passed to the ``ConnectionPool``'s
165 class initializer. In the case of conflicting arguments, querystring
166 arguments always win.
168 """
169 single_connection_client = kwargs.pop("single_connection_client", False)
170 connection_pool = ConnectionPool.from_url(url, **kwargs)
171 client = cls(
172 connection_pool=connection_pool,
173 single_connection_client=single_connection_client,
174 )
175 client.auto_close_connection_pool = True
176 return client
178 @classmethod
179 def from_pool(
180 cls: Type["Redis"],
181 connection_pool: ConnectionPool,
182 ) -> "Redis":
183 """
184 Return a Redis client from the given connection pool.
185 The Redis client will take ownership of the connection pool and
186 close it when the Redis client is closed.
187 """
188 client = cls(
189 connection_pool=connection_pool,
190 )
191 client.auto_close_connection_pool = True
192 return client
194 @deprecated_args(
195 args_to_warn=["retry_on_timeout"],
196 reason="TimeoutError is included by default.",
197 version="6.0.0",
198 )
199 def __init__(
200 self,
201 host: str = "localhost",
202 port: int = 6379,
203 db: int = 0,
204 password: Optional[str] = None,
205 socket_timeout: Optional[float] = None,
206 socket_connect_timeout: Optional[float] = None,
207 socket_keepalive: Optional[bool] = None,
208 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
209 connection_pool: Optional[ConnectionPool] = None,
210 unix_socket_path: Optional[str] = None,
211 encoding: str = "utf-8",
212 encoding_errors: str = "strict",
213 decode_responses: bool = False,
214 retry_on_timeout: bool = False,
215 retry: Retry = Retry(
216 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
217 ),
218 retry_on_error: Optional[List[Type[Exception]]] = None,
219 ssl: bool = False,
220 ssl_keyfile: Optional[str] = None,
221 ssl_certfile: Optional[str] = None,
222 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
223 ssl_ca_certs: Optional[str] = None,
224 ssl_ca_path: Optional[str] = None,
225 ssl_ca_data: Optional[str] = None,
226 ssl_check_hostname: bool = True,
227 ssl_password: Optional[str] = None,
228 ssl_validate_ocsp: bool = False,
229 ssl_validate_ocsp_stapled: bool = False,
230 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
231 ssl_ocsp_expected_cert: Optional[str] = None,
232 ssl_min_version: Optional["ssl.TLSVersion"] = None,
233 ssl_ciphers: Optional[str] = None,
234 max_connections: Optional[int] = None,
235 single_connection_client: bool = False,
236 health_check_interval: int = 0,
237 client_name: Optional[str] = None,
238 lib_name: Optional[str] = "redis-py",
239 lib_version: Optional[str] = get_lib_version(),
240 username: Optional[str] = None,
241 redis_connect_func: Optional[Callable[[], None]] = None,
242 credential_provider: Optional[CredentialProvider] = None,
243 protocol: Optional[int] = 2,
244 cache: Optional[CacheInterface] = None,
245 cache_config: Optional[CacheConfig] = None,
246 event_dispatcher: Optional[EventDispatcher] = None,
247 ) -> None:
248 """
249 Initialize a new Redis client.
251 To specify a retry policy for specific errors, you have two options:
253 1. Set the `retry_on_error` to a list of the error/s to retry on, and
254 you can also set `retry` to a valid `Retry` object(in case the default
255 one is not appropriate) - with this approach the retries will be triggered
256 on the default errors specified in the Retry object enriched with the
257 errors specified in `retry_on_error`.
259 2. Define a `Retry` object with configured 'supported_errors' and set
260 it to the `retry` parameter - with this approach you completely redefine
261 the errors on which retries will happen.
263 `retry_on_timeout` is deprecated - please include the TimeoutError
264 either in the Retry object or in the `retry_on_error` list.
266 When 'connection_pool' is provided - the retry configuration of the
267 provided pool will be used.
269 Args:
271 single_connection_client:
272 if `True`, connection pool is not used. In that case `Redis`
273 instance use is not thread safe.
274 """
275 if event_dispatcher is None:
276 self._event_dispatcher = EventDispatcher()
277 else:
278 self._event_dispatcher = event_dispatcher
279 if not connection_pool:
280 if not retry_on_error:
281 retry_on_error = []
282 kwargs = {
283 "db": db,
284 "username": username,
285 "password": password,
286 "socket_timeout": socket_timeout,
287 "encoding": encoding,
288 "encoding_errors": encoding_errors,
289 "decode_responses": decode_responses,
290 "retry_on_error": retry_on_error,
291 "retry": copy.deepcopy(retry),
292 "max_connections": max_connections,
293 "health_check_interval": health_check_interval,
294 "client_name": client_name,
295 "lib_name": lib_name,
296 "lib_version": lib_version,
297 "redis_connect_func": redis_connect_func,
298 "credential_provider": credential_provider,
299 "protocol": protocol,
300 }
301 # based on input, setup appropriate connection args
302 if unix_socket_path is not None:
303 kwargs.update(
304 {
305 "path": unix_socket_path,
306 "connection_class": UnixDomainSocketConnection,
307 }
308 )
309 else:
310 # TCP specific options
311 kwargs.update(
312 {
313 "host": host,
314 "port": port,
315 "socket_connect_timeout": socket_connect_timeout,
316 "socket_keepalive": socket_keepalive,
317 "socket_keepalive_options": socket_keepalive_options,
318 }
319 )
321 if ssl:
322 kwargs.update(
323 {
324 "connection_class": SSLConnection,
325 "ssl_keyfile": ssl_keyfile,
326 "ssl_certfile": ssl_certfile,
327 "ssl_cert_reqs": ssl_cert_reqs,
328 "ssl_ca_certs": ssl_ca_certs,
329 "ssl_ca_data": ssl_ca_data,
330 "ssl_check_hostname": ssl_check_hostname,
331 "ssl_password": ssl_password,
332 "ssl_ca_path": ssl_ca_path,
333 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
334 "ssl_validate_ocsp": ssl_validate_ocsp,
335 "ssl_ocsp_context": ssl_ocsp_context,
336 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
337 "ssl_min_version": ssl_min_version,
338 "ssl_ciphers": ssl_ciphers,
339 }
340 )
341 if (cache_config or cache) and protocol in [3, "3"]:
342 kwargs.update(
343 {
344 "cache": cache,
345 "cache_config": cache_config,
346 }
347 )
348 connection_pool = ConnectionPool(**kwargs)
349 self._event_dispatcher.dispatch(
350 AfterPooledConnectionsInstantiationEvent(
351 [connection_pool], ClientType.SYNC, credential_provider
352 )
353 )
354 self.auto_close_connection_pool = True
355 else:
356 self.auto_close_connection_pool = False
357 self._event_dispatcher.dispatch(
358 AfterPooledConnectionsInstantiationEvent(
359 [connection_pool], ClientType.SYNC, credential_provider
360 )
361 )
363 self.connection_pool = connection_pool
365 if (cache_config or cache) and self.connection_pool.get_protocol() not in [
366 3,
367 "3",
368 ]:
369 raise RedisError("Client caching is only supported with RESP version 3")
371 self.single_connection_lock = threading.RLock()
372 self.connection = None
373 self._single_connection_client = single_connection_client
374 if self._single_connection_client:
375 self.connection = self.connection_pool.get_connection()
376 self._event_dispatcher.dispatch(
377 AfterSingleConnectionInstantiationEvent(
378 self.connection, ClientType.SYNC, self.single_connection_lock
379 )
380 )
382 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)
384 if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
385 self.response_callbacks.update(_RedisCallbacksRESP3)
386 else:
387 self.response_callbacks.update(_RedisCallbacksRESP2)
389 def __repr__(self) -> str:
390 return (
391 f"<{type(self).__module__}.{type(self).__name__}"
392 f"({repr(self.connection_pool)})>"
393 )
395 def get_encoder(self) -> "Encoder":
396 """Get the connection pool's encoder"""
397 return self.connection_pool.get_encoder()
399 def get_connection_kwargs(self) -> Dict:
400 """Get the connection's key-word arguments"""
401 return self.connection_pool.connection_kwargs
403 def get_retry(self) -> Optional[Retry]:
404 return self.get_connection_kwargs().get("retry")
406 def set_retry(self, retry: Retry) -> None:
407 self.get_connection_kwargs().update({"retry": retry})
408 self.connection_pool.set_retry(retry)
410 def set_response_callback(self, command: str, callback: Callable) -> None:
411 """Set a custom Response Callback"""
412 self.response_callbacks[command] = callback
414 def load_external_module(self, funcname, func) -> None:
415 """
416 This function can be used to add externally defined redis modules,
417 and their namespaces to the redis client.
419 funcname - A string containing the name of the function to create
420 func - The function, being added to this class.
422 ex: Assume that one has a custom redis module named foomod that
423 creates command named 'foo.dothing' and 'foo.anotherthing' in redis.
424 To load function functions into this namespace:
426 from redis import Redis
427 from foomodule import F
428 r = Redis()
429 r.load_external_module("foo", F)
430 r.foo().dothing('your', 'arguments')
432 For a concrete example see the reimport of the redisjson module in
433 tests/test_connection.py::test_loading_external_modules
434 """
435 setattr(self, funcname, func)
437 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
438 """
439 Return a new pipeline object that can queue multiple commands for
440 later execution. ``transaction`` indicates whether all commands
441 should be executed atomically. Apart from making a group of operations
442 atomic, pipelines are useful for reducing the back-and-forth overhead
443 between the client and server.
444 """
445 return Pipeline(
446 self.connection_pool, self.response_callbacks, transaction, shard_hint
447 )
449 def transaction(
450 self, func: Callable[["Pipeline"], None], *watches, **kwargs
451 ) -> Union[List[Any], Any, None]:
452 """
453 Convenience method for executing the callable `func` as a transaction
454 while watching all keys specified in `watches`. The 'func' callable
455 should expect a single argument which is a Pipeline object.
456 """
457 shard_hint = kwargs.pop("shard_hint", None)
458 value_from_callable = kwargs.pop("value_from_callable", False)
459 watch_delay = kwargs.pop("watch_delay", None)
460 with self.pipeline(True, shard_hint) as pipe:
461 while True:
462 try:
463 if watches:
464 pipe.watch(*watches)
465 func_value = func(pipe)
466 exec_value = pipe.execute()
467 return func_value if value_from_callable else exec_value
468 except WatchError:
469 if watch_delay is not None and watch_delay > 0:
470 time.sleep(watch_delay)
471 continue
473 def lock(
474 self,
475 name: str,
476 timeout: Optional[float] = None,
477 sleep: float = 0.1,
478 blocking: bool = True,
479 blocking_timeout: Optional[float] = None,
480 lock_class: Union[None, Any] = None,
481 thread_local: bool = True,
482 raise_on_release_error: bool = True,
483 ):
484 """
485 Return a new Lock object using key ``name`` that mimics
486 the behavior of threading.Lock.
488 If specified, ``timeout`` indicates a maximum life for the lock.
489 By default, it will remain locked until release() is called.
491 ``sleep`` indicates the amount of time to sleep per loop iteration
492 when the lock is in blocking mode and another client is currently
493 holding the lock.
495 ``blocking`` indicates whether calling ``acquire`` should block until
496 the lock has been acquired or to fail immediately, causing ``acquire``
497 to return False and the lock not being acquired. Defaults to True.
498 Note this value can be overridden by passing a ``blocking``
499 argument to ``acquire``.
501 ``blocking_timeout`` indicates the maximum amount of time in seconds to
502 spend trying to acquire the lock. A value of ``None`` indicates
503 continue trying forever. ``blocking_timeout`` can be specified as a
504 float or integer, both representing the number of seconds to wait.
506 ``lock_class`` forces the specified lock implementation. Note that as
507 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
508 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
509 you have created your own custom lock class.
511 ``thread_local`` indicates whether the lock token is placed in
512 thread-local storage. By default, the token is placed in thread local
513 storage so that a thread only sees its token, not a token set by
514 another thread. Consider the following timeline:
516 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
517 thread-1 sets the token to "abc"
518 time: 1, thread-2 blocks trying to acquire `my-lock` using the
519 Lock instance.
520 time: 5, thread-1 has not yet completed. redis expires the lock
521 key.
522 time: 5, thread-2 acquired `my-lock` now that it's available.
523 thread-2 sets the token to "xyz"
524 time: 6, thread-1 finishes its work and calls release(). if the
525 token is *not* stored in thread local storage, then
526 thread-1 would see the token value as "xyz" and would be
527 able to successfully release the thread-2's lock.
529 ``raise_on_release_error`` indicates whether to raise an exception when
530 the lock is no longer owned when exiting the context manager. By default,
531 this is True, meaning an exception will be raised. If False, the warning
532 will be logged and the exception will be suppressed.
534 In some use cases it's necessary to disable thread local storage. For
535 example, if you have code where one thread acquires a lock and passes
536 that lock instance to a worker thread to release later. If thread
537 local storage isn't disabled in this case, the worker thread won't see
538 the token set by the thread that acquired the lock. Our assumption
539 is that these cases aren't common and as such default to using
540 thread local storage."""
541 if lock_class is None:
542 lock_class = Lock
543 return lock_class(
544 self,
545 name,
546 timeout=timeout,
547 sleep=sleep,
548 blocking=blocking,
549 blocking_timeout=blocking_timeout,
550 thread_local=thread_local,
551 raise_on_release_error=raise_on_release_error,
552 )
554 def pubsub(self, **kwargs):
555 """
556 Return a Publish/Subscribe object. With this object, you can
557 subscribe to channels and listen for messages that get published to
558 them.
559 """
560 return PubSub(
561 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
562 )
564 def monitor(self):
565 return Monitor(self.connection_pool)
567 def client(self):
568 return self.__class__(
569 connection_pool=self.connection_pool, single_connection_client=True
570 )
572 def __enter__(self):
573 return self
575 def __exit__(self, exc_type, exc_value, traceback):
576 self.close()
578 def __del__(self):
579 try:
580 self.close()
581 except Exception:
582 pass
584 def close(self) -> None:
585 # In case a connection property does not yet exist
586 # (due to a crash earlier in the Redis() constructor), return
587 # immediately as there is nothing to clean-up.
588 if not hasattr(self, "connection"):
589 return
591 conn = self.connection
592 if conn:
593 self.connection = None
594 self.connection_pool.release(conn)
596 if self.auto_close_connection_pool:
597 self.connection_pool.disconnect()
599 def _send_command_parse_response(self, conn, command_name, *args, **options):
600 """
601 Send a command and parse the response
602 """
603 conn.send_command(*args, **options)
604 return self.parse_response(conn, command_name, **options)
606 def _close_connection(self, conn) -> None:
607 """
608 Close the connection before retrying.
610 The supported exceptions are already checked in the
611 retry object so we don't need to do it here.
613 After we disconnect the connection, it will try to reconnect and
614 do a health check as part of the send_command logic(on connection level).
615 """
617 conn.disconnect()
619 # COMMAND EXECUTION AND PROTOCOL PARSING
620 def execute_command(self, *args, **options):
621 return self._execute_command(*args, **options)
623 def _execute_command(self, *args, **options):
624 """Execute a command and return a parsed response"""
625 pool = self.connection_pool
626 command_name = args[0]
627 conn = self.connection or pool.get_connection()
629 if self._single_connection_client:
630 self.single_connection_lock.acquire()
631 try:
632 return conn.retry.call_with_retry(
633 lambda: self._send_command_parse_response(
634 conn, command_name, *args, **options
635 ),
636 lambda _: self._close_connection(conn),
637 )
638 finally:
639 if self._single_connection_client:
640 self.single_connection_lock.release()
641 if not self.connection:
642 pool.release(conn)
644 def parse_response(self, connection, command_name, **options):
645 """Parses a response from the Redis server"""
646 try:
647 if NEVER_DECODE in options:
648 response = connection.read_response(disable_decoding=True)
649 options.pop(NEVER_DECODE)
650 else:
651 response = connection.read_response()
652 except ResponseError:
653 if EMPTY_RESPONSE in options:
654 return options[EMPTY_RESPONSE]
655 raise
657 if EMPTY_RESPONSE in options:
658 options.pop(EMPTY_RESPONSE)
660 # Remove keys entry, it needs only for cache.
661 options.pop("keys", None)
663 if command_name in self.response_callbacks:
664 return self.response_callbacks[command_name](response, **options)
665 return response
667 def get_cache(self) -> Optional[CacheInterface]:
668 return self.connection_pool.cache
# StrictRedis is an alias bound to the same class object as Redis.
StrictRedis = Redis
class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    next_command() method returns one command from monitor
    listen() method yields commands from monitor.
    """

    # "[<db> <client info>] <command>", i.e. the part after the timestamp.
    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    # One double-quoted command piece; an escaped quote does not end it.
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        """Take a connection from ``connection_pool`` for monitoring."""
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        """Send MONITOR and verify the server acknowledged it.

        Raises:
            RedisError: if the reply to MONITOR is not an OK reply.
        """
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()
        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")
        return self

    def __exit__(self, *args):
        """Disconnect and hand the connection back to the pool."""
        try:
            self.connection.disconnect()
        finally:
            # Release even when disconnect() raises, so the pool does not
            # permanently lose this connection.
            self.connection_pool.release(self.connection)

    def next_command(self):
        """Parse the response from a monitor command

        Returns:
            dict with the keys ``time`` (float), ``db`` (int),
            ``client_address``, ``client_port``, ``client_type``
            ("lua", "unix" or "tcp") and ``command`` (str).

        Raises:
            RedisError: if the line does not look like MONITOR output.
        """
        response = self.connection.read_response()
        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)
        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        if m is None:
            # Fail with a descriptive error instead of an AttributeError
            # on ``None.groups()``.
            raise RedisError(f"Unexpected MONITOR output: {response!r}")
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            # Everything after the 5-character "unix:" prefix.
            client_port = client_info[5:]
            client_type = "unix"
        else:
            # use rsplit as ipv6 addresses contain colons
            client_address, client_port = client_info.rsplit(":", 1)
            client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()
741class PubSub:
742 """
743 PubSub provides publish, subscribe and listen support to Redis channels.
745 After subscribing to one or more channels, the listen() method will block
746 until a message arrives on one of the subscribed channels. That message
747 will be returned and it's safe to start listening again.
748 """
750 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
751 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
752 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
754 def __init__(
755 self,
756 connection_pool,
757 shard_hint=None,
758 ignore_subscribe_messages: bool = False,
759 encoder: Optional["Encoder"] = None,
760 push_handler_func: Union[None, Callable[[str], None]] = None,
761 event_dispatcher: Optional["EventDispatcher"] = None,
762 ):
763 self.connection_pool = connection_pool
764 self.shard_hint = shard_hint
765 self.ignore_subscribe_messages = ignore_subscribe_messages
766 self.connection = None
767 self.subscribed_event = threading.Event()
768 # we need to know the encoding options for this connection in order
769 # to lookup channel and pattern names for callback handlers.
770 self.encoder = encoder
771 self.push_handler_func = push_handler_func
772 if event_dispatcher is None:
773 self._event_dispatcher = EventDispatcher()
774 else:
775 self._event_dispatcher = event_dispatcher
777 self._lock = threading.RLock()
778 if self.encoder is None:
779 self.encoder = self.connection_pool.get_encoder()
780 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
781 if self.encoder.decode_responses:
782 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
783 else:
784 self.health_check_response = [b"pong", self.health_check_response_b]
785 if self.push_handler_func is None:
786 _set_info_logger()
787 self.reset()
789 def __enter__(self) -> "PubSub":
790 return self
792 def __exit__(self, exc_type, exc_value, traceback) -> None:
793 self.reset()
795 def __del__(self) -> None:
796 try:
797 # if this object went out of scope prior to shutting down
798 # subscriptions, close the connection manually before
799 # returning it to the connection pool
800 self.reset()
801 except Exception:
802 pass
804 def reset(self) -> None:
805 if self.connection:
806 self.connection.disconnect()
807 self.connection.deregister_connect_callback(self.on_connect)
808 self.connection_pool.release(self.connection)
809 self.connection = None
810 self.health_check_response_counter = 0
811 self.channels = {}
812 self.pending_unsubscribe_channels = set()
813 self.shard_channels = {}
814 self.pending_unsubscribe_shard_channels = set()
815 self.patterns = {}
816 self.pending_unsubscribe_patterns = set()
817 self.subscribed_event.clear()
819 def close(self) -> None:
820 self.reset()
822 def on_connect(self, connection) -> None:
823 "Re-subscribe to any channels and patterns previously subscribed to"
824 # NOTE: for python3, we can't pass bytestrings as keyword arguments
825 # so we need to decode channel/pattern names back to unicode strings
826 # before passing them to [p]subscribe.
827 self.pending_unsubscribe_channels.clear()
828 self.pending_unsubscribe_patterns.clear()
829 self.pending_unsubscribe_shard_channels.clear()
830 if self.channels:
831 channels = {
832 self.encoder.decode(k, force=True): v for k, v in self.channels.items()
833 }
834 self.subscribe(**channels)
835 if self.patterns:
836 patterns = {
837 self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
838 }
839 self.psubscribe(**patterns)
840 if self.shard_channels:
841 shard_channels = {
842 self.encoder.decode(k, force=True): v
843 for k, v in self.shard_channels.items()
844 }
845 self.ssubscribe(**shard_channels)
847 @property
848 def subscribed(self) -> bool:
849 """Indicates if there are subscriptions to any channels or patterns"""
850 return self.subscribed_event.is_set()
852 def execute_command(self, *args):
853 """Execute a publish/subscribe command"""
855 # NOTE: don't parse the response in this function -- it could pull a
856 # legitimate message off the stack if the connection is already
857 # subscribed to one or more channels
859 if self.connection is None:
860 self.connection = self.connection_pool.get_connection()
861 # register a callback that re-subscribes to any channels we
862 # were listening to when we were disconnected
863 self.connection.register_connect_callback(self.on_connect)
864 if self.push_handler_func is not None:
865 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
866 self._event_dispatcher.dispatch(
867 AfterPubSubConnectionInstantiationEvent(
868 self.connection, self.connection_pool, ClientType.SYNC, self._lock
869 )
870 )
871 connection = self.connection
872 kwargs = {"check_health": not self.subscribed}
873 if not self.subscribed:
874 self.clean_health_check_responses()
875 with self._lock:
876 self._execute(connection, connection.send_command, *args, **kwargs)
878 def clean_health_check_responses(self) -> None:
879 """
880 If any health check responses are present, clean them
881 """
882 ttl = 10
883 conn = self.connection
884 while self.health_check_response_counter > 0 and ttl > 0:
885 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
886 response = self._execute(conn, conn.read_response)
887 if self.is_health_check_response(response):
888 self.health_check_response_counter -= 1
889 else:
890 raise PubSubError(
891 "A non health check response was cleaned by "
892 "execute_command: {}".format(response)
893 )
894 ttl -= 1
896 def _reconnect(self, conn) -> None:
897 """
898 The supported exceptions are already checked in the
899 retry object so we don't need to do it here.
901 In this error handler we are trying to reconnect to the server.
902 """
903 conn.disconnect()
904 conn.connect()
906 def _execute(self, conn, command, *args, **kwargs):
907 """
908 Connect manually upon disconnection. If the Redis server is down,
909 this will fail and raise a ConnectionError as desired.
910 After reconnection, the ``on_connect`` callback should have been
911 called by the # connection to resubscribe us to any channels and
912 patterns we were previously listening to
913 """
914 return conn.retry.call_with_retry(
915 lambda: command(*args, **kwargs),
916 lambda _: self._reconnect(conn),
917 )
919 def parse_response(self, block=True, timeout=0):
920 """Parse the response from a publish/subscribe command"""
921 conn = self.connection
922 if conn is None:
923 raise RuntimeError(
924 "pubsub connection not set: "
925 "did you forget to call subscribe() or psubscribe()?"
926 )
928 self.check_health()
930 def try_read():
931 if not block:
932 if not conn.can_read(timeout=timeout):
933 return None
934 else:
935 conn.connect()
936 return conn.read_response(disconnect_on_error=False, push_request=True)
938 response = self._execute(conn, try_read)
940 if self.is_health_check_response(response):
941 # ignore the health check message as user might not expect it
942 self.health_check_response_counter -= 1
943 return None
944 return response
946 def is_health_check_response(self, response) -> bool:
947 """
948 Check if the response is a health check response.
949 If there are no subscriptions redis responds to PING command with a
950 bulk response, instead of a multi-bulk with "pong" and the response.
951 """
952 return response in [
953 self.health_check_response, # If there was a subscription
954 self.health_check_response_b, # If there wasn't
955 ]
def check_health(self) -> None:
    """
    Send a health-check PING on the pub/sub connection when one is due.

    A PING carrying ``HEALTH_CHECK_MESSAGE`` is sent only if the connection
    has a health check interval configured and the monotonic clock is past
    ``next_health_check``; each PING sent bumps the outstanding counter.
    Raises ``RuntimeError`` when no connection has been set yet.
    """
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            "pubsub connection not set: "
            "did you forget to call subscribe() or psubscribe()?"
        )

    if not conn.health_check_interval:
        return
    if time.monotonic() > conn.next_health_check:
        conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
        self.health_check_response_counter += 1
def _normalize_keys(self, data) -> Dict:
    """
    Return a copy of ``data`` whose keys have been passed through the
    encoder's ``encode`` and then ``decode``.

    This makes channel/pattern names either bytes or strings, based on
    whether responses are automatically decoded, so incoming messages do
    not need to be coerced one by one.
    """
    encoder = self.encoder
    to_wire = encoder.encode
    from_wire = encoder.decode
    normalized = {}
    for name, handler in data.items():
        normalized[from_wire(to_wire(name))] = handler
    return normalized
def psubscribe(self, *args, **kwargs):
    """
    Subscribe to channel patterns.

    Positional arguments are pattern names without a handler. Keyword
    arguments map a pattern name to a callable; that callable is invoked
    automatically when a message is received on the pattern rather than
    the message being produced via ``listen()``.
    """
    positional = list_or_args(args[0], args[1:]) if args else args
    requested = dict.fromkeys(positional)
    requested.update(kwargs)
    reply = self.execute_command("PSUBSCRIBE", *requested)
    # register the patterns only AFTER the command was sent. we don't want
    # to subscribe twice to them, once for the command and again for the
    # reconnection.
    normalized = self._normalize_keys(requested)
    self.patterns.update(normalized)
    if not self.subscribed:
        # first subscription: raise the subscribed_event flag ...
        self.subscribed_event.set()
        # ... and clear the health check counter
        self.health_check_response_counter = 0
    self.pending_unsubscribe_patterns.difference_update(normalized)
    return reply
def punsubscribe(self, *args):
    """
    Unsubscribe from the supplied patterns. With no arguments, unsubscribe
    from all patterns.

    The affected patterns are recorded as pending until the matching
    unsubscribe messages are handled.
    """
    if not args:
        targets = self.patterns
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    self.pending_unsubscribe_patterns.update(targets)
    return self.execute_command("PUNSUBSCRIBE", *args)
def subscribe(self, *args, **kwargs):
    """
    Subscribe to channels.

    Positional arguments are channel names without a handler. Keyword
    arguments map a channel name to a callable; that callable is invoked
    automatically when a message is received on the channel rather than
    the message being produced via ``listen()`` or ``get_message()``.
    """
    positional = list_or_args(args[0], args[1:]) if args else args
    requested = dict.fromkeys(positional)
    requested.update(kwargs)
    reply = self.execute_command("SUBSCRIBE", *requested)
    # register the channels only AFTER the command was sent. we don't want
    # to subscribe twice to them, once for the command and again for the
    # reconnection.
    normalized = self._normalize_keys(requested)
    self.channels.update(normalized)
    if not self.subscribed:
        # first subscription: raise the subscribed_event flag ...
        self.subscribed_event.set()
        # ... and clear the health check counter
        self.health_check_response_counter = 0
    self.pending_unsubscribe_channels.difference_update(normalized)
    return reply
def unsubscribe(self, *args):
    """
    Unsubscribe from the supplied channels. With no arguments, unsubscribe
    from all channels.

    The affected channels are recorded as pending until the matching
    unsubscribe messages are handled.
    """
    if not args:
        targets = self.channels
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    self.pending_unsubscribe_channels.update(targets)
    return self.execute_command("UNSUBSCRIBE", *args)
def ssubscribe(self, *args, target_node=None, **kwargs):
    """
    Subscribe the client to the specified shard channels.

    Positional arguments are shard channel names without a handler.
    Keyword arguments map a channel name to a callable; that callable is
    invoked automatically when a message is received on the channel rather
    than the message being produced via ``listen()`` or
    ``get_sharded_message()``. ``target_node`` is accepted but not used by
    this implementation.
    """
    positional = list_or_args(args[0], args[1:]) if args else args
    requested = dict.fromkeys(positional)
    requested.update(kwargs)
    reply = self.execute_command("SSUBSCRIBE", *requested)
    # register the shard channels only AFTER the command was sent. we don't
    # want to subscribe twice to them, once for the command and again for
    # the reconnection.
    normalized = self._normalize_keys(requested)
    self.shard_channels.update(normalized)
    if not self.subscribed:
        # first subscription: raise the subscribed_event flag ...
        self.subscribed_event.set()
        # ... and clear the health check counter
        self.health_check_response_counter = 0
    self.pending_unsubscribe_shard_channels.difference_update(normalized)
    return reply
def sunsubscribe(self, *args, target_node=None):
    """
    Unsubscribe from the supplied shard channels. With no arguments,
    unsubscribe from all shard channels.

    The affected shard channels are recorded as pending until the matching
    unsubscribe messages are handled. ``target_node`` is accepted but not
    used by this implementation.
    """
    if not args:
        targets = self.shard_channels
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    self.pending_unsubscribe_shard_channels.update(targets)
    return self.execute_command("SUNSUBSCRIBE", *args)
def listen(self):
    """
    Listen for messages on channels this client has been subscribed to.

    This is a generator: while the client is subscribed it blocks for the
    next response, runs it through ``handle_message`` and yields every
    result that is not ``None``.
    """
    while self.subscribed:
        raw = self.parse_response(block=True)
        message = self.handle_message(raw)
        if message is None:
            continue
        yield message
def get_message(
    self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0
):
    """
    Get the next message if one is available, otherwise None.

    If timeout is specified, the system will wait for `timeout` seconds
    before returning. Timeout should be specified as a floating point
    number, or None, to wait indefinitely.

    When the client is not subscribed yet, the call first waits (bounded by
    ``timeout``) for a subscription and returns None if none happens; the
    time spent waiting is deducted from the timeout used for the read.
    """
    if not self.subscribed:
        # Wait for subscription
        start_time = time.monotonic()
        if self.subscribed_event.wait(timeout) is not True:
            # The connection isn't subscribed to any channels or patterns,
            # so no messages are available
            return None
        # The connection was subscribed during the timeout time frame.
        # The timeout should be adjusted based on the time spent waiting
        # for the subscription. ``None`` means "wait indefinitely" and has
        # nothing to deduct from; subtracting from it would raise TypeError.
        if timeout is not None:
            time_spent = time.monotonic() - start_time
            timeout = max(0.0, timeout - time_spent)

    # a timeout of None turns the read into a blocking one
    response = self.parse_response(block=(timeout is None), timeout=timeout)
    if response:
        return self.handle_message(response, ignore_subscribe_messages)
    return None
1132 get_sharded_message = get_message
def ping(self, message: Union[str, None] = None) -> bool:
    """
    Ping the Redis server, optionally with a ``message`` payload.

    The payload is only sent when ``message`` is not None.
    """
    if message is None:
        return self.execute_command("PING")
    return self.execute_command("PING", message)
def handle_message(self, response, ignore_subscribe_messages=False):
    """
    Parse a pub/sub response into a message dict with the keys ``type``,
    ``pattern``, ``channel`` and ``data``.

    If the channel or pattern was subscribed to with a message handler,
    the handler is invoked and None is returned instead of the message.
    Unsubscribe messages also update the subscription bookkeeping, and
    subscribe/unsubscribe messages are dropped (None) when either the
    argument or the instance-level ``ignore_subscribe_messages`` is set.
    """
    if response is None:
        return None

    if isinstance(response, bytes):
        # a bare bytes reply is rewritten into the two-item "pong" shape;
        # the literal b"PONG" carries no payload
        payload = b"" if response == b"PONG" else response
        response = [b"pong", payload]

    message_type = str_if_bytes(response[0])
    pattern = channel = None
    if message_type == "pmessage":
        pattern, channel, data = response[1], response[2], response[3]
    elif message_type == "pong":
        data = response[1]
    else:
        channel, data = response[1], response[2]
    message = {
        "type": message_type,
        "pattern": pattern,
        "channel": channel,
        "data": data,
    }

    # if this is an unsubscribe message, remove it from memory
    if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
        if message_type == "punsubscribe":
            pending = self.pending_unsubscribe_patterns
            registry = self.patterns
        elif message_type == "sunsubscribe":
            pending = self.pending_unsubscribe_shard_channels
            registry = self.shard_channels
        else:
            pending = self.pending_unsubscribe_channels
            registry = self.channels
        name = response[1]
        # only forget subscriptions whose unsubscribe is actually pending
        if name in pending:
            pending.remove(name)
            registry.pop(name, None)
        if not (self.channels or self.patterns or self.shard_channels):
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()

    if message_type in self.PUBLISH_MESSAGE_TYPES:
        # if there's a message handler, invoke it
        if message_type == "pmessage":
            handler = self.patterns.get(message["pattern"], None)
        elif message_type == "smessage":
            handler = self.shard_channels.get(message["channel"], None)
        else:
            handler = self.channels.get(message["channel"], None)
        if handler:
            handler(message)
            return None
    elif message_type != "pong":
        # this is a subscribe/unsubscribe message. ignore if we don't
        # want them
        if ignore_subscribe_messages or self.ignore_subscribe_messages:
            return None

    return message
def run_in_thread(
    self,
    sleep_time: float = 0.0,
    daemon: bool = False,
    exception_handler: Optional[Callable] = None,
) -> "PubSubWorkerThread":
    """
    Start and return a ``PubSubWorkerThread`` polling this pubsub object.

    Every subscribed channel, pattern and shard channel must have a handler
    registered; otherwise ``PubSubError`` is raised and no thread is
    started.
    """
    subscriptions = (
        ("Channel", self.channels),
        ("Pattern", self.patterns),
        ("Shard Channel", self.shard_channels),
    )
    for label, registry in subscriptions:
        for name, handler in registry.items():
            if handler is None:
                raise PubSubError(f"{label}: '{name}' has no handler registered")

    worker = PubSubWorkerThread(
        self, sleep_time, daemon=daemon, exception_handler=exception_handler
    )
    worker.start()
    return worker
class PubSubWorkerThread(threading.Thread):
    """
    Thread that repeatedly calls ``get_message`` on a pubsub object until
    it is stopped, then closes the pubsub object.
    """

    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        # passed to get_message() as its timeout on every iteration
        self.sleep_time = sleep_time
        # called as handler(exception, pubsub, thread) when polling raises
        self.exception_handler = exception_handler
        # set while the polling loop should keep going
        self._running = threading.Event()

    def run(self) -> None:
        """
        Poll the pubsub object until ``stop()`` is called.

        Exceptions raised while polling are passed to the exception handler
        when one was given and re-raised otherwise. Does nothing if the
        loop is already flagged as running.
        """
        running = self._running
        if running.is_set():
            return
        running.set()
        pubsub, poll_timeout = self.pubsub, self.sleep_time
        while running.is_set():
            try:
                pubsub.get_message(
                    ignore_subscribe_messages=True, timeout=poll_timeout
                )
            except BaseException as exc:
                handler = self.exception_handler
                if handler is None:
                    raise
                handler(exc, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        """
        Ask the polling loop to exit.
        """
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()
1279class Pipeline(Redis):
1280 """
1281 Pipelines provide a way to transmit multiple commands to the Redis server
1282 in one transmission. This is convenient for batch processing, such as
1283 saving all the values in a list to Redis.
1285 All commands executed within a pipeline(when running in transactional mode,
1286 which is the default behavior) are wrapped with MULTI and EXEC
1287 calls. This guarantees all commands executed in the pipeline will be
1288 executed atomically.
1290 Any command raising an exception does *not* halt the execution of
1291 subsequent commands in the pipeline. Instead, the exception is caught
1292 and its instance is placed into the response list returned by execute().
1293 Code iterating over the response list should be able to deal with an
1294 instance of an exception as a potential value. In general, these will be
1295 ResponseError exceptions, such as those raised when issuing a command
1296 on a key of a different datatype.
1297 """
1299 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
def __init__(
    self,
    connection_pool: ConnectionPool,
    response_callbacks,
    transaction,
    shard_hint,
):
    """
    Store the pipeline configuration and start with empty state: no
    connection, no queued commands, no scripts, not watching and no
    explicit transaction.
    """
    # configuration handed in by the caller
    self.connection_pool = connection_pool
    self.response_callbacks = response_callbacks
    self.transaction = transaction
    self.shard_hint = shard_hint
    # per-use state, cleared again by reset()
    self.connection: Optional[Connection] = None
    self.command_stack = []
    self.scripts: Set[Script] = set()
    self.watching = False
    self.explicit_transaction = False
def __enter__(self) -> "Pipeline":
    """Enter the ``with`` block; the pipeline itself is the context value."""
    return self
def __exit__(self, exc_type, exc_value, traceback):
    """Reset the pipeline when leaving the ``with`` block."""
    # returns None, so an exception raised inside the block is not suppressed
    self.reset()
def __del__(self):
    """Best-effort reset when the pipeline object is finalized."""
    try:
        self.reset()
    except Exception:
        # deliberately swallowed: nothing useful can be done with an
        # error raised while the object is being finalized
        pass
def __len__(self) -> int:
    """Return the number of commands currently queued in the pipeline."""
    queued = self.command_stack
    return len(queued)
def __bool__(self) -> bool:
    """
    Pipeline instances always evaluate to True, regardless of how many
    commands are queued.
    """
    return True
def reset(self) -> None:
    """
    Drop all queued commands and scripts, clear the watch/transaction
    flags and give the connection (if any) back to the pool.

    If keys were being watched, UNWATCH is sent first; should that fail
    with a ConnectionError the connection is disconnected instead.
    """
    self.command_stack = []
    self.scripts = set()
    conn = self.connection
    # make sure to reset the connection state in the event that we were
    # watching something
    if self.watching and conn:
        try:
            # call this manually since our unwatch or
            # immediate_execute_command methods can call reset()
            conn.send_command("UNWATCH")
            conn.read_response()
        except ConnectionError:
            # disconnect will also remove any previous WATCHes
            conn.disconnect()
    # clean up the other instance attributes
    self.watching = False
    self.explicit_transaction = False
    # we can safely return the connection to the pool here since we're
    # sure we're no longer WATCHing anything
    if conn:
        self.connection_pool.release(conn)
        self.connection = None
def close(self) -> None:
    """
    Close the pipeline; this is the same as calling ``reset()``.
    """
    self.reset()
def multi(self) -> None:
    """
    Start a transactional block of the pipeline after WATCH commands
    are issued. End the transactional block with `execute`.

    Raises ``RedisError`` when a transactional block was already started
    or when commands have already been queued.
    """
    if self.explicit_transaction:
        raise RedisError("Cannot issue nested calls to MULTI")
    already_queued = self.command_stack
    if already_queued:
        raise RedisError(
            "Commands without an initial WATCH have already been issued"
        )
    self.explicit_transaction = True
def execute_command(self, *args, **kwargs):
    """
    Route a command: run it immediately while watching (or when it is a
    WATCH itself) as long as no explicit transaction was started, and
    queue it for ``execute()`` otherwise.
    """
    run_now = (
        self.watching or args[0] == "WATCH"
    ) and not self.explicit_transaction
    if run_now:
        target = self.immediate_execute_command
    else:
        target = self.pipeline_execute_command
    return target(*args, **kwargs)
def _disconnect_reset_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
) -> None:
    """
    Disconnect ``conn``; if keys were being watched, also reset the
    pipeline and raise ``WatchError``.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).
    """
    conn.disconnect()

    if not self.watching:
        return

    # the watch is no longer valid since this connection has died. raise a
    # WatchError, which indicates the user should retry this transaction.
    self.reset()
    raise WatchError(
        f"A {type(error).__name__} occurred while watching one or more keys"
    )
def immediate_execute_command(self, *args, **options):
    """
    Send a command right away instead of queueing it, going through the
    connection's retry object.

    Used when issuing WATCH or subsequent commands retrieving their values
    but before MULTI is called. On a retried error the connection is
    disconnected, and if keys were already being WATCHed a WatchError is
    raised instead of silently retrying.
    """
    command_name = args[0]
    # if this is the first call, we need a connection
    if not self.connection:
        self.connection = self.connection_pool.get_connection()
    conn = self.connection

    def send_and_parse():
        return self._send_command_parse_response(
            conn, command_name, *args, **options
        )

    def on_failure(error):
        return self._disconnect_reset_raise_on_watching(conn, error)

    return conn.retry.call_with_retry(send_and_parse, on_failure)
def pipeline_execute_command(self, *args, **options) -> "Pipeline":
    """
    Stage a command to be executed when execute() is next called.

    The command is stored as an ``(args, options)`` pair. The current
    Pipeline object is returned so calls can be chained together, such as:

    pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

    At some other point, you can then run: pipe.execute(),
    which will execute all commands queued in the pipe.
    """
    staged = (args, options)
    self.command_stack.append(staged)
    return self
def _execute_transaction(
    self, connection: Connection, commands, raise_on_error
) -> List:
    """
    Send ``commands`` wrapped in MULTI/EXEC over ``connection`` and return
    the list of results, with response callbacks applied.

    Commands whose options contain ``EMPTY_RESPONSE`` are not sent; the
    value stored under that option is placed at their position instead.
    Errors raised for individual commands are collected and inserted into
    the result list at the command's position.

    Raises ``WatchError`` when EXEC returns None, ``ResponseError`` when the
    number of results does not match the number of commands, and the first
    collected error (or the ``ExecAbortError`` itself) when EXEC is aborted.
    With ``raise_on_error`` set, ``raise_first_error`` is applied to the
    results.
    """
    cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
    all_cmds = connection.pack_commands(
        [args for args, options in cmds if EMPTY_RESPONSE not in options]
    )
    connection.send_packed_command(all_cmds)
    errors = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        self.parse_response(connection, "_")
    except ResponseError as e:
        errors.append((0, e))

    # and all the other commands
    for i, command in enumerate(commands):
        if EMPTY_RESPONSE in command[1]:
            # nothing was sent for this command, so there is no reply to read
            errors.append((i, command[1][EMPTY_RESPONSE]))
        else:
            try:
                self.parse_response(connection, "_")
            except ResponseError as e:
                self.annotate_exception(e, i + 1, command[0])
                errors.append((i, e))

    # parse the EXEC.
    try:
        response = self.parse_response(connection, "_")
    except ExecAbortError:
        if errors:
            raise errors[0][1]
        raise

    # EXEC clears any watched keys
    self.watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response
    for i, e in errors:
        response.insert(i, e)

    if len(response) != len(commands):
        # disconnect the connection this transaction actually ran on,
        # i.e. the one passed in, rather than whatever self.connection is
        connection.disconnect()
        raise ResponseError(
            "Wrong number of response items from pipeline execution"
        )

    # find any errors in the response and raise if necessary
    if raise_on_error:
        self.raise_first_error(commands, response)

    # We have to run response callbacks manually
    data = []
    for r, cmd in zip(response, commands):
        if not isinstance(r, Exception):
            args, options = cmd
            # Remove the "keys" entry, it is only needed for the cache.
            options.pop("keys", None)
            command_name = args[0]
            if command_name in self.response_callbacks:
                r = self.response_callbacks[command_name](r, **options)
        data.append(r)
    return data
def _execute_pipeline(self, connection, commands, raise_on_error):
    """
    Send all ``commands`` as one packed request and read one response per
    command, in order.

    A ``ResponseError`` raised while reading a response is stored in the
    result list at that command's position. With ``raise_on_error`` set,
    ``raise_first_error`` is applied to the results before returning them.
    """
    # build up all commands into a single request to increase network perf
    packed = connection.pack_commands([cmd_args for cmd_args, _ in commands])
    connection.send_packed_command(packed)

    results = []
    for cmd_args, cmd_options in commands:
        try:
            reply = self.parse_response(connection, cmd_args[0], **cmd_options)
        except ResponseError as exc:
            reply = exc
        results.append(reply)

    if raise_on_error:
        self.raise_first_error(commands, results)
    return results
def raise_first_error(self, commands, response):
    """
    Raise the first ``ResponseError`` found in ``response``, after
    annotating it with its 1-based position and the command that caused it.
    Returns None when ``response`` contains no such error.
    """
    for position, item in enumerate(response, start=1):
        if not isinstance(item, ResponseError):
            continue
        self.annotate_exception(item, position, commands[position - 1][0])
        raise item
def annotate_exception(self, exception, number, command):
    """
    Rewrite the first argument of ``exception`` in place so that it names
    the pipeline command (``number`` and a truncated rendering of
    ``command``) that caused the error. Remaining arguments are kept.
    """
    rendered = " ".join(safe_str(part) for part in command)
    original = exception.args[0]
    msg = (
        f"Command # {number} ({truncate_text(rendered)}) of pipeline "
        f"caused error: {original}"
    )
    exception.args = (msg, *exception.args[1:])
def parse_response(self, connection, command_name, **options):
    """
    Parse a response via ``Redis.parse_response`` and keep the
    ``watching`` flag in sync: it is cleared after any command listed in
    ``UNWATCH_COMMANDS`` and set after WATCH.
    """
    result = Redis.parse_response(self, connection, command_name, **options)
    if command_name in self.UNWATCH_COMMANDS:
        self.watching = False
        return result
    if command_name == "WATCH":
        self.watching = True
    return result
def load_scripts(self):
    """
    Make sure all scripts that are about to be run on this pipeline exist
    on the server.

    SCRIPT EXISTS is issued for every registered script; each script
    reported missing is loaded with SCRIPT LOAD and its ``sha`` updated
    with the returned value.
    """
    # we can't use the normal script_* methods because they would just
    # get buffered in the pipeline.
    run_now = self.immediate_execute_command
    registered = list(self.scripts)
    flags = run_now("SCRIPT EXISTS", *[script.sha for script in registered])
    if all(flags):
        return
    for script, present in zip(registered, flags):
        if present:
            continue
        script.sha = run_now("SCRIPT LOAD", script.script)
def _disconnect_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
) -> None:
    """
    Disconnect ``conn`` and raise ``WatchError`` if keys were being watched.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).
    """
    conn.disconnect()
    if not self.watching:
        return
    # the watch is no longer valid since this connection has died. raise a
    # WatchError, which indicates the user should retry this transaction.
    raise WatchError(
        f"A {type(error).__name__} occurred while watching one or more keys"
    )
def execute(self, raise_on_error: bool = True) -> List[Any]:
    """
    Execute all the commands in the current pipeline and return their
    results.

    Returns an empty list when nothing is queued and nothing is watched.
    The pipeline is always reset afterwards, whether or not execution
    succeeded.
    """
    stack = self.command_stack
    if not (stack or self.watching):
        return []
    if self.scripts:
        self.load_scripts()
    transactional = self.transaction or self.explicit_transaction
    runner = self._execute_transaction if transactional else self._execute_pipeline

    conn = self.connection
    if not conn:
        conn = self.connection_pool.get_connection()
        # assign to self.connection so reset() releases the connection
        # back to the pool after we're done
        self.connection = conn

    def run():
        return runner(conn, stack, raise_on_error)

    def on_failure(error):
        return self._disconnect_raise_on_watching(conn, error)

    try:
        return conn.retry.call_with_retry(run, on_failure)
    finally:
        self.reset()
def discard(self):
    """
    Issue DISCARD through ``execute_command``; the result is not returned.

    DISCARD flushes all previously queued commands.
    See: https://redis.io/commands/DISCARD
    """
    command = "DISCARD"
    self.execute_command(command)
def watch(self, *names):
    """
    Watches the values at keys ``names``.

    Raises ``RedisError`` when called after ``multi()`` has started an
    explicit transaction.
    """
    if not self.explicit_transaction:
        return self.execute_command("WATCH", *names)
    raise RedisError("Cannot issue a WATCH after a MULTI")
def unwatch(self) -> bool:
    """
    Unwatches all previously specified keys.

    UNWATCH is only issued when keys are currently being watched. The
    command's result is returned when it is truthy; in every other case
    the method returns True.
    """
    if not self.watching:
        # nothing to unwatch, report success without touching the server
        return True
    # a falsy reply is still reported as True
    return self.execute_command("UNWATCH") or True