1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26import logging
27from typing import Callable, Any, Tuple, Generic, TypeVar, Generator, Awaitable
28
29from ..exceptions import AzureError
30from ._poller import _SansIONoPolling
31
32
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Alias used in signatures for the deserialization callback; intentionally
# left as ``Any`` so that either a plain callable or an object exposing a
# ``deserialize`` attribute can be passed.
DeserializationCallbackType = Any
# Covariant type variable for the resource type produced by a polling method
# or poller.
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
37
38
class AsyncPollingMethod(Generic[PollingReturnType_co]):
    """Interface that async polling strategies implement.

    Every method here raises by default: the core polling methods raise
    ``NotImplementedError`` and must be overridden, while the continuation
    token methods raise ``TypeError`` to signal that the feature is optional
    and not supported unless a subclass provides it.
    """

    def initialize(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: DeserializationCallbackType,
    ) -> None:
        """Prepare the polling method before any polling takes place.

        :param client: A pipeline service client.
        :type client: ~azure.core.PipelineClient
        :param initial_response: The initial call response.
        :type initial_response: ~azure.core.pipeline.PipelineResponse
        :param deserialization_callback: A callback that takes a Response and returns a
         deserialized object.
        :type deserialization_callback: callable or msrest.serialization.Model
        :return: None
        :rtype: None
        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("This method needs to be implemented")

    async def run(self) -> None:
        """Execute the polling logic.

        Subclasses override this coroutine with their actual polling loop.

        :return: None
        :rtype: None
        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("This method needs to be implemented")

    def status(self) -> str:
        """Report the current status of the polling operation.

        :returns: The current status string.
        :rtype: str
        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("This method needs to be implemented")

    def finished(self) -> bool:
        """Tell whether the polling operation is over.

        :returns: True if the polling operation is finished, False otherwise.
        :rtype: bool
        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("This method needs to be implemented")

    def resource(self) -> PollingReturnType_co:
        """Give back the resource of the long running operation.

        :returns: The deserialized resource of the long running operation, if one is available.
        :rtype: any
        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("This method needs to be implemented")

    def get_continuation_token(self) -> str:
        """Produce a continuation token that allows the poller to be restarted later.

        :returns: An opaque continuation token
        :rtype: str
        :raises TypeError: Always in this base implementation; the message names the
         concrete class that lacks support.
        """
        unsupported = "Polling method '{}' doesn't support get_continuation_token"
        raise TypeError(unsupported.format(self.__class__.__name__))

    @classmethod
    def from_continuation_token(
        cls, continuation_token: str, **kwargs: Any
    ) -> Tuple[Any, Any, DeserializationCallbackType]:
        """Recover the poller constructor arguments from a continuation token.

        :param continuation_token: An opaque continuation token
        :type continuation_token: str
        :return: A tuple containing the client, initial response, and deserialization callback.
        :rtype: Tuple[Any, Any, DeserializationCallbackType]
        :raises TypeError: Always in this base implementation; the message names the
         class that lacks support.
        """
        unsupported = "Polling method '{}' doesn't support from_continuation_token"
        raise TypeError(unsupported.format(cls.__name__))
115
116
class AsyncNoPolling(_SansIONoPolling[PollingReturnType_co], AsyncPollingMethod[PollingReturnType_co]):
    """Async polling method that never polls.

    Only :meth:`run` is defined here. ``_SansIONoPolling`` is listed first among
    the bases, so anything it defines takes precedence over the raising stubs of
    ``AsyncPollingMethod``.
    NOTE(review): ``_SansIONoPolling`` lives in ``._poller`` and is not visible
    here; presumably it exposes the deserialized initial response as the
    resource - confirm there.
    """

    async def run(self) -> None:
        """Do nothing: there is no polling to perform.

        This exists only so that ``run`` is a coroutine and can be awaited.
        """
124
125
async def async_poller(
    client: Any,
    initial_response: Any,
    deserialization_callback: Callable[[Any], PollingReturnType_co],
    polling_method: AsyncPollingMethod[PollingReturnType_co],
) -> PollingReturnType_co:
    """Run a long running operation to completion and return its final resource.

    Thin convenience wrapper: it builds an :class:`AsyncLROPoller` from the given
    arguments and awaits it.

    .. deprecated:: 1.5.0
        Use :class:`AsyncLROPoller` instead.

    :param client: A pipeline service client.
    :type client: ~azure.core.PipelineClient
    :param initial_response: The initial call response
    :type initial_response: ~azure.core.pipeline.PipelineResponse
    :param deserialization_callback: A callback that takes a Response and returns a deserialized object.
                                     If a subclass of Model is given, this passes "deserialize" as callback.
    :type deserialization_callback: callable or msrest.serialization.Model
    :param polling_method: The polling strategy to adopt
    :type polling_method: ~azure.core.polling.AsyncPollingMethod
    :return: The final resource at the end of the polling.
    :rtype: any or None
    """
    # Awaiting the poller goes through AsyncLROPoller.__await__, i.e. result().
    return await AsyncLROPoller(client, initial_response, deserialization_callback, polling_method)
151
152
class AsyncLROPoller(Generic[PollingReturnType_co], Awaitable[PollingReturnType_co]):
    """Awaitable handle on a long running operation driven by an async polling method.

    Awaiting an instance is the same as awaiting :meth:`result`.

    :param client: A pipeline service client
    :type client: ~azure.core.PipelineClient
    :param initial_response: The initial call response
    :type initial_response: ~azure.core.pipeline.PipelineResponse
    :param deserialization_callback: A callback that takes a Response and returns a deserialized object.
                                     If a subclass of Model is given, this passes "deserialize" as callback.
    :type deserialization_callback: callable or msrest.serialization.Model
    :param polling_method: The polling strategy to adopt
    :type polling_method: ~azure.core.polling.AsyncPollingMethod
    """

    def __init__(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable[[Any], PollingReturnType_co],
        polling_method: AsyncPollingMethod[PollingReturnType_co],
    ):
        self._done = False
        self._polling_method = polling_method

        # Duck-typed Model detection: when the callback exposes a ``deserialize``
        # attribute, that attribute is what gets handed to the polling method.
        # Doing it this way avoids an explicit dependency on Model.
        callback = getattr(deserialization_callback, "deserialize", deserialization_callback)

        self._polling_method.initialize(client, initial_response, callback)

    def polling_method(self) -> AsyncPollingMethod[PollingReturnType_co]:
        """Give access to the polling method driving this poller.

        :return: The polling method associated to this poller.
        :rtype: ~azure.core.polling.AsyncPollingMethod
        """
        return self._polling_method

    def continuation_token(self) -> str:
        """Ask the polling method for a token that allows restarting the poller later.

        :returns: An opaque continuation token
        :rtype: str
        """
        return self._polling_method.get_continuation_token()

    @classmethod
    def from_continuation_token(
        cls, polling_method: AsyncPollingMethod[PollingReturnType_co], continuation_token: str, **kwargs: Any
    ) -> "AsyncLROPoller[PollingReturnType_co]":
        """Rebuild a poller from a continuation token.

        The polling method turns the token back into the constructor arguments
        (client, initial response, deserialization callback); extra keyword
        arguments are forwarded to it untouched.

        :param polling_method: The polling strategy to adopt
        :type polling_method: ~azure.core.polling.AsyncPollingMethod
        :param continuation_token: An opaque continuation token
        :type continuation_token: str
        :return: An instance of AsyncLROPoller
        :rtype: ~azure.core.polling.AsyncLROPoller
        :raises ~azure.core.exceptions.HttpResponseError: If the continuation token is invalid.
        """
        restored = polling_method.from_continuation_token(continuation_token, **kwargs)
        client, initial_response, deserialization_callback = restored
        return cls(client, initial_response, deserialization_callback, polling_method)

    def status(self) -> str:
        """Report the status as currently known by the polling method.

        :returns: The current status string
        :rtype: str
        """
        return self._polling_method.status()

    async def result(self) -> PollingReturnType_co:
        """Wait for the long running operation and return its resource.

        :returns: The deserialized resource of the long running operation, if one is available.
        :rtype: any or None
        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
        """
        await self.wait()
        return self._polling_method.resource()

    def __await__(self) -> Generator[Any, None, PollingReturnType_co]:
        # Makes ``await poller`` equivalent to ``await poller.result()``.
        return self.result().__await__()

    async def wait(self) -> None:
        """Run the polling method until it returns.

        If the polling method raises an ``AzureError`` that carries no
        continuation token, one is requested from this poller and attached to
        the error (or ``None`` when that fails) before the error is re-raised.

        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
        """
        try:
            await self._polling_method.run()
        except AzureError as polling_error:
            if not polling_error.continuation_token:
                # Best effort only: failing to build a token must never mask
                # the original polling error.
                try:
                    polling_error.continuation_token = self.continuation_token()
                except Exception:  # pylint: disable=broad-except
                    _LOGGER.warning("Unable to retrieve continuation token.")
                    polling_error.continuation_token = None
            raise
        else:
            # Reached only when run() returned without raising.
            self._done = True

    def done(self) -> bool:
        """Check status of the long running operation.

        The flag only becomes True once :meth:`wait` has returned without raising.

        :returns: 'True' if the process has completed, else 'False'.
        :rtype: bool
        """
        return self._done