Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/oauth2/service_account.py: 34%

260 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0 

16 

17This module implements the JWT Profile for OAuth 2.0 Authorization Grants 

18as defined by `RFC 7523`_ with particular support for how this RFC is 

19implemented in Google's infrastructure. Google refers to these credentials 

20as *Service Accounts*. 

21 

22Service accounts are used for server-to-server communication, such as 

23interactions between a web application server and a Google service. The 

24service account belongs to your application instead of to an individual end 

25user. In contrast to other OAuth 2.0 profiles, no users are involved and your 

26application "acts" as the service account. 

27 

28Typically an application uses a service account when the application uses 

29Google APIs to work with its own data rather than a user's data. For example, 

30an application that uses Google Cloud Datastore for data persistence would use 

31a service account to authenticate its calls to the Google Cloud Datastore API. 

32However, an application that needs to access a user's Drive documents would 

33use the normal OAuth 2.0 profile. 

34 

35Additionally, Google Apps domain administrators can grant service accounts 

36`domain-wide delegation`_ authority to access user data on behalf of users in 

37the domain. 

38 

39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used 

40in place of the usual authorization token returned during the standard 

41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as 

42the acquired access token is used as the bearer token when making requests 

43using these credentials. 

44 

45This profile differs from the normal OAuth 2.0 profile because no user consent 

46step is required. The use of the private key allows this profile to assert 

47identity directly. 

48 

49This profile also differs from the :mod:`google.auth.jwt` authentication 

50because the JWT credentials use the JWT directly as the bearer token. This 

51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The 

52obtained OAuth 2.0 access token is used as the bearer token. 

53 

54Domain-wide delegation 

55---------------------- 

56 

57Domain-wide delegation allows a service account to access user data on 

58behalf of any user in a Google Apps domain without consent from the user. 

59For example, an application that uses the Google Calendar API to add events to 

60the calendars of all users in a Google Apps domain would use a service account 

61to access the Google Calendar API on behalf of users. 

62 

63The Google Apps administrator must explicitly authorize the service account to 

64do this. This authorization step is referred to as "delegating domain-wide 

65authority" to a service account. 

66 

67You can use domain-wide delegation by creating a set of credentials with a 

68specific subject using :meth:`~Credentials.with_subject`. 

69 

70.. _RFC 7523: https://tools.ietf.org/html/rfc7523 

71""" 
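
The module docstring above describes a JWT that is exchanged for an OAuth 2.0 access token. As a rough illustration only, the sketch below builds the same claim set that `_make_authorization_grant_assertion` assembles and encodes the header and payload as unsigned base64url segments. The helper names (`build_grant_claims`, `b64url`, `unsigned_jwt_segments`), the example email addresses, and the `RS256` header value are assumptions made for this sketch; the real module delegates encoding and signing to `google.auth.jwt.encode` and the configured signer.

```python
import base64
import json
import time

# Same value as _GOOGLE_OAUTH2_TOKEN_ENDPOINT in this module.
TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"


def build_grant_claims(service_account_email, scopes, subject=None,
                       lifetime_secs=3600, now=None):
    """Hypothetical helper: claim set for a JWT authorization grant."""
    issued_at = int(time.time()) if now is None else int(now)
    claims = {
        "iat": issued_at,
        "exp": issued_at + lifetime_secs,
        # The issuer is the service account email.
        "iss": service_account_email,
        # The audience is the token endpoint URI.
        "aud": TOKEN_ENDPOINT,
        # Scopes are sent as one space-delimited string.
        "scope": " ".join(scopes or ()),
    }
    # The subject is only set for domain-wide delegation.
    if subject:
        claims.setdefault("sub", subject)
    return claims


def b64url(data):
    """Base64url-encode bytes without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def unsigned_jwt_segments(claims):
    """Return 'header.payload'; a real JWT appends a signature segment."""
    header = {"typ": "JWT", "alg": "RS256"}  # assumed algorithm for this sketch
    return ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )


claims = build_grant_claims(
    "sa@example-project.iam.gserviceaccount.com",
    ["email"],
    subject="user@example.com",
    now=1700000000,
)
print(unsigned_jwt_segments(claims))
```

The signature segment is deliberately left out, because signing requires the service account's private key.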

72 

73import copy 

74import datetime 

75 

76from google.auth import _constants 

77from google.auth import _helpers 

78from google.auth import _service_account_info 

79from google.auth import credentials 

80from google.auth import exceptions 

81from google.auth import iam 

82from google.auth import jwt 

83from google.auth import metrics 

84from google.oauth2 import _client 

85 

86_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

87_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" 

88 

89 

90class Credentials( 

91 credentials.Signing, 

92 credentials.Scoped, 

93 credentials.CredentialsWithQuotaProject, 

94 credentials.CredentialsWithTokenUri, 

95 credentials.CredentialsWithTrustBoundary, 

96): 

97 """Service account credentials 

98 

99 Usually, you'll create these credentials with one of the helper 

100 constructors. To create credentials using a Google service account 

101 private key JSON file:: 

102 

103 credentials = service_account.Credentials.from_service_account_file( 

104 'service-account.json') 

105 

106 Or if you already have the service account file loaded:: 

107 

108 service_account_info = json.load(open('service_account.json')) 

109 credentials = service_account.Credentials.from_service_account_info( 

110 service_account_info) 

111 

112 Both helper methods pass on arguments to the constructor, so you can 

113 specify additional scopes and a subject if necessary:: 

114 

115 credentials = service_account.Credentials.from_service_account_file( 

116 'service-account.json', 

117 scopes=['email'], 

118 subject='user@example.com') 

119 

120 The credentials are considered immutable. If you want to modify the scopes 

121 or the subject used for delegation, use :meth:`with_scopes` or 

122 :meth:`with_subject`:: 

123 

124 scoped_credentials = credentials.with_scopes(['email']) 

125 delegated_credentials = credentials.with_subject(subject) 

126 

127 To add a quota project, use :meth:`with_quota_project`:: 

128 

129 credentials = credentials.with_quota_project('myproject-123') 

130 """ 

131 

132 def __init__( 

133 self, 

134 signer, 

135 service_account_email, 

136 token_uri, 

137 scopes=None, 

138 default_scopes=None, 

139 subject=None, 

140 project_id=None, 

141 quota_project_id=None, 

142 additional_claims=None, 

143 always_use_jwt_access=False, 

144 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, 

145 trust_boundary=None, 

146 ): 

147 """ 

148 Args: 

149 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

150 service_account_email (str): The service account's email. 

151 scopes (Sequence[str]): User-defined scopes to request during the 

152 authorization grant. 

153 default_scopes (Sequence[str]): Default scopes passed by a 

154 Google client library. Use 'scopes' for user-defined scopes. 

155 token_uri (str): The OAuth 2.0 Token URI. 

156 subject (str): For domain-wide delegation, the email address of the 

157 user for which to request delegated access. 

158 project_id (str): Project ID associated with the service account 

159 credential. 

160 quota_project_id (Optional[str]): The project ID used for quota and 

161 billing. 

162 additional_claims (Mapping[str, str]): Any additional claims for 

163 the JWT assertion used in the authorization grant. 

164 always_use_jwt_access (Optional[bool]): Whether a self-signed JWT should 

165 always be used. 

166 universe_domain (str): The universe domain. The default 

167 universe domain is googleapis.com. For a non-default universe 

168 domain, a self-signed JWT is always used for token refresh. 

169 trust_boundary (Mapping[str,str]): A credential trust boundary. 

170 

171 .. note:: Typically one of the helper constructors 

172 :meth:`from_service_account_file` or 

173 :meth:`from_service_account_info` is used instead of calling the 

174 constructor directly. 

175 """ 

176 super(Credentials, self).__init__() 

177 

178 self._cred_file_path = None 

179 self._scopes = scopes 

180 self._default_scopes = default_scopes 

181 self._signer = signer 

182 self._service_account_email = service_account_email 

183 self._subject = subject 

184 self._project_id = project_id 

185 self._quota_project_id = quota_project_id 

186 self._token_uri = token_uri 

187 self._always_use_jwt_access = always_use_jwt_access 

188 self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN 

189 

190 if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 

191 self._always_use_jwt_access = True 

192 

193 self._jwt_credentials = None 

194 

195 if additional_claims is not None: 

196 self._additional_claims = additional_claims 

197 else: 

198 self._additional_claims = {} 

199 self._trust_boundary = trust_boundary 

200 

201 @classmethod 

202 def _from_signer_and_info(cls, signer, info, **kwargs): 

203 """Creates a Credentials instance from a signer and service account 

204 info. 

205 

206 Args: 

207 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

208 info (Mapping[str, str]): The service account info. 

209 kwargs: Additional arguments to pass to the constructor. 

210 

211 Returns: 

212 google.oauth2.service_account.Credentials: The constructed credentials. 

213 

214 Raises: 

215 ValueError: If the info is not in the expected format. 

216 """ 

217 return cls( 

218 signer, 

219 service_account_email=info["client_email"], 

220 token_uri=info["token_uri"], 

221 project_id=info.get("project_id"), 

222 universe_domain=info.get( 

223 "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN 

224 ), 

225 trust_boundary=info.get("trust_boundary"), 

226 **kwargs, 

227 ) 

228 

229 @classmethod 

230 def from_service_account_info(cls, info, **kwargs): 

231 """Creates a Credentials instance from parsed service account info. 

232 

233 Args: 

234 info (Mapping[str, str]): The service account info in Google 

235 format. 

236 kwargs: Additional arguments to pass to the constructor. 

237 

238 Returns: 

239 google.oauth2.service_account.Credentials: The constructed 

240 credentials. 

241 

242 Raises: 

243 ValueError: If the info is not in the expected format. 

244 """ 

245 signer = _service_account_info.from_dict( 

246 info, require=["client_email", "token_uri"] 

247 ) 

248 return cls._from_signer_and_info(signer, info, **kwargs) 

249 

250 @classmethod 

251 def from_service_account_file(cls, filename, **kwargs): 

252 """Creates a Credentials instance from a service account json file. 

253 

254 Args: 

255 filename (str): The path to the service account json file. 

256 kwargs: Additional arguments to pass to the constructor. 

257 

258 Returns: 

259 google.oauth2.service_account.Credentials: The constructed 

260 credentials. 

261 """ 

262 info, signer = _service_account_info.from_filename( 

263 filename, require=["client_email", "token_uri"] 

264 ) 

265 return cls._from_signer_and_info(signer, info, **kwargs) 

266 

267 @property 

268 def service_account_email(self): 

269 """The service account email.""" 

270 return self._service_account_email 

271 

272 @property 

273 def project_id(self): 

274 """Project ID associated with this credential.""" 

275 return self._project_id 

276 

277 @property 

278 def requires_scopes(self): 

279 """Checks if the credentials require scopes. 

280 

281 Returns: 

282 bool: True if there are no scopes set, otherwise False. 

283 """ 

284 return True if not self._scopes else False 

285 

286 def _make_copy(self): 

287 cred = self.__class__( 

288 self._signer, 

289 service_account_email=self._service_account_email, 

290 scopes=copy.copy(self._scopes), 

291 default_scopes=copy.copy(self._default_scopes), 

292 token_uri=self._token_uri, 

293 subject=self._subject, 

294 project_id=self._project_id, 

295 quota_project_id=self._quota_project_id, 

296 additional_claims=self._additional_claims.copy(), 

297 always_use_jwt_access=self._always_use_jwt_access, 

298 universe_domain=self._universe_domain, 

299 trust_boundary=self._trust_boundary, 

300 ) 

301 cred._cred_file_path = self._cred_file_path 

302 return cred 

303 

304 @_helpers.copy_docstring(credentials.Scoped) 

305 def with_scopes(self, scopes, default_scopes=None): 

306 cred = self._make_copy() 

307 cred._scopes = scopes 

308 cred._default_scopes = default_scopes 

309 return cred 

310 

311 def with_always_use_jwt_access(self, always_use_jwt_access): 

312 """Create a copy of these credentials with the specified always_use_jwt_access value. 

313 

314 Args: 

315 always_use_jwt_access (bool): Whether to always use a self-signed JWT. 

316 

317 Returns: 

318 google.oauth2.service_account.Credentials: A new credentials 

319 instance. 

320 Raises: 

321 google.auth.exceptions.InvalidValue: If the universe domain is not 

322 default and always_use_jwt_access is False. 

323 """ 

324 cred = self._make_copy() 

325 if ( 

326 cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN 

327 and not always_use_jwt_access 

328 ): 

329 raise exceptions.InvalidValue( 

330 "always_use_jwt_access should be True for non-default universe domain" 

331 ) 

332 cred._always_use_jwt_access = always_use_jwt_access 

333 return cred 

334 

335 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) 

336 def with_universe_domain(self, universe_domain): 

337 cred = self._make_copy() 

338 cred._universe_domain = universe_domain 

339 if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 

340 cred._always_use_jwt_access = True 

341 return cred 

342 

343 def with_subject(self, subject): 

344 """Create a copy of these credentials with the specified subject. 

345 

346 Args: 

347 subject (str): The subject claim. 

348 

349 Returns: 

350 google.oauth2.service_account.Credentials: A new credentials 

351 instance. 

352 """ 

353 cred = self._make_copy() 

354 cred._subject = subject 

355 return cred 

356 

357 def with_claims(self, additional_claims): 

358 """Returns a copy of these credentials with modified claims. 

359 

360 Args: 

361 additional_claims (Mapping[str, str]): Any additional claims for 

362 the JWT payload. This will be merged with the current 

363 additional claims. 

364 

365 Returns: 

366 google.oauth2.service_account.Credentials: A new credentials 

367 instance. 

368 """ 

369 new_additional_claims = copy.deepcopy(self._additional_claims) 

370 new_additional_claims.update(additional_claims or {}) 

371 cred = self._make_copy() 

372 cred._additional_claims = new_additional_claims 

373 return cred 

374 

375 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

376 def with_quota_project(self, quota_project_id): 

377 cred = self._make_copy() 

378 cred._quota_project_id = quota_project_id 

379 return cred 

380 

381 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 

382 def with_token_uri(self, token_uri): 

383 cred = self._make_copy() 

384 cred._token_uri = token_uri 

385 return cred 

386 

387 @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 

388 def with_trust_boundary(self, trust_boundary): 

389 cred = self._make_copy() 

390 cred._trust_boundary = trust_boundary 

391 return cred 

392 

393 def _make_authorization_grant_assertion(self): 

394 """Create the OAuth 2.0 assertion. 

395 

396 This assertion is used during the OAuth 2.0 grant to acquire an 

397 access token. 

398 

399 Returns: 

400 bytes: The authorization grant assertion. 

401 """ 

402 now = _helpers.utcnow() 

403 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) 

404 expiry = now + lifetime 

405 

406 payload = { 

407 "iat": _helpers.datetime_to_secs(now), 

408 "exp": _helpers.datetime_to_secs(expiry), 

409 # The issuer must be the service account email. 

410 "iss": self._service_account_email, 

411 # The audience must be the auth token endpoint's URI 

412 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, 

413 "scope": _helpers.scopes_to_string(self._scopes or ()), 

414 } 

415 

416 payload.update(self._additional_claims) 

417 

418 # The subject can be a user email for domain-wide delegation. 

419 if self._subject: 

420 payload.setdefault("sub", self._subject) 

421 

422 token = jwt.encode(self._signer, payload) 

423 

424 return token 

425 

426 def _use_self_signed_jwt(self): 

427 # Domain-wide delegation doesn't work with a self-signed JWT, so if a 

428 # subject exists we should not use a self-signed JWT. 

429 return self._subject is None and self._jwt_credentials is not None 

430 

431 def _metric_header_for_usage(self): 

432 if self._use_self_signed_jwt(): 

433 return metrics.CRED_TYPE_SA_JWT 

434 return metrics.CRED_TYPE_SA_ASSERTION 

435 

436 @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 

437 def _refresh_token(self, request): 

438 if self._always_use_jwt_access and not self._jwt_credentials: 

439 # If self signed jwt should be used but jwt credential is not 

440 # created, try to create one with scopes 

441 self._create_self_signed_jwt(None) 

442 

443 if ( 

444 self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN 

445 and self._subject 

446 ): 

447 raise exceptions.RefreshError( 

448 "domain wide delegation is not supported for non-default universe domain" 

449 ) 

450 

451 if self._use_self_signed_jwt(): 

452 self._jwt_credentials.refresh(request) 

453 self.token = self._jwt_credentials.token.decode() 

454 self.expiry = self._jwt_credentials.expiry 

455 else: 

456 assertion = self._make_authorization_grant_assertion() 

457 access_token, expiry, _ = _client.jwt_grant( 

458 request, self._token_uri, assertion 

459 ) 

460 self.token = access_token 

461 self.expiry = expiry 

462 

463 def _create_self_signed_jwt(self, audience): 

464 """Create a self-signed JWT from the credentials if requirements are met. 

465 

466 Args: 

467 audience (str): The service URL. ``https://[API_ENDPOINT]/`` 

468 """ 

469 # https://google.aip.dev/auth/4111 

470 if self._always_use_jwt_access: 

471 if self._scopes: 

472 additional_claims = {"scope": " ".join(self._scopes)} 

473 if ( 

474 self._jwt_credentials is None 

475 or self._jwt_credentials.additional_claims != additional_claims 

476 ): 

477 self._jwt_credentials = jwt.Credentials.from_signing_credentials( 

478 self, None, additional_claims=additional_claims 

479 ) 

480 elif audience: 

481 if ( 

482 self._jwt_credentials is None 

483 or self._jwt_credentials._audience != audience 

484 ): 

485 

486 self._jwt_credentials = jwt.Credentials.from_signing_credentials( 

487 self, audience 

488 ) 

489 elif self._default_scopes: 

490 additional_claims = {"scope": " ".join(self._default_scopes)} 

491 if ( 

492 self._jwt_credentials is None 

493 or additional_claims != self._jwt_credentials.additional_claims 

494 ): 

495 self._jwt_credentials = jwt.Credentials.from_signing_credentials( 

496 self, None, additional_claims=additional_claims 

497 ) 

498 elif not self._scopes and audience: 

499 self._jwt_credentials = jwt.Credentials.from_signing_credentials( 

500 self, audience 

501 ) 

502 

503 def _build_trust_boundary_lookup_url(self): 

504 """Builds and returns the URL for the trust boundary lookup API. 

505 

506 This method constructs the specific URL for the IAM Credentials API's 

507 `allowedLocations` endpoint, using the credential's universe domain 

508 and service account email. 

509 

510 Raises: 

511 ValueError: If `self.service_account_email` is None or an empty 

512 string, as it's required to form the URL. 

513 

514 Returns: 

515 str: The URL for the trust boundary lookup endpoint. 

516 """ 

517 if not self.service_account_email: 

518 raise ValueError( 

519 "Service account email is required to build the trust boundary lookup URL." 

520 ) 

521 return _constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( 

522 universe_domain=self._universe_domain, 

523 service_account_email=self._service_account_email, 

524 ) 

525 

526 @_helpers.copy_docstring(credentials.Signing) 

527 def sign_bytes(self, message): 

528 return self._signer.sign(message) 

529 

530 @property # type: ignore 

531 @_helpers.copy_docstring(credentials.Signing) 

532 def signer(self): 

533 return self._signer 

534 

535 @property # type: ignore 

536 @_helpers.copy_docstring(credentials.Signing) 

537 def signer_email(self): 

538 return self._service_account_email 

539 

540 @_helpers.copy_docstring(credentials.Credentials) 

541 def get_cred_info(self): 

542 if self._cred_file_path: 

543 return { 

544 "credential_source": self._cred_file_path, 

545 "credential_type": "service account credentials", 

546 "principal": self.service_account_email, 

547 } 

548 return None 
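
The branching in `_create_self_signed_jwt` above decides whether a self-signed JWT carries a `scope` claim or an audience. As a minimal sketch, the same decision order can be written as a pure function. The name `select_self_signed_jwt_claim` and its tuple return value are hypothetical and exist only for this illustration; the real method builds `jwt.Credentials` objects and caches them on the instance.

```python
def select_self_signed_jwt_claim(always_use_jwt_access, scopes,
                                 default_scopes, audience):
    """Return (claim_name, value) for the self-signed JWT, or None.

    Mirrors the order of checks in _create_self_signed_jwt:
    user scopes, then audience, then default scopes when JWT access is
    forced; otherwise audience only, and only if no user scopes are set.
    """
    if always_use_jwt_access:
        if scopes:
            return ("scope", " ".join(scopes))
        if audience:
            return ("aud", audience)
        if default_scopes:
            return ("scope", " ".join(default_scopes))
        return None
    if not scopes and audience:
        return ("aud", audience)
    return None


# User-defined scopes take priority when JWT access is forced.
print(select_self_signed_jwt_claim(True, ["a", "b"], ["d"], "https://example.com/"))
# Without forced JWT access, user scopes fall back to the OAuth 2.0 grant.
print(select_self_signed_jwt_claim(False, ["a"], None, "https://example.com/"))
```

A `None` result corresponds to the case where no self-signed JWT credential is created and the regular authorization grant is used on refresh.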

549 

550 

551class IDTokenCredentials( 

552 credentials.Signing, 

553 credentials.CredentialsWithQuotaProject, 

554 credentials.CredentialsWithTokenUri, 

555): 

556 """OpenID Connect ID Token-based service account credentials. 

557 

558 These credentials are largely similar to :class:`.Credentials`, but instead 

559 of using an OAuth 2.0 Access Token as the bearer token, they use an OpenID 

560 Connect ID Token as the bearer token. These credentials are useful when 

561 communicating with services that require ID Tokens and cannot accept access 

562 tokens. 

563 

564 Usually, you'll create these credentials with one of the helper 

565 constructors. To create credentials using a Google service account 

566 private key JSON file:: 

567 

568 credentials = ( 

569 service_account.IDTokenCredentials.from_service_account_file( 

570 'service-account.json', target_audience='https://example.com')) 

571 

572 

573 Or if you already have the service account file loaded:: 

574 

575 service_account_info = json.load(open('service_account.json')) 

576 credentials = ( 

577 service_account.IDTokenCredentials.from_service_account_info( 

578 service_account_info, target_audience='https://example.com')) 

579 

580 

581 Both helper methods pass on arguments to the constructor, so you can 

582 specify additional claims for the JWT assertion if necessary:: 

583 

584 credentials = ( 

585 service_account.IDTokenCredentials.from_service_account_file( 

586 'service-account.json', 

587 target_audience='https://example.com', 

588 additional_claims={'key': 'value'})) 

589 

590 

591 The credentials are considered immutable. If you want to modify the target 

592 audience or the quota project, use :meth:`with_target_audience` or 

593 :meth:`with_quota_project`:: 

594 

595 audience_credentials = credentials.with_target_audience(audience) 

596 quota_credentials = credentials.with_quota_project('myproject-123') 

597 

598 """ 

599 

600 def __init__( 

601 self, 

602 signer, 

603 service_account_email, 

604 token_uri, 

605 target_audience, 

606 additional_claims=None, 

607 quota_project_id=None, 

608 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, 

609 ): 

610 """ 

611 Args: 

612 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

613 service_account_email (str): The service account's email. 

614 token_uri (str): The OAuth 2.0 Token URI. 

615 target_audience (str): The intended audience for these credentials, 

616 used when requesting the ID Token. The ID Token's ``aud`` claim 

617 will be set to this string. 

618 additional_claims (Mapping[str, str]): Any additional claims for 

619 the JWT assertion used in the authorization grant. 

620 quota_project_id (Optional[str]): The project ID used for quota and billing. 

621 universe_domain (str): The universe domain. The default 

622 universe domain is googleapis.com. For a non-default universe 

623 domain, the IAM ID token endpoint is used for token refresh. Note that 

624 the iam.serviceAccountTokenCreator role is required to use the IAM 

625 endpoint. 

626 

627 .. note:: Typically one of the helper constructors 

628 :meth:`from_service_account_file` or 

629 :meth:`from_service_account_info` is used instead of calling the 

630 constructor directly. 

631 """ 

632 super(IDTokenCredentials, self).__init__() 

633 self._signer = signer 

634 self._service_account_email = service_account_email 

635 self._token_uri = token_uri 

636 self._target_audience = target_audience 

637 self._quota_project_id = quota_project_id 

638 self._use_iam_endpoint = False 

639 

640 if not universe_domain: 

641 self._universe_domain = credentials.DEFAULT_UNIVERSE_DOMAIN 

642 else: 

643 self._universe_domain = universe_domain 

644 self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace( 

645 "googleapis.com", self._universe_domain 

646 ) 

647 

648 if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 

649 self._use_iam_endpoint = True 

650 

651 if additional_claims is not None: 

652 self._additional_claims = additional_claims 

653 else: 

654 self._additional_claims = {} 

655 

656 @classmethod 

657 def _from_signer_and_info(cls, signer, info, **kwargs): 

658 """Creates a credentials instance from a signer and service account 

659 info. 

660 

661 Args: 

662 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

663 info (Mapping[str, str]): The service account info. 

664 kwargs: Additional arguments to pass to the constructor. 

665 

666 Returns: 

667 google.oauth2.service_account.IDTokenCredentials: The constructed credentials. 

668 

669 Raises: 

670 ValueError: If the info is not in the expected format. 

671 """ 

672 kwargs.setdefault("service_account_email", info["client_email"]) 

673 kwargs.setdefault("token_uri", info["token_uri"]) 

674 if "universe_domain" in info: 

675 kwargs["universe_domain"] = info["universe_domain"] 

676 return cls(signer, **kwargs) 

677 

678 @classmethod 

679 def from_service_account_info(cls, info, **kwargs): 

680 """Creates a credentials instance from parsed service account info. 

681 

682 Args: 

683 info (Mapping[str, str]): The service account info in Google 

684 format. 

685 kwargs: Additional arguments to pass to the constructor. 

686 

687 Returns: 

688 google.oauth2.service_account.IDTokenCredentials: The constructed 

689 credentials. 

690 

691 Raises: 

692 ValueError: If the info is not in the expected format. 

693 """ 

694 signer = _service_account_info.from_dict( 

695 info, require=["client_email", "token_uri"] 

696 ) 

697 return cls._from_signer_and_info(signer, info, **kwargs) 

698 

699 @classmethod 

700 def from_service_account_file(cls, filename, **kwargs): 

701 """Creates a credentials instance from a service account json file. 

702 

703 Args: 

704 filename (str): The path to the service account json file. 

705 kwargs: Additional arguments to pass to the constructor. 

706 

707 Returns: 

708 google.oauth2.service_account.IDTokenCredentials: The constructed 

709 credentials. 

710 """ 

711 info, signer = _service_account_info.from_filename( 

712 filename, require=["client_email", "token_uri"] 

713 ) 

714 return cls._from_signer_and_info(signer, info, **kwargs) 

715 

716 def _make_copy(self): 

717 cred = self.__class__( 

718 self._signer, 

719 service_account_email=self._service_account_email, 

720 token_uri=self._token_uri, 

721 target_audience=self._target_audience, 

722 additional_claims=self._additional_claims.copy(), 

723 quota_project_id=self.quota_project_id, 

724 universe_domain=self._universe_domain, 

725 ) 

726 # _use_iam_endpoint is not exposed in the constructor 

727 cred._use_iam_endpoint = self._use_iam_endpoint 

728 return cred 

729 

730 def with_target_audience(self, target_audience): 

731 """Create a copy of these credentials with the specified target 

732 audience. 

733 

734 Args: 

735 target_audience (str): The intended audience for these credentials, 

736 used when requesting the ID Token. 

737 

738 Returns: 

739 google.oauth2.service_account.IDTokenCredentials: A new credentials 

740 instance. 

741 """ 

742 cred = self._make_copy() 

743 cred._target_audience = target_audience 

744 return cred 

745 

746 def _with_use_iam_endpoint(self, use_iam_endpoint): 

747 """Create a copy of these credentials with the use_iam_endpoint value. 

748 

749 Args: 

750 use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will 

751 be used instead of the token_uri. Note that 

752 iam.serviceAccountTokenCreator role is required to use the IAM 

753 endpoint. The default value is False. This feature is currently 

754 experimental and subject to change without notice. 

755 

756 Returns: 

757 google.oauth2.service_account.IDTokenCredentials: A new credentials 

758 instance. 

759 Raises: 

760 google.auth.exceptions.InvalidValue: If the universe domain is not 

761 default and use_iam_endpoint is False. 

762 """ 

763 cred = self._make_copy() 

764 if ( 

765 cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN 

766 and not use_iam_endpoint 

767 ): 

768 raise exceptions.InvalidValue( 

769 "use_iam_endpoint should be True for non-default universe domain" 

770 ) 

771 cred._use_iam_endpoint = use_iam_endpoint 

772 return cred 

773 

774 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

775 def with_quota_project(self, quota_project_id): 

776 cred = self._make_copy() 

777 cred._quota_project_id = quota_project_id 

778 return cred 

779 

780 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 

781 def with_token_uri(self, token_uri): 

782 cred = self._make_copy() 

783 cred._token_uri = token_uri 

784 return cred 

785 

786 def _make_authorization_grant_assertion(self): 

787 """Create the OAuth 2.0 assertion. 

788 

789 This assertion is used during the OAuth 2.0 grant to acquire an 

790 ID token. 

791 

792 Returns: 

793 bytes: The authorization grant assertion. 

794 """ 

795 now = _helpers.utcnow() 

796 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) 

797 expiry = now + lifetime 

798 

799 payload = { 

800 "iat": _helpers.datetime_to_secs(now), 

801 "exp": _helpers.datetime_to_secs(expiry), 

802 # The issuer must be the service account email. 

803 "iss": self.service_account_email, 

804 # The audience must be the auth token endpoint's URI 

805 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, 

806 # The target audience specifies which service the ID token is 

807 # intended for. 

808 "target_audience": self._target_audience, 

809 } 

810 

811 payload.update(self._additional_claims) 

812 

813 token = jwt.encode(self._signer, payload) 

814 

815 return token 

816 

817 def _refresh_with_iam_endpoint(self, request): 

818 """Use IAM generateIdToken endpoint to obtain an ID token. 

819 

820 It works as follows: 

821 

822 1. First we create a self signed jwt with 

823 https://www.googleapis.com/auth/iam being the scope. 

824 

825 2. Next we use the self signed jwt as the access token, and make a POST 

826 request to IAM generateIdToken endpoint. The request body is: 

827 { 

828 "audience": self._target_audience, 

829 "includeEmail": "true", 

830 "useEmailAzp": "true", 

831 } 

832 

833 If the request is successful, it will return {"token":"the ID token"}, 

834 and we can extract the ID token and compute its expiry. 

835 """ 

836 jwt_credentials = jwt.Credentials.from_signing_credentials( 

837 self, 

838 None, 

839 additional_claims={"scope": "https://www.googleapis.com/auth/iam"}, 

840 ) 

841 jwt_credentials.refresh(request) 

842 

843 self.token, self.expiry = _client.call_iam_generate_id_token_endpoint( 

844 request, 

845 self._iam_id_token_endpoint, 

846 self.signer_email, 

847 self._target_audience, 

848 jwt_credentials.token.decode(), 

849 self._universe_domain, 

850 ) 

851 

852 @_helpers.copy_docstring(credentials.Credentials) 

853 def refresh(self, request): 

854 if self._use_iam_endpoint: 

855 self._refresh_with_iam_endpoint(request) 

856 else: 

857 assertion = self._make_authorization_grant_assertion() 

858 access_token, expiry, _ = _client.id_token_jwt_grant( 

859 request, self._token_uri, assertion 

860 ) 

861 self.token = access_token 

862 self.expiry = expiry 

863 

864 @property 

865 def service_account_email(self): 

866 """The service account email.""" 

867 return self._service_account_email 

868 

869 @_helpers.copy_docstring(credentials.Signing) 

870 def sign_bytes(self, message): 

871 return self._signer.sign(message) 

872 

873 @property # type: ignore 

874 @_helpers.copy_docstring(credentials.Signing) 

875 def signer(self): 

876 return self._signer 

877 

878 @property # type: ignore 

879 @_helpers.copy_docstring(credentials.Signing) 

880 def signer_email(self): 

881 return self._service_account_email
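
Both classes in this listing treat credentials as immutable: every `with_*` method calls `_make_copy` and changes the copy instead of the original. The sketch below shows that copy-on-modify pattern in isolation. `SketchCredentials` is a hypothetical stand-in, not the library class, and it raises a plain `ValueError` where the module raises `exceptions.InvalidValue`.

```python
DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"


class SketchCredentials:
    """Hypothetical, simplified stand-in for IDTokenCredentials."""

    def __init__(self, email, target_audience, additional_claims=None,
                 universe_domain=DEFAULT_UNIVERSE_DOMAIN):
        self._email = email
        self._target_audience = target_audience
        self._additional_claims = dict(additional_claims or {})
        self._universe_domain = universe_domain or DEFAULT_UNIVERSE_DOMAIN
        # A non-default universe domain forces the IAM endpoint.
        self._use_iam_endpoint = (
            self._universe_domain != DEFAULT_UNIVERSE_DOMAIN
        )

    def _make_copy(self):
        cred = self.__class__(
            self._email,
            self._target_audience,
            additional_claims=self._additional_claims.copy(),
            universe_domain=self._universe_domain,
        )
        # State that is not a constructor argument is copied by hand.
        cred._use_iam_endpoint = self._use_iam_endpoint
        return cred

    def with_target_audience(self, target_audience):
        cred = self._make_copy()
        cred._target_audience = target_audience
        return cred

    def with_use_iam_endpoint(self, use_iam_endpoint):
        cred = self._make_copy()
        if (cred._universe_domain != DEFAULT_UNIVERSE_DOMAIN
                and not use_iam_endpoint):
            raise ValueError(
                "use_iam_endpoint should be True for non-default universe domain"
            )
        cred._use_iam_endpoint = use_iam_endpoint
        return cred


base = SketchCredentials("sa@example.com", "https://a.example")
other = base.with_target_audience("https://b.example")
print(base._target_audience, other._target_audience)
```

Because each modifier returns a new object, a base credential can be shared safely while callers derive per-audience variants from it.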