1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26from enum import Enum
27from typing import TYPE_CHECKING, Optional, Union
28
29from azure.core import CaseInsensitiveEnumMeta
30from azure.core.polling.base_polling import (
31 LongRunningOperation,
32 LROBasePolling,
33 OperationFailed,
34 BadResponse,
35 OperationResourcePolling,
36 LocationPolling,
37 StatusCheckPolling,
38 _as_json,
39 _is_empty,
40)
41
# Names below are needed only by static type checkers: they are referenced
# from the ``# type:`` comments in this module, never by executed code.
if TYPE_CHECKING:
    from azure.core.pipeline import PipelineResponse
    from azure.core.pipeline.transport import (
        HttpResponse,
        AsyncHttpResponse,
        HttpRequest,
    )

    # Union of the HttpResponse and AsyncHttpResponse transport types.
    ResponseType = Union[HttpResponse, AsyncHttpResponse]
    # PipelineResponse parameterized with HttpRequest and ResponseType.
    PipelineResponseType = PipelineResponse[HttpRequest, ResponseType]
52
53
class _LroOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Known LRO options from Swagger.

    Members are ``str`` subclasses; ``FINAL_STATE_VIA`` is used as the lookup
    key into the ``lro_options`` mapping held by ``AzureAsyncOperationPolling``.
    """

    # Key under which the "final state via" choice is looked up.
    FINAL_STATE_VIA = "final-state-via"
58
59
class _FinalStateViaOption(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Possible final-state-via options.

    These are the values compared against the ``final-state-via`` entry of
    the LRO options.
    """

    # Checked by AzureAsyncOperationPolling.get_final_get_url: together with
    # a POST request it suppresses the final GET.
    AZURE_ASYNC_OPERATION_FINAL_STATE = "azure-async-operation"
    LOCATION_FINAL_STATE = "location"
65
66
class AzureAsyncOperationPolling(OperationResourcePolling):
    """Operation-resource polling driven by the ``azure-asyncoperation`` header.

    Behaves like ``OperationResourcePolling`` configured with that header
    name, except that the final GET can be suppressed through the
    ``final-state-via`` LRO option (see ``get_final_get_url``).
    """

    def __init__(self, lro_options=None):
        """Create the polling strategy.

        :param lro_options: optional mapping of LRO options; a falsy value
         is replaced by an empty dict.
        """
        super().__init__(operation_location_header="azure-asyncoperation")

        # Always keep a mapping around so that ``.get`` lookups are safe.
        self._lro_options = lro_options if lro_options else {}

    def get_final_get_url(self, pipeline_response):
        # type: (PipelineResponseType) -> Optional[str]
        """If a final GET is needed, returns the URL.

        No final GET is performed (``None`` is returned) when the
        ``final-state-via`` option equals ``azure-async-operation`` and the
        method of ``self._request`` is POST. In every other case the
        decision is delegated to the parent class.

        :param pipeline_response: response forwarded to the parent implementation.
        :returns: the URL for the final GET, or None if no final GET is needed.
        :rtype: str or None
        """
        final_state_via = self._lro_options.get(_LroOption.FINAL_STATE_VIA)
        skip_final_get = (
            final_state_via == _FinalStateViaOption.AZURE_ASYNC_OPERATION_FINAL_STATE
            and self._request.method == "POST"
        )
        if skip_final_get:
            return None
        return super().get_final_get_url(pipeline_response)
87
88
class BodyContentPolling(LongRunningOperation):
    """Poll based on the body content.

    Implements an ARM resource poller: the status is read from
    ``properties.provisioningState`` in the JSON body of each response.
    """

    def __init__(self):
        # Filled in by ``set_initial_status``; ``get_polling_url`` reads it.
        self._initial_response = None

    def can_poll(self, pipeline_response):
        # type: (PipelineResponseType) -> bool
        """Answer if this polling method could be used.

        :param pipeline_response: response whose originating request is inspected.
        :returns: True if the request method is PUT or PATCH.
        :rtype: bool
        """
        response = pipeline_response.http_response
        return response.request.method in ("PUT", "PATCH")

    def get_polling_url(self):
        # type: () -> str
        """Return the polling URL.

        This is the URL of the request behind the response stored by
        ``set_initial_status``, which therefore has to be called first.

        :rtype: str
        """
        return self._initial_response.http_response.request.url

    def get_final_get_url(self, pipeline_response):
        # type: (PipelineResponseType) -> Optional[str]
        """If a final GET is needed, returns the URL.

        :param pipeline_response: unused by this implementation.
        :returns: always None.
        """
        return None

    def set_initial_status(self, pipeline_response):
        # type: (PipelineResponseType) -> str
        """Process first response after initiating long running operation.

        The response is stored for later use by ``get_polling_url``.

        :param pipeline_response: initial REST call response.
        :returns: ``"InProgress"`` for 202; the provisioning state for 201
         (``"InProgress"`` if absent) and 200 (``"Succeeded"`` if absent);
         ``"Succeeded"`` for 204.
        :rtype: str
        :raises OperationFailed: for any other status code.
        """
        self._initial_response = pipeline_response
        response = pipeline_response.http_response

        if response.status_code == 202:
            return "InProgress"
        if response.status_code == 201:
            status = self._get_provisioning_state(response)
            return status or "InProgress"
        if response.status_code == 200:
            status = self._get_provisioning_state(response)
            return status or "Succeeded"
        if response.status_code == 204:
            return "Succeeded"

        raise OperationFailed("Invalid status found")

    @staticmethod
    def _get_provisioning_state(response):
        # type: (ResponseType) -> Optional[str]
        """Attempt to get provisioning state from resource.

        :param response: latest REST call response.
        :returns: Status if found, else None (empty body, missing or null
         ``properties``, or missing ``provisioningState``).
        """
        if _is_empty(response):
            return None
        body = _as_json(response)
        # ``"properties": null`` is valid JSON; a ``{}`` default passed to
        # ``get`` would not apply in that case and ``None.get`` would raise.
        properties = body.get("properties") or {}
        return properties.get("provisioningState")

    def get_status(self, pipeline_response):
        # type: (PipelineResponseType) -> str
        """Process the latest status update retrieved from the same URL as
        the previous request.

        :param pipeline_response: latest REST call response.
        :returns: the provisioning state from the body, or ``"Succeeded"``
         when the body carries none.
        :rtype: str
        :raises BadResponse: if the response has no body.
        """
        response = pipeline_response.http_response
        if _is_empty(response):
            raise BadResponse("The response from long running operation does not contain a body.")

        status = self._get_provisioning_state(response)
        return status or "Succeeded"
166
167
class ARMPolling(LROBasePolling):
    """``LROBasePolling`` preconfigured with the ARM polling strategies.

    When the caller supplies no algorithms, the default list is, in this
    order: ``AzureAsyncOperationPolling``, ``LocationPolling``,
    ``BodyContentPolling`` and ``StatusCheckPolling``.
    """

    def __init__(
        self, timeout=30, lro_algorithms=None, lro_options=None, path_format_arguments=None, **operation_config
    ):
        """Build the poller and forward every argument to ``LROBasePolling``.

        :param timeout: forwarded unchanged to the base class (default 30).
        :param lro_algorithms: polling strategies to use; a falsy value
         selects the ARM defaults listed on the class.
        :param lro_options: LRO options, given both to the default
         ``AzureAsyncOperationPolling`` and to the base class.
        :param path_format_arguments: forwarded unchanged to the base class.
        :keyword operation_config: extra keyword arguments forwarded to the
         base class.
        """
        if not lro_algorithms:
            lro_algorithms = [
                AzureAsyncOperationPolling(lro_options=lro_options),
                LocationPolling(),
                BodyContentPolling(),
                StatusCheckPolling(),
            ]
        super().__init__(
            timeout=timeout,
            lro_algorithms=lro_algorithms,
            lro_options=lro_options,
            path_format_arguments=path_format_arguments,
            **operation_config
        )
185
186
# Public API of this module; the underscore-prefixed option enums stay private.
__all__ = [
    "AzureAsyncOperationPolling",
    "BodyContentPolling",
    "ARMPolling",
]