Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/impersonated_credentials.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

194 statements  

1# Copyright 2018 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Cloud Impersonated credentials. 

16 

17This module provides authentication for applications where local credentials 

18impersonates a remote service account using `IAM Credentials API`_. 

19 

20This class can be used to impersonate a service account as long as the original 

21Credential object has the "Service Account Token Creator" role on the target 

22service account. 

23 

24 .. _IAM Credentials API: 

25 https://cloud.google.com/iam/credentials/reference/rest/ 

26""" 

27 

28import base64 

29import copy 

30from datetime import datetime 

31import http.client as http_client 

32import json 

33 

34from google.auth import _exponential_backoff 

35from google.auth import _helpers 

36from google.auth import credentials 

37from google.auth import exceptions 

38from google.auth import iam 

39from google.auth import jwt 

40from google.auth import metrics 

41from google.oauth2 import _client 

42 

43 

44_REFRESH_ERROR = "Unable to acquire impersonated credentials" 

45 

46_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

47 

48_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" 

49 

50_SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user" 

51_SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account" 

52_SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE = ( 

53 "external_account_authorized_user" 

54) 

55 

56 

57def _make_iam_token_request( 

58 request, 

59 principal, 

60 headers, 

61 body, 

62 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, 

63 iam_endpoint_override=None, 

64): 

65 """Makes a request to the Google Cloud IAM service for an access token. 

66 Args: 

67 request (Request): The Request object to use. 

68 principal (str): The principal to request an access token for. 

69 headers (Mapping[str, str]): Map of headers to transmit. 

70 body (Mapping[str, str]): JSON Payload body for the iamcredentials 

71 API call. 

72 iam_endpoint_override (Optiona[str]): The full IAM endpoint override 

73 with the target_principal embedded. This is useful when supporting 

74 impersonation with regional endpoints. 

75 

76 Raises: 

77 google.auth.exceptions.TransportError: Raised if there is an underlying 

78 HTTP connection error 

79 google.auth.exceptions.RefreshError: Raised if the impersonated 

80 credentials are not available. Common reasons are 

81 `iamcredentials.googleapis.com` is not enabled or the 

82 `Service Account Token Creator` is not assigned 

83 """ 

84 iam_endpoint = iam_endpoint_override or iam._IAM_ENDPOINT.replace( 

85 credentials.DEFAULT_UNIVERSE_DOMAIN, universe_domain 

86 ).format(principal) 

87 

88 body = json.dumps(body).encode("utf-8") 

89 

90 response = request(url=iam_endpoint, method="POST", headers=headers, body=body) 

91 

92 # support both string and bytes type response.data 

93 response_body = ( 

94 response.data.decode("utf-8") 

95 if hasattr(response.data, "decode") 

96 else response.data 

97 ) 

98 

99 if response.status != http_client.OK: 

100 raise exceptions.RefreshError(_REFRESH_ERROR, response_body) 

101 

102 try: 

103 token_response = json.loads(response_body) 

104 token = token_response["accessToken"] 

105 expiry = datetime.strptime(token_response["expireTime"], "%Y-%m-%dT%H:%M:%SZ") 

106 

107 return token, expiry 

108 

109 except (KeyError, ValueError) as caught_exc: 

110 new_exc = exceptions.RefreshError( 

111 "{}: No access token or invalid expiration in response.".format( 

112 _REFRESH_ERROR 

113 ), 

114 response_body, 

115 ) 

116 raise new_exc from caught_exc 

117 

118 

119class Credentials( 

120 credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing 

121): 

122 """This module defines impersonated credentials which are essentially 

123 impersonated identities. 

124 

125 Impersonated Credentials allows credentials issued to a user or 

126 service account to impersonate another. The target service account must 

127 grant the originating credential principal the 

128 `Service Account Token Creator`_ IAM role: 

129 

130 For more information about Token Creator IAM role and 

131 IAMCredentials API, see 

132 `Creating Short-Lived Service Account Credentials`_. 

133 

134 .. _Service Account Token Creator: 

135 https://cloud.google.com/iam/docs/service-accounts#the_service_account_token_creator_role 

136 

137 .. _Creating Short-Lived Service Account Credentials: 

138 https://cloud.google.com/iam/docs/creating-short-lived-service-account-credentials 

139 

140 Usage: 

141 

142 First grant source_credentials the `Service Account Token Creator` 

143 role on the target account to impersonate. In this example, the 

144 service account represented by svc_account.json has the 

145 token creator role on 

146 `impersonated-account@_project_.iam.gserviceaccount.com`. 

147 

148 Enable the IAMCredentials API on the source project: 

149 `gcloud services enable iamcredentials.googleapis.com`. 

150 

151 Initialize a source credential which does not have access to 

152 list bucket:: 

153 

154 from google.oauth2 import service_account 

155 

156 target_scopes = [ 

157 'https://www.googleapis.com/auth/devstorage.read_only'] 

158 

159 source_credentials = ( 

160 service_account.Credentials.from_service_account_file( 

161 '/path/to/svc_account.json', 

162 scopes=target_scopes)) 

163 

164 Now use the source credentials to acquire credentials to impersonate 

165 another service account:: 

166 

167 from google.auth import impersonated_credentials 

168 

169 target_credentials = impersonated_credentials.Credentials( 

170 source_credentials=source_credentials, 

171 target_principal='impersonated-account@_project_.iam.gserviceaccount.com', 

172 target_scopes = target_scopes, 

173 lifetime=500) 

174 

175 Resource access is granted:: 

176 

177 client = storage.Client(credentials=target_credentials) 

178 buckets = client.list_buckets(project='your_project') 

179 for bucket in buckets: 

180 print(bucket.name) 

181 """ 

182 

183 def __init__( 

184 self, 

185 source_credentials, 

186 target_principal, 

187 target_scopes, 

188 delegates=None, 

189 subject=None, 

190 lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

191 quota_project_id=None, 

192 iam_endpoint_override=None, 

193 ): 

194 """ 

195 Args: 

196 source_credentials (google.auth.Credentials): The source credential 

197 used as to acquire the impersonated credentials. 

198 target_principal (str): The service account to impersonate. 

199 target_scopes (Sequence[str]): Scopes to request during the 

200 authorization grant. 

201 delegates (Sequence[str]): The chained list of delegates required 

202 to grant the final access_token. If set, the sequence of 

203 identities must have "Service Account Token Creator" capability 

204 granted to the prceeding identity. For example, if set to 

205 [serviceAccountB, serviceAccountC], the source_credential 

206 must have the Token Creator role on serviceAccountB. 

207 serviceAccountB must have the Token Creator on 

208 serviceAccountC. 

209 Finally, C must have Token Creator on target_principal. 

210 If left unset, source_credential must have that role on 

211 target_principal. 

212 lifetime (int): Number of seconds the delegated credential should 

213 be valid for (upto 3600). 

214 quota_project_id (Optional[str]): The project ID used for quota and billing. 

215 This project may be different from the project used to 

216 create the credentials. 

217 iam_endpoint_override (Optional[str]): The full IAM endpoint override 

218 with the target_principal embedded. This is useful when supporting 

219 impersonation with regional endpoints. 

220 subject (Optional[str]): sub field of a JWT. This field should only be set 

221 if you wish to impersonate as a user. This feature is useful when 

222 using domain wide delegation. 

223 """ 

224 

225 super(Credentials, self).__init__() 

226 

227 self._source_credentials = copy.copy(source_credentials) 

228 # Service account source credentials must have the _IAM_SCOPE 

229 # added to refresh correctly. User credentials cannot have 

230 # their original scopes modified. 

231 if isinstance(self._source_credentials, credentials.Scoped): 

232 self._source_credentials = self._source_credentials.with_scopes( 

233 iam._IAM_SCOPE 

234 ) 

235 # If the source credential is service account and self signed jwt 

236 # is needed, we need to create a jwt credential inside it 

237 if ( 

238 hasattr(self._source_credentials, "_create_self_signed_jwt") 

239 and self._source_credentials._always_use_jwt_access 

240 ): 

241 self._source_credentials._create_self_signed_jwt(None) 

242 

243 self._universe_domain = source_credentials.universe_domain 

244 self._target_principal = target_principal 

245 self._target_scopes = target_scopes 

246 self._delegates = delegates 

247 self._subject = subject 

248 self._lifetime = lifetime or _DEFAULT_TOKEN_LIFETIME_SECS 

249 self.token = None 

250 self.expiry = _helpers.utcnow() 

251 self._quota_project_id = quota_project_id 

252 self._iam_endpoint_override = iam_endpoint_override 

253 self._cred_file_path = None 

254 

255 def _metric_header_for_usage(self): 

256 return metrics.CRED_TYPE_SA_IMPERSONATE 

257 

258 @_helpers.copy_docstring(credentials.Credentials) 

259 def refresh(self, request): 

260 self._update_token(request) 

261 

262 def _update_token(self, request): 

263 """Updates credentials with a new access_token representing 

264 the impersonated account. 

265 

266 Args: 

267 request (google.auth.transport.requests.Request): Request object 

268 to use for refreshing credentials. 

269 """ 

270 

271 # Refresh our source credentials if it is not valid. 

272 if ( 

273 self._source_credentials.token_state == credentials.TokenState.STALE 

274 or self._source_credentials.token_state == credentials.TokenState.INVALID 

275 ): 

276 self._source_credentials.refresh(request) 

277 

278 body = { 

279 "delegates": self._delegates, 

280 "scope": self._target_scopes, 

281 "lifetime": str(self._lifetime) + "s", 

282 } 

283 

284 headers = { 

285 "Content-Type": "application/json", 

286 metrics.API_CLIENT_HEADER: metrics.token_request_access_token_impersonate(), 

287 } 

288 

289 # Apply the source credentials authentication info. 

290 self._source_credentials.apply(headers) 

291 

292 # If a subject is specified a domain-wide delegation auth-flow is initiated 

293 # to impersonate as the provided subject (user). 

294 if self._subject: 

295 if self.universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 

296 raise exceptions.GoogleAuthError( 

297 "Domain-wide delegation is not supported in universes other " 

298 + "than googleapis.com" 

299 ) 

300 

301 now = _helpers.utcnow() 

302 payload = { 

303 "iss": self._target_principal, 

304 "scope": _helpers.scopes_to_string(self._target_scopes or ()), 

305 "sub": self._subject, 

306 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, 

307 "iat": _helpers.datetime_to_secs(now), 

308 "exp": _helpers.datetime_to_secs(now) + _DEFAULT_TOKEN_LIFETIME_SECS, 

309 } 

310 

311 assertion = _sign_jwt_request( 

312 request=request, 

313 principal=self._target_principal, 

314 headers=headers, 

315 payload=payload, 

316 delegates=self._delegates, 

317 ) 

318 

319 self.token, self.expiry, _ = _client.jwt_grant( 

320 request, _GOOGLE_OAUTH2_TOKEN_ENDPOINT, assertion 

321 ) 

322 

323 return 

324 

325 self.token, self.expiry = _make_iam_token_request( 

326 request=request, 

327 principal=self._target_principal, 

328 headers=headers, 

329 body=body, 

330 universe_domain=self.universe_domain, 

331 iam_endpoint_override=self._iam_endpoint_override, 

332 ) 

333 

334 def sign_bytes(self, message): 

335 from google.auth.transport.requests import AuthorizedSession 

336 

337 iam_sign_endpoint = iam._IAM_SIGN_ENDPOINT.replace( 

338 credentials.DEFAULT_UNIVERSE_DOMAIN, self.universe_domain 

339 ).format(self._target_principal) 

340 

341 body = { 

342 "payload": base64.b64encode(message).decode("utf-8"), 

343 "delegates": self._delegates, 

344 } 

345 

346 headers = {"Content-Type": "application/json"} 

347 

348 authed_session = AuthorizedSession(self._source_credentials) 

349 

350 try: 

351 retries = _exponential_backoff.ExponentialBackoff() 

352 for _ in retries: 

353 response = authed_session.post( 

354 url=iam_sign_endpoint, headers=headers, json=body 

355 ) 

356 if response.status_code in iam.IAM_RETRY_CODES: 

357 continue 

358 if response.status_code != http_client.OK: 

359 raise exceptions.TransportError( 

360 "Error calling sign_bytes: {}".format(response.json()) 

361 ) 

362 

363 return base64.b64decode(response.json()["signedBlob"]) 

364 finally: 

365 authed_session.close() 

366 raise exceptions.TransportError("exhausted signBlob endpoint retries") 

367 

368 @property 

369 def signer_email(self): 

370 return self._target_principal 

371 

372 @property 

373 def service_account_email(self): 

374 return self._target_principal 

375 

376 @property 

377 def signer(self): 

378 return self 

379 

380 @property 

381 def requires_scopes(self): 

382 return not self._target_scopes 

383 

384 @_helpers.copy_docstring(credentials.Credentials) 

385 def get_cred_info(self): 

386 if self._cred_file_path: 

387 return { 

388 "credential_source": self._cred_file_path, 

389 "credential_type": "impersonated credentials", 

390 "principal": self._target_principal, 

391 } 

392 return None 

393 

394 def _make_copy(self): 

395 cred = self.__class__( 

396 self._source_credentials, 

397 target_principal=self._target_principal, 

398 target_scopes=self._target_scopes, 

399 delegates=self._delegates, 

400 lifetime=self._lifetime, 

401 quota_project_id=self._quota_project_id, 

402 iam_endpoint_override=self._iam_endpoint_override, 

403 ) 

404 cred._cred_file_path = self._cred_file_path 

405 return cred 

406 

407 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

408 def with_quota_project(self, quota_project_id): 

409 cred = self._make_copy() 

410 cred._quota_project_id = quota_project_id 

411 return cred 

412 

413 @_helpers.copy_docstring(credentials.Scoped) 

414 def with_scopes(self, scopes, default_scopes=None): 

415 cred = self._make_copy() 

416 cred._target_scopes = scopes or default_scopes 

417 return cred 

418 

419 @classmethod 

420 def from_impersonated_service_account_info(cls, info, scopes=None): 

421 """Creates a Credentials instance from parsed impersonated service account credentials info. 

422 

423 Args: 

424 info (Mapping[str, str]): The impersonated service account credentials info in Google 

425 format. 

426 scopes (Sequence[str]): Optional list of scopes to include in the 

427 credentials. 

428 

429 Returns: 

430 google.oauth2.credentials.Credentials: The constructed 

431 credentials. 

432 

433 Raises: 

434 InvalidType: If the info["source_credentials"] are not a supported impersonation type 

435 InvalidValue: If the info["service_account_impersonation_url"] is not in the expected format. 

436 ValueError: If the info is not in the expected format. 

437 """ 

438 

439 source_credentials_info = info.get("source_credentials") 

440 source_credentials_type = source_credentials_info.get("type") 

441 if source_credentials_type == _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE: 

442 from google.oauth2 import credentials 

443 

444 source_credentials = credentials.Credentials.from_authorized_user_info( 

445 source_credentials_info 

446 ) 

447 elif source_credentials_type == _SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE: 

448 from google.oauth2 import service_account 

449 

450 source_credentials = service_account.Credentials.from_service_account_info( 

451 source_credentials_info 

452 ) 

453 elif ( 

454 source_credentials_type 

455 == _SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE 

456 ): 

457 from google.auth import external_account_authorized_user 

458 

459 source_credentials = external_account_authorized_user.Credentials.from_info( 

460 source_credentials_info 

461 ) 

462 else: 

463 raise exceptions.InvalidType( 

464 "source credential of type {} is not supported.".format( 

465 source_credentials_type 

466 ) 

467 ) 

468 

469 impersonation_url = info.get("service_account_impersonation_url") 

470 start_index = impersonation_url.rfind("/") 

471 end_index = impersonation_url.find(":generateAccessToken") 

472 if start_index == -1 or end_index == -1 or start_index > end_index: 

473 raise exceptions.InvalidValue( 

474 "Cannot extract target principal from {}".format(impersonation_url) 

475 ) 

476 target_principal = impersonation_url[start_index + 1 : end_index] 

477 delegates = info.get("delegates") 

478 quota_project_id = info.get("quota_project_id") 

479 

480 return cls( 

481 source_credentials, 

482 target_principal, 

483 scopes, 

484 delegates, 

485 quota_project_id=quota_project_id, 

486 ) 

487 

488 

489class IDTokenCredentials(credentials.CredentialsWithQuotaProject): 

490 """Open ID Connect ID Token-based service account credentials. 

491 

492 """ 

493 

494 def __init__( 

495 self, 

496 target_credentials, 

497 target_audience=None, 

498 include_email=False, 

499 quota_project_id=None, 

500 ): 

501 """ 

502 Args: 

503 target_credentials (google.auth.Credentials): The target 

504 credential used as to acquire the id tokens for. 

505 target_audience (string): Audience to issue the token for. 

506 include_email (bool): Include email in IdToken 

507 quota_project_id (Optional[str]): The project ID used for 

508 quota and billing. 

509 """ 

510 super(IDTokenCredentials, self).__init__() 

511 

512 if not isinstance(target_credentials, Credentials): 

513 raise exceptions.GoogleAuthError( 

514 "Provided Credential must be " "impersonated_credentials" 

515 ) 

516 self._target_credentials = target_credentials 

517 self._target_audience = target_audience 

518 self._include_email = include_email 

519 self._quota_project_id = quota_project_id 

520 

521 def from_credentials(self, target_credentials, target_audience=None): 

522 return self.__class__( 

523 target_credentials=target_credentials, 

524 target_audience=target_audience, 

525 include_email=self._include_email, 

526 quota_project_id=self._quota_project_id, 

527 ) 

528 

529 def with_target_audience(self, target_audience): 

530 return self.__class__( 

531 target_credentials=self._target_credentials, 

532 target_audience=target_audience, 

533 include_email=self._include_email, 

534 quota_project_id=self._quota_project_id, 

535 ) 

536 

537 def with_include_email(self, include_email): 

538 return self.__class__( 

539 target_credentials=self._target_credentials, 

540 target_audience=self._target_audience, 

541 include_email=include_email, 

542 quota_project_id=self._quota_project_id, 

543 ) 

544 

545 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

546 def with_quota_project(self, quota_project_id): 

547 return self.__class__( 

548 target_credentials=self._target_credentials, 

549 target_audience=self._target_audience, 

550 include_email=self._include_email, 

551 quota_project_id=quota_project_id, 

552 ) 

553 

554 @_helpers.copy_docstring(credentials.Credentials) 

555 def refresh(self, request): 

556 from google.auth.transport.requests import AuthorizedSession 

557 

558 iam_sign_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace( 

559 credentials.DEFAULT_UNIVERSE_DOMAIN, 

560 self._target_credentials.universe_domain, 

561 ).format(self._target_credentials.signer_email) 

562 

563 body = { 

564 "audience": self._target_audience, 

565 "delegates": self._target_credentials._delegates, 

566 "includeEmail": self._include_email, 

567 } 

568 

569 headers = { 

570 "Content-Type": "application/json", 

571 metrics.API_CLIENT_HEADER: metrics.token_request_id_token_impersonate(), 

572 } 

573 

574 authed_session = AuthorizedSession( 

575 self._target_credentials._source_credentials, auth_request=request 

576 ) 

577 

578 try: 

579 response = authed_session.post( 

580 url=iam_sign_endpoint, 

581 headers=headers, 

582 data=json.dumps(body).encode("utf-8"), 

583 ) 

584 finally: 

585 authed_session.close() 

586 

587 if response.status_code != http_client.OK: 

588 raise exceptions.RefreshError( 

589 "Error getting ID token: {}".format(response.json()) 

590 ) 

591 

592 id_token = response.json()["token"] 

593 self.token = id_token 

594 self.expiry = datetime.utcfromtimestamp( 

595 jwt.decode(id_token, verify=False)["exp"] 

596 ) 

597 

598 

599def _sign_jwt_request(request, principal, headers, payload, delegates=[]): 

600 """Makes a request to the Google Cloud IAM service to sign a JWT using a 

601 service account's system-managed private key. 

602 Args: 

603 request (Request): The Request object to use. 

604 principal (str): The principal to request an access token for. 

605 headers (Mapping[str, str]): Map of headers to transmit. 

606 payload (Mapping[str, str]): The JWT payload to sign. Must be a 

607 serialized JSON object that contains a JWT Claims Set. 

608 delegates (Sequence[str]): The chained list of delegates required 

609 to grant the final access_token. If set, the sequence of 

610 identities must have "Service Account Token Creator" capability 

611 granted to the prceeding identity. For example, if set to 

612 [serviceAccountB, serviceAccountC], the source_credential 

613 must have the Token Creator role on serviceAccountB. 

614 serviceAccountB must have the Token Creator on 

615 serviceAccountC. 

616 Finally, C must have Token Creator on target_principal. 

617 If left unset, source_credential must have that role on 

618 target_principal. 

619 

620 Raises: 

621 google.auth.exceptions.TransportError: Raised if there is an underlying 

622 HTTP connection error 

623 google.auth.exceptions.RefreshError: Raised if the impersonated 

624 credentials are not available. Common reasons are 

625 `iamcredentials.googleapis.com` is not enabled or the 

626 `Service Account Token Creator` is not assigned 

627 """ 

628 iam_endpoint = iam._IAM_SIGNJWT_ENDPOINT.format(principal) 

629 

630 body = {"delegates": delegates, "payload": json.dumps(payload)} 

631 body = json.dumps(body).encode("utf-8") 

632 

633 response = request(url=iam_endpoint, method="POST", headers=headers, body=body) 

634 

635 # support both string and bytes type response.data 

636 response_body = ( 

637 response.data.decode("utf-8") 

638 if hasattr(response.data, "decode") 

639 else response.data 

640 ) 

641 

642 if response.status != http_client.OK: 

643 raise exceptions.RefreshError(_REFRESH_ERROR, response_body) 

644 

645 try: 

646 jwt_response = json.loads(response_body) 

647 signed_jwt = jwt_response["signedJwt"] 

648 return signed_jwt 

649 

650 except (KeyError, ValueError) as caught_exc: 

651 new_exc = exceptions.RefreshError( 

652 "{}: No signed JWT in response.".format(_REFRESH_ERROR), response_body 

653 ) 

654 raise new_exc from caught_exc