1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26from enum import Enum
27from typing import Optional, Union, TypeVar, Dict, Any, Sequence
28
29from azure.core import CaseInsensitiveEnumMeta
30from azure.core.polling.base_polling import (
31 LongRunningOperation,
32 LROBasePolling,
33 OperationFailed,
34 BadResponse,
35 OperationResourcePolling,
36 LocationPolling,
37 StatusCheckPolling,
38 _as_json,
39 _is_empty,
40)
41
42from azure.core.pipeline import PipelineResponse
43from azure.core.pipeline.transport import (
44 HttpRequest as LegacyHttpRequest,
45 HttpResponse as LegacyHttpResponse,
46 AsyncHttpResponse as LegacyAsyncHttpResponse,
47)
48from azure.core.rest import HttpRequest, HttpResponse, AsyncHttpResponse
49
# Response types from ``azure.core.rest`` (sync or async).
ResponseType = Union[HttpResponse, AsyncHttpResponse]
# Pipeline response pairing an ``azure.core.rest`` request with one of the responses above.
PipelineResponseType = PipelineResponse[HttpRequest, ResponseType]
# Any accepted request type: the legacy ``azure.core.pipeline.transport`` one or the ``azure.core.rest`` one.
HttpRequestType = Union[LegacyHttpRequest, HttpRequest]
# Any accepted response type, legacy or ``azure.core.rest``.
AllHttpResponseType = Union[
    LegacyHttpResponse, HttpResponse, LegacyAsyncHttpResponse, AsyncHttpResponse
]  # Sync or async
# Type variables bound to the unions above; used to parameterize the generic polling classes in this module.
HttpRequestTypeVar = TypeVar("HttpRequestTypeVar", bound=HttpRequestType)
AllHttpResponseTypeVar = TypeVar("AllHttpResponseTypeVar", bound=AllHttpResponseType)  # Sync or async
58
59
class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Known LRO options from Swagger.

    Members are ``str`` subclasses, so they can be used directly as keys when
    reading an ``lro_options`` dictionary.
    """

    # Key looked up in ``lro_options`` by AzureAsyncOperationPolling.get_final_get_url.
    FINAL_STATE_VIA = "final-state-via"
64
65
class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Possible final-state-via options.

    These are the values the ``final-state-via`` entry of ``lro_options`` is
    compared against.
    """

    # With this value and a POST initial request, AzureAsyncOperationPolling
    # reports that no final GET URL is needed.
    AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
    # Not referenced elsewhere in the visible part of this module.
    LOCATION_FINAL_STATE = "location"
71
72
class AzureAsyncOperationPolling(OperationResourcePolling[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Implements an operation resource polling, typically from Azure-AsyncOperation.

    :param lro_options: Optional LRO options (for example ``final-state-via``).
        ``None`` or an empty mapping is stored as an empty dict.
    :type lro_options: dict[str, any] or None
    """

    def __init__(self, lro_options: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(operation_location_header="azure-asyncoperation")

        # Normalize a missing/empty options mapping so later lookups never hit None.
        self._lro_options = lro_options if lro_options else {}

    def get_final_get_url(
        self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    ) -> Optional[str]:
        """Return the URL for the final GET, if one is needed.

        No final GET is requested when ``final-state-via`` is set to
        ``azure-async-operation`` and the initial request was a POST; in every
        other case the decision is delegated to the parent class.

        :param ~azure.core.pipeline.PipelineResponse pipeline_response: The pipeline response object.
        :return: The URL to poll for the final GET, or None when no final GET is needed.
        :rtype: str or None
        """
        final_state_via = self._lro_options.get(_LroOption.FINAL_STATE_VIA)
        if final_state_via == _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE:
            # The request is only inspected once the option matched.
            # NOTE(review): self._request is not assigned in this class; presumably the parent sets it - confirm.
            if self._request.method == "POST":
                return None
        return super().get_final_get_url(pipeline_response)
96
97
class BodyContentPolling(LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]):
    """Poll based on the body content.

    Implement an ARM resource poller (using provisioning state).
    """

    _initial_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    """Store the initial response."""

    def can_poll(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> bool:
        """Answer if this polling method could be used.

        This strategy applies only when the request was a PUT or a PATCH.

        :param ~azure.core.pipeline.PipelineResponse pipeline_response: The pipeline response object.
        :return: True if this polling method could be used.
        :rtype: bool
        """
        response = pipeline_response.http_response
        return response.request.method in ["PUT", "PATCH"]

    def get_polling_url(self) -> str:
        """Return the polling URL.

        The URL is the one of the initial request, as recorded by
        :meth:`set_initial_status`.

        :return: The polling URL.
        :rtype: str
        """
        return self._initial_response.http_response.request.url

    def get_final_get_url(self, pipeline_response: Any) -> None:
        """If a final GET is needed, returns the URL.

        This strategy never asks for a final GET, so the result is always None.

        :param ~azure.core.pipeline.PipelineResponse pipeline_response: The pipeline response object.
        :return: Always None.
        :rtype: None
        """
        return None

    def set_initial_status(
        self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]
    ) -> str:
        """Process first response after initiating long running operation.

        The response is stored for later use by :meth:`get_polling_url`, then the
        status is derived from the HTTP status code:

        * 202: ``"InProgress"``
        * 201: the provisioning state from the body, else ``"InProgress"``
        * 200: the provisioning state from the body, else ``"Succeeded"``
        * 204: ``"Succeeded"``

        :param ~azure.core.pipeline.PipelineResponse pipeline_response: initial REST call response.
        :return: Status string.
        :rtype: str
        :raises ~azure.core.polling.base_polling.OperationFailed: if the status code is none of the above.
        """
        self._initial_response = pipeline_response
        response = pipeline_response.http_response

        if response.status_code == 202:
            return "InProgress"
        if response.status_code == 201:
            status = self._get_provisioning_state(response)
            return status or "InProgress"
        if response.status_code == 200:
            status = self._get_provisioning_state(response)
            return status or "Succeeded"
        if response.status_code == 204:
            return "Succeeded"

        raise OperationFailed("Invalid status found")

    @staticmethod
    def _get_provisioning_state(response: AllHttpResponseTypeVar) -> Optional[str]:
        """Attempt to get provisioning state from resource.

        :param azure.core.pipeline.transport.HttpResponse response: latest REST call response.
        :returns: Status if found, else 'None'. None is also returned when the
            body is empty or when ``properties`` is missing, null or not a JSON object.
        :rtype: str or None
        """
        if _is_empty(response):
            return None
        body = _as_json(response)
        # A ``dict.get`` default only covers a *missing* key: a body carrying
        # ``"properties": null`` would otherwise fail on ``None.get(...)``.
        properties = body.get("properties")
        if not isinstance(properties, dict):
            return None
        return properties.get("provisioningState")

    def get_status(self, pipeline_response: PipelineResponse[HttpRequestTypeVar, AllHttpResponseTypeVar]) -> str:
        """Process the latest status update retrieved from the same URL as
        the previous request.

        :param ~azure.core.pipeline.PipelineResponse pipeline_response: latest REST call response.
        :return: The provisioning state from the body, or "Succeeded" when the body has none.
        :rtype: str
        :raises ~azure.core.polling.base_polling.BadResponse: if the response does not contain a body.
        """
        response = pipeline_response.http_response
        if _is_empty(response):
            raise BadResponse("The response from long running operation does not contain a body.")

        status = self._get_provisioning_state(response)
        return status or "Succeeded"
186
187
class ARMPolling(LROBasePolling):
    """LRO polling method preconfigured with the ARM polling strategies.

    When ``lro_algorithms`` is not provided (or is empty), the strategies used
    are, in order: :class:`AzureAsyncOperationPolling`, ``LocationPolling``,
    :class:`BodyContentPolling` and ``StatusCheckPolling``.

    :param float timeout: Forwarded unchanged to ``LROBasePolling``. Defaults to 30.
    :param lro_algorithms: Polling strategies to use instead of the defaults.
    :type lro_algorithms: list[~azure.core.polling.base_polling.LongRunningOperation] or None
    :param lro_options: LRO options; given to the default
        :class:`AzureAsyncOperationPolling` and forwarded to ``LROBasePolling``.
    :type lro_options: dict[str, any] or None
    :param path_format_arguments: Forwarded unchanged to ``LROBasePolling``.
    :type path_format_arguments: dict[str, str] or None
    :keyword operation_config: Any extra keyword arguments, forwarded to ``LROBasePolling``.
    """

    def __init__(
        self,
        timeout: float = 30,
        lro_algorithms: Optional[Sequence[LongRunningOperation[HttpRequestTypeVar, AllHttpResponseTypeVar]]] = None,
        lro_options: Optional[Dict[str, Any]] = None,
        path_format_arguments: Optional[Dict[str, str]] = None,
        **operation_config: Any
    ) -> None:
        if not lro_algorithms:
            # Default ARM strategy chain; only the Azure-AsyncOperation strategy receives the LRO options.
            lro_algorithms = [
                AzureAsyncOperationPolling(lro_options=lro_options),
                LocationPolling(),
                BodyContentPolling(),
                StatusCheckPolling(),
            ]

        super().__init__(
            timeout=timeout,
            lro_algorithms=lro_algorithms,
            lro_options=lro_options,
            path_format_arguments=path_format_arguments,
            **operation_config
        )
210
211
# Public names exported by this module (e.g. through ``from ... import *``).
__all__ = [
    "AzureAsyncOperationPolling",
    "BodyContentPolling",
    "ARMPolling",
]