Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py: 34%

158 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for :mod:`grpc`.""" 

16from typing import Generic, TypeVar, Iterator 

17 

18import collections 

19import functools 

20import logging 

21import warnings 

22 

23import grpc 

24 

25from google.api_core import exceptions 

26import google.auth 

27import google.auth.credentials 

28import google.auth.transport.grpc 

29import google.auth.transport.requests 

30import google.protobuf 

31 

PROTOBUF_VERSION = google.protobuf.__version__

# grpcio-gcp only supports protobuf < 4, so it is only probed for on 3.x.
# HAS_GRPC_GCP ends up True only when the optional import succeeds.
HAS_GRPC_GCP = False
if PROTOBUF_VERSION.startswith("3."):  # pragma: NO COVER
    try:
        import grpc_gcp
    except ImportError:
        # grpcio-gcp is optional; stay on the plain grpc code path.
        pass
    else:
        warnings.warn(
            """Support for grpcio-gcp is deprecated. This feature will be
            removed from `google-api-core` after January 1, 2024. If you need to
            continue to use this feature, please pin to a specific version of
            `google-api-core`.""",
            DeprecationWarning,
        )
        HAS_GRPC_GCP = True

51 

52 

# The gRPC multi-callable interfaces whose invocation returns an iterator.
# wrap_errors() checks callables against this tuple to decide whether the
# streaming or the unary error wrapper applies.
_STREAM_WRAP_CLASSES = (grpc.UnaryStreamMultiCallable, grpc.StreamStreamMultiCallable)

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Type variable denoting the proto response type for gRPC calls.
P = TypeVar("P")

60 

61 

62def _patch_callable_name(callable_): 

63 """Fix-up gRPC callable attributes. 

64 

65 gRPC callable lack the ``__name__`` attribute which causes 

66 :func:`functools.wraps` to error. This adds the attribute if needed. 

67 """ 

68 if not hasattr(callable_, "__name__"): 

69 callable_.__name__ = callable_.__class__.__name__ 

70 

71 

def _wrap_unary_errors(callable_):
    """Map errors for Unary-Unary and Stream-Unary gRPC callables.

    Args:
        callable_ (Callable): The gRPC callable to wrap.

    Returns:
        Callable: A wrapper that forwards all arguments to ``callable_`` and
        re-raises any :class:`grpc.RpcError` as the exception produced by
        ``exceptions.from_grpc_error``, chained to the original error.
    """
    _patch_callable_name(callable_)

    @functools.wraps(callable_)
    def error_remapped_callable(*args, **kwargs):
        try:
            response = callable_(*args, **kwargs)
        except grpc.RpcError as rpc_error:
            raise exceptions.from_grpc_error(rpc_error) from rpc_error
        return response

    return error_remapped_callable

84 

85 

class _StreamingResponseIterator(Generic[P], grpc.Call):
    """Iterator over a wrapped gRPC response stream with error remapping.

    Iterating re-raises :class:`grpc.RpcError` from the wrapped stream as the
    exception produced by ``exceptions.from_grpc_error``. The ``grpc.Call`` /
    ``grpc.RpcContext`` methods are delegated to the wrapped object.
    """

    def __init__(self, wrapped, prefetch_first_result=True):
        """Wrap ``wrapped`` and optionally pull its first element right away.

        Args:
            wrapped: The object returned by the gRPC streaming callable.
            prefetch_first_result (bool): When true, the first response is
                fetched during construction and stored for the first
                ``__next__`` call.
        """
        self._wrapped = wrapped

        if not prefetch_first_result:
            return

        # This iterator is used in a retry context, and returned outside after
        # init. gRPC will not throw an exception until the stream is consumed,
        # so the first result is retrieved here so that a failure surfaces
        # early enough to trigger a retry.
        try:
            self._stored_first_result = next(self._wrapped)
        except (TypeError, StopIteration):
            # TypeError: the wrapped object may not be an iterator (a
            # grpc.Call for instance), so there is nothing to store.
            # StopIteration: an empty stream is ignored at this time and is
            # handled outside of retry.
            pass

    def __iter__(self) -> Iterator[P]:
        """Return this object; it is both the iterable and the iterator."""
        return self

    def __next__(self) -> P:
        """Get the next response from the stream.

        Returns:
            protobuf.Message: A single response from the stream.
        """
        # Hand out the prefetched element first, exactly once.
        if hasattr(self, "_stored_first_result"):
            first_result = self._stored_first_result
            del self._stored_first_result
            return first_result

        try:
            return next(self._wrapped)
        except grpc.RpcError as rpc_error:
            # If the stream has already returned data, we cannot recover here.
            raise exceptions.from_grpc_error(rpc_error) from rpc_error

    # grpc.Call & grpc.RpcContext interface: every method below simply
    # forwards to the wrapped object.

    def add_callback(self, callback):
        return self._wrapped.add_callback(callback)

    def cancel(self):
        return self._wrapped.cancel()

    def code(self):
        return self._wrapped.code()

    def details(self):
        return self._wrapped.details()

    def initial_metadata(self):
        return self._wrapped.initial_metadata()

    def is_active(self):
        return self._wrapped.is_active()

    def time_remaining(self):
        return self._wrapped.time_remaining()

    def trailing_metadata(self):
        return self._wrapped.trailing_metadata()

149 

150 

# Public type alias denoting the return type of streaming gapic calls; it is
# _StreamingResponseIterator parameterized by the response type variable P.
GrpcStream = _StreamingResponseIterator[P]

153 

154 

def _wrap_stream_errors(callable_):
    """Wrap errors for Unary-Stream and Stream-Stream gRPC callables.

    Callables that return iterators need error mapping in two places: the
    initial invocation, and every later step of iterating the result. The
    returned wrapper handles the former and hands the result to
    ``_StreamingResponseIterator`` for the latter.

    Args:
        callable_ (Callable): The gRPC streaming callable to wrap.

    Returns:
        Callable: A wrapper returning a ``_StreamingResponseIterator``.
    """
    _patch_callable_name(callable_)

    @functools.wraps(callable_)
    def error_remapped_callable(*args, **kwargs):
        try:
            stream = callable_(*args, **kwargs)
            # Auto-fetching the first result causes PubSub client's streaming
            # pull to hang when re-opening the stream, so the hidden
            # ``_prefetch_first_result_`` flag on the callable is consulted to
            # see whether pre-fetching is disabled (it defaults to enabled).
            # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
            return _StreamingResponseIterator(
                stream,
                prefetch_first_result=getattr(
                    callable_, "_prefetch_first_result_", True
                ),
            )
        except grpc.RpcError as rpc_error:
            raise exceptions.from_grpc_error(rpc_error) from rpc_error

    return error_remapped_callable

180 

181 

def wrap_errors(callable_):
    """Wrap a gRPC callable and map :class:`grpc.RpcErrors` to friendly error
    classes.

    Errors raised by the gRPC callable are mapped to the appropriate
    :class:`google.api_core.exceptions.GoogleAPICallError` subclasses.
    The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
    available from the ``response`` property on the mapped exception. This
    is useful for extracting metadata from the original error.

    Args:
        callable_ (Callable): A gRPC callable.

    Returns:
        Callable: The wrapped gRPC callable.
    """
    # Streaming callables need the iterator-aware wrapper; everything else
    # gets the plain unary wrapper.
    wrapper = (
        _wrap_stream_errors
        if isinstance(callable_, _STREAM_WRAP_CLASSES)
        else _wrap_unary_errors
    )
    return wrapper(callable_)

202 

203 

def _create_composite_credentials(
    credentials=None,
    credentials_file=None,
    default_scopes=None,
    scopes=None,
    ssl_credentials=None,
    quota_project_id=None,
    default_host=None,
):
    """Create the composite credentials for secure channels.

    Args:
        credentials (google.auth.credentials.Credentials): The credentials. If
            neither this nor ``credentials_file`` is given, the credentials
            are obtained from the environment using
            :func:`google.auth.default`.
        credentials_file (str): A file with credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`. This argument is
            mutually exclusive with credentials.
        default_scopes (Sequence[str]): An optional list of default scopes,
            forwarded together with ``scopes`` to whichever google.auth call
            supplies or scopes the credentials.
        scopes (Sequence[str]): An optional list of scopes, forwarded together
            with ``default_scopes`` to whichever google.auth call supplies or
            scopes the credentials.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
            When ``None``, :func:`grpc.ssl_channel_credentials` is used.
        quota_project_id (str): An optional project to use for billing and
            quota. Applied only when the credentials are an instance of
            ``CredentialsWithQuotaProject``.
        default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".

    Returns:
        grpc.ChannelCredentials: The composed channel credentials object.

    Raises:
        google.api_core.DuplicateCredentialArgs: If both a credentials object and credentials_file are passed.
    """
    if credentials and credentials_file:
        raise exceptions.DuplicateCredentialArgs(
            "'credentials' and 'credentials_file' are mutually exclusive."
        )

    # Both scope lists travel together to every credential source below.
    scope_kwargs = {"scopes": scopes, "default_scopes": default_scopes}

    if credentials_file:
        credentials, _ = google.auth.load_credentials_from_file(
            credentials_file, **scope_kwargs
        )
    elif credentials:
        credentials = google.auth.credentials.with_scopes_if_required(
            credentials, **scope_kwargs
        )
    else:
        credentials, _ = google.auth.default(**scope_kwargs)

    if quota_project_id:
        if isinstance(credentials, google.auth.credentials.CredentialsWithQuotaProject):
            credentials = credentials.with_quota_project(quota_project_id)

    # Create the metadata plugin for inserting the authorization header.
    metadata_plugin = google.auth.transport.grpc.AuthMetadataPlugin(
        credentials,
        google.auth.transport.requests.Request(),
        default_host=default_host,
    )

    # Create a set of grpc.CallCredentials using the metadata plugin.
    call_credentials = grpc.metadata_call_credentials(metadata_plugin)

    channel_credentials = ssl_credentials
    if channel_credentials is None:
        channel_credentials = grpc.ssl_channel_credentials()

    # Combine the ssl credentials and the authorization credentials.
    return grpc.composite_channel_credentials(channel_credentials, call_credentials)

279 

280 

def create_channel(
    target,
    credentials=None,
    scopes=None,
    ssl_credentials=None,
    credentials_file=None,
    quota_project_id=None,
    default_scopes=None,
    default_host=None,
    compression=None,
    **kwargs,
):
    """Create a secure channel with credentials.

    Args:
        target (str): The target service address in the format 'hostname:port'.
        credentials (google.auth.credentials.Credentials): The credentials. If
            not specified, then this function will attempt to ascertain the
            credentials from the environment using :func:`google.auth.default`.
        scopes (Sequence[str]): An optional list of scopes needed for this
            service; forwarded to the credential construction.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
        credentials_file (str): A file with credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`. This argument is
            mutually exclusive with credentials.
        quota_project_id (str): An optional project to use for billing and quota.
        default_scopes (Sequence[str]): Default scopes passed by a Google client
            library. Use 'scopes' for user-defined scopes.
        default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".
        compression (grpc.Compression): An optional value indicating the
            compression method to be used over the lifetime of the channel.
            Ignored (with a debug log) when the channel is created through
            ``grpc_gcp``.
        kwargs: Additional key-word args passed to
            :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
            Note: `grpc_gcp` is only supported in environments with protobuf < 4.0.0.

    Returns:
        grpc.Channel: The created channel.

    Raises:
        google.api_core.DuplicateCredentialArgs: If both a credentials object and credentials_file are passed.
    """
    channel_credentials = _create_composite_credentials(
        credentials=credentials,
        credentials_file=credentials_file,
        default_scopes=default_scopes,
        scopes=scopes,
        ssl_credentials=ssl_credentials,
        quota_project_id=quota_project_id,
        default_host=default_host,
    )

    if HAS_GRPC_GCP:  # pragma: NO COVER
        # grpc_gcp.secure_channel is not given the compression argument, so
        # log when a real compression setting is being dropped.
        compression_dropped = (
            compression is not None and compression != grpc.Compression.NoCompression
        )
        if compression_dropped:
            _LOGGER.debug(
                "Compression argument is being ignored for grpc_gcp.secure_channel creation."
            )
        return grpc_gcp.secure_channel(target, channel_credentials, **kwargs)

    return grpc.secure_channel(
        target, channel_credentials, compression=compression, **kwargs
    )

344 

345 

# Record of a single invocation of a _CallableStub: the request plus the
# per-call options, in the same order as _CallableStub.__call__'s parameters.
_MethodCall = collections.namedtuple(
    "_MethodCall", ("request", "timeout", "metadata", "credentials", "compression")
)

# Record of a request seen by a ChannelStub: the simplified method name and
# the request message.
_ChannelRequest = collections.namedtuple("_ChannelRequest", ("method", "request"))

351 

352 

class _CallableStub(object):
    """Stub for the grpc.*MultiCallable interfaces.

    Records every invocation and answers it from ``response`` or
    ``responses``, which the test author configures on the instance.
    """

    def __init__(self, method, channel):
        self._method = method
        self._channel = channel
        self.response = None
        """Union[protobuf.Message, Callable[protobuf.Message], exception]:
        The response to give when invoking this callable. If this is a
        callable, it will be invoked with the request protobuf. If it's an
        exception, the exception will be raised when this is invoked.
        """
        self.responses = None
        """Iterator[
            Union[protobuf.Message, Callable[protobuf.Message], exception]]:
        An iterator of responses. If specified, each invocation takes its
        response from ``next(self.responses)``; setting both this and
        ``self.response`` is an error."""
        self.requests = []
        """List[protobuf.Message]: All requests sent to this callable."""
        self.calls = []
        """List[Tuple]: All invocations of this callable. Each tuple is the
        request, timeout, metadata, credentials, and compression."""

    def __call__(
        self, request, timeout=None, metadata=None, credentials=None, compression=None
    ):
        # Record the invocation on the owning channel and on this stub.
        self._channel.requests.append(_ChannelRequest(self._method, request))
        self.calls.append(
            _MethodCall(request, timeout, metadata, credentials, compression)
        )
        self.requests.append(request)

        reply = self.response
        if self.responses is not None:
            if reply is not None:
                raise ValueError(
                    "{method}.response and {method}.responses are mutually "
                    "exclusive.".format(method=self._method)
                )
            reply = next(self.responses)

        # A callable computes the reply from the request.
        if callable(reply):
            return reply(request)

        # An exception instance is raised instead of returned.
        if isinstance(reply, Exception):
            raise reply

        if reply is None:
            raise ValueError(
                'Method stub for "{}" has no response.'.format(self._method)
            )

        return reply

405 

406 

407def _simplify_method_name(method): 

408 """Simplifies a gRPC method name. 

409 

410 When gRPC invokes the channel to create a callable, it gives a full 

411 method name like "/google.pubsub.v1.Publisher/CreateTopic". This 

412 returns just the name of the method, in this case "CreateTopic". 

413 

414 Args: 

415 method (str): The name of the method. 

416 

417 Returns: 

418 str: The simplified name of the method. 

419 """ 

420 return method.rsplit("/", 1).pop() 

421 

422 

class ChannelStub(grpc.Channel):
    """A testing stub for the grpc.Channel interface.

    This can be used to test any client that eventually uses a gRPC channel
    to communicate. By passing in a channel stub, you can configure which
    responses are returned and track which requests are made.

    For example:

    .. code-block:: python

        channel_stub = grpc_helpers.ChannelStub()
        client = FooClient(channel=channel_stub)

        channel_stub.GetFoo.response = foo_pb2.Foo(name='bar')

        foo = client.get_foo(labels=['baz'])

        assert foo.name == 'bar'
        assert channel_stub.GetFoo.requests[0].labels == ['baz']

    Each method on the stub can be accessed and configured on the channel.
    Here's some examples of various configurations:

    .. code-block:: python

        # Return a basic response:

        channel_stub.GetFoo.response = foo_pb2.Foo(name='bar')
        assert client.get_foo().name == 'bar'

        # Raise an exception:
        channel_stub.GetFoo.response = NotFound('...')

        with pytest.raises(NotFound):
            client.get_foo()

        # Use a sequence of responses:
        channel_stub.GetFoo.responses = iter([
            foo_pb2.Foo(name='bar'),
            foo_pb2.Foo(name='baz'),
        ])

        assert client.get_foo().name == 'bar'
        assert client.get_foo().name == 'baz'

        # Use a callable

        def on_get_foo(request):
            return foo_pb2.Foo(name='bar' + request.id)

        channel_stub.GetFoo.response = on_get_foo

        assert client.get_foo(id='123').name == 'bar123'
    """

    def __init__(self, responses=None):
        """Create an empty channel stub.

        Args:
            responses: Accepted for backward compatibility; this constructor
                does not read it. The default is ``None`` rather than a
                shared mutable list.
        """
        self.requests = []
        """Sequence[Tuple[str, protobuf.Message]]: A list of all requests made
        on this channel in order. The tuple is of method name, request
        message."""
        self._method_stubs = {}

    def _stub_for_method(self, method):
        """Create, register and return a fresh stub for ``method``.

        The stub is stored under the simplified method name and replaces any
        stub previously registered under that name.
        """
        method = _simplify_method_name(method)
        self._method_stubs[method] = _CallableStub(method, self)
        return self._method_stubs[method]

    def __getattr__(self, key):
        """Expose registered method stubs as attributes, e.g. ``stub.GetFoo``.

        Raises:
            AttributeError: If no stub is registered under ``key``.
        """
        # Read the registry through __dict__: going through normal attribute
        # access would re-enter __getattr__ and recurse without bound on an
        # instance whose __init__ has not run (e.g. during copy/unpickle).
        stubs = self.__dict__.get("_method_stubs", {})
        try:
            return stubs[key]
        except KeyError:
            raise AttributeError(
                "{!r} object has no attribute {!r}".format(type(self).__name__, key)
            ) from None

    # The ``_registered_method`` keyword mirrors the grpc.Channel signatures
    # of newer grpcio releases; it is accepted and ignored so that clients
    # passing it can still be exercised against this stub.

    def unary_unary(
        self,
        method,
        request_serializer=None,
        response_deserializer=None,
        _registered_method=False,
    ):
        """grpc.Channel.unary_unary implementation."""
        return self._stub_for_method(method)

    def unary_stream(
        self,
        method,
        request_serializer=None,
        response_deserializer=None,
        _registered_method=False,
    ):
        """grpc.Channel.unary_stream implementation."""
        return self._stub_for_method(method)

    def stream_unary(
        self,
        method,
        request_serializer=None,
        response_deserializer=None,
        _registered_method=False,
    ):
        """grpc.Channel.stream_unary implementation."""
        return self._stub_for_method(method)

    def stream_stream(
        self,
        method,
        request_serializer=None,
        response_deserializer=None,
        _registered_method=False,
    ):
        """grpc.Channel.stream_stream implementation."""
        return self._stub_for_method(method)

    def subscribe(self, callback, try_to_connect=False):
        """grpc.Channel.subscribe implementation (no-op)."""
        pass

    def unsubscribe(self, callback):
        """grpc.Channel.unsubscribe implementation (no-op)."""
        pass

    def close(self):
        """grpc.Channel.close implementation (no-op)."""
        pass