Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/polling/_async_poller.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

67 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the "Software"), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import logging 

27from typing import Callable, Any, Tuple, Generic, TypeVar, Generator, Awaitable 

28 

29from ..exceptions import AzureError 

30from ._poller import _SansIONoPolling 

31 

32 

# Covariant type variable standing for the resource type a polling method yields.
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
# Left as ``Any``: docstrings in this module describe the callback as either a
# callable or a model class exposing ``deserialize``.
DeserializationCallbackType = Any

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

37 

38 

class AsyncPollingMethod(Generic[PollingReturnType_co]):
    """Base interface that asynchronous polling strategies implement.

    The core operations defined here raise ``NotImplementedError`` and are meant
    to be overridden. The two continuation-token helpers raise ``TypeError``
    unless a subclass supplies its own implementation.
    """

    def initialize(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: DeserializationCallbackType,
    ) -> None:
        """Prepare the polling method before any polling takes place.

        :param client: A pipeline service client.
        :type client: ~azure.core.PipelineClient
        :param initial_response: The response of the initial call.
        :type initial_response: ~azure.core.pipeline.PipelineResponse
        :param deserialization_callback: Callback turning a Response into a deserialized object.
         When a subclass of Model is supplied, its "deserialize" is used as the callback.
        :type deserialization_callback: callable or msrest.serialization.Model
        :return: None
        :rtype: None
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("This method needs to be implemented")

    async def run(self) -> None:
        """Execute the polling logic; subclasses must override this coroutine.

        :return: None
        :rtype: None
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("This method needs to be implemented")

    def status(self) -> str:
        """Report the current status of the polling operation.

        :returns: The current status string.
        :rtype: str
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("This method needs to be implemented")

    def finished(self) -> bool:
        """Tell whether the polling operation has finished.

        :returns: True when polling is finished, False otherwise.
        :rtype: bool
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("This method needs to be implemented")

    def resource(self) -> PollingReturnType_co:
        """Give back the resource produced by the long running operation.

        :returns: The deserialized resource of the long running operation, if one is available.
        :rtype: any
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("This method needs to be implemented")

    def get_continuation_token(self) -> str:
        """Produce a continuation token from which the poller can be restarted later.

        :returns: An opaque continuation token
        :rtype: str
        :raises TypeError: Always, in this base class; the message names the concrete class.
        """
        message = "Polling method '{}' doesn't support get_continuation_token".format(self.__class__.__name__)
        raise TypeError(message)

    @classmethod
    def from_continuation_token(
        cls, continuation_token: str, **kwargs: Any
    ) -> Tuple[Any, Any, DeserializationCallbackType]:
        """Rebuild the poller construction arguments from a continuation token.

        :param continuation_token: An opaque continuation token
        :type continuation_token: str
        :return: A tuple of the client, the initial response, and the deserialization callback.
        :rtype: Tuple[Any, Any, DeserializationCallbackType]
        :raises TypeError: Always, in this base class; the message names the concrete class.
        """
        message = "Polling method '{}' doesn't support from_continuation_token".format(cls.__name__)
        raise TypeError(message)

115 

116 

class AsyncNoPolling(_SansIONoPolling[PollingReturnType_co], AsyncPollingMethod[PollingReturnType_co]):
    """Async polling method that performs no polling and returns the deserialized initial response."""

    async def run(self) -> None:
        """Do nothing: there is no polling to perform.

        Defined only so that the inherited ``run`` becomes a coroutine.
        """

124 

125 

async def async_poller(
    client: Any,
    initial_response: Any,
    deserialization_callback: Callable[[Any], PollingReturnType_co],
    polling_method: AsyncPollingMethod[PollingReturnType_co],
) -> PollingReturnType_co:
    """Run a long running operation to completion and return its final resource.

    .. deprecated:: 1.5.0
       Use :class:`AsyncLROPoller` instead.

    :param client: A pipeline service client.
    :type client: ~azure.core.PipelineClient
    :param initial_response: The response of the initial call.
    :type initial_response: ~azure.core.pipeline.PipelineResponse
    :param deserialization_callback: Callback turning a Response into a deserialized object.
     When a subclass of Model is supplied, its "deserialize" is used as the callback.
    :type deserialization_callback: callable or msrest.serialization.Model
    :param polling_method: The polling strategy to adopt
    :type polling_method: ~azure.core.polling.AsyncPollingMethod
    :return: The final resource at the end of the polling.
    :rtype: any or None
    """
    # Awaiting the poller is the same as awaiting its ``result()`` coroutine.
    return await AsyncLROPoller(client, initial_response, deserialization_callback, polling_method)

151 

152 

class AsyncLROPoller(Generic[PollingReturnType_co], Awaitable[PollingReturnType_co]):
    """Awaitable poller that drives a long running operation through a polling method.

    :param client: A pipeline service client
    :type client: ~azure.core.PipelineClient
    :param initial_response: The response of the initial call
    :type initial_response: ~azure.core.pipeline.PipelineResponse
    :param deserialization_callback: Callback turning a Response into a deserialized object.
     When a subclass of Model is supplied, its "deserialize" is used as the callback.
    :type deserialization_callback: callable or msrest.serialization.Model
    :param polling_method: The polling strategy to adopt
    :type polling_method: ~azure.core.polling.AsyncPollingMethod
    """

    def __init__(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable[[Any], PollingReturnType_co],
        polling_method: AsyncPollingMethod[PollingReturnType_co],
    ):
        self._polling_method = polling_method
        self._done = False

        # Duck-typed on purpose: prefer the object's "deserialize" attribute when it
        # has one, so that no explicit dependency on Model is needed here.
        callback = getattr(deserialization_callback, "deserialize", deserialization_callback)

        self._polling_method.initialize(client, initial_response, callback)

    def polling_method(self) -> AsyncPollingMethod[PollingReturnType_co]:
        """Give access to the polling method this poller was built with.

        :return: The polling method associated to this poller.
        :rtype: ~azure.core.polling.AsyncPollingMethod
        """
        return self._polling_method

    def continuation_token(self) -> str:
        """Ask the polling method for a token from which the poller can be restarted later.

        :returns: An opaque continuation token
        :rtype: str
        """
        return self._polling_method.get_continuation_token()

    @classmethod
    def from_continuation_token(
        cls, polling_method: AsyncPollingMethod[PollingReturnType_co], continuation_token: str, **kwargs: Any
    ) -> "AsyncLROPoller[PollingReturnType_co]":
        """Build a poller out of a continuation token.

        :param polling_method: The polling strategy to adopt
        :type polling_method: ~azure.core.polling.AsyncPollingMethod
        :param continuation_token: An opaque continuation token
        :type continuation_token: str
        :return: An instance of AsyncLROPoller
        :rtype: ~azure.core.polling.AsyncLROPoller
        :raises ~azure.core.exceptions.HttpResponseError: If the continuation token is invalid.
        """
        # The polling method decodes the token into the three constructor arguments.
        restored_client, restored_response, restored_callback = polling_method.from_continuation_token(
            continuation_token, **kwargs
        )
        return cls(restored_client, restored_response, restored_callback, polling_method)

    def status(self) -> str:
        """Report the status string exposed by the polling method.

        :returns: The current status string
        :rtype: str
        """
        return self._polling_method.status()

    async def result(self) -> PollingReturnType_co:
        """Wait for the long running operation, then return its resource.

        :returns: The deserialized resource of the long running operation, if one is available.
        :rtype: any or None
        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
        """
        await self.wait()
        return self._polling_method.resource()

    def __await__(self) -> Generator[Any, None, PollingReturnType_co]:
        # Makes ``await poller`` equivalent to ``await poller.result()``.
        return self.result().__await__()

    async def wait(self) -> None:
        """Run the polling method until it returns, marking the poller as done on success.

        If the polling method raises an ``AzureError`` that carries no continuation
        token, one is attached (or ``None`` when it cannot be obtained) before the
        error is re-raised.

        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
        """
        try:
            await self._polling_method.run()
        except AzureError as error:
            if error.continuation_token:
                # The error already carries a token; propagate it untouched.
                raise
            try:
                error.continuation_token = self.continuation_token()
            except Exception:  # pylint: disable=broad-except
                # Best effort only: failing to get a token must not mask the original error.
                _LOGGER.warning("Unable to retrieve continuation token.")
                error.continuation_token = None
            raise
        else:
            self._done = True

    def done(self) -> bool:
        """Tell whether :meth:`wait` has completed successfully on this poller.

        :returns: 'True' if the process has completed, else 'False'.
        :rtype: bool
        """
        return self._done