Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%
1import copy
2import logging
3import re
4import threading
5import time
6from itertools import chain
7from typing import (
8 TYPE_CHECKING,
9 Any,
10 Callable,
11 Dict,
12 List,
13 Literal,
14 Mapping,
15 Optional,
16 Set,
17 Type,
18 Union,
19)
21from redis._parsers.encoders import Encoder
22from redis._parsers.helpers import (
23 _RedisCallbacks,
24 _RedisCallbacksRESP2,
25 _RedisCallbacksRESP3,
26 bool_ok,
27)
28from redis._parsers.socket import SENTINEL
29from redis.backoff import ExponentialWithJitterBackoff
30from redis.cache import CacheConfig, CacheInterface
31from redis.commands import (
32 CoreCommands,
33 RedisModuleCommands,
34 SentinelCommands,
35 list_or_args,
36)
37from redis.commands.core import Script
38from redis.commands.helpers import partition_pubsub_subscriptions_by_handler
39from redis.connection import (
40 AbstractConnection,
41 Connection,
42 ConnectionPool,
43 SSLConnection,
44 UnixDomainSocketConnection,
45)
46from redis.credentials import CredentialProvider
47from redis.driver_info import DriverInfo, resolve_driver_info
48from redis.event import (
49 AfterPooledConnectionsInstantiationEvent,
50 AfterPubSubConnectionInstantiationEvent,
51 AfterSingleConnectionInstantiationEvent,
52 ClientType,
53 EventDispatcher,
54)
55from redis.exceptions import (
56 ConnectionError,
57 ExecAbortError,
58 PubSubError,
59 RedisError,
60 ResponseError,
61 WatchError,
62)
63from redis.lock import Lock
64from redis.maint_notifications import (
65 MaintNotificationsConfig,
66 OSSMaintNotificationsHandler,
67)
68from redis.observability.attributes import PubSubDirection
69from redis.observability.recorder import (
70 record_error_count,
71 record_operation_duration,
72 record_pubsub_message,
73)
74from redis.retry import Retry
75from redis.utils import (
76 DEFAULT_RESP_VERSION,
77 _set_info_logger,
78 check_protocol_version,
79 deprecated_args,
80 safe_str,
81 str_if_bytes,
82 truncate_text,
83)
85if TYPE_CHECKING:
86 import ssl
88 import OpenSSL
90 from redis.keyspace_notifications import KeyspaceNotifications
92SYM_EMPTY = b""
93EMPTY_RESPONSE = "EMPTY_RESPONSE"
95# some responses (i.e. DUMP) are binary and are never meant to be decoded
96NEVER_DECODE = "NEVER_DECODE"
99logger = logging.getLogger(__name__)
102def is_debug_log_enabled():
103 return logger.isEnabledFor(logging.DEBUG)
106def add_debug_log_for_operation_failure(connection: "AbstractConnection"):
107 logger.debug(
108 f"Operation failed, "
109 f"with connection: {connection}, details: {connection.extract_connection_details() if connection else 'no connection'}",
110 )
113class CaseInsensitiveDict(dict):
114 "Case insensitive dict implementation. Assumes string keys only."
116 def __init__(self, data: Dict[str, str]) -> None:
117 for k, v in data.items():
118 self[k.upper()] = v
120 def __contains__(self, k):
121 return super().__contains__(k.upper())
123 def __delitem__(self, k):
124 super().__delitem__(k.upper())
126 def __getitem__(self, k):
127 return super().__getitem__(k.upper())
129 def get(self, k, default=None):
130 return super().get(k.upper(), default)
132 def __setitem__(self, k, v):
133 super().__setitem__(k.upper(), v)
135 def update(self, data):
136 data = CaseInsensitiveDict(data)
137 super().update(data)
140class AbstractRedis:
141 pass
144class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
145 """
146 Implementation of the Redis protocol.
148 This class provides a Python interface to all Redis commands
149 and an implementation of the Redis protocol.
151 Pipelines derive from this class, implementing how commands
152 are sent to and received from the Redis server. Based on
153 configuration, an instance will use either a ConnectionPool or a
154 Connection object to talk to Redis.
156 It is not safe to pass PubSub or Pipeline objects between threads.
157 """
159 # Type discrimination marker for @overload self-type pattern
160 _is_async_client: Literal[False] = False
162 @classmethod
163 def from_url(cls, url: str, **kwargs) -> "Redis":
164 """
165 Return a Redis client object configured from the given URL
167 For example::
169 redis://[[username]:[password]]@localhost:6379/0
170 rediss://[[username]:[password]]@localhost:6379/0
171 unix://[username@]/path/to/socket.sock?db=0[&password=password]
173 Three URL schemes are supported:
175 - `redis://` creates a TCP socket connection. See more at:
176 <https://www.iana.org/assignments/uri-schemes/prov/redis>
177 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
178 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
179 - `unix://` creates a Unix Domain Socket connection.
181 The username, password, hostname, path and all querystring values
182 are passed through urllib.parse.unquote in order to replace any
183 percent-encoded values with their corresponding characters.
185 There are several ways to specify a database number. The first value
186 found will be used:
188 1. A ``db`` querystring option, e.g. redis://localhost?db=0
189 2. If using the redis:// or rediss:// schemes, the path argument
190 of the url, e.g. redis://localhost/0
191 3. A ``db`` keyword argument to this function.
193 If none of these options are specified, the default db=0 is used.
195 All querystring options are cast to their appropriate Python types.
196 Boolean arguments can be specified with string values "True"/"False"
197 or "Yes"/"No". Values that cannot be properly cast cause a
198 ``ValueError`` to be raised. Once parsed, the querystring arguments
199 and keyword arguments are passed to the ``ConnectionPool``'s
200 class initializer. In the case of conflicting arguments, querystring
201 arguments always win.
203 """
204 single_connection_client = kwargs.pop("single_connection_client", False)
205 connection_pool = ConnectionPool.from_url(url, **kwargs)
206 client = cls(
207 connection_pool=connection_pool,
208 single_connection_client=single_connection_client,
209 )
210 client.auto_close_connection_pool = True
211 return client
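# --- Illustrative usage (editorial sketch, not part of client.py) ----------
# A minimal example of building a client with from_url(); the URL, db index
# and decode_responses flag are assumptions for illustration only.
import redis

r = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)
r.set("greeting", "hello")
print(r.get("greeting"))  # -> "hello"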
213 @classmethod
214 def from_pool(
215 cls: Type["Redis"],
216 connection_pool: ConnectionPool,
217 ) -> "Redis":
218 """
219 Return a Redis client from the given connection pool.
220 The Redis client will take ownership of the connection pool and
221 close it when the Redis client is closed.
222 """
223 client = cls(
224 connection_pool=connection_pool,
225 )
226 client.auto_close_connection_pool = True
227 return client
229 @deprecated_args(
230 args_to_warn=["retry_on_timeout"],
231 reason="TimeoutError is included by default.",
232 version="6.0.0",
233 )
234 @deprecated_args(
235 args_to_warn=["lib_name", "lib_version"],
236 reason="Use 'driver_info' parameter instead. "
237 "lib_name and lib_version will be removed in a future version.",
238 )
239 def __init__(
240 self,
241 host: str = "localhost",
242 port: int = 6379,
243 db: int = 0,
244 password: Optional[str] = None,
245 socket_timeout: Optional[float] = None,
246 socket_connect_timeout: Optional[float] = None,
247 socket_keepalive: Optional[bool] = None,
248 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
249 connection_pool: Optional[ConnectionPool] = None,
250 unix_socket_path: Optional[str] = None,
251 encoding: str = "utf-8",
252 encoding_errors: str = "strict",
253 decode_responses: bool = False,
254 retry_on_timeout: bool = False,
255 retry: Retry = Retry(
256 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
257 ),
258 retry_on_error: Optional[List[Type[Exception]]] = None,
259 ssl: bool = False,
260 ssl_keyfile: Optional[str] = None,
261 ssl_certfile: Optional[str] = None,
262 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
263 ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
264 ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
265 ssl_ca_certs: Optional[str] = None,
266 ssl_ca_path: Optional[str] = None,
267 ssl_ca_data: Optional[str] = None,
268 ssl_check_hostname: bool = True,
269 ssl_password: Optional[str] = None,
270 ssl_validate_ocsp: bool = False,
271 ssl_validate_ocsp_stapled: bool = False,
272 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
273 ssl_ocsp_expected_cert: Optional[str] = None,
274 ssl_min_version: Optional["ssl.TLSVersion"] = None,
275 ssl_ciphers: Optional[str] = None,
276 max_connections: Optional[int] = None,
277 single_connection_client: bool = False,
278 health_check_interval: int = 0,
279 client_name: Optional[str] = None,
280 lib_name: Optional[str] = None,
281 lib_version: Optional[str] = None,
282 driver_info: Optional["DriverInfo"] = None,
283 username: Optional[str] = None,
284 redis_connect_func: Optional[Callable[[], None]] = None,
285 credential_provider: Optional[CredentialProvider] = None,
286 protocol: Optional[int] = 3,
287 cache: Optional[CacheInterface] = None,
288 cache_config: Optional[CacheConfig] = None,
289 event_dispatcher: Optional[EventDispatcher] = None,
290 maint_notifications_config: Optional[MaintNotificationsConfig] = None,
291 oss_cluster_maint_notifications_handler: Optional[
292 OSSMaintNotificationsHandler
293 ] = None,
294 ) -> None:
295 """
296 Initialize a new Redis client.
298 To specify a retry policy for specific errors, you have two options:
300 1. Set `retry_on_error` to a list of the errors to retry on, and
301 optionally set `retry` to a valid `Retry` object (in case the default
302 one is not appropriate) - with this approach, retries are triggered on
303 the default errors specified in the Retry object, enriched with the
304 errors specified in `retry_on_error`.
306 2. Define a `Retry` object with a configured 'supported_errors' set and
307 pass it as the `retry` parameter - with this approach you completely
308 redefine the errors on which retries will happen.
310 `retry_on_timeout` is deprecated - please include `TimeoutError`
311 either in the Retry object or in the `retry_on_error` list.
313 When `connection_pool` is provided, the retry configuration of the
314 provided pool is used.
316 Args:
318 single_connection_client:
319 if `True`, a connection pool is not used. In that case the `Redis`
320 instance is not thread safe.
321 decode_responses:
322 if `True`, responses will be decoded using the configured encoding.
323 Argument is ignored when connection_pool is provided.
324 driver_info:
325 Optional DriverInfo object to identify upstream libraries.
326 If provided, lib_name and lib_version are ignored.
327 If not provided, a DriverInfo will be created from lib_name and lib_version.
328 Argument is ignored when connection_pool is provided.
329 lib_name:
330 **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO.
331 lib_version:
332 **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO.
333 maint_notifications_config:
334 configures the pool to support maintenance notifications - see
335 `redis.maint_notifications.MaintNotificationsConfig` for details.
336 Only supported with RESP3.
337 If not provided and protocol is RESP3, the maintenance notifications
338 will be enabled by default (logic is included in the connection pool
339 initialization).
340 Argument is ignored when connection_pool is provided.
341 oss_cluster_maint_notifications_handler:
342 handler for OSS cluster notifications - see
343 `redis.maint_notifications.OSSMaintNotificationsHandler` for details.
344 Only supported with RESP3.
345 Argument is ignored when connection_pool is provided.
346 """
347 if event_dispatcher is None:
348 self._event_dispatcher = EventDispatcher()
349 else:
350 self._event_dispatcher = event_dispatcher
351 if not connection_pool:
352 if not retry_on_error:
353 retry_on_error = []
355 # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version
356 computed_driver_info = resolve_driver_info(
357 driver_info, lib_name, lib_version
358 )
360 kwargs = {
361 "db": db,
362 "username": username,
363 "password": password,
364 "socket_timeout": socket_timeout,
365 "encoding": encoding,
366 "encoding_errors": encoding_errors,
367 "decode_responses": decode_responses,
368 "retry_on_error": retry_on_error,
369 "retry": copy.deepcopy(retry),
370 "max_connections": max_connections,
371 "health_check_interval": health_check_interval,
372 "client_name": client_name,
373 "driver_info": computed_driver_info,
374 "redis_connect_func": redis_connect_func,
375 "credential_provider": credential_provider,
376 "protocol": protocol,
377 }
378 # based on input, set up appropriate connection args
379 if unix_socket_path is not None:
380 kwargs.update(
381 {
382 "path": unix_socket_path,
383 "connection_class": UnixDomainSocketConnection,
384 }
385 )
386 else:
387 # TCP specific options
388 kwargs.update(
389 {
390 "host": host,
391 "port": port,
392 "socket_connect_timeout": socket_connect_timeout,
393 "socket_keepalive": socket_keepalive,
394 "socket_keepalive_options": socket_keepalive_options,
395 }
396 )
398 if ssl:
399 kwargs.update(
400 {
401 "connection_class": SSLConnection,
402 "ssl_keyfile": ssl_keyfile,
403 "ssl_certfile": ssl_certfile,
404 "ssl_cert_reqs": ssl_cert_reqs,
405 "ssl_include_verify_flags": ssl_include_verify_flags,
406 "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
407 "ssl_ca_certs": ssl_ca_certs,
408 "ssl_ca_data": ssl_ca_data,
409 "ssl_check_hostname": ssl_check_hostname,
410 "ssl_password": ssl_password,
411 "ssl_ca_path": ssl_ca_path,
412 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
413 "ssl_validate_ocsp": ssl_validate_ocsp,
414 "ssl_ocsp_context": ssl_ocsp_context,
415 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
416 "ssl_min_version": ssl_min_version,
417 "ssl_ciphers": ssl_ciphers,
418 }
419 )
420 if (cache_config or cache) and check_protocol_version(protocol, 3):
421 kwargs.update(
422 {
423 "cache": cache,
424 "cache_config": cache_config,
425 }
426 )
427 maint_notifications_enabled = (
428 maint_notifications_config and maint_notifications_config.enabled
429 )
430 if maint_notifications_enabled and protocol not in [
431 3,
432 "3",
433 ]:
434 raise RedisError(
435 "Maintenance notifications handlers on connection are only supported with RESP version 3"
436 )
437 if maint_notifications_config:
438 kwargs.update(
439 {
440 "maint_notifications_config": maint_notifications_config,
441 }
442 )
443 if oss_cluster_maint_notifications_handler:
444 kwargs.update(
445 {
446 "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
447 }
448 )
449 connection_pool = ConnectionPool(**kwargs)
450 self._event_dispatcher.dispatch(
451 AfterPooledConnectionsInstantiationEvent(
452 [connection_pool], ClientType.SYNC, credential_provider
453 )
454 )
455 self.auto_close_connection_pool = True
456 else:
457 self.auto_close_connection_pool = False
458 self._event_dispatcher.dispatch(
459 AfterPooledConnectionsInstantiationEvent(
460 [connection_pool], ClientType.SYNC, credential_provider
461 )
462 )
464 self.connection_pool = connection_pool
466 if (cache_config or cache) and self.connection_pool.get_protocol() not in [
467 3,
468 "3",
469 ]:
470 raise RedisError("Client caching is only supported with RESP version 3")
472 self.single_connection_lock = threading.RLock()
473 self.connection = None
474 self._single_connection_client = single_connection_client
475 if self._single_connection_client:
476 self.connection = self.connection_pool.get_connection()
477 self._event_dispatcher.dispatch(
478 AfterSingleConnectionInstantiationEvent(
479 self.connection, ClientType.SYNC, self.single_connection_lock
480 )
481 )
483 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)
485 if check_protocol_version(
486 self.connection_pool.connection_kwargs.get(
487 "protocol", DEFAULT_RESP_VERSION
488 ),
489 3,
490 ):
491 self.response_callbacks.update(_RedisCallbacksRESP3)
492 else:
493 self.response_callbacks.update(_RedisCallbacksRESP2)
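# --- Illustrative usage (editorial sketch, not part of client.py) ----------
# Configuring retries as described in the __init__ docstring above; the error
# class, backoff values and retry count are assumptions for illustration only.
import redis
from redis.backoff import ExponentialWithJitterBackoff
from redis.exceptions import BusyLoadingError
from redis.retry import Retry

r = redis.Redis(
    host="localhost",
    port=6379,
    retry=Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=5),
    retry_on_error=[BusyLoadingError],  # retried in addition to the Retry defaults
)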
495 def __repr__(self) -> str:
496 return (
497 f"<{type(self).__module__}.{type(self).__name__}"
498 f"({repr(self.connection_pool)})>"
499 )
501 def get_encoder(self) -> "Encoder":
502 """Get the connection pool's encoder"""
503 return self.connection_pool.get_encoder()
505 def get_connection_kwargs(self) -> Dict:
506 """Get the connection's key-word arguments"""
507 return self.connection_pool.connection_kwargs
509 def get_retry(self) -> Optional[Retry]:
510 return self.get_connection_kwargs().get("retry")
512 def set_retry(self, retry: Retry) -> None:
513 self.get_connection_kwargs().update({"retry": retry})
514 self.connection_pool.set_retry(retry)
516 def set_response_callback(self, command: str, callback: Callable) -> None:
517 """Set a custom Response Callback"""
518 self.response_callbacks[command] = callback
520 def load_external_module(self, funcname, func) -> None:
521 """
522 This function can be used to add externally defined redis modules,
523 and their namespaces to the redis client.
525 funcname - A string containing the name of the function to create
526 func - The function, being added to this class.
528 ex: Assume that one has a custom redis module named foomod that
529 creates commands named 'foo.dothing' and 'foo.anotherthing' in redis.
530 To load these functions into this namespace:
532 from redis import Redis
533 from foomodule import F
534 r = Redis()
535 r.load_external_module("foo", F)
536 r.foo().dothing('your', 'arguments')
538 For a concrete example see the reimport of the redisjson module in
539 tests/test_connection.py::test_loading_external_modules
540 """
541 setattr(self, funcname, func)
543 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
544 """
545 Return a new pipeline object that can queue multiple commands for
546 later execution. ``transaction`` indicates whether all commands
547 should be executed atomically. Apart from making a group of operations
548 atomic, pipelines are useful for reducing the back-and-forth overhead
549 between the client and server.
550 """
551 return Pipeline(
552 self.connection_pool, self.response_callbacks, transaction, shard_hint
553 )
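# --- Illustrative usage (editorial sketch, not part of client.py) ----------
# Queueing several commands in a pipeline and sending them in one round trip;
# key names and values are assumptions for illustration only.
import redis

r = redis.Redis()
with r.pipeline() as pipe:
    pipe.set("counter", 0)
    pipe.incr("counter")
    pipe.incr("counter")
    results = pipe.execute()  # e.g. [True, 1, 2]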
555 def transaction(
556 self, func: Callable[["Pipeline"], None], *watches, **kwargs
557 ) -> Union[List[Any], Any, None]:
558 """
559 Convenience method for executing the callable `func` as a transaction
560 while watching all keys specified in `watches`. The 'func' callable
561 should expect a single argument which is a Pipeline object.
562 """
563 shard_hint = kwargs.pop("shard_hint", None)
564 value_from_callable = kwargs.pop("value_from_callable", False)
565 watch_delay = kwargs.pop("watch_delay", None)
566 with self.pipeline(True, shard_hint) as pipe:
567 while True:
568 try:
569 if watches:
570 pipe.watch(*watches)
571 func_value = func(pipe)
572 exec_value = pipe.execute()
573 return func_value if value_from_callable else exec_value
574 except WatchError:
575 if watch_delay is not None and watch_delay > 0:
576 time.sleep(watch_delay)
577 continue
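# --- Illustrative usage (editorial sketch, not part of client.py) ----------
# Using transaction() for a check-and-set update on a watched key; the key
# name and amounts are assumptions for illustration only.
import redis

r = redis.Redis()
r.set("balance", 100)

def withdraw(pipe):
    # executes in immediate mode while "balance" is WATCHed
    balance = int(pipe.get("balance"))
    pipe.multi()  # switch the pipeline into buffered MULTI mode
    pipe.set("balance", balance - 10)

r.transaction(withdraw, "balance")  # retried automatically on WatchError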
579 def lock(
580 self,
581 name: str,
582 timeout: Optional[float] = None,
583 sleep: float = 0.1,
584 blocking: bool = True,
585 blocking_timeout: Optional[float] = None,
586 lock_class: Union[None, Any] = None,
587 thread_local: bool = True,
588 raise_on_release_error: bool = True,
589 ):
590 """
591 Return a new Lock object using key ``name`` that mimics
592 the behavior of threading.Lock.
594 If specified, ``timeout`` indicates a maximum life for the lock.
595 By default, it will remain locked until release() is called.
597 ``sleep`` indicates the amount of time to sleep per loop iteration
598 when the lock is in blocking mode and another client is currently
599 holding the lock.
601 ``blocking`` indicates whether calling ``acquire`` should block until
602 the lock has been acquired or to fail immediately, causing ``acquire``
603 to return False and the lock not being acquired. Defaults to True.
604 Note this value can be overridden by passing a ``blocking``
605 argument to ``acquire``.
607 ``blocking_timeout`` indicates the maximum amount of time in seconds to
608 spend trying to acquire the lock. A value of ``None`` indicates
609 continue trying forever. ``blocking_timeout`` can be specified as a
610 float or integer, both representing the number of seconds to wait.
612 ``lock_class`` forces the specified lock implementation. Note that as
613 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
614 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
615 you have created your own custom lock class.
617 ``thread_local`` indicates whether the lock token is placed in
618 thread-local storage. By default, the token is placed in thread local
619 storage so that a thread only sees its token, not a token set by
620 another thread. Consider the following timeline:
622 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
623 thread-1 sets the token to "abc"
624 time: 1, thread-2 blocks trying to acquire `my-lock` using the
625 Lock instance.
626 time: 5, thread-1 has not yet completed. redis expires the lock
627 key.
628 time: 5, thread-2 acquires `my-lock` now that it's available.
629 thread-2 sets the token to "xyz"
630 time: 6, thread-1 finishes its work and calls release(). if the
631 token is *not* stored in thread local storage, then
632 thread-1 would see the token value as "xyz" and would be
633 able to successfully release thread-2's lock.
635 ``raise_on_release_error`` indicates whether to raise an exception when
636 the lock is no longer owned when exiting the context manager. By default,
637 this is True, meaning an exception will be raised. If False, the warning
638 will be logged and the exception will be suppressed.
640 In some use cases it's necessary to disable thread local storage. For
641 example, if you have code where one thread acquires a lock and passes
642 that lock instance to a worker thread to release later. If thread
643 local storage isn't disabled in this case, the worker thread won't see
644 the token set by the thread that acquired the lock. Our assumption
645 is that these cases aren't common and as such we default to using
646 thread local storage."""
647 if lock_class is None:
648 lock_class = Lock
649 return lock_class(
650 self,
651 name,
652 timeout=timeout,
653 sleep=sleep,
654 blocking=blocking,
655 blocking_timeout=blocking_timeout,
656 thread_local=thread_local,
657 raise_on_release_error=raise_on_release_error,
658 )
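# --- Illustrative usage (editorial sketch, not part of client.py) ----------
# Acquiring a distributed lock as a context manager; the lock name and
# timeouts are assumptions for illustration only.
import redis

r = redis.Redis()
with r.lock("my-resource-lock", timeout=10, blocking_timeout=5):
    ...  # critical section; the lock is released when the block exits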
660 def pubsub(self, **kwargs):
661 """
662 Return a Publish/Subscribe object. With this object, you can
663 subscribe to channels and listen for messages that get published to
664 them.
665 """
666 return PubSub(
667 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
668 )
670 def keyspace_notifications(
671 self,
672 key_prefix: Union[str, bytes, None] = None,
673 ignore_subscribe_messages: bool = True,
674 ) -> "KeyspaceNotifications":
675 """
676 Return a :class:`~redis.keyspace_notifications.KeyspaceNotifications`
677 object for subscribing to keyspace and keyevent notifications.
679 Note: Keyspace notifications must be enabled on the Redis server via
680 the ``notify-keyspace-events`` configuration option.
682 Args:
683 key_prefix: Optional prefix to filter and strip from keys in
684 notifications.
685 ignore_subscribe_messages: If True, subscribe/unsubscribe
686 confirmations are not returned by
687 get_message/listen.
688 """
689 from redis.keyspace_notifications import KeyspaceNotifications
691 return KeyspaceNotifications(
692 self,
693 key_prefix=key_prefix,
694 ignore_subscribe_messages=ignore_subscribe_messages,
695 )
697 def monitor(self):
698 return Monitor(self.connection_pool)
700 def client(self):
701 return self.__class__(
702 connection_pool=self.connection_pool,
703 single_connection_client=True,
704 )
706 def __enter__(self):
707 return self
709 def __exit__(self, exc_type, exc_value, traceback):
710 self.close()
712 def __del__(self):
713 try:
714 self.close()
715 except Exception:
716 pass
718 def close(self) -> None:
719 # In case a connection property does not yet exist
720 # (due to a crash earlier in the Redis() constructor), return
721 # immediately as there is nothing to clean up.
722 if not hasattr(self, "connection"):
723 return
725 conn = self.connection
726 if conn:
727 self.connection = None
728 self.connection_pool.release(conn)
730 if self.auto_close_connection_pool:
731 self.connection_pool.disconnect()
733 def _send_command_parse_response(self, conn, command_name, *args, **options):
734 """
735 Send a command and parse the response
736 """
737 conn.send_command(*args, **options)
738 return self.parse_response(conn, command_name, **options)
740 def _close_connection(
741 self,
742 conn,
743 error: Optional[Exception] = None,
744 failure_count: Optional[int] = None,
745 start_time: Optional[float] = None,
746 command_name: Optional[str] = None,
747 ) -> None:
748 """
749 Close the connection before retrying.
751 The supported exceptions are already checked in the
752 retry object so we don't need to do it here.
754 After we disconnect the connection, it will try to reconnect and
755 do a health check as part of the send_command logic (at the connection level).
756 """
757 if error and failure_count <= conn.retry.get_retries():
758 record_operation_duration(
759 command_name=command_name,
760 duration_seconds=time.monotonic() - start_time,
761 server_address=getattr(conn, "host", None),
762 server_port=getattr(conn, "port", None),
763 db_namespace=str(conn.db),
764 error=error,
765 retry_attempts=failure_count,
766 )
768 conn.disconnect()
770 # COMMAND EXECUTION AND PROTOCOL PARSING
771 def execute_command(self, *args, **options):
772 return self._execute_command(*args, **options)
774 def _execute_command(self, *args, **options):
775 """Execute a command and return a parsed response"""
776 pool = self.connection_pool
777 command_name = args[0]
778 conn = self.connection or pool.get_connection()
780 # Start timing for observability
781 start_time = time.monotonic()
782 # Track actual retry attempts for error reporting
783 actual_retry_attempts = [0]
785 def failure_callback(error, failure_count):
786 if is_debug_log_enabled():
787 add_debug_log_for_operation_failure(conn)
788 actual_retry_attempts[0] = failure_count
789 self._close_connection(conn, error, failure_count, start_time, command_name)
791 if self._single_connection_client:
792 self.single_connection_lock.acquire()
793 try:
794 result = conn.retry.call_with_retry(
795 lambda: self._send_command_parse_response(
796 conn, command_name, *args, **options
797 ),
798 failure_callback,
799 with_failure_count=True,
800 )
802 record_operation_duration(
803 command_name=command_name,
804 duration_seconds=time.monotonic() - start_time,
805 server_address=getattr(conn, "host", None),
806 server_port=getattr(conn, "port", None),
807 db_namespace=str(conn.db),
808 )
809 return result
810 except Exception as e:
811 record_error_count(
812 server_address=getattr(conn, "host", None),
813 server_port=getattr(conn, "port", None),
814 network_peer_address=getattr(conn, "host", None),
815 network_peer_port=getattr(conn, "port", None),
816 error_type=e,
817 retry_attempts=actual_retry_attempts[0],
818 is_internal=False,
819 )
820 raise
822 finally:
823 if conn and conn.should_reconnect():
824 self._close_connection(conn)
825 conn.connect()
826 if self._single_connection_client:
827 self.single_connection_lock.release()
828 if not self.connection:
829 pool.release(conn)
831 def parse_response(self, connection, command_name, **options):
832 """Parses a response from the Redis server"""
833 try:
834 if NEVER_DECODE in options:
835 response = connection.read_response(disable_decoding=True)
836 options.pop(NEVER_DECODE)
837 else:
838 response = connection.read_response()
839 except ResponseError:
840 if EMPTY_RESPONSE in options:
841 return options[EMPTY_RESPONSE]
842 raise
844 if EMPTY_RESPONSE in options:
845 options.pop(EMPTY_RESPONSE)
847 # Remove the 'keys' entry; it is only needed for caching.
848 options.pop("keys", None)
850 if command_name in self.response_callbacks:
851 return self.response_callbacks[command_name](response, **options)
852 return response
854 def get_cache(self) -> Optional[CacheInterface]:
855 return self.connection_pool.cache
858StrictRedis = Redis
861class Monitor:
862 """
863 Monitor is useful for handling the MONITOR command to the redis server.
864 The next_command() method returns one command from the monitor, and the
865 listen() method yields commands from the monitor.
866 """
868 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
869 command_re = re.compile(r'"(.*?)(?<!\\)"')
871 def __init__(self, connection_pool):
872 self.connection_pool = connection_pool
873 self.connection = self.connection_pool.get_connection()
875 def __enter__(self):
876 self._start_monitor()
877 return self
879 def __exit__(self, *args):
880 self.connection.disconnect()
881 self.connection_pool.release(self.connection)
883 def next_command(self):
884 """Parse the response from a monitor command"""
885 response = self.connection.read_response()
887 if response is None:
888 return None
890 if isinstance(response, bytes):
891 response = self.connection.encoder.decode(response, force=True)
893 command_time, command_data = response.split(" ", 1)
894 m = self.monitor_re.match(command_data)
895 db_id, client_info, command = m.groups()
896 command = " ".join(self.command_re.findall(command))
897 # Redis escapes double quotes because each piece of the command
898 # string is surrounded by double quotes. We don't have that
899 # requirement so remove the escaping and leave the quote.
900 command = command.replace('\\"', '"')
902 if client_info == "lua":
903 client_address = "lua"
904 client_port = ""
905 client_type = "lua"
906 elif client_info.startswith("unix"):
907 client_address = "unix"
908 client_port = client_info[5:]
909 client_type = "unix"
910 else:
911 if client_info == "":
912 client_address = ""
913 client_port = ""
914 client_type = "unknown"
915 else:
916 # use rsplit as ipv6 addresses contain colons
917 client_address, client_port = client_info.rsplit(":", 1)
918 client_type = "tcp"
919 return {
920 "time": float(command_time),
921 "db": int(db_id),
922 "client_address": client_address,
923 "client_port": client_port,
924 "client_type": client_type,
925 "command": command,
926 }
928 def listen(self):
929 """Listen for commands coming to the server."""
930 while True:
931 yield self.next_command()
933 def _start_monitor(self):
934 self.connection.send_command("MONITOR")
935 # check that monitor returns 'OK', but don't return it to user
936 response = self.connection.read_response()
938 if not bool_ok(response):
939 raise RedisError(f"MONITOR failed: {response}")
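# --- Illustrative usage (editorial sketch, not part of client.py) ----------
# Streaming commands observed by MONITOR; intended only as a rough example.
import redis

r = redis.Redis()
with r.monitor() as m:
    for cmd in m.listen():
        print(cmd["time"], cmd["client_address"], cmd["command"])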
942class PubSub:
943 """
944 PubSub provides publish, subscribe and listen support to Redis channels.
946 After subscribing to one or more channels, the listen() method will block
947 until a message arrives on one of the subscribed channels. That message
948 will be returned and it's safe to start listening again.
949 """
951 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
952 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
953 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
955 def __init__(
956 self,
957 connection_pool,
958 shard_hint=None,
959 ignore_subscribe_messages: bool = False,
960 encoder: Optional["Encoder"] = None,
961 push_handler_func: Union[None, Callable[[str], None]] = None,
962 event_dispatcher: Optional["EventDispatcher"] = None,
963 ):
964 self.connection_pool = connection_pool
965 self.shard_hint = shard_hint
966 self.ignore_subscribe_messages = ignore_subscribe_messages
967 self.connection = None
968 self.subscribed_event = threading.Event()
969 # we need to know the encoding options for this connection in order
970 # to lookup channel and pattern names for callback handlers.
971 self.encoder = encoder
972 self.push_handler_func = push_handler_func
973 if event_dispatcher is None:
974 self._event_dispatcher = EventDispatcher()
975 else:
976 self._event_dispatcher = event_dispatcher
978 self._lock = threading.RLock()
979 if self.encoder is None:
980 self.encoder = self.connection_pool.get_encoder()
981 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
982 if self.encoder.decode_responses:
983 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
984 else:
985 self.health_check_response = [b"pong", self.health_check_response_b]
986 if self.push_handler_func is None:
987 _set_info_logger()
988 self.reset()
990 def __enter__(self) -> "PubSub":
991 return self
993 def __exit__(self, exc_type, exc_value, traceback) -> None:
994 self.reset()
996 def __del__(self) -> None:
997 try:
998 # if this object went out of scope prior to shutting down
999 # subscriptions, close the connection manually before
1000 # returning it to the connection pool
1001 self.reset()
1002 except Exception:
1003 pass
1005 def reset(self) -> None:
1006 if self.connection:
1007 self.connection.disconnect()
1008 self.connection.deregister_connect_callback(self.on_connect)
1009 self.connection_pool.release(self.connection)
1010 self.connection = None
1011 self.health_check_response_counter = 0
1012 self.channels = {}
1013 self.pending_unsubscribe_channels = set()
1014 self.shard_channels = {}
1015 self.pending_unsubscribe_shard_channels = set()
1016 self.patterns = {}
1017 self.pending_unsubscribe_patterns = set()
1018 self.subscribed_event.clear()
1020 def close(self) -> None:
1021 self.reset()
1023 def _resubscribe(self, subscribed, subscribe_fn) -> None:
1024 subscriptions_without_handlers, subscriptions_with_handlers = (
1025 partition_pubsub_subscriptions_by_handler(subscribed, self.encoder)
1026 )
1027 if subscriptions_without_handlers or subscriptions_with_handlers:
1028 subscribe_fn(*subscriptions_without_handlers, **subscriptions_with_handlers)
1030 def _resubscribe_shard_channels(self) -> None:
1031 self._resubscribe(self.shard_channels, self.ssubscribe)
1033 def on_connect(self, connection) -> None:
1034 "Re-subscribe to any channels and patterns previously subscribed to"
1035 self.pending_unsubscribe_channels.clear()
1036 self.pending_unsubscribe_patterns.clear()
1037 self.pending_unsubscribe_shard_channels.clear()
1038 if self.channels:
1039 self._resubscribe(self.channels, self.subscribe)
1040 if self.patterns:
1041 self._resubscribe(self.patterns, self.psubscribe)
1042 if self.shard_channels:
1043 self._resubscribe_shard_channels()
1045 @property
1046 def subscribed(self) -> bool:
1047 """Indicates if there are subscriptions to any channels or patterns"""
1048 return self.subscribed_event.is_set()
1050 def execute_command(self, *args):
1051 """Execute a publish/subscribe command"""
1053 # NOTE: don't parse the response in this function -- it could pull a
1054 # legitimate message off the stack if the connection is already
1055 # subscribed to one or more channels
1057 if self.connection is None:
1058 self.connection = self.connection_pool.get_connection()
1059 # register a callback that re-subscribes to any channels we
1060 # were listening to when we were disconnected
1061 self.connection.register_connect_callback(self.on_connect)
1062 if self.push_handler_func is not None:
1063 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
1064 self._event_dispatcher.dispatch(
1065 AfterPubSubConnectionInstantiationEvent(
1066 self.connection, self.connection_pool, ClientType.SYNC, self._lock
1067 )
1068 )
1069 connection = self.connection
1070 kwargs = {"check_health": not self.subscribed}
1071 if not self.subscribed:
1072 self.clean_health_check_responses()
1073 with self._lock:
1074 self._execute(connection, connection.send_command, *args, **kwargs)
1076 def clean_health_check_responses(self) -> None:
1077 """
1078 If any health check responses are present, clean them
1079 """
1080 ttl = 10
1081 conn = self.connection
1082 while conn and self.health_check_response_counter > 0 and ttl > 0:
1083 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
1084 response = self._execute(conn, conn.read_response)
1085 if self.is_health_check_response(response):
1086 self.health_check_response_counter -= 1
1087 else:
1088 raise PubSubError(
1089 "A non health check response was cleaned by "
1090 "execute_command: {}".format(response)
1091 )
1092 ttl -= 1
1094 def _reconnect(
1095 self,
1096 conn,
1097 error: Optional[Exception] = None,
1098 failure_count: Optional[int] = None,
1099 start_time: Optional[float] = None,
1100 command_name: Optional[str] = None,
1101 ) -> None:
1102 """
1103 The supported exceptions are already checked in the
1104 retry object so we don't need to do it here.
1106 In this error handler we are trying to reconnect to the server.
1107 """
1108 if error and failure_count <= conn.retry.get_retries():
1109 if command_name:
1110 record_operation_duration(
1111 command_name=command_name,
1112 duration_seconds=time.monotonic() - start_time,
1113 server_address=getattr(conn, "host", None),
1114 server_port=getattr(conn, "port", None),
1115 db_namespace=str(conn.db),
1116 error=error,
1117 retry_attempts=failure_count,
1118 )
1119 conn.disconnect()
1120 conn.connect()
1122 def _execute(self, conn, command, *args, **kwargs):
1123 """
1124 Connect manually upon disconnection. If the Redis server is down,
1125 this will fail and raise a ConnectionError as desired.
1126 After reconnection, the ``on_connect`` callback should have been
1127 called by the connection to resubscribe us to any channels and
1128 patterns we were previously listening to.
1129 """
1131 if conn.should_reconnect():
1132 self._reconnect(conn)
1134 if args:
1135 command_name = args[0]
1136 else:
1137 command_name = None
1139 # Start timing for observability
1140 start_time = time.monotonic()
1141 # Track actual retry attempts for error reporting
1142 actual_retry_attempts = [0]
1144 def failure_callback(error, failure_count):
1145 actual_retry_attempts[0] = failure_count
1146 self._reconnect(conn, error, failure_count, start_time, command_name)
1148 try:
1149 response = conn.retry.call_with_retry(
1150 lambda: command(*args, **kwargs),
1151 failure_callback,
1152 with_failure_count=True,
1153 )
1155 if command_name:
1156 record_operation_duration(
1157 command_name=command_name,
1158 duration_seconds=time.monotonic() - start_time,
1159 server_address=getattr(conn, "host", None),
1160 server_port=getattr(conn, "port", None),
1161 db_namespace=str(conn.db),
1162 )
1164 return response
1165 except Exception as e:
1166 record_error_count(
1167 server_address=getattr(conn, "host", None),
1168 server_port=getattr(conn, "port", None),
1169 network_peer_address=getattr(conn, "host", None),
1170 network_peer_port=getattr(conn, "port", None),
1171 error_type=e,
1172 retry_attempts=actual_retry_attempts[0],
1173 is_internal=False,
1174 )
1175 raise
1177 def parse_response(self, block=True, timeout=0):
1178 """
1179 Parse the response from a publish/subscribe command.
1181 Args:
1182 block: If True, block indefinitely until a message is available.
1183 If False, return immediately if no message is available.
1184 Default: True
1185 timeout: The timeout in seconds for reading a response when block=False.
1186 This parameter is ignored when block=True.
1187 Default: 0 (return immediately if no data available)
1189 Returns:
1190 The parsed response from the server, or None if no message is available
1191 within the timeout period (when block=False).
1193 Important:
1194 The block and timeout parameters work together:
1195 - When block=True: timeout is IGNORED, method blocks indefinitely
1196 - When block=False: timeout is USED, method returns after timeout expires
1198 Typically, you should use get_message(timeout=X) instead of calling
1199 parse_response() directly. The get_message() method automatically sets
1200 block=False when a timeout is provided, and block=True when timeout=None.
1202 Example:
1203 # Block indefinitely (timeout is ignored)
1204 response = pubsub.parse_response(block=True, timeout=0.1)
1206 # Non-blocking with 0.1 second timeout
1207 response = pubsub.parse_response(block=False, timeout=0.1)
1209 # Non-blocking, return immediately
1210 response = pubsub.parse_response(block=False, timeout=0)
1212 # Recommended: use get_message() instead
1213 msg = pubsub.get_message(timeout=0.1) # automatically sets block=False
1214 msg = pubsub.get_message(timeout=None) # automatically sets block=True
1215 """
1216 conn = self.connection
1217 if conn is None:
1218 raise RuntimeError(
1219 "pubsub connection not set: "
1220 "did you forget to call subscribe() or psubscribe()?"
1221 )
1223 self.check_health()
1225 def try_read():
1226 if not block:
1227 if not conn.can_read(timeout=timeout):
1228 return None
1229 read_timeout = timeout
1230 else:
1231 conn.connect()
1232 read_timeout = SENTINEL # Use default socket timeout for blocking
1233 return conn.read_response(
1234 disconnect_on_error=False, push_request=True, timeout=read_timeout
1235 )
1237 response = self._execute(conn, try_read)
1239 if self.is_health_check_response(response):
1240 # ignore the health check message as user might not expect it
1241 self.health_check_response_counter -= 1
1242 return None
1243 return response
1245 def is_health_check_response(self, response) -> bool:
1246 """
1247 Check if the response is a health check response.
1248 If there are no subscriptions, redis responds to the PING command with a
1249 bulk response, instead of a multi-bulk with "pong" and the response.
1250 """
1251 if self.encoder.decode_responses:
1252 return (
1253 response
1254 in [
1255 self.health_check_response, # If there is a subscription
1256 self.HEALTH_CHECK_MESSAGE, # If there are no subscriptions and decode_responses=True
1257 ]
1258 )
1259 else:
1260 return (
1261 response
1262 in [
1263 self.health_check_response, # If there is a subscription
1264 self.health_check_response_b, # If there isn't a subscription and decode_responses=False
1265 ]
1266 )
1268 def check_health(self) -> None:
1269 conn = self.connection
1270 if conn is None:
1271 raise RuntimeError(
1272 "pubsub connection not set: "
1273 "did you forget to call subscribe() or psubscribe()?"
1274 )
1276 if conn.health_check_interval and time.monotonic() > conn.next_health_check:
1277 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
1278 self.health_check_response_counter += 1
1280 def _normalize_keys(self, data) -> Dict:
1281 """
1282 normalize channel/pattern names to be either bytes or strings
1283 based on whether responses are automatically decoded. this saves us
1284 from coercing the value for each message coming in.
1285 """
1286 encode = self.encoder.encode
1287 decode = self.encoder.decode
1288 return {decode(encode(k)): v for k, v in data.items()}
1290 def psubscribe(self, *args, **kwargs):
1291 """
1292 Subscribe to channel patterns. Patterns supplied as keyword arguments
1293 expect a pattern name as the key and a callable as the value. A
1294 pattern's callable will be invoked automatically when a message is
1295 received on that pattern rather than producing a message via
1296 ``listen()``.
1297 """
1298 if args:
1299 args = list_or_args(args[0], args[1:])
1300 new_patterns = dict.fromkeys(args)
1301 new_patterns.update(kwargs)
1302 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
1303 # update the patterns dict AFTER we send the command. we don't want to
1304 # subscribe twice to these patterns, once for the command and again
1305 # for the reconnection.
1306 new_patterns = self._normalize_keys(new_patterns)
1307 self.patterns.update(new_patterns)
1308 if not self.subscribed:
1309 # Set the subscribed_event flag to True
1310 self.subscribed_event.set()
1311 # Clear the health check counter
1312 self.health_check_response_counter = 0
1313 self.pending_unsubscribe_patterns.difference_update(new_patterns)
1314 return ret_val
1316 def punsubscribe(self, *args):
1317 """
1318 Unsubscribe from the supplied patterns. If empty, unsubscribe from
1319 all patterns.
1320 """
1321 if args:
1322 args = list_or_args(args[0], args[1:])
1323 patterns = self._normalize_keys(dict.fromkeys(args))
1324 else:
1325 patterns = self.patterns
1326 self.pending_unsubscribe_patterns.update(patterns)
1327 return self.execute_command("PUNSUBSCRIBE", *args)
1329 def subscribe(self, *args, **kwargs):
1330 """
1331 Subscribe to channels. Channels supplied as keyword arguments expect
1332 a channel name as the key and a callable as the value. A channel's
1333 callable will be invoked automatically when a message is received on
1334 that channel rather than producing a message via ``listen()`` or
1335 ``get_message()``.
1336 """
1337 if args:
1338 args = list_or_args(args[0], args[1:])
1339 new_channels = dict.fromkeys(args)
1340 new_channels.update(kwargs)
1341 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
1342 # update the channels dict AFTER we send the command. we don't want to
1343 # subscribe twice to these channels, once for the command and again
1344 # for the reconnection.
1345 new_channels = self._normalize_keys(new_channels)
1346 self.channels.update(new_channels)
1347 if not self.subscribed:
1348 # Set the subscribed_event flag to True
1349 self.subscribed_event.set()
1350 # Clear the health check counter
1351 self.health_check_response_counter = 0
1352 self.pending_unsubscribe_channels.difference_update(new_channels)
1353 return ret_val
1355 def unsubscribe(self, *args):
1356 """
1357 Unsubscribe from the supplied channels. If empty, unsubscribe from
1358 all channels
1359 """
1360 if args:
1361 args = list_or_args(args[0], args[1:])
1362 channels = self._normalize_keys(dict.fromkeys(args))
1363 else:
1364 channels = self.channels
1365 self.pending_unsubscribe_channels.update(channels)
1366 return self.execute_command("UNSUBSCRIBE", *args)
1368 def ssubscribe(self, *args, target_node=None, **kwargs):
1369 """
1370 Subscribes the client to the specified shard channels.
1371 Channels supplied as keyword arguments expect a channel name as the key
1372 and a callable as the value. A channel's callable will be invoked automatically
1373 when a message is received on that channel rather than producing a message via
1374 ``listen()`` or ``get_sharded_message()``.
1375 """
1376 if args:
1377 args = list_or_args(args[0], args[1:])
1378 new_s_channels = dict.fromkeys(args)
1379 new_s_channels.update(kwargs)
1380 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
1381 # update the s_channels dict AFTER we send the command. we don't want to
1382 # subscribe twice to these channels, once for the command and again
1383 # for the reconnection.
1384 new_s_channels = self._normalize_keys(new_s_channels)
1385 self.shard_channels.update(new_s_channels)
1386 if not self.subscribed:
1387 # Set the subscribed_event flag to True
1388 self.subscribed_event.set()
1389 # Clear the health check counter
1390 self.health_check_response_counter = 0
1391 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
1392 return ret_val
1394 def sunsubscribe(self, *args, target_node=None):
1395 """
1396 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
1397 all shard_channels
1398 """
1399 if args:
1400 args = list_or_args(args[0], args[1:])
1401 s_channels = self._normalize_keys(dict.fromkeys(args))
1402 else:
1403 s_channels = self.shard_channels
1404 self.pending_unsubscribe_shard_channels.update(s_channels)
1405 return self.execute_command("SUNSUBSCRIBE", *args)
1407 def listen(self):
1408 "Listen for messages on channels this client has been subscribed to"
1409 while self.subscribed:
1410 response = self.handle_message(self.parse_response(block=True))
1411 if response is not None:
1412 yield response
1414 def get_message(
1415 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
1416 ):
1417 """
1418 Get the next message if one is available, otherwise None.
1420 If timeout is specified, the system will wait for `timeout` seconds
1421 before returning. Timeout should be specified as a floating point
1422 number, or None, to wait indefinitely.
1423 """
1424 if not self.subscribed:
1425 # Wait for subscription
1426 start_time = time.monotonic()
1427 if self.subscribed_event.wait(timeout) is True:
1428 # The connection was subscribed during the timeout time frame.
1429 # The timeout should be adjusted based on the time spent
1430 # waiting for the subscription
1431 time_spent = time.monotonic() - start_time
1432 timeout = max(0.0, timeout - time_spent)
1433 else:
1434 # The connection isn't subscribed to any channels or patterns,
1435 # so no messages are available
1436 return None
1438 response = self.parse_response(block=(timeout is None), timeout=timeout)
1440 if response:
1441 return self.handle_message(response, ignore_subscribe_messages)
1442 return None
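# --- Illustrative usage (editorial sketch, not part of client.py) ----------
# Polling for messages with get_message(); the channel name and timeout are
# assumptions for illustration only.
import redis

r = redis.Redis()
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe("news")
while True:
    msg = p.get_message(timeout=1.0)  # None if nothing arrived within 1 second
    if msg is not None:
        print(msg["channel"], msg["data"])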
1444 get_sharded_message = get_message
1446 def ping(self, message: Union[str, None] = None) -> bool:
1447 """
1448 Ping the Redis server to test connectivity.
1450 Sends a PING command to the Redis server and returns True if the server
1451 responds with "PONG".
1452 """
1453 args = ["PING", message] if message is not None else ["PING"]
1454 return self.execute_command(*args)
1456 def handle_message(self, response, ignore_subscribe_messages=False):
1457 """
1458 Parses a pub/sub message. If the channel or pattern was subscribed to
1459 with a message handler, the handler is invoked instead of a parsed
1460 message being returned.
1461 """
1462 if response is None:
1463 return None
1464 if isinstance(response, bytes):
1465 response = [b"pong", response] if response != b"PONG" else [b"pong", b""]
1467 message_type = str_if_bytes(response[0])
1468 if message_type == "pmessage":
1469 message = {
1470 "type": message_type,
1471 "pattern": response[1],
1472 "channel": response[2],
1473 "data": response[3],
1474 }
1475 elif message_type == "pong":
1476 message = {
1477 "type": message_type,
1478 "pattern": None,
1479 "channel": None,
1480 "data": response[1],
1481 }
1482 else:
1483 message = {
1484 "type": message_type,
1485 "pattern": None,
1486 "channel": response[1],
1487 "data": response[2],
1488 }
1490 if message_type in ["message", "pmessage"]:
1491 channel = str_if_bytes(message["channel"])
1492 record_pubsub_message(
1493 direction=PubSubDirection.RECEIVE,
1494 channel=channel,
1495 )
1496 elif message_type == "smessage":
1497 channel = str_if_bytes(message["channel"])
1498 record_pubsub_message(
1499 direction=PubSubDirection.RECEIVE,
1500 channel=channel,
1501 sharded=True,
1502 )
1504 # if this is an unsubscribe message, remove it from memory
1505 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
1506 if message_type == "punsubscribe":
1507 pattern = response[1]
1508 if pattern in self.pending_unsubscribe_patterns:
1509 self.pending_unsubscribe_patterns.remove(pattern)
1510 self.patterns.pop(pattern, None)
1511 elif message_type == "sunsubscribe":
1512 s_channel = response[1]
1513 if s_channel in self.pending_unsubscribe_shard_channels:
1514 self.pending_unsubscribe_shard_channels.remove(s_channel)
1515 self.shard_channels.pop(s_channel, None)
1516 else:
1517 channel = response[1]
1518 if channel in self.pending_unsubscribe_channels:
1519 self.pending_unsubscribe_channels.remove(channel)
1520 self.channels.pop(channel, None)
1521 if not self.channels and not self.patterns and not self.shard_channels:
1522 # There are no subscriptions anymore, set subscribed_event flag
1523 # to false
1524 self.subscribed_event.clear()
1526 if message_type in self.PUBLISH_MESSAGE_TYPES:
1527 # if there's a message handler, invoke it
1528 if message_type == "pmessage":
1529 handler = self.patterns.get(message["pattern"], None)
1530 elif message_type == "smessage":
1531 handler = self.shard_channels.get(message["channel"], None)
1532 else:
1533 handler = self.channels.get(message["channel"], None)
1534 if handler:
1535 handler(message)
1536 return None
1537 elif message_type != "pong":
1538 # this is a subscribe/unsubscribe message. ignore if we don't
1539 # want them
1540 if ignore_subscribe_messages or self.ignore_subscribe_messages:
1541 return None
1543 return message
1545 def run_in_thread(
1546 self,
1547 sleep_time: float = 0.0,
1548 daemon: bool = False,
1549 exception_handler: Optional[Callable] = None,
1550 pubsub=None,
1551 sharded_pubsub: bool = False,
1552 ) -> "PubSubWorkerThread":
1553 for channel, handler in self.channels.items():
1554 if handler is None:
1555 raise PubSubError(f"Channel: '{channel}' has no handler registered")
1556 for pattern, handler in self.patterns.items():
1557 if handler is None:
1558 raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
1559 for s_channel, handler in self.shard_channels.items():
1560 if handler is None:
1561 raise PubSubError(
1562 f"Shard Channel: '{s_channel}' has no handler registered"
1563 )
1565 pubsub = self if pubsub is None else pubsub
1566 thread = PubSubWorkerThread(
1567 pubsub,
1568 sleep_time,
1569 daemon=daemon,
1570 exception_handler=exception_handler,
1571 sharded_pubsub=sharded_pubsub,
1572 )
1573 thread.start()
1574 return thread
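# --- Illustrative usage (editorial sketch, not part of client.py) ----------
# Subscribing with a message handler and consuming messages in a background
# thread; the channel name, handler and sleep values are assumptions.
import time
import redis

r = redis.Redis()

def on_news(message):
    print("received:", message["data"])

p = r.pubsub()
p.subscribe(news=on_news)  # on_news is invoked for each message on "news"
worker = p.run_in_thread(sleep_time=0.1, daemon=True)
time.sleep(5)  # let the worker consume messages for a while
worker.stop()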
1577class PubSubWorkerThread(threading.Thread):
1578 def __init__(
1579 self,
1580 pubsub,
1581 sleep_time: float,
1582 daemon: bool = False,
1583 exception_handler: Union[
1584 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
1585 ] = None,
1586 sharded_pubsub: bool = False,
1587 ):
1588 super().__init__()
1589 self.daemon = daemon
1590 self.pubsub = pubsub
1591 self.sleep_time = sleep_time
1592 self.exception_handler = exception_handler
1593 self.sharded_pubsub = sharded_pubsub
1594 self._running = threading.Event()
1596 def run(self) -> None:
1597 if self._running.is_set():
1598 return
1599 self._running.set()
1600 pubsub = self.pubsub
1601 sleep_time = self.sleep_time
1602 while self._running.is_set():
1603 try:
1604 if not self.sharded_pubsub:
1605 pubsub.get_message(
1606 ignore_subscribe_messages=True, timeout=sleep_time
1607 )
1608 else:
1609 pubsub.get_sharded_message(
1610 ignore_subscribe_messages=True, timeout=sleep_time
1611 )
1612 except BaseException as e:
1613 if self.exception_handler is None:
1614 raise
1615 self.exception_handler(e, pubsub, self)
1616 pubsub.close()
1618 def stop(self) -> None:
1619 # trip the flag so the run loop exits. the run loop will
1620 # close the pubsub connection, which disconnects the socket
1621 # and returns the connection to the pool.
1622 self._running.clear()
1625class Pipeline(Redis):
1626 """
1627 Pipelines provide a way to transmit multiple commands to the Redis server
1628 in one transmission. This is convenient for batch processing, such as
1629 saving all the values in a list to Redis.
1631 All commands executed within a pipeline (when running in transactional mode,
1632 which is the default behavior) are wrapped with MULTI and EXEC
1633 calls. This guarantees all commands executed in the pipeline will be
1634 executed atomically.
1636 Any command raising an exception does *not* halt the execution of
1637 subsequent commands in the pipeline. Instead, the exception is caught
1638 and its instance is placed into the response list returned by execute().
1639 Code iterating over the response list should be able to deal with an
1640 instance of an exception as a potential value. In general, these will be
1641 ResponseError exceptions, such as those raised when issuing a command
1642 on a key of a different datatype.
1643 """
1645 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
1647 def __init__(
1648 self,
1649 connection_pool: ConnectionPool,
1650 response_callbacks,
1651 transaction,
1652 shard_hint,
1653 ):
1654 self.connection_pool = connection_pool
1655 self.connection: Optional[Connection] = None
1656 self.response_callbacks = response_callbacks
1657 self.transaction = transaction
1658 self.shard_hint = shard_hint
1659 self.watching = False
1660 self.command_stack = []
1661 self.scripts: Set[Script] = set()
1662 self.explicit_transaction = False
1664 def __enter__(self) -> "Pipeline":
1665 return self
1667 def __exit__(self, exc_type, exc_value, traceback):
1668 self.reset()
1670 def __del__(self):
1671 try:
1672 self.reset()
1673 except Exception:
1674 pass
1676 def __len__(self) -> int:
1677 return len(self.command_stack)
1679 def __bool__(self) -> bool:
1680 """Pipeline instances should always evaluate to True"""
1681 return True
1683 def reset(self) -> None:
1684 self.command_stack = []
1685 self.scripts = set()
1686 # make sure to reset the connection state in the event that we were
1687 # watching something
1688 if self.watching and self.connection:
1689 try:
1690 # call this manually since our unwatch or
1691 # immediate_execute_command methods can call reset()
1692 self.connection.send_command("UNWATCH")
1693 self.connection.read_response()
1694 except ConnectionError:
1695 # disconnect will also remove any previous WATCHes
1696 self.connection.disconnect()
1697 # clean up the other instance attributes
1698 self.watching = False
1699 self.explicit_transaction = False
1701 # we can safely return the connection to the pool here since we're
1702 # sure we're no longer WATCHing anything
1703 if self.connection:
1704 self.connection_pool.release(self.connection)
1705 self.connection = None
1707 def close(self) -> None:
1708 """Close the pipeline"""
1709 self.reset()
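# Pipelines also work as context managers: __exit__() calls reset(), which
# returns the connection to the pool even if an error occurred. A short
# sketch, assuming a reachable Redis server:
import redis

r = redis.Redis()
with r.pipeline() as pipe:
    pipe.set("a", 1)
    pipe.set("b", 2)
    pipe.execute()
# at this point the pipeline's connection has been released back to the pool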
1711 def multi(self) -> None:
1712 """
1713 Start a transactional block of the pipeline after WATCH commands
1714 are issued. End the transactional block with `execute`.
1715 """
1716 if self.explicit_transaction:
1717 raise RedisError("Cannot issue nested calls to MULTI")
1718 if self.command_stack:
1719 raise RedisError(
1720 "Commands without an initial WATCH have already been issued"
1721 )
1722 self.explicit_transaction = True
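# A small sketch of the mode switch described above, assuming a reachable
# Redis server; key names are illustrative. After watch() the pipeline runs
# commands immediately; multi() switches it back to buffering.
import redis

r = redis.Redis()
pipe = r.pipeline()
pipe.watch("inventory")           # sent to the server immediately
in_stock = pipe.get("inventory")  # also immediate while watching
pipe.multi()                      # buffering resumes from here
pipe.decr("inventory")            # queued
pipe.execute()                    # wrapped in MULTI/EXEC; may raise WatchError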
1724 def execute_command(self, *args, **kwargs):
1725 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
1726 return self.immediate_execute_command(*args, **kwargs)
1727 return self.pipeline_execute_command(*args, **kwargs)
1729 def _disconnect_reset_raise_on_watching(
1730 self,
1731 conn: AbstractConnection,
1732 error: Exception,
1733 failure_count: Optional[int] = None,
1734 start_time: Optional[float] = None,
1735 command_name: Optional[str] = None,
1736 ) -> None:
1737 """
1738 Close the connection, reset the watching state, and
1739 raise an exception if we were watching.
1741 The supported exceptions are already checked in the
1742 retry object, so we don't need to do it here.
1744 After we disconnect the connection, it will try to reconnect and
1745 do a health check as part of the send_command logic (at the connection level).
1746 """
1747 if error and failure_count <= conn.retry.get_retries():
1748 record_operation_duration(
1749 command_name=command_name,
1750 duration_seconds=time.monotonic() - start_time,
1751 server_address=getattr(conn, "host", None),
1752 server_port=getattr(conn, "port", None),
1753 db_namespace=str(conn.db),
1754 error=error,
1755 retry_attempts=failure_count,
1756 )
1757 conn.disconnect()
1759 # if we were already watching a variable, the watch is no longer
1760 # valid since this connection has died. raise a WatchError, which
1761 # indicates the user should retry this transaction.
1762 if self.watching:
1763 self.reset()
1764 raise WatchError(
1765 f"A {type(error).__name__} occurred while watching one or more keys"
1766 )
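# Sketch of the retry pattern this WatchError is meant to trigger: keep
# retrying the whole check-and-set transaction until no watched key changes
# underneath us. Assumes a reachable Redis server; the key name is illustrative.
import redis
from redis.exceptions import WatchError

r = redis.Redis()
with r.pipeline() as pipe:
    while True:
        try:
            pipe.watch("sequence")
            next_value = int(pipe.get("sequence") or 0) + 1
            pipe.multi()
            pipe.set("sequence", next_value)
            pipe.execute()
            break                      # transaction committed
        except WatchError:
            continue                   # "sequence" changed under us; retry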
1768 def immediate_execute_command(self, *args, **options):
1769 """
1770 Execute a command immediately, but don't auto-retry on the supported
1771 retryable errors if we're already WATCHing a variable.
1772 Used when issuing WATCH, or subsequent commands that retrieve the watched
1773 values, but before MULTI is called.
1774 """
1775 command_name = args[0]
1776 conn = self.connection
1777 # if this is the first call, we need a connection
1778 if not conn:
1779 conn = self.connection_pool.get_connection()
1780 self.connection = conn
1782 # Start timing for observability
1783 start_time = time.monotonic()
1784 # Track actual retry attempts for error reporting
1785 actual_retry_attempts = [0]
1787 def failure_callback(error, failure_count):
1788 if is_debug_log_enabled():
1789 add_debug_log_for_operation_failure(conn)
1790 actual_retry_attempts[0] = failure_count
1791 self._disconnect_reset_raise_on_watching(
1792 conn, error, failure_count, start_time, command_name
1793 )
1795 try:
1796 response = conn.retry.call_with_retry(
1797 lambda: self._send_command_parse_response(
1798 conn, command_name, *args, **options
1799 ),
1800 failure_callback,
1801 with_failure_count=True,
1802 )
1804 record_operation_duration(
1805 command_name=command_name,
1806 duration_seconds=time.monotonic() - start_time,
1807 server_address=getattr(conn, "host", None),
1808 server_port=getattr(conn, "port", None),
1809 db_namespace=str(conn.db),
1810 )
1812 return response
1813 except Exception as e:
1814 record_error_count(
1815 server_address=getattr(conn, "host", None),
1816 server_port=getattr(conn, "port", None),
1817 network_peer_address=getattr(conn, "host", None),
1818 network_peer_port=getattr(conn, "port", None),
1819 error_type=e,
1820 retry_attempts=actual_retry_attempts[0],
1821 is_internal=False,
1822 )
1823 raise
1825 def pipeline_execute_command(self, *args, **options) -> "Pipeline":
1826 """
1827 Stage a command to be executed when execute() is next called.
1829 Returns the current Pipeline object so commands can be
1830 chained together, such as:
1832 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
1834 At some other point, you can then run pipe.execute(),
1835 which will execute all the commands queued in the pipe.
1836 """
1837 self.command_stack.append((args, options))
1838 return self
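# Because pipeline_execute_command() returns self, staged commands can be
# chained fluently. A short sketch (key names are illustrative, local server
# assumed):
import redis

r = redis.Redis()
pipe = r.pipeline()
pipe.set("foo", "bar").incr("baz").decr("bang")
results = pipe.execute()   # one result per staged command, in order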
1840 def _execute_transaction(
1841 self, connection: Connection, commands, raise_on_error
1842 ) -> List:
1843 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
1844 all_cmds = connection.pack_commands(
1845 [args for args, options in cmds if EMPTY_RESPONSE not in options]
1846 )
1847 connection.send_packed_command(all_cmds)
1848 errors = []
1850 # parse off the response for MULTI
1851 # NOTE: we need to handle ResponseErrors here and continue
1852 # so that we read all the additional command messages from
1853 # the socket
1854 try:
1855 self.parse_response(connection, "_")
1856 except ResponseError as e:
1857 errors.append((0, e))
1859 # and all the other commands
1860 for i, command in enumerate(commands):
1861 if EMPTY_RESPONSE in command[1]:
1862 errors.append((i, command[1][EMPTY_RESPONSE]))
1863 else:
1864 try:
1865 self.parse_response(connection, "_")
1866 except ResponseError as e:
1867 self.annotate_exception(e, i + 1, command[0])
1868 errors.append((i, e))
1870 # parse the EXEC.
1871 try:
1872 response = self.parse_response(connection, "_")
1873 except ExecAbortError:
1874 if errors:
1875 raise errors[0][1]
1876 raise
1878 # EXEC clears any watched keys
1879 self.watching = False
1881 if response is None:
1882 raise WatchError("Watched variable changed.")
1884 # put any parse errors into the response
1885 for i, e in errors:
1886 response.insert(i, e)
1888 if len(response) != len(commands):
1889 self.connection.disconnect()
1890 raise ResponseError(
1891 "Wrong number of response items from pipeline execution"
1892 )
1894 # find any errors in the response and raise if necessary
1895 if raise_on_error:
1896 self.raise_first_error(commands, response)
1898 # We have to run response callbacks manually
1899 data = []
1900 for r, cmd in zip(response, commands):
1901 if not isinstance(r, Exception):
1902 args, options = cmd
1903 # Remove the keys entry; it is only needed for the cache.
1904 options.pop("keys", None)
1905 command_name = args[0]
1906 if command_name in self.response_callbacks:
1907 r = self.response_callbacks[command_name](r, **options)
1908 data.append(r)
1910 return data
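# Sketch of how command errors surface to the caller when raise_on_error is
# False: the ResponseError instance is returned in the result list instead of
# being raised. Assumes a local Redis server; key names are illustrative.
import redis

r = redis.Redis()
pipe = r.pipeline()
pipe.set("key", "value")
pipe.incr("key")                     # fails: value is not an integer
results = pipe.execute(raise_on_error=False)
for result in results:
    if isinstance(result, Exception):
        print("command failed:", result)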
1912 def _execute_pipeline(self, connection, commands, raise_on_error):
1913 # build up all commands into a single request to increase network perf
1914 all_cmds = connection.pack_commands([args for args, _ in commands])
1915 connection.send_packed_command(all_cmds)
1917 responses = []
1918 for args, options in commands:
1919 try:
1920 responses.append(self.parse_response(connection, args[0], **options))
1921 except ResponseError as e:
1922 responses.append(e)
1924 if raise_on_error:
1925 self.raise_first_error(commands, responses)
1927 return responses
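# With transaction=False the commands are still batched into one round trip,
# but they are not wrapped in MULTI/EXEC and so are not executed atomically.
# A short sketch, assuming a local Redis server:
import redis

r = redis.Redis()
pipe = r.pipeline(transaction=False)
for i in range(100):
    pipe.set(f"key:{i}", i)
pipe.execute()   # a single network round trip for all 100 SETs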
1929 def raise_first_error(self, commands, response):
1930 for i, r in enumerate(response):
1931 if isinstance(r, ResponseError):
1932 self.annotate_exception(r, i + 1, commands[i][0])
1933 raise r
1935 def annotate_exception(self, exception, number, command):
1936 cmd = " ".join(map(safe_str, command))
1937 msg = (
1938 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
1939 f"caused error: {exception.args[0]}"
1940 )
1941 exception.args = (msg,) + exception.args[1:]
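# When raise_on_error is left at its default of True, the first failing
# command is raised after all responses are read, annotated with its 1-based
# position and command text (e.g. "Command # 2 (INCR key) of pipeline caused
# error: ..."). A sketch, with illustrative key names:
import redis
from redis.exceptions import ResponseError

r = redis.Redis()
pipe = r.pipeline()
pipe.set("key", "value")
pipe.incr("key")
try:
    pipe.execute()
except ResponseError as err:
    print(err)   # message includes the command number and the command itself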
1943 def parse_response(self, connection, command_name, **options):
1944 result = Redis.parse_response(self, connection, command_name, **options)
1945 if command_name in self.UNWATCH_COMMANDS:
1946 self.watching = False
1947 elif command_name == "WATCH":
1948 self.watching = True
1949 return result
1951 def load_scripts(self):
1952 # make sure all scripts that are about to be run on this pipeline exist
1953 scripts = list(self.scripts)
1954 immediate = self.immediate_execute_command
1955 shas = [s.sha for s in scripts]
1956 # we can't use the normal script_* methods because they would just
1957 # get buffered in the pipeline.
1958 exists = immediate("SCRIPT EXISTS", *shas)
1959 if not all(exists):
1960 for s, exist in zip(scripts, exists):
1961 if not exist:
1962 s.sha = immediate("SCRIPT LOAD", s.script)
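# Sketch of how Script objects interact with load_scripts(): registering a
# script and invoking it with a pipeline as the client buffers an EVALSHA,
# and execute() first issues SCRIPT EXISTS / SCRIPT LOAD for any SHA the
# server does not know yet. Assumes a local Redis server; names are illustrative.
import redis

r = redis.Redis()
incr_by = r.register_script("return redis.call('INCRBY', KEYS[1], ARGV[1])")
pipe = r.pipeline()
incr_by(keys=["counter"], args=[5], client=pipe)   # buffered as EVALSHA
pipe.execute()                                     # loads the script if needed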
1964 def _disconnect_raise_on_watching(
1965 self,
1966 conn: AbstractConnection,
1967 error: Exception,
1968 failure_count: Optional[int] = None,
1969 start_time: Optional[float] = None,
1970 command_name: Optional[str] = None,
1971 ) -> None:
1972 """
1973 Close the connection and raise an exception if we were watching.
1975 The supported exceptions are already checked in the
1976 retry object, so we don't need to do it here.
1978 After we disconnect the connection, it will try to reconnect and
1979 do a health check as part of the send_command logic (at the connection level).
1980 """
1981 if error and failure_count <= conn.retry.get_retries():
1982 record_operation_duration(
1983 command_name=command_name,
1984 duration_seconds=time.monotonic() - start_time,
1985 server_address=getattr(conn, "host", None),
1986 server_port=getattr(conn, "port", None),
1987 db_namespace=str(conn.db),
1988 error=error,
1989 retry_attempts=failure_count,
1990 )
1991 conn.disconnect()
1992 # if we were watching a variable, the watch is no longer valid
1993 # since this connection has died. raise a WatchError, which
1994 # indicates the user should retry this transaction.
1995 if self.watching:
1996 raise WatchError(
1997 f"A {type(error).__name__} occurred while watching one or more keys"
1998 )
2000 def execute(self, raise_on_error: bool = True) -> List[Any]:
2001 """Execute all the commands in the current pipeline"""
2002 stack = self.command_stack
2003 if not stack and not self.watching:
2004 return []
2005 if self.scripts:
2006 self.load_scripts()
2007 if self.transaction or self.explicit_transaction:
2008 execute = self._execute_transaction
2009 operation_name = "MULTI"
2010 else:
2011 execute = self._execute_pipeline
2012 operation_name = "PIPELINE"
2014 conn = self.connection
2015 if not conn:
2016 conn = self.connection_pool.get_connection()
2017 # assign to self.connection so reset() releases the connection
2018 # back to the pool after we're done
2019 self.connection = conn
2021 # Start timing for observability
2022 start_time = time.monotonic()
2023 # Track actual retry attempts for error reporting
2024 actual_retry_attempts = [0]
2026 def failure_callback(error, failure_count):
2027 if is_debug_log_enabled():
2028 add_debug_log_for_operation_failure(conn)
2029 actual_retry_attempts[0] = failure_count
2030 self._disconnect_raise_on_watching(
2031 conn, error, failure_count, start_time, operation_name
2032 )
2034 try:
2035 response = conn.retry.call_with_retry(
2036 lambda: execute(conn, stack, raise_on_error),
2037 failure_callback,
2038 with_failure_count=True,
2039 )
2041 record_operation_duration(
2042 command_name=operation_name,
2043 duration_seconds=time.monotonic() - start_time,
2044 server_address=getattr(conn, "host", None),
2045 server_port=getattr(conn, "port", None),
2046 db_namespace=str(conn.db),
2047 )
2048 return response
2049 except Exception as e:
2050 record_error_count(
2051 server_address=getattr(conn, "host", None),
2052 server_port=getattr(conn, "port", None),
2053 network_peer_address=getattr(conn, "host", None),
2054 network_peer_port=getattr(conn, "port", None),
2055 error_type=e,
2056 retry_attempts=actual_retry_attempts[0],
2057 is_internal=False,
2058 )
2059 raise
2061 finally:
2062 # in reset() the connection is disconnected before being returned to the
2063 # pool if it is marked for reconnect.
2064 self.reset()
2066 def discard(self):
2067 """
2068 Flushes all previously queued commands
2069 See: https://redis.io/commands/DISCARD
2070 """
2071 self.execute_command("DISCARD")
2073 def watch(self, *names):
2074 """Watches the values at keys ``names``"""
2075 if self.explicit_transaction:
2076 raise RedisError("Cannot issue a WATCH after a MULTI")
2077 return self.execute_command("WATCH", *names)
2079 def unwatch(self) -> bool:
2080 """Unwatches all previously specified keys"""
2081 return self.watching and self.execute_command("UNWATCH") or True