Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyrate_limiter/limit_context_decorator.py: 38%
56 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1import asyncio
2from functools import partial
3from functools import wraps
4from inspect import iscoroutinefunction
5from logging import getLogger
6from time import sleep
7from typing import TYPE_CHECKING
8from typing import Union
10from .exceptions import BucketFullException
12logger = getLogger(__name__)
14if TYPE_CHECKING:
15 from .limiter import Limiter
class LimitContextDecorator:
    """A class that can be used as a:

    * decorator
    * async decorator
    * contextmanager
    * async contextmanager

    Intended to be used via :py:meth:`.Limiter.ratelimit`. Depending on arguments, calls that exceed
    the rate limit will either raise an exception, or sleep until space is available in the bucket.

    Args:
        limiter: Limiter object
        identities: Bucket identities
        delay: Delay until the next request instead of raising an exception
        max_delay: The maximum allowed delay time (in seconds); anything over this will raise
            an exception. ``None`` or ``0`` disables the maximum.
    """

    def __init__(
        self,
        limiter: "Limiter",
        *identities: str,
        delay: bool = False,
        max_delay: Union[int, float, None] = None,
    ):
        self.delay = delay
        # A falsy max_delay (None or 0) is stored as 0, which delay_or_reraise treats as "no maximum"
        self.max_delay = max_delay or 0
        # Bind the identities once so that every acquisition attempt is a no-argument call
        self.try_acquire = partial(limiter.try_acquire, *identities)

    def __call__(self, func):
        """Allows usage as a decorator for both normal and async functions

        Args:
            func: The function or coroutine function to wrap

        Returns:
            A wrapper that acquires a bucket item before each call to ``func``
        """

        @wraps(func)
        def wrapper(*args, **kwargs):
            self.delayed_acquire()
            return func(*args, **kwargs)

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            await self.async_delayed_acquire()
            return await func(*args, **kwargs)

        # Return either an async or normal wrapper, depending on the type of the wrapped function
        return async_wrapper if iscoroutinefunction(func) else wrapper

    def __enter__(self):
        """Allows usage as a contextmanager"""
        self.delayed_acquire()

    def __exit__(self, *exc):
        # Nothing to release; exceptions raised inside the ``with`` body propagate unchanged
        pass

    async def __aenter__(self):
        """Allows usage as an async contextmanager"""
        await self.async_delayed_acquire()

    async def __aexit__(self, *exc):
        # Nothing to release; exceptions raised inside the ``async with`` body propagate unchanged
        pass

    def delayed_acquire(self):
        """Delay and retry until we can successfully acquire an available bucket item

        Raises:
            BucketFullException: If the bucket is full and delaying is disabled, or the
                required delay exceeds ``max_delay``
        """
        while True:
            try:
                self.try_acquire()
            except BucketFullException as err:
                # Either returns how long to wait before retrying, or re-raises ``err``
                delay_time = self.delay_or_reraise(err)
                sleep(delay_time)
            else:
                break

    async def async_delayed_acquire(self):
        """Delay and retry until we can successfully acquire an available bucket item

        Same as :py:meth:`delayed_acquire`, but waits with :py:func:`asyncio.sleep` instead of
        :py:func:`time.sleep`.

        Raises:
            BucketFullException: If the bucket is full and delaying is disabled, or the
                required delay exceeds ``max_delay``
        """
        while True:
            try:
                self.try_acquire()
            except BucketFullException as err:
                # Either returns how long to wait before retrying, or re-raises ``err``
                delay_time = self.delay_or_reraise(err)
                await asyncio.sleep(delay_time)
            else:
                break

    def delay_or_reraise(self, err: "BucketFullException") -> float:
        """Determine if we should delay after exceeding a rate limit. If so, return the delay time,
        otherwise re-raise the exception.

        Args:
            err: The exception raised by the failed acquisition; its
                ``meta_info["remaining_time"]`` is read as the time to wait, in seconds

        Returns:
            The time to wait before retrying, in seconds (never negative)

        Raises:
            BucketFullException: ``err`` itself, if delaying is disabled or the delay time
                exceeds a non-zero ``max_delay``
        """
        delay_time = float(err.meta_info["remaining_time"])
        logger.debug("Rate limit reached; %.5f seconds remaining before next request", delay_time)
        # A max_delay of 0 means "no maximum", so only compare when it is set
        exceeded_max_delay = bool(self.max_delay) and (delay_time > self.max_delay)
        if self.delay and not exceeded_max_delay:
            # time.sleep() raises ValueError for negative values, so never hand one back
            return max(delay_time, 0.0)
        raise err