Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import re
3import threading
4import time
5from itertools import chain
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Callable,
10 Dict,
11 List,
12 Mapping,
13 Optional,
14 Set,
15 Type,
16 Union,
17)
19from redis._parsers.encoders import Encoder
20from redis._parsers.helpers import (
21 _RedisCallbacks,
22 _RedisCallbacksRESP2,
23 _RedisCallbacksRESP3,
24 bool_ok,
25)
26from redis.backoff import ExponentialWithJitterBackoff
27from redis.cache import CacheConfig, CacheInterface
28from redis.commands import (
29 CoreCommands,
30 RedisModuleCommands,
31 SentinelCommands,
32 list_or_args,
33)
34from redis.commands.core import Script
35from redis.connection import (
36 AbstractConnection,
37 Connection,
38 ConnectionPool,
39 SSLConnection,
40 UnixDomainSocketConnection,
41)
42from redis.credentials import CredentialProvider
43from redis.driver_info import DriverInfo, resolve_driver_info
44from redis.event import (
45 AfterPooledConnectionsInstantiationEvent,
46 AfterPubSubConnectionInstantiationEvent,
47 AfterSingleConnectionInstantiationEvent,
48 ClientType,
49 EventDispatcher,
50)
51from redis.exceptions import (
52 ConnectionError,
53 ExecAbortError,
54 PubSubError,
55 RedisError,
56 ResponseError,
57 WatchError,
58)
59from redis.lock import Lock
60from redis.maint_notifications import (
61 MaintNotificationsConfig,
62 OSSMaintNotificationsHandler,
63)
64from redis.retry import Retry
65from redis.utils import (
66 _set_info_logger,
67 check_protocol_version,
68 deprecated_args,
69 safe_str,
70 str_if_bytes,
71 truncate_text,
72)
74if TYPE_CHECKING:
75 import ssl
77 import OpenSSL
79SYM_EMPTY = b""
80EMPTY_RESPONSE = "EMPTY_RESPONSE"
82# some responses (ie. dump) are binary, and just meant to never be decoded
83NEVER_DECODE = "NEVER_DECODE"
86class CaseInsensitiveDict(dict):
87 "Case insensitive dict implementation. Assumes string keys only."
89 def __init__(self, data: Dict[str, str]) -> None:
90 for k, v in data.items():
91 self[k.upper()] = v
93 def __contains__(self, k):
94 return super().__contains__(k.upper())
96 def __delitem__(self, k):
97 super().__delitem__(k.upper())
99 def __getitem__(self, k):
100 return super().__getitem__(k.upper())
102 def get(self, k, default=None):
103 return super().get(k.upper(), default)
105 def __setitem__(self, k, v):
106 super().__setitem__(k.upper(), v)
108 def update(self, data):
109 data = CaseInsensitiveDict(data)
110 super().update(data)
113class AbstractRedis:
114 pass
117class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
118 """
119 Implementation of the Redis protocol.
121 This abstract class provides a Python interface to all Redis commands
122 and an implementation of the Redis protocol.
124 Pipelines derive from this, implementing how
125 the commands are sent and received to the Redis server. Based on
126 configuration, an instance will either use a ConnectionPool, or
127 Connection object to talk to redis.
129 It is not safe to pass PubSub or Pipeline objects between threads.
130 """
132 @classmethod
133 def from_url(cls, url: str, **kwargs) -> "Redis":
134 """
135 Return a Redis client object configured from the given URL
137 For example::
139 redis://[[username]:[password]]@localhost:6379/0
140 rediss://[[username]:[password]]@localhost:6379/0
141 unix://[username@]/path/to/socket.sock?db=0[&password=password]
143 Three URL schemes are supported:
145 - `redis://` creates a TCP socket connection. See more at:
146 <https://www.iana.org/assignments/uri-schemes/prov/redis>
147 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
148 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
149 - ``unix://``: creates a Unix Domain Socket connection.
151 The username, password, hostname, path and all querystring values
152 are passed through urllib.parse.unquote in order to replace any
153 percent-encoded values with their corresponding characters.
155 There are several ways to specify a database number. The first value
156 found will be used:
158 1. A ``db`` querystring option, e.g. redis://localhost?db=0
159 2. If using the redis:// or rediss:// schemes, the path argument
160 of the url, e.g. redis://localhost/0
161 3. A ``db`` keyword argument to this function.
163 If none of these options are specified, the default db=0 is used.
165 All querystring options are cast to their appropriate Python types.
166 Boolean arguments can be specified with string values "True"/"False"
167 or "Yes"/"No". Values that cannot be properly cast cause a
168 ``ValueError`` to be raised. Once parsed, the querystring arguments
169 and keyword arguments are passed to the ``ConnectionPool``'s
170 class initializer. In the case of conflicting arguments, querystring
171 arguments always win.
173 """
174 single_connection_client = kwargs.pop("single_connection_client", False)
175 connection_pool = ConnectionPool.from_url(url, **kwargs)
176 client = cls(
177 connection_pool=connection_pool,
178 single_connection_client=single_connection_client,
179 )
180 client.auto_close_connection_pool = True
181 return client
183 @classmethod
184 def from_pool(
185 cls: Type["Redis"],
186 connection_pool: ConnectionPool,
187 ) -> "Redis":
188 """
189 Return a Redis client from the given connection pool.
190 The Redis client will take ownership of the connection pool and
191 close it when the Redis client is closed.
192 """
193 client = cls(
194 connection_pool=connection_pool,
195 )
196 client.auto_close_connection_pool = True
197 return client
199 @deprecated_args(
200 args_to_warn=["retry_on_timeout"],
201 reason="TimeoutError is included by default.",
202 version="6.0.0",
203 )
204 @deprecated_args(
205 args_to_warn=["lib_name", "lib_version"],
206 reason="Use 'driver_info' parameter instead. "
207 "lib_name and lib_version will be removed in a future version.",
208 )
209 def __init__(
210 self,
211 host: str = "localhost",
212 port: int = 6379,
213 db: int = 0,
214 password: Optional[str] = None,
215 socket_timeout: Optional[float] = None,
216 socket_connect_timeout: Optional[float] = None,
217 socket_keepalive: Optional[bool] = None,
218 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
219 connection_pool: Optional[ConnectionPool] = None,
220 unix_socket_path: Optional[str] = None,
221 encoding: str = "utf-8",
222 encoding_errors: str = "strict",
223 decode_responses: bool = False,
224 retry_on_timeout: bool = False,
225 retry: Retry = Retry(
226 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
227 ),
228 retry_on_error: Optional[List[Type[Exception]]] = None,
229 ssl: bool = False,
230 ssl_keyfile: Optional[str] = None,
231 ssl_certfile: Optional[str] = None,
232 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
233 ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
234 ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
235 ssl_ca_certs: Optional[str] = None,
236 ssl_ca_path: Optional[str] = None,
237 ssl_ca_data: Optional[str] = None,
238 ssl_check_hostname: bool = True,
239 ssl_password: Optional[str] = None,
240 ssl_validate_ocsp: bool = False,
241 ssl_validate_ocsp_stapled: bool = False,
242 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
243 ssl_ocsp_expected_cert: Optional[str] = None,
244 ssl_min_version: Optional["ssl.TLSVersion"] = None,
245 ssl_ciphers: Optional[str] = None,
246 max_connections: Optional[int] = None,
247 single_connection_client: bool = False,
248 health_check_interval: int = 0,
249 client_name: Optional[str] = None,
250 lib_name: Optional[str] = None,
251 lib_version: Optional[str] = None,
252 driver_info: Optional["DriverInfo"] = None,
253 username: Optional[str] = None,
254 redis_connect_func: Optional[Callable[[], None]] = None,
255 credential_provider: Optional[CredentialProvider] = None,
256 protocol: Optional[int] = 2,
257 cache: Optional[CacheInterface] = None,
258 cache_config: Optional[CacheConfig] = None,
259 event_dispatcher: Optional[EventDispatcher] = None,
260 maint_notifications_config: Optional[MaintNotificationsConfig] = None,
261 oss_cluster_maint_notifications_handler: Optional[
262 OSSMaintNotificationsHandler
263 ] = None,
264 ) -> None:
265 """
266 Initialize a new Redis client.
268 To specify a retry policy for specific errors, you have two options:
270 1. Set the `retry_on_error` to a list of the error/s to retry on, and
271 you can also set `retry` to a valid `Retry` object(in case the default
272 one is not appropriate) - with this approach the retries will be triggered
273 on the default errors specified in the Retry object enriched with the
274 errors specified in `retry_on_error`.
276 2. Define a `Retry` object with configured 'supported_errors' and set
277 it to the `retry` parameter - with this approach you completely redefine
278 the errors on which retries will happen.
280 `retry_on_timeout` is deprecated - please include the TimeoutError
281 either in the Retry object or in the `retry_on_error` list.
283 When 'connection_pool' is provided - the retry configuration of the
284 provided pool will be used.
286 Args:
288 single_connection_client:
289 if `True`, connection pool is not used. In that case `Redis`
290 instance use is not thread safe.
291 decode_responses:
292 if `True`, the response will be decoded to utf-8.
293 Argument is ignored when connection_pool is provided.
294 driver_info:
295 Optional DriverInfo object to identify upstream libraries.
296 If provided, lib_name and lib_version are ignored.
297 If not provided, a DriverInfo will be created from lib_name and lib_version.
298 Argument is ignored when connection_pool is provided.
299 lib_name:
300 **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO.
301 lib_version:
302 **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO.
303 maint_notifications_config:
304 configures the pool to support maintenance notifications - see
305 `redis.maint_notifications.MaintNotificationsConfig` for details.
306 Only supported with RESP3
307 If not provided and protocol is RESP3, the maintenance notifications
308 will be enabled by default (logic is included in the connection pool
309 initialization).
310 Argument is ignored when connection_pool is provided.
311 oss_cluster_maint_notifications_handler:
312 handler for OSS cluster notifications - see
313 `redis.maint_notifications.OSSMaintNotificationsHandler` for details.
314 Only supported with RESP3
315 Argument is ignored when connection_pool is provided.
316 """
317 if event_dispatcher is None:
318 self._event_dispatcher = EventDispatcher()
319 else:
320 self._event_dispatcher = event_dispatcher
321 if not connection_pool:
322 if not retry_on_error:
323 retry_on_error = []
325 # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version
326 computed_driver_info = resolve_driver_info(
327 driver_info, lib_name, lib_version
328 )
330 kwargs = {
331 "db": db,
332 "username": username,
333 "password": password,
334 "socket_timeout": socket_timeout,
335 "encoding": encoding,
336 "encoding_errors": encoding_errors,
337 "decode_responses": decode_responses,
338 "retry_on_error": retry_on_error,
339 "retry": copy.deepcopy(retry),
340 "max_connections": max_connections,
341 "health_check_interval": health_check_interval,
342 "client_name": client_name,
343 "driver_info": computed_driver_info,
344 "redis_connect_func": redis_connect_func,
345 "credential_provider": credential_provider,
346 "protocol": protocol,
347 }
348 # based on input, setup appropriate connection args
349 if unix_socket_path is not None:
350 kwargs.update(
351 {
352 "path": unix_socket_path,
353 "connection_class": UnixDomainSocketConnection,
354 }
355 )
356 else:
357 # TCP specific options
358 kwargs.update(
359 {
360 "host": host,
361 "port": port,
362 "socket_connect_timeout": socket_connect_timeout,
363 "socket_keepalive": socket_keepalive,
364 "socket_keepalive_options": socket_keepalive_options,
365 }
366 )
368 if ssl:
369 kwargs.update(
370 {
371 "connection_class": SSLConnection,
372 "ssl_keyfile": ssl_keyfile,
373 "ssl_certfile": ssl_certfile,
374 "ssl_cert_reqs": ssl_cert_reqs,
375 "ssl_include_verify_flags": ssl_include_verify_flags,
376 "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
377 "ssl_ca_certs": ssl_ca_certs,
378 "ssl_ca_data": ssl_ca_data,
379 "ssl_check_hostname": ssl_check_hostname,
380 "ssl_password": ssl_password,
381 "ssl_ca_path": ssl_ca_path,
382 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
383 "ssl_validate_ocsp": ssl_validate_ocsp,
384 "ssl_ocsp_context": ssl_ocsp_context,
385 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
386 "ssl_min_version": ssl_min_version,
387 "ssl_ciphers": ssl_ciphers,
388 }
389 )
390 if (cache_config or cache) and check_protocol_version(protocol, 3):
391 kwargs.update(
392 {
393 "cache": cache,
394 "cache_config": cache_config,
395 }
396 )
397 maint_notifications_enabled = (
398 maint_notifications_config and maint_notifications_config.enabled
399 )
400 if maint_notifications_enabled and protocol not in [
401 3,
402 "3",
403 ]:
404 raise RedisError(
405 "Maintenance notifications handlers on connection are only supported with RESP version 3"
406 )
407 if maint_notifications_config:
408 kwargs.update(
409 {
410 "maint_notifications_config": maint_notifications_config,
411 }
412 )
413 if oss_cluster_maint_notifications_handler:
414 kwargs.update(
415 {
416 "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
417 }
418 )
419 connection_pool = ConnectionPool(**kwargs)
420 self._event_dispatcher.dispatch(
421 AfterPooledConnectionsInstantiationEvent(
422 [connection_pool], ClientType.SYNC, credential_provider
423 )
424 )
425 self.auto_close_connection_pool = True
426 else:
427 self.auto_close_connection_pool = False
428 self._event_dispatcher.dispatch(
429 AfterPooledConnectionsInstantiationEvent(
430 [connection_pool], ClientType.SYNC, credential_provider
431 )
432 )
434 self.connection_pool = connection_pool
436 if (cache_config or cache) and self.connection_pool.get_protocol() not in [
437 3,
438 "3",
439 ]:
440 raise RedisError("Client caching is only supported with RESP version 3")
442 self.single_connection_lock = threading.RLock()
443 self.connection = None
444 self._single_connection_client = single_connection_client
445 if self._single_connection_client:
446 self.connection = self.connection_pool.get_connection()
447 self._event_dispatcher.dispatch(
448 AfterSingleConnectionInstantiationEvent(
449 self.connection, ClientType.SYNC, self.single_connection_lock
450 )
451 )
453 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)
455 if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
456 self.response_callbacks.update(_RedisCallbacksRESP3)
457 else:
458 self.response_callbacks.update(_RedisCallbacksRESP2)
460 def __repr__(self) -> str:
461 return (
462 f"<{type(self).__module__}.{type(self).__name__}"
463 f"({repr(self.connection_pool)})>"
464 )
466 def get_encoder(self) -> "Encoder":
467 """Get the connection pool's encoder"""
468 return self.connection_pool.get_encoder()
470 def get_connection_kwargs(self) -> Dict:
471 """Get the connection's key-word arguments"""
472 return self.connection_pool.connection_kwargs
474 def get_retry(self) -> Optional[Retry]:
475 return self.get_connection_kwargs().get("retry")
477 def set_retry(self, retry: Retry) -> None:
478 self.get_connection_kwargs().update({"retry": retry})
479 self.connection_pool.set_retry(retry)
481 def set_response_callback(self, command: str, callback: Callable) -> None:
482 """Set a custom Response Callback"""
483 self.response_callbacks[command] = callback
485 def load_external_module(self, funcname, func) -> None:
486 """
487 This function can be used to add externally defined redis modules,
488 and their namespaces to the redis client.
490 funcname - A string containing the name of the function to create
491 func - The function, being added to this class.
493 ex: Assume that one has a custom redis module named foomod that
494 creates command named 'foo.dothing' and 'foo.anotherthing' in redis.
495 To load function functions into this namespace:
497 from redis import Redis
498 from foomodule import F
499 r = Redis()
500 r.load_external_module("foo", F)
501 r.foo().dothing('your', 'arguments')
503 For a concrete example see the reimport of the redisjson module in
504 tests/test_connection.py::test_loading_external_modules
505 """
506 setattr(self, funcname, func)
508 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
509 """
510 Return a new pipeline object that can queue multiple commands for
511 later execution. ``transaction`` indicates whether all commands
512 should be executed atomically. Apart from making a group of operations
513 atomic, pipelines are useful for reducing the back-and-forth overhead
514 between the client and server.
515 """
516 return Pipeline(
517 self.connection_pool, self.response_callbacks, transaction, shard_hint
518 )
520 def transaction(
521 self, func: Callable[["Pipeline"], None], *watches, **kwargs
522 ) -> Union[List[Any], Any, None]:
523 """
524 Convenience method for executing the callable `func` as a transaction
525 while watching all keys specified in `watches`. The 'func' callable
526 should expect a single argument which is a Pipeline object.
527 """
528 shard_hint = kwargs.pop("shard_hint", None)
529 value_from_callable = kwargs.pop("value_from_callable", False)
530 watch_delay = kwargs.pop("watch_delay", None)
531 with self.pipeline(True, shard_hint) as pipe:
532 while True:
533 try:
534 if watches:
535 pipe.watch(*watches)
536 func_value = func(pipe)
537 exec_value = pipe.execute()
538 return func_value if value_from_callable else exec_value
539 except WatchError:
540 if watch_delay is not None and watch_delay > 0:
541 time.sleep(watch_delay)
542 continue
544 def lock(
545 self,
546 name: str,
547 timeout: Optional[float] = None,
548 sleep: float = 0.1,
549 blocking: bool = True,
550 blocking_timeout: Optional[float] = None,
551 lock_class: Union[None, Any] = None,
552 thread_local: bool = True,
553 raise_on_release_error: bool = True,
554 ):
555 """
556 Return a new Lock object using key ``name`` that mimics
557 the behavior of threading.Lock.
559 If specified, ``timeout`` indicates a maximum life for the lock.
560 By default, it will remain locked until release() is called.
562 ``sleep`` indicates the amount of time to sleep per loop iteration
563 when the lock is in blocking mode and another client is currently
564 holding the lock.
566 ``blocking`` indicates whether calling ``acquire`` should block until
567 the lock has been acquired or to fail immediately, causing ``acquire``
568 to return False and the lock not being acquired. Defaults to True.
569 Note this value can be overridden by passing a ``blocking``
570 argument to ``acquire``.
572 ``blocking_timeout`` indicates the maximum amount of time in seconds to
573 spend trying to acquire the lock. A value of ``None`` indicates
574 continue trying forever. ``blocking_timeout`` can be specified as a
575 float or integer, both representing the number of seconds to wait.
577 ``lock_class`` forces the specified lock implementation. Note that as
578 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
579 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
580 you have created your own custom lock class.
582 ``thread_local`` indicates whether the lock token is placed in
583 thread-local storage. By default, the token is placed in thread local
584 storage so that a thread only sees its token, not a token set by
585 another thread. Consider the following timeline:
587 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
588 thread-1 sets the token to "abc"
589 time: 1, thread-2 blocks trying to acquire `my-lock` using the
590 Lock instance.
591 time: 5, thread-1 has not yet completed. redis expires the lock
592 key.
593 time: 5, thread-2 acquired `my-lock` now that it's available.
594 thread-2 sets the token to "xyz"
595 time: 6, thread-1 finishes its work and calls release(). if the
596 token is *not* stored in thread local storage, then
597 thread-1 would see the token value as "xyz" and would be
598 able to successfully release the thread-2's lock.
600 ``raise_on_release_error`` indicates whether to raise an exception when
601 the lock is no longer owned when exiting the context manager. By default,
602 this is True, meaning an exception will be raised. If False, the warning
603 will be logged and the exception will be suppressed.
605 In some use cases it's necessary to disable thread local storage. For
606 example, if you have code where one thread acquires a lock and passes
607 that lock instance to a worker thread to release later. If thread
608 local storage isn't disabled in this case, the worker thread won't see
609 the token set by the thread that acquired the lock. Our assumption
610 is that these cases aren't common and as such default to using
611 thread local storage."""
612 if lock_class is None:
613 lock_class = Lock
614 return lock_class(
615 self,
616 name,
617 timeout=timeout,
618 sleep=sleep,
619 blocking=blocking,
620 blocking_timeout=blocking_timeout,
621 thread_local=thread_local,
622 raise_on_release_error=raise_on_release_error,
623 )
625 def pubsub(self, **kwargs):
626 """
627 Return a Publish/Subscribe object. With this object, you can
628 subscribe to channels and listen for messages that get published to
629 them.
630 """
631 return PubSub(
632 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
633 )
635 def monitor(self):
636 return Monitor(self.connection_pool)
638 def client(self):
639 return self.__class__(
640 connection_pool=self.connection_pool,
641 single_connection_client=True,
642 )
644 def __enter__(self):
645 return self
647 def __exit__(self, exc_type, exc_value, traceback):
648 self.close()
650 def __del__(self):
651 try:
652 self.close()
653 except Exception:
654 pass
656 def close(self) -> None:
657 # In case a connection property does not yet exist
658 # (due to a crash earlier in the Redis() constructor), return
659 # immediately as there is nothing to clean-up.
660 if not hasattr(self, "connection"):
661 return
663 conn = self.connection
664 if conn:
665 self.connection = None
666 self.connection_pool.release(conn)
668 if self.auto_close_connection_pool:
669 self.connection_pool.disconnect()
671 def _send_command_parse_response(self, conn, command_name, *args, **options):
672 """
673 Send a command and parse the response
674 """
675 conn.send_command(*args, **options)
676 return self.parse_response(conn, command_name, **options)
678 def _close_connection(self, conn) -> None:
679 """
680 Close the connection before retrying.
682 The supported exceptions are already checked in the
683 retry object so we don't need to do it here.
685 After we disconnect the connection, it will try to reconnect and
686 do a health check as part of the send_command logic(on connection level).
687 """
689 conn.disconnect()
691 # COMMAND EXECUTION AND PROTOCOL PARSING
692 def execute_command(self, *args, **options):
693 return self._execute_command(*args, **options)
695 def _execute_command(self, *args, **options):
696 """Execute a command and return a parsed response"""
697 pool = self.connection_pool
698 command_name = args[0]
699 conn = self.connection or pool.get_connection()
701 if self._single_connection_client:
702 self.single_connection_lock.acquire()
703 try:
704 return conn.retry.call_with_retry(
705 lambda: self._send_command_parse_response(
706 conn, command_name, *args, **options
707 ),
708 lambda _: self._close_connection(conn),
709 )
711 finally:
712 if conn and conn.should_reconnect():
713 self._close_connection(conn)
714 conn.connect()
715 if self._single_connection_client:
716 self.single_connection_lock.release()
717 if not self.connection:
718 pool.release(conn)
720 def parse_response(self, connection, command_name, **options):
721 """Parses a response from the Redis server"""
722 try:
723 if NEVER_DECODE in options:
724 response = connection.read_response(disable_decoding=True)
725 options.pop(NEVER_DECODE)
726 else:
727 response = connection.read_response()
728 except ResponseError:
729 if EMPTY_RESPONSE in options:
730 return options[EMPTY_RESPONSE]
731 raise
733 if EMPTY_RESPONSE in options:
734 options.pop(EMPTY_RESPONSE)
736 # Remove keys entry, it needs only for cache.
737 options.pop("keys", None)
739 if command_name in self.response_callbacks:
740 return self.response_callbacks[command_name](response, **options)
741 return response
743 def get_cache(self) -> Optional[CacheInterface]:
744 return self.connection_pool.cache
747StrictRedis = Redis
750class Monitor:
751 """
752 Monitor is useful for handling the MONITOR command to the redis server.
753 next_command() method returns one command from monitor
754 listen() method yields commands from monitor.
755 """
757 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
758 command_re = re.compile(r'"(.*?)(?<!\\)"')
760 def __init__(self, connection_pool):
761 self.connection_pool = connection_pool
762 self.connection = self.connection_pool.get_connection()
764 def __enter__(self):
765 self._start_monitor()
766 return self
768 def __exit__(self, *args):
769 self.connection.disconnect()
770 self.connection_pool.release(self.connection)
772 def next_command(self):
773 """Parse the response from a monitor command"""
774 response = self.connection.read_response()
776 if response is None:
777 return None
779 if isinstance(response, bytes):
780 response = self.connection.encoder.decode(response, force=True)
782 command_time, command_data = response.split(" ", 1)
783 m = self.monitor_re.match(command_data)
784 db_id, client_info, command = m.groups()
785 command = " ".join(self.command_re.findall(command))
786 # Redis escapes double quotes because each piece of the command
787 # string is surrounded by double quotes. We don't have that
788 # requirement so remove the escaping and leave the quote.
789 command = command.replace('\\"', '"')
791 if client_info == "lua":
792 client_address = "lua"
793 client_port = ""
794 client_type = "lua"
795 elif client_info.startswith("unix"):
796 client_address = "unix"
797 client_port = client_info[5:]
798 client_type = "unix"
799 else:
800 if client_info == "":
801 client_address = ""
802 client_port = ""
803 client_type = "unknown"
804 else:
805 # use rsplit as ipv6 addresses contain colons
806 client_address, client_port = client_info.rsplit(":", 1)
807 client_type = "tcp"
808 return {
809 "time": float(command_time),
810 "db": int(db_id),
811 "client_address": client_address,
812 "client_port": client_port,
813 "client_type": client_type,
814 "command": command,
815 }
817 def listen(self):
818 """Listen for commands coming to the server."""
819 while True:
820 yield self.next_command()
822 def _start_monitor(self):
823 self.connection.send_command("MONITOR")
824 # check that monitor returns 'OK', but don't return it to user
825 response = self.connection.read_response()
827 if not bool_ok(response):
828 raise RedisError(f"MONITOR failed: {response}")
831class PubSub:
832 """
833 PubSub provides publish, subscribe and listen support to Redis channels.
835 After subscribing to one or more channels, the listen() method will block
836 until a message arrives on one of the subscribed channels. That message
837 will be returned and it's safe to start listening again.
838 """
840 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
841 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
842 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
844 def __init__(
845 self,
846 connection_pool,
847 shard_hint=None,
848 ignore_subscribe_messages: bool = False,
849 encoder: Optional["Encoder"] = None,
850 push_handler_func: Union[None, Callable[[str], None]] = None,
851 event_dispatcher: Optional["EventDispatcher"] = None,
852 ):
853 self.connection_pool = connection_pool
854 self.shard_hint = shard_hint
855 self.ignore_subscribe_messages = ignore_subscribe_messages
856 self.connection = None
857 self.subscribed_event = threading.Event()
858 # we need to know the encoding options for this connection in order
859 # to lookup channel and pattern names for callback handlers.
860 self.encoder = encoder
861 self.push_handler_func = push_handler_func
862 if event_dispatcher is None:
863 self._event_dispatcher = EventDispatcher()
864 else:
865 self._event_dispatcher = event_dispatcher
867 self._lock = threading.RLock()
868 if self.encoder is None:
869 self.encoder = self.connection_pool.get_encoder()
870 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
871 if self.encoder.decode_responses:
872 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
873 else:
874 self.health_check_response = [b"pong", self.health_check_response_b]
875 if self.push_handler_func is None:
876 _set_info_logger()
877 self.reset()
879 def __enter__(self) -> "PubSub":
880 return self
882 def __exit__(self, exc_type, exc_value, traceback) -> None:
883 self.reset()
885 def __del__(self) -> None:
886 try:
887 # if this object went out of scope prior to shutting down
888 # subscriptions, close the connection manually before
889 # returning it to the connection pool
890 self.reset()
891 except Exception:
892 pass
894 def reset(self) -> None:
895 if self.connection:
896 self.connection.disconnect()
897 self.connection.deregister_connect_callback(self.on_connect)
898 self.connection_pool.release(self.connection)
899 self.connection = None
900 self.health_check_response_counter = 0
901 self.channels = {}
902 self.pending_unsubscribe_channels = set()
903 self.shard_channels = {}
904 self.pending_unsubscribe_shard_channels = set()
905 self.patterns = {}
906 self.pending_unsubscribe_patterns = set()
907 self.subscribed_event.clear()
909 def close(self) -> None:
910 self.reset()
912 def on_connect(self, connection) -> None:
913 "Re-subscribe to any channels and patterns previously subscribed to"
914 # NOTE: for python3, we can't pass bytestrings as keyword arguments
915 # so we need to decode channel/pattern names back to unicode strings
916 # before passing them to [p]subscribe.
917 self.pending_unsubscribe_channels.clear()
918 self.pending_unsubscribe_patterns.clear()
919 self.pending_unsubscribe_shard_channels.clear()
920 if self.channels:
921 channels = {
922 self.encoder.decode(k, force=True): v for k, v in self.channels.items()
923 }
924 self.subscribe(**channels)
925 if self.patterns:
926 patterns = {
927 self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
928 }
929 self.psubscribe(**patterns)
930 if self.shard_channels:
931 shard_channels = {
932 self.encoder.decode(k, force=True): v
933 for k, v in self.shard_channels.items()
934 }
935 self.ssubscribe(**shard_channels)
937 @property
938 def subscribed(self) -> bool:
939 """Indicates if there are subscriptions to any channels or patterns"""
940 return self.subscribed_event.is_set()
942 def execute_command(self, *args):
943 """Execute a publish/subscribe command"""
945 # NOTE: don't parse the response in this function -- it could pull a
946 # legitimate message off the stack if the connection is already
947 # subscribed to one or more channels
949 if self.connection is None:
950 self.connection = self.connection_pool.get_connection()
951 # register a callback that re-subscribes to any channels we
952 # were listening to when we were disconnected
953 self.connection.register_connect_callback(self.on_connect)
954 if self.push_handler_func is not None:
955 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
956 self._event_dispatcher.dispatch(
957 AfterPubSubConnectionInstantiationEvent(
958 self.connection, self.connection_pool, ClientType.SYNC, self._lock
959 )
960 )
961 connection = self.connection
962 kwargs = {"check_health": not self.subscribed}
963 if not self.subscribed:
964 self.clean_health_check_responses()
965 with self._lock:
966 self._execute(connection, connection.send_command, *args, **kwargs)
968 def clean_health_check_responses(self) -> None:
969 """
970 If any health check responses are present, clean them
971 """
972 ttl = 10
973 conn = self.connection
974 while conn and self.health_check_response_counter > 0 and ttl > 0:
975 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
976 response = self._execute(conn, conn.read_response)
977 if self.is_health_check_response(response):
978 self.health_check_response_counter -= 1
979 else:
980 raise PubSubError(
981 "A non health check response was cleaned by "
982 "execute_command: {}".format(response)
983 )
984 ttl -= 1
986 def _reconnect(self, conn) -> None:
987 """
988 The supported exceptions are already checked in the
989 retry object so we don't need to do it here.
991 In this error handler we are trying to reconnect to the server.
992 """
993 conn.disconnect()
994 conn.connect()
996 def _execute(self, conn, command, *args, **kwargs):
997 """
998 Connect manually upon disconnection. If the Redis server is down,
999 this will fail and raise a ConnectionError as desired.
1000 After reconnection, the ``on_connect`` callback should have been
1001 called by the # connection to resubscribe us to any channels and
1002 patterns we were previously listening to
1003 """
1005 if conn.should_reconnect():
1006 self._reconnect(conn)
1008 response = conn.retry.call_with_retry(
1009 lambda: command(*args, **kwargs),
1010 lambda _: self._reconnect(conn),
1011 )
1013 return response
1015 def parse_response(self, block=True, timeout=0):
1016 """Parse the response from a publish/subscribe command"""
1017 conn = self.connection
1018 if conn is None:
1019 raise RuntimeError(
1020 "pubsub connection not set: "
1021 "did you forget to call subscribe() or psubscribe()?"
1022 )
1024 self.check_health()
1026 def try_read():
1027 if not block:
1028 if not conn.can_read(timeout=timeout):
1029 return None
1030 else:
1031 conn.connect()
1032 return conn.read_response(disconnect_on_error=False, push_request=True)
1034 response = self._execute(conn, try_read)
1036 if self.is_health_check_response(response):
1037 # ignore the health check message as user might not expect it
1038 self.health_check_response_counter -= 1
1039 return None
1040 return response
1042 def is_health_check_response(self, response) -> bool:
1043 """
1044 Check if the response is a health check response.
1045 If there are no subscriptions redis responds to PING command with a
1046 bulk response, instead of a multi-bulk with "pong" and the response.
1047 """
1048 if self.encoder.decode_responses:
1049 return (
1050 response
1051 in [
1052 self.health_check_response, # If there is a subscription
1053 self.HEALTH_CHECK_MESSAGE, # If there are no subscriptions and decode_responses=True
1054 ]
1055 )
1056 else:
1057 return (
1058 response
1059 in [
1060 self.health_check_response, # If there is a subscription
1061 self.health_check_response_b, # If there isn't a subscription and decode_responses=False
1062 ]
1063 )
1065 def check_health(self) -> None:
1066 conn = self.connection
1067 if conn is None:
1068 raise RuntimeError(
1069 "pubsub connection not set: "
1070 "did you forget to call subscribe() or psubscribe()?"
1071 )
1073 if conn.health_check_interval and time.monotonic() > conn.next_health_check:
1074 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
1075 self.health_check_response_counter += 1
1077 def _normalize_keys(self, data) -> Dict:
1078 """
1079 normalize channel/pattern names to be either bytes or strings
1080 based on whether responses are automatically decoded. this saves us
1081 from coercing the value for each message coming in.
1082 """
1083 encode = self.encoder.encode
1084 decode = self.encoder.decode
1085 return {decode(encode(k)): v for k, v in data.items()}
1087 def psubscribe(self, *args, **kwargs):
1088 """
1089 Subscribe to channel patterns. Patterns supplied as keyword arguments
1090 expect a pattern name as the key and a callable as the value. A
1091 pattern's callable will be invoked automatically when a message is
1092 received on that pattern rather than producing a message via
1093 ``listen()``.
1094 """
1095 if args:
1096 args = list_or_args(args[0], args[1:])
1097 new_patterns = dict.fromkeys(args)
1098 new_patterns.update(kwargs)
1099 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
1100 # update the patterns dict AFTER we send the command. we don't want to
1101 # subscribe twice to these patterns, once for the command and again
1102 # for the reconnection.
1103 new_patterns = self._normalize_keys(new_patterns)
1104 self.patterns.update(new_patterns)
1105 if not self.subscribed:
1106 # Set the subscribed_event flag to True
1107 self.subscribed_event.set()
1108 # Clear the health check counter
1109 self.health_check_response_counter = 0
1110 self.pending_unsubscribe_patterns.difference_update(new_patterns)
1111 return ret_val
1113 def punsubscribe(self, *args):
1114 """
1115 Unsubscribe from the supplied patterns. If empty, unsubscribe from
1116 all patterns.
1117 """
1118 if args:
1119 args = list_or_args(args[0], args[1:])
1120 patterns = self._normalize_keys(dict.fromkeys(args))
1121 else:
1122 patterns = self.patterns
1123 self.pending_unsubscribe_patterns.update(patterns)
1124 return self.execute_command("PUNSUBSCRIBE", *args)
1126 def subscribe(self, *args, **kwargs):
1127 """
1128 Subscribe to channels. Channels supplied as keyword arguments expect
1129 a channel name as the key and a callable as the value. A channel's
1130 callable will be invoked automatically when a message is received on
1131 that channel rather than producing a message via ``listen()`` or
1132 ``get_message()``.
1133 """
1134 if args:
1135 args = list_or_args(args[0], args[1:])
1136 new_channels = dict.fromkeys(args)
1137 new_channels.update(kwargs)
1138 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
1139 # update the channels dict AFTER we send the command. we don't want to
1140 # subscribe twice to these channels, once for the command and again
1141 # for the reconnection.
1142 new_channels = self._normalize_keys(new_channels)
1143 self.channels.update(new_channels)
1144 if not self.subscribed:
1145 # Set the subscribed_event flag to True
1146 self.subscribed_event.set()
1147 # Clear the health check counter
1148 self.health_check_response_counter = 0
1149 self.pending_unsubscribe_channels.difference_update(new_channels)
1150 return ret_val
1152 def unsubscribe(self, *args):
1153 """
1154 Unsubscribe from the supplied channels. If empty, unsubscribe from
1155 all channels
1156 """
1157 if args:
1158 args = list_or_args(args[0], args[1:])
1159 channels = self._normalize_keys(dict.fromkeys(args))
1160 else:
1161 channels = self.channels
1162 self.pending_unsubscribe_channels.update(channels)
1163 return self.execute_command("UNSUBSCRIBE", *args)
1165 def ssubscribe(self, *args, target_node=None, **kwargs):
1166 """
1167 Subscribes the client to the specified shard channels.
1168 Channels supplied as keyword arguments expect a channel name as the key
1169 and a callable as the value. A channel's callable will be invoked automatically
1170 when a message is received on that channel rather than producing a message via
1171 ``listen()`` or ``get_sharded_message()``.
1172 """
1173 if args:
1174 args = list_or_args(args[0], args[1:])
1175 new_s_channels = dict.fromkeys(args)
1176 new_s_channels.update(kwargs)
1177 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
1178 # update the s_channels dict AFTER we send the command. we don't want to
1179 # subscribe twice to these channels, once for the command and again
1180 # for the reconnection.
1181 new_s_channels = self._normalize_keys(new_s_channels)
1182 self.shard_channels.update(new_s_channels)
1183 if not self.subscribed:
1184 # Set the subscribed_event flag to True
1185 self.subscribed_event.set()
1186 # Clear the health check counter
1187 self.health_check_response_counter = 0
1188 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
1189 return ret_val
1191 def sunsubscribe(self, *args, target_node=None):
1192 """
1193 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
1194 all shard_channels
1195 """
1196 if args:
1197 args = list_or_args(args[0], args[1:])
1198 s_channels = self._normalize_keys(dict.fromkeys(args))
1199 else:
1200 s_channels = self.shard_channels
1201 self.pending_unsubscribe_shard_channels.update(s_channels)
1202 return self.execute_command("SUNSUBSCRIBE", *args)
1204 def listen(self):
1205 "Listen for messages on channels this client has been subscribed to"
1206 while self.subscribed:
1207 response = self.handle_message(self.parse_response(block=True))
1208 if response is not None:
1209 yield response
1211 def get_message(
1212 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
1213 ):
1214 """
1215 Get the next message if one is available, otherwise None.
1217 If timeout is specified, the system will wait for `timeout` seconds
1218 before returning. Timeout should be specified as a floating point
1219 number, or None, to wait indefinitely.
1220 """
1221 if not self.subscribed:
1222 # Wait for subscription
1223 start_time = time.monotonic()
1224 if self.subscribed_event.wait(timeout) is True:
1225 # The connection was subscribed during the timeout time frame.
1226 # The timeout should be adjusted based on the time spent
1227 # waiting for the subscription
1228 time_spent = time.monotonic() - start_time
1229 timeout = max(0.0, timeout - time_spent)
1230 else:
1231 # The connection isn't subscribed to any channels or patterns,
1232 # so no messages are available
1233 return None
1235 response = self.parse_response(block=(timeout is None), timeout=timeout)
1237 if response:
1238 return self.handle_message(response, ignore_subscribe_messages)
1239 return None
1241 get_sharded_message = get_message
1243 def ping(self, message: Union[str, None] = None) -> bool:
1244 """
1245 Ping the Redis server to test connectivity.
1247 Sends a PING command to the Redis server and returns True if the server
1248 responds with "PONG".
1249 """
1250 args = ["PING", message] if message is not None else ["PING"]
1251 return self.execute_command(*args)
1253 def handle_message(self, response, ignore_subscribe_messages=False):
1254 """
1255 Parses a pub/sub message. If the channel or pattern was subscribed to
1256 with a message handler, the handler is invoked instead of a parsed
1257 message being returned.
1258 """
1259 if response is None:
1260 return None
1261 if isinstance(response, bytes):
1262 response = [b"pong", response] if response != b"PONG" else [b"pong", b""]
1264 message_type = str_if_bytes(response[0])
1265 if message_type == "pmessage":
1266 message = {
1267 "type": message_type,
1268 "pattern": response[1],
1269 "channel": response[2],
1270 "data": response[3],
1271 }
1272 elif message_type == "pong":
1273 message = {
1274 "type": message_type,
1275 "pattern": None,
1276 "channel": None,
1277 "data": response[1],
1278 }
1279 else:
1280 message = {
1281 "type": message_type,
1282 "pattern": None,
1283 "channel": response[1],
1284 "data": response[2],
1285 }
1287 # if this is an unsubscribe message, remove it from memory
1288 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
1289 if message_type == "punsubscribe":
1290 pattern = response[1]
1291 if pattern in self.pending_unsubscribe_patterns:
1292 self.pending_unsubscribe_patterns.remove(pattern)
1293 self.patterns.pop(pattern, None)
1294 elif message_type == "sunsubscribe":
1295 s_channel = response[1]
1296 if s_channel in self.pending_unsubscribe_shard_channels:
1297 self.pending_unsubscribe_shard_channels.remove(s_channel)
1298 self.shard_channels.pop(s_channel, None)
1299 else:
1300 channel = response[1]
1301 if channel in self.pending_unsubscribe_channels:
1302 self.pending_unsubscribe_channels.remove(channel)
1303 self.channels.pop(channel, None)
1304 if not self.channels and not self.patterns and not self.shard_channels:
1305 # There are no subscriptions anymore, set subscribed_event flag
1306 # to false
1307 self.subscribed_event.clear()
1309 if message_type in self.PUBLISH_MESSAGE_TYPES:
1310 # if there's a message handler, invoke it
1311 if message_type == "pmessage":
1312 handler = self.patterns.get(message["pattern"], None)
1313 elif message_type == "smessage":
1314 handler = self.shard_channels.get(message["channel"], None)
1315 else:
1316 handler = self.channels.get(message["channel"], None)
1317 if handler:
1318 handler(message)
1319 return None
1320 elif message_type != "pong":
1321 # this is a subscribe/unsubscribe message. ignore if we don't
1322 # want them
1323 if ignore_subscribe_messages or self.ignore_subscribe_messages:
1324 return None
1326 return message
1328 def run_in_thread(
1329 self,
1330 sleep_time: float = 0.0,
1331 daemon: bool = False,
1332 exception_handler: Optional[Callable] = None,
1333 pubsub=None,
1334 sharded_pubsub: bool = False,
1335 ) -> "PubSubWorkerThread":
1336 for channel, handler in self.channels.items():
1337 if handler is None:
1338 raise PubSubError(f"Channel: '{channel}' has no handler registered")
1339 for pattern, handler in self.patterns.items():
1340 if handler is None:
1341 raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
1342 for s_channel, handler in self.shard_channels.items():
1343 if handler is None:
1344 raise PubSubError(
1345 f"Shard Channel: '{s_channel}' has no handler registered"
1346 )
1348 pubsub = self if pubsub is None else pubsub
1349 thread = PubSubWorkerThread(
1350 pubsub,
1351 sleep_time,
1352 daemon=daemon,
1353 exception_handler=exception_handler,
1354 sharded_pubsub=sharded_pubsub,
1355 )
1356 thread.start()
1357 return thread
1360class PubSubWorkerThread(threading.Thread):
1361 def __init__(
1362 self,
1363 pubsub,
1364 sleep_time: float,
1365 daemon: bool = False,
1366 exception_handler: Union[
1367 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
1368 ] = None,
1369 sharded_pubsub: bool = False,
1370 ):
1371 super().__init__()
1372 self.daemon = daemon
1373 self.pubsub = pubsub
1374 self.sleep_time = sleep_time
1375 self.exception_handler = exception_handler
1376 self.sharded_pubsub = sharded_pubsub
1377 self._running = threading.Event()
1379 def run(self) -> None:
1380 if self._running.is_set():
1381 return
1382 self._running.set()
1383 pubsub = self.pubsub
1384 sleep_time = self.sleep_time
1385 while self._running.is_set():
1386 try:
1387 if not self.sharded_pubsub:
1388 pubsub.get_message(
1389 ignore_subscribe_messages=True, timeout=sleep_time
1390 )
1391 else:
1392 pubsub.get_sharded_message(
1393 ignore_subscribe_messages=True, timeout=sleep_time
1394 )
1395 except BaseException as e:
1396 if self.exception_handler is None:
1397 raise
1398 self.exception_handler(e, pubsub, self)
1399 pubsub.close()
1401 def stop(self) -> None:
1402 # trip the flag so the run loop exits. the run loop will
1403 # close the pubsub connection, which disconnects the socket
1404 # and returns the connection to the pool.
1405 self._running.clear()
1408class Pipeline(Redis):
1409 """
1410 Pipelines provide a way to transmit multiple commands to the Redis server
1411 in one transmission. This is convenient for batch processing, such as
1412 saving all the values in a list to Redis.
1414 All commands executed within a pipeline(when running in transactional mode,
1415 which is the default behavior) are wrapped with MULTI and EXEC
1416 calls. This guarantees all commands executed in the pipeline will be
1417 executed atomically.
1419 Any command raising an exception does *not* halt the execution of
1420 subsequent commands in the pipeline. Instead, the exception is caught
1421 and its instance is placed into the response list returned by execute().
1422 Code iterating over the response list should be able to deal with an
1423 instance of an exception as a potential value. In general, these will be
1424 ResponseError exceptions, such as those raised when issuing a command
1425 on a key of a different datatype.
1426 """
1428 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
1430 def __init__(
1431 self,
1432 connection_pool: ConnectionPool,
1433 response_callbacks,
1434 transaction,
1435 shard_hint,
1436 ):
1437 self.connection_pool = connection_pool
1438 self.connection: Optional[Connection] = None
1439 self.response_callbacks = response_callbacks
1440 self.transaction = transaction
1441 self.shard_hint = shard_hint
1442 self.watching = False
1443 self.command_stack = []
1444 self.scripts: Set[Script] = set()
1445 self.explicit_transaction = False
1447 def __enter__(self) -> "Pipeline":
1448 return self
1450 def __exit__(self, exc_type, exc_value, traceback):
1451 self.reset()
1453 def __del__(self):
1454 try:
1455 self.reset()
1456 except Exception:
1457 pass
1459 def __len__(self) -> int:
1460 return len(self.command_stack)
1462 def __bool__(self) -> bool:
1463 """Pipeline instances should always evaluate to True"""
1464 return True
1466 def reset(self) -> None:
1467 self.command_stack = []
1468 self.scripts = set()
1469 # make sure to reset the connection state in the event that we were
1470 # watching something
1471 if self.watching and self.connection:
1472 try:
1473 # call this manually since our unwatch or
1474 # immediate_execute_command methods can call reset()
1475 self.connection.send_command("UNWATCH")
1476 self.connection.read_response()
1477 except ConnectionError:
1478 # disconnect will also remove any previous WATCHes
1479 self.connection.disconnect()
1480 # clean up the other instance attributes
1481 self.watching = False
1482 self.explicit_transaction = False
1484 # we can safely return the connection to the pool here since we're
1485 # sure we're no longer WATCHing anything
1486 if self.connection:
1487 self.connection_pool.release(self.connection)
1488 self.connection = None
1490 def close(self) -> None:
1491 """Close the pipeline"""
1492 self.reset()
1494 def multi(self) -> None:
1495 """
1496 Start a transactional block of the pipeline after WATCH commands
1497 are issued. End the transactional block with `execute`.
1498 """
1499 if self.explicit_transaction:
1500 raise RedisError("Cannot issue nested calls to MULTI")
1501 if self.command_stack:
1502 raise RedisError(
1503 "Commands without an initial WATCH have already been issued"
1504 )
1505 self.explicit_transaction = True
1507 def execute_command(self, *args, **kwargs):
1508 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
1509 return self.immediate_execute_command(*args, **kwargs)
1510 return self.pipeline_execute_command(*args, **kwargs)
1512 def _disconnect_reset_raise_on_watching(
1513 self,
1514 conn: AbstractConnection,
1515 error: Exception,
1516 ) -> None:
1517 """
1518 Close the connection reset watching state and
1519 raise an exception if we were watching.
1521 The supported exceptions are already checked in the
1522 retry object so we don't need to do it here.
1524 After we disconnect the connection, it will try to reconnect and
1525 do a health check as part of the send_command logic(on connection level).
1526 """
1527 conn.disconnect()
1529 # if we were already watching a variable, the watch is no longer
1530 # valid since this connection has died. raise a WatchError, which
1531 # indicates the user should retry this transaction.
1532 if self.watching:
1533 self.reset()
1534 raise WatchError(
1535 f"A {type(error).__name__} occurred while watching one or more keys"
1536 )
1538 def immediate_execute_command(self, *args, **options):
1539 """
1540 Execute a command immediately, but don't auto-retry on the supported
1541 errors for retry if we're already WATCHing a variable.
1542 Used when issuing WATCH or subsequent commands retrieving their values but before
1543 MULTI is called.
1544 """
1545 command_name = args[0]
1546 conn = self.connection
1547 # if this is the first call, we need a connection
1548 if not conn:
1549 conn = self.connection_pool.get_connection()
1550 self.connection = conn
1552 return conn.retry.call_with_retry(
1553 lambda: self._send_command_parse_response(
1554 conn, command_name, *args, **options
1555 ),
1556 lambda error: self._disconnect_reset_raise_on_watching(conn, error),
1557 )
1559 def pipeline_execute_command(self, *args, **options) -> "Pipeline":
1560 """
1561 Stage a command to be executed when execute() is next called
1563 Returns the current Pipeline object back so commands can be
1564 chained together, such as:
1566 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
1568 At some other point, you can then run: pipe.execute(),
1569 which will execute all commands queued in the pipe.
1570 """
1571 self.command_stack.append((args, options))
1572 return self
1574 def _execute_transaction(
1575 self, connection: Connection, commands, raise_on_error
1576 ) -> List:
1577 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
1578 all_cmds = connection.pack_commands(
1579 [args for args, options in cmds if EMPTY_RESPONSE not in options]
1580 )
1581 connection.send_packed_command(all_cmds)
1582 errors = []
1584 # parse off the response for MULTI
1585 # NOTE: we need to handle ResponseErrors here and continue
1586 # so that we read all the additional command messages from
1587 # the socket
1588 try:
1589 self.parse_response(connection, "_")
1590 except ResponseError as e:
1591 errors.append((0, e))
1593 # and all the other commands
1594 for i, command in enumerate(commands):
1595 if EMPTY_RESPONSE in command[1]:
1596 errors.append((i, command[1][EMPTY_RESPONSE]))
1597 else:
1598 try:
1599 self.parse_response(connection, "_")
1600 except ResponseError as e:
1601 self.annotate_exception(e, i + 1, command[0])
1602 errors.append((i, e))
1604 # parse the EXEC.
1605 try:
1606 response = self.parse_response(connection, "_")
1607 except ExecAbortError:
1608 if errors:
1609 raise errors[0][1]
1610 raise
1612 # EXEC clears any watched keys
1613 self.watching = False
1615 if response is None:
1616 raise WatchError("Watched variable changed.")
1618 # put any parse errors into the response
1619 for i, e in errors:
1620 response.insert(i, e)
1622 if len(response) != len(commands):
1623 self.connection.disconnect()
1624 raise ResponseError(
1625 "Wrong number of response items from pipeline execution"
1626 )
1628 # find any errors in the response and raise if necessary
1629 if raise_on_error:
1630 self.raise_first_error(commands, response)
1632 # We have to run response callbacks manually
1633 data = []
1634 for r, cmd in zip(response, commands):
1635 if not isinstance(r, Exception):
1636 args, options = cmd
1637 # Remove keys entry, it needs only for cache.
1638 options.pop("keys", None)
1639 command_name = args[0]
1640 if command_name in self.response_callbacks:
1641 r = self.response_callbacks[command_name](r, **options)
1642 data.append(r)
1644 return data
1646 def _execute_pipeline(self, connection, commands, raise_on_error):
1647 # build up all commands into a single request to increase network perf
1648 all_cmds = connection.pack_commands([args for args, _ in commands])
1649 connection.send_packed_command(all_cmds)
1651 responses = []
1652 for args, options in commands:
1653 try:
1654 responses.append(self.parse_response(connection, args[0], **options))
1655 except ResponseError as e:
1656 responses.append(e)
1658 if raise_on_error:
1659 self.raise_first_error(commands, responses)
1661 return responses
1663 def raise_first_error(self, commands, response):
1664 for i, r in enumerate(response):
1665 if isinstance(r, ResponseError):
1666 self.annotate_exception(r, i + 1, commands[i][0])
1667 raise r
1669 def annotate_exception(self, exception, number, command):
1670 cmd = " ".join(map(safe_str, command))
1671 msg = (
1672 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
1673 f"caused error: {exception.args[0]}"
1674 )
1675 exception.args = (msg,) + exception.args[1:]
1677 def parse_response(self, connection, command_name, **options):
1678 result = Redis.parse_response(self, connection, command_name, **options)
1679 if command_name in self.UNWATCH_COMMANDS:
1680 self.watching = False
1681 elif command_name == "WATCH":
1682 self.watching = True
1683 return result
1685 def load_scripts(self):
1686 # make sure all scripts that are about to be run on this pipeline exist
1687 scripts = list(self.scripts)
1688 immediate = self.immediate_execute_command
1689 shas = [s.sha for s in scripts]
1690 # we can't use the normal script_* methods because they would just
1691 # get buffered in the pipeline.
1692 exists = immediate("SCRIPT EXISTS", *shas)
1693 if not all(exists):
1694 for s, exist in zip(scripts, exists):
1695 if not exist:
1696 s.sha = immediate("SCRIPT LOAD", s.script)
1698 def _disconnect_raise_on_watching(
1699 self,
1700 conn: AbstractConnection,
1701 error: Exception,
1702 ) -> None:
1703 """
1704 Close the connection, raise an exception if we were watching.
1706 The supported exceptions are already checked in the
1707 retry object so we don't need to do it here.
1709 After we disconnect the connection, it will try to reconnect and
1710 do a health check as part of the send_command logic(on connection level).
1711 """
1712 conn.disconnect()
1713 # if we were watching a variable, the watch is no longer valid
1714 # since this connection has died. raise a WatchError, which
1715 # indicates the user should retry this transaction.
1716 if self.watching:
1717 raise WatchError(
1718 f"A {type(error).__name__} occurred while watching one or more keys"
1719 )
1721 def execute(self, raise_on_error: bool = True) -> List[Any]:
1722 """Execute all the commands in the current pipeline"""
1723 stack = self.command_stack
1724 if not stack and not self.watching:
1725 return []
1726 if self.scripts:
1727 self.load_scripts()
1728 if self.transaction or self.explicit_transaction:
1729 execute = self._execute_transaction
1730 else:
1731 execute = self._execute_pipeline
1733 conn = self.connection
1734 if not conn:
1735 conn = self.connection_pool.get_connection()
1736 # assign to self.connection so reset() releases the connection
1737 # back to the pool after we're done
1738 self.connection = conn
1740 try:
1741 return conn.retry.call_with_retry(
1742 lambda: execute(conn, stack, raise_on_error),
1743 lambda error: self._disconnect_raise_on_watching(conn, error),
1744 )
1745 finally:
1746 # in reset() the connection is disconnected before returned to the pool if
1747 # it is marked for reconnect.
1748 self.reset()
1750 def discard(self):
1751 """
1752 Flushes all previously queued commands
1753 See: https://redis.io/commands/DISCARD
1754 """
1755 self.execute_command("DISCARD")
1757 def watch(self, *names):
1758 """Watches the values at keys ``names``"""
1759 if self.explicit_transaction:
1760 raise RedisError("Cannot issue a WATCH after a MULTI")
1761 return self.execute_command("WATCH", *names)
1763 def unwatch(self) -> bool:
1764 """Unwatches all previously specified keys"""
1765 return self.watching and self.execute_command("UNWATCH") or True