Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/polling/_poller.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

124 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import base64 

27import logging 

28import threading 

29import uuid 

30from typing import TypeVar, Generic, Any, Callable, Optional, Tuple, List 

31from azure.core.exceptions import AzureError 

32from azure.core.tracing.decorator import distributed_trace 

33from azure.core.tracing.common import with_current_context 

34 

35 

36PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True) 

37DeserializationCallbackType = Any 

38 

39_LOGGER = logging.getLogger(__name__) 

40 

41 

42class PollingMethod(Generic[PollingReturnType_co]): 

43 """ABC class for polling method.""" 

44 

45 def initialize( 

46 self, 

47 client: Any, 

48 initial_response: Any, 

49 deserialization_callback: DeserializationCallbackType, 

50 ) -> None: 

51 raise NotImplementedError("This method needs to be implemented") 

52 

53 def run(self) -> None: 

54 """Run the polling method. 

55 This method should be implemented to perform the actual polling logic. 

56 

57 :return: None 

58 :rtype: None 

59 """ 

60 raise NotImplementedError("This method needs to be implemented") 

61 

62 def status(self) -> str: 

63 """Return the current status of the polling operation. 

64 

65 :rtype: str 

66 :return: The current status of the polling operation. 

67 """ 

68 raise NotImplementedError("This method needs to be implemented") 

69 

70 def finished(self) -> bool: 

71 """Check if the polling operation is finished. 

72 

73 :rtype: bool 

74 :return: True if the polling operation is finished, False otherwise. 

75 """ 

76 raise NotImplementedError("This method needs to be implemented") 

77 

78 def resource(self) -> PollingReturnType_co: 

79 """Return the resource built by the polling operation. 

80 

81 :rtype: any 

82 :return: The resource built by the polling operation. 

83 """ 

84 raise NotImplementedError("This method needs to be implemented") 

85 

86 def get_continuation_token(self) -> str: 

87 """Return a continuation token that allows to restart the poller later. 

88 

89 :rtype: str 

90 :return: An opaque continuation token. 

91 """ 

92 raise TypeError("Polling method '{}' doesn't support get_continuation_token".format(self.__class__.__name__)) 

93 

94 @classmethod 

95 def from_continuation_token( 

96 cls, continuation_token: str, **kwargs: Any 

97 ) -> Tuple[Any, Any, DeserializationCallbackType]: 

98 """Recreate the poller from a continuation token. 

99 

100 :param continuation_token: The continuation token to recreate the poller from. 

101 :type continuation_token: str 

102 :rtype: Tuple[Any, Any, DeserializationCallbackType] 

103 :return: A tuple containing the client, initial response, and deserialization callback. 

104 """ 

105 raise TypeError("Polling method '{}' doesn't support from_continuation_token".format(cls.__name__)) 

106 

107 

108class _SansIONoPolling(Generic[PollingReturnType_co]): 

109 _deserialization_callback: Callable[[Any], PollingReturnType_co] 

110 """Deserialization callback passed during initialization""" 

111 

112 def __init__(self): 

113 self._initial_response = None 

114 

115 def initialize( 

116 self, 

117 _: Any, 

118 initial_response: Any, 

119 deserialization_callback: Callable[[Any], PollingReturnType_co], 

120 ) -> None: 

121 """Initialize the poller with the initial response and deserialization callback. 

122 

123 :param _: The client, not used in this polling method. 

124 :type _: Any 

125 :param initial_response: The initial response from the long-running operation. 

126 :type initial_response: Any 

127 :param deserialization_callback: A callback that takes a response and returns a deserialized object. 

128 :type deserialization_callback: Callable[[Any], PollingReturnType_co] 

129 :return: None 

130 :rtype: None 

131 """ 

132 self._initial_response = initial_response 

133 self._deserialization_callback = deserialization_callback 

134 

135 def status(self) -> str: 

136 """Return the current status. 

137 

138 :rtype: str 

139 :return: The current status 

140 """ 

141 return "succeeded" 

142 

143 def finished(self) -> bool: 

144 """Is this polling finished? 

145 

146 :rtype: bool 

147 :return: Whether this polling is finished 

148 """ 

149 return True 

150 

151 def resource(self) -> PollingReturnType_co: 

152 """Return the built resource. 

153 

154 :rtype: any 

155 :return: The built resource. 

156 """ 

157 return self._deserialization_callback(self._initial_response) 

158 

159 def get_continuation_token(self) -> str: 

160 """Return a continuation token that allows to restart the poller later. 

161 

162 :rtype: str 

163 :return: An opaque continuation token 

164 """ 

165 import pickle 

166 

167 return base64.b64encode(pickle.dumps(self._initial_response)).decode("ascii") 

168 

169 @classmethod 

170 def from_continuation_token( 

171 cls, continuation_token: str, **kwargs: Any 

172 ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]: 

173 """Recreate the poller from a continuation token. 

174 

175 :param continuation_token: The continuation token to recreate the poller from. 

176 :type continuation_token: str 

177 :rtype: Tuple[Any, Any, Callable[[Any], PollingReturnType_co]] 

178 :return: A tuple containing the client, initial response, and deserialization callback. 

179 :raises ValueError: If 'deserialization_callback' is not provided in kwargs. 

180 """ 

181 try: 

182 deserialization_callback = kwargs["deserialization_callback"] 

183 except KeyError: 

184 raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None 

185 import pickle 

186 

187 initial_response = pickle.loads(base64.b64decode(continuation_token)) # nosec 

188 return None, initial_response, deserialization_callback 

189 

190 

191class NoPolling(_SansIONoPolling[PollingReturnType_co], PollingMethod[PollingReturnType_co]): 

192 """An empty poller that returns the deserialized initial response.""" 

193 

194 def run(self) -> None: 

195 """Empty run, no polling.""" 

196 

197 

198class LROPoller(Generic[PollingReturnType_co]): 

199 """Poller for long running operations. 

200 

201 :param client: A pipeline service client 

202 :type client: ~azure.core.PipelineClient 

203 :param initial_response: The initial call response 

204 :type initial_response: ~azure.core.pipeline.PipelineResponse 

205 :param deserialization_callback: A callback that takes a Response and return a deserialized object. 

206 If a subclass of Model is given, this passes "deserialize" as callback. 

207 :type deserialization_callback: callable or msrest.serialization.Model 

208 :param polling_method: The polling strategy to adopt 

209 :type polling_method: ~azure.core.polling.PollingMethod 

210 """ 

211 

212 def __init__( 

213 self, 

214 client: Any, 

215 initial_response: Any, 

216 deserialization_callback: Callable[[Any], PollingReturnType_co], 

217 polling_method: PollingMethod[PollingReturnType_co], 

218 ) -> None: 

219 self._callbacks: List[Callable] = [] 

220 self._polling_method = polling_method 

221 

222 # This implicit test avoids bringing in an explicit dependency on Model directly 

223 try: 

224 deserialization_callback = deserialization_callback.deserialize # type: ignore 

225 except AttributeError: 

226 pass 

227 

228 # Might raise a CloudError 

229 self._polling_method.initialize(client, initial_response, deserialization_callback) 

230 

231 # Prepare thread execution 

232 self._thread = None 

233 self._done = threading.Event() 

234 self._exception = None 

235 if self._polling_method.finished(): 

236 self._done.set() 

237 else: 

238 self._thread = threading.Thread( 

239 target=with_current_context(self._start), 

240 name="LROPoller({})".format(uuid.uuid4()), 

241 ) 

242 self._thread.daemon = True 

243 self._thread.start() 

244 

245 def _start(self): 

246 """Start the long running operation. 

247 On completion, runs any callbacks. 

248 """ 

249 try: 

250 self._polling_method.run() 

251 except AzureError as error: 

252 if not error.continuation_token: 

253 try: 

254 error.continuation_token = self.continuation_token() 

255 except Exception: # pylint: disable=broad-except 

256 _LOGGER.warning("Unable to retrieve continuation token.") 

257 error.continuation_token = None 

258 

259 self._exception = error 

260 except Exception as error: # pylint: disable=broad-except 

261 self._exception = error 

262 

263 finally: 

264 self._done.set() 

265 

266 callbacks, self._callbacks = self._callbacks, [] 

267 while callbacks: 

268 for call in callbacks: 

269 call(self._polling_method) 

270 callbacks, self._callbacks = self._callbacks, [] 

271 

272 def polling_method(self) -> PollingMethod[PollingReturnType_co]: 

273 """Return the polling method associated to this poller. 

274 

275 :return: The polling method 

276 :rtype: ~azure.core.polling.PollingMethod 

277 """ 

278 return self._polling_method 

279 

280 def continuation_token(self) -> str: 

281 """Return a continuation token that allows to restart the poller later. 

282 

283 :returns: An opaque continuation token 

284 :rtype: str 

285 """ 

286 return self._polling_method.get_continuation_token() 

287 

288 @classmethod 

289 def from_continuation_token( 

290 cls, polling_method: PollingMethod[PollingReturnType_co], continuation_token: str, **kwargs: Any 

291 ) -> "LROPoller[PollingReturnType_co]": 

292 """Create a poller from a continuation token. 

293 

294 :param polling_method: The polling strategy to adopt 

295 :type polling_method: ~azure.core.polling.PollingMethod 

296 :param continuation_token: An opaque continuation token 

297 :type continuation_token: str 

298 :return: An instance of LROPoller 

299 :rtype: ~azure.core.polling.LROPoller 

300 :raises ~azure.core.exceptions.HttpResponseError: If the continuation token is invalid. 

301 """ 

302 ( 

303 client, 

304 initial_response, 

305 deserialization_callback, 

306 ) = polling_method.from_continuation_token(continuation_token, **kwargs) 

307 return cls(client, initial_response, deserialization_callback, polling_method) 

308 

309 def status(self) -> str: 

310 """Returns the current status string. 

311 

312 :returns: The current status string 

313 :rtype: str 

314 """ 

315 return self._polling_method.status() 

316 

317 def result(self, timeout: Optional[float] = None) -> PollingReturnType_co: 

318 """Return the result of the long running operation, or 

319 the result available after the specified timeout. 

320 

321 :param float timeout: Period of time to wait before getting back control. 

322 :returns: The deserialized resource of the long running operation, if one is available. 

323 :rtype: any or None 

324 :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query. 

325 """ 

326 self.wait(timeout) 

327 return self._polling_method.resource() 

328 

329 @distributed_trace 

330 def wait(self, timeout: Optional[float] = None) -> None: 

331 """Wait on the long running operation for a specified length 

332 of time. You can check if this call as ended with timeout with the 

333 "done()" method. 

334 

335 :param float timeout: Period of time to wait for the long running 

336 operation to complete (in seconds). 

337 :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query. 

338 """ 

339 if self._thread is None: 

340 return 

341 self._thread.join(timeout=timeout) 

342 try: 

343 # Let's handle possible None in forgiveness here 

344 # https://github.com/python/mypy/issues/8165 

345 raise self._exception # type: ignore 

346 except TypeError: # Was None 

347 pass 

348 

349 def done(self) -> bool: 

350 """Check status of the long running operation. 

351 

352 :returns: 'True' if the process has completed, else 'False'. 

353 :rtype: bool 

354 """ 

355 return self._thread is None or not self._thread.is_alive() 

356 

357 def add_done_callback(self, func: Callable) -> None: 

358 """Add callback function to be run once the long running operation 

359 has completed - regardless of the status of the operation. 

360 

361 :param callable func: Callback function that takes at least one 

362 argument, a completed LongRunningOperation. 

363 """ 

364 # Still use "_done" and not "done", since CBs are executed inside the thread. 

365 if self._done.is_set(): 

366 func(self._polling_method) 

367 # Let's add them still, for consistency (if you wish to access to it for some reasons) 

368 self._callbacks.append(func) 

369 

370 def remove_done_callback(self, func: Callable) -> None: 

371 """Remove a callback from the long running operation. 

372 

373 :param callable func: The function to be removed from the callbacks. 

374 :raises ValueError: if the long running operation has already completed. 

375 """ 

376 if self._done is None or self._done.is_set(): 

377 raise ValueError("Process is complete.") 

378 self._callbacks = [c for c in self._callbacks if c != func]