import copy
import re
import threading
import time
from itertools import chain
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Mapping,
    Optional,
    Set,
    Type,
    Union,
)

from redis._parsers.encoders import Encoder
from redis._parsers.helpers import (
    _RedisCallbacks,
    _RedisCallbacksRESP2,
    _RedisCallbacksRESP3,
    bool_ok,
)
from redis.backoff import ExponentialWithJitterBackoff
from redis.cache import CacheConfig, CacheInterface
from redis.commands import (
    CoreCommands,
    RedisModuleCommands,
    SentinelCommands,
    list_or_args,
)
from redis.commands.core import Script
from redis.connection import (
    AbstractConnection,
    Connection,
    ConnectionPool,
    SSLConnection,
    UnixDomainSocketConnection,
)
from redis.credentials import CredentialProvider
from redis.driver_info import DriverInfo, resolve_driver_info
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    AfterSingleConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    ConnectionError,
    ExecAbortError,
    PubSubError,
    RedisError,
    ResponseError,
    WatchError,
)
from redis.lock import Lock
from redis.maint_notifications import (
    MaintNotificationsConfig,
)
from redis.retry import Retry
from redis.utils import (
    _set_info_logger,
    deprecated_args,
    safe_str,
    str_if_bytes,
    truncate_text,
)

if TYPE_CHECKING:
    import ssl

    import OpenSSL

SYM_EMPTY = b""
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (e.g. DUMP) are binary and are never meant to be decoded
NEVER_DECODE = "NEVER_DECODE"


class CaseInsensitiveDict(dict):
    "Case insensitive dict implementation. Assumes string keys only."

    def __init__(self, data: Dict[str, str]) -> None:
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def update(self, data):
        data = CaseInsensitiveDict(data)
        super().update(data)
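
# Illustrative sketch (not part of the library API): response callbacks are
# keyed case-insensitively, so "GET" and "get" resolve to the same entry.
#
#     d = CaseInsensitiveDict({"get": 1})
#     assert "GET" in d and d["get"] == d["GET"]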


class AbstractRedis:
    pass


class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
    """
    Implementation of the Redis protocol.

    This abstract class provides a Python interface to all Redis commands
    and an implementation of the Redis protocol.

    Pipelines derive from this, implementing how the commands are sent and
    received to the Redis server. Based on configuration, an instance will
    use either a ConnectionPool or a Connection object to talk to Redis.

    It is not safe to pass PubSub or Pipeline objects between threads.
    """

    @classmethod
    def from_url(cls, url: str, **kwargs) -> "Redis":
        """
        Return a Redis client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL-wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        single_connection_client = kwargs.pop("single_connection_client", False)
        connection_pool = ConnectionPool.from_url(url, **kwargs)
        client = cls(
            connection_pool=connection_pool,
            single_connection_client=single_connection_client,
        )
        client.auto_close_connection_pool = True
        return client
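
    # Illustrative usage sketch (assumes a Redis server on localhost:6379):
    #
    #     client = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
    #     client.set("greeting", "hello")
    #     assert client.get("greeting") == "hello"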

    @classmethod
    def from_pool(
        cls: Type["Redis"],
        connection_pool: ConnectionPool,
    ) -> "Redis":
        """
        Return a Redis client from the given connection pool.
        The Redis client will take ownership of the connection pool and
        close it when the Redis client is closed.
        """
        client = cls(
            connection_pool=connection_pool,
        )
        client.auto_close_connection_pool = True
        return client

    @deprecated_args(
        args_to_warn=["retry_on_timeout"],
        reason="TimeoutError is included by default.",
        version="6.0.0",
    )
    @deprecated_args(
        args_to_warn=["lib_name", "lib_version"],
        reason="Use 'driver_info' parameter instead. "
        "lib_name and lib_version will be removed in a future version.",
    )
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        retry: Retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
        ),
        retry_on_error: Optional[List[Type[Exception]]] = None,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_path: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_password: Optional[str] = None,
        ssl_validate_ocsp: bool = False,
        ssl_validate_ocsp_stapled: bool = False,
        ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
        ssl_ocsp_expected_cert: Optional[str] = None,
        ssl_min_version: Optional["ssl.TLSVersion"] = None,
        ssl_ciphers: Optional[str] = None,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = None,
        lib_version: Optional[str] = None,
        driver_info: Optional["DriverInfo"] = None,
        username: Optional[str] = None,
        redis_connect_func: Optional[Callable[[], None]] = None,
        credential_provider: Optional[CredentialProvider] = None,
        protocol: Optional[int] = 2,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
    ) -> None:
260 """
261 Initialize a new Redis client.
263 To specify a retry policy for specific errors, you have two options:
265 1. Set the `retry_on_error` to a list of the error/s to retry on, and
266 you can also set `retry` to a valid `Retry` object(in case the default
267 one is not appropriate) - with this approach the retries will be triggered
268 on the default errors specified in the Retry object enriched with the
269 errors specified in `retry_on_error`.
271 2. Define a `Retry` object with configured 'supported_errors' and set
272 it to the `retry` parameter - with this approach you completely redefine
273 the errors on which retries will happen.
275 `retry_on_timeout` is deprecated - please include the TimeoutError
276 either in the Retry object or in the `retry_on_error` list.
278 When 'connection_pool' is provided - the retry configuration of the
279 provided pool will be used.
281 Args:
283 single_connection_client:
284 if `True`, connection pool is not used. In that case `Redis`
285 instance use is not thread safe.
286 decode_responses:
287 if `True`, the response will be decoded to utf-8.
288 Argument is ignored when connection_pool is provided.
289 driver_info:
290 Optional DriverInfo object to identify upstream libraries.
291 If provided, lib_name and lib_version are ignored.
292 If not provided, a DriverInfo will be created from lib_name and lib_version.
293 Argument is ignored when connection_pool is provided.
294 lib_name:
295 **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO.
296 lib_version:
297 **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO.
298 maint_notifications_config:
299 configuration the pool to support maintenance notifications - see
300 `redis.maint_notifications.MaintNotificationsConfig` for details.
301 Only supported with RESP3
302 If not provided and protocol is RESP3, the maintenance notifications
303 will be enabled by default (logic is included in the connection pool
304 initialization).
305 Argument is ignored when connection_pool is provided.
306 """
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        if not connection_pool:
            if not retry_on_error:
                retry_on_error = []

            # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version
            computed_driver_info = resolve_driver_info(
                driver_info, lib_name, lib_version
            )

            kwargs = {
                "db": db,
                "username": username,
                "password": password,
                "socket_timeout": socket_timeout,
                "encoding": encoding,
                "encoding_errors": encoding_errors,
                "decode_responses": decode_responses,
                "retry_on_error": retry_on_error,
                "retry": copy.deepcopy(retry),
                "max_connections": max_connections,
                "health_check_interval": health_check_interval,
                "client_name": client_name,
                "driver_info": computed_driver_info,
                "redis_connect_func": redis_connect_func,
                "credential_provider": credential_provider,
                "protocol": protocol,
            }
            # based on input, set up the appropriate connection args
            if unix_socket_path is not None:
                kwargs.update(
                    {
                        "path": unix_socket_path,
                        "connection_class": UnixDomainSocketConnection,
                    }
                )
            else:
                # TCP specific options
                kwargs.update(
                    {
                        "host": host,
                        "port": port,
                        "socket_connect_timeout": socket_connect_timeout,
                        "socket_keepalive": socket_keepalive,
                        "socket_keepalive_options": socket_keepalive_options,
                    }
                )

                if ssl:
                    kwargs.update(
                        {
                            "connection_class": SSLConnection,
                            "ssl_keyfile": ssl_keyfile,
                            "ssl_certfile": ssl_certfile,
                            "ssl_cert_reqs": ssl_cert_reqs,
                            "ssl_include_verify_flags": ssl_include_verify_flags,
                            "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
                            "ssl_ca_certs": ssl_ca_certs,
                            "ssl_ca_data": ssl_ca_data,
                            "ssl_check_hostname": ssl_check_hostname,
                            "ssl_password": ssl_password,
                            "ssl_ca_path": ssl_ca_path,
                            "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                            "ssl_validate_ocsp": ssl_validate_ocsp,
                            "ssl_ocsp_context": ssl_ocsp_context,
                            "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                            "ssl_min_version": ssl_min_version,
                            "ssl_ciphers": ssl_ciphers,
                        }
                    )
            if (cache_config or cache) and protocol in [3, "3"]:
                kwargs.update(
                    {
                        "cache": cache,
                        "cache_config": cache_config,
                    }
                )
            maint_notifications_enabled = (
                maint_notifications_config and maint_notifications_config.enabled
            )
            if maint_notifications_enabled and protocol not in [
                3,
                "3",
            ]:
                raise RedisError(
                    "Maintenance notifications handlers on connection are only supported with RESP version 3"
                )
            if maint_notifications_config:
                kwargs.update(
                    {
                        "maint_notifications_config": maint_notifications_config,
                    }
                )
            connection_pool = ConnectionPool(**kwargs)
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )
            self.auto_close_connection_pool = True
        else:
            self.auto_close_connection_pool = False
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )

        self.connection_pool = connection_pool

        if (cache_config or cache) and self.connection_pool.get_protocol() not in [
            3,
            "3",
        ]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.single_connection_lock = threading.RLock()
        self.connection = None
        self._single_connection_client = single_connection_client
        if self._single_connection_client:
            self.connection = self.connection_pool.get_connection()
            self._event_dispatcher.dispatch(
                AfterSingleConnectionInstantiationEvent(
                    self.connection, ClientType.SYNC, self.single_connection_lock
                )
            )

        self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

        if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
            self.response_callbacks.update(_RedisCallbacksRESP3)
        else:
            self.response_callbacks.update(_RedisCallbacksRESP2)

    def __repr__(self) -> str:
        return (
            f"<{type(self).__module__}.{type(self).__name__}"
            f"({repr(self.connection_pool)})>"
        )

    def get_encoder(self) -> "Encoder":
        """Get the connection pool's encoder"""
        return self.connection_pool.get_encoder()

    def get_connection_kwargs(self) -> Dict:
        """Get the connection's keyword arguments"""
        return self.connection_pool.connection_kwargs

    def get_retry(self) -> Optional[Retry]:
        return self.get_connection_kwargs().get("retry")

    def set_retry(self, retry: Retry) -> None:
        self.get_connection_kwargs().update({"retry": retry})
        self.connection_pool.set_retry(retry)

    def set_response_callback(self, command: str, callback: Callable) -> None:
        """Set a custom Response Callback"""
        self.response_callbacks[command] = callback

    def load_external_module(self, funcname, func) -> None:
        """
        This function can be used to add externally defined redis modules
        and their namespaces to the redis client.

        funcname - A string containing the name of the function to create
        func - The function, being added to this class.

        ex: Assume that one has a custom redis module named foomod that
        creates commands named 'foo.dothing' and 'foo.anotherthing' in redis.
        To load these functions into this namespace:

            from redis import Redis
            from foomodule import F
            r = Redis()
            r.load_external_module("foo", F)
            r.foo().dothing('your', 'arguments')

        For a concrete example see the reimport of the redisjson module in
        tests/test_connection.py::test_loading_external_modules
        """
        setattr(self, funcname, func)

    def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.
        """
        return Pipeline(
            self.connection_pool, self.response_callbacks, transaction, shard_hint
        )
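
    # Illustrative usage sketch ("counter" is a hypothetical key): queue
    # several commands and send them to the server in one round trip.
    #
    #     with Redis().pipeline() as pipe:
    #         pipe.set("counter", 0).incr("counter").incr("counter")
    #         results = pipe.execute()  # [True, 1, 2]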

    def transaction(
        self, func: Callable[["Pipeline"], None], *watches, **kwargs
    ) -> Union[List[Any], Any, None]:
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
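
    # Illustrative usage sketch (hypothetical key "balance"): the callable is
    # retried automatically whenever the watched key changes mid-transaction.
    #
    #     def add_interest(pipe):
    #         balance = float(pipe.get("balance") or 0)
    #         pipe.multi()
    #         pipe.set("balance", balance * 1.05)
    #
    #     Redis().transaction(add_interest, "balance")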

    def lock(
        self,
        name: str,
        timeout: Optional[float] = None,
        sleep: float = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[float] = None,
        lock_class: Union[None, Any] = None,
        thread_local: bool = True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.
        """
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
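
    # Illustrative usage sketch (hypothetical lock name "my-lock"): the lock
    # is released automatically when the context manager exits.
    #
    #     with Redis().lock("my-lock", timeout=5, blocking_timeout=3):
    #         ...  # critical section; at most one holder at a time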

    def pubsub(self, **kwargs):
        """
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        """
        return PubSub(
            self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
        )
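
    # Illustrative usage sketch (hypothetical channel "news"):
    #
    #     r = Redis()
    #     p = r.pubsub()
    #     p.subscribe("news")
    #     r.publish("news", "hello")
    #     for message in p.listen():
    #         print(message)  # subscribe confirmations, then published data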

    def monitor(self):
        return Monitor(self.connection_pool)

    def client(self):
        return self.__class__(
            connection_pool=self.connection_pool,
            single_connection_client=True,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        # In case a connection property does not yet exist
        # (due to a crash earlier in the Redis() constructor), return
        # immediately as there is nothing to clean-up.
        if not hasattr(self, "connection"):
            return

        conn = self.connection
        if conn:
            self.connection = None
            self.connection_pool.release(conn)

        if self.auto_close_connection_pool:
            self.connection_pool.disconnect()

    def _send_command_parse_response(self, conn, command_name, *args, **options):
        """
        Send a command and parse the response
        """
        conn.send_command(*args, **options)
        return self.parse_response(conn, command_name, **options)

    def _close_connection(self, conn) -> None:
        """
        Close the connection before retrying.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic (at the
        connection level).
        """
        conn.disconnect()

    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        return self._execute_command(*args, **options)

    def _execute_command(self, *args, **options):
        """Execute a command and return a parsed response"""
        pool = self.connection_pool
        command_name = args[0]
        conn = self.connection or pool.get_connection()

        if self._single_connection_client:
            self.single_connection_lock.acquire()
        try:
            return conn.retry.call_with_retry(
                lambda: self._send_command_parse_response(
                    conn, command_name, *args, **options
                ),
                lambda _: self._close_connection(conn),
            )
        finally:
            if conn and conn.should_reconnect():
                self._close_connection(conn)
                conn.connect()
            if self._single_connection_client:
                self.single_connection_lock.release()
            if not self.connection:
                pool.release(conn)

    def parse_response(self, connection, command_name, **options):
        """Parses a response from the Redis server"""
        try:
            if NEVER_DECODE in options:
                response = connection.read_response(disable_decoding=True)
                options.pop(NEVER_DECODE)
            else:
                response = connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise

        if EMPTY_RESPONSE in options:
            options.pop(EMPTY_RESPONSE)

        # Remove the 'keys' entry; it is only needed for caching.
        options.pop("keys", None)

        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response

    def get_cache(self) -> Optional[CacheInterface]:
        return self.connection_pool.cache


StrictRedis = Redis


class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    The next_command() method returns one command from monitor, and the
    listen() method yields commands from monitor.
    """

    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        self._start_monitor()
        return self

    def __exit__(self, *args):
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """Parse the response from a monitor command"""
        response = self.connection.read_response()

        if response is None:
            return None

        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)

        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            client_port = client_info[5:]
            client_type = "unix"
        else:
            if client_info == "":
                client_address = ""
                client_port = ""
                client_type = "unknown"
            else:
                # use rsplit as ipv6 addresses contain colons
                client_address, client_port = client_info.rsplit(":", 1)
                client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()

        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")
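
# Illustrative usage sketch: stream every command the server processes.
# MONITOR is expensive on busy servers, so this is best kept to debugging.
#
#     with Redis().monitor() as m:
#         for cmd in m.listen():
#             print(cmd["time"], cmd["client_address"], cmd["command"])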


class PubSub:
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.
    """

    PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
    UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
    HEALTH_CHECK_MESSAGE = "redis-py-health-check"

    def __init__(
        self,
        connection_pool,
        shard_hint=None,
        ignore_subscribe_messages: bool = False,
        encoder: Optional["Encoder"] = None,
        push_handler_func: Union[None, Callable[[str], None]] = None,
        event_dispatcher: Optional["EventDispatcher"] = None,
    ):
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        self.subscribed_event = threading.Event()
        # we need to know the encoding options for this connection in order
        # to lookup channel and pattern names for callback handlers.
        self.encoder = encoder
        self.push_handler_func = push_handler_func
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

        self._lock = threading.RLock()
        if self.encoder is None:
            self.encoder = self.connection_pool.get_encoder()
        self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
        if self.encoder.decode_responses:
            self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
        else:
            self.health_check_response = [b"pong", self.health_check_response_b]
        if self.push_handler_func is None:
            _set_info_logger()
        self.reset()

    def __enter__(self) -> "PubSub":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.reset()

    def __del__(self) -> None:
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            pass

    def reset(self) -> None:
        if self.connection:
            self.connection.disconnect()
            self.connection.deregister_connect_callback(self.on_connect)
            self.connection_pool.release(self.connection)
            self.connection = None
        self.health_check_response_counter = 0
        self.channels = {}
        self.pending_unsubscribe_channels = set()
        self.shard_channels = {}
        self.pending_unsubscribe_shard_channels = set()
        self.patterns = {}
        self.pending_unsubscribe_patterns = set()
        self.subscribed_event.clear()

    def close(self) -> None:
        self.reset()

    def on_connect(self, connection) -> None:
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        self.pending_unsubscribe_shard_channels.clear()
        if self.channels:
            channels = {
                self.encoder.decode(k, force=True): v for k, v in self.channels.items()
            }
            self.subscribe(**channels)
        if self.patterns:
            patterns = {
                self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
            }
            self.psubscribe(**patterns)
        if self.shard_channels:
            shard_channels = {
                self.encoder.decode(k, force=True): v
                for k, v in self.shard_channels.items()
            }
            self.ssubscribe(**shard_channels)

    @property
    def subscribed(self) -> bool:
        """Indicates if there are subscriptions to any channels or patterns"""
        return self.subscribed_event.is_set()

    def execute_command(self, *args):
        """Execute a publish/subscribe command"""

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        kwargs = {"check_health": not self.subscribed}
        if not self.subscribed:
            self.clean_health_check_responses()
        with self._lock:
            self._execute(connection, connection.send_command, *args, **kwargs)

    def clean_health_check_responses(self) -> None:
        """
        If any health check responses are present, clean them
        """
        ttl = 10
        conn = self.connection
        while conn and self.health_check_response_counter > 0 and ttl > 0:
            if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
                response = self._execute(conn, conn.read_response)
                if self.is_health_check_response(response):
                    self.health_check_response_counter -= 1
                else:
                    raise PubSubError(
                        "A non health check response was cleaned by "
                        "execute_command: {}".format(response)
                    )
            ttl -= 1

    def _reconnect(self, conn) -> None:
        """
        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        In this error handler we are trying to reconnect to the server.
        """
        conn.disconnect()
        conn.connect()

    def _execute(self, conn, command, *args, **kwargs):
        """
        Connect manually upon disconnection. If the Redis server is down,
        this will fail and raise a ConnectionError as desired.
        After reconnection, the ``on_connect`` callback should have been
        called by the connection to resubscribe us to any channels and
        patterns we were previously listening to.
        """

        if conn.should_reconnect():
            self._reconnect(conn)

        response = conn.retry.call_with_retry(
            lambda: command(*args, **kwargs),
            lambda _: self._reconnect(conn),
        )

        return response

    def parse_response(self, block=True, timeout=0):
        """Parse the response from a publish/subscribe command"""
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        self.check_health()

        def try_read():
            if not block:
                if not conn.can_read(timeout=timeout):
                    return None
            else:
                conn.connect()
            return conn.read_response(disconnect_on_error=False, push_request=True)

        response = self._execute(conn, try_read)

        if self.is_health_check_response(response):
            # ignore the health check message as user might not expect it
            self.health_check_response_counter -= 1
            return None
        return response

    def is_health_check_response(self, response) -> bool:
        """
        Check if the response is a health check response.
        If there are no subscriptions redis responds to PING command with a
        bulk response, instead of a multi-bulk with "pong" and the response.
        """
        if self.encoder.decode_responses:
            return response in [
                self.health_check_response,  # If there is a subscription
                self.HEALTH_CHECK_MESSAGE,  # If there are no subscriptions and decode_responses=True
            ]
        else:
            return response in [
                self.health_check_response,  # If there is a subscription
                self.health_check_response_b,  # If there isn't a subscription and decode_responses=False
            ]

    def check_health(self) -> None:
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        if conn.health_check_interval and time.monotonic() > conn.next_health_check:
            conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
            self.health_check_response_counter += 1

    def _normalize_keys(self, data) -> Dict:
        """
        normalize channel/pattern names to be either bytes or strings
        based on whether responses are automatically decoded. this saves us
        from coercing the value for each message coming in.
        """
        encode = self.encoder.encode
        decode = self.encoder.decode
        return {decode(encode(k)): v for k, v in data.items()}

    def psubscribe(self, *args, **kwargs):
        """
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_patterns = dict.fromkeys(args)
        new_patterns.update(kwargs)
        ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
        # update the patterns dict AFTER we send the command. we don't want to
        # subscribe twice to these patterns, once for the command and again
        # for the reconnection.
        new_patterns = self._normalize_keys(new_patterns)
        self.patterns.update(new_patterns)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_patterns.difference_update(new_patterns)
        return ret_val

    def punsubscribe(self, *args):
        """
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            patterns = self._normalize_keys(dict.fromkeys(args))
        else:
            patterns = self.patterns
        self.pending_unsubscribe_patterns.update(patterns)
        return self.execute_command("PUNSUBSCRIBE", *args)

    def subscribe(self, *args, **kwargs):
        """
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_channels = dict.fromkeys(args)
        new_channels.update(kwargs)
        ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
        # update the channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_channels = self._normalize_keys(new_channels)
        self.channels.update(new_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_channels.difference_update(new_channels)
        return ret_val

    def unsubscribe(self, *args):
        """
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            channels = self._normalize_keys(dict.fromkeys(args))
        else:
            channels = self.channels
        self.pending_unsubscribe_channels.update(channels)
        return self.execute_command("UNSUBSCRIBE", *args)

    def ssubscribe(self, *args, target_node=None, **kwargs):
        """
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_s_channels = dict.fromkeys(args)
        new_s_channels.update(kwargs)
        ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
        # update the s_channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_s_channels = self._normalize_keys(new_s_channels)
        self.shard_channels.update(new_s_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
        return ret_val

    def sunsubscribe(self, *args, target_node=None):
        """
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe
        from all shard_channels.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            s_channels = self._normalize_keys(dict.fromkeys(args))
        else:
            s_channels = self.shard_channels
        self.pending_unsubscribe_shard_channels.update(s_channels)
        return self.execute_command("SUNSUBSCRIBE", *args)

    def listen(self):
        "Listen for messages on channels this client has been subscribed to"
        while self.subscribed:
            response = self.handle_message(self.parse_response(block=True))
            if response is not None:
                yield response

    def get_message(
        self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
    ):
        """
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        """
        if not self.subscribed:
            # Wait for subscription
            start_time = time.monotonic()
            if self.subscribed_event.wait(timeout) is True:
                # The connection was subscribed during the timeout time frame.
                # The timeout should be adjusted based on the time spent
                # waiting for the subscription
                time_spent = time.monotonic() - start_time
                timeout = max(0.0, timeout - time_spent)
            else:
                # The connection isn't subscribed to any channels or patterns,
                # so no messages are available
                return None

        response = self.parse_response(block=(timeout is None), timeout=timeout)

        if response:
            return self.handle_message(response, ignore_subscribe_messages)
        return None
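
    # Illustrative polling sketch (hypothetical channel "events"): get_message
    # returns None when nothing arrives within the timeout, so it can be
    # polled inside an application's own loop.
    #
    #     p = Redis().pubsub()
    #     p.subscribe("events")
    #     while True:
    #         msg = p.get_message(ignore_subscribe_messages=True, timeout=1.0)
    #         if msg is not None:
    #             print(msg["channel"], msg["data"])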

    get_sharded_message = get_message

    def ping(self, message: Union[str, None] = None) -> bool:
        """
        Ping the Redis server to test connectivity.

        Sends a PING command to the Redis server and returns True if the server
        responds with "PONG".
        """
        args = ["PING", message] if message is not None else ["PING"]
        return self.execute_command(*args)

    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.
        """
        if response is None:
            return None
        if isinstance(response, bytes):
            response = [b"pong", response] if response != b"PONG" else [b"pong", b""]

        message_type = str_if_bytes(response[0])
        if message_type == "pmessage":
            message = {
                "type": message_type,
                "pattern": response[1],
                "channel": response[2],
                "data": response[3],
            }
        elif message_type == "pong":
            message = {
                "type": message_type,
                "pattern": None,
                "channel": None,
                "data": response[1],
            }
        else:
            message = {
                "type": message_type,
                "pattern": None,
                "channel": response[1],
                "data": response[2],
            }

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            if message_type == "punsubscribe":
                pattern = response[1]
                if pattern in self.pending_unsubscribe_patterns:
                    self.pending_unsubscribe_patterns.remove(pattern)
                    self.patterns.pop(pattern, None)
            elif message_type == "sunsubscribe":
                s_channel = response[1]
                if s_channel in self.pending_unsubscribe_shard_channels:
                    self.pending_unsubscribe_shard_channels.remove(s_channel)
                    self.shard_channels.pop(s_channel, None)
            else:
                channel = response[1]
                if channel in self.pending_unsubscribe_channels:
                    self.pending_unsubscribe_channels.remove(channel)
                    self.channels.pop(channel, None)
            if not self.channels and not self.patterns and not self.shard_channels:
                # There are no subscriptions anymore, set subscribed_event flag
                # to false
                self.subscribed_event.clear()

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            if message_type == "pmessage":
                handler = self.patterns.get(message["pattern"], None)
            elif message_type == "smessage":
                handler = self.shard_channels.get(message["channel"], None)
            else:
                handler = self.channels.get(message["channel"], None)
            if handler:
                handler(message)
                return None
        elif message_type != "pong":
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message

    def run_in_thread(
        self,
        sleep_time: float = 0.0,
        daemon: bool = False,
        exception_handler: Optional[Callable] = None,
        pubsub=None,
        sharded_pubsub: bool = False,
    ) -> "PubSubWorkerThread":
        for channel, handler in self.channels.items():
            if handler is None:
                raise PubSubError(f"Channel: '{channel}' has no handler registered")
        for pattern, handler in self.patterns.items():
            if handler is None:
                raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
        for s_channel, handler in self.shard_channels.items():
            if handler is None:
                raise PubSubError(
                    f"Shard Channel: '{s_channel}' has no handler registered"
                )

        pubsub = self if pubsub is None else pubsub
        thread = PubSubWorkerThread(
            pubsub,
            sleep_time,
            daemon=daemon,
            exception_handler=exception_handler,
            sharded_pubsub=sharded_pubsub,
        )
        thread.start()
        return thread
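
    # Illustrative usage sketch (hypothetical channel "events"): every
    # subscription must have a handler before run_in_thread() is called.
    #
    #     p = Redis().pubsub()
    #     p.subscribe(events=lambda msg: print(msg["data"]))
    #     thread = p.run_in_thread(sleep_time=0.1, daemon=True)
    #     ...  # handlers run in the background thread
    #     thread.stop()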


class PubSubWorkerThread(threading.Thread):
    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
        sharded_pubsub: bool = False,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self.exception_handler = exception_handler
        self.sharded_pubsub = sharded_pubsub
        self._running = threading.Event()

    def run(self) -> None:
        if self._running.is_set():
            return
        self._running.set()
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while self._running.is_set():
            try:
                if not self.sharded_pubsub:
                    pubsub.get_message(
                        ignore_subscribe_messages=True, timeout=sleep_time
                    )
                else:
                    pubsub.get_sharded_message(
                        ignore_subscribe_messages=True, timeout=sleep_time
                    )
            except BaseException as e:
                if self.exception_handler is None:
                    raise
                self.exception_handler(e, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()


class Pipeline(Redis):
    """
    Pipelines provide a way to transmit multiple commands to the Redis server
    in one transmission. This is convenient for batch processing, such as
    saving all the values in a list to Redis.

    All commands executed within a pipeline (when running in transactional
    mode, which is the default behavior) are wrapped with MULTI and EXEC
    calls. This guarantees all commands executed in the pipeline will be
    executed atomically.

    Any command raising an exception does *not* halt the execution of
    subsequent commands in the pipeline. Instead, the exception is caught
    and its instance is placed into the response list returned by execute().
    Code iterating over the response list should be able to deal with an
    instance of an exception as a potential value. In general, these will be
    ResponseError exceptions, such as those raised when issuing a command
    on a key of a different datatype.
    """

    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    def __init__(
        self,
        connection_pool: ConnectionPool,
        response_callbacks,
        transaction,
        shard_hint,
    ):
        self.connection_pool = connection_pool
        self.connection: Optional[Connection] = None
        self.response_callbacks = response_callbacks
        self.transaction = transaction
        self.shard_hint = shard_hint
        self.watching = False
        self.command_stack = []
        self.scripts: Set[Script] = set()
        self.explicit_transaction = False

    def __enter__(self) -> "Pipeline":
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.reset()

    def __del__(self):
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self) -> int:
        return len(self.command_stack)

    def __bool__(self) -> bool:
        """Pipeline instances should always evaluate to True"""
        return True

    def reset(self) -> None:
        self.command_stack = []
        self.scripts = set()
        # make sure to reset the connection state in the event that we were
        # watching something
        if self.watching and self.connection:
            try:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self.connection.send_command("UNWATCH")
                self.connection.read_response()
            except ConnectionError:
                # disconnect will also remove any previous WATCHes
                self.connection.disconnect()
        # clean up the other instance attributes
        self.watching = False
        self.explicit_transaction = False

        # we can safely return the connection to the pool here since we're
        # sure we're no longer WATCHing anything
        if self.connection:
            self.connection_pool.release(self.connection)
            self.connection = None

    def close(self) -> None:
        """Close the pipeline"""
        self.reset()

    def multi(self) -> None:
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        if self.explicit_transaction:
            raise RedisError("Cannot issue nested calls to MULTI")
        if self.command_stack:
            raise RedisError(
                "Commands without an initial WATCH have already been issued"
            )
        self.explicit_transaction = True

    def execute_command(self, *args, **kwargs):
        if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
            return self.immediate_execute_command(*args, **kwargs)
        return self.pipeline_execute_command(*args, **kwargs)

    def _disconnect_reset_raise_on_watching(
        self,
        conn: AbstractConnection,
        error: Exception,
    ) -> None:
        """
        Close the connection, reset the watching state and
        raise an exception if we were watching.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic (at the
        connection level).
        """
        conn.disconnect()

        # if we were already watching a variable, the watch is no longer
        # valid since this connection has died. raise a WatchError, which
        # indicates the user should retry this transaction.
        if self.watching:
            self.reset()
            raise WatchError(
                f"A {type(error).__name__} occurred while watching one or more keys"
            )

    def immediate_execute_command(self, *args, **options):
        """
        Execute a command immediately, but don't auto-retry on the supported
        errors for retry if we're already WATCHing a variable.
        Used when issuing WATCH, or subsequent commands that retrieve their
        values, before MULTI is called.
        """
        command_name = args[0]
        conn = self.connection
        # if this is the first call, we need a connection
        if not conn:
            conn = self.connection_pool.get_connection()
            self.connection = conn

        return conn.retry.call_with_retry(
            lambda: self._send_command_parse_response(
                conn, command_name, *args, **options
            ),
            lambda error: self._disconnect_reset_raise_on_watching(conn, error),
        )

    def pipeline_execute_command(self, *args, **options) -> "Pipeline":
        """
        Stage a command to be executed when execute() is next called.

        Returns the current Pipeline object back so commands can be
        chained together, such as:

            pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        self.command_stack.append((args, options))
        return self

    def _execute_transaction(
        self, connection: Connection, commands, raise_on_error
    ) -> List:
        cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
        all_cmds = connection.pack_commands(
            [args for args, options in cmds if EMPTY_RESPONSE not in options]
        )
        connection.send_packed_command(all_cmds)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            self.parse_response(connection, "_")
        except ResponseError as e:
            errors.append((0, e))

        # and all the other commands
        for i, command in enumerate(commands):
            if EMPTY_RESPONSE in command[1]:
                errors.append((i, command[1][EMPTY_RESPONSE]))
            else:
                try:
                    self.parse_response(connection, "_")
                except ResponseError as e:
                    self.annotate_exception(e, i + 1, command[0])
                    errors.append((i, e))

        # parse the EXEC.
        try:
            response = self.parse_response(connection, "_")
        except ExecAbortError:
            if errors:
                raise errors[0][1]
            raise

        # EXEC clears any watched keys
        self.watching = False

        if response is None:
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(commands):
            self.connection.disconnect()
            raise ResponseError(
                "Wrong number of response items from pipeline execution"
            )

        # find any errors in the response and raise if necessary
        if raise_on_error:
            self.raise_first_error(commands, response)

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, commands):
            if not isinstance(r, Exception):
                args, options = cmd
                # Remove the 'keys' entry; it is only needed for caching.
                options.pop("keys", None)
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
            data.append(r)

        return data

    def _execute_pipeline(self, connection, commands, raise_on_error):
        # build up all commands into a single request to increase network perf
        all_cmds = connection.pack_commands([args for args, _ in commands])
        connection.send_packed_command(all_cmds)

        responses = []
        for args, options in commands:
            try:
                responses.append(self.parse_response(connection, args[0], **options))
            except ResponseError as e:
                responses.append(e)

        if raise_on_error:
            self.raise_first_error(commands, responses)

        return responses

    def raise_first_error(self, commands, response):
        for i, r in enumerate(response):
            if isinstance(r, ResponseError):
                self.annotate_exception(r, i + 1, commands[i][0])
                raise r

    def annotate_exception(self, exception, number, command):
        cmd = " ".join(map(safe_str, command))
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {exception.args[0]}"
        )
        exception.args = (msg,) + exception.args[1:]

    def parse_response(self, connection, command_name, **options):
        result = Redis.parse_response(self, connection, command_name, **options)
        if command_name in self.UNWATCH_COMMANDS:
            self.watching = False
        elif command_name == "WATCH":
            self.watching = True
        return result

    def load_scripts(self):
        # make sure all scripts that are about to be run on this pipeline exist
        scripts = list(self.scripts)
        immediate = self.immediate_execute_command
        shas = [s.sha for s in scripts]
        # we can't use the normal script_* methods because they would just
        # get buffered in the pipeline.
        exists = immediate("SCRIPT EXISTS", *shas)
        if not all(exists):
            for s, exist in zip(scripts, exists):
                if not exist:
                    s.sha = immediate("SCRIPT LOAD", s.script)

    def _disconnect_raise_on_watching(
        self,
        conn: AbstractConnection,
        error: Exception,
    ) -> None:
        """
        Close the connection and raise an exception if we were watching.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic (at the
        connection level).
        """
        conn.disconnect()
        # if we were watching a variable, the watch is no longer valid
        # since this connection has died. raise a WatchError, which
        # indicates the user should retry this transaction.
        if self.watching:
            raise WatchError(
                f"A {type(error).__name__} occurred while watching one or more keys"
            )

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """Execute all the commands in the current pipeline"""
        stack = self.command_stack
        if not stack and not self.watching:
            return []
        if self.scripts:
            self.load_scripts()
        if self.transaction or self.explicit_transaction:
            execute = self._execute_transaction
        else:
            execute = self._execute_pipeline

        conn = self.connection
        if not conn:
            conn = self.connection_pool.get_connection()
            # assign to self.connection so reset() releases the connection
            # back to the pool after we're done
            self.connection = conn

        try:
            return conn.retry.call_with_retry(
                lambda: execute(conn, stack, raise_on_error),
                lambda error: self._disconnect_raise_on_watching(conn, error),
            )
        finally:
            # in reset() the connection is disconnected before being returned
            # to the pool if it is marked for reconnect.
            self.reset()

    def discard(self):
        """
        Flushes all previously queued commands
        See: https://redis.io/commands/DISCARD
        """
        self.execute_command("DISCARD")

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        if self.explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")
        return self.execute_command("WATCH", *names)
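
    # Illustrative optimistic-locking sketch (hypothetical key "stock"):
    # WATCH the key, read it, then MULTI; execute() raises WatchError if the
    # key changed before EXEC.
    #
    #     with Redis().pipeline() as pipe:
    #         pipe.watch("stock")
    #         current = int(pipe.get("stock") or 0)
    #         pipe.multi()
    #         pipe.set("stock", current - 1)
    #         pipe.execute()  # raises WatchError on concurrent modification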

    def unwatch(self) -> bool:
        """Unwatches all previously specified keys"""
        return self.watching and self.execute_command("UNWATCH") or True