Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/retries/bucket.py: 31%
70 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1"""This module implements token buckets used for client side throttling."""
2import threading
3import time
5from botocore.exceptions import CapacityNotAvailableError
class Clock:
    """Thin wrapper around the ``time`` module.

    Exposes sleeping and reading the current time as methods on an object,
    so that code holding a clock reference goes through it instead of
    calling ``time`` directly.
    """

    def sleep(self, amount):
        """Block the calling thread for ``amount`` seconds."""
        time.sleep(amount)

    def current_time(self):
        """Return the current time as given by ``time.time()``."""
        return time.time()
class TokenBucket:
    """Token bucket used for client side throttling.

    Capacity accumulates over time at the current fill rate, up to
    ``max_capacity``, and is consumed by :meth:`acquire`. Elapsed time is
    measured with the ``clock`` object supplied at construction.

    :meth:`acquire` and the ``max_rate`` setter update the bucket while
    holding an internal lock. The read-only properties return the stored
    values without taking that lock.
    """

    # Lower bound applied to the fill rate when ``max_rate`` is assigned.
    _MIN_RATE = 0.5

    def __init__(self, max_rate, clock, min_rate=_MIN_RATE):
        """Create an empty bucket.

        :param max_rate: Requested fill rate (capacity added per unit of
            clock time). Values below ``min_rate`` are raised to
            ``min_rate``.
        :param clock: Object providing ``current_time()``; used to measure
            the time elapsed between refills.
        :param min_rate: Smallest fill rate the bucket will use.
        """
        self._fill_rate = None
        self._max_capacity = None
        self._current_capacity = 0
        self._clock = clock
        self._last_timestamp = None
        self._min_rate = min_rate
        self._lock = threading.Lock()
        # The condition shares ``_lock``; entering the condition acquires it.
        self._new_fill_rate_condition = threading.Condition(self._lock)
        # Goes through the property setter, which records the first
        # timestamp and derives the fill rate and max capacity.
        self.max_rate = max_rate

    @property
    def max_rate(self):
        """The fill rate currently in effect (never below ``min_rate``)."""
        return self._fill_rate

    @max_rate.setter
    def max_rate(self, value):
        with self._new_fill_rate_condition:
            # Before we can change the rate we need to fill any pending
            # tokens we might have based on the current rate.  If we don't
            # do this it means everything since the last recorded timestamp
            # will accumulate at the rate we're about to set which isn't
            # correct.
            self._refill()
            self._fill_rate = max(value, self._min_rate)
            # The bucket can always hold at least one token.
            if value >= 1:
                self._max_capacity = value
            else:
                self._max_capacity = 1
            # If we're scaling down, we also can't have a capacity that's
            # more than our max_capacity.
            self._current_capacity = min(
                self._current_capacity, self._max_capacity
            )
            # Wake a thread blocked in acquire() so it can recompute how
            # long it has to wait under the new rate.
            self._new_fill_rate_condition.notify()

    @property
    def max_capacity(self):
        """The largest capacity the bucket can accumulate."""
        return self._max_capacity

    @property
    def available_capacity(self):
        """The capacity stored as of the most recent refill.

        This does not trigger a refill, so capacity accrued since the last
        refill is not included.
        """
        return self._current_capacity

    def acquire(self, amount=1, block=True):
        """Acquire ``amount`` of capacity from the bucket.

        If block is True, then this method will block until there's
        sufficient capacity to acquire the desired amount.

        If block is False and there is not enough capacity available,
        a ``CapacityNotAvailableError`` is raised instead of waiting.

        :param amount: How much capacity to take from the bucket.
        :param block: Whether to wait for capacity to become available.
        :return: ``True`` once the capacity has been acquired.
        :raises CapacityNotAvailableError: If ``block`` is False and the
            bucket does not currently hold ``amount`` capacity.
        """
        with self._new_fill_rate_condition:
            return self._acquire(amount=amount, block=block)

    def _acquire(self, amount, block):
        """Implement :meth:`acquire`; the caller must hold the lock."""
        self._refill()
        if amount <= self._current_capacity:
            self._current_capacity -= amount
            return True
        if not block:
            raise CapacityNotAvailableError()
        # Not enough capacity.
        sleep_amount = self._sleep_amount(amount)
        while sleep_amount > 0:
            # Until python3.2, wait() always returned None so we can't
            # tell if a timeout occurred waiting on the cond var.
            # Because of this we unconditionally call _refill().
            # The downside is that if we were woken up via a notify(),
            # we call _refill() one extra time unnecessarily.
            self._new_fill_rate_condition.wait(sleep_amount)
            self._refill()
            sleep_amount = self._sleep_amount(amount)
        self._current_capacity -= amount
        return True

    def _sleep_amount(self, amount):
        """Return how long to wait, at the current fill rate, for ``amount``.

        The result is zero or negative once enough capacity is available.
        """
        return (amount - self._current_capacity) / self._fill_rate

    def _refill(self):
        """Add the capacity accrued since the last refill, capped at max.

        The first call only records the starting timestamp.
        """
        timestamp = self._clock.current_time()
        if self._last_timestamp is None:
            self._last_timestamp = timestamp
            return
        current_capacity = self._current_capacity
        fill_amount = (timestamp - self._last_timestamp) * self._fill_rate
        new_capacity = min(self._max_capacity, current_capacity + fill_amount)
        self._current_capacity = new_capacity
        self._last_timestamp = timestamp