Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/impersonated_credentials.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

204 statements  

1# Copyright 2018 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Cloud Impersonated credentials. 

16 

17This module provides authentication for applications where local credentials 

18impersonate a remote service account using `IAM Credentials API`_. 

19 

20This class can be used to impersonate a service account as long as the original 

21Credential object has the "Service Account Token Creator" role on the target 

22service account. 

23 

24 .. _IAM Credentials API: 

25 https://cloud.google.com/iam/credentials/reference/rest/ 

26""" 

27 

28import base64 

29import copy 

30from datetime import datetime 

31import http.client as http_client 

32import json 

33 

34from google.auth import _exponential_backoff 

35from google.auth import _helpers 

36from google.auth import credentials 

37from google.auth import exceptions 

38from google.auth import iam 

39from google.auth import jwt 

40from google.auth import metrics 

41from google.oauth2 import _client 

42 

43 

# Common prefix/message for the RefreshError exceptions raised by the IAM
# request helpers in this module.
_REFRESH_ERROR = "Unable to acquire impersonated credentials"

_DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds

# Used both as the "aud" claim of the signed JWT and as the URL of the JWT
# grant request in the domain-wide delegation flow.
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
# Template for the allowedLocations lookup URL; formatted with the universe
# domain and the service account email, in that order.
_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
    "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
)

# Values of the "type" field of info["source_credentials"] that
# Credentials.from_impersonated_service_account_info knows how to load.
_SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user"
_SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account"
_SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE = (
    "external_account_authorized_user"
)

58 

59 

def _make_iam_token_request(
    request,
    principal,
    headers,
    body,
    universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
    iam_endpoint_override=None,
):
    """Requests an access token for ``principal`` from the IAM service.

    Args:
        request (Request): The Request object to use.
        principal (str): The principal to request an access token for.
        headers (Mapping[str, str]): Map of headers to transmit.
        body (Mapping[str, str]): JSON payload body for the iamcredentials
            API call.
        universe_domain (str): Universe domain substituted for the default
            one in the IAM endpoint template.
        iam_endpoint_override (Optional[str]): The full IAM endpoint override
            with the target_principal embedded. This is useful when supporting
            impersonation with regional endpoints. When set, it is used
            verbatim and ``principal``/``universe_domain`` are not applied.

    Returns:
        Tuple[str, datetime.datetime]: The access token and its expiration,
            parsed from the response's ``expireTime`` field as a naive
            datetime.

    Raises:
        google.auth.exceptions.TransportError: Raised if there is an underlying
            HTTP connection error
        google.auth.exceptions.RefreshError: Raised if the impersonated
            credentials are not available. Common reasons are
            `iamcredentials.googleapis.com` is not enabled or the
            `Service Account Token Creator` is not assigned
    """
    if iam_endpoint_override:
        endpoint = iam_endpoint_override
    else:
        endpoint_template = iam._IAM_ENDPOINT.replace(
            credentials.DEFAULT_UNIVERSE_DOMAIN, universe_domain
        )
        endpoint = endpoint_template.format(principal)

    encoded_body = json.dumps(body).encode("utf-8")
    response = request(
        url=endpoint, method="POST", headers=headers, body=encoded_body
    )

    # response.data may be either bytes or str depending on the transport.
    if hasattr(response.data, "decode"):
        response_body = response.data.decode("utf-8")
    else:
        response_body = response.data

    if response.status != http_client.OK:
        raise exceptions.RefreshError(_REFRESH_ERROR, response_body)

    try:
        parsed = json.loads(response_body)
        access_token = parsed["accessToken"]
        expire_time = datetime.strptime(parsed["expireTime"], "%Y-%m-%dT%H:%M:%SZ")
    except (KeyError, ValueError) as caught_exc:
        message = "{}: No access token or invalid expiration in response.".format(
            _REFRESH_ERROR
        )
        raise exceptions.RefreshError(message, response_body) from caught_exc

    return access_token, expire_time

120 

121 

class Credentials(
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.Signing,
    credentials.CredentialsWithTrustBoundary,
):
    """This module defines impersonated credentials which are essentially
    impersonated identities.

    Impersonated Credentials allows credentials issued to a user or
    service account to impersonate another. The target service account must
    grant the originating credential principal the
    `Service Account Token Creator`_ IAM role:

    For more information about Token Creator IAM role and
    IAMCredentials API, see
    `Creating Short-Lived Service Account Credentials`_.

    .. _Service Account Token Creator:
        https://cloud.google.com/iam/docs/service-accounts#the_service_account_token_creator_role

    .. _Creating Short-Lived Service Account Credentials:
        https://cloud.google.com/iam/docs/creating-short-lived-service-account-credentials

    Usage:

    First grant source_credentials the `Service Account Token Creator`
    role on the target account to impersonate. In this example, the
    service account represented by svc_account.json has the
    token creator role on
    `impersonated-account@_project_.iam.gserviceaccount.com`.

    Enable the IAMCredentials API on the source project:
    `gcloud services enable iamcredentials.googleapis.com`.

    Initialize a source credential which does not have access to
    list bucket::

        from google.oauth2 import service_account

        target_scopes = [
            'https://www.googleapis.com/auth/devstorage.read_only']

        source_credentials = (
            service_account.Credentials.from_service_account_file(
                '/path/to/svc_account.json',
                scopes=target_scopes))

    Now use the source credentials to acquire credentials to impersonate
    another service account::

        from google.auth import impersonated_credentials

        target_credentials = impersonated_credentials.Credentials(
            source_credentials=source_credentials,
            target_principal='impersonated-account@_project_.iam.gserviceaccount.com',
            target_scopes = target_scopes,
            lifetime=500)

    Resource access is granted::

        client = storage.Client(credentials=target_credentials)
        buckets = client.list_buckets(project='your_project')
        for bucket in buckets:
            print(bucket.name)

    **IMPORTANT**:
    This class does not validate the credential configuration. A security
    risk occurs when a credential configuration configured with malicious urls
    is used.
    When the credential configuration is accepted from an
    untrusted source, you should validate it before using.
    Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
    """

    def __init__(
        self,
        source_credentials,
        target_principal,
        target_scopes,
        delegates=None,
        subject=None,
        lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        quota_project_id=None,
        iam_endpoint_override=None,
        trust_boundary=None,
    ):
        """
        Args:
            source_credentials (google.auth.Credentials): The source credential
                used to acquire the impersonated credentials.
            target_principal (str): The service account to impersonate.
            target_scopes (Sequence[str]): Scopes to request during the
                authorization grant.
            delegates (Sequence[str]): The chained list of delegates required
                to grant the final access_token. If set, the sequence of
                identities must have "Service Account Token Creator" capability
                granted to the preceding identity. For example, if set to
                [serviceAccountB, serviceAccountC], the source_credential
                must have the Token Creator role on serviceAccountB.
                serviceAccountB must have the Token Creator on
                serviceAccountC.
                Finally, C must have Token Creator on target_principal.
                If left unset, source_credential must have that role on
                target_principal.
            subject (Optional[str]): sub field of a JWT. This field should only be set
                if you wish to impersonate as a user. This feature is useful when
                using domain wide delegation.
            lifetime (int): Number of seconds the delegated credential should
                be valid for (up to 3600). A falsy value falls back to the
                default of 3600 seconds.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
                This project may be different from the project used to
                create the credentials.
            iam_endpoint_override (Optional[str]): The full IAM endpoint override
                with the target_principal embedded. This is useful when supporting
                impersonation with regional endpoints.
            trust_boundary (Mapping[str,str]): A credential trust boundary.
        """

        super().__init__()

        # Work on a shallow copy so the scope change below does not rebind
        # anything on the caller's credential object.
        self._source_credentials = copy.copy(source_credentials)
        # Service account source credentials must have the _IAM_SCOPE
        # added to refresh correctly. User credentials cannot have
        # their original scopes modified.
        if isinstance(self._source_credentials, credentials.Scoped):
            self._source_credentials = self._source_credentials.with_scopes(
                iam._IAM_SCOPE
            )
            # If the source credential is service account and self signed jwt
            # is needed, we need to create a jwt credential inside it
            if (
                hasattr(self._source_credentials, "_create_self_signed_jwt")
                and self._source_credentials._always_use_jwt_access
            ):
                self._source_credentials._create_self_signed_jwt(None)

        self._universe_domain = source_credentials.universe_domain
        self._target_principal = target_principal
        self._target_scopes = target_scopes
        self._delegates = delegates
        self._subject = subject
        self._lifetime = lifetime or _DEFAULT_TOKEN_LIFETIME_SECS
        self.token = None
        # No token has been fetched yet, so the expiry starts at "now".
        self.expiry = _helpers.utcnow()
        self._quota_project_id = quota_project_id
        self._iam_endpoint_override = iam_endpoint_override
        self._cred_file_path = None
        self._trust_boundary = trust_boundary

    def _metric_header_for_usage(self):
        """Returns the metrics credential-type value for impersonation."""
        return metrics.CRED_TYPE_SA_IMPERSONATE

    def _refresh_token(self, request):
        """Updates credentials with a new access_token representing
        the impersonated account.

        Args:
            request (google.auth.transport.requests.Request): Request object
                to use for refreshing credentials.

        Raises:
            google.auth.exceptions.GoogleAuthError: If a subject is set and the
                universe domain is not the default one.
        """

        # Refresh our source credentials if it is not valid.
        if self._source_credentials.token_state in (
            credentials.TokenState.STALE,
            credentials.TokenState.INVALID,
        ):
            self._source_credentials._refresh_token(request)

        headers = {
            "Content-Type": "application/json",
            metrics.API_CLIENT_HEADER: metrics.token_request_access_token_impersonate(),
        }

        # Apply the source credentials authentication info.
        self._source_credentials.apply(headers)

        # If a subject is specified a domain-wide delegation auth-flow is initiated
        # to impersonate as the provided subject (user).
        if self._subject:
            if self.universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
                raise exceptions.GoogleAuthError(
                    "Domain-wide delegation is not supported in universes other "
                    + "than googleapis.com"
                )

            now = _helpers.utcnow()
            payload = {
                "iss": self._target_principal,
                "scope": _helpers.scopes_to_string(self._target_scopes or ()),
                "sub": self._subject,
                "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
                "iat": _helpers.datetime_to_secs(now),
                "exp": _helpers.datetime_to_secs(now) + _DEFAULT_TOKEN_LIFETIME_SECS,
            }

            # Have IAM sign the assertion as the target principal, then
            # exchange it for an access token at the OAuth 2.0 endpoint.
            assertion = _sign_jwt_request(
                request=request,
                principal=self._target_principal,
                headers=headers,
                payload=payload,
                delegates=self._delegates,
            )

            self.token, self.expiry, _ = _client.jwt_grant(
                request, _GOOGLE_OAUTH2_TOKEN_ENDPOINT, assertion
            )

            return

        # Plain impersonation: ask IAM directly for a short-lived token.
        body = {
            "delegates": self._delegates,
            "scope": self._target_scopes,
            "lifetime": str(self._lifetime) + "s",
        }

        self.token, self.expiry = _make_iam_token_request(
            request=request,
            principal=self._target_principal,
            headers=headers,
            body=body,
            universe_domain=self.universe_domain,
            iam_endpoint_override=self._iam_endpoint_override,
        )

    def _build_trust_boundary_lookup_url(self):
        """Builds and returns the URL for the trust boundary lookup API.

        This method constructs the specific URL for the IAM Credentials API's
        `allowedLocations` endpoint, using the credential's universe domain
        and service account email.

        Raises:
            ValueError: If `self.service_account_email` is None or an empty
                string, as it's required to form the URL.

        Returns:
            str: The URL for the trust boundary lookup endpoint.
        """
        if not self.service_account_email:
            raise ValueError(
                "Service account email is required to build the trust boundary lookup URL."
            )
        return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
            self.universe_domain, self.service_account_email
        )

    def sign_bytes(self, message):
        """Signs ``message`` as the target principal through the IAM sign endpoint.

        The request is authorized with the source credentials and retried
        with exponential backoff while the response status is one of
        ``iam.IAM_RETRY_CODES``.

        Args:
            message (bytes): The payload to sign.

        Returns:
            bytes: The base64-decoded ``signedBlob`` field of the response.

        Raises:
            google.auth.exceptions.TransportError: If the endpoint answers
                with a non-retryable, non-OK status, or if all retries are
                exhausted.
        """
        from google.auth.transport.requests import AuthorizedSession

        iam_sign_endpoint = iam._IAM_SIGN_ENDPOINT.replace(
            credentials.DEFAULT_UNIVERSE_DOMAIN, self.universe_domain
        ).format(self._target_principal)

        body = {
            "payload": base64.b64encode(message).decode("utf-8"),
            "delegates": self._delegates,
        }

        headers = {"Content-Type": "application/json"}

        authed_session = AuthorizedSession(self._source_credentials)

        try:
            retries = _exponential_backoff.ExponentialBackoff()
            for _ in retries:
                response = authed_session.post(
                    url=iam_sign_endpoint, headers=headers, json=body
                )
                if response.status_code in iam.IAM_RETRY_CODES:
                    continue
                if response.status_code != http_client.OK:
                    raise exceptions.TransportError(
                        "Error calling sign_bytes: {}".format(response.json())
                    )

                return base64.b64decode(response.json()["signedBlob"])
        finally:
            # Always release the session, whether we returned or raised.
            authed_session.close()
        raise exceptions.TransportError("exhausted signBlob endpoint retries")

    @property
    def signer_email(self):
        """str: The target principal, which is the identity that signs."""
        return self._target_principal

    @property
    def service_account_email(self):
        """str: The target principal being impersonated."""
        return self._target_principal

    @property
    def signer(self):
        """Credentials: This object itself, since it implements ``sign_bytes``."""
        return self

    @property
    def requires_scopes(self):
        """bool: True when no target scopes have been set."""
        return not self._target_scopes

    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        # Only reported when the credential was loaded from a file path.
        if self._cred_file_path:
            return {
                "credential_source": self._cred_file_path,
                "credential_type": "impersonated credentials",
                "principal": self._target_principal,
            }
        return None

    def _make_copy(self):
        """Returns a new instance carrying over this credential's settings.

        The subject is forwarded as well so that copies produced by the
        ``with_*`` methods keep using the domain-wide delegation flow.
        """
        cred = self.__class__(
            self._source_credentials,
            target_principal=self._target_principal,
            target_scopes=self._target_scopes,
            delegates=self._delegates,
            subject=self._subject,
            lifetime=self._lifetime,
            quota_project_id=self._quota_project_id,
            iam_endpoint_override=self._iam_endpoint_override,
            trust_boundary=self._trust_boundary,
        )
        cred._cred_file_path = self._cred_file_path
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
    def with_trust_boundary(self, trust_boundary):
        cred = self._make_copy()
        cred._trust_boundary = trust_boundary
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        cred = self._make_copy()
        # default_scopes are only used when no explicit scopes are given.
        cred._target_scopes = scopes or default_scopes
        return cred

    @classmethod
    def from_impersonated_service_account_info(cls, info, scopes=None):
        """Creates a Credentials instance from parsed impersonated service account credentials info.

        **IMPORTANT**:
        This method does not validate the credential configuration. A security
        risk occurs when a credential configuration configured with malicious urls
        is used.
        When the credential configuration is accepted from an
        untrusted source, you should validate it before using with this method.
        Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.

        Args:
            info (Mapping[str, str]): The impersonated service account credentials info in Google
                format.
            scopes (Sequence[str]): Optional list of scopes to include in the
                credentials. Falls back to info["scopes"] when not given.

        Returns:
            google.auth.impersonated_credentials.Credentials: The constructed
                credentials.

        Raises:
            InvalidType: If the info["source_credentials"] are not a supported impersonation type
            InvalidValue: If the info["service_account_impersonation_url"] is not in the expected format.
            ValueError: If the info is not in the expected format.
        """

        source_credentials_info = info.get("source_credentials")
        if source_credentials_info is None:
            raise ValueError(
                "Impersonated service account info is missing 'source_credentials'."
            )

        source_credentials_type = source_credentials_info.get("type")
        if source_credentials_type == _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE:
            # Aliased so the module-level ``credentials`` name is not shadowed.
            from google.oauth2 import credentials as oauth2_credentials

            source_credentials = (
                oauth2_credentials.Credentials.from_authorized_user_info(
                    source_credentials_info
                )
            )
        elif source_credentials_type == _SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE:
            from google.oauth2 import service_account

            source_credentials = service_account.Credentials.from_service_account_info(
                source_credentials_info
            )
        elif (
            source_credentials_type
            == _SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE
        ):
            from google.auth import external_account_authorized_user

            source_credentials = external_account_authorized_user.Credentials.from_info(
                source_credentials_info
            )
        else:
            raise exceptions.InvalidType(
                "source credential of type {} is not supported.".format(
                    source_credentials_type
                )
            )

        impersonation_url = info.get("service_account_impersonation_url")
        if impersonation_url is None:
            raise ValueError(
                "Impersonated service account info is missing "
                "'service_account_impersonation_url'."
            )

        # The target principal is the path segment between the last "/" and
        # the ":generateAccessToken" method suffix.
        start_index = impersonation_url.rfind("/")
        end_index = impersonation_url.find(":generateAccessToken")
        if start_index == -1 or end_index == -1 or start_index > end_index:
            raise exceptions.InvalidValue(
                "Cannot extract target principal from {}".format(impersonation_url)
            )
        target_principal = impersonation_url[start_index + 1 : end_index]
        delegates = info.get("delegates")
        quota_project_id = info.get("quota_project_id")
        scopes = scopes or info.get("scopes")
        trust_boundary = info.get("trust_boundary")

        return cls(
            source_credentials,
            target_principal,
            scopes,
            delegates,
            quota_project_id=quota_project_id,
            trust_boundary=trust_boundary,
        )

540 

541 

class IDTokenCredentials(credentials.CredentialsWithQuotaProject):
    """Open ID Connect ID Token-based service account credentials."""

    def __init__(
        self,
        target_credentials,
        target_audience=None,
        include_email=False,
        quota_project_id=None,
    ):
        """
        Args:
            target_credentials (google.auth.Credentials): The target
                credential used to acquire the id tokens for. Must be an
                instance of this module's ``Credentials`` class.
            target_audience (string): Audience to issue the token for.
            include_email (bool): Include email in IdToken
            quota_project_id (Optional[str]): The project ID used for
                quota and billing.

        Raises:
            google.auth.exceptions.GoogleAuthError: If ``target_credentials``
                is not an impersonated ``Credentials`` instance.
        """
        super().__init__()

        if not isinstance(target_credentials, Credentials):
            raise exceptions.GoogleAuthError(
                "Provided Credential must be impersonated_credentials"
            )
        self._target_credentials = target_credentials
        self._target_audience = target_audience
        self._include_email = include_email
        self._quota_project_id = quota_project_id

    def from_credentials(self, target_credentials, target_audience=None):
        """Returns a new instance for other target credentials and audience.

        ``include_email`` and ``quota_project_id`` are carried over from
        this instance.
        """
        return self.__class__(
            target_credentials=target_credentials,
            target_audience=target_audience,
            include_email=self._include_email,
            quota_project_id=self._quota_project_id,
        )

    def with_target_audience(self, target_audience):
        """Returns a copy of these credentials with a different audience."""
        return self.__class__(
            target_credentials=self._target_credentials,
            target_audience=target_audience,
            include_email=self._include_email,
            quota_project_id=self._quota_project_id,
        )

    def with_include_email(self, include_email):
        """Returns a copy of these credentials with ``include_email`` replaced."""
        return self.__class__(
            target_credentials=self._target_credentials,
            target_audience=self._target_audience,
            include_email=include_email,
            quota_project_id=self._quota_project_id,
        )

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self.__class__(
            target_credentials=self._target_credentials,
            target_audience=self._target_audience,
            include_email=self._include_email,
            quota_project_id=quota_project_id,
        )

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        from datetime import timezone

        from google.auth.transport.requests import AuthorizedSession

        iam_sign_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
            credentials.DEFAULT_UNIVERSE_DOMAIN,
            self._target_credentials.universe_domain,
        ).format(self._target_credentials.signer_email)

        body = {
            "audience": self._target_audience,
            "delegates": self._target_credentials._delegates,
            "includeEmail": self._include_email,
        }

        headers = {
            "Content-Type": "application/json",
            metrics.API_CLIENT_HEADER: metrics.token_request_id_token_impersonate(),
        }

        # The call is authorized with the *source* credentials of the
        # impersonated credential, not with the impersonated token itself.
        authed_session = AuthorizedSession(
            self._target_credentials._source_credentials, auth_request=request
        )

        try:
            response = authed_session.post(
                url=iam_sign_endpoint,
                headers=headers,
                data=json.dumps(body).encode("utf-8"),
            )
        finally:
            authed_session.close()

        if response.status_code != http_client.OK:
            raise exceptions.RefreshError(
                "Error getting ID token: {}".format(response.json())
            )

        id_token = response.json()["token"]
        self.token = id_token
        # The token is decoded without verification only to read its "exp"
        # claim. datetime.utcfromtimestamp() is deprecated since Python 3.12;
        # build an aware UTC datetime and drop the tzinfo so the stored
        # expiry stays a naive UTC value, exactly as before.
        expiry_secs = jwt.decode(id_token, verify=False)["exp"]
        self.expiry = datetime.fromtimestamp(expiry_secs, tz=timezone.utc).replace(
            tzinfo=None
        )

648 

649 

def _sign_jwt_request(request, principal, headers, payload, delegates=()):
    """Makes a request to the Google Cloud IAM service to sign a JWT using a
    service account's system-managed private key.

    Args:
        request (Request): The Request object to use.
        principal (str): The principal to request an access token for.
        headers (Mapping[str, str]): Map of headers to transmit.
        payload (Mapping[str, str]): The JWT Claims Set to sign. It is
            serialized to a JSON string before being sent.
        delegates (Sequence[str]): The chained list of delegates required
            to grant the final access_token. If set, the sequence of
            identities must have "Service Account Token Creator" capability
            granted to the preceding identity. For example, if set to
            [serviceAccountB, serviceAccountC], the source_credential
            must have the Token Creator role on serviceAccountB.
            serviceAccountB must have the Token Creator on
            serviceAccountC.
            Finally, C must have Token Creator on target_principal.
            If left unset, source_credential must have that role on
            target_principal.

    Returns:
        str: The ``signedJwt`` field of the IAM response.

    Raises:
        google.auth.exceptions.TransportError: Raised if there is an underlying
            HTTP connection error
        google.auth.exceptions.RefreshError: Raised if the impersonated
            credentials are not available. Common reasons are
            `iamcredentials.googleapis.com` is not enabled or the
            `Service Account Token Creator` is not assigned
    """
    # NOTE: the default for ``delegates`` is an immutable empty tuple rather
    # than a shared mutable list; it serializes to the same JSON ``[]``.
    iam_endpoint = iam._IAM_SIGNJWT_ENDPOINT.format(principal)

    # The claims are sent as a JSON *string* nested inside the JSON request
    # body, hence the two levels of json.dumps.
    body = {"delegates": delegates, "payload": json.dumps(payload)}
    body = json.dumps(body).encode("utf-8")

    response = request(url=iam_endpoint, method="POST", headers=headers, body=body)

    # support both string and bytes type response.data
    response_body = (
        response.data.decode("utf-8")
        if hasattr(response.data, "decode")
        else response.data
    )

    if response.status != http_client.OK:
        raise exceptions.RefreshError(_REFRESH_ERROR, response_body)

    try:
        jwt_response = json.loads(response_body)
        return jwt_response["signedJwt"]
    except (KeyError, ValueError) as caught_exc:
        new_exc = exceptions.RefreshError(
            "{}: No signed JWT in response.".format(_REFRESH_ERROR), response_body
        )
        raise new_exc from caught_exc