1"""Retry quota implementation."""
2
3import threading
4
5
class RetryQuota:
    """A bounded pool of retry capacity.

    Capacity is taken out with :meth:`acquire` and handed back with
    :meth:`release`.  The amount available never grows beyond the
    capacity the quota was created with.  Both mutating paths update the
    counter while holding ``self._lock``.
    """

    # Capacity used when the caller does not supply one.
    INITIAL_CAPACITY = 500

    def __init__(self, initial_capacity=INITIAL_CAPACITY, lock=None):
        """Create a quota that starts out full.

        :param initial_capacity: Starting amount, which also serves as
            the ceiling that ``release()`` will not exceed.
        :param lock: Context manager guarding updates to the available
            capacity.  A new ``threading.Lock`` is created when omitted.
        """
        self._max_capacity = initial_capacity
        self._available_capacity = initial_capacity
        self._lock = threading.Lock() if lock is None else lock

    def acquire(self, capacity_amount):
        """Try to take ``capacity_amount`` out of the quota.

        :param capacity_amount: Amount of capacity requested.
        :return: ``True`` when the amount was deducted from the available
            capacity; ``False`` when the request is larger than what is
            currently available, in which case nothing is deducted.
        """
        # This is only reached after a retryable response, so holding the
        # lock for the whole check-and-deduct step is not a concern.
        with self._lock:
            granted = not capacity_amount > self._available_capacity
            if granted:
                self._available_capacity -= capacity_amount
            return granted

    def release(self, capacity_amount):
        """Give capacity back to the quota.

        The amount actually returned is cut down when needed so that the
        available capacity never goes above the maximum.

        :param capacity_amount: Amount of capacity to hand back.
        """
        # Implementation note: this runs from the "after-call" event, i.e.
        # once per API call.  The usual case is a successful request while
        # the quota is already full; there is nothing to add then, so we
        # return before touching the lock.
        if self._max_capacity == self._available_capacity:
            return
        with self._lock:
            headroom = self._max_capacity - self._available_capacity
            self._available_capacity += min(headroom, capacity_amount)

    @property
    def available_capacity(self):
        """Capacity currently available (read without taking the lock)."""
        return self._available_capacity