1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26import abc
27import base64
28import json
29from enum import Enum
30from typing import (
31 Optional,
32 Any,
33 Tuple,
34 Callable,
35 Dict,
36 Sequence,
37 Generic,
38 TypeVar,
39 cast,
40 Union,
41)
42
43from ..exceptions import HttpResponseError, DecodeError
44from . import PollingMethod
45from ..pipeline.policies._utils import get_retry_after
46from ..pipeline._tools import is_rest
47from .._enum_meta import CaseInsensitiveEnumMeta
48from .. import PipelineClient
49from ..pipeline import PipelineResponse
50from ..pipeline.transport import (
51 HttpTransport,
52 HttpRequest as LegacyHttpRequest,
53 HttpResponse as LegacyHttpResponse,
54 AsyncHttpResponse as LegacyAsyncHttpResponse,
55)
56from ..rest import HttpRequest, HttpResponse, AsyncHttpResponse
57
58
# Unions covering both request/response families this module accepts: the legacy
# types imported from ..pipeline.transport and the types imported from ..rest.
HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
HttpResponseType = Union[LegacyHttpResponse, HttpResponse]  # Sync only
AllHttpResponseType = Union[
    LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse
]  # Sync or async
# PipelineResponse parametrized for the legacy family, the rest family, and the union of both.
LegacyPipelineResponseType = PipelineResponse[LegacyHttpRequest, LegacyHttpResponse]
NewPipelineResponseType = PipelineResponse[HttpRequest, HttpResponse]
PipelineResponseType = PipelineResponse[HttpRequestType, HttpResponseType]
# Type variables bounded by the unions above; used to parametrize the polling classes below.
HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
HttpResponseTypeVar = TypeVar("HttpResponseTypeVar", bound=HttpResponseType)  # Sync only
AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType)  # Sync or async

# Local alias of abc.ABC, used as the base class of LongRunningOperation.
ABC = abc.ABC
# Type of the final object produced by a poller (what the deserialization callback returns).
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
# Type of the pipeline client a poller uses to send requests.
PipelineClientType = TypeVar("PipelineClientType")
# Unbounded covariant request/response type variables used by the LongRunningOperation interface.
HTTPResponseType_co = TypeVar("HTTPResponseType_co", covariant=True)
HTTPRequestType_co = TypeVar("HTTPRequestType_co", covariant=True)
76
77
# Lower-cased status names grouped by outcome. A status is "finished" when it is
# either a success or a failure, so that set is derived from the other two.
_SUCCEEDED = frozenset(("succeeded",))
_FAILED = frozenset(("canceled", "failed"))
_FINISHED = _SUCCEEDED | _FAILED
81
82
def _get_content(response: AllHttpResponseType) -> bytes:
    """Return the raw body of a response, whichever response family it belongs to.

    Legacy responses are read through ``body()``; any other response is read through
    its ``content`` attribute. Keeping the two accesses apart avoids a mypy warning
    about the deprecated ``body()`` access.

    :param response: The response object.
    :type response: any
    :return: The content of this response.
    :rtype: bytes
    """
    legacy_types = (LegacyHttpResponse, LegacyAsyncHttpResponse)
    if not isinstance(response, legacy_types):
        return response.content
    return response.body()
95
96
def _finished(status):
    """Tell whether a status denotes a terminal state (succeeded, canceled or failed).

    :param status: A status string, or an object exposing the string through ``value``.
    :type status: any
    :return: True if the lower-cased status is one of the finished states.
    :rtype: bool
    """
    # Enum-like objects carry the actual text in ".value"; anything else is used as-is.
    raw_status = getattr(status, "value", status)
    return str(raw_status).lower() in _FINISHED
101
102
def _failed(status):
    """Tell whether a status denotes a failure (canceled or failed).

    :param status: A status string, or an object exposing the string through ``value``.
    :type status: any
    :return: True if the lower-cased status is one of the failed states.
    :rtype: bool
    """
    # Enum-like objects carry the actual text in ".value"; anything else is used as-is.
    raw_status = getattr(status, "value", status)
    return str(raw_status).lower() in _FAILED
107
108
def _succeeded(status):
    """Tell whether a status denotes success.

    :param status: A status string, or an object exposing the string through ``value``.
    :type status: any
    :return: True if the lower-cased status is "succeeded".
    :rtype: bool
    """
    # Enum-like objects carry the actual text in ".value"; anything else is used as-is.
    raw_status = getattr(status, "value", status)
    return str(raw_status).lower() in _SUCCEEDED
113
114
class BadStatus(Exception):
    """Raised when the HTTP status code of a response is not an accepted one."""
117
118
class BadResponse(Exception):
    """Raised when a response cannot be used, e.g. it has no body or no status."""
121
122
class OperationFailed(Exception):
    """Raised when the long running operation ended as failed or canceled."""
125
126
def _as_json(response: AllHttpResponseType) -> Dict[str, Any]:
    """Parse the response body as JSON.

    The body is expected to be non-empty: check with _is_empty first, as the outcome
    for an empty body is not defined.

    :param response: The response object.
    :type response: any
    :return: The content of this response as dict.
    :rtype: dict
    :raises DecodeError: If response body contains invalid json data.
    """
    try:
        # Reading the text stays inside the "try": any ValueError it raises is reported the same way.
        parsed_body = json.loads(response.text())
    except ValueError as err:
        raise DecodeError("Error occurred in deserializing the response body.") from err
    return parsed_body
142
143
def _raise_if_bad_http_status_and_method(response: AllHttpResponseType) -> None:
    """Reject any response whose status code is not 200, 201, 202 or 204.

    :param response: The response object.
    :type response: any
    :raises ~azure.core.polling.base_polling.BadStatus: If invalid status.
    """
    status_code = response.status_code
    if status_code not in {200, 201, 202, 204}:
        # The request method is only read here, to build the error message.
        raise BadStatus(
            "Invalid return status {!r} for {!r} operation".format(status_code, response.request.method)
        )
157
158
def _is_empty(response: AllHttpResponseType) -> bool:
    """Tell whether the response carries no body content.

    :param response: The response object.
    :type response: any
    :return: True if response body is empty, False otherwise.
    :rtype: bool
    """
    content = _get_content(response)
    return not content
168
169
class LongRunningOperation(ABC, Generic[HTTPRequestType_co, HTTPResponseType_co]):
    """Protocol to implement for a long running operation algorithm.

    An implementation decides from the initial response whether it applies (can_poll),
    records what it needs from that response (set_initial_status), then provides the URL
    to poll, the status read from each polled response and, optionally, the URL of a
    final GET. Every method here is abstract.
    """

    @abc.abstractmethod
    def can_poll(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> bool:
        """Answer if this polling method could be used.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_polling_url(self) -> str:
        """Return the polling URL.

        :return: The polling URL.
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> str:
        """Process first response after initiating long running operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_status(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> str:
        """Return the status string extracted from this response.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> Optional[str]:
        """If a final GET is needed, returns the URL.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        raise NotImplementedError()
237
238
class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Known LRO options from Swagger.

    Members are used as keys when reading the ``lro_options`` dictionary.
    """

    # Key of the option whose value is compared against _FinalStateViaOption members.
    FINAL_STATE_VIA = "final-state-via"
243
244
class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Possible final-state-via options.

    These are the values the ``final-state-via`` LRO option is compared against when
    deciding whether, and where, a final GET is issued.
    """

    AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
    LOCATION_FINAL_STATE = "location"
    OPERATION_LOCATION_FINAL_STATE = "operation-location"
251
252
class OperationResourcePolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Polling algorithm driven by an operation resource, typically given by Operation-Location.

    The URL of the operation resource is read from a header of the initial response.

    :param str operation_location_header: Name of the header holding the operation resource URL
     (default 'operation-location')
    :keyword dict[str, any] lro_options: Additional options for LRO. For more information, see
     https://aka.ms/azsdk/autorest/openapi/lro-options
    """

    _async_url: str
    """Url to resource monitor (AzureAsyncOperation or Operation-Location)"""

    _location_url: Optional[str]
    """Location header if present"""

    _request: Any
    """The initial request done"""

    def __init__(
        self, operation_location_header: str = "operation-location", *, lro_options: Optional[Dict[str, Any]] = None
    ):
        self._lro_options = lro_options or {}
        self._operation_location_header = operation_location_header
        self._location_url = None

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether the configured status monitor header (e.g. Operation-Location) is present.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return self._operation_location_header in headers

    def get_polling_url(self) -> str:
        """Return the polling URL.

        This is the value stored from the configured header (e.g. Operation-Location)
        when the initial response was processed.

        :return: The polling URL.
        :rtype: str
        """
        return self._async_url

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """If a final GET is needed, returns the URL.

        The decision is taken in this order: the 'final-state-via' option, a
        'resourceLocation' field in the response body, then the method of the initial request.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        final_state_via = self._lro_options.get(_LroOption.FINAL_STATE_VIA)

        # Explicitly asked to read the final state from the Location header.
        if final_state_via == _FinalStateViaOption.LOCATION_FINAL_STATE and self._location_url:
            return self._location_url

        # For a POST, the operation resource itself is the final state: no extra GET.
        monitor_is_final_state = final_state_via in (
            _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE,
            _FinalStateViaOption.OPERATION_LOCATION_FINAL_STATE,
        )
        if monitor_is_final_state and self._request.method == "POST":
            return None

        http_response = pipeline_response.http_response
        if not _is_empty(http_response):
            # https://github.com/microsoft/api-guidelines/blob/vNext/Guidelines.md#target-resource-location
            resource_location = _as_json(http_response).get("resourceLocation")
            if resource_location:
                return resource_location

        initial_method = self._request.method
        if initial_method in {"PUT", "PATCH"}:
            return self._request.url
        if initial_method == "POST" and self._location_url:
            return self._location_url
        return None

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Process first response after initiating long running operation.

        Stores the initial request and the polling URLs, then tries to read a status
        from the body; "InProgress" is assumed when none can be read.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises ~azure.core.polling.base_polling.OperationFailed: If the status code is not accepted
         or the polling URL is empty.
        """
        response = pipeline_response.http_response
        self._request = response.request
        self._set_async_url_if_present(response)

        if response.status_code not in {200, 201, 202, 204} or not self._async_url:
            raise OperationFailed("Operation failed or canceled")

        # Check if we can extract status from initial response, if present
        try:
            return self.get_status(pipeline_response)
        # Wide catch, it may not even be JSON at all, deserialization is lenient
        except Exception:  # pylint: disable=broad-except
            return "InProgress"

    def _set_async_url_if_present(self, response: AllHttpResponseTypeVar) -> None:
        """Store the polling URL and, when the response has one, the Location URL.

        :param response: The initial response.
        :type response: any
        """
        headers = response.headers
        self._async_url = headers[self._operation_location_header]

        location_url = headers.get("location")
        if location_url:
            self._location_url = location_url

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Read the "status" field from the JSON body of a response.

        :param pipeline_response: The response to read the status from.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        :raises ~azure.core.polling.base_polling.BadResponse: if response has no body, or body does not contain status.
        """
        response = pipeline_response.http_response
        if _is_empty(response):
            raise BadResponse("The response from long running operation does not contain a body.")

        status = _as_json(response).get("status")
        if status:
            return status
        raise BadResponse("No status found in body")
396
397
class LocationPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Polling algorithm that follows the Location header."""

    _location_url: str
    """Location header"""

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether the response contains a Location header.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return "location" in headers

    def get_polling_url(self) -> str:
        """Return the most recently stored Location header value.

        :return: The polling URL.
        :rtype: str
        """
        return self._location_url

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """If a final GET is needed, returns the URL.

        A Location polling never needs one, so this is always None.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Process first response after initiating long running operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises ~azure.core.polling.base_polling.OperationFailed: If the status code is not accepted
         or the Location header is empty.
        """
        response = pipeline_response.http_response
        self._location_url = response.headers["location"]

        if response.status_code not in {200, 201, 202, 204} or not self._location_url:
            raise OperationFailed("Operation failed or canceled")
        return "InProgress"

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Return the status string extracted from this response.

        For Location polling, a 202 from the status monitor means the operation is still
        running; any other status code is reported as success. A new Location header, when
        present, replaces the stored polling URL.

        :param pipeline_response: The response to read the status from.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        response = pipeline_response.http_response
        if "location" in response.headers:
            self._location_url = response.headers["location"]

        if response.status_code == 202:
            return "InProgress"
        return "Succeeded"
478
479
class StatusCheckPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Fallback polling that does not poll at all: it exits successfully when no
    other polling has been detected and the status code is 2xx.
    """

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Answer if this polling method could be used.

        This implementation accepts every response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always True for this implementation.
        :rtype: bool
        """
        return True

    def get_polling_url(self) -> str:
        """Return the polling URL.

        Not supported by this polling, since it is never supposed to loop.

        :return: Never returns.
        :rtype: str
        :raises ValueError: Always.
        """
        raise ValueError("This polling doesn't support polling url")

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Process first response after initiating long running operation.

        Succeeds immediately, whatever the response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status, always "Succeeded".
        :rtype: str
        """
        return "Succeeded"

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Return the status string extracted from this response.

        Success is the only possible status.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string, always "Succeeded".
        :rtype: str
        """
        return "Succeeded"

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """If a final GET is needed, returns the URL.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None
552
553
class _SansIOLROBasePolling(
    Generic[
        PollingReturnType_co,
        PipelineClientType,
        HttpRequestTypeVar,
        AllHttpResponseTypeVar,
    ]
):  # pylint: disable=too-many-instance-attributes
    """A base class that has no opinion on IO, to help mypy be accurate.

    It holds the polling state (selected algorithm, current status, initial and latest
    responses) and the continuation token logic. It does not send requests nor sleep.

    :param float timeout: Default polling interval in absence of Retry-After header, in seconds.
    :param list[LongRunningOperation] lro_algorithms: Ordered list of LRO algorithms to use.
    :param lro_options: Additional options for LRO. For more information, see the algorithm's docstring.
    :type lro_options: dict[str, any]
    :param path_format_arguments: A dictionary of the format arguments to be used to format the URL.
    :type path_format_arguments: dict[str, str]
    """

    _deserialization_callback: Callable[[Any], PollingReturnType_co]
    """The deserialization callback that returns the final instance."""

    _operation: LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]
    """The algorithm this poller has decided to use. Will loop through 'can_poll' of the input algorithms to decide."""

    _status: str
    """Hold the current status of this poller"""

    _client: PipelineClientType
    """The Azure Core Pipeline client used to make request."""

    def __init__(
        self,
        timeout: float = 30,
        lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None,
        lro_options: Optional[Dict[str, Any]] = None,
        path_format_arguments: Optional[Dict[str, str]] = None,
        **operation_config: Any
    ):
        # Default order matters: the first algorithm whose can_poll() accepts the initial
        # response is used, and StatusCheckPolling accepts everything.
        self._lro_algorithms = lro_algorithms or [
            OperationResourcePolling(lro_options=lro_options),
            LocationPolling(),
            StatusCheckPolling(),
        ]

        self._timeout = timeout
        self._operation_config = operation_config
        self._lro_options = lro_options
        self._path_format_arguments = path_format_arguments

    def initialize(
        self,
        client: PipelineClientType,
        initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
        deserialization_callback: Callable[
            [PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]],
            PollingReturnType_co,
        ],
    ) -> None:
        """Set the initial status of this LRO.

        Picks the first algorithm able to poll from the initial response, validates the
        status code of that response and asks the algorithm for the initial status.

        :param client: The Azure Core Pipeline client used to make request.
        :type client: ~azure.core.pipeline.PipelineClient
        :param initial_response: The initial response for the call.
        :type initial_response: ~azure.core.pipeline.PipelineResponse
        :param deserialization_callback: A callback function to deserialize the final response.
        :type deserialization_callback: callable
        :raises ~azure.core.HttpResponseError: If initial status is incorrect LRO state
        :raises ~azure.core.polling.base_polling.BadResponse: If none of the algorithms can poll
         from the initial response.
        """
        self._client = client
        self._pipeline_response = (  # pylint: disable=attribute-defined-outside-init
            self._initial_response  # pylint: disable=attribute-defined-outside-init
        ) = initial_response
        self._deserialization_callback = deserialization_callback

        for operation in self._lro_algorithms:
            if operation.can_poll(initial_response):
                self._operation = operation
                break
        else:
            # Raised as-is: this happens outside the "try" below, so it is not wrapped.
            raise BadResponse("Unable to find status link for polling.")

        try:
            _raise_if_bad_http_status_and_method(self._initial_response.http_response)
            self._status = self._operation.set_initial_status(initial_response)

        except BadStatus as err:
            self._status = "Failed"
            raise HttpResponseError(response=initial_response.http_response, error=err) from err
        except BadResponse as err:
            self._status = "Failed"
            raise HttpResponseError(response=initial_response.http_response, message=str(err), error=err) from err
        except OperationFailed as err:
            raise HttpResponseError(response=initial_response.http_response, error=err) from err

    def get_continuation_token(self) -> str:
        """Get a continuation token that can be used to recreate this poller.
        The continuation token is a base64 encoded string that contains the initial response
        serialized with pickle.

        :rtype: str
        :return: The continuation token.
        :raises ValueError: If the initial response is not set.
        """
        import pickle

        # The initial response only exists once initialize() has run; report its absence
        # with the documented ValueError instead of leaking an AttributeError.
        try:
            initial_response = self._initial_response
        except AttributeError:
            raise ValueError(
                "The initial response is not set. Did you give this instance to a poller?"
            ) from None

        return base64.b64encode(pickle.dumps(initial_response)).decode("ascii")

    @classmethod
    def from_continuation_token(
        cls, continuation_token: str, **kwargs: Any
    ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]:
        """Recreate the poller from a continuation token.

        Errors raised while decoding or unpickling the token are not caught here.

        :param continuation_token: The continuation token to recreate the poller.
        :type continuation_token: str
        :keyword client: The pipeline client; its transport is re-attached to the restored response.
        :keyword deserialization_callback: The callback used to deserialize the final response.
        :return: A tuple containing the client, the initial response, and the deserialization callback.
        :rtype: tuple[~azure.core.PipelineClient, ~azure.core.pipeline.PipelineResponse, callable]
        :raises ValueError: If 'client' or 'deserialization_callback' are not provided.
        """
        try:
            client = kwargs["client"]
        except KeyError:
            raise ValueError("Need kwarg 'client' to be recreated from continuation_token") from None

        try:
            deserialization_callback = kwargs["deserialization_callback"]
        except KeyError:
            raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None

        import pickle

        # SECURITY NOTE(review): unpickling can execute arbitrary code. The token must only
        # come from a trusted source (i.e. one produced by get_continuation_token).
        initial_response = pickle.loads(base64.b64decode(continuation_token))  # nosec
        # Restore the transport in the context
        initial_response.context.transport = client._pipeline._transport  # pylint: disable=protected-access
        return client, initial_response, deserialization_callback

    def status(self) -> str:
        """Return the current status as a string.

        :rtype: str
        :return: The current status.
        :raises ValueError: If no polling algorithm was selected yet, i.e. initialize was not called.
        """
        # "_operation" is only annotated on the class, so the attribute does not exist until
        # initialize() selects an algorithm. A plain "self._operation" access would raise
        # AttributeError and the ValueError below would never be reached.
        if not getattr(self, "_operation", None):
            raise ValueError("set_initial_status was never called. Did you give this instance to a poller?")
        return self._status

    def finished(self) -> bool:
        """Is this polling finished?

        :rtype: bool
        :return: True if finished, False otherwise.
        """
        return _finished(self.status())

    def resource(self) -> PollingReturnType_co:
        """Return the built resource.

        The resource is built from the latest response received.

        :rtype: any
        :return: The built resource.
        """
        return self._parse_resource(self._pipeline_response)

    def _parse_resource(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> PollingReturnType_co:
        """Assuming this response is a resource, use the deserialization callback to parse it.
        If body is empty, assuming no resource to return.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The parsed resource, or None if the body is empty.
        :rtype: any
        """
        response = pipeline_response.http_response
        if not _is_empty(response):
            return self._deserialization_callback(pipeline_response)

        # This "type ignore" has been discussed with architects.
        # We have a typing problem that if the Swagger/TSP describes a return type (PollingReturnType_co is not None),
        # BUT the returned payload is actually empty, we don't want to fail, but return None.
        # To be clean, we would have to make the polling return type Optional "just in case the Swagger/TSP is wrong".
        # This is reducing the quality and the value of the typing annotations
        # for a case that is not supposed to happen in the first place. So we decided to ignore the type error here.
        return None  # type: ignore

    def _get_request_id(self) -> str:
        """Return the 'x-ms-client-request-id' header of the request behind the latest response.

        :rtype: str
        :return: The client request id.
        :raises KeyError: If the request does not carry that header.
        """
        return self._pipeline_response.http_response.request.headers["x-ms-client-request-id"]

    def _extract_delay(self) -> float:
        """Return the delay to wait before the next poll, in seconds.

        Uses the retry-after value of the latest response; when it is missing or zero,
        falls back to the configured timeout.

        :rtype: float
        :return: The delay in seconds.
        """
        delay = get_retry_after(self._pipeline_response)
        if delay:
            return delay
        return self._timeout
749
750
class LROBasePolling(
    _SansIOLROBasePolling[
        PollingReturnType_co,
        PipelineClient[HttpRequestTypeVar, HttpResponseTypeVar],
        HttpRequestTypeVar,
        HttpResponseTypeVar,
    ],
    PollingMethod[PollingReturnType_co],
):
    """A base LRO poller.

    The flow is kept basic:
    - the response is analyzed to decide the polling approach
    - the status is polled until the operation is finished
    - the final resource is requested, depending on the polling approach

    If your polling needs are more specific, you could implement a PollingMethod directly
    """

    _initial_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
    """Store the initial response."""

    _pipeline_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
    """Store the latest received HTTP response, initialized by the first answer."""

    @property
    def _transport(self) -> HttpTransport[HttpRequestTypeVar, HttpResponseTypeVar]:
        return self._client._pipeline._transport  # pylint: disable=protected-access

    def __getattribute__(self, name: str) -> Any:
        """Find the right method for the job.

        Workaround for azure-mgmt-core 1.0.0 to 1.4.0: the azure-core 1.27.0 refactoring
        changed the MRO, which made AsyncARMPolling look up the wrong methods and find the
        non-async ones. For that class only, the polling methods are resolved past this class.

        :param str name: The name of the attribute to retrieve.
        :rtype: Any
        :return: The attribute value.
        """
        owner = object.__getattribute__(self, "__class__")
        redirected_names = (
            "run",
            "update_status",
            "request_status",
            "_sleep",
            "_delay",
            "_poll",
        )
        if owner.__name__ == "AsyncARMPolling" and name in redirected_names:
            return getattr(super(LROBasePolling, self), name)
        return super().__getattribute__(name)

    def run(self) -> None:
        """Poll until completion, converting polling errors into HttpResponseError.

        :raises ~azure.core.HttpResponseError: If the status code or the response is invalid,
         or if the operation failed or was canceled.
        """
        try:
            self._poll()
        except BadStatus as err:
            self._status = "Failed"
            last_response = self._pipeline_response.http_response
            raise HttpResponseError(response=last_response, error=err) from err
        except BadResponse as err:
            self._status = "Failed"
            last_response = self._pipeline_response.http_response
            raise HttpResponseError(response=last_response, message=str(err), error=err) from err
        except OperationFailed as err:
            last_response = self._pipeline_response.http_response
            raise HttpResponseError(response=last_response, error=err) from err

    def _poll(self) -> None:
        """Poll status of operation so long as operation is incomplete and
        we have an endpoint to query.

        :raises ~azure.core.polling.base_polling.OperationFailed: If operation status 'Failed' or 'Canceled'.
        :raises ~azure.core.polling.base_polling.BadStatus: If response status invalid.
        :raises ~azure.core.polling.base_polling.BadResponse: If response invalid.
        """
        # The first status update is done without waiting; later ones wait first.
        if not self.finished():
            self.update_status()
        while not self.finished():
            self._delay()
            self.update_status()

        if _failed(self.status()):
            raise OperationFailed("Operation failed or canceled")

        url_of_final_get = self._operation.get_final_get_url(self._pipeline_response)
        if not url_of_final_get:
            return
        self._pipeline_response = self.request_status(url_of_final_get)
        _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)

    def _sleep(self, delay: float) -> None:
        """Sleep through the transport.

        :param float delay: The time to sleep, in seconds.
        """
        self._transport.sleep(delay)

    def _delay(self) -> None:
        """Wait before the next poll: use the 'retry-after' header if any,
        otherwise the configured timeout.
        """
        self._sleep(self._extract_delay())

    def update_status(self) -> None:
        """Update the current status of the LRO."""
        polling_url = self._operation.get_polling_url()
        self._pipeline_response = self.request_status(polling_url)
        _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
        self._status = self._operation.get_status(self._pipeline_response)

    def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]:
        """Do a simple GET to this status link.

        This method re-injects 'x-ms-client-request-id'.

        :param str status_link: The URL to poll.
        :rtype: azure.core.pipeline.PipelineResponse
        :return: The response of the status request.
        """
        if self._path_format_arguments:
            status_link = self._client.format_url(status_link, **self._path_format_arguments)
        # Re-inject 'x-ms-client-request-id' while polling. The id is only looked up
        # when the caller did not provide one.
        if "request_id" not in self._operation_config:
            self._operation_config["request_id"] = self._get_request_id()

        if not is_rest(self._initial_response.http_response):
            # Legacy HttpRequest and HttpResponse from azure.core.pipeline.transport
            # casting things here, as we don't want the typing system to know
            # about the legacy APIs.
            legacy_request = cast(HttpRequestTypeVar, self._client.get(status_link))
            return cast(
                PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
                self._client._pipeline.run(  # pylint: disable=protected-access
                    legacy_request, stream=False, **self._operation_config
                ),
            )

        rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link))
        # Need a cast, as "_return_pipeline_response" mutate the return type, and that return type is not
        # declared in the typing of "send_request"
        return cast(
            PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
            self._client.send_request(rest_request, _return_pipeline_response=True, **self._operation_config),
        )
894
895
# Names exported by this module: the polling exceptions, the algorithm interface
# with its three implementations, and the base poller.
__all__ = [
    "BadResponse",
    "BadStatus",
    "OperationFailed",
    "LongRunningOperation",
    "OperationResourcePolling",
    "LocationPolling",
    "StatusCheckPolling",
    "LROBasePolling",
]