Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/retries/adaptive.py: 27%

66 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1import logging 

2import math 

3import threading 

4 

5from botocore.retries import bucket, standard, throttling 

6 

7logger = logging.getLogger(__name__) 

8 

9 

def register_retry_handler(client):
    """Create a ``ClientRateLimiter`` and attach it to ``client``'s events.

    A single ``bucket.Clock`` is shared by the rate calculator, the token
    bucket, the rate clocker and the limiter itself.  The limiter's
    ``on_sending_request`` is registered for ``before-send`` and its
    ``on_receiving_response`` for ``needs-retry``.

    :param client: Object exposing ``meta.events.register(name, handler)``.
    :return: The ``ClientRateLimiter`` that was registered.
    """
    shared_clock = bucket.Clock()
    # Components are built in a fixed order because several of them read
    # the clock while being constructed.
    cubic_calculator = throttling.CubicCalculator(
        starting_max_rate=0, start_time=shared_clock.current_time()
    )
    tokens = bucket.TokenBucket(max_rate=1, clock=shared_clock)
    clocker = RateClocker(shared_clock)
    detector = standard.ThrottlingErrorDetector(
        retry_event_adapter=standard.RetryEventAdapter(),
    )
    limiter = ClientRateLimiter(
        rate_adjustor=cubic_calculator,
        rate_clocker=clocker,
        token_bucket=tokens,
        throttling_detector=detector,
        clock=shared_clock,
    )

    hooks = (
        ('before-send', limiter.on_sending_request),
        ('needs-retry', limiter.on_receiving_response),
    )
    for event_name, handler in hooks:
        client.meta.events.register(event_name, handler)
    return limiter

36 

37 

class ClientRateLimiter:
    """Adjust a token bucket's max rate from observed responses.

    The limiter starts disabled: ``on_sending_request`` does nothing until
    the throttling detector has reported a throttling error at least once.
    From then on every outgoing request first acquires from the token
    bucket.
    """

    # The bucket's max rate is never set above this multiple of the
    # measured send rate.
    _MAX_RATE_ADJUST_SCALE = 2.0

    def __init__(
        self,
        rate_adjustor,
        rate_clocker,
        token_bucket,
        throttling_detector,
        clock,
    ):
        """Store the collaborators; the limiter begins in the disabled state.

        :param rate_adjustor: Provides ``success_received(timestamp)`` and
            ``error_received(rate, timestamp)``, each returning a new rate.
        :param rate_clocker: Provides ``record()`` returning the measured
            send rate.
        :param token_bucket: Provides ``acquire()``, a readable/writable
            ``max_rate`` and a readable ``available_capacity``.
        :param throttling_detector: Provides
            ``is_throttling_error(**kwargs)``.
        :param clock: Provides ``current_time()``.
        """
        self._lock = threading.Lock()
        self._enabled = False
        self._clock = clock
        self._throttling_detector = throttling_detector
        self._token_bucket = token_bucket
        self._rate_clocker = rate_clocker
        self._rate_adjustor = rate_adjustor

    def on_sending_request(self, request, **kwargs):
        """Acquire from the token bucket, but only once limiting is enabled."""
        if not self._enabled:
            return
        self._token_bucket.acquire()

    def on_receiving_response(self, **kwargs):
        """Recompute the token bucket's max rate after a response.

        Hooked up to needs-retry.  ``kwargs`` are forwarded unchanged to
        the throttling detector.
        """
        observed_rate = self._rate_clocker.record()
        now = self._clock.current_time()
        with self._lock:
            throttled = self._throttling_detector.is_throttling_error(
                **kwargs
            )
            if throttled:
                if self._enabled:
                    # Already limiting: base the adjustment on whichever
                    # is lower, the measured rate or the bucket's limit.
                    baseline = min(observed_rate, self._token_bucket.max_rate)
                else:
                    baseline = observed_rate
                updated_rate = self._rate_adjustor.error_received(
                    baseline, now
                )
                logger.debug(
                    "Throttling response received, new send rate: %s "
                    "measured rate: %s, token bucket capacity "
                    "available: %s",
                    updated_rate,
                    observed_rate,
                    self._token_bucket.available_capacity,
                )
                self._enabled = True
            else:
                updated_rate = self._rate_adjustor.success_received(now)
            ceiling = self._MAX_RATE_ADJUST_SCALE * observed_rate
            self._token_bucket.max_rate = min(updated_rate, ceiling)

90 

91 

class RateClocker:
    """Tracks the rate at which a client is sending a request.

    Time is divided into buckets of ``time_bucket_range``.  Calls to
    ``record`` accumulate a count; when the clock moves into a later
    bucket, the count divided by the elapsed bucket time is blended into
    the running rate using ``smoothing`` as the weight of the new sample.
    """

    _DEFAULT_SMOOTHING = 0.8
    # Update the rate every _TIME_BUCKET_RANGE seconds.
    _TIME_BUCKET_RANGE = 0.5

    def __init__(
        self,
        clock,
        smoothing=_DEFAULT_SMOOTHING,
        time_bucket_range=_TIME_BUCKET_RANGE,
    ):
        """Initialise the clocker with a measured rate of 0.

        :param clock: Object providing ``current_time()``.
        :param smoothing: Weight given to the newest rate sample; the
            previous measured rate gets ``1 - smoothing``.
        :param time_bucket_range: Width of one time bucket, in the clock's
            units.  Must be non-zero (it is used as a divisor).
        """
        self._clock = clock
        self._measured_rate = 0
        self._smoothing = smoothing
        # NOTE: the initial bucket is floored to a whole number, not to a
        # multiple of the bucket range; kept as-is to preserve behaviour.
        self._last_bucket = math.floor(self._clock.current_time())
        # Honour the caller-supplied range.  Previously the class constant
        # was always used here, so the argument had no effect.
        self._time_bucket_scale = 1 / time_bucket_range
        self._count = 0
        self._lock = threading.Lock()

    def record(self, amount=1):
        """Count ``amount`` requests and return the current measured rate.

        The measured rate is only recomputed when the clock has advanced
        into a bucket later than the last one seen; otherwise the count is
        accumulated and the previous rate is returned.

        :param amount: Number of requests to add to the current count.
        :return: The (possibly updated) smoothed rate.
        """
        with self._lock:
            t = self._clock.current_time()
            # Round the current time down to the start of its bucket.
            bucket = (
                math.floor(t * self._time_bucket_scale)
                / self._time_bucket_scale
            )
            self._count += amount
            if bucket > self._last_bucket:
                current_rate = self._count / float(bucket - self._last_bucket)
                self._measured_rate = (current_rate * self._smoothing) + (
                    self._measured_rate * (1 - self._smoothing)
                )
                self._count = 0
                self._last_bucket = bucket
            return self._measured_rate

    @property
    def measured_rate(self):
        """The most recently computed smoothed rate (0 until first update)."""
        return self._measured_rate