Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/iam_credentials_v1/services/iam_credentials/transports/rest.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

187 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import dataclasses 

17import json # type: ignore 

18import logging 

19from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

20import warnings 

21 

22from google.api_core import exceptions as core_exceptions 

23from google.api_core import gapic_v1, rest_helpers, rest_streaming 

24from google.api_core import retry as retries 

25from google.auth import credentials as ga_credentials # type: ignore 

26from google.auth.transport.requests import AuthorizedSession # type: ignore 

27import google.protobuf 

28from google.protobuf import json_format 

29from requests import __version__ as requests_version 

30 

31from google.cloud.iam_credentials_v1.types import common 

32 

33from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

34from .rest_base import _BaseIAMCredentialsRestTransport 

35 

# Type accepted by the ``retry`` argument of every stub's ``__call__``.
# Falls back to ``object`` when ``gapic_v1.method`` has no ``_MethodDefault``
# attribute.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# ``client_logging`` is an optional import; the flag records whether it was
# importable and gates the DEBUG request/response logging in the stubs.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

_LOGGER = logging.getLogger(__name__)

# Client info for the REST transport: same gapic version as the base
# transport, no gRPC version, and the installed ``requests`` version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
    grpc_version=None,
    rest_version=f"requests@{requests_version}",
)

# Only set the protobuf runtime version when ClientInfo has that attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

58 

59 

class IAMCredentialsRestInterceptor:
    """Hook points around each IAMCredentials REST call.

    Interceptors are used to manipulate requests, request metadata, and
    responses in arbitrary ways. Example use cases include:

    * Logging
    * Verifying requests according to service or custom semantics
    * Stripping extraneous information from responses

    Every hook defined on this base class is a pass-through that returns its
    arguments unchanged. To customize behavior, override the hooks you need in
    a subclass and pass an instance of it as ``interceptor`` when constructing
    the IAMCredentialsRestTransport.

    .. code-block:: python

        class MyCustomIAMCredentialsInterceptor(IAMCredentialsRestInterceptor):
            def pre_generate_access_token(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_generate_access_token(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_generate_id_token(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_generate_id_token(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_sign_blob(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_sign_blob(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_sign_jwt(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_sign_jwt(self, response):
                logging.info(f"Received response: {response}")
                return response

        transport = IAMCredentialsRestTransport(interceptor=MyCustomIAMCredentialsInterceptor())
        client = IAMCredentialsClient(transport=transport)

    """

    # ------------------------------------------------------------------
    # generate_access_token
    # ------------------------------------------------------------------

    def pre_generate_access_token(
        self,
        request: common.GenerateAccessTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for generate_access_token.

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Args:
            request: The request about to be transcoded and sent.
            metadata: Key/value pairs that will be sent with the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; this base
            implementation returns both unchanged.
        """
        return request, metadata

    def post_generate_access_token(
        self, response: common.GenerateAccessTokenResponse
    ) -> common.GenerateAccessTokenResponse:
        """Post-rpc interceptor for generate_access_token.

        DEPRECATED. Please use the `post_generate_access_token_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response after it is
        returned by the IAMCredentials server but before it is returned to
        user code. This `post_generate_access_token` interceptor runs before
        the `post_generate_access_token_with_metadata` interceptor.

        Returns:
            The response to hand on; unchanged in this base implementation.
        """
        return response

    def post_generate_access_token_with_metadata(
        self,
        response: common.GenerateAccessTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for generate_access_token.

        Override in a subclass to read or manipulate the response or metadata
        after it is returned by the IAMCredentials server but before it is
        returned to user code.

        We recommend only using this `post_generate_access_token_with_metadata`
        interceptor in new development instead of the
        `post_generate_access_token` interceptor. When both interceptors are
        used, this one runs after `post_generate_access_token`, and receives
        the (possibly modified) response that interceptor returned.

        Returns:
            The ``(response, metadata)`` pair; unchanged in this base
            implementation.
        """
        return response, metadata

    # ------------------------------------------------------------------
    # generate_id_token
    # ------------------------------------------------------------------

    def pre_generate_id_token(
        self,
        request: common.GenerateIdTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for generate_id_token.

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Returns:
            The ``(request, metadata)`` pair to use for the call; this base
            implementation returns both unchanged.
        """
        return request, metadata

    def post_generate_id_token(
        self, response: common.GenerateIdTokenResponse
    ) -> common.GenerateIdTokenResponse:
        """Post-rpc interceptor for generate_id_token.

        DEPRECATED. Please use the `post_generate_id_token_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response after it is
        returned by the IAMCredentials server but before it is returned to
        user code. This `post_generate_id_token` interceptor runs before the
        `post_generate_id_token_with_metadata` interceptor.

        Returns:
            The response to hand on; unchanged in this base implementation.
        """
        return response

    def post_generate_id_token_with_metadata(
        self,
        response: common.GenerateIdTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for generate_id_token.

        Override in a subclass to read or manipulate the response or metadata
        after it is returned by the IAMCredentials server but before it is
        returned to user code.

        We recommend only using this `post_generate_id_token_with_metadata`
        interceptor in new development instead of the `post_generate_id_token`
        interceptor. When both interceptors are used, this one runs after
        `post_generate_id_token`, and receives the (possibly modified)
        response that interceptor returned.

        Returns:
            The ``(response, metadata)`` pair; unchanged in this base
            implementation.
        """
        return response, metadata

    # ------------------------------------------------------------------
    # sign_blob
    # ------------------------------------------------------------------

    def pre_sign_blob(
        self,
        request: common.SignBlobRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for sign_blob.

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Returns:
            The ``(request, metadata)`` pair to use for the call; this base
            implementation returns both unchanged.
        """
        return request, metadata

    def post_sign_blob(
        self, response: common.SignBlobResponse
    ) -> common.SignBlobResponse:
        """Post-rpc interceptor for sign_blob.

        DEPRECATED. Please use the `post_sign_blob_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response after it is
        returned by the IAMCredentials server but before it is returned to
        user code. This `post_sign_blob` interceptor runs before the
        `post_sign_blob_with_metadata` interceptor.

        Returns:
            The response to hand on; unchanged in this base implementation.
        """
        return response

    def post_sign_blob_with_metadata(
        self,
        response: common.SignBlobResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for sign_blob.

        Override in a subclass to read or manipulate the response or metadata
        after it is returned by the IAMCredentials server but before it is
        returned to user code.

        We recommend only using this `post_sign_blob_with_metadata`
        interceptor in new development instead of the `post_sign_blob`
        interceptor. When both interceptors are used, this one runs after
        `post_sign_blob`, and receives the (possibly modified) response that
        interceptor returned.

        Returns:
            The ``(response, metadata)`` pair; unchanged in this base
            implementation.
        """
        return response, metadata

    # ------------------------------------------------------------------
    # sign_jwt
    # ------------------------------------------------------------------

    def pre_sign_jwt(
        self,
        request: common.SignJwtRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for sign_jwt.

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Returns:
            The ``(request, metadata)`` pair to use for the call; this base
            implementation returns both unchanged.
        """
        return request, metadata

    def post_sign_jwt(self, response: common.SignJwtResponse) -> common.SignJwtResponse:
        """Post-rpc interceptor for sign_jwt.

        DEPRECATED. Please use the `post_sign_jwt_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response after it is
        returned by the IAMCredentials server but before it is returned to
        user code. This `post_sign_jwt` interceptor runs before the
        `post_sign_jwt_with_metadata` interceptor.

        Returns:
            The response to hand on; unchanged in this base implementation.
        """
        return response

    def post_sign_jwt_with_metadata(
        self,
        response: common.SignJwtResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for sign_jwt.

        Override in a subclass to read or manipulate the response or metadata
        after it is returned by the IAMCredentials server but before it is
        returned to user code.

        We recommend only using this `post_sign_jwt_with_metadata`
        interceptor in new development instead of the `post_sign_jwt`
        interceptor. When both interceptors are used, this one runs after
        `post_sign_jwt`, and receives the (possibly modified) response that
        interceptor returned.

        Returns:
            The ``(response, metadata)`` pair; unchanged in this base
            implementation.
        """
        return response, metadata

298 

299 

# State shared by the per-RPC stub classes, which inherit from this dataclass.
@dataclasses.dataclass()
class IAMCredentialsRestStub:
    # Authorized HTTP session; stubs look up the HTTP verb on it to send a call.
    _session: AuthorizedSession
    # Prefix joined with the transcoded URI to form the request URL.
    _host: str
    # Pre/post hooks invoked around each call.
    _interceptor: IAMCredentialsRestInterceptor

305 

306 

307class IAMCredentialsRestTransport(_BaseIAMCredentialsRestTransport): 

308 """REST backend synchronous transport for IAMCredentials. 

309 

310 A service account is a special type of Google account that 

311 belongs to your application or a virtual machine (VM), instead 

312 of to an individual end user. Your application assumes the 

313 identity of the service account to call Google APIs, so that the 

314 users aren't directly involved. 

315 

316 Service account credentials are used to temporarily assume the 

317 identity of the service account. Supported credential types 

318 include OAuth 2.0 access tokens, OpenID Connect ID tokens, 

319 self-signed JSON Web Tokens (JWTs), and more. 

320 

321 This class defines the same methods as the primary client, so the 

322 primary client can load the underlying transport implementation 

323 and call it. 

324 

325 It sends JSON representations of protocol buffers over HTTP/1.1 

326 """ 

327 

def __init__(
    self,
    *,
    host: str = "iamcredentials.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    url_scheme: str = "https",
    interceptor: Optional[IAMCredentialsRestInterceptor] = None,
    api_audience: Optional[str] = None,
) -> None:
    """Instantiate the transport.

    Args:
        host (Optional[str]):
            The hostname to connect to (default: 'iamcredentials.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify the application to the service; if none
            are specified, the client will attempt to ascertain the
            credentials from the environment.
        credentials_file (Optional[str]): Deprecated. A file with credentials
            that can be loaded with
            :func:`google.auth.load_credentials_from_file`. Accepted for
            signature compatibility; this constructor does not read it. This
            argument will be removed in the next major version of this
            library.
        scopes (Optional(Sequence[str])): A list of scopes. Accepted for
            signature compatibility; this constructor does not read it.
        client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]):
            Client certificate source. When truthy, it is used to configure
            a mutual TLS channel on the HTTP session.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota. Accepted for signature compatibility; this
            constructor does not read it.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. Generally, you only need to set this if you are
            developing your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT should
            be used for service account credentials.
        url_scheme: the protocol scheme for the API endpoint. Normally
            "https", but for testing or local servers,
            "http" can be specified.
        interceptor (Optional[IAMCredentialsRestInterceptor]): Hooks to run
            around each call. A falsy value selects a default
            ``IAMCredentialsRestInterceptor()``.
        api_audience (Optional[str]): Forwarded unchanged to the base
            transport constructor.
    """
    # Run the base constructor
    # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
    # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
    # credentials object
    base_kwargs = dict(
        host=host,
        credentials=credentials,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        url_scheme=url_scheme,
        api_audience=api_audience,
    )
    super().__init__(**base_kwargs)

    # The session is built from the credentials the base constructor stored.
    self._session = AuthorizedSession(
        self._credentials, default_host=self.DEFAULT_HOST
    )
    if client_cert_source_for_mtls:
        self._session.configure_mtls_channel(client_cert_source_for_mtls)

    # Fall back to the pass-through interceptor when none was supplied.
    self._interceptor = (
        interceptor if interceptor else IAMCredentialsRestInterceptor()
    )
    self._prep_wrapped_messages(client_info)

395 

class _GenerateAccessToken(
    _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken,
    IAMCredentialsRestStub,
):
    """REST stub for the ``GenerateAccessToken`` RPC.

    Calling an instance runs the pre-interceptor, transcodes the request,
    sends it over the stub's session, parses the JSON body into a
    ``GenerateAccessTokenResponse`` and runs both post-interceptors.
    """

    def __hash__(self):
        # Hash of a fixed per-RPC string, so all instances hash alike.
        return hash("IAMCredentialsRestTransport.GenerateAccessToken")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request and return the raw HTTP response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened in strict mode.
            session: Object exposing one callable per HTTP verb name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping with at least ``"uri"`` and
                ``"method"`` keys.
            body: Optional request body, sent as ``data``.

        Returns:
            Whatever the session's verb method returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        # The Content-Type is always JSON and overrides any caller value.
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.GenerateAccessTokenRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.GenerateAccessTokenResponse:
        r"""Call the generate access token method over HTTP.

        Args:
            request (~.common.GenerateAccessTokenRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not consulted inside this method.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.GenerateAccessTokenResponse:
                The parsed response after both post-interceptors have run.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: The subclass built
                by ``core_exceptions.from_http_response`` when the HTTP
                status code is 400 or greater.
        """
        base = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken

        http_options = base._get_http_options()

        request, metadata = self._interceptor.pre_generate_access_token(
            request, metadata
        )
        transcoded_request = base._get_transcoded_request(http_options, request)

        body = base._get_request_body_json(transcoded_request)

        # Jsonify the query params
        query_params = base._get_query_params_json(transcoded_request)

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Logging is best-effort: never let serialization break the call.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateAccessToken",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateAccessToken",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._GenerateAccessToken._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Parse the JSON body into the protobuf message backing ``resp``.
        resp = common.GenerateAccessTokenResponse()
        pb_resp = common.GenerateAccessTokenResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        # The deprecated hook runs first; its result feeds the metadata-aware hook.
        resp = self._interceptor.post_generate_access_token(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_generate_access_token_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialize the parsed message ``resp``, not the raw HTTP
                # ``response``: the latter is not a GenerateAccessTokenResponse,
                # so the payload used to be logged as None every time.
                # NOTE(review): the payload carries the issued credential;
                # confirm DEBUG logging of it is acceptable.
                response_payload = common.GenerateAccessTokenResponse.to_json(
                    resp
                )
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_access_token",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateAccessToken",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

550 

class _GenerateIdToken(
    _BaseIAMCredentialsRestTransport._BaseGenerateIdToken, IAMCredentialsRestStub
):
    """REST stub for the ``GenerateIdToken`` RPC.

    Calling an instance runs the pre-interceptor, transcodes the request,
    sends it over the stub's session, parses the JSON body into a
    ``GenerateIdTokenResponse`` and runs both post-interceptors.
    """

    def __hash__(self):
        # Hash of a fixed per-RPC string, so all instances hash alike.
        return hash("IAMCredentialsRestTransport.GenerateIdToken")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request and return the raw HTTP response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened in strict mode.
            session: Object exposing one callable per HTTP verb name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping with at least ``"uri"`` and
                ``"method"`` keys.
            body: Optional request body, sent as ``data``.

        Returns:
            Whatever the session's verb method returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        # The Content-Type is always JSON and overrides any caller value.
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.GenerateIdTokenRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.GenerateIdTokenResponse:
        r"""Call the generate id token method over HTTP.

        Args:
            request (~.common.GenerateIdTokenRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not consulted inside this method.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.GenerateIdTokenResponse:
                The parsed response after both post-interceptors have run.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: The subclass built
                by ``core_exceptions.from_http_response`` when the HTTP
                status code is 400 or greater.
        """
        base = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken

        http_options = base._get_http_options()

        request, metadata = self._interceptor.pre_generate_id_token(
            request, metadata
        )
        transcoded_request = base._get_transcoded_request(http_options, request)

        body = base._get_request_body_json(transcoded_request)

        # Jsonify the query params
        query_params = base._get_query_params_json(transcoded_request)

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Logging is best-effort: never let serialization break the call.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateIdToken",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateIdToken",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._GenerateIdToken._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Parse the JSON body into the protobuf message backing ``resp``.
        resp = common.GenerateIdTokenResponse()
        pb_resp = common.GenerateIdTokenResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        # The deprecated hook runs first; its result feeds the metadata-aware hook.
        resp = self._interceptor.post_generate_id_token(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_generate_id_token_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialize the parsed message ``resp``, not the raw HTTP
                # ``response``: the latter is not a GenerateIdTokenResponse,
                # so the payload used to be logged as None every time.
                # NOTE(review): the payload carries the issued token;
                # confirm DEBUG logging of it is acceptable.
                response_payload = common.GenerateIdTokenResponse.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_id_token",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateIdToken",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

702 

class _SignBlob(
    _BaseIAMCredentialsRestTransport._BaseSignBlob, IAMCredentialsRestStub
):
    """REST stub for the ``SignBlob`` RPC.

    Calling an instance runs the pre-interceptor, transcodes the request,
    sends it over the stub's session, parses the JSON body into a
    ``SignBlobResponse`` and runs both post-interceptors.
    """

    def __hash__(self):
        # Hash of a fixed per-RPC string, so all instances hash alike.
        return hash("IAMCredentialsRestTransport.SignBlob")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request and return the raw HTTP response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened in strict mode.
            session: Object exposing one callable per HTTP verb name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping with at least ``"uri"`` and
                ``"method"`` keys.
            body: Optional request body, sent as ``data``.

        Returns:
            Whatever the session's verb method returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        # The Content-Type is always JSON and overrides any caller value.
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.SignBlobRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.SignBlobResponse:
        r"""Call the sign blob method over HTTP.

        Args:
            request (~.common.SignBlobRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not consulted inside this method.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.SignBlobResponse:
                The parsed response after both post-interceptors have run.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: The subclass built
                by ``core_exceptions.from_http_response`` when the HTTP
                status code is 400 or greater.
        """
        base = _BaseIAMCredentialsRestTransport._BaseSignBlob

        http_options = base._get_http_options()

        request, metadata = self._interceptor.pre_sign_blob(request, metadata)
        transcoded_request = base._get_transcoded_request(http_options, request)

        body = base._get_request_body_json(transcoded_request)

        # Jsonify the query params
        query_params = base._get_query_params_json(transcoded_request)

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Logging is best-effort: never let serialization break the call.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignBlob",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignBlob",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._SignBlob._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Parse the JSON body into the protobuf message backing ``resp``.
        resp = common.SignBlobResponse()
        pb_resp = common.SignBlobResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        # The deprecated hook runs first; its result feeds the metadata-aware hook.
        resp = self._interceptor.post_sign_blob(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_sign_blob_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialize the parsed message ``resp``, not the raw HTTP
                # ``response``: the latter is not a SignBlobResponse, so the
                # payload used to be logged as None every time.
                # NOTE(review): the payload carries the produced signature;
                # confirm DEBUG logging of it is acceptable.
                response_payload = common.SignBlobResponse.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_blob",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignBlob",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

858 

859 class _SignJwt( 

860 _BaseIAMCredentialsRestTransport._BaseSignJwt, IAMCredentialsRestStub 

861 ): 

def __hash__(self):
    """Return the hash of this stub's fixed identifier string."""
    stub_identifier = "IAMCredentialsRestTransport.SignJwt"
    return hash(stub_identifier)

864 

865 @staticmethod 

866 def _get_response( 

867 host, 

868 metadata, 

869 query_params, 

870 session, 

871 timeout, 

872 transcoded_request, 

873 body=None, 

874 ): 

875 uri = transcoded_request["uri"] 

876 method = transcoded_request["method"] 

877 headers = dict(metadata) 

878 headers["Content-Type"] = "application/json" 

879 response = getattr(session, method)( 

880 "{host}{uri}".format(host=host, uri=uri), 

881 timeout=timeout, 

882 headers=headers, 

883 params=rest_helpers.flatten_query_params(query_params, strict=True), 

884 data=body, 

885 ) 

886 return response 

887 

888 def __call__( 

889 self, 

890 request: common.SignJwtRequest, 

891 *, 

892 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

893 timeout: Optional[float] = None, 

894 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

895 ) -> common.SignJwtResponse: 

896 r"""Call the sign jwt method over HTTP. 

897 

898 Args: 

899 request (~.common.SignJwtRequest): 

900 The request object. 

901 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

902 should be retried. 

903 timeout (float): The timeout for this request. 

904 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

905 sent along with the request as metadata. Normally, each value must be of type `str`, 

906 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

907 be of type `bytes`. 

908 

909 Returns: 

910 ~.common.SignJwtResponse: 

911 

912 """ 

913 

914 http_options = ( 

915 _BaseIAMCredentialsRestTransport._BaseSignJwt._get_http_options() 

916 ) 

917 

918 request, metadata = self._interceptor.pre_sign_jwt(request, metadata) 

919 transcoded_request = ( 

920 _BaseIAMCredentialsRestTransport._BaseSignJwt._get_transcoded_request( 

921 http_options, request 

922 ) 

923 ) 

924 

925 body = _BaseIAMCredentialsRestTransport._BaseSignJwt._get_request_body_json( 

926 transcoded_request 

927 ) 

928 

929 # Jsonify the query params 

930 query_params = ( 

931 _BaseIAMCredentialsRestTransport._BaseSignJwt._get_query_params_json( 

932 transcoded_request 

933 ) 

934 ) 

935 

936 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

937 logging.DEBUG 

938 ): # pragma: NO COVER 

939 request_url = "{host}{uri}".format( 

940 host=self._host, uri=transcoded_request["uri"] 

941 ) 

942 method = transcoded_request["method"] 

943 try: 

944 request_payload = type(request).to_json(request) 

945 except: 

946 request_payload = None 

947 http_request = { 

948 "payload": request_payload, 

949 "requestMethod": method, 

950 "requestUrl": request_url, 

951 "headers": dict(metadata), 

952 } 

953 _LOGGER.debug( 

954 f"Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignJwt", 

955 extra={ 

956 "serviceName": "google.iam.credentials.v1.IAMCredentials", 

957 "rpcName": "SignJwt", 

958 "httpRequest": http_request, 

959 "metadata": http_request["headers"], 

960 }, 

961 ) 

962 

963 # Send the request 

964 response = IAMCredentialsRestTransport._SignJwt._get_response( 

965 self._host, 

966 metadata, 

967 query_params, 

968 self._session, 

969 timeout, 

970 transcoded_request, 

971 body, 

972 ) 

973 

974 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 

975 # subclass. 

976 if response.status_code >= 400: 

977 raise core_exceptions.from_http_response(response) 

978 

979 # Return the response 

980 resp = common.SignJwtResponse() 

981 pb_resp = common.SignJwtResponse.pb(resp) 

982 

983 json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True) 

984 

985 resp = self._interceptor.post_sign_jwt(resp) 

986 response_metadata = [(k, str(v)) for k, v in response.headers.items()] 

987 resp, _ = self._interceptor.post_sign_jwt_with_metadata( 

988 resp, response_metadata 

989 ) 

990 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

991 logging.DEBUG 

992 ): # pragma: NO COVER 

993 try: 

994 response_payload = common.SignJwtResponse.to_json(response) 

995 except: 

996 response_payload = None 

997 http_response = { 

998 "payload": response_payload, 

999 "headers": dict(response.headers), 

1000 "status": response.status_code, 

1001 } 

1002 _LOGGER.debug( 

1003 "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_jwt", 

1004 extra={ 

1005 "serviceName": "google.iam.credentials.v1.IAMCredentials", 

1006 "rpcName": "SignJwt", 

1007 "metadata": http_response["headers"], 

1008 "httpResponse": http_response, 

1009 }, 

1010 ) 

1011 return resp 

1012 

@property
def generate_access_token(
    self,
) -> Callable[
    [common.GenerateAccessTokenRequest], common.GenerateAccessTokenResponse
]:
    """Return a ``_GenerateAccessToken`` stub for this transport.

    A new stub object is constructed on every property access from the
    transport's session, host and interceptor.
    """
    stub_factory = self._GenerateAccessToken
    # mypy cannot prove that the stub object matches the annotated
    # Callable type, so the return statement carries an ignore marker.
    stub = stub_factory(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1022 

@property
def generate_id_token(
    self,
) -> Callable[[common.GenerateIdTokenRequest], common.GenerateIdTokenResponse]:
    """Return a ``_GenerateIdToken`` stub for this transport.

    A new stub object is constructed on every property access from the
    transport's session, host and interceptor.
    """
    stub_factory = self._GenerateIdToken
    # mypy cannot prove that the stub object matches the annotated
    # Callable type, so the return statement carries an ignore marker.
    stub = stub_factory(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1030 

@property
def sign_blob(self) -> Callable[[common.SignBlobRequest], common.SignBlobResponse]:
    """Return a ``_SignBlob`` stub for this transport.

    A new stub object is constructed on every property access from the
    transport's session, host and interceptor.
    """
    stub_factory = self._SignBlob
    # mypy cannot prove that the stub object matches the annotated
    # Callable type, so the return statement carries an ignore marker.
    stub = stub_factory(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1036 

@property
def sign_jwt(self) -> Callable[[common.SignJwtRequest], common.SignJwtResponse]:
    """Return a ``_SignJwt`` stub for this transport.

    A new stub object is constructed on every property access from the
    transport's session, host and interceptor.
    """
    stub_factory = self._SignJwt
    # mypy cannot prove that the stub object matches the annotated
    # Callable type, so the return statement carries an ignore marker.
    stub = stub_factory(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1042 

@property
def kind(self) -> str:
    """Return the identifier of this transport flavour, always ``"rest"``."""
    transport_kind = "rest"
    return transport_kind

1046 

def close(self):
    """Close the session stored in ``self._session``."""
    session = self._session
    session.close()

1049 

1050 

# Names exported by a star-import of this module.
__all__ = (
    "IAMCredentialsRestTransport",
)