Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/oauth2/service_account.py: 34%


# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0

This module implements the JWT Profile for OAuth 2.0 Authorization Grants
as defined by `RFC 7523`_ with particular support for how this RFC is
implemented in Google's infrastructure. Google refers to these credentials
as *Service Accounts*.

Service accounts are used for server-to-server communication, such as
interactions between a web application server and a Google service. The
service account belongs to your application instead of to an individual end
user. In contrast to other OAuth 2.0 profiles, no users are involved and your
application "acts" as the service account.

Typically an application uses a service account when the application uses
Google APIs to work with its own data rather than a user's data. For example,
an application that uses Google Cloud Datastore for data persistence would use
a service account to authenticate its calls to the Google Cloud Datastore API.
However, an application that needs to access a user's Drive documents would
use the normal OAuth 2.0 profile.

Additionally, Google Apps domain administrators can grant service accounts
`domain-wide delegation`_ authority to access user data on behalf of users in
the domain.

This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
in place of the usual authorization code returned during the standard
OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
the acquired access token is used as the bearer token when making requests
using these credentials.
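
As a rough sketch of the exchange described above (not this module's
implementation), the snippet below builds the claim set for the grant
assertion and the form-encoded request body that RFC 7523 defines for
presenting it. The helper names ``build_assertion_claims`` and
``build_token_request_body`` are hypothetical, the assertion string is a
placeholder rather than a signed JWT, and the one-hour lifetime mirrors this
module's default.

```python
import time
import urllib.parse

TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
JWT_BEARER_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer"


def build_assertion_claims(service_account_email, scopes, now=None, lifetime=3600):
    # Hypothetical helper: the claims carried by the grant assertion.
    issued_at = int(time.time()) if now is None else int(now)
    return {
        "iat": issued_at,
        "exp": issued_at + lifetime,
        # The issuer is the service account email.
        "iss": service_account_email,
        # The audience is the token endpoint that will accept the assertion.
        "aud": TOKEN_ENDPOINT,
        "scope": " ".join(scopes),
    }


def build_token_request_body(signed_assertion):
    # Hypothetical helper: form body for the JWT bearer grant (RFC 7523).
    return urllib.parse.urlencode(
        {"grant_type": JWT_BEARER_GRANT_TYPE, "assertion": signed_assertion}
    )


claims = build_assertion_claims("sa@example.com", ["email"], now=1000)
# Placeholder only: a real assertion is the signed, encoded form of the claims.
body = build_token_request_body("header.payload.signature")
```

The token endpoint answers such a request with the access token, which is
what is then sent as the bearer token; the assertion itself is not reused.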

This profile differs from the normal OAuth 2.0 profile because no user consent
step is required. The use of the private key allows this profile to assert
identity directly.

This profile also differs from the :mod:`google.auth.jwt` authentication
because the JWT credentials use the JWT directly as the bearer token. This
profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
obtained OAuth 2.0 access token is used as the bearer token.
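
The difference between the two approaches can be sketched as follows. This is
an illustration only: every function name here is hypothetical, ``fake_sign``
performs no cryptography, and ``fake_exchange`` stands in for the HTTP call to
the token endpoint.

```python
def bearer_header(token):
    # Both flows end by sending some token as a bearer credential.
    return {"authorization": "Bearer " + token}


def jwt_credentials_flow(sign, claims):
    # JWT credentials: the signed JWT itself is the bearer token.
    return bearer_header(sign(claims))


def service_account_flow(sign, claims, exchange):
    # Service account credentials: the signed JWT is only an assertion that
    # is exchanged for an OAuth 2.0 access token, which becomes the bearer.
    assertion = sign(claims)
    access_token = exchange(assertion)
    return bearer_header(access_token)


def fake_sign(claims):
    # Stand-in for real signing; returns a recognizable placeholder string.
    return "jwt-for-" + claims["iss"]


def fake_exchange(assertion):
    # Stand-in for the token endpoint round trip.
    return "access-token-from-" + assertion


claims = {"iss": "sa@example.com"}
direct = jwt_credentials_flow(fake_sign, claims)
exchanged = service_account_flow(fake_sign, claims, fake_exchange)
```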

Domain-wide delegation
----------------------

Domain-wide delegation allows a service account to access user data on
behalf of any user in a Google Apps domain without consent from the user.
For example, an application that uses the Google Calendar API to add events to
the calendars of all users in a Google Apps domain would use a service account
to access the Google Calendar API on behalf of users.

The Google Apps administrator must explicitly authorize the service account to
do this. This authorization step is referred to as "delegating domain-wide
authority" to a service account.

You can use domain-wide delegation by creating a set of credentials with a
specific subject using :meth:`~Credentials.with_subject`.
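
A minimal sketch of that pattern, assuming a simplified stand-in class rather
than the real credentials: ``DelegatingCredentials`` and ``assertion_claims``
are hypothetical names. It shows the two properties that matter here, namely
that setting a subject yields a modified copy and that the subject ends up as
the ``sub`` claim of the assertion.

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass(frozen=True)
class DelegatingCredentials:
    # Hypothetical stand-in for service account credentials.
    service_account_email: str
    subject: Optional[str] = None

    def with_subject(self, subject):
        # Return a modified copy; the original instance stays unchanged.
        return dataclasses.replace(self, subject=subject)

    def assertion_claims(self):
        claims = {"iss": self.service_account_email}
        # The subject is only included when delegation was requested.
        if self.subject:
            claims["sub"] = self.subject
        return claims


base = DelegatingCredentials("sa@example.com")
delegated = base.with_subject("user@example.com")
```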

.. _RFC 7523: https://tools.ietf.org/html/rfc7523
"""

import copy
import datetime

from google.auth import _helpers
from google.auth import _service_account_info
from google.auth import credentials
from google.auth import exceptions
from google.auth import jwt
from google.auth import metrics
from google.oauth2 import _client

_DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"


class Credentials(
    credentials.Signing,
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Service account credentials

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json')

    Or if you already have the service account file loaded::

        with open('service-account.json') as f:
            service_account_info = json.load(f)
        credentials = service_account.Credentials.from_service_account_info(
            service_account_info)

    Both helper methods pass on arguments to the constructor, so you can
    specify additional scopes and a subject if necessary::

        credentials = service_account.Credentials.from_service_account_file(
            'service-account.json',
            scopes=['email'],
            subject='user@example.com')

    The credentials are considered immutable. If you want to modify the scopes
    or the subject used for delegation, use :meth:`with_scopes` or
    :meth:`with_subject`::

        scoped_credentials = credentials.with_scopes(['email'])
        delegated_credentials = credentials.with_subject(subject)

    To add a quota project, use :meth:`with_quota_project`::

        credentials = credentials.with_quota_project('myproject-123')
    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        scopes=None,
        default_scopes=None,
        subject=None,
        project_id=None,
        quota_project_id=None,
        additional_claims=None,
        always_use_jwt_access=False,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
        trust_boundary=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            scopes (Sequence[str]): User-defined scopes to request during the
                authorization grant.
            default_scopes (Sequence[str]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            token_uri (str): The OAuth 2.0 Token URI.
            subject (str): For domain-wide delegation, the email address of the
                user for which to request delegated access.
            project_id (str): Project ID associated with the service account
                credential.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            always_use_jwt_access (Optional[bool]): Whether a self-signed JWT
                should always be used.
            universe_domain (str): The universe domain. The default
                universe domain is googleapis.com. For any other value, a
                self-signed JWT is always used for token refresh.
            trust_boundary (str): String representation of trust boundary meta.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` is used instead of calling the
            constructor directly.
        """
        super(Credentials, self).__init__()

        self._scopes = scopes
        self._default_scopes = default_scopes
        self._signer = signer
        self._service_account_email = service_account_email
        self._subject = subject
        self._project_id = project_id
        self._quota_project_id = quota_project_id
        self._token_uri = token_uri
        self._always_use_jwt_access = always_use_jwt_access
        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN

        # A non-default universe domain forces the self-signed JWT flow.
        if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            self._always_use_jwt_access = True

        self._jwt_credentials = None

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}
        # The trust_boundary argument is not consulted here; a fixed
        # placeholder value is stored instead.
        self._trust_boundary = {"locations": [], "encoded_locations": "0x0"}

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.Credentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        return cls(
            signer,
            service_account_email=info["client_email"],
            token_uri=info["token_uri"],
            project_id=info.get("project_id"),
            universe_domain=info.get(
                "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
            ),
            trust_boundary=info.get("trust_boundary"),
            **kwargs
        )

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.Credentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account JSON file.

        Args:
            filename (str): The path to the service account JSON file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.Credentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def project_id(self):
        """Project ID associated with this credential."""
        return self._project_id

    @property
    def requires_scopes(self):
        """Checks if the credentials require scopes.

        Returns:
            bool: True if there are no scopes set, otherwise False.
        """
        return True if not self._scopes else False

    def _make_copy(self):
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            scopes=copy.copy(self._scopes),
            default_scopes=copy.copy(self._default_scopes),
            token_uri=self._token_uri,
            subject=self._subject,
            project_id=self._project_id,
            quota_project_id=self._quota_project_id,
            additional_claims=self._additional_claims.copy(),
            always_use_jwt_access=self._always_use_jwt_access,
            universe_domain=self._universe_domain,
        )
        return cred

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        cred = self._make_copy()
        cred._scopes = scopes
        cred._default_scopes = default_scopes
        return cred

    def with_always_use_jwt_access(self, always_use_jwt_access):
        """Create a copy of these credentials with the specified always_use_jwt_access value.

        Args:
            always_use_jwt_access (bool): Whether to always use a self-signed
                JWT.

        Returns:
            google.oauth2.service_account.Credentials: A new credentials
                instance.

        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and always_use_jwt_access is False.
        """
        cred = self._make_copy()
        if (
            cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and not always_use_jwt_access
        ):
            raise exceptions.InvalidValue(
                "always_use_jwt_access should be True for non-default universe domain"
            )
        cred._always_use_jwt_access = always_use_jwt_access
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        cred = self._make_copy()
        cred._universe_domain = universe_domain
        if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            cred._always_use_jwt_access = True
        return cred

    def with_subject(self, subject):
        """Create a copy of these credentials with the specified subject.

        Args:
            subject (str): The subject claim.

        Returns:
            google.oauth2.service_account.Credentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._subject = subject
        return cred

    def with_claims(self, additional_claims):
        """Returns a copy of these credentials with modified claims.

        Args:
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.oauth2.service_account.Credentials: A new credentials
                instance.
        """
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})
        cred = self._make_copy()
        cred._additional_claims = new_additional_claims
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        access token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self._service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            "scope": _helpers.scopes_to_string(self._scopes or ()),
        }

        payload.update(self._additional_claims)

        # The subject can be a user email for domain-wide delegation.
        if self._subject:
            payload.setdefault("sub", self._subject)

        token = jwt.encode(self._signer, payload)

        return token

    def _use_self_signed_jwt(self):
        # Domain-wide delegation doesn't work with self-signed JWTs, so a
        # self-signed JWT must not be used when a subject is set.
        return self._subject is None and self._jwt_credentials is not None

    def _metric_header_for_usage(self):
        if self._use_self_signed_jwt():
            return metrics.CRED_TYPE_SA_JWT
        return metrics.CRED_TYPE_SA_ASSERTION

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        if self._always_use_jwt_access and not self._jwt_credentials:
            # If self signed jwt should be used but jwt credential is not
            # created, try to create one with scopes
            self._create_self_signed_jwt(None)

        if (
            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and self._subject
        ):
            raise exceptions.RefreshError(
                "domain wide delegation is not supported for non-default universe domain"
            )

        if self._use_self_signed_jwt():
            self._jwt_credentials.refresh(request)
            self.token = self._jwt_credentials.token.decode()
            self.expiry = self._jwt_credentials.expiry
        else:
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    def _create_self_signed_jwt(self, audience):
        """Create a self-signed JWT from the credentials if requirements are met.

        Args:
            audience (str): The service URL. ``https://[API_ENDPOINT]/``
        """
        # https://google.aip.dev/auth/4111
        if self._always_use_jwt_access:
            if self._scopes:
                additional_claims = {"scope": " ".join(self._scopes)}
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials.additional_claims != additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
            elif audience:
                if (
                    self._jwt_credentials is None
                    or self._jwt_credentials._audience != audience
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, audience
                    )
            elif self._default_scopes:
                additional_claims = {"scope": " ".join(self._default_scopes)}
                if (
                    self._jwt_credentials is None
                    or additional_claims != self._jwt_credentials.additional_claims
                ):
                    self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                        self, None, additional_claims=additional_claims
                    )
        elif not self._scopes and audience:
            self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                self, audience
            )

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email


class IDTokenCredentials(
    credentials.Signing,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials are largely similar to :class:`.Credentials`, but instead
    of using an OAuth 2.0 Access Token as the bearer token, they use an Open
    ID Connect ID Token as the bearer token. These credentials are useful when
    communicating with services that require ID Tokens and cannot accept access
    tokens.

    Usually, you'll create these credentials with one of the helper
    constructors. To create credentials using a Google service account
    private key JSON file::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com'))

    Or if you already have the service account file loaded::

        with open('service-account.json') as f:
            service_account_info = json.load(f)
        credentials = (
            service_account.IDTokenCredentials.from_service_account_info(
                service_account_info,
                target_audience='https://example.com'))

    Both helper methods pass on arguments to the constructor, so you can
    also specify a quota project if necessary::

        credentials = (
            service_account.IDTokenCredentials.from_service_account_file(
                'service-account.json',
                target_audience='https://example.com',
                quota_project_id='myproject-123'))

    The credentials are considered immutable. If you want to modify the
    target audience or the quota project, use :meth:`with_target_audience` or
    :meth:`with_quota_project`::

        audience_credentials = credentials.with_target_audience(
            'https://other.example.com')
        quota_credentials = credentials.with_quota_project('myproject-123')
    """

    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        target_audience,
        additional_claims=None,
        quota_project_id=None,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            universe_domain (str): The universe domain. The default
                universe domain is googleapis.com. For any other value, the
                IAM ID token endpoint is used for token refresh. Note that the
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` is used instead of calling the
            constructor directly.
        """
        super(IDTokenCredentials, self).__init__()
        self._signer = signer
        self._service_account_email = service_account_email
        self._token_uri = token_uri
        self._target_audience = target_audience
        self._quota_project_id = quota_project_id
        self._use_iam_endpoint = False

        if not universe_domain:
            self._universe_domain = credentials.DEFAULT_UNIVERSE_DOMAIN
        else:
            self._universe_domain = universe_domain

        # A non-default universe domain forces the IAM endpoint.
        if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
            self._use_iam_endpoint = True

        if additional_claims is not None:
            self._additional_claims = additional_claims
        else:
            self._additional_claims = {}

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        kwargs.setdefault("service_account_email", info["client_email"])
        kwargs.setdefault("token_uri", info["token_uri"])
        if "universe_domain" in info:
            kwargs["universe_domain"] = info["universe_domain"]
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a credentials instance from parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(
            info, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a credentials instance from a service account JSON file.

        Args:
            filename (str): The path to the service account JSON file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: The constructed
                credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email", "token_uri"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    def _make_copy(self):
        cred = self.__class__(
            self._signer,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            quota_project_id=self.quota_project_id,
            universe_domain=self._universe_domain,
        )
        # _use_iam_endpoint is not exposed in the constructor
        cred._use_iam_endpoint = self._use_iam_endpoint
        return cred

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: A new credentials
                instance.
        """
        cred = self._make_copy()
        cred._target_audience = target_audience
        return cred

    def _with_use_iam_endpoint(self, use_iam_endpoint):
        """Create a copy of these credentials with the use_iam_endpoint value.

        Args:
            use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
                be used instead of the token_uri. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint. The default value is False. This feature is currently
                experimental and subject to change without notice.

        Returns:
            google.oauth2.service_account.IDTokenCredentials: A new credentials
                instance.

        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and use_iam_endpoint is False.
        """
        cred = self._make_copy()
        if (
            cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
            and not use_iam_endpoint
        ):
            raise exceptions.InvalidValue(
                "use_iam_endpoint should be True for non-default universe domain"
            )
        cred._use_iam_endpoint = use_iam_endpoint
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_uri = token_uri
        return cred

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }

        payload.update(self._additional_claims)

        token = jwt.encode(self._signer, payload)

        return token

    def _refresh_with_iam_endpoint(self, request):
        """Use IAM generateIdToken endpoint to obtain an ID token.

        It works as follows:

        1. First we create a self-signed JWT with
        https://www.googleapis.com/auth/iam being the scope.

        2. Next we use the self-signed JWT as the access token, and make a POST
        request to the IAM generateIdToken endpoint. The request body is:
            {
                "audience": self._target_audience,
                "includeEmail": "true",
                "useEmailAzp": "true",
            }

        If the request succeeds, it returns {"token":"the ID token"},
        and we can extract the ID token and compute its expiry.
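
The two steps above can be sketched as follows. This is an illustration under
stated assumptions, not the code path used by this method: the helper names
``build_generate_id_token_request`` and ``extract_id_token`` are hypothetical,
the ``Content-Type`` header is an assumption, and no HTTP request is made.

```python
import json


def build_generate_id_token_request(target_audience, self_signed_jwt):
    # Hypothetical helper: returns (headers, body) for the POST request.
    headers = {
        # Assumption: the body is sent as JSON.
        "Content-Type": "application/json",
        # The self-signed JWT is presented as if it were an access token.
        "authorization": "Bearer " + self_signed_jwt,
    }
    body = json.dumps(
        {
            "audience": target_audience,
            "includeEmail": "true",
            "useEmailAzp": "true",
        }
    )
    return headers, body


def extract_id_token(response_text):
    # Hypothetical helper: pull the ID token out of a successful response.
    return json.loads(response_text)["token"]


headers, body = build_generate_id_token_request(
    "https://example.com", "header.payload.signature"
)
id_token = extract_id_token('{"token": "the-id-token"}')
```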

        """
        jwt_credentials = jwt.Credentials.from_signing_credentials(
            self,
            None,
            additional_claims={"scope": "https://www.googleapis.com/auth/iam"},
        )
        jwt_credentials.refresh(request)
        self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
            request,
            self.signer_email,
            self._target_audience,
            jwt_credentials.token.decode(),
        )

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        if self._use_iam_endpoint:
            self._refresh_with_iam_endpoint(request)
        else:
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.id_token_jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        return self._service_account_email