Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tenacity/_asyncio.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

82 statements  

1# Copyright 2016 Étienne Bersac 

2# Copyright 2016 Julien Danjou 

3# Copyright 2016 Joshua Harlow 

4# Copyright 2013-2014 Ray Holder 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17 

18import functools 

19import sys 

20import typing as t 

21 

22from tenacity import AttemptManager 

23from tenacity import BaseRetrying 

24from tenacity import DoAttempt 

25from tenacity import DoSleep 

26from tenacity import RetryCallState 

27from tenacity import _utils 

28 

# Return type annotation used by ``AsyncRetrying.__call__``.
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")
# Any callable returning an awaitable; used to annotate wrapped functions.
WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Awaitable[t.Any]])

31 

32 

def asyncio_sleep(duration: float) -> t.Awaitable[None]:
    """Return an awaitable that sleeps for *duration* seconds.

    This is the default ``sleep`` callable of ``AsyncRetrying``. It simply
    delegates to ``asyncio.sleep`` and hands back the un-awaited result, so
    the caller decides when to await it.
    """
    # Lazy import asyncio as it's expensive (responsible for 25-50% of total import overhead).
    import asyncio

    return asyncio.sleep(duration)

38 

39 

class AsyncRetrying(BaseRetrying):
    """Asynchronous counterpart of the retry controller.

    Attempts are awaited, the configured retry/wait/stop callables may be
    either plain functions or coroutine functions, and the pause between
    attempts is performed by awaiting ``self.sleep``. Instances can be
    called directly with a coroutine function, used as an async iterator
    (``async for attempt in AsyncRetrying(...)``), or applied via ``wraps``.
    """

    # Callable returning an awaitable; awaited between attempts.
    sleep: t.Callable[[float], t.Awaitable[t.Any]]

    def __init__(
        self,
        sleep: t.Callable[[float], t.Awaitable[t.Any]] = asyncio_sleep,
        **kwargs: t.Any,
    ) -> None:
        """Initialise the controller.

        Args:
            sleep: Callable invoked with the delay and awaited between
                attempts. Defaults to ``asyncio_sleep``.
            **kwargs: Forwarded unchanged to ``BaseRetrying.__init__``.
        """
        super().__init__(**kwargs)
        self.sleep = sleep

    async def __call__(  # type: ignore[override]
        self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any
    ) -> WrappedFnReturnT:
        """Await ``fn(*args, **kwargs)`` repeatedly until ``iter`` yields a result.

        Each loop turn asks ``self.iter`` what to do next:

        * ``DoAttempt`` - await ``fn`` and record its result or exception
          on the retry state;
        * ``DoSleep`` - prepare the state for the next attempt and await
          ``self.sleep`` with the requested delay;
        * anything else - return that value to the caller.
        """
        self.begin()

        retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
        while True:
            do = await self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                try:
                    result = await fn(*args, **kwargs)
                # BaseException on purpose: every failure of ``fn`` is recorded
                # on the retry state instead of propagating from this frame.
                except BaseException:  # noqa: B902
                    retry_state.set_exception(sys.exc_info())  # type: ignore[arg-type]
                else:
                    retry_state.set_result(result)
            elif isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                await self.sleep(do)
            else:
                return do  # type: ignore[no-any-return]

    @classmethod
    def _wrap_action_func(cls, fn: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]:
        """Return an awaitable-returning version of *fn*.

        If ``_utils.is_coroutine_callable(fn)`` is true, *fn* is returned
        unchanged. Otherwise it is wrapped in a coroutine function that
        calls *fn* synchronously and returns its result, so callers can
        uniformly ``await`` the action.
        """
        if _utils.is_coroutine_callable(fn):
            return fn

        async def inner(*args: t.Any, **kwargs: t.Any) -> t.Any:
            return fn(*args, **kwargs)

        return inner

    def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
        """Append *fn* (made awaitable) to the pending actions of this iteration."""
        self.iter_state.actions.append(self._wrap_action_func(fn))

    async def _run_retry(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Await ``self.retry`` and store its outcome in ``iter_state.retry_run_result``."""
        self.iter_state.retry_run_result = await self._wrap_action_func(self.retry)(
            retry_state
        )

    async def _run_wait(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Compute the upcoming delay and store it in ``retry_state.upcoming_sleep``.

        The delay comes from awaiting ``self.wait``; when ``self.wait`` is
        falsy the delay is ``0.0``.
        """
        if self.wait:
            sleep = await self._wrap_action_func(self.wait)(retry_state)
        else:
            sleep = 0.0

        retry_state.upcoming_sleep = sleep

    async def _run_stop(self, retry_state: "RetryCallState") -> None:  # type: ignore[override]
        """Record elapsed time, then await ``self.stop`` and store its outcome.

        ``statistics["delay_since_first_attempt"]`` is updated from
        ``retry_state.seconds_since_start`` before the stop callable runs;
        the result is stored in ``iter_state.stop_run_result``.
        """
        self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
        self.iter_state.stop_run_result = await self._wrap_action_func(self.stop)(
            retry_state
        )

    async def iter(
        self, retry_state: "RetryCallState"
    ) -> t.Union[DoAttempt, DoSleep, t.Any]:  # noqa: A003
        """Run the queued actions for one iteration and return the last result.

        ``self._begin_iter`` is called first, then every callable in
        ``self.iter_state.actions`` is awaited in order with *retry_state*.
        Returns ``None`` when there are no actions.
        """
        self._begin_iter(retry_state)
        result = None
        # NOTE(review): keep this a plain ``for`` over the live list - actions
        # presumably append further actions via ``_add_action_func`` while the
        # loop runs; confirm against ``BaseRetrying``.
        for action in self.iter_state.actions:
            result = await action(retry_state)
        return result

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Reject synchronous iteration; always raises ``TypeError``."""
        raise TypeError("AsyncRetrying object is not iterable")

    def __aiter__(self) -> "AsyncRetrying":
        """Start an ``async for`` loop: reset via ``begin()`` and create a fresh retry state."""
        self.begin()
        self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        return self

    async def __anext__(self) -> AttemptManager:
        """Return the next ``AttemptManager`` or end the async iteration.

        Sleeps requested by ``iter`` are performed here, transparently to
        the ``async for`` body. Iteration stops (``StopAsyncIteration``)
        when ``iter`` returns ``None`` or any value that is neither
        ``DoAttempt`` nor ``DoSleep``.
        """
        while True:
            do = await self.iter(retry_state=self._retry_state)
            if do is None:
                raise StopAsyncIteration
            elif isinstance(do, DoAttempt):
                return AttemptManager(retry_state=self._retry_state)
            elif isinstance(do, DoSleep):
                self._retry_state.prepare_for_next_attempt()
                await self.sleep(do)
            else:
                raise StopAsyncIteration

    def wraps(self, fn: WrappedFn) -> WrappedFn:
        """Wrap *fn* for retrying and return a coroutine function.

        The wrapper produced by ``BaseRetrying.wraps`` is wrapped once more
        in an ``async def`` so the result is a coroutine function. Its
        ``retry`` and ``retry_with`` attributes are copied onto the
        returned function.
        """
        fn = super().wraps(fn)
        # Ensure wrapper is recognized as a coroutine function.

        @functools.wraps(
            fn, functools.WRAPPER_ASSIGNMENTS + ("__defaults__", "__kwdefaults__")
        )
        async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any:
            return await fn(*args, **kwargs)

        # Preserve attributes
        async_wrapped.retry = fn.retry  # type: ignore[attr-defined]
        async_wrapped.retry_with = fn.retry_with  # type: ignore[attr-defined]

        return async_wrapped  # type: ignore[return-value]