Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyrate_limiter/limit_context_decorator.py: 38%

56 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1import asyncio 

2from functools import partial 

3from functools import wraps 

4from inspect import iscoroutinefunction 

5from logging import getLogger 

6from time import sleep 

7from typing import TYPE_CHECKING 

8from typing import Union 

9 

10from .exceptions import BucketFullException 

11 

12logger = getLogger(__name__) 

13 

14if TYPE_CHECKING: 

15 from .limiter import Limiter 

16 

17 

class LimitContextDecorator:
    """A class that can be used as a:

    * decorator
    * async decorator
    * contextmanager
    * async contextmanager

    Intended to be used via :py:meth:`.Limiter.ratelimit`. Depending on arguments, calls that exceed
    the rate limit will either raise an exception, or sleep until space is available in the bucket.

    Args:
        limiter: Limiter object
        identities: Bucket identities
        delay: Delay until the next request instead of raising an exception
        max_delay: The maximum allowed delay time (in seconds); anything over this will raise
            an exception. ``None`` or ``0`` disables the check.
    """

    def __init__(
        self,
        limiter: "Limiter",
        *identities: str,
        delay: bool = False,
        max_delay: Union[int, float, None] = None,
    ):
        self.delay = delay
        # A falsy max_delay (None or 0) is stored as 0, which delay_or_reraise treats as "no maximum"
        self.max_delay = max_delay or 0
        # Bind the identities once so every acquire attempt is a plain zero-argument call
        self.try_acquire = partial(limiter.try_acquire, *identities)

    def __call__(self, func):
        """Allows usage as a decorator for both normal and async functions"""

        @wraps(func)
        def wrapper(*args, **kwargs):
            self.delayed_acquire()
            return func(*args, **kwargs)

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            await self.async_delayed_acquire()
            return await func(*args, **kwargs)

        # Return either an async or normal wrapper, depending on the type of the wrapped function
        return async_wrapper if iscoroutinefunction(func) else wrapper

    def __enter__(self):
        """Allows usage as a contextmanager"""
        self.delayed_acquire()

    def __exit__(self, *exc):
        # Nothing to release; returning None lets any exception from the body propagate
        pass

    async def __aenter__(self):
        """Allows usage as an async contextmanager"""
        await self.async_delayed_acquire()

    async def __aexit__(self, *exc):
        # Nothing to release; returning None lets any exception from the body propagate
        pass

    def delayed_acquire(self):
        """Delay and retry until we can successfully acquire an available bucket item

        Blocks the calling thread with :py:func:`time.sleep` between attempts.

        Raises:
            BucketFullException: Re-raised by :py:meth:`delay_or_reraise` when delays are disabled
                or the required delay exceeds ``max_delay``.
        """
        while True:
            try:
                self.try_acquire()
            except BucketFullException as err:
                # Either returns the time to wait, or re-raises ``err``
                delay_time = self.delay_or_reraise(err)
                sleep(delay_time)
            else:
                break

    async def async_delayed_acquire(self):
        """Delay and retry until we can successfully acquire an available bucket item

        Same retry loop as :py:meth:`delayed_acquire`, but waits with :py:func:`asyncio.sleep`
        between attempts. The acquire attempt itself is still a synchronous call.

        Raises:
            BucketFullException: Re-raised by :py:meth:`delay_or_reraise` when delays are disabled
                or the required delay exceeds ``max_delay``.
        """
        while True:
            try:
                self.try_acquire()
            except BucketFullException as err:
                # Either returns the time to wait, or re-raises ``err``
                delay_time = self.delay_or_reraise(err)
                await asyncio.sleep(delay_time)
            else:
                break

    def delay_or_reraise(self, err: BucketFullException) -> float:
        """Determine if we should delay after exceeding a rate limit. If so, return the delay time,
        otherwise re-raise the exception.

        Args:
            err: The exception raised by the failed acquire attempt; its
                ``meta_info["remaining_time"]`` is read as the time to wait (in seconds).

        Returns:
            The number of seconds to sleep before retrying; never negative.

        Raises:
            BucketFullException: ``err`` itself, if ``delay`` is disabled or the remaining time is
                greater than a non-zero ``max_delay``.
        """
        delay_time = float(err.meta_info["remaining_time"])
        logger.debug(f"Rate limit reached; {delay_time:.5f} seconds remaining before next request")
        exceeded_max_delay = bool(self.max_delay) and (delay_time > self.max_delay)
        if self.delay and not exceeded_max_delay:
            # time.sleep() raises ValueError for negative values, so clamp to zero: a negative
            # remaining time simply means the caller can retry immediately.
            return max(delay_time, 0.0)
        raise err