Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 20%
import copy
import re
import threading
import time
from itertools import chain
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Mapping,
    Optional,
    Set,
    Type,
    Union,
)

from redis._parsers.encoders import Encoder
from redis._parsers.helpers import (
    _RedisCallbacks,
    _RedisCallbacksRESP2,
    _RedisCallbacksRESP3,
    bool_ok,
)
from redis.backoff import ExponentialWithJitterBackoff
from redis.cache import CacheConfig, CacheInterface
from redis.commands import (
    CoreCommands,
    RedisModuleCommands,
    SentinelCommands,
    list_or_args,
)
from redis.commands.core import Script
from redis.connection import (
    AbstractConnection,
    Connection,
    ConnectionPool,
    SSLConnection,
    UnixDomainSocketConnection,
)
from redis.credentials import CredentialProvider
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    AfterSingleConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    ConnectionError,
    ExecAbortError,
    PubSubError,
    RedisError,
    ResponseError,
    WatchError,
)
from redis.lock import Lock
from redis.maintenance_events import (
    MaintenanceEventPoolHandler,
    MaintenanceEventsConfig,
)
from redis.retry import Retry
from redis.utils import (
    _set_info_logger,
    deprecated_args,
    get_lib_version,
    safe_str,
    str_if_bytes,
    truncate_text,
)

if TYPE_CHECKING:
    import ssl

    import OpenSSL

SYM_EMPTY = b""
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (e.g. DUMP) are binary, and are never meant to be decoded
NEVER_DECODE = "NEVER_DECODE"


class CaseInsensitiveDict(dict):
    "Case insensitive dict implementation. Assumes string keys only."

    def __init__(self, data: Dict[str, str]) -> None:
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def update(self, data):
        data = CaseInsensitiveDict(data)
        super().update(data)
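
# Example (editor's sketch): keys are normalized to upper case on every
# access, so lookups are case-insensitive. Runnable as-is:
#
#     callbacks = CaseInsensitiveDict({"get": str})
#     assert "GET" in callbacks and "get" in callbacks
#     assert callbacks["GeT"] is str
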
class AbstractRedis:
    pass


class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
    """
    Implementation of the Redis protocol.

    This abstract class provides a Python interface to all Redis commands
    and an implementation of the Redis protocol.

    Pipelines derive from this, implementing how commands are sent and
    received to the Redis server. Based on configuration, an instance will
    either use a ConnectionPool or a single Connection object to talk to
    Redis.

    It is not safe to pass PubSub or Pipeline objects between threads.
    """

    @classmethod
    def from_url(cls, url: str, **kwargs) -> "Redis":
        """
        Return a Redis client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.

        """
        single_connection_client = kwargs.pop("single_connection_client", False)
        connection_pool = ConnectionPool.from_url(url, **kwargs)
        client = cls(
            connection_pool=connection_pool,
            single_connection_client=single_connection_client,
        )
        client.auto_close_connection_pool = True
        return client
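
    # Example (editor's sketch): build a client from a URL. Assumes a Redis
    # server reachable at localhost:6379; the key name is illustrative.
    #
    #     r = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
    #     r.set("greeting", "hello")
    #     assert r.get("greeting") == "hello"
    #     r.close()  # also disconnects the pool the client created
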
    @classmethod
    def from_pool(
        cls: Type["Redis"],
        connection_pool: ConnectionPool,
    ) -> "Redis":
        """
        Return a Redis client from the given connection pool.
        The Redis client will take ownership of the connection pool and
        close it when the Redis client is closed.
        """
        client = cls(
            connection_pool=connection_pool,
        )
        client.auto_close_connection_pool = True
        return client
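
    # Example (editor's sketch): hand an existing pool to the client; the
    # client owns it afterwards, so closing the client disconnects the pool.
    #
    #     pool = ConnectionPool(host="localhost", port=6379, db=0)
    #     r = Redis.from_pool(pool)
    #     r.ping()
    #     r.close()  # disconnects `pool` as well
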
    @deprecated_args(
        args_to_warn=["retry_on_timeout"],
        reason="TimeoutError is included by default.",
        version="6.0.0",
    )
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        retry: Retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
        ),
        retry_on_error: Optional[List[Type[Exception]]] = None,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_path: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_password: Optional[str] = None,
        ssl_validate_ocsp: bool = False,
        ssl_validate_ocsp_stapled: bool = False,
        ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
        ssl_ocsp_expected_cert: Optional[str] = None,
        ssl_min_version: Optional["ssl.TLSVersion"] = None,
        ssl_ciphers: Optional[str] = None,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = "redis-py",
        lib_version: Optional[str] = get_lib_version(),
        username: Optional[str] = None,
        redis_connect_func: Optional[Callable[[], None]] = None,
        credential_provider: Optional[CredentialProvider] = None,
        protocol: Optional[int] = 2,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maintenance_events_config: Optional[MaintenanceEventsConfig] = None,
    ) -> None:
        """
        Initialize a new Redis client.

        To specify a retry policy for specific errors, you have two options:

        1. Set ``retry_on_error`` to a list of the error(s) to retry on, and
           optionally set ``retry`` to a valid ``Retry`` object (in case the
           default one is not appropriate). With this approach, retries are
           triggered on the default errors specified in the ``Retry`` object,
           plus the errors specified in ``retry_on_error``.

        2. Define a ``Retry`` object with configured ``supported_errors`` and
           pass it as the ``retry`` parameter. With this approach, you
           completely redefine the errors on which retries will happen.

        ``retry_on_timeout`` is deprecated - please include TimeoutError
        either in the ``Retry`` object or in the ``retry_on_error`` list.

        When ``connection_pool`` is provided, the retry configuration of the
        provided pool is used.

        Args:

        single_connection_client:
            if `True`, the connection pool is not used. In that case, the
            `Redis` instance is not thread safe.
        """
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        if not connection_pool:
            if not retry_on_error:
                retry_on_error = []
            kwargs = {
                "db": db,
                "username": username,
                "password": password,
                "socket_timeout": socket_timeout,
                "encoding": encoding,
                "encoding_errors": encoding_errors,
                "decode_responses": decode_responses,
                "retry_on_error": retry_on_error,
                "retry": copy.deepcopy(retry),
                "max_connections": max_connections,
                "health_check_interval": health_check_interval,
                "client_name": client_name,
                "lib_name": lib_name,
                "lib_version": lib_version,
                "redis_connect_func": redis_connect_func,
                "credential_provider": credential_provider,
                "protocol": protocol,
            }
            # based on input, set up the appropriate connection args
            if unix_socket_path is not None:
                kwargs.update(
                    {
                        "path": unix_socket_path,
                        "connection_class": UnixDomainSocketConnection,
                    }
                )
            else:
                # TCP specific options
                kwargs.update(
                    {
                        "host": host,
                        "port": port,
                        "socket_connect_timeout": socket_connect_timeout,
                        "socket_keepalive": socket_keepalive,
                        "socket_keepalive_options": socket_keepalive_options,
                    }
                )

                if ssl:
                    kwargs.update(
                        {
                            "connection_class": SSLConnection,
                            "ssl_keyfile": ssl_keyfile,
                            "ssl_certfile": ssl_certfile,
                            "ssl_cert_reqs": ssl_cert_reqs,
                            "ssl_ca_certs": ssl_ca_certs,
                            "ssl_ca_data": ssl_ca_data,
                            "ssl_check_hostname": ssl_check_hostname,
                            "ssl_password": ssl_password,
                            "ssl_ca_path": ssl_ca_path,
                            "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                            "ssl_validate_ocsp": ssl_validate_ocsp,
                            "ssl_ocsp_context": ssl_ocsp_context,
                            "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                            "ssl_min_version": ssl_min_version,
                            "ssl_ciphers": ssl_ciphers,
                        }
                    )
            if (cache_config or cache) and protocol in [3, "3"]:
                kwargs.update(
                    {
                        "cache": cache,
                        "cache_config": cache_config,
                    }
                )
            connection_pool = ConnectionPool(**kwargs)
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )
            self.auto_close_connection_pool = True
        else:
            self.auto_close_connection_pool = False
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )

        self.connection_pool = connection_pool

        if (cache_config or cache) and self.connection_pool.get_protocol() not in [
            3,
            "3",
        ]:
            raise RedisError("Client caching is only supported with RESP version 3")

        if maintenance_events_config and self.connection_pool.get_protocol() not in [
            3,
            "3",
        ]:
            raise RedisError(
                "Push handlers on connection are only supported with RESP version 3"
            )
        if maintenance_events_config and maintenance_events_config.enabled:
            self.maintenance_events_pool_handler = MaintenanceEventPoolHandler(
                self.connection_pool, maintenance_events_config
            )
            self.connection_pool.set_maintenance_events_pool_handler(
                self.maintenance_events_pool_handler
            )
        else:
            self.maintenance_events_pool_handler = None

        self.single_connection_lock = threading.RLock()
        self.connection = None
        self._single_connection_client = single_connection_client
        if self._single_connection_client:
            self.connection = self.connection_pool.get_connection()
            self._event_dispatcher.dispatch(
                AfterSingleConnectionInstantiationEvent(
                    self.connection, ClientType.SYNC, self.single_connection_lock
                )
            )

        self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

        if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
            self.response_callbacks.update(_RedisCallbacksRESP3)
        else:
            self.response_callbacks.update(_RedisCallbacksRESP2)
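
    # Example (editor's sketch): option 1 from the docstring above - keep the
    # default Retry behavior but also retry on BusyLoadingError. Assumes a
    # local server; values are illustrative.
    #
    #     from redis.backoff import ExponentialWithJitterBackoff
    #     from redis.exceptions import BusyLoadingError
    #     from redis.retry import Retry
    #
    #     r = Redis(
    #         retry=Retry(ExponentialWithJitterBackoff(base=1, cap=10), retries=3),
    #         retry_on_error=[BusyLoadingError],
    #     )
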
    def __repr__(self) -> str:
        return (
            f"<{type(self).__module__}.{type(self).__name__}"
            f"({repr(self.connection_pool)})>"
        )

    def get_encoder(self) -> "Encoder":
        """Get the connection pool's encoder"""
        return self.connection_pool.get_encoder()

    def get_connection_kwargs(self) -> Dict:
        """Get the connection's keyword arguments"""
        return self.connection_pool.connection_kwargs

    def get_retry(self) -> Optional[Retry]:
        return self.get_connection_kwargs().get("retry")

    def set_retry(self, retry: Retry) -> None:
        self.get_connection_kwargs().update({"retry": retry})
        self.connection_pool.set_retry(retry)

    def set_response_callback(self, command: str, callback: Callable) -> None:
        """Set a custom response callback"""
        self.response_callbacks[command] = callback

    def load_external_module(self, funcname, func) -> None:
        """
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        funcname - A string containing the name of the function to create
        func - The function, being added to this class.

        ex: Assume that one has a custom redis module named foomod that
        creates commands named 'foo.dothing' and 'foo.anotherthing' in redis.
        To load these functions into this namespace:

        from redis import Redis
        from foomodule import F
        r = Redis()
        r.load_external_module("foo", F)
        r.foo().dothing('your', 'arguments')

        For a concrete example see the reimport of the redisjson module in
        tests/test_connection.py::test_loading_external_modules
        """
        setattr(self, funcname, func)

    def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.
        """
        return Pipeline(
            self.connection_pool, self.response_callbacks, transaction, shard_hint
        )
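
    # Example (editor's sketch): queue several commands and send them in one
    # round trip. Key names are illustrative.
    #
    #     with r.pipeline() as pipe:
    #         pipe.set("foo", "bar")
    #         pipe.incr("counter")
    #         pipe.get("foo")
    #         set_ok, count, foo = pipe.execute()
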
    def transaction(
        self, func: Callable[["Pipeline"], None], *watches, **kwargs
    ) -> Union[List[Any], Any, None]:
        """
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
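
    # Example (editor's sketch): atomically double a key based on its current
    # value; retried automatically on WatchError. Assumes a client created
    # with decode_responses=True; the key name is illustrative.
    #
    #     def double_it(pipe):
    #         current = pipe.get("balance") or 0  # immediate, pre-MULTI read
    #         pipe.multi()
    #         pipe.set("balance", int(current) * 2)
    #
    #     r.transaction(double_it, "balance")
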
    def lock(
        self,
        name: str,
        timeout: Optional[float] = None,
        sleep: float = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[float] = None,
        lock_class: Union[None, Any] = None,
        thread_local: bool = True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
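
    # Example (editor's sketch): hold a distributed lock while doing work.
    # The lock auto-expires after 10 seconds if not released; the name and
    # helper are illustrative.
    #
    #     with r.lock("my-lock", timeout=10, blocking_timeout=5):
    #         do_exclusive_work()  # hypothetical helper
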
    def pubsub(self, **kwargs):
        """
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        """
        return PubSub(
            self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
        )
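
    # Example (editor's sketch): subscribe and poll for messages. The channel
    # name is illustrative; publish from another client or connection.
    #
    #     p = r.pubsub(ignore_subscribe_messages=True)
    #     p.subscribe("news")
    #     message = p.get_message(timeout=1.0)  # None if nothing arrived
    #     if message:
    #         print(message["channel"], message["data"])
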
    def monitor(self):
        return Monitor(self.connection_pool)

    def client(self):
        maintenance_events_config = (
            None
            if self.maintenance_events_pool_handler is None
            else self.maintenance_events_pool_handler.config
        )
        return self.__class__(
            connection_pool=self.connection_pool,
            single_connection_client=True,
            maintenance_events_config=maintenance_events_config,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        # In case a connection property does not yet exist
        # (due to a crash earlier in the Redis() constructor), return
        # immediately as there is nothing to clean up.
        if not hasattr(self, "connection"):
            return

        conn = self.connection
        if conn:
            self.connection = None
            self.connection_pool.release(conn)

        if self.auto_close_connection_pool:
            self.connection_pool.disconnect()
    def _send_command_parse_response(self, conn, command_name, *args, **options):
        """
        Send a command and parse the response
        """
        conn.send_command(*args, **options)
        return self.parse_response(conn, command_name, **options)

    def _close_connection(self, conn) -> None:
        """
        Close the connection before retrying.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        perform a health check as part of the send_command logic (at the
        connection level).
        """

        conn.disconnect()

    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        return self._execute_command(*args, **options)

    def _execute_command(self, *args, **options):
        """Execute a command and return a parsed response"""
        pool = self.connection_pool
        command_name = args[0]
        conn = self.connection or pool.get_connection()

        if self._single_connection_client:
            self.single_connection_lock.acquire()
        try:
            return conn.retry.call_with_retry(
                lambda: self._send_command_parse_response(
                    conn, command_name, *args, **options
                ),
                lambda _: self._close_connection(conn),
            )

        finally:
            if conn and conn.should_reconnect():
                self._close_connection(conn)
                conn.connect()
            if self._single_connection_client:
                self.single_connection_lock.release()
            if not self.connection:
                pool.release(conn)

    def parse_response(self, connection, command_name, **options):
        """Parses a response from the Redis server"""
        try:
            if NEVER_DECODE in options:
                response = connection.read_response(disable_decoding=True)
                options.pop(NEVER_DECODE)
            else:
                response = connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise

        if EMPTY_RESPONSE in options:
            options.pop(EMPTY_RESPONSE)

        # Remove the "keys" entry; it is only needed for caching.
        options.pop("keys", None)

        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response
    def get_cache(self) -> Optional[CacheInterface]:
        return self.connection_pool.cache


StrictRedis = Redis


class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    The next_command() method returns one command from the monitor stream;
    the listen() method yields commands as they arrive.
    """

    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        self._start_monitor()
        return self

    def __exit__(self, *args):
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """Parse the response from a monitor command"""
        response = self.connection.read_response()

        if response is None:
            return None

        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)

        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            client_port = client_info[5:]
            client_type = "unix"
        else:
            # use rsplit as ipv6 addresses contain colons
            client_address, client_port = client_info.rsplit(":", 1)
            client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()

        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")
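
# Example (editor's sketch): stream every command the server receives.
# MONITOR is expensive on busy servers; use sparingly.
#
#     with r.monitor() as m:
#         for cmd in m.listen():
#             print(cmd["time"], cmd["client_address"], cmd["command"])
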
class PubSub:
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.
    """

    PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
    UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
    HEALTH_CHECK_MESSAGE = "redis-py-health-check"

    def __init__(
        self,
        connection_pool,
        shard_hint=None,
        ignore_subscribe_messages: bool = False,
        encoder: Optional["Encoder"] = None,
        push_handler_func: Union[None, Callable[[str], None]] = None,
        event_dispatcher: Optional["EventDispatcher"] = None,
    ):
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        self.subscribed_event = threading.Event()
        # we need to know the encoding options for this connection in order
        # to look up channel and pattern names for callback handlers.
        self.encoder = encoder
        self.push_handler_func = push_handler_func
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

        self._lock = threading.RLock()
        if self.encoder is None:
            self.encoder = self.connection_pool.get_encoder()
        self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
        if self.encoder.decode_responses:
            self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
        else:
            self.health_check_response = [b"pong", self.health_check_response_b]
        if self.push_handler_func is None:
            _set_info_logger()
        self.reset()

    def __enter__(self) -> "PubSub":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.reset()

    def __del__(self) -> None:
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            pass

    def reset(self) -> None:
        if self.connection:
            self.connection.disconnect()
            self.connection.deregister_connect_callback(self.on_connect)
            self.connection_pool.release(self.connection)
            self.connection = None
        self.health_check_response_counter = 0
        self.channels = {}
        self.pending_unsubscribe_channels = set()
        self.shard_channels = {}
        self.pending_unsubscribe_shard_channels = set()
        self.patterns = {}
        self.pending_unsubscribe_patterns = set()
        self.subscribed_event.clear()

    def close(self) -> None:
        self.reset()

    def on_connect(self, connection) -> None:
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        self.pending_unsubscribe_shard_channels.clear()
        if self.channels:
            channels = {
                self.encoder.decode(k, force=True): v for k, v in self.channels.items()
            }
            self.subscribe(**channels)
        if self.patterns:
            patterns = {
                self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
            }
            self.psubscribe(**patterns)
        if self.shard_channels:
            shard_channels = {
                self.encoder.decode(k, force=True): v
                for k, v in self.shard_channels.items()
            }
            self.ssubscribe(**shard_channels)

    @property
    def subscribed(self) -> bool:
        """Indicates if there are subscriptions to any channels or patterns"""
        return self.subscribed_event.is_set()
    def execute_command(self, *args):
        """Execute a publish/subscribe command"""

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        kwargs = {"check_health": not self.subscribed}
        if not self.subscribed:
            self.clean_health_check_responses()
        with self._lock:
            self._execute(connection, connection.send_command, *args, **kwargs)

    def clean_health_check_responses(self) -> None:
        """
        If any health check responses are present, clean them
        """
        ttl = 10
        conn = self.connection
        while conn and self.health_check_response_counter > 0 and ttl > 0:
            if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
                response = self._execute(conn, conn.read_response)
                if self.is_health_check_response(response):
                    self.health_check_response_counter -= 1
                else:
                    raise PubSubError(
                        "A non health check response was cleaned by "
                        "execute_command: {}".format(response)
                    )
            ttl -= 1

    def _reconnect(self, conn) -> None:
        """
        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        In this error handler, we are trying to reconnect to the server.
        """
        conn.disconnect()
        conn.connect()

    def _execute(self, conn, command, *args, **kwargs):
        """
        Connect manually upon disconnection. If the Redis server is down,
        this will fail and raise a ConnectionError as desired.
        After reconnection, the ``on_connect`` callback should have been
        called by the connection to resubscribe us to any channels and
        patterns we were previously listening to.
        """

        if conn.should_reconnect():
            self._reconnect(conn)

        response = conn.retry.call_with_retry(
            lambda: command(*args, **kwargs),
            lambda _: self._reconnect(conn),
        )

        return response
    def parse_response(self, block=True, timeout=0):
        """Parse the response from a publish/subscribe command"""
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        self.check_health()

        def try_read():
            if not block:
                if not conn.can_read(timeout=timeout):
                    return None
            else:
                conn.connect()
            return conn.read_response(disconnect_on_error=False, push_request=True)

        response = self._execute(conn, try_read)

        if self.is_health_check_response(response):
            # ignore the health check message as user might not expect it
            self.health_check_response_counter -= 1
            return None
        return response

    def is_health_check_response(self, response) -> bool:
        """
        Check if the response is a health check response.
        If there are no subscriptions, redis responds to the PING command with
        a bulk response, instead of a multi-bulk with "pong" and the message.
        """
        return response in [
            self.health_check_response,  # If there was a subscription
            self.health_check_response_b,  # If there wasn't
        ]

    def check_health(self) -> None:
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        if conn.health_check_interval and time.monotonic() > conn.next_health_check:
            conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
            self.health_check_response_counter += 1

    def _normalize_keys(self, data) -> Dict:
        """
        normalize channel/pattern names to be either bytes or strings
        based on whether responses are automatically decoded. this saves us
        from coercing the value for each message coming in.
        """
        encode = self.encoder.encode
        decode = self.encoder.decode
        return {decode(encode(k)): v for k, v in data.items()}
    def psubscribe(self, *args, **kwargs):
        """
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_patterns = dict.fromkeys(args)
        new_patterns.update(kwargs)
        ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
        # update the patterns dict AFTER we send the command. we don't want to
        # subscribe twice to these patterns, once for the command and again
        # for the reconnection.
        new_patterns = self._normalize_keys(new_patterns)
        self.patterns.update(new_patterns)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_patterns.difference_update(new_patterns)
        return ret_val

    def punsubscribe(self, *args):
        """
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            patterns = self._normalize_keys(dict.fromkeys(args))
        else:
            patterns = self.patterns
        self.pending_unsubscribe_patterns.update(patterns)
        return self.execute_command("PUNSUBSCRIBE", *args)

    def subscribe(self, *args, **kwargs):
        """
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_channels = dict.fromkeys(args)
        new_channels.update(kwargs)
        ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
        # update the channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_channels = self._normalize_keys(new_channels)
        self.channels.update(new_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_channels.difference_update(new_channels)
        return ret_val

    def unsubscribe(self, *args):
        """
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            channels = self._normalize_keys(dict.fromkeys(args))
        else:
            channels = self.channels
        self.pending_unsubscribe_channels.update(channels)
        return self.execute_command("UNSUBSCRIBE", *args)

    def ssubscribe(self, *args, target_node=None, **kwargs):
        """
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_s_channels = dict.fromkeys(args)
        new_s_channels.update(kwargs)
        ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
        # update the s_channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_s_channels = self._normalize_keys(new_s_channels)
        self.shard_channels.update(new_s_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
        return ret_val

    def sunsubscribe(self, *args, target_node=None):
        """
        Unsubscribe from the supplied shard channels. If empty, unsubscribe
        from all shard channels.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            s_channels = self._normalize_keys(dict.fromkeys(args))
        else:
            s_channels = self.shard_channels
        self.pending_unsubscribe_shard_channels.update(s_channels)
        return self.execute_command("SUNSUBSCRIBE", *args)

    def listen(self):
        "Listen for messages on channels this client has been subscribed to"
        while self.subscribed:
            response = self.handle_message(self.parse_response(block=True))
            if response is not None:
                yield response

    def get_message(
        self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
    ):
        """
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        """
        if not self.subscribed:
            # Wait for subscription
            start_time = time.monotonic()
            if self.subscribed_event.wait(timeout) is True:
                # The connection was subscribed during the timeout time frame.
                # The timeout should be adjusted based on the time spent
                # waiting for the subscription
                time_spent = time.monotonic() - start_time
                timeout = max(0.0, timeout - time_spent)
            else:
                # The connection isn't subscribed to any channels or patterns,
                # so no messages are available
                return None

        response = self.parse_response(block=(timeout is None), timeout=timeout)

        if response:
            return self.handle_message(response, ignore_subscribe_messages)
        return None

    get_sharded_message = get_message
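
    # Example (editor's sketch): a non-blocking polling loop around
    # get_message(), continuing the pubsub sketch above. The short sleep
    # keeps the loop from spinning when idle; handle() is hypothetical.
    #
    #     import time
    #     while True:
    #         message = p.get_message(ignore_subscribe_messages=True)
    #         if message:
    #             handle(message)  # hypothetical helper
    #         time.sleep(0.01)
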
    def ping(self, message: Union[str, None] = None) -> bool:
        """
        Ping the Redis server
        """
        args = ["PING", message] if message is not None else ["PING"]
        return self.execute_command(*args)

    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.
        """
        if response is None:
            return None
        if isinstance(response, bytes):
            response = [b"pong", response] if response != b"PONG" else [b"pong", b""]

        message_type = str_if_bytes(response[0])
        if message_type == "pmessage":
            message = {
                "type": message_type,
                "pattern": response[1],
                "channel": response[2],
                "data": response[3],
            }
        elif message_type == "pong":
            message = {
                "type": message_type,
                "pattern": None,
                "channel": None,
                "data": response[1],
            }
        else:
            message = {
                "type": message_type,
                "pattern": None,
                "channel": response[1],
                "data": response[2],
            }

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            if message_type == "punsubscribe":
                pattern = response[1]
                if pattern in self.pending_unsubscribe_patterns:
                    self.pending_unsubscribe_patterns.remove(pattern)
                    self.patterns.pop(pattern, None)
            elif message_type == "sunsubscribe":
                s_channel = response[1]
                if s_channel in self.pending_unsubscribe_shard_channels:
                    self.pending_unsubscribe_shard_channels.remove(s_channel)
                    self.shard_channels.pop(s_channel, None)
            else:
                channel = response[1]
                if channel in self.pending_unsubscribe_channels:
                    self.pending_unsubscribe_channels.remove(channel)
                    self.channels.pop(channel, None)
            if not self.channels and not self.patterns and not self.shard_channels:
                # There are no subscriptions anymore, set subscribed_event flag
                # to false
                self.subscribed_event.clear()

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            if message_type == "pmessage":
                handler = self.patterns.get(message["pattern"], None)
            elif message_type == "smessage":
                handler = self.shard_channels.get(message["channel"], None)
            else:
                handler = self.channels.get(message["channel"], None)
            if handler:
                handler(message)
                return None
        elif message_type != "pong":
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message

    def run_in_thread(
        self,
        sleep_time: float = 0.0,
        daemon: bool = False,
        exception_handler: Optional[Callable] = None,
    ) -> "PubSubWorkerThread":
        for channel, handler in self.channels.items():
            if handler is None:
                raise PubSubError(f"Channel: '{channel}' has no handler registered")
        for pattern, handler in self.patterns.items():
            if handler is None:
                raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
        for s_channel, handler in self.shard_channels.items():
            if handler is None:
                raise PubSubError(
                    f"Shard Channel: '{s_channel}' has no handler registered"
                )

        thread = PubSubWorkerThread(
            self, sleep_time, daemon=daemon, exception_handler=exception_handler
        )
        thread.start()
        return thread
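
    # Example (editor's sketch): every subscription needs a handler before
    # run_in_thread() is called; the worker thread dispatches messages to
    # them in the background. Names are illustrative.
    #
    #     def on_news(message):
    #         print(message["data"])
    #
    #     p = r.pubsub()
    #     p.subscribe(news=on_news)
    #     worker = p.run_in_thread(sleep_time=0.1, daemon=True)
    #     # ... do other work ...
    #     worker.stop()
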
class PubSubWorkerThread(threading.Thread):
    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self.exception_handler = exception_handler
        self._running = threading.Event()

    def run(self) -> None:
        if self._running.is_set():
            return
        self._running.set()
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while self._running.is_set():
            try:
                pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
            except BaseException as e:
                if self.exception_handler is None:
                    raise
                self.exception_handler(e, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()
class Pipeline(Redis):
    """
    Pipelines provide a way to transmit multiple commands to the Redis server
    in one transmission. This is convenient for batch processing, such as
    saving all the values in a list to Redis.

    All commands executed within a pipeline (when running in transactional
    mode, which is the default behavior) are wrapped with MULTI and EXEC
    calls. This guarantees all commands executed in the pipeline will be
    executed atomically.

    Any command raising an exception does *not* halt the execution of
    subsequent commands in the pipeline. Instead, the exception is caught
    and its instance is placed into the response list returned by execute().
    Code iterating over the response list should be able to deal with an
    instance of an exception as a potential value. In general, these will be
    ResponseError exceptions, such as those raised when issuing a command
    on a key of a different datatype.
    """

    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    def __init__(
        self,
        connection_pool: ConnectionPool,
        response_callbacks,
        transaction,
        shard_hint,
    ):
        self.connection_pool = connection_pool
        self.connection: Optional[Connection] = None
        self.response_callbacks = response_callbacks
        self.transaction = transaction
        self.shard_hint = shard_hint
        self.watching = False
        self.command_stack = []
        self.scripts: Set[Script] = set()
        self.explicit_transaction = False

    def __enter__(self) -> "Pipeline":
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.reset()

    def __del__(self):
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self) -> int:
        return len(self.command_stack)

    def __bool__(self) -> bool:
        """Pipeline instances should always evaluate to True"""
        return True

    def reset(self) -> None:
        self.command_stack = []
        self.scripts = set()
        # make sure to reset the connection state in the event that we were
        # watching something
        if self.watching and self.connection:
            try:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self.connection.send_command("UNWATCH")
                self.connection.read_response()
            except ConnectionError:
                # disconnect will also remove any previous WATCHes
                self.connection.disconnect()
        # clean up the other instance attributes
        self.watching = False
        self.explicit_transaction = False

        # we can safely return the connection to the pool here since we're
        # sure we're no longer WATCHing anything
        if self.connection:
            self.connection_pool.release(self.connection)
            self.connection = None

    def close(self) -> None:
        """Close the pipeline"""
        self.reset()

    def multi(self) -> None:
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        if self.explicit_transaction:
            raise RedisError("Cannot issue nested calls to MULTI")
        if self.command_stack:
            raise RedisError(
                "Commands without an initial WATCH have already been issued"
            )
        self.explicit_transaction = True
    def execute_command(self, *args, **kwargs):
        if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
            return self.immediate_execute_command(*args, **kwargs)
        return self.pipeline_execute_command(*args, **kwargs)

    def _disconnect_reset_raise_on_watching(
        self,
        conn: AbstractConnection,
        error: Exception,
    ) -> None:
        """
        Close the connection, reset the watching state, and
        raise an exception if we were watching.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        perform a health check as part of the send_command logic (at the
        connection level).
        """
        conn.disconnect()

        # if we were already watching a variable, the watch is no longer
        # valid since this connection has died. raise a WatchError, which
        # indicates the user should retry this transaction.
        if self.watching:
            self.reset()
            raise WatchError(
                f"A {type(error).__name__} occurred while watching one or more keys"
            )

    def immediate_execute_command(self, *args, **options):
        """
        Execute a command immediately, but don't auto-retry on the supported
        errors for retry if we're already WATCHing a variable.
        Used when issuing WATCH or subsequent commands that retrieve their
        values, before MULTI is called.
        """
        command_name = args[0]
        conn = self.connection
        # if this is the first call, we need a connection
        if not conn:
            conn = self.connection_pool.get_connection()
            self.connection = conn

        return conn.retry.call_with_retry(
            lambda: self._send_command_parse_response(
                conn, command_name, *args, **options
            ),
            lambda error: self._disconnect_reset_raise_on_watching(conn, error),
        )

    def pipeline_execute_command(self, *args, **options) -> "Pipeline":
        """
        Stage a command to be executed when execute() is next called.

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        self.command_stack.append((args, options))
        return self
    def _execute_transaction(
        self, connection: Connection, commands, raise_on_error
    ) -> List:
        cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
        all_cmds = connection.pack_commands(
            [args for args, options in cmds if EMPTY_RESPONSE not in options]
        )
        connection.send_packed_command(all_cmds)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            self.parse_response(connection, "_")
        except ResponseError as e:
            errors.append((0, e))

        # and all the other commands
        for i, command in enumerate(commands):
            if EMPTY_RESPONSE in command[1]:
                errors.append((i, command[1][EMPTY_RESPONSE]))
            else:
                try:
                    self.parse_response(connection, "_")
                except ResponseError as e:
                    self.annotate_exception(e, i + 1, command[0])
                    errors.append((i, e))

        # parse the EXEC.
        try:
            response = self.parse_response(connection, "_")
        except ExecAbortError:
            if errors:
                raise errors[0][1]
            raise

        # EXEC clears any watched keys
        self.watching = False

        if response is None:
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(commands):
            self.connection.disconnect()
            raise ResponseError(
                "Wrong number of response items from pipeline execution"
            )

        # find any errors in the response and raise if necessary
        if raise_on_error:
            self.raise_first_error(commands, response)

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, commands):
            if not isinstance(r, Exception):
                args, options = cmd
                # Remove the "keys" entry; it is only needed for caching.
                options.pop("keys", None)
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
            data.append(r)

        return data
    def _execute_pipeline(self, connection, commands, raise_on_error):
        # build up all commands into a single request to increase network perf
        all_cmds = connection.pack_commands([args for args, _ in commands])
        connection.send_packed_command(all_cmds)

        responses = []
        for args, options in commands:
            try:
                responses.append(self.parse_response(connection, args[0], **options))
            except ResponseError as e:
                responses.append(e)

        if raise_on_error:
            self.raise_first_error(commands, responses)

        return responses

    def raise_first_error(self, commands, response):
        for i, r in enumerate(response):
            if isinstance(r, ResponseError):
                self.annotate_exception(r, i + 1, commands[i][0])
                raise r

    def annotate_exception(self, exception, number, command):
        cmd = " ".join(map(safe_str, command))
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {exception.args[0]}"
        )
        exception.args = (msg,) + exception.args[1:]

    def parse_response(self, connection, command_name, **options):
        result = Redis.parse_response(self, connection, command_name, **options)
        if command_name in self.UNWATCH_COMMANDS:
            self.watching = False
        elif command_name == "WATCH":
            self.watching = True
        return result

    def load_scripts(self):
        # make sure all scripts that are about to be run on this pipeline exist
        scripts = list(self.scripts)
        immediate = self.immediate_execute_command
        shas = [s.sha for s in scripts]
        # we can't use the normal script_* methods because they would just
        # get buffered in the pipeline.
        exists = immediate("SCRIPT EXISTS", *shas)
        if not all(exists):
            for s, exist in zip(scripts, exists):
                if not exist:
                    s.sha = immediate("SCRIPT LOAD", s.script)

    def _disconnect_raise_on_watching(
        self,
        conn: AbstractConnection,
        error: Exception,
    ) -> None:
        """
        Close the connection and raise an exception if we were watching.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        perform a health check as part of the send_command logic (at the
        connection level).
        """
        conn.disconnect()
        # if we were watching a variable, the watch is no longer valid
        # since this connection has died. raise a WatchError, which
        # indicates the user should retry this transaction.
        if self.watching:
            raise WatchError(
                f"A {type(error).__name__} occurred while watching one or more keys"
            )

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """Execute all the commands in the current pipeline"""
        stack = self.command_stack
        if not stack and not self.watching:
            return []
        if self.scripts:
            self.load_scripts()
        if self.transaction or self.explicit_transaction:
            execute = self._execute_transaction
        else:
            execute = self._execute_pipeline

        conn = self.connection
        if not conn:
            conn = self.connection_pool.get_connection()
            # assign to self.connection so reset() releases the connection
            # back to the pool after we're done
            self.connection = conn

        try:
            return conn.retry.call_with_retry(
                lambda: execute(conn, stack, raise_on_error),
                lambda error: self._disconnect_raise_on_watching(conn, error),
            )
        finally:
            # in reset() the connection is disconnected before being returned
            # to the pool if it is marked for reconnect.
            self.reset()
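
    # Example (editor's sketch): the manual WATCH / MULTI / EXEC dance that
    # Redis.transaction() wraps for you. Assumes decode_responses=True; the
    # key name is illustrative.
    #
    #     with r.pipeline() as pipe:
    #         while True:
    #             try:
    #                 pipe.watch("stock")
    #                 count = int(pipe.get("stock") or 0)  # immediate read
    #                 pipe.multi()
    #                 pipe.set("stock", count - 1)
    #                 pipe.execute()
    #                 break
    #             except WatchError:
    #                 continue  # the key changed under us; retry
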
    def discard(self):
        """
        Flushes all previously queued commands
        See: https://redis.io/commands/DISCARD
        """
        self.execute_command("DISCARD")

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        if self.explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")
        return self.execute_command("WATCH", *names)

    def unwatch(self) -> bool:
        """Unwatches all previously specified keys"""
        return self.watching and self.execute_command("UNWATCH") or True