1# Copyright 2016 Étienne Bersac
2# Copyright 2016 Julien Danjou
3# Copyright 2016 Joshua Harlow
4# Copyright 2013-2014 Ray Holder
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import functools
19import sys
20import typing as t
21
22from tenacity import AttemptManager
23from tenacity import BaseRetrying
24from tenacity import DoAttempt
25from tenacity import DoSleep
26from tenacity import RetryCallState
27from tenacity import _utils
28
# Shape every wrapped function must have: any callable that returns an awaitable.
_AwaitableReturningCallable = t.Callable[..., t.Awaitable[t.Any]]

# Unconstrained type variable used as the return annotation of
# ``AsyncRetrying.__call__``.
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")
# Type variable for the wrapped function itself, so that ``wraps`` can be
# annotated as returning the same callable type it was given.
WrappedFn = t.TypeVar("WrappedFn", bound=_AwaitableReturningCallable)
31
32
def asyncio_sleep(duration: float) -> t.Awaitable[None]:
    """Return the awaitable produced by ``asyncio.sleep(duration)``.

    Nothing is awaited here; the caller is expected to ``await`` the
    returned object.

    Args:
        duration: Value forwarded unchanged to ``asyncio.sleep``.

    Returns:
        The object returned by ``asyncio.sleep(duration)``.
    """
    # asyncio is deliberately imported at call time rather than at module
    # level: it is expensive to import (responsible for 25-50% of total
    # import overhead).
    from asyncio import sleep

    return sleep(duration)
38
39
class AsyncRetrying(BaseRetrying):
    """``BaseRetrying`` variant whose attempts, hooks and sleeps are awaited.

    The wrapped callable is awaited on every attempt. The ``retry``, ``wait``
    and ``stop`` callables are passed through :meth:`_wrap_action_func` so
    that plain and coroutine callables can be awaited uniformly, and pauses
    between attempts go through the awaitable-returning ``sleep`` callable.

    Instances support ``async for`` (``__aiter__`` / ``__anext__``); plain
    ``for`` iteration raises ``TypeError``.
    """

    # Callable invoked with the pending sleep value; its result is awaited.
    sleep: t.Callable[[float], t.Awaitable[t.Any]]

    def __init__(
        self,
        sleep: t.Callable[[float], t.Awaitable[t.Any]] = asyncio_sleep,
        **kwargs: t.Any,
    ) -> None:
        """Initialise the base class and remember the sleep callable.

        Args:
            sleep: Callable returning an awaitable; awaited between attempts.
                Defaults to :func:`asyncio_sleep`.
            **kwargs: Forwarded unchanged to ``BaseRetrying.__init__``.
        """
        super().__init__(**kwargs)
        self.sleep = sleep

    async def __call__(  # type: ignore[override]
        self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any
    ) -> WrappedFnReturnT:
        """Await ``fn(*args, **kwargs)`` until :meth:`iter` yields a final value.

        Each round asks :meth:`iter` what to do next:

        * ``DoAttempt`` - await ``fn`` and record its result or exception on
          the retry state;
        * ``DoSleep`` - prepare the retry state for the next attempt and await
          ``self.sleep`` with that value;
        * anything else - return it to the caller.

        Args:
            fn: Callable returning an awaitable.
            *args: Positional arguments passed to ``fn`` on every attempt.
            **kwargs: Keyword arguments passed to ``fn`` on every attempt.

        Returns:
            The first value produced by :meth:`iter` that is neither a
            ``DoAttempt`` nor a ``DoSleep``.
        """
        self.begin()

        state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
        while True:
            outcome = await self.iter(retry_state=state)

            if isinstance(outcome, DoAttempt):
                try:
                    value = await fn(*args, **kwargs)
                except BaseException:  # noqa: B902
                    # Anything raised by the attempt is stored on the retry
                    # state instead of propagating out of this frame.
                    state.set_exception(sys.exc_info())  # type: ignore[arg-type]
                else:
                    state.set_result(value)
                continue

            if isinstance(outcome, DoSleep):
                state.prepare_for_next_attempt()
                await self.sleep(outcome)
                continue

            return outcome  # type: ignore[no-any-return]

    @classmethod
    def _wrap_action_func(cls, fn: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]:
        """Return a callable whose result can always be awaited.

        Args:
            fn: Callable to adapt.

        Returns:
            ``fn`` itself when ``_utils.is_coroutine_callable(fn)`` is true,
            otherwise an ``async`` function that calls ``fn`` with the same
            arguments and returns its result.
        """
        if not _utils.is_coroutine_callable(fn):

            async def inner(*args: t.Any, **kwargs: t.Any) -> t.Any:
                return fn(*args, **kwargs)

            return inner

        return fn

    def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
        """Append ``fn``, adapted by :meth:`_wrap_action_func`, to the action list."""
        pending_actions = self.iter_state.actions
        pending_actions.append(self._wrap_action_func(fn))

    async def _run_retry(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Await ``self.retry`` and store its result as ``iter_state.retry_run_result``."""
        retry_check = self._wrap_action_func(self.retry)
        self.iter_state.retry_run_result = await retry_check(retry_state)

    async def _run_wait(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Set ``retry_state.upcoming_sleep`` from ``self.wait``.

        When ``self.wait`` is falsy the upcoming sleep is ``0.0``; otherwise
        it is the awaited result of calling ``self.wait`` with ``retry_state``.
        """
        delay = 0.0
        if self.wait:
            wait_fn = self._wrap_action_func(self.wait)
            delay = await wait_fn(retry_state)

        retry_state.upcoming_sleep = delay

    async def _run_stop(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Record the elapsed time and await ``self.stop``.

        ``retry_state.seconds_since_start`` is written to
        ``self.statistics["delay_since_first_attempt"]`` and the awaited
        result of ``self.stop`` to ``iter_state.stop_run_result``.
        """
        elapsed = retry_state.seconds_since_start
        self.statistics["delay_since_first_attempt"] = elapsed

        stop_check = self._wrap_action_func(self.stop)
        self.iter_state.stop_run_result = await stop_check(retry_state)

    async def iter(
        self, retry_state: "RetryCallState"
    ) -> t.Union[DoAttempt, DoSleep, t.Any]:  # noqa: A003
        """Await every queued action in order and return the last result.

        Args:
            retry_state: State object handed to ``_begin_iter`` and to each
                action.

        Returns:
            The awaited result of the final action, or ``None`` when the
            action list is empty.
        """
        self._begin_iter(retry_state)

        last_result = None
        # Loop over the list object itself rather than a snapshot.
        # NOTE(review): presumably actions may queue further actions through
        # _add_action_func while this loop runs - confirm against the base
        # class before changing this to iterate a copy.
        for step in self.iter_state.actions:
            last_result = await step(retry_state)
        return last_result

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Reject synchronous iteration.

        Raises:
            TypeError: Always.
        """
        raise TypeError("AsyncRetrying object is not iterable")

    def __aiter__(self) -> "AsyncRetrying":
        """Start an ``async for`` run and return ``self`` as the iterator.

        Calls ``begin()`` and creates a fresh ``RetryCallState`` with no
        wrapped function, stored on ``self._retry_state``.
        """
        self.begin()
        self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        return self

    async def __anext__(self) -> AttemptManager:
        """Return the manager for the next attempt, sleeping first if asked to.

        Returns:
            An ``AttemptManager`` bound to ``self._retry_state`` once
            :meth:`iter` yields a ``DoAttempt``.

        Raises:
            StopAsyncIteration: When :meth:`iter` yields ``None`` or any
                value that is neither a ``DoAttempt`` nor a ``DoSleep``.
        """
        while True:
            outcome = await self.iter(retry_state=self._retry_state)

            if outcome is None:
                raise StopAsyncIteration
            if isinstance(outcome, DoAttempt):
                return AttemptManager(retry_state=self._retry_state)
            if not isinstance(outcome, DoSleep):
                raise StopAsyncIteration

            # DoSleep: get ready for another attempt, pause, then ask again.
            self._retry_state.prepare_for_next_attempt()
            await self.sleep(outcome)

    def wraps(self, fn: WrappedFn) -> WrappedFn:
        """Wrap ``fn`` and return an ``async def`` function around the result.

        The wrapper produced by ``BaseRetrying.wraps`` is itself wrapped in an
        ``async def`` so that the returned object is recognised as a coroutine
        function.

        Args:
            fn: Callable returning an awaitable.

        Returns:
            An ``async`` function that awaits the base-class wrapper. It
            carries that wrapper's metadata (including ``__defaults__`` and
            ``__kwdefaults__``) plus its ``retry`` and ``retry_with``
            attributes.
        """
        retrying_fn = super().wraps(fn)
        copied_attrs = functools.WRAPPER_ASSIGNMENTS + ("__defaults__", "__kwdefaults__")

        # Ensure wrapper is recognized as a coroutine function.
        @functools.wraps(retrying_fn, copied_attrs)
        async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any:
            return await retrying_fn(*args, **kwargs)

        # Preserve attributes
        async_wrapped.retry = retrying_fn.retry  # type: ignore[attr-defined]
        async_wrapped.retry_with = retrying_fn.retry_with  # type: ignore[attr-defined]

        return async_wrapped  # type: ignore[return-value]