Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers_async.py: 36%

122 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""AsyncIO helpers for :mod:`grpc` supporting 3.7+. 

16 

17Please combine more detailed docstring in grpc_helpers.py to use following 

18functions. This module is implementing the same surface with AsyncIO semantics. 

19""" 

20 

21import asyncio 

22import functools 

23 

24import grpc 

25from grpc import aio 

26 

27from google.api_core import exceptions, grpc_helpers 

28 

29 

30# NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform 

31# automatic patching for us. But that means the overhead of creating an 

32# extra Python function spreads to every single send and receive. 

33 

34 

35class _WrappedCall(aio.Call): 

36 def __init__(self): 

37 self._call = None 

38 

39 def with_call(self, call): 

40 """Supplies the call object separately to keep __init__ clean.""" 

41 self._call = call 

42 return self 

43 

44 async def initial_metadata(self): 

45 return await self._call.initial_metadata() 

46 

47 async def trailing_metadata(self): 

48 return await self._call.trailing_metadata() 

49 

50 async def code(self): 

51 return await self._call.code() 

52 

53 async def details(self): 

54 return await self._call.details() 

55 

56 def cancelled(self): 

57 return self._call.cancelled() 

58 

59 def done(self): 

60 return self._call.done() 

61 

62 def time_remaining(self): 

63 return self._call.time_remaining() 

64 

65 def cancel(self): 

66 return self._call.cancel() 

67 

68 def add_done_callback(self, callback): 

69 self._call.add_done_callback(callback) 

70 

71 async def wait_for_connection(self): 

72 try: 

73 await self._call.wait_for_connection() 

74 except grpc.RpcError as rpc_error: 

75 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

76 

77 

78class _WrappedUnaryResponseMixin(_WrappedCall): 

79 def __await__(self): 

80 try: 

81 response = yield from self._call.__await__() 

82 return response 

83 except grpc.RpcError as rpc_error: 

84 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

85 

86 

87class _WrappedStreamResponseMixin(_WrappedCall): 

88 def __init__(self): 

89 self._wrapped_async_generator = None 

90 

91 async def read(self): 

92 try: 

93 return await self._call.read() 

94 except grpc.RpcError as rpc_error: 

95 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

96 

97 async def _wrapped_aiter(self): 

98 try: 

99 # NOTE(lidiz) coverage doesn't understand the exception raised from 

100 # __anext__ method. It is covered by test case: 

101 # test_wrap_stream_errors_aiter_non_rpc_error 

102 async for response in self._call: # pragma: no branch 

103 yield response 

104 except grpc.RpcError as rpc_error: 

105 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

106 

107 def __aiter__(self): 

108 if not self._wrapped_async_generator: 

109 self._wrapped_async_generator = self._wrapped_aiter() 

110 return self._wrapped_async_generator 

111 

112 

113class _WrappedStreamRequestMixin(_WrappedCall): 

114 async def write(self, request): 

115 try: 

116 await self._call.write(request) 

117 except grpc.RpcError as rpc_error: 

118 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

119 

120 async def done_writing(self): 

121 try: 

122 await self._call.done_writing() 

123 except grpc.RpcError as rpc_error: 

124 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

125 

126 

127# NOTE(lidiz) Implementing each individual class separately, so we don't 

128# expose any API that should not be seen. E.g., __aiter__ in unary-unary 

129# RPC, or __await__ in stream-stream RPC. 

130class _WrappedUnaryUnaryCall(_WrappedUnaryResponseMixin, aio.UnaryUnaryCall): 

131 """Wrapped UnaryUnaryCall to map exceptions.""" 

132 

133 

134class _WrappedUnaryStreamCall(_WrappedStreamResponseMixin, aio.UnaryStreamCall): 

135 """Wrapped UnaryStreamCall to map exceptions.""" 

136 

137 

138class _WrappedStreamUnaryCall( 

139 _WrappedUnaryResponseMixin, _WrappedStreamRequestMixin, aio.StreamUnaryCall 

140): 

141 """Wrapped StreamUnaryCall to map exceptions.""" 

142 

143 

144class _WrappedStreamStreamCall( 

145 _WrappedStreamRequestMixin, _WrappedStreamResponseMixin, aio.StreamStreamCall 

146): 

147 """Wrapped StreamStreamCall to map exceptions.""" 

148 

149 

150def _wrap_unary_errors(callable_): 

151 """Map errors for Unary-Unary async callables.""" 

152 grpc_helpers._patch_callable_name(callable_) 

153 

154 @functools.wraps(callable_) 

155 def error_remapped_callable(*args, **kwargs): 

156 call = callable_(*args, **kwargs) 

157 return _WrappedUnaryUnaryCall().with_call(call) 

158 

159 return error_remapped_callable 

160 

161 

162def _wrap_stream_errors(callable_): 

163 """Map errors for streaming RPC async callables.""" 

164 grpc_helpers._patch_callable_name(callable_) 

165 

166 @functools.wraps(callable_) 

167 async def error_remapped_callable(*args, **kwargs): 

168 call = callable_(*args, **kwargs) 

169 

170 if isinstance(call, aio.UnaryStreamCall): 

171 call = _WrappedUnaryStreamCall().with_call(call) 

172 elif isinstance(call, aio.StreamUnaryCall): 

173 call = _WrappedStreamUnaryCall().with_call(call) 

174 elif isinstance(call, aio.StreamStreamCall): 

175 call = _WrappedStreamStreamCall().with_call(call) 

176 else: 

177 raise TypeError("Unexpected type of call %s" % type(call)) 

178 

179 await call.wait_for_connection() 

180 return call 

181 

182 return error_remapped_callable 

183 

184 

185def wrap_errors(callable_): 

186 """Wrap a gRPC async callable and map :class:`grpc.RpcErrors` to 

187 friendly error classes. 

188 

189 Errors raised by the gRPC callable are mapped to the appropriate 

190 :class:`google.api_core.exceptions.GoogleAPICallError` subclasses. The 

191 original `grpc.RpcError` (which is usually also a `grpc.Call`) is 

192 available from the ``response`` property on the mapped exception. This 

193 is useful for extracting metadata from the original error. 

194 

195 Args: 

196 callable_ (Callable): A gRPC callable. 

197 

198 Returns: Callable: The wrapped gRPC callable. 

199 """ 

200 if isinstance(callable_, aio.UnaryUnaryMultiCallable): 

201 return _wrap_unary_errors(callable_) 

202 else: 

203 return _wrap_stream_errors(callable_) 

204 

205 

206def create_channel( 

207 target, 

208 credentials=None, 

209 scopes=None, 

210 ssl_credentials=None, 

211 credentials_file=None, 

212 quota_project_id=None, 

213 default_scopes=None, 

214 default_host=None, 

215 **kwargs 

216): 

217 """Create an AsyncIO secure channel with credentials. 

218 

219 Args: 

220 target (str): The target service address in the format 'hostname:port'. 

221 credentials (google.auth.credentials.Credentials): The credentials. If 

222 not specified, then this function will attempt to ascertain the 

223 credentials from the environment using :func:`google.auth.default`. 

224 scopes (Sequence[str]): A optional list of scopes needed for this 

225 service. These are only used when credentials are not specified and 

226 are passed to :func:`google.auth.default`. 

227 ssl_credentials (grpc.ChannelCredentials): Optional SSL channel 

228 credentials. This can be used to specify different certificates. 

229 credentials_file (str): A file with credentials that can be loaded with 

230 :func:`google.auth.load_credentials_from_file`. This argument is 

231 mutually exclusive with credentials. 

232 quota_project_id (str): An optional project to use for billing and quota. 

233 default_scopes (Sequence[str]): Default scopes passed by a Google client 

234 library. Use 'scopes' for user-defined scopes. 

235 default_host (str): The default endpoint. e.g., "pubsub.googleapis.com". 

236 kwargs: Additional key-word args passed to :func:`aio.secure_channel`. 

237 

238 Returns: 

239 aio.Channel: The created channel. 

240 

241 Raises: 

242 google.api_core.DuplicateCredentialArgs: If both a credentials object and credentials_file are passed. 

243 """ 

244 

245 composite_credentials = grpc_helpers._create_composite_credentials( 

246 credentials=credentials, 

247 credentials_file=credentials_file, 

248 scopes=scopes, 

249 default_scopes=default_scopes, 

250 ssl_credentials=ssl_credentials, 

251 quota_project_id=quota_project_id, 

252 default_host=default_host, 

253 ) 

254 

255 return aio.secure_channel(target, composite_credentials, **kwargs) 

256 

257 

258class FakeUnaryUnaryCall(_WrappedUnaryUnaryCall): 

259 """Fake implementation for unary-unary RPCs. 

260 

261 It is a dummy object for response message. Supply the intended response 

262 upon the initialization, and the coroutine will return the exact response 

263 message. 

264 """ 

265 

266 def __init__(self, response=object()): 

267 self.response = response 

268 self._future = asyncio.get_event_loop().create_future() 

269 self._future.set_result(self.response) 

270 

271 def __await__(self): 

272 response = yield from self._future.__await__() 

273 return response 

274 

275 

276class FakeStreamUnaryCall(_WrappedStreamUnaryCall): 

277 """Fake implementation for stream-unary RPCs. 

278 

279 It is a dummy object for response message. Supply the intended response 

280 upon the initialization, and the coroutine will return the exact response 

281 message. 

282 """ 

283 

284 def __init__(self, response=object()): 

285 self.response = response 

286 self._future = asyncio.get_event_loop().create_future() 

287 self._future.set_result(self.response) 

288 

289 def __await__(self): 

290 response = yield from self._future.__await__() 

291 return response 

292 

293 async def wait_for_connection(self): 

294 pass