Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import re
3import threading
4import time
5from itertools import chain
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Callable,
10 Dict,
11 List,
12 Mapping,
13 Optional,
14 Set,
15 Type,
16 Union,
17)
19from redis._parsers.encoders import Encoder
20from redis._parsers.helpers import (
21 _RedisCallbacks,
22 _RedisCallbacksRESP2,
23 _RedisCallbacksRESP3,
24 bool_ok,
25)
26from redis.backoff import ExponentialWithJitterBackoff
27from redis.cache import CacheConfig, CacheInterface
28from redis.commands import (
29 CoreCommands,
30 RedisModuleCommands,
31 SentinelCommands,
32 list_or_args,
33)
34from redis.commands.core import Script
35from redis.connection import (
36 AbstractConnection,
37 Connection,
38 ConnectionPool,
39 SSLConnection,
40 UnixDomainSocketConnection,
41)
42from redis.credentials import CredentialProvider
43from redis.event import (
44 AfterPooledConnectionsInstantiationEvent,
45 AfterPubSubConnectionInstantiationEvent,
46 AfterSingleConnectionInstantiationEvent,
47 ClientType,
48 EventDispatcher,
49)
50from redis.exceptions import (
51 ConnectionError,
52 ExecAbortError,
53 PubSubError,
54 RedisError,
55 ResponseError,
56 WatchError,
57)
58from redis.lock import Lock
59from redis.maint_notifications import (
60 MaintNotificationsConfig,
61 MaintNotificationsPoolHandler,
62)
63from redis.retry import Retry
64from redis.utils import (
65 _set_info_logger,
66 deprecated_args,
67 get_lib_version,
68 safe_str,
69 str_if_bytes,
70 truncate_text,
71)
73if TYPE_CHECKING:
74 import ssl
76 import OpenSSL
# Empty bytes literal.
SYM_EMPTY = b""
# Option key: when a command's options contain it and the server answers with
# a ResponseError, parse_response() returns the associated value instead of
# re-raising.
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (ie. dump) are binary, and just meant to never be decoded
# Option key: when present, parse_response() reads the reply with
# disable_decoding=True.
NEVER_DECODE = "NEVER_DECODE"
class CaseInsensitiveDict(dict):
    """
    Case insensitive dict implementation. Assumes string keys only.

    Every key is upper-cased before it is stored or looked up, so ``"get"``,
    ``"Get"`` and ``"GET"`` all address the same entry.
    """

    def __init__(self, data: Dict[str, str]) -> None:
        # Go through __setitem__ semantics so every initial key is normalised.
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def pop(self, k, *default):
        """Remove and return the entry for ``k``, matching case-insensitively."""
        return super().pop(k.upper(), *default)

    def setdefault(self, k, default=None):
        """
        Return the value for ``k``, inserting ``default`` first if absent.

        dict.setdefault does not route through __setitem__, so without this
        override a lower-case key would be stored verbatim and become
        unreachable through the case-insensitive lookups above.
        """
        return super().setdefault(k.upper(), default)

    def update(self, data):
        # Normalise the incoming keys before merging them in.
        data = CaseInsensitiveDict(data)
        super().update(data)
class AbstractRedis:
    """Empty marker class; it defines no attributes or methods of its own."""
116class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
117 """
118 Implementation of the Redis protocol.
120 This abstract class provides a Python interface to all Redis commands
121 and an implementation of the Redis protocol.
123 Pipelines derive from this, implementing how
124 the commands are sent and received to the Redis server. Based on
125 configuration, an instance will either use a ConnectionPool, or
126 Connection object to talk to redis.
128 It is not safe to pass PubSub or Pipeline objects between threads.
129 """
@classmethod
def from_url(cls, url: str, **kwargs) -> "Redis":
    """
    Build a Redis client from a connection URL.

    Example URLs::

        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0
        unix://[username@]/path/to/socket.sock?db=0[&password=password]

    Supported schemes:

    - `redis://` - plain TCP socket connection, see
      <https://www.iana.org/assignments/uri-schemes/prov/redis>
    - `rediss://` - SSL wrapped TCP socket connection, see
      <https://www.iana.org/assignments/uri-schemes/prov/rediss>
    - ``unix://`` - Unix Domain Socket connection.

    Username, password, hostname, path and every querystring value are
    run through urllib.parse.unquote so percent-encoded values are
    replaced with their corresponding characters.

    The database number is taken from the first of these that is present:

    1. A ``db`` querystring option, e.g. redis://localhost?db=0
    2. For redis:// and rediss:// URLs, the path component,
       e.g. redis://localhost/0
    3. A ``db`` keyword argument to this function.

    When none is given, db=0 is used.

    Querystring options are cast to their appropriate Python types.
    Booleans may be written as "True"/"False" or "Yes"/"No". A value that
    cannot be cast raises ``ValueError``. The parsed querystring arguments
    together with the keyword arguments are handed to the
    ``ConnectionPool`` initializer; on conflict the querystring wins.

    ``single_connection_client`` is consumed here and forwarded to the
    client rather than to the pool. The returned client owns the pool it
    created (``auto_close_connection_pool`` is set to True).
    """
    use_single_connection = kwargs.pop("single_connection_client", False)
    pool = ConnectionPool.from_url(url, **kwargs)
    instance = cls(
        connection_pool=pool,
        single_connection_client=use_single_connection,
    )
    # The pool was created on the caller's behalf, so the client closes it.
    instance.auto_close_connection_pool = True
    return instance
@classmethod
def from_pool(
    cls: Type["Redis"],
    connection_pool: ConnectionPool,
) -> "Redis":
    """
    Build a Redis client on top of an existing connection pool.

    Ownership of the pool passes to the new client: closing the client
    also closes the pool.
    """
    instance = cls(connection_pool=connection_pool)
    # __init__ leaves ownership with the caller for a supplied pool;
    # flip it so close() disconnects the pool as well.
    instance.auto_close_connection_pool = True
    return instance
@deprecated_args(
    args_to_warn=["retry_on_timeout"],
    reason="TimeoutError is included by default.",
    version="6.0.0",
)
def __init__(
    self,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: Optional[str] = None,
    socket_timeout: Optional[float] = None,
    socket_connect_timeout: Optional[float] = None,
    socket_keepalive: Optional[bool] = None,
    socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
    connection_pool: Optional[ConnectionPool] = None,
    unix_socket_path: Optional[str] = None,
    encoding: str = "utf-8",
    encoding_errors: str = "strict",
    decode_responses: bool = False,
    retry_on_timeout: bool = False,
    retry: Retry = Retry(
        backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
    ),
    retry_on_error: Optional[List[Type[Exception]]] = None,
    ssl: bool = False,
    ssl_keyfile: Optional[str] = None,
    ssl_certfile: Optional[str] = None,
    ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
    ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
    ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
    ssl_ca_certs: Optional[str] = None,
    ssl_ca_path: Optional[str] = None,
    ssl_ca_data: Optional[str] = None,
    ssl_check_hostname: bool = True,
    ssl_password: Optional[str] = None,
    ssl_validate_ocsp: bool = False,
    ssl_validate_ocsp_stapled: bool = False,
    ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
    ssl_ocsp_expected_cert: Optional[str] = None,
    ssl_min_version: Optional["ssl.TLSVersion"] = None,
    ssl_ciphers: Optional[str] = None,
    max_connections: Optional[int] = None,
    single_connection_client: bool = False,
    health_check_interval: int = 0,
    client_name: Optional[str] = None,
    lib_name: Optional[str] = "redis-py",
    lib_version: Optional[str] = get_lib_version(),
    username: Optional[str] = None,
    redis_connect_func: Optional[Callable[[], None]] = None,
    credential_provider: Optional[CredentialProvider] = None,
    protocol: Optional[int] = 2,
    cache: Optional[CacheInterface] = None,
    cache_config: Optional[CacheConfig] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
    maint_notifications_config: Optional[MaintNotificationsConfig] = None,
) -> None:
    """
    Initialize a new Redis client.

    To specify a retry policy for specific errors, you have two options:

    1. Set the `retry_on_error` to a list of the error/s to retry on, and
    you can also set `retry` to a valid `Retry` object(in case the default
    one is not appropriate) - with this approach the retries will be triggered
    on the default errors specified in the Retry object enriched with the
    errors specified in `retry_on_error`.

    2. Define a `Retry` object with configured 'supported_errors' and set
    it to the `retry` parameter - with this approach you completely redefine
    the errors on which retries will happen.

    `retry_on_timeout` is deprecated - please include the TimeoutError
    either in the Retry object or in the `retry_on_error` list.

    When 'connection_pool' is provided - the retry configuration of the
    provided pool will be used.

    Args:

    single_connection_client:
        if `True`, connection pool is not used. In that case `Redis`
        instance use is not thread safe.

    Raises:
        RedisError: if `cache`/`cache_config` or `maint_notifications_config`
            is given while the pool's protocol is not RESP 3.
    """
    if event_dispatcher is None:
        self._event_dispatcher = EventDispatcher()
    else:
        self._event_dispatcher = event_dispatcher
    if not connection_pool:
        # No pool supplied: build one from the individual keyword arguments.
        if not retry_on_error:
            retry_on_error = []
        kwargs = {
            "db": db,
            "username": username,
            "password": password,
            "socket_timeout": socket_timeout,
            "encoding": encoding,
            "encoding_errors": encoding_errors,
            "decode_responses": decode_responses,
            "retry_on_error": retry_on_error,
            # The default Retry object is created once at definition time;
            # copying gives this pool its own instance.
            "retry": copy.deepcopy(retry),
            "max_connections": max_connections,
            "health_check_interval": health_check_interval,
            "client_name": client_name,
            "lib_name": lib_name,
            "lib_version": lib_version,
            "redis_connect_func": redis_connect_func,
            "credential_provider": credential_provider,
            "protocol": protocol,
        }
        # based on input, setup appropriate connection args
        if unix_socket_path is not None:
            kwargs.update(
                {
                    "path": unix_socket_path,
                    "connection_class": UnixDomainSocketConnection,
                }
            )
        else:
            # TCP specific options
            kwargs.update(
                {
                    "host": host,
                    "port": port,
                    "socket_connect_timeout": socket_connect_timeout,
                    "socket_keepalive": socket_keepalive,
                    "socket_keepalive_options": socket_keepalive_options,
                }
            )

            # SSL options are only considered on the TCP path.
            if ssl:
                kwargs.update(
                    {
                        "connection_class": SSLConnection,
                        "ssl_keyfile": ssl_keyfile,
                        "ssl_certfile": ssl_certfile,
                        "ssl_cert_reqs": ssl_cert_reqs,
                        "ssl_include_verify_flags": ssl_include_verify_flags,
                        "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
                        "ssl_ca_certs": ssl_ca_certs,
                        "ssl_ca_data": ssl_ca_data,
                        "ssl_check_hostname": ssl_check_hostname,
                        "ssl_password": ssl_password,
                        "ssl_ca_path": ssl_ca_path,
                        "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                        "ssl_validate_ocsp": ssl_validate_ocsp,
                        "ssl_ocsp_context": ssl_ocsp_context,
                        "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                        "ssl_min_version": ssl_min_version,
                        "ssl_ciphers": ssl_ciphers,
                    }
                )
        # Cache settings are forwarded to the pool only for RESP 3; the
        # check further down rejects caching on any other protocol.
        if (cache_config or cache) and protocol in [3, "3"]:
            kwargs.update(
                {
                    "cache": cache,
                    "cache_config": cache_config,
                }
            )
        connection_pool = ConnectionPool(**kwargs)
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                [connection_pool], ClientType.SYNC, credential_provider
            )
        )
        # The client created this pool, so close() disconnects it.
        self.auto_close_connection_pool = True
    else:
        # Caller supplied the pool and keeps responsibility for closing it.
        self.auto_close_connection_pool = False
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                [connection_pool], ClientType.SYNC, credential_provider
            )
        )

    self.connection_pool = connection_pool

    if (cache_config or cache) and self.connection_pool.get_protocol() not in [
        3,
        "3",
    ]:
        raise RedisError("Client caching is only supported with RESP version 3")

    if maint_notifications_config and self.connection_pool.get_protocol() not in [
        3,
        "3",
    ]:
        raise RedisError(
            "Push handlers on connection are only supported with RESP version 3"
        )
    if maint_notifications_config and maint_notifications_config.enabled:
        self.maint_notifications_pool_handler = MaintNotificationsPoolHandler(
            self.connection_pool, maint_notifications_config
        )
        self.connection_pool.set_maint_notifications_pool_handler(
            self.maint_notifications_pool_handler
        )
    else:
        self.maint_notifications_pool_handler = None

    self.single_connection_lock = threading.RLock()
    self.connection = None
    self._single_connection_client = single_connection_client
    if self._single_connection_client:
        # Check one connection out of the pool up front and keep it on the client.
        self.connection = self.connection_pool.get_connection()
        self._event_dispatcher.dispatch(
            AfterSingleConnectionInstantiationEvent(
                self.connection, ClientType.SYNC, self.single_connection_lock
            )
        )

    # Start from the shared callbacks, then layer the protocol-specific ones.
    self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

    if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
        self.response_callbacks.update(_RedisCallbacksRESP3)
    else:
        self.response_callbacks.update(_RedisCallbacksRESP2)
415 def __repr__(self) -> str:
416 return (
417 f"<{type(self).__module__}.{type(self).__name__}"
418 f"({repr(self.connection_pool)})>"
419 )
421 def get_encoder(self) -> "Encoder":
422 """Get the connection pool's encoder"""
423 return self.connection_pool.get_encoder()
425 def get_connection_kwargs(self) -> Dict:
426 """Get the connection's key-word arguments"""
427 return self.connection_pool.connection_kwargs
429 def get_retry(self) -> Optional[Retry]:
430 return self.get_connection_kwargs().get("retry")
432 def set_retry(self, retry: Retry) -> None:
433 self.get_connection_kwargs().update({"retry": retry})
434 self.connection_pool.set_retry(retry)
436 def set_response_callback(self, command: str, callback: Callable) -> None:
437 """Set a custom Response Callback"""
438 self.response_callbacks[command] = callback
440 def load_external_module(self, funcname, func) -> None:
441 """
442 This function can be used to add externally defined redis modules,
443 and their namespaces to the redis client.
445 funcname - A string containing the name of the function to create
446 func - The function, being added to this class.
448 ex: Assume that one has a custom redis module named foomod that
449 creates command named 'foo.dothing' and 'foo.anotherthing' in redis.
450 To load function functions into this namespace:
452 from redis import Redis
453 from foomodule import F
454 r = Redis()
455 r.load_external_module("foo", F)
456 r.foo().dothing('your', 'arguments')
458 For a concrete example see the reimport of the redisjson module in
459 tests/test_connection.py::test_loading_external_modules
460 """
461 setattr(self, funcname, func)
def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
    """
    Create a pipeline that queues commands for later execution.

    ``transaction`` selects whether the queued commands are executed
    atomically. Besides atomicity, pipelines cut down the round trips
    between client and server.
    """
    pool = self.connection_pool
    callbacks = self.response_callbacks
    return Pipeline(pool, callbacks, transaction, shard_hint)
475 def transaction(
476 self, func: Callable[["Pipeline"], None], *watches, **kwargs
477 ) -> Union[List[Any], Any, None]:
478 """
479 Convenience method for executing the callable `func` as a transaction
480 while watching all keys specified in `watches`. The 'func' callable
481 should expect a single argument which is a Pipeline object.
482 """
483 shard_hint = kwargs.pop("shard_hint", None)
484 value_from_callable = kwargs.pop("value_from_callable", False)
485 watch_delay = kwargs.pop("watch_delay", None)
486 with self.pipeline(True, shard_hint) as pipe:
487 while True:
488 try:
489 if watches:
490 pipe.watch(*watches)
491 func_value = func(pipe)
492 exec_value = pipe.execute()
493 return func_value if value_from_callable else exec_value
494 except WatchError:
495 if watch_delay is not None and watch_delay > 0:
496 time.sleep(watch_delay)
497 continue
499 def lock(
500 self,
501 name: str,
502 timeout: Optional[float] = None,
503 sleep: float = 0.1,
504 blocking: bool = True,
505 blocking_timeout: Optional[float] = None,
506 lock_class: Union[None, Any] = None,
507 thread_local: bool = True,
508 raise_on_release_error: bool = True,
509 ):
510 """
511 Return a new Lock object using key ``name`` that mimics
512 the behavior of threading.Lock.
514 If specified, ``timeout`` indicates a maximum life for the lock.
515 By default, it will remain locked until release() is called.
517 ``sleep`` indicates the amount of time to sleep per loop iteration
518 when the lock is in blocking mode and another client is currently
519 holding the lock.
521 ``blocking`` indicates whether calling ``acquire`` should block until
522 the lock has been acquired or to fail immediately, causing ``acquire``
523 to return False and the lock not being acquired. Defaults to True.
524 Note this value can be overridden by passing a ``blocking``
525 argument to ``acquire``.
527 ``blocking_timeout`` indicates the maximum amount of time in seconds to
528 spend trying to acquire the lock. A value of ``None`` indicates
529 continue trying forever. ``blocking_timeout`` can be specified as a
530 float or integer, both representing the number of seconds to wait.
532 ``lock_class`` forces the specified lock implementation. Note that as
533 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
534 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
535 you have created your own custom lock class.
537 ``thread_local`` indicates whether the lock token is placed in
538 thread-local storage. By default, the token is placed in thread local
539 storage so that a thread only sees its token, not a token set by
540 another thread. Consider the following timeline:
542 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
543 thread-1 sets the token to "abc"
544 time: 1, thread-2 blocks trying to acquire `my-lock` using the
545 Lock instance.
546 time: 5, thread-1 has not yet completed. redis expires the lock
547 key.
548 time: 5, thread-2 acquired `my-lock` now that it's available.
549 thread-2 sets the token to "xyz"
550 time: 6, thread-1 finishes its work and calls release(). if the
551 token is *not* stored in thread local storage, then
552 thread-1 would see the token value as "xyz" and would be
553 able to successfully release the thread-2's lock.
555 ``raise_on_release_error`` indicates whether to raise an exception when
556 the lock is no longer owned when exiting the context manager. By default,
557 this is True, meaning an exception will be raised. If False, the warning
558 will be logged and the exception will be suppressed.
560 In some use cases it's necessary to disable thread local storage. For
561 example, if you have code where one thread acquires a lock and passes
562 that lock instance to a worker thread to release later. If thread
563 local storage isn't disabled in this case, the worker thread won't see
564 the token set by the thread that acquired the lock. Our assumption
565 is that these cases aren't common and as such default to using
566 thread local storage."""
567 if lock_class is None:
568 lock_class = Lock
569 return lock_class(
570 self,
571 name,
572 timeout=timeout,
573 sleep=sleep,
574 blocking=blocking,
575 blocking_timeout=blocking_timeout,
576 thread_local=thread_local,
577 raise_on_release_error=raise_on_release_error,
578 )
def pubsub(self, **kwargs):
    """
    Create a Publish/Subscribe object bound to this client's pool.

    Use it to subscribe to channels and listen for the messages published
    to them. Extra keyword arguments are forwarded to ``PubSub``.
    """
    pool = self.connection_pool
    dispatcher = self._event_dispatcher
    return PubSub(pool, event_dispatcher=dispatcher, **kwargs)
def monitor(self):
    """Create a Monitor bound to this client's connection pool."""
    pool = self.connection_pool
    return Monitor(pool)
593 def client(self):
594 maint_notifications_config = (
595 None
596 if self.maint_notifications_pool_handler is None
597 else self.maint_notifications_pool_handler.config
598 )
599 return self.__class__(
600 connection_pool=self.connection_pool,
601 single_connection_client=True,
602 maint_notifications_config=maint_notifications_config,
603 )
605 def __enter__(self):
606 return self
608 def __exit__(self, exc_type, exc_value, traceback):
609 self.close()
611 def __del__(self):
612 try:
613 self.close()
614 except Exception:
615 pass
617 def close(self) -> None:
618 # In case a connection property does not yet exist
619 # (due to a crash earlier in the Redis() constructor), return
620 # immediately as there is nothing to clean-up.
621 if not hasattr(self, "connection"):
622 return
624 conn = self.connection
625 if conn:
626 self.connection = None
627 self.connection_pool.release(conn)
629 if self.auto_close_connection_pool:
630 self.connection_pool.disconnect()
632 def _send_command_parse_response(self, conn, command_name, *args, **options):
633 """
634 Send a command and parse the response
635 """
636 conn.send_command(*args, **options)
637 return self.parse_response(conn, command_name, **options)
639 def _close_connection(self, conn) -> None:
640 """
641 Close the connection before retrying.
643 The supported exceptions are already checked in the
644 retry object so we don't need to do it here.
646 After we disconnect the connection, it will try to reconnect and
647 do a health check as part of the send_command logic(on connection level).
648 """
650 conn.disconnect()
652 # COMMAND EXECUTION AND PROTOCOL PARSING
653 def execute_command(self, *args, **options):
654 return self._execute_command(*args, **options)
656 def _execute_command(self, *args, **options):
657 """Execute a command and return a parsed response"""
658 pool = self.connection_pool
659 command_name = args[0]
660 conn = self.connection or pool.get_connection()
662 if self._single_connection_client:
663 self.single_connection_lock.acquire()
664 try:
665 return conn.retry.call_with_retry(
666 lambda: self._send_command_parse_response(
667 conn, command_name, *args, **options
668 ),
669 lambda _: self._close_connection(conn),
670 )
672 finally:
673 if conn and conn.should_reconnect():
674 self._close_connection(conn)
675 conn.connect()
676 if self._single_connection_client:
677 self.single_connection_lock.release()
678 if not self.connection:
679 pool.release(conn)
def parse_response(self, connection, command_name, **options):
    """
    Read one reply from ``connection`` and post-process it.

    NEVER_DECODE in ``options`` makes the reply be read undecoded.
    EMPTY_RESPONSE in ``options`` supplies the value returned when the
    server answers with a ResponseError. If a response callback is
    registered for ``command_name`` its result is returned, otherwise the
    raw reply.
    """
    try:
        if NEVER_DECODE in options:
            raw = connection.read_response(disable_decoding=True)
            options.pop(NEVER_DECODE)
        else:
            raw = connection.read_response()
    except ResponseError:
        if EMPTY_RESPONSE not in options:
            raise
        return options[EMPTY_RESPONSE]

    # Neither marker must reach the callback as a keyword argument.
    options.pop(EMPTY_RESPONSE, None)

    # Remove keys entry, it needs only for cache.
    options.pop("keys", None)

    callbacks = self.response_callbacks
    if command_name not in callbacks:
        return raw
    return callbacks[command_name](raw, **options)
704 def get_cache(self) -> Optional[CacheInterface]:
705 return self.connection_pool.cache
# Second name bound to the very same class object as Redis.
# NOTE(review): presumably kept so code importing the older StrictRedis name
# keeps working; confirm against the project changelog.
StrictRedis = Redis
class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    next_command() method returns one command from monitor
    listen() method yields commands from monitor.

    A connection is taken from the pool on construction. Use the object as
    a context manager: entering sends MONITOR, leaving disconnects the
    connection and returns it to the pool.
    """

    # "[<db> <client info>] <command text>"
    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    # One double-quoted argument; the closing quote must not be escaped.
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        try:
            self._start_monitor()
        except BaseException:
            # __exit__ is not run when __enter__ raises, so clean up here;
            # otherwise the connection would never go back to the pool.
            self.connection.disconnect()
            self.connection_pool.release(self.connection)
            raise
        return self

    def __exit__(self, *args):
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """
        Parse the response from a monitor command.

        Returns None when the connection yields None, otherwise a dict with
        the keys ``time``, ``db``, ``client_address``, ``client_port``,
        ``client_type`` and ``command``.
        """
        response = self.connection.read_response()

        if response is None:
            return None

        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)

        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            # Everything after the 5-character "unix" prefix and its separator.
            client_port = client_info[5:]
            client_type = "unix"
        else:
            # use rsplit as ipv6 addresses contain colons
            client_address, client_port = client_info.rsplit(":", 1)
            client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        """Send MONITOR and raise RedisError unless the reply is OK."""
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()

        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")
787class PubSub:
788 """
789 PubSub provides publish, subscribe and listen support to Redis channels.
791 After subscribing to one or more channels, the listen() method will block
792 until a message arrives on one of the subscribed channels. That message
793 will be returned and it's safe to start listening again.
794 """
796 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
797 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
798 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
800 def __init__(
801 self,
802 connection_pool,
803 shard_hint=None,
804 ignore_subscribe_messages: bool = False,
805 encoder: Optional["Encoder"] = None,
806 push_handler_func: Union[None, Callable[[str], None]] = None,
807 event_dispatcher: Optional["EventDispatcher"] = None,
808 ):
809 self.connection_pool = connection_pool
810 self.shard_hint = shard_hint
811 self.ignore_subscribe_messages = ignore_subscribe_messages
812 self.connection = None
813 self.subscribed_event = threading.Event()
814 # we need to know the encoding options for this connection in order
815 # to lookup channel and pattern names for callback handlers.
816 self.encoder = encoder
817 self.push_handler_func = push_handler_func
818 if event_dispatcher is None:
819 self._event_dispatcher = EventDispatcher()
820 else:
821 self._event_dispatcher = event_dispatcher
823 self._lock = threading.RLock()
824 if self.encoder is None:
825 self.encoder = self.connection_pool.get_encoder()
826 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
827 if self.encoder.decode_responses:
828 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
829 else:
830 self.health_check_response = [b"pong", self.health_check_response_b]
831 if self.push_handler_func is None:
832 _set_info_logger()
833 self.reset()
835 def __enter__(self) -> "PubSub":
836 return self
838 def __exit__(self, exc_type, exc_value, traceback) -> None:
839 self.reset()
841 def __del__(self) -> None:
842 try:
843 # if this object went out of scope prior to shutting down
844 # subscriptions, close the connection manually before
845 # returning it to the connection pool
846 self.reset()
847 except Exception:
848 pass
850 def reset(self) -> None:
851 if self.connection:
852 self.connection.disconnect()
853 self.connection.deregister_connect_callback(self.on_connect)
854 self.connection_pool.release(self.connection)
855 self.connection = None
856 self.health_check_response_counter = 0
857 self.channels = {}
858 self.pending_unsubscribe_channels = set()
859 self.shard_channels = {}
860 self.pending_unsubscribe_shard_channels = set()
861 self.patterns = {}
862 self.pending_unsubscribe_patterns = set()
863 self.subscribed_event.clear()
865 def close(self) -> None:
866 self.reset()
868 def on_connect(self, connection) -> None:
869 "Re-subscribe to any channels and patterns previously subscribed to"
870 # NOTE: for python3, we can't pass bytestrings as keyword arguments
871 # so we need to decode channel/pattern names back to unicode strings
872 # before passing them to [p]subscribe.
873 self.pending_unsubscribe_channels.clear()
874 self.pending_unsubscribe_patterns.clear()
875 self.pending_unsubscribe_shard_channels.clear()
876 if self.channels:
877 channels = {
878 self.encoder.decode(k, force=True): v for k, v in self.channels.items()
879 }
880 self.subscribe(**channels)
881 if self.patterns:
882 patterns = {
883 self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
884 }
885 self.psubscribe(**patterns)
886 if self.shard_channels:
887 shard_channels = {
888 self.encoder.decode(k, force=True): v
889 for k, v in self.shard_channels.items()
890 }
891 self.ssubscribe(**shard_channels)
893 @property
894 def subscribed(self) -> bool:
895 """Indicates if there are subscriptions to any channels or patterns"""
896 return self.subscribed_event.is_set()
898 def execute_command(self, *args):
899 """Execute a publish/subscribe command"""
901 # NOTE: don't parse the response in this function -- it could pull a
902 # legitimate message off the stack if the connection is already
903 # subscribed to one or more channels
905 if self.connection is None:
906 self.connection = self.connection_pool.get_connection()
907 # register a callback that re-subscribes to any channels we
908 # were listening to when we were disconnected
909 self.connection.register_connect_callback(self.on_connect)
910 if self.push_handler_func is not None:
911 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
912 self._event_dispatcher.dispatch(
913 AfterPubSubConnectionInstantiationEvent(
914 self.connection, self.connection_pool, ClientType.SYNC, self._lock
915 )
916 )
917 connection = self.connection
918 kwargs = {"check_health": not self.subscribed}
919 if not self.subscribed:
920 self.clean_health_check_responses()
921 with self._lock:
922 self._execute(connection, connection.send_command, *args, **kwargs)
924 def clean_health_check_responses(self) -> None:
925 """
926 If any health check responses are present, clean them
927 """
928 ttl = 10
929 conn = self.connection
930 while conn and self.health_check_response_counter > 0 and ttl > 0:
931 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
932 response = self._execute(conn, conn.read_response)
933 if self.is_health_check_response(response):
934 self.health_check_response_counter -= 1
935 else:
936 raise PubSubError(
937 "A non health check response was cleaned by "
938 "execute_command: {}".format(response)
939 )
940 ttl -= 1
942 def _reconnect(self, conn) -> None:
943 """
944 The supported exceptions are already checked in the
945 retry object so we don't need to do it here.
947 In this error handler we are trying to reconnect to the server.
948 """
949 conn.disconnect()
950 conn.connect()
def _execute(self, conn, command, *args, **kwargs):
    """
    Run ``command(*args, **kwargs)`` through the connection's retry object.

    If the connection reports that it should reconnect, it is reconnected
    before the command runs. On a retried failure the connection is
    reconnected again; if the Redis server is down this fails and raises
    a ConnectionError as desired. After reconnection, the ``on_connect``
    callback should have been called by the connection to resubscribe us
    to any channels and patterns we were previously listening to.
    """
    if conn.should_reconnect():
        self._reconnect(conn)

    def invoke():
        return command(*args, **kwargs)

    def on_failure(_error):
        return self._reconnect(conn)

    return conn.retry.call_with_retry(invoke, on_failure)
def parse_response(self, block=True, timeout=0):
    """
    Parse the response from a publish/subscribe command.

    When ``block`` is false the connection is polled with ``timeout`` and
    None is returned if nothing is readable. Health check replies are
    swallowed (the pending counter is decremented and None is returned).

    Raises RuntimeError if no pubsub connection has been set.
    """
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            "pubsub connection not set: "
            "did you forget to call subscribe() or psubscribe()?"
        )

    self.check_health()

    def read_once():
        if block:
            conn.connect()
        elif not conn.can_read(timeout=timeout):
            return None
        return conn.read_response(disconnect_on_error=False, push_request=True)

    response = self._execute(conn, read_once)

    if not self.is_health_check_response(response):
        return response
    # ignore the health check message as user might not expect it
    self.health_check_response_counter -= 1
    return None
def is_health_check_response(self, response) -> bool:
    """
    Tell whether ``response`` is the reply to a health check PING.

    Redis answers PING with a multi-bulk ("pong" plus the message) while
    subscriptions exist and with a plain bulk reply otherwise, so both
    expected shapes are accepted.
    """
    expected = (
        self.health_check_response,  # If there was a subscription
        self.health_check_response_b,  # If there wasn't
    )
    return response in expected
def check_health(self) -> None:
    """
    Send a health check PING when the connection's check is overdue.

    Nothing is sent if the connection has no health check interval or
    its next check time has not passed yet. Each PING sent increments
    ``health_check_response_counter``.

    Raises RuntimeError if no pubsub connection has been set.
    """
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            "pubsub connection not set: "
            "did you forget to call subscribe() or psubscribe()?"
        )

    if not conn.health_check_interval:
        return
    overdue = time.monotonic() > conn.next_health_check
    if not overdue:
        return
    conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
    self.health_check_response_counter += 1
def _normalize_keys(self, data) -> Dict:
    """
    Return a copy of ``data`` whose keys went through the encoder.

    Each key is encoded and then decoded with ``self.encoder`` so that
    channel/pattern names are uniformly bytes or strings, depending on
    whether responses are automatically decoded. Doing it once here saves
    coercing the value for each message coming in.
    """
    encoder = self.encoder
    to_wire = encoder.encode
    from_wire = encoder.decode
    normalized = {}
    for name, handler in data.items():
        normalized[from_wire(to_wire(name))] = handler
    return normalized
def psubscribe(self, *args, **kwargs):
    """
    Subscribe to channel patterns.

    Positional arguments are pattern names without a handler. Keyword
    arguments map a pattern name to a callable; that callable is invoked
    automatically when a message arrives on the pattern rather than the
    message being produced via ``listen()``.

    Returns whatever ``execute_command`` returns for PSUBSCRIBE.
    """
    if args:
        args = list_or_args(args[0], args[1:])
    requested = {name: None for name in args}
    requested.update(kwargs)
    reply = self.execute_command("PSUBSCRIBE", *requested)
    # record the patterns only AFTER the command was sent, otherwise a
    # reconnection in between would subscribe to them a second time.
    requested = self._normalize_keys(requested)
    self.patterns.update(requested)
    if not self.subscribed:
        # first subscription: raise the subscribed flag and start the
        # health check bookkeeping from zero
        self.subscribed_event.set()
        self.health_check_response_counter = 0
    self.pending_unsubscribe_patterns.difference_update(requested)
    return reply
def punsubscribe(self, *args):
    """
    Unsubscribe from the supplied patterns, or from every pattern when
    none are given.

    The affected patterns are recorded as pending unsubscription before
    the PUNSUBSCRIBE command is issued.
    """
    if not args:
        targets = self.patterns
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys({name: None for name in args})
    self.pending_unsubscribe_patterns.update(targets)
    return self.execute_command("PUNSUBSCRIBE", *args)
def subscribe(self, *args, **kwargs):
    """
    Subscribe to channels.

    Positional arguments are channel names without a handler. Keyword
    arguments map a channel name to a callable; that callable is invoked
    automatically when a message arrives on the channel rather than the
    message being produced via ``listen()`` or ``get_message()``.

    Returns whatever ``execute_command`` returns for SUBSCRIBE.
    """
    if args:
        args = list_or_args(args[0], args[1:])
    requested = {name: None for name in args}
    requested.update(kwargs)
    reply = self.execute_command("SUBSCRIBE", *requested)
    # record the channels only AFTER the command was sent, otherwise a
    # reconnection in between would subscribe to them a second time.
    requested = self._normalize_keys(requested)
    self.channels.update(requested)
    if not self.subscribed:
        # first subscription: raise the subscribed flag and start the
        # health check bookkeeping from zero
        self.subscribed_event.set()
        self.health_check_response_counter = 0
    self.pending_unsubscribe_channels.difference_update(requested)
    return reply
def unsubscribe(self, *args):
    """
    Unsubscribe from the supplied channels, or from every channel when
    none are given.

    The affected channels are recorded as pending unsubscription before
    the UNSUBSCRIBE command is issued.
    """
    if not args:
        targets = self.channels
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys({name: None for name in args})
    self.pending_unsubscribe_channels.update(targets)
    return self.execute_command("UNSUBSCRIBE", *args)
def ssubscribe(self, *args, target_node=None, **kwargs):
    """
    Subscribe the client to the specified shard channels.

    Positional arguments are shard channel names without a handler.
    Keyword arguments map a channel name to a callable; that callable is
    invoked automatically when a message arrives on the channel rather
    than the message being produced via ``listen()`` or
    ``get_sharded_message()``. ``target_node`` is accepted but not used
    by this implementation.

    Returns whatever ``execute_command`` returns for SSUBSCRIBE.
    """
    if args:
        args = list_or_args(args[0], args[1:])
    requested = {name: None for name in args}
    requested.update(kwargs)
    reply = self.execute_command("SSUBSCRIBE", *requested)
    # record the shard channels only AFTER the command was sent, otherwise
    # a reconnection in between would subscribe to them a second time.
    requested = self._normalize_keys(requested)
    self.shard_channels.update(requested)
    if not self.subscribed:
        # first subscription: raise the subscribed flag and start the
        # health check bookkeeping from zero
        self.subscribed_event.set()
        self.health_check_response_counter = 0
    self.pending_unsubscribe_shard_channels.difference_update(requested)
    return reply
def sunsubscribe(self, *args, target_node=None):
    """
    Unsubscribe from the supplied shard channels, or from every shard
    channel when none are given.

    The affected channels are recorded as pending unsubscription before
    the SUNSUBSCRIBE command is issued. ``target_node`` is accepted but
    not used by this implementation.
    """
    if not args:
        targets = self.shard_channels
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys({name: None for name in args})
    self.pending_unsubscribe_shard_channels.update(targets)
    return self.execute_command("SUNSUBSCRIBE", *args)
def listen(self):
    """
    Listen for messages on channels this client has been subscribed to.

    Generator: blocks on ``parse_response`` while the client is
    subscribed and yields every message ``handle_message`` does not
    turn into None.
    """
    while self.subscribed:
        raw = self.parse_response(block=True)
        message = self.handle_message(raw)
        if message is None:
            continue
        yield message
def get_message(
    self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0
):
    """
    Get the next message if one is available, otherwise None.

    If timeout is specified, the system will wait for `timeout` seconds
    before returning. Timeout should be specified as a floating point
    number, or None, to wait indefinitely.

    When the client is not subscribed yet, the call first waits (up to
    `timeout`) for a subscription to be registered; the time spent
    waiting is deducted from the timeout used to read the message.
    """
    if not self.subscribed:
        # Wait for subscription
        start_time = time.monotonic()
        if self.subscribed_event.wait(timeout) is not True:
            # The connection isn't subscribed to any channels or patterns,
            # so no messages are available
            return None
        if timeout is not None:
            # The connection was subscribed during the timeout time frame.
            # The timeout should be adjusted based on the time spent
            # waiting for the subscription. A None timeout means "wait
            # indefinitely" and has nothing to deduct from; subtracting
            # from it would raise TypeError.
            time_spent = time.monotonic() - start_time
            timeout = max(0.0, timeout - time_spent)

    response = self.parse_response(block=(timeout is None), timeout=timeout)

    if response:
        return self.handle_message(response, ignore_subscribe_messages)
    return None


get_sharded_message = get_message
def ping(self, message: Union[str, None] = None) -> bool:
    """
    Ping the Redis server to test connectivity.

    Issues PING through ``execute_command``; ``message`` is appended as
    the command argument unless it is None. The result of
    ``execute_command`` is returned as is.
    """
    if message is None:
        return self.execute_command("PING")
    return self.execute_command("PING", message)
def handle_message(self, response, ignore_subscribe_messages=False):
    """
    Parses a pub/sub message.

    Builds a dict with the keys ``type``, ``pattern``, ``channel`` and
    ``data`` from ``response``. Unsubscribe confirmations also update the
    local subscription bookkeeping. If the channel or pattern of a
    published message was subscribed to with a message handler, the
    handler is invoked and None is returned instead of the parsed message.
    Subscribe/unsubscribe confirmations yield None when either
    ``ignore_subscribe_messages`` flag (argument or instance) is set.
    """
    if response is None:
        return None
    if isinstance(response, bytes):
        # a bare bulk reply is a PONG; give it the multi-bulk shape
        payload = b"" if response == b"PONG" else response
        response = [b"pong", payload]

    kind = str_if_bytes(response[0])
    if kind == "pmessage":
        pattern, channel, data = response[1], response[2], response[3]
    elif kind == "pong":
        pattern, channel, data = None, None, response[1]
    else:
        pattern, channel, data = None, response[1], response[2]
    message = {
        "type": kind,
        "pattern": pattern,
        "channel": channel,
        "data": data,
    }

    # if this is an unsubscribe message, remove it from memory
    if kind in self.UNSUBSCRIBE_MESSAGE_TYPES:
        if kind == "punsubscribe":
            pending = self.pending_unsubscribe_patterns
            registry = self.patterns
        elif kind == "sunsubscribe":
            pending = self.pending_unsubscribe_shard_channels
            registry = self.shard_channels
        else:
            pending = self.pending_unsubscribe_channels
            registry = self.channels
        name = response[1]
        if name in pending:
            pending.remove(name)
            registry.pop(name, None)
        if not (self.channels or self.patterns or self.shard_channels):
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()

    if kind in self.PUBLISH_MESSAGE_TYPES:
        # if there's a message handler, invoke it
        if kind == "pmessage":
            handler = self.patterns.get(message["pattern"], None)
        elif kind == "smessage":
            handler = self.shard_channels.get(message["channel"], None)
        else:
            handler = self.channels.get(message["channel"], None)
        if handler:
            handler(message)
            return None
    elif kind != "pong":
        # this is a subscribe/unsubscribe message. ignore if we don't
        # want them
        if ignore_subscribe_messages or self.ignore_subscribe_messages:
            return None

    return message
def run_in_thread(
    self,
    sleep_time: float = 0.0,
    daemon: bool = False,
    exception_handler: Optional[Callable] = None,
    pubsub=None,
    sharded_pubsub: bool = False,
) -> "PubSubWorkerThread":
    """
    Start a ``PubSubWorkerThread`` and return it.

    Every currently registered channel, pattern and shard channel must
    have a handler; otherwise PubSubError is raised and no thread is
    created. The thread works on ``pubsub`` when given, else on this
    object, and receives the remaining arguments unchanged.
    """
    # refuse to start when any subscription lacks a handler
    for channel, handler in self.channels.items():
        if handler is None:
            raise PubSubError(f"Channel: '{channel}' has no handler registered")
    for pattern, handler in self.patterns.items():
        if handler is None:
            raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
    for s_channel, handler in self.shard_channels.items():
        if handler is None:
            raise PubSubError(
                f"Shard Channel: '{s_channel}' has no handler registered"
            )

    target = pubsub if pubsub is not None else self
    worker = PubSubWorkerThread(
        target,
        sleep_time,
        daemon=daemon,
        exception_handler=exception_handler,
        sharded_pubsub=sharded_pubsub,
    )
    worker.start()
    return worker
class PubSubWorkerThread(threading.Thread):
    """
    Thread that keeps polling a pubsub object for messages.

    The loop calls ``get_message`` (or ``get_sharded_message`` when
    ``sharded_pubsub`` is set) with ``sleep_time`` as the timeout until
    ``stop()`` is called, then closes the pubsub object.
    """

    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
        sharded_pubsub: bool = False,
    ):
        super().__init__()
        self.daemon = daemon
        # what to poll and how
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self.sharded_pubsub = sharded_pubsub
        # called with (exception, pubsub, thread) when polling raises
        self.exception_handler = exception_handler
        # set while the run loop is supposed to keep going
        self._running = threading.Event()

    def run(self) -> None:
        """Poll until stopped; a second call while running is a no-op."""
        if self._running.is_set():
            return
        self._running.set()
        pubsub = self.pubsub
        poll_timeout = self.sleep_time
        while self._running.is_set():
            try:
                if self.sharded_pubsub:
                    fetch = pubsub.get_sharded_message
                else:
                    fetch = pubsub.get_message
                fetch(ignore_subscribe_messages=True, timeout=poll_timeout)
            except BaseException as exc:
                on_error = self.exception_handler
                if on_error is None:
                    raise
                on_error(exc, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        """Ask the run loop to finish."""
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()
1352class Pipeline(Redis):
1353 """
1354 Pipelines provide a way to transmit multiple commands to the Redis server
1355 in one transmission. This is convenient for batch processing, such as
1356 saving all the values in a list to Redis.
1358 All commands executed within a pipeline(when running in transactional mode,
1359 which is the default behavior) are wrapped with MULTI and EXEC
1360 calls. This guarantees all commands executed in the pipeline will be
1361 executed atomically.
1363 Any command raising an exception does *not* halt the execution of
1364 subsequent commands in the pipeline. Instead, the exception is caught
1365 and its instance is placed into the response list returned by execute().
1366 Code iterating over the response list should be able to deal with an
1367 instance of an exception as a potential value. In general, these will be
1368 ResponseError exceptions, such as those raised when issuing a command
1369 on a key of a different datatype.
1370 """
1372 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
def __init__(
    self,
    connection_pool: ConnectionPool,
    response_callbacks,
    transaction,
    shard_hint,
):
    """
    Store the pipeline configuration and start with an empty state:
    no connection, no queued commands, no scripts, not watching and
    not inside an explicit transaction.
    """
    # configuration handed in by the caller
    self.connection_pool = connection_pool
    self.response_callbacks = response_callbacks
    self.transaction = transaction
    self.shard_hint = shard_hint
    # mutable state, cleared again by reset()
    self.connection: Optional[Connection] = None
    self.command_stack = []
    self.scripts: Set[Script] = set()
    self.watching = False
    self.explicit_transaction = False
def __enter__(self) -> "Pipeline":
    """Enter the ``with`` block; the pipeline itself is the context value."""
    return self
def __exit__(self, exc_type, exc_value, traceback):
    """Leave the ``with`` block by resetting the pipeline."""
    # the exception details are not inspected; nothing is returned, so
    # an in-flight exception keeps propagating
    self.reset()
def __del__(self):
    """Best-effort reset when the pipeline object is finalized."""
    try:
        self.reset()
    except Exception:
        # deliberately ignored: any Exception raised by reset() is
        # dropped instead of escaping from the finalizer
        pass
def __len__(self) -> int:
    """Return the number of commands currently queued."""
    queued = self.command_stack
    return len(queued)
def __bool__(self) -> bool:
    """Pipeline instances should always evaluate to True"""
    # without this, truthiness would fall back to __len__ and a pipeline
    # with no queued commands would be falsy
    return True
def reset(self) -> None:
    """
    Return the pipeline to its pristine state.

    Queued commands and scripts are dropped. If keys are being watched
    on a held connection, UNWATCH is sent (or the connection is
    disconnected when that fails with ConnectionError). Finally the
    connection, if any, is released back to the pool.
    """
    self.command_stack = []
    self.scripts = set()
    conn = self.connection
    # make sure to reset the connection state in the event that we were
    # watching something
    if self.watching and conn:
        try:
            # call this manually since our unwatch or
            # immediate_execute_command methods can call reset()
            conn.send_command("UNWATCH")
            conn.read_response()
        except ConnectionError:
            # disconnect will also remove any previous WATCHes
            conn.disconnect()
    # clean up the other instance attributes
    self.watching = False
    self.explicit_transaction = False

    # we can safely return the connection to the pool here since we're
    # sure we're no longer WATCHing anything
    if conn:
        self.connection_pool.release(conn)
        self.connection = None
def close(self) -> None:
    """Close the pipeline by resetting it."""
    self.reset()
def multi(self) -> None:
    """
    Start a transactional block of the pipeline after WATCH commands
    are issued. End the transactional block with `execute`.

    Raises RedisError when a transactional block was already started or
    when commands have already been queued.
    """
    if self.explicit_transaction:
        raise RedisError("Cannot issue nested calls to MULTI")
    already_queued = self.command_stack
    if already_queued:
        raise RedisError(
            "Commands without an initial WATCH have already been issued"
        )
    self.explicit_transaction = True
def execute_command(self, *args, **kwargs):
    """
    Route a command: run it immediately or queue it on the pipeline.

    A command is executed immediately when it is WATCH, or when keys are
    already being watched, provided no explicit transaction has been
    started. Everything else is staged for ``execute()``.
    """
    wants_immediate = self.watching or args[0] == "WATCH"
    if wants_immediate and not self.explicit_transaction:
        dispatch = self.immediate_execute_command
    else:
        dispatch = self.pipeline_execute_command
    return dispatch(*args, **kwargs)
def _disconnect_reset_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
) -> None:
    """
    Error handler: disconnect ``conn`` and, if keys were being watched,
    reset the pipeline and raise WatchError.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).
    """
    conn.disconnect()

    if not self.watching:
        return
    # the watch died together with the connection, so it is no longer
    # valid. raising WatchError tells the user to retry the transaction.
    self.reset()
    raise WatchError(
        f"A {type(error).__name__} occurred while watching one or more keys"
    )
def immediate_execute_command(self, *args, **options):
    """
    Execute a command immediately, but don't auto-retry on the supported
    errors for retry if we're already WATCHing a variable.
    Used when issuing WATCH or subsequent commands retrieving their values
    but before MULTI is called.

    A connection is taken from the pool and kept on the pipeline if none
    is held yet.
    """
    command_name = args[0]
    conn = self.connection
    if not conn:
        # first call on this pipeline: acquire and remember a connection
        conn = self.connection_pool.get_connection()
        self.connection = conn

    def send_and_parse():
        return self._send_command_parse_response(
            conn, command_name, *args, **options
        )

    def on_failure(error):
        return self._disconnect_reset_raise_on_watching(conn, error)

    return conn.retry.call_with_retry(send_and_parse, on_failure)
def pipeline_execute_command(self, *args, **options) -> "Pipeline":
    """
    Stage a command to be executed when execute() is next called

    Returns the current Pipeline object back so commands can be
    chained together, such as:

    pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

    At some other point, you can then run: pipe.execute(),
    which will execute all commands queued in the pipe.
    """
    staged = (args, options)
    self.command_stack.append(staged)
    return self
def _execute_transaction(
    self, connection: Connection, commands, raise_on_error
) -> List:
    """
    Send ``commands`` wrapped in MULTI/EXEC over ``connection`` and
    return the list of results, one per command.

    Commands whose options contain ``EMPTY_RESPONSE`` are not sent; the
    value stored under that option is used as their result. Errors
    reported while the commands are queued are placed into the result
    list at the position of the failing command. Response callbacks are
    applied to every non-exception result.

    Raises:
        WatchError: EXEC returned None.
        ExecAbortError: EXEC was aborted and no queueing error was
            recorded; otherwise the first recorded queueing error is
            raised instead.
        ResponseError: the number of results does not match the number
            of commands (the connection is disconnected first), or, when
            ``raise_on_error`` is set, via ``raise_first_error``.
    """
    cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
    all_cmds = connection.pack_commands(
        [args for args, options in cmds if EMPTY_RESPONSE not in options]
    )
    connection.send_packed_command(all_cmds)
    errors = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        self.parse_response(connection, "_")
    except ResponseError as e:
        errors.append((0, e))

    # and all the other commands
    for i, command in enumerate(commands):
        if EMPTY_RESPONSE in command[1]:
            # nothing was sent for this command, so there is no reply to
            # read; its preset value takes the result slot
            errors.append((i, command[1][EMPTY_RESPONSE]))
        else:
            try:
                self.parse_response(connection, "_")
            except ResponseError as e:
                self.annotate_exception(e, i + 1, command[0])
                errors.append((i, e))

    # parse the EXEC.
    try:
        response = self.parse_response(connection, "_")
    except ExecAbortError:
        if errors:
            raise errors[0][1]
        raise

    # EXEC clears any watched keys
    self.watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response
    for i, e in errors:
        response.insert(i, e)

    if len(response) != len(commands):
        # drop the connection the transaction actually ran on; this used
        # to go through self.connection, which is only the same object
        # when the caller stored it on the pipeline beforehand
        connection.disconnect()
        raise ResponseError(
            "Wrong number of response items from pipeline execution"
        )

    # find any errors in the response and raise if necessary
    if raise_on_error:
        self.raise_first_error(commands, response)

    # We have to run response callbacks manually
    data = []
    for r, cmd in zip(response, commands):
        if not isinstance(r, Exception):
            args, options = cmd
            # Remove keys entry, it needs only for cache.
            options.pop("keys", None)
            command_name = args[0]
            if command_name in self.response_callbacks:
                r = self.response_callbacks[command_name](r, **options)
        data.append(r)

    return data
def _execute_pipeline(self, connection, commands, raise_on_error):
    """
    Send ``commands`` without MULTI/EXEC and return their responses.

    All commands are packed into a single request. A ResponseError
    raised while parsing a reply is stored in that command's slot of the
    returned list; with ``raise_on_error`` set, ``raise_first_error`` is
    then invoked on the collected responses.
    """
    # build up all commands into a single request to increase network perf
    packed = connection.pack_commands([args for args, _ in commands])
    connection.send_packed_command(packed)

    responses = []
    for args, options in commands:
        try:
            outcome = self.parse_response(connection, args[0], **options)
        except ResponseError as exc:
            outcome = exc
        responses.append(outcome)

    if raise_on_error:
        self.raise_first_error(commands, responses)

    return responses
def raise_first_error(self, commands, response):
    """
    Raise the first ResponseError found in ``response``, if any.

    The error is annotated with its 1-based position and the arguments
    of the command that caused it before being raised.
    """
    for position, outcome in enumerate(response, start=1):
        if not isinstance(outcome, ResponseError):
            continue
        self.annotate_exception(outcome, position, commands[position - 1][0])
        raise outcome
def annotate_exception(self, exception, number, command):
    """
    Prefix the exception's first argument with the position and the
    (truncated) text of the pipeline command that caused it.

    ``exception.args`` is replaced in place; arguments after the first
    one are kept unchanged.
    """
    rendered = " ".join(safe_str(part) for part in command)
    annotated = (
        f"Command # {number} ({truncate_text(rendered)}) of pipeline "
        f"caused error: {exception.args[0]}"
    )
    remaining = exception.args[1:]
    exception.args = (annotated,) + remaining
def parse_response(self, connection, command_name, **options):
    """
    Parse a response via ``Redis.parse_response`` and keep the
    ``watching`` flag in step with the command that was just answered.
    """
    parsed = Redis.parse_response(self, connection, command_name, **options)
    if command_name in self.UNWATCH_COMMANDS:
        # these commands drop every WATCH on the connection
        self.watching = False
    elif command_name == "WATCH":
        self.watching = True
    return parsed
def load_scripts(self):
    """
    Make sure all scripts that are about to be run on this pipeline exist.

    Issues SCRIPT EXISTS for the registered scripts and, for every one
    reported missing, SCRIPT LOAD; the script's ``sha`` is replaced with
    the value returned by the load.
    """
    scripts = list(self.scripts)
    # we can't use the normal script_* methods because they would just
    # get buffered in the pipeline.
    run_now = self.immediate_execute_command
    exists = run_now("SCRIPT EXISTS", *[script.sha for script in scripts])
    if all(exists):
        return
    for script, present in zip(scripts, exists):
        if present:
            continue
        script.sha = run_now("SCRIPT LOAD", script.script)
def _disconnect_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
) -> None:
    """
    Error handler: disconnect ``conn`` and raise WatchError if keys were
    being watched.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).
    """
    conn.disconnect()
    if not self.watching:
        return
    # the watch died together with the connection, so it is no longer
    # valid. raising WatchError tells the user to retry the transaction.
    raise WatchError(
        f"A {type(error).__name__} occurred while watching one or more keys"
    )
def execute(self, raise_on_error: bool = True) -> List[Any]:
    """
    Execute all the commands in the current pipeline.

    Returns an empty list right away when nothing is queued and nothing
    is being watched. Otherwise registered scripts are loaded first, the
    queued commands run as a transaction or as a plain pipeline, and the
    pipeline is reset afterwards whether or not execution succeeded.
    """
    stack = self.command_stack
    if not (stack or self.watching):
        return []
    if self.scripts:
        self.load_scripts()

    transactional = self.transaction or self.explicit_transaction
    runner = self._execute_transaction if transactional else self._execute_pipeline

    conn = self.connection
    if not conn:
        conn = self.connection_pool.get_connection()
        # assign to self.connection so reset() releases the connection
        # back to the pool after we're done
        self.connection = conn

    def run_stack():
        return runner(conn, stack, raise_on_error)

    def on_failure(error):
        return self._disconnect_raise_on_watching(conn, error)

    try:
        return conn.retry.call_with_retry(run_stack, on_failure)
    finally:
        # in reset() the connection is disconnected before returned to the pool if
        # it is marked for reconnect.
        self.reset()
def discard(self):
    """
    Flushes all previously queued commands
    See: https://redis.io/commands/DISCARD
    """
    # the command's result is intentionally not returned
    self.execute_command("DISCARD")
def watch(self, *names):
    """
    Watches the values at keys ``names``.

    Raises RedisError when an explicit transaction has already been
    started.
    """
    inside_multi = self.explicit_transaction
    if inside_multi:
        raise RedisError("Cannot issue a WATCH after a MULTI")
    return self.execute_command("WATCH", *names)
def unwatch(self) -> bool:
    """
    Unwatches all previously specified keys.

    UNWATCH is only issued while keys are being watched. A falsy command
    result, like the not-watching case, yields True.
    """
    if not self.watching:
        return True
    return self.execute_command("UNWATCH") or True