1# Copyright 2016-2018 Julien Danjou
2# Copyright 2017 Elisey Zanko
3# Copyright 2016 Étienne Bersac
4# Copyright 2016 Joshua Harlow
5# Copyright 2013-2014 Ray Holder
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18import dataclasses
19import functools
20import sys
21import threading
22import time
23import typing as t
24import warnings
25from abc import ABC, abstractmethod
26from concurrent import futures
27from inspect import iscoroutinefunction
28
29# Import all built-in retry strategies for easier usage.
30from .retry import retry_base # noqa
31from .retry import retry_all # noqa
32from .retry import retry_always # noqa
33from .retry import retry_any # noqa
34from .retry import retry_if_exception # noqa
35from .retry import retry_if_exception_type # noqa
36from .retry import retry_if_exception_cause_type # noqa
37from .retry import retry_if_not_exception_type # noqa
38from .retry import retry_if_not_result # noqa
39from .retry import retry_if_result # noqa
40from .retry import retry_never # noqa
41from .retry import retry_unless_exception_type # noqa
42from .retry import retry_if_exception_message # noqa
43from .retry import retry_if_not_exception_message # noqa
44
45# Import all nap strategies for easier usage.
46from .nap import sleep # noqa
47from .nap import sleep_using_event # noqa
48
49# Import all built-in stop strategies for easier usage.
50from .stop import stop_after_attempt # noqa
51from .stop import stop_after_delay # noqa
52from .stop import stop_before_delay # noqa
53from .stop import stop_all # noqa
54from .stop import stop_any # noqa
55from .stop import stop_never # noqa
56from .stop import stop_when_event_set # noqa
57
58# Import all built-in wait strategies for easier usage.
59from .wait import wait_chain # noqa
60from .wait import wait_combine # noqa
61from .wait import wait_exponential # noqa
62from .wait import wait_fixed # noqa
63from .wait import wait_incrementing # noqa
64from .wait import wait_none # noqa
65from .wait import wait_random # noqa
66from .wait import wait_random_exponential # noqa
67from .wait import wait_random_exponential as wait_full_jitter # noqa
68from .wait import wait_exponential_jitter # noqa
69
70# Import all built-in before strategies for easier usage.
71from .before import before_log # noqa
72from .before import before_nothing # noqa
73
74# Import all built-in after strategies for easier usage.
75from .after import after_log # noqa
76from .after import after_nothing # noqa
77
# Import all built-in before sleep strategies for easier usage.
79from .before_sleep import before_sleep_log # noqa
80from .before_sleep import before_sleep_nothing # noqa
81
82try:
83 import tornado
84except ImportError:
85 tornado = None
86
87if t.TYPE_CHECKING:
88 import types
89
90 from .retry import RetryBaseT
91 from .stop import StopBaseT
92 from .wait import WaitBaseT
93
94
# Return type of the callable handed to a retrying controller's ``__call__``.
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")
# Any callable; lets ``wraps``/``retry`` declare that they return the same
# callable type they were given.
WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Any])
97
98
# Extra keyword arguments for ``dataclasses.dataclass``: ``slots=True`` is
# requested only when running on Python 3.10 or newer.
dataclass_kwargs = {"slots": True} if sys.version_info >= (3, 10) else {}
102
103
@dataclasses.dataclass(**dataclass_kwargs)
class IterState:
    """Scratch state used while working out the next step of a retry loop."""

    # Callables queued for the current iteration; each one receives the
    # ``RetryCallState`` when the queue is run.
    actions: t.List[t.Callable[["RetryCallState"], t.Any]] = dataclasses.field(
        default_factory=list
    )
    # Outcome of the retry predicate for the current iteration.
    retry_run_result: bool = False
    delay_since_first_attempt: int = 0
    # Outcome of the stop predicate for the current iteration.
    stop_run_result: bool = False
    # True when the last attempt asked for a retry by raising ``TryAgain``.
    is_explicit_retry: bool = False

    def reset(self) -> None:
        """Restore every field to its initial value, with a fresh action list."""
        self.actions = []
        self.delay_since_first_attempt = 0
        self.retry_run_result = False
        self.stop_run_result = False
        self.is_explicit_retry = False
120
121
class TryAgain(Exception):
    """Raise from the executed function to force another attempt."""
124
125
# Unique marker object; presumably stands for "no result produced" -- its use
# is not visible in this module, confirm against callers.
NO_RESULT = object()
127
128
class DoAttempt:
    """Marker returned by the retry loop when the next step is an attempt."""
131
132
class DoSleep(float):
    """Marker returned by the retry loop: a float holding the sleep duration."""
135
136
class BaseAction:
    """Base class for the actions a retry object can decide to take.

    Concrete subclasses are expected to provide:
    - __init__: sets every attribute listed in REPR_FIELDS
    - REPR_FIELDS: names of the attributes shown by repr(self)
    - NAME: identifier used in retry object methods and callbacks
    """

    REPR_FIELDS: t.Sequence[str] = ()
    NAME: t.Optional[str] = None

    def __repr__(self) -> str:
        """Return ``ClassName(field=value, ...)`` built from REPR_FIELDS."""
        rendered = [
            f"{name}={getattr(self, name)!r}" for name in self.REPR_FIELDS
        ]
        joined = ", ".join(rendered)
        class_name = self.__class__.__name__
        return f"{class_name}({joined})"

    def __str__(self) -> str:
        """Return the same text as ``repr(self)``."""
        return repr(self)
157
158
class RetryAction(BaseAction):
    """Action recording that a retry will happen after ``sleep`` seconds."""

    NAME = "retry"
    REPR_FIELDS = ("sleep",)

    def __init__(self, sleep: t.SupportsFloat) -> None:
        # Normalise whatever numeric type was given to a plain float.
        as_float = float(sleep)
        self.sleep = as_float
165
166
# Marker for "argument not supplied", distinct from every real value
# (including None).
_unset = object()


def _first_set(first: t.Union[t.Any, object], second: t.Any) -> t.Any:
    """Return ``first`` unless it is the ``_unset`` marker, else ``second``."""
    if first is _unset:
        return second
    return first
172
173
class RetryError(Exception):
    """Raised when retrying gives up; carries the final attempt."""

    def __init__(self, last_attempt: "Future") -> None:
        super().__init__(last_attempt)
        self.last_attempt = last_attempt

    def reraise(self) -> t.NoReturn:
        """Raise the last attempt's exception, or this error if it had none."""
        if not self.last_attempt.failed:
            raise self
        # ``result()`` on a failed attempt raises the stored exception itself.
        raise self.last_attempt.result()

    def __str__(self) -> str:
        class_name = self.__class__.__name__
        return f"{class_name}[{self.last_attempt}]"
188
189
class AttemptManager:
    """Context manager that records the outcome of one attempt."""

    def __init__(self, retry_state: "RetryCallState"):
        self.retry_state = retry_state

    def __enter__(self) -> None:
        return None

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_value: t.Optional[BaseException],
        traceback: t.Optional["types.TracebackType"],
    ) -> t.Optional[bool]:
        """Store the attempt outcome on the retry state.

        Returns True (suppressing the exception) when the body raised,
        otherwise None.
        """
        if exc_type is None or exc_value is None:
            # The body's value is not available to a context manager, so the
            # recorded result is simply None.
            self.retry_state.set_result(None)
            return None

        self.retry_state.set_exception((exc_type, exc_value, traceback))
        return True  # Swallow exception.
212
213
class BaseRetrying(ABC):
    """Shared machinery for retrying controllers.

    Stores the configured strategies and callbacks and, for every iteration,
    builds the queue of actions that decides whether to attempt the call,
    sleep before another attempt, hand back a result or give up.  Subclasses
    implement ``__call__`` to drive an actual function.
    """

    def __init__(
        self,
        sleep: t.Callable[[t.Union[int, float]], None] = sleep,
        stop: "StopBaseT" = stop_never,
        wait: "WaitBaseT" = wait_none(),
        retry: "RetryBaseT" = retry_if_exception_type(),
        before: t.Callable[["RetryCallState"], None] = before_nothing,
        after: t.Callable[["RetryCallState"], None] = after_nothing,
        before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None,
        reraise: bool = False,
        retry_error_cls: t.Type[RetryError] = RetryError,
        retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None,
    ):
        """Store the retry configuration.

        :param sleep: called with the delay between two attempts
        :param stop: called with the retry state; a true result means give up
        :param wait: called with the retry state; returns the next delay
        :param retry: called with the retry state; a true result means retry
        :param before: called before each attempt
        :param after: called after an attempt that is going to be retried
        :param before_sleep: called right before sleeping, if not None
        :param reraise: on give-up, re-raise through ``RetryError.reraise``
        :param retry_error_cls: exception class built when giving up
        :param retry_error_callback: if set, called on give-up instead of
            raising; its return value becomes the result of the iteration
        """
        # Per-thread storage backing ``statistics`` and ``iter_state``.
        self._local = threading.local()

        self.sleep = sleep
        self.stop = stop
        self.wait = wait
        self.retry = retry

        self.before = before
        self.after = after
        self.before_sleep = before_sleep

        self.reraise = reraise
        self.retry_error_cls = retry_error_cls
        self.retry_error_callback = retry_error_callback

    def copy(
        self,
        sleep: t.Union[t.Callable[[t.Union[int, float]], None], object] = _unset,
        stop: t.Union["StopBaseT", object] = _unset,
        wait: t.Union["WaitBaseT", object] = _unset,
        retry: t.Union[retry_base, object] = _unset,
        before: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
        after: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
        before_sleep: t.Union[
            t.Optional[t.Callable[["RetryCallState"], None]], object
        ] = _unset,
        reraise: t.Union[bool, object] = _unset,
        retry_error_cls: t.Union[t.Type[RetryError], object] = _unset,
        retry_error_callback: t.Union[
            t.Optional[t.Callable[["RetryCallState"], t.Any]], object
        ] = _unset,
    ) -> "BaseRetrying":
        """Return a new instance of the same class with selected options replaced.

        Every argument left at ``_unset`` keeps the value held by this object.
        """
        requested = {
            "sleep": sleep,
            "stop": stop,
            "wait": wait,
            "retry": retry,
            "before": before,
            "after": after,
            "before_sleep": before_sleep,
            "reraise": reraise,
            "retry_error_cls": retry_error_cls,
            "retry_error_callback": retry_error_callback,
        }
        resolved = {
            option: _first_set(value, getattr(self, option))
            for option, value in requested.items()
        }
        return self.__class__(**resolved)

    def __repr__(self) -> str:
        shown = ("stop", "wait", "sleep", "retry", "before", "after")
        details = ", ".join(f"{option}={getattr(self, option)}" for option in shown)
        return f"<{self.__class__.__name__} object at 0x{id(self):x} ({details})>"

    @property
    def statistics(self) -> t.Dict[str, t.Any]:
        """Return a dictionary of runtime statistics.

        The dictionary is empty until the controller has been run.  While it
        is running, or once it has run, it should (but may not) contain
        useful and/or informational keys and values.

        .. warning:: The keys in this dictionary **should** be somewhat
                     stable (not changing), but their existence **may**
                     change between major releases as new statistics are
                     gathered or removed, so check that a key exists before
                     reading it and handle the case where it does not.

        .. note:: The values in this dictionary are local to the thread
                  running the call: if several threads share the same
                  retrying object (directly or indirectly) each of them has
                  its own view of the statistics it collected (a way to
                  aggregate the statistics of all threads may be provided in
                  the future).
        """
        local = self._local
        if not hasattr(local, "statistics"):
            local.statistics = t.cast(t.Dict[str, t.Any], {})
        return local.statistics  # type: ignore[no-any-return]

    @property
    def iter_state(self) -> IterState:
        """Return the calling thread's ``IterState``, creating it on first use."""
        local = self._local
        if not hasattr(local, "iter_state"):
            local.iter_state = IterState()
        return local.iter_state  # type: ignore[no-any-return]

    def wraps(self, f: WrappedFn) -> WrappedFn:
        """Wrap a function for retrying.

        The returned wrapper exposes this object as ``retry`` and a
        ``retry_with`` helper that re-wraps ``f`` with a modified copy.

        :param f: A function to wrap for retrying.
        """
        copied_attributes = functools.WRAPPER_ASSIGNMENTS + (
            "__defaults__",
            "__kwdefaults__",
        )

        @functools.wraps(f, copied_attributes)
        def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any:
            return self(f, *args, **kw)

        def retry_with(*args: t.Any, **kwargs: t.Any) -> WrappedFn:
            modified = self.copy(*args, **kwargs)
            return modified.wraps(f)

        wrapped_f.retry = self  # type: ignore[attr-defined]
        wrapped_f.retry_with = retry_with  # type: ignore[attr-defined]

        return wrapped_f  # type: ignore[return-value]

    def begin(self) -> None:
        """Reset the statistics of the calling thread for a new run."""
        stats = self.statistics
        stats.clear()
        stats.update(start_time=time.monotonic(), attempt_number=1, idle_for=0)

    def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
        """Queue ``fn`` to be run during the current iteration."""
        self.iter_state.actions.append(fn)

    def _run_retry(self, retry_state: "RetryCallState") -> None:
        """Evaluate the retry strategy and remember its verdict."""
        verdict = self.retry(retry_state)
        self.iter_state.retry_run_result = verdict

    def _run_wait(self, retry_state: "RetryCallState") -> None:
        """Compute the upcoming sleep; 0.0 when no wait strategy is set."""
        retry_state.upcoming_sleep = self.wait(retry_state) if self.wait else 0.0

    def _run_stop(self, retry_state: "RetryCallState") -> None:
        """Record the elapsed time, then evaluate the stop strategy."""
        self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
        verdict = self.stop(retry_state)
        self.iter_state.stop_run_result = verdict

    def iter(self, retry_state: "RetryCallState") -> t.Union[DoAttempt, DoSleep, t.Any]:  # noqa
        """Run the queued actions and return the value of the last one."""
        self._begin_iter(retry_state)
        last_value = None
        # Actions may queue further actions while running, so iterate over
        # the live list rather than a snapshot.
        for step in self.iter_state.actions:
            last_value = step(retry_state)
        return last_value

    def _begin_iter(self, retry_state: "RetryCallState") -> None:  # noqa
        """Seed the action queue according to whether an outcome exists yet."""
        state = self.iter_state
        state.reset()

        outcome = retry_state.outcome
        if outcome is None:
            # Nothing attempted yet for this round: run the attempt.
            if self.before is not None:
                self._add_action_func(self.before)
            self._add_action_func(lambda rs: DoAttempt())
            return

        # Raising TryAgain forces a retry without consulting the strategy.
        state.is_explicit_retry = outcome.failed and isinstance(
            outcome.exception(), TryAgain
        )
        if not state.is_explicit_retry:
            self._add_action_func(self._run_retry)
        self._add_action_func(self._post_retry_check_actions)

    def _post_retry_check_actions(self, retry_state: "RetryCallState") -> None:
        """Queue either the final result or the wait/stop evaluation."""
        state = self.iter_state
        if not state.is_explicit_retry and not state.retry_run_result:
            # No retry wanted: surface the attempt's result (or exception).
            self._add_action_func(lambda rs: rs.outcome.result())
            return

        if self.after is not None:
            self._add_action_func(self.after)

        for step in (self._run_wait, self._run_stop, self._post_stop_check_actions):
            self._add_action_func(step)

    def _post_stop_check_actions(self, retry_state: "RetryCallState") -> None:
        """Queue the give-up handling or the steps leading to the next sleep."""

        def exc_check(rs: "RetryCallState") -> None:
            fut = t.cast(Future, rs.outcome)
            retry_exc = self.retry_error_cls(fut)
            if self.reraise:
                raise retry_exc.reraise()
            raise retry_exc from fut.exception()

        def next_action(rs: "RetryCallState") -> None:
            delay = rs.upcoming_sleep
            rs.next_action = RetryAction(delay)
            rs.idle_for += delay
            self.statistics["idle_for"] += delay
            self.statistics["attempt_number"] += 1

        if self.iter_state.stop_run_result:
            # Giving up: the callback, when configured, replaces the raise.
            if self.retry_error_callback:
                self._add_action_func(self.retry_error_callback)
            else:
                self._add_action_func(exc_check)
            return

        self._add_action_func(next_action)

        if self.before_sleep is not None:
            self._add_action_func(self.before_sleep)

        self._add_action_func(lambda rs: DoSleep(rs.upcoming_sleep))

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Yield one ``AttemptManager`` per attempt until the loop ends."""
        self.begin()

        retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        while True:
            do = self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                yield AttemptManager(retry_state=retry_state)
                continue
            if not isinstance(do, DoSleep):
                break
            retry_state.prepare_for_next_attempt()
            self.sleep(do)

    @abstractmethod
    def __call__(
        self,
        fn: t.Callable[..., WrappedFnReturnT],
        *args: t.Any,
        **kwargs: t.Any,
    ) -> WrappedFnReturnT:
        """Call ``fn`` with the given arguments under this retry policy."""
452
453
class Retrying(BaseRetrying):
    """Retrying controller."""

    def __call__(
        self,
        fn: t.Callable[..., WrappedFnReturnT],
        *args: t.Any,
        **kwargs: t.Any,
    ) -> WrappedFnReturnT:
        """Call ``fn`` repeatedly until the retry loop yields a final value.

        Any other value produced by ``iter`` ends the loop and is returned.
        """
        self.begin()

        retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
        while True:
            do = self.iter(retry_state=retry_state)

            if isinstance(do, DoAttempt):
                try:
                    result = fn(*args, **kwargs)
                except BaseException:  # noqa: B902
                    retry_state.set_exception(sys.exc_info())  # type: ignore[arg-type]
                else:
                    retry_state.set_result(result)
                continue

            if isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                self.sleep(do)
                continue

            return do  # type: ignore[no-any-return]
480
481
# Base class for ``Future`` below: ``futures.Future`` is subscripted with
# ``t.Any`` only on Python 3.9 and newer; older interpreters get the bare class.
if sys.version_info >= (3, 9):
    FutureGenericT = futures.Future[t.Any]
else:
    FutureGenericT = futures.Future
486
487
class Future(FutureGenericT):
    """Outcome holder for one attempted call to the target function."""

    def __init__(self, attempt_number: int) -> None:
        super().__init__()
        self.attempt_number = attempt_number

    @property
    def failed(self) -> bool:
        """Return True when this future holds an exception."""
        held = self.exception()
        return held is not None

    @classmethod
    def construct(
        cls, attempt_number: int, value: t.Any, has_exception: bool
    ) -> "Future":
        """Build a future already resolved with ``value``.

        ``value`` is stored as the exception when ``has_exception`` is true,
        otherwise as the result.
        """
        fut = cls(attempt_number)
        resolve = fut.set_exception if has_exception else fut.set_result
        resolve(value)
        return fut
511
512
class RetryCallState:
    """Bookkeeping for one call that is being driven by a retrying object."""

    def __init__(
        self,
        retry_object: BaseRetrying,
        fn: t.Optional[WrappedFn],
        args: t.Any,
        kwargs: t.Any,
    ) -> None:
        #: Retry call start timestamp
        self.start_time = time.monotonic()
        #: Retry manager object
        self.retry_object = retry_object
        #: Function wrapped by this retry call
        self.fn = fn
        #: Arguments of the function wrapped by this retry call
        self.args = args
        #: Keyword arguments of the function wrapped by this retry call
        self.kwargs = kwargs

        #: The number of the current attempt
        self.attempt_number: int = 1
        #: Last outcome (result or exception) produced by the function
        self.outcome: t.Optional[Future] = None
        #: Timestamp of the last outcome
        self.outcome_timestamp: t.Optional[float] = None
        #: Time spent sleeping in retries
        self.idle_for: float = 0.0
        #: Next action as decided by the retry manager
        self.next_action: t.Optional[RetryAction] = None
        #: Next sleep time as decided by the retry manager.
        self.upcoming_sleep: float = 0.0

    @property
    def seconds_since_start(self) -> t.Optional[float]:
        """Seconds between the start and the last outcome, or None without one."""
        finished_at = self.outcome_timestamp
        return None if finished_at is None else finished_at - self.start_time

    def prepare_for_next_attempt(self) -> None:
        """Forget the last outcome and move on to the next attempt number."""
        self.attempt_number += 1
        self.outcome = None
        self.outcome_timestamp = None
        self.next_action = None

    def set_result(self, val: t.Any) -> None:
        """Record ``val`` as the successful outcome of the current attempt."""
        ts = time.monotonic()
        fut = Future.construct(self.attempt_number, val, False)
        self.outcome = fut
        self.outcome_timestamp = ts

    def set_exception(
        self,
        exc_info: t.Tuple[
            t.Type[BaseException], BaseException, "types.TracebackType| None"
        ],
    ) -> None:
        """Record the exception instance of ``exc_info`` as the outcome."""
        ts = time.monotonic()
        # Only the exception instance (second item) is stored on the future.
        fut = Future.construct(self.attempt_number, exc_info[1], True)
        self.outcome = fut
        self.outcome_timestamp = ts

    def __repr__(self) -> str:
        outcome = self.outcome
        if outcome is None:
            result = "none yet"
        elif outcome.failed:
            exception = outcome.exception()
            result = f"failed ({exception.__class__.__name__} {exception})"
        else:
            result = f"returned {outcome.result()}"

        clsname = self.__class__.__name__
        slept = float(round(self.idle_for, 2))
        return f"<{clsname} {id(self)}: attempt #{self.attempt_number}; slept for {slept}; last result: {result}>"
588
589
@t.overload
def retry(func: WrappedFn) -> WrappedFn: ...


@t.overload
def retry(
    sleep: t.Callable[[t.Union[int, float]], t.Optional[t.Awaitable[None]]] = sleep,
    stop: "StopBaseT" = stop_never,
    wait: "WaitBaseT" = wait_none(),
    retry: "RetryBaseT" = retry_if_exception_type(),
    before: t.Callable[["RetryCallState"], None] = before_nothing,
    after: t.Callable[["RetryCallState"], None] = after_nothing,
    before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None,
    reraise: bool = False,
    retry_error_cls: t.Type["RetryError"] = RetryError,
    retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None,
) -> t.Callable[[WrappedFn], WrappedFn]: ...


def retry(*dargs: t.Any, **dkw: t.Any) -> t.Any:
    """Decorate a function so that calls to it go through a retrying object.

    Usable both bare (``@retry``) and with arguments (``@retry(...)``).  The
    controller class is picked from the kind of function being decorated:
    ``AsyncRetrying`` for coroutine functions, ``TornadoRetrying`` for
    tornado coroutine functions when tornado is available, ``Retrying``
    otherwise.

    :param dargs: positional arguments passed to Retrying object
    :param dkw: keyword arguments passed to the Retrying object
    """
    # support both @retry and @retry() as valid syntax
    if len(dargs) == 1 and callable(dargs[0]):
        return retry()(dargs[0])

    def wrap(f: WrappedFn) -> WrappedFn:
        if isinstance(f, retry_base):
            # A retry strategy was passed where the function was expected.
            warnings.warn(
                f"Got retry_base instance ({f.__class__.__name__}) as callable argument, "
                f"this will probably hang indefinitely (did you mean retry={f.__class__.__name__}(...)?)"
            )

        controller_cls: t.Any
        if iscoroutinefunction(f):
            controller_cls = AsyncRetrying
        elif (
            tornado
            and hasattr(tornado.gen, "is_coroutine_function")
            and tornado.gen.is_coroutine_function(f)
        ):
            controller_cls = TornadoRetrying
        else:
            controller_cls = Retrying

        r: "BaseRetrying" = controller_cls(*dargs, **dkw)
        return r.wraps(f)

    return wrap
641
642
643from tenacity._asyncio import AsyncRetrying # noqa:E402,I100
644
645if tornado:
646 from tenacity.tornadoweb import TornadoRetrying
647
648
# Names exported by ``from <package> import *``: the re-exported retry, sleep,
# stop, wait and callback strategies, followed by the names defined in this
# module.
__all__ = [
    "retry_base",
    "retry_all",
    "retry_always",
    "retry_any",
    "retry_if_exception",
    "retry_if_exception_type",
    "retry_if_exception_cause_type",
    "retry_if_not_exception_type",
    "retry_if_not_result",
    "retry_if_result",
    "retry_never",
    "retry_unless_exception_type",
    "retry_if_exception_message",
    "retry_if_not_exception_message",
    "sleep",
    "sleep_using_event",
    "stop_after_attempt",
    "stop_after_delay",
    "stop_before_delay",
    "stop_all",
    "stop_any",
    "stop_never",
    "stop_when_event_set",
    "wait_chain",
    "wait_combine",
    "wait_exponential",
    "wait_fixed",
    "wait_incrementing",
    "wait_none",
    "wait_random",
    "wait_random_exponential",
    "wait_full_jitter",
    "wait_exponential_jitter",
    "before_log",
    "before_nothing",
    "after_log",
    "after_nothing",
    "before_sleep_log",
    "before_sleep_nothing",
    "retry",
    "WrappedFn",
    "TryAgain",
    "NO_RESULT",
    "DoAttempt",
    "DoSleep",
    "BaseAction",
    "RetryAction",
    "RetryError",
    "AttemptManager",
    "BaseRetrying",
    "Retrying",
    "Future",
    "RetryCallState",
    "AsyncRetrying",
]