Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/jwt.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

238 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""JSON Web Tokens 

16 

17Provides support for creating (encoding) and verifying (decoding) JWTs, 

18especially JWTs generated and consumed by Google infrastructure. 

19 

20See `rfc7519`_ for more details on JWTs. 

21 

22To encode a JWT use :func:`encode`:: 

23 

24 from google.auth import crypt 

25 from google.auth import jwt 

26 

27 signer = crypt.Signer(private_key) 

28 payload = {'some': 'payload'} 

29 encoded = jwt.encode(signer, payload) 

30 

31To decode a JWT and verify claims use :func:`decode`:: 

32 

33 claims = jwt.decode(encoded, certs=public_certs) 

34 

35You can also skip verification:: 

36 

37 claims = jwt.decode(encoded, verify=False) 

38 

39.. _rfc7519: https://tools.ietf.org/html/rfc7519 

40 

41""" 

42 

43try: 

44 from collections.abc import Mapping 

45# Python 2.7 compatibility 

46except ImportError: # pragma: NO COVER 

47 from collections import Mapping # type: ignore 

48import copy 

49import datetime 

50import json 

51import urllib 

52 

53import cachetools 

54 

55from google.auth import _helpers 

56from google.auth import _service_account_info 

57from google.auth import crypt 

58from google.auth import exceptions 

59import google.auth.credentials 

60 

61try: 

62 from google.auth.crypt import es 

63except ImportError: # pragma: NO COVER 

64 es = None # type: ignore 

65 

# Lifetime given to generated tokens when the caller does not choose one.
_DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds
# Default ``maxsize`` of the token cache kept by OnDemandCredentials.
_DEFAULT_MAX_CACHE_SIZE = 10
# Algorithms whose verifier comes from the optional ``es`` module.
_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(("ES256", "ES384"))
# Maps the ``alg`` header value of a token to the class used to verify it.
_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier}

if es is not None:  # pragma: NO COVER
    # Register the ES verifier only when the optional module was importable.
    _ALGORITHM_TO_VERIFIER_CLASS.update(  # type: ignore
        dict.fromkeys(("ES256", "ES384"), es.EsVerifier)
    )

74 

75 

def encode(signer, payload, header=None, key_id=None):
    """Make a signed JWT.

    Args:
        signer (google.auth.crypt.Signer): The signer used to sign the JWT.
        payload (Mapping[str, str]): The JWT payload.
        header (Mapping[str, str]): Additional JWT header payload. The
            mapping is copied before use, so the caller's object is never
            modified.
        key_id (str): The key id to add to the JWT header. If the
            signer has a key id it will be used as the default. If this is
            specified it will override the signer's key id.

    Returns:
        bytes: The encoded JWT.
    """
    # Build the header in a private copy. Writing "typ", "alg" and "kid"
    # directly into the caller's mapping would leak those keys into any
    # header object that is reused across calls.
    header = dict(header) if header is not None else {}

    if key_id is None:
        key_id = signer.key_id

    header["typ"] = "JWT"

    # Respect an explicitly supplied algorithm; otherwise derive it from the
    # signer type, falling back to RS256.
    if "alg" not in header:
        if es is not None and isinstance(signer, es.EsSigner):
            header["alg"] = signer.algorithm
        else:
            header["alg"] = "RS256"

    if key_id is not None:
        header["kid"] = key_id

    segments = [
        _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")),
        _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")),
    ]

    # The signature covers "<header>.<payload>" and is appended as the third
    # dot-separated segment.
    signing_input = b".".join(segments)
    signature = signer.sign(signing_input)
    segments.append(_helpers.unpadded_urlsafe_b64encode(signature))

    return b".".join(segments)

117 

118 

def _decode_jwt_segment(encoded_section):
    """Base64url-decode one JWT segment and parse the result as JSON.

    Args:
        encoded_section: The encoded segment, handed unchanged to
            ``_helpers.padded_urlsafe_b64decode``.

    Returns:
        The object ``json.loads`` produces for the decoded segment.

    Raises:
        google.auth.exceptions.MalformedError: if the decoded bytes cannot be
            read as UTF-8 text or parsed as JSON.
    """
    raw = _helpers.padded_urlsafe_b64decode(encoded_section)
    try:
        text = raw.decode("utf-8")
        parsed = json.loads(text)
    except ValueError as caught_exc:
        # Both decoding and JSON parsing failures surface as ValueError.
        raise exceptions.MalformedError(
            "Can't parse segment: {0}".format(raw)
        ) from caught_exc
    return parsed

129 

130 

def _unverified_decode(token):
    """Split a token into its parts and parse them without verifying anything.

    Args:
        token (Union[str, bytes]): The encoded JWT.

    Returns:
        Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and
            signature.

    Raises:
        google.auth.exceptions.MalformedError: if the token does not have
            exactly three segments, or the header or payload is not a JSON
            object.
    """
    token = _helpers.to_bytes(token)

    pieces = token.split(b".")
    if len(pieces) != 3:
        raise exceptions.MalformedError(
            "Wrong number of segments in token: {0}".format(token)
        )

    encoded_header, encoded_payload, encoded_signature = pieces
    # The signed section is the first two segments exactly as transmitted.
    signed_section = b".".join((encoded_header, encoded_payload))
    signature = _helpers.padded_urlsafe_b64decode(encoded_signature)

    # Parse segments
    header = _decode_jwt_segment(encoded_header)
    payload = _decode_jwt_segment(encoded_payload)

    header_is_object = isinstance(header, Mapping)
    if not header_is_object:
        raise exceptions.MalformedError(
            "Header segment should be a JSON object: {0}".format(encoded_header)
        )

    payload_is_object = isinstance(payload, Mapping)
    if not payload_is_object:
        raise exceptions.MalformedError(
            "Payload segment should be a JSON object: {0}".format(encoded_payload)
        )

    return header, payload, signed_section, signature

170 

171 

def decode_header(token):
    """Return the decoded header of a token.

    Nothing is verified. The typical use is reading the key id from the
    header so the matching certificate can be fetched before the token is
    actually verified.

    Args:
        token (Union[str, bytes]): the encoded JWT.

    Returns:
        Mapping: The decoded JWT header.
    """
    # The header is the first element of the decoded 4-tuple.
    return _unverified_decode(token)[0]

187 

188 

def _verify_iat_and_exp(payload, clock_skew_in_seconds=0):
    """Verifies the ``iat`` (Issued At) and ``exp`` (Expires) claims in a token
    payload.

    Args:
        payload (Mapping[str, str]): The JWT payload.
        clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
            validation.

    Raises:
        google.auth.exceptions.InvalidValue: if value validation failed, i.e.
            the token is used before ``iat`` or after ``exp`` (allowing for
            the clock skew).
        google.auth.exceptions.MalformedError: if schema validation failed,
            i.e. ``iat`` or ``exp`` is missing or is not a number.
    """
    now = _helpers.datetime_to_secs(_helpers.utcnow())

    # Make sure the iat and exp claims are present and usable as timestamps.
    for key in ("iat", "exp"):
        if key not in payload:
            raise exceptions.MalformedError(
                "Token does not contain required claim {}".format(key)
            )
        value = payload[key]
        # A non-numeric claim would otherwise escape as a TypeError from the
        # arithmetic below. NaN (which json.loads accepts) makes every
        # comparison false and would silently pass both checks.
        if not isinstance(value, (int, float)) or value != value:
            raise exceptions.MalformedError(
                "Token claim {} must be a number, got {!r}".format(key, value)
            )

    # Make sure the token wasn't issued in the future.
    iat = payload["iat"]
    # Err on the side of accepting a token that is slightly early to account
    # for clock skew.
    earliest = iat - clock_skew_in_seconds
    if now < earliest:
        raise exceptions.InvalidValue(
            "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format(
                now, iat
            )
        )

    # Make sure the token has not expired.
    exp = payload["exp"]
    # Err on the side of accepting a token that is slightly out of date
    # to account for clock skew.
    latest = exp + clock_skew_in_seconds
    if latest < now:
        raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now))

230 

231 

def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0):
    """Decode and verify a JWT.

    Args:
        token (str): The encoded JWT.
        certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The
            certificate used to validate the JWT signature. If bytes or string,
            it must be the public key certificate in PEM format. If a mapping,
            it must be a mapping of key IDs to public key certificates in PEM
            format. The mapping must contain the same key ID that's specified
            in the token's header.
        verify (bool): Whether to perform signature and claim validation.
            Verification is done by default.
        audience (str or list): The audience claim, 'aud', that this JWT should
            contain. Or a list of audience claims. If None then the JWT's 'aud'
            parameter is not verified.
        clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
            validation.

    Returns:
        Mapping[str, str]: The deserialized JSON payload in the JWT.

    Raises:
        google.auth.exceptions.InvalidValue: if value validation failed.
        google.auth.exceptions.MalformedError: if schema validation failed.
    """
    header, payload, signed_section, signature = _unverified_decode(token)

    if not verify:
        return payload

    # Read the algorithm and key id from the header; the algorithm decides
    # which verifier class is able to check the signature.
    key_alg = header.get("alg")
    key_id = header.get("kid")

    try:
        verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg]
    except KeyError as exc:
        # Distinguish "known algorithm, optional dependency missing" from
        # "algorithm not supported at all".
        if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS:
            message = "The key algorithm {} requires the cryptography package to be installed.".format(
                key_alg
            )
        else:
            message = "Unsupported signature algorithm {}".format(key_alg)
        raise exceptions.InvalidValue(message) from exc

    # Work out which certificate(s) the signature has to be checked against.
    if not isinstance(certs, Mapping):
        # A single certificate (or whatever the caller supplied) is used as is.
        certs_to_check = certs
    elif not key_id:
        # If there's no key id in the header, check against all of the certs.
        certs_to_check = certs.values()
    elif key_id in certs:
        # Use only the certificate identified by the token's key id.
        certs_to_check = [certs[key_id]]
    else:
        raise exceptions.MalformedError(
            "Certificate for key id {} not found.".format(key_id)
        )

    # Verify that the signature matches the message.
    signature_ok = crypt.verify_signature(
        signed_section, signature, certs_to_check, verifier_cls
    )
    if not signature_ok:
        raise exceptions.MalformedError("Could not verify token signature.")

    # Verify the issued-at and expiration times in the payload.
    _verify_iat_and_exp(payload, clock_skew_in_seconds)

    # Check audience.
    if audience is not None:
        claim_audience = payload.get("aud")
        # A single expected audience is treated as a one-element list.
        expected = [audience] if isinstance(audience, str) else audience
        if claim_audience not in expected:
            raise exceptions.InvalidValue(
                "Token has wrong audience {}, expected one of {}".format(
                    claim_audience, expected
                )
            )

    return payload

318 

319 

class Credentials(
    google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
):
    """Credentials that use a JWT as the bearer token.

    These credentials require an "audience" claim. This claim identifies the
    intended recipient of the bearer token.

    The constructor arguments determine the claims for the JWT that is
    sent with requests. Usually, you'll construct these credentials with
    one of the helper constructors as shown in the next section.

    To create JWT credentials using a Google service account private key
    JSON file::

        audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience)

    If you already have the service account file loaded and parsed::

        service_account_info = json.load(open('service_account.json'))
        credentials = jwt.Credentials.from_service_account_info(
            service_account_info,
            audience=audience)

    Both helper methods pass on arguments to the constructor, so you can
    specify the JWT claims::

        credentials = jwt.Credentials.from_service_account_file(
            'service-account.json',
            audience=audience,
            additional_claims={'meta': 'data'})

    You can also construct the credentials directly if you have a
    :class:`~google.auth.crypt.Signer` instance::

        credentials = jwt.Credentials(
            signer,
            issuer='your-issuer',
            subject='your-subject',
            audience=audience)

    The claims are considered immutable. If you want to modify the claims,
    you can easily create another instance using :meth:`with_claims`::

        new_audience = (
            'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber')
        new_credentials = credentials.with_claims(audience=new_audience)
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        audience,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.
        """
        super(Credentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._audience = audience
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates a Credentials instance from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        # The service account email is both subject and issuer unless the
        # caller supplied explicit values.
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates a Credentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates a Credentials instance from a service account .json file
        in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, audience, **kwargs):
        """Creates a new :class:`google.auth.jwt.Credentials` instance from an
        existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            audience = (
                'https://pubsub.googleapis.com/google.pubsub.v1.Publisher')
            jwt_creds = jwt.Credentials.from_signing_credentials(
                svc_creds, audience=audience)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            audience (str): the `aud` claim. The intended audience for the
                credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.Credentials: A new Credentials instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        return cls(credentials.signer, audience=audience, **kwargs)

    def with_claims(
        self, issuer=None, subject=None, audience=None, additional_claims=None
    ):
        """Returns a copy of these credentials with modified claims.

        The signer, token lifetime and quota project of this instance are
        carried over to the copy.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            audience (str): the `aud` claim. If unspecified the current
                audience claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.Credentials: A new credentials instance.
        """
        # Deep-copy so the merge below cannot alter this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        return self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            audience=audience if audience is not None else self._audience,
            additional_claims=new_additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            quota_project_id=self._quota_project_id,
        )

    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            audience=self._audience,
            additional_claims=self._additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            quota_project_id=quota_project_id,
        )

    def _make_jwt(self):
        """Make a signed JWT.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
        }
        # The audience claim is left out when no (truthy) audience is set.
        if self._audience:
            payload["aud"] = self._audience

        # Additional claims are applied last, so they override the standard
        # claims above when keys collide.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def refresh(self, request):
        """Refreshes the access token.

        A new JWT is signed locally and stored, together with its expiry, on
        ``self.token`` and ``self.expiry``.

        Args:
            request (Any): Unused.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        self.token, self.expiry = self._make_jwt()

    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer

    @property  # type: ignore
    def additional_claims(self):
        """ Additional claims the JWT object was created with."""
        return self._additional_claims

591 

592 

class OnDemandCredentials(
    google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject
):
    """On-demand JWT credentials.

    Like :class:`Credentials`, this class uses a JWT as the bearer token for
    authentication. However, this class does not require the audience at
    construction time. Instead, it will generate a new token on-demand for
    each request using the request URI as the audience. It caches tokens
    so that multiple requests to the same URI do not incur the overhead
    of generating a new token every time.

    This behavior is especially useful for `gRPC`_ clients. A gRPC service may
    have multiple audience and gRPC clients may not know all of the audiences
    required for accessing a particular service. With these credentials,
    no knowledge of the audiences is required ahead of time.

    .. _grpc: http://www.grpc.io/
    """

    def __init__(
        self,
        signer,
        issuer,
        subject,
        additional_claims=None,
        token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        max_cache_size=_DEFAULT_MAX_CACHE_SIZE,
        quota_project_id=None,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            issuer (str): The `iss` claim.
            subject (str): The `sub` claim.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload.
            token_lifetime (int): The amount of time in seconds for
                which the token is valid. Defaults to 1 hour.
            max_cache_size (int): The maximum number of JWT tokens to keep in
                cache. Tokens are cached using :class:`cachetools.LRUCache`.
            quota_project_id (Optional[str]): The project ID used for quota
                and billing.

        """
        super(OnDemandCredentials, self).__init__()
        self._signer = signer
        self._issuer = issuer
        self._subject = subject
        self._token_lifetime = token_lifetime
        self._quota_project_id = quota_project_id

        if additional_claims is None:
            additional_claims = {}

        self._additional_claims = additional_claims
        # Maps an audience to its (token, expiry) pair.
        self._cache = cachetools.LRUCache(maxsize=max_cache_size)

    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Creates an OnDemandCredentials instance from a signer and service
        account info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        # The service account email is both subject and issuer unless the
        # caller supplied explicit values.
        kwargs.setdefault("subject", info["client_email"])
        kwargs.setdefault("issuer", info["client_email"])
        return cls(signer, **kwargs)

    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Creates an OnDemandCredentials instance from a dictionary.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.

        Raises:
            google.auth.exceptions.MalformedError: If the info is not in the expected format.
        """
        signer = _service_account_info.from_dict(info, require=["client_email"])
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Creates an OnDemandCredentials instance from a service account .json
        file in Google format.

        Args:
            filename (str): The path to the service account .json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: The constructed credentials.
        """
        info, signer = _service_account_info.from_filename(
            filename, require=["client_email"]
        )
        return cls._from_signer_and_info(signer, info, **kwargs)

    @classmethod
    def from_signing_credentials(cls, credentials, **kwargs):
        """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance
        from an existing :class:`google.auth.credentials.Signing` instance.

        The new instance will use the same signer as the existing instance and
        will use the existing instance's signer email as the issuer and
        subject by default.

        Example::

            svc_creds = service_account.Credentials.from_service_account_file(
                'service_account.json')
            jwt_creds = jwt.OnDemandCredentials.from_signing_credentials(
                svc_creds)

        Args:
            credentials (google.auth.credentials.Signing): The credentials to
                use to construct the new credentials.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new OnDemandCredentials
                instance.
        """
        kwargs.setdefault("issuer", credentials.signer_email)
        kwargs.setdefault("subject", credentials.signer_email)
        return cls(credentials.signer, **kwargs)

    def with_claims(self, issuer=None, subject=None, additional_claims=None):
        """Returns a copy of these credentials with modified claims.

        The signer, token lifetime, cache size and quota project of this
        instance are carried over to the copy. The copy starts with an empty
        token cache.

        Args:
            issuer (str): The `iss` claim. If unspecified the current issuer
                claim will be used.
            subject (str): The `sub` claim. If unspecified the current subject
                claim will be used.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT payload. This will be merged with the current
                additional claims.

        Returns:
            google.auth.jwt.OnDemandCredentials: A new credentials instance.
        """
        # Deep-copy so the merge below cannot alter this instance's claims.
        new_additional_claims = copy.deepcopy(self._additional_claims)
        new_additional_claims.update(additional_claims or {})

        return self.__class__(
            self._signer,
            issuer=issuer if issuer is not None else self._issuer,
            subject=subject if subject is not None else self._subject,
            additional_claims=new_additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=self._quota_project_id,
        )

    @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self.__class__(
            self._signer,
            issuer=self._issuer,
            subject=self._subject,
            additional_claims=self._additional_claims,
            # Without this the copy would fall back to the default lifetime.
            token_lifetime=self._token_lifetime,
            max_cache_size=self._cache.maxsize,
            quota_project_id=quota_project_id,
        )

    @property
    def valid(self):
        """Checks the validity of the credentials.

        These credentials are always valid because it generates tokens on
        demand.
        """
        return True

    def _make_jwt_for_audience(self, audience):
        """Make a new JWT for the given audience.

        Args:
            audience (str): The intended audience.

        Returns:
            Tuple[bytes, datetime]: The encoded JWT and the expiration.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=self._token_lifetime)
        expiry = now + lifetime

        payload = {
            "iss": self._issuer,
            "sub": self._subject,
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            "aud": audience,
        }

        # Additional claims are applied last, so they override the standard
        # claims above when keys collide.
        payload.update(self._additional_claims)

        jwt = encode(self._signer, payload)

        return jwt, expiry

    def _get_jwt_for_audience(self, audience):
        """Get a JWT For a given audience.

        If there is already an existing, non-expired token in the cache for
        the audience, that token is used. Otherwise, a new token will be
        created.

        Args:
            audience (str): The intended audience.

        Returns:
            bytes: The encoded JWT.
        """
        token, expiry = self._cache.get(audience, (None, None))

        # Re-issue on a cache miss or once the cached token has expired.
        if token is None or expiry < _helpers.utcnow():
            token, expiry = self._make_jwt_for_audience(audience)
            self._cache[audience] = token, expiry

        return token

    def refresh(self, request):
        """Raises an exception, these credentials can not be directly
        refreshed.

        Args:
            request (Any): Unused.

        Raises:
            google.auth.RefreshError
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        raise exceptions.RefreshError(
            "OnDemandCredentials can not be directly refreshed."
        )

    def before_request(self, request, method, url, headers):
        """Performs credential-specific before request logic.

        Args:
            request (Any): Unused. JWT credentials do not need to make an
                HTTP request to refresh.
            method (str): The request's HTTP method.
            url (str): The request's URI. This is used as the audience claim
                when generating the JWT.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (pylint doesn't correctly recognize overridden methods.)
        parts = urllib.parse.urlsplit(url)
        # Strip query string and fragment
        audience = urllib.parse.urlunsplit(
            (parts.scheme, parts.netloc, parts.path, "", "")
        )
        token = self._get_jwt_for_audience(audience)
        self.apply(headers, token=token)

    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def sign_bytes(self, message):
        return self._signer.sign(message)

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer_email(self):
        return self._issuer

    @property  # type: ignore
    @_helpers.copy_docstring(google.auth.credentials.Signing)
    def signer(self):
        return self._signer