Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py: 47%

153 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:37 +0000

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for :mod:`grpc`.""" 

16 

17import collections 

18import functools 

19import warnings 

20 

21import grpc 

22 

23from google.api_core import exceptions 

24import google.auth 

25import google.auth.credentials 

26import google.auth.transport.grpc 

27import google.auth.transport.requests 

28import google.protobuf 

29 

30PROTOBUF_VERSION = google.protobuf.__version__ 

31 

32# The grpcio-gcp package only has support for protobuf < 4 

33if PROTOBUF_VERSION[0:2] == "3.": # pragma: NO COVER 

34 try: 

35 import grpc_gcp 

36 

37 warnings.warn( 

38 """Support for grpcio-gcp is deprecated. This feature will be 

39 removed from `google-api-core` after January 1, 2024. If you need to 

40 continue to use this feature, please pin to a specific version of 

41 `google-api-core`.""", 

42 DeprecationWarning, 

43 ) 

44 HAS_GRPC_GCP = True 

45 except ImportError: 

46 HAS_GRPC_GCP = False 

47else: 

48 HAS_GRPC_GCP = False 

49 

50 

51# The list of gRPC Callable interfaces that return iterators. 

52_STREAM_WRAP_CLASSES = (grpc.UnaryStreamMultiCallable, grpc.StreamStreamMultiCallable) 

53 

54 

55def _patch_callable_name(callable_): 

56 """Fix-up gRPC callable attributes. 

57 

58 gRPC callable lack the ``__name__`` attribute which causes 

59 :func:`functools.wraps` to error. This adds the attribute if needed. 

60 """ 

61 if not hasattr(callable_, "__name__"): 

62 callable_.__name__ = callable_.__class__.__name__ 

63 

64 

65def _wrap_unary_errors(callable_): 

66 """Map errors for Unary-Unary and Stream-Unary gRPC callables.""" 

67 _patch_callable_name(callable_) 

68 

69 @functools.wraps(callable_) 

70 def error_remapped_callable(*args, **kwargs): 

71 try: 

72 return callable_(*args, **kwargs) 

73 except grpc.RpcError as exc: 

74 raise exceptions.from_grpc_error(exc) from exc 

75 

76 return error_remapped_callable 

77 

78 

79class _StreamingResponseIterator(grpc.Call): 

80 def __init__(self, wrapped, prefetch_first_result=True): 

81 self._wrapped = wrapped 

82 

83 # This iterator is used in a retry context, and returned outside after init. 

84 # gRPC will not throw an exception until the stream is consumed, so we need 

85 # to retrieve the first result, in order to fail, in order to trigger a retry. 

86 try: 

87 if prefetch_first_result: 

88 self._stored_first_result = next(self._wrapped) 

89 except TypeError: 

90 # It is possible the wrapped method isn't an iterable (a grpc.Call 

91 # for instance). If this happens don't store the first result. 

92 pass 

93 except StopIteration: 

94 # ignore stop iteration at this time. This should be handled outside of retry. 

95 pass 

96 

97 def __iter__(self): 

98 """This iterator is also an iterable that returns itself.""" 

99 return self 

100 

101 def __next__(self): 

102 """Get the next response from the stream. 

103 

104 Returns: 

105 protobuf.Message: A single response from the stream. 

106 """ 

107 try: 

108 if hasattr(self, "_stored_first_result"): 

109 result = self._stored_first_result 

110 del self._stored_first_result 

111 return result 

112 return next(self._wrapped) 

113 except grpc.RpcError as exc: 

114 # If the stream has already returned data, we cannot recover here. 

115 raise exceptions.from_grpc_error(exc) from exc 

116 

117 # grpc.Call & grpc.RpcContext interface 

118 

119 def add_callback(self, callback): 

120 return self._wrapped.add_callback(callback) 

121 

122 def cancel(self): 

123 return self._wrapped.cancel() 

124 

125 def code(self): 

126 return self._wrapped.code() 

127 

128 def details(self): 

129 return self._wrapped.details() 

130 

131 def initial_metadata(self): 

132 return self._wrapped.initial_metadata() 

133 

134 def is_active(self): 

135 return self._wrapped.is_active() 

136 

137 def time_remaining(self): 

138 return self._wrapped.time_remaining() 

139 

140 def trailing_metadata(self): 

141 return self._wrapped.trailing_metadata() 

142 

143 

144def _wrap_stream_errors(callable_): 

145 """Wrap errors for Unary-Stream and Stream-Stream gRPC callables. 

146 

147 The callables that return iterators require a bit more logic to re-map 

148 errors when iterating. This wraps both the initial invocation and the 

149 iterator of the return value to re-map errors. 

150 """ 

151 _patch_callable_name(callable_) 

152 

153 @functools.wraps(callable_) 

154 def error_remapped_callable(*args, **kwargs): 

155 try: 

156 result = callable_(*args, **kwargs) 

157 # Auto-fetching the first result causes PubSub client's streaming pull 

158 # to hang when re-opening the stream, thus we need examine the hacky 

159 # hidden flag to see if pre-fetching is disabled. 

160 # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257 

161 prefetch_first = getattr(callable_, "_prefetch_first_result_", True) 

162 return _StreamingResponseIterator( 

163 result, prefetch_first_result=prefetch_first 

164 ) 

165 except grpc.RpcError as exc: 

166 raise exceptions.from_grpc_error(exc) from exc 

167 

168 return error_remapped_callable 

169 

170 

171def wrap_errors(callable_): 

172 """Wrap a gRPC callable and map :class:`grpc.RpcErrors` to friendly error 

173 classes. 

174 

175 Errors raised by the gRPC callable are mapped to the appropriate 

176 :class:`google.api_core.exceptions.GoogleAPICallError` subclasses. 

177 The original `grpc.RpcError` (which is usually also a `grpc.Call`) is 

178 available from the ``response`` property on the mapped exception. This 

179 is useful for extracting metadata from the original error. 

180 

181 Args: 

182 callable_ (Callable): A gRPC callable. 

183 

184 Returns: 

185 Callable: The wrapped gRPC callable. 

186 """ 

187 if isinstance(callable_, _STREAM_WRAP_CLASSES): 

188 return _wrap_stream_errors(callable_) 

189 else: 

190 return _wrap_unary_errors(callable_) 

191 

192 

193def _create_composite_credentials( 

194 credentials=None, 

195 credentials_file=None, 

196 default_scopes=None, 

197 scopes=None, 

198 ssl_credentials=None, 

199 quota_project_id=None, 

200 default_host=None, 

201): 

202 """Create the composite credentials for secure channels. 

203 

204 Args: 

205 credentials (google.auth.credentials.Credentials): The credentials. If 

206 not specified, then this function will attempt to ascertain the 

207 credentials from the environment using :func:`google.auth.default`. 

208 credentials_file (str): A file with credentials that can be loaded with 

209 :func:`google.auth.load_credentials_from_file`. This argument is 

210 mutually exclusive with credentials. 

211 default_scopes (Sequence[str]): A optional list of scopes needed for this 

212 service. These are only used when credentials are not specified and 

213 are passed to :func:`google.auth.default`. 

214 scopes (Sequence[str]): A optional list of scopes needed for this 

215 service. These are only used when credentials are not specified and 

216 are passed to :func:`google.auth.default`. 

217 ssl_credentials (grpc.ChannelCredentials): Optional SSL channel 

218 credentials. This can be used to specify different certificates. 

219 quota_project_id (str): An optional project to use for billing and quota. 

220 default_host (str): The default endpoint. e.g., "pubsub.googleapis.com". 

221 

222 Returns: 

223 grpc.ChannelCredentials: The composed channel credentials object. 

224 

225 Raises: 

226 google.api_core.DuplicateCredentialArgs: If both a credentials object and credentials_file are passed. 

227 """ 

228 if credentials and credentials_file: 

229 raise exceptions.DuplicateCredentialArgs( 

230 "'credentials' and 'credentials_file' are mutually exclusive." 

231 ) 

232 

233 if credentials_file: 

234 credentials, _ = google.auth.load_credentials_from_file( 

235 credentials_file, scopes=scopes, default_scopes=default_scopes 

236 ) 

237 elif credentials: 

238 credentials = google.auth.credentials.with_scopes_if_required( 

239 credentials, scopes=scopes, default_scopes=default_scopes 

240 ) 

241 else: 

242 credentials, _ = google.auth.default( 

243 scopes=scopes, default_scopes=default_scopes 

244 ) 

245 

246 if quota_project_id and isinstance( 

247 credentials, google.auth.credentials.CredentialsWithQuotaProject 

248 ): 

249 credentials = credentials.with_quota_project(quota_project_id) 

250 

251 request = google.auth.transport.requests.Request() 

252 

253 # Create the metadata plugin for inserting the authorization header. 

254 metadata_plugin = google.auth.transport.grpc.AuthMetadataPlugin( 

255 credentials, 

256 request, 

257 default_host=default_host, 

258 ) 

259 

260 # Create a set of grpc.CallCredentials using the metadata plugin. 

261 google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin) 

262 

263 if ssl_credentials is None: 

264 ssl_credentials = grpc.ssl_channel_credentials() 

265 

266 # Combine the ssl credentials and the authorization credentials. 

267 return grpc.composite_channel_credentials(ssl_credentials, google_auth_credentials) 

268 

269 

270def create_channel( 

271 target, 

272 credentials=None, 

273 scopes=None, 

274 ssl_credentials=None, 

275 credentials_file=None, 

276 quota_project_id=None, 

277 default_scopes=None, 

278 default_host=None, 

279 **kwargs 

280): 

281 """Create a secure channel with credentials. 

282 

283 Args: 

284 target (str): The target service address in the format 'hostname:port'. 

285 credentials (google.auth.credentials.Credentials): The credentials. If 

286 not specified, then this function will attempt to ascertain the 

287 credentials from the environment using :func:`google.auth.default`. 

288 scopes (Sequence[str]): A optional list of scopes needed for this 

289 service. These are only used when credentials are not specified and 

290 are passed to :func:`google.auth.default`. 

291 ssl_credentials (grpc.ChannelCredentials): Optional SSL channel 

292 credentials. This can be used to specify different certificates. 

293 credentials_file (str): A file with credentials that can be loaded with 

294 :func:`google.auth.load_credentials_from_file`. This argument is 

295 mutually exclusive with credentials. 

296 quota_project_id (str): An optional project to use for billing and quota. 

297 default_scopes (Sequence[str]): Default scopes passed by a Google client 

298 library. Use 'scopes' for user-defined scopes. 

299 default_host (str): The default endpoint. e.g., "pubsub.googleapis.com". 

300 kwargs: Additional key-word args passed to 

301 :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`. 

302 Note: `grpc_gcp` is only supported in environments with protobuf < 4.0.0. 

303 

304 Returns: 

305 grpc.Channel: The created channel. 

306 

307 Raises: 

308 google.api_core.DuplicateCredentialArgs: If both a credentials object and credentials_file are passed. 

309 """ 

310 

311 composite_credentials = _create_composite_credentials( 

312 credentials=credentials, 

313 credentials_file=credentials_file, 

314 default_scopes=default_scopes, 

315 scopes=scopes, 

316 ssl_credentials=ssl_credentials, 

317 quota_project_id=quota_project_id, 

318 default_host=default_host, 

319 ) 

320 

321 if HAS_GRPC_GCP: # pragma: NO COVER 

322 return grpc_gcp.secure_channel(target, composite_credentials, **kwargs) 

323 return grpc.secure_channel(target, composite_credentials, **kwargs) 

324 

325 

326_MethodCall = collections.namedtuple( 

327 "_MethodCall", ("request", "timeout", "metadata", "credentials") 

328) 

329 

330_ChannelRequest = collections.namedtuple("_ChannelRequest", ("method", "request")) 

331 

332 

333class _CallableStub(object): 

334 """Stub for the grpc.*MultiCallable interfaces.""" 

335 

336 def __init__(self, method, channel): 

337 self._method = method 

338 self._channel = channel 

339 self.response = None 

340 """Union[protobuf.Message, Callable[protobuf.Message], exception]: 

341 The response to give when invoking this callable. If this is a 

342 callable, it will be invoked with the request protobuf. If it's an 

343 exception, the exception will be raised when this is invoked. 

344 """ 

345 self.responses = None 

346 """Iterator[ 

347 Union[protobuf.Message, Callable[protobuf.Message], exception]]: 

348 An iterator of responses. If specified, self.response will be populated 

349 on each invocation by calling ``next(self.responses)``.""" 

350 self.requests = [] 

351 """List[protobuf.Message]: All requests sent to this callable.""" 

352 self.calls = [] 

353 """List[Tuple]: All invocations of this callable. Each tuple is the 

354 request, timeout, metadata, and credentials.""" 

355 

356 def __call__(self, request, timeout=None, metadata=None, credentials=None): 

357 self._channel.requests.append(_ChannelRequest(self._method, request)) 

358 self.calls.append(_MethodCall(request, timeout, metadata, credentials)) 

359 self.requests.append(request) 

360 

361 response = self.response 

362 if self.responses is not None: 

363 if response is None: 

364 response = next(self.responses) 

365 else: 

366 raise ValueError( 

367 "{method}.response and {method}.responses are mutually " 

368 "exclusive.".format(method=self._method) 

369 ) 

370 

371 if callable(response): 

372 return response(request) 

373 

374 if isinstance(response, Exception): 

375 raise response 

376 

377 if response is not None: 

378 return response 

379 

380 raise ValueError('Method stub for "{}" has no response.'.format(self._method)) 

381 

382 

383def _simplify_method_name(method): 

384 """Simplifies a gRPC method name. 

385 

386 When gRPC invokes the channel to create a callable, it gives a full 

387 method name like "/google.pubsub.v1.Publisher/CreateTopic". This 

388 returns just the name of the method, in this case "CreateTopic". 

389 

390 Args: 

391 method (str): The name of the method. 

392 

393 Returns: 

394 str: The simplified name of the method. 

395 """ 

396 return method.rsplit("/", 1).pop() 

397 

398 

399class ChannelStub(grpc.Channel): 

400 """A testing stub for the grpc.Channel interface. 

401 

402 This can be used to test any client that eventually uses a gRPC channel 

403 to communicate. By passing in a channel stub, you can configure which 

404 responses are returned and track which requests are made. 

405 

406 For example: 

407 

408 .. code-block:: python 

409 

410 channel_stub = grpc_helpers.ChannelStub() 

411 client = FooClient(channel=channel_stub) 

412 

413 channel_stub.GetFoo.response = foo_pb2.Foo(name='bar') 

414 

415 foo = client.get_foo(labels=['baz']) 

416 

417 assert foo.name == 'bar' 

418 assert channel_stub.GetFoo.requests[0].labels = ['baz'] 

419 

420 Each method on the stub can be accessed and configured on the channel. 

421 Here's some examples of various configurations: 

422 

423 .. code-block:: python 

424 

425 # Return a basic response: 

426 

427 channel_stub.GetFoo.response = foo_pb2.Foo(name='bar') 

428 assert client.get_foo().name == 'bar' 

429 

430 # Raise an exception: 

431 channel_stub.GetFoo.response = NotFound('...') 

432 

433 with pytest.raises(NotFound): 

434 client.get_foo() 

435 

436 # Use a sequence of responses: 

437 channel_stub.GetFoo.responses = iter([ 

438 foo_pb2.Foo(name='bar'), 

439 foo_pb2.Foo(name='baz'), 

440 ]) 

441 

442 assert client.get_foo().name == 'bar' 

443 assert client.get_foo().name == 'baz' 

444 

445 # Use a callable 

446 

447 def on_get_foo(request): 

448 return foo_pb2.Foo(name='bar' + request.id) 

449 

450 channel_stub.GetFoo.response = on_get_foo 

451 

452 assert client.get_foo(id='123').name == 'bar123' 

453 """ 

454 

455 def __init__(self, responses=[]): 

456 self.requests = [] 

457 """Sequence[Tuple[str, protobuf.Message]]: A list of all requests made 

458 on this channel in order. The tuple is of method name, request 

459 message.""" 

460 self._method_stubs = {} 

461 

462 def _stub_for_method(self, method): 

463 method = _simplify_method_name(method) 

464 self._method_stubs[method] = _CallableStub(method, self) 

465 return self._method_stubs[method] 

466 

467 def __getattr__(self, key): 

468 try: 

469 return self._method_stubs[key] 

470 except KeyError: 

471 raise AttributeError 

472 

473 def unary_unary(self, method, request_serializer=None, response_deserializer=None): 

474 """grpc.Channel.unary_unary implementation.""" 

475 return self._stub_for_method(method) 

476 

477 def unary_stream(self, method, request_serializer=None, response_deserializer=None): 

478 """grpc.Channel.unary_stream implementation.""" 

479 return self._stub_for_method(method) 

480 

481 def stream_unary(self, method, request_serializer=None, response_deserializer=None): 

482 """grpc.Channel.stream_unary implementation.""" 

483 return self._stub_for_method(method) 

484 

485 def stream_stream( 

486 self, method, request_serializer=None, response_deserializer=None 

487 ): 

488 """grpc.Channel.stream_stream implementation.""" 

489 return self._stub_for_method(method) 

490 

491 def subscribe(self, callback, try_to_connect=False): 

492 """grpc.Channel.subscribe implementation.""" 

493 pass 

494 

495 def unsubscribe(self, callback): 

496 """grpc.Channel.unsubscribe implementation.""" 

497 pass 

498 

499 def close(self): 

500 """grpc.Channel.close implementation.""" 

501 pass