Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/impersonated_credentials.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

202 statements  

1# Copyright 2018 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Cloud Impersonated credentials. 

16 

17This module provides authentication for applications where local credentials 

18impersonates a remote service account using `IAM Credentials API`_. 

19 

20This class can be used to impersonate a service account as long as the original 

21Credential object has the "Service Account Token Creator" role on the target 

22service account. 

23 

24 .. _IAM Credentials API: 

25 https://cloud.google.com/iam/credentials/reference/rest/ 

26""" 

27 

28import base64 

29import copy 

30from datetime import datetime 

31import http.client as http_client 

32import json 

33 

34from google.auth import _exponential_backoff 

35from google.auth import _helpers 

36from google.auth import credentials 

37from google.auth import exceptions 

38from google.auth import iam 

39from google.auth import jwt 

40from google.auth import metrics 

41from google.oauth2 import _client 

42 

43 

44_REFRESH_ERROR = "Unable to acquire impersonated credentials" 

45 

46_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

47 

48_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" 

49_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( 

50 "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" 

51) 

52 

53_SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user" 

54_SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account" 

55_SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE = ( 

56 "external_account_authorized_user" 

57) 

58 

59 

60def _make_iam_token_request( 

61 request, 

62 principal, 

63 headers, 

64 body, 

65 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, 

66 iam_endpoint_override=None, 

67): 

68 """Makes a request to the Google Cloud IAM service for an access token. 

69 Args: 

70 request (Request): The Request object to use. 

71 principal (str): The principal to request an access token for. 

72 headers (Mapping[str, str]): Map of headers to transmit. 

73 body (Mapping[str, str]): JSON Payload body for the iamcredentials 

74 API call. 

75 iam_endpoint_override (Optiona[str]): The full IAM endpoint override 

76 with the target_principal embedded. This is useful when supporting 

77 impersonation with regional endpoints. 

78 

79 Raises: 

80 google.auth.exceptions.TransportError: Raised if there is an underlying 

81 HTTP connection error 

82 google.auth.exceptions.RefreshError: Raised if the impersonated 

83 credentials are not available. Common reasons are 

84 `iamcredentials.googleapis.com` is not enabled or the 

85 `Service Account Token Creator` is not assigned 

86 """ 

87 iam_endpoint = iam_endpoint_override or iam._IAM_ENDPOINT.replace( 

88 credentials.DEFAULT_UNIVERSE_DOMAIN, universe_domain 

89 ).format(principal) 

90 

91 body = json.dumps(body).encode("utf-8") 

92 

93 response = request(url=iam_endpoint, method="POST", headers=headers, body=body) 

94 

95 # support both string and bytes type response.data 

96 response_body = ( 

97 response.data.decode("utf-8") 

98 if hasattr(response.data, "decode") 

99 else response.data 

100 ) 

101 

102 if response.status != http_client.OK: 

103 raise exceptions.RefreshError(_REFRESH_ERROR, response_body) 

104 

105 try: 

106 token_response = json.loads(response_body) 

107 token = token_response["accessToken"] 

108 expiry = datetime.strptime(token_response["expireTime"], "%Y-%m-%dT%H:%M:%SZ") 

109 

110 return token, expiry 

111 

112 except (KeyError, ValueError) as caught_exc: 

113 new_exc = exceptions.RefreshError( 

114 "{}: No access token or invalid expiration in response.".format( 

115 _REFRESH_ERROR 

116 ), 

117 response_body, 

118 ) 

119 raise new_exc from caught_exc 

120 

121 

122class Credentials( 

123 credentials.Scoped, 

124 credentials.CredentialsWithQuotaProject, 

125 credentials.Signing, 

126 credentials.CredentialsWithTrustBoundary, 

127): 

128 """This module defines impersonated credentials which are essentially 

129 impersonated identities. 

130 

131 Impersonated Credentials allows credentials issued to a user or 

132 service account to impersonate another. The target service account must 

133 grant the originating credential principal the 

134 `Service Account Token Creator`_ IAM role: 

135 

136 For more information about Token Creator IAM role and 

137 IAMCredentials API, see 

138 `Creating Short-Lived Service Account Credentials`_. 

139 

140 .. _Service Account Token Creator: 

141 https://cloud.google.com/iam/docs/service-accounts#the_service_account_token_creator_role 

142 

143 .. _Creating Short-Lived Service Account Credentials: 

144 https://cloud.google.com/iam/docs/creating-short-lived-service-account-credentials 

145 

146 Usage: 

147 

148 First grant source_credentials the `Service Account Token Creator` 

149 role on the target account to impersonate. In this example, the 

150 service account represented by svc_account.json has the 

151 token creator role on 

152 `impersonated-account@_project_.iam.gserviceaccount.com`. 

153 

154 Enable the IAMCredentials API on the source project: 

155 `gcloud services enable iamcredentials.googleapis.com`. 

156 

157 Initialize a source credential which does not have access to 

158 list bucket:: 

159 

160 from google.oauth2 import service_account 

161 

162 target_scopes = [ 

163 'https://www.googleapis.com/auth/devstorage.read_only'] 

164 

165 source_credentials = ( 

166 service_account.Credentials.from_service_account_file( 

167 '/path/to/svc_account.json', 

168 scopes=target_scopes)) 

169 

170 Now use the source credentials to acquire credentials to impersonate 

171 another service account:: 

172 

173 from google.auth import impersonated_credentials 

174 

175 target_credentials = impersonated_credentials.Credentials( 

176 source_credentials=source_credentials, 

177 target_principal='impersonated-account@_project_.iam.gserviceaccount.com', 

178 target_scopes = target_scopes, 

179 lifetime=500) 

180 

181 Resource access is granted:: 

182 

183 client = storage.Client(credentials=target_credentials) 

184 buckets = client.list_buckets(project='your_project') 

185 for bucket in buckets: 

186 print(bucket.name) 

187 

188 **IMPORTANT**: 

189 This class does not validate the credential configuration. A security 

190 risk occurs when a credential configuration configured with malicious urls 

191 is used. 

192 When the credential configuration is accepted from an 

193 untrusted source, you should validate it before using. 

194 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. 

195 """ 

196 

197 def __init__( 

198 self, 

199 source_credentials, 

200 target_principal, 

201 target_scopes, 

202 delegates=None, 

203 subject=None, 

204 lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

205 quota_project_id=None, 

206 iam_endpoint_override=None, 

207 trust_boundary=None, 

208 ): 

209 """ 

210 Args: 

211 source_credentials (google.auth.Credentials): The source credential 

212 used as to acquire the impersonated credentials. 

213 target_principal (str): The service account to impersonate. 

214 target_scopes (Sequence[str]): Scopes to request during the 

215 authorization grant. 

216 delegates (Sequence[str]): The chained list of delegates required 

217 to grant the final access_token. If set, the sequence of 

218 identities must have "Service Account Token Creator" capability 

219 granted to the prceeding identity. For example, if set to 

220 [serviceAccountB, serviceAccountC], the source_credential 

221 must have the Token Creator role on serviceAccountB. 

222 serviceAccountB must have the Token Creator on 

223 serviceAccountC. 

224 Finally, C must have Token Creator on target_principal. 

225 If left unset, source_credential must have that role on 

226 target_principal. 

227 lifetime (int): Number of seconds the delegated credential should 

228 be valid for (upto 3600). 

229 quota_project_id (Optional[str]): The project ID used for quota and billing. 

230 This project may be different from the project used to 

231 create the credentials. 

232 iam_endpoint_override (Optional[str]): The full IAM endpoint override 

233 with the target_principal embedded. This is useful when supporting 

234 impersonation with regional endpoints. 

235 subject (Optional[str]): sub field of a JWT. This field should only be set 

236 if you wish to impersonate as a user. This feature is useful when 

237 using domain wide delegation. 

238 trust_boundary (Mapping[str,str]): A credential trust boundary. 

239 """ 

240 

241 super(Credentials, self).__init__() 

242 

243 self._source_credentials = copy.copy(source_credentials) 

244 # Service account source credentials must have the _IAM_SCOPE 

245 # added to refresh correctly. User credentials cannot have 

246 # their original scopes modified. 

247 if isinstance(self._source_credentials, credentials.Scoped): 

248 self._source_credentials = self._source_credentials.with_scopes( 

249 iam._IAM_SCOPE 

250 ) 

251 # If the source credential is service account and self signed jwt 

252 # is needed, we need to create a jwt credential inside it 

253 if ( 

254 hasattr(self._source_credentials, "_create_self_signed_jwt") 

255 and self._source_credentials._always_use_jwt_access 

256 ): 

257 self._source_credentials._create_self_signed_jwt(None) 

258 

259 self._universe_domain = source_credentials.universe_domain 

260 self._target_principal = target_principal 

261 self._target_scopes = target_scopes 

262 self._delegates = delegates 

263 self._subject = subject 

264 self._lifetime = lifetime or _DEFAULT_TOKEN_LIFETIME_SECS 

265 self.token = None 

266 self.expiry = _helpers.utcnow() 

267 self._quota_project_id = quota_project_id 

268 self._iam_endpoint_override = iam_endpoint_override 

269 self._cred_file_path = None 

270 self._trust_boundary = trust_boundary 

271 

272 def _metric_header_for_usage(self): 

273 return metrics.CRED_TYPE_SA_IMPERSONATE 

274 

275 def _refresh_token(self, request): 

276 """Updates credentials with a new access_token representing 

277 the impersonated account. 

278 

279 Args: 

280 request (google.auth.transport.requests.Request): Request object 

281 to use for refreshing credentials. 

282 """ 

283 

284 # Refresh our source credentials if it is not valid. 

285 if ( 

286 self._source_credentials.token_state == credentials.TokenState.STALE 

287 or self._source_credentials.token_state == credentials.TokenState.INVALID 

288 ): 

289 self._source_credentials.refresh(request) 

290 

291 body = { 

292 "delegates": self._delegates, 

293 "scope": self._target_scopes, 

294 "lifetime": str(self._lifetime) + "s", 

295 } 

296 

297 headers = { 

298 "Content-Type": "application/json", 

299 metrics.API_CLIENT_HEADER: metrics.token_request_access_token_impersonate(), 

300 } 

301 

302 # Apply the source credentials authentication info. 

303 self._source_credentials.apply(headers) 

304 

305 # If a subject is specified a domain-wide delegation auth-flow is initiated 

306 # to impersonate as the provided subject (user). 

307 if self._subject: 

308 if self.universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 

309 raise exceptions.GoogleAuthError( 

310 "Domain-wide delegation is not supported in universes other " 

311 + "than googleapis.com" 

312 ) 

313 

314 now = _helpers.utcnow() 

315 payload = { 

316 "iss": self._target_principal, 

317 "scope": _helpers.scopes_to_string(self._target_scopes or ()), 

318 "sub": self._subject, 

319 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, 

320 "iat": _helpers.datetime_to_secs(now), 

321 "exp": _helpers.datetime_to_secs(now) + _DEFAULT_TOKEN_LIFETIME_SECS, 

322 } 

323 

324 assertion = _sign_jwt_request( 

325 request=request, 

326 principal=self._target_principal, 

327 headers=headers, 

328 payload=payload, 

329 delegates=self._delegates, 

330 ) 

331 

332 self.token, self.expiry, _ = _client.jwt_grant( 

333 request, _GOOGLE_OAUTH2_TOKEN_ENDPOINT, assertion 

334 ) 

335 

336 return 

337 

338 self.token, self.expiry = _make_iam_token_request( 

339 request=request, 

340 principal=self._target_principal, 

341 headers=headers, 

342 body=body, 

343 universe_domain=self.universe_domain, 

344 iam_endpoint_override=self._iam_endpoint_override, 

345 ) 

346 

347 def _build_trust_boundary_lookup_url(self): 

348 """Builds and returns the URL for the trust boundary lookup API. 

349 

350 This method constructs the specific URL for the IAM Credentials API's 

351 `allowedLocations` endpoint, using the credential's universe domain 

352 and service account email. 

353 

354 Raises: 

355 ValueError: If `self.service_account_email` is None or an empty 

356 string, as it's required to form the URL. 

357 

358 Returns: 

359 str: The URL for the trust boundary lookup endpoint. 

360 """ 

361 if not self.service_account_email: 

362 raise ValueError( 

363 "Service account email is required to build the trust boundary lookup URL." 

364 ) 

365 return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( 

366 self.universe_domain, self.service_account_email 

367 ) 

368 

369 def sign_bytes(self, message): 

370 from google.auth.transport.requests import AuthorizedSession 

371 

372 iam_sign_endpoint = iam._IAM_SIGN_ENDPOINT.replace( 

373 credentials.DEFAULT_UNIVERSE_DOMAIN, self.universe_domain 

374 ).format(self._target_principal) 

375 

376 body = { 

377 "payload": base64.b64encode(message).decode("utf-8"), 

378 "delegates": self._delegates, 

379 } 

380 

381 headers = {"Content-Type": "application/json"} 

382 

383 authed_session = AuthorizedSession(self._source_credentials) 

384 

385 try: 

386 retries = _exponential_backoff.ExponentialBackoff() 

387 for _ in retries: 

388 response = authed_session.post( 

389 url=iam_sign_endpoint, headers=headers, json=body 

390 ) 

391 if response.status_code in iam.IAM_RETRY_CODES: 

392 continue 

393 if response.status_code != http_client.OK: 

394 raise exceptions.TransportError( 

395 "Error calling sign_bytes: {}".format(response.json()) 

396 ) 

397 

398 return base64.b64decode(response.json()["signedBlob"]) 

399 finally: 

400 authed_session.close() 

401 raise exceptions.TransportError("exhausted signBlob endpoint retries") 

402 

403 @property 

404 def signer_email(self): 

405 return self._target_principal 

406 

407 @property 

408 def service_account_email(self): 

409 return self._target_principal 

410 

411 @property 

412 def signer(self): 

413 return self 

414 

415 @property 

416 def requires_scopes(self): 

417 return not self._target_scopes 

418 

419 @_helpers.copy_docstring(credentials.Credentials) 

420 def get_cred_info(self): 

421 if self._cred_file_path: 

422 return { 

423 "credential_source": self._cred_file_path, 

424 "credential_type": "impersonated credentials", 

425 "principal": self._target_principal, 

426 } 

427 return None 

428 

429 def _make_copy(self): 

430 cred = self.__class__( 

431 self._source_credentials, 

432 target_principal=self._target_principal, 

433 target_scopes=self._target_scopes, 

434 delegates=self._delegates, 

435 lifetime=self._lifetime, 

436 quota_project_id=self._quota_project_id, 

437 iam_endpoint_override=self._iam_endpoint_override, 

438 trust_boundary=self._trust_boundary, 

439 ) 

440 cred._cred_file_path = self._cred_file_path 

441 return cred 

442 

443 @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 

444 def with_trust_boundary(self, trust_boundary): 

445 cred = self._make_copy() 

446 cred._trust_boundary = trust_boundary 

447 return cred 

448 

449 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

450 def with_quota_project(self, quota_project_id): 

451 cred = self._make_copy() 

452 cred._quota_project_id = quota_project_id 

453 return cred 

454 

455 @_helpers.copy_docstring(credentials.Scoped) 

456 def with_scopes(self, scopes, default_scopes=None): 

457 cred = self._make_copy() 

458 cred._target_scopes = scopes or default_scopes 

459 return cred 

460 

461 @classmethod 

462 def from_impersonated_service_account_info(cls, info, scopes=None): 

463 """Creates a Credentials instance from parsed impersonated service account credentials info. 

464 

465 **IMPORTANT**: 

466 This method does not validate the credential configuration. A security 

467 risk occurs when a credential configuration configured with malicious urls 

468 is used. 

469 When the credential configuration is accepted from an 

470 untrusted source, you should validate it before using with this method. 

471 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. 

472 

473 Args: 

474 info (Mapping[str, str]): The impersonated service account credentials info in Google 

475 format. 

476 scopes (Sequence[str]): Optional list of scopes to include in the 

477 credentials. 

478 

479 Returns: 

480 google.oauth2.credentials.Credentials: The constructed 

481 credentials. 

482 

483 Raises: 

484 InvalidType: If the info["source_credentials"] are not a supported impersonation type 

485 InvalidValue: If the info["service_account_impersonation_url"] is not in the expected format. 

486 ValueError: If the info is not in the expected format. 

487 """ 

488 

489 source_credentials_info = info.get("source_credentials") 

490 source_credentials_type = source_credentials_info.get("type") 

491 if source_credentials_type == _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE: 

492 from google.oauth2 import credentials 

493 

494 source_credentials = credentials.Credentials.from_authorized_user_info( 

495 source_credentials_info 

496 ) 

497 elif source_credentials_type == _SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE: 

498 from google.oauth2 import service_account 

499 

500 source_credentials = service_account.Credentials.from_service_account_info( 

501 source_credentials_info 

502 ) 

503 elif ( 

504 source_credentials_type 

505 == _SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE 

506 ): 

507 from google.auth import external_account_authorized_user 

508 

509 source_credentials = external_account_authorized_user.Credentials.from_info( 

510 source_credentials_info 

511 ) 

512 else: 

513 raise exceptions.InvalidType( 

514 "source credential of type {} is not supported.".format( 

515 source_credentials_type 

516 ) 

517 ) 

518 

519 impersonation_url = info.get("service_account_impersonation_url") 

520 start_index = impersonation_url.rfind("/") 

521 end_index = impersonation_url.find(":generateAccessToken") 

522 if start_index == -1 or end_index == -1 or start_index > end_index: 

523 raise exceptions.InvalidValue( 

524 "Cannot extract target principal from {}".format(impersonation_url) 

525 ) 

526 target_principal = impersonation_url[start_index + 1 : end_index] 

527 delegates = info.get("delegates") 

528 quota_project_id = info.get("quota_project_id") 

529 

530 return cls( 

531 source_credentials, 

532 target_principal, 

533 scopes, 

534 delegates, 

535 quota_project_id=quota_project_id, 

536 ) 

537 

538 

539class IDTokenCredentials(credentials.CredentialsWithQuotaProject): 

540 """Open ID Connect ID Token-based service account credentials.""" 

541 

542 def __init__( 

543 self, 

544 target_credentials, 

545 target_audience=None, 

546 include_email=False, 

547 quota_project_id=None, 

548 ): 

549 """ 

550 Args: 

551 target_credentials (google.auth.Credentials): The target 

552 credential used as to acquire the id tokens for. 

553 target_audience (string): Audience to issue the token for. 

554 include_email (bool): Include email in IdToken 

555 quota_project_id (Optional[str]): The project ID used for 

556 quota and billing. 

557 """ 

558 super(IDTokenCredentials, self).__init__() 

559 

560 if not isinstance(target_credentials, Credentials): 

561 raise exceptions.GoogleAuthError( 

562 "Provided Credential must be " "impersonated_credentials" 

563 ) 

564 self._target_credentials = target_credentials 

565 self._target_audience = target_audience 

566 self._include_email = include_email 

567 self._quota_project_id = quota_project_id 

568 

569 def from_credentials(self, target_credentials, target_audience=None): 

570 return self.__class__( 

571 target_credentials=target_credentials, 

572 target_audience=target_audience, 

573 include_email=self._include_email, 

574 quota_project_id=self._quota_project_id, 

575 ) 

576 

577 def with_target_audience(self, target_audience): 

578 return self.__class__( 

579 target_credentials=self._target_credentials, 

580 target_audience=target_audience, 

581 include_email=self._include_email, 

582 quota_project_id=self._quota_project_id, 

583 ) 

584 

585 def with_include_email(self, include_email): 

586 return self.__class__( 

587 target_credentials=self._target_credentials, 

588 target_audience=self._target_audience, 

589 include_email=include_email, 

590 quota_project_id=self._quota_project_id, 

591 ) 

592 

593 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

594 def with_quota_project(self, quota_project_id): 

595 return self.__class__( 

596 target_credentials=self._target_credentials, 

597 target_audience=self._target_audience, 

598 include_email=self._include_email, 

599 quota_project_id=quota_project_id, 

600 ) 

601 

602 @_helpers.copy_docstring(credentials.Credentials) 

603 def refresh(self, request): 

604 from google.auth.transport.requests import AuthorizedSession 

605 

606 iam_sign_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace( 

607 credentials.DEFAULT_UNIVERSE_DOMAIN, 

608 self._target_credentials.universe_domain, 

609 ).format(self._target_credentials.signer_email) 

610 

611 body = { 

612 "audience": self._target_audience, 

613 "delegates": self._target_credentials._delegates, 

614 "includeEmail": self._include_email, 

615 } 

616 

617 headers = { 

618 "Content-Type": "application/json", 

619 metrics.API_CLIENT_HEADER: metrics.token_request_id_token_impersonate(), 

620 } 

621 

622 authed_session = AuthorizedSession( 

623 self._target_credentials._source_credentials, auth_request=request 

624 ) 

625 

626 try: 

627 response = authed_session.post( 

628 url=iam_sign_endpoint, 

629 headers=headers, 

630 data=json.dumps(body).encode("utf-8"), 

631 ) 

632 finally: 

633 authed_session.close() 

634 

635 if response.status_code != http_client.OK: 

636 raise exceptions.RefreshError( 

637 "Error getting ID token: {}".format(response.json()) 

638 ) 

639 

640 id_token = response.json()["token"] 

641 self.token = id_token 

642 self.expiry = datetime.utcfromtimestamp( 

643 jwt.decode(id_token, verify=False)["exp"] 

644 ) 

645 

646 

647def _sign_jwt_request(request, principal, headers, payload, delegates=[]): 

648 """Makes a request to the Google Cloud IAM service to sign a JWT using a 

649 service account's system-managed private key. 

650 Args: 

651 request (Request): The Request object to use. 

652 principal (str): The principal to request an access token for. 

653 headers (Mapping[str, str]): Map of headers to transmit. 

654 payload (Mapping[str, str]): The JWT payload to sign. Must be a 

655 serialized JSON object that contains a JWT Claims Set. 

656 delegates (Sequence[str]): The chained list of delegates required 

657 to grant the final access_token. If set, the sequence of 

658 identities must have "Service Account Token Creator" capability 

659 granted to the prceeding identity. For example, if set to 

660 [serviceAccountB, serviceAccountC], the source_credential 

661 must have the Token Creator role on serviceAccountB. 

662 serviceAccountB must have the Token Creator on 

663 serviceAccountC. 

664 Finally, C must have Token Creator on target_principal. 

665 If left unset, source_credential must have that role on 

666 target_principal. 

667 

668 Raises: 

669 google.auth.exceptions.TransportError: Raised if there is an underlying 

670 HTTP connection error 

671 google.auth.exceptions.RefreshError: Raised if the impersonated 

672 credentials are not available. Common reasons are 

673 `iamcredentials.googleapis.com` is not enabled or the 

674 `Service Account Token Creator` is not assigned 

675 """ 

676 iam_endpoint = iam._IAM_SIGNJWT_ENDPOINT.format(principal) 

677 

678 body = {"delegates": delegates, "payload": json.dumps(payload)} 

679 body = json.dumps(body).encode("utf-8") 

680 

681 response = request(url=iam_endpoint, method="POST", headers=headers, body=body) 

682 

683 # support both string and bytes type response.data 

684 response_body = ( 

685 response.data.decode("utf-8") 

686 if hasattr(response.data, "decode") 

687 else response.data 

688 ) 

689 

690 if response.status != http_client.OK: 

691 raise exceptions.RefreshError(_REFRESH_ERROR, response_body) 

692 

693 try: 

694 jwt_response = json.loads(response_body) 

695 signed_jwt = jwt_response["signedJwt"] 

696 return signed_jwt 

697 

698 except (KeyError, ValueError) as caught_exc: 

699 new_exc = exceptions.RefreshError( 

700 "{}: No signed JWT in response.".format(_REFRESH_ERROR), response_body 

701 ) 

702 raise new_exc from caught_exc