Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/iam_credentials_v1/services/iam_credentials/transports/rest.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

187 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import dataclasses 

17import json # type: ignore 

18import logging 

19import warnings 

20from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

21 

22import google.protobuf 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1, rest_helpers, rest_streaming 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.auth.transport.requests import AuthorizedSession # type: ignore 

28from google.protobuf import json_format 

29from requests import __version__ as requests_version 

30 

31from google.cloud.iam_credentials_v1.types import common 

32 

33from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

34from .rest_base import _BaseIAMCredentialsRestTransport 

35 

# Alias for the ``retry`` argument accepted by the RPC callables in this module.
# If ``gapic_v1.method`` has no ``_MethodDefault`` attribute, building the first
# alias raises AttributeError and the looser ``object`` form is used instead.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# Record whether ``google.api_core.client_logging`` can be imported; the
# per-RPC debug logging below is skipped entirely when it cannot.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

_LOGGER = logging.getLogger(__name__)

# Client info advertised by this transport: same gapic version as the base
# transport, no gRPC version, and the installed ``requests`` version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
    grpc_version=None,
    rest_version=f"requests@{requests_version}",
)

# Only set the protobuf runtime version when ClientInfo exposes that attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

58 

59 

class IAMCredentialsRestInterceptor:
    """Interceptor for IAMCredentials.

    Interceptors are used to manipulate requests, request metadata, and responses
    in arbitrary ways.
    Example use cases include:

    * Logging
    * Verifying requests according to service or custom semantics
    * Stripping extraneous information from responses

    These use cases and more can be enabled by injecting an
    instance of a custom subclass when constructing the IAMCredentialsRestTransport.

    Every hook defined here returns its arguments unchanged, so a subclass
    only needs to override the hooks it cares about.

    .. code-block:: python

        class MyCustomIAMCredentialsInterceptor(IAMCredentialsRestInterceptor):
            def pre_generate_access_token(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_generate_access_token(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_generate_id_token(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_generate_id_token(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_sign_blob(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_sign_blob(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_sign_jwt(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_sign_jwt(self, response):
                logging.info(f"Received response: {response}")
                return response

        transport = IAMCredentialsRestTransport(interceptor=MyCustomIAMCredentialsInterceptor())
        client = IAMCredentialsClient(transport=transport)

    """

    def pre_generate_access_token(
        self,
        request: common.GenerateAccessTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for generate_access_token

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Args:
            request: The outgoing request message.
            metadata: Key/value pairs to send along with the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; the default
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_generate_access_token(
        self, response: common.GenerateAccessTokenResponse
    ) -> common.GenerateAccessTokenResponse:
        """Post-rpc interceptor for generate_access_token

        DEPRECATED. Please use the `post_generate_access_token_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_generate_access_token` interceptor runs
        before the `post_generate_access_token_with_metadata` interceptor.

        Returns:
            The response to hand on; the default returns it unchanged.
        """
        return response

    def post_generate_access_token_with_metadata(
        self,
        response: common.GenerateAccessTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        common.GenerateAccessTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for generate_access_token

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_generate_access_token_with_metadata`
        interceptor in new development instead of the `post_generate_access_token` interceptor.
        When both interceptors are used, this `post_generate_access_token_with_metadata` interceptor runs after the
        `post_generate_access_token` interceptor. The (possibly modified) response returned by
        `post_generate_access_token` will be passed to
        `post_generate_access_token_with_metadata`.

        Returns:
            The ``(response, metadata)`` pair; the default returns both unchanged.
        """
        return response, metadata

    def pre_generate_id_token(
        self,
        request: common.GenerateIdTokenRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for generate_id_token

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Args:
            request: The outgoing request message.
            metadata: Key/value pairs to send along with the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; the default
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_generate_id_token(
        self, response: common.GenerateIdTokenResponse
    ) -> common.GenerateIdTokenResponse:
        """Post-rpc interceptor for generate_id_token

        DEPRECATED. Please use the `post_generate_id_token_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_generate_id_token` interceptor runs
        before the `post_generate_id_token_with_metadata` interceptor.

        Returns:
            The response to hand on; the default returns it unchanged.
        """
        return response

    def post_generate_id_token_with_metadata(
        self,
        response: common.GenerateIdTokenResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.GenerateIdTokenResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for generate_id_token

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_generate_id_token_with_metadata`
        interceptor in new development instead of the `post_generate_id_token` interceptor.
        When both interceptors are used, this `post_generate_id_token_with_metadata` interceptor runs after the
        `post_generate_id_token` interceptor. The (possibly modified) response returned by
        `post_generate_id_token` will be passed to
        `post_generate_id_token_with_metadata`.

        Returns:
            The ``(response, metadata)`` pair; the default returns both unchanged.
        """
        return response, metadata

    def pre_sign_blob(
        self,
        request: common.SignBlobRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for sign_blob

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Args:
            request: The outgoing request message.
            metadata: Key/value pairs to send along with the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; the default
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_sign_blob(
        self, response: common.SignBlobResponse
    ) -> common.SignBlobResponse:
        """Post-rpc interceptor for sign_blob

        DEPRECATED. Please use the `post_sign_blob_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_sign_blob` interceptor runs
        before the `post_sign_blob_with_metadata` interceptor.

        Returns:
            The response to hand on; the default returns it unchanged.
        """
        return response

    def post_sign_blob_with_metadata(
        self,
        response: common.SignBlobResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignBlobResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for sign_blob

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_sign_blob_with_metadata`
        interceptor in new development instead of the `post_sign_blob` interceptor.
        When both interceptors are used, this `post_sign_blob_with_metadata` interceptor runs after the
        `post_sign_blob` interceptor. The (possibly modified) response returned by
        `post_sign_blob` will be passed to
        `post_sign_blob_with_metadata`.

        Returns:
            The ``(response, metadata)`` pair; the default returns both unchanged.
        """
        return response, metadata

    def pre_sign_jwt(
        self,
        request: common.SignJwtRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for sign_jwt

        Override in a subclass to manipulate the request or metadata
        before they are sent to the IAMCredentials server.

        Args:
            request: The outgoing request message.
            metadata: Key/value pairs to send along with the request.

        Returns:
            The ``(request, metadata)`` pair to use for the call; the default
            implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_sign_jwt(self, response: common.SignJwtResponse) -> common.SignJwtResponse:
        """Post-rpc interceptor for sign_jwt

        DEPRECATED. Please use the `post_sign_jwt_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the IAMCredentials server but before
        it is returned to user code. This `post_sign_jwt` interceptor runs
        before the `post_sign_jwt_with_metadata` interceptor.

        Returns:
            The response to hand on; the default returns it unchanged.
        """
        return response

    def post_sign_jwt_with_metadata(
        self,
        response: common.SignJwtResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[common.SignJwtResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for sign_jwt

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the IAMCredentials server but before it is returned to user code.

        We recommend only using this `post_sign_jwt_with_metadata`
        interceptor in new development instead of the `post_sign_jwt` interceptor.
        When both interceptors are used, this `post_sign_jwt_with_metadata` interceptor runs after the
        `post_sign_jwt` interceptor. The (possibly modified) response returned by
        `post_sign_jwt` will be passed to
        `post_sign_jwt_with_metadata`.

        Returns:
            The ``(response, metadata)`` pair; the default returns both unchanged.
        """
        return response, metadata

298 

299 

@dataclasses.dataclass
class IAMCredentialsRestStub:
    """State shared by the per-RPC REST callables of the transport.

    The nested ``_GenerateAccessToken``, ``_GenerateIdToken``, ``_SignBlob``
    and ``_SignJwt`` classes inherit from this dataclass and read these three
    attributes when they are called.
    """

    # HTTP session on which the request method (e.g. ``post``) is looked up.
    _session: AuthorizedSession
    # Prefix concatenated with the transcoded URI to form the request URL.
    _host: str
    # Object whose ``pre_*`` / ``post_*`` hooks wrap each call.
    _interceptor: IAMCredentialsRestInterceptor

305 

306 

307class IAMCredentialsRestTransport(_BaseIAMCredentialsRestTransport): 

308 """REST backend synchronous transport for IAMCredentials. 

309 

310 A service account is a special type of Google account that 

311 belongs to your application or a virtual machine (VM), instead 

312 of to an individual end user. Your application assumes the 

313 identity of the service account to call Google APIs, so that the 

314 users aren't directly involved. 

315 

316 Service account credentials are used to temporarily assume the 

317 identity of the service account. Supported credential types 

318 include OAuth 2.0 access tokens, OpenID Connect ID tokens, 

319 self-signed JSON Web Tokens (JWTs), and more. 

320 

321 This class defines the same methods as the primary client, so the 

322 primary client can load the underlying transport implementation 

323 and call it. 

324 

325 It sends JSON representations of protocol buffers over HTTP/1.1 

326 """ 

327 

def __init__(
    self,
    *,
    host: str = "iamcredentials.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    url_scheme: str = "https",
    interceptor: Optional[IAMCredentialsRestInterceptor] = None,
    api_audience: Optional[str] = None,
) -> None:
    """Instantiate the transport.

    Args:
        host (Optional[str]):
            The hostname to connect to (default: 'iamcredentials.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify the application to the service; if none
            are specified, the client will attempt to ascertain the
            credentials from the environment.
        credentials_file (Optional[str]): Deprecated. A file with credentials
            that can be loaded with
            :func:`google.auth.load_credentials_from_file`. Accepted for
            signature compatibility; this constructor does not reference it.
        scopes (Optional(Sequence[str])): A list of scopes. Accepted for
            signature compatibility; this constructor does not reference it.
        client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
            certificate source. When truthy it is handed to the session's
            ``configure_mtls_channel``.
        quota_project_id (Optional[str]): An optional project to use for billing
            and quota. Accepted for signature compatibility; this constructor
            does not reference it.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. Generally, you only need to set this if you are
            developing your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT should
            be used for service account credentials.
        url_scheme: the protocol scheme for the API endpoint. Normally
            "https", but for testing or local servers,
            "http" can be specified.
        interceptor (Optional[IAMCredentialsRestInterceptor]): Interceptor used
            to manipulate requests, request metadata, and responses. A
            default ``IAMCredentialsRestInterceptor`` is created when the
            argument is falsy.
        api_audience (Optional[str]): The intended audience for the API calls
            to the service that will be set when using certain 3rd party
            authentication flows. Audience is typically a resource identifier.
            If not set, the host value will be used as a default.
    """
    # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
    # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
    # credentials object
    base_kwargs = dict(
        host=host,
        credentials=credentials,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        url_scheme=url_scheme,
        api_audience=api_audience,
    )
    super().__init__(**base_kwargs)

    # The session is built from the credentials resolved by the base class.
    self._session = AuthorizedSession(
        self._credentials, default_host=self.DEFAULT_HOST
    )
    if client_cert_source_for_mtls:
        self._session.configure_mtls_channel(client_cert_source_for_mtls)

    if interceptor:
        self._interceptor = interceptor
    else:
        self._interceptor = IAMCredentialsRestInterceptor()

    self._prep_wrapped_messages(client_info)

401 

class _GenerateAccessToken(
    _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken,
    IAMCredentialsRestStub,
):
    """Callable that performs the GenerateAccessToken RPC over HTTP."""

    def __hash__(self):
        # Constant per RPC type: every instance of this stub hashes alike.
        return hash("IAMCredentialsRestTransport.GenerateAccessToken")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request on ``session`` and return its response.

        The HTTP method named by ``transcoded_request["method"]`` is looked
        up on ``session``; ``metadata`` becomes the request headers with a
        JSON ``Content-Type`` added.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.GenerateAccessTokenRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.GenerateAccessTokenResponse:
        r"""Call the generate access token method over HTTP.

        Args:
            request (~.common.GenerateAccessTokenRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not referenced inside this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.GenerateAccessTokenResponse:
                The parsed response, after both post-rpc interceptors ran.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: A subclass built
                from the HTTP response when its status code is 400 or higher.
        """

        http_options = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_http_options()

        request, metadata = self._interceptor.pre_generate_access_token(
            request, metadata
        )
        transcoded_request = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_transcoded_request(
            http_options, request
        )

        body = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = _BaseIAMCredentialsRestTransport._BaseGenerateAccessToken._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a payload that cannot be serialized is
            # logged as None instead of failing the call.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateAccessToken",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateAccessToken",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._GenerateAccessToken._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Parse the HTTP body into the proto that backs ``resp``.
        resp = common.GenerateAccessTokenResponse()
        pb_resp = common.GenerateAccessTokenResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_generate_access_token(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_generate_access_token_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message ``resp``; the HTTP ``response``
            # object is not a GenerateAccessTokenResponse.
            # NOTE(review): this payload presumably carries the issued token;
            # confirm that emitting it at DEBUG level is acceptable.
            try:
                response_payload = common.GenerateAccessTokenResponse.to_json(
                    resp
                )
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_access_token",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateAccessToken",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

554 

class _GenerateIdToken(
    _BaseIAMCredentialsRestTransport._BaseGenerateIdToken, IAMCredentialsRestStub
):
    """Callable that performs the GenerateIdToken RPC over HTTP."""

    def __hash__(self):
        # Constant per RPC type: every instance of this stub hashes alike.
        return hash("IAMCredentialsRestTransport.GenerateIdToken")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request on ``session`` and return its response.

        The HTTP method named by ``transcoded_request["method"]`` is looked
        up on ``session``; ``metadata`` becomes the request headers with a
        JSON ``Content-Type`` added.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.GenerateIdTokenRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.GenerateIdTokenResponse:
        r"""Call the generate id token method over HTTP.

        Args:
            request (~.common.GenerateIdTokenRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not referenced inside this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.GenerateIdTokenResponse:
                The parsed response, after both post-rpc interceptors ran.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: A subclass built
                from the HTTP response when its status code is 400 or higher.
        """

        http_options = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_http_options()

        request, metadata = self._interceptor.pre_generate_id_token(
            request, metadata
        )
        transcoded_request = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_transcoded_request(
            http_options, request
        )

        body = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = _BaseIAMCredentialsRestTransport._BaseGenerateIdToken._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a payload that cannot be serialized is
            # logged as None instead of failing the call.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.GenerateIdToken",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateIdToken",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._GenerateIdToken._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Parse the HTTP body into the proto that backs ``resp``.
        resp = common.GenerateIdTokenResponse()
        pb_resp = common.GenerateIdTokenResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_generate_id_token(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_generate_id_token_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message ``resp``; the HTTP ``response``
            # object is not a GenerateIdTokenResponse.
            # NOTE(review): this payload presumably carries the issued token;
            # confirm that emitting it at DEBUG level is acceptable.
            try:
                response_payload = common.GenerateIdTokenResponse.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.generate_id_token",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "GenerateIdToken",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

704 

class _SignBlob(
    _BaseIAMCredentialsRestTransport._BaseSignBlob, IAMCredentialsRestStub
):
    """Callable that performs the SignBlob RPC over HTTP."""

    def __hash__(self):
        # Constant per RPC type: every instance of this stub hashes alike.
        return hash("IAMCredentialsRestTransport.SignBlob")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request on ``session`` and return its response.

        The HTTP method named by ``transcoded_request["method"]`` is looked
        up on ``session``; ``metadata`` becomes the request headers with a
        JSON ``Content-Type`` added.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.SignBlobRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.SignBlobResponse:
        r"""Call the sign blob method over HTTP.

        Args:
            request (~.common.SignBlobRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not referenced inside this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.SignBlobResponse:
                The parsed response, after both post-rpc interceptors ran.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: A subclass built
                from the HTTP response when its status code is 400 or higher.
        """

        http_options = (
            _BaseIAMCredentialsRestTransport._BaseSignBlob._get_http_options()
        )

        request, metadata = self._interceptor.pre_sign_blob(request, metadata)
        transcoded_request = (
            _BaseIAMCredentialsRestTransport._BaseSignBlob._get_transcoded_request(
                http_options, request
            )
        )

        body = (
            _BaseIAMCredentialsRestTransport._BaseSignBlob._get_request_body_json(
                transcoded_request
            )
        )

        # Jsonify the query params
        query_params = (
            _BaseIAMCredentialsRestTransport._BaseSignBlob._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a payload that cannot be serialized is
            # logged as None instead of failing the call.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignBlob",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignBlob",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._SignBlob._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Parse the HTTP body into the proto that backs ``resp``.
        resp = common.SignBlobResponse()
        pb_resp = common.SignBlobResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_sign_blob(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_sign_blob_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message ``resp``; the HTTP ``response``
            # object is not a SignBlobResponse.
            try:
                response_payload = common.SignBlobResponse.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_blob",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignBlob",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

860 

class _SignJwt(
    _BaseIAMCredentialsRestTransport._BaseSignJwt, IAMCredentialsRestStub
):
    """REST stub that performs the ``SignJwt`` RPC over HTTP.

    Request construction (HTTP options, transcoding, body and query
    parameters) is delegated to
    ``_BaseIAMCredentialsRestTransport._BaseSignJwt``; this class sends the
    request, raises on HTTP errors, parses the reply and runs the
    interceptor hooks.
    """

    def __hash__(self):
        """Return a hash derived from a fixed string identifying this RPC."""
        return hash("IAMCredentialsRestTransport.SignJwt")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return its response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs copied into the request headers.
            query_params: Query parameters; flattened with
                ``rest_helpers.flatten_query_params(..., strict=True)``.
            session: Object exposing one method per HTTP verb; the method
                named by ``transcoded_request["method"]`` is invoked.
            timeout: Forwarded unchanged as the ``timeout`` keyword.
            transcoded_request: Mapping providing the ``"uri"`` and
                ``"method"`` entries.
            body: Payload forwarded as the ``data`` keyword.

        Returns:
            Whatever the selected session method returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        # Copy so the caller's metadata is never mutated by the header below.
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: common.SignJwtRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> common.SignJwtResponse:
        r"""Call the sign jwt method over HTTP.

        Args:
            request (~.common.SignJwtRequest):
                The request object.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Accepted for interface compatibility; this
                method body does not reference it.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.common.SignJwtResponse:
                The parsed response, after both post-call interceptor
                hooks have been applied.

        Raises:
            core_exceptions.GoogleAPICallError: The subclass produced by
                ``core_exceptions.from_http_response`` when the HTTP
                status code is 400 or greater.
        """

        http_options = (
            _BaseIAMCredentialsRestTransport._BaseSignJwt._get_http_options()
        )

        request, metadata = self._interceptor.pre_sign_jwt(request, metadata)
        transcoded_request = (
            _BaseIAMCredentialsRestTransport._BaseSignJwt._get_transcoded_request(
                http_options, request
            )
        )

        body = _BaseIAMCredentialsRestTransport._BaseSignJwt._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = (
            _BaseIAMCredentialsRestTransport._BaseSignJwt._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a serialization failure must not abort
            # the RPC, but interpreter-exit signals must still propagate.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.iam.credentials_v1.IAMCredentialsClient.SignJwt",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignJwt",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = IAMCredentialsRestTransport._SignJwt._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = common.SignJwtResponse()
        pb_resp = common.SignJwtResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_sign_jwt(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_sign_jwt_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message (`resp`), not the raw HTTP
            # `response`: the latter is not a SignJwtResponse, so the
            # payload would always be logged as None.
            try:
                response_payload = common.SignJwtResponse.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.iam.credentials_v1.IAMCredentialsClient.sign_jwt",
                extra={
                    "serviceName": "google.iam.credentials.v1.IAMCredentials",
                    "rpcName": "SignJwt",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1014 

@property
def generate_access_token(
    self,
) -> Callable[
    [common.GenerateAccessTokenRequest], common.GenerateAccessTokenResponse
]:
    """Return a callable stub for the GenerateAccessToken RPC.

    Each access builds a new ``_GenerateAccessToken`` instance from this
    transport's session, host and interceptor.
    """
    stub_factory = self._GenerateAccessToken
    # mypy cannot prove the stub matches the declared Callable type, hence
    # the ignore (in C++ this would require a dynamic_cast).
    return stub_factory(self._session, self._host, self._interceptor)  # type: ignore

1024 

@property
def generate_id_token(
    self,
) -> Callable[[common.GenerateIdTokenRequest], common.GenerateIdTokenResponse]:
    """Return a callable stub for the GenerateIdToken RPC.

    Each access builds a new ``_GenerateIdToken`` instance from this
    transport's session, host and interceptor.
    """
    stub_factory = self._GenerateIdToken
    # mypy cannot prove the stub matches the declared Callable type, hence
    # the ignore (in C++ this would require a dynamic_cast).
    return stub_factory(self._session, self._host, self._interceptor)  # type: ignore

1032 

@property
def sign_blob(self) -> Callable[[common.SignBlobRequest], common.SignBlobResponse]:
    """Return a callable stub for the SignBlob RPC.

    Each access builds a new ``_SignBlob`` instance from this transport's
    session, host and interceptor.
    """
    stub_factory = self._SignBlob
    # mypy cannot prove the stub matches the declared Callable type, hence
    # the ignore (in C++ this would require a dynamic_cast).
    return stub_factory(self._session, self._host, self._interceptor)  # type: ignore

1038 

@property
def sign_jwt(self) -> Callable[[common.SignJwtRequest], common.SignJwtResponse]:
    """Return a callable stub for the SignJwt RPC.

    Each access builds a new ``_SignJwt`` instance from this transport's
    session, host and interceptor.
    """
    stub_factory = self._SignJwt
    # mypy cannot prove the stub matches the declared Callable type, hence
    # the ignore (in C++ this would require a dynamic_cast).
    return stub_factory(self._session, self._host, self._interceptor)  # type: ignore

1044 

@property
def kind(self) -> str:
    """Return the identifier of this transport flavour, always ``"rest"``."""
    transport_kind = "rest"
    return transport_kind

1048 

def close(self):
    """Close the session held in ``self._session``."""
    session = self._session
    session.close()

1051 

1052 

# Names exported by ``from <module> import *``.
__all__ = (
    "IAMCredentialsRestTransport",
)