Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/iam_credentials_v1/services/iam_credentials/transports/rest.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

187 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import dataclasses 

17import json # type: ignore 

18import logging 

19from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

20import warnings 

21 

22from google.api_core import exceptions as core_exceptions 

23from google.api_core import gapic_v1, rest_helpers, rest_streaming 

24from google.api_core import retry as retries 

25from google.auth import credentials as ga_credentials # type: ignore 

26from google.auth.transport.requests import AuthorizedSession # type: ignore 

27import google.protobuf 

28from google.protobuf import json_format 

29from requests import __version__ as requests_version 

30 

31from google.cloud.iam_credentials_v1.types import common 

32 

33from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

34from .rest_base import _BaseIAMCredentialsRestTransport 

35 

# Type alias for the ``retry`` argument of every stub ``__call__``.  If
# ``gapic_v1.method._MethodDefault`` cannot be resolved (AttributeError), the
# alias degrades to plain ``object`` for that member of the union.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# Probe for the optional ``client_logging`` module; the flag gates every
# DEBUG-level request/response log emitted by the stubs below.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

_LOGGER = logging.getLogger(__name__)

# Client info for the REST transport: reuses the base transport's gapic
# version, reports no gRPC version, and advertises the installed ``requests``
# release as the REST runtime.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
    grpc_version=None,
    rest_version=f"requests@{requests_version}",
)

# Only set the protobuf runtime version when this ClientInfo implementation
# exposes the attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

58 

59 

class IAMCredentialsRestInterceptor:
    """Interceptor for IAMCredentials.

    Interceptors are used to manipulate requests, request metadata, and responses
    in arbitrary ways.
    Example use cases include:
    * Logging
    * Verifying requests according to service or custom semantics
    * Stripping extraneous information from responses

    These use cases and more can be enabled by injecting an
    instance of a custom subclass when constructing the IAMCredentialsRestTransport.

    .. code-block:: python

        class MyCustomIAMCredentialsInterceptor(IAMCredentialsRestInterceptor):
            def pre_generate_access_token(self, request, metadata):
                logging.info("Received request: %s", request)
                return request, metadata

            def post_generate_access_token(self, response):
                logging.info("Received response: %s", response)
                return response

            def pre_generate_id_token(self, request, metadata):
                logging.info("Received request: %s", request)
                return request, metadata

            def post_generate_id_token(self, response):
                logging.info("Received response: %s", response)
                return response

            def pre_sign_blob(self, request, metadata):
                logging.info("Received request: %s", request)
                return request, metadata

            def post_sign_blob(self, response):
                logging.info("Received response: %s", response)
                return response

            def pre_sign_jwt(self, request, metadata):
                logging.info("Received request: %s", request)
                return request, metadata

            def post_sign_jwt(self, response):
                logging.info("Received response: %s", response)
                return response

        transport = IAMCredentialsRestTransport(interceptor=MyCustomIAMCredentialsInterceptor())
        client = IAMCredentialsClient(transport=transport)

    Every hook defined here is a pass-through: it returns its arguments
    unchanged, so an un-subclassed interceptor has no effect on a call.
    """

    def pre_generate_access_token(
        self,
        request: common.GenerateAccessTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for generate_access_token

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Returns:
            The ``(request, metadata)`` pair to use for the call; the default
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_generate_access_token(
        self, response: common.GenerateAccessTokenResponse
    ) -> common.GenerateAccessTokenResponse:
        """Post-rpc interceptor for generate_access_token

        DEPRECATED. Please use the `post_generate_access_token_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_generate_access_token` interceptor runs
        before the `post_generate_access_token_with_metadata` interceptor.

        Returns:
            The response to hand on; the default implementation returns it unchanged.
        """
        return response

    def post_generate_access_token_with_metadata(
        self,
        response: common.GenerateAccessTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for generate_access_token

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_generate_access_token_with_metadata`
        interceptor in new development instead of the `post_generate_access_token` interceptor.
        When both interceptors are used, this `post_generate_access_token_with_metadata` interceptor runs after the
        `post_generate_access_token` interceptor. The (possibly modified) response returned by
        `post_generate_access_token` will be passed to
        `post_generate_access_token_with_metadata`.

        Returns:
            The ``(response, metadata)`` pair; the default implementation
            returns both arguments unchanged.
        """
        return response, metadata

    def pre_generate_id_token(
        self,
        request: common.GenerateIdTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for generate_id_token

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Returns:
            The ``(request, metadata)`` pair to use for the call; the default
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_generate_id_token(
        self, response: common.GenerateIdTokenResponse
    ) -> common.GenerateIdTokenResponse:
        """Post-rpc interceptor for generate_id_token

        DEPRECATED. Please use the `post_generate_id_token_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_generate_id_token` interceptor runs
        before the `post_generate_id_token_with_metadata` interceptor.

        Returns:
            The response to hand on; the default implementation returns it unchanged.
        """
        return response

    def post_generate_id_token_with_metadata(
        self,
        response: common.GenerateIdTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for generate_id_token

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_generate_id_token_with_metadata`
        interceptor in new development instead of the `post_generate_id_token` interceptor.
        When both interceptors are used, this `post_generate_id_token_with_metadata` interceptor runs after the
        `post_generate_id_token` interceptor. The (possibly modified) response returned by
        `post_generate_id_token` will be passed to
        `post_generate_id_token_with_metadata`.

        Returns:
            The ``(response, metadata)`` pair; the default implementation
            returns both arguments unchanged.
        """
        return response, metadata

    def pre_sign_blob(
        self,
        request: common.SignBlobRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for sign_blob

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Returns:
            The ``(request, metadata)`` pair to use for the call; the default
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_sign_blob(
        self, response: common.SignBlobResponse
    ) -> common.SignBlobResponse:
        """Post-rpc interceptor for sign_blob

        DEPRECATED. Please use the `post_sign_blob_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_sign_blob` interceptor runs
        before the `post_sign_blob_with_metadata` interceptor.

        Returns:
            The response to hand on; the default implementation returns it unchanged.
        """
        return response

    def post_sign_blob_with_metadata(
        self,
        response: common.SignBlobResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for sign_blob

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_sign_blob_with_metadata`
        interceptor in new development instead of the `post_sign_blob` interceptor.
        When both interceptors are used, this `post_sign_blob_with_metadata` interceptor runs after the
        `post_sign_blob` interceptor. The (possibly modified) response returned by
        `post_sign_blob` will be passed to
        `post_sign_blob_with_metadata`.

        Returns:
            The ``(response, metadata)`` pair; the default implementation
            returns both arguments unchanged.
        """
        return response, metadata

    def pre_sign_jwt(
        self,
        request: common.SignJwtRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for sign_jwt

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Returns:
            The ``(request, metadata)`` pair to use for the call; the default
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_sign_jwt(self, response: common.SignJwtResponse) -> common.SignJwtResponse:
        """Post-rpc interceptor for sign_jwt

        DEPRECATED. Please use the `post_sign_jwt_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_sign_jwt` interceptor runs
        before the `post_sign_jwt_with_metadata` interceptor.

        Returns:
            The response to hand on; the default implementation returns it unchanged.
        """
        return response

    def post_sign_jwt_with_metadata(
        self,
        response: common.SignJwtResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for sign_jwt

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_sign_jwt_with_metadata`
        interceptor in new development instead of the `post_sign_jwt` interceptor.
        When both interceptors are used, this `post_sign_jwt_with_metadata` interceptor runs after the
        `post_sign_jwt` interceptor. The (possibly modified) response returned by
        `post_sign_jwt` will be passed to
        `post_sign_jwt_with_metadata`.

        Returns:
            The ``(response, metadata)`` pair; the default implementation
            returns both arguments unchanged.
        """
        return response, metadata

298 

299 

# Shared state holder mixed into each per-RPC stub class of the REST transport;
# the stubs read these three attributes when issuing a call.
@dataclasses.dataclass
class IAMCredentialsRestStub:
    # Session whose HTTP-verb method (looked up by name) sends the request.
    _session: AuthorizedSession
    # Host string prepended to the transcoded URI to form the request URL.
    _host: str
    # Interceptor whose pre_*/post_* hooks are invoked around each RPC.
    _interceptor: IAMCredentialsRestInterceptor

305 

306 

307class IAMCredentialsRestTransport(_BaseIAMCredentialsRestTransport): 

308 """REST backend synchronous transport for IAMCredentials. 

309 

310 A service account is a special type of Google account that 

311 belongs to your application or a virtual machine (VM), instead 

312 of to an individual end user. Your application assumes the 

313 identity of the service account to call Google APIs, so that the 

314 users aren't directly involved. 

315 

316 Service account credentials are used to temporarily assume the 

317 identity of the service account. Supported credential types 

318 include OAuth 2.0 access tokens, OpenID Connect ID tokens, 

319 self-signed JSON Web Tokens (JWTs), and more. 

320 

321 This class defines the same methods as the primary client, so the 

322 primary client can load the underlying transport implementation 

323 and call it. 

324 

325 It sends JSON representations of protocol buffers over HTTP/1.1 

326 """ 

327 

def __init__(
    self,
    *,
    host: str = "iamcredentials.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    url_scheme: str = "https",
    interceptor: Optional[IAMCredentialsRestInterceptor] = None,
    api_audience: Optional[str] = None,
) -> None:
    """Instantiate the transport.

    Args:
        host (Optional[str]):
             The hostname to connect to (default: 'iamcredentials.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify the application to the service; if none
            are specified, the client will attempt to ascertain the
            credentials from the environment.
        credentials_file (Optional[str]): A file with credentials that can
            be loaded with :func:`google.auth.load_credentials_from_file`.
            Not referenced by this constructor body (see the TODO below).
        scopes (Optional(Sequence[str])): A list of scopes. Not referenced
            by this constructor body (see the TODO below).
        client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
            certificate source; when truthy it is passed to the session's
            ``configure_mtls_channel``.
        quota_project_id (Optional[str]): An optional project to use for billing
            and quota. Not referenced by this constructor body (see the
            TODO below).
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, then default info will be used.
            Generally, you only need to set this if you are developing
            your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT should
            be used for service account credentials.
        url_scheme: the protocol scheme for the API endpoint.  Normally
            "https", but for testing or local servers,
            "http" can be specified.
        interceptor (Optional[IAMCredentialsRestInterceptor]): Hooks run
            around each RPC. When omitted (or falsy) a default
            ``IAMCredentialsRestInterceptor`` is created.
        api_audience (Optional[str]): Forwarded unchanged to the base
            transport constructor.
    """
    # Run the base constructor
    # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
    # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
    # credentials object
    super().__init__(
        host=host,
        credentials=credentials,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        url_scheme=url_scheme,
        api_audience=api_audience,
    )

    # The session is built from the credentials held on the instance after
    # the base constructor has run.
    authorized_session = AuthorizedSession(
        self._credentials, default_host=self.DEFAULT_HOST
    )
    self._session = authorized_session
    if client_cert_source_for_mtls:
        authorized_session.configure_mtls_channel(client_cert_source_for_mtls)

    # Fall back to the pass-through interceptor when none was supplied.
    self._interceptor = (
        interceptor if interceptor else IAMCredentialsRestInterceptor()
    )
    self._prep_wrapped_messages(client_info)

394 

class _GenerateAccessToken(
    _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken,
    IAMCredentialsRestStub,
):
    """Callable REST stub for the GenerateAccessToken RPC."""

    def __hash__(self):
        # Constant hash derived from the stub's fixed identity string.
        return hash("IAMCredentialsRestTransport.GenerateAccessToken")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return its result.

        The session method is looked up by the transcoded HTTP method name
        and is called with ``host + uri``, the metadata as headers (with
        ``Content-Type: application/json`` forced), the flattened query
        parameters, the timeout, and ``body`` as the request data.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.GenerateAccessTokenRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.GenerateAccessTokenResponse:
        r"""Call the generate access token method over HTTP.

        Args:
            request (~.common.GenerateAccessTokenRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not consulted inside this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.GenerateAccessTokenResponse:
                The parsed response after both post-rpc interceptors have run.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: (the matching
                subclass) when the HTTP status code is 400 or greater.
        """

        http_options = (
            _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_http_options()
        )

        # The interceptor may replace the request and metadata before transcoding.
        request, metadata = self._interceptor.pre_generate_access_token(
            request, metadata
        )
        transcoded_request = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_transcoded_request(
            http_options, request
        )

        body = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Best effort: logging must never break the call itself.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateAccessToken",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateAccessToken",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._GenerateAccessToken._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Parse the HTTP body into the response message (unknown fields ignored).
        resp = common.GenerateAccessTokenResponse()
        pb_resp = common.GenerateAccessTokenResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        # Deprecated hook first, then the metadata-aware hook on its result.
        resp = self._interceptor.post_generate_access_token(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_generate_access_token_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialise the parsed message ``resp``; the raw HTTP
                # ``response`` is not a GenerateAccessTokenResponse.
                # NOTE(review): this payload presumably contains the issued
                # token; confirm DEBUG-level logging of it is acceptable.
                response_payload = common.GenerateAccessTokenResponse.to_json(
                    resp
                )
            except Exception:
                # Best effort: logging must never break the call itself.
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_access_token",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateAccessToken",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

549 

class _GenerateIdToken(
    _BaseIAMCredentialsRestTransport._BaseGenerateIdToken, IAMCredentialsRestStub
):
    """Callable REST stub for the GenerateIdToken RPC."""

    def __hash__(self):
        # Constant hash derived from the stub's fixed identity string.
        return hash("IAMCredentialsRestTransport.GenerateIdToken")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return its result.

        The session method is looked up by the transcoded HTTP method name
        and is called with ``host + uri``, the metadata as headers (with
        ``Content-Type: application/json`` forced), the flattened query
        parameters, the timeout, and ``body`` as the request data.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.GenerateIdTokenRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.GenerateIdTokenResponse:
        r"""Call the generate id token method over HTTP.

        Args:
            request (~.common.GenerateIdTokenRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not consulted inside this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.GenerateIdTokenResponse:
                The parsed response after both post-rpc interceptors have run.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: (the matching
                subclass) when the HTTP status code is 400 or greater.
        """

        http_options = (
            _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_http_options()
        )

        # The interceptor may replace the request and metadata before transcoding.
        request, metadata = self._interceptor.pre_generate_id_token(
            request, metadata
        )
        transcoded_request = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_transcoded_request(
            http_options, request
        )

        body = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Best effort: logging must never break the call itself.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateIdToken",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateIdToken",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._GenerateIdToken._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Parse the HTTP body into the response message (unknown fields ignored).
        resp = common.GenerateIdTokenResponse()
        pb_resp = common.GenerateIdTokenResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        # Deprecated hook first, then the metadata-aware hook on its result.
        resp = self._interceptor.post_generate_id_token(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_generate_id_token_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialise the parsed message ``resp``; the raw HTTP
                # ``response`` is not a GenerateIdTokenResponse.
                # NOTE(review): this payload presumably contains the issued
                # token; confirm DEBUG-level logging of it is acceptable.
                response_payload = common.GenerateIdTokenResponse.to_json(resp)
            except Exception:
                # Best effort: logging must never break the call itself.
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_id_token",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateIdToken",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

701 

class _SignBlob(
    _BaseIAMCredentialsRestTransport._BaseSignBlob, IAMCredentialsRestStub
):
    """Callable REST stub for the SignBlob RPC."""

    def __hash__(self):
        # Constant hash derived from the stub's fixed identity string.
        return hash("IAMCredentialsRestTransport.SignBlob")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return its result.

        The session method is looked up by the transcoded HTTP method name
        and is called with ``host + uri``, the metadata as headers (with
        ``Content-Type: application/json`` forced), the flattened query
        parameters, the timeout, and ``body`` as the request data.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.SignBlobRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.SignBlobResponse:
        r"""Call the sign blob method over HTTP.

        Args:
            request (~.common.SignBlobRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not consulted inside this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.SignBlobResponse:
                The parsed response after both post-rpc interceptors have run.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: (the matching
                subclass) when the HTTP status code is 400 or greater.
        """

        http_options = (
            _BaseIAMCredentialsRestTransport._BaseSignBlob._get_http_options()
        )

        # The interceptor may replace the request and metadata before transcoding.
        request, metadata = self._interceptor.pre_sign_blob(request, metadata)
        transcoded_request = (
            _BaseIAMCredentialsRestTransport._BaseSignBlob._get_transcoded_request(
                http_options, request
            )
        )

        body = (
            _BaseIAMCredentialsRestTransport._BaseSignBlob._get_request_body_json(
                transcoded_request
            )
        )

        # Jsonify the query params
        query_params = (
            _BaseIAMCredentialsRestTransport._BaseSignBlob._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Best effort: logging must never break the call itself.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignBlob",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignBlob",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._SignBlob._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Parse the HTTP body into the response message (unknown fields ignored).
        resp = common.SignBlobResponse()
        pb_resp = common.SignBlobResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        # Deprecated hook first, then the metadata-aware hook on its result.
        resp = self._interceptor.post_sign_blob(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_sign_blob_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialise the parsed message ``resp``; the raw HTTP
                # ``response`` is not a SignBlobResponse.
                response_payload = common.SignBlobResponse.to_json(resp)
            except Exception:
                # Best effort: logging must never break the call itself.
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_blob",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignBlob",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

857 

class _SignJwt(
    _BaseIAMCredentialsRestTransport._BaseSignJwt, IAMCredentialsRestStub
):
    """Callable REST stub that performs the ``SignJwt`` RPC over HTTP."""

    def __hash__(self):
        # Hash on a fixed label so every stub of this kind hashes the same.
        return hash("IAMCredentialsRestTransport.SignJwt")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the raw response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs copied into the request headers.
            query_params: Query parameters; flattened with
                ``rest_helpers.flatten_query_params(..., strict=True)``.
            session: Object exposing one method per HTTP verb; the method
                named by ``transcoded_request["method"]`` is invoked.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing at least ``"uri"`` and
                ``"method"``.
            body: Payload sent as the request ``data``.

        Returns:
            Whatever the session method returns (the HTTP response object).
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.SignJwtRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.SignJwtResponse:
        r"""Call the sign jwt method over HTTP.

        Args:
            request (~.common.SignJwtRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Accepted for interface compatibility; it is
                not consulted inside this method.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.SignJwtResponse:
                The response message parsed from the HTTP response body,
                after the post-call interceptors have been applied.

        Raises:
            core_exceptions.GoogleAPICallError: The subclass produced by
                ``core_exceptions.from_http_response`` when the HTTP status
                code is 400 or greater.
        """

        http_options = (
            _BaseIAMCredentialsRestTransport._BaseSignJwt._get_http_options()
        )

        # The interceptor may replace both the request and the metadata.
        request, metadata = self._interceptor.pre_sign_jwt(request, metadata)
        transcoded_request = (
            _BaseIAMCredentialsRestTransport._BaseSignJwt._get_transcoded_request(
                http_options, request
            )
        )

        body = _BaseIAMCredentialsRestTransport._BaseSignJwt._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = (
            _BaseIAMCredentialsRestTransport._BaseSignJwt._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a request that cannot be serialized is
            # logged without a payload instead of failing the call.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignJwt",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignJwt",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._SignJwt._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = common.SignJwtResponse()
        pb_resp = common.SignJwtResponse.pb(resp)

        # Parsing fills ``pb_resp``, the protobuf view obtained from ``resp``.
        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_sign_jwt(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_sign_jwt_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message ``resp``, not the raw HTTP
            # ``response``: the latter is not a SignJwtResponse, so the
            # conversion always failed and the payload was logged as None.
            # NOTE(review): the payload now carries the signed JWT into DEBUG
            # logs -- confirm this matches the logging policy.
            try:
                response_payload = common.SignJwtResponse.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_jwt",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignJwt",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1011 

@property
def generate_access_token(
    self,
) -> Callable[
    [common.GenerateAccessTokenRequest], common.GenerateAccessTokenResponse
]:
    """Build and return a ``_GenerateAccessToken`` stub for this transport.

    A new stub is constructed on every access from this transport's
    session, host and interceptor.
    """
    # mypy cannot see that the stub satisfies the declared Callable type,
    # so the return is explicitly ignored for type checking.
    stub_factory = self._GenerateAccessToken
    stub = stub_factory(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1021 

@property
def generate_id_token(
    self,
) -> Callable[[common.GenerateIdTokenRequest], common.GenerateIdTokenResponse]:
    """Build and return a ``_GenerateIdToken`` stub for this transport.

    A new stub is constructed on every access from this transport's
    session, host and interceptor.
    """
    # mypy cannot see that the stub satisfies the declared Callable type,
    # so the return is explicitly ignored for type checking.
    stub_factory = self._GenerateIdToken
    stub = stub_factory(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1029 

@property
def sign_blob(self) -> Callable[[common.SignBlobRequest], common.SignBlobResponse]:
    """Build and return a ``_SignBlob`` stub for this transport.

    A new stub is constructed on every access from this transport's
    session, host and interceptor.
    """
    # mypy cannot see that the stub satisfies the declared Callable type,
    # so the return is explicitly ignored for type checking.
    stub_factory = self._SignBlob
    stub = stub_factory(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1035 

@property
def sign_jwt(self) -> Callable[[common.SignJwtRequest], common.SignJwtResponse]:
    """Build and return a ``_SignJwt`` stub for this transport.

    A new stub is constructed on every access from this transport's
    session, host and interceptor.
    """
    # mypy cannot see that the stub satisfies the declared Callable type,
    # so the return is explicitly ignored for type checking.
    stub_factory = self._SignJwt
    stub = stub_factory(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1041 

@property
def kind(self) -> str:
    """Return the transport kind label, which is always ``"rest"``."""
    transport_kind = "rest"
    return transport_kind

1045 

def close(self):
    """Close the session held in ``self._session``."""
    session = self._session
    session.close()

1048 

1049 

1050__all__ = ("IAMCredentialsRestTransport",)