Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/iam_credentials_v1/services/iam_credentials/transports/grpc.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

82 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import json 

17import logging as std_logging 

18import pickle 

19import warnings 

20from typing import Callable, Dict, Optional, Sequence, Tuple, Union 

21 

22import google.auth # type: ignore 

23import google.protobuf.message 

24import grpc # type: ignore 

25import proto # type: ignore 

26from google.api_core import gapic_v1, grpc_helpers 

27from google.auth import credentials as ga_credentials # type: ignore 

28from google.auth.transport.grpc import SslCredentials # type: ignore 

29from google.protobuf.json_format import MessageToJson 

30 

31from google.cloud.iam_credentials_v1.types import common 

32 

33from .base import DEFAULT_CLIENT_INFO, IAMCredentialsTransport 

34 

# Probe for the optional ``client_logging`` helper in google-api-core.
# The flag below records whether the import succeeded; the logging
# interceptor checks it before emitting any debug records.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger, named after this module.
_LOGGER = std_logging.getLogger(__name__)

43 

44 

class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    """Client interceptor that writes DEBUG log records for unary-unary RPCs.

    Records are emitted only when ``CLIENT_LOGGING_SUPPORTED`` is true and
    the module logger is enabled for ``DEBUG``. Otherwise the call is handed
    straight to ``continuation`` and its result returned untouched.
    """

    _SERVICE_NAME = "google.iam.credentials.v1.IAMCredentials"

    @staticmethod
    def _payload_repr(message):
        """Render a request or response object as a loggable string.

        ``proto.Message`` and raw protobuf messages are rendered as JSON;
        anything else falls back to ``"<type name>: <pickled bytes repr>"``.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        return f"{type(message).__name__}: {pickle.dumps(message)!r}"

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Invoke the RPC, logging the request and the response at DEBUG level.

        Args:
            continuation: Callable that proceeds with the RPC; called as
                ``continuation(client_call_details, request)``.
            client_call_details: Call details; ``method`` and ``metadata``
                are read for logging.
            request: The request message being sent.

        Returns:
            Whatever ``continuation`` returned, unmodified.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_payload = self._payload_repr(request)
            # Call metadata is optional in gRPC and may be ``None``; treat
            # that as "no metadata" so that logging never breaks the call.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": self._SERVICE_NAME,
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = response.trailing_metadata()
            # Flatten the gRPC metadata object into a plain dict of strings.
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            response_payload = self._payload_repr(response.result())
            grpc_response = {
                "payload": response_payload,
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response for {client_call_details.method}.",
                extra={
                    "serviceName": self._SERVICE_NAME,
                    # str() keeps this consistent with the request record.
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response

108 

109 

class IAMCredentialsGrpcTransport(IAMCredentialsTransport):
    """Transport that talks to the IAMCredentials service over gRPC.

    A service account is a special kind of Google account owned by an
    application or a virtual machine (VM) rather than by an individual
    end user. The application takes on the service account's identity
    when calling Google APIs, so users are not directly involved.

    Service account credentials let a caller temporarily assume that
    identity. Supported credential types include OAuth 2.0 access
    tokens, OpenID Connect ID tokens, self-signed JSON Web Tokens
    (JWTs), and more.

    The methods exposed here mirror those of the primary client, which
    loads this transport implementation and delegates to it.

    Protocol buffers are sent over the wire with gRPC (built on top of
    HTTP/2), so the ``grpcio`` package must be installed.
    """

    _stubs: Dict[str, Callable]

    def __init__(
        self,
        *,
        host: str = "iamcredentials.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Set up the transport and its underlying gRPC channel.

        Args:
            host (Optional[str]):
                Hostname to connect to
                (default: 'iamcredentials.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]):
                Authorization credentials attached to requests; they
                identify the application to the service. When omitted,
                the client tries to ascertain credentials from the
                environment. Ignored when a ``channel`` instance is given.
            credentials_file (Optional[str]): Deprecated. Path to a
                credentials file loadable with
                :func:`google.auth.load_credentials_from_file`. Ignored
                when a ``channel`` instance is given. Will be removed in
                the next major version of this library.
            scopes (Optional(Sequence[str])): A list of scopes. Ignored
                when a ``channel`` instance is given.
            channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
                Either a ``Channel`` instance to send calls through, or a
                Callable that builds and returns one. With ``None``,
                ``self.create_channel`` builds the channel. A Callable is
                invoked with the same arguments as ``self.create_channel``.
            api_mtls_endpoint (Optional[str]): Deprecated. Mutual TLS
                endpoint. When given it overrides ``host`` and a mutual
                TLS channel is attempted using client SSL credentials
                from ``client_cert_source`` or the application default
                SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. Callback returning the client SSL certificate
                bytes and private key bytes, both PEM encoded. Ignored
                when ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL
                credentials for the gRPC channel. Ignored when a
                ``channel`` instance is given.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Callback returning client certificate bytes and private
                key bytes, both PEM encoded, used to configure a mutual
                TLS channel. Ignored when a ``channel`` instance or
                ``ssl_channel_credentials`` is given.
            quota_project_id (Optional[str]): Optional project to use for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Client info used to send a user-agent string with API
                requests. If ``None``, default info is used. Normally only
                needed when developing your own client library.
            always_use_jwt_access (Optional[bool]): Whether a self-signed
                JWT should be used for service account credentials.
            api_audience (Optional[str]): Intended audience for the API
                calls, set when using certain 3rd party authentication
                flows. Typically a resource identifier. Defaults to the
                host value when not set.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS
                transport creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        self._stubs: Dict[str, Callable] = {}
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, grpc.Channel):
            # A ready-made channel wins: adopt it and drop any credentials
            # and SSL settings that were passed alongside it.
            credentials = None
            self._ignore_credentials = True
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        elif api_mtls_endpoint:
            # Deprecated mTLS path: the endpoint replaces the host, and SSL
            # credentials come from the callback or the application default.
            host = api_mtls_endpoint
            if client_cert_source:
                cert_pem, key_pem = client_cert_source()
                self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                    certificate_chain=cert_pem, private_key=key_pem
                )
            else:
                self._ssl_channel_credentials = SslCredentials().ssl_credentials
        elif client_cert_source_for_mtls and not ssl_channel_credentials:
            # Only consult the mTLS callback when explicit SSL credentials
            # were not supplied.
            cert_pem, key_pem = client_cert_source_for_mtls()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert_pem, private_key=key_pem
            )

        # Host, credentials and scopes are resolved by the base transport.
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # Build the channel with the caller's factory when one was
            # given, otherwise with this class's ``create_channel``.
            channel_factory = channel if channel else type(self).create_channel
            channel_options = [
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
            ]
            self._grpc_channel = channel_factory(
                self._host,
                # Reuse the credentials the base transport saved.
                credentials=self._credentials,
                # The saved credentials are authoritative, so no
                # credentials file is forwarded here.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=channel_options,
            )

        self._interceptor = _LoggingClientInterceptor()
        self._logged_channel = grpc.intercept_channel(
            self._grpc_channel, self._interceptor
        )

        # Message wrapping has to happen once self._logged_channel exists.
        self._prep_wrapped_messages(client_info)

    @classmethod
    def create_channel(
        cls,
        host: str = "iamcredentials.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Build a gRPC channel for this service.

        Args:
            host (Optional[str]): Host the channel should use.
            credentials (Optional[~.Credentials]): Authorization
                credentials attached to requests; they identify this
                application to the service. When omitted, the client
                tries to ascertain credentials from the environment.
            credentials_file (Optional[str]): Deprecated. Path to a
                credentials file loadable with
                :func:`google.auth.load_credentials_from_file`. Mutually
                exclusive with ``credentials``. Will be removed in the
                next major version of this library.
            scopes (Optional[Sequence[str]]): Optional list of scopes
                needed for this service. Only used when credentials are
                not specified, in which case they are passed to
                :func:`google.auth.default`.
            quota_project_id (Optional[str]): Optional project to use for
                billing and quota.
            kwargs (Optional[dict]): Extra keyword arguments forwarded to
                the channel creation.

        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        return grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            default_scopes=cls.AUTH_SCOPES,
            default_host=cls.DEFAULT_HOST,
            quota_project_id=quota_project_id,
            **kwargs,
        )

    @property
    def grpc_channel(self) -> grpc.Channel:
        """The channel this transport uses to reach the service."""
        return self._grpc_channel

    def _lazy_unary_stub(self, key, rpc_path, request_type, response_type):
        """Return the cached unary-unary stub for ``key``, creating it on first use.

        gRPC performs serialization and deserialization itself; the stub
        only needs to be told which functions to use for each direction.
        """
        try:
            return self._stubs[key]
        except KeyError:
            stub = self._logged_channel.unary_unary(
                rpc_path,
                request_serializer=request_type.serialize,
                response_deserializer=response_type.deserialize,
            )
            self._stubs[key] = stub
            return stub

    @property
    def generate_access_token(
        self,
    ) -> Callable[
        [common.GenerateAccessTokenRequest], common.GenerateAccessTokenResponse
    ]:
        r"""Callable for the generate access token method over gRPC.

        Generates an OAuth 2.0 access token for a service
        account.

        Returns:
            Callable[[~.GenerateAccessTokenRequest],
                    ~.GenerateAccessTokenResponse]:
                A function that, when called, invokes the underlying
                RPC on the server.
        """
        return self._lazy_unary_stub(
            "generate_access_token",
            "/google.iam.credentials.v1.IAMCredentials/GenerateAccessToken",
            common.GenerateAccessTokenRequest,
            common.GenerateAccessTokenResponse,
        )

    @property
    def generate_id_token(
        self,
    ) -> Callable[[common.GenerateIdTokenRequest], common.GenerateIdTokenResponse]:
        r"""Callable for the generate id token method over gRPC.

        Generates an OpenID Connect ID token for a service
        account.

        Returns:
            Callable[[~.GenerateIdTokenRequest],
                    ~.GenerateIdTokenResponse]:
                A function that, when called, invokes the underlying
                RPC on the server.
        """
        return self._lazy_unary_stub(
            "generate_id_token",
            "/google.iam.credentials.v1.IAMCredentials/GenerateIdToken",
            common.GenerateIdTokenRequest,
            common.GenerateIdTokenResponse,
        )

    @property
    def sign_blob(self) -> Callable[[common.SignBlobRequest], common.SignBlobResponse]:
        r"""Callable for the sign blob method over gRPC.

        Signs a blob using a service account's system-managed
        private key.

        Returns:
            Callable[[~.SignBlobRequest],
                    ~.SignBlobResponse]:
                A function that, when called, invokes the underlying
                RPC on the server.
        """
        return self._lazy_unary_stub(
            "sign_blob",
            "/google.iam.credentials.v1.IAMCredentials/SignBlob",
            common.SignBlobRequest,
            common.SignBlobResponse,
        )

    @property
    def sign_jwt(self) -> Callable[[common.SignJwtRequest], common.SignJwtResponse]:
        r"""Callable for the sign jwt method over gRPC.

        Signs a JWT using a service account's system-managed
        private key.

        Returns:
            Callable[[~.SignJwtRequest],
                    ~.SignJwtResponse]:
                A function that, when called, invokes the underlying
                RPC on the server.
        """
        return self._lazy_unary_stub(
            "sign_jwt",
            "/google.iam.credentials.v1.IAMCredentials/SignJwt",
            common.SignJwtRequest,
            common.SignJwtResponse,
        )

    def close(self):
        """Close the intercepted (logging) channel."""
        self._logged_channel.close()

    @property
    def kind(self) -> str:
        """Short identifier for this transport flavour."""
        return "grpc"


__all__ = ("IAMCredentialsGrpcTransport",)