Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/retries/throttling.py: 38%

26 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1from collections import namedtuple 

2 

# Immutable record of the calculator's internal state (w_max, k, last_fail),
# as handed out by CubicCalculator.get_params_snapshot().
CubicParams = namedtuple('CubicParams', 'w_max k last_fail')

4 

5 

class CubicCalculator:
    """Derive a rate from a cubic curve anchored at the last failure.

    The curve evaluated by ``success_received`` is::

        rate = scale_constant * (dt - k) ** 3 + w_max

    where ``dt`` is the time elapsed since the last recorded failure,
    ``w_max`` is the rate that was in effect when that failure was
    reported, and ``k`` is the elapsed time at which the curve is back
    at ``w_max``.  At ``dt == 0`` the curve evaluates to ``w_max * beta``,
    which is the value ``error_received`` returns.
    """

    _SCALE_CONSTANT = 0.4
    _BETA = 0.7

    def __init__(
        self,
        starting_max_rate,
        start_time,
        scale_constant=_SCALE_CONSTANT,
        beta=_BETA,
    ):
        """Set up the curve as if a failure had occurred at ``start_time``.

        :param starting_max_rate: Initial value for ``w_max``.
        :param start_time: Timestamp stored as the initial "last failure";
            later timestamps are measured relative to it.
        :param scale_constant: Multiplier applied to the cubic term.
        :param beta: Factor applied to the current rate when an error is
            received.
        """
        self._w_max = starting_max_rate
        self._scale_constant = scale_constant
        self._beta = beta
        # The zero point depends on the three attributes assigned above.
        self._k = self._calculate_zero_point()
        self._last_fail = start_time

    def _calculate_zero_point(self):
        """Return ``k``, the cube root of ``w_max * (1 - beta) / scale``."""
        reduction = self._w_max * (1 - self._beta)
        return (reduction / self._scale_constant) ** (1 / 3.0)

    def success_received(self, timestamp):
        """Return the curve's rate at ``timestamp``.

        :param timestamp: Time of the successful response, on the same
            clock as the stored last-failure time.
        :return: ``scale_constant * (dt - k) ** 3 + w_max`` with
            ``dt = timestamp - last_fail``.
        """
        elapsed = timestamp - self._last_fail
        offset = elapsed - self._k
        return self._scale_constant * offset ** 3 + self._w_max

    def error_received(self, current_rate, timestamp):
        """Re-anchor the curve at a failure and return the reduced rate.

        :param current_rate: Rate in use when the error response arrived;
            it becomes the new ``w_max``.
        :param timestamp: Time of the error; it becomes the new
            last-failure time.
        :return: ``current_rate * beta``.
        """
        # NOTE: consider not using the current measured rate here.

        # The rate we were sending at when the error came back is treated
        # as the new maximum, so the zero point has to be recomputed.
        self._w_max = current_rate
        self._k = self._calculate_zero_point()
        self._last_fail = timestamp
        return current_rate * self._beta

    def get_params_snapshot(self):
        """Return a read-only object of the current cubic parameters.

        These parameters are intended for debugging and troubleshooting.
        This object is a read-only snapshot and cannot be used to modify
        the behavior of the CUBIC calculations.

        New parameters may be added to this object in the future.

        """
        return CubicParams(self._w_max, self._k, self._last_fail)