Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/rate_limiter.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

67 statements  

1# Copyright 2021 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import datetime 

16import warnings 

17from typing import Optional 

18 

19 

20def utcnow(): 

21 """ 

22 google.cloud.firestore_v1.rate_limiter.utcnow() is deprecated. 

23 Use datetime.datetime.now(datetime.timezone.utc) instead. 

24 """ 

25 warnings.warn( 

26 "google.cloud.firestore_v1.rate_limiter.utcnow() is deprecated. " 

27 "Use datetime.datetime.now(datetime.timezone.utc) instead.", 

28 DeprecationWarning, 

29 ) 

30 return datetime.datetime.utcnow() 

31 

32 

33default_initial_tokens: int = 500 

34default_phase_length: int = 60 * 5 # 5 minutes 

35microseconds_per_second: int = 1000000 

36 

37 

38class RateLimiter: 

39 """Implements 5/5/5 ramp-up via Token Bucket algorithm. 

40 

41 5/5/5 is a ramp up strategy that starts with a budget of 500 operations per 

42 second. Additionally, every 5 minutes, the maximum budget can increase by 

43 50%. Thus, at 5:01 into a long bulk-writing process, the maximum budget 

44 becomes 750 operations per second. At 10:01, the budget becomes 1,125 

45 operations per second. 

46 

47 The Token Bucket algorithm uses the metaphor of a bucket, or pile, or really 

48 any container, if we're being honest, of tokens from which a user is able 

49 to draw. If there are tokens available, you can do the thing. If there are not, 

50 you can not do the thing. Additionally, tokens replenish at a fixed rate. 

51 

52 Usage: 

53 

54 rate_limiter = RateLimiter() 

55 tokens = rate_limiter.take_tokens(20) 

56 

57 if not tokens: 

58 queue_retry() 

59 else: 

60 for _ in range(tokens): 

61 my_operation() 

62 

63 Args: 

64 initial_tokens (Optional[int]): Starting size of the budget. Defaults 

65 to 500. 

66 phase_length (Optional[int]): Number of seconds, after which, the size 

67 of the budget can increase by 50%. Such an increase will happen every 

68 [phase_length] seconds if operation requests continue consistently. 

69 """ 

70 

71 def __init__( 

72 self, 

73 initial_tokens: int = default_initial_tokens, 

74 global_max_tokens: Optional[int] = None, 

75 phase_length: int = default_phase_length, 

76 ): 

77 # Tracks the volume of operations during a given ramp-up phase. 

78 self._operations_this_phase: int = 0 

79 

80 # If provided, this enforces a cap on the maximum number of writes per 

81 # second we can ever attempt, regardless of how many 50% increases the 

82 # 5/5/5 rule would grant. 

83 self._global_max_tokens = global_max_tokens 

84 

85 self._start: Optional[datetime.datetime] = None 

86 self._last_refill: Optional[datetime.datetime] = None 

87 

88 # Current number of available operations. Decrements with every 

89 # permitted request and refills over time. 

90 self._available_tokens: int = initial_tokens 

91 

92 # Maximum size of the available operations. Can increase by 50% 

93 # every [phase_length] number of seconds. 

94 self._maximum_tokens: int = self._available_tokens 

95 

96 if self._global_max_tokens is not None: 

97 self._available_tokens = min( 

98 self._available_tokens, self._global_max_tokens 

99 ) 

100 self._maximum_tokens = min(self._maximum_tokens, self._global_max_tokens) 

101 

102 # Number of seconds after which the [_maximum_tokens] can increase by 50%. 

103 self._phase_length: int = phase_length 

104 

105 # Tracks how many times the [_maximum_tokens] has increased by 50%. 

106 self._phase: int = 0 

107 

108 def _start_clock(self): 

109 utcnow = datetime.datetime.now(datetime.timezone.utc) 

110 self._start = self._start or utcnow 

111 self._last_refill = self._last_refill or utcnow 

112 

113 def take_tokens(self, num: int = 1, allow_less: bool = False) -> int: 

114 """Returns the number of available tokens, up to the amount requested.""" 

115 self._start_clock() 

116 self._check_phase() 

117 self._refill() 

118 

119 minimum_tokens = 1 if allow_less else num 

120 

121 if self._available_tokens >= minimum_tokens: 

122 _num_to_take = min(self._available_tokens, num) 

123 self._available_tokens -= _num_to_take 

124 self._operations_this_phase += _num_to_take 

125 return _num_to_take 

126 return 0 

127 

128 def _check_phase(self) -> None: 

129 """Increments or decrements [_phase] depending on traffic. 

130 

131 Every [_phase_length] seconds, if > 50% of available traffic was used 

132 during the window, increases [_phase], otherwise, decreases [_phase]. 

133 

134 This is a no-op unless a new [_phase_length] number of seconds since the 

135 start was crossed since it was last called. 

136 """ 

137 if self._start is None: 

138 raise TypeError("RateLimiter error: unset _start value") 

139 age: datetime.timedelta = ( 

140 datetime.datetime.now(datetime.timezone.utc) - self._start 

141 ) 

142 

143 # Uses integer division to calculate the expected phase. We start in 

144 # Phase 0, so until [_phase_length] seconds have passed, this will 

145 # not resolve to 1. 

146 expected_phase: int = age.seconds // self._phase_length 

147 

148 # Short-circuit if we are still in the expected phase. 

149 if expected_phase == self._phase: 

150 return 

151 

152 operations_last_phase: int = self._operations_this_phase 

153 self._operations_this_phase = 0 

154 

155 previous_phase: int = self._phase 

156 self._phase = expected_phase 

157 

158 # No-op if we did nothing for an entire phase 

159 if operations_last_phase and self._phase > previous_phase: 

160 self._increase_maximum_tokens() 

161 

162 def _increase_maximum_tokens(self) -> None: 

163 self._maximum_tokens = round(self._maximum_tokens * 1.5) 

164 if self._global_max_tokens is not None: 

165 self._maximum_tokens = min(self._maximum_tokens, self._global_max_tokens) 

166 

167 def _refill(self) -> None: 

168 """Replenishes any tokens that should have regenerated since the last 

169 operation.""" 

170 if self._last_refill is None: 

171 raise TypeError("RateLimiter error: unset _last_refill value") 

172 now: datetime.datetime = datetime.datetime.now(datetime.timezone.utc) 

173 time_since_last_refill: datetime.timedelta = now - self._last_refill 

174 

175 if time_since_last_refill: 

176 self._last_refill = now 

177 

178 # If we haven't done anything for 1s, then we know for certain we 

179 # should reset to max capacity. 

180 if time_since_last_refill.seconds >= 1: 

181 self._available_tokens = self._maximum_tokens 

182 

183 # If we have done something in the last 1s, then we know we should 

184 # allocate proportional tokens. 

185 else: 

186 _percent_of_max: float = ( 

187 time_since_last_refill.microseconds / microseconds_per_second 

188 ) 

189 new_tokens: int = round(_percent_of_max * self._maximum_tokens) 

190 

191 # Add the number of provisioned tokens, capped at the maximum size. 

192 self._available_tokens = min( 

193 self._maximum_tokens, 

194 self._available_tokens + new_tokens, 

195 )