Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/rate_limiter.py: 23%

62 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2021 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import datetime 

16from typing import NoReturn, Optional 

17import warnings 

18 

19 

20def utcnow(): 

21 """ 

22 google.cloud.firestore_v1.rate_limiter.utcnow() is deprecated. 

23 Use datetime.datetime.now(datetime.timezone.utc) instead. 

24 """ 

25 warnings.warn( 

26 "google.cloud.firestore_v1.rate_limiter.utcnow() is deprecated. " 

27 "Use datetime.datetime.now(datetime.timezone.utc) instead.", 

28 DeprecationWarning, 

29 ) 

30 return datetime.datetime.utcnow() 

31 

32 

33default_initial_tokens: int = 500 

34default_phase_length: int = 60 * 5 # 5 minutes 

35microseconds_per_second: int = 1000000 

36 

37 

38class RateLimiter: 

39 """Implements 5/5/5 ramp-up via Token Bucket algorithm. 

40 

41 5/5/5 is a ramp up strategy that starts with a budget of 500 operations per 

42 second. Additionally, every 5 minutes, the maximum budget can increase by 

43 50%. Thus, at 5:01 into a long bulk-writing process, the maximum budget 

44 becomes 750 operations per second. At 10:01, the budget becomes 1,125 

45 operations per second. 

46 

47 The Token Bucket algorithm uses the metaphor of a bucket, or pile, or really 

48 any container, if we're being honest, of tokens from which a user is able 

49 to draw. If there are tokens available, you can do the thing. If there are not, 

50 you can not do the thing. Additionally, tokens replenish at a fixed rate. 

51 

52 Usage: 

53 

54 rate_limiter = RateLimiter() 

55 tokens = rate_limiter.take_tokens(20) 

56 

57 if not tokens: 

58 queue_retry() 

59 else: 

60 for _ in range(tokens): 

61 my_operation() 

62 

63 Args: 

64 initial_tokens (Optional[int]): Starting size of the budget. Defaults 

65 to 500. 

66 phase_length (Optional[int]): Number of seconds, after which, the size 

67 of the budget can increase by 50%. Such an increase will happen every 

68 [phase_length] seconds if operation requests continue consistently. 

69 """ 

70 

71 def __init__( 

72 self, 

73 initial_tokens: int = default_initial_tokens, 

74 global_max_tokens: Optional[int] = None, 

75 phase_length: int = default_phase_length, 

76 ): 

77 # Tracks the volume of operations during a given ramp-up phase. 

78 self._operations_this_phase: int = 0 

79 

80 # If provided, this enforces a cap on the maximum number of writes per 

81 # second we can ever attempt, regardless of how many 50% increases the 

82 # 5/5/5 rule would grant. 

83 self._global_max_tokens = global_max_tokens 

84 

85 self._start: Optional[datetime.datetime] = None 

86 self._last_refill: Optional[datetime.datetime] = None 

87 

88 # Current number of available operations. Decrements with every 

89 # permitted request and refills over time. 

90 self._available_tokens: int = initial_tokens 

91 

92 # Maximum size of the available operations. Can increase by 50% 

93 # every [phase_length] number of seconds. 

94 self._maximum_tokens: int = self._available_tokens 

95 

96 if self._global_max_tokens is not None: 

97 self._available_tokens = min( 

98 self._available_tokens, self._global_max_tokens 

99 ) 

100 self._maximum_tokens = min(self._maximum_tokens, self._global_max_tokens) 

101 

102 # Number of seconds after which the [_maximum_tokens] can increase by 50%. 

103 self._phase_length: int = phase_length 

104 

105 # Tracks how many times the [_maximum_tokens] has increased by 50%. 

106 self._phase: int = 0 

107 

108 def _start_clock(self): 

109 utcnow = datetime.datetime.now(datetime.timezone.utc) 

110 self._start = self._start or utcnow 

111 self._last_refill = self._last_refill or utcnow 

112 

113 def take_tokens(self, num: Optional[int] = 1, allow_less: bool = False) -> int: 

114 """Returns the number of available tokens, up to the amount requested.""" 

115 self._start_clock() 

116 self._check_phase() 

117 self._refill() 

118 

119 minimum_tokens = 1 if allow_less else num 

120 

121 if self._available_tokens >= minimum_tokens: 

122 _num_to_take = min(self._available_tokens, num) 

123 self._available_tokens -= _num_to_take 

124 self._operations_this_phase += _num_to_take 

125 return _num_to_take 

126 return 0 

127 

128 def _check_phase(self): 

129 """Increments or decrements [_phase] depending on traffic. 

130 

131 Every [_phase_length] seconds, if > 50% of available traffic was used 

132 during the window, increases [_phase], otherwise, decreases [_phase]. 

133 

134 This is a no-op unless a new [_phase_length] number of seconds since the 

135 start was crossed since it was last called. 

136 """ 

137 age: datetime.timedelta = ( 

138 datetime.datetime.now(datetime.timezone.utc) - self._start 

139 ) 

140 

141 # Uses integer division to calculate the expected phase. We start in 

142 # Phase 0, so until [_phase_length] seconds have passed, this will 

143 # not resolve to 1. 

144 expected_phase: int = age.seconds // self._phase_length 

145 

146 # Short-circuit if we are still in the expected phase. 

147 if expected_phase == self._phase: 

148 return 

149 

150 operations_last_phase: int = self._operations_this_phase 

151 self._operations_this_phase = 0 

152 

153 previous_phase: int = self._phase 

154 self._phase = expected_phase 

155 

156 # No-op if we did nothing for an entire phase 

157 if operations_last_phase and self._phase > previous_phase: 

158 self._increase_maximum_tokens() 

159 

160 def _increase_maximum_tokens(self) -> NoReturn: 

161 self._maximum_tokens = round(self._maximum_tokens * 1.5) 

162 if self._global_max_tokens is not None: 

163 self._maximum_tokens = min(self._maximum_tokens, self._global_max_tokens) 

164 

165 def _refill(self) -> NoReturn: 

166 """Replenishes any tokens that should have regenerated since the last 

167 operation.""" 

168 now: datetime.datetime = datetime.datetime.now(datetime.timezone.utc) 

169 time_since_last_refill: datetime.timedelta = now - self._last_refill 

170 

171 if time_since_last_refill: 

172 self._last_refill = now 

173 

174 # If we haven't done anything for 1s, then we know for certain we 

175 # should reset to max capacity. 

176 if time_since_last_refill.seconds >= 1: 

177 self._available_tokens = self._maximum_tokens 

178 

179 # If we have done something in the last 1s, then we know we should 

180 # allocate proportional tokens. 

181 else: 

182 _percent_of_max: float = ( 

183 time_since_last_refill.microseconds / microseconds_per_second 

184 ) 

185 new_tokens: int = round(_percent_of_max * self._maximum_tokens) 

186 

187 # Add the number of provisioned tokens, capped at the maximum size. 

188 self._available_tokens = min( 

189 self._maximum_tokens, 

190 self._available_tokens + new_tokens, 

191 )