Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/polling/base_polling.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

355 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import abc 

27import base64 

28import json 

29from enum import Enum 

30from typing import ( 

31 Optional, 

32 Any, 

33 Tuple, 

34 Callable, 

35 Dict, 

36 Mapping, 

37 Sequence, 

38 Generic, 

39 TypeVar, 

40 cast, 

41 Union, 

42) 

43 

44from ..exceptions import HttpResponseError, DecodeError 

45from . import PollingMethod 

46from ..pipeline.policies._utils import get_retry_after 

47from ..pipeline._tools import is_rest 

48from .._enum_meta import CaseInsensitiveEnumMeta 

49from .. import PipelineClient 

50from ..pipeline import PipelineResponse, PipelineContext 

51from ..rest._helpers import decode_to_text, get_charset_encoding 

52from ..utils._utils import case_insensitive_dict 

53from ..pipeline.transport import ( 

54 HttpTransport, 

55 HttpRequest as LegacyHttpRequest, 

56 HttpResponse as LegacyHttpResponse, 

57 AsyncHttpResponse as LegacyAsyncHttpResponse, 

58) 

59from ..rest import HttpRequest, HttpResponse, AsyncHttpResponse 

60from ._utils import ( 

61 _encode_continuation_token, 

62 _decode_continuation_token, 

63 _filter_sensitive_headers, 

64) 

65 

66 

# Request/response unions covering both the legacy transport classes
# (azure.core.pipeline.transport) and the azure.core.rest classes.
HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
HttpResponseType = Union[LegacyHttpResponse, HttpResponse]  # Sync only
AllHttpResponseType = Union[
    LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse
]  # Sync or async
# PipelineResponse specializations for the legacy pair, the rest pair, and either pair.
LegacyPipelineResponseType = PipelineResponse[LegacyHttpRequest, LegacyHttpResponse]
NewPipelineResponseType = PipelineResponse[HttpRequest, HttpResponse]
PipelineResponseType = PipelineResponse[HttpRequestType, HttpResponseType]
# TypeVars bound to the unions above, used to parameterize the polling classes.
HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
HttpResponseTypeVar = TypeVar("HttpResponseTypeVar", bound=HttpResponseType)  # Sync only
AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType)  # Sync or async

# Local alias for abc.ABC, used as a base class by LongRunningOperation.
ABC = abc.ABC
# Unbound TypeVars used by the generic LongRunningOperation and _SansIOLROBasePolling classes.
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
PipelineClientType = TypeVar("PipelineClientType")
HTTPResponseType_co = TypeVar("HTTPResponseType_co", covariant=True)
HTTPRequestType_co = TypeVar("HTTPRequestType_co", covariant=True)

84 

85 

86_FINISHED = frozenset(["succeeded", "canceled", "failed"]) 

87_FAILED = frozenset(["canceled", "failed"]) 

88_SUCCEEDED = frozenset(["succeeded"]) 

89 

90 

class _ContinuationTokenHttpResponse:
    """Lightweight stand-in for an HTTP response rebuilt from a continuation token.

    Only ``request``, ``status_code``, ``headers``, ``content`` and ``text()`` are
    provided, which is enough for the LRO polling helpers to work with a response
    restored from a continuation token.

    :param request: The HTTP request, or None when the continuation token did not carry one.
    :type request: ~azure.core.rest.HttpRequest or None
    :param status_code: The HTTP status code.
    :type status_code: int
    :param headers: The response headers.
    :type headers: dict
    :param content: The raw response body.
    :type content: bytes
    """

    def __init__(
        self,
        request: Optional[HttpRequest],
        status_code: int,
        headers: Dict[str, str],
        content: bytes,
    ):
        self.request = request
        self.status_code = status_code
        # Wrapped so header lookups do not depend on the casing used by the caller.
        self.headers = case_insensitive_dict(headers)
        self._content = content

    @property
    def content(self) -> bytes:
        """Raw body bytes this response was constructed with.

        :return: The response content
        :rtype: bytes
        """
        return self._content

    def text(self) -> str:
        """Decode the body to a string.

        The encoding is resolved by ``get_charset_encoding`` from this response and the
        decoding itself is delegated to ``decode_to_text``.

        :return: The response content as text
        :rtype: str
        """
        return decode_to_text(get_charset_encoding(self), self._content)

139 

140 

def _get_content(response: AllHttpResponseType) -> bytes:
    """Return the raw body of a response, whichever response flavour it is.

    Legacy transport responses are read through ``body()``; every other response
    is read through its ``content`` attribute. Keeping the branch here avoids a
    mypy warning at each call site.

    :param response: The response object.
    :type response: any
    :return: The content of this response.
    :rtype: bytes
    """
    legacy_types = (LegacyHttpResponse, LegacyAsyncHttpResponse)
    if not isinstance(response, legacy_types):
        return response.content
    return response.body()

153 

154 

def _finished(status):
    """Tell whether a status denotes a terminal state.

    :param status: A status string, or an object exposing the string through ``value``.
    :type status: any
    :return: True if the lower-cased status is one of ``_FINISHED``.
    :rtype: bool
    """
    # Objects with a ``value`` attribute (e.g. enum members) are unwrapped first.
    raw = getattr(status, "value", status)
    return str(raw).lower() in _FINISHED

159 

160 

def _failed(status):
    """Tell whether a status denotes a failed or canceled operation.

    :param status: A status string, or an object exposing the string through ``value``.
    :type status: any
    :return: True if the lower-cased status is one of ``_FAILED``.
    :rtype: bool
    """
    # Objects with a ``value`` attribute (e.g. enum members) are unwrapped first.
    raw = getattr(status, "value", status)
    return str(raw).lower() in _FAILED

165 

166 

def _succeeded(status):
    """Tell whether a status denotes a successful operation.

    :param status: A status string, or an object exposing the string through ``value``.
    :type status: any
    :return: True if the lower-cased status is one of ``_SUCCEEDED``.
    :rtype: bool
    """
    # Objects with a ``value`` attribute (e.g. enum members) are unwrapped first.
    raw = getattr(status, "value", status)
    return str(raw).lower() in _SUCCEEDED

171 

172 

class BadStatus(Exception):
    """Raised when a response carries an HTTP status code that is not acceptable for polling."""

175 

176 

class BadResponse(Exception):
    """Raised when a response cannot be used for polling (e.g. missing body or status)."""

179 

180 

class OperationFailed(Exception):
    """Raised when the long running operation failed or was canceled."""

183 

184 

def _as_json(response: AllHttpResponseType) -> Dict[str, Any]:
    """Parse the response body as JSON.

    The body is expected to be non-empty; check with ``_is_empty`` first, because the
    outcome of calling this on an empty body is not specified.

    :param response: The response object.
    :type response: any
    :return: The content of this response as dict.
    :rtype: dict
    :raises DecodeError: If response body contains invalid json data.
    """
    try:
        # text() stays inside the try so a ValueError raised while decoding is also wrapped.
        payload = json.loads(response.text())
    except ValueError as err:
        raise DecodeError("Error occurred in deserializing the response body.") from err
    return payload

200 

201 

def _raise_if_bad_http_status_and_method(response: AllHttpResponseType) -> None:
    """Check response status code is valid.

    Must be 200, 201, 202, or 204.

    :param response: The response object.
    :type response: any
    :raises ~azure.core.polling.base_polling.BadStatus: If invalid status.
    """
    code = response.status_code
    if code in {200, 201, 202, 204}:
        return
    # A response rebuilt from a continuation token may have ``request`` set to None.
    # Read the method defensively so the caller still gets BadStatus (which it knows
    # how to translate) instead of an AttributeError.
    request = getattr(response, "request", None)
    method = getattr(request, "method", "unknown")
    raise BadStatus("Invalid return status {!r} for {!r} operation".format(code, method))

215 

216 

def _is_empty(response: AllHttpResponseType) -> bool:
    """Tell whether the response has no body content.

    :param response: The response object.
    :type response: any
    :return: True if response body is empty, False otherwise.
    :rtype: bool
    """
    body = _get_content(response)
    return not body

226 

227 

class LongRunningOperation(ABC, Generic[HTTPRequestType_co, HTTPResponseType_co]):
    """Abstract interface every long running operation (LRO) algorithm implements."""

    @abc.abstractmethod
    def can_poll(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> bool:
        """Decide whether this algorithm applies to the given initial response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_polling_url(self) -> str:
        """Give the URL to poll.

        :return: The polling URL.
        :rtype: str
        """
        raise NotImplementedError

    @abc.abstractmethod
    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> str:
        """Handle the first response received after starting the operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_status(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> str:
        """Extract the status string from a response.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        raise NotImplementedError

295 

296 

class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Keys recognized in the ``lro_options`` dictionary (LRO options from Swagger)."""

    FINAL_STATE_VIA = "final-state-via"

301 

302 

class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Values accepted for the ``final-state-via`` LRO option."""

    AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
    LOCATION_FINAL_STATE = "location"
    OPERATION_LOCATION_FINAL_STATE = "operation-location"

309 

310 

class OperationResourcePolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Polling algorithm driven by an operation resource (status monitor) URL.

    The monitor URL is read from a header of the initial response, by default
    ``operation-location``.

    :param str operation_location_header: Name of the header to return operation format (default 'operation-location')
    :keyword dict[str, any] lro_options: Additional options for LRO. For more information, see
     https://aka.ms/azsdk/autorest/openapi/lro-options
    """

    _async_url: str
    """Url to resource monitor (AzureAsyncOperation or Operation-Location)"""

    _location_url: Optional[str]
    """Location header if present"""

    _request: Any
    """The initial request done"""

    def __init__(
        self, operation_location_header: str = "operation-location", *, lro_options: Optional[Dict[str, Any]] = None
    ):
        self._operation_location_header = operation_location_header
        self._location_url = None
        self._lro_options = lro_options or {}

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether the status monitor header (e.g. Operation-Location) is present.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return self._operation_location_header in headers

    def get_polling_url(self) -> str:
        """Give the status monitor URL captured by ``set_initial_status``.

        The value comes from the configured header (e.g. Operation-Location).

        :return: The polling URL.
        :rtype: str
        """
        return self._async_url

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        Resolution order: the ``final-state-via`` option, then a ``resourceLocation``
        field in the response body, then the verb of the initial request.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        final_state_via = self._lro_options.get(_LroOption.FINAL_STATE_VIA)

        if final_state_via == _FinalStateViaOption.LOCATION_FINAL_STATE and self._location_url:
            return self._location_url

        monitor_is_final = final_state_via in (
            _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE,
            _FinalStateViaOption.OPERATION_LOCATION_FINAL_STATE,
        )
        if monitor_is_final and self._request.method == "POST":
            return None

        response = pipeline_response.http_response
        if not _is_empty(response):
            # https://github.com/microsoft/api-guidelines/blob/vNext/Guidelines.md#target-resource-location
            resource_location = _as_json(response).get("resourceLocation")
            if resource_location:
                return resource_location

        method = self._request.method
        if method in {"PUT", "PATCH"}:
            return self._request.url
        if method == "POST" and self._location_url:
            return self._location_url
        return None

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Handle the first response received after starting the operation.

        Records the initial request and the monitor/location URLs, then tries to read
        a status from the body; "InProgress" is used when no status can be extracted.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises ~azure.core.polling.base_polling.OperationFailed: If the status code is not
         200/201/202/204 or the monitor URL is empty.
        """
        response = pipeline_response.http_response
        self._request = response.request

        self._set_async_url_if_present(response)

        if response.status_code not in {200, 201, 202, 204} or not self._async_url:
            raise OperationFailed("Operation failed or canceled")

        # Check if we can extract status from initial response, if present
        try:
            return self.get_status(pipeline_response)
        # Wide catch, it may not even be JSON at all, deserialization is lenient
        except Exception:  # pylint: disable=broad-except
            return "InProgress"

    def _set_async_url_if_present(self, response: AllHttpResponseTypeVar) -> None:
        """Store the monitor URL and, when the header exists and is non-empty, the Location URL.

        :param response: The initial response.
        :type response: any
        """
        headers = response.headers
        self._async_url = headers[self._operation_location_header]

        location_url = headers.get("location")
        if location_url:
            self._location_url = location_url

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Read the ``status`` field from the JSON body of a status monitor response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        :raises ~azure.core.polling.base_polling.BadResponse: if response has no body, or body does not contain status.
        """
        response = pipeline_response.http_response
        if _is_empty(response):
            raise BadResponse("The response from long running operation does not contain a body.")

        status = _as_json(response).get("status")
        if status:
            return status
        raise BadResponse("No status found in body")

454 

455 

class LocationPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Polling algorithm that follows the Location header."""

    _location_url: str
    """Location header"""

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether the response carries a Location header.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return "location" in headers

    def get_polling_url(self) -> str:
        """Give the most recently recorded Location header value.

        :return: The polling URL.
        :rtype: str
        """
        return self._location_url

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """Give the URL of the final GET; Location polling never needs one.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Handle the first response received after starting the operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises ~azure.core.polling.base_polling.OperationFailed: If the status code is not
         200/201/202/204 or the Location header is empty.
        """
        response = pipeline_response.http_response
        self._location_url = response.headers["location"]

        if response.status_code not in {200, 201, 202, 204} or not self._location_url:
            raise OperationFailed("Operation failed or canceled")
        return "InProgress"

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Derive the status from the HTTP status code of a polling response.

        A 202 means the operation is still running; any other code is reported as
        "Succeeded". A new Location header, when present, replaces the polling URL.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        response = pipeline_response.http_response
        if "location" in response.headers:
            self._location_url = response.headers["location"]

        if response.status_code == 202:
            return "InProgress"
        return "Succeeded"

536 

537 

class StatusCheckPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Fallback algorithm that does not poll at all.

    It should be used when no other polling algorithm is detected and the status code
    is 2xx: it reports success immediately.
    """

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether this polling method could be used; this one always can.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        return True

    def get_polling_url(self) -> str:
        """Not supported: this algorithm never loops, so there is no URL to poll.

        :return: The polling URL.
        :rtype: str
        :raises ValueError: Always.
        """
        raise ValueError("This polling doesn't support polling url")

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Handle the first response received; the operation is reported as already succeeded.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        """
        return "Succeeded"

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Give the status for a response; success is the only possible status.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        return "Succeeded"

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """Give the URL of the final GET; this algorithm never needs one.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None

610 

611 

class _SansIOLROBasePolling(
    Generic[
        PollingReturnType_co,
        PipelineClientType,
        HttpRequestTypeVar,
        AllHttpResponseTypeVar,
    ]
):  # pylint: disable=too-many-instance-attributes
    """A base class that has no opinion on IO, to help mypy be accurate.

    :param float timeout: Default polling interval in absence of Retry-After header, in seconds.
    :param list[LongRunningOperation] lro_algorithms: Ordered list of LRO algorithms to use.
    :param lro_options: Additional options for LRO. For more information, see the algorithm's docstring.
    :type lro_options: dict[str, any]
    :param path_format_arguments: A dictionary of the format arguments to be used to format the URL.
    :type path_format_arguments: dict[str, str]
    """

    # The attributes below are only declared here; they receive their values in
    # ``initialize`` and do not exist on an instance before that call.

    _deserialization_callback: Callable[[Any], PollingReturnType_co]
    """The deserialization callback that returns the final instance."""

    _operation: LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]
    """The algorithm this poller has decided to use. Will loop through 'can_poll' of the input algorithms to decide."""

    _status: str
    """Hold the current status of this poller"""

    _client: PipelineClientType
    """The Azure Core Pipeline client used to make request."""

641 

642 def __init__( 

643 self, 

644 timeout: float = 30, 

645 lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None, 

646 lro_options: Optional[Dict[str, Any]] = None, 

647 path_format_arguments: Optional[Dict[str, str]] = None, 

648 **operation_config: Any 

649 ): 

650 self._lro_algorithms = lro_algorithms or [ 

651 OperationResourcePolling(lro_options=lro_options), 

652 LocationPolling(), 

653 StatusCheckPolling(), 

654 ] 

655 

656 self._timeout = timeout 

657 self._operation_config = operation_config 

658 self._lro_options = lro_options 

659 self._path_format_arguments = path_format_arguments 

660 

661 def initialize( 

662 self, 

663 client: PipelineClientType, 

664 initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar], 

665 deserialization_callback: Callable[ 

666 [PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]], 

667 PollingReturnType_co, 

668 ], 

669 ) -> None: 

670 """Set the initial status of this LRO. 

671 

672 :param client: The Azure Core Pipeline client used to make request. 

673 :type client: ~azure.core.pipeline.PipelineClient 

674 :param initial_response: The initial response for the call. 

675 :type initial_response: ~azure.core.pipeline.PipelineResponse 

676 :param deserialization_callback: A callback function to deserialize the final response. 

677 :type deserialization_callback: callable 

678 :raises ~azure.core.HttpResponseError: If initial status is incorrect LRO state 

679 """ 

680 self._client = client 

681 self._pipeline_response = ( # pylint: disable=attribute-defined-outside-init 

682 self._initial_response # pylint: disable=attribute-defined-outside-init 

683 ) = initial_response 

684 self._deserialization_callback = deserialization_callback 

685 

686 for operation in self._lro_algorithms: 

687 if operation.can_poll(initial_response): 

688 self._operation = operation 

689 break 

690 else: 

691 raise BadResponse("Unable to find status link for polling.") 

692 

693 try: 

694 _raise_if_bad_http_status_and_method(self._initial_response.http_response) 

695 self._status = self._operation.set_initial_status(initial_response) 

696 

697 except BadStatus as err: 

698 self._status = "Failed" 

699 raise HttpResponseError(response=initial_response.http_response, error=err) from err 

700 except BadResponse as err: 

701 self._status = "Failed" 

702 raise HttpResponseError(response=initial_response.http_response, message=str(err), error=err) from err 

703 except OperationFailed as err: 

704 raise HttpResponseError(response=initial_response.http_response, error=err) from err 

705 

706 def _filter_headers_for_continuation_token(self, headers: Mapping[str, str]) -> Dict[str, str]: 

707 """Filter headers to include in the continuation token. 

708 

709 Subclasses can override this method to include additional headers needed 

710 for their specific LRO implementation. 

711 

712 :param headers: The response headers to filter. 

713 :type headers: Mapping[str, str] 

714 :return: A filtered dictionary of headers to include in the continuation token. 

715 :rtype: dict[str, str] 

716 """ 

717 return _filter_sensitive_headers(headers) 

718 

719 def get_continuation_token(self) -> str: 

720 """Get a continuation token that can be used to recreate this poller. 

721 

722 :rtype: str 

723 :return: An opaque continuation token. 

724 :raises ValueError: If the initial response is not set. 

725 """ 

726 response = self._initial_response.http_response 

727 request = response.request 

728 # Serialize the essential parts of the PipelineResponse to JSON. 

729 if request: 

730 request_headers = {} 

731 # Preserve x-ms-client-request-id for request correlation 

732 if "x-ms-client-request-id" in request.headers: 

733 request_headers["x-ms-client-request-id"] = request.headers["x-ms-client-request-id"] 

734 request_state = { 

735 "method": request.method, 

736 "url": request.url, 

737 "headers": request_headers, 

738 } 

739 else: 

740 request_state = None 

741 # Get response content, handling the case where it might not be read yet 

742 try: 

743 content = _get_content(response) or b"" 

744 except Exception: # pylint: disable=broad-except 

745 content = b"" 

746 # Get deserialized data from context if available (optimization). 

747 # If context doesn't have it, fall back to parsing the response body directly. 

748 # Note: deserialized_data is only included if it's JSON-serializable. 

749 # Non-JSON-serializable types (e.g., XML ElementTree) are skipped and set to None. 

750 # In such cases, the data can still be re-parsed from the raw content bytes. 

751 deserialized_data = None 

752 raw_deserialized = None 

753 if self._initial_response.context is not None: 

754 raw_deserialized = self._initial_response.context.get("deserialized_data") 

755 # Fallback: try to get deserialized data from the response body if context didn't have it 

756 if raw_deserialized is None and content: 

757 try: 

758 raw_deserialized = json.loads(content) 

759 except (json.JSONDecodeError, ValueError, TypeError): 

760 # Response body is not valid JSON, leave as None 

761 pass 

762 if raw_deserialized is not None: 

763 try: 

764 # Test if the data is JSON-serializable 

765 json.dumps(raw_deserialized) 

766 deserialized_data = raw_deserialized 

767 except (TypeError, ValueError): 

768 # Skip non-JSON-serializable data (e.g., XML ElementTree objects) 

769 deserialized_data = None 

770 state = { 

771 "request": request_state, 

772 "response": { 

773 "status_code": response.status_code, 

774 "headers": self._filter_headers_for_continuation_token(response.headers), 

775 "content": base64.b64encode(content).decode("ascii"), 

776 }, 

777 "context": { 

778 "deserialized_data": deserialized_data, 

779 }, 

780 } 

781 return _encode_continuation_token(state) 

782 

783 @classmethod 

784 def from_continuation_token( 

785 cls, continuation_token: str, **kwargs: Any 

786 ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]: 

787 """Recreate the poller from a continuation token. 

788 

789 :param continuation_token: The continuation token to recreate the poller. 

790 :type continuation_token: str 

791 :return: A tuple containing the client, the initial response, and the deserialization callback. 

792 :rtype: tuple[~azure.core.PipelineClient, ~azure.core.pipeline.PipelineResponse, callable] 

793 :raises ValueError: If the continuation token is invalid or if 'client' or 

794 'deserialization_callback' are not provided. 

795 """ 

796 try: 

797 client = kwargs["client"] 

798 except KeyError: 

799 raise ValueError("Need kwarg 'client' to be recreated from continuation_token") from None 

800 

801 try: 

802 deserialization_callback = kwargs["deserialization_callback"] 

803 except KeyError: 

804 raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None 

805 

806 state = _decode_continuation_token(continuation_token) 

807 # Reconstruct HttpRequest if present 

808 request_state = state.get("request") 

809 http_request = None 

810 if request_state is not None: 

811 http_request = HttpRequest( 

812 method=request_state["method"], 

813 url=request_state["url"], 

814 headers=request_state.get("headers", {}), 

815 ) 

816 # Reconstruct HttpResponse using the minimal response class 

817 response_state = state["response"] 

818 http_response = _ContinuationTokenHttpResponse( 

819 request=http_request, 

820 status_code=response_state["status_code"], 

821 headers=response_state["headers"], 

822 content=base64.b64decode(response_state["content"]), 

823 ) 

824 # Reconstruct PipelineResponse 

825 context = PipelineContext(client._pipeline._transport) # pylint: disable=protected-access 

826 context_state = state.get("context", {}) 

827 if context_state.get("deserialized_data") is not None: 

828 context["deserialized_data"] = context_state["deserialized_data"] 

829 initial_response = PipelineResponse( 

830 http_request=http_request, 

831 http_response=http_response, 

832 context=context, 

833 ) 

834 return client, initial_response, deserialization_callback 

835 

836 def status(self) -> str: 

837 """Return the current status as a string. 

838 

839 :rtype: str 

840 :return: The current status. 

841 """ 

842 if not self._operation: 

843 raise ValueError("set_initial_status was never called. Did you give this instance to a poller?") 

844 return self._status 

845 

def finished(self) -> bool:
    """Tell whether polling has reached a finished status.

    :rtype: bool
    :return: True if finished, False otherwise.
    """
    current_status = self.status()
    return _finished(current_status)

853 

def resource(self) -> PollingReturnType_co:
    """Build the resource from the stored pipeline response.

    :rtype: any
    :return: The built resource.
    """
    parse = self._parse_resource
    return parse(self._pipeline_response)

861 

def _parse_resource(
    self,
    pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
) -> PollingReturnType_co:
    """Parse the response as a resource through the deserialization callback.

    An empty body is taken to mean there is no resource to return.

    :param pipeline_response: The response object.
    :type pipeline_response: ~azure.core.pipeline.PipelineResponse
    :return: The parsed resource, or None when the response body is empty.
    :rtype: any
    """
    if _is_empty(pipeline_response.http_response):
        # This "type ignore" has been discussed with architects.
        # If the Swagger/TSP describes a return type (PollingReturnType_co is not None) BUT the returned
        # payload is actually empty, we don't want to fail, we want to return None.
        # Being strictly clean would mean making the polling return type Optional "just in case the
        # Swagger/TSP is wrong", which lowers the quality and value of the typing annotations for a case
        # that is not supposed to happen in the first place. So the type error is ignored here instead.
        return None  # type: ignore

    return self._deserialization_callback(pipeline_response)

885 

886 def _get_request_id(self) -> str: 

887 return self._pipeline_response.http_response.request.headers["x-ms-client-request-id"] 

888 

def _extract_delay(self) -> float:
    """Pick the delay to wait before the next poll.

    :rtype: float
    :return: The retry-after delay read from the stored response when it is truthy,
     otherwise the configured timeout.
    """
    retry_after = get_retry_after(self._pipeline_response)
    return retry_after if retry_after else self._timeout

894 

895 

class LROBasePolling(
    _SansIOLROBasePolling[
        PollingReturnType_co,
        PipelineClient[HttpRequestTypeVar, HttpResponseTypeVar],
        HttpRequestTypeVar,
        HttpResponseTypeVar,
    ],
    PollingMethod[PollingReturnType_co],
):
    """A base LRO poller.

    This assumes a basic flow:
    - I analyze the response to decide the polling approach
    - I poll
    - I ask for the final resource depending on the polling approach

    If your polling needs are more specific, you could implement a PollingMethod directly.
    """

    _initial_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
    """Store the initial response."""

    _pipeline_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
    """Store the latest received HTTP response, initialized by the first answer."""

    @property
    def _transport(self) -> HttpTransport[HttpRequestTypeVar, HttpResponseTypeVar]:
        """The transport of the client's pipeline.

        :rtype: ~azure.core.pipeline.transport.HttpTransport
        :return: The transport used by the client's pipeline.
        """
        pipeline = self._client._pipeline  # pylint: disable=protected-access
        return pipeline._transport  # pylint: disable=protected-access

    def __getattribute__(self, name: str) -> Any:
        """Find the right method for the job.

        This contains a workaround for azure-mgmt-core 1.0.0 to 1.4.0, where the MRO
        changed when azure-core was refactored in 1.27.0. The MRO change was causing
        AsyncARMPolling to look up the wrong methods and find the non-async ones.

        :param str name: The name of the attribute to retrieve.
        :rtype: Any
        :return: The attribute value.
        """
        cls = object.__getattribute__(self, "__class__")
        if cls.__name__ != "AsyncARMPolling" or name not in (
            "run",
            "update_status",
            "request_status",
            "_sleep",
            "_delay",
            "_poll",
        ):
            # Regular case: plain attribute lookup.
            return super().__getattribute__(name)
        # Workaround case: resolve these names starting after LROBasePolling in the MRO.
        return getattr(super(LROBasePolling, self), name)

    def run(self) -> None:
        """Poll the operation, converting polling errors into HttpResponseError.

        :raises ~azure.core.exceptions.HttpResponseError: If polling raised BadStatus,
         BadResponse or OperationFailed.
        """
        try:
            self._poll()

        except BadStatus as exc:
            self._status = "Failed"
            failed_response = self._pipeline_response.http_response
            raise HttpResponseError(response=failed_response, error=exc) from exc

        except BadResponse as exc:
            self._status = "Failed"
            failed_response = self._pipeline_response.http_response
            raise HttpResponseError(
                response=failed_response,
                message=str(exc),
                error=exc,
            ) from exc

        except OperationFailed as exc:
            failed_response = self._pipeline_response.http_response
            raise HttpResponseError(response=failed_response, error=exc) from exc

    def _poll(self) -> None:
        """Poll status of operation so long as operation is incomplete and
        we have an endpoint to query.

        :raises ~azure.core.polling.base_polling.OperationFailed: If operation status 'Failed' or 'Canceled'.
        :raises ~azure.core.polling.base_polling.BadStatus: If response status invalid.
        :raises ~azure.core.polling.base_polling.BadResponse: If response invalid.
        """
        # First refresh happens without waiting; later ones wait between polls.
        if not self.finished():
            self.update_status()
        while not self.finished():
            self._delay()
            self.update_status()

        if _failed(self.status()):
            raise OperationFailed("Operation failed or canceled")

        final_get_url = self._operation.get_final_get_url(self._pipeline_response)
        if not final_get_url:
            return
        # A final GET is required to retrieve the resulting resource.
        self._pipeline_response = self.request_status(final_get_url)
        _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)

    def _sleep(self, delay: float) -> None:
        """Sleep through the transport.

        :param float delay: The delay handed to the transport's ``sleep``.
        """
        transport = self._transport
        transport.sleep(delay)

    def _delay(self) -> None:
        """Check for a 'retry-after' header to set timeout,
        otherwise use configured timeout.
        """
        wait_time = self._extract_delay()
        self._sleep(wait_time)

    def update_status(self) -> None:
        """Update the current status of the LRO."""
        latest_response = self.request_status(self._operation.get_polling_url())
        self._pipeline_response = latest_response
        _raise_if_bad_http_status_and_method(latest_response.http_response)
        self._status = self._operation.get_status(latest_response)

    def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]:
        """Do a simple GET to this status link.

        This method re-injects 'x-ms-client-request-id'.

        :param str status_link: The URL to poll.
        :rtype: azure.core.pipeline.PipelineResponse
        :return: The response of the status request.
        """
        if self._path_format_arguments:
            status_link = self._client.format_url(status_link, **self._path_format_arguments)
        # Re-inject 'x-ms-client-request-id' while polling
        if "request_id" not in self._operation_config:
            self._operation_config["request_id"] = self._get_request_id()

        if not is_rest(self._initial_response.http_response):
            # Legacy HttpRequest and HttpResponse from azure.core.pipeline.transport
            # casting things here, as we don't want the typing system to know
            # about the legacy APIs.
            legacy_request = cast(HttpRequestTypeVar, self._client.get(status_link))
            legacy_response = self._client._pipeline.run(  # pylint: disable=protected-access
                legacy_request, stream=False, **self._operation_config
            )
            return cast(PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar], legacy_response)

        rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link))
        # Need a cast, as "_return_pipeline_response" mutates the return type, and that return type is not
        # declared in the typing of "send_request"
        rest_response = self._client.send_request(
            rest_request, _return_pipeline_response=True, **self._operation_config
        )
        return cast(PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar], rest_response)

1039 

1040 

# Public API of this module: the names exported by a star-import.
__all__ = [
    "BadResponse",
    "BadStatus",
    "OperationFailed",
    "LongRunningOperation",
    "OperationResourcePolling",
    "LocationPolling",
    "StatusCheckPolling",
    "LROBasePolling",
]