Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import re
3import threading
4import time
5from itertools import chain
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Callable,
10 Dict,
11 List,
12 Mapping,
13 Optional,
14 Set,
15 Type,
16 Union,
17)
19from redis._parsers.encoders import Encoder
20from redis._parsers.helpers import (
21 _RedisCallbacks,
22 _RedisCallbacksRESP2,
23 _RedisCallbacksRESP3,
24 bool_ok,
25)
26from redis.backoff import ExponentialWithJitterBackoff
27from redis.cache import CacheConfig, CacheInterface
28from redis.commands import (
29 CoreCommands,
30 RedisModuleCommands,
31 SentinelCommands,
32 list_or_args,
33)
34from redis.commands.core import Script
35from redis.connection import (
36 AbstractConnection,
37 Connection,
38 ConnectionPool,
39 SSLConnection,
40 UnixDomainSocketConnection,
41)
42from redis.credentials import CredentialProvider
43from redis.event import (
44 AfterPooledConnectionsInstantiationEvent,
45 AfterPubSubConnectionInstantiationEvent,
46 AfterSingleConnectionInstantiationEvent,
47 ClientType,
48 EventDispatcher,
49)
50from redis.exceptions import (
51 ConnectionError,
52 ExecAbortError,
53 PubSubError,
54 RedisError,
55 ResponseError,
56 WatchError,
57)
58from redis.lock import Lock
59from redis.maint_notifications import (
60 MaintNotificationsConfig,
61)
62from redis.retry import Retry
63from redis.utils import (
64 _set_info_logger,
65 deprecated_args,
66 get_lib_version,
67 safe_str,
68 str_if_bytes,
69 truncate_text,
70)
72if TYPE_CHECKING:
73 import ssl
75 import OpenSSL
77SYM_EMPTY = b""
78EMPTY_RESPONSE = "EMPTY_RESPONSE"
80# some responses (ie. dump) are binary, and just meant to never be decoded
81NEVER_DECODE = "NEVER_DECODE"
84class CaseInsensitiveDict(dict):
85 "Case insensitive dict implementation. Assumes string keys only."
87 def __init__(self, data: Dict[str, str]) -> None:
88 for k, v in data.items():
89 self[k.upper()] = v
91 def __contains__(self, k):
92 return super().__contains__(k.upper())
94 def __delitem__(self, k):
95 super().__delitem__(k.upper())
97 def __getitem__(self, k):
98 return super().__getitem__(k.upper())
100 def get(self, k, default=None):
101 return super().get(k.upper(), default)
103 def __setitem__(self, k, v):
104 super().__setitem__(k.upper(), v)
106 def update(self, data):
107 data = CaseInsensitiveDict(data)
108 super().update(data)
111class AbstractRedis:
112 pass
115class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
116 """
117 Implementation of the Redis protocol.
119 This abstract class provides a Python interface to all Redis commands
120 and an implementation of the Redis protocol.
122 Pipelines derive from this, implementing how
123 the commands are sent and received to the Redis server. Based on
124 configuration, an instance will either use a ConnectionPool, or
125 Connection object to talk to redis.
127 It is not safe to pass PubSub or Pipeline objects between threads.
128 """
130 @classmethod
131 def from_url(cls, url: str, **kwargs) -> "Redis":
132 """
133 Return a Redis client object configured from the given URL
135 For example::
137 redis://[[username]:[password]]@localhost:6379/0
138 rediss://[[username]:[password]]@localhost:6379/0
139 unix://[username@]/path/to/socket.sock?db=0[&password=password]
141 Three URL schemes are supported:
143 - `redis://` creates a TCP socket connection. See more at:
144 <https://www.iana.org/assignments/uri-schemes/prov/redis>
145 - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
146 <https://www.iana.org/assignments/uri-schemes/prov/rediss>
147 - ``unix://``: creates a Unix Domain Socket connection.
149 The username, password, hostname, path and all querystring values
150 are passed through urllib.parse.unquote in order to replace any
151 percent-encoded values with their corresponding characters.
153 There are several ways to specify a database number. The first value
154 found will be used:
156 1. A ``db`` querystring option, e.g. redis://localhost?db=0
157 2. If using the redis:// or rediss:// schemes, the path argument
158 of the url, e.g. redis://localhost/0
159 3. A ``db`` keyword argument to this function.
161 If none of these options are specified, the default db=0 is used.
163 All querystring options are cast to their appropriate Python types.
164 Boolean arguments can be specified with string values "True"/"False"
165 or "Yes"/"No". Values that cannot be properly cast cause a
166 ``ValueError`` to be raised. Once parsed, the querystring arguments
167 and keyword arguments are passed to the ``ConnectionPool``'s
168 class initializer. In the case of conflicting arguments, querystring
169 arguments always win.
171 """
172 single_connection_client = kwargs.pop("single_connection_client", False)
173 connection_pool = ConnectionPool.from_url(url, **kwargs)
174 client = cls(
175 connection_pool=connection_pool,
176 single_connection_client=single_connection_client,
177 )
178 client.auto_close_connection_pool = True
179 return client
181 @classmethod
182 def from_pool(
183 cls: Type["Redis"],
184 connection_pool: ConnectionPool,
185 ) -> "Redis":
186 """
187 Return a Redis client from the given connection pool.
188 The Redis client will take ownership of the connection pool and
189 close it when the Redis client is closed.
190 """
191 client = cls(
192 connection_pool=connection_pool,
193 )
194 client.auto_close_connection_pool = True
195 return client
197 @deprecated_args(
198 args_to_warn=["retry_on_timeout"],
199 reason="TimeoutError is included by default.",
200 version="6.0.0",
201 )
202 def __init__(
203 self,
204 host: str = "localhost",
205 port: int = 6379,
206 db: int = 0,
207 password: Optional[str] = None,
208 socket_timeout: Optional[float] = None,
209 socket_connect_timeout: Optional[float] = None,
210 socket_keepalive: Optional[bool] = None,
211 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
212 connection_pool: Optional[ConnectionPool] = None,
213 unix_socket_path: Optional[str] = None,
214 encoding: str = "utf-8",
215 encoding_errors: str = "strict",
216 decode_responses: bool = False,
217 retry_on_timeout: bool = False,
218 retry: Retry = Retry(
219 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
220 ),
221 retry_on_error: Optional[List[Type[Exception]]] = None,
222 ssl: bool = False,
223 ssl_keyfile: Optional[str] = None,
224 ssl_certfile: Optional[str] = None,
225 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
226 ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
227 ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
228 ssl_ca_certs: Optional[str] = None,
229 ssl_ca_path: Optional[str] = None,
230 ssl_ca_data: Optional[str] = None,
231 ssl_check_hostname: bool = True,
232 ssl_password: Optional[str] = None,
233 ssl_validate_ocsp: bool = False,
234 ssl_validate_ocsp_stapled: bool = False,
235 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
236 ssl_ocsp_expected_cert: Optional[str] = None,
237 ssl_min_version: Optional["ssl.TLSVersion"] = None,
238 ssl_ciphers: Optional[str] = None,
239 max_connections: Optional[int] = None,
240 single_connection_client: bool = False,
241 health_check_interval: int = 0,
242 client_name: Optional[str] = None,
243 lib_name: Optional[str] = "redis-py",
244 lib_version: Optional[str] = get_lib_version(),
245 username: Optional[str] = None,
246 redis_connect_func: Optional[Callable[[], None]] = None,
247 credential_provider: Optional[CredentialProvider] = None,
248 protocol: Optional[int] = 2,
249 cache: Optional[CacheInterface] = None,
250 cache_config: Optional[CacheConfig] = None,
251 event_dispatcher: Optional[EventDispatcher] = None,
252 maint_notifications_config: Optional[MaintNotificationsConfig] = None,
253 ) -> None:
254 """
255 Initialize a new Redis client.
257 To specify a retry policy for specific errors, you have two options:
259 1. Set the `retry_on_error` to a list of the error/s to retry on, and
260 you can also set `retry` to a valid `Retry` object(in case the default
261 one is not appropriate) - with this approach the retries will be triggered
262 on the default errors specified in the Retry object enriched with the
263 errors specified in `retry_on_error`.
265 2. Define a `Retry` object with configured 'supported_errors' and set
266 it to the `retry` parameter - with this approach you completely redefine
267 the errors on which retries will happen.
269 `retry_on_timeout` is deprecated - please include the TimeoutError
270 either in the Retry object or in the `retry_on_error` list.
272 When 'connection_pool' is provided - the retry configuration of the
273 provided pool will be used.
275 Args:
277 single_connection_client:
278 if `True`, connection pool is not used. In that case `Redis`
279 instance use is not thread safe.
280 decode_responses:
281 if `True`, the response will be decoded to utf-8.
282 Argument is ignored when connection_pool is provided.
283 maint_notifications_config:
284 configuration the pool to support maintenance notifications - see
285 `redis.maint_notifications.MaintNotificationsConfig` for details.
286 Only supported with RESP3
287 If not provided and protocol is RESP3, the maintenance notifications
288 will be enabled by default (logic is included in the connection pool
289 initialization).
290 Argument is ignored when connection_pool is provided.
291 """
292 if event_dispatcher is None:
293 self._event_dispatcher = EventDispatcher()
294 else:
295 self._event_dispatcher = event_dispatcher
296 if not connection_pool:
297 if not retry_on_error:
298 retry_on_error = []
299 kwargs = {
300 "db": db,
301 "username": username,
302 "password": password,
303 "socket_timeout": socket_timeout,
304 "encoding": encoding,
305 "encoding_errors": encoding_errors,
306 "decode_responses": decode_responses,
307 "retry_on_error": retry_on_error,
308 "retry": copy.deepcopy(retry),
309 "max_connections": max_connections,
310 "health_check_interval": health_check_interval,
311 "client_name": client_name,
312 "lib_name": lib_name,
313 "lib_version": lib_version,
314 "redis_connect_func": redis_connect_func,
315 "credential_provider": credential_provider,
316 "protocol": protocol,
317 }
318 # based on input, setup appropriate connection args
319 if unix_socket_path is not None:
320 kwargs.update(
321 {
322 "path": unix_socket_path,
323 "connection_class": UnixDomainSocketConnection,
324 }
325 )
326 else:
327 # TCP specific options
328 kwargs.update(
329 {
330 "host": host,
331 "port": port,
332 "socket_connect_timeout": socket_connect_timeout,
333 "socket_keepalive": socket_keepalive,
334 "socket_keepalive_options": socket_keepalive_options,
335 }
336 )
338 if ssl:
339 kwargs.update(
340 {
341 "connection_class": SSLConnection,
342 "ssl_keyfile": ssl_keyfile,
343 "ssl_certfile": ssl_certfile,
344 "ssl_cert_reqs": ssl_cert_reqs,
345 "ssl_include_verify_flags": ssl_include_verify_flags,
346 "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
347 "ssl_ca_certs": ssl_ca_certs,
348 "ssl_ca_data": ssl_ca_data,
349 "ssl_check_hostname": ssl_check_hostname,
350 "ssl_password": ssl_password,
351 "ssl_ca_path": ssl_ca_path,
352 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
353 "ssl_validate_ocsp": ssl_validate_ocsp,
354 "ssl_ocsp_context": ssl_ocsp_context,
355 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
356 "ssl_min_version": ssl_min_version,
357 "ssl_ciphers": ssl_ciphers,
358 }
359 )
360 if (cache_config or cache) and protocol in [3, "3"]:
361 kwargs.update(
362 {
363 "cache": cache,
364 "cache_config": cache_config,
365 }
366 )
367 maint_notifications_enabled = (
368 maint_notifications_config and maint_notifications_config.enabled
369 )
370 if maint_notifications_enabled and protocol not in [
371 3,
372 "3",
373 ]:
374 raise RedisError(
375 "Maintenance notifications handlers on connection are only supported with RESP version 3"
376 )
377 if maint_notifications_config:
378 kwargs.update(
379 {
380 "maint_notifications_config": maint_notifications_config,
381 }
382 )
383 connection_pool = ConnectionPool(**kwargs)
384 self._event_dispatcher.dispatch(
385 AfterPooledConnectionsInstantiationEvent(
386 [connection_pool], ClientType.SYNC, credential_provider
387 )
388 )
389 self.auto_close_connection_pool = True
390 else:
391 self.auto_close_connection_pool = False
392 self._event_dispatcher.dispatch(
393 AfterPooledConnectionsInstantiationEvent(
394 [connection_pool], ClientType.SYNC, credential_provider
395 )
396 )
398 self.connection_pool = connection_pool
400 if (cache_config or cache) and self.connection_pool.get_protocol() not in [
401 3,
402 "3",
403 ]:
404 raise RedisError("Client caching is only supported with RESP version 3")
406 self.single_connection_lock = threading.RLock()
407 self.connection = None
408 self._single_connection_client = single_connection_client
409 if self._single_connection_client:
410 self.connection = self.connection_pool.get_connection()
411 self._event_dispatcher.dispatch(
412 AfterSingleConnectionInstantiationEvent(
413 self.connection, ClientType.SYNC, self.single_connection_lock
414 )
415 )
417 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)
419 if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
420 self.response_callbacks.update(_RedisCallbacksRESP3)
421 else:
422 self.response_callbacks.update(_RedisCallbacksRESP2)
424 def __repr__(self) -> str:
425 return (
426 f"<{type(self).__module__}.{type(self).__name__}"
427 f"({repr(self.connection_pool)})>"
428 )
430 def get_encoder(self) -> "Encoder":
431 """Get the connection pool's encoder"""
432 return self.connection_pool.get_encoder()
434 def get_connection_kwargs(self) -> Dict:
435 """Get the connection's key-word arguments"""
436 return self.connection_pool.connection_kwargs
438 def get_retry(self) -> Optional[Retry]:
439 return self.get_connection_kwargs().get("retry")
441 def set_retry(self, retry: Retry) -> None:
442 self.get_connection_kwargs().update({"retry": retry})
443 self.connection_pool.set_retry(retry)
445 def set_response_callback(self, command: str, callback: Callable) -> None:
446 """Set a custom Response Callback"""
447 self.response_callbacks[command] = callback
449 def load_external_module(self, funcname, func) -> None:
450 """
451 This function can be used to add externally defined redis modules,
452 and their namespaces to the redis client.
454 funcname - A string containing the name of the function to create
455 func - The function, being added to this class.
457 ex: Assume that one has a custom redis module named foomod that
458 creates command named 'foo.dothing' and 'foo.anotherthing' in redis.
459 To load function functions into this namespace:
461 from redis import Redis
462 from foomodule import F
463 r = Redis()
464 r.load_external_module("foo", F)
465 r.foo().dothing('your', 'arguments')
467 For a concrete example see the reimport of the redisjson module in
468 tests/test_connection.py::test_loading_external_modules
469 """
470 setattr(self, funcname, func)
472 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
473 """
474 Return a new pipeline object that can queue multiple commands for
475 later execution. ``transaction`` indicates whether all commands
476 should be executed atomically. Apart from making a group of operations
477 atomic, pipelines are useful for reducing the back-and-forth overhead
478 between the client and server.
479 """
480 return Pipeline(
481 self.connection_pool, self.response_callbacks, transaction, shard_hint
482 )
484 def transaction(
485 self, func: Callable[["Pipeline"], None], *watches, **kwargs
486 ) -> Union[List[Any], Any, None]:
487 """
488 Convenience method for executing the callable `func` as a transaction
489 while watching all keys specified in `watches`. The 'func' callable
490 should expect a single argument which is a Pipeline object.
491 """
492 shard_hint = kwargs.pop("shard_hint", None)
493 value_from_callable = kwargs.pop("value_from_callable", False)
494 watch_delay = kwargs.pop("watch_delay", None)
495 with self.pipeline(True, shard_hint) as pipe:
496 while True:
497 try:
498 if watches:
499 pipe.watch(*watches)
500 func_value = func(pipe)
501 exec_value = pipe.execute()
502 return func_value if value_from_callable else exec_value
503 except WatchError:
504 if watch_delay is not None and watch_delay > 0:
505 time.sleep(watch_delay)
506 continue
508 def lock(
509 self,
510 name: str,
511 timeout: Optional[float] = None,
512 sleep: float = 0.1,
513 blocking: bool = True,
514 blocking_timeout: Optional[float] = None,
515 lock_class: Union[None, Any] = None,
516 thread_local: bool = True,
517 raise_on_release_error: bool = True,
518 ):
519 """
520 Return a new Lock object using key ``name`` that mimics
521 the behavior of threading.Lock.
523 If specified, ``timeout`` indicates a maximum life for the lock.
524 By default, it will remain locked until release() is called.
526 ``sleep`` indicates the amount of time to sleep per loop iteration
527 when the lock is in blocking mode and another client is currently
528 holding the lock.
530 ``blocking`` indicates whether calling ``acquire`` should block until
531 the lock has been acquired or to fail immediately, causing ``acquire``
532 to return False and the lock not being acquired. Defaults to True.
533 Note this value can be overridden by passing a ``blocking``
534 argument to ``acquire``.
536 ``blocking_timeout`` indicates the maximum amount of time in seconds to
537 spend trying to acquire the lock. A value of ``None`` indicates
538 continue trying forever. ``blocking_timeout`` can be specified as a
539 float or integer, both representing the number of seconds to wait.
541 ``lock_class`` forces the specified lock implementation. Note that as
542 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
543 a Lua-based lock). So, it's unlikely you'll need this parameter, unless
544 you have created your own custom lock class.
546 ``thread_local`` indicates whether the lock token is placed in
547 thread-local storage. By default, the token is placed in thread local
548 storage so that a thread only sees its token, not a token set by
549 another thread. Consider the following timeline:
551 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
552 thread-1 sets the token to "abc"
553 time: 1, thread-2 blocks trying to acquire `my-lock` using the
554 Lock instance.
555 time: 5, thread-1 has not yet completed. redis expires the lock
556 key.
557 time: 5, thread-2 acquired `my-lock` now that it's available.
558 thread-2 sets the token to "xyz"
559 time: 6, thread-1 finishes its work and calls release(). if the
560 token is *not* stored in thread local storage, then
561 thread-1 would see the token value as "xyz" and would be
562 able to successfully release the thread-2's lock.
564 ``raise_on_release_error`` indicates whether to raise an exception when
565 the lock is no longer owned when exiting the context manager. By default,
566 this is True, meaning an exception will be raised. If False, the warning
567 will be logged and the exception will be suppressed.
569 In some use cases it's necessary to disable thread local storage. For
570 example, if you have code where one thread acquires a lock and passes
571 that lock instance to a worker thread to release later. If thread
572 local storage isn't disabled in this case, the worker thread won't see
573 the token set by the thread that acquired the lock. Our assumption
574 is that these cases aren't common and as such default to using
575 thread local storage."""
576 if lock_class is None:
577 lock_class = Lock
578 return lock_class(
579 self,
580 name,
581 timeout=timeout,
582 sleep=sleep,
583 blocking=blocking,
584 blocking_timeout=blocking_timeout,
585 thread_local=thread_local,
586 raise_on_release_error=raise_on_release_error,
587 )
589 def pubsub(self, **kwargs):
590 """
591 Return a Publish/Subscribe object. With this object, you can
592 subscribe to channels and listen for messages that get published to
593 them.
594 """
595 return PubSub(
596 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
597 )
599 def monitor(self):
600 return Monitor(self.connection_pool)
602 def client(self):
603 return self.__class__(
604 connection_pool=self.connection_pool,
605 single_connection_client=True,
606 )
608 def __enter__(self):
609 return self
611 def __exit__(self, exc_type, exc_value, traceback):
612 self.close()
614 def __del__(self):
615 try:
616 self.close()
617 except Exception:
618 pass
620 def close(self) -> None:
621 # In case a connection property does not yet exist
622 # (due to a crash earlier in the Redis() constructor), return
623 # immediately as there is nothing to clean-up.
624 if not hasattr(self, "connection"):
625 return
627 conn = self.connection
628 if conn:
629 self.connection = None
630 self.connection_pool.release(conn)
632 if self.auto_close_connection_pool:
633 self.connection_pool.disconnect()
635 def _send_command_parse_response(self, conn, command_name, *args, **options):
636 """
637 Send a command and parse the response
638 """
639 conn.send_command(*args, **options)
640 return self.parse_response(conn, command_name, **options)
642 def _close_connection(self, conn) -> None:
643 """
644 Close the connection before retrying.
646 The supported exceptions are already checked in the
647 retry object so we don't need to do it here.
649 After we disconnect the connection, it will try to reconnect and
650 do a health check as part of the send_command logic(on connection level).
651 """
653 conn.disconnect()
655 # COMMAND EXECUTION AND PROTOCOL PARSING
656 def execute_command(self, *args, **options):
657 return self._execute_command(*args, **options)
659 def _execute_command(self, *args, **options):
660 """Execute a command and return a parsed response"""
661 pool = self.connection_pool
662 command_name = args[0]
663 conn = self.connection or pool.get_connection()
665 if self._single_connection_client:
666 self.single_connection_lock.acquire()
667 try:
668 return conn.retry.call_with_retry(
669 lambda: self._send_command_parse_response(
670 conn, command_name, *args, **options
671 ),
672 lambda _: self._close_connection(conn),
673 )
675 finally:
676 if conn and conn.should_reconnect():
677 self._close_connection(conn)
678 conn.connect()
679 if self._single_connection_client:
680 self.single_connection_lock.release()
681 if not self.connection:
682 pool.release(conn)
684 def parse_response(self, connection, command_name, **options):
685 """Parses a response from the Redis server"""
686 try:
687 if NEVER_DECODE in options:
688 response = connection.read_response(disable_decoding=True)
689 options.pop(NEVER_DECODE)
690 else:
691 response = connection.read_response()
692 except ResponseError:
693 if EMPTY_RESPONSE in options:
694 return options[EMPTY_RESPONSE]
695 raise
697 if EMPTY_RESPONSE in options:
698 options.pop(EMPTY_RESPONSE)
700 # Remove keys entry, it needs only for cache.
701 options.pop("keys", None)
703 if command_name in self.response_callbacks:
704 return self.response_callbacks[command_name](response, **options)
705 return response
707 def get_cache(self) -> Optional[CacheInterface]:
708 return self.connection_pool.cache
711StrictRedis = Redis
714class Monitor:
715 """
716 Monitor is useful for handling the MONITOR command to the redis server.
717 next_command() method returns one command from monitor
718 listen() method yields commands from monitor.
719 """
721 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
722 command_re = re.compile(r'"(.*?)(?<!\\)"')
724 def __init__(self, connection_pool):
725 self.connection_pool = connection_pool
726 self.connection = self.connection_pool.get_connection()
728 def __enter__(self):
729 self._start_monitor()
730 return self
732 def __exit__(self, *args):
733 self.connection.disconnect()
734 self.connection_pool.release(self.connection)
736 def next_command(self):
737 """Parse the response from a monitor command"""
738 response = self.connection.read_response()
740 if response is None:
741 return None
743 if isinstance(response, bytes):
744 response = self.connection.encoder.decode(response, force=True)
746 command_time, command_data = response.split(" ", 1)
747 m = self.monitor_re.match(command_data)
748 db_id, client_info, command = m.groups()
749 command = " ".join(self.command_re.findall(command))
750 # Redis escapes double quotes because each piece of the command
751 # string is surrounded by double quotes. We don't have that
752 # requirement so remove the escaping and leave the quote.
753 command = command.replace('\\"', '"')
755 if client_info == "lua":
756 client_address = "lua"
757 client_port = ""
758 client_type = "lua"
759 elif client_info.startswith("unix"):
760 client_address = "unix"
761 client_port = client_info[5:]
762 client_type = "unix"
763 else:
764 if client_info == "":
765 client_address = ""
766 client_port = ""
767 client_type = "unknown"
768 else:
769 # use rsplit as ipv6 addresses contain colons
770 client_address, client_port = client_info.rsplit(":", 1)
771 client_type = "tcp"
772 return {
773 "time": float(command_time),
774 "db": int(db_id),
775 "client_address": client_address,
776 "client_port": client_port,
777 "client_type": client_type,
778 "command": command,
779 }
781 def listen(self):
782 """Listen for commands coming to the server."""
783 while True:
784 yield self.next_command()
786 def _start_monitor(self):
787 self.connection.send_command("MONITOR")
788 # check that monitor returns 'OK', but don't return it to user
789 response = self.connection.read_response()
791 if not bool_ok(response):
792 raise RedisError(f"MONITOR failed: {response}")
795class PubSub:
796 """
797 PubSub provides publish, subscribe and listen support to Redis channels.
799 After subscribing to one or more channels, the listen() method will block
800 until a message arrives on one of the subscribed channels. That message
801 will be returned and it's safe to start listening again.
802 """
804 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
805 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
806 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
808 def __init__(
809 self,
810 connection_pool,
811 shard_hint=None,
812 ignore_subscribe_messages: bool = False,
813 encoder: Optional["Encoder"] = None,
814 push_handler_func: Union[None, Callable[[str], None]] = None,
815 event_dispatcher: Optional["EventDispatcher"] = None,
816 ):
817 self.connection_pool = connection_pool
818 self.shard_hint = shard_hint
819 self.ignore_subscribe_messages = ignore_subscribe_messages
820 self.connection = None
821 self.subscribed_event = threading.Event()
822 # we need to know the encoding options for this connection in order
823 # to lookup channel and pattern names for callback handlers.
824 self.encoder = encoder
825 self.push_handler_func = push_handler_func
826 if event_dispatcher is None:
827 self._event_dispatcher = EventDispatcher()
828 else:
829 self._event_dispatcher = event_dispatcher
831 self._lock = threading.RLock()
832 if self.encoder is None:
833 self.encoder = self.connection_pool.get_encoder()
834 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
835 if self.encoder.decode_responses:
836 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
837 else:
838 self.health_check_response = [b"pong", self.health_check_response_b]
839 if self.push_handler_func is None:
840 _set_info_logger()
841 self.reset()
843 def __enter__(self) -> "PubSub":
844 return self
846 def __exit__(self, exc_type, exc_value, traceback) -> None:
847 self.reset()
849 def __del__(self) -> None:
850 try:
851 # if this object went out of scope prior to shutting down
852 # subscriptions, close the connection manually before
853 # returning it to the connection pool
854 self.reset()
855 except Exception:
856 pass
858 def reset(self) -> None:
859 if self.connection:
860 self.connection.disconnect()
861 self.connection.deregister_connect_callback(self.on_connect)
862 self.connection_pool.release(self.connection)
863 self.connection = None
864 self.health_check_response_counter = 0
865 self.channels = {}
866 self.pending_unsubscribe_channels = set()
867 self.shard_channels = {}
868 self.pending_unsubscribe_shard_channels = set()
869 self.patterns = {}
870 self.pending_unsubscribe_patterns = set()
871 self.subscribed_event.clear()
873 def close(self) -> None:
874 self.reset()
876 def on_connect(self, connection) -> None:
877 "Re-subscribe to any channels and patterns previously subscribed to"
878 # NOTE: for python3, we can't pass bytestrings as keyword arguments
879 # so we need to decode channel/pattern names back to unicode strings
880 # before passing them to [p]subscribe.
881 self.pending_unsubscribe_channels.clear()
882 self.pending_unsubscribe_patterns.clear()
883 self.pending_unsubscribe_shard_channels.clear()
884 if self.channels:
885 channels = {
886 self.encoder.decode(k, force=True): v for k, v in self.channels.items()
887 }
888 self.subscribe(**channels)
889 if self.patterns:
890 patterns = {
891 self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
892 }
893 self.psubscribe(**patterns)
894 if self.shard_channels:
895 shard_channels = {
896 self.encoder.decode(k, force=True): v
897 for k, v in self.shard_channels.items()
898 }
899 self.ssubscribe(**shard_channels)
901 @property
902 def subscribed(self) -> bool:
903 """Indicates if there are subscriptions to any channels or patterns"""
904 return self.subscribed_event.is_set()
906 def execute_command(self, *args):
907 """Execute a publish/subscribe command"""
909 # NOTE: don't parse the response in this function -- it could pull a
910 # legitimate message off the stack if the connection is already
911 # subscribed to one or more channels
913 if self.connection is None:
914 self.connection = self.connection_pool.get_connection()
915 # register a callback that re-subscribes to any channels we
916 # were listening to when we were disconnected
917 self.connection.register_connect_callback(self.on_connect)
918 if self.push_handler_func is not None:
919 self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
920 self._event_dispatcher.dispatch(
921 AfterPubSubConnectionInstantiationEvent(
922 self.connection, self.connection_pool, ClientType.SYNC, self._lock
923 )
924 )
925 connection = self.connection
926 kwargs = {"check_health": not self.subscribed}
927 if not self.subscribed:
928 self.clean_health_check_responses()
929 with self._lock:
930 self._execute(connection, connection.send_command, *args, **kwargs)
932 def clean_health_check_responses(self) -> None:
933 """
934 If any health check responses are present, clean them
935 """
936 ttl = 10
937 conn = self.connection
938 while conn and self.health_check_response_counter > 0 and ttl > 0:
939 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
940 response = self._execute(conn, conn.read_response)
941 if self.is_health_check_response(response):
942 self.health_check_response_counter -= 1
943 else:
944 raise PubSubError(
945 "A non health check response was cleaned by "
946 "execute_command: {}".format(response)
947 )
948 ttl -= 1
950 def _reconnect(self, conn) -> None:
951 """
952 The supported exceptions are already checked in the
953 retry object so we don't need to do it here.
955 In this error handler we are trying to reconnect to the server.
956 """
957 conn.disconnect()
958 conn.connect()
960 def _execute(self, conn, command, *args, **kwargs):
961 """
962 Connect manually upon disconnection. If the Redis server is down,
963 this will fail and raise a ConnectionError as desired.
964 After reconnection, the ``on_connect`` callback should have been
965 called by the # connection to resubscribe us to any channels and
966 patterns we were previously listening to
967 """
969 if conn.should_reconnect():
970 self._reconnect(conn)
972 response = conn.retry.call_with_retry(
973 lambda: command(*args, **kwargs),
974 lambda _: self._reconnect(conn),
975 )
977 return response
979 def parse_response(self, block=True, timeout=0):
980 """Parse the response from a publish/subscribe command"""
981 conn = self.connection
982 if conn is None:
983 raise RuntimeError(
984 "pubsub connection not set: "
985 "did you forget to call subscribe() or psubscribe()?"
986 )
988 self.check_health()
990 def try_read():
991 if not block:
992 if not conn.can_read(timeout=timeout):
993 return None
994 else:
995 conn.connect()
996 return conn.read_response(disconnect_on_error=False, push_request=True)
998 response = self._execute(conn, try_read)
1000 if self.is_health_check_response(response):
1001 # ignore the health check message as user might not expect it
1002 self.health_check_response_counter -= 1
1003 return None
1004 return response
1006 def is_health_check_response(self, response) -> bool:
1007 """
1008 Check if the response is a health check response.
1009 If there are no subscriptions redis responds to PING command with a
1010 bulk response, instead of a multi-bulk with "pong" and the response.
1011 """
1012 return response in [
1013 self.health_check_response, # If there was a subscription
1014 self.health_check_response_b, # If there wasn't
1015 ]
1017 def check_health(self) -> None:
1018 conn = self.connection
1019 if conn is None:
1020 raise RuntimeError(
1021 "pubsub connection not set: "
1022 "did you forget to call subscribe() or psubscribe()?"
1023 )
1025 if conn.health_check_interval and time.monotonic() > conn.next_health_check:
1026 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
1027 self.health_check_response_counter += 1
1029 def _normalize_keys(self, data) -> Dict:
1030 """
1031 normalize channel/pattern names to be either bytes or strings
1032 based on whether responses are automatically decoded. this saves us
1033 from coercing the value for each message coming in.
1034 """
1035 encode = self.encoder.encode
1036 decode = self.encoder.decode
1037 return {decode(encode(k)): v for k, v in data.items()}
1039 def psubscribe(self, *args, **kwargs):
1040 """
1041 Subscribe to channel patterns. Patterns supplied as keyword arguments
1042 expect a pattern name as the key and a callable as the value. A
1043 pattern's callable will be invoked automatically when a message is
1044 received on that pattern rather than producing a message via
1045 ``listen()``.
1046 """
1047 if args:
1048 args = list_or_args(args[0], args[1:])
1049 new_patterns = dict.fromkeys(args)
1050 new_patterns.update(kwargs)
1051 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
1052 # update the patterns dict AFTER we send the command. we don't want to
1053 # subscribe twice to these patterns, once for the command and again
1054 # for the reconnection.
1055 new_patterns = self._normalize_keys(new_patterns)
1056 self.patterns.update(new_patterns)
1057 if not self.subscribed:
1058 # Set the subscribed_event flag to True
1059 self.subscribed_event.set()
1060 # Clear the health check counter
1061 self.health_check_response_counter = 0
1062 self.pending_unsubscribe_patterns.difference_update(new_patterns)
1063 return ret_val
1065 def punsubscribe(self, *args):
1066 """
1067 Unsubscribe from the supplied patterns. If empty, unsubscribe from
1068 all patterns.
1069 """
1070 if args:
1071 args = list_or_args(args[0], args[1:])
1072 patterns = self._normalize_keys(dict.fromkeys(args))
1073 else:
1074 patterns = self.patterns
1075 self.pending_unsubscribe_patterns.update(patterns)
1076 return self.execute_command("PUNSUBSCRIBE", *args)
1078 def subscribe(self, *args, **kwargs):
1079 """
1080 Subscribe to channels. Channels supplied as keyword arguments expect
1081 a channel name as the key and a callable as the value. A channel's
1082 callable will be invoked automatically when a message is received on
1083 that channel rather than producing a message via ``listen()`` or
1084 ``get_message()``.
1085 """
1086 if args:
1087 args = list_or_args(args[0], args[1:])
1088 new_channels = dict.fromkeys(args)
1089 new_channels.update(kwargs)
1090 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
1091 # update the channels dict AFTER we send the command. we don't want to
1092 # subscribe twice to these channels, once for the command and again
1093 # for the reconnection.
1094 new_channels = self._normalize_keys(new_channels)
1095 self.channels.update(new_channels)
1096 if not self.subscribed:
1097 # Set the subscribed_event flag to True
1098 self.subscribed_event.set()
1099 # Clear the health check counter
1100 self.health_check_response_counter = 0
1101 self.pending_unsubscribe_channels.difference_update(new_channels)
1102 return ret_val
1104 def unsubscribe(self, *args):
1105 """
1106 Unsubscribe from the supplied channels. If empty, unsubscribe from
1107 all channels
1108 """
1109 if args:
1110 args = list_or_args(args[0], args[1:])
1111 channels = self._normalize_keys(dict.fromkeys(args))
1112 else:
1113 channels = self.channels
1114 self.pending_unsubscribe_channels.update(channels)
1115 return self.execute_command("UNSUBSCRIBE", *args)
1117 def ssubscribe(self, *args, target_node=None, **kwargs):
1118 """
1119 Subscribes the client to the specified shard channels.
1120 Channels supplied as keyword arguments expect a channel name as the key
1121 and a callable as the value. A channel's callable will be invoked automatically
1122 when a message is received on that channel rather than producing a message via
1123 ``listen()`` or ``get_sharded_message()``.
1124 """
1125 if args:
1126 args = list_or_args(args[0], args[1:])
1127 new_s_channels = dict.fromkeys(args)
1128 new_s_channels.update(kwargs)
1129 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
1130 # update the s_channels dict AFTER we send the command. we don't want to
1131 # subscribe twice to these channels, once for the command and again
1132 # for the reconnection.
1133 new_s_channels = self._normalize_keys(new_s_channels)
1134 self.shard_channels.update(new_s_channels)
1135 if not self.subscribed:
1136 # Set the subscribed_event flag to True
1137 self.subscribed_event.set()
1138 # Clear the health check counter
1139 self.health_check_response_counter = 0
1140 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
1141 return ret_val
1143 def sunsubscribe(self, *args, target_node=None):
1144 """
1145 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
1146 all shard_channels
1147 """
1148 if args:
1149 args = list_or_args(args[0], args[1:])
1150 s_channels = self._normalize_keys(dict.fromkeys(args))
1151 else:
1152 s_channels = self.shard_channels
1153 self.pending_unsubscribe_shard_channels.update(s_channels)
1154 return self.execute_command("SUNSUBSCRIBE", *args)
1156 def listen(self):
1157 "Listen for messages on channels this client has been subscribed to"
1158 while self.subscribed:
1159 response = self.handle_message(self.parse_response(block=True))
1160 if response is not None:
1161 yield response
1163 def get_message(
1164 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
1165 ):
1166 """
1167 Get the next message if one is available, otherwise None.
1169 If timeout is specified, the system will wait for `timeout` seconds
1170 before returning. Timeout should be specified as a floating point
1171 number, or None, to wait indefinitely.
1172 """
1173 if not self.subscribed:
1174 # Wait for subscription
1175 start_time = time.monotonic()
1176 if self.subscribed_event.wait(timeout) is True:
1177 # The connection was subscribed during the timeout time frame.
1178 # The timeout should be adjusted based on the time spent
1179 # waiting for the subscription
1180 time_spent = time.monotonic() - start_time
1181 timeout = max(0.0, timeout - time_spent)
1182 else:
1183 # The connection isn't subscribed to any channels or patterns,
1184 # so no messages are available
1185 return None
1187 response = self.parse_response(block=(timeout is None), timeout=timeout)
1189 if response:
1190 return self.handle_message(response, ignore_subscribe_messages)
1191 return None
1193 get_sharded_message = get_message
1195 def ping(self, message: Union[str, None] = None) -> bool:
1196 """
1197 Ping the Redis server to test connectivity.
1199 Sends a PING command to the Redis server and returns True if the server
1200 responds with "PONG".
1201 """
1202 args = ["PING", message] if message is not None else ["PING"]
1203 return self.execute_command(*args)
1205 def handle_message(self, response, ignore_subscribe_messages=False):
1206 """
1207 Parses a pub/sub message. If the channel or pattern was subscribed to
1208 with a message handler, the handler is invoked instead of a parsed
1209 message being returned.
1210 """
1211 if response is None:
1212 return None
1213 if isinstance(response, bytes):
1214 response = [b"pong", response] if response != b"PONG" else [b"pong", b""]
1216 message_type = str_if_bytes(response[0])
1217 if message_type == "pmessage":
1218 message = {
1219 "type": message_type,
1220 "pattern": response[1],
1221 "channel": response[2],
1222 "data": response[3],
1223 }
1224 elif message_type == "pong":
1225 message = {
1226 "type": message_type,
1227 "pattern": None,
1228 "channel": None,
1229 "data": response[1],
1230 }
1231 else:
1232 message = {
1233 "type": message_type,
1234 "pattern": None,
1235 "channel": response[1],
1236 "data": response[2],
1237 }
1239 # if this is an unsubscribe message, remove it from memory
1240 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
1241 if message_type == "punsubscribe":
1242 pattern = response[1]
1243 if pattern in self.pending_unsubscribe_patterns:
1244 self.pending_unsubscribe_patterns.remove(pattern)
1245 self.patterns.pop(pattern, None)
1246 elif message_type == "sunsubscribe":
1247 s_channel = response[1]
1248 if s_channel in self.pending_unsubscribe_shard_channels:
1249 self.pending_unsubscribe_shard_channels.remove(s_channel)
1250 self.shard_channels.pop(s_channel, None)
1251 else:
1252 channel = response[1]
1253 if channel in self.pending_unsubscribe_channels:
1254 self.pending_unsubscribe_channels.remove(channel)
1255 self.channels.pop(channel, None)
1256 if not self.channels and not self.patterns and not self.shard_channels:
1257 # There are no subscriptions anymore, set subscribed_event flag
1258 # to false
1259 self.subscribed_event.clear()
1261 if message_type in self.PUBLISH_MESSAGE_TYPES:
1262 # if there's a message handler, invoke it
1263 if message_type == "pmessage":
1264 handler = self.patterns.get(message["pattern"], None)
1265 elif message_type == "smessage":
1266 handler = self.shard_channels.get(message["channel"], None)
1267 else:
1268 handler = self.channels.get(message["channel"], None)
1269 if handler:
1270 handler(message)
1271 return None
1272 elif message_type != "pong":
1273 # this is a subscribe/unsubscribe message. ignore if we don't
1274 # want them
1275 if ignore_subscribe_messages or self.ignore_subscribe_messages:
1276 return None
1278 return message
1280 def run_in_thread(
1281 self,
1282 sleep_time: float = 0.0,
1283 daemon: bool = False,
1284 exception_handler: Optional[Callable] = None,
1285 pubsub=None,
1286 sharded_pubsub: bool = False,
1287 ) -> "PubSubWorkerThread":
1288 for channel, handler in self.channels.items():
1289 if handler is None:
1290 raise PubSubError(f"Channel: '{channel}' has no handler registered")
1291 for pattern, handler in self.patterns.items():
1292 if handler is None:
1293 raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
1294 for s_channel, handler in self.shard_channels.items():
1295 if handler is None:
1296 raise PubSubError(
1297 f"Shard Channel: '{s_channel}' has no handler registered"
1298 )
1300 pubsub = self if pubsub is None else pubsub
1301 thread = PubSubWorkerThread(
1302 pubsub,
1303 sleep_time,
1304 daemon=daemon,
1305 exception_handler=exception_handler,
1306 sharded_pubsub=sharded_pubsub,
1307 )
1308 thread.start()
1309 return thread
1312class PubSubWorkerThread(threading.Thread):
1313 def __init__(
1314 self,
1315 pubsub,
1316 sleep_time: float,
1317 daemon: bool = False,
1318 exception_handler: Union[
1319 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
1320 ] = None,
1321 sharded_pubsub: bool = False,
1322 ):
1323 super().__init__()
1324 self.daemon = daemon
1325 self.pubsub = pubsub
1326 self.sleep_time = sleep_time
1327 self.exception_handler = exception_handler
1328 self.sharded_pubsub = sharded_pubsub
1329 self._running = threading.Event()
1331 def run(self) -> None:
1332 if self._running.is_set():
1333 return
1334 self._running.set()
1335 pubsub = self.pubsub
1336 sleep_time = self.sleep_time
1337 while self._running.is_set():
1338 try:
1339 if not self.sharded_pubsub:
1340 pubsub.get_message(
1341 ignore_subscribe_messages=True, timeout=sleep_time
1342 )
1343 else:
1344 pubsub.get_sharded_message(
1345 ignore_subscribe_messages=True, timeout=sleep_time
1346 )
1347 except BaseException as e:
1348 if self.exception_handler is None:
1349 raise
1350 self.exception_handler(e, pubsub, self)
1351 pubsub.close()
1353 def stop(self) -> None:
1354 # trip the flag so the run loop exits. the run loop will
1355 # close the pubsub connection, which disconnects the socket
1356 # and returns the connection to the pool.
1357 self._running.clear()
1360class Pipeline(Redis):
1361 """
1362 Pipelines provide a way to transmit multiple commands to the Redis server
1363 in one transmission. This is convenient for batch processing, such as
1364 saving all the values in a list to Redis.
1366 All commands executed within a pipeline(when running in transactional mode,
1367 which is the default behavior) are wrapped with MULTI and EXEC
1368 calls. This guarantees all commands executed in the pipeline will be
1369 executed atomically.
1371 Any command raising an exception does *not* halt the execution of
1372 subsequent commands in the pipeline. Instead, the exception is caught
1373 and its instance is placed into the response list returned by execute().
1374 Code iterating over the response list should be able to deal with an
1375 instance of an exception as a potential value. In general, these will be
1376 ResponseError exceptions, such as those raised when issuing a command
1377 on a key of a different datatype.
1378 """
1380 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
1382 def __init__(
1383 self,
1384 connection_pool: ConnectionPool,
1385 response_callbacks,
1386 transaction,
1387 shard_hint,
1388 ):
1389 self.connection_pool = connection_pool
1390 self.connection: Optional[Connection] = None
1391 self.response_callbacks = response_callbacks
1392 self.transaction = transaction
1393 self.shard_hint = shard_hint
1394 self.watching = False
1395 self.command_stack = []
1396 self.scripts: Set[Script] = set()
1397 self.explicit_transaction = False
1399 def __enter__(self) -> "Pipeline":
1400 return self
1402 def __exit__(self, exc_type, exc_value, traceback):
1403 self.reset()
1405 def __del__(self):
1406 try:
1407 self.reset()
1408 except Exception:
1409 pass
1411 def __len__(self) -> int:
1412 return len(self.command_stack)
1414 def __bool__(self) -> bool:
1415 """Pipeline instances should always evaluate to True"""
1416 return True
1418 def reset(self) -> None:
1419 self.command_stack = []
1420 self.scripts = set()
1421 # make sure to reset the connection state in the event that we were
1422 # watching something
1423 if self.watching and self.connection:
1424 try:
1425 # call this manually since our unwatch or
1426 # immediate_execute_command methods can call reset()
1427 self.connection.send_command("UNWATCH")
1428 self.connection.read_response()
1429 except ConnectionError:
1430 # disconnect will also remove any previous WATCHes
1431 self.connection.disconnect()
1432 # clean up the other instance attributes
1433 self.watching = False
1434 self.explicit_transaction = False
1436 # we can safely return the connection to the pool here since we're
1437 # sure we're no longer WATCHing anything
1438 if self.connection:
1439 self.connection_pool.release(self.connection)
1440 self.connection = None
1442 def close(self) -> None:
1443 """Close the pipeline"""
1444 self.reset()
1446 def multi(self) -> None:
1447 """
1448 Start a transactional block of the pipeline after WATCH commands
1449 are issued. End the transactional block with `execute`.
1450 """
1451 if self.explicit_transaction:
1452 raise RedisError("Cannot issue nested calls to MULTI")
1453 if self.command_stack:
1454 raise RedisError(
1455 "Commands without an initial WATCH have already been issued"
1456 )
1457 self.explicit_transaction = True
1459 def execute_command(self, *args, **kwargs):
1460 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
1461 return self.immediate_execute_command(*args, **kwargs)
1462 return self.pipeline_execute_command(*args, **kwargs)
1464 def _disconnect_reset_raise_on_watching(
1465 self,
1466 conn: AbstractConnection,
1467 error: Exception,
1468 ) -> None:
1469 """
1470 Close the connection reset watching state and
1471 raise an exception if we were watching.
1473 The supported exceptions are already checked in the
1474 retry object so we don't need to do it here.
1476 After we disconnect the connection, it will try to reconnect and
1477 do a health check as part of the send_command logic(on connection level).
1478 """
1479 conn.disconnect()
1481 # if we were already watching a variable, the watch is no longer
1482 # valid since this connection has died. raise a WatchError, which
1483 # indicates the user should retry this transaction.
1484 if self.watching:
1485 self.reset()
1486 raise WatchError(
1487 f"A {type(error).__name__} occurred while watching one or more keys"
1488 )
1490 def immediate_execute_command(self, *args, **options):
1491 """
1492 Execute a command immediately, but don't auto-retry on the supported
1493 errors for retry if we're already WATCHing a variable.
1494 Used when issuing WATCH or subsequent commands retrieving their values but before
1495 MULTI is called.
1496 """
1497 command_name = args[0]
1498 conn = self.connection
1499 # if this is the first call, we need a connection
1500 if not conn:
1501 conn = self.connection_pool.get_connection()
1502 self.connection = conn
1504 return conn.retry.call_with_retry(
1505 lambda: self._send_command_parse_response(
1506 conn, command_name, *args, **options
1507 ),
1508 lambda error: self._disconnect_reset_raise_on_watching(conn, error),
1509 )
1511 def pipeline_execute_command(self, *args, **options) -> "Pipeline":
1512 """
1513 Stage a command to be executed when execute() is next called
1515 Returns the current Pipeline object back so commands can be
1516 chained together, such as:
1518 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
1520 At some other point, you can then run: pipe.execute(),
1521 which will execute all commands queued in the pipe.
1522 """
1523 self.command_stack.append((args, options))
1524 return self
1526 def _execute_transaction(
1527 self, connection: Connection, commands, raise_on_error
1528 ) -> List:
1529 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
1530 all_cmds = connection.pack_commands(
1531 [args for args, options in cmds if EMPTY_RESPONSE not in options]
1532 )
1533 connection.send_packed_command(all_cmds)
1534 errors = []
1536 # parse off the response for MULTI
1537 # NOTE: we need to handle ResponseErrors here and continue
1538 # so that we read all the additional command messages from
1539 # the socket
1540 try:
1541 self.parse_response(connection, "_")
1542 except ResponseError as e:
1543 errors.append((0, e))
1545 # and all the other commands
1546 for i, command in enumerate(commands):
1547 if EMPTY_RESPONSE in command[1]:
1548 errors.append((i, command[1][EMPTY_RESPONSE]))
1549 else:
1550 try:
1551 self.parse_response(connection, "_")
1552 except ResponseError as e:
1553 self.annotate_exception(e, i + 1, command[0])
1554 errors.append((i, e))
1556 # parse the EXEC.
1557 try:
1558 response = self.parse_response(connection, "_")
1559 except ExecAbortError:
1560 if errors:
1561 raise errors[0][1]
1562 raise
1564 # EXEC clears any watched keys
1565 self.watching = False
1567 if response is None:
1568 raise WatchError("Watched variable changed.")
1570 # put any parse errors into the response
1571 for i, e in errors:
1572 response.insert(i, e)
1574 if len(response) != len(commands):
1575 self.connection.disconnect()
1576 raise ResponseError(
1577 "Wrong number of response items from pipeline execution"
1578 )
1580 # find any errors in the response and raise if necessary
1581 if raise_on_error:
1582 self.raise_first_error(commands, response)
1584 # We have to run response callbacks manually
1585 data = []
1586 for r, cmd in zip(response, commands):
1587 if not isinstance(r, Exception):
1588 args, options = cmd
1589 # Remove keys entry, it needs only for cache.
1590 options.pop("keys", None)
1591 command_name = args[0]
1592 if command_name in self.response_callbacks:
1593 r = self.response_callbacks[command_name](r, **options)
1594 data.append(r)
1596 return data
1598 def _execute_pipeline(self, connection, commands, raise_on_error):
1599 # build up all commands into a single request to increase network perf
1600 all_cmds = connection.pack_commands([args for args, _ in commands])
1601 connection.send_packed_command(all_cmds)
1603 responses = []
1604 for args, options in commands:
1605 try:
1606 responses.append(self.parse_response(connection, args[0], **options))
1607 except ResponseError as e:
1608 responses.append(e)
1610 if raise_on_error:
1611 self.raise_first_error(commands, responses)
1613 return responses
1615 def raise_first_error(self, commands, response):
1616 for i, r in enumerate(response):
1617 if isinstance(r, ResponseError):
1618 self.annotate_exception(r, i + 1, commands[i][0])
1619 raise r
1621 def annotate_exception(self, exception, number, command):
1622 cmd = " ".join(map(safe_str, command))
1623 msg = (
1624 f"Command # {number} ({truncate_text(cmd)}) of pipeline "
1625 f"caused error: {exception.args[0]}"
1626 )
1627 exception.args = (msg,) + exception.args[1:]
1629 def parse_response(self, connection, command_name, **options):
1630 result = Redis.parse_response(self, connection, command_name, **options)
1631 if command_name in self.UNWATCH_COMMANDS:
1632 self.watching = False
1633 elif command_name == "WATCH":
1634 self.watching = True
1635 return result
1637 def load_scripts(self):
1638 # make sure all scripts that are about to be run on this pipeline exist
1639 scripts = list(self.scripts)
1640 immediate = self.immediate_execute_command
1641 shas = [s.sha for s in scripts]
1642 # we can't use the normal script_* methods because they would just
1643 # get buffered in the pipeline.
1644 exists = immediate("SCRIPT EXISTS", *shas)
1645 if not all(exists):
1646 for s, exist in zip(scripts, exists):
1647 if not exist:
1648 s.sha = immediate("SCRIPT LOAD", s.script)
1650 def _disconnect_raise_on_watching(
1651 self,
1652 conn: AbstractConnection,
1653 error: Exception,
1654 ) -> None:
1655 """
1656 Close the connection, raise an exception if we were watching.
1658 The supported exceptions are already checked in the
1659 retry object so we don't need to do it here.
1661 After we disconnect the connection, it will try to reconnect and
1662 do a health check as part of the send_command logic(on connection level).
1663 """
1664 conn.disconnect()
1665 # if we were watching a variable, the watch is no longer valid
1666 # since this connection has died. raise a WatchError, which
1667 # indicates the user should retry this transaction.
1668 if self.watching:
1669 raise WatchError(
1670 f"A {type(error).__name__} occurred while watching one or more keys"
1671 )
1673 def execute(self, raise_on_error: bool = True) -> List[Any]:
1674 """Execute all the commands in the current pipeline"""
1675 stack = self.command_stack
1676 if not stack and not self.watching:
1677 return []
1678 if self.scripts:
1679 self.load_scripts()
1680 if self.transaction or self.explicit_transaction:
1681 execute = self._execute_transaction
1682 else:
1683 execute = self._execute_pipeline
1685 conn = self.connection
1686 if not conn:
1687 conn = self.connection_pool.get_connection()
1688 # assign to self.connection so reset() releases the connection
1689 # back to the pool after we're done
1690 self.connection = conn
1692 try:
1693 return conn.retry.call_with_retry(
1694 lambda: execute(conn, stack, raise_on_error),
1695 lambda error: self._disconnect_raise_on_watching(conn, error),
1696 )
1697 finally:
1698 # in reset() the connection is disconnected before returned to the pool if
1699 # it is marked for reconnect.
1700 self.reset()
1702 def discard(self):
1703 """
1704 Flushes all previously queued commands
1705 See: https://redis.io/commands/DISCARD
1706 """
1707 self.execute_command("DISCARD")
1709 def watch(self, *names):
1710 """Watches the values at keys ``names``"""
1711 if self.explicit_transaction:
1712 raise RedisError("Cannot issue a WATCH after a MULTI")
1713 return self.execute_command("WATCH", *names)
1715 def unwatch(self) -> bool:
1716 """Unwatches all previously specified keys"""
1717 return self.watching and self.execute_command("UNWATCH") or True