1# Copyright 2021 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import warnings
17from typing import Optional
18
19
def utcnow() -> datetime.datetime:
    """
    google.cloud.firestore_v1.rate_limiter.utcnow() is deprecated.
    Use datetime.datetime.now(datetime.timezone.utc) instead.

    Returns:
        datetime.datetime: The current UTC time as a naive datetime
        (``tzinfo`` is ``None``).
    """
    warnings.warn(
        "google.cloud.firestore_v1.rate_limiter.utcnow() is deprecated. "
        "Use datetime.datetime.now(datetime.timezone.utc) instead.",
        DeprecationWarning,
        # Attribute the warning to the caller's line rather than to this
        # module, so users can locate the code they need to migrate.
        stacklevel=2,
    )
    # datetime.datetime.utcnow() is itself deprecated since Python 3.12 and
    # would raise a second DeprecationWarning. Build the same naive UTC value
    # from an aware "now" instead.
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
31
32
# Size of the token budget a freshly constructed RateLimiter starts with.
default_initial_tokens: int = 500
# Length, in seconds, of one ramp-up phase.
default_phase_length: int = 60 * 5  # 5 minutes
# Number of microseconds in one second.
microseconds_per_second: int = 1_000_000


class RateLimiter:
    """Implements 5/5/5 ramp-up via Token Bucket algorithm.

    5/5/5 is a ramp up strategy that starts with a budget of 500 operations per
    second. Additionally, every 5 minutes, the maximum budget can increase by
    50%. Thus, at 5:01 into a long bulk-writing process, the maximum budget
    becomes 750 operations per second. At 10:01, the budget becomes 1,125
    operations per second.

    The Token Bucket algorithm uses the metaphor of a bucket, or pile, or really
    any container, if we're being honest, of tokens from which a user is able
    to draw. If there are tokens available, you can do the thing. If there are not,
    you can not do the thing. Additionally, tokens replenish at a fixed rate.

    Usage:

        rate_limiter = RateLimiter()
        tokens = rate_limiter.take_tokens(20)

        if not tokens:
            queue_retry()
        else:
            for _ in range(tokens):
                my_operation()

    Args:
        initial_tokens (Optional[int]): Starting size of the budget. Defaults
            to 500.
        global_max_tokens (Optional[int]): Upper bound on the size of the
            budget. When provided, neither the starting budget nor any later
            50% increase may exceed it. Defaults to None (no cap).
        phase_length (Optional[int]): Number of seconds, after which, the size
            of the budget can increase by 50%. Such an increase will happen every
            [phase_length] seconds if operation requests continue consistently.
            Defaults to 300 (5 minutes).
    """

    def __init__(
        self,
        initial_tokens: int = default_initial_tokens,
        global_max_tokens: Optional[int] = None,
        phase_length: int = default_phase_length,
    ):
        # Tracks the volume of operations during a given ramp-up phase.
        self._operations_this_phase: int = 0

        # If provided, this enforces a cap on the maximum number of writes per
        # second we can ever attempt, regardless of how many 50% increases the
        # 5/5/5 rule would grant.
        self._global_max_tokens = global_max_tokens

        # Both timestamps stay unset until the first call to take_tokens().
        self._start: Optional[datetime.datetime] = None
        self._last_refill: Optional[datetime.datetime] = None

        # Current number of available operations. Decrements with every
        # permitted request and refills over time.
        self._available_tokens: int = initial_tokens

        # Maximum size of the available operations. Can increase by 50%
        # every [phase_length] number of seconds.
        self._maximum_tokens: int = self._available_tokens

        if self._global_max_tokens is not None:
            self._available_tokens = min(
                self._available_tokens, self._global_max_tokens
            )
            self._maximum_tokens = min(self._maximum_tokens, self._global_max_tokens)

        # Number of seconds after which the [_maximum_tokens] can increase by 50%.
        self._phase_length: int = phase_length

        # Index of the current ramp-up phase, counted from [_start].
        self._phase: int = 0

    def _start_clock(self) -> None:
        """Records [_start] and [_last_refill] the first time it is called.

        Timestamps that are already set are left untouched.
        """
        now = datetime.datetime.now(datetime.timezone.utc)
        if self._start is None:
            self._start = now
        if self._last_refill is None:
            self._last_refill = now

    def take_tokens(self, num: int = 1, allow_less: bool = False) -> int:
        """Returns the number of available tokens, up to the amount requested.

        Args:
            num (int): Number of tokens requested. Defaults to 1.
            allow_less (bool): If True, a partial grant (at least one token) is
                acceptable. If False, the request is all-or-nothing.

        Returns:
            int: The number of tokens granted; 0 if the request could not be
            satisfied.
        """
        self._start_clock()
        self._check_phase()
        self._refill()

        minimum_tokens = 1 if allow_less else num

        if self._available_tokens >= minimum_tokens:
            _num_to_take = min(self._available_tokens, num)
            self._available_tokens -= _num_to_take
            self._operations_this_phase += _num_to_take
            return _num_to_take
        return 0

    def _check_phase(self) -> None:
        """Moves to the current phase and grows the budget if warranted.

        The expected phase is the number of whole [_phase_length] windows that
        have elapsed since [_start]. When it differs from [_phase], the
        per-phase operation counter is reset and [_phase] is updated. If the
        phase moved forward and at least one operation was recorded since the
        previous phase change, [_maximum_tokens] is increased by 50% (once,
        even if several phases were skipped). The budget is never decreased.

        This is a no-op unless a new [_phase_length] number of seconds since the
        start was crossed since it was last called.

        Raises:
            TypeError: If the clock has not been started.
        """
        if self._start is None:
            raise TypeError("RateLimiter error: unset _start value")
        age = datetime.datetime.now(datetime.timezone.utc) - self._start

        # Uses floor division to calculate the expected phase. We start in
        # Phase 0, so until [_phase_length] seconds have passed, this will
        # not resolve to 1.
        #
        # total_seconds() is required here: timedelta.seconds only holds the
        # sub-day remainder (0..86399), so it would wrap the phase back to 0
        # once the limiter has been alive for 24 hours. The clamp keeps the
        # phase non-negative if the wall clock steps backwards.
        expected_phase: int = max(0, int(age.total_seconds() // self._phase_length))

        # Short-circuit if we are still in the expected phase.
        if expected_phase == self._phase:
            return

        operations_last_phase: int = self._operations_this_phase
        self._operations_this_phase = 0

        previous_phase: int = self._phase
        self._phase = expected_phase

        # No-op if we did nothing for an entire phase
        if operations_last_phase and self._phase > previous_phase:
            self._increase_maximum_tokens()

    def _increase_maximum_tokens(self) -> None:
        """Raises [_maximum_tokens] by 50%, capped at [_global_max_tokens]."""
        self._maximum_tokens = round(self._maximum_tokens * 1.5)
        if self._global_max_tokens is not None:
            self._maximum_tokens = min(self._maximum_tokens, self._global_max_tokens)

    def _refill(self) -> None:
        """Replenishes any tokens that should have regenerated since the last
        operation.

        Raises:
            TypeError: If the clock has not been started.
        """
        if self._last_refill is None:
            raise TypeError("RateLimiter error: unset _last_refill value")
        now: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)
        time_since_last_refill = now - self._last_refill
        elapsed_seconds: float = time_since_last_refill.total_seconds()

        if elapsed_seconds == 0:
            return

        if elapsed_seconds < 0:
            # The wall clock stepped backwards. Re-anchor so refills resume
            # from the new "now", but grant nothing for the bogus interval.
            self._last_refill = now
            return

        # If we haven't done anything for 1s, then we know for certain we
        # should reset to max capacity. total_seconds() is used because
        # timedelta.seconds ignores whole days, which would turn an idle gap
        # of e.g. "1 day + 0.1s" into a 10% refill.
        if elapsed_seconds >= 1:
            self._last_refill = now
            self._available_tokens = self._maximum_tokens
            return

        # If we have done something in the last 1s, then we know we should
        # allocate proportional tokens. Below one second the timedelta has no
        # day or second component, so microseconds is the whole interval.
        _percent_of_max: float = (
            time_since_last_refill.microseconds / microseconds_per_second
        )
        new_tokens: int = round(_percent_of_max * self._maximum_tokens)

        if not new_tokens:
            # Too little time has passed to earn a whole token. Leave
            # [_last_refill] alone so the interval keeps accumulating;
            # otherwise callers polling faster than one token interval would
            # never be refilled.
            return

        self._last_refill = now

        # Add the number of provisioned tokens, capped at the maximum size.
        self._available_tokens = min(
            self._maximum_tokens,
            self._available_tokens + new_tokens,
        )