Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/polling/base_polling.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

294 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import abc 

27import base64 

28import json 

29from enum import Enum 

30from typing import ( 

31 Optional, 

32 Any, 

33 Tuple, 

34 Callable, 

35 Dict, 

36 Sequence, 

37 Generic, 

38 TypeVar, 

39 cast, 

40 Union, 

41) 

42 

43from ..exceptions import HttpResponseError, DecodeError 

44from . import PollingMethod 

45from ..pipeline.policies._utils import get_retry_after 

46from ..pipeline._tools import is_rest 

47from .._enum_meta import CaseInsensitiveEnumMeta 

48from .. import PipelineClient 

49from ..pipeline import PipelineResponse 

50from ..pipeline.transport import ( 

51 HttpTransport, 

52 HttpRequest as LegacyHttpRequest, 

53 HttpResponse as LegacyHttpResponse, 

54 AsyncHttpResponse as LegacyAsyncHttpResponse, 

55) 

56from ..rest import HttpRequest, HttpResponse, AsyncHttpResponse 

57 

58 

# Request/response unions covering both the legacy transport classes
# (azure.core.pipeline.transport) and the azure.core.rest classes.
HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
HttpResponseType = Union[LegacyHttpResponse, HttpResponse]  # Sync only
AllHttpResponseType = Union[
    LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse
]  # Sync or async
# PipelineResponse specializations for the legacy, the new, and the combined request/response types.
LegacyPipelineResponseType = PipelineResponse[LegacyHttpRequest, LegacyHttpResponse]
NewPipelineResponseType = PipelineResponse[HttpRequest, HttpResponse]
PipelineResponseType = PipelineResponse[HttpRequestType, HttpResponseType]
# TypeVars bounded by the unions above; used to parametrize the polling classes in this module.
HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
HttpResponseTypeVar = TypeVar("HttpResponseTypeVar", bound=HttpResponseType)  # Sync only
AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType)  # Sync or async

# Local alias for abc.ABC, used as a base class of LongRunningOperation.
ABC = abc.ABC
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
PipelineClientType = TypeVar("PipelineClientType")
HTTPResponseType_co = TypeVar("HTTPResponseType_co", covariant=True)
HTTPRequestType_co = TypeVar("HTTPRequestType_co", covariant=True)


# Lower-case status names. The _finished/_failed/_succeeded helpers compare
# ``str(status).lower()`` against these sets, so matching is case-insensitive.
_FINISHED = frozenset(["succeeded", "canceled", "failed"])
_FAILED = frozenset(["canceled", "failed"])
_SUCCEEDED = frozenset(["succeeded"])

81 

82 

def _get_content(response: AllHttpResponseType) -> bytes:
    """Return the raw payload of a response.

    Legacy transport responses are read through ``body()``; every other response
    is read through its ``content`` attribute. Routing the access through this
    helper avoids a mypy warning about ``body()`` access.

    :param response: The response object.
    :type response: any
    :return: The content of this response.
    :rtype: bytes
    """
    is_legacy = isinstance(response, (LegacyHttpResponse, LegacyAsyncHttpResponse))
    return response.body() if is_legacy else response.content

95 

96 

def _finished(status):
    """Tell whether ``status`` names a terminal state.

    :param status: A status string, or an object exposing the status through a ``value`` attribute.
    :type status: any
    :return: True if the lower-cased status is a member of ``_FINISHED``.
    :rtype: bool
    """
    raw_status = status.value if hasattr(status, "value") else status
    return str(raw_status).lower() in _FINISHED

101 

102 

def _failed(status):
    """Tell whether ``status`` names a failed or canceled state.

    :param status: A status string, or an object exposing the status through a ``value`` attribute.
    :type status: any
    :return: True if the lower-cased status is a member of ``_FAILED``.
    :rtype: bool
    """
    raw_status = status.value if hasattr(status, "value") else status
    return str(raw_status).lower() in _FAILED

107 

108 

def _succeeded(status):
    """Tell whether ``status`` names a successful state.

    :param status: A status string, or an object exposing the status through a ``value`` attribute.
    :type status: any
    :return: True if the lower-cased status is a member of ``_SUCCEEDED``.
    :rtype: bool
    """
    raw_status = status.value if hasattr(status, "value") else status
    return str(raw_status).lower() in _SUCCEEDED

113 

114 

class BadStatus(Exception):
    """Raised when a response carries an HTTP status code other than 200, 201, 202 or 204."""

117 

118 

class BadResponse(Exception):
    """Raised when a response cannot be used for polling (e.g. empty body, missing status, no usable algorithm)."""

121 

122 

class OperationFailed(Exception):
    """Raised when the long running operation is reported as failed or canceled."""

125 

126 

127def _as_json(response: AllHttpResponseType) -> Dict[str, Any]: 

128 """Assuming this is not empty, return the content as JSON. 

129 

130 Result/exceptions is not determined if you call this method without testing _is_empty. 

131 

132 :param response: The response object. 

133 :type response: any 

134 :return: The content of this response as dict. 

135 :rtype: dict 

136 :raises: DecodeError if response body contains invalid json data. 

137 """ 

138 try: 

139 return json.loads(response.text()) 

140 except ValueError as err: 

141 raise DecodeError("Error occurred in deserializing the response body.") from err 

142 

143 

144def _raise_if_bad_http_status_and_method(response: AllHttpResponseType) -> None: 

145 """Check response status code is valid. 

146 

147 Must be 200, 201, 202, or 204. 

148 

149 :param response: The response object. 

150 :type response: any 

151 :raises: BadStatus if invalid status. 

152 """ 

153 code = response.status_code 

154 if code in {200, 201, 202, 204}: 

155 return 

156 raise BadStatus("Invalid return status {!r} for {!r} operation".format(code, response.request.method)) 

157 

158 

def _is_empty(response: AllHttpResponseType) -> bool:
    """Tell whether the response body is empty.

    :param response: The response object.
    :type response: any
    :return: True if response body is empty, False otherwise.
    :rtype: bool
    """
    content = _get_content(response)
    return not content

168 

169 

class LongRunningOperation(ABC, Generic[HTTPRequestType_co, HTTPResponseType_co]):
    """Abstract interface that every long running operation (LRO) algorithm implements.

    All methods are abstract: a concrete algorithm decides whether it applies to an
    initial response, which URL to poll, how to read a status from a response and
    whether a final GET is required.
    """

    @abc.abstractmethod
    def can_poll(self, pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co]) -> bool:
        """Tell whether this algorithm is able to handle the operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_polling_url(self) -> str:
        """Give the URL to poll.

        :return: The polling URL.
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def set_initial_status(
        self, pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co]
    ) -> str:
        """Handle the first response received after the operation was started.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_status(self, pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co]) -> str:
        """Read the status string out of a response.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_final_get_url(
        self, pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co]
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        raise NotImplementedError()

237 

238 

class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Known LRO options from Swagger.

    Members are used as keys when reading the ``lro_options`` dict
    (see ``OperationResourcePolling.get_final_get_url``).
    """

    # Option whose value is compared against the members of _FinalStateViaOption.
    FINAL_STATE_VIA = "final-state-via"

243 

244 

class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Possible final-state-via options.

    These are the values ``OperationResourcePolling.get_final_get_url`` compares the
    ``final-state-via`` LRO option against when choosing the final GET URL.
    """

    AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
    LOCATION_FINAL_STATE = "location"
    OPERATION_LOCATION_FINAL_STATE = "operation-location"

251 

252 

class OperationResourcePolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Polling algorithm driven by an operation resource, typically advertised in Operation-Location.

    :param str operation_location_header: Name of the header to return operation format (default 'operation-location')
    :keyword dict[str, any] lro_options: Additional options for LRO. For more information, see
     https://aka.ms/azsdk/autorest/openapi/lro-options
    """

    _async_url: str
    """URL of the status monitor, read from the configured operation-location header."""

    _location_url: Optional[str]
    """Value of the Location header, when the initial response carried one."""

    _request: Any
    """The request that produced the initial response."""

    def __init__(
        self, operation_location_header: str = "operation-location", *, lro_options: Optional[Dict[str, Any]] = None
    ):
        self._operation_location_header = operation_location_header
        self._location_url = None
        self._lro_options = lro_options or {}

    def can_poll(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> bool:
        """Tell whether the status monitor header (e.g. Operation-Location) is present.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return self._operation_location_header in headers

    def get_polling_url(self) -> str:
        """Give the URL to poll.

        This is the value stored from the configured header (e.g. Operation-Location)
        when the initial response was processed.

        :return: The polling URL.
        :rtype: str
        """
        return self._async_url

    def get_final_get_url(
        self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        Candidates are tried in this order: the ``final-state-via`` option, a
        ``resourceLocation`` property in the response body, then the method of the
        initial request (PUT/PATCH use the initial URL, POST uses the Location header).

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        final_state_via = self._lro_options.get(_LroOption.FINAL_STATE_VIA)

        # Explicit "location" final state: use the Location header if we captured one.
        if final_state_via == _FinalStateViaOption.LOCATION_FINAL_STATE and self._location_url:
            return self._location_url

        # For a POST, these options mean no extra GET is performed.
        no_final_get_options = [
            _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE,
            _FinalStateViaOption.OPERATION_LOCATION_FINAL_STATE,
        ]
        if final_state_via in no_final_get_options and self._request.method == "POST":
            return None

        http_response = pipeline_response.http_response
        if not _is_empty(http_response):
            # https://github.com/microsoft/api-guidelines/blob/vNext/Guidelines.md#target-resource-location
            resource_location = _as_json(http_response).get("resourceLocation")
            if resource_location:
                return resource_location

        initial_method = self._request.method
        if initial_method in {"PUT", "PATCH"}:
            return self._request.url
        if initial_method == "POST" and self._location_url:
            return self._location_url
        return None

    def set_initial_status(
        self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    ) -> str:
        """Handle the first response received after the operation was started.

        Stores the initial request and the polling URLs, then tries to read a status
        from the response body; when that is not possible the status is "InProgress".

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises: OperationFailed if the status code is not 200/201/202/204 or the polling URL is empty.
        """
        http_response = pipeline_response.http_response
        self._request = http_response.request
        self._set_async_url_if_present(http_response)

        if http_response.status_code not in {200, 201, 202, 204} or not self._async_url:
            raise OperationFailed("Operation failed or canceled")

        # The initial response may already carry a status; use it when it can be read.
        try:
            return self.get_status(pipeline_response)
        # Wide catch, it may not even be JSON at all, deserialization is lenient
        except Exception:  # pylint: disable=broad-except
            return "InProgress"

    def _set_async_url_if_present(self, response: AllHttpResponseTypeVar) -> None:
        # The operation-location header is read with item access, so it must be present
        # (a missing header raises); the Location header is optional.
        headers = response.headers
        self._async_url = headers[self._operation_location_header]

        location = headers.get("location")
        if location:
            self._location_url = location

    def get_status(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> str:
        """Read the ``status`` property from the JSON body of a status monitor response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        :raises: BadResponse if response has no body, or body does not contain status.
        """
        http_response = pipeline_response.http_response
        if _is_empty(http_response):
            raise BadResponse("The response from long running operation does not contain a body.")

        status = _as_json(http_response).get("status")
        if not status:
            raise BadResponse("No status found in body")
        return status

388 

389 

class LocationPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Polling algorithm that follows the Location header."""

    _location_url: str
    """Latest value of the Location header."""

    def can_poll(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> bool:
        """Tell whether the response carries a Location header.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return "location" in headers

    def get_polling_url(self) -> str:
        """Give the stored Location header value.

        :return: The polling URL.
        :rtype: str
        """
        return self._location_url

    def get_final_get_url(
        self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        Location polling never needs a final GET.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None

    def set_initial_status(
        self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    ) -> str:
        """Handle the first response received after the operation was started.

        Stores the Location header and reports "InProgress" when the status code is acceptable.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises: OperationFailed if the status code is not 200/201/202/204 or the Location header is empty.
        """
        http_response = pipeline_response.http_response
        self._location_url = http_response.headers["location"]

        if http_response.status_code not in {200, 201, 202, 204} or not self._location_url:
            raise OperationFailed("Operation failed or canceled")
        return "InProgress"

    def get_status(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> str:
        """Derive the status from the HTTP status code of a polling response.

        A 202 means the operation is still in progress; any other code is reported as
        "Succeeded". A Location header on the response replaces the stored polling URL.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        http_response = pipeline_response.http_response
        headers = http_response.headers
        if "location" in headers:
            self._location_url = headers["location"]

        if http_response.status_code == 202:
            return "InProgress"
        return "Succeeded"

462 

463 

class StatusCheckPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Fallback algorithm that does not poll at all and reports success immediately.

    Meant to be used when no other polling algorithm is detected and the status code is 2xx.
    """

    def can_poll(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> bool:
        """Tell whether this algorithm is able to handle the operation.

        This implementation accepts every response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        return True

    def get_polling_url(self) -> str:
        """Give the URL to poll.

        Not supported by this algorithm, since it is never supposed to loop.

        :return: The polling URL.
        :rtype: str
        :raises: ValueError always.
        """
        raise ValueError("This polling doesn't support polling url")

    def set_initial_status(
        self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    ) -> str:
        """Handle the first response received after the operation was started.

        Succeeds immediately.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        """
        return "Succeeded"

    def get_status(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> str:
        """Read the status string out of a response.

        The only status this algorithm reports is success.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        return "Succeeded"

    def get_final_get_url(
        self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None

528 

529 

class _SansIOLROBasePolling(
    Generic[PollingReturnType_co, PipelineClientType, HttpRequestTypeVar, AllHttpResponseTypeVar]
):  # pylint: disable=too-many-instance-attributes
    """A base class that has no opinion on IO, to help mypy be accurate.

    :param float timeout: Default polling interval in absence of Retry-After header, in seconds.
    :param list[LongRunningOperation] lro_algorithms: Ordered list of LRO algorithms to use.
     When omitted (or empty), OperationResourcePolling, LocationPolling and StatusCheckPolling
     are tried in that order.
    :param lro_options: Additional options for LRO. For more information, see the algorithm's docstring.
    :type lro_options: dict[str, any]
    :param path_format_arguments: A dictionary of the format arguments to be used to format the URL.
    :type path_format_arguments: dict[str, str]
    """

    _deserialization_callback: Callable[[Any], PollingReturnType_co]
    """The deserialization callback that returns the final instance."""

    _operation: LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]
    """The algorithm this poller has decided to use. Will loop through 'can_poll' of the input algorithms to decide."""

    _status: str
    """Hold the current status of this poller"""

    _client: PipelineClientType
    """The Azure Core Pipeline client used to make request."""

    def __init__(
        self,
        timeout: float = 30,
        lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None,
        lro_options: Optional[Dict[str, Any]] = None,
        path_format_arguments: Optional[Dict[str, str]] = None,
        **operation_config: Any
    ):
        self._lro_algorithms = lro_algorithms or [
            OperationResourcePolling(lro_options=lro_options),
            LocationPolling(),
            StatusCheckPolling(),
        ]

        self._timeout = timeout
        # Extra keyword arguments are stored untouched on the instance.
        self._operation_config = operation_config
        self._lro_options = lro_options
        self._path_format_arguments = path_format_arguments

    def initialize(
        self,
        client: PipelineClientType,
        initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
        deserialization_callback: Callable[
            [PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]], PollingReturnType_co
        ],
    ) -> None:
        """Set the initial status of this LRO.

        Picks the first algorithm whose ``can_poll`` accepts the initial response, validates
        the HTTP status code and asks the algorithm for the initial status.

        :param client: The Azure Core Pipeline client used to make request.
        :type client: ~azure.core.pipeline.PipelineClient
        :param initial_response: The initial response for the call.
        :type initial_response: ~azure.core.pipeline.PipelineResponse
        :param deserialization_callback: A callback function to deserialize the final response.
        :type deserialization_callback: callable
        :raises: HttpResponseError if initial status is incorrect LRO state
        :raises: BadResponse if none of the configured algorithms can poll this response.
        """
        self._client = client
        self._pipeline_response = (  # pylint: disable=attribute-defined-outside-init
            self._initial_response  # pylint: disable=attribute-defined-outside-init
        ) = initial_response
        self._deserialization_callback = deserialization_callback

        for operation in self._lro_algorithms:
            if operation.can_poll(initial_response):
                self._operation = operation
                break
        else:
            # Raised outside the try block below, so it is not wrapped in HttpResponseError.
            raise BadResponse("Unable to find status link for polling.")

        try:
            _raise_if_bad_http_status_and_method(self._initial_response.http_response)
            self._status = self._operation.set_initial_status(initial_response)

        except BadStatus as err:
            self._status = "Failed"
            raise HttpResponseError(response=initial_response.http_response, error=err) from err
        except BadResponse as err:
            self._status = "Failed"
            raise HttpResponseError(response=initial_response.http_response, message=str(err), error=err) from err
        except OperationFailed as err:
            raise HttpResponseError(response=initial_response.http_response, error=err) from err

    def get_continuation_token(self) -> str:
        """Serialize the initial response into a continuation token.

        The token is the base64 encoding (as an ASCII string) of the pickled initial
        pipeline response stored by ``initialize``.

        :return: The continuation token.
        :rtype: str
        """
        import pickle

        return base64.b64encode(pickle.dumps(self._initial_response)).decode("ascii")

    @classmethod
    def from_continuation_token(
        cls, continuation_token: str, **kwargs: Any
    ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]:
        """Rebuild the arguments of ``initialize`` from a continuation token.

        :param str continuation_token: A token produced by ``get_continuation_token``.
        :keyword client: The pipeline client; its transport is re-attached to the restored response.
        :keyword deserialization_callback: The callback used to deserialize the final response.
        :return: A tuple of (client, initial response, deserialization callback).
        :rtype: tuple
        :raises: ValueError if ``client`` or ``deserialization_callback`` is missing from kwargs.
        """
        try:
            client = kwargs["client"]
        except KeyError:
            raise ValueError("Need kwarg 'client' to be recreated from continuation_token") from None

        try:
            deserialization_callback = kwargs["deserialization_callback"]
        except KeyError:
            raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None

        import pickle

        # SECURITY NOTE(review): unpickling can execute arbitrary code. The token must only ever
        # come from a trusted get_continuation_token() call; never feed it untrusted input.
        initial_response = pickle.loads(base64.b64decode(continuation_token))  # nosec
        # Restore the transport in the context
        initial_response.context.transport = client._pipeline._transport  # pylint: disable=protected-access
        return client, initial_response, deserialization_callback

    def status(self) -> str:
        """Return the current status as a string.

        :rtype: str
        :return: The current status.
        :raises: ValueError if no algorithm has been selected yet (``initialize`` was not called).
        """
        # ``_operation`` is only annotated on the class, so before ``initialize`` runs the
        # attribute does not exist; use getattr so the documented ValueError is raised
        # instead of an AttributeError.
        if not getattr(self, "_operation", None):
            raise ValueError("set_initial_status was never called. Did you give this instance to a poller?")
        return self._status

    def finished(self) -> bool:
        """Is this polling finished?

        :rtype: bool
        :return: True if finished, False otherwise.
        """
        return _finished(self.status())

    def resource(self) -> PollingReturnType_co:
        """Return the built resource.

        The resource is parsed from the latest pipeline response held by this poller.

        :rtype: any
        :return: The built resource.
        """
        return self._parse_resource(self._pipeline_response)

    def _parse_resource(
        self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    ) -> PollingReturnType_co:
        """Assuming this response is a resource, use the deserialization callback to parse it.
        If body is empty, assuming no resource to return.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The parsed resource.
        :rtype: any
        """
        response = pipeline_response.http_response
        if not _is_empty(response):
            return self._deserialization_callback(pipeline_response)

        # This "type ignore" has been discussed with architects.
        # We have a typing problem that if the Swagger/TSP describes a return type (PollingReturnType_co is not None),
        # BUT the returned payload is actually empty, we don't want to fail, but return None.
        # To be clean, we would have to make the polling return type Optional "just in case the Swagger/TSP is wrong".
        # This is reducing the quality and the value of the typing annotations
        # for a case that is not supposed to happen in the first place. So we decided to ignore the type error here.
        return None  # type: ignore

    def _get_request_id(self) -> str:
        # Read back the client request id that was sent with the latest request.
        return self._pipeline_response.http_response.request.headers["x-ms-client-request-id"]

    def _extract_delay(self) -> float:
        # Prefer the delay computed by get_retry_after for the latest response; any falsy
        # value (None or 0) falls back to the configured timeout.
        delay = get_retry_after(self._pipeline_response)
        if delay:
            return delay
        return self._timeout

701 

702 

class LROBasePolling(
    _SansIOLROBasePolling[
        PollingReturnType_co,
        PipelineClient[HttpRequestTypeVar, HttpResponseTypeVar],
        HttpRequestTypeVar,
        HttpResponseTypeVar,
    ],
    PollingMethod[PollingReturnType_co],
):  # pylint: disable=too-many-instance-attributes
    """A base LRO poller (synchronous).

    The flow is:
    - analyze the initial response to pick the polling approach
    - poll until a terminal status is reached
    - fetch the final resource if the polling approach requires it

    If your polling needs are more specific, you could implement a PollingMethod directly
    """

    _initial_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
    """The very first response received for the operation."""

    _pipeline_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
    """The most recent HTTP response; starts out as the initial response."""

    @property
    def _transport(self) -> HttpTransport[HttpRequestTypeVar, HttpResponseTypeVar]:
        return self._client._pipeline._transport  # pylint: disable=protected-access

    def __getattribute__(self, name: str) -> Any:
        """Find the right method for the job.

        This contains a workaround for azure-mgmt-core 1.0.0 to 1.4.0, where the MRO
        is changing when azure-core was refactored in 1.27.0. The MRO change was causing
        AsyncARMPolling to look-up the wrong methods and find the non-async ones.

        :param str name: The name of the attribute to retrieve.
        :rtype: Any
        :return: The attribute value.
        """
        # Names that, for a class called "AsyncARMPolling", are resolved past this class in the MRO.
        redirected_names = [
            "run",
            "update_status",
            "request_status",
            "_sleep",
            "_delay",
            "_poll",
        ]
        klass = object.__getattribute__(self, "__class__")
        if klass.__name__ == "AsyncARMPolling" and name in redirected_names:
            return getattr(super(LROBasePolling, self), name)
        return super().__getattribute__(name)

    def run(self) -> None:
        """Run the polling loop, converting polling errors into HttpResponseError.

        :raises: HttpResponseError if polling raises BadStatus, BadResponse or OperationFailed.
        """
        try:
            self._poll()

        except BadStatus as exc:
            self._status = "Failed"
            raise HttpResponseError(response=self._pipeline_response.http_response, error=exc) from exc

        except BadResponse as exc:
            self._status = "Failed"
            raise HttpResponseError(
                response=self._pipeline_response.http_response,
                message=str(exc),
                error=exc,
            ) from exc

        except OperationFailed as exc:
            raise HttpResponseError(response=self._pipeline_response.http_response, error=exc) from exc

    def _poll(self) -> None:
        """Poll the operation until it reaches a terminal status, then do the final GET if needed.

        :raises: OperationFailed if operation status 'Failed' or 'Canceled'.
        :raises: BadStatus if response status invalid.
        :raises: BadResponse if response invalid.
        """
        # The first refresh happens without waiting; later ones wait before each request.
        if not self.finished():
            self.update_status()
        while not self.finished():
            self._delay()
            self.update_status()

        if _failed(self.status()):
            raise OperationFailed("Operation failed or canceled")

        final_url = self._operation.get_final_get_url(self._pipeline_response)
        if final_url:
            self._pipeline_response = self.request_status(final_url)
            _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)

    def _sleep(self, delay: float) -> None:
        # Sleeping is delegated to the transport of the pipeline client.
        self._transport.sleep(delay)

    def _delay(self) -> None:
        """Wait before the next poll.

        The wait time comes from ``_extract_delay``: the 'retry-after' value when available,
        otherwise the configured timeout.
        """
        self._sleep(self._extract_delay())

    def update_status(self) -> None:
        """Poll once and refresh the current status of the LRO."""
        polling_url = self._operation.get_polling_url()
        self._pipeline_response = self.request_status(polling_url)
        _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
        self._status = self._operation.get_status(self._pipeline_response)

    def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]:
        """Do a simple GET to this status link.

        This method re-injects 'x-ms-client-request-id'.

        :param str status_link: The URL to poll.
        :rtype: azure.core.pipeline.PipelineResponse
        :return: The response of the status request.
        """
        if self._path_format_arguments:
            status_link = self._client.format_url(status_link, **self._path_format_arguments)
        # Re-inject 'x-ms-client-request-id' while polling
        if "request_id" not in self._operation_config:
            self._operation_config["request_id"] = self._get_request_id()

        if is_rest(self._initial_response.http_response):
            rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link))
            # Need a cast, as "_return_pipeline_response" mutate the return type, and that return type is not
            # declared in the typing of "send_request"
            return cast(
                PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
                self._client.send_request(rest_request, _return_pipeline_response=True, **self._operation_config),
            )

        # Legacy HttpRequest and HttpResponse from azure.core.pipeline.transport
        # casting things here, as we don't want the typing system to know
        # about the legacy APIs.
        legacy_request = cast(HttpRequestTypeVar, self._client.get(status_link))
        return cast(
            PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
            self._client._pipeline.run(  # pylint: disable=protected-access
                legacy_request, stream=False, **self._operation_config
            ),
        )

846 

847 

# Names exported by a star-import of this module: the three polling exceptions, the
# LongRunningOperation interface with its three algorithms, and the synchronous base poller.
# Underscore-prefixed helpers, enums and _SansIOLROBasePolling are not listed.
__all__ = [
    "BadResponse",
    "BadStatus",
    "OperationFailed",
    "LongRunningOperation",
    "OperationResourcePolling",
    "LocationPolling",
    "StatusCheckPolling",
    "LROBasePolling",
]