Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_retry.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

215 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the "Software"), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26from typing import TypeVar, Any, Dict, Optional, Type, List, Union, cast, IO 

27from io import SEEK_SET, UnsupportedOperation 

28import logging 

29import time 

30from enum import Enum 

31from azure.core.configuration import ConnectionConfiguration 

32from azure.core.pipeline import PipelineResponse, PipelineRequest, PipelineContext 

33from azure.core.pipeline.transport import ( 

34 HttpResponse as LegacyHttpResponse, 

35 AsyncHttpResponse as LegacyAsyncHttpResponse, 

36 HttpRequest as LegacyHttpRequest, 

37 HttpTransport, 

38) 

39from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest 

40from azure.core.exceptions import ( 

41 AzureError, 

42 ClientAuthenticationError, 

43 ServiceResponseError, 

44 ServiceRequestError, 

45 ServiceRequestTimeoutError, 

46 ServiceResponseTimeoutError, 

47) 

48 

49from ._base import HTTPPolicy, RequestHistory 

50from . import _utils 

51from ..._enum_meta import CaseInsensitiveEnumMeta 

52 

# Response types accepted by the synchronous policy (new and legacy transports).
HTTPResponseType = TypeVar(
    "HTTPResponseType",
    HttpResponse,
    LegacyHttpResponse,
)
# Every response type, sync or async, new or legacy.
AllHttpResponseType = TypeVar(
    "AllHttpResponseType",
    HttpResponse,
    LegacyHttpResponse,
    AsyncHttpResponse,
    LegacyAsyncHttpResponse,
)
# Request types (new and legacy).
HTTPRequestType = TypeVar(
    "HTTPRequestType",
    HttpRequest,
    LegacyHttpRequest,
)
# Lets ``no_retries`` return the concrete subclass it was called on.
ClsRetryPolicy = TypeVar("ClsRetryPolicy", bound="RetryPolicyBase")

# Module-level logger.
_LOGGER = logging.getLogger(__name__)

61 

62 

class RetryMode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Names the strategy used to compute the delay between retry attempts."""

    # pylint: disable=enum-must-be-uppercase

    #: Exponential delay between attempts.
    Exponential = "exponential"
    #: Fixed delay between attempts.
    Fixed = "fixed"

67 

68 

class RetryPolicyBase:
    """Retry configuration and bookkeeping shared by the retry policies.

    Holds the policy-level defaults (retry counts, backoff, timeout, retryable
    status codes and HTTP methods) and the helpers that build, update and
    inspect the per-request ``settings`` dictionary.

    :keyword int retry_total: Total number of retries to allow. Default value is 10.
    :keyword int retry_connect: How many connection-related errors to retry on. Default value is 3.
    :keyword int retry_read: How many times to retry on read errors. Default value is 3.
    :keyword int retry_status: How many times to retry on bad status codes. Default value is 3.
    :keyword float retry_backoff_factor: Backoff factor applied between attempts. Default value is 0.8.
    :keyword int retry_backoff_max: The maximum back off time. Defaults to ``BACKOFF_MAX``.
    :keyword RetryMode retry_mode: Fixed or exponential delay between attempts, default is exponential.
    :keyword int timeout: Timeout setting for the operation in seconds, default is 604800s (7 days).
    :keyword retry_on_status_codes: Extra status codes to retry on, added to the built-in set.
    """

    # pylint: disable=too-many-instance-attributes
    #: Maximum backoff time.
    BACKOFF_MAX = 120
    # Every code below 506 is "safe" (not retried) except the transient ones listed here.
    _SAFE_CODES = set(range(506)) - {408, 429, 500, 502, 503, 504}
    _RETRY_CODES = set(range(999)) - _SAFE_CODES

    def __init__(self, **kwargs: Any) -> None:
        self.total_retries: int = kwargs.pop("retry_total", 10)
        self.connect_retries: int = kwargs.pop("retry_connect", 3)
        self.read_retries: int = kwargs.pop("retry_read", 3)
        self.status_retries: int = kwargs.pop("retry_status", 3)
        self.backoff_factor: float = kwargs.pop("retry_backoff_factor", 0.8)
        self.backoff_max: int = kwargs.pop("retry_backoff_max", self.BACKOFF_MAX)
        self.retry_mode: RetryMode = kwargs.pop("retry_mode", RetryMode.Exponential)
        self.timeout: int = kwargs.pop("timeout", 604800)

        # Caller-supplied codes extend, never replace, the built-in retryable set.
        status_codes = kwargs.pop("retry_on_status_codes", [])
        self._retry_on_status_codes = set(status_codes) | self._RETRY_CODES
        self._method_whitelist = frozenset(["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"])
        self._respect_retry_after_header = True
        super().__init__()

    @classmethod
    def no_retries(cls: Type[ClsRetryPolicy]) -> ClsRetryPolicy:
        """Disable retries.

        :return: A retry policy with retries disabled.
        :rtype: ~azure.core.pipeline.policies.RetryPolicy or ~azure.core.pipeline.policies.AsyncRetryPolicy
        """
        return cls(retry_total=0)

    def configure_retries(self, options: Dict[str, Any]) -> Dict[str, Any]:
        """Configures the retry settings.

        Per-call values are popped from ``options`` (so ``options`` is mutated);
        any value not supplied falls back to the one configured on this policy.

        :param options: keyword arguments from context.
        :type options: dict
        :return: A dict containing settings and history for retries.
        :rtype: dict
        """
        return {
            "total": options.pop("retry_total", self.total_retries),
            "connect": options.pop("retry_connect", self.connect_retries),
            "read": options.pop("retry_read", self.read_retries),
            "status": options.pop("retry_status", self.status_retries),
            "backoff": options.pop("retry_backoff_factor", self.backoff_factor),
            # Fall back to the instance value so a policy-level ``retry_backoff_max`` is honoured.
            "max_backoff": options.pop("retry_backoff_max", self.backoff_max),
            "methods": options.pop("retry_on_methods", self._method_whitelist),
            "timeout": options.pop("timeout", self.timeout),
            "history": [],
        }

    def get_backoff_time(self, settings: Dict[str, Any]) -> float:
        """Returns the current backoff time.

        The value is 0 while the history holds at most one entry. After that it
        is the plain backoff factor in fixed mode, or the factor doubled for
        each further history entry otherwise, capped at ``settings["max_backoff"]``.

        :param dict settings: The retry settings.
        :return: The current backoff value.
        :rtype: float
        """
        attempts = len(settings["history"])
        if attempts <= 1:
            return 0

        if self.retry_mode == RetryMode.Fixed:
            backoff_value = settings["backoff"]
        else:
            backoff_value = settings["backoff"] * (2 ** (attempts - 1))
        return min(settings["max_backoff"], backoff_value)

    def parse_retry_after(self, retry_after: str) -> float:
        """Helper to parse Retry-After and get value in seconds.

        :param str retry_after: Retry-After header
        :rtype: float
        :return: Value of Retry-After in seconds.
        """
        return _utils.parse_retry_after(retry_after)

    def get_retry_after(self, response: PipelineResponse[Any, AllHttpResponseType]) -> Optional[float]:
        """Get the value of Retry-After in seconds.

        :param response: The PipelineResponse object
        :type response: ~azure.core.pipeline.PipelineResponse
        :return: Value of Retry-After in seconds.
        :rtype: float or None
        """
        return _utils.get_retry_after(response)

    def _is_connection_error(self, err: Exception) -> bool:
        """Errors when we're fairly sure that the server did not receive the
        request, so it should be safe to retry.

        :param err: The error raised by the pipeline.
        :type err: ~azure.core.exceptions.AzureError
        :return: True if connection error, False if not.
        :rtype: bool
        """
        return isinstance(err, ServiceRequestError)

    def _is_read_error(self, err: Exception) -> bool:
        """Errors that occur after the request has been started, so we should
        assume that the server began processing it.

        :param err: The error raised by the pipeline.
        :type err: ~azure.core.exceptions.AzureError
        :return: True if read error, False if not.
        :rtype: bool
        """
        return isinstance(err, ServiceResponseError)

    def _is_method_retryable(
        self,
        settings: Dict[str, Any],
        request: HTTPRequestType,
        response: Optional[AllHttpResponseType] = None,
    ) -> bool:
        """Checks if a given HTTP method should be retried upon, depending if
        it is included on the method allowlist.

        POST and PATCH are additionally retried when a response is available
        and its status code is 500, 503 or 504.

        :param dict settings: The retry settings.
        :param request: The HTTP request object.
        :type request: ~azure.core.rest.HttpRequest
        :param response: The HTTP response object.
        :type response: ~azure.core.rest.HttpResponse or ~azure.core.rest.AsyncHttpResponse
        :return: True if method should be retried upon. False if not in method allowlist.
        :rtype: bool
        """
        method = request.method.upper()
        if response and method in ["POST", "PATCH"] and response.status_code in [500, 503, 504]:
            return True
        return method in settings["methods"]

    def is_retry(
        self, settings: Dict[str, Any], response: PipelineResponse[HTTPRequestType, AllHttpResponseType]
    ) -> bool:
        """Checks if method/status code is retryable.

        The behavior is:
        - If status_code < 400: don't retry
        - Else if Retry-After present (and the header is respected): retry
        - Else if the method is not retryable: don't retry
        - Else: retry when retries remain in ``settings["total"]`` and the
          status code is in the retryable status code set.

        :param dict settings: The retry settings.
        :param response: The PipelineResponse object
        :type response: ~azure.core.pipeline.PipelineResponse
        :return: True if method/status code is retryable. False if not retryable.
        :rtype: bool
        """
        http_response = response.http_response
        status_code = http_response.status_code
        if status_code < 400:
            return False
        if self._respect_retry_after_header and http_response.headers.get("Retry-After"):
            return True
        if not self._is_method_retryable(settings, response.http_request, response=http_response):
            return False
        # Always hand back a real bool, even when the remaining total is 0.
        return bool(settings["total"]) and status_code in self._retry_on_status_codes

    def is_exhausted(self, settings: Dict[str, Any]) -> bool:
        """Checks if any retries left.

        Counters that are falsy (``0`` or ``None``) are ignored; the policy is
        exhausted once any remaining counter has dropped below zero.

        :param dict settings: the retry settings
        :return: False if have more retries. True if retries exhausted.
        :rtype: bool
        """
        remaining: List[int] = [
            count
            for count in (settings["total"], settings["connect"], settings["read"], settings["status"])
            if count
        ]
        if not remaining:
            return False
        return min(remaining) < 0

    def increment(
        self,
        settings: Dict[str, Any],
        response: Optional[
            Union[PipelineRequest[HTTPRequestType], PipelineResponse[HTTPRequestType, AllHttpResponseType]]
        ] = None,
        error: Optional[Exception] = None,
    ) -> bool:
        """Increment the retry counters.

        Decrements the relevant counters, records the attempt in
        ``settings["history"]`` and rewinds any stream body/files so the request
        can be sent again.

        :param settings: The retry settings.
        :type settings: dict
        :param response: A pipeline response object (or the pipeline request when
         the attempt failed with an error). May be None, in which case no history
         entry is recorded and nothing is rewound.
        :type response: ~azure.core.pipeline.PipelineResponse
        :param error: An error encountered during the request, or
         None if the response was received successfully.
        :type error: ~azure.core.exceptions.AzureError
        :return: Whether any retry attempt is available
         True if more retry attempts available, False otherwise
        :rtype: bool
        """
        settings["total"] -= 1

        if (
            response is not None
            and isinstance(response, PipelineResponse)
            and response.http_response.status_code == 202
        ):
            return False

        # ``response`` may be None (or lack a request); resolve the request once and guard its use.
        http_request = getattr(response, "http_request", None)

        if error and self._is_connection_error(error):
            # Connect retry?
            settings["connect"] -= 1
            if http_request is not None:
                settings["history"].append(RequestHistory(http_request, error=error))

        elif error and self._is_read_error(error):
            # Read retry?
            settings["read"] -= 1
            if http_request is not None:
                settings["history"].append(RequestHistory(http_request, error=error))

        elif response:
            # Incrementing because of a server error like a 500 in
            # status_forcelist and the given method is in the allowlist
            settings["status"] -= 1
            if http_request is not None and hasattr(response, "http_response"):
                settings["history"].append(RequestHistory(http_request, http_response=response.http_response))

        if self.is_exhausted(settings):
            return False

        if http_request is None:
            # No request available, hence nothing to rewind.
            return True

        if http_request.body and hasattr(http_request.body, "read"):
            # The starting position may be missing or None (``tell`` failed);
            # without it the stream cannot be rewound, so a retry would not work.
            body_position = settings.get("body_position")
            if body_position is None:
                return False
            try:
                # attempt to rewind the body to the initial position
                # If it has "read", it has "seek", so casting for mypy
                cast(IO[bytes], http_request.body).seek(body_position, SEEK_SET)
            except (UnsupportedOperation, ValueError, AttributeError):
                # if body is not seekable, then retry would not work
                return False

        file_positions = settings.get("file_positions")
        if http_request.files and file_positions:
            try:
                for value in http_request.files.values():
                    file_name, body = value[0], value[1]
                    if file_name in file_positions:
                        body.seek(file_positions[file_name], SEEK_SET)
            except (UnsupportedOperation, ValueError, AttributeError):
                # if body is not seekable, then retry would not work
                return False
        return True

    def update_context(self, context: PipelineContext, retry_settings: Dict[str, Any]) -> None:
        """Updates retry history in pipeline context.

        The context is only touched when the history is non-empty.

        :param context: The pipeline context.
        :type context: ~azure.core.pipeline.PipelineContext
        :param retry_settings: The retry settings.
        :type retry_settings: dict
        """
        if retry_settings["history"]:
            context["history"] = retry_settings["history"]

    def _configure_timeout(
        self, request: PipelineRequest[HTTPRequestType], absolute_timeout: float, is_response_error: bool
    ) -> None:
        """Enforce the remaining operation timeout on the next attempt.

        :param request: The pipeline request about to be sent.
        :type request: ~azure.core.pipeline.PipelineRequest
        :param float absolute_timeout: Time left for the whole operation, in seconds.
        :param bool is_response_error: Selects which timeout error is raised when no time is left.
        :raises ~azure.core.exceptions.ServiceResponseTimeoutError: if no time is left and
         ``is_response_error`` is True.
        :raises ~azure.core.exceptions.ServiceRequestTimeoutError: if no time is left and
         ``is_response_error`` is False.
        """
        if absolute_timeout <= 0:
            if is_response_error:
                raise ServiceResponseTimeoutError("Response timeout")
            raise ServiceRequestTimeoutError("Request timeout")

        options = request.context.options

        # if connection_timeout is already set, ensure it doesn't exceed absolute_timeout
        connection_timeout = options.get("connection_timeout")
        if connection_timeout:
            options["connection_timeout"] = min(connection_timeout, absolute_timeout)

        # otherwise, try to ensure the transport's configured connection_timeout doesn't exceed absolute_timeout
        # ("connection_config" isn't defined on Async/HttpTransport but all implementations in this library have it)
        elif hasattr(request.context.transport, "connection_config"):
            # FIXME This is fragile, should be refactored. Casting my way for mypy
            # https://github.com/Azure/azure-sdk-for-python/issues/31530
            connection_config = cast(
                ConnectionConfiguration, request.context.transport.connection_config  # type: ignore
            )

            default_timeout = getattr(connection_config, "timeout", absolute_timeout)
            try:
                if absolute_timeout < default_timeout:
                    options["connection_timeout"] = absolute_timeout
            except TypeError:
                # transport.connection_config.timeout is something unexpected (not a number)
                pass

    def _configure_positions(self, request: PipelineRequest[HTTPRequestType], retry_settings: Dict[str, Any]) -> None:
        """Record the starting offsets of a stream body or of uploaded files.

        The offsets are stored under ``"body_position"`` and ``"file_positions"``
        in ``retry_settings`` (``None`` when not applicable or not obtainable).

        :param request: The pipeline request about to be sent.
        :type request: ~azure.core.pipeline.PipelineRequest
        :param dict retry_settings: The retry settings, updated in place.
        """
        body_position = None
        file_positions: Optional[Dict[str, int]] = None
        http_request = request.http_request

        if http_request.body and hasattr(http_request.body, "read"):
            try:
                # If it has "read", it has "tell", so casting for mypy
                body_position = cast(IO[bytes], http_request.body).tell()
            except (AttributeError, UnsupportedOperation):
                # if body position cannot be obtained, then retries will not work
                pass
        elif http_request.files:
            file_positions = {}
            try:
                for value in http_request.files.values():
                    name, body = value[0], value[1]
                    if name and body and hasattr(body, "read"):
                        # If it has "read", it has "tell", so casting for mypy
                        file_positions[name] = cast(IO[bytes], body).tell()
            except (AttributeError, UnsupportedOperation):
                file_positions = None

        retry_settings["body_position"] = body_position
        retry_settings["file_positions"] = file_positions

400 

401 

class RetryPolicy(RetryPolicyBase, HTTPPolicy[HTTPRequestType, HTTPResponseType]):
    """A retry policy.

    The retry policy in the pipeline can be configured directly, or tweaked on a per-call basis.

    :keyword int retry_total: Total number of retries to allow. Takes precedence over other counts.
     Default value is 10.

    :keyword int retry_connect: How many connection-related errors to retry on.
     These are errors raised before the request is sent to the remote server,
     which we assume has not triggered the server to process the request. Default value is 3.

    :keyword int retry_read: How many times to retry on read errors.
     These errors are raised after the request was sent to the server, so the
     request may have side-effects. Default value is 3.

    :keyword int retry_status: How many times to retry on bad status codes. Default value is 3.

    :keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
     (most errors are resolved immediately by a second try without a delay).
     In fixed mode, retry policy will always sleep for {backoff factor}.
     In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
     seconds. If the backoff_factor is 0.1, then the retry will sleep
     for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.

    :keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).

    :keyword RetryMode retry_mode: Fixed or exponential delay between attempts, default is exponential.

    :keyword int timeout: Timeout setting for the operation in seconds, default is 604800s (7 days).

    .. admonition:: Example:

        .. literalinclude:: ../samples/test_example_sync.py
            :start-after: [START retry_policy]
            :end-before: [END retry_policy]
            :language: python
            :dedent: 4
            :caption: Configuring a retry policy.
    """

    def _sleep_for_retry(
        self,
        response: PipelineResponse[HTTPRequestType, HTTPResponseType],
        transport: HttpTransport[HTTPRequestType, HTTPResponseType],
    ) -> bool:
        """Sleep for the duration given by the Retry-After response header, if any.

        :param response: The PipelineResponse object.
        :type response: ~azure.core.pipeline.PipelineResponse
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.HttpTransport
        :return: Whether a sleep was done or not
        :rtype: bool
        """
        delay = self.get_retry_after(response)
        if not delay:
            return False
        transport.sleep(delay)
        return True

    def _sleep_backoff(
        self, settings: Dict[str, Any], transport: HttpTransport[HTTPRequestType, HTTPResponseType]
    ) -> None:
        """Sleep for the computed backoff time. Immediately returns if backoff is 0.

        :param dict settings: The retry settings.
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.HttpTransport
        """
        delay = self.get_backoff_time(settings)
        if delay <= 0:
            return
        transport.sleep(delay)

    def sleep(
        self,
        settings: Dict[str, Any],
        transport: HttpTransport[HTTPRequestType, HTTPResponseType],
        response: Optional[PipelineResponse[HTTPRequestType, HTTPResponseType]] = None,
    ) -> None:
        """Sleep between retry attempts.

        This method will respect a server's ``Retry-After`` response header
        and sleep the duration of the time requested. If that is not present
        (or no response is given), it falls back to the backoff time computed
        from the retry settings, returning immediately when that is 0.

        :param dict settings: The retry settings.
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.HttpTransport
        :param response: The PipelineResponse object.
        :type response: ~azure.core.pipeline.PipelineResponse
        """
        if response and self._sleep_for_retry(response, transport):
            return
        self._sleep_backoff(settings, transport)

    def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
        """Sends the PipelineRequest object to the next policy. Uses retry settings if necessary.

        :param request: The PipelineRequest object
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: Returns the PipelineResponse or raises error if maximum retries exceeded.
        :rtype: ~azure.core.pipeline.PipelineResponse
        :raises ~azure.core.exceptions.AzureError: if maximum retries exceeded.
        :raises ~azure.core.exceptions.ClientAuthenticationError: if authentication fails.
        """
        retry_settings = self.configure_retries(request.context.options)
        self._configure_positions(request, retry_settings)

        response = None
        keep_trying = True
        remaining_time = retry_settings["timeout"]
        last_error_was_response = True

        while keep_trying:
            attempt_started = time.time()
            # PipelineContext types transport as a Union of HttpTransport and AsyncHttpTransport, but
            # here we know that this is an HttpTransport.
            # The correct fix is to make PipelineContext generic, but that's a breaking change and a lot of
            # generic to update in Pipeline, PipelineClient, PipelineRequest, PipelineResponse, etc.
            transport: HttpTransport[HTTPRequestType, HTTPResponseType] = cast(
                HttpTransport[HTTPRequestType, HTTPResponseType], request.context.transport
            )
            try:
                self._configure_timeout(request, remaining_time, last_error_was_response)
                request.context["retry_count"] = len(retry_settings["history"])
                response = self.next.send(request)
                if self.is_retry(retry_settings, response):
                    keep_trying = self.increment(retry_settings, response=response)
                    if keep_trying:
                        self.sleep(retry_settings, transport, response=response)
                        last_error_was_response = True
                        continue
                break
            except ClientAuthenticationError:  # pylint:disable=try-except-raise
                # the authentication policy failed such that the client's request can't
                # succeed--we'll never have a response to it, so propagate the exception
                raise
            except AzureError as err:
                if remaining_time > 0 and self._is_method_retryable(retry_settings, request.http_request):
                    keep_trying = self.increment(retry_settings, response=request, error=err)
                    if keep_trying:
                        self.sleep(retry_settings, transport)
                        # Remember which kind of timeout error to raise if time runs out.
                        last_error_was_response = not isinstance(err, ServiceRequestError)
                        continue
                raise err
            finally:
                # Charge the time spent on this attempt (including any sleep) to the overall budget.
                elapsed = time.time() - attempt_started
                if remaining_time:
                    remaining_time -= elapsed

        if not response:
            raise AzureError("Maximum retries exceeded.")

        self.update_context(response.context, retry_settings)
        return response