Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/jwt.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

250 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""JSON Web Tokens 

16 

17Provides support for creating (encoding) and verifying (decoding) JWTs, 

18especially JWTs generated and consumed by Google infrastructure. 

19 

20See `rfc7519`_ for more details on JWTs. 

21 

22To encode a JWT use :func:`encode`:: 

23 

24 from google.auth import crypt 

25 from google.auth import jwt 

26 

27 signer = crypt.Signer(private_key) 

28 payload = {'some': 'payload'} 

29 encoded = jwt.encode(signer, payload) 

30 

31To decode a JWT and verify claims use :func:`decode`:: 

32 

33 claims = jwt.decode(encoded, certs=public_certs) 

34 

35You can also skip verification:: 

36 

37 claims = jwt.decode(encoded, verify=False) 

38 

39.. _rfc7519: https://tools.ietf.org/html/rfc7519 

40 

41""" 

42 

43try: 

44 from collections.abc import Mapping 

45# Python 2.7 compatibility 

46except ImportError: # pragma: NO COVER 

47 from collections import Mapping # type: ignore 

48import copy 

49import datetime 

50import json 

51import urllib 

52 

53from google.auth import _cache 

54from google.auth import _helpers 

55from google.auth import _regional_access_boundary_utils 

56from google.auth import _service_account_info 

57from google.auth import crypt 

58from google.auth import exceptions 

59import google.auth.credentials 

60 

61try: 

62 from google.auth.crypt import es 

63except ImportError: # pragma: NO COVER 

64 es = None # type: ignore 

65 

66_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

67_DEFAULT_MAX_CACHE_SIZE = 10 

68_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier} 

69_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(["ES256", "ES384"]) 

70 

71if es is not None: # pragma: NO COVER 

72 _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es.EsVerifier # type: ignore 

73 _ALGORITHM_TO_VERIFIER_CLASS["ES384"] = es.EsVerifier # type: ignore 

74 

75 

76def encode(signer, payload, header=None, key_id=None): 

77 """Make a signed JWT. 

78 

79 Args: 

80 signer (google.auth.crypt.Signer): The signer used to sign the JWT. 

81 payload (Mapping[str, str]): The JWT payload. 

82 header (Mapping[str, str]): Additional JWT header payload. 

83 key_id (str): The key id to add to the JWT header. If the 

84 signer has a key id it will be used as the default. If this is 

85 specified it will override the signer's key id. 

86 

87 Returns: 

88 bytes: The encoded JWT. 

89 """ 

90 if header is None: 

91 header = {} 

92 

93 if key_id is None: 

94 key_id = signer.key_id 

95 

96 header.update({"typ": "JWT"}) 

97 

98 if "alg" not in header: 

99 if es is not None and isinstance(signer, es.EsSigner): 

100 header.update({"alg": signer.algorithm}) 

101 else: 

102 header.update({"alg": "RS256"}) 

103 

104 if key_id is not None: 

105 header["kid"] = key_id 

106 

107 segments = [ 

108 _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")), 

109 _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")), 

110 ] 

111 

112 signing_input = b".".join(segments) 

113 signature = signer.sign(signing_input) 

114 segments.append(_helpers.unpadded_urlsafe_b64encode(signature)) 

115 

116 return b".".join(segments) 

117 

118 

119def _decode_jwt_segment(encoded_section): 

120 """Decodes a single JWT segment.""" 

121 section_bytes = _helpers.padded_urlsafe_b64decode(encoded_section) 

122 try: 

123 return json.loads(section_bytes.decode("utf-8")) 

124 except ValueError as caught_exc: 

125 new_exc = exceptions.MalformedError( 

126 "Can't parse segment: {0}".format(section_bytes) 

127 ) 

128 raise new_exc from caught_exc 

129 

130 

131def _unverified_decode(token): 

132 """Decodes a token and does no verification. 

133 

134 Args: 

135 token (Union[str, bytes]): The encoded JWT. 

136 

137 Returns: 

138 Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and 

139 signature. 

140 

141 Raises: 

142 google.auth.exceptions.MalformedError: if there are an incorrect amount of segments in the token or segments of the wrong type. 

143 """ 

144 token = _helpers.to_bytes(token) 

145 

146 if token.count(b".") != 2: 

147 raise exceptions.MalformedError( 

148 "Wrong number of segments in token: {0}".format(token) 

149 ) 

150 

151 encoded_header, encoded_payload, signature = token.split(b".") 

152 signed_section = encoded_header + b"." + encoded_payload 

153 signature = _helpers.padded_urlsafe_b64decode(signature) 

154 

155 # Parse segments 

156 header = _decode_jwt_segment(encoded_header) 

157 payload = _decode_jwt_segment(encoded_payload) 

158 

159 if not isinstance(header, Mapping): 

160 raise exceptions.MalformedError( 

161 "Header segment should be a JSON object: {0}".format(encoded_header) 

162 ) 

163 

164 if not isinstance(payload, Mapping): 

165 raise exceptions.MalformedError( 

166 "Payload segment should be a JSON object: {0}".format(encoded_payload) 

167 ) 

168 

169 return header, payload, signed_section, signature 

170 

171 

172def decode_header(token): 

173 """Return the decoded header of a token. 

174 

175 No verification is done. This is useful to extract the key id from 

176 the header in order to acquire the appropriate certificate to verify 

177 the token. 

178 

179 Args: 

180 token (Union[str, bytes]): the encoded JWT. 

181 

182 Returns: 

183 Mapping: The decoded JWT header. 

184 """ 

185 header, _, _, _ = _unverified_decode(token) 

186 return header 

187 

188 

189def _verify_iat_and_exp(payload, clock_skew_in_seconds=0): 

190 """Verifies the ``iat`` (Issued At) and ``exp`` (Expires) claims in a token 

191 payload. 

192 

193 Args: 

194 payload (Mapping[str, str]): The JWT payload. 

195 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` 

196 validation. 

197 

198 Raises: 

199 google.auth.exceptions.InvalidValue: if value validation failed. 

200 google.auth.exceptions.MalformedError: if schema validation failed. 

201 """ 

202 now = _helpers.datetime_to_secs(_helpers.utcnow()) 

203 

204 # Make sure the iat and exp claims are present. 

205 for key in ("iat", "exp"): 

206 if key not in payload: 

207 raise exceptions.MalformedError( 

208 "Token does not contain required claim {}".format(key) 

209 ) 

210 

211 # Make sure the token wasn't issued in the future. 

212 iat = payload["iat"] 

213 # Err on the side of accepting a token that is slightly early to account 

214 # for clock skew. 

215 earliest = iat - clock_skew_in_seconds 

216 if now < earliest: 

217 raise exceptions.InvalidValue( 

218 "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format( 

219 now, iat 

220 ) 

221 ) 

222 

223 # Make sure the token wasn't issued in the past. 

224 exp = payload["exp"] 

225 # Err on the side of accepting a token that is slightly out of date 

226 # to account for clow skew. 

227 latest = exp + clock_skew_in_seconds 

228 if latest < now: 

229 raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now)) 

230 

231 

232def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0): 

233 """Decode and verify a JWT. 

234 

235 Args: 

236 token (str): The encoded JWT. 

237 certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The 

238 certificate used to validate the JWT signature. If bytes or string, 

239 it must the the public key certificate in PEM format. If a mapping, 

240 it must be a mapping of key IDs to public key certificates in PEM 

241 format. The mapping must contain the same key ID that's specified 

242 in the token's header. 

243 verify (bool): Whether to perform signature and claim validation. 

244 Verification is done by default. 

245 audience (str or list): The audience claim, 'aud', that this JWT should 

246 contain. Or a list of audience claims. If None then the JWT's 'aud' 

247 parameter is not verified. 

248 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` 

249 validation. 

250 

251 Returns: 

252 Mapping[str, str]: The deserialized JSON payload in the JWT. 

253 

254 Raises: 

255 google.auth.exceptions.InvalidValue: if value validation failed. 

256 google.auth.exceptions.MalformedError: if schema validation failed. 

257 """ 

258 header, payload, signed_section, signature = _unverified_decode(token) 

259 

260 if not verify: 

261 return payload 

262 

263 # Pluck the key id and algorithm from the header and make sure we have 

264 # a verifier that can support it. 

265 key_alg = header.get("alg") 

266 key_id = header.get("kid") 

267 

268 try: 

269 verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg] 

270 except KeyError as exc: 

271 if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS: 

272 raise exceptions.InvalidValue( 

273 "The key algorithm {} requires the cryptography package to be installed.".format( 

274 key_alg 

275 ) 

276 ) from exc 

277 else: 

278 raise exceptions.InvalidValue( 

279 "Unsupported signature algorithm {}".format(key_alg) 

280 ) from exc 

281 # If certs is specified as a dictionary of key IDs to certificates, then 

282 # use the certificate identified by the key ID in the token header. 

283 if isinstance(certs, Mapping): 

284 if key_id: 

285 if key_id not in certs: 

286 raise exceptions.MalformedError( 

287 "Certificate for key id {} not found.".format(key_id) 

288 ) 

289 certs_to_check = [certs[key_id]] 

290 # If there's no key id in the header, check against all of the certs. 

291 else: 

292 certs_to_check = certs.values() 

293 else: 

294 certs_to_check = certs 

295 

296 # Verify that the signature matches the message. 

297 if not crypt.verify_signature( 

298 signed_section, signature, certs_to_check, verifier_cls 

299 ): 

300 raise exceptions.MalformedError("Could not verify token signature.") 

301 

302 # Verify the issued at and created times in the payload. 

303 _verify_iat_and_exp(payload, clock_skew_in_seconds) 

304 

305 # Check audience. 

306 if audience is not None: 

307 claim_audience = payload.get("aud") 

308 if isinstance(audience, str): 

309 audience = [audience] 

310 if claim_audience not in audience: 

311 raise exceptions.InvalidValue( 

312 "Token has wrong audience {}, expected one of {}".format( 

313 claim_audience, audience 

314 ) 

315 ) 

316 

317 return payload 

318 

319 

320class Credentials( 

321 google.auth.credentials.Signing, 

322 google.auth.credentials.CredentialsWithQuotaProject, 

323 google.auth.credentials.CredentialsWithRegionalAccessBoundary, 

324): 

325 """Credentials that use a JWT as the bearer token. 

326 

327 These credentials require an "audience" claim. This claim identifies the 

328 intended recipient of the bearer token. 

329 

330 The constructor arguments determine the claims for the JWT that is 

331 sent with requests. Usually, you'll construct these credentials with 

332 one of the helper constructors as shown in the next section. 

333 

334 To create JWT credentials using a Google service account private key 

335 JSON file:: 

336 

337 audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher' 

338 credentials = jwt.Credentials.from_service_account_file( 

339 'service-account.json', 

340 audience=audience) 

341 

342 If you already have the service account file loaded and parsed:: 

343 

344 service_account_info = json.load(open('service_account.json')) 

345 credentials = jwt.Credentials.from_service_account_info( 

346 service_account_info, 

347 audience=audience) 

348 

349 Both helper methods pass on arguments to the constructor, so you can 

350 specify the JWT claims:: 

351 

352 credentials = jwt.Credentials.from_service_account_file( 

353 'service-account.json', 

354 audience=audience, 

355 additional_claims={'meta': 'data'}) 

356 

357 You can also construct the credentials directly if you have a 

358 :class:`~google.auth.crypt.Signer` instance:: 

359 

360 credentials = jwt.Credentials( 

361 signer, 

362 issuer='your-issuer', 

363 subject='your-subject', 

364 audience=audience) 

365 

366 The claims are considered immutable. If you want to modify the claims, 

367 you can easily create another instance using :meth:`with_claims`:: 

368 

369 new_audience = ( 

370 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber') 

371 new_credentials = credentials.with_claims(audience=new_audience) 

372 """ 

373 

374 def __init__( 

375 self, 

376 signer, 

377 issuer, 

378 subject, 

379 audience, 

380 additional_claims=None, 

381 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

382 quota_project_id=None, 

383 ): 

384 """ 

385 Args: 

386 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

387 issuer (str): The `iss` claim. 

388 subject (str): The `sub` claim. 

389 audience (str): the `aud` claim. The intended audience for the 

390 credentials. 

391 additional_claims (Mapping[str, str]): Any additional claims for 

392 the JWT payload. 

393 token_lifetime (int): The amount of time in seconds for 

394 which the token is valid. Defaults to 1 hour. 

395 quota_project_id (Optional[str]): The project ID used for quota 

396 and billing. 

397 """ 

398 super(Credentials, self).__init__() 

399 self._signer = signer 

400 self._issuer = issuer 

401 self._subject = subject 

402 self._audience = audience 

403 self._token_lifetime = token_lifetime 

404 self._quota_project_id = quota_project_id 

405 

406 if additional_claims is None: 

407 additional_claims = {} 

408 

409 self._additional_claims = additional_claims 

410 

411 @classmethod 

412 def _from_signer_and_info(cls, signer, info, **kwargs): 

413 """Creates a Credentials instance from a signer and service account 

414 info. 

415 

416 Args: 

417 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

418 info (Mapping[str, str]): The service account info. 

419 kwargs: Additional arguments to pass to the constructor. 

420 

421 Returns: 

422 google.auth.jwt.Credentials: The constructed credentials. 

423 

424 Raises: 

425 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

426 """ 

427 kwargs.setdefault("subject", info["client_email"]) 

428 kwargs.setdefault("issuer", info["client_email"]) 

429 return cls(signer, **kwargs) 

430 

431 @classmethod 

432 def from_service_account_info(cls, info, **kwargs): 

433 """Creates an Credentials instance from a dictionary. 

434 

435 Args: 

436 info (Mapping[str, str]): The service account info in Google 

437 format. 

438 kwargs: Additional arguments to pass to the constructor. 

439 

440 Returns: 

441 google.auth.jwt.Credentials: The constructed credentials. 

442 

443 Raises: 

444 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

445 """ 

446 signer = _service_account_info.from_dict(info, require=["client_email"]) 

447 return cls._from_signer_and_info(signer, info, **kwargs) 

448 

449 @classmethod 

450 def from_service_account_file(cls, filename, **kwargs): 

451 """Creates a Credentials instance from a service account .json file 

452 in Google format. 

453 

454 Args: 

455 filename (str): The path to the service account .json file. 

456 kwargs: Additional arguments to pass to the constructor. 

457 

458 Returns: 

459 google.auth.jwt.Credentials: The constructed credentials. 

460 """ 

461 info, signer = _service_account_info.from_filename( 

462 filename, require=["client_email"] 

463 ) 

464 return cls._from_signer_and_info(signer, info, **kwargs) 

465 

466 @classmethod 

467 def from_signing_credentials(cls, credentials, audience, **kwargs): 

468 """Creates a new :class:`google.auth.jwt.Credentials` instance from an 

469 existing :class:`google.auth.credentials.Signing` instance. 

470 

471 The new instance will use the same signer as the existing instance and 

472 will use the existing instance's signer email as the issuer and 

473 subject by default. 

474 

475 Example:: 

476 

477 svc_creds = service_account.Credentials.from_service_account_file( 

478 'service_account.json') 

479 audience = ( 

480 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher') 

481 jwt_creds = jwt.Credentials.from_signing_credentials( 

482 svc_creds, audience=audience) 

483 

484 Args: 

485 credentials (google.auth.credentials.Signing): The credentials to 

486 use to construct the new credentials. 

487 audience (str): the `aud` claim. The intended audience for the 

488 credentials. 

489 kwargs: Additional arguments to pass to the constructor. 

490 

491 Returns: 

492 google.auth.jwt.Credentials: A new Credentials instance. 

493 """ 

494 kwargs.setdefault("issuer", credentials.signer_email) 

495 kwargs.setdefault("subject", credentials.signer_email) 

496 jwt_creds = cls(credentials.signer, audience=audience, **kwargs) 

497 

498 if isinstance( 

499 credentials, 

500 google.auth.credentials.CredentialsWithRegionalAccessBoundary, 

501 ): 

502 credentials._copy_regional_access_boundary_manager(jwt_creds) 

503 

504 return jwt_creds 

505 

506 def with_claims( 

507 self, issuer=None, subject=None, audience=None, additional_claims=None 

508 ): 

509 """Returns a copy of these credentials with modified claims. 

510 

511 Args: 

512 issuer (str): The `iss` claim. If unspecified the current issuer 

513 claim will be used. 

514 subject (str): The `sub` claim. If unspecified the current subject 

515 claim will be used. 

516 audience (str): the `aud` claim. If unspecified the current 

517 audience claim will be used. 

518 additional_claims (Mapping[str, str]): Any additional claims for 

519 the JWT payload. This will be merged with the current 

520 additional claims. 

521 

522 Returns: 

523 google.auth.jwt.Credentials: A new credentials instance. 

524 """ 

525 new_additional_claims = copy.deepcopy(self._additional_claims) 

526 new_additional_claims.update(additional_claims or {}) 

527 

528 cred = self.__class__( 

529 self._signer, 

530 issuer=issuer if issuer is not None else self._issuer, 

531 subject=subject if subject is not None else self._subject, 

532 audience=audience if audience is not None else self._audience, 

533 additional_claims=new_additional_claims, 

534 quota_project_id=self._quota_project_id, 

535 ) 

536 self._copy_regional_access_boundary_manager(cred) 

537 return cred 

538 

539 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) 

540 def with_quota_project(self, quota_project_id): 

541 cred = self.__class__( 

542 self._signer, 

543 issuer=self._issuer, 

544 subject=self._subject, 

545 audience=self._audience, 

546 additional_claims=self._additional_claims, 

547 quota_project_id=quota_project_id, 

548 ) 

549 self._copy_regional_access_boundary_manager(cred) 

550 return cred 

551 

552 def _make_jwt(self): 

553 """Make a signed JWT. 

554 

555 Returns: 

556 Tuple[bytes, datetime]: The encoded JWT and the expiration. 

557 """ 

558 now = _helpers.utcnow() 

559 lifetime = datetime.timedelta(seconds=self._token_lifetime) 

560 expiry = now + lifetime 

561 

562 payload = { 

563 "iss": self._issuer, 

564 "sub": self._subject, 

565 "iat": _helpers.datetime_to_secs(now), 

566 "exp": _helpers.datetime_to_secs(expiry), 

567 } 

568 if self._audience: 

569 payload["aud"] = self._audience 

570 

571 payload.update(self._additional_claims) 

572 

573 jwt = encode(self._signer, payload) 

574 

575 return jwt, expiry 

576 

577 def _perform_refresh_token(self, request): 

578 """Refreshes the access token. 

579 

580 Args: 

581 request (Any): Unused. 

582 """ 

583 # pylint: disable=unused-argument 

584 # (pylint doesn't correctly recognize overridden methods.) 

585 self.token, self.expiry = self._make_jwt() 

586 

587 def _build_regional_access_boundary_lookup_url(self, request=None): 

588 """Builds the lookup URL using the service account's email address.""" 

589 if not self.signer_email: 

590 return None 

591 

592 return _regional_access_boundary_utils.get_service_account_rab_endpoint( 

593 self.signer_email 

594 ) 

595 

596 @_helpers.copy_docstring(google.auth.credentials.Signing) 

597 def sign_bytes(self, message): 

598 return self._signer.sign(message) 

599 

600 @property # type: ignore 

601 @_helpers.copy_docstring(google.auth.credentials.Signing) 

602 def signer_email(self): 

603 return self._issuer 

604 

605 @property # type: ignore 

606 @_helpers.copy_docstring(google.auth.credentials.Signing) 

607 def signer(self): 

608 return self._signer 

609 

610 @property # type: ignore 

611 def additional_claims(self): 

612 """Additional claims the JWT object was created with.""" 

613 return self._additional_claims 

614 

615 

616class OnDemandCredentials( 

617 google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject 

618): 

619 """On-demand JWT credentials. 

620 

621 Like :class:`Credentials`, this class uses a JWT as the bearer token for 

622 authentication. However, this class does not require the audience at 

623 construction time. Instead, it will generate a new token on-demand for 

624 each request using the request URI as the audience. It caches tokens 

625 so that multiple requests to the same URI do not incur the overhead 

626 of generating a new token every time. 

627 

628 This behavior is especially useful for `gRPC`_ clients. A gRPC service may 

629 have multiple audience and gRPC clients may not know all of the audiences 

630 required for accessing a particular service. With these credentials, 

631 no knowledge of the audiences is required ahead of time. 

632 

633 .. _grpc: http://www.grpc.io/ 

634 """ 

635 

636 def __init__( 

637 self, 

638 signer, 

639 issuer, 

640 subject, 

641 additional_claims=None, 

642 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

643 max_cache_size=_DEFAULT_MAX_CACHE_SIZE, 

644 quota_project_id=None, 

645 ): 

646 """ 

647 Args: 

648 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

649 issuer (str): The `iss` claim. 

650 subject (str): The `sub` claim. 

651 additional_claims (Mapping[str, str]): Any additional claims for 

652 the JWT payload. 

653 token_lifetime (int): The amount of time in seconds for 

654 which the token is valid. Defaults to 1 hour. 

655 max_cache_size (int): The maximum number of JWT tokens to keep in 

656 cache. Tokens are cached using :class:`google.auth._cache.LRUCache`. 

657 quota_project_id (Optional[str]): The project ID used for quota 

658 and billing. 

659 

660 """ 

661 super(OnDemandCredentials, self).__init__() 

662 self._signer = signer 

663 self._issuer = issuer 

664 self._subject = subject 

665 self._token_lifetime = token_lifetime 

666 self._quota_project_id = quota_project_id 

667 

668 if additional_claims is None: 

669 additional_claims = {} 

670 

671 self._additional_claims = additional_claims 

672 self._cache = _cache.LRUCache(maxsize=max_cache_size) 

673 

674 @classmethod 

675 def _from_signer_and_info(cls, signer, info, **kwargs): 

676 """Creates an OnDemandCredentials instance from a signer and service 

677 account info. 

678 

679 Args: 

680 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

681 info (Mapping[str, str]): The service account info. 

682 kwargs: Additional arguments to pass to the constructor. 

683 

684 Returns: 

685 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

686 

687 Raises: 

688 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

689 """ 

690 kwargs.setdefault("subject", info["client_email"]) 

691 kwargs.setdefault("issuer", info["client_email"]) 

692 return cls(signer, **kwargs) 

693 

694 @classmethod 

695 def from_service_account_info(cls, info, **kwargs): 

696 """Creates an OnDemandCredentials instance from a dictionary. 

697 

698 Args: 

699 info (Mapping[str, str]): The service account info in Google 

700 format. 

701 kwargs: Additional arguments to pass to the constructor. 

702 

703 Returns: 

704 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

705 

706 Raises: 

707 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

708 """ 

709 signer = _service_account_info.from_dict(info, require=["client_email"]) 

710 return cls._from_signer_and_info(signer, info, **kwargs) 

711 

712 @classmethod 

713 def from_service_account_file(cls, filename, **kwargs): 

714 """Creates an OnDemandCredentials instance from a service account .json 

715 file in Google format. 

716 

717 Args: 

718 filename (str): The path to the service account .json file. 

719 kwargs: Additional arguments to pass to the constructor. 

720 

721 Returns: 

722 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

723 """ 

724 info, signer = _service_account_info.from_filename( 

725 filename, require=["client_email"] 

726 ) 

727 return cls._from_signer_and_info(signer, info, **kwargs) 

728 

729 @classmethod 

730 def from_signing_credentials(cls, credentials, **kwargs): 

731 """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance 

732 from an existing :class:`google.auth.credentials.Signing` instance. 

733 

734 The new instance will use the same signer as the existing instance and 

735 will use the existing instance's signer email as the issuer and 

736 subject by default. 

737 

738 Example:: 

739 

740 svc_creds = service_account.Credentials.from_service_account_file( 

741 'service_account.json') 

742 jwt_creds = jwt.OnDemandCredentials.from_signing_credentials( 

743 svc_creds) 

744 

745 Args: 

746 credentials (google.auth.credentials.Signing): The credentials to 

747 use to construct the new credentials. 

748 kwargs: Additional arguments to pass to the constructor. 

749 

750 Returns: 

751 google.auth.jwt.Credentials: A new Credentials instance. 

752 """ 

753 kwargs.setdefault("issuer", credentials.signer_email) 

754 kwargs.setdefault("subject", credentials.signer_email) 

755 return cls(credentials.signer, **kwargs) 

756 

757 def with_claims(self, issuer=None, subject=None, additional_claims=None): 

758 """Returns a copy of these credentials with modified claims. 

759 

760 Args: 

761 issuer (str): The `iss` claim. If unspecified the current issuer 

762 claim will be used. 

763 subject (str): The `sub` claim. If unspecified the current subject 

764 claim will be used. 

765 additional_claims (Mapping[str, str]): Any additional claims for 

766 the JWT payload. This will be merged with the current 

767 additional claims. 

768 

769 Returns: 

770 google.auth.jwt.OnDemandCredentials: A new credentials instance. 

771 """ 

772 new_additional_claims = copy.deepcopy(self._additional_claims) 

773 new_additional_claims.update(additional_claims or {}) 

774 

775 return self.__class__( 

776 self._signer, 

777 issuer=issuer if issuer is not None else self._issuer, 

778 subject=subject if subject is not None else self._subject, 

779 additional_claims=new_additional_claims, 

780 max_cache_size=self._cache.maxsize, 

781 quota_project_id=self._quota_project_id, 

782 ) 

783 

784 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) 

785 def with_quota_project(self, quota_project_id): 

786 return self.__class__( 

787 self._signer, 

788 issuer=self._issuer, 

789 subject=self._subject, 

790 additional_claims=self._additional_claims, 

791 max_cache_size=self._cache.maxsize, 

792 quota_project_id=quota_project_id, 

793 ) 

794 

795 @property 

796 def valid(self): 

797 """Checks the validity of the credentials. 

798 

799 These credentials are always valid because it generates tokens on 

800 demand. 

801 """ 

802 return True 

803 

804 def _make_jwt_for_audience(self, audience): 

805 """Make a new JWT for the given audience. 

806 

807 Args: 

808 audience (str): The intended audience. 

809 

810 Returns: 

811 Tuple[bytes, datetime]: The encoded JWT and the expiration. 

812 """ 

813 now = _helpers.utcnow() 

814 lifetime = datetime.timedelta(seconds=self._token_lifetime) 

815 expiry = now + lifetime 

816 

817 payload = { 

818 "iss": self._issuer, 

819 "sub": self._subject, 

820 "iat": _helpers.datetime_to_secs(now), 

821 "exp": _helpers.datetime_to_secs(expiry), 

822 "aud": audience, 

823 } 

824 

825 payload.update(self._additional_claims) 

826 

827 jwt = encode(self._signer, payload) 

828 

829 return jwt, expiry 

830 

831 def _get_jwt_for_audience(self, audience): 

832 """Get a JWT For a given audience. 

833 

834 If there is already an existing, non-expired token in the cache for 

835 the audience, that token is used. Otherwise, a new token will be 

836 created. 

837 

838 Args: 

839 audience (str): The intended audience. 

840 

841 Returns: 

842 bytes: The encoded JWT. 

843 """ 

844 token, expiry = self._cache.get(audience, (None, None)) 

845 

846 if token is None or expiry < _helpers.utcnow(): 

847 token, expiry = self._make_jwt_for_audience(audience) 

848 self._cache[audience] = token, expiry 

849 

850 return token 

851 

852 def refresh(self, request): 

853 """Raises an exception, these credentials can not be directly 

854 refreshed. 

855 

856 Args: 

857 request (Any): Unused. 

858 

859 Raises: 

860 google.auth.RefreshError 

861 """ 

862 # pylint: disable=unused-argument 

863 # (pylint doesn't correctly recognize overridden methods.) 

864 raise exceptions.RefreshError( 

865 "OnDemandCredentials can not be directly refreshed." 

866 ) 

867 

868 def before_request(self, request, method, url, headers): 

869 """Performs credential-specific before request logic. 

870 

871 Args: 

872 request (Any): Unused. JWT credentials do not need to make an 

873 HTTP request to refresh. 

874 method (str): The request's HTTP method. 

875 url (str): The request's URI. This is used as the audience claim 

876 when generating the JWT. 

877 headers (Mapping): The request's headers. 

878 """ 

879 # pylint: disable=unused-argument 

880 # (pylint doesn't correctly recognize overridden methods.) 

881 parts = urllib.parse.urlsplit(url) 

882 # Strip query string and fragment 

883 audience = urllib.parse.urlunsplit( 

884 (parts.scheme, parts.netloc, parts.path, "", "") 

885 ) 

886 token = self._get_jwt_for_audience(audience) 

887 self.apply(headers, token=token) 

888 

889 @_helpers.copy_docstring(google.auth.credentials.Signing) 

890 def sign_bytes(self, message): 

891 return self._signer.sign(message) 

892 

893 @property # type: ignore 

894 @_helpers.copy_docstring(google.auth.credentials.Signing) 

895 def signer_email(self): 

896 return self._issuer 

897 

898 @property # type: ignore 

899 @_helpers.copy_docstring(google.auth.credentials.Signing) 

900 def signer(self): 

901 return self._signer