Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/polling/base_polling.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

302 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the "Software"), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import abc 

27import base64 

28import json 

29from enum import Enum 

30from typing import ( 

31 Optional, 

32 Any, 

33 Tuple, 

34 Callable, 

35 Dict, 

36 Sequence, 

37 Generic, 

38 TypeVar, 

39 cast, 

40 Union, 

41) 

42 

43from ..exceptions import HttpResponseError, DecodeError 

44from . import PollingMethod 

45from ..pipeline.policies._utils import get_retry_after 

46from ..pipeline._tools import is_rest 

47from .._enum_meta import CaseInsensitiveEnumMeta 

48from .. import PipelineClient 

49from ..pipeline import PipelineResponse 

50from ..pipeline.transport import ( 

51 HttpTransport, 

52 HttpRequest as LegacyHttpRequest, 

53 HttpResponse as LegacyHttpResponse, 

54 AsyncHttpResponse as LegacyAsyncHttpResponse, 

55) 

56from ..rest import HttpRequest, HttpResponse, AsyncHttpResponse 

57 

58 

# Request/response unions spanning the legacy ``pipeline.transport`` classes
# and the ``rest`` classes imported above.
HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
HttpResponseType = Union[LegacyHttpResponse, HttpResponse]  # Sync only
AllHttpResponseType = Union[
    LegacyHttpResponse,
    HttpResponse,
    LegacyAsyncHttpResponse,
    AsyncHttpResponse,
]  # Sync or async

# PipelineResponse specializations built from the request/response types above.
LegacyPipelineResponseType = PipelineResponse[LegacyHttpRequest, LegacyHttpResponse]
NewPipelineResponseType = PipelineResponse[HttpRequest, HttpResponse]
PipelineResponseType = PipelineResponse[HttpRequestType, HttpResponseType]

# Type variables bounded by the unions above.
HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
HttpResponseTypeVar = TypeVar("HttpResponseTypeVar", bound=HttpResponseType)  # Sync only
AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType)  # Sync or async

# Module-level alias plus the unbounded type variables used by the generic classes below.
ABC = abc.ABC
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
PipelineClientType = TypeVar("PipelineClientType")
HTTPResponseType_co = TypeVar("HTTPResponseType_co", covariant=True)
HTTPRequestType_co = TypeVar("HTTPRequestType_co", covariant=True)

76 

77 

# Lower-cased status strings grouped by outcome; consumed by the
# _finished/_failed/_succeeded predicates.
_SUCCEEDED = frozenset({"succeeded"})
_FAILED = frozenset({"canceled", "failed"})
_FINISHED = _SUCCEEDED | _FAILED

81 

82 

def _get_content(response: AllHttpResponseType) -> bytes:
    """Return the raw body of a response, whichever response API it implements.

    Legacy transport responses are read through ``body()``; any other response
    is read through its ``content`` attribute.

    :param response: The response object.
    :type response: any
    :return: The content of this response.
    :rtype: bytes
    """
    legacy_types = (LegacyHttpResponse, LegacyAsyncHttpResponse)
    return response.body() if isinstance(response, legacy_types) else response.content

95 

96 

def _finished(status):
    """Tell whether ``status`` is a terminal state (succeeded, canceled or failed).

    :param status: A status string, or an object exposing the string as ``value``.
    :type status: any
    :return: True if the lower-cased status is one of the finished states.
    :rtype: bool
    """
    raw_status = status.value if hasattr(status, "value") else status
    return str(raw_status).lower() in _FINISHED

101 

102 

def _failed(status):
    """Tell whether ``status`` is a failure state (canceled or failed).

    :param status: A status string, or an object exposing the string as ``value``.
    :type status: any
    :return: True if the lower-cased status is one of the failed states.
    :rtype: bool
    """
    raw_status = status.value if hasattr(status, "value") else status
    return str(raw_status).lower() in _FAILED

107 

108 

def _succeeded(status):
    """Tell whether ``status`` is the success state.

    :param status: A status string, or an object exposing the string as ``value``.
    :type status: any
    :return: True if the lower-cased status is "succeeded".
    :rtype: bool
    """
    raw_status = status.value if hasattr(status, "value") else status
    return str(raw_status).lower() in _SUCCEEDED

113 

114 

class BadStatus(Exception):
    """Signal that a response carried an invalid HTTP status."""

117 

118 

class BadResponse(Exception):
    """Signal that a response is invalid."""

121 

122 

class OperationFailed(Exception):
    """Signal that the operation ended as failed or canceled."""

125 

126 

def _as_json(response: AllHttpResponseType) -> Dict[str, Any]:
    """Parse the response body as JSON.

    Callers are expected to have checked ``_is_empty`` first; the outcome for an
    empty body is not defined.

    :param response: The response object.
    :type response: any
    :return: The content of this response as dict.
    :rtype: dict
    :raises DecodeError: If response body contains invalid json data.
    """
    try:
        payload = json.loads(response.text())
    except ValueError as err:
        raise DecodeError("Error occurred in deserializing the response body.") from err
    return payload

142 

143 

def _raise_if_bad_http_status_and_method(response: AllHttpResponseType) -> None:
    """Reject any response whose status code is not 200, 201, 202 or 204.

    :param response: The response object.
    :type response: any
    :raises ~azure.core.polling.base_polling.BadStatus: If invalid status.
    """
    status_code = response.status_code
    if status_code not in {200, 201, 202, 204}:
        raise BadStatus(
            "Invalid return status {!r} for {!r} operation".format(status_code, response.request.method)
        )

157 

158 

def _is_empty(response: AllHttpResponseType) -> bool:
    """Tell whether the response body is empty.

    :param response: The response object.
    :type response: any
    :return: True if response body is empty, False otherwise.
    :rtype: bool
    """
    content = _get_content(response)
    return not content

168 

169 

class LongRunningOperation(ABC, Generic[HTTPRequestType_co, HTTPResponseType_co]):
    """Abstract interface that a long running operation (LRO) algorithm implements.

    Every method is abstract; concrete algorithms must override all of them.
    """

    @abc.abstractmethod
    def can_poll(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> bool:
        """Decide whether this algorithm applies to the given initial response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_polling_url(self) -> str:
        """Give the URL to poll.

        :return: The polling URL.
        :rtype: str
        """
        raise NotImplementedError

    @abc.abstractmethod
    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> str:
        """Handle the first response received after starting the operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_status(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> str:
        """Extract the status string from a response.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        raise NotImplementedError

237 

238 

class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Names of the LRO options known from Swagger."""

    # Key under which the "final-state-via" choice is stored in lro_options.
    FINAL_STATE_VIA = "final-state-via"

243 

244 

class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Values accepted for the final-state-via LRO option."""

    AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
    LOCATION_FINAL_STATE = "location"
    OPERATION_LOCATION_FINAL_STATE = "operation-location"

251 

252 

class OperationResourcePolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Poll an operation resource whose URL is given by a response header.

    :param str operation_location_header: Name of the header to return operation format (default 'operation-location')
    :keyword dict[str, any] lro_options: Additional options for LRO. For more information, see
     https://aka.ms/azsdk/autorest/openapi/lro-options
    """

    # Url to resource monitor (AzureAsyncOperation or Operation-Location).
    _async_url: str

    # Location header if present.
    _location_url: Optional[str]

    # The initial request done.
    _request: Any

    def __init__(
        self, operation_location_header: str = "operation-location", *, lro_options: Optional[Dict[str, Any]] = None
    ):
        self._operation_location_header = operation_location_header
        self._location_url = None
        self._lro_options = lro_options or {}

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether the status monitor header (e.g. Operation-Location) is present.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return self._operation_location_header in headers

    def get_polling_url(self) -> str:
        """Give the polling URL stored by ``set_initial_status``.

        The value comes from the configured header (e.g. Operation-Location).

        :return: The polling URL.
        :rtype: str
        """
        return self._async_url

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        final_state_via = self._lro_options.get(_LroOption.FINAL_STATE_VIA)

        # Explicit "location" final state: use the Location header when one was seen.
        if final_state_via == _FinalStateViaOption.LOCATION_FINAL_STATE and self._location_url:
            return self._location_url

        # For a POST, these final-state options mean no final GET is done.
        no_final_get_options = (
            _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE,
            _FinalStateViaOption.OPERATION_LOCATION_FINAL_STATE,
        )
        if final_state_via in no_final_get_options and self._request.method == "POST":
            return None

        http_response = pipeline_response.http_response
        if not _is_empty(http_response):
            # https://github.com/microsoft/api-guidelines/blob/vNext/Guidelines.md#target-resource-location
            resource_location = _as_json(http_response).get("resourceLocation")
            if resource_location:
                return resource_location

        # Otherwise decide from the HTTP method of the initial request.
        method = self._request.method
        if method in {"PUT", "PATCH"}:
            return self._request.url
        if method == "POST" and self._location_url:
            return self._location_url
        return None

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Handle the first response received after starting the operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises ~azure.core.polling.base_polling.OperationFailed: If the status code is not
         200/201/202/204 or the polling URL is empty.
        """
        http_response = pipeline_response.http_response
        self._request = http_response.request
        self._set_async_url_if_present(http_response)

        if http_response.status_code not in {200, 201, 202, 204} or not self._async_url:
            raise OperationFailed("Operation failed or canceled")

        # Use the status from the initial body when it can be read; any failure
        # (the body may not even be JSON) falls back to "InProgress".
        try:
            return self.get_status(pipeline_response)
        except Exception:  # pylint: disable=broad-except
            return "InProgress"

    def _set_async_url_if_present(self, response: AllHttpResponseTypeVar) -> None:
        # The polling header is read with item access; the Location header is optional.
        headers = response.headers
        self._async_url = headers[self._operation_location_header]

        location = headers.get("location")
        if location:
            self._location_url = location

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Read the ``status`` field from the JSON body of a polling response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        :raises ~azure.core.polling.base_polling.BadResponse: if response has no body, or body does not contain status.
        """
        http_response = pipeline_response.http_response
        if _is_empty(http_response):
            raise BadResponse("The response from long running operation does not contain a body.")

        status = _as_json(http_response).get("status")
        if not status:
            raise BadResponse("No status found in body")
        return status

396 

397 

class LocationPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Poll the URL given by the Location header."""

    # Location header.
    _location_url: str

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether the response has a Location header.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return "location" in headers

    def get_polling_url(self) -> str:
        """Give the stored Location header value.

        :return: The polling URL.
        :rtype: str
        """
        return self._location_url

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """Give the URL of the final GET; Location polling never needs one.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Handle the first response received after starting the operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises ~azure.core.polling.base_polling.OperationFailed: If the status code is not
         200/201/202/204 or the Location header value is empty.
        """
        http_response = pipeline_response.http_response
        self._location_url = http_response.headers["location"]

        if http_response.status_code not in {200, 201, 202, 204} or not self._location_url:
            raise OperationFailed("Operation failed or canceled")
        return "InProgress"

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Derive the status from the HTTP status code of a polling response.

        A 202 means the operation is still in progress; any other code is
        reported as success. A new Location header, if present, replaces the
        stored polling URL.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        http_response = pipeline_response.http_response
        headers = http_response.headers
        if "location" in headers:
            self._location_url = headers["location"]

        if http_response.status_code == 202:
            return "InProgress"
        return "Succeeded"

478 

479 

class StatusCheckPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Fallback algorithm that never polls and reports success immediately.

    Intended to be used when no other polling algorithm is detected and the
    status code is 2xx.
    """

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether this algorithm applies; this implementation always does.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        return True

    def get_polling_url(self) -> str:
        """Not supported: this algorithm is never supposed to loop.

        :return: The polling URL.
        :rtype: str
        :raises ValueError: Always.
        """
        raise ValueError("This polling doesn't support polling url")

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Handle the first response; the operation is reported as succeeded right away.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        """
        return "Succeeded"

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Give the status for a response; success is the only possible value.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        return "Succeeded"

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """Give the URL of the final GET; this algorithm never needs one.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None

552 

553 

class _SansIOLROBasePolling(
    Generic[
        PollingReturnType_co,
        PipelineClientType,
        HttpRequestTypeVar,
        AllHttpResponseTypeVar,
    ]
):  # pylint: disable=too-many-instance-attributes
    """A base class that has no opinion on IO, to help mypy be accurate.

    :param float timeout: Default polling interval in absence of Retry-After header, in seconds.
    :param list[LongRunningOperation] lro_algorithms: Ordered list of LRO algorithms to use.
    :param lro_options: Additional options for LRO. For more information, see the algorithm's docstring.
    :type lro_options: dict[str, any]
    :param path_format_arguments: A dictionary of the format arguments to be used to format the URL.
    :type path_format_arguments: dict[str, str]
    """

    # The deserialization callback that returns the final instance.
    _deserialization_callback: Callable[[Any], PollingReturnType_co]

    # The algorithm this poller has decided to use; chosen in initialize() by
    # looping through 'can_poll' of the input algorithms. Annotation only: the
    # attribute does not exist until initialize() assigns it.
    _operation: LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]

    # Hold the current status of this poller.
    _status: str

    # The Azure Core Pipeline client used to make request.
    _client: PipelineClientType

    def __init__(
        self,
        timeout: float = 30,
        lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None,
        lro_options: Optional[Dict[str, Any]] = None,
        path_format_arguments: Optional[Dict[str, str]] = None,
        **operation_config: Any
    ):
        # Default chain: operation resource first, then Location, then the
        # always-applicable status check fallback.
        self._lro_algorithms = lro_algorithms or [
            OperationResourcePolling(lro_options=lro_options),
            LocationPolling(),
            StatusCheckPolling(),
        ]

        self._timeout = timeout
        self._operation_config = operation_config
        self._lro_options = lro_options
        self._path_format_arguments = path_format_arguments

    def initialize(
        self,
        client: PipelineClientType,
        initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
        deserialization_callback: Callable[
            [PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]],
            PollingReturnType_co,
        ],
    ) -> None:
        """Set the initial status of this LRO.

        :param client: The Azure Core Pipeline client used to make request.
        :type client: ~azure.core.pipeline.PipelineClient
        :param initial_response: The initial response for the call.
        :type initial_response: ~azure.core.pipeline.PipelineResponse
        :param deserialization_callback: A callback function to deserialize the final response.
        :type deserialization_callback: callable
        :raises ~azure.core.polling.base_polling.BadResponse: If no algorithm can poll this response.
        :raises ~azure.core.HttpResponseError: If initial status is incorrect LRO state
        """
        self._client = client
        self._pipeline_response = (  # pylint: disable=attribute-defined-outside-init
            self._initial_response  # pylint: disable=attribute-defined-outside-init
        ) = initial_response
        self._deserialization_callback = deserialization_callback

        # First algorithm that accepts the initial response wins.
        for operation in self._lro_algorithms:
            if operation.can_poll(initial_response):
                self._operation = operation
                break
        else:
            raise BadResponse("Unable to find status link for polling.")

        try:
            _raise_if_bad_http_status_and_method(self._initial_response.http_response)
            self._status = self._operation.set_initial_status(initial_response)

        except BadStatus as err:
            self._status = "Failed"
            raise HttpResponseError(response=initial_response.http_response, error=err) from err
        except BadResponse as err:
            self._status = "Failed"
            raise HttpResponseError(response=initial_response.http_response, message=str(err), error=err) from err
        except OperationFailed as err:
            raise HttpResponseError(response=initial_response.http_response, error=err) from err

    def get_continuation_token(self) -> str:
        """Get a continuation token that can be used to recreate this poller.

        The continuation token is a base64 encoded string that contains the initial response
        serialized with pickle.

        :rtype: str
        :return: The continuation token.
        """
        import pickle

        return base64.b64encode(pickle.dumps(self._initial_response)).decode("ascii")

    @classmethod
    def from_continuation_token(
        cls, continuation_token: str, **kwargs: Any
    ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]:
        """Recreate the poller from a continuation token.

        :param continuation_token: The continuation token to recreate the poller.
        :type continuation_token: str
        :return: A tuple containing the client, the initial response, and the deserialization callback.
        :rtype: tuple[~azure.core.PipelineClient, ~azure.core.pipeline.PipelineResponse, callable]
        :raises ValueError: If 'client' or 'deserialization_callback' are not provided.
        """
        try:
            client = kwargs["client"]
        except KeyError:
            raise ValueError("Need kwarg 'client' to be recreated from continuation_token") from None

        try:
            deserialization_callback = kwargs["deserialization_callback"]
        except KeyError:
            raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None

        import pickle

        # SECURITY NOTE(review): unpickling can execute arbitrary code. The token
        # must only ever come from a trusted get_continuation_token() call; never
        # feed this method a token received from an untrusted party.
        initial_response = pickle.loads(base64.b64decode(continuation_token))  # nosec
        # Restore the transport in the context
        initial_response.context.transport = client._pipeline._transport  # pylint: disable=protected-access
        return client, initial_response, deserialization_callback

    def status(self) -> str:
        """Return the current status as a string.

        :rtype: str
        :return: The current status.
        :raises ValueError: If no polling algorithm has been selected yet
         (``initialize`` was never called, or found none).
        """
        # _operation is only a class-level annotation until initialize() assigns
        # it, so plain attribute access would raise AttributeError here instead
        # of the intended ValueError.
        if not getattr(self, "_operation", None):
            raise ValueError("set_initial_status was never called. Did you give this instance to a poller?")
        return self._status

    def finished(self) -> bool:
        """Is this polling finished?

        :rtype: bool
        :return: True if finished, False otherwise.
        """
        return _finished(self.status())

    def resource(self) -> PollingReturnType_co:
        """Return the built resource.

        :rtype: any
        :return: The built resource.
        """
        return self._parse_resource(self._pipeline_response)

    def _parse_resource(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> PollingReturnType_co:
        """Assuming this response is a resource, use the deserialization callback to parse it.
        If body is empty, assuming no resource to return.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The parsed resource.
        :rtype: any
        """
        response = pipeline_response.http_response
        if not _is_empty(response):
            return self._deserialization_callback(pipeline_response)

        # This "type ignore" has been discussed with architects.
        # We have a typing problem that if the Swagger/TSP describes a return type (PollingReturnType_co is not None),
        # BUT the returned payload is actually empty, we don't want to fail, but return None.
        # To be clean, we would have to make the polling return type Optional "just in case the Swagger/TSP is wrong".
        # This is reducing the quality and the value of the typing annotations
        # for a case that is not supposed to happen in the first place. So we decided to ignore the type error here.
        return None  # type: ignore

    def _get_request_id(self) -> str:
        # Read the client request id from the request of the latest response.
        return self._pipeline_response.http_response.request.headers["x-ms-client-request-id"]

    def _extract_delay(self) -> float:
        # A falsy retry-after value (None or 0) falls back to the configured timeout.
        delay = get_retry_after(self._pipeline_response)
        if delay:
            return delay
        return self._timeout

749 

750 

class LROBasePolling(
    _SansIOLROBasePolling[
        PollingReturnType_co,
        PipelineClient[HttpRequestTypeVar, HttpResponseTypeVar],
        HttpRequestTypeVar,
        HttpResponseTypeVar,
    ],
    PollingMethod[PollingReturnType_co],
):
    """A base LRO poller.

    The flow it implements is:
    - analyze the initial response to pick a polling approach
    - poll until a finished status is reached
    - fetch the final resource if the polling approach asks for it

    If your polling needs are more specific, you could implement a PollingMethod directly.
    """

    # Store the initial response.
    _initial_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]

    # Store the latest received HTTP response, initialized by the first answer.
    _pipeline_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]

    @property
    def _transport(self) -> HttpTransport[HttpRequestTypeVar, HttpResponseTypeVar]:
        return self._client._pipeline._transport  # pylint: disable=protected-access

    def __getattribute__(self, name: str) -> Any:
        """Find the right method for the job.

        This contains a workaround for azure-mgmt-core 1.0.0 to 1.4.0, where the MRO
        is changing when azure-core was refactored in 1.27.0. The MRO change was causing
        AsyncARMPolling to look-up the wrong methods and find the non-async ones.

        :param str name: The name of the attribute to retrieve.
        :rtype: Any
        :return: The attribute value.
        """
        rerouted_names = (
            "run",
            "update_status",
            "request_status",
            "_sleep",
            "_delay",
            "_poll",
        )
        actual_class = object.__getattribute__(self, "__class__")
        if actual_class.__name__ == "AsyncARMPolling" and name in rerouted_names:
            # Skip this class in the lookup so the next class in the MRO provides the method.
            return getattr(super(LROBasePolling, self), name)
        return super().__getattribute__(name)

    def run(self) -> None:
        """Run the polling loop, translating polling errors into HttpResponseError.

        :raises ~azure.core.exceptions.HttpResponseError: If polling raised BadStatus,
         BadResponse or OperationFailed.
        """
        try:
            self._poll()

        except BadStatus as err:
            self._status = "Failed"
            raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err

        except BadResponse as err:
            self._status = "Failed"
            raise HttpResponseError(
                response=self._pipeline_response.http_response,
                message=str(err),
                error=err,
            ) from err

        except OperationFailed as err:
            raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err

    def _poll(self) -> None:
        """Poll status of operation so long as operation is incomplete and
        we have an endpoint to query.

        :raises ~azure.core.polling.base_polling.OperationFailed: If operation status 'Failed' or 'Canceled'.
        :raises ~azure.core.polling.base_polling.BadStatus: If response status invalid.
        :raises ~azure.core.polling.base_polling.BadResponse: If response invalid.
        """
        # The first refresh happens without waiting; later ones are preceded by a delay.
        if not self.finished():
            self.update_status()
        while not self.finished():
            self._delay()
            self.update_status()

        if _failed(self.status()):
            raise OperationFailed("Operation failed or canceled")

        final_url = self._operation.get_final_get_url(self._pipeline_response)
        if final_url:
            self._pipeline_response = self.request_status(final_url)
            _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)

    def _sleep(self, delay: float) -> None:
        # Sleeping is delegated to the transport.
        self._transport.sleep(delay)

    def _delay(self) -> None:
        """Check for a 'retry-after' header to set timeout,
        otherwise use configured timeout.
        """
        self._sleep(self._extract_delay())

    def update_status(self) -> None:
        """Update the current status of the LRO."""
        polling_url = self._operation.get_polling_url()
        self._pipeline_response = self.request_status(polling_url)
        _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
        self._status = self._operation.get_status(self._pipeline_response)

    def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]:
        """Do a simple GET to this status link.

        This method re-inject 'x-ms-client-request-id'.

        :param str status_link: The URL to poll.
        :rtype: azure.core.pipeline.PipelineResponse
        :return: The response of the status request.
        """
        if self._path_format_arguments:
            status_link = self._client.format_url(status_link, **self._path_format_arguments)

        # Re-inject 'x-ms-client-request-id' while polling
        if "request_id" not in self._operation_config:
            self._operation_config["request_id"] = self._get_request_id()

        if not is_rest(self._initial_response.http_response):
            # Legacy HttpRequest and HttpResponse from azure.core.pipeline.transport
            # casting things here, as we don't want the typing system to know
            # about the legacy APIs.
            legacy_request = cast(HttpRequestTypeVar, self._client.get(status_link))
            return cast(
                PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
                self._client._pipeline.run(  # pylint: disable=protected-access
                    legacy_request, stream=False, **self._operation_config
                ),
            )

        rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link))
        # Need a cast, as "_return_pipeline_response" mutate the return type, and that return type is not
        # declared in the typing of "send_request"
        return cast(
            PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
            self._client.send_request(rest_request, _return_pipeline_response=True, **self._operation_config),
        )

894 

895 

# Names exported by ``from ... import *``.
__all__ = [
    # Exceptions
    "BadResponse",
    "BadStatus",
    "OperationFailed",
    # Polling algorithms
    "LongRunningOperation",
    "OperationResourcePolling",
    "LocationPolling",
    "StatusCheckPolling",
    # Poller
    "LROBasePolling",
]