1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26"""
This module provides the async implementation of the pipeline retry policy.
28"""
29from typing import TypeVar, Dict, Any, Optional, cast
30import logging
31import time
32from azure.core.pipeline import PipelineRequest, PipelineResponse
33from azure.core.pipeline.transport import (
34 AsyncHttpResponse as LegacyAsyncHttpResponse,
35 HttpRequest as LegacyHttpRequest,
36 AsyncHttpTransport,
37)
38from azure.core.rest import AsyncHttpResponse, HttpRequest
39from azure.core.exceptions import (
40 AzureError,
41 ClientAuthenticationError,
42 ServiceRequestError,
43)
44from ._base_async import AsyncHTTPPolicy
45from ._retry import RetryPolicyBase
46
# Type variables constrained to either the current (``azure.core.rest``) or the
# legacy (``azure.core.pipeline.transport``) response/request classes, so the
# policy below can be parameterized over both API generations.
AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType", AsyncHttpResponse, LegacyAsyncHttpResponse)
HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest)

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
51
52
class AsyncRetryPolicy(RetryPolicyBase, AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]):
    """Async flavor of the retry policy.

    The async retry policy in the pipeline can be configured directly, or tweaked on a per-call basis.

    :keyword int retry_total: Total number of retries to allow. Takes precedence over other counts.
     Default value is 10.
    :keyword int retry_connect: How many connection-related errors to retry on.
     These are errors raised before the request is sent to the remote server,
     which we assume has not triggered the server to process the request. Default value is 3.
    :keyword int retry_read: How many times to retry on read errors.
     These errors are raised after the request was sent to the server, so the
     request may have side-effects. Default value is 3.
    :keyword int retry_status: How many times to retry on bad status codes. Default value is 3.
    :keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
     (most errors are resolved immediately by a second try without a delay).
     Retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
     seconds. If the backoff_factor is 0.1, then the retry will sleep
     for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
    :keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).

    .. admonition:: Example:

        .. literalinclude:: ../samples/test_example_async.py
            :start-after: [START async_retry_policy]
            :end-before: [END async_retry_policy]
            :language: python
            :dedent: 4
            :caption: Configuring an async retry policy.
    """

    async def _sleep_for_retry(
        self,
        response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType],
        transport: AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType],
    ) -> bool:
        """Sleep based on the Retry-After response header value.

        :param response: The PipelineResponse object.
        :type response: ~azure.core.pipeline.PipelineResponse
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.AsyncHttpTransport
        :return: Whether a (truthy) retry-after value was found and slept on.
        :rtype: bool
        """
        retry_after = self.get_retry_after(response)
        # A missing or zero retry-after value is falsy: no sleep happens here and
        # the caller falls back to exponential backoff.
        if retry_after:
            await transport.sleep(retry_after)
            return True
        return False

    async def _sleep_backoff(
        self,
        settings: Dict[str, Any],
        transport: AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType],
    ) -> None:
        """Sleep using exponential backoff. Immediately returns if the backoff is not positive.

        :param dict settings: The retry settings.
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.AsyncHttpTransport
        """
        backoff = self.get_backoff_time(settings)
        if backoff <= 0:
            return
        await transport.sleep(backoff)

    async def sleep(
        self,
        settings: Dict[str, Any],
        transport: AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType],
        response: Optional[PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]] = None,
    ) -> None:
        """Sleep between retry attempts.

        This method will respect a server's ``Retry-After`` response header
        and sleep the duration of the time requested. If that is not present, it
        will use an exponential backoff. If the computed backoff time is not
        positive, this method returns immediately.

        :param dict settings: The retry settings.
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.AsyncHttpTransport
        :param response: The PipelineResponse object.
        :type response: ~azure.core.pipeline.PipelineResponse
        """
        if response:
            slept = await self._sleep_for_retry(response, transport)
            if slept:
                return
        await self._sleep_backoff(settings, transport)

    async def send(
        self, request: PipelineRequest[HTTPRequestType]
    ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]:
        """Uses the configured retry policy to send the request to the next policy in the pipeline.

        :param request: The PipelineRequest object
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: The PipelineResponse.
        :rtype: ~azure.core.pipeline.PipelineResponse
        :raise ~azure.core.exceptions.AzureError: if maximum retries exceeded.
        :raise ~azure.core.exceptions.ClientAuthenticationError: if authentication fails
        """
        retry_active = True
        response = None
        retry_settings = self.configure_retries(request.context.options)
        self._configure_positions(request, retry_settings)

        absolute_timeout = retry_settings["timeout"]
        is_response_error = True

        while retry_active:
            # Use a monotonic clock for measuring elapsed time: the wall clock
            # (time.time) can jump forwards or backwards when the system time is
            # adjusted, which would corrupt the remaining absolute timeout.
            start_time = time.monotonic()
            # PipelineContext types transport as a Union of HttpTransport and AsyncHttpTransport, but
            # here we know that this is an AsyncHttpTransport.
            # The correct fix is to make PipelineContext generic, but that's a breaking change and a lot of
            # generic to update in Pipeline, PipelineClient, PipelineRequest, PipelineResponse, etc.
            transport: AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType] = cast(
                AsyncHttpTransport[HTTPRequestType, AsyncHTTPResponseType],
                request.context.transport,
            )
            try:
                self._configure_timeout(request, absolute_timeout, is_response_error)
                request.context["retry_count"] = len(retry_settings["history"])
                response = await self.next.send(request)
                if self.is_retry(retry_settings, response):
                    retry_active = self.increment(retry_settings, response=response)
                    if retry_active:
                        await self.sleep(
                            retry_settings,
                            transport,
                            response=response,
                        )
                        is_response_error = True
                        continue
                # Either the response does not warrant a retry, or the retry
                # budget is exhausted: stop looping and return what we have.
                break
            except ClientAuthenticationError:
                # the authentication policy failed such that the client's request can't
                # succeed--we'll never have a response to it, so propagate the exception
                raise
            except AzureError as err:
                if absolute_timeout > 0 and self._is_method_retryable(retry_settings, request.http_request):
                    # No response exists on this path, so the request is passed in
                    # the ``response`` slot.
                    # NOTE(review): presumably ``increment`` only needs the request
                    # information here -- confirm against RetryPolicyBase.
                    retry_active = self.increment(retry_settings, response=request, error=err)
                    if retry_active:
                        await self.sleep(retry_settings, transport)
                        # ServiceRequestError is the only error kind that is not
                        # treated as a response error for the next attempt.
                        is_response_error = not isinstance(err, ServiceRequestError)
                        continue
                raise err
            finally:
                # Runs on every exit from the try block (including ``continue``),
                # so time spent sleeping between attempts also counts against
                # the absolute timeout.
                end_time = time.monotonic()
                if absolute_timeout:
                    absolute_timeout -= end_time - start_time
        if not response:
            raise AzureError("Maximum retries exceeded.")

        self.update_context(response.context, retry_settings)
        return response