Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_vendor/tenacity/_asyncio.py: 35%

54 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1# Copyright 2016 Étienne Bersac 

2# Copyright 2016 Julien Danjou 

3# Copyright 2016 Joshua Harlow 

4# Copyright 2013-2014 Ray Holder 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17 

18import functools 

19import sys 

20import typing as t 

21from asyncio import sleep 

22 

23from pip._vendor.tenacity import AttemptManager 

24from pip._vendor.tenacity import BaseRetrying 

25from pip._vendor.tenacity import DoAttempt 

26from pip._vendor.tenacity import DoSleep 

27from pip._vendor.tenacity import RetryCallState 

28 

29WrappedFnReturnT = t.TypeVar("WrappedFnReturnT") 

30WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Awaitable[t.Any]]) 

31 

32 

33class AsyncRetrying(BaseRetrying): 

34 sleep: t.Callable[[float], t.Awaitable[t.Any]] 

35 

36 def __init__(self, sleep: t.Callable[[float], t.Awaitable[t.Any]] = sleep, **kwargs: t.Any) -> None: 

37 super().__init__(**kwargs) 

38 self.sleep = sleep 

39 

40 async def __call__( # type: ignore[override] 

41 self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any 

42 ) -> WrappedFnReturnT: 

43 self.begin() 

44 

45 retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs) 

46 while True: 

47 do = self.iter(retry_state=retry_state) 

48 if isinstance(do, DoAttempt): 

49 try: 

50 result = await fn(*args, **kwargs) 

51 except BaseException: # noqa: B902 

52 retry_state.set_exception(sys.exc_info()) # type: ignore[arg-type] 

53 else: 

54 retry_state.set_result(result) 

55 elif isinstance(do, DoSleep): 

56 retry_state.prepare_for_next_attempt() 

57 await self.sleep(do) 

58 else: 

59 return do # type: ignore[no-any-return] 

60 

61 def __iter__(self) -> t.Generator[AttemptManager, None, None]: 

62 raise TypeError("AsyncRetrying object is not iterable") 

63 

64 def __aiter__(self) -> "AsyncRetrying": 

65 self.begin() 

66 self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={}) 

67 return self 

68 

69 async def __anext__(self) -> AttemptManager: 

70 while True: 

71 do = self.iter(retry_state=self._retry_state) 

72 if do is None: 

73 raise StopAsyncIteration 

74 elif isinstance(do, DoAttempt): 

75 return AttemptManager(retry_state=self._retry_state) 

76 elif isinstance(do, DoSleep): 

77 self._retry_state.prepare_for_next_attempt() 

78 await self.sleep(do) 

79 else: 

80 raise StopAsyncIteration 

81 

82 def wraps(self, fn: WrappedFn) -> WrappedFn: 

83 fn = super().wraps(fn) 

84 # Ensure wrapper is recognized as a coroutine function. 

85 

86 @functools.wraps(fn) 

87 async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any: 

88 return await fn(*args, **kwargs) 

89 

90 # Preserve attributes 

91 async_wrapped.retry = fn.retry # type: ignore[attr-defined] 

92 async_wrapped.retry_with = fn.retry_with # type: ignore[attr-defined] 

93 

94 return async_wrapped # type: ignore[return-value]