Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tenacity/asyncio/__init__.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

98 statements  

1# Copyright 2016 Étienne Bersac 

2# Copyright 2016 Julien Danjou 

3# Copyright 2016 Joshua Harlow 

4# Copyright 2013-2014 Ray Holder 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17 

18import functools 

19import sys 

20import typing as t 

21 

22import tenacity 

23from tenacity import AttemptManager 

24from tenacity import BaseRetrying 

25from tenacity import DoAttempt 

26from tenacity import DoSleep 

27from tenacity import RetryCallState 

28from tenacity import RetryError 

29from tenacity import after_nothing 

30from tenacity import before_nothing 

31from tenacity import _utils 

32 

33# Import all built-in retry strategies for easier usage. 

34from .retry import RetryBaseT 

35from .retry import retry_all # noqa 

36from .retry import retry_any # noqa 

37from .retry import retry_if_exception # noqa 

38from .retry import retry_if_result # noqa 

39from ..retry import RetryBaseT as SyncRetryBaseT 

40 

41if t.TYPE_CHECKING: 

42 from tenacity.stop import StopBaseT 

43 from tenacity.wait import WaitBaseT 

44 

# Unconstrained type variable used as the return annotation of
# ``AsyncRetrying.__call__``.
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")

# Type variable for the functions handled by this module: any callable whose
# return value is awaitable.
WrappedFn = t.TypeVar(
    "WrappedFn",
    bound=t.Callable[..., t.Awaitable[t.Any]],
)

47 

48 

49def _portable_async_sleep(seconds: float) -> t.Awaitable[None]: 

50 # If trio is already imported, then importing it is cheap. 

51 # If trio isn't already imported, then it's definitely not running, so we 

52 # can skip further checks. 

53 if "trio" in sys.modules: 

54 # If trio is available, then sniffio is too 

55 import trio 

56 import sniffio 

57 

58 if sniffio.current_async_library() == "trio": 

59 return trio.sleep(seconds) 

60 # Otherwise, assume asyncio 

61 # Lazy import asyncio as it's expensive (responsible for 25-50% of total import overhead). 

62 import asyncio 

63 

64 return asyncio.sleep(seconds) 

65 

66 

class AsyncRetrying(BaseRetrying):
    """Retry controller that drives coroutine functions.

    The retry loop awaits the wrapped function, every queued action, and the
    ``retry``/``wait``/``stop`` strategies (each passed through
    ``_utils.wrap_to_async_func`` first). An instance can be called directly
    with a coroutine function, applied as a decorator through ``wraps``, or
    consumed with ``async for``, which yields ``AttemptManager`` objects.
    Plain ``for`` iteration is rejected with ``TypeError``.
    """

    def __init__(
        self,
        sleep: t.Callable[
            [t.Union[int, float]], t.Union[None, t.Awaitable[None]]
        ] = _portable_async_sleep,
        stop: "StopBaseT" = tenacity.stop.stop_never,
        wait: "WaitBaseT" = tenacity.wait.wait_none(),
        retry: "t.Union[SyncRetryBaseT, RetryBaseT]" = tenacity.retry_if_exception_type(),
        before: t.Callable[
            ["RetryCallState"], t.Union[None, t.Awaitable[None]]
        ] = before_nothing,
        after: t.Callable[
            ["RetryCallState"], t.Union[None, t.Awaitable[None]]
        ] = after_nothing,
        before_sleep: t.Optional[
            t.Callable[["RetryCallState"], t.Union[None, t.Awaitable[None]]]
        ] = None,
        reraise: bool = False,
        retry_error_cls: t.Type["RetryError"] = RetryError,
        retry_error_callback: t.Optional[
            t.Callable[["RetryCallState"], t.Union[t.Any, t.Awaitable[t.Any]]]
        ] = None,
    ) -> None:
        """Forward every option unchanged to ``BaseRetrying.__init__``.

        The annotations differ from a purely synchronous signature in that
        ``sleep`` and the callback hooks may return either ``None`` or an
        awaitable, and ``retry`` may be a sync or an async strategy. The
        ``type: ignore`` markers exist because the base class is annotated for
        the synchronous variants only.
        """
        super().__init__(
            sleep=sleep,  # type: ignore[arg-type]
            stop=stop,
            wait=wait,
            retry=retry,  # type: ignore[arg-type]
            before=before,  # type: ignore[arg-type]
            after=after,  # type: ignore[arg-type]
            before_sleep=before_sleep,  # type: ignore[arg-type]
            reraise=reraise,
            retry_error_cls=retry_error_cls,
            retry_error_callback=retry_error_callback,
        )

    async def _sleep_between_attempts(self, delay: DoSleep) -> None:
        """Invoke ``self.sleep(delay)`` and await its result when there is one.

        The ``sleep`` parameter is annotated as returning ``None`` or an
        awaitable. Awaiting the result unconditionally raises ``TypeError``
        for a synchronous sleep callable, so ``None`` is treated as "nothing
        left to wait for".
        """
        pending = self.sleep(delay)
        if pending is not None:
            await pending

    async def __call__(  # type: ignore[override]
        self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any
    ) -> WrappedFnReturnT:
        """Await ``fn(*args, **kwargs)`` repeatedly under this retry policy.

        Args:
            fn: Callable whose result is awaited on every attempt.
            *args: Positional arguments passed to ``fn`` on every attempt.
            **kwargs: Keyword arguments passed to ``fn`` on every attempt.

        Returns:
            The first value produced by ``self.iter`` that is neither a
            ``DoAttempt`` nor a ``DoSleep``.

        Any exception raised while awaiting ``fn`` (``BaseException``
        included) is recorded on the retry state rather than propagating
        directly from the attempt.
        """
        self.begin()

        retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
        while True:
            do = await self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                try:
                    result = await fn(*args, **kwargs)
                except BaseException:  # noqa: B902
                    retry_state.set_exception(sys.exc_info())  # type: ignore[arg-type]
                else:
                    retry_state.set_result(result)
            elif isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                await self._sleep_between_attempts(do)
            else:
                return do  # type: ignore[no-any-return]

    def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
        """Queue ``fn`` as an action, wrapped so that ``iter`` can await it."""
        self.iter_state.actions.append(_utils.wrap_to_async_func(fn))

    async def _run_retry(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Await the ``retry`` strategy and store its verdict on ``iter_state``."""
        self.iter_state.retry_run_result = await _utils.wrap_to_async_func(self.retry)(
            retry_state
        )

    async def _run_wait(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Set ``retry_state.upcoming_sleep`` from the ``wait`` strategy.

        A falsy ``self.wait`` results in a delay of ``0.0``.
        """
        if self.wait:
            sleep = await _utils.wrap_to_async_func(self.wait)(retry_state)
        else:
            sleep = 0.0

        retry_state.upcoming_sleep = sleep

    async def _run_stop(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Record the elapsed time, then await the ``stop`` strategy.

        The elapsed time goes into ``self.statistics`` and the strategy's
        verdict into ``iter_state.stop_run_result``.
        """
        self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
        self.iter_state.stop_run_result = await _utils.wrap_to_async_func(self.stop)(
            retry_state
        )

    async def iter(  # noqa: A003
        self, retry_state: "RetryCallState"
    ) -> t.Union[DoAttempt, DoSleep, t.Any]:
        """Await every queued action in order and return the last result.

        Returns ``None`` when no action is queued after ``_begin_iter``.
        """
        self._begin_iter(retry_state)
        result = None
        # NOTE(review): the list is iterated live on purpose. Actions
        # presumably append follow-up actions through ``_add_action_func``
        # while this loop runs, so do not snapshot it. Confirm against
        # ``BaseRetrying``.
        for action in self.iter_state.actions:
            result = await action(retry_state)
        return result

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Reject synchronous iteration; only ``async for`` is supported."""
        raise TypeError("AsyncRetrying object is not iterable")

    def __aiter__(self) -> "AsyncRetrying":
        """Start an ``async for`` session and return ``self`` as the iterator.

        Calls ``begin()`` and creates a fresh ``RetryCallState`` that is not
        bound to any function.
        """
        self.begin()
        self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        return self

    async def __anext__(self) -> AttemptManager:
        """Return the manager for the next attempt, sleeping first if asked to.

        Raises:
            StopAsyncIteration: When ``self.iter`` yields anything other than
                a ``DoAttempt`` or a ``DoSleep`` (``None`` included).
        """
        while True:
            do = await self.iter(retry_state=self._retry_state)
            if isinstance(do, DoAttempt):
                return AttemptManager(retry_state=self._retry_state)
            elif isinstance(do, DoSleep):
                self._retry_state.prepare_for_next_attempt()
                await self._sleep_between_attempts(do)
            else:
                # ``None`` and any other result both end the iteration.
                raise StopAsyncIteration

    def wraps(self, fn: WrappedFn) -> WrappedFn:
        """Return a coroutine function that runs ``fn`` under this policy.

        The returned wrapper exposes three attributes: ``retry`` (this
        object), ``retry_with`` (taken from the base-class wrapper) and
        ``statistics`` (an empty dict until the first call, afterwards the
        statistics of the copy used for the most recently started call).
        """
        wrapped = super().wraps(fn)
        # Ensure wrapper is recognized as a coroutine function.

        @functools.wraps(
            fn, functools.WRAPPER_ASSIGNMENTS + ("__defaults__", "__kwdefaults__")
        )
        async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any:
            # Always create a copy to prevent overwriting the local contexts when
            # calling the same wrapped functions multiple times in the same stack
            copy = self.copy()
            async_wrapped.statistics = copy.statistics  # type: ignore[attr-defined]
            return await copy(fn, *args, **kwargs)

        # Preserve attributes
        async_wrapped.retry = self  # type: ignore[attr-defined]
        async_wrapped.retry_with = wrapped.retry_with  # type: ignore[attr-defined]
        async_wrapped.statistics = {}  # type: ignore[attr-defined]

        return async_wrapped  # type: ignore[return-value]

197 

198 

# Public names of this module: the retry strategies re-exported from
# ``.retry`` for convenience, followed by the names defined in this file.
__all__ = [
    "retry_all",
    "retry_any",
    "retry_if_exception",
    "retry_if_result",
    "WrappedFn",
    "AsyncRetrying",
]