Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import logging
3import re
4import threading
5import time
6from itertools import chain
7from typing import (
8 TYPE_CHECKING,
9 Any,
10 Callable,
11 Dict,
12 List,
13 Literal,
14 Mapping,
15 Optional,
16 Set,
17 Type,
18 Union,
19)
21from redis._parsers.encoders import Encoder
22from redis._parsers.helpers import (
23 _RedisCallbacks,
24 _RedisCallbacksRESP2,
25 _RedisCallbacksRESP3,
26 bool_ok,
27)
28from redis._parsers.socket import SENTINEL
29from redis.backoff import ExponentialWithJitterBackoff
30from redis.cache import CacheConfig, CacheInterface
31from redis.commands import (
32 CoreCommands,
33 RedisModuleCommands,
34 SentinelCommands,
35 list_or_args,
36)
37from redis.commands.core import Script
38from redis.connection import (
39 AbstractConnection,
40 Connection,
41 ConnectionPool,
42 SSLConnection,
43 UnixDomainSocketConnection,
44)
45from redis.credentials import CredentialProvider
46from redis.driver_info import DriverInfo, resolve_driver_info
47from redis.event import (
48 AfterPooledConnectionsInstantiationEvent,
49 AfterPubSubConnectionInstantiationEvent,
50 AfterSingleConnectionInstantiationEvent,
51 ClientType,
52 EventDispatcher,
53)
54from redis.exceptions import (
55 ConnectionError,
56 ExecAbortError,
57 PubSubError,
58 RedisError,
59 ResponseError,
60 WatchError,
61)
62from redis.lock import Lock
63from redis.maint_notifications import (
64 MaintNotificationsConfig,
65 OSSMaintNotificationsHandler,
66)
67from redis.observability.attributes import PubSubDirection
68from redis.observability.recorder import (
69 record_error_count,
70 record_operation_duration,
71 record_pubsub_message,
72)
73from redis.retry import Retry
74from redis.utils import (
75 _set_info_logger,
76 check_protocol_version,
77 deprecated_args,
78 safe_str,
79 str_if_bytes,
80 truncate_text,
81)
83if TYPE_CHECKING:
84 import ssl
86 import OpenSSL
88 from redis.keyspace_notifications import KeyspaceNotifications
90SYM_EMPTY = b""
91EMPTY_RESPONSE = "EMPTY_RESPONSE"
93# some responses (ie. dump) are binary, and just meant to never be decoded
94NEVER_DECODE = "NEVER_DECODE"
97logger = logging.getLogger(__name__)
100def is_debug_log_enabled():
101 return logger.isEnabledFor(logging.DEBUG)
104def add_debug_log_for_operation_failure(connection: "AbstractConnection"):
105 logger.debug(
106 f"Operation failed, "
107 f"with connection: {connection}, details: {connection.extract_connection_details() if connection else 'no connection'}",
108 )
111class CaseInsensitiveDict(dict):
112 "Case insensitive dict implementation. Assumes string keys only."
114 def __init__(self, data: Dict[str, str]) -> None:
115 for k, v in data.items():
116 self[k.upper()] = v
118 def __contains__(self, k):
119 return super().__contains__(k.upper())
121 def __delitem__(self, k):
122 super().__delitem__(k.upper())
124 def __getitem__(self, k):
125 return super().__getitem__(k.upper())
127 def get(self, k, default=None):
128 return super().get(k.upper(), default)
130 def __setitem__(self, k, v):
131 super().__setitem__(k.upper(), v)
133 def update(self, data):
134 data = CaseInsensitiveDict(data)
135 super().update(data)
138class AbstractRedis:
139 pass
142class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
143 """
144 Implementation of the Redis protocol.
146 This abstract class provides a Python interface to all Redis commands
147 and an implementation of the Redis protocol.
149 Pipelines derive from this, implementing how
150 the commands are sent and received to the Redis server. Based on
151 configuration, an instance will either use a ConnectionPool, or
152 Connection object to talk to redis.
154 It is not safe to pass PubSub or Pipeline objects between threads.
155 """
157 # Type discrimination marker for @overload self-type pattern
158 _is_async_client: Literal[False] = False
160 @classmethod
161 def from_url(cls, url: str, **kwargs) -> "Redis":
162 """
163 Return a Redis client object configured from the given URL
165 For example::
167 redis://[[username]:[password]]@localhost:6379/0
168 rediss://[[username]:[password]]@localhost:6379/0
169 unix://[username@]/path/to/socket.sock?db=0[&password=password]
171 Three URL schemes are supported:
173 - `redis://` creates a TCP socket connection. See more at:
174 <https://www.iana.org/assignments/uri-schemes/prov/redis>
175 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
176 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
177 - ``unix://``: creates a Unix Domain Socket connection.
179 The username, password, hostname, path and all querystring values
180 are passed through urllib.parse.unquote in order to replace any
181 percent-encoded values with their corresponding characters.
183 There are several ways to specify a database number. The first value
184 found will be used:
186 1. A ``db`` querystring option, e.g. redis://localhost?db=0
187 2. If using the redis:// or rediss:// schemes, the path argument
188 of the url, e.g. redis://localhost/0
189 3. A ``db`` keyword argument to this function.
191 If none of these options are specified, the default db=0 is used.
193 All querystring options are cast to their appropriate Python types.
194 Boolean arguments can be specified with string values "True"/"False"
195 or "Yes"/"No". Values that cannot be properly cast cause a
196 ``ValueError`` to be raised. Once parsed, the querystring arguments
197 and keyword arguments are passed to the ``ConnectionPool``'s
198 class initializer. In the case of conflicting arguments, querystring
199 arguments always win.
201 """
202 single_connection_client = kwargs.pop("single_connection_client", False)
203 connection_pool = ConnectionPool.from_url(url, **kwargs)
204 client = cls(
205 connection_pool=connection_pool,
206 single_connection_client=single_connection_client,
207 )
208 client.auto_close_connection_pool = True
209 return client
211 @classmethod
212 def from_pool(
213 cls: Type["Redis"],
214 connection_pool: ConnectionPool,
215 ) -> "Redis":
216 """
217 Return a Redis client from the given connection pool.
218 The Redis client will take ownership of the connection pool and
219 close it when the Redis client is closed.
220 """
221 client = cls(
222 connection_pool=connection_pool,
223 )
224 client.auto_close_connection_pool = True
225 return client
227 @deprecated_args(
228 args_to_warn=["retry_on_timeout"],
229 reason="TimeoutError is included by default.",
230 version="6.0.0",
231 )
232 @deprecated_args(
233 args_to_warn=["lib_name", "lib_version"],
234 reason="Use 'driver_info' parameter instead. "
235 "lib_name and lib_version will be removed in a future version.",
236 )
237 def __init__(
238 self,
239 host: str = "localhost",
240 port: int = 6379,
241 db: int = 0,
242 password: Optional[str] = None,
243 socket_timeout: Optional[float] = None,
244 socket_connect_timeout: Optional[float] = None,
245 socket_keepalive: Optional[bool] = None,
246 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
247 connection_pool: Optional[ConnectionPool] = None,
248 unix_socket_path: Optional[str] = None,
249 encoding: str = "utf-8",
250 encoding_errors: str = "strict",
251 decode_responses: bool = False,
252 retry_on_timeout: bool = False,
253 retry: Retry = Retry(
254 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
255 ),
256 retry_on_error: Optional[List[Type[Exception]]] = None,
257 ssl: bool = False,
258 ssl_keyfile: Optional[str] = None,
259 ssl_certfile: Optional[str] = None,
260 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
261 ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
262 ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
263 ssl_ca_certs: Optional[str] = None,
264 ssl_ca_path: Optional[str] = None,
265 ssl_ca_data: Optional[str] = None,
266 ssl_check_hostname: bool = True,
267 ssl_password: Optional[str] = None,
268 ssl_validate_ocsp: bool = False,
269 ssl_validate_ocsp_stapled: bool = False,
270 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
271 ssl_ocsp_expected_cert: Optional[str] = None,
272 ssl_min_version: Optional["ssl.TLSVersion"] = None,
273 ssl_ciphers: Optional[str] = None,
274 max_connections: Optional[int] = None,
275 single_connection_client: bool = False,
276 health_check_interval: int = 0,
277 client_name: Optional[str] = None,
278 lib_name: Optional[str] = None,
279 lib_version: Optional[str] = None,
280 driver_info: Optional["DriverInfo"] = None,
281 username: Optional[str] = None,
282 redis_connect_func: Optional[Callable[[], None]] = None,
283 credential_provider: Optional[CredentialProvider] = None,
284 protocol: Optional[int] = 2,
285 cache: Optional[CacheInterface] = None,
286 cache_config: Optional[CacheConfig] = None,
287 event_dispatcher: Optional[EventDispatcher] = None,
288 maint_notifications_config: Optional[MaintNotificationsConfig] = None,
289 oss_cluster_maint_notifications_handler: Optional[
290 OSSMaintNotificationsHandler
291 ] = None,
292 ) -> None:
293 """
294 Initialize a new Redis client.
296 To specify a retry policy for specific errors, you have two options:
298 1. Set the `retry_on_error` to a list of the error/s to retry on, and
299 you can also set `retry` to a valid `Retry` object(in case the default
300 one is not appropriate) - with this approach the retries will be triggered
301 on the default errors specified in the Retry object enriched with the
302 errors specified in `retry_on_error`.
304 2. Define a `Retry` object with configured 'supported_errors' and set
305 it to the `retry` parameter - with this approach you completely redefine
306 the errors on which retries will happen.
308 `retry_on_timeout` is deprecated - please include the TimeoutError
309 either in the Retry object or in the `retry_on_error` list.
311 When 'connection_pool' is provided - the retry configuration of the
312 provided pool will be used.
314 Args:
316 single_connection_client:
317 if `True`, connection pool is not used. In that case `Redis`
318 instance use is not thread safe.
319 decode_responses:
320 if `True`, the response will be decoded to utf-8.
321 Argument is ignored when connection_pool is provided.
322 driver_info:
323 Optional DriverInfo object to identify upstream libraries.
324 If provided, lib_name and lib_version are ignored.
325 If not provided, a DriverInfo will be created from lib_name and lib_version.
326 Argument is ignored when connection_pool is provided.
327 lib_name:
328 **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO.
329 lib_version:
330 **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO.
331 maint_notifications_config:
332 configures the pool to support maintenance notifications - see
333 `redis.maint_notifications.MaintNotificationsConfig` for details.
334 Only supported with RESP3
335 If not provided and protocol is RESP3, the maintenance notifications
336 will be enabled by default (logic is included in the connection pool
337 initialization).
338 Argument is ignored when connection_pool is provided.
339 oss_cluster_maint_notifications_handler:
340 handler for OSS cluster notifications - see
341 `redis.maint_notifications.OSSMaintNotificationsHandler` for details.
342 Only supported with RESP3
343 Argument is ignored when connection_pool is provided.
344 """
345 if event_dispatcher is None:
346 self._event_dispatcher = EventDispatcher()
347 else:
348 self._event_dispatcher = event_dispatcher
349 if not connection_pool:
350 if not retry_on_error:
351 retry_on_error = []
353 # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version
354 computed_driver_info = resolve_driver_info(
355 driver_info, lib_name, lib_version
356 )
358 kwargs = {
359 "db": db,
360 "username": username,
361 "password": password,
362 "socket_timeout": socket_timeout,
363 "encoding": encoding,
364 "encoding_errors": encoding_errors,
365 "decode_responses": decode_responses,
366 "retry_on_error": retry_on_error,
367 "retry": copy.deepcopy(retry),
368 "max_connections": max_connections,
369 "health_check_interval": health_check_interval,
370 "client_name": client_name,
371 "driver_info": computed_driver_info,
372 "redis_connect_func": redis_connect_func,
373 "credential_provider": credential_provider,
374 "protocol": protocol,
375 }
376 # based on input, setup appropriate connection args
377 if unix_socket_path is not None:
378 kwargs.update(
379 {
380 "path": unix_socket_path,
381 "connection_class": UnixDomainSocketConnection,
382 }
383 )
384 else:
385 # TCP specific options
386 kwargs.update(
387 {
388 "host": host,
389 "port": port,
390 "socket_connect_timeout": socket_connect_timeout,
391 "socket_keepalive": socket_keepalive,
392 "socket_keepalive_options": socket_keepalive_options,
393 }
394 )
396 if ssl:
397 kwargs.update(
398 {
399 "connection_class": SSLConnection,
400 "ssl_keyfile": ssl_keyfile,
401 "ssl_certfile": ssl_certfile,
402 "ssl_cert_reqs": ssl_cert_reqs,
403 "ssl_include_verify_flags": ssl_include_verify_flags,
404 "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
405 "ssl_ca_certs": ssl_ca_certs,
406 "ssl_ca_data": ssl_ca_data,
407 "ssl_check_hostname": ssl_check_hostname,
408 "ssl_password": ssl_password,
409 "ssl_ca_path": ssl_ca_path,
410 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
411 "ssl_validate_ocsp": ssl_validate_ocsp,
412 "ssl_ocsp_context": ssl_ocsp_context,
413 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
414 "ssl_min_version": ssl_min_version,
415 "ssl_ciphers": ssl_ciphers,
416 }
417 )
418 if (cache_config or cache) and check_protocol_version(protocol, 3):
419 kwargs.update(
420 {
421 "cache": cache,
422 "cache_config": cache_config,
423 }
424 )
425 maint_notifications_enabled = (
426 maint_notifications_config and maint_notifications_config.enabled
427 )
428 if maint_notifications_enabled and protocol not in [
429 3,
430 "3",
431 ]:
432 raise RedisError(
433 "Maintenance notifications handlers on connection are only supported with RESP version 3"
434 )
435 if maint_notifications_config:
436 kwargs.update(
437 {
438 "maint_notifications_config": maint_notifications_config,
439 }
440 )
441 if oss_cluster_maint_notifications_handler:
442 kwargs.update(
443 {
444 "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
445 }
446 )
447 connection_pool = ConnectionPool(**kwargs)
448 self._event_dispatcher.dispatch(
449 AfterPooledConnectionsInstantiationEvent(
450 [connection_pool], ClientType.SYNC, credential_provider
451 )
452 )
453 self.auto_close_connection_pool = True
454 else:
455 self.auto_close_connection_pool = False
456 self._event_dispatcher.dispatch(
457 AfterPooledConnectionsInstantiationEvent(
458 [connection_pool], ClientType.SYNC, credential_provider
459 )
460 )
462 self.connection_pool = connection_pool
464 if (cache_config or cache) and self.connection_pool.get_protocol() not in [
465 3,
466 "3",
467 ]:
468 raise RedisError("Client caching is only supported with RESP version 3")
470 self.single_connection_lock = threading.RLock()
471 self.connection = None
472 self._single_connection_client = single_connection_client
473 if self._single_connection_client:
474 self.connection = self.connection_pool.get_connection()
475 self._event_dispatcher.dispatch(
476 AfterSingleConnectionInstantiationEvent(
477 self.connection, ClientType.SYNC, self.single_connection_lock
478 )
479 )
481 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)
483 if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
484 self.response_callbacks.update(_RedisCallbacksRESP3)
485 else:
486 self.response_callbacks.update(_RedisCallbacksRESP2)
488 def __repr__(self) -> str:
489 return (
490 f"<{type(self).__module__}.{type(self).__name__}"
491 f"({repr(self.connection_pool)})>"
492 )
494 def get_encoder(self) -> "Encoder":
495 """Get the connection pool's encoder"""
496 return self.connection_pool.get_encoder()
498 def get_connection_kwargs(self) -> Dict:
499 """Get the connection's key-word arguments"""
500 return self.connection_pool.connection_kwargs
502 def get_retry(self) -> Optional[Retry]:
503 return self.get_connection_kwargs().get("retry")
505 def set_retry(self, retry: Retry) -> None:
506 self.get_connection_kwargs().update({"retry": retry})
507 self.connection_pool.set_retry(retry)
509 def set_response_callback(self, command: str, callback: Callable) -> None:
510 """Set a custom Response Callback"""
511 self.response_callbacks[command] = callback
513 def load_external_module(self, funcname, func) -> None:
514 """
515 This function can be used to add externally defined redis modules,
516 and their namespaces to the redis client.
518 funcname - A string containing the name of the function to create
519 func - The function, being added to this class.
521 ex: Assume that one has a custom redis module named foomod that
522 creates command named 'foo.dothing' and 'foo.anotherthing' in redis.
523 To load function functions into this namespace:
525 from redis import Redis
526 from foomodule import F
527 r = Redis()
528 r.load_external_module("foo", F)
529 r.foo().dothing('your', 'arguments')
531 For a concrete example see the reimport of the redisjson module in
532 tests/test_connection.py::test_loading_external_modules
533 """
534 setattr(self, funcname, func)
536 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
537 """
538 Return a new pipeline object that can queue multiple commands for
539 later execution. ``transaction`` indicates whether all commands
540 should be executed atomically. Apart from making a group of operations
541 atomic, pipelines are useful for reducing the back-and-forth overhead
542 between the client and server.
543 """
544 return Pipeline(
545 self.connection_pool, self.response_callbacks, transaction, shard_hint
546 )
548 def transaction(
549 self, func: Callable[["Pipeline"], None], *watches, **kwargs
550 ) -> Union[List[Any], Any, None]:
551 """
552 Convenience method for executing the callable `func` as a transaction
553 while watching all keys specified in `watches`. The 'func' callable
554 should expect a single argument which is a Pipeline object.
555 """
556 shard_hint = kwargs.pop("shard_hint", None)
557 value_from_callable = kwargs.pop("value_from_callable", False)
558 watch_delay = kwargs.pop("watch_delay", None)
559 with self.pipeline(True, shard_hint) as pipe:
560 while True:
561 try:
562 if watches:
563 pipe.watch(*watches)
564 func_value = func(pipe)
565 exec_value = pipe.execute()
566 return func_value if value_from_callable else exec_value
567 except WatchError:
568 if watch_delay is not None and watch_delay > 0:
569 time.sleep(watch_delay)
570 continue
572 def lock(
573 self,
574 name: str,
575 timeout: Optional[float] = None,
576 sleep: float = 0.1,
577 blocking: bool = True,
578 blocking_timeout: Optional[float] = None,
579 lock_class: Union[None, Any] = None,
580 thread_local: bool = True,
581 raise_on_release_error: bool = True,
582 ):
583 """
584 Return a new Lock object using key ``name`` that mimics
585 the behavior of threading.Lock.
587 If specified, ``timeout`` indicates a maximum life for the lock.
588 By default, it will remain locked until release() is called.
590 ``sleep`` indicates the amount of time to sleep per loop iteration
591 when the lock is in blocking mode and another client is currently
592 holding the lock.
594 ``blocking`` indicates whether calling ``acquire`` should block until
595 the lock has been acquired or to fail immediately, causing ``acquire``
596 to return False and the lock not being acquired. Defaults to True.
597 Note this value can be overridden by passing a ``blocking``
598 argument to ``acquire``.
600 ``blocking_timeout`` indicates the maximum amount of time in seconds to
601 spend trying to acquire the lock. A value of ``None`` indicates
602 continue trying forever. ``blocking_timeout`` can be specified as a
603 float or integer, both representing the number of seconds to wait.
605 ``lock_class`` forces the specified lock implementation. Note that as
606 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
607 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
608 you have created your own custom lock class.
610 ``thread_local`` indicates whether the lock token is placed in
611 thread-local storage. By default, the token is placed in thread local
612 storage so that a thread only sees its token, not a token set by
613 another thread. Consider the following timeline:
615 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
616 thread-1 sets the token to "abc"
617 time: 1, thread-2 blocks trying to acquire `my-lock` using the
618 Lock instance.
619 time: 5, thread-1 has not yet completed. redis expires the lock
620 key.
621 time: 5, thread-2 acquired `my-lock` now that it's available.
622 thread-2 sets the token to "xyz"
623 time: 6, thread-1 finishes its work and calls release(). if the
624 token is *not* stored in thread local storage, then
625 thread-1 would see the token value as "xyz" and would be
626 able to successfully release the thread-2's lock.
628 ``raise_on_release_error`` indicates whether to raise an exception when
629 the lock is no longer owned when exiting the context manager. By default,
630 this is True, meaning an exception will be raised. If False, the warning
631 will be logged and the exception will be suppressed.
633 In some use cases it's necessary to disable thread local storage. For
634 example, if you have code where one thread acquires a lock and passes
635 that lock instance to a worker thread to release later. If thread
636 local storage isn't disabled in this case, the worker thread won't see
637 the token set by the thread that acquired the lock. Our assumption
638 is that these cases aren't common and as such default to using
639 thread local storage."""
640 if lock_class is None:
641 lock_class = Lock
642 return lock_class(
643 self,
644 name,
645 timeout=timeout,
646 sleep=sleep,
647 blocking=blocking,
648 blocking_timeout=blocking_timeout,
649 thread_local=thread_local,
650 raise_on_release_error=raise_on_release_error,
651 )
653 def pubsub(self, **kwargs):
654 """
655 Return a Publish/Subscribe object. With this object, you can
656 subscribe to channels and listen for messages that get published to
657 them.
658 """
659 return PubSub(
660 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
661 )
663 def keyspace_notifications(
664 self,
665 key_prefix: Union[str, bytes, None] = None,
666 ignore_subscribe_messages: bool = True,
667 ) -> "KeyspaceNotifications":
668 """
669 Return a :class:`~redis.keyspace_notifications.KeyspaceNotifications`
670 object for subscribing to keyspace and keyevent notifications.
672 Note: Keyspace notifications must be enabled on the Redis server via
673 the ``notify-keyspace-events`` configuration option.
675 Args:
676 key_prefix: Optional prefix to filter and strip from keys in
677 notifications.
678 ignore_subscribe_messages: If True, subscribe/unsubscribe
679 confirmations are not returned by
680 get_message/listen.
681 """
682 from redis.keyspace_notifications import KeyspaceNotifications
684 return KeyspaceNotifications(
685 self,
686 key_prefix=key_prefix,
687 ignore_subscribe_messages=ignore_subscribe_messages,
688 )
690 def monitor(self):
691 return Monitor(self.connection_pool)
693 def client(self):
694 return self.__class__(
695 connection_pool=self.connection_pool,
696 single_connection_client=True,
697 )
699 def __enter__(self):
700 return self
702 def __exit__(self, exc_type, exc_value, traceback):
703 self.close()
705 def __del__(self):
706 try:
707 self.close()
708 except Exception:
709 pass
711 def close(self) -> None:
712 # In case a connection property does not yet exist
713 # (due to a crash earlier in the Redis() constructor), return
714 # immediately as there is nothing to clean-up.
715 if not hasattr(self, "connection"):
716 return
718 conn = self.connection
719 if conn:
720 self.connection = None
721 self.connection_pool.release(conn)
723 if self.auto_close_connection_pool:
724 self.connection_pool.disconnect()
726 def _send_command_parse_response(self, conn, command_name, *args, **options):
727 """
728 Send a command and parse the response
729 """
730 conn.send_command(*args, **options)
731 return self.parse_response(conn, command_name, **options)
733 def _close_connection(
734 self,
735 conn,
736 error: Optional[Exception] = None,
737 failure_count: Optional[int] = None,
738 start_time: Optional[float] = None,
739 command_name: Optional[str] = None,
740 ) -> None:
741 """
742 Close the connection before retrying.
744 The supported exceptions are already checked in the
745 retry object so we don't need to do it here.
747 After we disconnect the connection, it will try to reconnect and
748 do a health check as part of the send_command logic(on connection level).
749 """
750 if error and failure_count <= conn.retry.get_retries():
751 record_operation_duration(
752 command_name=command_name,
753 duration_seconds=time.monotonic() - start_time,
754 server_address=getattr(conn, "host", None),
755 server_port=getattr(conn, "port", None),
756 db_namespace=str(conn.db),
757 error=error,
758 retry_attempts=failure_count,
759 )
761 conn.disconnect()
763 # COMMAND EXECUTION AND PROTOCOL PARSING
764 def execute_command(self, *args, **options):
765 return self._execute_command(*args, **options)
767 def _execute_command(self, *args, **options):
768 """Execute a command and return a parsed response"""
769 pool = self.connection_pool
770 command_name = args[0]
771 conn = self.connection or pool.get_connection()
773 # Start timing for observability
774 start_time = time.monotonic()
775 # Track actual retry attempts for error reporting
776 actual_retry_attempts = [0]
778 def failure_callback(error, failure_count):
779 if is_debug_log_enabled():
780 add_debug_log_for_operation_failure(conn)
781 actual_retry_attempts[0] = failure_count
782 self._close_connection(conn, error, failure_count, start_time, command_name)
784 if self._single_connection_client:
785 self.single_connection_lock.acquire()
786 try:
787 result = conn.retry.call_with_retry(
788 lambda: self._send_command_parse_response(
789 conn, command_name, *args, **options
790 ),
791 failure_callback,
792 with_failure_count=True,
793 )
795 record_operation_duration(
796 command_name=command_name,
797 duration_seconds=time.monotonic() - start_time,
798 server_address=getattr(conn, "host", None),
799 server_port=getattr(conn, "port", None),
800 db_namespace=str(conn.db),
801 )
802 return result
803 except Exception as e:
804 record_error_count(
805 server_address=getattr(conn, "host", None),
806 server_port=getattr(conn, "port", None),
807 network_peer_address=getattr(conn, "host", None),
808 network_peer_port=getattr(conn, "port", None),
809 error_type=e,
810 retry_attempts=actual_retry_attempts[0],
811 is_internal=False,
812 )
813 raise
815 finally:
816 if conn and conn.should_reconnect():
817 self._close_connection(conn)
818 conn.connect()
819 if self._single_connection_client:
820 self.single_connection_lock.release()
821 if not self.connection:
822 pool.release(conn)
824 def parse_response(self, connection, command_name, **options):
825 """Parses a response from the Redis server"""
826 try:
827 if NEVER_DECODE in options:
828 response = connection.read_response(disable_decoding=True)
829 options.pop(NEVER_DECODE)
830 else:
831 response = connection.read_response()
832 except ResponseError:
833 if EMPTY_RESPONSE in options:
834 return options[EMPTY_RESPONSE]
835 raise
837 if EMPTY_RESPONSE in options:
838 options.pop(EMPTY_RESPONSE)
840 # Remove keys entry, it needs only for cache.
841 options.pop("keys", None)
843 if command_name in self.response_callbacks:
844 return self.response_callbacks[command_name](response, **options)
845 return response
847 def get_cache(self) -> Optional[CacheInterface]:
848 return self.connection_pool.cache
851StrictRedis = Redis
854class Monitor:
855 """
856 Monitor is useful for handling the MONITOR command to the redis server.
857 next_command() method returns one command from monitor
858 listen() method yields commands from monitor.
859 """
861 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
862 command_re = re.compile(r'"(.*?)(?<!\\)"')
864 def __init__(self, connection_pool):
865 self.connection_pool = connection_pool
866 self.connection = self.connection_pool.get_connection()
868 def __enter__(self):
869 self._start_monitor()
870 return self
872 def __exit__(self, *args):
873 self.connection.disconnect()
874 self.connection_pool.release(self.connection)
876 def next_command(self):
877 """Parse the response from a monitor command"""
878 response = self.connection.read_response()
880 if response is None:
881 return None
883 if isinstance(response, bytes):
884 response = self.connection.encoder.decode(response, force=True)
886 command_time, command_data = response.split(" ", 1)
887 m = self.monitor_re.match(command_data)
888 db_id, client_info, command = m.groups()
889 command = " ".join(self.command_re.findall(command))
890 # Redis escapes double quotes because each piece of the command
891 # string is surrounded by double quotes. We don't have that
892 # requirement so remove the escaping and leave the quote.
893 command = command.replace('\\"', '"')
895 if client_info == "lua":
896 client_address = "lua"
897 client_port = ""
898 client_type = "lua"
899 elif client_info.startswith("unix"):
900 client_address = "unix"
901 client_port = client_info[5:]
902 client_type = "unix"
903 else:
904 if client_info == "":
905 client_address = ""
906 client_port = ""
907 client_type = "unknown"
908 else:
909 # use rsplit as ipv6 addresses contain colons
910 client_address, client_port = client_info.rsplit(":", 1)
911 client_type = "tcp"
912 return {
913 "time": float(command_time),
914 "db": int(db_id),
915 "client_address": client_address,
916 "client_port": client_port,
917 "client_type": client_type,
918 "command": command,
919 }
921 def listen(self):
922 """Listen for commands coming to the server."""
923 while True:
924 yield self.next_command()
926 def _start_monitor(self):
927 self.connection.send_command("MONITOR")
928 # check that monitor returns 'OK', but don't return it to user
929 response = self.connection.read_response()
931 if not bool_ok(response):
932 raise RedisError(f"MONITOR failed: {response}")
935class PubSub:
936 """
937 PubSub provides publish, subscribe and listen support to Redis channels.
939 After subscribing to one or more channels, the listen() method will block
940 until a message arrives on one of the subscribed channels. That message
941 will be returned and it's safe to start listening again.
942 """
944 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
945 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
946 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
948 def __init__(
949 self,
950 connection_pool,
951 shard_hint=None,
952 ignore_subscribe_messages: bool = False,
953 encoder: Optional["Encoder"] = None,
954 push_handler_func: Union[None, Callable[[str], None]] = None,
955 event_dispatcher: Optional["EventDispatcher"] = None,
956 ):
957 self.connection_pool = connection_pool
958 self.shard_hint = shard_hint
959 self.ignore_subscribe_messages = ignore_subscribe_messages
960 self.connection = None
961 self.subscribed_event = threading.Event()
962 # we need to know the encoding options for this connection in order
963 # to lookup channel and pattern names for callback handlers.
964 self.encoder = encoder
965 self.push_handler_func = push_handler_func
966 if event_dispatcher is None:
967 self._event_dispatcher = EventDispatcher()
968 else:
969 self._event_dispatcher = event_dispatcher
971 self._lock = threading.RLock()
972 if self.encoder is None:
973 self.encoder = self.connection_pool.get_encoder()
974 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
975 if self.encoder.decode_responses:
976 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
977 else:
978 self.health_check_response = [b"pong", self.health_check_response_b]
979 if self.push_handler_func is None:
980 _set_info_logger()
981 self.reset()
983 def __enter__(self) -> "PubSub":
984 return self
986 def __exit__(self, exc_type, exc_value, traceback) -> None:
987 self.reset()
989 def __del__(self) -> None:
990 try:
991 # if this object went out of scope prior to shutting down
992 # subscriptions, close the connection manually before
993 # returning it to the connection pool
994 self.reset()
995 except Exception:
996 pass
998 def reset(self) -> None:
999 if self.connection:
1000 self.connection.disconnect()
1001 self.connection.deregister_connect_callback(self.on_connect)
1002 self.connection_pool.release(self.connection)
1003 self.connection = None
1004 self.health_check_response_counter = 0
1005 self.channels = {}
1006 self.pending_unsubscribe_channels = set()
1007 self.shard_channels = {}
1008 self.pending_unsubscribe_shard_channels = set()
1009 self.patterns = {}
1010 self.pending_unsubscribe_patterns = set()
1011 self.subscribed_event.clear()
1013 def close(self) -> None:
1014 self.reset()
1016 def on_connect(self, connection) -> None:
1017 "Re-subscribe to any channels and patterns previously subscribed to"
1018 # NOTE: for python3, we can't pass bytestrings as keyword arguments
1019 # so we need to decode channel/pattern names back to unicode strings
1020 # before passing them to [p]subscribe.
1021 #
1022 # However, channels subscribed without a callback (positional args) may
1023 # have binary names that are not valid in the current encoding (e.g.
1024 # arbitrary bytes that are not valid UTF-8). These channels are stored
1025 # with a ``None`` handler. We re-subscribe them as positional args so
1026 # that no decoding is required.
1027 self.pending_unsubscribe_channels.clear()
1028 self.pending_unsubscribe_patterns.clear()
1029 self.pending_unsubscribe_shard_channels.clear()
1030 if self.channels:
1031 channels_with_handlers = {}
1032 channels_without_handlers = []
1033 for k, v in self.channels.items():
1034 if v is not None:
1035 channels_with_handlers[self.encoder.decode(k, force=True)] = v
1036 else:
1037 channels_without_handlers.append(k)
1038 if channels_with_handlers or channels_without_handlers:
1039 self.subscribe(*channels_without_handlers, **channels_with_handlers)
1040 if self.patterns:
1041 patterns_with_handlers = {}
1042 patterns_without_handlers = []
1043 for k, v in self.patterns.items():
1044 if v is not None:
1045 patterns_with_handlers[self.encoder.decode(k, force=True)] = v
1046 else:
1047 patterns_without_handlers.append(k)
1048 if patterns_with_handlers or patterns_without_handlers:
1049 self.psubscribe(*patterns_without_handlers, **patterns_with_handlers)
1050 if self.shard_channels:
1051 shard_with_handlers = {}
1052 shard_without_handlers = []
1053 for k, v in self.shard_channels.items():
1054 if v is not None:
1055 shard_with_handlers[self.encoder.decode(k, force=True)] = v
1056 else:
1057 shard_without_handlers.append(k)
1058 if shard_with_handlers or shard_without_handlers:
1059 self.ssubscribe(*shard_without_handlers, **shard_with_handlers)
1061 @property
1062 def subscribed(self) -> bool:
1063 """Indicates if there are subscriptions to any channels or patterns"""
1064 return self.subscribed_event.is_set()
1066 def execute_command(self, *args):
1067 """Execute a publish/subscribe command"""
1069 # NOTE: don't parse the response in this function -- it could pull a
1070 # legitimate message off the stack if the connection is already
1071 # subscribed to one or more channels
1073 if self.connection is None:
1074 self.connection = self.connection_pool.get_connection()
1075 # register a callback that re-subscribes to any channels we
1076 # were listening to when we were disconnected
1077 self.connection.register_connect_callback(self.on_connect)
1078 if self.push_handler_func is not None:
1079 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
1080 self._event_dispatcher.dispatch(
1081 AfterPubSubConnectionInstantiationEvent(
1082 self.connection, self.connection_pool, ClientType.SYNC, self._lock
1083 )
1084 )
1085 connection = self.connection
1086 kwargs = {"check_health": not self.subscribed}
1087 if not self.subscribed:
1088 self.clean_health_check_responses()
1089 with self._lock:
1090 self._execute(connection, connection.send_command, *args, **kwargs)
1092 def clean_health_check_responses(self) -> None:
1093 """
1094 If any health check responses are present, clean them
1095 """
1096 ttl = 10
1097 conn = self.connection
1098 while conn and self.health_check_response_counter > 0 and ttl > 0:
1099 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
1100 response = self._execute(conn, conn.read_response)
1101 if self.is_health_check_response(response):
1102 self.health_check_response_counter -= 1
1103 else:
1104 raise PubSubError(
1105 "A non health check response was cleaned by "
1106 "execute_command: {}".format(response)
1107 )
1108 ttl -= 1
1110 def _reconnect(
1111 self,
1112 conn,
1113 error: Optional[Exception] = None,
1114 failure_count: Optional[int] = None,
1115 start_time: Optional[float] = None,
1116 command_name: Optional[str] = None,
1117 ) -> None:
1118 """
1119 The supported exceptions are already checked in the
1120 retry object so we don't need to do it here.
1122 In this error handler we are trying to reconnect to the server.
1123 """
1124 if error and failure_count <= conn.retry.get_retries():
1125 if command_name:
1126 record_operation_duration(
1127 command_name=command_name,
1128 duration_seconds=time.monotonic() - start_time,
1129 server_address=getattr(conn, "host", None),
1130 server_port=getattr(conn, "port", None),
1131 db_namespace=str(conn.db),
1132 error=error,
1133 retry_attempts=failure_count,
1134 )
1135 conn.disconnect()
1136 conn.connect()
1138 def _execute(self, conn, command, *args, **kwargs):
1139 """
1140 Connect manually upon disconnection. If the Redis server is down,
1141 this will fail and raise a ConnectionError as desired.
1142 After reconnection, the ``on_connect`` callback should have been
1143 called by the # connection to resubscribe us to any channels and
1144 patterns we were previously listening to
1145 """
1147 if conn.should_reconnect():
1148 self._reconnect(conn)
1150 if not len(args) == 0:
1151 command_name = args[0]
1152 else:
1153 command_name = None
1155 # Start timing for observability
1156 start_time = time.monotonic()
1157 # Track actual retry attempts for error reporting
1158 actual_retry_attempts = [0]
1160 def failure_callback(error, failure_count):
1161 actual_retry_attempts[0] = failure_count
1162 self._reconnect(conn, error, failure_count, start_time, command_name)
1164 try:
1165 response = conn.retry.call_with_retry(
1166 lambda: command(*args, **kwargs),
1167 failure_callback,
1168 with_failure_count=True,
1169 )
1171 if command_name:
1172 record_operation_duration(
1173 command_name=command_name,
1174 duration_seconds=time.monotonic() - start_time,
1175 server_address=getattr(conn, "host", None),
1176 server_port=getattr(conn, "port", None),
1177 db_namespace=str(conn.db),
1178 )
1180 return response
1181 except Exception as e:
1182 record_error_count(
1183 server_address=getattr(conn, "host", None),
1184 server_port=getattr(conn, "port", None),
1185 network_peer_address=getattr(conn, "host", None),
1186 network_peer_port=getattr(conn, "port", None),
1187 error_type=e,
1188 retry_attempts=actual_retry_attempts[0],
1189 is_internal=False,
1190 )
1191 raise
1193 def parse_response(self, block=True, timeout=0):
1194 """
1195 Parse the response from a publish/subscribe command.
1197 Args:
1198 block: If True, block indefinitely until a message is available.
1199 If False, return immediately if no message is available.
1200 Default: True
1201 timeout: The timeout in seconds for reading a response when block=False.
1202 This parameter is ignored when block=True.
1203 Default: 0 (return immediately if no data available)
1205 Returns:
1206 The parsed response from the server, or None if no message is available
1207 within the timeout period (when block=False).
1209 Important:
1210 The block and timeout parameters work together:
1211 - When block=True: timeout is IGNORED, method blocks indefinitely
1212 - When block=False: timeout is USED, method returns after timeout expires
1214 Typically, you should use get_message(timeout=X) instead of calling
1215 parse_response() directly. The get_message() method automatically sets
1216 block=False when a timeout is provided, and block=True when timeout=None.
1218 Example:
1219 # Block indefinitely (timeout is ignored)
1220 response = pubsub.parse_response(block=True, timeout=0.1)
1222 # Non-blocking with 0.1 second timeout
1223 response = pubsub.parse_response(block=False, timeout=0.1)
1225 # Non-blocking, return immediately
1226 response = pubsub.parse_response(block=False, timeout=0)
1228 # Recommended: use get_message() instead
1229 msg = pubsub.get_message(timeout=0.1) # automatically sets block=False
1230 msg = pubsub.get_message(timeout=None) # automatically sets block=True
1231 """
1232 conn = self.connection
1233 if conn is None:
1234 raise RuntimeError(
1235 "pubsub connection not set: "
1236 "did you forget to call subscribe() or psubscribe()?"
1237 )
1239 self.check_health()
1241 def try_read():
1242 if not block:
1243 if not conn.can_read(timeout=timeout):
1244 return None
1245 read_timeout = timeout
1246 else:
1247 conn.connect()
1248 read_timeout = SENTINEL # Use default socket timeout for blocking
1249 return conn.read_response(
1250 disconnect_on_error=False, push_request=True, timeout=read_timeout
1251 )
1253 response = self._execute(conn, try_read)
1255 if self.is_health_check_response(response):
1256 # ignore the health check message as user might not expect it
1257 self.health_check_response_counter -= 1
1258 return None
1259 return response
1261 def is_health_check_response(self, response) -> bool:
1262 """
1263 Check if the response is a health check response.
1264 If there are no subscriptions redis responds to PING command with a
1265 bulk response, instead of a multi-bulk with "pong" and the response.
1266 """
1267 if self.encoder.decode_responses:
1268 return (
1269 response
1270 in [
1271 self.health_check_response, # If there is a subscription
1272 self.HEALTH_CHECK_MESSAGE, # If there are no subscriptions and decode_responses=True
1273 ]
1274 )
1275 else:
1276 return (
1277 response
1278 in [
1279 self.health_check_response, # If there is a subscription
1280 self.health_check_response_b, # If there isn't a subscription and decode_responses=False
1281 ]
1282 )
1284 def check_health(self) -> None:
1285 conn = self.connection
1286 if conn is None:
1287 raise RuntimeError(
1288 "pubsub connection not set: "
1289 "did you forget to call subscribe() or psubscribe()?"
1290 )
1292 if conn.health_check_interval and time.monotonic() > conn.next_health_check:
1293 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
1294 self.health_check_response_counter += 1
1296 def _normalize_keys(self, data) -> Dict:
1297 """
1298 normalize channel/pattern names to be either bytes or strings
1299 based on whether responses are automatically decoded. this saves us
1300 from coercing the value for each message coming in.
1301 """
1302 encode = self.encoder.encode
1303 decode = self.encoder.decode
1304 return {decode(encode(k)): v for k, v in data.items()}
1306 def psubscribe(self, *args, **kwargs):
1307 """
1308 Subscribe to channel patterns. Patterns supplied as keyword arguments
1309 expect a pattern name as the key and a callable as the value. A
1310 pattern's callable will be invoked automatically when a message is
1311 received on that pattern rather than producing a message via
1312 ``listen()``.
1313 """
1314 if args:
1315 args = list_or_args(args[0], args[1:])
1316 new_patterns = dict.fromkeys(args)
1317 new_patterns.update(kwargs)
1318 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
1319 # update the patterns dict AFTER we send the command. we don't want to
1320 # subscribe twice to these patterns, once for the command and again
1321 # for the reconnection.
1322 new_patterns = self._normalize_keys(new_patterns)
1323 self.patterns.update(new_patterns)
1324 if not self.subscribed:
1325 # Set the subscribed_event flag to True
1326 self.subscribed_event.set()
1327 # Clear the health check counter
1328 self.health_check_response_counter = 0
1329 self.pending_unsubscribe_patterns.difference_update(new_patterns)
1330 return ret_val
1332 def punsubscribe(self, *args):
1333 """
1334 Unsubscribe from the supplied patterns. If empty, unsubscribe from
1335 all patterns.
1336 """
1337 if args:
1338 args = list_or_args(args[0], args[1:])
1339 patterns = self._normalize_keys(dict.fromkeys(args))
1340 else:
1341 patterns = self.patterns
1342 self.pending_unsubscribe_patterns.update(patterns)
1343 return self.execute_command("PUNSUBSCRIBE", *args)
1345 def subscribe(self, *args, **kwargs):
1346 """
1347 Subscribe to channels. Channels supplied as keyword arguments expect
1348 a channel name as the key and a callable as the value. A channel's
1349 callable will be invoked automatically when a message is received on
1350 that channel rather than producing a message via ``listen()`` or
1351 ``get_message()``.
1352 """
1353 if args:
1354 args = list_or_args(args[0], args[1:])
1355 new_channels = dict.fromkeys(args)
1356 new_channels.update(kwargs)
1357 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
1358 # update the channels dict AFTER we send the command. we don't want to
1359 # subscribe twice to these channels, once for the command and again
1360 # for the reconnection.
1361 new_channels = self._normalize_keys(new_channels)
1362 self.channels.update(new_channels)
1363 if not self.subscribed:
1364 # Set the subscribed_event flag to True
1365 self.subscribed_event.set()
1366 # Clear the health check counter
1367 self.health_check_response_counter = 0
1368 self.pending_unsubscribe_channels.difference_update(new_channels)
1369 return ret_val
1371 def unsubscribe(self, *args):
1372 """
1373 Unsubscribe from the supplied channels. If empty, unsubscribe from
1374 all channels
1375 """
1376 if args:
1377 args = list_or_args(args[0], args[1:])
1378 channels = self._normalize_keys(dict.fromkeys(args))
1379 else:
1380 channels = self.channels
1381 self.pending_unsubscribe_channels.update(channels)
1382 return self.execute_command("UNSUBSCRIBE", *args)
1384 def ssubscribe(self, *args, target_node=None, **kwargs):
1385 """
1386 Subscribes the client to the specified shard channels.
1387 Channels supplied as keyword arguments expect a channel name as the key
1388 and a callable as the value. A channel's callable will be invoked automatically
1389 when a message is received on that channel rather than producing a message via
1390 ``listen()`` or ``get_sharded_message()``.
1391 """
1392 if args:
1393 args = list_or_args(args[0], args[1:])
1394 new_s_channels = dict.fromkeys(args)
1395 new_s_channels.update(kwargs)
1396 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
1397 # update the s_channels dict AFTER we send the command. we don't want to
1398 # subscribe twice to these channels, once for the command and again
1399 # for the reconnection.
1400 new_s_channels = self._normalize_keys(new_s_channels)
1401 self.shard_channels.update(new_s_channels)
1402 if not self.subscribed:
1403 # Set the subscribed_event flag to True
1404 self.subscribed_event.set()
1405 # Clear the health check counter
1406 self.health_check_response_counter = 0
1407 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
1408 return ret_val
1410 def sunsubscribe(self, *args, target_node=None):
1411 """
1412 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
1413 all shard_channels
1414 """
1415 if args:
1416 args = list_or_args(args[0], args[1:])
1417 s_channels = self._normalize_keys(dict.fromkeys(args))
1418 else:
1419 s_channels = self.shard_channels
1420 self.pending_unsubscribe_shard_channels.update(s_channels)
1421 return self.execute_command("SUNSUBSCRIBE", *args)
1423 def listen(self):
1424 "Listen for messages on channels this client has been subscribed to"
1425 while self.subscribed:
1426 response = self.handle_message(self.parse_response(block=True))
1427 if response is not None:
1428 yield response
1430 def get_message(
1431 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
1432 ):
1433 """
1434 Get the next message if one is available, otherwise None.
1436 If timeout is specified, the system will wait for `timeout` seconds
1437 before returning. Timeout should be specified as a floating point
1438 number, or None, to wait indefinitely.
1439 """
1440 if not self.subscribed:
1441 # Wait for subscription
1442 start_time = time.monotonic()
1443 if self.subscribed_event.wait(timeout) is True:
1444 # The connection was subscribed during the timeout time frame.
1445 # The timeout should be adjusted based on the time spent
1446 # waiting for the subscription
1447 time_spent = time.monotonic() - start_time
1448 timeout = max(0.0, timeout - time_spent)
1449 else:
1450 # The connection isn't subscribed to any channels or patterns,
1451 # so no messages are available
1452 return None
1454 response = self.parse_response(block=(timeout is None), timeout=timeout)
1456 if response:
1457 return self.handle_message(response, ignore_subscribe_messages)
1458 return None
1460 get_sharded_message = get_message
1462 def ping(self, message: Union[str, None] = None) -> bool:
1463 """
1464 Ping the Redis server to test connectivity.
1466 Sends a PING command to the Redis server and returns True if the server
1467 responds with "PONG".
1468 """
1469 args = ["PING", message] if message is not None else ["PING"]
1470 return self.execute_command(*args)
1472 def handle_message(self, response, ignore_subscribe_messages=False):
1473 """
1474 Parses a pub/sub message. If the channel or pattern was subscribed to
1475 with a message handler, the handler is invoked instead of a parsed
1476 message being returned.
1477 """
1478 if response is None:
1479 return None
1480 if isinstance(response, bytes):
1481 response = [b"pong", response] if response != b"PONG" else [b"pong", b""]
1483 message_type = str_if_bytes(response[0])
1484 if message_type == "pmessage":
1485 message = {
1486 "type": message_type,
1487 "pattern": response[1],
1488 "channel": response[2],
1489 "data": response[3],
1490 }
1491 elif message_type == "pong":
1492 message = {
1493 "type": message_type,
1494 "pattern": None,
1495 "channel": None,
1496 "data": response[1],
1497 }
1498 else:
1499 message = {
1500 "type": message_type,
1501 "pattern": None,
1502 "channel": response[1],
1503 "data": response[2],
1504 }
1506 if message_type in ["message", "pmessage"]:
1507 channel = str_if_bytes(message["channel"])
1508 record_pubsub_message(
1509 direction=PubSubDirection.RECEIVE,
1510 channel=channel,
1511 )
1512 elif message_type == "smessage":
1513 channel = str_if_bytes(message["channel"])
1514 record_pubsub_message(
1515 direction=PubSubDirection.RECEIVE,
1516 channel=channel,
1517 sharded=True,
1518 )
1520 # if this is an unsubscribe message, remove it from memory
1521 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
1522 if message_type == "punsubscribe":
1523 pattern = response[1]
1524 if pattern in self.pending_unsubscribe_patterns:
1525 self.pending_unsubscribe_patterns.remove(pattern)
1526 self.patterns.pop(pattern, None)
1527 elif message_type == "sunsubscribe":
1528 s_channel = response[1]
1529 if s_channel in self.pending_unsubscribe_shard_channels:
1530 self.pending_unsubscribe_shard_channels.remove(s_channel)
1531 self.shard_channels.pop(s_channel, None)
1532 else:
1533 channel = response[1]
1534 if channel in self.pending_unsubscribe_channels:
1535 self.pending_unsubscribe_channels.remove(channel)
1536 self.channels.pop(channel, None)
1537 if not self.channels and not self.patterns and not self.shard_channels:
1538 # There are no subscriptions anymore, set subscribed_event flag
1539 # to false
1540 self.subscribed_event.clear()
1542 if message_type in self.PUBLISH_MESSAGE_TYPES:
1543 # if there's a message handler, invoke it
1544 if message_type == "pmessage":
1545 handler = self.patterns.get(message["pattern"], None)
1546 elif message_type == "smessage":
1547 handler = self.shard_channels.get(message["channel"], None)
1548 else:
1549 handler = self.channels.get(message["channel"], None)
1550 if handler:
1551 handler(message)
1552 return None
1553 elif message_type != "pong":
1554 # this is a subscribe/unsubscribe message. ignore if we don't
1555 # want them
1556 if ignore_subscribe_messages or self.ignore_subscribe_messages:
1557 return None
1559 return message
1561 def run_in_thread(
1562 self,
1563 sleep_time: float = 0.0,
1564 daemon: bool = False,
1565 exception_handler: Optional[Callable] = None,
1566 pubsub=None,
1567 sharded_pubsub: bool = False,
1568 ) -> "PubSubWorkerThread":
1569 for channel, handler in self.channels.items():
1570 if handler is None:
1571 raise PubSubError(f"Channel: '{channel}' has no handler registered")
1572 for pattern, handler in self.patterns.items():
1573 if handler is None:
1574 raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
1575 for s_channel, handler in self.shard_channels.items():
1576 if handler is None:
1577 raise PubSubError(
1578 f"Shard Channel: '{s_channel}' has no handler registered"
1579 )
1581 pubsub = self if pubsub is None else pubsub
1582 thread = PubSubWorkerThread(
1583 pubsub,
1584 sleep_time,
1585 daemon=daemon,
1586 exception_handler=exception_handler,
1587 sharded_pubsub=sharded_pubsub,
1588 )
1589 thread.start()
1590 return thread
1593class PubSubWorkerThread(threading.Thread):
1594 def __init__(
1595 self,
1596 pubsub,
1597 sleep_time: float,
1598 daemon: bool = False,
1599 exception_handler: Union[
1600 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
1601 ] = None,
1602 sharded_pubsub: bool = False,
1603 ):
1604 super().__init__()
1605 self.daemon = daemon
1606 self.pubsub = pubsub
1607 self.sleep_time = sleep_time
1608 self.exception_handler = exception_handler
1609 self.sharded_pubsub = sharded_pubsub
1610 self._running = threading.Event()
1612 def run(self) -> None:
1613 if self._running.is_set():
1614 return
1615 self._running.set()
1616 pubsub = self.pubsub
1617 sleep_time = self.sleep_time
1618 while self._running.is_set():
1619 try:
1620 if not self.sharded_pubsub:
1621 pubsub.get_message(
1622 ignore_subscribe_messages=True, timeout=sleep_time
1623 )
1624 else:
1625 pubsub.get_sharded_message(
1626 ignore_subscribe_messages=True, timeout=sleep_time
1627 )
1628 except BaseException as e:
1629 if self.exception_handler is None:
1630 raise
1631 self.exception_handler(e, pubsub, self)
1632 pubsub.close()
1634 def stop(self) -> None:
1635 # trip the flag so the run loop exits. the run loop will
1636 # close the pubsub connection, which disconnects the socket
1637 # and returns the connection to the pool.
1638 self._running.clear()
1641class Pipeline(Redis):
1642 """
1643 Pipelines provide a way to transmit multiple commands to the Redis server
1644 in one transmission. This is convenient for batch processing, such as
1645 saving all the values in a list to Redis.
1647 All commands executed within a pipeline(when running in transactional mode,
1648 which is the default behavior) are wrapped with MULTI and EXEC
1649 calls. This guarantees all commands executed in the pipeline will be
1650 executed atomically.
1652 Any command raising an exception does *not* halt the execution of
1653 subsequent commands in the pipeline. Instead, the exception is caught
1654 and its instance is placed into the response list returned by execute().
1655 Code iterating over the response list should be able to deal with an
1656 instance of an exception as a potential value. In general, these will be
1657 ResponseError exceptions, such as those raised when issuing a command
1658 on a key of a different datatype.
1659 """
1661 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
1663 def __init__(
1664 self,
1665 connection_pool: ConnectionPool,
1666 response_callbacks,
1667 transaction,
1668 shard_hint,
1669 ):
1670 self.connection_pool = connection_pool
1671 self.connection: Optional[Connection] = None
1672 self.response_callbacks = response_callbacks
1673 self.transaction = transaction
1674 self.shard_hint = shard_hint
1675 self.watching = False
1676 self.command_stack = []
1677 self.scripts: Set[Script] = set()
1678 self.explicit_transaction = False
1680 def __enter__(self) -> "Pipeline":
1681 return self
1683 def __exit__(self, exc_type, exc_value, traceback):
1684 self.reset()
1686 def __del__(self):
1687 try:
1688 self.reset()
1689 except Exception:
1690 pass
1692 def __len__(self) -> int:
1693 return len(self.command_stack)
1695 def __bool__(self) -> bool:
1696 """Pipeline instances should always evaluate to True"""
1697 return True
1699 def reset(self) -> None:
1700 self.command_stack = []
1701 self.scripts = set()
1702 # make sure to reset the connection state in the event that we were
1703 # watching something
1704 if self.watching and self.connection:
1705 try:
1706 # call this manually since our unwatch or
1707 # immediate_execute_command methods can call reset()
1708 self.connection.send_command("UNWATCH")
1709 self.connection.read_response()
1710 except ConnectionError:
1711 # disconnect will also remove any previous WATCHes
1712 self.connection.disconnect()
1713 # clean up the other instance attributes
1714 self.watching = False
1715 self.explicit_transaction = False
1717 # we can safely return the connection to the pool here since we're
1718 # sure we're no longer WATCHing anything
1719 if self.connection:
1720 self.connection_pool.release(self.connection)
1721 self.connection = None
1723 def close(self) -> None:
1724 """Close the pipeline"""
1725 self.reset()
1727 def multi(self) -> None:
1728 """
1729 Start a transactional block of the pipeline after WATCH commands
1730 are issued. End the transactional block with `execute`.
1731 """
1732 if self.explicit_transaction:
1733 raise RedisError("Cannot issue nested calls to MULTI")
1734 if self.command_stack:
1735 raise RedisError(
1736 "Commands without an initial WATCH have already been issued"
1737 )
1738 self.explicit_transaction = True
1740 def execute_command(self, *args, **kwargs):
1741 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
1742 return self.immediate_execute_command(*args, **kwargs)
1743 return self.pipeline_execute_command(*args, **kwargs)
1745 def _disconnect_reset_raise_on_watching(
1746 self,
1747 conn: AbstractConnection,
1748 error: Exception,
1749 failure_count: Optional[int] = None,
1750 start_time: Optional[float] = None,
1751 command_name: Optional[str] = None,
1752 ) -> None:
1753 """
1754 Close the connection reset watching state and
1755 raise an exception if we were watching.
1757 The supported exceptions are already checked in the
1758 retry object so we don't need to do it here.
1760 After we disconnect the connection, it will try to reconnect and
1761 do a health check as part of the send_command logic(on connection level).
1762 """
1763 if error and failure_count <= conn.retry.get_retries():
1764 record_operation_duration(
1765 command_name=command_name,
1766 duration_seconds=time.monotonic() - start_time,
1767 server_address=getattr(conn, "host", None),
1768 server_port=getattr(conn, "port", None),
1769 db_namespace=str(conn.db),
1770 error=error,
1771 retry_attempts=failure_count,
1772 )
1773 conn.disconnect()
1775 # if we were already watching a variable, the watch is no longer
1776 # valid since this connection has died. raise a WatchError, which
1777 # indicates the user should retry this transaction.
1778 if self.watching:
1779 self.reset()
1780 raise WatchError(
1781 f"A {type(error).__name__} occurred while watching one or more keys"
1782 )
1784 def immediate_execute_command(self, *args, **options):
1785 """
1786 Execute a command immediately, but don't auto-retry on the supported
1787 errors for retry if we're already WATCHing a variable.
1788 Used when issuing WATCH or subsequent commands retrieving their values but before
1789 MULTI is called.
1790 """
1791 command_name = args[0]
1792 conn = self.connection
1793 # if this is the first call, we need a connection
1794 if not conn:
1795 conn = self.connection_pool.get_connection()
1796 self.connection = conn
1798 # Start timing for observability
1799 start_time = time.monotonic()
1800 # Track actual retry attempts for error reporting
1801 actual_retry_attempts = [0]
1803 def failure_callback(error, failure_count):
1804 if is_debug_log_enabled():
1805 add_debug_log_for_operation_failure(conn)
1806 actual_retry_attempts[0] = failure_count
1807 self._disconnect_reset_raise_on_watching(
1808 conn, error, failure_count, start_time, command_name
1809 )
1811 try:
1812 response = conn.retry.call_with_retry(
1813 lambda: self._send_command_parse_response(
1814 conn, command_name, *args, **options
1815 ),
1816 failure_callback,
1817 with_failure_count=True,
1818 )
1820 record_operation_duration(
1821 command_name=command_name,
1822 duration_seconds=time.monotonic() - start_time,
1823 server_address=getattr(conn, "host", None),
1824 server_port=getattr(conn, "port", None),
1825 db_namespace=str(conn.db),
1826 )
1828 return response
1829 except Exception as e:
1830 record_error_count(
1831 server_address=getattr(conn, "host", None),
1832 server_port=getattr(conn, "port", None),
1833 network_peer_address=getattr(conn, "host", None),
1834 network_peer_port=getattr(conn, "port", None),
1835 error_type=e,
1836 retry_attempts=actual_retry_attempts[0],
1837 is_internal=False,
1838 )
1839 raise
1841 def pipeline_execute_command(self, *args, **options) -> "Pipeline":
1842 """
1843 Stage a command to be executed when execute() is next called
1845 Returns the current Pipeline object back so commands can be
1846 chained together, such as:
1848 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
1850 At some other point, you can then run: pipe.execute(),
1851 which will execute all commands queued in the pipe.
1852 """
1853 self.command_stack.append((args, options))
1854 return self
1856 def _execute_transaction(
1857 self, connection: Connection, commands, raise_on_error
1858 ) -> List:
1859 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
1860 all_cmds = connection.pack_commands(
1861 [args for args, options in cmds if EMPTY_RESPONSE not in options]
1862 )
1863 connection.send_packed_command(all_cmds)
1864 errors = []
1866 # parse off the response for MULTI
1867 # NOTE: we need to handle ResponseErrors here and continue
1868 # so that we read all the additional command messages from
1869 # the socket
1870 try:
1871 self.parse_response(connection, "_")
1872 except ResponseError as e:
1873 errors.append((0, e))
1875 # and all the other commands
1876 for i, command in enumerate(commands):
1877 if EMPTY_RESPONSE in command[1]:
1878 errors.append((i, command[1][EMPTY_RESPONSE]))
1879 else:
1880 try:
1881 self.parse_response(connection, "_")
1882 except ResponseError as e:
1883 self.annotate_exception(e, i + 1, command[0])
1884 errors.append((i, e))
1886 # parse the EXEC.
1887 try:
1888 response = self.parse_response(connection, "_")
1889 except ExecAbortError:
1890 if errors:
1891 raise errors[0][1]
1892 raise
1894 # EXEC clears any watched keys
1895 self.watching = False
1897 if response is None:
1898 raise WatchError("Watched variable changed.")
1900 # put any parse errors into the response
1901 for i, e in errors:
1902 response.insert(i, e)
1904 if len(response) != len(commands):
1905 self.connection.disconnect()
1906 raise ResponseError(
1907 "Wrong number of response items from pipeline execution"
1908 )
1910 # find any errors in the response and raise if necessary
1911 if raise_on_error:
1912 self.raise_first_error(commands, response)
1914 # We have to run response callbacks manually
1915 data = []
1916 for r, cmd in zip(response, commands):
1917 if not isinstance(r, Exception):
1918 args, options = cmd
1919 # Remove keys entry, it needs only for cache.
1920 options.pop("keys", None)
1921 command_name = args[0]
1922 if command_name in self.response_callbacks:
1923 r = self.response_callbacks[command_name](r, **options)
1924 data.append(r)
1926 return data
1928 def _execute_pipeline(self, connection, commands, raise_on_error):
1929 # build up all commands into a single request to increase network perf
1930 all_cmds = connection.pack_commands([args for args, _ in commands])
1931 connection.send_packed_command(all_cmds)
1933 responses = []
1934 for args, options in commands:
1935 try:
1936 responses.append(self.parse_response(connection, args[0], **options))
1937 except ResponseError as e:
1938 responses.append(e)
1940 if raise_on_error:
1941 self.raise_first_error(commands, responses)
1943 return responses
1945 def raise_first_error(self, commands, response):
1946 for i, r in enumerate(response):
1947 if isinstance(r, ResponseError):
1948 self.annotate_exception(r, i + 1, commands[i][0])
1949 raise r
1951 def annotate_exception(self, exception, number, command):
1952 cmd = " ".join(map(safe_str, command))
1953 msg = (
1954 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
1955 f"caused error: {exception.args[0]}"
1956 )
1957 exception.args = (msg,) + exception.args[1:]
1959 def parse_response(self, connection, command_name, **options):
1960 result = Redis.parse_response(self, connection, command_name, **options)
1961 if command_name in self.UNWATCH_COMMANDS:
1962 self.watching = False
1963 elif command_name == "WATCH":
1964 self.watching = True
1965 return result
1967 def load_scripts(self):
1968 # make sure all scripts that are about to be run on this pipeline exist
1969 scripts = list(self.scripts)
1970 immediate = self.immediate_execute_command
1971 shas = [s.sha for s in scripts]
1972 # we can't use the normal script_* methods because they would just
1973 # get buffered in the pipeline.
1974 exists = immediate("SCRIPT EXISTS", *shas)
1975 if not all(exists):
1976 for s, exist in zip(scripts, exists):
1977 if not exist:
1978 s.sha = immediate("SCRIPT LOAD", s.script)
1980 def _disconnect_raise_on_watching(
1981 self,
1982 conn: AbstractConnection,
1983 error: Exception,
1984 failure_count: Optional[int] = None,
1985 start_time: Optional[float] = None,
1986 command_name: Optional[str] = None,
1987 ) -> None:
1988 """
1989 Close the connection, raise an exception if we were watching.
1991 The supported exceptions are already checked in the
1992 retry object so we don't need to do it here.
1994 After we disconnect the connection, it will try to reconnect and
1995 do a health check as part of the send_command logic(on connection level).
1996 """
1997 if error and failure_count <= conn.retry.get_retries():
1998 record_operation_duration(
1999 command_name=command_name,
2000 duration_seconds=time.monotonic() - start_time,
2001 server_address=getattr(conn, "host", None),
2002 server_port=getattr(conn, "port", None),
2003 db_namespace=str(conn.db),
2004 error=error,
2005 retry_attempts=failure_count,
2006 )
2007 conn.disconnect()
2008 # if we were watching a variable, the watch is no longer valid
2009 # since this connection has died. raise a WatchError, which
2010 # indicates the user should retry this transaction.
2011 if self.watching:
2012 raise WatchError(
2013 f"A {type(error).__name__} occurred while watching one or more keys"
2014 )
2016 def execute(self, raise_on_error: bool = True) -> List[Any]:
2017 """Execute all the commands in the current pipeline"""
2018 stack = self.command_stack
2019 if not stack and not self.watching:
2020 return []
2021 if self.scripts:
2022 self.load_scripts()
2023 if self.transaction or self.explicit_transaction:
2024 execute = self._execute_transaction
2025 operation_name = "MULTI"
2026 else:
2027 execute = self._execute_pipeline
2028 operation_name = "PIPELINE"
2030 conn = self.connection
2031 if not conn:
2032 conn = self.connection_pool.get_connection()
2033 # assign to self.connection so reset() releases the connection
2034 # back to the pool after we're done
2035 self.connection = conn
2037 # Start timing for observability
2038 start_time = time.monotonic()
2039 # Track actual retry attempts for error reporting
2040 actual_retry_attempts = [0]
2042 def failure_callback(error, failure_count):
2043 if is_debug_log_enabled():
2044 add_debug_log_for_operation_failure(conn)
2045 actual_retry_attempts[0] = failure_count
2046 self._disconnect_raise_on_watching(
2047 conn, error, failure_count, start_time, operation_name
2048 )
2050 try:
2051 response = conn.retry.call_with_retry(
2052 lambda: execute(conn, stack, raise_on_error),
2053 failure_callback,
2054 with_failure_count=True,
2055 )
2057 record_operation_duration(
2058 command_name=operation_name,
2059 duration_seconds=time.monotonic() - start_time,
2060 server_address=getattr(conn, "host", None),
2061 server_port=getattr(conn, "port", None),
2062 db_namespace=str(conn.db),
2063 )
2064 return response
2065 except Exception as e:
2066 record_error_count(
2067 server_address=getattr(conn, "host", None),
2068 server_port=getattr(conn, "port", None),
2069 network_peer_address=getattr(conn, "host", None),
2070 network_peer_port=getattr(conn, "port", None),
2071 error_type=e,
2072 retry_attempts=actual_retry_attempts[0],
2073 is_internal=False,
2074 )
2075 raise
2077 finally:
2078 # in reset() the connection is disconnected before returned to the pool if
2079 # it is marked for reconnect.
2080 self.reset()
2082 def discard(self):
2083 """
2084 Flushes all previously queued commands
2085 See: https://redis.io/commands/DISCARD
2086 """
2087 self.execute_command("DISCARD")
2089 def watch(self, *names):
2090 """Watches the values at keys ``names``"""
2091 if self.explicit_transaction:
2092 raise RedisError("Cannot issue a WATCH after a MULTI")
2093 return self.execute_command("WATCH", *names)
2095 def unwatch(self) -> bool:
2096 """Unwatches all previously specified keys"""
2097 return self.watching and self.execute_command("UNWATCH") or True