Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tenacity/__init__.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016-2018 Julien Danjou
2# Copyright 2017 Elisey Zanko
3# Copyright 2016 Étienne Bersac
4# Copyright 2016 Joshua Harlow
5# Copyright 2013-2014 Ray Holder
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18import dataclasses
19import functools
20import sys
21import threading
22import time
23import typing as t
24import warnings
25from abc import ABC, abstractmethod
26from concurrent import futures
28from . import _utils
30# Import all built-in retry strategies for easier usage.
31from .retry import retry_base # noqa
32from .retry import retry_all # noqa
33from .retry import retry_always # noqa
34from .retry import retry_any # noqa
35from .retry import retry_if_exception # noqa
36from .retry import retry_if_exception_type # noqa
37from .retry import retry_if_exception_cause_type # noqa
38from .retry import retry_if_not_exception_type # noqa
39from .retry import retry_if_not_result # noqa
40from .retry import retry_if_result # noqa
41from .retry import retry_never # noqa
42from .retry import retry_unless_exception_type # noqa
43from .retry import retry_if_exception_message # noqa
44from .retry import retry_if_not_exception_message # noqa
46# Import all nap strategies for easier usage.
47from .nap import sleep # noqa
48from .nap import sleep_using_event # noqa
50# Import all built-in stop strategies for easier usage.
51from .stop import stop_after_attempt # noqa
52from .stop import stop_after_delay # noqa
53from .stop import stop_before_delay # noqa
54from .stop import stop_all # noqa
55from .stop import stop_any # noqa
56from .stop import stop_never # noqa
57from .stop import stop_when_event_set # noqa
59# Import all built-in wait strategies for easier usage.
60from .wait import wait_chain # noqa
61from .wait import wait_combine # noqa
62from .wait import wait_exponential # noqa
63from .wait import wait_fixed # noqa
64from .wait import wait_incrementing # noqa
65from .wait import wait_none # noqa
66from .wait import wait_random # noqa
67from .wait import wait_random_exponential # noqa
68from .wait import wait_random_exponential as wait_full_jitter # noqa
69from .wait import wait_exponential_jitter # noqa
71# Import all built-in before strategies for easier usage.
72from .before import before_log # noqa
73from .before import before_nothing # noqa
75# Import all built-in after strategies for easier usage.
76from .after import after_log # noqa
77from .after import after_nothing # noqa
79# Import all built-in before sleep strategies for easier usage.
80from .before_sleep import before_sleep_log # noqa
81from .before_sleep import before_sleep_nothing # noqa
# tornado is an optional dependency: keep ``None`` when it is not installed so
# later code can test ``if tornado`` before touching tornado-specific classes.
try:
    import tornado
except ImportError:
    tornado = None

# Imports below are only evaluated by static type checkers, never at runtime.
if t.TYPE_CHECKING:
    import types

    from typing_extensions import Self

    from . import asyncio as tasyncio
    from .retry import RetryBaseT
    from .stop import StopBaseT
    from .wait import WaitBaseT


# Return type of a callable handed to a retrying controller.
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")
# Any callable; lets decorators be typed as returning what they were given.
WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Any])
# ``slots=True`` is only passed to ``dataclasses.dataclass`` on Python 3.10+.
dataclass_kwargs = {}
if sys.version_info >= (3, 10):
    dataclass_kwargs.update({"slots": True})


@dataclasses.dataclass(**dataclass_kwargs)
class IterState:
    """Mutable per-iteration scratch state of a retrying controller.

    Every field has a default, so ``IterState()`` yields a blank state and
    ``reset()`` puts an existing instance back into that blank state.
    """

    # Callables taking a ``RetryCallState``; each instance gets its own list.
    actions: t.List[t.Callable[["RetryCallState"], t.Any]] = dataclasses.field(
        default_factory=list
    )
    retry_run_result: bool = False
    delay_since_first_attempt: int = 0
    stop_run_result: bool = False
    is_explicit_retry: bool = False

    def reset(self) -> None:
        """Restore every field to its default value, with a fresh action list."""
        self.actions = []
        self.retry_run_result = False
        self.delay_since_first_attempt = 0
        self.stop_run_result = False
        self.is_explicit_retry = False
class TryAgain(Exception):
    """Always retry the executed function when raised."""
# Unique placeholder object exported through ``__all__``.
# NOTE(review): nothing else in this module reads it -- presumably kept for
# importers; confirm before removing.
NO_RESULT = object()
class DoAttempt:
    """Marker returned by the decision loop when the target should be called."""

    pass
class DoSleep(float):
    """Float subclass marking a value as a sleep duration to honour."""

    pass
class BaseAction:
    """Base class for representing actions to take by retry object.

    Concrete implementations must define:
    - __init__: to initialize all necessary fields
    - REPR_FIELDS: class variable specifying attributes to include in repr(self)
    - NAME: for identification in retry object methods and callbacks
    """

    REPR_FIELDS: t.Sequence[str] = ()
    NAME: t.Optional[str] = None

    def __repr__(self) -> str:
        """Return ``ClassName(field=value, ...)`` built from ``REPR_FIELDS``."""
        state_str = ", ".join(
            f"{field}={getattr(self, field)!r}" for field in self.REPR_FIELDS
        )
        return f"{self.__class__.__name__}({state_str})"

    def __str__(self) -> str:
        """Return the same text as ``repr(self)``."""
        return repr(self)
class RetryAction(BaseAction):
    """Action recording that a retry follows after ``sleep`` (stored as float)."""

    REPR_FIELDS = ("sleep",)
    NAME = "retry"

    def __init__(self, sleep: t.SupportsFloat) -> None:
        # Normalise to a float so repr() and arithmetic are uniform.
        self.sleep = float(sleep)
# Sentinel meaning "argument not supplied"; lets ``None`` be a real value.
_unset = object()


def _first_set(first: t.Union[t.Any, object], second: t.Any) -> t.Any:
    """Return ``first`` unless it is the ``_unset`` sentinel, else ``second``.

    The comparison is by identity, so falsy values such as ``None``, ``0`` or
    ``False`` passed as ``first`` are returned unchanged.
    """
    return second if first is _unset else first
class RetryError(Exception):
    """Encapsulates the last attempt instance right before giving up."""

    def __init__(self, last_attempt: "Future") -> None:
        """Store ``last_attempt`` and also pass it on as the exception argument."""
        self.last_attempt = last_attempt
        super().__init__(last_attempt)

    def reraise(self) -> t.NoReturn:
        """Raise the last attempt's exception if it failed, else raise ``self``."""
        if self.last_attempt.failed:
            # On a failed attempt ``result()`` raises the stored exception.
            raise self.last_attempt.result()
        raise self

    def __str__(self) -> str:
        """Return ``ClassName[<last attempt>]``."""
        return f"{self.__class__.__name__}[{self.last_attempt}]"
class AttemptManager:
    """Manage attempt context."""

    def __init__(self, retry_state: "RetryCallState"):
        """Remember the retry state that will receive this attempt's outcome."""
        self.retry_state = retry_state

    def __enter__(self) -> None:
        """Do nothing on entry; the outcome is recorded on exit."""
        pass

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_value: t.Optional[BaseException],
        traceback: t.Optional["types.TracebackType"],
    ) -> t.Optional[bool]:
        """Record the outcome of the ``with`` body on the retry state.

        An exception is stored with ``set_exception`` and suppressed by
        returning ``True``. A clean exit stores ``None`` as the result,
        because the value computed inside the body is not visible here.
        """
        if exc_type is not None and exc_value is not None:
            self.retry_state.set_exception((exc_type, exc_value, traceback))
            return True  # Swallow exception.
        else:
            # We don't have the result, actually.
            self.retry_state.set_result(None)
            return None
class BaseRetrying(ABC):
    """Abstract retrying controller: strategies plus the decision loop.

    Subclasses implement ``__call__`` to invoke the target. This class keeps
    the configured strategies and, through ``iter``, decides after each
    outcome whether to attempt again, sleep, return a value or raise.
    """

    def __init__(
        self,
        sleep: t.Callable[[t.Union[int, float]], None] = sleep,
        stop: "StopBaseT" = stop_never,
        wait: "WaitBaseT" = wait_none(),
        retry: "RetryBaseT" = retry_if_exception_type(),
        before: t.Callable[["RetryCallState"], None] = before_nothing,
        after: t.Callable[["RetryCallState"], None] = after_nothing,
        before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None,
        reraise: bool = False,
        retry_error_cls: t.Type[RetryError] = RetryError,
        retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None,
    ):
        """Store the strategies and callbacks used by the decision loop.

        :param sleep: callable invoked with the delay before the next attempt
        :param stop: called with the retry state; truthy result stops retrying
        :param wait: called with the retry state; returns the upcoming sleep
        :param retry: called with the retry state; truthy result means retry
        :param before: callback run before each attempt
        :param after: callback run after an attempt that will be retried
        :param before_sleep: optional callback run right before sleeping
        :param reraise: when stopping, re-raise via ``RetryError.reraise``
        :param retry_error_cls: exception class built from the last attempt
        :param retry_error_callback: optional callback whose return value is
            used as the result when stopping, instead of raising
        """
        self.sleep = sleep
        self.stop = stop
        self.wait = wait
        self.retry = retry
        self.before = before
        self.after = after
        self.before_sleep = before_sleep
        self.reraise = reraise
        # Statistics and iteration state live in thread-local storage.
        self._local = threading.local()
        self.retry_error_cls = retry_error_cls
        self.retry_error_callback = retry_error_callback

    def copy(
        self,
        sleep: t.Union[t.Callable[[t.Union[int, float]], None], object] = _unset,
        stop: t.Union["StopBaseT", object] = _unset,
        wait: t.Union["WaitBaseT", object] = _unset,
        retry: t.Union[retry_base, object] = _unset,
        before: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
        after: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
        before_sleep: t.Union[
            t.Optional[t.Callable[["RetryCallState"], None]], object
        ] = _unset,
        reraise: t.Union[bool, object] = _unset,
        retry_error_cls: t.Union[t.Type[RetryError], object] = _unset,
        retry_error_callback: t.Union[
            t.Optional[t.Callable[["RetryCallState"], t.Any]], object
        ] = _unset,
    ) -> "Self":
        """Copy this object with some parameters changed if needed."""
        # Arguments left at ``_unset`` fall back to this instance's values.
        return self.__class__(
            sleep=_first_set(sleep, self.sleep),
            stop=_first_set(stop, self.stop),
            wait=_first_set(wait, self.wait),
            retry=_first_set(retry, self.retry),
            before=_first_set(before, self.before),
            after=_first_set(after, self.after),
            before_sleep=_first_set(before_sleep, self.before_sleep),
            reraise=_first_set(reraise, self.reraise),
            retry_error_cls=_first_set(retry_error_cls, self.retry_error_cls),
            retry_error_callback=_first_set(
                retry_error_callback, self.retry_error_callback
            ),
        )

    def __repr__(self) -> str:
        """Return a debug string with the object id and main strategies."""
        return (
            f"<{self.__class__.__name__} object at 0x{id(self):x} ("
            f"stop={self.stop}, "
            f"wait={self.wait}, "
            f"sleep={self.sleep}, "
            f"retry={self.retry}, "
            f"before={self.before}, "
            f"after={self.after})>"
        )

    @property
    def statistics(self) -> t.Dict[str, t.Any]:
        """Return a dictionary of runtime statistics.

        This dictionary will be empty when the controller has never been
        run. When it is running or has run previously it should have (but
        may not) have useful and/or informational keys and values when
        running is underway and/or completed.

        .. warning:: The keys in this dictionary **should** be somewhat
                     stable (not changing), but their existence **may**
                     change between major releases as new statistics are
                     gathered or removed so before accessing keys ensure that
                     they actually exist and handle when they do not.

        .. note:: The values in this dictionary are local to the thread
                  running call (so if multiple threads share the same retrying
                  object - either directly or indirectly) they will each have
                  their own view of statistics they have collected (in the
                  future we may provide a way to aggregate the various
                  statistics from each thread).
        """
        try:
            return self._local.statistics  # type: ignore[no-any-return]
        except AttributeError:
            # First access from this thread: create the dictionary lazily.
            self._local.statistics = t.cast(t.Dict[str, t.Any], {})
            return self._local.statistics

    @property
    def iter_state(self) -> IterState:
        """Return this thread's ``IterState``, creating it on first access."""
        try:
            return self._local.iter_state  # type: ignore[no-any-return]
        except AttributeError:
            self._local.iter_state = IterState()
            return self._local.iter_state

    def wraps(self, f: WrappedFn) -> WrappedFn:
        """Wrap a function for retrying.

        :param f: A function to wraps for retrying.
        """

        @functools.wraps(
            f, functools.WRAPPER_ASSIGNMENTS + ("__defaults__", "__kwdefaults__")
        )
        def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any:
            # Always create a copy to prevent overwriting the local contexts when
            # calling the same wrapped functions multiple times in the same stack
            copy = self.copy()
            wrapped_f.statistics = copy.statistics  # type: ignore[attr-defined]
            return copy(f, *args, **kw)

        def retry_with(*args: t.Any, **kwargs: t.Any) -> WrappedFn:
            # Re-wrap the original function with a modified copy of this object.
            return self.copy(*args, **kwargs).wraps(f)

        # Preserve attributes
        wrapped_f.retry = self  # type: ignore[attr-defined]
        wrapped_f.retry_with = retry_with  # type: ignore[attr-defined]
        wrapped_f.statistics = {}  # type: ignore[attr-defined]

        return wrapped_f  # type: ignore[return-value]

    def begin(self) -> None:
        """Reset this thread's statistics for a new retrying call."""
        self.statistics.clear()
        self.statistics["start_time"] = time.monotonic()
        self.statistics["attempt_number"] = 1
        self.statistics["idle_for"] = 0

    def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
        """Append ``fn`` to the actions executed by the current ``iter`` pass."""
        self.iter_state.actions.append(fn)

    def _run_retry(self, retry_state: "RetryCallState") -> None:
        """Evaluate the retry strategy and store its verdict."""
        self.iter_state.retry_run_result = self.retry(retry_state)

    def _run_wait(self, retry_state: "RetryCallState") -> None:
        """Compute the upcoming sleep (``0.0`` when ``wait`` is falsy)."""
        if self.wait:
            sleep = self.wait(retry_state)
        else:
            sleep = 0.0

        retry_state.upcoming_sleep = sleep

    def _run_stop(self, retry_state: "RetryCallState") -> None:
        """Record elapsed time, then evaluate the stop strategy."""
        self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
        self.iter_state.stop_run_result = self.stop(retry_state)

    def iter(self, retry_state: "RetryCallState") -> t.Union[DoAttempt, DoSleep, t.Any]:  # noqa
        """Run one decision pass and return the value of the last action.

        The result is a ``DoAttempt``, a ``DoSleep``, or whatever the final
        action returned. That action may also raise.
        """
        self._begin_iter(retry_state)
        result = None
        # Actions may append further actions while this loop is running; the
        # loop iterates the live list, so those run in the same pass.
        for action in self.iter_state.actions:
            result = action(retry_state)
        return result

    def _begin_iter(self, retry_state: "RetryCallState") -> None:  # noqa
        """Reset the iteration state and queue the first actions of a pass."""
        self.iter_state.reset()

        fut = retry_state.outcome
        if fut is None:
            # No outcome yet: run ``before`` (if set) and request an attempt.
            if self.before is not None:
                self._add_action_func(self.before)
            self._add_action_func(lambda rs: DoAttempt())
            return

        # A raised TryAgain forces a retry without consulting ``self.retry``.
        self.iter_state.is_explicit_retry = fut.failed and isinstance(
            fut.exception(), TryAgain
        )
        if not self.iter_state.is_explicit_retry:
            self._add_action_func(self._run_retry)
        self._add_action_func(self._post_retry_check_actions)

    def _post_retry_check_actions(self, retry_state: "RetryCallState") -> None:
        """Queue follow-up actions once the retry verdict is known."""
        if not (self.iter_state.is_explicit_retry or self.iter_state.retry_run_result):
            # No retry wanted: finish by returning (or raising) the outcome.
            self._add_action_func(lambda rs: rs.outcome.result())
            return

        if self.after is not None:
            self._add_action_func(self.after)

        self._add_action_func(self._run_wait)
        self._add_action_func(self._run_stop)
        self._add_action_func(self._post_stop_check_actions)

    def _post_stop_check_actions(self, retry_state: "RetryCallState") -> None:
        """Queue follow-up actions once the stop verdict is known."""
        if self.iter_state.stop_run_result:
            if self.retry_error_callback:
                # The callback's return value becomes the result of ``iter``.
                self._add_action_func(self.retry_error_callback)
                return

            def exc_check(rs: "RetryCallState") -> None:
                fut = t.cast(Future, rs.outcome)
                retry_exc = self.retry_error_cls(fut)
                if self.reraise:
                    raise retry_exc.reraise()
                raise retry_exc from fut.exception()

            self._add_action_func(exc_check)
            return

        def next_action(rs: "RetryCallState") -> None:
            sleep = rs.upcoming_sleep
            rs.next_action = RetryAction(sleep)
            rs.idle_for += sleep
            self.statistics["idle_for"] += sleep
            self.statistics["attempt_number"] += 1

        self._add_action_func(next_action)

        if self.before_sleep is not None:
            self._add_action_func(self.before_sleep)

        self._add_action_func(lambda rs: DoSleep(rs.upcoming_sleep))

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Yield one ``AttemptManager`` per attempt until the loop finishes."""
        self.begin()

        retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        while True:
            do = self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                yield AttemptManager(retry_state=retry_state)
            elif isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                self.sleep(do)
            else:
                break

    @abstractmethod
    def __call__(
        self,
        fn: t.Callable[..., WrappedFnReturnT],
        *args: t.Any,
        **kwargs: t.Any,
    ) -> WrappedFnReturnT:
        """Call ``fn`` with retrying; implemented by concrete subclasses."""
        pass
class Retrying(BaseRetrying):
    """Retrying controller."""

    def __call__(
        self,
        fn: t.Callable[..., WrappedFnReturnT],
        *args: t.Any,
        **kwargs: t.Any,
    ) -> WrappedFnReturnT:
        """Call ``fn(*args, **kwargs)`` repeatedly until the loop finishes.

        Returns whatever the final ``iter`` pass returns. Exceptions raised
        by that pass propagate to the caller.
        """
        self.begin()

        retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
        while True:
            do = self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                try:
                    result = fn(*args, **kwargs)
                except BaseException:  # noqa: B902
                    # Every exception is recorded; ``iter`` decides what to do.
                    retry_state.set_exception(sys.exc_info())  # type: ignore[arg-type]
                else:
                    retry_state.set_result(result)
            elif isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                self.sleep(do)
            else:
                return do  # type: ignore[no-any-return]
# The subscripted form ``futures.Future[t.Any]`` is only used on Python 3.9+.
if sys.version_info >= (3, 9):
    FutureGenericT = futures.Future[t.Any]
else:
    FutureGenericT = futures.Future


class Future(FutureGenericT):
    """Encapsulates a (future or past) attempted call to a target function."""

    def __init__(self, attempt_number: int) -> None:
        """Create an unresolved future tagged with ``attempt_number``."""
        super().__init__()
        self.attempt_number = attempt_number

    @property
    def failed(self) -> bool:
        """Return whether a exception is being held in this future."""
        return self.exception() is not None

    @classmethod
    def construct(
        cls, attempt_number: int, value: t.Any, has_exception: bool
    ) -> "Future":
        """Construct a new Future object.

        ``value`` is stored as the exception when ``has_exception`` is true,
        otherwise as the result.
        """
        fut = cls(attempt_number)
        if has_exception:
            fut.set_exception(value)
        else:
            fut.set_result(value)
        return fut
class RetryCallState:
    """State related to a single call wrapped with Retrying."""

    def __init__(
        self,
        retry_object: BaseRetrying,
        fn: t.Optional[WrappedFn],
        args: t.Any,
        kwargs: t.Any,
    ) -> None:
        """Initialise the state for attempt number 1 with no outcome yet."""
        #: Retry call start timestamp
        self.start_time = time.monotonic()
        #: Retry manager object
        self.retry_object = retry_object
        #: Function wrapped by this retry call
        self.fn = fn
        #: Arguments of the function wrapped by this retry call
        self.args = args
        #: Keyword arguments of the function wrapped by this retry call
        self.kwargs = kwargs

        #: The number of the current attempt
        self.attempt_number: int = 1
        #: Last outcome (result or exception) produced by the function
        self.outcome: t.Optional[Future] = None
        #: Timestamp of the last outcome
        self.outcome_timestamp: t.Optional[float] = None
        #: Time spent sleeping in retries
        self.idle_for: float = 0.0
        #: Next action as decided by the retry manager
        self.next_action: t.Optional[RetryAction] = None
        #: Next sleep time as decided by the retry manager.
        self.upcoming_sleep: float = 0.0

    @property
    def seconds_since_start(self) -> t.Optional[float]:
        """Return seconds from start to the last outcome, or ``None`` if none."""
        if self.outcome_timestamp is None:
            return None
        return self.outcome_timestamp - self.start_time

    def prepare_for_next_attempt(self) -> None:
        """Clear the last outcome and advance the attempt counter by one."""
        self.outcome = None
        self.outcome_timestamp = None
        self.attempt_number += 1
        self.next_action = None

    def set_result(self, val: t.Any) -> None:
        """Record ``val`` as the successful outcome of the current attempt."""
        ts = time.monotonic()
        fut = Future(self.attempt_number)
        fut.set_result(val)
        self.outcome, self.outcome_timestamp = fut, ts

    def set_exception(
        self,
        exc_info: t.Tuple[
            t.Type[BaseException], BaseException, "types.TracebackType| None"
        ],
    ) -> None:
        """Record a failed attempt from a ``sys.exc_info()``-style triple.

        Only the exception instance (``exc_info[1]``) is stored on the future.
        """
        ts = time.monotonic()
        fut = Future(self.attempt_number)
        fut.set_exception(exc_info[1])
        self.outcome, self.outcome_timestamp = fut, ts

    def __repr__(self) -> str:
        """Return a summary: attempt number, time slept and last result."""
        if self.outcome is None:
            result = "none yet"
        elif self.outcome.failed:
            exception = self.outcome.exception()
            result = f"failed ({exception.__class__.__name__} {exception})"
        else:
            result = f"returned {self.outcome.result()}"

        slept = float(round(self.idle_for, 2))
        clsname = self.__class__.__name__
        return f"<{clsname} {id(self)}: attempt #{self.attempt_number}; slept for {slept}; last result: {result}>"
@t.overload
def retry(func: WrappedFn) -> WrappedFn: ...


@t.overload
def retry(
    sleep: t.Callable[[t.Union[int, float]], t.Union[None, t.Awaitable[None]]] = sleep,
    stop: "StopBaseT" = stop_never,
    wait: "WaitBaseT" = wait_none(),
    retry: "t.Union[RetryBaseT, tasyncio.retry.RetryBaseT]" = retry_if_exception_type(),
    before: t.Callable[
        ["RetryCallState"], t.Union[None, t.Awaitable[None]]
    ] = before_nothing,
    after: t.Callable[
        ["RetryCallState"], t.Union[None, t.Awaitable[None]]
    ] = after_nothing,
    before_sleep: t.Optional[
        t.Callable[["RetryCallState"], t.Union[None, t.Awaitable[None]]]
    ] = None,
    reraise: bool = False,
    retry_error_cls: t.Type["RetryError"] = RetryError,
    retry_error_callback: t.Optional[
        t.Callable[["RetryCallState"], t.Union[t.Any, t.Awaitable[t.Any]]]
    ] = None,
) -> t.Callable[[WrappedFn], WrappedFn]: ...


def retry(*dargs: t.Any, **dkw: t.Any) -> t.Any:
    """Wrap a function with a new `Retrying` object.

    :param dargs: positional arguments passed to Retrying object
    :param dkw: keyword arguments passed to the Retrying object
    """
    # support both @retry and @retry() as valid syntax
    if len(dargs) == 1 and callable(dargs[0]):
        return retry()(dargs[0])
    else:

        def wrap(f: WrappedFn) -> WrappedFn:
            # A retry strategy passed where the function belongs is almost
            # certainly a mistake, so warn instead of wrapping it silently.
            if isinstance(f, retry_base):
                warnings.warn(
                    f"Got retry_base instance ({f.__class__.__name__}) as callable argument, "
                    f"this will probably hang indefinitely (did you mean retry={f.__class__.__name__}(...)?)"
                )
            # Pick the controller matching the kind of callable being wrapped.
            r: "BaseRetrying"
            if _utils.is_coroutine_callable(f):
                r = AsyncRetrying(*dargs, **dkw)
            elif (
                tornado
                and hasattr(tornado.gen, "is_coroutine_function")
                and tornado.gen.is_coroutine_function(f)
            ):
                r = TornadoRetrying(*dargs, **dkw)
            else:
                r = Retrying(*dargs, **dkw)

            return r.wraps(f)

        return wrap
661from tenacity.asyncio import AsyncRetrying # noqa:E402,I100
663if tornado:
664 from tenacity.tornadoweb import TornadoRetrying
# Names exported by ``from tenacity import *``.
__all__ = [
    "retry_base",
    "retry_all",
    "retry_always",
    "retry_any",
    "retry_if_exception",
    "retry_if_exception_type",
    "retry_if_exception_cause_type",
    "retry_if_not_exception_type",
    "retry_if_not_result",
    "retry_if_result",
    "retry_never",
    "retry_unless_exception_type",
    "retry_if_exception_message",
    "retry_if_not_exception_message",
    "sleep",
    "sleep_using_event",
    "stop_after_attempt",
    "stop_after_delay",
    "stop_before_delay",
    "stop_all",
    "stop_any",
    "stop_never",
    "stop_when_event_set",
    "wait_chain",
    "wait_combine",
    "wait_exponential",
    "wait_fixed",
    "wait_incrementing",
    "wait_none",
    "wait_random",
    "wait_random_exponential",
    "wait_full_jitter",
    "wait_exponential_jitter",
    "before_log",
    "before_nothing",
    "after_log",
    "after_nothing",
    "before_sleep_log",
    "before_sleep_nothing",
    "retry",
    "WrappedFn",
    "TryAgain",
    "NO_RESULT",
    "DoAttempt",
    "DoSleep",
    "BaseAction",
    "RetryAction",
    "RetryError",
    "AttemptManager",
    "BaseRetrying",
    "Retrying",
    "Future",
    "RetryCallState",
    "AsyncRetrying",
]