1"""This module implements token buckets used for client side throttling."""
2
3import threading
4import time
5
6from botocore.exceptions import CapacityNotAvailableError
7
8
class Clock:
    """Time source backed directly by the standard ``time`` module.

    Exposes the two operations this module needs from a clock so that a
    substitute object with the same methods can be supplied instead.
    """

    def __init__(self):
        """Create a clock; there is no state to set up."""

    def sleep(self, amount):
        """Suspend the calling thread by passing ``amount`` to ``time.sleep``."""
        time.sleep(amount)

    def current_time(self):
        """Return the current timestamp as reported by ``time.time()``."""
        now = time.time()
        return now
18
19
class TokenBucket:
    """Token bucket used for client side throttling.

    Capacity accrues at ``max_rate`` tokens per unit of time reported by
    ``clock.current_time()`` (never slower than ``min_rate``) and is capped
    at ``max_capacity``.  ``acquire`` and the ``max_rate`` setter both run
    while holding the same internal lock.
    """

    # Lowest fill rate the bucket will use unless ``min_rate`` is overridden.
    _MIN_RATE = 0.5

    def __init__(self, max_rate, clock, min_rate=_MIN_RATE):
        """Create an empty bucket.

        :param max_rate: Desired fill rate; also determines ``max_capacity``.
        :param clock: Object providing ``current_time()``.
        :param min_rate: Lower bound applied to the fill rate.
        """
        self._fill_rate = None
        self._max_capacity = None
        self._current_capacity = 0
        self._clock = clock
        self._last_timestamp = None
        self._min_rate = min_rate
        self._lock = threading.Lock()
        # The condition shares ``self._lock``; entering the condition's
        # context manager is what acquires the lock everywhere below.
        self._new_fill_rate_condition = threading.Condition(self._lock)
        # Must come last: the setter relies on the attributes set above.
        self.max_rate = max_rate

    @property
    def max_rate(self):
        """The effective fill rate (the requested rate, floored at min_rate)."""
        return self._fill_rate

    @max_rate.setter
    def max_rate(self, value):
        with self._new_fill_rate_condition:
            # Before we can change the rate we need to fill any pending
            # tokens we might have based on the current rate.  If we don't
            # do this it means everything since the last recorded timestamp
            # will accumulate at the rate we're about to set which isn't
            # correct.
            self._refill()
            self._fill_rate = max(value, self._min_rate)
            # The bucket can always hold at least one token.
            if value >= 1:
                self._max_capacity = value
            else:
                self._max_capacity = 1
            # If we're scaling down, we also can't have a capacity that's
            # more than our max_capacity.
            self._current_capacity = min(
                self._current_capacity, self._max_capacity
            )
            # A rate change affects every blocked acquire(), so wake all of
            # them to recompute their remaining wait with the new rate.
            self._new_fill_rate_condition.notify_all()

    @property
    def max_capacity(self):
        """The largest number of tokens the bucket can hold."""
        return self._max_capacity

    @property
    def available_capacity(self):
        """The capacity recorded at the last refill (read without the lock)."""
        return self._current_capacity

    def acquire(self, amount=1, block=True):
        """Acquire ``amount`` tokens from the bucket.

        If block is True, then this method will block until there's
        sufficient capacity to acquire the desired amount.  Capacity never
        exceeds ``max_capacity``, so a blocking request for more than that
        cannot complete unless ``max_rate`` is raised in the meantime.

        If block is False, then this method will return True if capacity
        was successfully acquired, and raise ``CapacityNotAvailableError``
        otherwise.

        :param amount: Number of tokens to take.
        :param block: Whether to wait for capacity to become available.
        :return: ``True`` once the tokens have been acquired.
        :raises CapacityNotAvailableError: If ``block`` is False and there
            is not enough capacity.
        """
        with self._new_fill_rate_condition:
            return self._acquire(amount=amount, block=block)

    def _acquire(self, amount, block):
        """Implementation of ``acquire``; the caller must hold the lock."""
        self._refill()
        if amount <= self._current_capacity:
            self._current_capacity -= amount
            return True
        if not block:
            raise CapacityNotAvailableError()
        # Not enough capacity.
        sleep_amount = self._sleep_amount(amount)
        while sleep_amount > 0:
            # Until python3.2, wait() always returned None so we can't
            # tell if a timeout occurred waiting on the cond var.
            # Because of this we'll unconditionally call _refill().
            # The downside to this is that if we were woken up via
            # a notify, we're unnecessarily calling _refill() an
            # extra time.
            self._new_fill_rate_condition.wait(sleep_amount)
            self._refill()
            sleep_amount = self._sleep_amount(amount)
        self._current_capacity -= amount
        return True

    def _sleep_amount(self, amount):
        """Return the time needed to accrue the missing tokens.

        The result is zero or negative when ``amount`` is already available.
        """
        return (amount - self._current_capacity) / self._fill_rate

    def _refill(self):
        """Credit the tokens accrued since the last recorded timestamp."""
        timestamp = self._clock.current_time()
        if self._last_timestamp is None:
            # First call: nothing to credit yet, just start the timer.
            self._last_timestamp = timestamp
            return
        # The clock is not guaranteed to be monotonic.  If it stepped
        # backwards, credit nothing rather than draining capacity with a
        # negative fill, and restart the timer from the new reading.
        elapsed = max(timestamp - self._last_timestamp, 0)
        fill_amount = elapsed * self._fill_rate
        self._current_capacity = min(
            self._max_capacity, self._current_capacity + fill_amount
        )
        self._last_timestamp = timestamp