Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import logging
3import re
4import threading
5import time
6from itertools import chain
7from typing import (
8 TYPE_CHECKING,
9 Any,
10 Callable,
11 Dict,
12 List,
13 Literal,
14 Mapping,
15 Optional,
16 Set,
17 Type,
18 Union,
19)
21from redis._defaults import (
22 DEFAULT_RETRY_BASE,
23 DEFAULT_RETRY_CAP,
24 DEFAULT_RETRY_COUNT,
25 DEFAULT_SOCKET_CONNECT_TIMEOUT,
26 DEFAULT_SOCKET_READ_SIZE,
27 DEFAULT_SOCKET_TIMEOUT,
28)
29from redis._parsers.encoders import Encoder
30from redis._parsers.helpers import bool_ok, get_response_callbacks
31from redis.backoff import ExponentialWithJitterBackoff
32from redis.cache import CacheConfig, CacheInterface
33from redis.commands import (
34 CoreCommands,
35 RedisModuleCommands,
36 SentinelCommands,
37 list_or_args,
38)
39from redis.commands.core import Script
40from redis.commands.helpers import parse_pubsub_subscriptions, pubsub_subscription_args
41from redis.connection import (
42 AbstractConnection,
43 Connection,
44 ConnectionPool,
45 SSLConnection,
46 UnixDomainSocketConnection,
47)
48from redis.credentials import CredentialProvider
49from redis.driver_info import DriverInfo, resolve_driver_info
50from redis.event import (
51 AfterPooledConnectionsInstantiationEvent,
52 AfterPubSubConnectionInstantiationEvent,
53 AfterSingleConnectionInstantiationEvent,
54 ClientType,
55 EventDispatcher,
56)
57from redis.exceptions import (
58 ConnectionError,
59 ExecAbortError,
60 PubSubError,
61 RedisError,
62 ResponseError,
63 WatchError,
64)
65from redis.lock import Lock
66from redis.maint_notifications import (
67 MaintNotificationsConfig,
68 OSSMaintNotificationsHandler,
69)
70from redis.observability.attributes import PubSubDirection
71from redis.observability.recorder import (
72 record_error_count,
73 record_operation_duration,
74 record_pubsub_message,
75)
76from redis.retry import Retry
77from redis.typing import ChannelT, PubSubHandler, Subscription
78from redis.utils import (
79 SENTINEL,
80 _set_info_logger,
81 check_protocol_version,
82 deprecated_args,
83 safe_str,
84 str_if_bytes,
85 truncate_text,
86)
88if TYPE_CHECKING:
89 import ssl
91 import OpenSSL
93 from redis.keyspace_notifications import KeyspaceNotifications
95SYM_EMPTY = b""
96EMPTY_RESPONSE = "EMPTY_RESPONSE"
98# some responses (ie. dump) are binary, and just meant to never be decoded
99NEVER_DECODE = "NEVER_DECODE"
102logger = logging.getLogger(__name__)
105def is_debug_log_enabled():
106 return logger.isEnabledFor(logging.DEBUG)
109def add_debug_log_for_operation_failure(connection: "AbstractConnection"):
110 logger.debug(
111 f"Operation failed, "
112 f"with connection: {connection}, details: {connection.extract_connection_details() if connection else 'no connection'}",
113 )
116class CaseInsensitiveDict(dict):
117 "Case insensitive dict implementation. Assumes string keys only."
119 def __init__(self, data: Dict[str, str]) -> None:
120 for k, v in data.items():
121 self[k.upper()] = v
123 def __contains__(self, k):
124 return super().__contains__(k.upper())
126 def __delitem__(self, k):
127 super().__delitem__(k.upper())
129 def __getitem__(self, k):
130 return super().__getitem__(k.upper())
132 def get(self, k, default=None):
133 return super().get(k.upper(), default)
135 def __setitem__(self, k, v):
136 super().__setitem__(k.upper(), v)
138 def update(self, data):
139 data = CaseInsensitiveDict(data)
140 super().update(data)
143class AbstractRedis:
144 pass
147class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
148 """
149 Implementation of the Redis protocol.
151 This abstract class provides a Python interface to all Redis commands
152 and an implementation of the Redis protocol.
154 Pipelines derive from this, implementing how
155 the commands are sent and received to the Redis server. Based on
156 configuration, an instance will either use a ConnectionPool, or
157 Connection object to talk to redis.
159 It is not safe to pass PubSub or Pipeline objects between threads.
160 """
162 # Type discrimination marker for @overload self-type pattern
163 _is_async_client: Literal[False] = False
165 @classmethod
166 def from_url(cls, url: str, **kwargs) -> "Redis":
167 """
168 Return a Redis client object configured from the given URL
170 For example::
172 redis://[[username]:[password]]@localhost:6379/0
173 rediss://[[username]:[password]]@localhost:6379/0
174 unix://[username@]/path/to/socket.sock?db=0[&password=password]
176 Three URL schemes are supported:
178 - `redis://` creates a TCP socket connection. See more at:
179 <https://www.iana.org/assignments/uri-schemes/prov/redis>
180 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
181 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
182 - ``unix://``: creates a Unix Domain Socket connection.
184 The username, password, hostname, path and all querystring values
185 are passed through urllib.parse.unquote in order to replace any
186 percent-encoded values with their corresponding characters.
188 There are several ways to specify a database number. The first value
189 found will be used:
191 1. A ``db`` querystring option, e.g. redis://localhost?db=0
192 2. If using the redis:// or rediss:// schemes, the path argument
193 of the url, e.g. redis://localhost/0
194 3. A ``db`` keyword argument to this function.
196 If none of these options are specified, the default db=0 is used.
198 All querystring options are cast to their appropriate Python types.
199 Boolean arguments can be specified with string values "True"/"False"
200 or "Yes"/"No". Values that cannot be properly cast cause a
201 ``ValueError`` to be raised. Once parsed, the querystring arguments
202 and keyword arguments are passed to the ``ConnectionPool``'s
203 class initializer. In the case of conflicting arguments, querystring
204 arguments always win.
206 """
207 single_connection_client = kwargs.pop("single_connection_client", False)
208 connection_pool = ConnectionPool.from_url(url, **kwargs)
209 client = cls(
210 connection_pool=connection_pool,
211 single_connection_client=single_connection_client,
212 )
213 client.auto_close_connection_pool = True
214 return client
216 @classmethod
217 def from_pool(
218 cls: Type["Redis"],
219 connection_pool: ConnectionPool,
220 ) -> "Redis":
221 """
222 Return a Redis client from the given connection pool.
223 The Redis client will take ownership of the connection pool and
224 close it when the Redis client is closed.
225 """
226 client = cls(
227 connection_pool=connection_pool,
228 )
229 client.auto_close_connection_pool = True
230 return client
232 @deprecated_args(
233 args_to_warn=["retry_on_timeout"],
234 reason="TimeoutError is included by default.",
235 version="6.0.0",
236 )
237 @deprecated_args(
238 args_to_warn=["lib_name", "lib_version"],
239 reason="Use 'driver_info' parameter instead. "
240 "lib_name and lib_version will be removed in a future version.",
241 )
242 def __init__(
243 self,
244 host: str = "localhost",
245 port: int = 6379,
246 db: int = 0,
247 password: str | None = None,
248 socket_timeout: float | None = DEFAULT_SOCKET_TIMEOUT,
249 socket_connect_timeout: float | None = DEFAULT_SOCKET_CONNECT_TIMEOUT,
250 socket_read_size: int = DEFAULT_SOCKET_READ_SIZE,
251 socket_keepalive: bool | None = True,
252 socket_keepalive_options: Mapping[int, int | bytes] | object | None = SENTINEL,
253 connection_pool: ConnectionPool | None = None,
254 unix_socket_path: str | None = None,
255 encoding: str = "utf-8",
256 encoding_errors: str = "strict",
257 decode_responses: bool = False,
258 retry_on_timeout: bool = False,
259 retry: Retry = Retry(
260 backoff=ExponentialWithJitterBackoff(
261 base=DEFAULT_RETRY_BASE, cap=DEFAULT_RETRY_CAP
262 ),
263 retries=DEFAULT_RETRY_COUNT,
264 ),
265 retry_on_error: List[Type[Exception]] | None = None,
266 ssl: bool = False,
267 ssl_keyfile: str | None = None,
268 ssl_certfile: str | None = None,
269 ssl_cert_reqs: "str | ssl.VerifyMode" = "required",
270 ssl_include_verify_flags: List["ssl.VerifyFlags"] | None = None,
271 ssl_exclude_verify_flags: List["ssl.VerifyFlags"] | None = None,
272 ssl_ca_certs: str | None = None,
273 ssl_ca_path: str | None = None,
274 ssl_ca_data: str | None = None,
275 ssl_check_hostname: bool = True,
276 ssl_password: str | None = None,
277 ssl_validate_ocsp: bool = False,
278 ssl_validate_ocsp_stapled: bool = False,
279 ssl_ocsp_context: "OpenSSL.SSL.Context | None" = None,
280 ssl_ocsp_expected_cert: str | None = None,
281 ssl_min_version: "ssl.TLSVersion | None" = None,
282 ssl_ciphers: str | None = None,
283 max_connections: int | None = None,
284 single_connection_client: bool = False,
285 health_check_interval: int = 0,
286 client_name: str | None = None,
287 lib_name: str | object | None = SENTINEL,
288 lib_version: str | object | None = SENTINEL,
289 driver_info: DriverInfo | object | None = SENTINEL,
290 username: str | None = None,
291 redis_connect_func: Callable[[], None] | None = None,
292 credential_provider: CredentialProvider | None = None,
293 protocol: int | None = None,
294 legacy_responses: bool = True,
295 cache: CacheInterface | None = None,
296 cache_config: CacheConfig | None = None,
297 event_dispatcher: EventDispatcher | None = None,
298 maint_notifications_config: MaintNotificationsConfig | None = None,
299 oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
300 | None = None,
301 ) -> None:
302 """
303 Initialize a new Redis client.
305 To specify a retry policy for specific errors, you have two options:
307 1. Set the `retry_on_error` to a list of the error/s to retry on, and
308 you can also set `retry` to a valid `Retry` object(in case the default
309 one is not appropriate) - with this approach the retries will be triggered
310 on the default errors specified in the Retry object enriched with the
311 errors specified in `retry_on_error`.
313 2. Define a `Retry` object with configured 'supported_errors' and set
314 it to the `retry` parameter - with this approach you completely redefine
315 the errors on which retries will happen.
317 `retry_on_timeout` is deprecated - please include the TimeoutError
318 either in the Retry object or in the `retry_on_error` list.
320 When 'connection_pool' is provided - the retry configuration of the
321 provided pool will be used.
323 Args:
325 socket_keepalive:
326 if `True`, TCP keepalive is enabled for TCP socket connections.
327 Argument is ignored when connection_pool is provided.
328 socket_keepalive_options:
329 mapping of TCP keepalive socket option constants to values, for
330 example `{socket.TCP_KEEPIDLE: 30}`. If left unspecified, redis-py
331 uses TCP keepalive defaults when `socket_keepalive` is enabled:
332 idle 30 seconds, interval 5 seconds, and 3 probes. Platform-specific
333 options that are not available are skipped. Pass `None` or `{}` to
334 avoid setting additional TCP keepalive options. Argument is ignored
335 when connection_pool is provided.
336 single_connection_client:
337 if `True`, connection pool is not used. In that case `Redis`
338 instance use is not thread safe.
339 decode_responses:
340 if `True`, the response will be decoded to utf-8.
341 Argument is ignored when connection_pool is provided.
342 driver_info:
343 Optional DriverInfo object to identify upstream libraries.
344 If provided, lib_name and lib_version are ignored.
345 If not provided, a DriverInfo will be created from lib_name and lib_version.
346 Explicit None disables CLIENT SETINFO.
347 Argument is ignored when connection_pool is provided.
348 lib_name:
349 **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO.
350 lib_version:
351 **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO.
352 maint_notifications_config:
353 configures the pool to support maintenance notifications - see
354 `redis.maint_notifications.MaintNotificationsConfig` for details.
355 Only supported with RESP3
356 If not provided and protocol is RESP3, the maintenance notifications
357 will be enabled by default (logic is included in the connection pool
358 initialization).
359 Argument is ignored when connection_pool is provided.
360 oss_cluster_maint_notifications_handler:
361 handler for OSS cluster notifications - see
362 `redis.maint_notifications.OSSMaintNotificationsHandler` for details.
363 Only supported with RESP3
364 Argument is ignored when connection_pool is provided.
365 """
366 if event_dispatcher is None:
367 self._event_dispatcher = EventDispatcher()
368 else:
369 self._event_dispatcher = event_dispatcher
370 if not connection_pool:
371 if not retry_on_error:
372 retry_on_error = []
374 # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version.
375 computed_driver_info = resolve_driver_info(
376 driver_info, lib_name, lib_version
377 )
379 kwargs = {
380 "db": db,
381 "username": username,
382 "password": password,
383 "socket_timeout": socket_timeout,
384 "socket_read_size": socket_read_size,
385 "encoding": encoding,
386 "encoding_errors": encoding_errors,
387 "decode_responses": decode_responses,
388 "retry_on_error": retry_on_error,
389 "retry": copy.deepcopy(retry),
390 "max_connections": max_connections,
391 "health_check_interval": health_check_interval,
392 "client_name": client_name,
393 "driver_info": computed_driver_info,
394 "redis_connect_func": redis_connect_func,
395 "credential_provider": credential_provider,
396 "protocol": protocol,
397 "legacy_responses": legacy_responses,
398 }
399 # based on input, setup appropriate connection args
400 if unix_socket_path is not None:
401 if (
402 maint_notifications_config
403 and maint_notifications_config.enabled is True
404 ):
405 raise RedisError(
406 "Maintenance notifications are not supported with Unix "
407 "domain socket connections"
408 )
409 kwargs.update(
410 {
411 "path": unix_socket_path,
412 "connection_class": UnixDomainSocketConnection,
413 "maint_notifications_config": MaintNotificationsConfig(
414 enabled=False
415 ),
416 }
417 )
418 else:
419 # TCP specific options
420 kwargs.update(
421 {
422 "host": host,
423 "port": port,
424 "socket_connect_timeout": socket_connect_timeout,
425 "socket_keepalive": socket_keepalive,
426 "socket_keepalive_options": socket_keepalive_options,
427 }
428 )
430 if ssl:
431 kwargs.update(
432 {
433 "connection_class": SSLConnection,
434 "ssl_keyfile": ssl_keyfile,
435 "ssl_certfile": ssl_certfile,
436 "ssl_cert_reqs": ssl_cert_reqs,
437 "ssl_include_verify_flags": ssl_include_verify_flags,
438 "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
439 "ssl_ca_certs": ssl_ca_certs,
440 "ssl_ca_data": ssl_ca_data,
441 "ssl_check_hostname": ssl_check_hostname,
442 "ssl_password": ssl_password,
443 "ssl_ca_path": ssl_ca_path,
444 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
445 "ssl_validate_ocsp": ssl_validate_ocsp,
446 "ssl_ocsp_context": ssl_ocsp_context,
447 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
448 "ssl_min_version": ssl_min_version,
449 "ssl_ciphers": ssl_ciphers,
450 }
451 )
452 if (cache_config or cache) and check_protocol_version(protocol, 3):
453 kwargs.update(
454 {
455 "cache": cache,
456 "cache_config": cache_config,
457 }
458 )
459 maint_notifications_enabled = (
460 maint_notifications_config and maint_notifications_config.enabled
461 )
462 if maint_notifications_enabled and not check_protocol_version(
463 protocol, 3
464 ):
465 raise RedisError(
466 "Maintenance notifications handlers on connection are only supported with RESP version 3"
467 )
468 if maint_notifications_config:
469 kwargs.update(
470 {
471 "maint_notifications_config": maint_notifications_config,
472 }
473 )
474 if oss_cluster_maint_notifications_handler:
475 kwargs.update(
476 {
477 "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
478 }
479 )
480 connection_pool = ConnectionPool(**kwargs)
481 self._event_dispatcher.dispatch(
482 AfterPooledConnectionsInstantiationEvent(
483 [connection_pool], ClientType.SYNC, credential_provider
484 )
485 )
486 self.auto_close_connection_pool = True
487 else:
488 self.auto_close_connection_pool = False
489 self._event_dispatcher.dispatch(
490 AfterPooledConnectionsInstantiationEvent(
491 [connection_pool], ClientType.SYNC, credential_provider
492 )
493 )
495 self.connection_pool = connection_pool
497 if (cache_config or cache) and not check_protocol_version(
498 self.connection_pool.get_protocol(), 3
499 ):
500 raise RedisError("Client caching is only supported with RESP version 3")
502 self.single_connection_lock = threading.RLock()
503 self.connection = None
504 self._single_connection_client = single_connection_client
505 if self._single_connection_client:
506 self.connection = self.connection_pool.get_connection()
507 self._event_dispatcher.dispatch(
508 AfterSingleConnectionInstantiationEvent(
509 self.connection, ClientType.SYNC, self.single_connection_lock
510 )
511 )
513 connection_kwargs = self.connection_pool.connection_kwargs
514 self.response_callbacks = CaseInsensitiveDict(
515 get_response_callbacks(
516 user_protocol=connection_kwargs.get("protocol"),
517 legacy_responses=connection_kwargs.get("legacy_responses", True),
518 )
519 )
521 def __repr__(self) -> str:
522 return (
523 f"<{type(self).__module__}.{type(self).__name__}"
524 f"({repr(self.connection_pool)})>"
525 )
527 def get_encoder(self) -> "Encoder":
528 """Get the connection pool's encoder"""
529 return self.connection_pool.get_encoder()
531 def get_connection_kwargs(self) -> Dict:
532 """Get the connection's key-word arguments"""
533 return self.connection_pool.connection_kwargs
535 def get_retry(self) -> Optional[Retry]:
536 return self.get_connection_kwargs().get("retry")
538 def set_retry(self, retry: Retry) -> None:
539 self.get_connection_kwargs().update({"retry": retry})
540 self.connection_pool.set_retry(retry)
542 def set_response_callback(self, command: str, callback: Callable) -> None:
543 """Set a custom Response Callback"""
544 self.response_callbacks[command] = callback
546 def load_external_module(self, funcname, func) -> None:
547 """
548 This function can be used to add externally defined redis modules,
549 and their namespaces to the redis client.
551 funcname - A string containing the name of the function to create
552 func - The function, being added to this class.
554 ex: Assume that one has a custom redis module named foomod that
555 creates command named 'foo.dothing' and 'foo.anotherthing' in redis.
556 To load function functions into this namespace:
558 from redis import Redis
559 from foomodule import F
560 r = Redis()
561 r.load_external_module("foo", F)
562 r.foo().dothing('your', 'arguments')
564 For a concrete example see the reimport of the redisjson module in
565 tests/test_connection.py::test_loading_external_modules
566 """
567 setattr(self, funcname, func)
569 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
570 """
571 Return a new pipeline object that can queue multiple commands for
572 later execution. ``transaction`` indicates whether all commands
573 should be executed atomically. Apart from making a group of operations
574 atomic, pipelines are useful for reducing the back-and-forth overhead
575 between the client and server.
576 """
577 return Pipeline(
578 self.connection_pool, self.response_callbacks, transaction, shard_hint
579 )
581 def transaction(
582 self, func: Callable[["Pipeline"], None], *watches, **kwargs
583 ) -> Union[List[Any], Any, None]:
584 """
585 Convenience method for executing the callable `func` as a transaction
586 while watching all keys specified in `watches`. The 'func' callable
587 should expect a single argument which is a Pipeline object.
588 """
589 shard_hint = kwargs.pop("shard_hint", None)
590 value_from_callable = kwargs.pop("value_from_callable", False)
591 watch_delay = kwargs.pop("watch_delay", None)
592 with self.pipeline(True, shard_hint) as pipe:
593 while True:
594 try:
595 if watches:
596 pipe.watch(*watches)
597 func_value = func(pipe)
598 exec_value = pipe.execute()
599 return func_value if value_from_callable else exec_value
600 except WatchError:
601 if watch_delay is not None and watch_delay > 0:
602 time.sleep(watch_delay)
603 continue
605 def lock(
606 self,
607 name: str,
608 timeout: Optional[float] = None,
609 sleep: float = 0.1,
610 blocking: bool = True,
611 blocking_timeout: Optional[float] = None,
612 lock_class: Union[None, Any] = None,
613 thread_local: bool = True,
614 raise_on_release_error: bool = True,
615 ):
616 """
617 Return a new Lock object using key ``name`` that mimics
618 the behavior of threading.Lock.
620 If specified, ``timeout`` indicates a maximum life for the lock.
621 By default, it will remain locked until release() is called.
623 ``sleep`` indicates the amount of time to sleep per loop iteration
624 when the lock is in blocking mode and another client is currently
625 holding the lock.
627 ``blocking`` indicates whether calling ``acquire`` should block until
628 the lock has been acquired or to fail immediately, causing ``acquire``
629 to return False and the lock not being acquired. Defaults to True.
630 Note this value can be overridden by passing a ``blocking``
631 argument to ``acquire``.
633 ``blocking_timeout`` indicates the maximum amount of time in seconds to
634 spend trying to acquire the lock. A value of ``None`` indicates
635 continue trying forever. ``blocking_timeout`` can be specified as a
636 float or integer, both representing the number of seconds to wait.
638 ``lock_class`` forces the specified lock implementation. Note that as
639 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
640 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
641 you have created your own custom lock class.
643 ``thread_local`` indicates whether the lock token is placed in
644 thread-local storage. By default, the token is placed in thread local
645 storage so that a thread only sees its token, not a token set by
646 another thread. Consider the following timeline:
648 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
649 thread-1 sets the token to "abc"
650 time: 1, thread-2 blocks trying to acquire `my-lock` using the
651 Lock instance.
652 time: 5, thread-1 has not yet completed. redis expires the lock
653 key.
654 time: 5, thread-2 acquired `my-lock` now that it's available.
655 thread-2 sets the token to "xyz"
656 time: 6, thread-1 finishes its work and calls release(). if the
657 token is *not* stored in thread local storage, then
658 thread-1 would see the token value as "xyz" and would be
659 able to successfully release the thread-2's lock.
661 ``raise_on_release_error`` indicates whether to raise an exception when
662 the lock is no longer owned when exiting the context manager. By default,
663 this is True, meaning an exception will be raised. If False, the warning
664 will be logged and the exception will be suppressed.
666 In some use cases it's necessary to disable thread local storage. For
667 example, if you have code where one thread acquires a lock and passes
668 that lock instance to a worker thread to release later. If thread
669 local storage isn't disabled in this case, the worker thread won't see
670 the token set by the thread that acquired the lock. Our assumption
671 is that these cases aren't common and as such default to using
672 thread local storage."""
673 if lock_class is None:
674 lock_class = Lock
675 return lock_class(
676 self,
677 name,
678 timeout=timeout,
679 sleep=sleep,
680 blocking=blocking,
681 blocking_timeout=blocking_timeout,
682 thread_local=thread_local,
683 raise_on_release_error=raise_on_release_error,
684 )
686 def pubsub(self, **kwargs):
687 """
688 Return a Publish/Subscribe object. With this object, you can
689 subscribe to channels and listen for messages that get published to
690 them.
691 """
692 return PubSub(
693 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
694 )
696 def keyspace_notifications(
697 self,
698 key_prefix: Union[str, bytes, None] = None,
699 ignore_subscribe_messages: bool = True,
700 ) -> "KeyspaceNotifications":
701 """
702 Return a :class:`~redis.keyspace_notifications.KeyspaceNotifications`
703 object for subscribing to keyspace and keyevent notifications.
705 Note: Keyspace notifications must be enabled on the Redis server via
706 the ``notify-keyspace-events`` configuration option.
708 Args:
709 key_prefix: Optional prefix to filter and strip from keys in
710 notifications.
711 ignore_subscribe_messages: If True, subscribe/unsubscribe
712 confirmations are not returned by
713 get_message/listen.
714 """
715 from redis.keyspace_notifications import KeyspaceNotifications
717 return KeyspaceNotifications(
718 self,
719 key_prefix=key_prefix,
720 ignore_subscribe_messages=ignore_subscribe_messages,
721 )
723 def monitor(self):
724 return Monitor(self.connection_pool)
726 def client(self):
727 return self.__class__(
728 connection_pool=self.connection_pool,
729 single_connection_client=True,
730 )
732 def __enter__(self):
733 return self
735 def __exit__(self, exc_type, exc_value, traceback):
736 self.close()
738 def __del__(self):
739 try:
740 self.close()
741 except Exception:
742 pass
744 def close(self) -> None:
745 # In case a connection property does not yet exist
746 # (due to a crash earlier in the Redis() constructor), return
747 # immediately as there is nothing to clean-up.
748 if not hasattr(self, "connection"):
749 return
751 conn = self.connection
752 if conn:
753 self.connection = None
754 self.connection_pool.release(conn)
756 if self.auto_close_connection_pool:
757 self.connection_pool.disconnect()
759 def _send_command_parse_response(self, conn, command_name, *args, **options):
760 """
761 Send a command and parse the response
762 """
763 conn.send_command(*args, **options)
764 return self.parse_response(conn, command_name, **options)
766 def _close_connection(
767 self,
768 conn,
769 error: Optional[Exception] = None,
770 failure_count: Optional[int] = None,
771 start_time: Optional[float] = None,
772 command_name: Optional[str] = None,
773 ) -> None:
774 """
775 Close the connection before retrying.
777 The supported exceptions are already checked in the
778 retry object so we don't need to do it here.
780 After we disconnect the connection, it will try to reconnect and
781 do a health check as part of the send_command logic(on connection level).
782 """
783 if error and failure_count <= conn.retry.get_retries():
784 record_operation_duration(
785 command_name=command_name,
786 duration_seconds=time.monotonic() - start_time,
787 server_address=getattr(conn, "host", None),
788 server_port=getattr(conn, "port", None),
789 db_namespace=str(conn.db),
790 error=error,
791 retry_attempts=failure_count,
792 )
794 conn.disconnect()
796 # COMMAND EXECUTION AND PROTOCOL PARSING
797 def execute_command(self, *args, **options):
798 return self._execute_command(*args, **options)
800 def _execute_command(self, *args, **options):
801 """Execute a command and return a parsed response"""
802 pool = self.connection_pool
803 command_name = args[0]
804 conn = self.connection or pool.get_connection()
806 # Start timing for observability
807 start_time = time.monotonic()
808 # Track actual retry attempts for error reporting
809 actual_retry_attempts = [0]
811 def failure_callback(error, failure_count):
812 if is_debug_log_enabled():
813 add_debug_log_for_operation_failure(conn)
814 actual_retry_attempts[0] = failure_count
815 self._close_connection(conn, error, failure_count, start_time, command_name)
817 if self._single_connection_client:
818 self.single_connection_lock.acquire()
819 try:
820 result = conn.retry.call_with_retry(
821 lambda: self._send_command_parse_response(
822 conn, command_name, *args, **options
823 ),
824 failure_callback,
825 with_failure_count=True,
826 )
828 record_operation_duration(
829 command_name=command_name,
830 duration_seconds=time.monotonic() - start_time,
831 server_address=getattr(conn, "host", None),
832 server_port=getattr(conn, "port", None),
833 db_namespace=str(conn.db),
834 )
835 return result
836 except Exception as e:
837 record_error_count(
838 server_address=getattr(conn, "host", None),
839 server_port=getattr(conn, "port", None),
840 network_peer_address=getattr(conn, "host", None),
841 network_peer_port=getattr(conn, "port", None),
842 error_type=e,
843 retry_attempts=actual_retry_attempts[0],
844 is_internal=False,
845 )
846 raise
848 finally:
849 if conn and conn.should_reconnect():
850 self._close_connection(conn)
851 conn.connect()
852 if self._single_connection_client:
853 self.single_connection_lock.release()
854 if not self.connection:
855 pool.release(conn)
857 def parse_response(self, connection, command_name, **options):
858 """Parses a response from the Redis server"""
859 try:
860 if NEVER_DECODE in options:
861 response = connection.read_response(disable_decoding=True)
862 options.pop(NEVER_DECODE)
863 else:
864 response = connection.read_response()
865 except ResponseError:
866 if EMPTY_RESPONSE in options:
867 return options[EMPTY_RESPONSE]
868 raise
870 if EMPTY_RESPONSE in options:
871 options.pop(EMPTY_RESPONSE)
873 # Remove keys entry, it needs only for cache.
874 options.pop("keys", None)
876 if command_name in self.response_callbacks:
877 return self.response_callbacks[command_name](response, **options)
878 return response
880 def get_cache(self) -> Optional[CacheInterface]:
881 return self.connection_pool.cache
884StrictRedis = Redis
887class Monitor:
888 """
889 Monitor is useful for handling the MONITOR command to the redis server.
890 next_command() method returns one command from monitor
891 listen() method yields commands from monitor.
892 """
894 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
895 command_re = re.compile(r'"(.*?)(?<!\\)"')
897 def __init__(self, connection_pool):
898 self.connection_pool = connection_pool
899 self.connection = self.connection_pool.get_connection()
901 def __enter__(self):
902 self._start_monitor()
903 return self
905 def __exit__(self, *args):
906 self.connection.disconnect()
907 self.connection_pool.release(self.connection)
909 def next_command(self):
910 """Parse the response from a monitor command"""
911 response = self.connection.read_response()
913 if response is None:
914 return None
916 if isinstance(response, bytes):
917 response = self.connection.encoder.decode(response, force=True)
919 command_time, command_data = response.split(" ", 1)
920 m = self.monitor_re.match(command_data)
921 db_id, client_info, command = m.groups()
922 command = " ".join(self.command_re.findall(command))
923 # Redis escapes double quotes because each piece of the command
924 # string is surrounded by double quotes. We don't have that
925 # requirement so remove the escaping and leave the quote.
926 command = command.replace('\\"', '"')
928 if client_info == "lua":
929 client_address = "lua"
930 client_port = ""
931 client_type = "lua"
932 elif client_info.startswith("unix"):
933 client_address = "unix"
934 client_port = client_info[5:]
935 client_type = "unix"
936 else:
937 if client_info == "":
938 client_address = ""
939 client_port = ""
940 client_type = "unknown"
941 else:
942 # use rsplit as ipv6 addresses contain colons
943 client_address, client_port = client_info.rsplit(":", 1)
944 client_type = "tcp"
945 return {
946 "time": float(command_time),
947 "db": int(db_id),
948 "client_address": client_address,
949 "client_port": client_port,
950 "client_type": client_type,
951 "command": command,
952 }
954 def listen(self):
955 """Listen for commands coming to the server."""
956 while True:
957 yield self.next_command()
959 def _start_monitor(self):
960 self.connection.send_command("MONITOR")
961 # check that monitor returns 'OK', but don't return it to user
962 response = self.connection.read_response()
964 if not bool_ok(response):
965 raise RedisError(f"MONITOR failed: {response}")
968class PubSub:
969 """
970 PubSub provides publish, subscribe and listen support to Redis channels.
972 After subscribing to one or more channels, the listen() method will block
973 until a message arrives on one of the subscribed channels. That message
974 will be returned and it's safe to start listening again.
975 """
977 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
978 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
979 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
981 def __init__(
982 self,
983 connection_pool,
984 shard_hint=None,
985 ignore_subscribe_messages: bool = False,
986 encoder: Optional["Encoder"] = None,
987 push_handler_func: Union[None, Callable[[str], None]] = None,
988 event_dispatcher: Optional["EventDispatcher"] = None,
989 ):
990 self.connection_pool = connection_pool
991 self.shard_hint = shard_hint
992 self.ignore_subscribe_messages = ignore_subscribe_messages
993 self.connection = None
994 self.subscribed_event = threading.Event()
995 # we need to know the encoding options for this connection in order
996 # to lookup channel and pattern names for callback handlers.
997 self.encoder = encoder
998 self.push_handler_func = push_handler_func
999 if event_dispatcher is None:
1000 self._event_dispatcher = EventDispatcher()
1001 else:
1002 self._event_dispatcher = event_dispatcher
1004 self._lock = threading.RLock()
1005 if self.encoder is None:
1006 self.encoder = self.connection_pool.get_encoder()
1007 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
1008 if self.encoder.decode_responses:
1009 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
1010 else:
1011 self.health_check_response = [b"pong", self.health_check_response_b]
1012 if self.push_handler_func is None:
1013 _set_info_logger()
1014 self.reset()
1016 def __enter__(self) -> "PubSub":
1017 return self
1019 def __exit__(self, exc_type, exc_value, traceback) -> None:
1020 self.reset()
1022 def __del__(self) -> None:
1023 try:
1024 # if this object went out of scope prior to shutting down
1025 # subscriptions, close the connection manually before
1026 # returning it to the connection pool
1027 self.reset()
1028 except Exception:
1029 pass
1031 def reset(self) -> None:
1032 if self.connection:
1033 self.connection.disconnect()
1034 self.connection.deregister_connect_callback(self.on_connect)
1035 self.connection_pool.release(self.connection)
1036 self.connection = None
1037 self.health_check_response_counter = 0
1038 self.channels = {}
1039 self.pending_unsubscribe_channels = set()
1040 self.shard_channels = {}
1041 self.pending_unsubscribe_shard_channels = set()
1042 self.patterns = {}
1043 self.pending_unsubscribe_patterns = set()
1044 self.subscribed_event.clear()
1046 def close(self) -> None:
1047 self.reset()
1049 def _resubscribe(self, subscribed, subscribe_fn) -> None:
1050 # Replay handler-backed subscriptions as positional Subscription objects
1051 # so binary names never need to be decoded into keyword argument keys.
1052 subscriptions = pubsub_subscription_args(subscribed)
1053 if subscriptions:
1054 subscribe_fn(*subscriptions)
1056 def _resubscribe_shard_channels(self) -> None:
1057 self._resubscribe(self.shard_channels, self.ssubscribe)
1059 def on_connect(self, connection) -> None:
1060 "Re-subscribe to any channels and patterns previously subscribed to"
1061 self.pending_unsubscribe_channels.clear()
1062 self.pending_unsubscribe_patterns.clear()
1063 self.pending_unsubscribe_shard_channels.clear()
1064 if self.channels:
1065 self._resubscribe(self.channels, self.subscribe)
1066 if self.patterns:
1067 self._resubscribe(self.patterns, self.psubscribe)
1068 if self.shard_channels:
1069 self._resubscribe_shard_channels()
1071 @property
1072 def subscribed(self) -> bool:
1073 """Indicates if there are subscriptions to any channels or patterns"""
1074 return self.subscribed_event.is_set()
1076 def execute_command(self, *args):
1077 """Execute a publish/subscribe command"""
1079 # NOTE: don't parse the response in this function -- it could pull a
1080 # legitimate message off the stack if the connection is already
1081 # subscribed to one or more channels
1083 if self.connection is None:
1084 self.connection = self.connection_pool.get_connection()
1085 # register a callback that re-subscribes to any channels we
1086 # were listening to when we were disconnected
1087 self.connection.register_connect_callback(self.on_connect)
1088 if self.push_handler_func is not None:
1089 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
1090 self._event_dispatcher.dispatch(
1091 AfterPubSubConnectionInstantiationEvent(
1092 self.connection, self.connection_pool, ClientType.SYNC, self._lock
1093 )
1094 )
1095 connection = self.connection
1096 kwargs = {"check_health": not self.subscribed}
1097 if not self.subscribed:
1098 self.clean_health_check_responses()
1099 with self._lock:
1100 self._execute(connection, connection.send_command, *args, **kwargs)
1102 def clean_health_check_responses(self) -> None:
1103 """
1104 If any health check responses are present, clean them
1105 """
1106 ttl = 10
1107 conn = self.connection
1108 while conn and self.health_check_response_counter > 0 and ttl > 0:
1109 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
1110 response = self._execute(conn, conn.read_response)
1111 if self.is_health_check_response(response):
1112 self.health_check_response_counter -= 1
1113 else:
1114 raise PubSubError(
1115 "A non health check response was cleaned by "
1116 "execute_command: {}".format(response)
1117 )
1118 ttl -= 1
1120 def _reconnect(
1121 self,
1122 conn,
1123 error: Optional[Exception] = None,
1124 failure_count: Optional[int] = None,
1125 start_time: Optional[float] = None,
1126 command_name: Optional[str] = None,
1127 ) -> None:
1128 """
1129 The supported exceptions are already checked in the
1130 retry object so we don't need to do it here.
1132 In this error handler we are trying to reconnect to the server.
1133 """
1134 if error and failure_count <= conn.retry.get_retries():
1135 if command_name:
1136 record_operation_duration(
1137 command_name=command_name,
1138 duration_seconds=time.monotonic() - start_time,
1139 server_address=getattr(conn, "host", None),
1140 server_port=getattr(conn, "port", None),
1141 db_namespace=str(conn.db),
1142 error=error,
1143 retry_attempts=failure_count,
1144 )
1145 conn.disconnect()
1146 conn.connect()
1148 def _execute(self, conn, command, *args, **kwargs):
1149 """
1150 Connect manually upon disconnection. If the Redis server is down,
1151 this will fail and raise a ConnectionError as desired.
1152 After reconnection, the ``on_connect`` callback should have been
1153 called by the # connection to resubscribe us to any channels and
1154 patterns we were previously listening to
1155 """
1157 if conn.should_reconnect():
1158 self._reconnect(conn)
1160 if not len(args) == 0:
1161 command_name = args[0]
1162 else:
1163 command_name = None
1165 # Start timing for observability
1166 start_time = time.monotonic()
1167 # Track actual retry attempts for error reporting
1168 actual_retry_attempts = [0]
1170 def failure_callback(error, failure_count):
1171 actual_retry_attempts[0] = failure_count
1172 self._reconnect(conn, error, failure_count, start_time, command_name)
1174 try:
1175 response = conn.retry.call_with_retry(
1176 lambda: command(*args, **kwargs),
1177 failure_callback,
1178 with_failure_count=True,
1179 )
1181 if command_name:
1182 record_operation_duration(
1183 command_name=command_name,
1184 duration_seconds=time.monotonic() - start_time,
1185 server_address=getattr(conn, "host", None),
1186 server_port=getattr(conn, "port", None),
1187 db_namespace=str(conn.db),
1188 )
1190 return response
1191 except Exception as e:
1192 record_error_count(
1193 server_address=getattr(conn, "host", None),
1194 server_port=getattr(conn, "port", None),
1195 network_peer_address=getattr(conn, "host", None),
1196 network_peer_port=getattr(conn, "port", None),
1197 error_type=e,
1198 retry_attempts=actual_retry_attempts[0],
1199 is_internal=False,
1200 )
1201 raise
1203 def parse_response(self, block=True, timeout=0):
1204 """
1205 Parse the response from a publish/subscribe command.
1207 Args:
1208 block: If True, block indefinitely until a message is available.
1209 If False, return immediately if no message is available.
1210 Default: True
1211 timeout: The timeout in seconds for reading a response when block=False.
1212 This parameter is ignored when block=True.
1213 Default: 0 (return immediately if no data available)
1215 Returns:
1216 The parsed response from the server, or None if no message is available
1217 within the timeout period (when block=False).
1219 Important:
1220 The block and timeout parameters work together:
1221 - When block=True: timeout is IGNORED, method blocks indefinitely
1222 - When block=False: timeout is USED, method returns after timeout expires
1224 Typically, you should use get_message(timeout=X) instead of calling
1225 parse_response() directly. The get_message() method automatically sets
1226 block=False when a timeout is provided, and block=True when timeout=None.
1228 Example:
1229 # Block indefinitely (timeout is ignored)
1230 response = pubsub.parse_response(block=True, timeout=0.1)
1232 # Non-blocking with 0.1 second timeout
1233 response = pubsub.parse_response(block=False, timeout=0.1)
1235 # Non-blocking, return immediately
1236 response = pubsub.parse_response(block=False, timeout=0)
1238 # Recommended: use get_message() instead
1239 msg = pubsub.get_message(timeout=0.1) # automatically sets block=False
1240 msg = pubsub.get_message(timeout=None) # automatically sets block=True
1241 """
1242 conn = self.connection
1243 if conn is None:
1244 raise RuntimeError(
1245 "pubsub connection not set: "
1246 "did you forget to call subscribe() or psubscribe()?"
1247 )
1249 self.check_health()
1251 def try_read():
1252 if not block:
1253 if not conn.can_read(timeout=timeout):
1254 return None
1255 read_timeout = timeout
1256 else:
1257 conn.connect()
1258 read_timeout = SENTINEL # Use default socket timeout for blocking
1259 return conn.read_response(
1260 disconnect_on_error=False, push_request=True, timeout=read_timeout
1261 )
1263 response = self._execute(conn, try_read)
1265 if self.is_health_check_response(response):
1266 # ignore the health check message as user might not expect it
1267 self.health_check_response_counter -= 1
1268 return None
1269 return response
1271 def is_health_check_response(self, response) -> bool:
1272 """
1273 Check if the response is a health check response.
1274 If there are no subscriptions redis responds to PING command with a
1275 bulk response, instead of a multi-bulk with "pong" and the response.
1276 """
1277 if self.encoder.decode_responses:
1278 return (
1279 response
1280 in [
1281 self.health_check_response, # If there is a subscription
1282 self.HEALTH_CHECK_MESSAGE, # If there are no subscriptions and decode_responses=True
1283 ]
1284 )
1285 else:
1286 return (
1287 response
1288 in [
1289 self.health_check_response, # If there is a subscription
1290 self.health_check_response_b, # If there isn't a subscription and decode_responses=False
1291 ]
1292 )
1294 def check_health(self) -> None:
1295 conn = self.connection
1296 if conn is None:
1297 raise RuntimeError(
1298 "pubsub connection not set: "
1299 "did you forget to call subscribe() or psubscribe()?"
1300 )
1302 if conn.health_check_interval and time.monotonic() > conn.next_health_check:
1303 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
1304 self.health_check_response_counter += 1
1306 def _normalize_keys(self, data) -> Dict:
1307 """
1308 normalize channel/pattern names to be either bytes or strings
1309 based on whether responses are automatically decoded. this saves us
1310 from coercing the value for each message coming in.
1311 """
1312 encode = self.encoder.encode
1313 decode = self.encoder.decode
1314 return {decode(encode(k)): v for k, v in data.items()}
1316 def psubscribe(
1317 self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
1318 ) -> None:
1319 """
1320 Subscribe to channel patterns.
1321 Patterns supplied as keyword arguments expect a pattern name as the
1322 key and a callable as the value.
1323 ``Subscription`` objects can also be supplied positionally with an
1324 optional handler.
1325 A pattern's callable will be invoked automatically
1326 when a message is received on that pattern rather than producing a
1327 message via ``listen()``.
1328 """
1329 new_patterns = parse_pubsub_subscriptions(args, kwargs)
1330 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
1331 # update the patterns dict AFTER we send the command. we don't want to
1332 # subscribe twice to these patterns, once for the command and again
1333 # for the reconnection.
1334 new_patterns = self._normalize_keys(new_patterns)
1335 self.patterns.update(new_patterns)
1336 if not self.subscribed:
1337 # Set the subscribed_event flag to True
1338 self.subscribed_event.set()
1339 # Clear the health check counter
1340 self.health_check_response_counter = 0
1341 self.pending_unsubscribe_patterns.difference_update(new_patterns)
1342 return ret_val
1344 def punsubscribe(self, *args):
1345 """
1346 Unsubscribe from the supplied patterns. If empty, unsubscribe from
1347 all patterns.
1348 """
1349 if args:
1350 args = list_or_args(args[0], args[1:])
1351 patterns = self._normalize_keys(dict.fromkeys(args))
1352 else:
1353 patterns = self.patterns
1354 self.pending_unsubscribe_patterns.update(patterns)
1355 return self.execute_command("PUNSUBSCRIBE", *args)
1357 def subscribe(
1358 self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
1359 ) -> None:
1360 """
1361 Subscribe to channels.
1362 Channels supplied as keyword arguments expect
1363 a channel name as the key and a callable as the value.
1364 ``Subscription`` objects can also be supplied positionally with an
1365 optional handler.
1366 A channel's callable will be invoked automatically
1367 when a message is received on that channel rather than producing a
1368 message via ``listen()`` or ``get_message()``.
1369 """
1370 new_channels = parse_pubsub_subscriptions(args, kwargs)
1371 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
1372 # update the channels dict AFTER we send the command. we don't want to
1373 # subscribe twice to these channels, once for the command and again
1374 # for the reconnection.
1375 new_channels = self._normalize_keys(new_channels)
1376 self.channels.update(new_channels)
1377 if not self.subscribed:
1378 # Set the subscribed_event flag to True
1379 self.subscribed_event.set()
1380 # Clear the health check counter
1381 self.health_check_response_counter = 0
1382 self.pending_unsubscribe_channels.difference_update(new_channels)
1383 return ret_val
1385 def unsubscribe(self, *args):
1386 """
1387 Unsubscribe from the supplied channels. If empty, unsubscribe from
1388 all channels
1389 """
1390 if args:
1391 args = list_or_args(args[0], args[1:])
1392 channels = self._normalize_keys(dict.fromkeys(args))
1393 else:
1394 channels = self.channels
1395 self.pending_unsubscribe_channels.update(channels)
1396 return self.execute_command("UNSUBSCRIBE", *args)
1398 def ssubscribe(
1399 self,
1400 *args: ChannelT | Subscription,
1401 target_node: Any = None,
1402 **kwargs: PubSubHandler,
1403 ) -> None:
1404 """
1405 Subscribes the client to the specified shard channels.
1406 Channels supplied as keyword arguments expect a channel name as the key
1407 and a callable as the value.
1408 ``Subscription`` objects can also be supplied positionally
1409 with an optional handler.
1410 A channel's callable will be invoked automatically when a message
1411 is received on that channel rather than producing a message
1412 via ``listen()`` or ``get_sharded_message()``.
1413 """
1414 new_s_channels = parse_pubsub_subscriptions(args, kwargs)
1415 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
1416 # update the s_channels dict AFTER we send the command. we don't want to
1417 # subscribe twice to these channels, once for the command and again
1418 # for the reconnection.
1419 new_s_channels = self._normalize_keys(new_s_channels)
1420 self.shard_channels.update(new_s_channels)
1421 if not self.subscribed:
1422 # Set the subscribed_event flag to True
1423 self.subscribed_event.set()
1424 # Clear the health check counter
1425 self.health_check_response_counter = 0
1426 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
1427 return ret_val
1429 def sunsubscribe(self, *args, target_node=None):
1430 """
1431 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
1432 all shard_channels
1433 """
1434 if args:
1435 args = list_or_args(args[0], args[1:])
1436 s_channels = self._normalize_keys(dict.fromkeys(args))
1437 else:
1438 s_channels = self.shard_channels
1439 self.pending_unsubscribe_shard_channels.update(s_channels)
1440 return self.execute_command("SUNSUBSCRIBE", *args)
1442 def listen(self):
1443 "Listen for messages on channels this client has been subscribed to"
1444 while self.subscribed:
1445 response = self.handle_message(self.parse_response(block=True))
1446 if response is not None:
1447 yield response
1449 def get_message(
1450 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
1451 ):
1452 """
1453 Get the next message if one is available, otherwise None.
1455 If timeout is specified, the system will wait for `timeout` seconds
1456 before returning. Timeout should be specified as a floating point
1457 number, or None, to wait indefinitely.
1458 """
1459 if not self.subscribed:
1460 # Wait for subscription
1461 start_time = time.monotonic()
1462 if self.subscribed_event.wait(timeout) is True:
1463 # The connection was subscribed during the timeout time frame.
1464 # The timeout should be adjusted based on the time spent
1465 # waiting for the subscription
1466 time_spent = time.monotonic() - start_time
1467 timeout = max(0.0, timeout - time_spent)
1468 else:
1469 # The connection isn't subscribed to any channels or patterns,
1470 # so no messages are available
1471 return None
1473 response = self.parse_response(block=(timeout is None), timeout=timeout)
1475 if response:
1476 return self.handle_message(response, ignore_subscribe_messages)
1477 return None
1479 get_sharded_message = get_message
1481 def ping(self, message: Union[str, None] = None) -> bool:
1482 """
1483 Ping the Redis server to test connectivity.
1485 Sends a PING command to the Redis server and returns True if the server
1486 responds with "PONG".
1487 """
1488 args = ["PING", message] if message is not None else ["PING"]
1489 return self.execute_command(*args)
1491 def handle_message(self, response, ignore_subscribe_messages=False):
1492 """
1493 Parses a pub/sub message. If the channel or pattern was subscribed to
1494 with a message handler, the handler is invoked instead of a parsed
1495 message being returned.
1496 """
1497 if response is None:
1498 return None
1499 if isinstance(response, bytes):
1500 response = [b"pong", response] if response != b"PONG" else [b"pong", b""]
1502 message_type = str_if_bytes(response[0])
1503 if message_type == "pmessage":
1504 message = {
1505 "type": message_type,
1506 "pattern": response[1],
1507 "channel": response[2],
1508 "data": response[3],
1509 }
1510 elif message_type == "pong":
1511 message = {
1512 "type": message_type,
1513 "pattern": None,
1514 "channel": None,
1515 "data": response[1],
1516 }
1517 else:
1518 message = {
1519 "type": message_type,
1520 "pattern": None,
1521 "channel": response[1],
1522 "data": response[2],
1523 }
1525 if message_type in ["message", "pmessage"]:
1526 channel = str_if_bytes(message["channel"])
1527 record_pubsub_message(
1528 direction=PubSubDirection.RECEIVE,
1529 channel=channel,
1530 )
1531 elif message_type == "smessage":
1532 channel = str_if_bytes(message["channel"])
1533 record_pubsub_message(
1534 direction=PubSubDirection.RECEIVE,
1535 channel=channel,
1536 sharded=True,
1537 )
1539 # if this is an unsubscribe message, remove it from memory
1540 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
1541 if message_type == "punsubscribe":
1542 pattern = response[1]
1543 if pattern in self.pending_unsubscribe_patterns:
1544 self.pending_unsubscribe_patterns.remove(pattern)
1545 self.patterns.pop(pattern, None)
1546 elif message_type == "sunsubscribe":
1547 s_channel = response[1]
1548 if s_channel in self.pending_unsubscribe_shard_channels:
1549 self.pending_unsubscribe_shard_channels.remove(s_channel)
1550 self.shard_channels.pop(s_channel, None)
1551 else:
1552 channel = response[1]
1553 if channel in self.pending_unsubscribe_channels:
1554 self.pending_unsubscribe_channels.remove(channel)
1555 self.channels.pop(channel, None)
1556 if not self.channels and not self.patterns and not self.shard_channels:
1557 # There are no subscriptions anymore, set subscribed_event flag
1558 # to false
1559 self.subscribed_event.clear()
1561 if message_type in self.PUBLISH_MESSAGE_TYPES:
1562 # if there's a message handler, invoke it
1563 if message_type == "pmessage":
1564 handler = self.patterns.get(message["pattern"], None)
1565 elif message_type == "smessage":
1566 handler = self.shard_channels.get(message["channel"], None)
1567 else:
1568 handler = self.channels.get(message["channel"], None)
1569 if handler:
1570 handler(message)
1571 return None
1572 elif message_type != "pong":
1573 # this is a subscribe/unsubscribe message. ignore if we don't
1574 # want them
1575 if ignore_subscribe_messages or self.ignore_subscribe_messages:
1576 return None
1578 return message
1580 def run_in_thread(
1581 self,
1582 sleep_time: float = 0.0,
1583 daemon: bool = False,
1584 exception_handler: Optional[Callable] = None,
1585 pubsub=None,
1586 sharded_pubsub: bool = False,
1587 ) -> "PubSubWorkerThread":
1588 for channel, handler in self.channels.items():
1589 if handler is None:
1590 raise PubSubError(f"Channel: '{channel}' has no handler registered")
1591 for pattern, handler in self.patterns.items():
1592 if handler is None:
1593 raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
1594 for s_channel, handler in self.shard_channels.items():
1595 if handler is None:
1596 raise PubSubError(
1597 f"Shard Channel: '{s_channel}' has no handler registered"
1598 )
1600 pubsub = self if pubsub is None else pubsub
1601 thread = PubSubWorkerThread(
1602 pubsub,
1603 sleep_time,
1604 daemon=daemon,
1605 exception_handler=exception_handler,
1606 sharded_pubsub=sharded_pubsub,
1607 )
1608 thread.start()
1609 return thread
1612class PubSubWorkerThread(threading.Thread):
1613 def __init__(
1614 self,
1615 pubsub,
1616 sleep_time: float,
1617 daemon: bool = False,
1618 exception_handler: Union[
1619 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
1620 ] = None,
1621 sharded_pubsub: bool = False,
1622 ):
1623 super().__init__()
1624 self.daemon = daemon
1625 self.pubsub = pubsub
1626 self.sleep_time = sleep_time
1627 self.exception_handler = exception_handler
1628 self.sharded_pubsub = sharded_pubsub
1629 self._running = threading.Event()
1631 def run(self) -> None:
1632 if self._running.is_set():
1633 return
1634 self._running.set()
1635 pubsub = self.pubsub
1636 sleep_time = self.sleep_time
1637 while self._running.is_set():
1638 try:
1639 if not self.sharded_pubsub:
1640 pubsub.get_message(
1641 ignore_subscribe_messages=True, timeout=sleep_time
1642 )
1643 else:
1644 pubsub.get_sharded_message(
1645 ignore_subscribe_messages=True, timeout=sleep_time
1646 )
1647 except BaseException as e:
1648 if self.exception_handler is None:
1649 raise
1650 self.exception_handler(e, pubsub, self)
1651 pubsub.close()
1653 def stop(self) -> None:
1654 # trip the flag so the run loop exits. the run loop will
1655 # close the pubsub connection, which disconnects the socket
1656 # and returns the connection to the pool.
1657 self._running.clear()
1660class Pipeline(Redis):
1661 """
1662 Pipelines provide a way to transmit multiple commands to the Redis server
1663 in one transmission. This is convenient for batch processing, such as
1664 saving all the values in a list to Redis.
1666 All commands executed within a pipeline(when running in transactional mode,
1667 which is the default behavior) are wrapped with MULTI and EXEC
1668 calls. This guarantees all commands executed in the pipeline will be
1669 executed atomically.
1671 Any command raising an exception does *not* halt the execution of
1672 subsequent commands in the pipeline. Instead, the exception is caught
1673 and its instance is placed into the response list returned by execute().
1674 Code iterating over the response list should be able to deal with an
1675 instance of an exception as a potential value. In general, these will be
1676 ResponseError exceptions, such as those raised when issuing a command
1677 on a key of a different datatype.
1678 """
1680 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
1682 def __init__(
1683 self,
1684 connection_pool: ConnectionPool,
1685 response_callbacks,
1686 transaction,
1687 shard_hint,
1688 ):
1689 self.connection_pool = connection_pool
1690 self.connection: Optional[Connection] = None
1691 self.response_callbacks = response_callbacks
1692 self.transaction = transaction
1693 self.shard_hint = shard_hint
1694 self.watching = False
1695 self.command_stack = []
1696 self.scripts: Set[Script] = set()
1697 self.explicit_transaction = False
1699 def __enter__(self) -> "Pipeline":
1700 return self
1702 def __exit__(self, exc_type, exc_value, traceback):
1703 self.reset()
1705 def __del__(self):
1706 try:
1707 self.reset()
1708 except Exception:
1709 pass
1711 def __len__(self) -> int:
1712 return len(self.command_stack)
1714 def __bool__(self) -> bool:
1715 """Pipeline instances should always evaluate to True"""
1716 return True
1718 def reset(self) -> None:
1719 self.command_stack = []
1720 self.scripts = set()
1721 # make sure to reset the connection state in the event that we were
1722 # watching something
1723 if self.watching and self.connection:
1724 try:
1725 # call this manually since our unwatch or
1726 # immediate_execute_command methods can call reset()
1727 self.connection.send_command("UNWATCH")
1728 self.connection.read_response()
1729 except ConnectionError:
1730 # disconnect will also remove any previous WATCHes
1731 self.connection.disconnect()
1732 # clean up the other instance attributes
1733 self.watching = False
1734 self.explicit_transaction = False
1736 # we can safely return the connection to the pool here since we're
1737 # sure we're no longer WATCHing anything
1738 if self.connection:
1739 self.connection_pool.release(self.connection)
1740 self.connection = None
1742 def close(self) -> None:
1743 """Close the pipeline"""
1744 self.reset()
1746 def multi(self) -> None:
1747 """
1748 Start a transactional block of the pipeline after WATCH commands
1749 are issued. End the transactional block with `execute`.
1750 """
1751 if self.explicit_transaction:
1752 raise RedisError("Cannot issue nested calls to MULTI")
1753 if self.command_stack:
1754 raise RedisError(
1755 "Commands without an initial WATCH have already been issued"
1756 )
1757 self.explicit_transaction = True
1759 def execute_command(self, *args, **kwargs):
1760 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
1761 return self.immediate_execute_command(*args, **kwargs)
1762 return self.pipeline_execute_command(*args, **kwargs)
1764 def _disconnect_reset_raise_on_watching(
1765 self,
1766 conn: AbstractConnection,
1767 error: Exception,
1768 failure_count: Optional[int] = None,
1769 start_time: Optional[float] = None,
1770 command_name: Optional[str] = None,
1771 ) -> None:
1772 """
1773 Close the connection reset watching state and
1774 raise an exception if we were watching.
1776 The supported exceptions are already checked in the
1777 retry object so we don't need to do it here.
1779 After we disconnect the connection, it will try to reconnect and
1780 do a health check as part of the send_command logic(on connection level).
1781 """
1782 if error and failure_count <= conn.retry.get_retries():
1783 record_operation_duration(
1784 command_name=command_name,
1785 duration_seconds=time.monotonic() - start_time,
1786 server_address=getattr(conn, "host", None),
1787 server_port=getattr(conn, "port", None),
1788 db_namespace=str(conn.db),
1789 error=error,
1790 retry_attempts=failure_count,
1791 )
1792 conn.disconnect()
1794 # if we were already watching a variable, the watch is no longer
1795 # valid since this connection has died. raise a WatchError, which
1796 # indicates the user should retry this transaction.
1797 if self.watching:
1798 self.reset()
1799 raise WatchError(
1800 f"A {type(error).__name__} occurred while watching one or more keys"
1801 )
1803 def immediate_execute_command(self, *args, **options):
1804 """
1805 Execute a command immediately, but don't auto-retry on the supported
1806 errors for retry if we're already WATCHing a variable.
1807 Used when issuing WATCH or subsequent commands retrieving their values but before
1808 MULTI is called.
1809 """
1810 command_name = args[0]
1811 conn = self.connection
1812 # if this is the first call, we need a connection
1813 if not conn:
1814 conn = self.connection_pool.get_connection()
1815 self.connection = conn
1817 # Start timing for observability
1818 start_time = time.monotonic()
1819 # Track actual retry attempts for error reporting
1820 actual_retry_attempts = [0]
1822 def failure_callback(error, failure_count):
1823 if is_debug_log_enabled():
1824 add_debug_log_for_operation_failure(conn)
1825 actual_retry_attempts[0] = failure_count
1826 self._disconnect_reset_raise_on_watching(
1827 conn, error, failure_count, start_time, command_name
1828 )
1830 try:
1831 response = conn.retry.call_with_retry(
1832 lambda: self._send_command_parse_response(
1833 conn, command_name, *args, **options
1834 ),
1835 failure_callback,
1836 with_failure_count=True,
1837 )
1839 record_operation_duration(
1840 command_name=command_name,
1841 duration_seconds=time.monotonic() - start_time,
1842 server_address=getattr(conn, "host", None),
1843 server_port=getattr(conn, "port", None),
1844 db_namespace=str(conn.db),
1845 )
1847 return response
1848 except Exception as e:
1849 record_error_count(
1850 server_address=getattr(conn, "host", None),
1851 server_port=getattr(conn, "port", None),
1852 network_peer_address=getattr(conn, "host", None),
1853 network_peer_port=getattr(conn, "port", None),
1854 error_type=e,
1855 retry_attempts=actual_retry_attempts[0],
1856 is_internal=False,
1857 )
1858 raise
1860 def pipeline_execute_command(self, *args, **options) -> "Pipeline":
1861 """
1862 Stage a command to be executed when execute() is next called
1864 Returns the current Pipeline object back so commands can be
1865 chained together, such as:
1867 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
1869 At some other point, you can then run: pipe.execute(),
1870 which will execute all commands queued in the pipe.
1871 """
1872 self.command_stack.append((args, options))
1873 return self
1875 def _execute_transaction(
1876 self, connection: Connection, commands, raise_on_error
1877 ) -> List:
1878 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
1879 all_cmds = connection.pack_commands(
1880 [args for args, options in cmds if EMPTY_RESPONSE not in options]
1881 )
1882 connection.send_packed_command(all_cmds)
1883 errors = []
1885 # parse off the response for MULTI
1886 # NOTE: we need to handle ResponseErrors here and continue
1887 # so that we read all the additional command messages from
1888 # the socket
1889 try:
1890 self.parse_response(connection, "_")
1891 except ResponseError as e:
1892 errors.append((0, e))
1894 # and all the other commands
1895 for i, command in enumerate(commands):
1896 if EMPTY_RESPONSE in command[1]:
1897 errors.append((i, command[1][EMPTY_RESPONSE]))
1898 else:
1899 try:
1900 self.parse_response(connection, "_")
1901 except ResponseError as e:
1902 self.annotate_exception(e, i + 1, command[0])
1903 errors.append((i, e))
1905 # parse the EXEC.
1906 try:
1907 response = self.parse_response(connection, "_")
1908 except ExecAbortError:
1909 if errors:
1910 raise errors[0][1]
1911 raise
1913 # EXEC clears any watched keys
1914 self.watching = False
1916 if response is None:
1917 raise WatchError("Watched variable changed.")
1919 # put any parse errors into the response
1920 for i, e in errors:
1921 response.insert(i, e)
1923 if len(response) != len(commands):
1924 self.connection.disconnect()
1925 raise ResponseError(
1926 "Wrong number of response items from pipeline execution"
1927 )
1929 # find any errors in the response and raise if necessary
1930 if raise_on_error:
1931 self.raise_first_error(commands, response)
1933 # We have to run response callbacks manually
1934 data = []
1935 for r, cmd in zip(response, commands):
1936 if not isinstance(r, Exception):
1937 args, options = cmd
1938 # Remove keys entry, it needs only for cache.
1939 options.pop("keys", None)
1940 command_name = args[0]
1941 if command_name in self.response_callbacks:
1942 r = self.response_callbacks[command_name](r, **options)
1943 data.append(r)
1945 return data
1947 def _execute_pipeline(self, connection, commands, raise_on_error):
1948 # build up all commands into a single request to increase network perf
1949 all_cmds = connection.pack_commands([args for args, _ in commands])
1950 connection.send_packed_command(all_cmds)
1952 responses = []
1953 for args, options in commands:
1954 try:
1955 responses.append(self.parse_response(connection, args[0], **options))
1956 except ResponseError as e:
1957 responses.append(e)
1959 if raise_on_error:
1960 self.raise_first_error(commands, responses)
1962 return responses
1964 def raise_first_error(self, commands, response):
1965 for i, r in enumerate(response):
1966 if isinstance(r, ResponseError):
1967 self.annotate_exception(r, i + 1, commands[i][0])
1968 raise r
1970 def annotate_exception(self, exception, number, command):
1971 cmd = " ".join(map(safe_str, command))
1972 msg = (
1973 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
1974 f"caused error: {exception.args[0]}"
1975 )
1976 exception.args = (msg,) + exception.args[1:]
1978 def parse_response(self, connection, command_name, **options):
1979 result = Redis.parse_response(self, connection, command_name, **options)
1980 if command_name in self.UNWATCH_COMMANDS:
1981 self.watching = False
1982 elif command_name == "WATCH":
1983 self.watching = True
1984 return result
1986 def load_scripts(self):
1987 # make sure all scripts that are about to be run on this pipeline exist
1988 scripts = list(self.scripts)
1989 immediate = self.immediate_execute_command
1990 shas = [s.sha for s in scripts]
1991 # we can't use the normal script_* methods because they would just
1992 # get buffered in the pipeline.
1993 exists = immediate("SCRIPT EXISTS", *shas)
1994 if not all(exists):
1995 for s, exist in zip(scripts, exists):
1996 if not exist:
1997 s.sha = immediate("SCRIPT LOAD", s.script)
1999 def _disconnect_raise_on_watching(
2000 self,
2001 conn: AbstractConnection,
2002 error: Exception,
2003 failure_count: Optional[int] = None,
2004 start_time: Optional[float] = None,
2005 command_name: Optional[str] = None,
2006 ) -> None:
2007 """
2008 Close the connection, raise an exception if we were watching.
2010 The supported exceptions are already checked in the
2011 retry object so we don't need to do it here.
2013 After we disconnect the connection, it will try to reconnect and
2014 do a health check as part of the send_command logic(on connection level).
2015 """
2016 if error and failure_count <= conn.retry.get_retries():
2017 record_operation_duration(
2018 command_name=command_name,
2019 duration_seconds=time.monotonic() - start_time,
2020 server_address=getattr(conn, "host", None),
2021 server_port=getattr(conn, "port", None),
2022 db_namespace=str(conn.db),
2023 error=error,
2024 retry_attempts=failure_count,
2025 )
2026 conn.disconnect()
2027 # if we were watching a variable, the watch is no longer valid
2028 # since this connection has died. raise a WatchError, which
2029 # indicates the user should retry this transaction.
2030 if self.watching:
2031 raise WatchError(
2032 f"A {type(error).__name__} occurred while watching one or more keys"
2033 )
2035 def execute(self, raise_on_error: bool = True) -> List[Any]:
2036 """Execute all the commands in the current pipeline"""
2037 stack = self.command_stack
2038 if not stack and not self.watching:
2039 return []
2040 if self.scripts:
2041 self.load_scripts()
2042 if self.transaction or self.explicit_transaction:
2043 execute = self._execute_transaction
2044 operation_name = "MULTI"
2045 else:
2046 execute = self._execute_pipeline
2047 operation_name = "PIPELINE"
2049 conn = self.connection
2050 if not conn:
2051 conn = self.connection_pool.get_connection()
2052 # assign to self.connection so reset() releases the connection
2053 # back to the pool after we're done
2054 self.connection = conn
2056 # Start timing for observability
2057 start_time = time.monotonic()
2058 # Track actual retry attempts for error reporting
2059 actual_retry_attempts = [0]
2061 def failure_callback(error, failure_count):
2062 if is_debug_log_enabled():
2063 add_debug_log_for_operation_failure(conn)
2064 actual_retry_attempts[0] = failure_count
2065 self._disconnect_raise_on_watching(
2066 conn, error, failure_count, start_time, operation_name
2067 )
2069 try:
2070 response = conn.retry.call_with_retry(
2071 lambda: execute(conn, stack, raise_on_error),
2072 failure_callback,
2073 with_failure_count=True,
2074 )
2076 record_operation_duration(
2077 command_name=operation_name,
2078 duration_seconds=time.monotonic() - start_time,
2079 server_address=getattr(conn, "host", None),
2080 server_port=getattr(conn, "port", None),
2081 db_namespace=str(conn.db),
2082 )
2083 return response
2084 except Exception as e:
2085 record_error_count(
2086 server_address=getattr(conn, "host", None),
2087 server_port=getattr(conn, "port", None),
2088 network_peer_address=getattr(conn, "host", None),
2089 network_peer_port=getattr(conn, "port", None),
2090 error_type=e,
2091 retry_attempts=actual_retry_attempts[0],
2092 is_internal=False,
2093 )
2094 raise
2096 finally:
2097 # in reset() the connection is disconnected before returned to the pool if
2098 # it is marked for reconnect.
2099 self.reset()
2101 def discard(self):
2102 """
2103 Flushes all previously queued commands
2104 See: https://redis.io/commands/DISCARD
2105 """
2106 self.execute_command("DISCARD")
2108 def watch(self, *names):
2109 """Watches the values at keys ``names``"""
2110 if self.explicit_transaction:
2111 raise RedisError("Cannot issue a WATCH after a MULTI")
2112 return self.execute_command("WATCH", *names)
2114 def unwatch(self) -> bool:
2115 """Unwatches all previously specified keys"""
2116 return self.watching and self.execute_command("UNWATCH") or True