Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/retries/bucket.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

70 statements  

1"""This module implements token buckets used for client side throttling.""" 

2 

3import threading 

4import time 

5 

6from botocore.exceptions import CapacityNotAvailableError 

7 

8 

class Clock:
    """Thin wrapper around the ``time`` module.

    Exposing sleeping and time lookup through an object lets callers
    substitute a different clock implementation with the same methods.
    """

    def __init__(self):
        """Create a clock; there is no state to initialise."""

    def sleep(self, amount):
        """Suspend the calling thread for ``amount`` seconds."""
        time.sleep(amount)

    def current_time(self):
        """Return the current time in seconds as given by ``time.time()``."""
        now = time.time()
        return now

18 

19 

class TokenBucket:
    """Token bucket used for client side throttling.

    Tokens accumulate at ``max_rate`` tokens per second (never slower than
    ``min_rate``) up to ``max_capacity``.  Callers take tokens with
    :meth:`acquire`.  All state changes happen while holding a single lock
    that is shared with the condition variable used to wake blocked
    acquirers when the fill rate changes.
    """

    _MIN_RATE = 0.5

    def __init__(self, max_rate, clock, min_rate=_MIN_RATE):
        """Create an empty bucket.

        :param max_rate: Requested fill rate in tokens per second.
        :param clock: Object providing ``current_time()``, returning the
            current time in seconds.
        :param min_rate: Lower bound applied to the fill rate.
        """
        self._fill_rate = None
        self._max_capacity = None
        self._current_capacity = 0
        self._clock = clock
        self._last_timestamp = None
        self._min_rate = min_rate
        self._lock = threading.Lock()
        self._new_fill_rate_condition = threading.Condition(self._lock)
        # Goes through the property setter, which initialises the fill rate,
        # the max capacity and the first refill timestamp.
        self.max_rate = max_rate

    @property
    def max_rate(self):
        """The effective fill rate in tokens per second."""
        return self._fill_rate

    @max_rate.setter
    def max_rate(self, value):
        with self._new_fill_rate_condition:
            # Before we can change the rate we need to fill any pending
            # tokens we might have based on the current rate.  If we don't
            # do this, everything since the last recorded timestamp would
            # accumulate at the rate we're about to set, which isn't
            # correct.
            self._refill()
            self._fill_rate = max(value, self._min_rate)
            # The bucket always holds at least one token's worth of room.
            self._max_capacity = value if value >= 1 else 1
            # If we're scaling down, we also can't have a capacity that's
            # more than our max_capacity.
            self._current_capacity = min(
                self._current_capacity, self._max_capacity
            )
            # Every blocked acquirer computed its timeout from the old
            # rate, so all of them need to wake up and recompute, not just
            # one of them.
            self._new_fill_rate_condition.notify_all()

    @property
    def max_capacity(self):
        """The largest number of tokens the bucket can hold."""
        return self._max_capacity

    @property
    def available_capacity(self):
        """Tokens available as of the most recent refill.

        Reading this property does not trigger a refill.
        """
        return self._current_capacity

    def acquire(self, amount=1, block=True):
        """Acquire ``amount`` tokens from the bucket.

        If block is True, then this method will block until there's
        sufficient capacity to acquire the desired amount.

        If block is False and there is not enough capacity available,
        ``CapacityNotAvailableError`` is raised instead of waiting.

        :param amount: Number of tokens to take.
        :param block: Whether to wait for capacity to become available.
        :return: ``True`` once the tokens have been acquired.
        :raises CapacityNotAvailableError: If ``block`` is False and the
            bucket currently holds fewer than ``amount`` tokens.
        """
        with self._new_fill_rate_condition:
            return self._acquire(amount=amount, block=block)

    def _acquire(self, amount, block):
        """Take ``amount`` tokens; the caller must already hold the lock."""
        self._refill()
        if amount <= self._current_capacity:
            self._current_capacity -= amount
            return True
        if not block:
            raise CapacityNotAvailableError()
        # Not enough capacity: wait until enough tokens have accumulated.
        sleep_amount = self._sleep_amount(amount)
        while sleep_amount > 0:
            # The return value of wait() is ignored, so we do not
            # distinguish a timeout from a notify() and unconditionally
            # call _refill().  The only downside is one extra _refill()
            # call when we were woken by a notification.
            self._new_fill_rate_condition.wait(sleep_amount)
            self._refill()
            sleep_amount = self._sleep_amount(amount)
        self._current_capacity -= amount
        return True

    def _sleep_amount(self, amount):
        """Return the seconds needed to accumulate the missing tokens.

        The result is zero or negative when ``amount`` tokens are already
        available.
        """
        return (amount - self._current_capacity) / self._fill_rate

    def _refill(self):
        """Add the tokens earned since the last refill, up to max capacity."""
        timestamp = self._clock.current_time()
        if self._last_timestamp is None:
            # First call: there is no interval to credit yet.
            self._last_timestamp = timestamp
            return
        # The clock may step backwards (e.g. a wall clock being adjusted).
        # Treat that as no elapsed time so tokens that were already earned
        # are never taken away.
        elapsed = max(0, timestamp - self._last_timestamp)
        fill_amount = elapsed * self._fill_rate
        self._current_capacity = min(
            self._max_capacity, self._current_capacity + fill_amount
        )
        self._last_timestamp = timestamp