1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26from typing import TypeVar, Any, Dict, Optional, Type, List, Union, cast, IO
27from io import SEEK_SET, UnsupportedOperation
28import logging
29import time
30from enum import Enum
31from azure.core.configuration import ConnectionConfiguration
32from azure.core.pipeline import PipelineResponse, PipelineRequest, PipelineContext
33from azure.core.pipeline.transport import (
34 HttpResponse as LegacyHttpResponse,
35 AsyncHttpResponse as LegacyAsyncHttpResponse,
36 HttpRequest as LegacyHttpRequest,
37 HttpTransport,
38)
39from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest
40from azure.core.exceptions import (
41 AzureError,
42 ClientAuthenticationError,
43 ServiceResponseError,
44 ServiceRequestError,
45 ServiceRequestTimeoutError,
46 ServiceResponseTimeoutError,
47)
48
49from ._base import HTTPPolicy, RequestHistory
50from . import _utils
51from ..._enum_meta import CaseInsensitiveEnumMeta
52
53HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse)
54AllHttpResponseType = TypeVar(
55 "AllHttpResponseType", HttpResponse, LegacyHttpResponse, AsyncHttpResponse, LegacyAsyncHttpResponse
56)
57HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest)
58ClsRetryPolicy = TypeVar("ClsRetryPolicy", bound="RetryPolicyBase")
59
60_LOGGER = logging.getLogger(__name__)
61
62
63class RetryMode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
64 # pylint: disable=enum-must-be-uppercase
65 Exponential = "exponential"
66 Fixed = "fixed"
67
68
69class RetryPolicyBase:
70 # pylint: disable=too-many-instance-attributes
71 #: Maximum backoff time.
72 BACKOFF_MAX = 120
73 _SAFE_CODES = set(range(506)) - set([408, 429, 500, 502, 503, 504])
74 _RETRY_CODES = set(range(999)) - _SAFE_CODES
75
76 def __init__(self, **kwargs: Any) -> None:
77 self.total_retries: int = kwargs.pop("retry_total", 10)
78 self.connect_retries: int = kwargs.pop("retry_connect", 3)
79 self.read_retries: int = kwargs.pop("retry_read", 3)
80 self.status_retries: int = kwargs.pop("retry_status", 3)
81 self.backoff_factor: float = kwargs.pop("retry_backoff_factor", 0.8)
82 self.backoff_max: int = kwargs.pop("retry_backoff_max", self.BACKOFF_MAX)
83 self.retry_mode: RetryMode = kwargs.pop("retry_mode", RetryMode.Exponential)
84 self.timeout: int = kwargs.pop("timeout", 604800)
85
86 retry_codes = self._RETRY_CODES
87 status_codes = kwargs.pop("retry_on_status_codes", [])
88 self._retry_on_status_codes = set(status_codes) | retry_codes
89 self._method_whitelist = frozenset(["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"])
90 self._respect_retry_after_header = True
91 super(RetryPolicyBase, self).__init__()
92
93 @classmethod
94 def no_retries(cls: Type[ClsRetryPolicy]) -> ClsRetryPolicy:
95 """Disable retries.
96
97 :return: A retry policy with retries disabled.
98 :rtype: ~azure.core.pipeline.policies.RetryPolicy or ~azure.core.pipeline.policies.AsyncRetryPolicy
99 """
100 return cls(retry_total=0)
101
102 def configure_retries(self, options: Dict[str, Any]) -> Dict[str, Any]:
103 """Configures the retry settings.
104
105 :param options: keyword arguments from context.
106 :type options: dict
107 :return: A dict containing settings and history for retries.
108 :rtype: dict
109 """
110 return {
111 "total": options.pop("retry_total", self.total_retries),
112 "connect": options.pop("retry_connect", self.connect_retries),
113 "read": options.pop("retry_read", self.read_retries),
114 "status": options.pop("retry_status", self.status_retries),
115 "backoff": options.pop("retry_backoff_factor", self.backoff_factor),
116 "max_backoff": options.pop("retry_backoff_max", self.BACKOFF_MAX),
117 "methods": options.pop("retry_on_methods", self._method_whitelist),
118 "timeout": options.pop("timeout", self.timeout),
119 "history": [],
120 }
121
122 def get_backoff_time(self, settings: Dict[str, Any]) -> float:
123 """Returns the current backoff time.
124
125 :param dict settings: The retry settings.
126 :return: The current backoff value.
127 :rtype: float
128 """
129 # We want to consider only the last consecutive errors sequence (Ignore redirects).
130 consecutive_errors_len = len(settings["history"])
131 if consecutive_errors_len <= 1:
132 return 0
133
134 if self.retry_mode == RetryMode.Fixed:
135 backoff_value = settings["backoff"]
136 else:
137 backoff_value = settings["backoff"] * (2 ** (consecutive_errors_len - 1))
138 return min(settings["max_backoff"], backoff_value)
139
140 def parse_retry_after(self, retry_after: str) -> float:
141 """Helper to parse Retry-After and get value in seconds.
142
143 :param str retry_after: Retry-After header
144 :rtype: float
145 :return: Value of Retry-After in seconds.
146 """
147 return _utils.parse_retry_after(retry_after)
148
149 def get_retry_after(self, response: PipelineResponse[Any, AllHttpResponseType]) -> Optional[float]:
150 """Get the value of Retry-After in seconds.
151
152 :param response: The PipelineResponse object
153 :type response: ~azure.core.pipeline.PipelineResponse
154 :return: Value of Retry-After in seconds.
155 :rtype: float or None
156 """
157 return _utils.get_retry_after(response)
158
159 def _is_connection_error(self, err: Exception) -> bool:
160 """Errors when we're fairly sure that the server did not receive the
161 request, so it should be safe to retry.
162
163 :param err: The error raised by the pipeline.
164 :type err: ~azure.core.exceptions.AzureError
165 :return: True if connection error, False if not.
166 :rtype: bool
167 """
168 return isinstance(err, ServiceRequestError)
169
170 def _is_read_error(self, err: Exception) -> bool:
171 """Errors that occur after the request has been started, so we should
172 assume that the server began processing it.
173
174 :param err: The error raised by the pipeline.
175 :type err: ~azure.core.exceptions.AzureError
176 :return: True if read error, False if not.
177 :rtype: bool
178 """
179 return isinstance(err, ServiceResponseError)
180
181 def _is_method_retryable(
182 self,
183 settings: Dict[str, Any],
184 request: HTTPRequestType,
185 response: Optional[AllHttpResponseType] = None,
186 ):
187 """Checks if a given HTTP method should be retried upon, depending if
188 it is included on the method allowlist.
189
190 :param dict settings: The retry settings.
191 :param request: The HTTP request object.
192 :type request: ~azure.core.rest.HttpRequest
193 :param response: The HTTP response object.
194 :type response: ~azure.core.rest.HttpResponse or ~azure.core.rest.AsyncHttpResponse
195 :return: True if method should be retried upon. False if not in method allowlist.
196 :rtype: bool
197 """
198 if response and request.method.upper() in ["POST", "PATCH"] and response.status_code in [500, 503, 504]:
199 return True
200 if request.method.upper() not in settings["methods"]:
201 return False
202
203 return True
204
205 def is_retry(
206 self, settings: Dict[str, Any], response: PipelineResponse[HTTPRequestType, AllHttpResponseType]
207 ) -> bool:
208 """Checks if method/status code is retryable.
209
210 Based on allowlists and control variables such as the number of
211 total retries to allow, whether to respect the Retry-After header,
212 whether this header is present, and whether the returned status
213 code is on the list of status codes to be retried upon on the
214 presence of the aforementioned header.
215
216 The behavior is:
217 - If status_code < 400: don't retry
218 - Else if Retry-After present: retry
219 - Else: retry based on the safe status code list ([408, 429, 500, 502, 503, 504])
220
221
222 :param dict settings: The retry settings.
223 :param response: The PipelineResponse object
224 :type response: ~azure.core.pipeline.PipelineResponse
225 :return: True if method/status code is retryable. False if not retryable.
226 :rtype: bool
227 """
228 if response.http_response.status_code < 400:
229 return False
230 has_retry_after = bool(response.http_response.headers.get("Retry-After"))
231 if has_retry_after and self._respect_retry_after_header:
232 return True
233 if not self._is_method_retryable(settings, response.http_request, response=response.http_response):
234 return False
235 return settings["total"] and response.http_response.status_code in self._retry_on_status_codes
236
237 def is_exhausted(self, settings: Dict[str, Any]) -> bool:
238 """Checks if any retries left.
239
240 :param dict settings: the retry settings
241 :return: False if have more retries. True if retries exhausted.
242 :rtype: bool
243 """
244 settings_retry_count = (
245 settings["total"],
246 settings["connect"],
247 settings["read"],
248 settings["status"],
249 )
250 retry_counts: List[int] = list(filter(None, settings_retry_count))
251 if not retry_counts:
252 return False
253
254 return min(retry_counts) < 0
255
256 def increment(
257 self,
258 settings: Dict[str, Any],
259 response: Optional[
260 Union[PipelineRequest[HTTPRequestType], PipelineResponse[HTTPRequestType, AllHttpResponseType]]
261 ] = None,
262 error: Optional[Exception] = None,
263 ) -> bool:
264 """Increment the retry counters.
265
266 :param settings: The retry settings.
267 :type settings: dict
268 :param response: A pipeline response object.
269 :type response: ~azure.core.pipeline.PipelineResponse
270 :param error: An error encountered during the request, or
271 None if the response was received successfully.
272 :type error: ~azure.core.exceptions.AzureError
273 :return: Whether any retry attempt is available
274 True if more retry attempts available, False otherwise
275 :rtype: bool
276 """
277 # FIXME This code is not None safe: https://github.com/Azure/azure-sdk-for-python/issues/31528
278 response = cast(
279 Union[PipelineRequest[HTTPRequestType], PipelineResponse[HTTPRequestType, AllHttpResponseType]], response
280 )
281
282 settings["total"] -= 1
283
284 if isinstance(response, PipelineResponse) and response.http_response.status_code == 202:
285 return False
286
287 if error and self._is_connection_error(error):
288 # Connect retry?
289 settings["connect"] -= 1
290 settings["history"].append(RequestHistory(response.http_request, error=error))
291
292 elif error and self._is_read_error(error):
293 # Read retry?
294 settings["read"] -= 1
295 if hasattr(response, "http_request"):
296 settings["history"].append(RequestHistory(response.http_request, error=error))
297
298 else:
299 # Incrementing because of a server error like a 500 in
300 # status_forcelist and the given method is in the allowlist
301 if response:
302 settings["status"] -= 1
303 if hasattr(response, "http_request") and hasattr(response, "http_response"):
304 settings["history"].append(
305 RequestHistory(response.http_request, http_response=response.http_response)
306 )
307
308 if self.is_exhausted(settings):
309 return False
310
311 if response.http_request.body and hasattr(response.http_request.body, "read"):
312 if "body_position" not in settings:
313 return False
314 try:
315 # attempt to rewind the body to the initial position
316 # If it has "read", it has "seek", so casting for mypy
317 cast(IO[bytes], response.http_request.body).seek(settings["body_position"], SEEK_SET)
318 except (UnsupportedOperation, ValueError, AttributeError):
319 # if body is not seekable, then retry would not work
320 return False
321 file_positions = settings.get("file_positions")
322 if response.http_request.files and file_positions:
323 try:
324 for value in response.http_request.files.values():
325 file_name, body = value[0], value[1]
326 if file_name in file_positions:
327 position = file_positions[file_name]
328 body.seek(position, SEEK_SET)
329 except (UnsupportedOperation, ValueError, AttributeError):
330 # if body is not seekable, then retry would not work
331 return False
332 return True
333
334 def update_context(self, context: PipelineContext, retry_settings: Dict[str, Any]) -> None:
335 """Updates retry history in pipeline context.
336
337 :param context: The pipeline context.
338 :type context: ~azure.core.pipeline.PipelineContext
339 :param retry_settings: The retry settings.
340 :type retry_settings: dict
341 """
342 if retry_settings["history"]:
343 context["history"] = retry_settings["history"]
344
345 def _configure_timeout(
346 self, request: PipelineRequest[HTTPRequestType], absolute_timeout: float, is_response_error: bool
347 ) -> None:
348 if absolute_timeout <= 0:
349 if is_response_error:
350 raise ServiceResponseTimeoutError("Response timeout")
351 raise ServiceRequestTimeoutError("Request timeout")
352
353 # if connection_timeout is already set, ensure it doesn't exceed absolute_timeout
354 connection_timeout = request.context.options.get("connection_timeout")
355 if connection_timeout:
356 request.context.options["connection_timeout"] = min(connection_timeout, absolute_timeout)
357
358 # otherwise, try to ensure the transport's configured connection_timeout doesn't exceed absolute_timeout
359 # ("connection_config" isn't defined on Async/HttpTransport but all implementations in this library have it)
360 elif hasattr(request.context.transport, "connection_config"):
361 # FIXME This is fragile, should be refactored. Casting my way for mypy
362 # https://github.com/Azure/azure-sdk-for-python/issues/31530
363 connection_config = cast(
364 ConnectionConfiguration, request.context.transport.connection_config # type: ignore
365 )
366
367 default_timeout = getattr(connection_config, "timeout", absolute_timeout)
368 try:
369 if absolute_timeout < default_timeout:
370 request.context.options["connection_timeout"] = absolute_timeout
371 except TypeError:
372 # transport.connection_config.timeout is something unexpected (not a number)
373 pass
374
375 def _configure_positions(self, request: PipelineRequest[HTTPRequestType], retry_settings: Dict[str, Any]) -> None:
376 body_position = None
377 file_positions: Optional[Dict[str, int]] = None
378 if request.http_request.body and hasattr(request.http_request.body, "read"):
379 try:
380 # If it has "read", it has "tell", so casting for mypy
381 body_position = cast(IO[bytes], request.http_request.body).tell()
382 except (AttributeError, UnsupportedOperation):
383 # if body position cannot be obtained, then retries will not work
384 pass
385 else:
386 if request.http_request.files:
387 file_positions = {}
388 try:
389 for value in request.http_request.files.values():
390 name, body = value[0], value[1]
391 if name and body and hasattr(body, "read"):
392 # If it has "read", it has "tell", so casting for mypy
393 position = cast(IO[bytes], body).tell()
394 file_positions[name] = position
395 except (AttributeError, UnsupportedOperation):
396 file_positions = None
397
398 retry_settings["body_position"] = body_position
399 retry_settings["file_positions"] = file_positions
400
401
402class RetryPolicy(RetryPolicyBase, HTTPPolicy[HTTPRequestType, HTTPResponseType]):
403 """A retry policy.
404
405 The retry policy in the pipeline can be configured directly, or tweaked on a per-call basis.
406
407 :keyword int retry_total: Total number of retries to allow. Takes precedence over other counts.
408 Default value is 10.
409
410 :keyword int retry_connect: How many connection-related errors to retry on.
411 These are errors raised before the request is sent to the remote server,
412 which we assume has not triggered the server to process the request. Default value is 3.
413
414 :keyword int retry_read: How many times to retry on read errors.
415 These errors are raised after the request was sent to the server, so the
416 request may have side-effects. Default value is 3.
417
418 :keyword int retry_status: How many times to retry on bad status codes. Default value is 3.
419
420 :keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
421 (most errors are resolved immediately by a second try without a delay).
422 In fixed mode, retry policy will always sleep for {backoff factor}.
423 In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
424 seconds. If the backoff_factor is 0.1, then the retry will sleep
425 for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
426
427 :keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
428
429 :keyword RetryMode retry_mode: Fixed or exponential delay between attemps, default is exponential.
430
431 :keyword int timeout: Timeout setting for the operation in seconds, default is 604800s (7 days).
432
433 .. admonition:: Example:
434
435 .. literalinclude:: ../samples/test_example_sync.py
436 :start-after: [START retry_policy]
437 :end-before: [END retry_policy]
438 :language: python
439 :dedent: 4
440 :caption: Configuring a retry policy.
441 """
442
443 def _sleep_for_retry(
444 self,
445 response: PipelineResponse[HTTPRequestType, HTTPResponseType],
446 transport: HttpTransport[HTTPRequestType, HTTPResponseType],
447 ) -> bool:
448 """Sleep based on the Retry-After response header value.
449
450 :param response: The PipelineResponse object.
451 :type response: ~azure.core.pipeline.PipelineResponse
452 :param transport: The HTTP transport type.
453 :type transport: ~azure.core.pipeline.transport.HttpTransport
454 :return: Whether a sleep was done or not
455 :rtype: bool
456 """
457 retry_after = self.get_retry_after(response)
458 if retry_after:
459 transport.sleep(retry_after)
460 return True
461 return False
462
463 def _sleep_backoff(
464 self, settings: Dict[str, Any], transport: HttpTransport[HTTPRequestType, HTTPResponseType]
465 ) -> None:
466 """Sleep using exponential backoff. Immediately returns if backoff is 0.
467
468 :param dict settings: The retry settings.
469 :param transport: The HTTP transport type.
470 :type transport: ~azure.core.pipeline.transport.HttpTransport
471 """
472 backoff = self.get_backoff_time(settings)
473 if backoff <= 0:
474 return
475 transport.sleep(backoff)
476
477 def sleep(
478 self,
479 settings: Dict[str, Any],
480 transport: HttpTransport[HTTPRequestType, HTTPResponseType],
481 response: Optional[PipelineResponse[HTTPRequestType, HTTPResponseType]] = None,
482 ) -> None:
483 """Sleep between retry attempts.
484
485 This method will respect a server's ``Retry-After`` response header
486 and sleep the duration of the time requested. If that is not present, it
487 will use an exponential backoff. By default, the backoff factor is 0 and
488 this method will return immediately.
489
490 :param dict settings: The retry settings.
491 :param transport: The HTTP transport type.
492 :type transport: ~azure.core.pipeline.transport.HttpTransport
493 :param response: The PipelineResponse object.
494 :type response: ~azure.core.pipeline.PipelineResponse
495 """
496 if response:
497 slept = self._sleep_for_retry(response, transport)
498 if slept:
499 return
500 self._sleep_backoff(settings, transport)
501
502 def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
503 """Sends the PipelineRequest object to the next policy. Uses retry settings if necessary.
504
505 :param request: The PipelineRequest object
506 :type request: ~azure.core.pipeline.PipelineRequest
507 :return: Returns the PipelineResponse or raises error if maximum retries exceeded.
508 :rtype: ~azure.core.pipeline.PipelineResponse
509 :raises: ~azure.core.exceptions.AzureError if maximum retries exceeded.
510 :raises: ~azure.core.exceptions.ClientAuthenticationError if authentication
511 """
512 retry_active = True
513 response = None
514 retry_settings = self.configure_retries(request.context.options)
515 self._configure_positions(request, retry_settings)
516
517 absolute_timeout = retry_settings["timeout"]
518 is_response_error = True
519
520 while retry_active:
521 start_time = time.time()
522 # PipelineContext types transport as a Union of HttpTransport and AsyncHttpTransport, but
523 # here we know that this is an HttpTransport.
524 # The correct fix is to make PipelineContext generic, but that's a breaking change and a lot of
525 # generic to update in Pipeline, PipelineClient, PipelineRequest, PipelineResponse, etc.
526 transport: HttpTransport[HTTPRequestType, HTTPResponseType] = cast(
527 HttpTransport[HTTPRequestType, HTTPResponseType], request.context.transport
528 )
529 try:
530 self._configure_timeout(request, absolute_timeout, is_response_error)
531 request.context["retry_count"] = len(retry_settings["history"])
532 response = self.next.send(request)
533 if self.is_retry(retry_settings, response):
534 retry_active = self.increment(retry_settings, response=response)
535 if retry_active:
536 self.sleep(retry_settings, transport, response=response)
537 is_response_error = True
538 continue
539 break
540 except ClientAuthenticationError: # pylint:disable=try-except-raise
541 # the authentication policy failed such that the client's request can't
542 # succeed--we'll never have a response to it, so propagate the exception
543 raise
544 except AzureError as err:
545 if absolute_timeout > 0 and self._is_method_retryable(retry_settings, request.http_request):
546 retry_active = self.increment(retry_settings, response=request, error=err)
547 if retry_active:
548 self.sleep(retry_settings, transport)
549 if isinstance(err, ServiceRequestError):
550 is_response_error = False
551 else:
552 is_response_error = True
553 continue
554 raise err
555 finally:
556 end_time = time.time()
557 if absolute_timeout:
558 absolute_timeout -= end_time - start_time
559 if not response:
560 raise AzureError("Maximum retries exceeded.")
561
562 self.update_context(response.context, retry_settings)
563 return response