1"""Standard retry behavior.
2
3This contains the default standard retry behavior.
4It provides consistent behavior with other AWS SDKs.
5
The key base classes used for retries:
7
    * ``BaseRetryableChecker`` - Used to check a specific condition that
      indicates a retry should happen.  This can include things like
      max attempts, HTTP status code checks, error code checks, etc.
    * ``BaseRetryBackoff`` - Used to determine how long we should back off
      until we retry a request.  This is the class that will implement
      delays such as exponential backoff.
    * ``RetryPolicy`` - Main class that determines if a retry should
      happen.  It can combine data from various ``BaseRetryableChecker``
      instances to make a final call as to whether or not a retry should
      happen.  It then uses a ``BaseRetryBackoff`` to determine how long
      to delay.
18 * ``RetryHandler`` - The bridge between botocore's event system
19 used by endpoint.py to manage retries and the interfaces defined
20 in this module.
21
22This allows us to define an API that has minimal coupling to the event
23based API used by botocore.
24
25"""
26
27import logging
28import random
29
30from botocore.exceptions import (
31 ConnectionError,
32 ConnectTimeoutError,
33 HTTPClientError,
34 ReadTimeoutError,
35)
36from botocore.retries import quota, special
37from botocore.retries.base import BaseRetryableChecker, BaseRetryBackoff
38
# Default value for the ``max_attempts`` parameters in this module.
# ``MaxAttemptsChecker`` compares it against the 1-based attempt number,
# so the count includes the initial request.
DEFAULT_MAX_ATTEMPTS = 3
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
41
42
def register_retry_handler(client, max_attempts=DEFAULT_MAX_ATTEMPTS):
    """Hook the standard retry behavior into a client's event system.

    Two handlers are registered on ``client.meta.events``:

    * ``after-call.<service>`` - releases retry quota.
    * ``needs-retry.<service>`` - decides whether to retry and for how
      long to delay.  It is registered with a ``unique_id`` of
      ``retry-config-<service>``.

    :param client: The client whose ``meta.events`` and
        ``meta.service_model`` are used for registration.
    :param max_attempts: Passed to ``StandardRetryConditions``.
    :return: The ``RetryHandler`` registered for ``needs-retry``.
    """
    quota_checker = RetryQuotaChecker(quota.RetryQuota())

    # Event names are scoped by the hyphenized service id.
    event_suffix = client.meta.service_model.service_id.hyphenize()
    client.meta.events.register(
        f'after-call.{event_suffix}', quota_checker.release_retry_quota
    )

    conditions = StandardRetryConditions(max_attempts=max_attempts)
    policy = RetryPolicy(
        retry_checker=conditions,
        retry_backoff=ExponentialBackoff(),
    )
    handler = RetryHandler(
        retry_policy=policy,
        retry_event_adapter=RetryEventAdapter(),
        retry_quota=quota_checker,
    )

    client.meta.events.register(
        f'needs-retry.{event_suffix}',
        handler.needs_retry,
        unique_id=f'retry-config-{event_suffix}',
    )
    return handler
68
69
class RetryHandler:
    """Event handler that connects botocore's events to this module.

    An instance's ``needs_retry`` method is meant to be registered as a
    handler for the ``needs-retry`` event.
    """

    def __init__(self, retry_policy, retry_event_adapter, retry_quota):
        self._retry_policy = retry_policy
        self._retry_event_adapter = retry_event_adapter
        self._retry_quota = retry_quota

    def needs_retry(self, **kwargs):
        """Handle the needs-retry event.

        :param kwargs: The keyword arguments emitted with the event; they
            are handed to the event adapter to build a retry context.
        :return: The delay before retrying, or ``None`` if the request
            should not be retried.
        """
        context = self._retry_event_adapter.create_retry_context(**kwargs)
        retry_delay = None
        if not self._retry_policy.should_retry(context):
            logger.debug("Not retrying request.")
        elif not self._retry_quota.acquire_retry_quota(context):
            # The policy wants a retry, but there is not enough capacity
            # left in the retry quota to pay for it.
            logger.debug(
                "Retry needed but retry quota reached, "
                "not retrying request."
            )
        else:
            retry_delay = self._retry_policy.compute_retry_delay(context)
            logger.debug(
                "Retry needed, retrying request after delay of: %s",
                retry_delay,
            )
        # Always push the collected retry metadata back onto the response,
        # whether or not we are retrying.
        self._retry_event_adapter.adapt_retry_response_from_context(context)
        return retry_delay
104
105
class RetryEventAdapter:
    """Translate between the needs-retry event and ``RetryContext``.

    The event-based retry interface used by ``botocore.endpoint`` passes
    a loose collection of keyword arguments.  This adapter converts them
    into the explicit ``RetryContext`` used by the retry classes in this
    module, and copies retry metadata back onto the parsed response.

    """

    def create_retry_context(self, **kwargs):
        """Build a ``RetryContext`` from the needs-retry kwargs.

        Reads the ``response``, ``attempts``, ``operation``,
        ``caught_exception`` and ``request_dict`` keys of ``kwargs``.
        """
        response = kwargs['response']
        # A response of None means no response was received from the
        # service at all (for example an exception raised by the HTTP
        # layer); otherwise it is an (http_response, parsed_response)
        # pair.
        http_response, parsed_response = (
            (None, None) if response is None else response
        )
        # Building a dedicated context keeps the retry checks isolated
        # from the raw kwargs emitted with the event.
        return RetryContext(
            attempt_number=kwargs['attempts'],
            operation_model=kwargs['operation'],
            http_response=http_response,
            parsed_response=parsed_response,
            caught_exception=kwargs['caught_exception'],
            request_context=kwargs['request_dict']['context'],
        )

    def adapt_retry_response_from_context(self, context):
        """Merge the context's retry metadata into the parsed response.

        This is the single place where the response is mutated, so the
        individual retry classes never have to modify the needs-retry
        inputs themselves.  Nothing is written when there is no parsed
        response.
        """
        metadata = context.get_retry_metadata()
        parsed = context.parsed_response
        if parsed is None:
            return
        response_metadata = parsed.setdefault('ResponseMetadata', {})
        response_metadata.update(metadata)
152
153
154# Implementation note: this is meant to encapsulate all the misc stuff
155# that gets sent in the needs-retry event. This is mapped so that params
156# are more clear and explicit.
class RetryContext:
    """Uniform view of everything needed to decide whether to retry.

    A request attempt can end in several ways, and this class holds all
    of them side by side:

    * ``parsed_response`` - a parsed response, which for a modeled error
      contains a service error code and message.
    * ``http_response`` - the raw HTTP response, which may lack any
      service protocol specific error keys.
    * ``caught_exception`` - an exception raised while trying to get a
      response, e.g. a ConnectionError from the HTTP layer meaning no
      response was received from the service.

    Callers are expected to provide at least one of the above; the
    constructor itself does not validate this.

    The attributes are intended as a read-only view and should not be
    modified directly.

    """

    def __init__(
        self,
        attempt_number,
        operation_model=None,
        parsed_response=None,
        http_response=None,
        caught_exception=None,
        request_context=None,
    ):
        # The attempt number is 1-based.
        self.attempt_number = attempt_number
        self.operation_model = operation_model
        # Dictionary produced by parsing the service's HTTP response.
        self.parsed_response = parsed_response
        # An instance of botocore.awsrequest.AWSResponse.
        self.http_response = http_response
        # Non None when an exception was raised while trying to
        # retrieve a response.
        self.caught_exception = caught_exception
        # The context dictionary attached to the request dict.  It is
        # used to store additional per-request state, such as the
        # retry quota capacity that was acquired.
        self.request_context = (
            {} if request_context is None else request_context
        )
        self._retry_metadata = {}

    # Helper methods shared by the various checkers.
    def get_error_code(self):
        """Return the error code of the parsed response, if any.

        ``None`` is returned when there is no parsed response, when its
        ``Error`` entry is not a dictionary, or when that entry has no
        ``Code`` key.

        """
        parsed = self.parsed_response
        if parsed is None:
            return None
        error = parsed.get('Error', {})
        return error.get('Code') if isinstance(error, dict) else None

    def add_retry_metadata(self, **kwargs):
        """Record key/value pairs describing the retry decision.

        Any object taking part in the retry process can use this to
        note which checks/validations happened.  The retry handler
        later adds this data to the response metadata.

        """
        self._retry_metadata.update(kwargs)

    def get_retry_metadata(self):
        """Return a shallow copy of the recorded retry metadata."""
        return dict(self._retry_metadata)
238
239
class RetryPolicy:
    """Pair a retry checker with a backoff strategy.

    :param retry_checker: Object whose ``is_retryable(context)`` decides
        whether a retry should happen.
    :param retry_backoff: Object whose ``delay_amount(context)`` gives
        the delay to apply before the retry.
    """

    def __init__(self, retry_checker, retry_backoff):
        self._retry_checker = retry_checker
        self._retry_backoff = retry_backoff

    def should_retry(self, context):
        """Return the retry checker's verdict for ``context``."""
        verdict = self._retry_checker.is_retryable(context)
        return verdict

    def compute_retry_delay(self, context):
        """Return the backoff strategy's delay for ``context``."""
        delay = self._retry_backoff.delay_amount(context)
        return delay
250
251
class ExponentialBackoff(BaseRetryBackoff):
    """Truncated binary exponential backoff with jitter.

    :param max_backoff: Upper bound applied to the exponential term.
    :param random: Zero-argument callable supplying the jitter factor;
        defaults to ``random.random``.
    """

    _BASE = 2
    _MAX_BACKOFF = 20

    def __init__(self, max_backoff=20, random=random.random):
        self._base = self._BASE
        self._max_backoff = max_backoff
        self._random = random

    def delay_amount(self, context):
        """Compute the delay to apply before the next attempt.

        The delay is::

            t_i = rand(0, 1) * min(2 ** i, MAX_BACKOFF)

        where ``i`` is the 0-based request attempt.

        """
        jitter = self._random()
        # context.attempt_number is 1-based, whereas the formula uses a
        # 0-based attempt so that the first delay is just ``rand(0, 1)``.
        exponent = context.attempt_number - 1
        ceiling = min(self._base**exponent, self._max_backoff)
        return jitter * ceiling
279
280
class MaxAttemptsChecker(BaseRetryableChecker):
    """Allow retries only while under a maximum number of attempts.

    :param max_attempts: The attempt number (1-based) at which retrying
        stops.
    """

    def __init__(self, max_attempts):
        self._max_attempts = max_attempts

    def is_retryable(self, context):
        """Return whether the attempt number is below the maximum.

        As side effects, the ``max`` entry of a non-empty ``retries``
        dictionary in the request context is raised to at least the
        configured maximum, and ``MaxAttemptsReached`` is recorded in
        the retry metadata once the maximum has been reached.
        """
        limit = self._max_attempts
        under_max_attempts = context.attempt_number < limit

        retries_context = context.request_context.get('retries')
        if retries_context:
            current_max = retries_context.get('max', 0)
            retries_context['max'] = max(current_max, limit)

        if not under_max_attempts:
            logger.debug("Max attempts of %s reached.", limit)
            context.add_retry_metadata(MaxAttemptsReached=True)
        return under_max_attempts
296
297
class TransientRetryableChecker(BaseRetryableChecker):
    """Check for errors that are treated as transient.

    A context is retryable when its error code, its HTTP status code or
    its caught exception matches one of the configured values.  Each
    constructor argument falls back to the corresponding class-level
    default when it is ``None``.
    """

    _TRANSIENT_ERROR_CODES = [
        'RequestTimeout',
        'RequestTimeoutException',
        'PriorRequestNotComplete',
    ]
    _TRANSIENT_STATUS_CODES = [500, 502, 503, 504]
    _TRANSIENT_EXCEPTION_CLS = (
        ConnectionError,
        HTTPClientError,
    )

    def __init__(
        self,
        transient_error_codes=None,
        transient_status_codes=None,
        transient_exception_cls=None,
    ):
        # The list defaults are copied so that an instance never shares
        # the class-level lists.
        self._transient_error_codes = (
            self._TRANSIENT_ERROR_CODES[:]
            if transient_error_codes is None
            else transient_error_codes
        )
        self._transient_status_codes = (
            self._TRANSIENT_STATUS_CODES[:]
            if transient_status_codes is None
            else transient_status_codes
        )
        self._transient_exception_cls = (
            self._TRANSIENT_EXCEPTION_CLS
            if transient_exception_cls is None
            else transient_exception_cls
        )

    def is_retryable(self, context):
        """Return True if the context describes a transient error."""
        if context.get_error_code() in self._transient_error_codes:
            return True

        response = context.http_response
        if (
            response is not None
            and response.status_code in self._transient_status_codes
        ):
            return True

        exception = context.caught_exception
        if exception is None:
            return False
        return isinstance(exception, self._transient_exception_cls)
340
341
class ThrottledRetryableChecker(BaseRetryableChecker):
    """Check whether an error code denotes a throttled request.

    :param throttled_error_codes: Error codes to treat as throttling
        errors.  When ``None``, a copy of the class-level default list
        is used.
    """

    # The union of all the error codes seen so far that represent a
    # throttled error.
    _THROTTLED_ERROR_CODES = [
        'Throttling',
        'ThrottlingException',
        'ThrottledException',
        'RequestThrottledException',
        'TooManyRequestsException',
        'ProvisionedThroughputExceededException',
        'TransactionInProgressException',
        'RequestLimitExceeded',
        'BandwidthLimitExceeded',
        'LimitExceededException',
        'RequestThrottled',
        'SlowDown',
        'PriorRequestNotComplete',
        'EC2ThrottledException',
    ]

    def __init__(self, throttled_error_codes=None):
        self._throttled_error_codes = (
            self._THROTTLED_ERROR_CODES[:]
            if throttled_error_codes is None
            else throttled_error_codes
        )

    def is_retryable(self, context):
        """Return True if the context's error code is a throttling code."""
        # Throttling is determined solely from the error code of the
        # parsed service response.
        error_code = context.get_error_code()
        return error_code in self._throttled_error_codes
371
372
class ModeledRetryableChecker(BaseRetryableChecker):
    """Check whether the service model marks an error as retryable."""

    def __init__(self):
        self._error_detector = ModeledRetryErrorDetector()

    def is_retryable(self, context):
        """Return True if the context's error is modeled as retryable.

        A context without an error code is never retryable here.
        """
        if context.get_error_code() is None:
            return False
        error_type = self._error_detector.detect_error_type(context)
        return error_type is not None
384
385
class ModeledRetryErrorDetector:
    """Classify an error using the retry info in the operation model.

    This is lower level than ``ModeledRetryableChecker`` (which
    implements ``BaseRetryableChecker``): rather than a yes/no answer it
    distinguishes between the kinds of retryable errors.
    """

    # These are the return values of the detect_error_type() method.
    TRANSIENT_ERROR = 'TRANSIENT_ERROR'
    THROTTLING_ERROR = 'THROTTLING_ERROR'

    def detect_error_type(self, context):
        """Classify the context's error code against the modeled errors.

        The result is one of:

        * ``self.TRANSIENT_ERROR`` - the error is a transient error
        * ``self.THROTTLING_ERROR`` - the error is a throttling error
        * ``None`` - the error is neither of the above

        """
        error_code = context.get_error_code()
        op_model = context.operation_model
        if op_model is None or not op_model.error_shapes:
            return None
        for shape in op_model.error_shapes:
            retryable = shape.metadata.get('retryable')
            if retryable is None:
                # Only shapes modeled as retryable are of interest.
                continue
            # A shape is matched by its modeled error code when it has
            # one, and by its name otherwise.
            modeled_code = shape.metadata.get('error', {}).get('code')
            if error_code == (modeled_code or shape.name):
                return (
                    self.THROTTLING_ERROR
                    if retryable.get('throttling')
                    else self.TRANSIENT_ERROR
                )
        return None
421
422
class ThrottlingErrorDetector:
    """Decide whether a needs-retry event describes a throttling error.

    :param retry_event_adapter: Object whose ``create_retry_context``
        turns the needs-retry kwargs into a retry context.
    """

    def __init__(self, retry_event_adapter):
        self._modeled_error_detector = ModeledRetryErrorDetector()
        self._fixed_error_code_detector = ThrottledRetryableChecker()
        self._retry_event_adapter = retry_event_adapter

    def is_throttling_error(self, **kwargs):
        """Return True if the event describes a throttling error.

        The kwargs from the needs-retry event are expected to be passed
        straight through.  The fixed list of throttling error codes is
        consulted first, then the modeled error type.
        """
        context = self._retry_event_adapter.create_retry_context(**kwargs)
        if self._fixed_error_code_detector.is_retryable(context):
            return True
        modeled = self._modeled_error_detector
        return modeled.detect_error_type(context) == modeled.THROTTLING_ERROR
436
437
class StandardRetryConditions(BaseRetryableChecker):
    """Single checker bundling the standard retry policy conditions.

    The combined condition is:

        not max_attempts and (transient or throttled or modeled_retry)

    where the second group also includes the special-case checkers from
    ``botocore.retries.special``.

    """

    def __init__(self, max_attempts=DEFAULT_MAX_ATTEMPTS):
        # This class exists purely for convenience, so the standard
        # retry condition is available as one object.
        self._max_attempts_checker = MaxAttemptsChecker(max_attempts)
        general_checkers = [
            TransientRetryableChecker(),
            ThrottledRetryableChecker(),
            ModeledRetryableChecker(),
        ]
        special_checkers = OrRetryChecker(
            [
                special.RetryIDPCommunicationError(),
                special.RetryDDBChecksumError(),
            ]
        )
        self._additional_checkers = OrRetryChecker(
            [*general_checkers, special_checkers]
        )

    def is_retryable(self, context):
        """Return whether the context passes the standard conditions."""
        # The max attempts check always runs first; the remaining
        # checkers are consulted only if it passes.
        under_limit = self._max_attempts_checker.is_retryable(context)
        if not under_limit:
            return under_limit
        return self._additional_checkers.is_retryable(context)
469
470
class OrRetryChecker(BaseRetryableChecker):
    """Retryable if any one of the wrapped checkers says so.

    :param checkers: Iterable of objects exposing
        ``is_retryable(context)``.
    """

    def __init__(self, checkers):
        self._checkers = checkers

    def is_retryable(self, context):
        """Return True as soon as one checker reports retryable."""
        for checker in self._checkers:
            if checker.is_retryable(context):
                return True
        return False
477
478
class RetryQuotaChecker:
    """Acquire and release retry quota capacity around retries.

    Implementation note: this is deliberately not a
    ``BaseRetryableChecker``.  It does more than check whether a retry
    is possible: it also changes state, so care is needed about when
    and how it is called.  A ``BaseRetryableChecker`` would imply that
    ``.is_retryable(context)`` can be called any number of times
    without affecting anything.
    """

    # Capacity requested for a retry of a non-timeout error.
    _RETRY_COST = 5
    # Capacity given back after a successful call that was never retried.
    _NO_RETRY_INCREMENT = 1
    # Capacity requested for a retry of a timeout error.
    _TIMEOUT_RETRY_REQUEST = 10
    _TIMEOUT_EXCEPTIONS = (ConnectTimeoutError, ReadTimeoutError)

    def __init__(self, quota):
        self._quota = quota
        # Initialised here but not read by any method of this class.
        self._last_amount_acquired = None

    def acquire_retry_quota(self, context):
        """Try to acquire the capacity needed to retry.

        :return: True if the capacity was acquired.  Otherwise False,
            with ``RetryQuotaReached`` recorded in the retry metadata.
        """
        capacity_amount = (
            self._TIMEOUT_RETRY_REQUEST
            if self._is_timeout_error(context)
            else self._RETRY_COST
        )
        if not self._quota.acquire(capacity_amount):
            context.add_retry_metadata(RetryQuotaReached=True)
            return False
        # Remember how much was acquired so the same amount can be
        # released later; the amount varies with the kind of error.
        context.request_context['retry_quota_capacity'] = capacity_amount
        return True

    def _is_timeout_error(self, context):
        """Return True if the caught exception is a timeout exception."""
        return isinstance(context.caught_exception, self._TIMEOUT_EXCEPTIONS)

    # This is intended to be hooked up to ``after-call``.
    def release_retry_quota(self, context, http_response, **kwargs):
        """Give capacity back to the quota after a call completes.

        There are three possible outcomes:

        1. There is no HTTP response, or its status code is not 2xx.
           No quota is given back.
        2. The request succeeded and was never retried.
           ``_NO_RETRY_INCREMENT`` is given back.
        3. The call was retried and eventually got a 2xx response.  The
           capacity recorded under ``retry_quota_capacity`` is given
           back.
        """
        if http_response is None:
            return
        if not 200 <= http_response.status_code < 300:
            return
        if 'retry_quota_capacity' in context:
            amount = context['retry_quota_capacity']
        else:
            amount = self._NO_RETRY_INCREMENT
        self._quota.release(amount)