Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1# Copyright 2016 Étienne Bersac 

2# Copyright 2016 Julien Danjou 

3# Copyright 2016 Joshua Harlow 

4# Copyright 2013-2014 Ray Holder 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17 

18import functools 

19import sys 

20import typing as t 

21 

22import tenacity 

23from tenacity import AttemptManager 

24from tenacity import BaseRetrying 

25from tenacity import DoAttempt 

26from tenacity import DoSleep 

27from tenacity import RetryCallState 

28from tenacity import RetryError 

29from tenacity import after_nothing 

30from tenacity import before_nothing 

31from tenacity import _utils 

32 

33# Import all built-in retry strategies for easier usage. 

34from .retry import RetryBaseT 

35from .retry import retry_all # noqa 

36from .retry import retry_any # noqa 

37from .retry import retry_if_exception # noqa 

38from .retry import retry_if_result # noqa 

39from ..retry import RetryBaseT as SyncRetryBaseT 

40 

41if t.TYPE_CHECKING: 

42 from tenacity.stop import StopBaseT 

43 from tenacity.wait import WaitBaseT 

44 

# Type variable standing for the value a wrapped callable ultimately yields;
# it is the declared return type of ``AsyncRetrying.__call__``.
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")

# Type variable standing for the wrapped callable itself. The bound restricts
# it to callables whose call result is awaitable.
WrappedFn = t.TypeVar(
    "WrappedFn",
    bound=t.Callable[..., t.Awaitable[t.Any]],
)

47 

48 

def _portable_async_sleep(seconds: float) -> t.Awaitable[None]:
    """Return an awaitable that sleeps for ``seconds``.

    ``trio.sleep`` is used when trio has already been imported and sniffio
    reports it as the current async library; in every other case
    ``asyncio.sleep`` is used.

    Args:
        seconds: Duration handed unchanged to the selected ``sleep`` function.

    Returns:
        The not-yet-awaited object produced by ``trio.sleep(seconds)`` or
        ``asyncio.sleep(seconds)``.
    """
    # trio cannot be the running library unless something imported it first,
    # so a cheap ``sys.modules`` lookup lets us skip the sniffio check (and
    # the import cost) in the common asyncio-only case.
    trio_already_loaded = "trio" in sys.modules
    if trio_already_loaded:
        # sniffio is a trio dependency, so it is importable whenever trio is.
        import trio
        import sniffio

        running_under_trio = sniffio.current_async_library() == "trio"
        if running_under_trio:
            return trio.sleep(seconds)

    # Fall back to asyncio. The import is deferred to call time because it is
    # expensive (responsible for 25-50% of total import overhead).
    import asyncio

    return asyncio.sleep(seconds)

65 

66 

class AsyncRetrying(BaseRetrying):
    """Retry controller whose attempt loop runs inside a coroutine.

    The strategy callables (``retry``, ``wait``, ``stop`` and every action
    queued through ``_add_action_func``) may be plain functions or coroutine
    functions; they are normalised with ``_utils.wrap_to_async_func`` and
    awaited. The ``sleep`` callable may return either ``None`` or an
    awaitable, as its annotation states.
    """

    def __init__(
        self,
        sleep: t.Callable[
            [t.Union[int, float]], t.Union[None, t.Awaitable[None]]
        ] = _portable_async_sleep,
        stop: "StopBaseT" = tenacity.stop.stop_never,
        wait: "WaitBaseT" = tenacity.wait.wait_none(),
        retry: "t.Union[SyncRetryBaseT, RetryBaseT]" = tenacity.retry_if_exception_type(),
        before: t.Callable[
            ["RetryCallState"], t.Union[None, t.Awaitable[None]]
        ] = before_nothing,
        after: t.Callable[
            ["RetryCallState"], t.Union[None, t.Awaitable[None]]
        ] = after_nothing,
        before_sleep: t.Optional[
            t.Callable[["RetryCallState"], t.Union[None, t.Awaitable[None]]]
        ] = None,
        reraise: bool = False,
        retry_error_cls: t.Type["RetryError"] = RetryError,
        retry_error_callback: t.Optional[
            t.Callable[["RetryCallState"], t.Union[t.Any, t.Awaitable[t.Any]]]
        ] = None,
    ) -> None:
        """Forward every option unchanged to ``BaseRetrying.__init__``.

        The only difference from the base signature visible here is that the
        callbacks are typed as possibly returning awaitables, and ``sleep``
        defaults to ``_portable_async_sleep``.
        """
        super().__init__(
            sleep=sleep,  # type: ignore[arg-type]
            stop=stop,
            wait=wait,
            retry=retry,  # type: ignore[arg-type]
            before=before,  # type: ignore[arg-type]
            after=after,  # type: ignore[arg-type]
            before_sleep=before_sleep,  # type: ignore[arg-type]
            reraise=reraise,
            retry_error_cls=retry_error_cls,
            retry_error_callback=retry_error_callback,
        )

    async def _await_sleep(self, seconds: t.Union[int, float]) -> None:
        """Invoke ``self.sleep(seconds)`` and await the result if awaitable.

        ``sleep`` is annotated as returning ``None`` *or* an awaitable.
        Awaiting the result unconditionally raises ``TypeError`` for a
        synchronous sleep callable, so the result is only awaited when
        ``inspect.isawaitable`` confirms it can be.
        """
        # Local import, in line with the other lazy imports in this module.
        import inspect

        outcome = self.sleep(seconds)
        if inspect.isawaitable(outcome):
            await outcome

    async def __call__(  # type: ignore[override]
        self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any
    ) -> WrappedFnReturnT:
        """Run ``fn(*args, **kwargs)`` under this retry policy.

        Each loop iteration asks ``self.iter`` what to do next:

        * ``DoAttempt``: call ``fn`` (awaited when
          ``_utils.is_coroutine_callable(fn)`` is true, called directly
          otherwise) and record the result or the raised exception on the
          retry state.
        * ``DoSleep``: prepare the state for the next attempt and sleep.
        * anything else: return that value to the caller.
        """
        self.begin()

        retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
        is_async = _utils.is_coroutine_callable(fn)
        while True:
            do = await self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                try:
                    if is_async:
                        result = await fn(*args, **kwargs)
                    else:
                        result = fn(*args, **kwargs)
                except BaseException:  # noqa: B902
                    # Every exception is captured on the state; what happens
                    # to it is decided by the next ``self.iter`` call.
                    retry_state.set_exception(sys.exc_info())  # type: ignore[arg-type]
                else:
                    retry_state.set_result(result)
            elif isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                await self._await_sleep(do)
            else:
                return do  # type: ignore[no-any-return]

    def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
        """Queue ``fn`` on ``iter_state.actions`` in awaitable form."""
        self.iter_state.actions.append(_utils.wrap_to_async_func(fn))

    async def _run_retry(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Evaluate ``self.retry`` and store the outcome on ``iter_state``."""
        self.iter_state.retry_run_result = await _utils.wrap_to_async_func(self.retry)(
            retry_state
        )

    async def _run_wait(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Compute the upcoming sleep and store it on ``retry_state``.

        Uses ``self.wait`` when it is truthy, otherwise ``0.0``.
        """
        if self.wait:
            sleep = await _utils.wrap_to_async_func(self.wait)(retry_state)
        else:
            sleep = 0.0

        retry_state.upcoming_sleep = sleep

    async def _run_stop(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Record the elapsed time, then evaluate ``self.stop``.

        The elapsed time goes into ``statistics`` and the stop outcome onto
        ``iter_state``.
        """
        self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
        self.iter_state.stop_run_result = await _utils.wrap_to_async_func(self.stop)(
            retry_state
        )

    async def iter(
        self, retry_state: "RetryCallState"
    ) -> t.Union[DoAttempt, DoSleep, t.Any]:  # noqa: A003
        """Run the queued actions in order and return the last one's result.

        ``_begin_iter`` is called first; if no action is queued afterwards
        the return value is ``None``.
        """
        self._begin_iter(retry_state)
        result = None
        for action in self.iter_state.actions:
            result = await action(retry_state)
        return result

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Reject synchronous iteration; only ``async for`` is supported."""
        raise TypeError("AsyncRetrying object is not iterable")

    def __aiter__(self) -> "AsyncRetrying":
        """Start an ``async for`` session with a fresh retry state."""
        self.begin()
        self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        return self

    async def __anext__(self) -> AttemptManager:
        """Yield the next ``AttemptManager``, sleeping between attempts.

        Raises:
            StopAsyncIteration: when ``self.iter`` returns anything other
                than ``DoAttempt`` or ``DoSleep`` (including ``None``).
        """
        while True:
            do = await self.iter(retry_state=self._retry_state)
            if isinstance(do, DoAttempt):
                return AttemptManager(retry_state=self._retry_state)
            if isinstance(do, DoSleep):
                self._retry_state.prepare_for_next_attempt()
                await self._await_sleep(do)
                continue
            # ``None`` and any other value both end the iteration.
            raise StopAsyncIteration

    def wraps(self, fn: WrappedFn) -> WrappedFn:
        """Wrap ``fn`` in a coroutine function that retries it.

        The returned wrapper carries three extra attributes: ``retry`` (this
        object), ``retry_with`` (taken from the wrapper built by
        ``super().wraps``) and ``statistics`` (rebound on every call to the
        statistics of the copy used for that call).
        """
        wrapped = super().wraps(fn)
        # Ensure wrapper is recognized as a coroutine function.

        @functools.wraps(
            fn, functools.WRAPPER_ASSIGNMENTS + ("__defaults__", "__kwdefaults__")
        )
        async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any:
            # Always create a copy to prevent overwriting the local contexts when
            # calling the same wrapped functions multiple times in the same stack
            copy = self.copy()
            async_wrapped.statistics = copy.statistics  # type: ignore[attr-defined]
            return await copy(fn, *args, **kwargs)

        # Preserve attributes
        async_wrapped.retry = self  # type: ignore[attr-defined]
        async_wrapped.retry_with = wrapped.retry_with  # type: ignore[attr-defined]
        async_wrapped.statistics = {}  # type: ignore[attr-defined]

        return async_wrapped  # type: ignore[return-value]

201 

202 

# Public names exported by ``from <this package> import *``.
__all__ = [
    # Retry strategies imported above from ``.retry``.
    "retry_all",
    "retry_any",
    "retry_if_exception",
    "retry_if_result",
    # Names defined in this module.
    "WrappedFn",
    "AsyncRetrying",
]