Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/pipeline/policies/_retry.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

216 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26from typing import TypeVar, Any, Dict, Optional, Type, List, Union, cast, IO 

27from io import SEEK_SET, UnsupportedOperation 

28import logging 

29import time 

30from enum import Enum 

31from azure.core.configuration import ConnectionConfiguration 

32from azure.core.pipeline import PipelineResponse, PipelineRequest, PipelineContext 

33from azure.core.pipeline.transport import ( 

34 HttpResponse as LegacyHttpResponse, 

35 AsyncHttpResponse as LegacyAsyncHttpResponse, 

36 HttpRequest as LegacyHttpRequest, 

37 HttpTransport, 

38) 

39from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest 

40from azure.core.exceptions import ( 

41 AzureError, 

42 ClientAuthenticationError, 

43 ServiceResponseError, 

44 ServiceRequestError, 

45 ServiceRequestTimeoutError, 

46 ServiceResponseTimeoutError, 

47) 

48 

49from ._base import HTTPPolicy, RequestHistory 

50from . import _utils 

51from ..._enum_meta import CaseInsensitiveEnumMeta 

52 

# Request type handled by the policies: the current model or the legacy transport one.
HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest)

# Synchronous response types (current and legacy).
HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse)

# Every response type, synchronous or asynchronous, current or legacy.
AllHttpResponseType = TypeVar(
    "AllHttpResponseType",
    HttpResponse,
    LegacyHttpResponse,
    AsyncHttpResponse,
    LegacyAsyncHttpResponse,
)

# Lets ``no_retries`` return an instance of whichever subclass it is called on.
ClsRetryPolicy = TypeVar("ClsRetryPolicy", bound="RetryPolicyBase")

_LOGGER = logging.getLogger(__name__)

65 

66 

class RetryMode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Strategies for computing the delay between retry attempts.

    ``Exponential`` grows the delay with the number of recorded attempts;
    ``Fixed`` always uses the configured backoff factor as the delay.
    """

    # Member names are part of the public API, hence not upper-case.
    # pylint: disable=enum-must-be-uppercase
    Exponential = "exponential"
    Fixed = "fixed"

72 

73 

class RetryPolicyBase:
    # pylint: disable=too-many-instance-attributes
    """Retry configuration and bookkeeping shared by the retry policies.

    The class stores the retry budgets, backoff parameters, timeout and the
    retryable status codes / HTTP methods, and offers helpers that decide
    whether another attempt is allowed. Sleeping between attempts and sending
    the request are left to subclasses.
    """

    #: Maximum backoff time.
    BACKOFF_MAX = 120
    # Status codes below 506 are considered safe (not retried), except the six listed here.
    _SAFE_CODES = set(range(506)) - {408, 429, 500, 502, 503, 504}
    # Everything else below 999 is retried: the six codes above plus 506..998.
    _RETRY_CODES = set(range(999)) - _SAFE_CODES

    def __init__(self, **kwargs: Any) -> None:
        """Read the retry configuration from keyword arguments.

        :keyword int retry_total: Total number of retries. Default is 10.
        :keyword int retry_connect: Retries on connection errors. Default is 3.
        :keyword int retry_read: Retries on read errors. Default is 3.
        :keyword int retry_status: Retries on bad status codes. Default is 3.
        :keyword float retry_backoff_factor: Backoff factor. Default is 0.8.
        :keyword int retry_backoff_max: Maximum backoff. Default is ``BACKOFF_MAX``.
        :keyword RetryMode retry_mode: Fixed or exponential delay. Default is exponential.
        :keyword int timeout: Operation timeout in seconds. Default is 604800.
        :keyword retry_on_status_codes: Extra status codes to retry on, added to the built-in set.
        """
        self.total_retries: int = kwargs.pop("retry_total", 10)
        self.connect_retries: int = kwargs.pop("retry_connect", 3)
        self.read_retries: int = kwargs.pop("retry_read", 3)
        self.status_retries: int = kwargs.pop("retry_status", 3)
        self.backoff_factor: float = kwargs.pop("retry_backoff_factor", 0.8)
        self.backoff_max: int = kwargs.pop("retry_backoff_max", self.BACKOFF_MAX)
        self.retry_mode: RetryMode = kwargs.pop("retry_mode", RetryMode.Exponential)
        self.timeout: int = kwargs.pop("timeout", 604800)

        status_codes = kwargs.pop("retry_on_status_codes", [])
        self._retry_on_status_codes = set(status_codes) | self._RETRY_CODES
        self._method_whitelist = frozenset(["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"])
        self._respect_retry_after_header = True
        super().__init__()

    @classmethod
    def no_retries(cls: Type[ClsRetryPolicy]) -> ClsRetryPolicy:
        """Disable retries.

        :return: A retry policy with retries disabled.
        :rtype: ~azure.core.pipeline.policies.RetryPolicy or ~azure.core.pipeline.policies.AsyncRetryPolicy
        """
        return cls(retry_total=0)

    def configure_retries(self, options: Dict[str, Any]) -> Dict[str, Any]:
        """Configures the retry settings.

        Per-call values found in ``options`` are removed from it and take
        precedence over the values configured on the policy instance.

        :param options: keyword arguments from context.
        :type options: dict
        :return: A dict containing settings and history for retries.
        :rtype: dict
        """
        return {
            "total": options.pop("retry_total", self.total_retries),
            "connect": options.pop("retry_connect", self.connect_retries),
            "read": options.pop("retry_read", self.read_retries),
            "status": options.pop("retry_status", self.status_retries),
            "backoff": options.pop("retry_backoff_factor", self.backoff_factor),
            # Fall back to the instance value so that ``retry_backoff_max`` given to the
            # constructor is honoured (it defaults to BACKOFF_MAX when not supplied).
            "max_backoff": options.pop("retry_backoff_max", self.backoff_max),
            "methods": options.pop("retry_on_methods", self._method_whitelist),
            "timeout": options.pop("timeout", self.timeout),
            "history": [],
        }

    def get_backoff_time(self, settings: Dict[str, Any]) -> float:
        """Returns the current backoff time.

        :param dict settings: The retry settings.
        :return: The current backoff value.
        :rtype: float
        """
        # The delay is derived from the number of attempts recorded in the history;
        # the first retry happens without any delay.
        attempts = len(settings["history"])
        if attempts <= 1:
            return 0

        if self.retry_mode == RetryMode.Fixed:
            backoff_value = settings["backoff"]
        else:
            backoff_value = settings["backoff"] * (2 ** (attempts - 1))
        return min(settings["max_backoff"], backoff_value)

    def parse_retry_after(self, retry_after: str) -> float:
        """Helper to parse Retry-After and get value in seconds.

        :param str retry_after: Retry-After header
        :rtype: float
        :return: Value of Retry-After in seconds.
        """
        return _utils.parse_retry_after(retry_after)

    def get_retry_after(self, response: PipelineResponse[Any, AllHttpResponseType]) -> Optional[float]:
        """Get the value of Retry-After in seconds.

        :param response: The PipelineResponse object
        :type response: ~azure.core.pipeline.PipelineResponse
        :return: Value of Retry-After in seconds.
        :rtype: float or None
        """
        return _utils.get_retry_after(response)

    def _is_connection_error(self, err: Exception) -> bool:
        """Errors when we're fairly sure that the server did not receive the
        request, so it should be safe to retry.

        :param err: The error raised by the pipeline.
        :type err: ~azure.core.exceptions.AzureError
        :return: True if connection error, False if not.
        :rtype: bool
        """
        return isinstance(err, ServiceRequestError)

    def _is_read_error(self, err: Exception) -> bool:
        """Errors that occur after the request has been started, so we should
        assume that the server began processing it.

        :param err: The error raised by the pipeline.
        :type err: ~azure.core.exceptions.AzureError
        :return: True if read error, False if not.
        :rtype: bool
        """
        return isinstance(err, ServiceResponseError)

    def _is_method_retryable(
        self,
        settings: Dict[str, Any],
        request: HTTPRequestType,
        response: Optional[AllHttpResponseType] = None,
    ) -> bool:
        """Checks if a given HTTP method should be retried upon, depending if
        it is included on the method allowlist.

        POST and PATCH are additionally retried when a response is available
        and its status code is 500, 503 or 504.

        :param dict settings: The retry settings.
        :param request: The HTTP request object.
        :type request: ~azure.core.rest.HttpRequest
        :param response: The HTTP response object.
        :type response: ~azure.core.rest.HttpResponse or ~azure.core.rest.AsyncHttpResponse
        :return: True if method should be retried upon. False if not in method allowlist.
        :rtype: bool
        """
        method = request.method.upper()
        if response and method in ("POST", "PATCH") and response.status_code in (500, 503, 504):
            return True
        return method in settings["methods"]

    def is_retry(
        self,
        settings: Dict[str, Any],
        response: PipelineResponse[HTTPRequestType, AllHttpResponseType],
    ) -> bool:
        """Checks if method/status code is retryable.

        The behavior is:
        - If status_code < 400: don't retry
        - Else if Retry-After is present (and respected): retry
        - Else if the HTTP method is not retryable: don't retry
        - Else: retry when retries remain in ``settings["total"]`` and the status
          code is in the retryable set (408, 429, 500, 502, 503, 504, any code
          from 506 up, plus user-supplied codes)

        :param dict settings: The retry settings.
        :param response: The PipelineResponse object
        :type response: ~azure.core.pipeline.PipelineResponse
        :return: True if method/status code is retryable. False if not retryable.
        :rtype: bool
        """
        http_response = response.http_response
        if http_response.status_code < 400:
            return False
        has_retry_after = bool(http_response.headers.get("Retry-After"))
        if has_retry_after and self._respect_retry_after_header:
            return True
        if not self._is_method_retryable(settings, response.http_request, response=http_response):
            return False
        # Coerce the remaining-retries counter so a real bool is returned (not e.g. ``0``).
        return bool(settings["total"]) and http_response.status_code in self._retry_on_status_codes

    def is_exhausted(self, settings: Dict[str, Any]) -> bool:
        """Checks if any retries left.

        :param dict settings: the retry settings
        :return: False if have more retries. True if retries exhausted.
        :rtype: bool
        """
        counters = (
            settings["total"],
            settings["connect"],
            settings["read"],
            settings["status"],
        )
        # Falsy counters (0 or None) are ignored; a budget is exhausted once it goes negative.
        return any(count < 0 for count in counters if count)

    def increment(
        self,
        settings: Dict[str, Any],
        response: Optional[
            Union[
                PipelineRequest[HTTPRequestType],
                PipelineResponse[HTTPRequestType, AllHttpResponseType],
            ]
        ] = None,
        error: Optional[Exception] = None,
    ) -> bool:
        """Increment the retry counters.

        Besides updating the counters and the history, this rewinds the request
        body (and multipart files) to the positions recorded in ``settings`` so
        that the request can be sent again.

        :param settings: The retry settings.
        :type settings: dict
        :param response: A pipeline response object, or the pipeline request when
         the attempt failed with an error. When None, counters are still updated
         but nothing is recorded in the history or rewound.
        :type response: ~azure.core.pipeline.PipelineResponse
        :param error: An error encountered during the request, or
         None if the response was received successfully.
        :type error: ~azure.core.exceptions.AzureError
        :return: Whether any retry attempt is available
         True if more retry attempts available, False otherwise
        :rtype: bool
        """
        settings["total"] -= 1

        # A 202 response is never retried.
        if isinstance(response, PipelineResponse) and response.http_response.status_code == 202:
            return False

        # Both PipelineRequest and PipelineResponse expose ``http_request``; None does not.
        has_request = hasattr(response, "http_request")

        if error and self._is_connection_error(error):
            # Connect retry?
            settings["connect"] -= 1
            if has_request:
                settings["history"].append(RequestHistory(response.http_request, error=error))

        elif error and self._is_read_error(error):
            # Read retry?
            settings["read"] -= 1
            if has_request:
                settings["history"].append(RequestHistory(response.http_request, error=error))

        elif response:
            # Incrementing because of a server error like a 500 in
            # status_forcelist and the given method is in the allowlist
            settings["status"] -= 1
            if has_request and hasattr(response, "http_response"):
                settings["history"].append(
                    RequestHistory(response.http_request, http_response=response.http_response)
                )

        if self.is_exhausted(settings):
            return False

        http_request = getattr(response, "http_request", None)
        if http_request is None:
            # No request available, hence no body or file stream to rewind.
            return True

        body = http_request.body
        if body and hasattr(body, "read"):
            if "body_position" not in settings:
                return False
            try:
                # attempt to rewind the body to the initial position
                # If it has "read", it has "seek", so casting for mypy
                cast(IO[bytes], body).seek(settings["body_position"], SEEK_SET)
            except (UnsupportedOperation, ValueError, AttributeError):
                # if body is not seekable, then retry would not work
                return False

        file_positions = settings.get("file_positions")
        if http_request.files and file_positions:
            try:
                for value in http_request.files.values():
                    file_name, file_body = value[0], value[1]
                    if file_name in file_positions:
                        file_body.seek(file_positions[file_name], SEEK_SET)
            except (UnsupportedOperation, ValueError, AttributeError):
                # if body is not seekable, then retry would not work
                return False
        return True

    def update_context(self, context: PipelineContext, retry_settings: Dict[str, Any]) -> None:
        """Updates retry history in pipeline context.

        The context is left untouched when the history is empty.

        :param context: The pipeline context.
        :type context: ~azure.core.pipeline.PipelineContext
        :param retry_settings: The retry settings.
        :type retry_settings: dict
        """
        if retry_settings["history"]:
            context["history"] = retry_settings["history"]

    def _configure_timeout(
        self,
        request: PipelineRequest[HTTPRequestType],
        absolute_timeout: float,
        is_response_error: bool,
    ) -> None:
        """Cap the connection timeout of the next attempt by the remaining time.

        :param request: The pipeline request about to be sent.
        :type request: ~azure.core.pipeline.PipelineRequest
        :param float absolute_timeout: Time left for the whole operation, in seconds.
        :param bool is_response_error: Selects which timeout error is raised when no time is left.
        :raises ~azure.core.exceptions.ServiceResponseTimeoutError: if no time is left
         and ``is_response_error`` is True.
        :raises ~azure.core.exceptions.ServiceRequestTimeoutError: if no time is left
         and ``is_response_error`` is False.
        """
        if absolute_timeout <= 0:
            if is_response_error:
                raise ServiceResponseTimeoutError("Response timeout")
            raise ServiceRequestTimeoutError("Request timeout")

        # if connection_timeout is already set, ensure it doesn't exceed absolute_timeout
        connection_timeout = request.context.options.get("connection_timeout")
        if connection_timeout:
            request.context.options["connection_timeout"] = min(connection_timeout, absolute_timeout)

        # otherwise, try to ensure the transport's configured connection_timeout doesn't exceed absolute_timeout
        # ("connection_config" isn't defined on Async/HttpTransport but all implementations in this library have it)
        elif hasattr(request.context.transport, "connection_config"):
            # FIXME This is fragile, should be refactored. Casting my way for mypy
            # https://github.com/Azure/azure-sdk-for-python/issues/31530
            connection_config = cast(
                ConnectionConfiguration, request.context.transport.connection_config  # type: ignore
            )

            default_timeout = getattr(connection_config, "timeout", absolute_timeout)
            try:
                if absolute_timeout < default_timeout:
                    request.context.options["connection_timeout"] = absolute_timeout
            except TypeError:
                # transport.connection_config.timeout is something unexpected (not a number)
                pass

    def _configure_positions(self, request: PipelineRequest[HTTPRequestType], retry_settings: Dict[str, Any]) -> None:
        """Record the current stream positions of the request body or files.

        The positions are stored under ``"body_position"`` and
        ``"file_positions"`` in ``retry_settings`` so that ``increment`` can
        rewind the streams before a retry. A value of None means no position
        could be recorded.

        :param request: The pipeline request about to be sent.
        :type request: ~azure.core.pipeline.PipelineRequest
        :param dict retry_settings: The retry settings, updated in place.
        """
        body_position = None
        file_positions: Optional[Dict[str, int]] = None
        http_request = request.http_request
        if http_request.body and hasattr(http_request.body, "read"):
            try:
                # If it has "read", it has "tell", so casting for mypy
                body_position = cast(IO[bytes], http_request.body).tell()
            except (AttributeError, UnsupportedOperation):
                # if body position cannot be obtained, then retries will not work
                pass
        elif http_request.files:
            file_positions = {}
            try:
                for value in http_request.files.values():
                    name, body = value[0], value[1]
                    if name and body and hasattr(body, "read"):
                        # If it has "read", it has "tell", so casting for mypy
                        file_positions[name] = cast(IO[bytes], body).tell()
            except (AttributeError, UnsupportedOperation):
                file_positions = None

        retry_settings["body_position"] = body_position
        retry_settings["file_positions"] = file_positions

417 

418 

class RetryPolicy(RetryPolicyBase, HTTPPolicy[HTTPRequestType, HTTPResponseType]):
    """A retry policy.

    The retry policy in the pipeline can be configured directly, or tweaked on a per-call basis.

    :keyword int retry_total: Total number of retries to allow. Takes precedence over other counts.
     Default value is 10.
    :keyword int retry_connect: How many connection-related errors to retry on.
     These are errors raised before the request is sent to the remote server,
     which we assume has not triggered the server to process the request. Default value is 3.
    :keyword int retry_read: How many times to retry on read errors.
     These errors are raised after the request was sent to the server, so the
     request may have side-effects. Default value is 3.
    :keyword int retry_status: How many times to retry on bad status codes. Default value is 3.
    :keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
     (most errors are resolved immediately by a second try without a delay).
     In fixed mode, retry policy will always sleep for {backoff factor}.
     In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
     seconds. If the backoff_factor is 0.1, then the retry will sleep
     for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
    :keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
    :keyword RetryMode retry_mode: Fixed or exponential delay between attempts, default is exponential.
    :keyword int timeout: Timeout setting for the operation in seconds, default is 604800s (7 days).

    .. admonition:: Example:

        .. literalinclude:: ../samples/test_example_sync.py
            :start-after: [START retry_policy]
            :end-before: [END retry_policy]
            :language: python
            :dedent: 4
            :caption: Configuring a retry policy.
    """

    def _sleep_for_retry(
        self,
        response: PipelineResponse[HTTPRequestType, HTTPResponseType],
        transport: HttpTransport[HTTPRequestType, HTTPResponseType],
    ) -> bool:
        """Sleep for the duration given by the Retry-After response header, if any.

        :param response: The PipelineResponse object.
        :type response: ~azure.core.pipeline.PipelineResponse
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.HttpTransport
        :return: Whether a sleep was done or not
        :rtype: bool
        """
        delay = self.get_retry_after(response)
        if not delay:
            return False
        transport.sleep(delay)
        return True

    def _sleep_backoff(
        self,
        settings: Dict[str, Any],
        transport: HttpTransport[HTTPRequestType, HTTPResponseType],
    ) -> None:
        """Sleep for the computed backoff time. Immediately returns if backoff is 0.

        :param dict settings: The retry settings.
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.HttpTransport
        """
        delay = self.get_backoff_time(settings)
        if delay > 0:
            transport.sleep(delay)

    def sleep(
        self,
        settings: Dict[str, Any],
        transport: HttpTransport[HTTPRequestType, HTTPResponseType],
        response: Optional[PipelineResponse[HTTPRequestType, HTTPResponseType]] = None,
    ) -> None:
        """Sleep between retry attempts.

        This method will respect a server's ``Retry-After`` response header
        and sleep the duration of the time requested. If that is not present, it
        falls back to the backoff time computed from the retry settings, and
        returns immediately when that backoff is 0.

        :param dict settings: The retry settings.
        :param transport: The HTTP transport type.
        :type transport: ~azure.core.pipeline.transport.HttpTransport
        :param response: The PipelineResponse object.
        :type response: ~azure.core.pipeline.PipelineResponse
        """
        if response and self._sleep_for_retry(response, transport):
            return
        self._sleep_backoff(settings, transport)

    def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
        """Sends the PipelineRequest object to the next policy. Uses retry settings if necessary.

        :param request: The PipelineRequest object
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: The PipelineResponse.
        :rtype: ~azure.core.pipeline.PipelineResponse
        :raises ~azure.core.exceptions.AzureError: if maximum retries exceeded.
        :raises ~azure.core.exceptions.ClientAuthenticationError: if authentication fails.
        """
        response = None
        retry_settings = self.configure_retries(request.context.options)
        self._configure_positions(request, retry_settings)

        # Time budget left for the whole operation; reduced after every attempt.
        remaining_timeout = retry_settings["timeout"]
        is_response_error = True
        keep_retrying = True

        while keep_retrying:
            attempt_started = time.time()
            # PipelineContext types transport as a Union of HttpTransport and AsyncHttpTransport, but
            # here we know that this is an HttpTransport.
            # The correct fix is to make PipelineContext generic, but that's a breaking change and a lot of
            # generic to update in Pipeline, PipelineClient, PipelineRequest, PipelineResponse, etc.
            transport: HttpTransport[HTTPRequestType, HTTPResponseType] = cast(
                HttpTransport[HTTPRequestType, HTTPResponseType],
                request.context.transport,
            )
            try:
                self._configure_timeout(request, remaining_timeout, is_response_error)
                request.context["retry_count"] = len(retry_settings["history"])
                response = self.next.send(request)
                if self.is_retry(retry_settings, response):
                    keep_retrying = self.increment(retry_settings, response=response)
                    if keep_retrying:
                        self.sleep(retry_settings, transport, response=response)
                        is_response_error = True
                        continue
                # Either the response needs no retry or no attempt is left: hand it back.
                break
            except ClientAuthenticationError:
                # the authentication policy failed such that the client's request can't
                # succeed--we'll never have a response to it, so propagate the exception
                raise
            except AzureError as err:
                if remaining_timeout > 0 and self._is_method_retryable(retry_settings, request.http_request):
                    keep_retrying = self.increment(retry_settings, response=request, error=err)
                    if keep_retrying:
                        self.sleep(retry_settings, transport)
                        # A request error means the failure happened before a response was in flight.
                        is_response_error = not isinstance(err, ServiceRequestError)
                        continue
                raise err
            finally:
                attempt_finished = time.time()
                if remaining_timeout:
                    remaining_timeout -= attempt_finished - attempt_started

        if not response:
            raise AzureError("Maximum retries exceeded.")

        self.update_context(response.context, retry_settings)
        return response