Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/retries/bucket.py: 31%

70 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1"""This module implements token buckets used for client side throttling.""" 

2import threading 

3import time 

4 

5from botocore.exceptions import CapacityNotAvailableError 

6 

7 

class Clock:
    """Time source backed directly by the standard ``time`` module.

    Instances hold no state; the class exists so that code needing a clock
    can be handed an object exposing ``sleep`` and ``current_time``.
    """

    def __init__(self):
        """Create a clock. Nothing is stored on the instance."""

    def sleep(self, amount):
        """Suspend the calling thread by delegating to ``time.sleep(amount)``."""
        time.sleep(amount)

    def current_time(self):
        """Return the value of ``time.time()`` at the moment of the call."""
        now = time.time()
        return now

17 

18 

class TokenBucket:
    """Token bucket used for client side throttling.

    Capacity accumulates at the current fill rate (see ``max_rate``) for
    every unit of time reported by ``clock.current_time()``, and is capped
    at ``max_capacity``. ``acquire`` and the ``max_rate`` setter both run
    while holding the bucket's internal lock.
    """

    # Lower bound applied to the fill rate when no ``min_rate`` is given.
    _MIN_RATE = 0.5

    def __init__(self, max_rate, clock, min_rate=_MIN_RATE):
        """Create an empty bucket.

        :param max_rate: Requested fill rate (tokens per unit of clock
            time). Values below ``min_rate`` are raised to ``min_rate``.
        :param clock: Object exposing ``current_time()``; it is consulted
            every time the bucket is refilled.
        :param min_rate: Smallest fill rate the bucket will ever use.
        """
        self._fill_rate = None
        self._max_capacity = None
        self._current_capacity = 0
        self._clock = clock
        self._last_timestamp = None
        self._min_rate = min_rate
        self._lock = threading.Lock()
        # The condition shares ``_lock`` so that waiting in ``_acquire``
        # releases the same lock the ``max_rate`` setter needs.
        self._new_fill_rate_condition = threading.Condition(self._lock)
        # Goes through the property setter, which initialises
        # ``_fill_rate``, ``_max_capacity`` and ``_last_timestamp``.
        self.max_rate = max_rate

    @property
    def max_rate(self):
        """The fill rate currently in effect (never below ``min_rate``)."""
        return self._fill_rate

    @max_rate.setter
    def max_rate(self, value):
        with self._new_fill_rate_condition:
            # Before we can change the rate we need to fill any pending
            # tokens we might have based on the current rate.  If we don't
            # do this it means everything since the last recorded timestamp
            # will accumulate at the rate we're about to set which isn't
            # correct.
            self._refill()
            self._fill_rate = max(value, self._min_rate)
            # The bucket always has room for at least one token.
            if value >= 1:
                self._max_capacity = value
            else:
                self._max_capacity = 1
            # If we're scaling down, we also can't have a capacity that's
            # more than our max_capacity.
            self._current_capacity = min(
                self._current_capacity, self._max_capacity
            )
            # Wake a blocked acquirer so it recomputes its wait time
            # using the new rate.
            self._new_fill_rate_condition.notify()

    @property
    def max_capacity(self):
        """Upper bound on the capacity the bucket can hold."""
        return self._max_capacity

    @property
    def available_capacity(self):
        """Capacity recorded at the most recent refill.

        Reading this property does not trigger a refill.
        """
        return self._current_capacity

    def acquire(self, amount=1, block=True):
        """Acquire ``amount`` of capacity from the bucket.

        If block is True, then this method will block until there's
        sufficient capacity to acquire the desired amount.

        If block is False and there is not enough capacity, nothing is
        acquired and ``CapacityNotAvailableError`` is raised.

        :param amount: Capacity to remove from the bucket.
        :param block: Whether to wait for capacity to become available.
        :return: ``True`` once the capacity has been acquired.
        :raises CapacityNotAvailableError: If ``block`` is False and the
            bucket holds less than ``amount``.
        """
        with self._new_fill_rate_condition:
            return self._acquire(amount=amount, block=block)

    def _acquire(self, amount, block):
        """Implementation of ``acquire``; the caller must hold the lock."""
        self._refill()
        if amount <= self._current_capacity:
            self._current_capacity -= amount
            return True
        if not block:
            raise CapacityNotAvailableError()
        # Not enough capacity: wait until enough has accumulated.
        sleep_amount = self._sleep_amount(amount)
        while sleep_amount > 0:
            # wait() can return either because the timeout elapsed or
            # because the rate changed and we were notified.  In both
            # cases we refill and recompute how much longer (if at all)
            # we still need to wait.
            self._new_fill_rate_condition.wait(sleep_amount)
            self._refill()
            sleep_amount = self._sleep_amount(amount)
        self._current_capacity -= amount
        return True

    def _sleep_amount(self, amount):
        """Time needed at the current rate to accumulate ``amount``.

        The result is zero or negative when enough capacity is already
        available.
        """
        return (amount - self._current_capacity) / self._fill_rate

    def _refill(self):
        """Add the capacity accrued since the previous refill."""
        timestamp = self._clock.current_time()
        if self._last_timestamp is None:
            # First call: there is no earlier reading to measure from.
            self._last_timestamp = timestamp
            return
        # The clock is not guaranteed to be monotonic (the default Clock
        # uses time.time(), which can be stepped backwards).  A negative
        # interval must not drain capacity, so treat it as zero elapsed.
        elapsed = max(0, timestamp - self._last_timestamp)
        fill_amount = elapsed * self._fill_rate
        self._current_capacity = min(
            self._max_capacity, self._current_capacity + fill_amount
        )
        self._last_timestamp = timestamp