Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/retries/adaptive.py: 27%
66 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1import logging
2import math
3import threading
5from botocore.retries import bucket, standard, throttling
7logger = logging.getLogger(__name__)
10def register_retry_handler(client):
11 clock = bucket.Clock()
12 rate_adjustor = throttling.CubicCalculator(
13 starting_max_rate=0, start_time=clock.current_time()
14 )
15 token_bucket = bucket.TokenBucket(max_rate=1, clock=clock)
16 rate_clocker = RateClocker(clock)
17 throttling_detector = standard.ThrottlingErrorDetector(
18 retry_event_adapter=standard.RetryEventAdapter(),
19 )
20 limiter = ClientRateLimiter(
21 rate_adjustor=rate_adjustor,
22 rate_clocker=rate_clocker,
23 token_bucket=token_bucket,
24 throttling_detector=throttling_detector,
25 clock=clock,
26 )
27 client.meta.events.register(
28 'before-send',
29 limiter.on_sending_request,
30 )
31 client.meta.events.register(
32 'needs-retry',
33 limiter.on_receiving_response,
34 )
35 return limiter
38class ClientRateLimiter:
39 _MAX_RATE_ADJUST_SCALE = 2.0
41 def __init__(
42 self,
43 rate_adjustor,
44 rate_clocker,
45 token_bucket,
46 throttling_detector,
47 clock,
48 ):
49 self._rate_adjustor = rate_adjustor
50 self._rate_clocker = rate_clocker
51 self._token_bucket = token_bucket
52 self._throttling_detector = throttling_detector
53 self._clock = clock
54 self._enabled = False
55 self._lock = threading.Lock()
57 def on_sending_request(self, request, **kwargs):
58 if self._enabled:
59 self._token_bucket.acquire()
61 # Hooked up to needs-retry.
62 def on_receiving_response(self, **kwargs):
63 measured_rate = self._rate_clocker.record()
64 timestamp = self._clock.current_time()
65 with self._lock:
66 if not self._throttling_detector.is_throttling_error(**kwargs):
67 new_rate = self._rate_adjustor.success_received(timestamp)
68 else:
69 if not self._enabled:
70 rate_to_use = measured_rate
71 else:
72 rate_to_use = min(
73 measured_rate, self._token_bucket.max_rate
74 )
75 new_rate = self._rate_adjustor.error_received(
76 rate_to_use, timestamp
77 )
78 logger.debug(
79 "Throttling response received, new send rate: %s "
80 "measured rate: %s, token bucket capacity "
81 "available: %s",
82 new_rate,
83 measured_rate,
84 self._token_bucket.available_capacity,
85 )
86 self._enabled = True
87 self._token_bucket.max_rate = min(
88 new_rate, self._MAX_RATE_ADJUST_SCALE * measured_rate
89 )
92class RateClocker:
93 """Tracks the rate at which a client is sending a request."""
95 _DEFAULT_SMOOTHING = 0.8
96 # Update the rate every _TIME_BUCKET_RANGE seconds.
97 _TIME_BUCKET_RANGE = 0.5
99 def __init__(
100 self,
101 clock,
102 smoothing=_DEFAULT_SMOOTHING,
103 time_bucket_range=_TIME_BUCKET_RANGE,
104 ):
105 self._clock = clock
106 self._measured_rate = 0
107 self._smoothing = smoothing
108 self._last_bucket = math.floor(self._clock.current_time())
109 self._time_bucket_scale = 1 / self._TIME_BUCKET_RANGE
110 self._count = 0
111 self._lock = threading.Lock()
113 def record(self, amount=1):
114 with self._lock:
115 t = self._clock.current_time()
116 bucket = (
117 math.floor(t * self._time_bucket_scale)
118 / self._time_bucket_scale
119 )
120 self._count += amount
121 if bucket > self._last_bucket:
122 current_rate = self._count / float(bucket - self._last_bucket)
123 self._measured_rate = (current_rate * self._smoothing) + (
124 self._measured_rate * (1 - self._smoothing)
125 )
126 self._count = 0
127 self._last_bucket = bucket
128 return self._measured_rate
130 @property
131 def measured_rate(self):
132 return self._measured_rate