1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26import abc
27import base64
28import json
29from enum import Enum
30from typing import (
31 Optional,
32 Any,
33 Tuple,
34 Callable,
35 Dict,
36 Sequence,
37 Generic,
38 TypeVar,
39 cast,
40 Union,
41)
42
43from ..exceptions import HttpResponseError, DecodeError
44from . import PollingMethod
45from ..pipeline.policies._utils import get_retry_after
46from ..pipeline._tools import is_rest
47from .._enum_meta import CaseInsensitiveEnumMeta
48from .. import PipelineClient
49from ..pipeline import PipelineResponse
50from ..pipeline.transport import (
51 HttpTransport,
52 HttpRequest as LegacyHttpRequest,
53 HttpResponse as LegacyHttpResponse,
54 AsyncHttpResponse as LegacyAsyncHttpResponse,
55)
56from ..rest import HttpRequest, HttpResponse, AsyncHttpResponse
57
58
59HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
60HttpResponseType = Union[LegacyHttpResponse, HttpResponse] # Sync only
61AllHttpResponseType = Union[
62 LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse
63] # Sync or async
64LegacyPipelineResponseType = PipelineResponse[LegacyHttpRequest, LegacyHttpResponse]
65NewPipelineResponseType = PipelineResponse[HttpRequest, HttpResponse]
66PipelineResponseType = PipelineResponse[HttpRequestType, HttpResponseType]
67HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
68HttpResponseTypeVar = TypeVar("HttpResponseTypeVar", bound=HttpResponseType) # Sync only
69AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType) # Sync or async
70
71ABC = abc.ABC
72PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
73PipelineClientType = TypeVar("PipelineClientType")
74HTTPResponseType_co = TypeVar("HTTPResponseType_co", covariant=True)
75HTTPRequestType_co = TypeVar("HTTPRequestType_co", covariant=True)
76
77
78_FINISHED = frozenset(["succeeded", "canceled", "failed"])
79_FAILED = frozenset(["canceled", "failed"])
80_SUCCEEDED = frozenset(["succeeded"])
81
82
83def _get_content(response: AllHttpResponseType) -> bytes:
84 """Get the content of this response. This is designed specifically to avoid
85 a warning of mypy for body() access, as this method is deprecated.
86
87 :param response: The response object.
88 :type response: any
89 :return: The content of this response.
90 :rtype: bytes
91 """
92 if isinstance(response, (LegacyHttpResponse, LegacyAsyncHttpResponse)):
93 return response.body()
94 return response.content
95
96
97def _finished(status):
98 if hasattr(status, "value"):
99 status = status.value
100 return str(status).lower() in _FINISHED
101
102
103def _failed(status):
104 if hasattr(status, "value"):
105 status = status.value
106 return str(status).lower() in _FAILED
107
108
109def _succeeded(status):
110 if hasattr(status, "value"):
111 status = status.value
112 return str(status).lower() in _SUCCEEDED
113
114
115class BadStatus(Exception):
116 pass
117
118
119class BadResponse(Exception):
120 pass
121
122
123class OperationFailed(Exception):
124 pass
125
126
127def _as_json(response: AllHttpResponseType) -> Dict[str, Any]:
128 """Assuming this is not empty, return the content as JSON.
129
130 Result/exceptions is not determined if you call this method without testing _is_empty.
131
132 :param response: The response object.
133 :type response: any
134 :return: The content of this response as dict.
135 :rtype: dict
136 :raises: DecodeError if response body contains invalid json data.
137 """
138 try:
139 return json.loads(response.text())
140 except ValueError as err:
141 raise DecodeError("Error occurred in deserializing the response body.") from err
142
143
144def _raise_if_bad_http_status_and_method(response: AllHttpResponseType) -> None:
145 """Check response status code is valid.
146
147 Must be 200, 201, 202, or 204.
148
149 :param response: The response object.
150 :type response: any
151 :raises: BadStatus if invalid status.
152 """
153 code = response.status_code
154 if code in {200, 201, 202, 204}:
155 return
156 raise BadStatus("Invalid return status {!r} for {!r} operation".format(code, response.request.method))
157
158
159def _is_empty(response: AllHttpResponseType) -> bool:
160 """Check if response body contains meaningful content.
161
162 :param response: The response object.
163 :type response: any
164 :return: True if response body is empty, False otherwise.
165 :rtype: bool
166 """
167 return not bool(_get_content(response))
168
169
170class LongRunningOperation(ABC, Generic[HTTPRequestType_co, HTTPResponseType_co]):
171 """Protocol to implement for a long running operation algorithm."""
172
173 @abc.abstractmethod
174 def can_poll(
175 self,
176 pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
177 ) -> bool:
178 """Answer if this polling method could be used.
179
180 :param pipeline_response: Initial REST call response.
181 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
182 :return: True if this polling method could be used, False otherwise.
183 :rtype: bool
184 """
185 raise NotImplementedError()
186
187 @abc.abstractmethod
188 def get_polling_url(self) -> str:
189 """Return the polling URL.
190
191 :return: The polling URL.
192 :rtype: str
193 """
194 raise NotImplementedError()
195
196 @abc.abstractmethod
197 def set_initial_status(
198 self,
199 pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
200 ) -> str:
201 """Process first response after initiating long running operation.
202
203 :param pipeline_response: Initial REST call response.
204 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
205 :return: The initial status.
206 :rtype: str
207 """
208 raise NotImplementedError()
209
210 @abc.abstractmethod
211 def get_status(
212 self,
213 pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
214 ) -> str:
215 """Return the status string extracted from this response.
216
217 :param pipeline_response: The response object.
218 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
219 :return: The status string.
220 :rtype: str
221 """
222 raise NotImplementedError()
223
224 @abc.abstractmethod
225 def get_final_get_url(
226 self,
227 pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
228 ) -> Optional[str]:
229 """If a final GET is needed, returns the URL.
230
231 :param pipeline_response: Success REST call response.
232 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
233 :return: The URL to the final GET, or None if no final GET is needed.
234 :rtype: str or None
235 """
236 raise NotImplementedError()
237
238
239class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
240 """Known LRO options from Swagger."""
241
242 FINAL_STATE_VIA = "final-state-via"
243
244
245class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
246 """Possible final-state-via options."""
247
248 AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
249 LOCATION_FINAL_STATE = "location"
250 OPERATION_LOCATION_FINAL_STATE = "operation-location"
251
252
253class OperationResourcePolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
254 """Implements a operation resource polling, typically from Operation-Location.
255
256 :param str operation_location_header: Name of the header to return operation format (default 'operation-location')
257 :keyword dict[str, any] lro_options: Additional options for LRO. For more information, see
258 https://aka.ms/azsdk/autorest/openapi/lro-options
259 """
260
261 _async_url: str
262 """Url to resource monitor (AzureAsyncOperation or Operation-Location)"""
263
264 _location_url: Optional[str]
265 """Location header if present"""
266
267 _request: Any
268 """The initial request done"""
269
270 def __init__(
271 self, operation_location_header: str = "operation-location", *, lro_options: Optional[Dict[str, Any]] = None
272 ):
273 self._operation_location_header = operation_location_header
274 self._location_url = None
275 self._lro_options = lro_options or {}
276
277 def can_poll(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> bool:
278 """Check if status monitor header (e.g. Operation-Location) is present.
279
280 :param pipeline_response: Initial REST call response.
281 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
282 :return: True if this polling method could be used, False otherwise.
283 :rtype: bool
284 """
285 response = pipeline_response.http_response
286 return self._operation_location_header in response.headers
287
288 def get_polling_url(self) -> str:
289 """Return the polling URL.
290
291 Will extract it from the defined header to read (e.g. Operation-Location)
292
293 :return: The polling URL.
294 :rtype: str
295 """
296 return self._async_url
297
298 def get_final_get_url(
299 self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
300 ) -> Optional[str]:
301 """If a final GET is needed, returns the URL.
302
303 :param pipeline_response: Success REST call response.
304 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
305 :return: The URL to the final GET, or None if no final GET is needed.
306 :rtype: str or None
307 """
308 if (
309 self._lro_options.get(_LroOption.FINAL_STATE_VIA) == _FinalStateViaOption.LOCATION_FINAL_STATE
310 and self._location_url
311 ):
312 return self._location_url
313 if (
314 self._lro_options.get(_LroOption.FINAL_STATE_VIA)
315 in [
316 _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE,
317 _FinalStateViaOption.OPERATION_LOCATION_FINAL_STATE,
318 ]
319 and self._request.method == "POST"
320 ):
321 return None
322 response = pipeline_response.http_response
323 if not _is_empty(response):
324 body = _as_json(response)
325 # https://github.com/microsoft/api-guidelines/blob/vNext/Guidelines.md#target-resource-location
326 resource_location = body.get("resourceLocation")
327 if resource_location:
328 return resource_location
329
330 if self._request.method in {"PUT", "PATCH"}:
331 return self._request.url
332
333 if self._request.method == "POST" and self._location_url:
334 return self._location_url
335
336 return None
337
338 def set_initial_status(
339 self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
340 ) -> str:
341 """Process first response after initiating long running operation.
342
343 :param pipeline_response: Initial REST call response.
344 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
345 :return: The initial status.
346 :rtype: str
347 """
348 self._request = pipeline_response.http_response.request
349 response = pipeline_response.http_response
350
351 self._set_async_url_if_present(response)
352
353 if response.status_code in {200, 201, 202, 204} and self._async_url:
354 # Check if we can extract status from initial response, if present
355 try:
356 return self.get_status(pipeline_response)
357 # Wide catch, it may not even be JSON at all, deserialization is lenient
358 except Exception: # pylint: disable=broad-except
359 pass
360 return "InProgress"
361 raise OperationFailed("Operation failed or canceled")
362
363 def _set_async_url_if_present(self, response: AllHttpResponseTypeVar) -> None:
364 self._async_url = response.headers[self._operation_location_header]
365
366 location_url = response.headers.get("location")
367 if location_url:
368 self._location_url = location_url
369
370 def get_status(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> str:
371 """Process the latest status update retrieved from an "Operation-Location" header.
372
373 :param pipeline_response: Initial REST call response.
374 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
375 :return: The status string.
376 :rtype: str
377 :raises: BadResponse if response has no body, or body does not contain status.
378 """
379 response = pipeline_response.http_response
380 if _is_empty(response):
381 raise BadResponse("The response from long running operation does not contain a body.")
382
383 body = _as_json(response)
384 status = body.get("status")
385 if not status:
386 raise BadResponse("No status found in body")
387 return status
388
389
390class LocationPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
391 """Implements a Location polling."""
392
393 _location_url: str
394 """Location header"""
395
396 def can_poll(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> bool:
397 """True if contains a Location header
398
399 :param pipeline_response: Initial REST call response.
400 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
401 :return: True if this polling method could be used, False otherwise.
402 :rtype: bool
403 """
404 response = pipeline_response.http_response
405 return "location" in response.headers
406
407 def get_polling_url(self) -> str:
408 """Return the Location header value.
409
410 :return: The polling URL.
411 :rtype: str
412 """
413 return self._location_url
414
415 def get_final_get_url(
416 self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
417 ) -> Optional[str]:
418 """If a final GET is needed, returns the URL.
419
420 Always return None for a Location polling.
421
422 :param pipeline_response: Success REST call response.
423 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
424 :return: Always None for this implementation.
425 :rtype: None
426 """
427 return None
428
429 def set_initial_status(
430 self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
431 ) -> str:
432 """Process first response after initiating long running operation.
433
434 :param pipeline_response: Initial REST call response.
435 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
436 :return: The initial status.
437 :rtype: str
438 """
439 response = pipeline_response.http_response
440
441 self._location_url = response.headers["location"]
442
443 if response.status_code in {200, 201, 202, 204} and self._location_url:
444 return "InProgress"
445 raise OperationFailed("Operation failed or canceled")
446
447 def get_status(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> str:
448 """Return the status string extracted from this response.
449
450 For Location polling, it means the status monitor returns 202.
451
452 :param pipeline_response: Initial REST call response.
453 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
454 :return: The status string.
455 :rtype: str
456 """
457 response = pipeline_response.http_response
458 if "location" in response.headers:
459 self._location_url = response.headers["location"]
460
461 return "InProgress" if response.status_code == 202 else "Succeeded"
462
463
464class StatusCheckPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
465 """Should be the fallback polling, that don't poll but exit successfully
466 if not other polling are detected and status code is 2xx.
467 """
468
469 def can_poll(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> bool:
470 """Answer if this polling method could be used.
471
472 For this implementation, always True.
473
474 :param pipeline_response: Initial REST call response.
475 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
476 :return: True if this polling method could be used, False otherwise.
477 :rtype: bool
478 """
479 return True
480
481 def get_polling_url(self) -> str:
482 """Return the polling URL.
483
484 This is not implemented for this polling, since we're never supposed to loop.
485
486 :return: The polling URL.
487 :rtype: str
488 """
489 raise ValueError("This polling doesn't support polling url")
490
491 def set_initial_status(
492 self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
493 ) -> str:
494 """Process first response after initiating long running operation.
495
496 Will succeed immediately.
497
498 :param pipeline_response: Initial REST call response.
499 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
500 :return: The initial status.
501 :rtype: str
502 """
503 return "Succeeded"
504
505 def get_status(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> str:
506 """Return the status string extracted from this response.
507
508 Only possible status is success.
509
510 :param pipeline_response: Initial REST call response.
511 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
512 :return: The status string.
513 :rtype: str
514 """
515 return "Succeeded"
516
517 def get_final_get_url(
518 self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
519 ) -> Optional[str]:
520 """If a final GET is needed, returns the URL.
521
522 :param pipeline_response: Success REST call response.
523 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
524 :rtype: str
525 :return: Always None for this implementation.
526 """
527 return None
528
529
530class _SansIOLROBasePolling(
531 Generic[PollingReturnType_co, PipelineClientType, HttpRequestTypeVar, AllHttpResponseTypeVar]
532): # pylint: disable=too-many-instance-attributes
533 """A base class that has no opinion on IO, to help mypy be accurate.
534
535 :param float timeout: Default polling internal in absence of Retry-After header, in seconds.
536 :param list[LongRunningOperation] lro_algorithms: Ordered list of LRO algorithms to use.
537 :param lro_options: Additional options for LRO. For more information, see the algorithm's docstring.
538 :type lro_options: dict[str, any]
539 :param path_format_arguments: A dictionary of the format arguments to be used to format the URL.
540 :type path_format_arguments: dict[str, str]
541 """
542
543 _deserialization_callback: Callable[[Any], PollingReturnType_co]
544 """The deserialization callback that returns the final instance."""
545
546 _operation: LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]
547 """The algorithm this poller has decided to use. Will loop through 'can_poll' of the input algorithms to decide."""
548
549 _status: str
550 """Hold the current status of this poller"""
551
552 _client: PipelineClientType
553 """The Azure Core Pipeline client used to make request."""
554
555 def __init__(
556 self,
557 timeout: float = 30,
558 lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None,
559 lro_options: Optional[Dict[str, Any]] = None,
560 path_format_arguments: Optional[Dict[str, str]] = None,
561 **operation_config: Any
562 ):
563 self._lro_algorithms = lro_algorithms or [
564 OperationResourcePolling(lro_options=lro_options),
565 LocationPolling(),
566 StatusCheckPolling(),
567 ]
568
569 self._timeout = timeout
570 self._operation_config = operation_config
571 self._lro_options = lro_options
572 self._path_format_arguments = path_format_arguments
573
574 def initialize(
575 self,
576 client: PipelineClientType,
577 initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
578 deserialization_callback: Callable[
579 [PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]], PollingReturnType_co
580 ],
581 ) -> None:
582 """Set the initial status of this LRO.
583
584 :param client: The Azure Core Pipeline client used to make request.
585 :type client: ~azure.core.pipeline.PipelineClient
586 :param initial_response: The initial response for the call.
587 :type initial_response: ~azure.core.pipeline.PipelineResponse
588 :param deserialization_callback: A callback function to deserialize the final response.
589 :type deserialization_callback: callable
590 :raises: HttpResponseError if initial status is incorrect LRO state
591 """
592 self._client = client
593 self._pipeline_response = ( # pylint: disable=attribute-defined-outside-init
594 self._initial_response # pylint: disable=attribute-defined-outside-init
595 ) = initial_response
596 self._deserialization_callback = deserialization_callback
597
598 for operation in self._lro_algorithms:
599 if operation.can_poll(initial_response):
600 self._operation = operation
601 break
602 else:
603 raise BadResponse("Unable to find status link for polling.")
604
605 try:
606 _raise_if_bad_http_status_and_method(self._initial_response.http_response)
607 self._status = self._operation.set_initial_status(initial_response)
608
609 except BadStatus as err:
610 self._status = "Failed"
611 raise HttpResponseError(response=initial_response.http_response, error=err) from err
612 except BadResponse as err:
613 self._status = "Failed"
614 raise HttpResponseError(response=initial_response.http_response, message=str(err), error=err) from err
615 except OperationFailed as err:
616 raise HttpResponseError(response=initial_response.http_response, error=err) from err
617
618 def get_continuation_token(self) -> str:
619 import pickle
620
621 return base64.b64encode(pickle.dumps(self._initial_response)).decode("ascii")
622
623 @classmethod
624 def from_continuation_token(
625 cls, continuation_token: str, **kwargs: Any
626 ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]:
627 try:
628 client = kwargs["client"]
629 except KeyError:
630 raise ValueError("Need kwarg 'client' to be recreated from continuation_token") from None
631
632 try:
633 deserialization_callback = kwargs["deserialization_callback"]
634 except KeyError:
635 raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None
636
637 import pickle
638
639 initial_response = pickle.loads(base64.b64decode(continuation_token)) # nosec
640 # Restore the transport in the context
641 initial_response.context.transport = client._pipeline._transport # pylint: disable=protected-access
642 return client, initial_response, deserialization_callback
643
644 def status(self) -> str:
645 """Return the current status as a string.
646
647 :rtype: str
648 :return: The current status.
649 """
650 if not self._operation:
651 raise ValueError("set_initial_status was never called. Did you give this instance to a poller?")
652 return self._status
653
654 def finished(self) -> bool:
655 """Is this polling finished?
656
657 :rtype: bool
658 :return: True if finished, False otherwise.
659 """
660 return _finished(self.status())
661
662 def resource(self) -> PollingReturnType_co:
663 """Return the built resource.
664
665 :rtype: any
666 :return: The built resource.
667 """
668 return self._parse_resource(self._pipeline_response)
669
670 def _parse_resource(
671 self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
672 ) -> PollingReturnType_co:
673 """Assuming this response is a resource, use the deserialization callback to parse it.
674 If body is empty, assuming no resource to return.
675
676 :param pipeline_response: The response object.
677 :type pipeline_response: ~azure.core.pipeline.PipelineResponse
678 :return: The parsed resource.
679 :rtype: any
680 """
681 response = pipeline_response.http_response
682 if not _is_empty(response):
683 return self._deserialization_callback(pipeline_response)
684
685 # This "type ignore" has been discussed with architects.
686 # We have a typing problem that if the Swagger/TSP describes a return type (PollingReturnType_co is not None),
687 # BUT the returned payload is actually empty, we don't want to fail, but return None.
688 # To be clean, we would have to make the polling return type Optional "just in case the Swagger/TSP is wrong".
689 # This is reducing the quality and the value of the typing annotations
690 # for a case that is not supposed to happen in the first place. So we decided to ignore the type error here.
691 return None # type: ignore
692
693 def _get_request_id(self) -> str:
694 return self._pipeline_response.http_response.request.headers["x-ms-client-request-id"]
695
696 def _extract_delay(self) -> float:
697 delay = get_retry_after(self._pipeline_response)
698 if delay:
699 return delay
700 return self._timeout
701
702
703class LROBasePolling(
704 _SansIOLROBasePolling[
705 PollingReturnType_co,
706 PipelineClient[HttpRequestTypeVar, HttpResponseTypeVar],
707 HttpRequestTypeVar,
708 HttpResponseTypeVar,
709 ],
710 PollingMethod[PollingReturnType_co],
711): # pylint: disable=too-many-instance-attributes
712 """A base LRO poller.
713
714 This assumes a basic flow:
715 - I analyze the response to decide the polling approach
716 - I poll
717 - I ask the final resource depending of the polling approach
718
719 If your polling need are more specific, you could implement a PollingMethod directly
720 """
721
722 _initial_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
723 """Store the initial response."""
724
725 _pipeline_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
726 """Store the latest received HTTP response, initialized by the first answer."""
727
728 @property
729 def _transport(self) -> HttpTransport[HttpRequestTypeVar, HttpResponseTypeVar]:
730 return self._client._pipeline._transport # pylint: disable=protected-access
731
732 def __getattribute__(self, name: str) -> Any:
733 """Find the right method for the job.
734
735 This contains a workaround for azure-mgmt-core 1.0.0 to 1.4.0, where the MRO
736 is changing when azure-core was refactored in 1.27.0. The MRO change was causing
737 AsyncARMPolling to look-up the wrong methods and find the non-async ones.
738
739 :param str name: The name of the attribute to retrieve.
740 :rtype: Any
741 :return: The attribute value.
742 """
743 cls = object.__getattribute__(self, "__class__")
744 if cls.__name__ == "AsyncARMPolling" and name in [
745 "run",
746 "update_status",
747 "request_status",
748 "_sleep",
749 "_delay",
750 "_poll",
751 ]:
752 return getattr(super(LROBasePolling, self), name)
753 return super().__getattribute__(name)
754
755 def run(self) -> None:
756 try:
757 self._poll()
758
759 except BadStatus as err:
760 self._status = "Failed"
761 raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err
762
763 except BadResponse as err:
764 self._status = "Failed"
765 raise HttpResponseError(
766 response=self._pipeline_response.http_response,
767 message=str(err),
768 error=err,
769 ) from err
770
771 except OperationFailed as err:
772 raise HttpResponseError(response=self._pipeline_response.http_response, error=err) from err
773
774 def _poll(self) -> None:
775 """Poll status of operation so long as operation is incomplete and
776 we have an endpoint to query.
777
778 :raises: OperationFailed if operation status 'Failed' or 'Canceled'.
779 :raises: BadStatus if response status invalid.
780 :raises: BadResponse if response invalid.
781 """
782 if not self.finished():
783 self.update_status()
784 while not self.finished():
785 self._delay()
786 self.update_status()
787
788 if _failed(self.status()):
789 raise OperationFailed("Operation failed or canceled")
790
791 final_get_url = self._operation.get_final_get_url(self._pipeline_response)
792 if final_get_url:
793 self._pipeline_response = self.request_status(final_get_url)
794 _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
795
796 def _sleep(self, delay: float) -> None:
797 self._transport.sleep(delay)
798
799 def _delay(self) -> None:
800 """Check for a 'retry-after' header to set timeout,
801 otherwise use configured timeout.
802 """
803 delay = self._extract_delay()
804 self._sleep(delay)
805
806 def update_status(self) -> None:
807 """Update the current status of the LRO."""
808 self._pipeline_response = self.request_status(self._operation.get_polling_url())
809 _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
810 self._status = self._operation.get_status(self._pipeline_response)
811
812 def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]:
813 """Do a simple GET to this status link.
814
815 This method re-inject 'x-ms-client-request-id'.
816
817 :param str status_link: The URL to poll.
818 :rtype: azure.core.pipeline.PipelineResponse
819 :return: The response of the status request.
820 """
821 if self._path_format_arguments:
822 status_link = self._client.format_url(status_link, **self._path_format_arguments)
823 # Re-inject 'x-ms-client-request-id' while polling
824 if "request_id" not in self._operation_config:
825 self._operation_config["request_id"] = self._get_request_id()
826
827 if is_rest(self._initial_response.http_response):
828 rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link))
829 # Need a cast, as "_return_pipeline_response" mutate the return type, and that return type is not
830 # declared in the typing of "send_request"
831 return cast(
832 PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
833 self._client.send_request(rest_request, _return_pipeline_response=True, **self._operation_config),
834 )
835
836 # Legacy HttpRequest and HttpResponse from azure.core.pipeline.transport
837 # casting things here, as we don't want the typing system to know
838 # about the legacy APIs.
839 request = cast(HttpRequestTypeVar, self._client.get(status_link))
840 return cast(
841 PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar],
842 self._client._pipeline.run( # pylint: disable=protected-access
843 request, stream=False, **self._operation_config
844 ),
845 )
846
847
848__all__ = [
849 "BadResponse",
850 "BadStatus",
851 "OperationFailed",
852 "LongRunningOperation",
853 "OperationResourcePolling",
854 "LocationPolling",
855 "StatusCheckPolling",
856 "LROBasePolling",
857]