Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/polling/_poller.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

122 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import logging 

27import threading 

28import uuid 

29from typing import TypeVar, Generic, Any, Callable, Optional, Tuple, List 

30from azure.core.exceptions import AzureError 

31from azure.core.tracing.decorator import distributed_trace 

32from azure.core.tracing.common import with_current_context 

33from ._utils import _encode_continuation_token, _decode_continuation_token 

34 

35 

36PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True) 

37DeserializationCallbackType = Any 

38 

39_LOGGER = logging.getLogger(__name__) 

40 

41 

42class PollingMethod(Generic[PollingReturnType_co]): 

43 """ABC class for polling method.""" 

44 

45 def initialize( 

46 self, 

47 client: Any, 

48 initial_response: Any, 

49 deserialization_callback: DeserializationCallbackType, 

50 ) -> None: 

51 raise NotImplementedError("This method needs to be implemented") 

52 

53 def run(self) -> None: 

54 """Run the polling method. 

55 This method should be implemented to perform the actual polling logic. 

56 

57 :return: None 

58 :rtype: None 

59 """ 

60 raise NotImplementedError("This method needs to be implemented") 

61 

62 def status(self) -> str: 

63 """Return the current status of the polling operation. 

64 

65 :rtype: str 

66 :return: The current status of the polling operation. 

67 """ 

68 raise NotImplementedError("This method needs to be implemented") 

69 

70 def finished(self) -> bool: 

71 """Check if the polling operation is finished. 

72 

73 :rtype: bool 

74 :return: True if the polling operation is finished, False otherwise. 

75 """ 

76 raise NotImplementedError("This method needs to be implemented") 

77 

78 def resource(self) -> PollingReturnType_co: 

79 """Return the resource built by the polling operation. 

80 

81 :rtype: any 

82 :return: The resource built by the polling operation. 

83 """ 

84 raise NotImplementedError("This method needs to be implemented") 

85 

86 def get_continuation_token(self) -> str: 

87 """Return a continuation token that allows to restart the poller later. 

88 

89 :rtype: str 

90 :return: An opaque continuation token. 

91 """ 

92 raise TypeError("Polling method '{}' doesn't support get_continuation_token".format(self.__class__.__name__)) 

93 

94 @classmethod 

95 def from_continuation_token( 

96 cls, continuation_token: str, **kwargs: Any 

97 ) -> Tuple[Any, Any, DeserializationCallbackType]: 

98 """Recreate the poller from a continuation token. 

99 

100 :param continuation_token: The continuation token to recreate the poller from. 

101 :type continuation_token: str 

102 :rtype: Tuple[Any, Any, DeserializationCallbackType] 

103 :return: A tuple containing the client, initial response, and deserialization callback. 

104 """ 

105 raise TypeError("Polling method '{}' doesn't support from_continuation_token".format(cls.__name__)) 

106 

107 

108class _SansIONoPolling(Generic[PollingReturnType_co]): 

109 _deserialization_callback: Callable[[Any], PollingReturnType_co] 

110 """Deserialization callback passed during initialization""" 

111 

112 def __init__(self): 

113 self._initial_response = None 

114 

115 def initialize( 

116 self, 

117 _: Any, 

118 initial_response: Any, 

119 deserialization_callback: Callable[[Any], PollingReturnType_co], 

120 ) -> None: 

121 """Initialize the poller with the initial response and deserialization callback. 

122 

123 :param _: The client, not used in this polling method. 

124 :type _: Any 

125 :param initial_response: The initial response from the long-running operation. 

126 :type initial_response: Any 

127 :param deserialization_callback: A callback that takes a response and returns a deserialized object. 

128 :type deserialization_callback: Callable[[Any], PollingReturnType_co] 

129 :return: None 

130 :rtype: None 

131 """ 

132 self._initial_response = initial_response 

133 self._deserialization_callback = deserialization_callback 

134 

135 def status(self) -> str: 

136 """Return the current status. 

137 

138 :rtype: str 

139 :return: The current status 

140 """ 

141 return "succeeded" 

142 

143 def finished(self) -> bool: 

144 """Is this polling finished? 

145 

146 :rtype: bool 

147 :return: Whether this polling is finished 

148 """ 

149 return True 

150 

151 def resource(self) -> PollingReturnType_co: 

152 """Return the built resource. 

153 

154 :rtype: any 

155 :return: The built resource. 

156 """ 

157 return self._deserialization_callback(self._initial_response) 

158 

159 def get_continuation_token(self) -> str: 

160 """Return a continuation token that allows to restart the poller later. 

161 

162 :rtype: str 

163 :return: An opaque continuation token 

164 """ 

165 return _encode_continuation_token(self._initial_response) 

166 

167 @classmethod 

168 def from_continuation_token( 

169 cls, continuation_token: str, **kwargs: Any 

170 ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]: 

171 """Recreate the poller from a continuation token. 

172 

173 :param continuation_token: The continuation token to recreate the poller from. 

174 :type continuation_token: str 

175 :rtype: Tuple[Any, Any, Callable[[Any], PollingReturnType_co]] 

176 :return: A tuple containing the client, initial response, and deserialization callback. 

177 :raises ValueError: If 'deserialization_callback' is not provided in kwargs. 

178 """ 

179 try: 

180 deserialization_callback = kwargs["deserialization_callback"] 

181 except KeyError: 

182 raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None 

183 

184 initial_response = _decode_continuation_token(continuation_token) 

185 return None, initial_response, deserialization_callback 

186 

187 

188class NoPolling(_SansIONoPolling[PollingReturnType_co], PollingMethod[PollingReturnType_co]): 

189 """An empty poller that returns the deserialized initial response.""" 

190 

191 def run(self) -> None: 

192 """Empty run, no polling.""" 

193 

194 

195class LROPoller(Generic[PollingReturnType_co]): 

196 """Poller for long running operations. 

197 

198 :param client: A pipeline service client 

199 :type client: ~azure.core.PipelineClient 

200 :param initial_response: The initial call response 

201 :type initial_response: ~azure.core.pipeline.PipelineResponse 

202 :param deserialization_callback: A callback that takes a Response and return a deserialized object. 

203 If a subclass of Model is given, this passes "deserialize" as callback. 

204 :type deserialization_callback: callable or msrest.serialization.Model 

205 :param polling_method: The polling strategy to adopt 

206 :type polling_method: ~azure.core.polling.PollingMethod 

207 """ 

208 

209 def __init__( 

210 self, 

211 client: Any, 

212 initial_response: Any, 

213 deserialization_callback: Callable[[Any], PollingReturnType_co], 

214 polling_method: PollingMethod[PollingReturnType_co], 

215 ) -> None: 

216 self._callbacks: List[Callable] = [] 

217 self._polling_method = polling_method 

218 

219 # This implicit test avoids bringing in an explicit dependency on Model directly 

220 try: 

221 deserialization_callback = deserialization_callback.deserialize # type: ignore 

222 except AttributeError: 

223 pass 

224 

225 # Might raise a CloudError 

226 self._polling_method.initialize(client, initial_response, deserialization_callback) 

227 

228 # Prepare thread execution 

229 self._thread = None 

230 self._done = threading.Event() 

231 self._exception = None 

232 if self._polling_method.finished(): 

233 self._done.set() 

234 else: 

235 self._thread = threading.Thread( 

236 target=with_current_context(self._start), 

237 name="LROPoller({})".format(uuid.uuid4()), 

238 ) 

239 self._thread.daemon = True 

240 self._thread.start() 

241 

242 def _start(self): 

243 """Start the long running operation. 

244 On completion, runs any callbacks. 

245 """ 

246 try: 

247 self._polling_method.run() 

248 except AzureError as error: 

249 if not error.continuation_token: 

250 try: 

251 error.continuation_token = self.continuation_token() 

252 except Exception: # pylint: disable=broad-except 

253 _LOGGER.warning("Unable to retrieve continuation token.") 

254 error.continuation_token = None 

255 

256 self._exception = error 

257 except Exception as error: # pylint: disable=broad-except 

258 self._exception = error 

259 

260 finally: 

261 self._done.set() 

262 

263 callbacks, self._callbacks = self._callbacks, [] 

264 while callbacks: 

265 for call in callbacks: 

266 call(self._polling_method) 

267 callbacks, self._callbacks = self._callbacks, [] 

268 

269 def polling_method(self) -> PollingMethod[PollingReturnType_co]: 

270 """Return the polling method associated to this poller. 

271 

272 :return: The polling method 

273 :rtype: ~azure.core.polling.PollingMethod 

274 """ 

275 return self._polling_method 

276 

277 def continuation_token(self) -> str: 

278 """Return a continuation token that allows to restart the poller later. 

279 

280 :returns: An opaque continuation token 

281 :rtype: str 

282 """ 

283 return self._polling_method.get_continuation_token() 

284 

285 @classmethod 

286 def from_continuation_token( 

287 cls, polling_method: PollingMethod[PollingReturnType_co], continuation_token: str, **kwargs: Any 

288 ) -> "LROPoller[PollingReturnType_co]": 

289 """Create a poller from a continuation token. 

290 

291 :param polling_method: The polling strategy to adopt 

292 :type polling_method: ~azure.core.polling.PollingMethod 

293 :param continuation_token: An opaque continuation token 

294 :type continuation_token: str 

295 :return: An instance of LROPoller 

296 :rtype: ~azure.core.polling.LROPoller 

297 :raises ~azure.core.exceptions.HttpResponseError: If the continuation token is invalid. 

298 """ 

299 ( 

300 client, 

301 initial_response, 

302 deserialization_callback, 

303 ) = polling_method.from_continuation_token(continuation_token, **kwargs) 

304 return cls(client, initial_response, deserialization_callback, polling_method) 

305 

306 def status(self) -> str: 

307 """Returns the current status string. 

308 

309 :returns: The current status string 

310 :rtype: str 

311 """ 

312 return self._polling_method.status() 

313 

314 def result(self, timeout: Optional[float] = None) -> PollingReturnType_co: 

315 """Return the result of the long running operation, or 

316 the result available after the specified timeout. 

317 

318 :param float timeout: Period of time to wait before getting back control. 

319 :returns: The deserialized resource of the long running operation, if one is available. 

320 :rtype: any or None 

321 :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query. 

322 """ 

323 self.wait(timeout) 

324 return self._polling_method.resource() 

325 

326 @distributed_trace 

327 def wait(self, timeout: Optional[float] = None) -> None: 

328 """Wait on the long running operation for a specified length 

329 of time. You can check if this call as ended with timeout with the 

330 "done()" method. 

331 

332 :param float timeout: Period of time to wait for the long running 

333 operation to complete (in seconds). 

334 :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query. 

335 """ 

336 if self._thread is None: 

337 return 

338 self._thread.join(timeout=timeout) 

339 try: 

340 # Let's handle possible None in forgiveness here 

341 # https://github.com/python/mypy/issues/8165 

342 raise self._exception # type: ignore 

343 except TypeError: # Was None 

344 pass 

345 

346 def done(self) -> bool: 

347 """Check status of the long running operation. 

348 

349 :returns: 'True' if the process has completed, else 'False'. 

350 :rtype: bool 

351 """ 

352 return self._thread is None or not self._thread.is_alive() 

353 

354 def add_done_callback(self, func: Callable) -> None: 

355 """Add callback function to be run once the long running operation 

356 has completed - regardless of the status of the operation. 

357 

358 :param callable func: Callback function that takes at least one 

359 argument, a completed LongRunningOperation. 

360 """ 

361 # Still use "_done" and not "done", since CBs are executed inside the thread. 

362 if self._done.is_set(): 

363 func(self._polling_method) 

364 # Let's add them still, for consistency (if you wish to access to it for some reasons) 

365 self._callbacks.append(func) 

366 

367 def remove_done_callback(self, func: Callable) -> None: 

368 """Remove a callback from the long running operation. 

369 

370 :param callable func: The function to be removed from the callbacks. 

371 :raises ValueError: if the long running operation has already completed. 

372 """ 

373 if self._done is None or self._done.is_set(): 

374 raise ValueError("Process is complete.") 

375 self._callbacks = [c for c in self._callbacks if c != func]