Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import logging
3import re
4import threading
5import time
6from itertools import chain
7from typing import (
8 TYPE_CHECKING,
9 Any,
10 Callable,
11 Dict,
12 List,
13 Mapping,
14 Optional,
15 Set,
16 Type,
17 Union,
18)
20from redis._parsers.encoders import Encoder
21from redis._parsers.helpers import (
22 _RedisCallbacks,
23 _RedisCallbacksRESP2,
24 _RedisCallbacksRESP3,
25 bool_ok,
26)
27from redis.backoff import ExponentialWithJitterBackoff
28from redis.cache import CacheConfig, CacheInterface
29from redis.commands import (
30 CoreCommands,
31 RedisModuleCommands,
32 SentinelCommands,
33 list_or_args,
34)
35from redis.commands.core import Script
36from redis.connection import (
37 AbstractConnection,
38 Connection,
39 ConnectionPool,
40 SSLConnection,
41 UnixDomainSocketConnection,
42)
43from redis.credentials import CredentialProvider
44from redis.driver_info import DriverInfo, resolve_driver_info
45from redis.event import (
46 AfterPooledConnectionsInstantiationEvent,
47 AfterPubSubConnectionInstantiationEvent,
48 AfterSingleConnectionInstantiationEvent,
49 ClientType,
50 EventDispatcher,
51)
52from redis.exceptions import (
53 ConnectionError,
54 ExecAbortError,
55 PubSubError,
56 RedisError,
57 ResponseError,
58 WatchError,
59)
60from redis.lock import Lock
61from redis.maint_notifications import (
62 MaintNotificationsConfig,
63 OSSMaintNotificationsHandler,
64)
65from redis.observability.attributes import PubSubDirection
66from redis.observability.recorder import (
67 record_error_count,
68 record_operation_duration,
69 record_pubsub_message,
70)
71from redis.retry import Retry
72from redis.utils import (
73 _set_info_logger,
74 check_protocol_version,
75 deprecated_args,
76 safe_str,
77 str_if_bytes,
78 truncate_text,
79)
if TYPE_CHECKING:
    # Imported only for type annotations (ssl.VerifyMode, OpenSSL.SSL.Context);
    # never imported at runtime.
    import ssl

    import OpenSSL

# NOTE(review): not referenced in this chunk — presumably used when joining
# command parts elsewhere; confirm against the rest of the module.
SYM_EMPTY = b""
# Marker key in **options: parse_response returns options[EMPTY_RESPONSE]
# instead of raising when the server replies with a ResponseError.
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (ie. dump) are binary, and just meant to never be decoded
NEVER_DECODE = "NEVER_DECODE"

logger = logging.getLogger(__name__)
def is_debug_log_enabled():
    """Return True when this module's logger will emit DEBUG records."""
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    return debug_enabled
def add_debug_log_for_operation_failure(connection: "AbstractConnection") -> None:
    """Log a DEBUG record describing a failed operation and its connection.

    Uses lazy %-style arguments so the connection repr and the (potentially
    expensive) ``extract_connection_details()`` string are only formatted
    when DEBUG logging is actually emitted, instead of eagerly via f-string.
    """
    logger.debug(
        "Operation failed, with connection: %s, details: %s",
        connection,
        connection.extract_connection_details() if connection else "no connection",
    )
class CaseInsensitiveDict(dict):
    """Case insensitive dict implementation. Assumes string keys only.

    Every key is normalized to upper case on the way in, so lookups,
    membership tests and deletions are case insensitive.
    """

    def __init__(self, data: Dict[str, str]) -> None:
        for key, value in data.items():
            self[key.upper()] = value

    def __contains__(self, key):
        return super().__contains__(key.upper())

    def __delitem__(self, key):
        super().__delitem__(key.upper())

    def __getitem__(self, key):
        return super().__getitem__(key.upper())

    def get(self, key, default=None):
        return super().get(key.upper(), default)

    def __setitem__(self, key, value):
        super().__setitem__(key.upper(), value)

    def update(self, data):
        # Normalize the incoming mapping's keys first, then merge.
        super().update(CaseInsensitiveDict(data))
class AbstractRedis:
    # NOTE(review): empty placeholder base class — presumably retained for
    # backward compatibility / isinstance checks; confirm against callers.
    pass
class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
    """
    Implementation of the Redis protocol.

    This abstract class provides a Python interface to all Redis commands
    and an implementation of the Redis protocol.

    Pipelines derive from this, implementing how
    the commands are sent and received to the Redis server. Based on
    configuration, an instance will either use a ConnectionPool, or
    Connection object to talk to redis.

    It is not safe to pass PubSub or Pipeline objects between threads.
    """

    @classmethod
    def from_url(cls, url: str, **kwargs) -> "Redis":
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://``: creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        single_connection_client = kwargs.pop("single_connection_client", False)
        connection_pool = ConnectionPool.from_url(url, **kwargs)
        client = cls(
            connection_pool=connection_pool,
            single_connection_client=single_connection_client,
        )
        # The client built this pool itself, so it owns it and closes it on close().
        client.auto_close_connection_pool = True
        return client

    @classmethod
    def from_pool(
        cls: Type["Redis"],
        connection_pool: ConnectionPool,
    ) -> "Redis":
        """
        Return a Redis client from the given connection pool.
        The Redis client will take ownership of the connection pool and
        close it when the Redis client is closed.
        """
        client = cls(
            connection_pool=connection_pool,
        )
        # Explicit ownership transfer: unlike passing a pool to __init__,
        # from_pool makes the client responsible for disconnecting the pool.
        client.auto_close_connection_pool = True
        return client

    @deprecated_args(
        args_to_warn=["retry_on_timeout"],
        reason="TimeoutError is included by default.",
        version="6.0.0",
    )
    @deprecated_args(
        args_to_warn=["lib_name", "lib_version"],
        reason="Use 'driver_info' parameter instead. "
        "lib_name and lib_version will be removed in a future version.",
    )
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        retry: Retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
        ),
        retry_on_error: Optional[List[Type[Exception]]] = None,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_path: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_password: Optional[str] = None,
        ssl_validate_ocsp: bool = False,
        ssl_validate_ocsp_stapled: bool = False,
        ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
        ssl_ocsp_expected_cert: Optional[str] = None,
        ssl_min_version: Optional["ssl.TLSVersion"] = None,
        ssl_ciphers: Optional[str] = None,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = None,
        lib_version: Optional[str] = None,
        driver_info: Optional["DriverInfo"] = None,
        username: Optional[str] = None,
        redis_connect_func: Optional[Callable[[], None]] = None,
        credential_provider: Optional[CredentialProvider] = None,
        protocol: Optional[int] = 2,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        oss_cluster_maint_notifications_handler: Optional[
            OSSMaintNotificationsHandler
        ] = None,
    ) -> None:
        """
        Initialize a new Redis client.

        To specify a retry policy for specific errors, you have two options:

        1. Set the `retry_on_error` to a list of the error/s to retry on, and
        you can also set `retry` to a valid `Retry` object(in case the default
        one is not appropriate) - with this approach the retries will be triggered
        on the default errors specified in the Retry object enriched with the
        errors specified in `retry_on_error`.

        2. Define a `Retry` object with configured 'supported_errors' and set
        it to the `retry` parameter - with this approach you completely redefine
        the errors on which retries will happen.

        `retry_on_timeout` is deprecated - please include the TimeoutError
        either in the Retry object or in the `retry_on_error` list.

        When 'connection_pool' is provided - the retry configuration of the
        provided pool will be used.

        Args:

        single_connection_client:
            if `True`, connection pool is not used. In that case `Redis`
            instance use is not thread safe.
        decode_responses:
            if `True`, the response will be decoded to utf-8.
            Argument is ignored when connection_pool is provided.
        driver_info:
            Optional DriverInfo object to identify upstream libraries.
            If provided, lib_name and lib_version are ignored.
            If not provided, a DriverInfo will be created from lib_name and lib_version.
            Argument is ignored when connection_pool is provided.
        lib_name:
            **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO.
        lib_version:
            **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO.
        maint_notifications_config:
            configures the pool to support maintenance notifications - see
            `redis.maint_notifications.MaintNotificationsConfig` for details.
            Only supported with RESP3
            If not provided and protocol is RESP3, the maintenance notifications
            will be enabled by default (logic is included in the connection pool
            initialization).
            Argument is ignored when connection_pool is provided.
        oss_cluster_maint_notifications_handler:
            handler for OSS cluster notifications - see
            `redis.maint_notifications.OSSMaintNotificationsHandler` for details.
            Only supported with RESP3
            Argument is ignored when connection_pool is provided.
        """
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        if not connection_pool:
            if not retry_on_error:
                retry_on_error = []

            # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version
            computed_driver_info = resolve_driver_info(
                driver_info, lib_name, lib_version
            )

            kwargs = {
                "db": db,
                "username": username,
                "password": password,
                "socket_timeout": socket_timeout,
                "encoding": encoding,
                "encoding_errors": encoding_errors,
                "decode_responses": decode_responses,
                "retry_on_error": retry_on_error,
                # deepcopy so this instance's pool never shares mutable retry
                # state with the (shared) default Retry object or other clients
                "retry": copy.deepcopy(retry),
                "max_connections": max_connections,
                "health_check_interval": health_check_interval,
                "client_name": client_name,
                "driver_info": computed_driver_info,
                "redis_connect_func": redis_connect_func,
                "credential_provider": credential_provider,
                "protocol": protocol,
            }
            # based on input, setup appropriate connection args
            if unix_socket_path is not None:
                kwargs.update(
                    {
                        "path": unix_socket_path,
                        "connection_class": UnixDomainSocketConnection,
                    }
                )
            else:
                # TCP specific options
                kwargs.update(
                    {
                        "host": host,
                        "port": port,
                        "socket_connect_timeout": socket_connect_timeout,
                        "socket_keepalive": socket_keepalive,
                        "socket_keepalive_options": socket_keepalive_options,
                    }
                )

                if ssl:
                    kwargs.update(
                        {
                            "connection_class": SSLConnection,
                            "ssl_keyfile": ssl_keyfile,
                            "ssl_certfile": ssl_certfile,
                            "ssl_cert_reqs": ssl_cert_reqs,
                            "ssl_include_verify_flags": ssl_include_verify_flags,
                            "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
                            "ssl_ca_certs": ssl_ca_certs,
                            "ssl_ca_data": ssl_ca_data,
                            "ssl_check_hostname": ssl_check_hostname,
                            "ssl_password": ssl_password,
                            "ssl_ca_path": ssl_ca_path,
                            "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                            "ssl_validate_ocsp": ssl_validate_ocsp,
                            "ssl_ocsp_context": ssl_ocsp_context,
                            "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                            "ssl_min_version": ssl_min_version,
                            "ssl_ciphers": ssl_ciphers,
                        }
                    )
            # Client-side caching is only forwarded to the pool when the
            # protocol supports it (RESP3).
            if (cache_config or cache) and check_protocol_version(protocol, 3):
                kwargs.update(
                    {
                        "cache": cache,
                        "cache_config": cache_config,
                    }
                )
            maint_notifications_enabled = (
                maint_notifications_config and maint_notifications_config.enabled
            )
            # protocol may be passed as int or str; accept both spellings of 3
            if maint_notifications_enabled and protocol not in [
                3,
                "3",
            ]:
                raise RedisError(
                    "Maintenance notifications handlers on connection are only supported with RESP version 3"
                )
            if maint_notifications_config:
                kwargs.update(
                    {
                        "maint_notifications_config": maint_notifications_config,
                    }
                )
            if oss_cluster_maint_notifications_handler:
                kwargs.update(
                    {
                        "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
                    }
                )
            connection_pool = ConnectionPool(**kwargs)
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )
            # Pool was created here, so this client owns and closes it.
            self.auto_close_connection_pool = True
        else:
            # Caller-provided pool: caller keeps ownership.
            self.auto_close_connection_pool = False
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )

        self.connection_pool = connection_pool

        # Re-check caching support against the *actual* pool protocol (the
        # pool may have been supplied by the caller with a different protocol).
        if (cache_config or cache) and self.connection_pool.get_protocol() not in [
            3,
            "3",
        ]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.single_connection_lock = threading.RLock()
        self.connection = None
        self._single_connection_client = single_connection_client
        if self._single_connection_client:
            # Pin one connection for the lifetime of the client.
            self.connection = self.connection_pool.get_connection()
            self._event_dispatcher.dispatch(
                AfterSingleConnectionInstantiationEvent(
                    self.connection, ClientType.SYNC, self.single_connection_lock
                )
            )

        self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

        if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
            self.response_callbacks.update(_RedisCallbacksRESP3)
        else:
            self.response_callbacks.update(_RedisCallbacksRESP2)

    def __repr__(self) -> str:
        return (
            f"<{type(self).__module__}.{type(self).__name__}"
            f"({repr(self.connection_pool)})>"
        )

    def get_encoder(self) -> "Encoder":
        """Get the connection pool's encoder"""
        return self.connection_pool.get_encoder()

    def get_connection_kwargs(self) -> Dict:
        """Get the connection's key-word arguments"""
        return self.connection_pool.connection_kwargs

    def get_retry(self) -> Optional[Retry]:
        """Return the Retry object configured on the connection pool, if any."""
        return self.get_connection_kwargs().get("retry")

    def set_retry(self, retry: Retry) -> None:
        """Replace the retry policy on both the connection kwargs and the pool."""
        self.get_connection_kwargs().update({"retry": retry})
        self.connection_pool.set_retry(retry)

    def set_response_callback(self, command: str, callback: Callable) -> None:
        """Set a custom Response Callback"""
        self.response_callbacks[command] = callback

    def load_external_module(self, funcname, func) -> None:
        """
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        funcname - A string containing the name of the function to create
        func - The function, being added to this class.

        ex: Assume that one has a custom redis module named foomod that
        creates command named 'foo.dothing' and 'foo.anotherthing' in redis.
        To load function functions into this namespace:

        from redis import Redis
        from foomodule import F
        r = Redis()
        r.load_external_module("foo", F)
        r.foo().dothing('your', 'arguments')

        For a concrete example see the reimport of the redisjson module in
        tests/test_connection.py::test_loading_external_modules
        """
        setattr(self, funcname, func)

    def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.
        """
        return Pipeline(
            self.connection_pool, self.response_callbacks, transaction, shard_hint
        )

    def transaction(
        self, func: Callable[["Pipeline"], None], *watches, **kwargs
    ) -> Union[List[Any], Any, None]:
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            # Loop until the transaction executes without a WATCH conflict.
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    # A watched key changed; optionally back off, then retry.
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue

    def lock(
        self,
        name: str,
        timeout: Optional[float] = None,
        sleep: float = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[float] = None,
        lock_class: Union[None, Any] = None,
        thread_local: bool = True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
              thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
              Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
              key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
              thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
              token is *not* stored in thread local storage, then
              thread-1 would see the token value as "xyz" and would be
              able to successfully release the thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )

    def pubsub(self, **kwargs):
        """
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        """
        return PubSub(
            self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
        )

    def monitor(self):
        """Return a Monitor object bound to this client's connection pool."""
        return Monitor(self.connection_pool)

    def client(self):
        """Return a new single-connection client sharing this client's pool."""
        return self.__class__(
            connection_pool=self.connection_pool,
            single_connection_client=True,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        # Best-effort cleanup during garbage collection; close() may fail if
        # interpreter shutdown has already torn down dependencies.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        # In case a connection property does not yet exist
        # (due to a crash earlier in the Redis() constructor), return
        # immediately as there is nothing to clean-up.
        if not hasattr(self, "connection"):
            return

        conn = self.connection
        if conn:
            self.connection = None
            self.connection_pool.release(conn)

        if self.auto_close_connection_pool:
            self.connection_pool.disconnect()

    def _send_command_parse_response(self, conn, command_name, *args, **options):
        """
        Send a command and parse the response
        """
        conn.send_command(*args, **options)
        return self.parse_response(conn, command_name, **options)

    def _close_connection(
        self,
        conn,
        error: Optional[Exception] = None,
        failure_count: Optional[int] = None,
        start_time: Optional[float] = None,
        command_name: Optional[str] = None,
    ) -> None:
        """
        Close the connection before retrying.

        The supported exceptions are already checked in the
        retry object so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic(on connection level).
        """
        # Record the failed attempt's duration only while retries remain;
        # the final failure is recorded by the caller's error path.
        if error and failure_count <= conn.retry.get_retries():
            record_operation_duration(
                command_name=command_name,
                duration_seconds=time.monotonic() - start_time,
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                db_namespace=str(conn.db),
                error=error,
                retry_attempts=failure_count,
            )

        conn.disconnect()

    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        """Public entry point: execute a command and return its parsed reply."""
        return self._execute_command(*args, **options)

    def _execute_command(self, *args, **options):
        """Execute a command and return a parsed response"""
        pool = self.connection_pool
        command_name = args[0]
        # Use the pinned connection (single-connection mode) or borrow one.
        conn = self.connection or pool.get_connection()

        # Start timing for observability
        start_time = time.monotonic()
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]

        def failure_callback(error, failure_count):
            # Invoked by the retry machinery on each failed attempt.
            if is_debug_log_enabled():
                add_debug_log_for_operation_failure(conn)
            actual_retry_attempts[0] = failure_count
            self._close_connection(conn, error, failure_count, start_time, command_name)

        if self._single_connection_client:
            self.single_connection_lock.acquire()
        try:
            result = conn.retry.call_with_retry(
                lambda: self._send_command_parse_response(
                    conn, command_name, *args, **options
                ),
                failure_callback,
                with_failure_count=True,
            )

            record_operation_duration(
                command_name=command_name,
                duration_seconds=time.monotonic() - start_time,
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                db_namespace=str(conn.db),
            )
            return result
        except Exception as e:
            record_error_count(
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                network_peer_address=getattr(conn, "host", None),
                network_peer_port=getattr(conn, "port", None),
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
                is_internal=False,
            )
            raise

        finally:
            # Reconnect connections the server asked to cycle, then release
            # the lock / return the borrowed connection to the pool.
            if conn and conn.should_reconnect():
                self._close_connection(conn)
                conn.connect()
            if self._single_connection_client:
                self.single_connection_lock.release()
            if not self.connection:
                pool.release(conn)

    def parse_response(self, connection, command_name, **options):
        """Parses a response from the Redis server"""
        try:
            if NEVER_DECODE in options:
                response = connection.read_response(disable_decoding=True)
                options.pop(NEVER_DECODE)
            else:
                response = connection.read_response()
        except ResponseError:
            # EMPTY_RESPONSE lets a caller substitute a default value instead
            # of propagating the server error.
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise

        if EMPTY_RESPONSE in options:
            options.pop(EMPTY_RESPONSE)

        # Remove keys entry, it needs only for cache.
        options.pop("keys", None)

        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response

    def get_cache(self) -> Optional[CacheInterface]:
        """Return the pool's client-side cache instance, if configured."""
        return self.connection_pool.cache
# Alias: ``StrictRedis`` is the same class as ``Redis`` (historical name,
# presumably kept so older code importing StrictRedis keeps working).
StrictRedis = Redis
class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    next_command() method returns one command from monitor
    listen() method yields commands from monitor.
    """

    # "[<db> <client>] <command...>" — the bracketed prefix of a MONITOR line.
    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    # One double-quoted command piece; the lookbehind skips escaped quotes.
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        self._start_monitor()
        return self

    def __exit__(self, *args):
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """Parse the response from a monitor command"""
        response = self.connection.read_response()

        if response is None:
            return None

        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)

        command_time, command_data = response.split(" ", 1)
        db_id, client_info, command = self.monitor_re.match(command_data).groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address, client_port, client_type = "lua", "", "lua"
        elif client_info.startswith("unix"):
            client_address, client_port, client_type = "unix", client_info[5:], "unix"
        elif client_info == "":
            client_address, client_port, client_type = "", "", "unknown"
        else:
            # use rsplit as ipv6 addresses contain colons
            client_address, client_port = client_info.rsplit(":", 1)
            client_type = "tcp"

        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()

        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")
901class PubSub:
902 """
903 PubSub provides publish, subscribe and listen support to Redis channels.
905 After subscribing to one or more channels, the listen() method will block
906 until a message arrives on one of the subscribed channels. That message
907 will be returned and it's safe to start listening again.
908 """
910 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
911 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
912 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
914 def __init__(
915 self,
916 connection_pool,
917 shard_hint=None,
918 ignore_subscribe_messages: bool = False,
919 encoder: Optional["Encoder"] = None,
920 push_handler_func: Union[None, Callable[[str], None]] = None,
921 event_dispatcher: Optional["EventDispatcher"] = None,
922 ):
923 self.connection_pool = connection_pool
924 self.shard_hint = shard_hint
925 self.ignore_subscribe_messages = ignore_subscribe_messages
926 self.connection = None
927 self.subscribed_event = threading.Event()
928 # we need to know the encoding options for this connection in order
929 # to lookup channel and pattern names for callback handlers.
930 self.encoder = encoder
931 self.push_handler_func = push_handler_func
932 if event_dispatcher is None:
933 self._event_dispatcher = EventDispatcher()
934 else:
935 self._event_dispatcher = event_dispatcher
937 self._lock = threading.RLock()
938 if self.encoder is None:
939 self.encoder = self.connection_pool.get_encoder()
940 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
941 if self.encoder.decode_responses:
942 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
943 else:
944 self.health_check_response = [b"pong", self.health_check_response_b]
945 if self.push_handler_func is None:
946 _set_info_logger()
947 self.reset()
949 def __enter__(self) -> "PubSub":
950 return self
952 def __exit__(self, exc_type, exc_value, traceback) -> None:
953 self.reset()
955 def __del__(self) -> None:
956 try:
957 # if this object went out of scope prior to shutting down
958 # subscriptions, close the connection manually before
959 # returning it to the connection pool
960 self.reset()
961 except Exception:
962 pass
def reset(self) -> None:
    """Disconnect and release the connection (if any) and clear all
    subscription bookkeeping."""
    if self.connection:
        self.connection.disconnect()
        # Stop re-subscribing on future reconnects of this connection.
        self.connection.deregister_connect_callback(self.on_connect)
        self.connection_pool.release(self.connection)
        self.connection = None
    self.health_check_response_counter = 0
    self.channels = {}
    self.pending_unsubscribe_channels = set()
    self.shard_channels = {}
    self.pending_unsubscribe_shard_channels = set()
    self.patterns = {}
    self.pending_unsubscribe_patterns = set()
    self.subscribed_event.clear()
def close(self) -> None:
    """Shut the pubsub connection down; alias for :meth:`reset`."""
    self.reset()
def on_connect(self, connection) -> None:
    """Re-subscribe to any channels and patterns previously subscribed to.

    Called by the connection after a (re)connect.

    NOTE: for python3, we can't pass bytestrings as keyword arguments
    so we need to decode channel/pattern names back to unicode strings
    before passing them to [p]subscribe.

    However, channels subscribed without a callback (positional args) may
    have binary names that are not valid in the current encoding (e.g.
    arbitrary bytes that are not valid UTF-8). These channels are stored
    with a ``None`` handler. We re-subscribe them as positional args so
    that no decoding is required.
    """
    self.pending_unsubscribe_channels.clear()
    self.pending_unsubscribe_patterns.clear()
    self.pending_unsubscribe_shard_channels.clear()

    def _split(subscriptions):
        # Partition one subscription dict into (kwargs with handlers,
        # positional names without handlers). Names with handlers are
        # decoded so they can be passed as keyword arguments.
        with_handlers = {}
        without_handlers = []
        for name, handler in subscriptions.items():
            if handler is not None:
                with_handlers[self.encoder.decode(name, force=True)] = handler
            else:
                without_handlers.append(name)
        return with_handlers, without_handlers

    if self.channels:
        with_h, without_h = _split(self.channels)
        if with_h or without_h:
            self.subscribe(*without_h, **with_h)
    if self.patterns:
        with_h, without_h = _split(self.patterns)
        if with_h or without_h:
            self.psubscribe(*without_h, **with_h)
    if self.shard_channels:
        with_h, without_h = _split(self.shard_channels)
        if with_h or without_h:
            self.ssubscribe(*without_h, **with_h)
@property
def subscribed(self) -> bool:
    """True while the client holds at least one channel/pattern subscription."""
    return self.subscribed_event.is_set()
def execute_command(self, *args):
    """Execute a publish/subscribe command"""

    # NOTE: don't parse the response in this function -- it could pull a
    # legitimate message off the stack if the connection is already
    # subscribed to one or more channels

    if self.connection is None:
        # Lazily check a dedicated connection out of the pool.
        self.connection = self.connection_pool.get_connection()
        # register a callback that re-subscribes to any channels we
        # were listening to when we were disconnected
        self.connection.register_connect_callback(self.on_connect)
        if self.push_handler_func is not None:
            self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
        self._event_dispatcher.dispatch(
            AfterPubSubConnectionInstantiationEvent(
                self.connection, self.connection_pool, ClientType.SYNC, self._lock
            )
        )
    connection = self.connection
    # Inline health checks are only safe before the first subscription;
    # afterwards a PING reply could be confused with a pubsub message.
    kwargs = {"check_health": not self.subscribed}
    if not self.subscribed:
        self.clean_health_check_responses()
    with self._lock:
        self._execute(connection, connection.send_command, *args, **kwargs)
def clean_health_check_responses(self) -> None:
    """
    If any health check responses are present, clean them
    """
    conn = self.connection
    # Hard cap on read attempts so a misbehaving socket cannot loop forever.
    attempts_left = 10
    while conn and attempts_left > 0 and self.health_check_response_counter > 0:
        if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
            reply = self._execute(conn, conn.read_response)
            if not self.is_health_check_response(reply):
                raise PubSubError(
                    "A non health check response was cleaned by "
                    "execute_command: {}".format(reply)
                )
            self.health_check_response_counter -= 1
        attempts_left -= 1
def _reconnect(
    self,
    conn,
    error: Optional[Exception] = None,
    failure_count: Optional[int] = None,
    start_time: Optional[float] = None,
    command_name: Optional[str] = None,
) -> None:
    """
    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    In this error handler we are trying to reconnect to the server.
    """
    # When invoked as a retry failure callback (error is set), record the
    # failed attempt's latency while retries remain.
    if error and failure_count <= conn.retry.get_retries():
        if command_name:
            record_operation_duration(
                command_name=command_name,
                duration_seconds=time.monotonic() - start_time,
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                db_namespace=str(conn.db),
                error=error,
                retry_attempts=failure_count,
            )
    # Force a fresh connection; on_connect re-subscribes us afterwards.
    conn.disconnect()
    conn.connect()
def _execute(self, conn, command, *args, **kwargs):
    """
    Run ``command`` (a bound connection method) under the connection's
    retry policy.

    Connect manually upon disconnection. If the Redis server is down,
    this will fail and raise a ConnectionError as desired.
    After reconnection, the ``on_connect`` callback should have been
    called by the connection to resubscribe us to any channels and
    patterns we were previously listening to.

    Records per-command latency and error metrics for observability.
    """
    if conn.should_reconnect():
        self._reconnect(conn)

    # args[0] is the Redis command name when sending a command; plain
    # reads (e.g. ``conn.read_response``) pass no args at all.
    # (Idiom fix: was ``if not len(args) == 0``.)
    command_name = args[0] if args else None

    # Start timing for observability
    start_time = time.monotonic()
    # Track actual retry attempts for error reporting
    actual_retry_attempts = [0]

    def failure_callback(error, failure_count):
        actual_retry_attempts[0] = failure_count
        self._reconnect(conn, error, failure_count, start_time, command_name)

    try:
        response = conn.retry.call_with_retry(
            lambda: command(*args, **kwargs),
            failure_callback,
            with_failure_count=True,
        )

        if command_name:
            record_operation_duration(
                command_name=command_name,
                duration_seconds=time.monotonic() - start_time,
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                db_namespace=str(conn.db),
            )

        return response
    except Exception as e:
        record_error_count(
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            network_peer_address=getattr(conn, "host", None),
            network_peer_port=getattr(conn, "port", None),
            error_type=e,
            retry_attempts=actual_retry_attempts[0],
            is_internal=False,
        )
        raise
def parse_response(self, block=True, timeout=0):
    """Parse the response from a publish/subscribe command"""
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            "pubsub connection not set: "
            "did you forget to call subscribe() or psubscribe()?"
        )

    self.check_health()

    def try_read():
        # Non-blocking mode: give up if nothing is readable within
        # ``timeout``; blocking mode ensures we are connected first.
        if not block:
            if not conn.can_read(timeout=timeout):
                return None
        else:
            conn.connect()
        return conn.read_response(disconnect_on_error=False, push_request=True)

    response = self._execute(conn, try_read)

    if self.is_health_check_response(response):
        # ignore the health check message as user might not expect it
        self.health_check_response_counter -= 1
        return None
    return response
def is_health_check_response(self, response) -> bool:
    """
    Check if the response is a health check response.
    If there are no subscriptions redis responds to PING command with a
    bulk response, instead of a multi-bulk with "pong" and the response.
    """
    if self.encoder.decode_responses:
        # Subscribed: multi-bulk; unsubscribed: the bare decoded message.
        candidates = (self.health_check_response, self.HEALTH_CHECK_MESSAGE)
    else:
        # Subscribed: multi-bulk; unsubscribed: the bare encoded message.
        candidates = (self.health_check_response, self.health_check_response_b)
    return response in candidates
def check_health(self) -> None:
    """Send a tagged PING when the connection's health-check interval
    has elapsed; the reply is swallowed later by parse_response()."""
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            "pubsub connection not set: "
            "did you forget to call subscribe() or psubscribe()?"
        )

    interval = conn.health_check_interval
    if interval and time.monotonic() > conn.next_health_check:
        # Tag the PING so its reply can be recognised as a health check.
        conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
        self.health_check_response_counter += 1
1221 def _normalize_keys(self, data) -> Dict:
1222 """
1223 normalize channel/pattern names to be either bytes or strings
1224 based on whether responses are automatically decoded. this saves us
1225 from coercing the value for each message coming in.
1226 """
1227 encode = self.encoder.encode
1228 decode = self.encoder.decode
1229 return {decode(encode(k)): v for k, v in data.items()}
def psubscribe(self, *args, **kwargs):
    """
    Subscribe to channel patterns. Patterns supplied as keyword arguments
    expect a pattern name as the key and a callable as the value. A
    pattern's callable will be invoked automatically when a message is
    received on that pattern rather than producing a message via
    ``listen()``.
    """
    if args:
        args = list_or_args(args[0], args[1:])
    # Positional patterns get a ``None`` handler; keyword ones keep theirs.
    new_patterns = dict.fromkeys(args)
    new_patterns.update(kwargs)
    ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
    # update the patterns dict AFTER we send the command. we don't want to
    # subscribe twice to these patterns, once for the command and again
    # for the reconnection.
    new_patterns = self._normalize_keys(new_patterns)
    self.patterns.update(new_patterns)
    if not self.subscribed:
        # Set the subscribed_event flag to True
        self.subscribed_event.set()
        # Clear the health check counter
        self.health_check_response_counter = 0
    self.pending_unsubscribe_patterns.difference_update(new_patterns)
    return ret_val
def punsubscribe(self, *args):
    """
    Unsubscribe from the supplied patterns. If empty, unsubscribe from
    all patterns.
    """
    if args:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    else:
        # No arguments: the server will drop every pattern subscription.
        targets = self.patterns
    self.pending_unsubscribe_patterns.update(targets)
    return self.execute_command("PUNSUBSCRIBE", *args)
def subscribe(self, *args, **kwargs):
    """
    Subscribe to channels. Channels supplied as keyword arguments expect
    a channel name as the key and a callable as the value. A channel's
    callable will be invoked automatically when a message is received on
    that channel rather than producing a message via ``listen()`` or
    ``get_message()``.
    """
    if args:
        args = list_or_args(args[0], args[1:])
    # Positional channels get a ``None`` handler; keyword ones keep theirs.
    new_channels = dict.fromkeys(args)
    new_channels.update(kwargs)
    ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
    # update the channels dict AFTER we send the command. we don't want to
    # subscribe twice to these channels, once for the command and again
    # for the reconnection.
    new_channels = self._normalize_keys(new_channels)
    self.channels.update(new_channels)
    if not self.subscribed:
        # Set the subscribed_event flag to True
        self.subscribed_event.set()
        # Clear the health check counter
        self.health_check_response_counter = 0
    self.pending_unsubscribe_channels.difference_update(new_channels)
    return ret_val
def unsubscribe(self, *args):
    """
    Unsubscribe from the supplied channels. If empty, unsubscribe from
    all channels
    """
    if args:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    else:
        # No arguments: the server will drop every channel subscription.
        targets = self.channels
    self.pending_unsubscribe_channels.update(targets)
    return self.execute_command("UNSUBSCRIBE", *args)
def ssubscribe(self, *args, target_node=None, **kwargs):
    """
    Subscribes the client to the specified shard channels.
    Channels supplied as keyword arguments expect a channel name as the key
    and a callable as the value. A channel's callable will be invoked automatically
    when a message is received on that channel rather than producing a message via
    ``listen()`` or ``get_sharded_message()``.
    """
    if args:
        args = list_or_args(args[0], args[1:])
    # Positional channels get a ``None`` handler; keyword ones keep theirs.
    new_s_channels = dict.fromkeys(args)
    new_s_channels.update(kwargs)
    ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
    # update the s_channels dict AFTER we send the command. we don't want to
    # subscribe twice to these channels, once for the command and again
    # for the reconnection.
    new_s_channels = self._normalize_keys(new_s_channels)
    self.shard_channels.update(new_s_channels)
    if not self.subscribed:
        # Set the subscribed_event flag to True
        self.subscribed_event.set()
        # Clear the health check counter
        self.health_check_response_counter = 0
    self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
    return ret_val
def sunsubscribe(self, *args, target_node=None):
    """
    Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
    all shard_channels
    """
    if args:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    else:
        # No arguments: the server drops every shard-channel subscription.
        targets = self.shard_channels
    self.pending_unsubscribe_shard_channels.update(targets)
    return self.execute_command("SUNSUBSCRIBE", *args)
def listen(self):
    """Generator: block and yield parsed messages while any
    subscription is active."""
    while self.subscribed:
        parsed = self.handle_message(self.parse_response(block=True))
        if parsed is not None:
            yield parsed
def get_message(
    self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
):
    """
    Get the next message if one is available, otherwise None.

    If timeout is specified, the system will wait for `timeout` seconds
    before returning. Timeout should be specified as a floating point
    number, or None, to wait indefinitely.
    """
    if not self.subscribed:
        # Wait for subscription
        start_time = time.monotonic()
        if self.subscribed_event.wait(timeout) is True:
            # The connection was subscribed during the timeout time frame.
            # The timeout should be adjusted based on the time spent
            # waiting for the subscription. Bug fix: skip the adjustment
            # for ``timeout=None`` (wait indefinitely), which previously
            # raised TypeError on ``None - time_spent``.
            if timeout is not None:
                time_spent = time.monotonic() - start_time
                timeout = max(0.0, timeout - time_spent)
        else:
            # The connection isn't subscribed to any channels or patterns,
            # so no messages are available
            return None

    response = self.parse_response(block=(timeout is None), timeout=timeout)

    if response:
        return self.handle_message(response, ignore_subscribe_messages)
    return None
1385 get_sharded_message = get_message
def ping(self, message: Union[str, None] = None) -> bool:
    """
    Ping the Redis server to test connectivity.

    Sends a PING command (optionally carrying ``message``) to the Redis
    server and returns the reply.
    """
    command = ["PING"] if message is None else ["PING", message]
    return self.execute_command(*command)
def handle_message(self, response, ignore_subscribe_messages=False):
    """
    Parses a pub/sub message. If the channel or pattern was subscribed to
    with a message handler, the handler is invoked instead of a parsed
    message being returned.
    """
    if response is None:
        return None
    # A bare bytes reply is a PING response; normalize it to the
    # ["pong", data] list shape used below.
    if isinstance(response, bytes):
        response = [b"pong", response] if response != b"PONG" else [b"pong", b""]

    message_type = str_if_bytes(response[0])
    if message_type == "pmessage":
        # Pattern messages carry an extra element: the matching pattern.
        message = {
            "type": message_type,
            "pattern": response[1],
            "channel": response[2],
            "data": response[3],
        }
    elif message_type == "pong":
        message = {
            "type": message_type,
            "pattern": None,
            "channel": None,
            "data": response[1],
        }
    else:
        message = {
            "type": message_type,
            "pattern": None,
            "channel": response[1],
            "data": response[2],
        }

    # Observability: count received publishes (sharded ones flagged).
    if message_type in ["message", "pmessage"]:
        channel = str_if_bytes(message["channel"])
        record_pubsub_message(
            direction=PubSubDirection.RECEIVE,
            channel=channel,
        )
    elif message_type == "smessage":
        channel = str_if_bytes(message["channel"])
        record_pubsub_message(
            direction=PubSubDirection.RECEIVE,
            channel=channel,
            sharded=True,
        )

    # if this is an unsubscribe message, remove it from memory
    if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
        if message_type == "punsubscribe":
            pattern = response[1]
            if pattern in self.pending_unsubscribe_patterns:
                self.pending_unsubscribe_patterns.remove(pattern)
                self.patterns.pop(pattern, None)
        elif message_type == "sunsubscribe":
            s_channel = response[1]
            if s_channel in self.pending_unsubscribe_shard_channels:
                self.pending_unsubscribe_shard_channels.remove(s_channel)
                self.shard_channels.pop(s_channel, None)
        else:
            channel = response[1]
            if channel in self.pending_unsubscribe_channels:
                self.pending_unsubscribe_channels.remove(channel)
                self.channels.pop(channel, None)
        if not self.channels and not self.patterns and not self.shard_channels:
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()

    if message_type in self.PUBLISH_MESSAGE_TYPES:
        # if there's a message handler, invoke it
        if message_type == "pmessage":
            handler = self.patterns.get(message["pattern"], None)
        elif message_type == "smessage":
            handler = self.shard_channels.get(message["channel"], None)
        else:
            handler = self.channels.get(message["channel"], None)
        if handler:
            handler(message)
            return None
    elif message_type != "pong":
        # this is a subscribe/unsubscribe message. ignore if we don't
        # want them
        if ignore_subscribe_messages or self.ignore_subscribe_messages:
            return None

    return message
def run_in_thread(
    self,
    sleep_time: float = 0.0,
    daemon: bool = False,
    exception_handler: Optional[Callable] = None,
    pubsub=None,
    sharded_pubsub: bool = False,
) -> "PubSubWorkerThread":
    """Start a worker thread that polls for messages and dispatches each
    one to its registered handler.

    Every current subscription must have a handler, since the worker
    discards parsed messages; raises PubSubError otherwise.
    """
    for channel, handler in self.channels.items():
        if handler is None:
            raise PubSubError(f"Channel: '{channel}' has no handler registered")
    for pattern, handler in self.patterns.items():
        if handler is None:
            raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
    for s_channel, handler in self.shard_channels.items():
        if handler is None:
            raise PubSubError(
                f"Shard Channel: '{s_channel}' has no handler registered"
            )

    # Callers may supply a different pubsub object to poll (e.g. for
    # sharded pubsub) while reusing this object's handler validation.
    pubsub = self if pubsub is None else pubsub
    thread = PubSubWorkerThread(
        pubsub,
        sleep_time,
        daemon=daemon,
        exception_handler=exception_handler,
        sharded_pubsub=sharded_pubsub,
    )
    thread.start()
    return thread
class PubSubWorkerThread(threading.Thread):
    """Background thread that polls a PubSub object and dispatches
    incoming messages to their registered handlers until ``stop()`` is
    called."""

    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
        sharded_pubsub: bool = False,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        # Poll timeout passed to get_message(); 0 means busy-poll.
        self.sleep_time = sleep_time
        self.exception_handler = exception_handler
        # When True, poll via get_sharded_message() instead.
        self.sharded_pubsub = sharded_pubsub
        self._running = threading.Event()

    def run(self) -> None:
        """Poll for messages until stop() clears the running flag, then
        close the pubsub connection."""
        if self._running.is_set():
            # Guard against a second start of the same thread object.
            return
        self._running.set()
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while self._running.is_set():
            try:
                if not self.sharded_pubsub:
                    pubsub.get_message(
                        ignore_subscribe_messages=True, timeout=sleep_time
                    )
                else:
                    pubsub.get_sharded_message(
                        ignore_subscribe_messages=True, timeout=sleep_time
                    )
            except BaseException as e:
                # Without a handler installed, re-raise (ends the thread).
                if self.exception_handler is None:
                    raise
                self.exception_handler(e, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()
1566class Pipeline(Redis):
1567 """
1568 Pipelines provide a way to transmit multiple commands to the Redis server
1569 in one transmission. This is convenient for batch processing, such as
1570 saving all the values in a list to Redis.
1572 All commands executed within a pipeline(when running in transactional mode,
1573 which is the default behavior) are wrapped with MULTI and EXEC
1574 calls. This guarantees all commands executed in the pipeline will be
1575 executed atomically.
1577 Any command raising an exception does *not* halt the execution of
1578 subsequent commands in the pipeline. Instead, the exception is caught
1579 and its instance is placed into the response list returned by execute().
1580 Code iterating over the response list should be able to deal with an
1581 instance of an exception as a potential value. In general, these will be
1582 ResponseError exceptions, such as those raised when issuing a command
1583 on a key of a different datatype.
1584 """
1586 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
def __init__(
    self,
    connection_pool: ConnectionPool,
    response_callbacks,
    transaction,
    shard_hint,
):
    """Set up an empty pipeline.

    ``transaction`` controls whether execute() wraps the buffered
    commands in MULTI/EXEC.
    """
    self.connection_pool = connection_pool
    # Checked out lazily on the first immediate command / execute().
    self.connection: Optional[Connection] = None
    self.response_callbacks = response_callbacks
    self.transaction = transaction
    self.shard_hint = shard_hint
    # True while a WATCH is active on the connection.
    self.watching = False
    # Buffered (args, options) tuples awaiting execute().
    self.command_stack = []
    # Scripts registered for existence checks before execution.
    self.scripts: Set[Script] = set()
    # Set by multi() to force transactional execution.
    self.explicit_transaction = False
1605 def __enter__(self) -> "Pipeline":
1606 return self
1608 def __exit__(self, exc_type, exc_value, traceback):
1609 self.reset()
1611 def __del__(self):
1612 try:
1613 self.reset()
1614 except Exception:
1615 pass
1617 def __len__(self) -> int:
1618 return len(self.command_stack)
1620 def __bool__(self) -> bool:
1621 """Pipeline instances should always evaluate to True"""
1622 return True
def reset(self) -> None:
    """Drop buffered commands and WATCH state, returning the connection
    (if any) to the pool."""
    self.command_stack = []
    self.scripts = set()
    # make sure to reset the connection state in the event that we were
    # watching something
    if self.watching and self.connection:
        try:
            # call this manually since our unwatch or
            # immediate_execute_command methods can call reset()
            self.connection.send_command("UNWATCH")
            self.connection.read_response()
        except ConnectionError:
            # disconnect will also remove any previous WATCHes
            self.connection.disconnect()
    # clean up the other instance attributes
    self.watching = False
    self.explicit_transaction = False

    # we can safely return the connection to the pool here since we're
    # sure we're no longer WATCHing anything
    if self.connection:
        self.connection_pool.release(self.connection)
        self.connection = None
def close(self) -> None:
    """Close the pipeline"""
    self.reset()
def multi(self) -> None:
    """
    Start a transactional block of the pipeline after WATCH commands
    are issued. End the transactional block with `execute`.
    """
    for problem, msg in (
        (self.explicit_transaction, "Cannot issue nested calls to MULTI"),
        (
            self.command_stack,
            "Commands without an initial WATCH have already been issued",
        ),
    ):
        if problem:
            raise RedisError(msg)
    self.explicit_transaction = True
def execute_command(self, *args, **kwargs):
    """Route a command: run immediately while WATCHing outside an
    explicit MULTI block, otherwise buffer it for execute()."""
    run_now = (self.watching or args[0] == "WATCH") and not self.explicit_transaction
    if run_now:
        return self.immediate_execute_command(*args, **kwargs)
    return self.pipeline_execute_command(*args, **kwargs)
def _disconnect_reset_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
    failure_count: Optional[int] = None,
    start_time: Optional[float] = None,
    command_name: Optional[str] = None,
) -> None:
    """
    Close the connection reset watching state and
    raise an exception if we were watching.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).
    """
    # Record the failed attempt's latency only while retries remain.
    if error and failure_count <= conn.retry.get_retries():
        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
            error=error,
            retry_attempts=failure_count,
        )
    conn.disconnect()

    # if we were already watching a variable, the watch is no longer
    # valid since this connection has died. raise a WatchError, which
    # indicates the user should retry this transaction.
    if self.watching:
        self.reset()
        raise WatchError(
            f"A {type(error).__name__} occurred while watching one or more keys"
        )
def immediate_execute_command(self, *args, **options):
    """
    Execute a command immediately, but don't auto-retry on the supported
    errors for retry if we're already WATCHing a variable.
    Used when issuing WATCH or subsequent commands retrieving their values but before
    MULTI is called.
    """
    command_name = args[0]
    conn = self.connection
    # if this is the first call, we need a connection
    if not conn:
        conn = self.connection_pool.get_connection()
        self.connection = conn

    # Start timing for observability
    start_time = time.monotonic()
    # Track actual retry attempts for error reporting
    actual_retry_attempts = [0]

    def failure_callback(error, failure_count):
        # Invoked by the retry machinery on each failed attempt.
        if is_debug_log_enabled():
            add_debug_log_for_operation_failure(conn)
        actual_retry_attempts[0] = failure_count
        self._disconnect_reset_raise_on_watching(
            conn, error, failure_count, start_time, command_name
        )

    try:
        response = conn.retry.call_with_retry(
            lambda: self._send_command_parse_response(
                conn, command_name, *args, **options
            ),
            failure_callback,
            with_failure_count=True,
        )

        # Success: record command latency for observability.
        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
        )

        return response
    except Exception as e:
        record_error_count(
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            network_peer_address=getattr(conn, "host", None),
            network_peer_port=getattr(conn, "port", None),
            error_type=e,
            retry_attempts=actual_retry_attempts[0],
            is_internal=False,
        )
        raise
def pipeline_execute_command(self, *args, **options) -> "Pipeline":
    """
    Stage a command to be executed when execute() is next called

    Returns the current Pipeline object back so commands can be
    chained together, such as:

    pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

    At some other point, you can then run: pipe.execute(),
    which will execute all commands queued in the pipe.
    """
    staged = (args, options)
    self.command_stack.append(staged)
    return self
def _execute_transaction(
    self, connection: Connection, commands, raise_on_error
) -> List:
    """Send all buffered commands wrapped in MULTI/EXEC, parse every
    reply, and return the list of EXEC results with response callbacks
    applied."""
    cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
    # Commands flagged EMPTY_RESPONSE produce no reply and are not sent.
    all_cmds = connection.pack_commands(
        [args for args, options in cmds if EMPTY_RESPONSE not in options]
    )
    connection.send_packed_command(all_cmds)
    errors = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        self.parse_response(connection, "_")
    except ResponseError as e:
        errors.append((0, e))

    # and all the other commands
    for i, command in enumerate(commands):
        if EMPTY_RESPONSE in command[1]:
            errors.append((i, command[1][EMPTY_RESPONSE]))
        else:
            try:
                self.parse_response(connection, "_")
            except ResponseError as e:
                errors.append((i, e))
                self.annotate_exception(e, i + 1, command[0])

    # parse the EXEC.
    try:
        response = self.parse_response(connection, "_")
    except ExecAbortError:
        # Surface the first queuing error instead of the generic abort.
        if errors:
            raise errors[0][1]
        raise

    # EXEC clears any watched keys
    self.watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response
    for i, e in errors:
        response.insert(i, e)

    if len(response) != len(commands):
        self.connection.disconnect()
        raise ResponseError(
            "Wrong number of response items from pipeline execution"
        )

    # find any errors in the response and raise if necessary
    if raise_on_error:
        self.raise_first_error(commands, response)

    # We have to run response callbacks manually
    data = []
    for r, cmd in zip(response, commands):
        if not isinstance(r, Exception):
            args, options = cmd
            # Remove keys entry, it needs only for cache.
            options.pop("keys", None)
            command_name = args[0]
            if command_name in self.response_callbacks:
                r = self.response_callbacks[command_name](r, **options)
        data.append(r)

    return data
def _execute_pipeline(self, connection, commands, raise_on_error):
    """Send every buffered command in one round trip (no MULTI/EXEC)
    and collect each reply, substituting ResponseErrors in place."""
    # build up all commands into a single request to increase network perf
    packed = connection.pack_commands([cmd_args for cmd_args, _ in commands])
    connection.send_packed_command(packed)

    replies = []
    for cmd_args, options in commands:
        try:
            replies.append(self.parse_response(connection, cmd_args[0], **options))
        except ResponseError as err:
            replies.append(err)

    if raise_on_error:
        self.raise_first_error(commands, replies)

    return replies
def raise_first_error(self, commands, response):
    """Raise the first ResponseError found in ``response``, annotated
    with its position in the pipeline."""
    for index, item in enumerate(response):
        if isinstance(item, ResponseError):
            self.annotate_exception(item, index + 1, commands[index][0])
            raise item
def annotate_exception(self, exception, number, command):
    """Prefix ``exception``'s message with the command's 1-based
    position in the pipeline and a (truncated) rendering of it."""
    rendered = " ".join(map(safe_str, command))
    prefix = (
        f"Command # {number} ({truncate_text(rendered)}) of pipeline "
        f"caused error: {exception.args[0]}"
    )
    exception.args = (prefix,) + exception.args[1:]
def parse_response(self, connection, command_name, **options):
    """Parse a single reply, tracking WATCH/UNWATCH state transitions."""
    result = Redis.parse_response(self, connection, command_name, **options)
    if command_name in self.UNWATCH_COMMANDS:
        # DISCARD/EXEC/UNWATCH all clear any active WATCH.
        self.watching = False
    elif command_name == "WATCH":
        self.watching = True
    return result
def load_scripts(self):
    """Ensure every Script attached to this pipeline exists on the
    server, loading any the server reports missing."""
    # make sure all scripts that are about to be run on this pipeline exist
    scripts = list(self.scripts)
    # we can't use the normal script_* methods because they would just
    # get buffered in the pipeline.
    immediate = self.immediate_execute_command
    shas = [script.sha for script in scripts]
    exists = immediate("SCRIPT EXISTS", *shas)
    if not all(exists):
        for script, present in zip(scripts, exists):
            if not present:
                script.sha = immediate("SCRIPT LOAD", script.script)
1905 def _disconnect_raise_on_watching(
1906 self,
1907 conn: AbstractConnection,
1908 error: Exception,
1909 failure_count: Optional[int] = None,
1910 start_time: Optional[float] = None,
1911 command_name: Optional[str] = None,
1912 ) -> None:
1913 """
1914 Close the connection, raise an exception if we were watching.
1916 The supported exceptions are already checked in the
1917 retry object so we don't need to do it here.
1919 After we disconnect the connection, it will try to reconnect and
1920 do a health check as part of the send_command logic(on connection level).
1921 """
1922 if error and failure_count <= conn.retry.get_retries():
1923 record_operation_duration(
1924 command_name=command_name,
1925 duration_seconds=time.monotonic() - start_time,
1926 server_address=getattr(conn, "host", None),
1927 server_port=getattr(conn, "port", None),
1928 db_namespace=str(conn.db),
1929 error=error,
1930 retry_attempts=failure_count,
1931 )
1932 conn.disconnect()
1933 # if we were watching a variable, the watch is no longer valid
1934 # since this connection has died. raise a WatchError, which
1935 # indicates the user should retry this transaction.
1936 if self.watching:
1937 raise WatchError(
1938 f"A {type(error).__name__} occurred while watching one or more keys"
1939 )
1941 def execute(self, raise_on_error: bool = True) -> List[Any]:
1942 """Execute all the commands in the current pipeline"""
1943 stack = self.command_stack
1944 if not stack and not self.watching:
1945 return []
1946 if self.scripts:
1947 self.load_scripts()
1948 if self.transaction or self.explicit_transaction:
1949 execute = self._execute_transaction
1950 operation_name = "MULTI"
1951 else:
1952 execute = self._execute_pipeline
1953 operation_name = "PIPELINE"
1955 conn = self.connection
1956 if not conn:
1957 conn = self.connection_pool.get_connection()
1958 # assign to self.connection so reset() releases the connection
1959 # back to the pool after we're done
1960 self.connection = conn
1962 # Start timing for observability
1963 start_time = time.monotonic()
1964 # Track actual retry attempts for error reporting
1965 actual_retry_attempts = [0]
1967 def failure_callback(error, failure_count):
1968 if is_debug_log_enabled():
1969 add_debug_log_for_operation_failure(conn)
1970 actual_retry_attempts[0] = failure_count
1971 self._disconnect_raise_on_watching(
1972 conn, error, failure_count, start_time, operation_name
1973 )
1975 try:
1976 response = conn.retry.call_with_retry(
1977 lambda: execute(conn, stack, raise_on_error),
1978 failure_callback,
1979 with_failure_count=True,
1980 )
1982 record_operation_duration(
1983 command_name=operation_name,
1984 duration_seconds=time.monotonic() - start_time,
1985 server_address=getattr(conn, "host", None),
1986 server_port=getattr(conn, "port", None),
1987 db_namespace=str(conn.db),
1988 )
1989 return response
1990 except Exception as e:
1991 record_error_count(
1992 server_address=getattr(conn, "host", None),
1993 server_port=getattr(conn, "port", None),
1994 network_peer_address=getattr(conn, "host", None),
1995 network_peer_port=getattr(conn, "port", None),
1996 error_type=e,
1997 retry_attempts=actual_retry_attempts[0],
1998 is_internal=False,
1999 )
2000 raise
2002 finally:
2003 # in reset() the connection is disconnected before returned to the pool if
2004 # it is marked for reconnect.
2005 self.reset()
2007 def discard(self):
2008 """
2009 Flushes all previously queued commands
2010 See: https://redis.io/commands/DISCARD
2011 """
2012 self.execute_command("DISCARD")
2014 def watch(self, *names):
2015 """Watches the values at keys ``names``"""
2016 if self.explicit_transaction:
2017 raise RedisError("Cannot issue a WATCH after a MULTI")
2018 return self.execute_command("WATCH", *names)
2020 def unwatch(self) -> bool:
2021 """Unwatches all previously specified keys"""
2022 return self.watching and self.execute_command("UNWATCH") or True