Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/polling/_poller.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

122 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import base64 

27import logging 

28import threading 

29import uuid 

30from typing import TypeVar, Generic, Any, Callable, Optional, Tuple, List 

31from azure.core.exceptions import AzureError 

32from azure.core.tracing.decorator import distributed_trace 

33from azure.core.tracing.common import with_current_context 

34 

35 

# Covariant type variable standing for the value a polling method or poller returns.
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
# Deserialization callbacks are not constrained further in this module: plain alias of ``Any``.
DeserializationCallbackType = Any

# Logger named after this module.
_LOGGER = logging.getLogger(__name__)

40 

41 

class PollingMethod(Generic[PollingReturnType_co]):
    """Base interface for a polling strategy.

    ``initialize``, ``run``, ``status``, ``finished`` and ``resource`` raise
    ``NotImplementedError`` here and are meant to be overridden. The two
    continuation-token hooks raise ``TypeError`` by default, which signals that
    the concrete polling method does not support continuation tokens.
    """

    def initialize(
        self, client: Any, initial_response: Any, deserialization_callback: DeserializationCallbackType
    ) -> None:
        """Prepare the polling method before any polling happens.

        :param client: The client handed to the poller.
        :param initial_response: The response of the initial call.
        :param deserialization_callback: Callback used to build the final resource.
        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    def run(self) -> None:
        """Perform the polling itself.

        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    def status(self) -> str:
        """Return the current status of the operation.

        :rtype: str
        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    def finished(self) -> bool:
        """Tell whether polling is over.

        :rtype: bool
        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    def resource(self) -> PollingReturnType_co:
        """Return the resource produced by the operation.

        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    def get_continuation_token(self) -> str:
        """Return a token from which this polling method could be recreated.

        :rtype: str
        :raises TypeError: Always, unless overridden (continuation tokens unsupported).
        """
        template = "Polling method '{}' doesn't support get_continuation_token"
        raise TypeError(template.format(self.__class__.__name__))

    @classmethod
    def from_continuation_token(
        cls, continuation_token: str, **kwargs: Any
    ) -> Tuple[Any, Any, DeserializationCallbackType]:
        """Rebuild the constructor inputs of a poller from a continuation token.

        :param str continuation_token: Token previously produced by ``get_continuation_token``.
        :return: A ``(client, initial_response, deserialization_callback)`` tuple.
        :raises TypeError: Always, unless overridden (continuation tokens unsupported).
        """
        template = "Polling method '{}' doesn't support from_continuation_token"
        raise TypeError(template.format(cls.__name__))

70 

71 

class _SansIONoPolling(Generic[PollingReturnType_co]):
    """Polling logic for operations that need no polling at all.

    The status is always ``"succeeded"``, polling is always finished, and the
    resource is the deserialization callback applied to the initial response.
    """

    # Deserialization callback passed during initialization.
    _deserialization_callback: Callable[[Any], PollingReturnType_co]

    def __init__(self):
        # Populated by ``initialize``.
        self._initial_response = None

    def initialize(
        self,
        _: Any,
        initial_response: Any,
        deserialization_callback: Callable[[Any], PollingReturnType_co],
    ) -> None:
        """Store the initial response and the callback; the first argument is ignored.

        :param _: Unused.
        :param initial_response: The response of the initial call.
        :param deserialization_callback: Callback applied to the initial response by ``resource``.
        """
        self._deserialization_callback = deserialization_callback
        self._initial_response = initial_response

    def status(self) -> str:
        """Return the current status, which is always ``"succeeded"``.

        :rtype: str
        :return: The current status
        """
        return "succeeded"

    def finished(self) -> bool:
        """Report that polling is finished (always the case here).

        :rtype: bool
        :return: Whether this polling is finished
        """
        return True

    def resource(self) -> PollingReturnType_co:
        """Deserialize and return the initial response.

        :return: The result of the deserialization callback applied to the initial response.
        """
        deserialize = self._deserialization_callback
        return deserialize(self._initial_response)

    def get_continuation_token(self) -> str:
        """Serialize the initial response into a continuation token.

        :rtype: str
        :return: The pickled initial response, base64-encoded as ASCII text.
        """
        import pickle

        pickled = pickle.dumps(self._initial_response)
        return base64.b64encode(pickled).decode("ascii")

    @classmethod
    def from_continuation_token(
        cls, continuation_token: str, **kwargs: Any
    ) -> Tuple[Any, Any, Callable[[Any], PollingReturnType_co]]:
        """Recover the poller inputs from a continuation token.

        :param str continuation_token: Token produced by ``get_continuation_token``.
        :keyword deserialization_callback: Required; returned unchanged as the third tuple item.
        :return: ``(None, initial_response, deserialization_callback)``.
        :raises ValueError: If ``deserialization_callback`` is not given.
        """
        try:
            callback = kwargs["deserialization_callback"]
        except KeyError:
            raise ValueError("Need kwarg 'deserialization_callback' to be recreated from continuation_token") from None
        import pickle

        raw = base64.b64decode(continuation_token)
        # NOTE(review): unpickling executes whatever the token encodes; this presumably
        # assumes tokens only ever come from a trusted source -- confirm with callers.
        initial_response = pickle.loads(raw)  # nosec
        return None, initial_response, callback

124 

125 

class NoPolling(_SansIONoPolling[PollingReturnType_co], PollingMethod[PollingReturnType_co]):
    """An empty poller that returns the deserialized initial response."""

    def run(self) -> None:
        """Do nothing: there is no polling to perform."""
        return None

131 

132 

class LROPoller(Generic[PollingReturnType_co]):
    """Poller for long running operations.

    The constructor initializes the polling method and, unless that method already
    reports it is finished, starts a daemon thread that runs the polling.

    :param client: A pipeline service client
    :type client: ~azure.core.PipelineClient
    :param initial_response: The initial call response
    :type initial_response: ~azure.core.pipeline.PipelineResponse
    :param deserialization_callback: A callback that takes a Response and returns a deserialized object.
     If the given object has a ``deserialize`` attribute (e.g. a subclass of Model), that attribute
     is used as the callback.
    :type deserialization_callback: callable or msrest.serialization.Model
    :param polling_method: The polling strategy to adopt
    :type polling_method: ~azure.core.polling.PollingMethod
    """

    def __init__(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable[[Any], PollingReturnType_co],
        polling_method: PollingMethod[PollingReturnType_co],
    ) -> None:
        self._callbacks: List[Callable] = []
        self._polling_method = polling_method

        # This implicit test avoids bringing in an explicit dependency on Model directly
        try:
            deserialization_callback = deserialization_callback.deserialize  # type: ignore
        except AttributeError:
            pass

        # Might raise a CloudError
        self._polling_method.initialize(client, initial_response, deserialization_callback)

        # Prepare thread execution
        self._thread: Optional[threading.Thread] = None
        self._done = threading.Event()
        # Exception captured by the polling thread, re-raised to callers by ``wait``.
        self._exception: Optional[Exception] = None
        if self._polling_method.finished():
            # Nothing to poll: no thread is created and the poller is done immediately.
            self._done.set()
        else:
            self._thread = threading.Thread(
                target=with_current_context(self._start),
                name="LROPoller({})".format(uuid.uuid4()),
            )
            self._thread.daemon = True
            self._thread.start()

    def _start(self):
        """Start the long running operation.
        On completion, runs any callbacks.

        Runs on the polling thread. Any exception raised by the polling method is
        stored in ``self._exception`` instead of propagating.
        """
        try:
            self._polling_method.run()
        except AzureError as error:
            if not error.continuation_token:
                # Best effort: attach a token so the caller can resume later.
                try:
                    error.continuation_token = self.continuation_token()
                except Exception as err:  # pylint: disable=broad-except
                    _LOGGER.warning("Unable to retrieve continuation token: %s", err)
                    error.continuation_token = None

            self._exception = error
        except Exception as error:  # pylint: disable=broad-except
            self._exception = error

        finally:
            self._done.set()

        # Drain the callbacks; keep looping because a callback may register new ones.
        callbacks, self._callbacks = self._callbacks, []
        while callbacks:
            for call in callbacks:
                call(self._polling_method)
            callbacks, self._callbacks = self._callbacks, []

    def polling_method(self) -> PollingMethod[PollingReturnType_co]:
        """Return the polling method associated to this poller.

        :return: The polling method
        :rtype: ~azure.core.polling.PollingMethod
        """
        return self._polling_method

    def continuation_token(self) -> str:
        """Return a continuation token that allows to restart the poller later.

        :returns: An opaque continuation token
        :rtype: str
        """
        return self._polling_method.get_continuation_token()

    @classmethod
    def from_continuation_token(
        cls, polling_method: PollingMethod[PollingReturnType_co], continuation_token: str, **kwargs: Any
    ) -> "LROPoller[PollingReturnType_co]":
        """Create a poller from a continuation token.

        The polling method decodes the token into the constructor arguments.

        :param polling_method: The polling strategy to adopt
        :type polling_method: ~azure.core.polling.PollingMethod
        :param str continuation_token: An opaque continuation token
        :return: A new poller built from the decoded arguments
        :rtype: ~azure.core.polling.LROPoller
        """
        (
            client,
            initial_response,
            deserialization_callback,
        ) = polling_method.from_continuation_token(continuation_token, **kwargs)
        return cls(client, initial_response, deserialization_callback, polling_method)

    def status(self) -> str:
        """Returns the current status string.

        :returns: The current status string
        :rtype: str
        """
        return self._polling_method.status()

    def result(self, timeout: Optional[float] = None) -> PollingReturnType_co:
        """Return the result of the long running operation, or
        the result available after the specified timeout.

        :param float timeout: Period of time to wait before getting back control.
        :returns: The deserialized resource of the long running operation, if one is available.
        :rtype: any or None
        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
        """
        self.wait(timeout)
        return self._polling_method.resource()

    @distributed_trace
    def wait(self, timeout: Optional[float] = None) -> None:
        """Wait on the long running operation for a specified length
        of time. You can check if this call has ended with timeout with the
        "done()" method.

        Any exception captured while polling is re-raised here.

        :param float timeout: Period of time to wait for the long running
         operation to complete (in seconds).
        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
        """
        if self._thread is None:
            return
        self._thread.join(timeout=timeout)
        # Test for None explicitly. Relying on ``raise None`` producing a TypeError
        # would also swallow a genuine TypeError raised by the polling method.
        if self._exception is not None:
            raise self._exception

    def done(self) -> bool:
        """Check status of the long running operation.

        :returns: 'True' if the process has completed, else 'False'.
        :rtype: bool
        """
        return self._thread is None or not self._thread.is_alive()

    def add_done_callback(self, func: Callable) -> None:
        """Add callback function to be run once the long running operation
        has completed - regardless of the status of the operation.

        If the operation has already completed, the callback is invoked immediately.

        :param callable func: Callback function that takes at least one
         argument; it is called with this poller's polling method.
        """
        # Still use "_done" and not "done", since CBs are executed inside the thread.
        if self._done.is_set():
            func(self._polling_method)
        # Let's add them still, for consistency (if you wish to access to it for some reasons)
        self._callbacks.append(func)

    def remove_done_callback(self, func: Callable) -> None:
        """Remove a callback from the long running operation.

        :param callable func: The function to be removed from the callbacks.
        :raises ValueError: if the long running operation has already completed.
        """
        # ``_done`` is always an Event once ``__init__`` has run; the None test is defensive.
        if self._done is None or self._done.is_set():
            raise ValueError("Process is complete.")
        self._callbacks = [c for c in self._callbacks if c != func]