Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/rate_limiter.py: 23%
62 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
1# Copyright 2021 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import datetime
16from typing import NoReturn, Optional
17import warnings
def utcnow():
    """
    google.cloud.firestore_v1.rate_limiter.utcnow() is deprecated.
    Use datetime.datetime.now(datetime.timezone.utc) instead.

    Returns:
        datetime.datetime: The current UTC time as a *naive* datetime
        (``tzinfo`` is ``None``), matching what this helper has always
        returned.
    """
    warnings.warn(
        "google.cloud.firestore_v1.rate_limiter.utcnow() is deprecated. "
        "Use datetime.datetime.now(datetime.timezone.utc) instead.",
        DeprecationWarning,
        # Attribute the warning to the caller's line, not to this helper.
        stacklevel=2,
    )
    # datetime.datetime.utcnow() is itself deprecated on newer Pythons and
    # would emit a second warning; build the same naive UTC value from an
    # aware "now" instead.
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
# Starting size of the token budget used by RateLimiter when none is given.
default_initial_tokens: int = 500

# Default length, in seconds, of one ramp-up phase (5 minutes).
default_phase_length: int = 5 * 60

# Conversion factor between microseconds and seconds.
microseconds_per_second: int = 1_000_000
class RateLimiter:
    """Implements 5/5/5 ramp-up via Token Bucket algorithm.

    5/5/5 is a ramp up strategy that starts with a budget of 500 operations per
    second. Additionally, every 5 minutes, the maximum budget can increase by
    50%. Thus, at 5:01 into a long bulk-writing process, the maximum budget
    becomes 750 operations per second. At 10:01, the budget becomes 1,125
    operations per second.

    The Token Bucket algorithm uses the metaphor of a bucket, or pile, or really
    any container, if we're being honest, of tokens from which a user is able
    to draw. If there are tokens available, you can do the thing. If there are not,
    you can not do the thing. Additionally, tokens replenish at a fixed rate.

    Usage:

        rate_limiter = RateLimiter()
        tokens = rate_limiter.take_tokens(20)

        if not tokens:
            queue_retry()
        else:
            for _ in range(tokens):
                my_operation()

    Args:
        initial_tokens (Optional[int]): Starting size of the budget. Defaults
            to 500.
        global_max_tokens (Optional[int]): If provided, an upper bound that
            neither the available tokens nor the maximum budget may exceed,
            no matter how many 50% increases have been granted.
        phase_length (Optional[int]): Number of seconds, after which, the size
            of the budget can increase by 50%. Such an increase will happen every
            [phase_length] seconds if operation requests continue consistently.
    """

    def __init__(
        self,
        initial_tokens: int = default_initial_tokens,
        global_max_tokens: Optional[int] = None,
        phase_length: int = default_phase_length,
    ):
        # Tracks the volume of operations during a given ramp-up phase.
        self._operations_this_phase: int = 0

        # If provided, this enforces a cap on the maximum number of writes per
        # second we can ever attempt, regardless of how many 50% increases the
        # 5/5/5 rule would grant.
        self._global_max_tokens = global_max_tokens

        # Both timestamps stay unset until the first call to `take_tokens`.
        self._start: Optional[datetime.datetime] = None
        self._last_refill: Optional[datetime.datetime] = None

        # Current number of available operations. Decrements with every
        # permitted request and refills over time.
        self._available_tokens: int = initial_tokens

        # Maximum size of the available operations. Can increase by 50%
        # every [phase_length] number of seconds.
        self._maximum_tokens: int = self._available_tokens

        if self._global_max_tokens is not None:
            self._available_tokens = min(
                self._available_tokens, self._global_max_tokens
            )
            self._maximum_tokens = min(self._maximum_tokens, self._global_max_tokens)

        # Number of seconds after which the [_maximum_tokens] can increase by 50%.
        self._phase_length: int = phase_length

        # Tracks how many times the [_maximum_tokens] has increased by 50%.
        self._phase: int = 0

    def _start_clock(self):
        """Records the start and last-refill times if they are not yet set."""
        now = datetime.datetime.now(datetime.timezone.utc)
        self._start = self._start or now
        self._last_refill = self._last_refill or now

    def take_tokens(self, num: Optional[int] = 1, allow_less: bool = False) -> int:
        """Returns the number of available tokens, up to the amount requested.

        Args:
            num (Optional[int]): Number of tokens requested. Defaults to 1.
            allow_less (bool): When True, a partial grant of at least one
                token is acceptable; when False, the request is granted in
                full or not at all.

        Returns:
            int: The number of tokens actually taken, or 0 if the request
            could not be satisfied.
        """
        self._start_clock()
        self._check_phase()
        self._refill()

        minimum_tokens = 1 if allow_less else num

        if self._available_tokens >= minimum_tokens:
            _num_to_take = min(self._available_tokens, num)
            self._available_tokens -= _num_to_take
            self._operations_this_phase += _num_to_take
            return _num_to_take
        return 0

    def _check_phase(self):
        """Advances [_phase] and, if warranted, grows the maximum budget.

        Whenever the phase computed from the elapsed time differs from
        [_phase], the per-phase operation counter is reset and [_phase] is
        updated. If the phase moved forward and at least one operation was
        recorded during the phase that just ended, [_maximum_tokens] is
        increased by 50% (subject to [_global_max_tokens]).

        This is a no-op unless a new [_phase_length] number of seconds since the
        start was crossed since it was last called.
        """
        age: datetime.timedelta = (
            datetime.datetime.now(datetime.timezone.utc) - self._start
        )

        # Uses floor division to calculate the expected phase. We start in
        # Phase 0, so until [_phase_length] seconds have passed, this will
        # not resolve to 1. `total_seconds()` is used rather than `.seconds`
        # because `.seconds` ignores whole days and would wrap the phase
        # back to 0 once the limiter has been running for 24 hours.
        expected_phase: int = int(age.total_seconds() // self._phase_length)

        # Short-circuit if we are still in the expected phase.
        if expected_phase == self._phase:
            return

        operations_last_phase: int = self._operations_this_phase
        self._operations_this_phase = 0

        previous_phase: int = self._phase
        self._phase = expected_phase

        # No-op if we did nothing for an entire phase
        if operations_last_phase and self._phase > previous_phase:
            self._increase_maximum_tokens()

    def _increase_maximum_tokens(self) -> None:
        """Grows [_maximum_tokens] by 50%, capped at [_global_max_tokens]."""
        self._maximum_tokens = round(self._maximum_tokens * 1.5)
        if self._global_max_tokens is not None:
            self._maximum_tokens = min(self._maximum_tokens, self._global_max_tokens)

    def _refill(self) -> None:
        """Replenishes any tokens that should have regenerated since the last
        operation."""
        now: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)
        # `total_seconds()` accounts for whole days as well as the fractional
        # part, unlike the separate `.seconds` / `.microseconds` fields.
        elapsed_seconds: float = (now - self._last_refill).total_seconds()

        if elapsed_seconds < 0:
            # The wall clock moved backwards. Re-anchor to the new time and
            # grant nothing, rather than computing a negative refill.
            self._last_refill = now
            return

        # If we haven't done anything for 1s, then we know for certain we
        # should reset to max capacity.
        if elapsed_seconds >= 1:
            self._available_tokens = self._maximum_tokens
            self._last_refill = now
            return

        # If we have done something in the last 1s, then we know we should
        # allocate proportional tokens.
        new_tokens: int = round(elapsed_seconds * self._maximum_tokens)

        # Only move the refill timestamp when tokens were actually granted.
        # Otherwise, callers polling faster than one token's worth of time
        # would keep discarding the elapsed interval and never be refilled.
        if new_tokens > 0:
            # Add the number of provisioned tokens, capped at the maximum size.
            self._available_tokens = min(
                self._maximum_tokens,
                self._available_tokens + new_tokens,
            )
            self._last_refill = now