Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import re
3import threading
4import time
5from itertools import chain
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Callable,
10 Dict,
11 List,
12 Mapping,
13 Optional,
14 Set,
15 Type,
16 Union,
17)
19from redis._parsers.encoders import Encoder
20from redis._parsers.helpers import (
21 _RedisCallbacks,
22 _RedisCallbacksRESP2,
23 _RedisCallbacksRESP3,
24 bool_ok,
25)
26from redis.backoff import ExponentialWithJitterBackoff
27from redis.cache import CacheConfig, CacheInterface
28from redis.commands import (
29 CoreCommands,
30 RedisModuleCommands,
31 SentinelCommands,
32 list_or_args,
33)
34from redis.commands.core import Script
35from redis.connection import (
36 AbstractConnection,
37 Connection,
38 ConnectionPool,
39 SSLConnection,
40 UnixDomainSocketConnection,
41)
42from redis.credentials import CredentialProvider
43from redis.event import (
44 AfterPooledConnectionsInstantiationEvent,
45 AfterPubSubConnectionInstantiationEvent,
46 AfterSingleConnectionInstantiationEvent,
47 ClientType,
48 EventDispatcher,
49)
50from redis.exceptions import (
51 ConnectionError,
52 ExecAbortError,
53 PubSubError,
54 RedisError,
55 ResponseError,
56 WatchError,
57)
58from redis.lock import Lock
59from redis.maint_notifications import (
60 MaintNotificationsConfig,
61)
62from redis.retry import Retry
63from redis.utils import (
64 _set_info_logger,
65 deprecated_args,
66 get_lib_version,
67 safe_str,
68 str_if_bytes,
69 truncate_text,
70)
72if TYPE_CHECKING:
73 import ssl
75 import OpenSSL
77SYM_EMPTY = b""
78EMPTY_RESPONSE = "EMPTY_RESPONSE"
80# some responses (ie. dump) are binary, and just meant to never be decoded
81NEVER_DECODE = "NEVER_DECODE"
class CaseInsensitiveDict(dict):
    """
    Case insensitive dict implementation. Assumes string keys only.

    Every key is normalised with ``str.upper()`` before it is stored or
    looked up, so ``d["get"]`` and ``d["GET"]`` address the same entry.
    """

    def __init__(self, data: Dict[str, str]) -> None:
        # Go through __setitem__ so every key is stored in upper case.
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def pop(self, k, *default):
        """Remove and return the entry for ``k``, ignoring case.

        Accepts the same optional fallback value as ``dict.pop``.
        """
        return super().pop(k.upper(), *default)

    def setdefault(self, k, default=None):
        """Return the entry for ``k``, inserting ``default`` if it is absent.

        The key is upper-cased first; without that, the inherited
        ``dict.setdefault`` would store a raw key that the other accessors
        could never find again.
        """
        return super().setdefault(k.upper(), default)

    def update(self, data):
        # Normalise the incoming keys before merging them in.
        data = CaseInsensitiveDict(data)
        super().update(data)
class AbstractRedis:
    # Empty base class: it declares no attributes or methods of its own.
    pass
115class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
116 """
117 Implementation of the Redis protocol.
119 This abstract class provides a Python interface to all Redis commands
120 and an implementation of the Redis protocol.
122 Pipelines derive from this, implementing how
123 the commands are sent and received to the Redis server. Based on
124 configuration, an instance will either use a ConnectionPool, or
125 Connection object to talk to redis.
127 It is not safe to pass PubSub or Pipeline objects between threads.
128 """
@classmethod
def from_url(cls, url: str, **kwargs) -> "Redis":
    """
    Build a Redis client whose connection pool is configured from ``url``.

    Accepted URL forms::

        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0
        unix://[username@]/path/to/socket.sock?db=0[&password=password]

    Supported schemes:

    - ``redis://`` creates a TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/redis>
    - ``rediss://`` creates a SSL wrapped TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/rediss>
    - ``unix://`` creates a Unix Domain Socket connection.

    The username, password, hostname, path and every querystring value
    are run through ``urllib.parse.unquote`` so percent-encoded values
    are replaced by the characters they stand for.

    The database number is taken from the first of these that is present:

    1. A ``db`` querystring option, e.g. redis://localhost?db=0
    2. For the redis:// and rediss:// schemes, the path component of the
       url, e.g. redis://localhost/0
    3. A ``db`` keyword argument to this function.

    When none of them is given, db=0 is used.

    Querystring options are cast to their appropriate Python types.
    Booleans may be written as "True"/"False" or "Yes"/"No". A value that
    cannot be cast raises ``ValueError``. The parsed querystring arguments
    and the keyword arguments are then handed to the ``ConnectionPool``
    initializer; when both define the same argument, the querystring wins.

    ``single_connection_client`` is consumed here and passed to the client
    rather than to the pool.
    """
    use_single_connection = kwargs.pop("single_connection_client", False)
    instance = cls(
        connection_pool=ConnectionPool.from_url(url, **kwargs),
        single_connection_client=use_single_connection,
    )
    # The pool was created on behalf of this client, so the client owns it.
    instance.auto_close_connection_pool = True
    return instance
@classmethod
def from_pool(
    cls: Type["Redis"],
    connection_pool: ConnectionPool,
) -> "Redis":
    """
    Wrap an existing connection pool in a new Redis client.

    Ownership of the pool moves to the returned client: closing the
    client also closes the pool.
    """
    instance = cls(connection_pool=connection_pool)
    instance.auto_close_connection_pool = True
    return instance
@deprecated_args(
    args_to_warn=["retry_on_timeout"],
    reason="TimeoutError is included by default.",
    version="6.0.0",
)
def __init__(
    self,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: Optional[str] = None,
    socket_timeout: Optional[float] = None,
    socket_connect_timeout: Optional[float] = None,
    socket_keepalive: Optional[bool] = None,
    socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
    connection_pool: Optional[ConnectionPool] = None,
    unix_socket_path: Optional[str] = None,
    encoding: str = "utf-8",
    encoding_errors: str = "strict",
    decode_responses: bool = False,
    retry_on_timeout: bool = False,
    retry: Retry = Retry(
        backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
    ),
    retry_on_error: Optional[List[Type[Exception]]] = None,
    ssl: bool = False,
    ssl_keyfile: Optional[str] = None,
    ssl_certfile: Optional[str] = None,
    ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
    ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
    ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
    ssl_ca_certs: Optional[str] = None,
    ssl_ca_path: Optional[str] = None,
    ssl_ca_data: Optional[str] = None,
    ssl_check_hostname: bool = True,
    ssl_password: Optional[str] = None,
    ssl_validate_ocsp: bool = False,
    ssl_validate_ocsp_stapled: bool = False,
    ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
    ssl_ocsp_expected_cert: Optional[str] = None,
    ssl_min_version: Optional["ssl.TLSVersion"] = None,
    ssl_ciphers: Optional[str] = None,
    max_connections: Optional[int] = None,
    single_connection_client: bool = False,
    health_check_interval: int = 0,
    client_name: Optional[str] = None,
    lib_name: Optional[str] = "redis-py",
    lib_version: Optional[str] = get_lib_version(),
    username: Optional[str] = None,
    redis_connect_func: Optional[Callable[[], None]] = None,
    credential_provider: Optional[CredentialProvider] = None,
    protocol: Optional[int] = 2,
    cache: Optional[CacheInterface] = None,
    cache_config: Optional[CacheConfig] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
    maint_notifications_config: Optional[MaintNotificationsConfig] = None,
) -> None:
    """
    Initialize a new Redis client.

    To specify a retry policy for specific errors, you have two options:

    1. Set the `retry_on_error` to a list of the error/s to retry on, and
    you can also set `retry` to a valid `Retry` object (in case the default
    one is not appropriate) - with this approach the retries will be triggered
    on the default errors specified in the Retry object enriched with the
    errors specified in `retry_on_error`.

    2. Define a `Retry` object with configured 'supported_errors' and set
    it to the `retry` parameter - with this approach you completely redefine
    the errors on which retries will happen.

    `retry_on_timeout` is deprecated - please include the TimeoutError
    either in the Retry object or in the `retry_on_error` list.

    When 'connection_pool' is provided - the retry configuration of the
    provided pool will be used.

    Args:

    single_connection_client:
        if `True`, connection pool is not used. In that case `Redis`
        instance use is not thread safe.
    decode_responses:
        if `True`, the response will be decoded to utf-8.
        Argument is ignored when connection_pool is provided.
    maint_notifications_config:
        configuration for the pool to support maintenance notifications - see
        `redis.maint_notifications.MaintNotificationsConfig` for details.
        Only supported with RESP3.
        If not provided and protocol is RESP3, the maintenance notifications
        will be enabled by default (logic is included in the connection pool
        initialization).
        Argument is ignored when connection_pool is provided.

    Raises:
        RedisError: if maintenance notifications are enabled, or client-side
            caching is requested, while the protocol is not RESP3.
    """
    # Fall back to a private dispatcher when the caller did not share one.
    if event_dispatcher is None:
        self._event_dispatcher = EventDispatcher()
    else:
        self._event_dispatcher = event_dispatcher
    if not connection_pool:
        # No pool supplied: build one from the individual keyword arguments.
        if not retry_on_error:
            retry_on_error = []
        kwargs = {
            "db": db,
            "username": username,
            "password": password,
            "socket_timeout": socket_timeout,
            "encoding": encoding,
            "encoding_errors": encoding_errors,
            "decode_responses": decode_responses,
            "retry_on_error": retry_on_error,
            # The default Retry object is created once, at definition time;
            # copying it keeps clients from sharing a single instance.
            "retry": copy.deepcopy(retry),
            "max_connections": max_connections,
            "health_check_interval": health_check_interval,
            "client_name": client_name,
            "lib_name": lib_name,
            "lib_version": lib_version,
            "redis_connect_func": redis_connect_func,
            "credential_provider": credential_provider,
            "protocol": protocol,
        }
        # based on input, setup appropriate connection args
        if unix_socket_path is not None:
            kwargs.update(
                {
                    "path": unix_socket_path,
                    "connection_class": UnixDomainSocketConnection,
                }
            )
        else:
            # TCP specific options
            kwargs.update(
                {
                    "host": host,
                    "port": port,
                    "socket_connect_timeout": socket_connect_timeout,
                    "socket_keepalive": socket_keepalive,
                    "socket_keepalive_options": socket_keepalive_options,
                }
            )

            if ssl:
                kwargs.update(
                    {
                        "connection_class": SSLConnection,
                        "ssl_keyfile": ssl_keyfile,
                        "ssl_certfile": ssl_certfile,
                        "ssl_cert_reqs": ssl_cert_reqs,
                        "ssl_include_verify_flags": ssl_include_verify_flags,
                        "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
                        "ssl_ca_certs": ssl_ca_certs,
                        "ssl_ca_data": ssl_ca_data,
                        "ssl_check_hostname": ssl_check_hostname,
                        "ssl_password": ssl_password,
                        "ssl_ca_path": ssl_ca_path,
                        "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                        "ssl_validate_ocsp": ssl_validate_ocsp,
                        "ssl_ocsp_context": ssl_ocsp_context,
                        "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                        "ssl_min_version": ssl_min_version,
                        "ssl_ciphers": ssl_ciphers,
                    }
                )
        # Cache settings are only forwarded for RESP3; a non-RESP3 request
        # is rejected further down, once the pool's protocol is known.
        if (cache_config or cache) and protocol in [3, "3"]:
            kwargs.update(
                {
                    "cache": cache,
                    "cache_config": cache_config,
                }
            )
        maint_notifications_enabled = (
            maint_notifications_config and maint_notifications_config.enabled
        )
        if maint_notifications_enabled and protocol not in [
            3,
            "3",
        ]:
            raise RedisError(
                "Maintenance notifications handlers on connection are only supported with RESP version 3"
            )
        if maint_notifications_config:
            kwargs.update(
                {
                    "maint_notifications_config": maint_notifications_config,
                }
            )
        connection_pool = ConnectionPool(**kwargs)
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                [connection_pool], ClientType.SYNC, credential_provider
            )
        )
        # This client created the pool, so close() also disconnects it.
        self.auto_close_connection_pool = True
    else:
        # The pool belongs to the caller; close() leaves it connected.
        self.auto_close_connection_pool = False
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                [connection_pool], ClientType.SYNC, credential_provider
            )
        )

    self.connection_pool = connection_pool

    # Checked against the pool itself so a caller-supplied pool is covered.
    if (cache_config or cache) and self.connection_pool.get_protocol() not in [
        3,
        "3",
    ]:
        raise RedisError("Client caching is only supported with RESP version 3")

    self.single_connection_lock = threading.RLock()
    self.connection = None
    self._single_connection_client = single_connection_client
    if self._single_connection_client:
        # Reserve one connection for the whole lifetime of this client.
        self.connection = self.connection_pool.get_connection()
        self._event_dispatcher.dispatch(
            AfterSingleConnectionInstantiationEvent(
                self.connection, ClientType.SYNC, self.single_connection_lock
            )
        )

    self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

    # Layer the protocol-specific callbacks over the shared ones.
    if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
        self.response_callbacks.update(_RedisCallbacksRESP3)
    else:
        self.response_callbacks.update(_RedisCallbacksRESP2)
424 def __repr__(self) -> str:
425 return (
426 f"<{type(self).__module__}.{type(self).__name__}"
427 f"({repr(self.connection_pool)})>"
428 )
def get_encoder(self) -> "Encoder":
    """Return the encoder provided by the connection pool."""
    pool = self.connection_pool
    return pool.get_encoder()
def get_connection_kwargs(self) -> Dict:
    """Return the pool's connection keyword arguments (the live dict, not a copy)."""
    pool = self.connection_pool
    return pool.connection_kwargs
def get_retry(self) -> Optional[Retry]:
    """Return the ``retry`` entry of the connection kwargs, or ``None`` if unset."""
    connection_kwargs = self.get_connection_kwargs()
    return connection_kwargs.get("retry")
def set_retry(self, retry: Retry) -> None:
    """Store ``retry`` in the connection kwargs and pass it on to the pool."""
    connection_kwargs = self.get_connection_kwargs()
    connection_kwargs["retry"] = retry
    self.connection_pool.set_retry(retry)
def set_response_callback(self, command: str, callback: Callable) -> None:
    """Register ``callback`` as the response handler for ``command``."""
    callbacks = self.response_callbacks
    callbacks[command] = callback
def load_external_module(self, funcname, func) -> None:
    """
    Attach an externally defined redis module, and its namespace, to
    this client instance.

    funcname - A string containing the name of the attribute to create
    func - The function being added to this instance

    ex: Suppose a custom redis module named foomod provides the commands
    'foo.dothing' and 'foo.anotherthing' in redis.
    To expose them under this client:

    from redis import Redis
    from foomodule import F
    r = Redis()
    r.load_external_module("foo", F)
    r.foo().dothing('your', 'arguments')

    For a concrete example see the reimport of the redisjson module in
    tests/test_connection.py::test_loading_external_modules
    """
    # Bound on the instance only; other clients are unaffected.
    setattr(self, funcname, func)
def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
    """
    Create a pipeline that queues multiple commands for later execution.

    ``transaction`` indicates whether all commands should be executed
    atomically. Besides making a group of operations atomic, pipelines
    reduce the back-and-forth overhead between the client and server.
    """
    pool = self.connection_pool
    callbacks = self.response_callbacks
    return Pipeline(pool, callbacks, transaction, shard_hint)
def transaction(
    self, func: Callable[["Pipeline"], None], *watches, **kwargs
) -> Union[List[Any], Any, None]:
    """
    Run the callable ``func`` as a transaction while watching every key
    listed in ``watches``. ``func`` receives a single argument, a
    Pipeline object.

    Recognised keyword options: ``shard_hint`` (forwarded to the
    pipeline), ``value_from_callable`` (return what ``func`` returned
    instead of the result of ``execute()``) and ``watch_delay`` (seconds
    to sleep before retrying after a ``WatchError``).
    """
    hint = kwargs.pop("shard_hint", None)
    prefer_callable_result = kwargs.pop("value_from_callable", False)
    retry_pause = kwargs.pop("watch_delay", None)
    with self.pipeline(True, hint) as pipe:
        # Keep retrying until the transaction runs without a WatchError.
        while True:
            try:
                if watches:
                    pipe.watch(*watches)
                callable_result = func(pipe)
                executed = pipe.execute()
            except WatchError:
                if retry_pause is not None and retry_pause > 0:
                    time.sleep(retry_pause)
                continue
            if prefer_callable_result:
                return callable_result
            return executed
def lock(
    self,
    name: str,
    timeout: Optional[float] = None,
    sleep: float = 0.1,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
    lock_class: Union[None, Any] = None,
    thread_local: bool = True,
    raise_on_release_error: bool = True,
):
    """
    Create a Lock object stored under key ``name`` that mimics the
    behavior of threading.Lock.

    ``timeout`` is the maximum life of the lock, if given. Without it the
    lock stays held until release() is called.

    ``sleep`` is how long to sleep on each loop iteration while the lock
    is in blocking mode and another client currently holds it.

    ``blocking`` selects whether ``acquire`` blocks until the lock is
    obtained or fails immediately, returning False without acquiring the
    lock. Defaults to True. A ``blocking`` argument passed to ``acquire``
    overrides this value.

    ``blocking_timeout`` is the maximum number of seconds to spend trying
    to acquire the lock, as a float or an integer. ``None`` means keep
    trying forever.

    ``lock_class`` forces a specific lock implementation. As of redis-py
    3.0 the only lock class implemented is ``Lock`` (a Lua-based lock),
    so this is unlikely to be needed unless you have written your own
    lock class.

    ``thread_local`` selects whether the lock token is kept in
    thread-local storage. That is the default, so a thread only sees its
    own token and never one set by another thread. Consider this
    timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release the thread-2's lock.

    Some use cases need thread-local storage disabled, for example when
    one thread acquires a lock and hands the lock instance to a worker
    thread that releases it later. With thread-local storage enabled the
    worker would not see the token set by the acquiring thread. Such
    cases are assumed to be uncommon, hence the default.

    ``raise_on_release_error`` selects whether an exception is raised
    when the lock is no longer owned on leaving the context manager.
    The default, True, raises. With False a warning is logged and the
    exception is suppressed.
    """
    factory = Lock if lock_class is None else lock_class
    return factory(
        self,
        name,
        timeout=timeout,
        sleep=sleep,
        blocking=blocking,
        blocking_timeout=blocking_timeout,
        thread_local=thread_local,
        raise_on_release_error=raise_on_release_error,
    )
def pubsub(self, **kwargs):
    """
    Create a Publish/Subscribe object, used to subscribe to channels and
    listen for the messages published to them.

    The new object shares this client's connection pool and event
    dispatcher; any other keyword arguments are passed through to
    ``PubSub``.
    """
    pool = self.connection_pool
    dispatcher = self._event_dispatcher
    return PubSub(pool, event_dispatcher=dispatcher, **kwargs)
def monitor(self):
    """Create a ``Monitor`` built on this client's connection pool."""
    pool = self.connection_pool
    return Monitor(pool)
def client(self):
    """
    Create another client of the same class that shares this client's
    connection pool and runs in single-connection mode.
    """
    klass = self.__class__
    return klass(connection_pool=self.connection_pool, single_connection_client=True)
608 def __enter__(self):
609 return self
611 def __exit__(self, exc_type, exc_value, traceback):
612 self.close()
614 def __del__(self):
615 try:
616 self.close()
617 except Exception:
618 pass
def close(self) -> None:
    """
    Release the dedicated connection, if any, and disconnect the pool
    when this client owns it.
    """
    # The attribute is missing when the constructor crashed before setting
    # it; in that case there is nothing to clean up.
    try:
        held = self.connection
    except AttributeError:
        return

    if held:
        self.connection = None
        self.connection_pool.release(held)

    if self.auto_close_connection_pool:
        self.connection_pool.disconnect()
635 def _send_command_parse_response(self, conn, command_name, *args, **options):
636 """
637 Send a command and parse the response
638 """
639 conn.send_command(*args, **options)
640 return self.parse_response(conn, command_name, **options)
642 def _close_connection(self, conn) -> None:
643 """
644 Close the connection before retrying.
646 The supported exceptions are already checked in the
647 retry object so we don't need to do it here.
649 After we disconnect the connection, it will try to reconnect and
650 do a health check as part of the send_command logic(on connection level).
651 """
653 conn.disconnect()
655 # COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
    """Public entry point for running a command; delegates to ``_execute_command``."""
    result = self._execute_command(*args, **options)
    return result
659 def _execute_command(self, *args, **options):
660 """Execute a command and return a parsed response"""
661 pool = self.connection_pool
662 command_name = args[0]
663 conn = self.connection or pool.get_connection()
665 if self._single_connection_client:
666 self.single_connection_lock.acquire()
667 try:
668 return conn.retry.call_with_retry(
669 lambda: self._send_command_parse_response(
670 conn, command_name, *args, **options
671 ),
672 lambda _: self._close_connection(conn),
673 )
675 finally:
676 if conn and conn.should_reconnect():
677 self._close_connection(conn)
678 conn.connect()
679 if self._single_connection_client:
680 self.single_connection_lock.release()
681 if not self.connection:
682 pool.release(conn)
def parse_response(self, connection, command_name, **options):
    """
    Read one reply from ``connection`` and run the response callback
    registered for ``command_name``, if there is one.

    With ``NEVER_DECODE`` in ``options`` the reply is read with decoding
    disabled. With ``EMPTY_RESPONSE`` in ``options`` a ``ResponseError``
    is swallowed and that option's value is returned instead.
    """
    raw_only = NEVER_DECODE in options
    try:
        if raw_only:
            reply = connection.read_response(disable_decoding=True)
        else:
            reply = connection.read_response()
    except ResponseError:
        if EMPTY_RESPONSE in options:
            return options[EMPTY_RESPONSE]
        raise

    # Drop the control options so they are not passed to the callback.
    if raw_only:
        options.pop(NEVER_DECODE)
    options.pop(EMPTY_RESPONSE, None)
    # Remove keys entry, it is needed only for cache.
    options.pop("keys", None)

    callbacks = self.response_callbacks
    if command_name not in callbacks:
        return reply
    return callbacks[command_name](reply, **options)
def get_cache(self) -> Optional[CacheInterface]:
    """Return the ``cache`` attribute of the connection pool."""
    pool = self.connection_pool
    return pool.cache
711StrictRedis = Redis
class Monitor:
    """
    Helper for working with the MONITOR command of the redis server.
    next_command() returns a single monitored command;
    listen() yields monitored commands one after another.
    """

    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        # A connection is taken from the pool as soon as the monitor exists.
        self.connection = connection_pool.get_connection()

    def __enter__(self):
        self._start_monitor()
        return self

    def __exit__(self, *args):
        conn = self.connection
        conn.disconnect()
        self.connection_pool.release(conn)

    def next_command(self):
        """Read one monitor line and return it as a dict (None if nothing was read)"""
        raw = self.connection.read_response()
        if raw is None:
            return None

        text = raw
        if isinstance(raw, bytes):
            text = self.connection.encoder.decode(raw, force=True)

        # Layout: '<time> [<db> <client>] "<arg>" "<arg>" ...'
        timestamp, remainder = text.split(" ", 1)
        db_id, client_info, quoted_args = self.monitor_re.match(remainder).groups()
        # Redis escapes double quotes because each piece of the command
        # string is wrapped in double quotes. That is not needed here, so
        # drop the escaping and keep the quote.
        command = " ".join(self.command_re.findall(quoted_args)).replace('\\"', '"')

        if client_info == "lua":
            address, port, kind = "lua", "", "lua"
        elif client_info.startswith("unix"):
            address, port, kind = "unix", client_info[5:], "unix"
        else:
            # rsplit, because ipv6 addresses contain colons themselves
            address, port = client_info.rsplit(":", 1)
            kind = "tcp"

        return {
            "time": float(timestamp),
            "db": int(db_id),
            "client_address": address,
            "client_port": port,
            "client_type": kind,
            "command": command,
        }

    def listen(self):
        """Yield monitored commands indefinitely."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        conn = self.connection
        conn.send_command("MONITOR")
        # MONITOR must answer 'OK'; the reply itself is not given to the user
        reply = conn.read_response()
        if not bool_ok(reply):
            raise RedisError(f"MONITOR failed: {reply}")
790class PubSub:
791 """
792 PubSub provides publish, subscribe and listen support to Redis channels.
794 After subscribing to one or more channels, the listen() method will block
795 until a message arrives on one of the subscribed channels. That message
796 will be returned and it's safe to start listening again.
797 """
799 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
800 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
801 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
803 def __init__(
804 self,
805 connection_pool,
806 shard_hint=None,
807 ignore_subscribe_messages: bool = False,
808 encoder: Optional["Encoder"] = None,
809 push_handler_func: Union[None, Callable[[str], None]] = None,
810 event_dispatcher: Optional["EventDispatcher"] = None,
811 ):
812 self.connection_pool = connection_pool
813 self.shard_hint = shard_hint
814 self.ignore_subscribe_messages = ignore_subscribe_messages
815 self.connection = None
816 self.subscribed_event = threading.Event()
817 # we need to know the encoding options for this connection in order
818 # to lookup channel and pattern names for callback handlers.
819 self.encoder = encoder
820 self.push_handler_func = push_handler_func
821 if event_dispatcher is None:
822 self._event_dispatcher = EventDispatcher()
823 else:
824 self._event_dispatcher = event_dispatcher
826 self._lock = threading.RLock()
827 if self.encoder is None:
828 self.encoder = self.connection_pool.get_encoder()
829 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
830 if self.encoder.decode_responses:
831 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
832 else:
833 self.health_check_response = [b"pong", self.health_check_response_b]
834 if self.push_handler_func is None:
835 _set_info_logger()
836 self.reset()
838 def __enter__(self) -> "PubSub":
839 return self
841 def __exit__(self, exc_type, exc_value, traceback) -> None:
842 self.reset()
844 def __del__(self) -> None:
845 try:
846 # if this object went out of scope prior to shutting down
847 # subscriptions, close the connection manually before
848 # returning it to the connection pool
849 self.reset()
850 except Exception:
851 pass
def reset(self) -> None:
    """
    Give the connection (if any) back to the pool and clear all
    subscription state.
    """
    conn = self.connection
    if conn:
        conn.disconnect()
        conn.deregister_connect_callback(self.on_connect)
        self.connection_pool.release(conn)
        self.connection = None

    self.health_check_response_counter = 0
    # Each kind of subscription has a handler map and a pending-unsubscribe set.
    self.channels = {}
    self.pending_unsubscribe_channels = set()
    self.shard_channels = {}
    self.pending_unsubscribe_shard_channels = set()
    self.patterns = {}
    self.pending_unsubscribe_patterns = set()
    self.subscribed_event.clear()
def close(self) -> None:
    """Close the PubSub object; equivalent to calling ``reset()``."""
    self.reset()
def on_connect(self, connection) -> None:
    "Re-subscribe to every channel, pattern and shard channel previously subscribed to"
    # NOTE: for python3, bytestrings cannot be passed as keyword arguments,
    # so channel/pattern names are decoded back to unicode strings before
    # they are handed to [p|s]subscribe.

    def with_text_names(handlers):
        return {
            self.encoder.decode(name, force=True): handler
            for name, handler in handlers.items()
        }

    # Pending unsubscribes are dropped before re-subscribing.
    self.pending_unsubscribe_channels.clear()
    self.pending_unsubscribe_patterns.clear()
    self.pending_unsubscribe_shard_channels.clear()

    if self.channels:
        self.subscribe(**with_text_names(self.channels))
    if self.patterns:
        self.psubscribe(**with_text_names(self.patterns))
    if self.shard_channels:
        self.ssubscribe(**with_text_names(self.shard_channels))
@property
def subscribed(self) -> bool:
    """True while the internal ``subscribed_event`` flag is set."""
    event = self.subscribed_event
    return event.is_set()
def execute_command(self, *args):
    """
    Send a publish/subscribe command without reading its reply.

    On first use a connection is taken from the pool, the re-subscribe
    callback is registered on it and the push handler (if any) is
    installed.
    """
    # NOTE: the response must not be parsed in this function -- doing so
    # could pull a legitimate message off the stack when the connection
    # is already subscribed to one or more channels

    if self.connection is None:
        conn = self.connection_pool.get_connection()
        self.connection = conn
        # the callback re-subscribes to whatever channels were being
        # listened to at the moment of a disconnect
        conn.register_connect_callback(self.on_connect)
        if self.push_handler_func is not None:
            conn._parser.set_pubsub_push_handler(self.push_handler_func)
        self._event_dispatcher.dispatch(
            AfterPubSubConnectionInstantiationEvent(
                conn, self.connection_pool, ClientType.SYNC, self._lock
            )
        )

    connection = self.connection
    send_options = {"check_health": not self.subscribed}
    if not self.subscribed:
        self.clean_health_check_responses()
    with self._lock:
        self._execute(connection, connection.send_command, *args, **send_options)
def clean_health_check_responses(self) -> None:
    """
    Read and discard any outstanding health check responses.

    At most 10 read attempts are made. A response that is not a health
    check reply raises ``PubSubError``.
    """
    conn = self.connection
    for _attempt in range(10):
        if not (conn and self.health_check_response_counter > 0):
            break
        if not self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
            continue
        response = self._execute(conn, conn.read_response)
        if not self.is_health_check_response(response):
            raise PubSubError(
                "A non health check response was cleaned by "
                "execute_command: {}".format(response)
            )
        self.health_check_response_counter -= 1
945 def _reconnect(self, conn) -> None:
946 """
947 The supported exceptions are already checked in the
948 retry object so we don't need to do it here.
950 In this error handler we are trying to reconnect to the server.
951 """
952 conn.disconnect()
953 conn.connect()
def _execute(self, conn, command, *args, **kwargs):
    """
    Run ``command(*args, **kwargs)`` through the connection's retry object.

    The connection is reconnected up front when it reports that it should
    be, and again by the retry failure handler. If the Redis server is
    down the reconnect fails and raises a ConnectionError as desired.
    After reconnection, the ``on_connect`` callback should have been called
    by the connection to resubscribe us to any channels and patterns we
    were previously listening to.
    """
    if conn.should_reconnect():
        self._reconnect(conn)

    def attempt():
        return command(*args, **kwargs)

    def on_failure(_error):
        return self._reconnect(conn)

    return conn.retry.call_with_retry(attempt, on_failure)
def parse_response(self, block=True, timeout=0):
    """
    Read and return the next pubsub response from the connection.

    When ``block`` is false the connection is first polled with
    ``can_read(timeout=timeout)`` and None is returned if nothing is
    readable. Health check replies are counted off and reported as None.

    Raises RuntimeError when no pubsub connection has been set.
    """
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            "pubsub connection not set: "
            "did you forget to call subscribe() or psubscribe()?"
        )

    self.check_health()

    def try_read():
        if block:
            conn.connect()
        elif not conn.can_read(timeout=timeout):
            return None
        return conn.read_response(disconnect_on_error=False, push_request=True)

    response = self._execute(conn, try_read)

    if not self.is_health_check_response(response):
        return response
    # ignore the health check message as user might not expect it
    self.health_check_response_counter -= 1
    return None
def is_health_check_response(self, response) -> bool:
    """
    Tell whether ``response`` is the reply to a health check PING.

    If there are no subscriptions redis responds to the PING command with
    a bulk response, instead of a multi-bulk with "pong" and the response,
    so both stored forms are accepted.
    """
    expected = (
        self.health_check_response,  # If there was a subscription
        self.health_check_response_b,  # If there wasn't
    )
    return response in expected
def check_health(self) -> None:
    """
    Send a health check PING when one is due on the pubsub connection.

    A PING carrying ``HEALTH_CHECK_MESSAGE`` is sent, and the pending
    health check response counter is incremented, only when the connection
    has a health check interval and its next health check time has passed.

    Raises RuntimeError when no pubsub connection has been set.
    """
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            "pubsub connection not set: "
            "did you forget to call subscribe() or psubscribe()?"
        )

    if conn.health_check_interval:
        if time.monotonic() > conn.next_health_check:
            conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
            self.health_check_response_counter += 1
def _normalize_keys(self, data) -> Dict:
    """
    Return a copy of ``data`` whose keys went through the encoder.

    Each channel/pattern name is encoded and then decoded with this
    object's encoder, so names end up as either bytes or strings based on
    whether responses are automatically decoded. This saves us from
    coercing the value for each message coming in.
    """
    encode = self.encoder.encode
    decode = self.encoder.decode
    normalized = {}
    for name, handler in data.items():
        normalized[decode(encode(name))] = handler
    return normalized
def psubscribe(self, *args, **kwargs):
    """
    Subscribe to channel patterns.

    Positional arguments are pattern names registered without a handler.
    Keyword arguments map a pattern name to a callable; that callable is
    invoked automatically when a message is received on the pattern rather
    than producing a message via ``listen()``.

    Returns the result of sending the PSUBSCRIBE command.
    """
    names = list_or_args(args[0], args[1:]) if args else args
    requested = {name: None for name in names}
    requested.update(kwargs)
    ret_val = self.execute_command("PSUBSCRIBE", *requested)
    # update the patterns dict AFTER we send the command. we don't want to
    # subscribe twice to these patterns, once for the command and again
    # for the reconnection.
    requested = self._normalize_keys(requested)
    self.patterns.update(requested)
    if not self.subscribed:
        # Set the subscribed_event flag to True
        self.subscribed_event.set()
        # Clear the health check counter
        self.health_check_response_counter = 0
    # a pattern being (re)subscribed is no longer waiting to be unsubscribed
    self.pending_unsubscribe_patterns.difference_update(requested)
    return ret_val
def punsubscribe(self, *args):
    """
    Unsubscribe from the supplied patterns. If empty, unsubscribe from
    all patterns.

    The affected patterns are recorded as pending unsubscription before
    the PUNSUBSCRIBE command is sent; its result is returned.
    """
    if not args:
        # no names given: every registered pattern becomes pending
        targets = self.patterns
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    self.pending_unsubscribe_patterns.update(targets)
    return self.execute_command("PUNSUBSCRIBE", *args)
def subscribe(self, *args, **kwargs):
    """
    Subscribe to channels.

    Positional arguments are channel names registered without a handler.
    Keyword arguments map a channel name to a callable; that callable is
    invoked automatically when a message is received on the channel rather
    than producing a message via ``listen()`` or ``get_message()``.

    Returns the result of sending the SUBSCRIBE command.
    """
    names = list_or_args(args[0], args[1:]) if args else args
    requested = {name: None for name in names}
    requested.update(kwargs)
    ret_val = self.execute_command("SUBSCRIBE", *requested)
    # update the channels dict AFTER we send the command. we don't want to
    # subscribe twice to these channels, once for the command and again
    # for the reconnection.
    requested = self._normalize_keys(requested)
    self.channels.update(requested)
    if not self.subscribed:
        # Set the subscribed_event flag to True
        self.subscribed_event.set()
        # Clear the health check counter
        self.health_check_response_counter = 0
    # a channel being (re)subscribed is no longer waiting to be unsubscribed
    self.pending_unsubscribe_channels.difference_update(requested)
    return ret_val
def unsubscribe(self, *args):
    """
    Unsubscribe from the supplied channels. If empty, unsubscribe from
    all channels.

    The affected channels are recorded as pending unsubscription before
    the UNSUBSCRIBE command is sent; its result is returned.
    """
    if not args:
        # no names given: every registered channel becomes pending
        targets = self.channels
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    self.pending_unsubscribe_channels.update(targets)
    return self.execute_command("UNSUBSCRIBE", *args)
def ssubscribe(self, *args, target_node=None, **kwargs):
    """
    Subscribes the client to the specified shard channels.

    Positional arguments are shard channel names registered without a
    handler. Keyword arguments map a channel name to a callable; that
    callable is invoked automatically when a message is received on the
    channel rather than producing a message via ``listen()`` or
    ``get_sharded_message()``. ``target_node`` is accepted but not used by
    this implementation.

    Returns the result of sending the SSUBSCRIBE command.
    """
    names = list_or_args(args[0], args[1:]) if args else args
    requested = {name: None for name in names}
    requested.update(kwargs)
    ret_val = self.execute_command("SSUBSCRIBE", *requested)
    # update the s_channels dict AFTER we send the command. we don't want to
    # subscribe twice to these channels, once for the command and again
    # for the reconnection.
    requested = self._normalize_keys(requested)
    self.shard_channels.update(requested)
    if not self.subscribed:
        # Set the subscribed_event flag to True
        self.subscribed_event.set()
        # Clear the health check counter
        self.health_check_response_counter = 0
    # a channel being (re)subscribed is no longer waiting to be unsubscribed
    self.pending_unsubscribe_shard_channels.difference_update(requested)
    return ret_val
def sunsubscribe(self, *args, target_node=None):
    """
    Unsubscribe from the supplied shard_channels. If empty, unsubscribe
    from all shard_channels.

    The affected shard channels are recorded as pending unsubscription
    before the SUNSUBSCRIBE command is sent; its result is returned.
    ``target_node`` is accepted but not used by this implementation.
    """
    if not args:
        # no names given: every registered shard channel becomes pending
        targets = self.shard_channels
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    self.pending_unsubscribe_shard_channels.update(targets)
    return self.execute_command("SUNSUBSCRIBE", *args)
def listen(self):
    """
    Listen for messages on channels this client has been subscribed to.

    Generator that keeps reading in blocking mode for as long as the
    client is subscribed, yielding every handled message that is not None.
    """
    while self.subscribed:
        message = self.handle_message(self.parse_response(block=True))
        if message is None:
            continue
        yield message
def get_message(
    self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0
):
    """
    Get the next message if one is available, otherwise None.

    If timeout is specified, the system will wait for `timeout` seconds
    before returning. Timeout should be specified as a floating point
    number, or None, to wait indefinitely.

    When the client is not subscribed yet, the call first waits (for up to
    `timeout`) for a subscription; the time spent waiting is deducted from
    a numeric timeout before reading.
    """
    if not self.subscribed:
        # Wait for subscription
        start_time = time.monotonic()
        if not self.subscribed_event.wait(timeout):
            # The connection isn't subscribed to any channels or patterns,
            # so no messages are available
            return None
        # The connection was subscribed during the timeout time frame.
        # The timeout should be adjusted based on the time spent waiting
        # for the subscription. ``None`` means "wait indefinitely" and has
        # nothing to deduct from.
        if timeout is not None:
            time_spent = time.monotonic() - start_time
            timeout = max(0.0, timeout - time_spent)

    response = self.parse_response(block=(timeout is None), timeout=timeout)

    if response:
        return self.handle_message(response, ignore_subscribe_messages)
    return None


# sharded messages are fetched through the same code path
get_sharded_message = get_message
def ping(self, message: Union[str, None] = None) -> bool:
    """
    Ping the Redis server to test connectivity.

    Sends a PING command, followed by ``message`` when one is given, and
    returns whatever ``execute_command`` returns for it.
    """
    if message is None:
        return self.execute_command("PING")
    return self.execute_command("PING", message)
def handle_message(self, response, ignore_subscribe_messages=False):
    """
    Parses a pub/sub message. If the channel or pattern was subscribed to
    with a message handler, the handler is invoked instead of a parsed
    message being returned.

    ``response`` is either None, a sequence whose first item is the
    message type, or a bare ``bytes``/``str`` reply, which is treated as a
    "pong" (a bare "PONG" carries empty data).

    Returns the parsed message dict, or None when ``response`` is None,
    when a handler consumed a published message, or when a
    subscribe/unsubscribe notification is being ignored.
    """
    if response is None:
        return None
    # A bare reply (not a multi-bulk message) is normalised to the
    # two-item "pong" form. The ``str`` branch covers replies that arrive
    # already decoded; without it they would be indexed per character.
    if isinstance(response, bytes):
        response = [b"pong", response] if response != b"PONG" else [b"pong", b""]
    elif isinstance(response, str):
        response = ["pong", response] if response != "PONG" else ["pong", ""]

    message_type = str_if_bytes(response[0])
    if message_type == "pmessage":
        message = {
            "type": message_type,
            "pattern": response[1],
            "channel": response[2],
            "data": response[3],
        }
    elif message_type == "pong":
        message = {
            "type": message_type,
            "pattern": None,
            "channel": None,
            "data": response[1],
        }
    else:
        message = {
            "type": message_type,
            "pattern": None,
            "channel": response[1],
            "data": response[2],
        }

    # if this is an unsubscribe message, remove it from memory
    if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
        if message_type == "punsubscribe":
            pending = self.pending_unsubscribe_patterns
            registry = self.patterns
        elif message_type == "sunsubscribe":
            pending = self.pending_unsubscribe_shard_channels
            registry = self.shard_channels
        else:
            pending = self.pending_unsubscribe_channels
            registry = self.channels
        name = response[1]
        # only forget subscriptions whose removal was actually requested
        if name in pending:
            pending.remove(name)
            registry.pop(name, None)
        if not self.channels and not self.patterns and not self.shard_channels:
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()

    if message_type in self.PUBLISH_MESSAGE_TYPES:
        # if there's a message handler, invoke it
        if message_type == "pmessage":
            handler = self.patterns.get(message["pattern"], None)
        elif message_type == "smessage":
            handler = self.shard_channels.get(message["channel"], None)
        else:
            handler = self.channels.get(message["channel"], None)
        if handler:
            handler(message)
            return None
    elif message_type != "pong":
        # this is a subscribe/unsubscribe message. ignore if we don't
        # want them
        if ignore_subscribe_messages or self.ignore_subscribe_messages:
            return None

    return message
def run_in_thread(
    self,
    sleep_time: float = 0.0,
    daemon: bool = False,
    exception_handler: Optional[Callable] = None,
    pubsub=None,
    sharded_pubsub: bool = False,
) -> "PubSubWorkerThread":
    """
    Start and return a PubSubWorkerThread that polls for messages.

    Every registered channel, pattern and shard channel must have a
    handler; otherwise PubSubError is raised before any thread is created.
    The worker polls ``pubsub`` when one is given and this object
    otherwise.
    """
    for channel, callback in self.channels.items():
        if callback is None:
            raise PubSubError(f"Channel: '{channel}' has no handler registered")
    for pattern, callback in self.patterns.items():
        if callback is None:
            raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
    for s_channel, callback in self.shard_channels.items():
        if callback is None:
            raise PubSubError(
                f"Shard Channel: '{s_channel}' has no handler registered"
            )

    if pubsub is None:
        pubsub = self
    worker = PubSubWorkerThread(
        pubsub,
        sleep_time,
        daemon=daemon,
        exception_handler=exception_handler,
        sharded_pubsub=sharded_pubsub,
    )
    worker.start()
    return worker
class PubSubWorkerThread(threading.Thread):
    """
    Thread that repeatedly polls a pubsub object for messages.

    The loop runs until ``stop()`` is called, after which the pubsub
    object is closed.
    """

    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
        sharded_pubsub: bool = False,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        # passed as the ``timeout`` of every poll
        self.sleep_time = sleep_time
        self.exception_handler = exception_handler
        self.sharded_pubsub = sharded_pubsub
        # set while the polling loop should keep going
        self._running = threading.Event()

    def run(self) -> None:
        """
        Poll for messages until the running flag is cleared, then close
        the pubsub object.

        An exception raised while polling is passed to the exception
        handler when one was supplied and re-raised otherwise.
        """
        if self._running.is_set():
            # the loop is already active
            return
        self._running.set()
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while self._running.is_set():
            try:
                if self.sharded_pubsub:
                    poll = pubsub.get_sharded_message
                else:
                    poll = pubsub.get_message
                poll(ignore_subscribe_messages=True, timeout=sleep_time)
            except BaseException as exc:
                handler = self.exception_handler
                if handler is None:
                    raise
                handler(exc, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        """Ask the polling loop to exit."""
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()
class Pipeline(Redis):
    """
    Pipelines provide a way to transmit multiple commands to the Redis
    server in one transmission. This is convenient for batch processing,
    such as saving all the values in a list to Redis.

    When running in transactional mode (the default behavior), all
    commands executed within a pipeline are wrapped with MULTI and EXEC
    calls. This guarantees all commands executed in the pipeline will be
    executed atomically.

    Any command raising an exception does *not* halt the execution of
    subsequent commands in the pipeline. Instead, the exception is caught
    and its instance is placed into the response list returned by
    execute(). Code iterating over the response list should be able to
    deal with an instance of an exception as a potential value. In
    general, these will be ResponseError exceptions, such as those raised
    when issuing a command on a key of a different datatype.
    """

    # commands after whose response the pipeline stops WATCHing
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
def __init__(
    self,
    connection_pool: ConnectionPool,
    response_callbacks,
    transaction,
    shard_hint,
):
    """Store the pipeline configuration and start with empty state."""
    # configuration supplied by the caller
    self.connection_pool = connection_pool
    self.response_callbacks = response_callbacks
    self.transaction = transaction
    self.shard_hint = shard_hint
    # per-run state: no connection, nothing queued, not watching
    self.connection: Optional[Connection] = None
    self.command_stack = []
    self.scripts: Set[Script] = set()
    self.watching = False
    self.explicit_transaction = False
def __enter__(self) -> "Pipeline":
    """Enter the context manager; the pipeline itself is the context value."""
    return self
def __exit__(self, exc_type, exc_value, traceback):
    """Reset the pipeline on leaving the context, whether or not an error occurred."""
    self.reset()
def __del__(self):
    """Best-effort reset when the pipeline object is finalized."""
    try:
        self.reset()
    except Exception:
        # deliberately ignored: failures during finalization are not reported
        pass
def __len__(self) -> int:
    """Return the number of commands currently queued in the pipeline."""
    queued = self.command_stack
    return len(queued)
def __bool__(self) -> bool:
    """
    Pipeline instances should always evaluate to True, regardless of how
    many commands are queued.
    """
    return True
def reset(self) -> None:
    """
    Clear queued commands and scripts, stop watching, and hand the
    connection (if any) back to the pool.
    """
    self.command_stack = []
    self.scripts = set()
    conn = self.connection
    # make sure to reset the connection state in the event that we were
    # watching something
    if self.watching and conn:
        try:
            # call this manually since our unwatch or
            # immediate_execute_command methods can call reset()
            conn.send_command("UNWATCH")
            conn.read_response()
        except ConnectionError:
            # disconnect will also remove any previous WATCHes
            conn.disconnect()
    # clean up the other instance attributes
    self.watching = False
    self.explicit_transaction = False

    # we can safely return the connection to the pool here since we're
    # sure we're no longer WATCHing anything
    if conn:
        self.connection_pool.release(conn)
        self.connection = None
def close(self) -> None:
    """Close the pipeline by resetting it."""
    self.reset()
def multi(self) -> None:
    """
    Start a transactional block of the pipeline after WATCH commands
    are issued. End the transactional block with `execute`.

    Raises RedisError when a transactional block is already open or when
    commands have already been queued.
    """
    already_open = self.explicit_transaction
    if already_open:
        raise RedisError("Cannot issue nested calls to MULTI")
    queued = self.command_stack
    if queued:
        raise RedisError(
            "Commands without an initial WATCH have already been issued"
        )
    self.explicit_transaction = True
def execute_command(self, *args, **kwargs):
    """
    Route a command: run it immediately or queue it on the pipeline.

    A command runs immediately when the pipeline is watching, or the
    command is WATCH itself, and no explicit transaction has been started.
    Every other command is staged for ``execute()``.
    """
    run_now = self.watching or args[0] == "WATCH"
    if run_now and not self.explicit_transaction:
        return self.immediate_execute_command(*args, **kwargs)
    return self.pipeline_execute_command(*args, **kwargs)
def _disconnect_reset_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
) -> None:
    """
    Close the connection and, if we were watching, reset the pipeline and
    raise a WatchError.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).
    """
    conn.disconnect()

    if not self.watching:
        return

    # if we were already watching a variable, the watch is no longer
    # valid since this connection has died. raise a WatchError, which
    # indicates the user should retry this transaction.
    self.reset()
    raise WatchError(
        f"A {type(error).__name__} occurred while watching one or more keys"
    )
def immediate_execute_command(self, *args, **options):
    """
    Execute a command immediately, but don't auto-retry on the supported
    errors for retry if we're already WATCHing a variable.
    Used when issuing WATCH or subsequent commands retrieving their values
    but before MULTI is called.

    A connection is taken from the pool, and kept on the pipeline, when
    none is held yet.
    """
    command_name = args[0]
    conn = self.connection
    # if this is the first call, we need a connection
    if not conn:
        conn = self.connection_pool.get_connection()
        self.connection = conn

    def send_and_parse():
        return self._send_command_parse_response(
            conn, command_name, *args, **options
        )

    def on_failure(error):
        return self._disconnect_reset_raise_on_watching(conn, error)

    return conn.retry.call_with_retry(send_and_parse, on_failure)
def pipeline_execute_command(self, *args, **options) -> "Pipeline":
    """
    Stage a command to be executed when execute() is next called

    Returns the current Pipeline object back so commands can be
    chained together, such as:

    pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

    At some other point, you can then run: pipe.execute(),
    which will execute all commands queued in the pipe.
    """
    staged = (args, options)
    self.command_stack.append(staged)
    return self
def _execute_transaction(
    self, connection: Connection, commands, raise_on_error
) -> List:
    """
    Send ``commands`` wrapped in MULTI/EXEC and return their results.

    ``commands`` is a sequence of ``(args, options)`` pairs. Commands whose
    options contain ``EMPTY_RESPONSE`` are not sent; the value stored under
    that key takes their place in the result list.

    Raises WatchError when EXEC returns None, ResponseError when the
    number of results does not match the number of commands, and, when
    ``raise_on_error`` is true, the first error found among the results.
    """
    cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
    all_cmds = connection.pack_commands(
        [args for args, options in cmds if EMPTY_RESPONSE not in options]
    )
    connection.send_packed_command(all_cmds)
    errors = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        self.parse_response(connection, "_")
    except ResponseError as e:
        errors.append((0, e))

    # and all the other commands
    for i, command in enumerate(commands):
        if EMPTY_RESPONSE in command[1]:
            errors.append((i, command[1][EMPTY_RESPONSE]))
        else:
            try:
                self.parse_response(connection, "_")
            except ResponseError as e:
                self.annotate_exception(e, i + 1, command[0])
                errors.append((i, e))

    # parse the EXEC.
    try:
        response = self.parse_response(connection, "_")
    except ExecAbortError:
        if errors:
            raise errors[0][1]
        raise

    # EXEC clears any watched keys
    self.watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response
    for i, e in errors:
        response.insert(i, e)

    if len(response) != len(commands):
        # disconnect the connection this transaction actually ran on,
        # rather than relying on ``self.connection`` pointing at it
        connection.disconnect()
        raise ResponseError(
            "Wrong number of response items from pipeline execution"
        )

    # find any errors in the response and raise if necessary
    if raise_on_error:
        self.raise_first_error(commands, response)

    # We have to run response callbacks manually
    data = []
    for r, cmd in zip(response, commands):
        if not isinstance(r, Exception):
            args, options = cmd
            # Remove keys entry, it needs only for cache.
            options.pop("keys", None)
            command_name = args[0]
            if command_name in self.response_callbacks:
                r = self.response_callbacks[command_name](r, **options)
        data.append(r)

    return data
def _execute_pipeline(self, connection, commands, raise_on_error):
    """
    Send ``commands`` in one request, without MULTI/EXEC, and return their
    responses in order.

    A ResponseError raised while parsing a reply is stored in that
    command's slot. When ``raise_on_error`` is true the first such error
    is raised after all replies have been read.
    """
    # build up all commands into a single request to increase network perf
    packed = connection.pack_commands([cmd_args for cmd_args, _ in commands])
    connection.send_packed_command(packed)

    responses = []
    for cmd_args, cmd_options in commands:
        try:
            reply = self.parse_response(connection, cmd_args[0], **cmd_options)
        except ResponseError as e:
            reply = e
        responses.append(reply)

    if raise_on_error:
        self.raise_first_error(commands, responses)

    return responses
def raise_first_error(self, commands, response):
    """
    Raise the first ResponseError found in ``response``.

    The error is annotated with its 1-based position and the arguments of
    the command at the same index before being raised. Nothing happens
    when ``response`` holds no ResponseError.
    """
    for position, result in enumerate(response, start=1):
        if not isinstance(result, ResponseError):
            continue
        self.annotate_exception(result, position, commands[position - 1][0])
        raise result
def annotate_exception(self, exception, number, command):
    """
    Prefix the exception's first argument with the pipeline position and
    a truncated rendering of the command that caused it.

    The exception is modified in place; its remaining args are kept.
    """
    cmd = " ".join(safe_str(part) for part in command)
    msg = (
        f"Command # {number} ({truncate_text(cmd)}) of pipeline "
        f"caused error: {exception.args[0]}"
    )
    exception.args = (msg, *exception.args[1:])
def parse_response(self, connection, command_name, **options):
    """
    Parse a response via ``Redis.parse_response`` and track WATCH state.

    After a successful parse, the watching flag is cleared for commands in
    ``UNWATCH_COMMANDS`` and set for WATCH.
    """
    result = Redis.parse_response(self, connection, command_name, **options)
    ends_watch = command_name in self.UNWATCH_COMMANDS
    if ends_watch:
        self.watching = False
    elif command_name == "WATCH":
        self.watching = True
    return result
def load_scripts(self):
    """
    Make sure all scripts that are about to be run on this pipeline exist.

    Scripts reported as missing by SCRIPT EXISTS are loaded with
    SCRIPT LOAD and their ``sha`` is replaced with the returned value.
    """
    pending = list(self.scripts)
    # we can't use the normal script_* methods because they would just
    # get buffered in the pipeline.
    run_now = self.immediate_execute_command
    exists = run_now("SCRIPT EXISTS", *[script.sha for script in pending])
    if all(exists):
        return
    for script, present in zip(pending, exists):
        if present:
            continue
        script.sha = run_now("SCRIPT LOAD", script.script)
def _disconnect_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
) -> None:
    """
    Close the connection, raise an exception if we were watching.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).
    """
    conn.disconnect()
    if not self.watching:
        return
    # if we were watching a variable, the watch is no longer valid
    # since this connection has died. raise a WatchError, which
    # indicates the user should retry this transaction.
    raise WatchError(
        f"A {type(error).__name__} occurred while watching one or more keys"
    )
def execute(self, raise_on_error: bool = True) -> List[Any]:
    """
    Execute all the commands in the current pipeline.

    Returns an empty list straight away when nothing is queued and the
    pipeline is not watching. Otherwise the queued commands run as a
    transaction (when the pipeline is transactional or MULTI was called)
    or as a plain pipeline, through the connection's retry object. The
    pipeline is always reset afterwards.
    """
    stack = self.command_stack
    if not stack and not self.watching:
        return []
    if self.scripts:
        self.load_scripts()

    transactional = self.transaction or self.explicit_transaction
    runner = self._execute_transaction if transactional else self._execute_pipeline

    conn = self.connection
    if not conn:
        conn = self.connection_pool.get_connection()
        # assign to self.connection so reset() releases the connection
        # back to the pool after we're done
        self.connection = conn

    def attempt():
        return runner(conn, stack, raise_on_error)

    def on_failure(error):
        return self._disconnect_raise_on_watching(conn, error)

    try:
        return conn.retry.call_with_retry(attempt, on_failure)
    finally:
        # in reset() the connection is disconnected before returned to the pool if
        # it is marked for reconnect.
        self.reset()
def discard(self):
    """
    Flushes all previously queued commands by issuing DISCARD.

    The command's result is not returned.
    See: https://redis.io/commands/DISCARD
    """
    self.execute_command("DISCARD")
def watch(self, *names):
    """
    Watches the values at keys ``names``.

    Raises RedisError when called after MULTI has started an explicit
    transaction.
    """
    in_transaction = self.explicit_transaction
    if in_transaction:
        raise RedisError("Cannot issue a WATCH after a MULTI")
    return self.execute_command("WATCH", *names)
def unwatch(self) -> bool:
    """
    Unwatches all previously specified keys.

    UNWATCH is only sent while the pipeline is watching. The result is
    always truthy: the command's own result when that is truthy, True
    otherwise.
    """
    if self.watching:
        return self.execute_command("UNWATCH") or True
    return True