1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26import abc
27import base64
28import json
29from enum import Enum
30from typing import (
31 Optional,
32 Any,
33 Tuple,
34 Callable,
35 Dict,
36 Mapping,
37 Sequence,
38 Generic,
39 TypeVar,
40 cast,
41 Union,
42)
43
44from ..exceptions import HttpResponseError, DecodeError
45from . import PollingMethod
46from ..pipeline.policies._utils import get_retry_after
47from ..pipeline._tools import is_rest
48from .._enum_meta import CaseInsensitiveEnumMeta
49from .. import PipelineClient
50from ..pipeline import PipelineResponse, PipelineContext
51from ..rest._helpers import decode_to_text, get_charset_encoding
52from ..utils._utils import case_insensitive_dict
53from ..pipeline.transport import (
54 HttpTransport,
55 HttpRequest as LegacyHttpRequest,
56 HttpResponse as LegacyHttpResponse,
57 AsyncHttpResponse as LegacyAsyncHttpResponse,
58)
59from ..rest import HttpRequest, HttpResponse, AsyncHttpResponse
60from ._utils import (
61 _encode_continuation_token,
62 _decode_continuation_token,
63 _filter_sensitive_headers,
64)
65
66
# Unions covering both the legacy ``pipeline.transport`` types and the ``rest`` types.
HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
HttpResponseType = Union[LegacyHttpResponse, HttpResponse]  # Sync only
AllHttpResponseType = Union[
    LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse
]  # Sync or async
# PipelineResponse specializations for the legacy, new and combined request/response types.
LegacyPipelineResponseType = PipelineResponse[LegacyHttpRequest, LegacyHttpResponse]
NewPipelineResponseType = PipelineResponse[HttpRequest, HttpResponse]
PipelineResponseType = PipelineResponse[HttpRequestType, HttpResponseType]
# TypeVars bound to the unions above, used to parametrize the polling classes in this module.
HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
HttpResponseTypeVar = TypeVar("HttpResponseTypeVar", bound=HttpResponseType)  # Sync only
AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType)  # Sync or async

# Alias so classes in this module can subclass ``ABC`` directly.
ABC = abc.ABC
# Unbound TypeVars used by the generic protocol and poller base classes.
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
PipelineClientType = TypeVar("PipelineClientType")
HTTPResponseType_co = TypeVar("HTTPResponseType_co", covariant=True)
HTTPRequestType_co = TypeVar("HTTPRequestType_co", covariant=True)


# Lower-case status strings; _finished/_failed/_succeeded lower-case the status before the membership test.
_FINISHED = frozenset(["succeeded", "canceled", "failed"])
_FAILED = frozenset(["canceled", "failed"])
_SUCCEEDED = frozenset(["succeeded"])
89
90
class _ContinuationTokenHttpResponse:
    """Lightweight stand-in for an HTTP response rebuilt from a continuation token.

    It only stores the request, status code, headers and raw body, which is the
    surface the LRO polling code in this module reads when a poller is restored
    from a continuation token.

    :param request: The HTTP request, or None when the continuation token did not carry one.
    :type request: ~azure.core.rest.HttpRequest or None
    :param status_code: The HTTP status code.
    :type status_code: int
    :param headers: The response headers.
    :type headers: dict
    :param content: The raw response body.
    :type content: bytes
    """

    def __init__(
        self,
        request: Optional[HttpRequest],
        status_code: int,
        headers: Dict[str, str],
        content: bytes,
    ):
        self.request = request
        self.status_code = status_code
        # Wrap the headers so that lookups are case-insensitive.
        self.headers = case_insensitive_dict(headers)
        self._content = content

    @property
    def content(self) -> bytes:
        """The raw body this response was built with.

        :return: The response content.
        :rtype: bytes
        """
        return self._content

    def text(self) -> str:
        """Decode the body to text.

        The encoding is resolved by ``get_charset_encoding`` from this response,
        and the decoding itself is delegated to ``decode_to_text``.

        :return: The response content as text.
        :rtype: str
        """
        return decode_to_text(get_charset_encoding(self), self._content)
139
140
def _get_content(response: AllHttpResponseType) -> bytes:
    """Return the raw body of a response, whichever response family it belongs to.

    Legacy transport responses expose the body through ``body()``; every other
    response is read through its ``content`` attribute. Centralizing the choice
    here avoids a mypy warning about the deprecated ``body()`` access.

    :param response: The response object.
    :type response: any
    :return: The content of this response.
    :rtype: bytes
    """
    is_legacy = isinstance(response, (LegacyHttpResponse, LegacyAsyncHttpResponse))
    if not is_legacy:
        return response.content
    return response.body()
153
154
def _finished(status):
    """Tell whether a status is one of the terminal states listed in ``_FINISHED``.

    :param status: A status string, or an object exposing the string through ``value``.
    :type status: any
    :return: True if the lower-cased status is in ``_FINISHED``.
    :rtype: bool
    """
    raw_status = getattr(status, "value", status)
    return str(raw_status).lower() in _FINISHED
159
160
def _failed(status):
    """Tell whether a status is one of the failure states listed in ``_FAILED``.

    :param status: A status string, or an object exposing the string through ``value``.
    :type status: any
    :return: True if the lower-cased status is in ``_FAILED``.
    :rtype: bool
    """
    raw_status = getattr(status, "value", status)
    return str(raw_status).lower() in _FAILED
165
166
def _succeeded(status):
    """Tell whether a status is one of the success states listed in ``_SUCCEEDED``.

    :param status: A status string, or an object exposing the string through ``value``.
    :type status: any
    :return: True if the lower-cased status is in ``_SUCCEEDED``.
    :rtype: bool
    """
    raw_status = getattr(status, "value", status)
    return str(raw_status).lower() in _SUCCEEDED
171
172
class BadStatus(Exception):
    """Raised when a response carries a status that is not acceptable for the operation."""
175
176
class BadResponse(Exception):
    """Raised when a response cannot be used for polling (e.g. missing body or status)."""
179
180
class OperationFailed(Exception):
    """Raised when the long running operation failed or was canceled."""
183
184
def _as_json(response: AllHttpResponseType) -> Dict[str, Any]:
    """Parse the response text as JSON.

    Callers are expected to have checked ``_is_empty`` first; the outcome for an
    empty body is not defined by this helper.

    :param response: The response object.
    :type response: any
    :return: The content of this response as dict.
    :rtype: dict
    :raises DecodeError: If response body contains invalid json data.
    """
    try:
        payload = response.text()
        return json.loads(payload)
    except ValueError as decode_failure:
        raise DecodeError("Error occurred in deserializing the response body.") from decode_failure
200
201
def _raise_if_bad_http_status_and_method(response: AllHttpResponseType) -> None:
    """Check response status code is valid.

    Must be 200, 201, 202, or 204.

    :param response: The response object.
    :type response: any
    :raises ~azure.core.polling.base_polling.BadStatus: If invalid status.
    """
    code = response.status_code
    if code in {200, 201, 202, 204}:
        return
    # A response rebuilt from a continuation token may carry no request; in that
    # case report the method as None instead of failing with AttributeError and
    # hiding the BadStatus the caller is expected to handle.
    request = response.request
    method = request.method if request is not None else None
    raise BadStatus("Invalid return status {!r} for {!r} operation".format(code, method))
215
216
def _is_empty(response: AllHttpResponseType) -> bool:
    """Tell whether the response has no body content.

    :param response: The response object.
    :type response: any
    :return: True if response body is empty, False otherwise.
    :rtype: bool
    """
    content = _get_content(response)
    return not content
226
227
class LongRunningOperation(ABC, Generic[HTTPRequestType_co, HTTPResponseType_co]):
    """Interface that every long running operation algorithm has to implement.

    All methods are abstract; concrete algorithms in this module override each of them.
    """

    @abc.abstractmethod
    def can_poll(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> bool:
        """Tell whether this algorithm is able to handle the given initial response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_polling_url(self) -> str:
        """Give the URL that must be requested to poll the operation.

        :return: The polling URL.
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> str:
        """Handle the very first response received after starting the operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_status(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> str:
        """Extract the status string from a response.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HTTPRequestType_co, HTTPResponseType_co],
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        raise NotImplementedError()
295
296
class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Keys recognized in the ``lro_options`` dictionary (LRO options coming from Swagger)."""

    FINAL_STATE_VIA = "final-state-via"
301
302
class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Values accepted for the ``final-state-via`` LRO option."""

    AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
    LOCATION_FINAL_STATE = "location"
    OPERATION_LOCATION_FINAL_STATE = "operation-location"
309
310
class OperationResourcePolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Polling algorithm driven by an operation resource, typically given by Operation-Location.

    :param str operation_location_header: Name of the header to return operation format (default 'operation-location')
    :keyword dict[str, any] lro_options: Additional options for LRO. For more information, see
     https://aka.ms/azsdk/autorest/openapi/lro-options
    """

    _async_url: str
    """Url to resource monitor (AzureAsyncOperation or Operation-Location)"""

    _location_url: Optional[str]
    """Location header if present"""

    _request: Any
    """The initial request done"""

    def __init__(
        self, operation_location_header: str = "operation-location", *, lro_options: Optional[Dict[str, Any]] = None
    ):
        self._operation_location_header = operation_location_header
        self._location_url = None
        self._lro_options = lro_options or {}

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether the status monitor header (e.g. Operation-Location) is in the response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return self._operation_location_header in headers

    def get_polling_url(self) -> str:
        """Give the polling URL.

        This is the value recorded from the configured header (e.g. Operation-Location)
        when the initial status was set.

        :return: The polling URL.
        :rtype: str
        """
        return self._async_url

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The URL to the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        final_state_via = self._lro_options.get(_LroOption.FINAL_STATE_VIA)

        # An explicit "location" final state wins whenever a Location header was recorded.
        if final_state_via == _FinalStateViaOption.LOCATION_FINAL_STATE and self._location_url:
            return self._location_url

        # For a POST whose final state lives in the operation resource, no extra GET is needed.
        operation_resource_states = (
            _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE,
            _FinalStateViaOption.OPERATION_LOCATION_FINAL_STATE,
        )
        if final_state_via in operation_resource_states and self._request.method == "POST":
            return None

        http_response = pipeline_response.http_response
        if not _is_empty(http_response):
            # https://github.com/microsoft/api-guidelines/blob/vNext/Guidelines.md#target-resource-location
            target_location = _as_json(http_response).get("resourceLocation")
            if target_location:
                return target_location

        initial_request = self._request
        if initial_request.method in {"PUT", "PATCH"}:
            return initial_request.url

        if initial_request.method == "POST" and self._location_url:
            return self._location_url

        return None

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Handle the very first response received after starting the operation.

        Records the initial request and the polling URLs, then tries to read a
        status from the body, defaulting to "InProgress" when that is not possible.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises ~azure.core.polling.base_polling.OperationFailed: If the status code is not
         200, 201, 202 or 204, or if the polling URL is empty.
        """
        http_response = pipeline_response.http_response
        self._request = http_response.request

        self._set_async_url_if_present(http_response)

        can_continue = http_response.status_code in {200, 201, 202, 204} and self._async_url
        if not can_continue:
            raise OperationFailed("Operation failed or canceled")

        # Check if we can extract status from initial response, if present
        try:
            return self.get_status(pipeline_response)
        # Wide catch, it may not even be JSON at all, deserialization is lenient
        except Exception:  # pylint: disable=broad-except
            return "InProgress"

    def _set_async_url_if_present(self, response: AllHttpResponseTypeVar) -> None:
        """Record the polling URL and, when present, the Location header.

        :param response: The response whose headers are read.
        :type response: any
        """
        headers = response.headers
        self._async_url = headers[self._operation_location_header]

        location = headers.get("location")
        if location:
            self._location_url = location

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Read the status from the JSON body of a status monitor response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        :raises ~azure.core.polling.base_polling.BadResponse: if response has no body, or body does not contain status.
        """
        http_response = pipeline_response.http_response
        if _is_empty(http_response):
            raise BadResponse("The response from long running operation does not contain a body.")

        status = _as_json(http_response).get("status")
        if status:
            return status
        raise BadResponse("No status found in body")
454
455
class LocationPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Polling algorithm driven by the Location header."""

    _location_url: str
    """Location header"""

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether the response carries a Location header.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        headers = pipeline_response.http_response.headers
        return "location" in headers

    def get_polling_url(self) -> str:
        """Give the most recently recorded Location header value.

        :return: The polling URL.
        :rtype: str
        """
        return self._location_url

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        Location polling never asks for a final GET.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Handle the very first response received after starting the operation.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        :raises ~azure.core.polling.base_polling.OperationFailed: If the status code is not
         200, 201, 202 or 204, or if the Location header is empty.
        """
        http_response = pipeline_response.http_response

        self._location_url = http_response.headers["location"]

        can_continue = http_response.status_code in {200, 201, 202, 204} and self._location_url
        if not can_continue:
            raise OperationFailed("Operation failed or canceled")
        return "InProgress"

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Derive the status from the response status code.

        A 202 means the operation is still in progress; anything else is reported
        as succeeded. A new Location header, when present, replaces the polling URL.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        http_response = pipeline_response.http_response
        headers = http_response.headers
        if "location" in headers:
            self._location_url = headers["location"]

        if http_response.status_code == 202:
            return "InProgress"
        return "Succeeded"
536
537
class StatusCheckPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Fallback algorithm that does not poll at all.

    It is meant to be used when no other polling algorithm is detected and the
    status code is 2xx: it accepts every response and reports success right away.
    """

    def can_poll(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> bool:
        """Tell whether this algorithm can be used.

        This implementation accepts every response.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: True if this polling method could be used, False otherwise.
        :rtype: bool
        """
        return True

    def get_polling_url(self) -> str:
        """Give the polling URL.

        Not supported by this algorithm, since it is never supposed to loop.

        :return: The polling URL.
        :rtype: str
        :raises ValueError: Always, as there is no polling URL.
        """
        raise ValueError("This polling doesn't support polling url")

    def set_initial_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Handle the very first response received after starting the operation.

        The operation is considered successful immediately.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The initial status.
        :rtype: str
        """
        return "Succeeded"

    def get_status(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> str:
        """Give the status for this response.

        Success is the only status this algorithm reports.

        :param pipeline_response: Initial REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The status string.
        :rtype: str
        """
        return "Succeeded"

    def get_final_get_url(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> Optional[str]:
        """Give the URL of the final GET, when one is required.

        :param pipeline_response: Success REST call response.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: Always None for this implementation.
        :rtype: None
        """
        return None
610
611
class _SansIOLROBasePolling(
    Generic[
        PollingReturnType_co,
        PipelineClientType,
        HttpRequestTypeVar,
        AllHttpResponseTypeVar,
    ]
):  # pylint: disable=too-many-instance-attributes
    """A base class that has no opinion on IO, to help mypy be accurate.

    :param float timeout: Default polling interval in absence of Retry-After header, in seconds.
    :param list[LongRunningOperation] lro_algorithms: Ordered list of LRO algorithms to use.
    :param lro_options: Additional options for LRO. For more information, see the algorithm's docstring.
    :type lro_options: dict[str, any]
    :param path_format_arguments: A dictionary of the format arguments to be used to format the URL.
    :type path_format_arguments: dict[str, str]
    """

    _deserialization_callback: Callable[[Any], PollingReturnType_co]
    """The deserialization callback that returns the final instance."""

    _operation: LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]
    """The algorithm this poller has decided to use. Will loop through 'can_poll' of the input algorithms to decide."""

    _status: str
    """Hold the current status of this poller"""

    _client: PipelineClientType
    """The Azure Core Pipeline client used to make request."""

    def __init__(
        self,
        timeout: float = 30,
        lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None,
        lro_options: Optional[Dict[str, Any]] = None,
        path_format_arguments: Optional[Dict[str, str]] = None,
        **operation_config: Any
    ):
        # Default algorithms are tried in this order; StatusCheckPolling accepts anything.
        self._lro_algorithms = lro_algorithms or [
            OperationResourcePolling(lro_options=lro_options),
            LocationPolling(),
            StatusCheckPolling(),
        ]

        self._timeout = timeout
        self._operation_config = operation_config
        self._lro_options = lro_options
        self._path_format_arguments = path_format_arguments

    def initialize(
        self,
        client: PipelineClientType,
        initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
        deserialization_callback: Callable[
            [PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]],
            PollingReturnType_co,
        ],
    ) -> None:
        """Set the initial status of this LRO.

        :param client: The Azure Core Pipeline client used to make request.
        :type client: ~azure.core.pipeline.PipelineClient
        :param initial_response: The initial response for the call.
        :type initial_response: ~azure.core.pipeline.PipelineResponse
        :param deserialization_callback: A callback function to deserialize the final response.
        :type deserialization_callback: callable
        :raises ~azure.core.HttpResponseError: If initial status is incorrect LRO state
        :raises ~azure.core.polling.base_polling.BadResponse: If no algorithm can poll this response.
        """
        self._client = client
        self._pipeline_response = (  # pylint: disable=attribute-defined-outside-init
            self._initial_response  # pylint: disable=attribute-defined-outside-init
        ) = initial_response
        self._deserialization_callback = deserialization_callback

        # Pick the first algorithm that declares it can handle the initial response.
        for operation in self._lro_algorithms:
            if operation.can_poll(initial_response):
                self._operation = operation
                break
        else:
            raise BadResponse("Unable to find status link for polling.")

        try:
            _raise_if_bad_http_status_and_method(self._initial_response.http_response)
            self._status = self._operation.set_initial_status(initial_response)

        except BadStatus as err:
            self._status = "Failed"
            raise HttpResponseError(response=initial_response.http_response, error=err) from err
        except BadResponse as err:
            self._status = "Failed"
            raise HttpResponseError(response=initial_response.http_response, message=str(err), error=err) from err
        except OperationFailed as err:
            # Record the failure like the other branches do, so status() stays usable afterwards.
            self._status = "Failed"
            raise HttpResponseError(response=initial_response.http_response, error=err) from err

    def _filter_headers_for_continuation_token(self, headers: Mapping[str, str]) -> Dict[str, str]:
        """Filter headers to include in the continuation token.

        Subclasses can override this method to include additional headers needed
        for their specific LRO implementation.

        :param headers: The response headers to filter.
        :type headers: Mapping[str, str]
        :return: A filtered dictionary of headers to include in the continuation token.
        :rtype: dict[str, str]
        """
        return _filter_sensitive_headers(headers)

    def get_continuation_token(self) -> str:
        """Get a continuation token that can be used to recreate this poller.

        :rtype: str
        :return: An opaque continuation token.
        :raises ValueError: If the initial response is not set.
        """
        # _initial_response only exists once initialize() has run; surface the
        # documented ValueError instead of an AttributeError when it has not.
        initial_response = getattr(self, "_initial_response", None)
        if initial_response is None:
            raise ValueError("The initial response is not set. Did you give this instance to a poller?")
        response = initial_response.http_response
        request = response.request
        # Serialize the essential parts of the PipelineResponse to JSON.
        if request:
            request_headers = {}
            # Preserve x-ms-client-request-id for request correlation
            if "x-ms-client-request-id" in request.headers:
                request_headers["x-ms-client-request-id"] = request.headers["x-ms-client-request-id"]
            request_state = {
                "method": request.method,
                "url": request.url,
                "headers": request_headers,
            }
        else:
            request_state = None
        # Get response content, handling the case where it might not be read yet
        try:
            content = _get_content(response) or b""
        except Exception:  # pylint: disable=broad-except
            content = b""
        # Get deserialized data from context if available (optimization).
        # If context doesn't have it, fall back to parsing the response body directly.
        # Note: deserialized_data is only included if it's JSON-serializable.
        # Non-JSON-serializable types (e.g., XML ElementTree) are skipped and set to None.
        # In such cases, the data can still be re-parsed from the raw content bytes.
        deserialized_data = None
        raw_deserialized = None
        if initial_response.context is not None:
            raw_deserialized = initial_response.context.get("deserialized_data")
        # Fallback: try to get deserialized data from the response body if context didn't have it
        if raw_deserialized is None and content:
            try:
                raw_deserialized = json.loads(content)
            except (json.JSONDecodeError, ValueError, TypeError):
                # Response body is not valid JSON, leave as None
                pass
        if raw_deserialized is not None:
            try:
                # Test if the data is JSON-serializable
                json.dumps(raw_deserialized)
                deserialized_data = raw_deserialized
            except (TypeError, ValueError):
                # Skip non-JSON-serializable data (e.g., XML ElementTree objects)
                deserialized_data = None
        state = {
            "request": request_state,
            "response": {
                "status_code": response.status_code,
                "headers": self._filter_headers_for_continuation_token(response.headers),
                "content": base64.b64encode(content).decode("ascii"),
            },
            "context": {
                "deserialized_data": deserialized_data,
            },
        }
        return _encode_continuation_token(state)

    @classmethod
    def from_continuation_token(
        cls, continuation_token: str, **kwargs: Any
    ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]:
        """Recreate the poller from a continuation token.

        :param continuation_token: The continuation token to recreate the poller.
        :type continuation_token: str
        :return: A tuple containing the client, the initial response, and the deserialization callback.
        :rtype: tuple[~azure.core.PipelineClient, ~azure.core.pipeline.PipelineResponse, callable]
        :raises ValueError: If the continuation token is invalid or if 'client' or
            'deserialization_callback' are not provided.
        """
        try:
            client = kwargs["client"]
        except KeyError:
            raise ValueError("Need kwarg 'client' to be recreated from continuation_token") from None

        try:
            deserialization_callback = kwargs["deserialization_callback"]
        except KeyError:
            raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None

        state = _decode_continuation_token(continuation_token)
        # Reconstruct HttpRequest if present
        request_state = state.get("request")
        http_request = None
        if request_state is not None:
            http_request = HttpRequest(
                method=request_state["method"],
                url=request_state["url"],
                headers=request_state.get("headers", {}),
            )
        # Reconstruct HttpResponse using the minimal response class
        response_state = state["response"]
        http_response = _ContinuationTokenHttpResponse(
            request=http_request,
            status_code=response_state["status_code"],
            headers=response_state["headers"],
            content=base64.b64decode(response_state["content"]),
        )
        # Reconstruct PipelineResponse
        context = PipelineContext(client._pipeline._transport)  # pylint: disable=protected-access
        context_state = state.get("context", {})
        if context_state.get("deserialized_data") is not None:
            context["deserialized_data"] = context_state["deserialized_data"]
        initial_response = PipelineResponse(
            http_request=http_request,
            http_response=http_response,
            context=context,
        )
        return client, initial_response, deserialization_callback

    def status(self) -> str:
        """Return the current status as a string.

        :rtype: str
        :return: The current status.
        :raises ValueError: If no polling algorithm was selected yet, i.e. initialize() did not run.
        """
        # _operation is only annotated on the class and gets assigned in initialize();
        # a plain attribute access would raise AttributeError before the check could run.
        if not getattr(self, "_operation", None):
            raise ValueError("set_initial_status was never called. Did you give this instance to a poller?")
        return self._status

    def finished(self) -> bool:
        """Is this polling finished?

        :rtype: bool
        :return: True if finished, False otherwise.
        """
        return _finished(self.status())

    def resource(self) -> PollingReturnType_co:
        """Return the built resource.

        :rtype: any
        :return: The built resource.
        """
        return self._parse_resource(self._pipeline_response)

    def _parse_resource(
        self,
        pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar],
    ) -> PollingReturnType_co:
        """Assuming this response is a resource, use the deserialization callback to parse it.
        If body is empty, assuming no resource to return.

        :param pipeline_response: The response object.
        :type pipeline_response: ~azure.core.pipeline.PipelineResponse
        :return: The parsed resource.
        :rtype: any
        """
        response = pipeline_response.http_response
        if not _is_empty(response):
            return self._deserialization_callback(pipeline_response)

        # This "type ignore" has been discussed with architects.
        # We have a typing problem that if the Swagger/TSP describes a return type (PollingReturnType_co is not None),
        # BUT the returned payload is actually empty, we don't want to fail, but return None.
        # To be clean, we would have to make the polling return type Optional "just in case the Swagger/TSP is wrong".
        # This is reducing the quality and the value of the typing annotations
        # for a case that is not supposed to happen in the first place. So we decided to ignore the type error here.
        return None  # type: ignore

    def _get_request_id(self) -> str:
        """Return the ``x-ms-client-request-id`` header of the latest response's request.

        :return: The client request id.
        :rtype: str
        """
        return self._pipeline_response.http_response.request.headers["x-ms-client-request-id"]

    def _extract_delay(self) -> float:
        """Return the delay to wait before the next poll.

        Uses the value computed by ``get_retry_after`` from the latest response when it is
        truthy, and falls back to the configured timeout otherwise.

        :return: The delay, in seconds.
        :rtype: float
        """
        delay = get_retry_after(self._pipeline_response)
        if delay:
            return delay
        return self._timeout
894
895
class LROBasePolling(
    _SansIOLROBasePolling[
        PollingReturnType_co,
        PipelineClient[HttpRequestTypeVar, HttpResponseTypeVar],
        HttpRequestTypeVar,
        HttpResponseTypeVar,
    ],
    PollingMethod[PollingReturnType_co],
):
    """A base LRO poller.

    This assumes a basic flow:
    - The response is analyzed to decide the polling approach
    - The status is polled
    - The final resource is requested, depending on the polling approach

    If your polling needs are more specific, you could implement a PollingMethod directly.
    """

    _initial_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
    """Store the initial response."""

    _pipeline_response: PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]
    """Store the latest received HTTP response, initialized by the first answer."""

    @property
    def _transport(self) -> HttpTransport[HttpRequestTypeVar, HttpResponseTypeVar]:
        """Give access to the transport held by the client's pipeline.

        :rtype: HttpTransport
        :return: The transport of the client's pipeline.
        """
        client_pipeline = self._client._pipeline  # pylint: disable=protected-access
        return client_pipeline._transport  # pylint: disable=protected-access

    def __getattribute__(self, name: str) -> Any:
        """Find the right method for the job.

        This carries a workaround for azure-mgmt-core 1.0.0 to 1.4.0: the refactoring of
        azure-core in 1.27.0 changed the MRO, which made AsyncARMPolling look up the wrong
        methods and pick the non-async ones.

        :param str name: The name of the attribute to retrieve.
        :rtype: Any
        :return: The attribute value.
        """
        redirected_names = (
            "run",
            "update_status",
            "request_status",
            "_sleep",
            "_delay",
            "_poll",
        )
        # Go through ``object`` directly so that reading the class does not recurse into this method.
        owner = object.__getattribute__(self, "__class__")
        if owner.__name__ != "AsyncARMPolling" or name not in redirected_names:
            return super().__getattribute__(name)
        # For AsyncARMPolling, resolve these names starting after LROBasePolling in the MRO.
        return getattr(super(LROBasePolling, self), name)

    def run(self) -> None:
        """Run the polling and translate polling errors into HttpResponseError.

        The status is set to "Failed" when the polling stops on a bad status or a bad response.

        :raises ~azure.core.exceptions.HttpResponseError: If the polling raised BadStatus,
         BadResponse or OperationFailed.
        """
        try:
            self._poll()

        except BadStatus as bad_status:
            self._status = "Failed"
            last_response = self._pipeline_response.http_response
            raise HttpResponseError(response=last_response, error=bad_status) from bad_status

        except BadResponse as bad_response:
            self._status = "Failed"
            last_response = self._pipeline_response.http_response
            raise HttpResponseError(
                response=last_response,
                message=str(bad_response),
                error=bad_response,
            ) from bad_response

        except OperationFailed as failure:
            last_response = self._pipeline_response.http_response
            raise HttpResponseError(response=last_response, error=failure) from failure

    def _poll(self) -> None:
        """Poll the status of the operation for as long as it is not finished, then
        fetch the final resource if a final URL is available.

        :raises ~azure.core.polling.base_polling.OperationFailed: If operation status 'Failed' or 'Canceled'.
        :raises ~azure.core.polling.base_polling.BadStatus: If response status invalid.
        :raises ~azure.core.polling.base_polling.BadResponse: If response invalid.
        """
        # First refresh happens without waiting; every following one is preceded by a delay.
        if not self.finished():
            self.update_status()
        while not self.finished():
            self._delay()
            self.update_status()

        if _failed(self.status()):
            raise OperationFailed("Operation failed or canceled")

        final_get_url = self._operation.get_final_get_url(self._pipeline_response)
        if not final_get_url:
            return
        # Store the final response before validating it, so it stays available if validation raises.
        self._pipeline_response = self.request_status(final_get_url)
        _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)

    def _sleep(self, delay: float) -> None:
        """Sleep through the transport.

        :param float delay: The delay handed to the transport's ``sleep``.
        """
        transport = self._transport
        transport.sleep(delay)

    def _delay(self) -> None:
        """Wait before the next poll, using the delay given by ``_extract_delay``
        (the 'retry-after' information if any, otherwise the configured timeout).
        """
        self._sleep(self._extract_delay())

    def update_status(self) -> None:
        """Update the current status of the LRO."""
        polling_url = self._operation.get_polling_url()
        # Store the response before validating it, so it stays available if validation raises.
        self._pipeline_response = self.request_status(polling_url)
        _raise_if_bad_http_status_and_method(self._pipeline_response.http_response)
        self._status = self._operation.get_status(self._pipeline_response)

    def request_status(self, status_link: str) -> PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar]:
        """Do a simple GET to this status link.

        This method re-injects 'x-ms-client-request-id'.

        :param str status_link: The URL to poll.
        :rtype: azure.core.pipeline.PipelineResponse
        :return: The response of the status request.
        """
        if self._path_format_arguments:
            status_link = self._client.format_url(status_link, **self._path_format_arguments)

        # Re-inject 'x-ms-client-request-id' while polling
        if "request_id" not in self._operation_config:
            self._operation_config["request_id"] = self._get_request_id()

        if not is_rest(self._initial_response.http_response):
            # Legacy HttpRequest and HttpResponse from azure.core.pipeline.transport:
            # everything is cast here, as the typing system should not know
            # about the legacy APIs.
            legacy_request = cast(HttpRequestTypeVar, self._client.get(status_link))
            legacy_response = self._client._pipeline.run(  # pylint: disable=protected-access
                legacy_request, stream=False, **self._operation_config
            )
            return cast(PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar], legacy_response)

        rest_request = cast(HttpRequestTypeVar, HttpRequest("GET", status_link))
        # A cast is needed: "_return_pipeline_response" mutates the return type, and that return type
        # is not declared in the typing of "send_request"
        rest_response = self._client.send_request(
            rest_request, _return_pipeline_response=True, **self._operation_config
        )
        return cast(PipelineResponse[HttpRequestTypeVar, HttpResponseTypeVar], rest_response)
1040
# Names made available by a star-import (``from <module> import *``) of this module.
__all__ = [
    "BadResponse",
    "BadStatus",
    "OperationFailed",
    "LongRunningOperation",
    "OperationResourcePolling",
    "LocationPolling",
    "StatusCheckPolling",
    "LROBasePolling",
]