Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/oauth2/service_account.py: 34%

260 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0 

16 

17This module implements the JWT Profile for OAuth 2.0 Authorization Grants 

18as defined by `RFC 7523`_ with particular support for how this RFC is 

19implemented in Google's infrastructure. Google refers to these credentials 

20as *Service Accounts*. 

21 

22Service accounts are used for server-to-server communication, such as 

23interactions between a web application server and a Google service. The 

24service account belongs to your application instead of to an individual end 

25user. In contrast to other OAuth 2.0 profiles, no users are involved and your 

26application "acts" as the service account. 

27 

28Typically an application uses a service account when the application uses 

29Google APIs to work with its own data rather than a user's data. For example, 

30an application that uses Google Cloud Datastore for data persistence would use 

31a service account to authenticate its calls to the Google Cloud Datastore API. 

32However, an application that needs to access a user's Drive documents would 

33use the normal OAuth 2.0 profile. 

34 

35Additionally, Google Apps domain administrators can grant service accounts 

36`domain-wide delegation`_ authority to access user data on behalf of users in 

37the domain. 

38 

39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used 

40in place of the usual authorization token returned during the standard 

41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as 

42the acquired access token is used as the bearer token when making requests 

43using these credentials. 

44 

45This profile differs from the normal OAuth 2.0 profile because no user consent 

46step is required. The use of the private key allows this profile to assert 

47identity directly. 

48 

49This profile also differs from the :mod:`google.auth.jwt` authentication 

50because the JWT credentials use the JWT directly as the bearer token. This 

51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The 

52obtained OAuth 2.0 access token is used as the bearer token. 

53 

54Domain-wide delegation 

55---------------------- 

56 

57Domain-wide delegation allows a service account to access user data on 

58behalf of any user in a Google Apps domain without consent from the user. 

59For example, an application that uses the Google Calendar API to add events to 

60the calendars of all users in a Google Apps domain would use a service account 

61to access the Google Calendar API on behalf of users. 

62 

63The Google Apps administrator must explicitly authorize the service account to 

64do this. This authorization step is referred to as "delegating domain-wide 

65authority" to a service account. 

66 

67You can use domain-wide delegation by creating a set of credentials with a 

68specific subject using :meth:`~Credentials.with_subject`. 

69 

70.. _RFC 7523: https://tools.ietf.org/html/rfc7523 

71""" 

72 
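
The module docstring above says the signed JWT is exchanged for an OAuth 2.0 access token. As a rough illustration of that exchange, the form body defined by RFC 7523 can be sketched with the standard library alone. This is a minimal sketch, not this module's implementation: `build_jwt_bearer_request_body` is a hypothetical helper, and the assertion string is a placeholder rather than a real signed JWT.

```python
from urllib.parse import parse_qs, urlencode

# Grant type that RFC 7523 defines for JWT bearer authorization grants.
JWT_BEARER_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer"


def build_jwt_bearer_request_body(assertion):
    """Return the form-encoded body that is POSTed to the token endpoint.

    `assertion` is the signed JWT (header.payload.signature) as a string.
    This helper is hypothetical and exists only for illustration.
    """
    return urlencode({"grant_type": JWT_BEARER_GRANT_TYPE, "assertion": assertion})


# Placeholder value; a real assertion is a JWT signed with the private key.
body = build_jwt_bearer_request_body("header.payload.signature")

# Decoding the body again shows the two fields the token endpoint receives.
fields = parse_qs(body)
```

The token endpoint answers such a request with an access token, which is what the credentials then send as the bearer token.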

73import copy 

74import datetime 

75 

76from google.auth import _constants 

77from google.auth import _helpers 

78from google.auth import _service_account_info 

79from google.auth import credentials 

80from google.auth import exceptions 

81from google.auth import iam 

82from google.auth import jwt 

83from google.auth import metrics 

84from google.oauth2 import _client 

85 

86_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

87_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" 

88 

89 

90class Credentials( 

91 credentials.Signing, 

92 credentials.Scoped, 

93 credentials.CredentialsWithQuotaProject, 

94 credentials.CredentialsWithTokenUri, 

95 credentials.CredentialsWithTrustBoundary, 

96): 

97 """Service account credentials 

98 

99 Usually, you'll create these credentials with one of the helper 

100 constructors. To create credentials using a Google service account 

101 private key JSON file:: 

102 

103 credentials = service_account.Credentials.from_service_account_file( 

104 'service-account.json') 

105 

106 Or if you already have the service account file loaded:: 

107 

108 service_account_info = json.load(open('service_account.json')) 

109 credentials = service_account.Credentials.from_service_account_info( 

110 service_account_info) 

111 

112 Both helper methods pass on arguments to the constructor, so you can 

113 specify additional scopes and a subject if necessary:: 

114 

115 credentials = service_account.Credentials.from_service_account_file( 

116 'service-account.json', 

117 scopes=['email'], 

118 subject='user@example.com') 

119 

120 The credentials are considered immutable. If you want to modify the scopes 

121 or the subject used for delegation, use :meth:`with_scopes` or 

122 :meth:`with_subject`:: 

123 

124 scoped_credentials = credentials.with_scopes(['email']) 

125 delegated_credentials = credentials.with_subject(subject) 

126 

127 To add a quota project, use :meth:`with_quota_project`:: 

128 

129 credentials = credentials.with_quota_project('myproject-123') 

130 """ 

131 

132 def __init__( 

133 self, 

134 signer, 

135 service_account_email, 

136 token_uri, 

137 scopes=None, 

138 default_scopes=None, 

139 subject=None, 

140 project_id=None, 

141 quota_project_id=None, 

142 additional_claims=None, 

143 always_use_jwt_access=False, 

144 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, 

145 trust_boundary=None, 

146 ): 

147 """ 

148 Args: 

149 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

150 service_account_email (str): The service account's email. 

151 scopes (Sequence[str]): User-defined scopes to request during the 

152 authorization grant. 

153 default_scopes (Sequence[str]): Default scopes passed by a 

154 Google client library. Use 'scopes' for user-defined scopes. 

155 token_uri (str): The OAuth 2.0 Token URI. 

156 subject (str): For domain-wide delegation, the email address of the 

157 user for which to request delegated access. 

158 project_id (str): Project ID associated with the service account 

159 credential. 

160 quota_project_id (Optional[str]): The project ID used for quota and 

161 billing. 

162 additional_claims (Mapping[str, str]): Any additional claims for 

163 the JWT assertion used in the authorization grant. 

164 always_use_jwt_access (Optional[bool]): Whether a self-signed JWT 

165 should always be used. 

166 universe_domain (str): The universe domain. The default 

167 universe domain is googleapis.com. For any other value, a 

168 self-signed JWT is always used for token refresh. 

169 trust_boundary (Mapping[str,str]): A credential trust boundary. 

170 

171 .. note:: Typically one of the helper constructors 

172 :meth:`from_service_account_file` or 

173 :meth:`from_service_account_info` is used instead of calling the 

174 constructor directly. 

175 """ 

176 super(Credentials, self).__init__() 

177 

178 self._cred_file_path = None 

179 self._scopes = scopes 

180 self._default_scopes = default_scopes 

181 self._signer = signer 

182 self._service_account_email = service_account_email 

183 self._subject = subject 

184 self._project_id = project_id 

185 self._quota_project_id = quota_project_id 

186 self._token_uri = token_uri 

187 self._always_use_jwt_access = always_use_jwt_access 

188 self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN 

189 

190 if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 

191 self._always_use_jwt_access = True 

192 

193 self._jwt_credentials = None 

194 

195 if additional_claims is not None: 

196 self._additional_claims = additional_claims 

197 else: 

198 self._additional_claims = {} 

199 self._trust_boundary = trust_boundary 

200 

201 @classmethod 

202 def _from_signer_and_info(cls, signer, info, **kwargs): 

203 """Creates a Credentials instance from a signer and service account 

204 info. 

205 

206 Args: 

207 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

208 info (Mapping[str, str]): The service account info. 

209 kwargs: Additional arguments to pass to the constructor. 

210 

211 Returns: 

212 google.oauth2.service_account.Credentials: The constructed credentials. 

213 

214 Raises: 

215 ValueError: If the info is not in the expected format. 

216 """ 

217 return cls( 

218 signer, 

219 service_account_email=info["client_email"], 

220 token_uri=info["token_uri"], 

221 project_id=info.get("project_id"), 

222 universe_domain=info.get( 

223 "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN 

224 ), 

225 trust_boundary=info.get("trust_boundary"), 

226 **kwargs, 

227 ) 

228 

229 @classmethod 

230 def from_service_account_info(cls, info, **kwargs): 

231 """Creates a Credentials instance from parsed service account info. 

232 

233 Args: 

234 info (Mapping[str, str]): The service account info in Google 

235 format. 

236 kwargs: Additional arguments to pass to the constructor. 

237 

238 Returns: 

239 google.oauth2.service_account.Credentials: The constructed 

240 credentials. 

241 

242 Raises: 

243 ValueError: If the info is not in the expected format. 

244 """ 

245 signer = _service_account_info.from_dict( 

246 info, require=["client_email", "token_uri"] 

247 ) 

248 return cls._from_signer_and_info(signer, info, **kwargs) 

249 

250 @classmethod 

251 def from_service_account_file(cls, filename, **kwargs): 

252 """Creates a Credentials instance from a service account json file. 

253 

254 Args: 

255 filename (str): The path to the service account json file. 

256 kwargs: Additional arguments to pass to the constructor. 

257 

258 Returns: 

259 google.oauth2.service_account.Credentials: The constructed 

260 credentials. 

261 """ 

262 info, signer = _service_account_info.from_filename( 

263 filename, require=["client_email", "token_uri"] 

264 ) 

265 return cls._from_signer_and_info(signer, info, **kwargs) 

266 

267 @property 

268 def service_account_email(self): 

269 """The service account email.""" 

270 return self._service_account_email 

271 

272 @property 

273 def project_id(self): 

274 """Project ID associated with this credential.""" 

275 return self._project_id 

276 

277 @property 

278 def requires_scopes(self): 

279 """Checks if the credentials requires scopes. 

280 

281 Returns: 

282 bool: True if there are no scopes set otherwise False. 

283 """ 

284 return not self._scopes 

285 

286 def _make_copy(self): 

287 cred = self.__class__( 

288 self._signer, 

289 service_account_email=self._service_account_email, 

290 scopes=copy.copy(self._scopes), 

291 default_scopes=copy.copy(self._default_scopes), 

292 token_uri=self._token_uri, 

293 subject=self._subject, 

294 project_id=self._project_id, 

295 quota_project_id=self._quota_project_id, 

296 additional_claims=self._additional_claims.copy(), 

297 always_use_jwt_access=self._always_use_jwt_access, 

298 universe_domain=self._universe_domain, 

299 trust_boundary=self._trust_boundary, 

300 ) 

301 cred._cred_file_path = self._cred_file_path 

302 return cred 

303 

304 @_helpers.copy_docstring(credentials.Scoped) 

305 def with_scopes(self, scopes, default_scopes=None): 

306 cred = self._make_copy() 

307 cred._scopes = scopes 

308 cred._default_scopes = default_scopes 

309 return cred 

310 

311 def with_always_use_jwt_access(self, always_use_jwt_access): 

312 """Create a copy of these credentials with the specified always_use_jwt_access value. 

313 

314 Args: 

315 always_use_jwt_access (bool): Whether to always use a self-signed JWT. 

316 

317 Returns: 

318 google.oauth2.service_account.Credentials: A new credentials 

319 instance. 

320 Raises: 

321 google.auth.exceptions.InvalidValue: If the universe domain is not 

322 default and always_use_jwt_access is False. 

323 """ 

324 cred = self._make_copy() 

325 if ( 

326 cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN 

327 and not always_use_jwt_access 

328 ): 

329 raise exceptions.InvalidValue( 

330 "always_use_jwt_access should be True for non-default universe domain" 

331 ) 

332 cred._always_use_jwt_access = always_use_jwt_access 

333 return cred 

334 

335 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) 

336 def with_universe_domain(self, universe_domain): 

337 cred = self._make_copy() 

338 cred._universe_domain = universe_domain 

339 if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 

340 cred._always_use_jwt_access = True 

341 return cred 

342 

343 def with_subject(self, subject): 

344 """Create a copy of these credentials with the specified subject. 

345 

346 Args: 

347 subject (str): The subject claim. 

348 

349 Returns: 

350 google.oauth2.service_account.Credentials: A new credentials 

351 instance. 

352 """ 

353 cred = self._make_copy() 

354 cred._subject = subject 

355 return cred 

356 

357 def with_claims(self, additional_claims): 

358 """Returns a copy of these credentials with modified claims. 

359 

360 Args: 

361 additional_claims (Mapping[str, str]): Any additional claims for 

362 the JWT payload. This will be merged with the current 

363 additional claims. 

364 

365 Returns: 

366 google.oauth2.service_account.Credentials: A new credentials 

367 instance. 

368 """ 

369 new_additional_claims = copy.deepcopy(self._additional_claims) 

370 new_additional_claims.update(additional_claims or {}) 

371 cred = self._make_copy() 

372 cred._additional_claims = new_additional_claims 

373 return cred 

374 

375 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

376 def with_quota_project(self, quota_project_id): 

377 cred = self._make_copy() 

378 cred._quota_project_id = quota_project_id 

379 return cred 

380 

381 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 

382 def with_token_uri(self, token_uri): 

383 cred = self._make_copy() 

384 cred._token_uri = token_uri 

385 return cred 

386 

387 @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 

388 def with_trust_boundary(self, trust_boundary): 

389 cred = self._make_copy() 

390 cred._trust_boundary = trust_boundary 

391 return cred 

392 

393 def _make_authorization_grant_assertion(self): 

394 """Create the OAuth 2.0 assertion. 

395 

396 This assertion is used during the OAuth 2.0 grant to acquire an 

397 access token. 

398 

399 Returns: 

400 bytes: The authorization grant assertion. 

401 """ 

402 now = _helpers.utcnow() 

403 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) 

404 expiry = now + lifetime 

405 

406 payload = { 

407 "iat": _helpers.datetime_to_secs(now), 

408 "exp": _helpers.datetime_to_secs(expiry), 

409 # The issuer must be the service account email. 

410 "iss": self._service_account_email, 

411 # The audience must be the auth token endpoint's URI 

412 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, 

413 "scope": _helpers.scopes_to_string(self._scopes or ()), 

414 } 

415 

416 payload.update(self._additional_claims) 

417 

418 # The subject can be a user email for domain-wide delegation. 

419 if self._subject: 

420 payload.setdefault("sub", self._subject) 

421 

422 token = jwt.encode(self._signer, payload) 

423 

424 return token 

425 

426 def _use_self_signed_jwt(self): 

427 # Domain-wide delegation doesn't work with a self-signed JWT, so if a 

428 # subject is set, we should not use a self-signed JWT. 

429 return self._subject is None and self._jwt_credentials is not None 

430 

431 def _metric_header_for_usage(self): 

432 if self._use_self_signed_jwt(): 

433 return metrics.CRED_TYPE_SA_JWT 

434 return metrics.CRED_TYPE_SA_ASSERTION 

435 

436 @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 

437 def _perform_refresh_token(self, request): 

438 if self._always_use_jwt_access and not self._jwt_credentials: 

439 # If self signed jwt should be used but jwt credential is not 

440 # created, try to create one with scopes 

441 self._create_self_signed_jwt(None) 

442 

443 if ( 

444 self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN 

445 and self._subject 

446 ): 

447 raise exceptions.RefreshError( 

448 "domain wide delegation is not supported for non-default universe domain" 

449 ) 

450 

451 if self._use_self_signed_jwt(): 

452 self._jwt_credentials.refresh(request) 

453 self.token = self._jwt_credentials.token.decode() 

454 self.expiry = self._jwt_credentials.expiry 

455 else: 

456 assertion = self._make_authorization_grant_assertion() 

457 access_token, expiry, _ = _client.jwt_grant( 

458 request, self._token_uri, assertion 

459 ) 

460 self.token = access_token 

461 self.expiry = expiry 

462 

463 def _create_self_signed_jwt(self, audience): 

464 """Create a self-signed JWT from the credentials if requirements are met. 

465 

466 Args: 

467 audience (str): The service URL. ``https://[API_ENDPOINT]/`` 

468 """ 

469 # https://google.aip.dev/auth/4111 

470 if self._always_use_jwt_access: 

471 if self._scopes: 

472 additional_claims = {"scope": " ".join(self._scopes)} 

473 if ( 

474 self._jwt_credentials is None 

475 or self._jwt_credentials.additional_claims != additional_claims 

476 ): 

477 self._jwt_credentials = jwt.Credentials.from_signing_credentials( 

478 self, None, additional_claims=additional_claims 

479 ) 

480 elif audience: 

481 if ( 

482 self._jwt_credentials is None 

483 or self._jwt_credentials._audience != audience 

484 ): 

485 self._jwt_credentials = jwt.Credentials.from_signing_credentials( 

486 self, audience 

487 ) 

488 elif self._default_scopes: 

489 additional_claims = {"scope": " ".join(self._default_scopes)} 

490 if ( 

491 self._jwt_credentials is None 

492 or additional_claims != self._jwt_credentials.additional_claims 

493 ): 

494 self._jwt_credentials = jwt.Credentials.from_signing_credentials( 

495 self, None, additional_claims=additional_claims 

496 ) 

497 elif not self._scopes and audience: 

498 self._jwt_credentials = jwt.Credentials.from_signing_credentials( 

499 self, audience 

500 ) 

501 

502 def _build_trust_boundary_lookup_url(self): 

503 """Builds and returns the URL for the trust boundary lookup API. 

504 

505 This method constructs the specific URL for the IAM Credentials API's 

506 `allowedLocations` endpoint, using the credential's universe domain 

507 and service account email. 

508 

509 Raises: 

510 ValueError: If `self.service_account_email` is None or an empty 

511 string, as it's required to form the URL. 

512 

513 Returns: 

514 str: The URL for the trust boundary lookup endpoint. 

515 """ 

516 if not self.service_account_email: 

517 raise ValueError( 

518 "Service account email is required to build the trust boundary lookup URL." 

519 ) 

520 return _constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( 

521 universe_domain=self._universe_domain, 

522 service_account_email=self._service_account_email, 

523 ) 

524 

525 @_helpers.copy_docstring(credentials.Signing) 

526 def sign_bytes(self, message): 

527 return self._signer.sign(message) 

528 

529 @property # type: ignore 

530 @_helpers.copy_docstring(credentials.Signing) 

531 def signer(self): 

532 return self._signer 

533 

534 @property # type: ignore 

535 @_helpers.copy_docstring(credentials.Signing) 

536 def signer_email(self): 

537 return self._service_account_email 

538 

539 @_helpers.copy_docstring(credentials.Credentials) 

540 def get_cred_info(self): 

541 if self._cred_file_path: 

542 return { 

543 "credential_source": self._cred_file_path, 

544 "credential_type": "service account credentials", 

545 "principal": self.service_account_email, 

546 } 

547 return None 

548 

549 
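
The claim set assembled in `_make_authorization_grant_assertion` above can be sketched without any Google libraries. The following is a minimal, standard-library sketch under stated assumptions: `build_grant_claims` is a hypothetical function, the constants copy the values shown in this module, and signing is left out entirely. It mainly shows the merge order: additional claims are applied first, and the subject only fills `sub` if no additional claim already set it.

```python
import time

# Values copied from the module-level constants in this file.
DEFAULT_TOKEN_LIFETIME_SECS = 3600
TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"


def build_grant_claims(email, scopes=None, subject=None,
                       additional_claims=None, now=None):
    """Assemble the (unsigned) claim set for the authorization grant.

    Hypothetical helper for illustration; `now` is a Unix timestamp and
    defaults to the current time.
    """
    issued_at = int(time.time()) if now is None else int(now)
    payload = {
        "iat": issued_at,
        "exp": issued_at + DEFAULT_TOKEN_LIFETIME_SECS,
        # The issuer is the service account email.
        "iss": email,
        # The audience is the token endpoint's URI.
        "aud": TOKEN_ENDPOINT,
        "scope": " ".join(scopes or ()),
    }
    # Additional claims are merged first and may override the defaults.
    payload.update(additional_claims or {})
    # setdefault keeps a "sub" that was supplied via additional claims.
    if subject:
        payload.setdefault("sub", subject)
    return payload


claims = build_grant_claims(
    "sa@example.iam.gserviceaccount.com",
    scopes=["email"],
    subject="user@example.com",
    now=1700000000,
)

# A "sub" passed through additional claims wins over the subject argument.
overridden = build_grant_claims(
    "sa@example.iam.gserviceaccount.com",
    subject="user@example.com",
    additional_claims={"sub": "other@example.com"},
    now=1700000000,
)
```

In the real class this payload is then signed with the service account's private key via `jwt.encode`; the sketch stops before that step.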

550class IDTokenCredentials( 

551 credentials.Signing, 

552 credentials.CredentialsWithQuotaProject, 

553 credentials.CredentialsWithTokenUri, 

554): 

555 """Open ID Connect ID Token-based service account credentials. 

556 

557 These credentials are largely similar to :class:`.Credentials`, but instead 

558 of using an OAuth 2.0 Access Token as the bearer token, they use an Open 

559 ID Connect ID Token as the bearer token. These credentials are useful when 

560 communicating with services that require ID Tokens and cannot accept access 

561 tokens. 

562 

563 Usually, you'll create these credentials with one of the helper 

564 constructors. To create credentials using a Google service account 

565 private key JSON file:: 

566 

567 credentials = ( 

568 service_account.IDTokenCredentials.from_service_account_file( 

569 'service-account.json')) 

570 

571 

572 Or if you already have the service account file loaded:: 

573 

574 service_account_info = json.load(open('service_account.json')) 

575 credentials = ( 

576 service_account.IDTokenCredentials.from_service_account_info( 

577 service_account_info)) 

578 

579 

580 Both helper methods pass on arguments to the constructor, so you can 

581 specify the target audience and a quota project if necessary:: 

582 

583 credentials = ( 

584 service_account.IDTokenCredentials.from_service_account_file( 

585 'service-account.json', 

586 target_audience='https://example.com', 

587 quota_project_id='myproject-123')) 

588 

589 

590 The credentials are considered immutable. If you want to modify the target 

591 audience or the quota project, use :meth:`with_target_audience` or 

592 :meth:`with_quota_project`:: 

593 

594 audience_credentials = credentials.with_target_audience(audience) 

595 quota_credentials = credentials.with_quota_project('myproject-123') 

596 

597 """ 

598 

599 def __init__( 

600 self, 

601 signer, 

602 service_account_email, 

603 token_uri, 

604 target_audience, 

605 additional_claims=None, 

606 quota_project_id=None, 

607 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, 

608 ): 

609 """ 

610 Args: 

611 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

612 service_account_email (str): The service account's email. 

613 token_uri (str): The OAuth 2.0 Token URI. 

614 target_audience (str): The intended audience for these credentials, 

615 used when requesting the ID Token. The ID Token's ``aud`` claim 

616 will be set to this string. 

617 additional_claims (Mapping[str, str]): Any additional claims for 

618 the JWT assertion used in the authorization grant. 

619 quota_project_id (Optional[str]): The project ID used for quota and billing. 

620 universe_domain (str): The universe domain. The default 

621 universe domain is googleapis.com. For any other value, the IAM ID 

622 token endpoint is used for token refresh. Note that 

623 iam.serviceAccountTokenCreator role is required to use the IAM 

624 endpoint. 

625 

626 .. note:: Typically one of the helper constructors 

627 :meth:`from_service_account_file` or 

628 :meth:`from_service_account_info` is used instead of calling the 

629 constructor directly. 

630 """ 

631 super(IDTokenCredentials, self).__init__() 

632 self._signer = signer 

633 self._service_account_email = service_account_email 

634 self._token_uri = token_uri 

635 self._target_audience = target_audience 

636 self._quota_project_id = quota_project_id 

637 self._use_iam_endpoint = False 

638 

639 if not universe_domain: 

640 self._universe_domain = credentials.DEFAULT_UNIVERSE_DOMAIN 

641 else: 

642 self._universe_domain = universe_domain 

643 self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace( 

644 "googleapis.com", self._universe_domain 

645 ) 

646 

647 if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 

648 self._use_iam_endpoint = True 

649 

650 if additional_claims is not None: 

651 self._additional_claims = additional_claims 

652 else: 

653 self._additional_claims = {} 

654 

655 @classmethod 

656 def _from_signer_and_info(cls, signer, info, **kwargs): 

657 """Creates a credentials instance from a signer and service account 

658 info. 

659 

660 Args: 

661 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

662 info (Mapping[str, str]): The service account info. 

663 kwargs: Additional arguments to pass to the constructor. 

664 

665 Returns: 

666 google.oauth2.service_account.IDTokenCredentials: The constructed credentials. 

667 

668 Raises: 

669 ValueError: If the info is not in the expected format. 

670 """ 

671 kwargs.setdefault("service_account_email", info["client_email"]) 

672 kwargs.setdefault("token_uri", info["token_uri"]) 

673 if "universe_domain" in info: 

674 kwargs["universe_domain"] = info["universe_domain"] 

675 return cls(signer, **kwargs) 

676 

677 @classmethod 

678 def from_service_account_info(cls, info, **kwargs): 

679 """Creates a credentials instance from parsed service account info. 

680 

681 Args: 

682 info (Mapping[str, str]): The service account info in Google 

683 format. 

684 kwargs: Additional arguments to pass to the constructor. 

685 

686 Returns: 

687 google.oauth2.service_account.IDTokenCredentials: The constructed 

688 credentials. 

689 

690 Raises: 

691 ValueError: If the info is not in the expected format. 

692 """ 

693 signer = _service_account_info.from_dict( 

694 info, require=["client_email", "token_uri"] 

695 ) 

696 return cls._from_signer_and_info(signer, info, **kwargs) 

697 

698 @classmethod 

699 def from_service_account_file(cls, filename, **kwargs): 

700 """Creates a credentials instance from a service account json file. 

701 

702 Args: 

703 filename (str): The path to the service account json file. 

704 kwargs: Additional arguments to pass to the constructor. 

705 

706 Returns: 

707 google.oauth2.service_account.IDTokenCredentials: The constructed 

708 credentials. 

709 """ 

710 info, signer = _service_account_info.from_filename( 

711 filename, require=["client_email", "token_uri"] 

712 ) 

713 return cls._from_signer_and_info(signer, info, **kwargs) 

714 

715 def _make_copy(self): 

716 cred = self.__class__( 

717 self._signer, 

718 service_account_email=self._service_account_email, 

719 token_uri=self._token_uri, 

720 target_audience=self._target_audience, 

721 additional_claims=self._additional_claims.copy(), 

722 quota_project_id=self.quota_project_id, 

723 universe_domain=self._universe_domain, 

724 ) 

725 # _use_iam_endpoint is not exposed in the constructor 

726 cred._use_iam_endpoint = self._use_iam_endpoint 

727 return cred 

728 

729 def with_target_audience(self, target_audience): 

730 """Create a copy of these credentials with the specified target 

731 audience. 

732 

733 Args: 

734 target_audience (str): The intended audience for these credentials, 

735 used when requesting the ID Token. 

736 

737 Returns: 

738 google.oauth2.service_account.IDTokenCredentials: A new credentials 

739 instance. 

740 """ 

741 cred = self._make_copy() 

742 cred._target_audience = target_audience 

743 return cred 

744 

745 def _with_use_iam_endpoint(self, use_iam_endpoint): 

746 """Create a copy of these credentials with the use_iam_endpoint value. 

747 

748 Args: 

749 use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will 

750 be used instead of the token_uri. Note that 

751 iam.serviceAccountTokenCreator role is required to use the IAM 

752 endpoint. The default value is False. This feature is currently 

753 experimental and subject to change without notice. 

754 

755 Returns: 

756 google.oauth2.service_account.IDTokenCredentials: A new credentials 

757 instance. 

758 Raises: 

759 google.auth.exceptions.InvalidValue: If the universe domain is not 

760 default and use_iam_endpoint is False. 

761 """ 

762 cred = self._make_copy() 

763 if ( 

764 cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN 

765 and not use_iam_endpoint 

766 ): 

767 raise exceptions.InvalidValue( 

768 "use_iam_endpoint should be True for non-default universe domain" 

769 ) 

770 cred._use_iam_endpoint = use_iam_endpoint 

771 return cred 

772 

773 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

774 def with_quota_project(self, quota_project_id): 

775 cred = self._make_copy() 

776 cred._quota_project_id = quota_project_id 

777 return cred 

778 

779 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 

780 def with_token_uri(self, token_uri): 

781 cred = self._make_copy() 

782 cred._token_uri = token_uri 

783 return cred 

784 

785 def _make_authorization_grant_assertion(self): 

786 """Create the OAuth 2.0 assertion. 

787 

788 This assertion is used during the OAuth 2.0 grant to acquire an 

789 ID token. 

790 

791 Returns: 

792 bytes: The authorization grant assertion. 

793 """ 

794 now = _helpers.utcnow() 

795 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) 

796 expiry = now + lifetime 

797 

798 payload = { 

799 "iat": _helpers.datetime_to_secs(now), 

800 "exp": _helpers.datetime_to_secs(expiry), 

801 # The issuer must be the service account email. 

802 "iss": self.service_account_email, 

803 # The audience must be the auth token endpoint's URI 

804 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, 

805 # The target audience specifies which service the ID token is 

806 # intended for. 

807 "target_audience": self._target_audience, 

808 } 

809 

810 payload.update(self._additional_claims) 

811 

812 token = jwt.encode(self._signer, payload) 

813 

814 return token 

815 

816 def _refresh_with_iam_endpoint(self, request): 

817 """Use IAM generateIdToken endpoint to obtain an ID token. 

818 

819 It works as follows: 

820 

821 1. First we create a self signed jwt with 

822 https://www.googleapis.com/auth/iam being the scope. 

823 

824 2. Next we use the self signed jwt as the access token, and make a POST 

825 request to IAM generateIdToken endpoint. The request body is: 

826 { 

827 "audience": self._target_audience, 

828 "includeEmail": "true", 

829 "useEmailAzp": "true", 

830 } 

831 

832 If the request succeeds, it returns {"token":"the ID token"}, 

833 and we can extract the ID token and compute its expiry. 

834 """ 

835 jwt_credentials = jwt.Credentials.from_signing_credentials( 

836 self, 

837 None, 

838 additional_claims={"scope": "https://www.googleapis.com/auth/iam"}, 

839 ) 

840 jwt_credentials.refresh(request) 

841 

842 self.token, self.expiry = _client.call_iam_generate_id_token_endpoint( 

843 request, 

844 self._iam_id_token_endpoint, 

845 self.signer_email, 

846 self._target_audience, 

847 jwt_credentials.token.decode(), 

848 self._universe_domain, 

849 ) 

850 

851 @_helpers.copy_docstring(credentials.Credentials) 

852 def refresh(self, request): 

853 if self._use_iam_endpoint: 

854 self._refresh_with_iam_endpoint(request) 

855 else: 

856 assertion = self._make_authorization_grant_assertion() 

857 access_token, expiry, _ = _client.id_token_jwt_grant( 

858 request, self._token_uri, assertion 

859 ) 

860 self.token = access_token 

861 self.expiry = expiry 

862 

863 @property 

864 def service_account_email(self): 

865 """The service account email.""" 

866 return self._service_account_email 

867 

868 @_helpers.copy_docstring(credentials.Signing) 

869 def sign_bytes(self, message): 

870 return self._signer.sign(message) 

871 

872 @property # type: ignore 

873 @_helpers.copy_docstring(credentials.Signing) 

874 def signer(self): 

875 return self._signer 

876 

877 @property # type: ignore 

878 @_helpers.copy_docstring(credentials.Signing) 

879 def signer_email(self): 

880 return self._service_account_email
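

The request described in the `_refresh_with_iam_endpoint` docstring can be sketched as follows. This is only an illustration under assumptions: `build_generate_id_token_request` is a hypothetical helper, the body fields are taken from the docstring, and sending the self-signed JWT in an `Authorization: Bearer` header is assumed from the docstring's statement that the JWT is used as the access token. No network call is made.

```python
import json


def build_generate_id_token_request(target_audience, self_signed_jwt):
    """Return (headers, body) for a POST to the IAM generateIdToken endpoint.

    Hypothetical helper for illustration. The body fields follow the
    docstring of _refresh_with_iam_endpoint; the header layout is an
    assumption based on the JWT being used as the access token.
    """
    body = {
        "audience": target_audience,
        "includeEmail": "true",
        "useEmailAzp": "true",
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + self_signed_jwt,
    }
    return headers, json.dumps(body).encode("utf-8")


# Placeholder token; a real one is the self-signed JWT with the IAM scope.
headers, body = build_generate_id_token_request(
    "https://example.com", "header.payload.signature"
)
decoded = json.loads(body)
```

Per the docstring, a successful response carries the ID token under the `token` key, from which the expiry is then computed.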