Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

128 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""AsyncIO helpers for :mod:`grpc` supporting 3.7+. 

16 

17Please combine more detailed docstring in grpc_helpers.py to use following 

18functions. This module is implementing the same surface with AsyncIO semantics. 

19""" 

20 

21import asyncio 

22import functools 

23 

24from typing import AsyncGenerator, Generic, Iterator, Optional, TypeVar 

25 

26import grpc 

27from grpc import aio 

28 

29from google.api_core import exceptions, grpc_helpers 

30 

31# denotes the proto response type for grpc calls 

32P = TypeVar("P") 

33 

34# NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform 

35# automatic patching for us. But that means the overhead of creating an 

36# extra Python function spreads to every single send and receive. 

37 

38 

39class _WrappedCall(aio.Call): 

40 def __init__(self): 

41 self._call = None 

42 

43 def with_call(self, call): 

44 """Supplies the call object separately to keep __init__ clean.""" 

45 self._call = call 

46 return self 

47 

48 async def initial_metadata(self): 

49 return await self._call.initial_metadata() 

50 

51 async def trailing_metadata(self): 

52 return await self._call.trailing_metadata() 

53 

54 async def code(self): 

55 return await self._call.code() 

56 

57 async def details(self): 

58 return await self._call.details() 

59 

60 def cancelled(self): 

61 return self._call.cancelled() 

62 

63 def done(self): 

64 return self._call.done() 

65 

66 def time_remaining(self): 

67 return self._call.time_remaining() 

68 

69 def cancel(self): 

70 return self._call.cancel() 

71 

72 def add_done_callback(self, callback): 

73 self._call.add_done_callback(callback) 

74 

75 async def wait_for_connection(self): 

76 try: 

77 await self._call.wait_for_connection() 

78 except grpc.RpcError as rpc_error: 

79 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

80 

81 

82class _WrappedUnaryResponseMixin(Generic[P], _WrappedCall): 

83 def __await__(self) -> Iterator[P]: 

84 try: 

85 response = yield from self._call.__await__() 

86 return response 

87 except grpc.RpcError as rpc_error: 

88 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

89 

90 

91class _WrappedStreamResponseMixin(Generic[P], _WrappedCall): 

92 def __init__(self): 

93 self._wrapped_async_generator = None 

94 

95 async def read(self) -> P: 

96 try: 

97 return await self._call.read() 

98 except grpc.RpcError as rpc_error: 

99 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

100 

101 async def _wrapped_aiter(self) -> AsyncGenerator[P, None]: 

102 try: 

103 # NOTE(lidiz) coverage doesn't understand the exception raised from 

104 # __anext__ method. It is covered by test case: 

105 # test_wrap_stream_errors_aiter_non_rpc_error 

106 async for response in self._call: # pragma: no branch 

107 yield response 

108 except grpc.RpcError as rpc_error: 

109 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

110 

111 def __aiter__(self) -> AsyncGenerator[P, None]: 

112 if not self._wrapped_async_generator: 

113 self._wrapped_async_generator = self._wrapped_aiter() 

114 return self._wrapped_async_generator 

115 

116 

117class _WrappedStreamRequestMixin(_WrappedCall): 

118 async def write(self, request): 

119 try: 

120 await self._call.write(request) 

121 except grpc.RpcError as rpc_error: 

122 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

123 

124 async def done_writing(self): 

125 try: 

126 await self._call.done_writing() 

127 except grpc.RpcError as rpc_error: 

128 raise exceptions.from_grpc_error(rpc_error) from rpc_error 

129 

130 

131# NOTE(lidiz) Implementing each individual class separately, so we don't 

132# expose any API that should not be seen. E.g., __aiter__ in unary-unary 

133# RPC, or __await__ in stream-stream RPC. 

134class _WrappedUnaryUnaryCall(_WrappedUnaryResponseMixin[P], aio.UnaryUnaryCall): 

135 """Wrapped UnaryUnaryCall to map exceptions.""" 

136 

137 

138class _WrappedUnaryStreamCall(_WrappedStreamResponseMixin[P], aio.UnaryStreamCall): 

139 """Wrapped UnaryStreamCall to map exceptions.""" 

140 

141 

142class _WrappedStreamUnaryCall( 

143 _WrappedUnaryResponseMixin[P], _WrappedStreamRequestMixin, aio.StreamUnaryCall 

144): 

145 """Wrapped StreamUnaryCall to map exceptions.""" 

146 

147 

148class _WrappedStreamStreamCall( 

149 _WrappedStreamRequestMixin, _WrappedStreamResponseMixin[P], aio.StreamStreamCall 

150): 

151 """Wrapped StreamStreamCall to map exceptions.""" 

152 

153 

154# public type alias denoting the return type of async streaming gapic calls 

155GrpcAsyncStream = _WrappedStreamResponseMixin 

156# public type alias denoting the return type of unary gapic calls 

157AwaitableGrpcCall = _WrappedUnaryResponseMixin 

158 

159 

160def _wrap_unary_errors(callable_): 

161 """Map errors for Unary-Unary async callables.""" 

162 

163 @functools.wraps(callable_) 

164 def error_remapped_callable(*args, **kwargs): 

165 call = callable_(*args, **kwargs) 

166 return _WrappedUnaryUnaryCall().with_call(call) 

167 

168 return error_remapped_callable 

169 

170 

171def _wrap_stream_errors(callable_, wrapper_type): 

172 """Map errors for streaming RPC async callables.""" 

173 

174 @functools.wraps(callable_) 

175 async def error_remapped_callable(*args, **kwargs): 

176 call = callable_(*args, **kwargs) 

177 call = wrapper_type().with_call(call) 

178 await call.wait_for_connection() 

179 return call 

180 

181 return error_remapped_callable 

182 

183 

184def wrap_errors(callable_): 

185 """Wrap a gRPC async callable and map :class:`grpc.RpcErrors` to 

186 friendly error classes. 

187 

188 Errors raised by the gRPC callable are mapped to the appropriate 

189 :class:`google.api_core.exceptions.GoogleAPICallError` subclasses. The 

190 original `grpc.RpcError` (which is usually also a `grpc.Call`) is 

191 available from the ``response`` property on the mapped exception. This 

192 is useful for extracting metadata from the original error. 

193 

194 Args: 

195 callable_ (Callable): A gRPC callable. 

196 

197 Returns: Callable: The wrapped gRPC callable. 

198 """ 

199 grpc_helpers._patch_callable_name(callable_) 

200 

201 if isinstance(callable_, aio.UnaryStreamMultiCallable): 

202 return _wrap_stream_errors(callable_, _WrappedUnaryStreamCall) 

203 elif isinstance(callable_, aio.StreamUnaryMultiCallable): 

204 return _wrap_stream_errors(callable_, _WrappedStreamUnaryCall) 

205 elif isinstance(callable_, aio.StreamStreamMultiCallable): 

206 return _wrap_stream_errors(callable_, _WrappedStreamStreamCall) 

207 else: 

208 return _wrap_unary_errors(callable_) 

209 

210 

211def create_channel( 

212 target, 

213 credentials=None, 

214 scopes=None, 

215 ssl_credentials=None, 

216 credentials_file=None, 

217 quota_project_id=None, 

218 default_scopes=None, 

219 default_host=None, 

220 compression=None, 

221 attempt_direct_path: Optional[bool] = False, 

222 **kwargs 

223): 

224 """Create an AsyncIO secure channel with credentials. 

225 

226 Args: 

227 target (str): The target service address in the format 'hostname:port'. 

228 credentials (google.auth.credentials.Credentials): The credentials. If 

229 not specified, then this function will attempt to ascertain the 

230 credentials from the environment using :func:`google.auth.default`. 

231 scopes (Sequence[str]): A optional list of scopes needed for this 

232 service. These are only used when credentials are not specified and 

233 are passed to :func:`google.auth.default`. 

234 ssl_credentials (grpc.ChannelCredentials): Optional SSL channel 

235 credentials. This can be used to specify different certificates. 

236 credentials_file (str): A file with credentials that can be loaded with 

237 :func:`google.auth.load_credentials_from_file`. This argument is 

238 mutually exclusive with credentials. 

239 

240 .. warning:: 

241 Important: If you accept a credential configuration (credential JSON/File/Stream) 

242 from an external source for authentication to Google Cloud Platform, you must 

243 validate it before providing it to any Google API or client library. Providing an 

244 unvalidated credential configuration to Google APIs or libraries can compromise 

245 the security of your systems and data. For more information, refer to 

246 `Validate credential configurations from external sources`_. 

247 

248 .. _Validate credential configurations from external sources: 

249 

250 https://cloud.google.com/docs/authentication/external/externally-sourced-credentials 

251 quota_project_id (str): An optional project to use for billing and quota. 

252 default_scopes (Sequence[str]): Default scopes passed by a Google client 

253 library. Use 'scopes' for user-defined scopes. 

254 default_host (str): The default endpoint. e.g., "pubsub.googleapis.com". 

255 compression (grpc.Compression): An optional value indicating the 

256 compression method to be used over the lifetime of the channel. 

257 attempt_direct_path (Optional[bool]): If set, Direct Path will be attempted 

258 when the request is made. Direct Path is only available within a Google 

259 Compute Engine (GCE) environment and provides a proxyless connection 

260 which increases the available throughput, reduces latency, and increases 

261 reliability. Note: 

262 

263 - This argument should only be set in a GCE environment and for Services 

264 that are known to support Direct Path. 

265 - If this argument is set outside of GCE, then this request will fail 

266 unless the back-end service happens to have configured fall-back to DNS. 

267 - If the request causes a `ServiceUnavailable` response, it is recommended 

268 that the client repeat the request with `attempt_direct_path` set to 

269 `False` as the Service may not support Direct Path. 

270 - Using `ssl_credentials` with `attempt_direct_path` set to `True` will 

271 result in `ValueError` as this combination is not yet supported. 

272 

273 kwargs: Additional key-word args passed to :func:`aio.secure_channel`. 

274 

275 Returns: 

276 aio.Channel: The created channel. 

277 

278 Raises: 

279 google.api_core.DuplicateCredentialArgs: If both a credentials object and credentials_file are passed. 

280 ValueError: If `ssl_credentials` is set and `attempt_direct_path` is set to `True`. 

281 """ 

282 

283 # If `ssl_credentials` is set and `attempt_direct_path` is set to `True`, 

284 # raise ValueError as this is not yet supported. 

285 # See https://github.com/googleapis/python-api-core/issues/590 

286 if ssl_credentials and attempt_direct_path: 

287 raise ValueError("Using ssl_credentials with Direct Path is not supported") 

288 

289 composite_credentials = grpc_helpers._create_composite_credentials( 

290 credentials=credentials, 

291 credentials_file=credentials_file, 

292 scopes=scopes, 

293 default_scopes=default_scopes, 

294 ssl_credentials=ssl_credentials, 

295 quota_project_id=quota_project_id, 

296 default_host=default_host, 

297 ) 

298 

299 if attempt_direct_path: 

300 target = grpc_helpers._modify_target_for_direct_path(target) 

301 

302 return aio.secure_channel( 

303 target, composite_credentials, compression=compression, **kwargs 

304 ) 

305 

306 

307class FakeUnaryUnaryCall(_WrappedUnaryUnaryCall): 

308 """Fake implementation for unary-unary RPCs. 

309 

310 It is a dummy object for response message. Supply the intended response 

311 upon the initialization, and the coroutine will return the exact response 

312 message. 

313 """ 

314 

315 def __init__(self, response=object()): 

316 self.response = response 

317 self._future = asyncio.get_event_loop().create_future() 

318 self._future.set_result(self.response) 

319 

320 def __await__(self): 

321 response = yield from self._future.__await__() 

322 return response 

323 

324 

325class FakeStreamUnaryCall(_WrappedStreamUnaryCall): 

326 """Fake implementation for stream-unary RPCs. 

327 

328 It is a dummy object for response message. Supply the intended response 

329 upon the initialization, and the coroutine will return the exact response 

330 message. 

331 """ 

332 

333 def __init__(self, response=object()): 

334 self.response = response 

335 self._future = asyncio.get_event_loop().create_future() 

336 self._future.set_result(self.response) 

337 

338 def __await__(self): 

339 response = yield from self._future.__await__() 

340 return response 

341 

342 async def wait_for_connection(self): 

343 pass