Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/polling/_async_poller.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

66 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the "Software"), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import logging 

27from typing import Callable, Any, Tuple, Generic, TypeVar, Generator, Awaitable 

28 

29from ..exceptions import AzureError 

30from ._poller import _SansIONoPolling 

31 

32 

# Covariant type variable used to parameterize the polling-method and poller
# generics declared in this module.
PollingReturnType_co = TypeVar("PollingReturnType_co", covariant=True)
# Alias for the deserialization callback type; currently unconstrained (Any).
DeserializationCallbackType = Any

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

37 

38 

class AsyncPollingMethod(Generic[PollingReturnType_co]):
    """ABC class for polling method.

    This is the interface :class:`AsyncLROPoller` drives. Every method here
    raises by default, so a concrete polling method is expected to override
    the ones it supports.
    """

    def initialize(
        self, client: Any, initial_response: Any, deserialization_callback: DeserializationCallbackType
    ) -> None:
        """Set up the polling method before polling starts.

        :param client: The client handed to the poller.
        :param initial_response: The initial call response.
        :param deserialization_callback: Callback used to deserialize the final response.
        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    async def run(self) -> None:
        """Run the polling; awaited by :meth:`AsyncLROPoller.wait`.

        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    def status(self) -> str:
        """Return the current status string.

        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    def finished(self) -> bool:
        """Tell whether polling is finished.

        NOTE(review): presumably True once a terminal state is reached;
        the exact contract is defined by implementations -- confirm.

        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    def resource(self) -> PollingReturnType_co:
        """Return the resource produced by the operation.

        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError("This method needs to be implemented")

    def get_continuation_token(self) -> str:
        """Return a continuation token for this polling method.

        :raises TypeError: Always, unless overridden; the default
            implementation does not support continuation tokens.
        """
        raise TypeError(f"Polling method '{self.__class__.__name__}' doesn't support get_continuation_token")

    @classmethod
    def from_continuation_token(
        cls, continuation_token: str, **kwargs: Any
    ) -> Tuple[Any, Any, DeserializationCallbackType]:
        """Rebuild poller constructor arguments from a continuation token.

        :param continuation_token: The token to restore from.
        :return: A ``(client, initial_response, deserialization_callback)`` tuple.
        :raises TypeError: Always, unless overridden; the default
            implementation does not support continuation tokens.
        """
        raise TypeError(f"Polling method '{cls.__name__}' doesn't support from_continuation_token")

67 

68 

class AsyncNoPolling(_SansIONoPolling[PollingReturnType_co], AsyncPollingMethod[PollingReturnType_co]):
    """An empty async poller that returns the deserialized initial response."""

    async def run(self) -> None:  # pylint:disable=invalid-overridden-method
        """Empty run, no polling.

        Only overrides the inherited ``run`` so that it is declared ``async``;
        the body intentionally does nothing.
        """

76 

77 

async def async_poller(
    client: Any,
    initial_response: Any,
    deserialization_callback: Callable[[Any], PollingReturnType_co],
    polling_method: AsyncPollingMethod[PollingReturnType_co],
) -> PollingReturnType_co:
    """Async Poller for long running operations.

    .. deprecated:: 1.5.0
       Use :class:`AsyncLROPoller` instead.

    :param client: A pipeline service client.
    :type client: ~azure.core.PipelineClient
    :param initial_response: The initial call response
    :type initial_response: ~azure.core.pipeline.PipelineResponse
    :param deserialization_callback: A callback that takes a Response and return a deserialized object.
                                     If a subclass of Model is given, this passes "deserialize" as callback.
    :type deserialization_callback: callable or msrest.serialization.Model
    :param polling_method: The polling strategy to adopt
    :type polling_method: ~azure.core.polling.AsyncPollingMethod
    :return: The final resource at the end of the polling.
    :rtype: any or None
    """
    # Thin wrapper: build the poller and await it, which runs the polling
    # and yields the polling method's resource.
    poller = AsyncLROPoller(client, initial_response, deserialization_callback, polling_method)
    return await poller

103 

104 

class AsyncLROPoller(Generic[PollingReturnType_co], Awaitable[PollingReturnType_co]):
    """Async poller for long running operations.

    :param client: A pipeline service client
    :type client: ~azure.core.PipelineClient
    :param initial_response: The initial call response
    :type initial_response: ~azure.core.pipeline.PipelineResponse
    :param deserialization_callback: A callback that takes a Response and return a deserialized object.
                                     If a subclass of Model is given, this passes "deserialize" as callback.
    :type deserialization_callback: callable or msrest.serialization.Model
    :param polling_method: The polling strategy to adopt
    :type polling_method: ~azure.core.polling.AsyncPollingMethod
    """

    def __init__(
        self,
        client: Any,
        initial_response: Any,
        deserialization_callback: Callable[[Any], PollingReturnType_co],
        polling_method: AsyncPollingMethod[PollingReturnType_co],
    ):
        self._polling_method = polling_method
        # Flipped to True only after wait() completes without raising.
        self._done = False

        # This implicit test avoids bringing in an explicit dependency on Model directly
        try:
            deserialization_callback = deserialization_callback.deserialize  # type: ignore
        except AttributeError:
            # No "deserialize" attribute: use the callback as given.
            pass

        self._polling_method.initialize(client, initial_response, deserialization_callback)

    def polling_method(self) -> AsyncPollingMethod[PollingReturnType_co]:
        """Return the polling method associated to this poller.

        :return: The polling method associated to this poller.
        :rtype: ~azure.core.polling.AsyncPollingMethod
        """
        return self._polling_method

    def continuation_token(self) -> str:
        """Return a continuation token that allows to restart the poller later.

        :returns: An opaque continuation token
        :rtype: str
        """
        return self._polling_method.get_continuation_token()

    @classmethod
    def from_continuation_token(
        cls, polling_method: AsyncPollingMethod[PollingReturnType_co], continuation_token: str, **kwargs: Any
    ) -> "AsyncLROPoller[PollingReturnType_co]":
        """Create a poller from a continuation token.

        The polling method turns the token (and any extra keyword arguments)
        back into the constructor arguments, which are then used to build a
        new poller around that same polling method.

        :param polling_method: The polling strategy to adopt
        :type polling_method: ~azure.core.polling.AsyncPollingMethod
        :param continuation_token: An opaque continuation token
        :type continuation_token: str
        :return: A new poller instance
        :rtype: ~azure.core.polling.AsyncLROPoller
        """
        (
            client,
            initial_response,
            deserialization_callback,
        ) = polling_method.from_continuation_token(continuation_token, **kwargs)
        return cls(client, initial_response, deserialization_callback, polling_method)

    def status(self) -> str:
        """Returns the current status string.

        :returns: The current status string
        :rtype: str
        """
        return self._polling_method.status()

    async def result(self) -> PollingReturnType_co:
        """Return the result of the long running operation.

        :returns: The deserialized resource of the long running operation, if one is available.
        :rtype: any or None
        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
        """
        await self.wait()
        return self._polling_method.resource()

    def __await__(self) -> Generator[Any, None, PollingReturnType_co]:
        """Make the poller awaitable; awaiting it is the same as awaiting :meth:`result`."""
        return self.result().__await__()

    async def wait(self) -> None:
        """Wait on the long running operation.

        :raises ~azure.core.exceptions.HttpResponseError: Server problem with the query.
        """
        try:
            await self._polling_method.run()
        except AzureError as error:
            # Attach a continuation token to the error when it does not
            # already carry one, then re-raise the original error.
            if not error.continuation_token:
                try:
                    error.continuation_token = self.continuation_token()
                except Exception as err:  # pylint: disable=broad-except
                    # Best effort only: a failure to build the token must not
                    # mask the polling error being propagated.
                    _LOGGER.warning("Unable to retrieve continuation token: %s", err)
                    error.continuation_token = None
            raise
        self._done = True

    def done(self) -> bool:
        """Check status of the long running operation.

        :returns: 'True' if the process has completed, else 'False'.
        :rtype: bool
        """
        return self._done