1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26from typing import TypeVar, Any, Dict, Optional, Type, List, Union, cast, IO
27from io import SEEK_SET, UnsupportedOperation
28import logging
29import time
30from enum import Enum
31from azure.core.configuration import ConnectionConfiguration
32from azure.core.pipeline import PipelineResponse, PipelineRequest, PipelineContext
33from azure.core.pipeline.transport import (
34 HttpResponse as LegacyHttpResponse,
35 AsyncHttpResponse as LegacyAsyncHttpResponse,
36 HttpRequest as LegacyHttpRequest,
37 HttpTransport,
38)
39from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest
40from azure.core.exceptions import (
41 AzureError,
42 ClientAuthenticationError,
43 ServiceResponseError,
44 ServiceRequestError,
45 ServiceRequestTimeoutError,
46 ServiceResponseTimeoutError,
47)
48
49from ._base import HTTPPolicy, RequestHistory
50from . import _utils
51from ..._enum_meta import CaseInsensitiveEnumMeta
52
# Synchronous response types: the azure.core.rest response and the legacy transport response.
HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse)
# Any response type handled by the shared base class: sync or async, azure.core.rest or legacy.
AllHttpResponseType = TypeVar(
    "AllHttpResponseType",
    HttpResponse,
    LegacyHttpResponse,
    AsyncHttpResponse,
    LegacyAsyncHttpResponse,
)
# Request types: the azure.core.rest request and the legacy transport request.
HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest)
# Bound to RetryPolicyBase so that classmethods can be typed as returning the class they are called on.
ClsRetryPolicy = TypeVar("ClsRetryPolicy", bound="RetryPolicyBase")

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
65
66
class RetryMode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    # pylint: disable=enum-must-be-uppercase
    """Enum for retry modes.

    Selects how the retry policy derives the delay between attempts from the
    configured backoff factor.
    """
    # The delay is the backoff factor doubled for each additional recorded attempt.
    Exponential = "exponential"
    # The delay is always the backoff factor itself.
    Fixed = "fixed"
72
73
class RetryPolicyBase:
    """Retry bookkeeping shared by the retry policies built on top of this class.

    Stores the configured retry budgets (total / connect / read / status), the
    backoff parameters and the overall operation timeout, and implements the
    decisions that do not depend on how the policy sleeps: whether a response or
    an error is retryable, how long to back off, and how to rewind a stream body
    before the request is sent again.

    :keyword int retry_total: Total number of retries to allow. Default value is 10.
    :keyword int retry_connect: How many connection-related errors to retry on. Default value is 3.
    :keyword int retry_read: How many times to retry on read errors. Default value is 3.
    :keyword int retry_status: How many times to retry on bad status codes. Default value is 3.
    :keyword float retry_backoff_factor: Backoff factor applied between attempts. Default value is 0.8.
    :keyword int retry_backoff_max: The maximum back off time. Default value is ``BACKOFF_MAX``.
    :keyword RetryMode retry_mode: Fixed or exponential delay between attempts, default is exponential.
    :keyword int timeout: Timeout setting for the operation in seconds, default is 604800s (7 days).
    :keyword retry_on_status_codes: Additional status codes to retry on.
    """

    # pylint: disable=too-many-instance-attributes
    #: Maximum backoff time.
    BACKOFF_MAX = 120
    # Every status code below 506 except the six listed ones is never retried by default.
    _SAFE_CODES = set(range(506)) - set([408, 429, 500, 502, 503, 504])
    # Default retryable codes: 408, 429, 500, 502, 503, 504 and everything from 506 to 998.
    _RETRY_CODES = set(range(999)) - _SAFE_CODES

    def __init__(self, **kwargs: Any) -> None:
        self.total_retries: int = kwargs.pop("retry_total", 10)
        self.connect_retries: int = kwargs.pop("retry_connect", 3)
        self.read_retries: int = kwargs.pop("retry_read", 3)
        self.status_retries: int = kwargs.pop("retry_status", 3)
        self.backoff_factor: float = kwargs.pop("retry_backoff_factor", 0.8)
        self.backoff_max: int = kwargs.pop("retry_backoff_max", self.BACKOFF_MAX)
        self.retry_mode: RetryMode = kwargs.pop("retry_mode", RetryMode.Exponential)
        self.timeout: int = kwargs.pop("timeout", 604800)

        # An explicit ``None`` is treated like "no extra codes" instead of crashing in set().
        status_codes = kwargs.pop("retry_on_status_codes", None) or []
        # User supplied codes extend the defaults, they never replace them.
        self._retry_on_status_codes = set(status_codes) | self._RETRY_CODES
        self._method_whitelist = frozenset(["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"])
        self._respect_retry_after_header = True
        super().__init__()

    @classmethod
    def no_retries(cls: Type[ClsRetryPolicy]) -> ClsRetryPolicy:
        """Disable retries.

        :return: A retry policy with retries disabled.
        :rtype: ~azure.core.pipeline.policies.RetryPolicy or ~azure.core.pipeline.policies.AsyncRetryPolicy
        """
        return cls(retry_total=0)

    def configure_retries(self, options: Dict[str, Any]) -> Dict[str, Any]:
        """Configures the retry settings.

        Per-call values found in ``options`` take precedence over the values the
        policy was constructed with. The retry related keys are popped from
        ``options``, so the dict passed in is modified.

        :param options: keyword arguments from context.
        :type options: dict
        :return: A dict containing settings and history for retries.
        :rtype: dict
        """
        return {
            "total": options.pop("retry_total", self.total_retries),
            "connect": options.pop("retry_connect", self.connect_retries),
            "read": options.pop("retry_read", self.read_retries),
            "status": options.pop("retry_status", self.status_retries),
            "backoff": options.pop("retry_backoff_factor", self.backoff_factor),
            # Fall back to the instance value so that ``retry_backoff_max`` given to the
            # constructor is honoured (it defaults to BACKOFF_MAX when not provided).
            "max_backoff": options.pop("retry_backoff_max", self.backoff_max),
            "methods": options.pop("retry_on_methods", self._method_whitelist),
            "timeout": options.pop("timeout", self.timeout),
            "history": [],
        }

    def get_backoff_time(self, settings: Dict[str, Any]) -> float:
        """Returns the current backoff time.

        The number of entries in the retry history drives the computation. With
        at most one entry there is no delay. Otherwise the delay is the backoff
        factor in fixed mode, or ``backoff * 2 ** (entries - 1)`` in exponential
        mode, capped at the configured maximum backoff.

        :param dict settings: The retry settings.
        :return: The current backoff value.
        :rtype: float
        """
        consecutive_errors_len = len(settings["history"])
        if consecutive_errors_len <= 1:
            return 0

        if self.retry_mode == RetryMode.Fixed:
            backoff_value = settings["backoff"]
        else:
            backoff_value = settings["backoff"] * (2 ** (consecutive_errors_len - 1))
        return min(settings["max_backoff"], backoff_value)

    def parse_retry_after(self, retry_after: str) -> float:
        """Helper to parse Retry-After and get value in seconds.

        :param str retry_after: Retry-After header
        :rtype: float
        :return: Value of Retry-After in seconds.
        """
        return _utils.parse_retry_after(retry_after)

    def get_retry_after(self, response: PipelineResponse[Any, AllHttpResponseType]) -> Optional[float]:
        """Get the value of Retry-After in seconds.

        :param response: The PipelineResponse object
        :type response: ~azure.core.pipeline.PipelineResponse
        :return: Value of Retry-After in seconds.
        :rtype: float or None
        """
        return _utils.get_retry_after(response)

    def _is_connection_error(self, err: Exception) -> bool:
        """Errors when we're fairly sure that the server did not receive the
        request, so it should be safe to retry.

        :param err: The error raised by the pipeline.
        :type err: ~azure.core.exceptions.AzureError
        :return: True if connection error, False if not.
        :rtype: bool
        """
        return isinstance(err, ServiceRequestError)

    def _is_read_error(self, err: Exception) -> bool:
        """Errors that occur after the request has been started, so we should
        assume that the server began processing it.

        :param err: The error raised by the pipeline.
        :type err: ~azure.core.exceptions.AzureError
        :return: True if read error, False if not.
        :rtype: bool
        """
        return isinstance(err, ServiceResponseError)

    def _is_method_retryable(
        self,
        settings: Dict[str, Any],
        request: HTTPRequestType,
        response: Optional[AllHttpResponseType] = None,
    ) -> bool:
        """Checks if a given HTTP method should be retried upon, depending if
        it is included on the method allowlist.

        POST and PATCH are additionally retried when a response is available and
        its status code is 500, 503 or 504.

        :param dict settings: The retry settings.
        :param request: The HTTP request object.
        :type request: ~azure.core.rest.HttpRequest
        :param response: The HTTP response object.
        :type response: ~azure.core.rest.HttpResponse or ~azure.core.rest.AsyncHttpResponse
        :return: True if method should be retried upon. False if not in method allowlist.
        :rtype: bool
        """
        method = request.method.upper()
        if response and method in ("POST", "PATCH") and response.status_code in (500, 503, 504):
            return True
        return method in settings["methods"]

    def is_retry(
        self,
        settings: Dict[str, Any],
        response: PipelineResponse[HTTPRequestType, AllHttpResponseType],
    ) -> bool:
        """Checks if method/status code is retryable.

        Based on allowlists and control variables such as the number of
        total retries to allow, whether to respect the Retry-After header,
        whether this header is present, and whether the returned status
        code is on the list of status codes to be retried upon on the
        presence of the aforementioned header.

        The behavior is:
        - If status_code < 400: don't retry
        - Else if Retry-After present: retry
        - Else: retry if the method is retryable, retries remain, and the status code is in the
          retry list (by default 408, 429, 500, 502, 503, 504 and codes from 506 upwards)

        :param dict settings: The retry settings.
        :param response: The PipelineResponse object
        :type response: ~azure.core.pipeline.PipelineResponse
        :return: True if method/status code is retryable. False if not retryable.
        :rtype: bool
        """
        http_response = response.http_response
        if http_response.status_code < 400:
            return False
        has_retry_after = bool(http_response.headers.get("Retry-After"))
        if has_retry_after and self._respect_retry_after_header:
            return True
        if not self._is_method_retryable(settings, response.http_request, response=http_response):
            return False
        # bool() so that a total of 0 (or None) yields False rather than leaking the raw value.
        return bool(settings["total"] and http_response.status_code in self._retry_on_status_codes)

    def is_exhausted(self, settings: Dict[str, Any]) -> bool:
        """Checks if any retries left.

        :param dict settings: the retry settings
        :return: False if have more retries. True if retries exhausted.
        :rtype: bool
        """
        settings_retry_count = (
            settings["total"],
            settings["connect"],
            settings["read"],
            settings["status"],
        )
        # Falsy counters (None or 0) cannot be negative, so they are ignored.
        retry_counts: List[int] = [count for count in settings_retry_count if count]
        if not retry_counts:
            return False

        # Any counter below zero means that budget has been used up.
        return min(retry_counts) < 0

    def increment(
        self,
        settings: Dict[str, Any],
        response: Optional[
            Union[
                PipelineRequest[HTTPRequestType],
                PipelineResponse[HTTPRequestType, AllHttpResponseType],
            ]
        ] = None,
        error: Optional[Exception] = None,
    ) -> bool:
        """Increment the retry counters.

        Decrements the total budget and the budget matching the kind of failure,
        records the attempt in the retry history, and rewinds any stream body (or
        multipart file) to the position recorded before the first attempt.

        :param settings: The retry settings.
        :type settings: dict
        :param response: A pipeline response object, or the pipeline request when
            the attempt failed with an error. When None, nothing is recorded in the
            history and no body is rewound.
        :type response: ~azure.core.pipeline.PipelineResponse
        :param error: An error encountered during the request, or
            None if the response was received successfully.
        :type error: ~azure.core.exceptions.AzureError
        :return: Whether any retry attempt is available
            True if more retry attempts available, False otherwise
        :rtype: bool
        """
        settings["total"] -= 1

        # A 202 response is never retried.
        if isinstance(response, PipelineResponse) and response.http_response.status_code == 202:
            return False

        # ``response`` may be None, a PipelineRequest or a PipelineResponse; only the
        # latter two expose ``http_request``.
        http_request = getattr(response, "http_request", None)

        if error and self._is_connection_error(error):
            # Connect retry?
            settings["connect"] -= 1
            if http_request is not None:
                settings["history"].append(RequestHistory(http_request, error=error))

        elif error and self._is_read_error(error):
            # Read retry?
            settings["read"] -= 1
            if http_request is not None:
                settings["history"].append(RequestHistory(http_request, error=error))

        elif response:
            # Incrementing because of a server error like a 500 in
            # status_forcelist and the given method is in the allowlist
            settings["status"] -= 1
            if http_request is not None and hasattr(response, "http_response"):
                settings["history"].append(RequestHistory(http_request, http_response=response.http_response))

        if self.is_exhausted(settings):
            return False

        if http_request is None:
            # No request available, hence no body to rewind.
            return True

        body = http_request.body
        if body and hasattr(body, "read"):
            # The key is always present once positions were configured, but it holds None when
            # the position could not be determined; seeking to None would raise a TypeError.
            body_position = settings.get("body_position")
            if body_position is None:
                return False
            try:
                # attempt to rewind the body to the initial position
                # If it has "read", it has "seek", so casting for mypy
                cast(IO[bytes], body).seek(body_position, SEEK_SET)
            except (UnsupportedOperation, OSError, ValueError, AttributeError):
                # if body is not seekable, then retry would not work
                # (non-seekable streams such as pipes raise a plain OSError)
                return False

        file_positions = settings.get("file_positions")
        if http_request.files and file_positions:
            try:
                for value in http_request.files.values():
                    file_name, file_body = value[0], value[1]
                    if file_name in file_positions:
                        file_body.seek(file_positions[file_name], SEEK_SET)
            except (UnsupportedOperation, OSError, ValueError, AttributeError):
                # if body is not seekable, then retry would not work
                return False
        return True

    def update_context(self, context: PipelineContext, retry_settings: Dict[str, Any]) -> None:
        """Updates retry history in pipeline context.

        The context is left untouched when the history is empty.

        :param context: The pipeline context.
        :type context: ~azure.core.pipeline.PipelineContext
        :param retry_settings: The retry settings.
        :type retry_settings: dict
        """
        if retry_settings["history"]:
            context["history"] = retry_settings["history"]

    def _configure_timeout(
        self,
        request: PipelineRequest[HTTPRequestType],
        absolute_timeout: float,
        is_response_error: bool,
    ) -> None:
        """Fail if the operation timeout has elapsed, otherwise cap the connection timeout with it.

        :param request: The PipelineRequest object.
        :type request: ~azure.core.pipeline.PipelineRequest
        :param float absolute_timeout: Remaining time, in seconds, for the whole operation.
        :param bool is_response_error: Selects which timeout error is raised when no time is left.
        :raises ~azure.core.exceptions.ServiceResponseTimeoutError: if no time is left and
            ``is_response_error`` is True.
        :raises ~azure.core.exceptions.ServiceRequestTimeoutError: if no time is left and
            ``is_response_error`` is False.
        """
        if absolute_timeout <= 0:
            if is_response_error:
                raise ServiceResponseTimeoutError("Response timeout")
            raise ServiceRequestTimeoutError("Request timeout")

        # if connection_timeout is already set, ensure it doesn't exceed absolute_timeout
        connection_timeout = request.context.options.get("connection_timeout")
        if connection_timeout:
            request.context.options["connection_timeout"] = min(connection_timeout, absolute_timeout)

        # otherwise, try to ensure the transport's configured connection_timeout doesn't exceed absolute_timeout
        # ("connection_config" isn't defined on Async/HttpTransport but all implementations in this library have it)
        elif hasattr(request.context.transport, "connection_config"):
            # FIXME This is fragile, should be refactored. Casting my way for mypy
            # https://github.com/Azure/azure-sdk-for-python/issues/31530
            connection_config = cast(
                ConnectionConfiguration, request.context.transport.connection_config  # type: ignore
            )

            default_timeout = getattr(connection_config, "timeout", absolute_timeout)
            try:
                if absolute_timeout < default_timeout:
                    request.context.options["connection_timeout"] = absolute_timeout
            except TypeError:
                # transport.connection_config.timeout is something unexpected (not a number)
                pass

    def _configure_positions(self, request: PipelineRequest[HTTPRequestType], retry_settings: Dict[str, Any]) -> None:
        """Record the current position of the stream body or multipart files.

        The positions are stored in ``retry_settings`` under ``body_position`` and
        ``file_positions`` so that the streams can be rewound before a retry. Both
        keys are always written; a value of None means no position is available.

        :param request: The PipelineRequest object.
        :type request: ~azure.core.pipeline.PipelineRequest
        :param dict retry_settings: The retry settings, updated in place.
        """
        body_position = None
        file_positions: Optional[Dict[str, int]] = None
        http_request = request.http_request
        if http_request.body and hasattr(http_request.body, "read"):
            try:
                # If it has "read", it has "tell", so casting for mypy
                body_position = cast(IO[bytes], http_request.body).tell()
            except (AttributeError, UnsupportedOperation, OSError):
                # if body position cannot be obtained, then retries will not work
                # (non-seekable streams such as pipes raise a plain OSError)
                pass
        elif http_request.files:
            file_positions = {}
            try:
                for value in http_request.files.values():
                    name, body = value[0], value[1]
                    if name and body and hasattr(body, "read"):
                        # If it has "read", it has "tell", so casting for mypy
                        file_positions[name] = cast(IO[bytes], body).tell()
            except (AttributeError, UnsupportedOperation, OSError):
                # A single unreadable position discards all of them.
                file_positions = None

        retry_settings["body_position"] = body_position
        retry_settings["file_positions"] = file_positions
417
418
class RetryPolicy(RetryPolicyBase, HTTPPolicy[HTTPRequestType, HTTPResponseType]):
    """A retry policy.

    The retry policy in the pipeline can be configured directly, or tweaked on a per-call basis.

    :keyword int retry_total: Total number of retries to allow. Takes precedence over other counts.
     Default value is 10.
    :keyword int retry_connect: How many connection-related errors to retry on.
     These are errors raised before the request is sent to the remote server,
     which we assume has not triggered the server to process the request. Default value is 3.
    :keyword int retry_read: How many times to retry on read errors.
     These errors are raised after the request was sent to the server, so the
     request may have side-effects. Default value is 3.
    :keyword int retry_status: How many times to retry on bad status codes. Default value is 3.
    :keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
     (most errors are resolved immediately by a second try without a delay).
     In fixed mode, retry policy will always sleep for {backoff factor}.
     In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
     seconds. If the backoff_factor is 0.1, then the retry will sleep
     for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
    :keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
    :keyword RetryMode retry_mode: Fixed or exponential delay between attempts, default is exponential.
    :keyword int timeout: Timeout setting for the operation in seconds, default is 604800s (7 days).

    .. admonition:: Example:

        .. literalinclude:: ../samples/test_example_sync.py
            :start-after: [START retry_policy]
            :end-before: [END retry_policy]
            :language: python
            :dedent: 4
            :caption: Configuring a retry policy.
    """

    def _sleep_for_retry(
        self,
        response: PipelineResponse[HTTPRequestType, HTTPResponseType],
        transport: HttpTransport[HTTPRequestType, HTTPResponseType],
    ) -> bool:
        """Sleep based on the Retry-After response header value.

        :param response: The PipelineResponse object.
        :type response: ~azure.core.pipeline.PipelineResponse
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.HttpTransport
        :return: Whether a sleep was done or not
        :rtype: bool
        """
        retry_after = self.get_retry_after(response)
        if retry_after:
            transport.sleep(retry_after)
            return True
        return False

    def _sleep_backoff(
        self,
        settings: Dict[str, Any],
        transport: HttpTransport[HTTPRequestType, HTTPResponseType],
    ) -> None:
        """Sleep for the current backoff time. Immediately returns if backoff is 0.

        :param dict settings: The retry settings.
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.HttpTransport
        """
        backoff = self.get_backoff_time(settings)
        if backoff <= 0:
            return
        transport.sleep(backoff)

    def sleep(
        self,
        settings: Dict[str, Any],
        transport: HttpTransport[HTTPRequestType, HTTPResponseType],
        response: Optional[PipelineResponse[HTTPRequestType, HTTPResponseType]] = None,
    ) -> None:
        """Sleep between retry attempts.

        This method will respect a server's ``Retry-After`` response header
        and sleep the duration of the time requested. If that is not present, it
        will use the configured backoff. When the computed backoff is 0 (for
        instance before the first retry) this method returns immediately.

        :param dict settings: The retry settings.
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.HttpTransport
        :param response: The PipelineResponse object.
        :type response: ~azure.core.pipeline.PipelineResponse
        """
        if response:
            slept = self._sleep_for_retry(response, transport)
            if slept:
                return
        self._sleep_backoff(settings, transport)

    def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
        """Sends the PipelineRequest object to the next policy. Uses retry settings if necessary.

        :param request: The PipelineRequest object
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: The PipelineResponse.
        :rtype: ~azure.core.pipeline.PipelineResponse
        :raises ~azure.core.exceptions.AzureError: if maximum retries exceeded.
        :raises ~azure.core.exceptions.ClientAuthenticationError: if authentication fails.
        """
        retry_active = True
        response = None
        retry_settings = self.configure_retries(request.context.options)
        self._configure_positions(request, retry_settings)

        # Remaining time for the whole operation; reduced by the duration of every attempt.
        absolute_timeout = retry_settings["timeout"]
        is_response_error = True

        # PipelineContext types transport as a Union of HttpTransport and AsyncHttpTransport, but
        # here we know that this is an HttpTransport.
        # The correct fix is to make PipelineContext generic, but that's a breaking change and a lot of
        # generic to update in Pipeline, PipelineClient, PipelineRequest, PipelineResponse, etc.
        # The transport does not change between attempts, so it is resolved once.
        transport: HttpTransport[HTTPRequestType, HTTPResponseType] = cast(
            HttpTransport[HTTPRequestType, HTTPResponseType],
            request.context.transport,
        )

        while retry_active:
            start_time = time.time()
            try:
                self._configure_timeout(request, absolute_timeout, is_response_error)
                request.context["retry_count"] = len(retry_settings["history"])
                response = self.next.send(request)
                if self.is_retry(retry_settings, response):
                    retry_active = self.increment(retry_settings, response=response)
                    if retry_active:
                        self.sleep(retry_settings, transport, response=response)
                        is_response_error = True
                        continue
                # Either the response is not retryable or no retry is left: hand it back as is.
                break
            except ClientAuthenticationError:
                # the authentication policy failed such that the client's request can't
                # succeed--we'll never have a response to it, so propagate the exception
                raise
            except AzureError as err:
                if absolute_timeout > 0 and self._is_method_retryable(retry_settings, request.http_request):
                    retry_active = self.increment(retry_settings, response=request, error=err)
                    if retry_active:
                        self.sleep(retry_settings, transport)
                        # Remember which kind of timeout error to raise if time runs out next.
                        is_response_error = not isinstance(err, ServiceRequestError)
                        continue
                # Not retryable, or retries exhausted: propagate the original error.
                raise
            finally:
                end_time = time.time()
                if absolute_timeout:
                    absolute_timeout -= end_time - start_time
        if response is None:
            raise AzureError("Maximum retries exceeded.")

        self.update_context(response.context, retry_settings)
        return response