Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/retries/throttling.py: 38%
26 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1from collections import namedtuple
# Immutable record of the values reported by
# CubicCalculator.get_params_snapshot(): the current max rate (w_max),
# the zero point derived from it (k), and the timestamp of the last failure.
CubicParams = namedtuple(
    'CubicParams',
    ('w_max', 'k', 'last_fail'),
)
class CubicCalculator:
    """Derive send rates from a cubic function of time since the last error.

    The calculator tracks three values: ``w_max`` (the rate recorded when
    the most recent error was reported), ``k`` (a zero point recomputed from
    ``w_max`` whenever it changes), and the timestamp of the last failure.
    """

    _SCALE_CONSTANT = 0.4
    _BETA = 0.7

    def __init__(
        self,
        starting_max_rate,
        start_time,
        scale_constant=_SCALE_CONSTANT,
        beta=_BETA,
    ):
        """Set up the calculator state.

        :param starting_max_rate: Initial value of ``w_max``.
        :param start_time: Initial value of the last-failure timestamp.
        :param scale_constant: Multiplier applied to the cubic term.
        :param beta: Factor the current rate is multiplied by when an
            error is reported; also used when deriving ``k``.
        """
        self._last_fail = start_time
        self._scale_constant = scale_constant
        self._beta = beta
        self._w_max = starting_max_rate
        # The zero point reads w_max, beta and the scale constant, so it
        # has to be computed after those attributes are assigned.
        self._k = self._calculate_zero_point()

    def _calculate_zero_point(self):
        """Return the cube root of ``w_max * (1 - beta) / scale_constant``."""
        reduction = 1 - self._beta
        return ((self._w_max * reduction) / self._scale_constant) ** (1 / 3.0)

    def success_received(self, timestamp):
        """Return the rate on the cubic curve at ``timestamp``.

        The curve is ``scale_constant * (t - k) ** 3 + w_max``, where ``t``
        is the time elapsed between the last failure and ``timestamp``.
        This method does not modify the calculator's state.
        """
        elapsed = timestamp - self._last_fail
        offset = elapsed - self._k
        return self._scale_constant * offset ** 3 + self._w_max

    def error_received(self, current_rate, timestamp):
        """Record an error response and return the reduced rate.

        ``current_rate`` becomes the new ``w_max``, the zero point is
        recomputed from it, and ``timestamp`` becomes the last-failure time.

        :return: ``current_rate`` multiplied by ``beta``.
        """
        reduced_rate = current_rate * self._beta
        # Consider not having this be the current measured rate.
        #
        # The rate we were sending at when the error response arrived is
        # taken as the new maximum.
        self._w_max = current_rate
        self._last_fail = timestamp
        self._k = self._calculate_zero_point()
        return reduced_rate

    def get_params_snapshot(self):
        """Return a read-only object of the current cubic parameters.

        These parameters are intended to be used for debug/troubleshooting
        purposes. This object is a read-only snapshot and cannot be used
        to modify the behavior of the CUBIC calculations.

        New parameters may be added to this object in the future.

        """
        snapshot = CubicParams(
            last_fail=self._last_fail,
            k=self._k,
            w_max=self._w_max,
        )
        return snapshot