Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/jwt.py: 32%

237 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""JSON Web Tokens 

16 

17Provides support for creating (encoding) and verifying (decoding) JWTs, 

18especially JWTs generated and consumed by Google infrastructure. 

19 

20See `rfc7519`_ for more details on JWTs. 

21 

22To encode a JWT use :func:`encode`:: 

23 

24 from google.auth import crypt 

25 from google.auth import jwt 

26 

27 signer = crypt.Signer(private_key) 

28 payload = {'some': 'payload'} 

29 encoded = jwt.encode(signer, payload) 

30 

31To decode a JWT and verify claims use :func:`decode`:: 

32 

33 claims = jwt.decode(encoded, certs=public_certs) 

34 

35You can also skip verification:: 

36 

37 claims = jwt.decode(encoded, verify=False) 

38 

39.. _rfc7519: https://tools.ietf.org/html/rfc7519 

40 

41""" 

42 

43try: 

44 from collections.abc import Mapping 

45# Python 2.7 compatibility 

46except ImportError: # pragma: NO COVER 

47 from collections import Mapping # type: ignore 

48import copy 

49import datetime 

50import json 

51import urllib 

52 

53import cachetools 

54 

55from google.auth import _helpers 

56from google.auth import _service_account_info 

57from google.auth import crypt 

58from google.auth import exceptions 

59import google.auth.credentials 

60 

61try: 

62 from google.auth.crypt import es256 

63except ImportError: # pragma: NO COVER 

64 es256 = None # type: ignore 

65 

66_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

67_DEFAULT_MAX_CACHE_SIZE = 10 

68_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier} 

69_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(["ES256"]) 

70 

71if es256 is not None: # pragma: NO COVER 

72 _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es256.ES256Verifier # type: ignore 

73 

74 

75def encode(signer, payload, header=None, key_id=None): 

76 """Make a signed JWT. 

77 

78 Args: 

79 signer (google.auth.crypt.Signer): The signer used to sign the JWT. 

80 payload (Mapping[str, str]): The JWT payload. 

81 header (Mapping[str, str]): Additional JWT header payload. 

82 key_id (str): The key id to add to the JWT header. If the 

83 signer has a key id it will be used as the default. If this is 

84 specified it will override the signer's key id. 

85 

86 Returns: 

87 bytes: The encoded JWT. 

88 """ 

89 if header is None: 

90 header = {} 

91 

92 if key_id is None: 

93 key_id = signer.key_id 

94 

95 header.update({"typ": "JWT"}) 

96 

97 if "alg" not in header: 

98 if es256 is not None and isinstance(signer, es256.ES256Signer): 

99 header.update({"alg": "ES256"}) 

100 else: 

101 header.update({"alg": "RS256"}) 

102 

103 if key_id is not None: 

104 header["kid"] = key_id 

105 

106 segments = [ 

107 _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")), 

108 _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")), 

109 ] 

110 

111 signing_input = b".".join(segments) 

112 signature = signer.sign(signing_input) 

113 segments.append(_helpers.unpadded_urlsafe_b64encode(signature)) 

114 

115 return b".".join(segments) 

116 

117 

118def _decode_jwt_segment(encoded_section): 

119 """Decodes a single JWT segment.""" 

120 section_bytes = _helpers.padded_urlsafe_b64decode(encoded_section) 

121 try: 

122 return json.loads(section_bytes.decode("utf-8")) 

123 except ValueError as caught_exc: 

124 new_exc = exceptions.MalformedError( 

125 "Can't parse segment: {0}".format(section_bytes) 

126 ) 

127 raise new_exc from caught_exc 

128 

129 

130def _unverified_decode(token): 

131 """Decodes a token and does no verification. 

132 

133 Args: 

134 token (Union[str, bytes]): The encoded JWT. 

135 

136 Returns: 

137 Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and 

138 signature. 

139 

140 Raises: 

141 google.auth.exceptions.MalformedError: if there are an incorrect amount of segments in the token or segments of the wrong type. 

142 """ 

143 token = _helpers.to_bytes(token) 

144 

145 if token.count(b".") != 2: 

146 raise exceptions.MalformedError( 

147 "Wrong number of segments in token: {0}".format(token) 

148 ) 

149 

150 encoded_header, encoded_payload, signature = token.split(b".") 

151 signed_section = encoded_header + b"." + encoded_payload 

152 signature = _helpers.padded_urlsafe_b64decode(signature) 

153 

154 # Parse segments 

155 header = _decode_jwt_segment(encoded_header) 

156 payload = _decode_jwt_segment(encoded_payload) 

157 

158 if not isinstance(header, Mapping): 

159 raise exceptions.MalformedError( 

160 "Header segment should be a JSON object: {0}".format(encoded_header) 

161 ) 

162 

163 if not isinstance(payload, Mapping): 

164 raise exceptions.MalformedError( 

165 "Payload segment should be a JSON object: {0}".format(encoded_payload) 

166 ) 

167 

168 return header, payload, signed_section, signature 

169 

170 

171def decode_header(token): 

172 """Return the decoded header of a token. 

173 

174 No verification is done. This is useful to extract the key id from 

175 the header in order to acquire the appropriate certificate to verify 

176 the token. 

177 

178 Args: 

179 token (Union[str, bytes]): the encoded JWT. 

180 

181 Returns: 

182 Mapping: The decoded JWT header. 

183 """ 

184 header, _, _, _ = _unverified_decode(token) 

185 return header 

186 

187 

188def _verify_iat_and_exp(payload, clock_skew_in_seconds=0): 

189 """Verifies the ``iat`` (Issued At) and ``exp`` (Expires) claims in a token 

190 payload. 

191 

192 Args: 

193 payload (Mapping[str, str]): The JWT payload. 

194 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` 

195 validation. 

196 

197 Raises: 

198 google.auth.exceptions.InvalidValue: if value validation failed. 

199 google.auth.exceptions.MalformedError: if schema validation failed. 

200 """ 

201 now = _helpers.datetime_to_secs(_helpers.utcnow()) 

202 

203 # Make sure the iat and exp claims are present. 

204 for key in ("iat", "exp"): 

205 if key not in payload: 

206 raise exceptions.MalformedError( 

207 "Token does not contain required claim {}".format(key) 

208 ) 

209 

210 # Make sure the token wasn't issued in the future. 

211 iat = payload["iat"] 

212 # Err on the side of accepting a token that is slightly early to account 

213 # for clock skew. 

214 earliest = iat - clock_skew_in_seconds 

215 if now < earliest: 

216 raise exceptions.InvalidValue( 

217 "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format( 

218 now, iat 

219 ) 

220 ) 

221 

222 # Make sure the token wasn't issued in the past. 

223 exp = payload["exp"] 

224 # Err on the side of accepting a token that is slightly out of date 

225 # to account for clow skew. 

226 latest = exp + clock_skew_in_seconds 

227 if latest < now: 

228 raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now)) 

229 

230 

231def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0): 

232 """Decode and verify a JWT. 

233 

234 Args: 

235 token (str): The encoded JWT. 

236 certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The 

237 certificate used to validate the JWT signature. If bytes or string, 

238 it must the the public key certificate in PEM format. If a mapping, 

239 it must be a mapping of key IDs to public key certificates in PEM 

240 format. The mapping must contain the same key ID that's specified 

241 in the token's header. 

242 verify (bool): Whether to perform signature and claim validation. 

243 Verification is done by default. 

244 audience (str or list): The audience claim, 'aud', that this JWT should 

245 contain. Or a list of audience claims. If None then the JWT's 'aud' 

246 parameter is not verified. 

247 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` 

248 validation. 

249 

250 Returns: 

251 Mapping[str, str]: The deserialized JSON payload in the JWT. 

252 

253 Raises: 

254 google.auth.exceptions.InvalidValue: if value validation failed. 

255 google.auth.exceptions.MalformedError: if schema validation failed. 

256 """ 

257 header, payload, signed_section, signature = _unverified_decode(token) 

258 

259 if not verify: 

260 return payload 

261 

262 # Pluck the key id and algorithm from the header and make sure we have 

263 # a verifier that can support it. 

264 key_alg = header.get("alg") 

265 key_id = header.get("kid") 

266 

267 try: 

268 verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg] 

269 except KeyError as exc: 

270 if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS: 

271 raise exceptions.InvalidValue( 

272 "The key algorithm {} requires the cryptography package to be installed.".format( 

273 key_alg 

274 ) 

275 ) from exc 

276 else: 

277 raise exceptions.InvalidValue( 

278 "Unsupported signature algorithm {}".format(key_alg) 

279 ) from exc 

280 # If certs is specified as a dictionary of key IDs to certificates, then 

281 # use the certificate identified by the key ID in the token header. 

282 if isinstance(certs, Mapping): 

283 if key_id: 

284 if key_id not in certs: 

285 raise exceptions.MalformedError( 

286 "Certificate for key id {} not found.".format(key_id) 

287 ) 

288 certs_to_check = [certs[key_id]] 

289 # If there's no key id in the header, check against all of the certs. 

290 else: 

291 certs_to_check = certs.values() 

292 else: 

293 certs_to_check = certs 

294 

295 # Verify that the signature matches the message. 

296 if not crypt.verify_signature( 

297 signed_section, signature, certs_to_check, verifier_cls 

298 ): 

299 raise exceptions.MalformedError("Could not verify token signature.") 

300 

301 # Verify the issued at and created times in the payload. 

302 _verify_iat_and_exp(payload, clock_skew_in_seconds) 

303 

304 # Check audience. 

305 if audience is not None: 

306 claim_audience = payload.get("aud") 

307 if isinstance(audience, str): 

308 audience = [audience] 

309 if claim_audience not in audience: 

310 raise exceptions.InvalidValue( 

311 "Token has wrong audience {}, expected one of {}".format( 

312 claim_audience, audience 

313 ) 

314 ) 

315 

316 return payload 

317 

318 

319class Credentials( 

320 google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject 

321): 

322 """Credentials that use a JWT as the bearer token. 

323 

324 These credentials require an "audience" claim. This claim identifies the 

325 intended recipient of the bearer token. 

326 

327 The constructor arguments determine the claims for the JWT that is 

328 sent with requests. Usually, you'll construct these credentials with 

329 one of the helper constructors as shown in the next section. 

330 

331 To create JWT credentials using a Google service account private key 

332 JSON file:: 

333 

334 audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher' 

335 credentials = jwt.Credentials.from_service_account_file( 

336 'service-account.json', 

337 audience=audience) 

338 

339 If you already have the service account file loaded and parsed:: 

340 

341 service_account_info = json.load(open('service_account.json')) 

342 credentials = jwt.Credentials.from_service_account_info( 

343 service_account_info, 

344 audience=audience) 

345 

346 Both helper methods pass on arguments to the constructor, so you can 

347 specify the JWT claims:: 

348 

349 credentials = jwt.Credentials.from_service_account_file( 

350 'service-account.json', 

351 audience=audience, 

352 additional_claims={'meta': 'data'}) 

353 

354 You can also construct the credentials directly if you have a 

355 :class:`~google.auth.crypt.Signer` instance:: 

356 

357 credentials = jwt.Credentials( 

358 signer, 

359 issuer='your-issuer', 

360 subject='your-subject', 

361 audience=audience) 

362 

363 The claims are considered immutable. If you want to modify the claims, 

364 you can easily create another instance using :meth:`with_claims`:: 

365 

366 new_audience = ( 

367 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber') 

368 new_credentials = credentials.with_claims(audience=new_audience) 

369 """ 

370 

371 def __init__( 

372 self, 

373 signer, 

374 issuer, 

375 subject, 

376 audience, 

377 additional_claims=None, 

378 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

379 quota_project_id=None, 

380 ): 

381 """ 

382 Args: 

383 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

384 issuer (str): The `iss` claim. 

385 subject (str): The `sub` claim. 

386 audience (str): the `aud` claim. The intended audience for the 

387 credentials. 

388 additional_claims (Mapping[str, str]): Any additional claims for 

389 the JWT payload. 

390 token_lifetime (int): The amount of time in seconds for 

391 which the token is valid. Defaults to 1 hour. 

392 quota_project_id (Optional[str]): The project ID used for quota 

393 and billing. 

394 """ 

395 super(Credentials, self).__init__() 

396 self._signer = signer 

397 self._issuer = issuer 

398 self._subject = subject 

399 self._audience = audience 

400 self._token_lifetime = token_lifetime 

401 self._quota_project_id = quota_project_id 

402 

403 if additional_claims is None: 

404 additional_claims = {} 

405 

406 self._additional_claims = additional_claims 

407 

408 @classmethod 

409 def _from_signer_and_info(cls, signer, info, **kwargs): 

410 """Creates a Credentials instance from a signer and service account 

411 info. 

412 

413 Args: 

414 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

415 info (Mapping[str, str]): The service account info. 

416 kwargs: Additional arguments to pass to the constructor. 

417 

418 Returns: 

419 google.auth.jwt.Credentials: The constructed credentials. 

420 

421 Raises: 

422 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

423 """ 

424 kwargs.setdefault("subject", info["client_email"]) 

425 kwargs.setdefault("issuer", info["client_email"]) 

426 return cls(signer, **kwargs) 

427 

428 @classmethod 

429 def from_service_account_info(cls, info, **kwargs): 

430 """Creates an Credentials instance from a dictionary. 

431 

432 Args: 

433 info (Mapping[str, str]): The service account info in Google 

434 format. 

435 kwargs: Additional arguments to pass to the constructor. 

436 

437 Returns: 

438 google.auth.jwt.Credentials: The constructed credentials. 

439 

440 Raises: 

441 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

442 """ 

443 signer = _service_account_info.from_dict(info, require=["client_email"]) 

444 return cls._from_signer_and_info(signer, info, **kwargs) 

445 

446 @classmethod 

447 def from_service_account_file(cls, filename, **kwargs): 

448 """Creates a Credentials instance from a service account .json file 

449 in Google format. 

450 

451 Args: 

452 filename (str): The path to the service account .json file. 

453 kwargs: Additional arguments to pass to the constructor. 

454 

455 Returns: 

456 google.auth.jwt.Credentials: The constructed credentials. 

457 """ 

458 info, signer = _service_account_info.from_filename( 

459 filename, require=["client_email"] 

460 ) 

461 return cls._from_signer_and_info(signer, info, **kwargs) 

462 

463 @classmethod 

464 def from_signing_credentials(cls, credentials, audience, **kwargs): 

465 """Creates a new :class:`google.auth.jwt.Credentials` instance from an 

466 existing :class:`google.auth.credentials.Signing` instance. 

467 

468 The new instance will use the same signer as the existing instance and 

469 will use the existing instance's signer email as the issuer and 

470 subject by default. 

471 

472 Example:: 

473 

474 svc_creds = service_account.Credentials.from_service_account_file( 

475 'service_account.json') 

476 audience = ( 

477 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher') 

478 jwt_creds = jwt.Credentials.from_signing_credentials( 

479 svc_creds, audience=audience) 

480 

481 Args: 

482 credentials (google.auth.credentials.Signing): The credentials to 

483 use to construct the new credentials. 

484 audience (str): the `aud` claim. The intended audience for the 

485 credentials. 

486 kwargs: Additional arguments to pass to the constructor. 

487 

488 Returns: 

489 google.auth.jwt.Credentials: A new Credentials instance. 

490 """ 

491 kwargs.setdefault("issuer", credentials.signer_email) 

492 kwargs.setdefault("subject", credentials.signer_email) 

493 return cls(credentials.signer, audience=audience, **kwargs) 

494 

495 def with_claims( 

496 self, issuer=None, subject=None, audience=None, additional_claims=None 

497 ): 

498 """Returns a copy of these credentials with modified claims. 

499 

500 Args: 

501 issuer (str): The `iss` claim. If unspecified the current issuer 

502 claim will be used. 

503 subject (str): The `sub` claim. If unspecified the current subject 

504 claim will be used. 

505 audience (str): the `aud` claim. If unspecified the current 

506 audience claim will be used. 

507 additional_claims (Mapping[str, str]): Any additional claims for 

508 the JWT payload. This will be merged with the current 

509 additional claims. 

510 

511 Returns: 

512 google.auth.jwt.Credentials: A new credentials instance. 

513 """ 

514 new_additional_claims = copy.deepcopy(self._additional_claims) 

515 new_additional_claims.update(additional_claims or {}) 

516 

517 return self.__class__( 

518 self._signer, 

519 issuer=issuer if issuer is not None else self._issuer, 

520 subject=subject if subject is not None else self._subject, 

521 audience=audience if audience is not None else self._audience, 

522 additional_claims=new_additional_claims, 

523 quota_project_id=self._quota_project_id, 

524 ) 

525 

526 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) 

527 def with_quota_project(self, quota_project_id): 

528 return self.__class__( 

529 self._signer, 

530 issuer=self._issuer, 

531 subject=self._subject, 

532 audience=self._audience, 

533 additional_claims=self._additional_claims, 

534 quota_project_id=quota_project_id, 

535 ) 

536 

537 def _make_jwt(self): 

538 """Make a signed JWT. 

539 

540 Returns: 

541 Tuple[bytes, datetime]: The encoded JWT and the expiration. 

542 """ 

543 now = _helpers.utcnow() 

544 lifetime = datetime.timedelta(seconds=self._token_lifetime) 

545 expiry = now + lifetime 

546 

547 payload = { 

548 "iss": self._issuer, 

549 "sub": self._subject, 

550 "iat": _helpers.datetime_to_secs(now), 

551 "exp": _helpers.datetime_to_secs(expiry), 

552 } 

553 if self._audience: 

554 payload["aud"] = self._audience 

555 

556 payload.update(self._additional_claims) 

557 

558 jwt = encode(self._signer, payload) 

559 

560 return jwt, expiry 

561 

562 def refresh(self, request): 

563 """Refreshes the access token. 

564 

565 Args: 

566 request (Any): Unused. 

567 """ 

568 # pylint: disable=unused-argument 

569 # (pylint doesn't correctly recognize overridden methods.) 

570 self.token, self.expiry = self._make_jwt() 

571 

572 @_helpers.copy_docstring(google.auth.credentials.Signing) 

573 def sign_bytes(self, message): 

574 return self._signer.sign(message) 

575 

576 @property # type: ignore 

577 @_helpers.copy_docstring(google.auth.credentials.Signing) 

578 def signer_email(self): 

579 return self._issuer 

580 

581 @property # type: ignore 

582 @_helpers.copy_docstring(google.auth.credentials.Signing) 

583 def signer(self): 

584 return self._signer 

585 

586 @property # type: ignore 

587 def additional_claims(self): 

588 """ Additional claims the JWT object was created with.""" 

589 return self._additional_claims 

590 

591 

592class OnDemandCredentials( 

593 google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject 

594): 

595 """On-demand JWT credentials. 

596 

597 Like :class:`Credentials`, this class uses a JWT as the bearer token for 

598 authentication. However, this class does not require the audience at 

599 construction time. Instead, it will generate a new token on-demand for 

600 each request using the request URI as the audience. It caches tokens 

601 so that multiple requests to the same URI do not incur the overhead 

602 of generating a new token every time. 

603 

604 This behavior is especially useful for `gRPC`_ clients. A gRPC service may 

605 have multiple audience and gRPC clients may not know all of the audiences 

606 required for accessing a particular service. With these credentials, 

607 no knowledge of the audiences is required ahead of time. 

608 

609 .. _grpc: http://www.grpc.io/ 

610 """ 

611 

612 def __init__( 

613 self, 

614 signer, 

615 issuer, 

616 subject, 

617 additional_claims=None, 

618 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

619 max_cache_size=_DEFAULT_MAX_CACHE_SIZE, 

620 quota_project_id=None, 

621 ): 

622 """ 

623 Args: 

624 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

625 issuer (str): The `iss` claim. 

626 subject (str): The `sub` claim. 

627 additional_claims (Mapping[str, str]): Any additional claims for 

628 the JWT payload. 

629 token_lifetime (int): The amount of time in seconds for 

630 which the token is valid. Defaults to 1 hour. 

631 max_cache_size (int): The maximum number of JWT tokens to keep in 

632 cache. Tokens are cached using :class:`cachetools.LRUCache`. 

633 quota_project_id (Optional[str]): The project ID used for quota 

634 and billing. 

635 

636 """ 

637 super(OnDemandCredentials, self).__init__() 

638 self._signer = signer 

639 self._issuer = issuer 

640 self._subject = subject 

641 self._token_lifetime = token_lifetime 

642 self._quota_project_id = quota_project_id 

643 

644 if additional_claims is None: 

645 additional_claims = {} 

646 

647 self._additional_claims = additional_claims 

648 self._cache = cachetools.LRUCache(maxsize=max_cache_size) 

649 

650 @classmethod 

651 def _from_signer_and_info(cls, signer, info, **kwargs): 

652 """Creates an OnDemandCredentials instance from a signer and service 

653 account info. 

654 

655 Args: 

656 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

657 info (Mapping[str, str]): The service account info. 

658 kwargs: Additional arguments to pass to the constructor. 

659 

660 Returns: 

661 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

662 

663 Raises: 

664 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

665 """ 

666 kwargs.setdefault("subject", info["client_email"]) 

667 kwargs.setdefault("issuer", info["client_email"]) 

668 return cls(signer, **kwargs) 

669 

670 @classmethod 

671 def from_service_account_info(cls, info, **kwargs): 

672 """Creates an OnDemandCredentials instance from a dictionary. 

673 

674 Args: 

675 info (Mapping[str, str]): The service account info in Google 

676 format. 

677 kwargs: Additional arguments to pass to the constructor. 

678 

679 Returns: 

680 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

681 

682 Raises: 

683 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

684 """ 

685 signer = _service_account_info.from_dict(info, require=["client_email"]) 

686 return cls._from_signer_and_info(signer, info, **kwargs) 

687 

688 @classmethod 

689 def from_service_account_file(cls, filename, **kwargs): 

690 """Creates an OnDemandCredentials instance from a service account .json 

691 file in Google format. 

692 

693 Args: 

694 filename (str): The path to the service account .json file. 

695 kwargs: Additional arguments to pass to the constructor. 

696 

697 Returns: 

698 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

699 """ 

700 info, signer = _service_account_info.from_filename( 

701 filename, require=["client_email"] 

702 ) 

703 return cls._from_signer_and_info(signer, info, **kwargs) 

704 

705 @classmethod 

706 def from_signing_credentials(cls, credentials, **kwargs): 

707 """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance 

708 from an existing :class:`google.auth.credentials.Signing` instance. 

709 

710 The new instance will use the same signer as the existing instance and 

711 will use the existing instance's signer email as the issuer and 

712 subject by default. 

713 

714 Example:: 

715 

716 svc_creds = service_account.Credentials.from_service_account_file( 

717 'service_account.json') 

718 jwt_creds = jwt.OnDemandCredentials.from_signing_credentials( 

719 svc_creds) 

720 

721 Args: 

722 credentials (google.auth.credentials.Signing): The credentials to 

723 use to construct the new credentials. 

724 kwargs: Additional arguments to pass to the constructor. 

725 

726 Returns: 

727 google.auth.jwt.Credentials: A new Credentials instance. 

728 """ 

729 kwargs.setdefault("issuer", credentials.signer_email) 

730 kwargs.setdefault("subject", credentials.signer_email) 

731 return cls(credentials.signer, **kwargs) 

732 

733 def with_claims(self, issuer=None, subject=None, additional_claims=None): 

734 """Returns a copy of these credentials with modified claims. 

735 

736 Args: 

737 issuer (str): The `iss` claim. If unspecified the current issuer 

738 claim will be used. 

739 subject (str): The `sub` claim. If unspecified the current subject 

740 claim will be used. 

741 additional_claims (Mapping[str, str]): Any additional claims for 

742 the JWT payload. This will be merged with the current 

743 additional claims. 

744 

745 Returns: 

746 google.auth.jwt.OnDemandCredentials: A new credentials instance. 

747 """ 

748 new_additional_claims = copy.deepcopy(self._additional_claims) 

749 new_additional_claims.update(additional_claims or {}) 

750 

751 return self.__class__( 

752 self._signer, 

753 issuer=issuer if issuer is not None else self._issuer, 

754 subject=subject if subject is not None else self._subject, 

755 additional_claims=new_additional_claims, 

756 max_cache_size=self._cache.maxsize, 

757 quota_project_id=self._quota_project_id, 

758 ) 

759 

760 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) 

761 def with_quota_project(self, quota_project_id): 

762 

763 return self.__class__( 

764 self._signer, 

765 issuer=self._issuer, 

766 subject=self._subject, 

767 additional_claims=self._additional_claims, 

768 max_cache_size=self._cache.maxsize, 

769 quota_project_id=quota_project_id, 

770 ) 

771 

772 @property 

773 def valid(self): 

774 """Checks the validity of the credentials. 

775 

776 These credentials are always valid because it generates tokens on 

777 demand. 

778 """ 

779 return True 

780 

781 def _make_jwt_for_audience(self, audience): 

782 """Make a new JWT for the given audience. 

783 

784 Args: 

785 audience (str): The intended audience. 

786 

787 Returns: 

788 Tuple[bytes, datetime]: The encoded JWT and the expiration. 

789 """ 

790 now = _helpers.utcnow() 

791 lifetime = datetime.timedelta(seconds=self._token_lifetime) 

792 expiry = now + lifetime 

793 

794 payload = { 

795 "iss": self._issuer, 

796 "sub": self._subject, 

797 "iat": _helpers.datetime_to_secs(now), 

798 "exp": _helpers.datetime_to_secs(expiry), 

799 "aud": audience, 

800 } 

801 

802 payload.update(self._additional_claims) 

803 

804 jwt = encode(self._signer, payload) 

805 

806 return jwt, expiry 

807 

808 def _get_jwt_for_audience(self, audience): 

809 """Get a JWT For a given audience. 

810 

811 If there is already an existing, non-expired token in the cache for 

812 the audience, that token is used. Otherwise, a new token will be 

813 created. 

814 

815 Args: 

816 audience (str): The intended audience. 

817 

818 Returns: 

819 bytes: The encoded JWT. 

820 """ 

821 token, expiry = self._cache.get(audience, (None, None)) 

822 

823 if token is None or expiry < _helpers.utcnow(): 

824 token, expiry = self._make_jwt_for_audience(audience) 

825 self._cache[audience] = token, expiry 

826 

827 return token 

828 

829 def refresh(self, request): 

830 """Raises an exception, these credentials can not be directly 

831 refreshed. 

832 

833 Args: 

834 request (Any): Unused. 

835 

836 Raises: 

837 google.auth.RefreshError 

838 """ 

839 # pylint: disable=unused-argument 

840 # (pylint doesn't correctly recognize overridden methods.) 

841 raise exceptions.RefreshError( 

842 "OnDemandCredentials can not be directly refreshed." 

843 ) 

844 

845 def before_request(self, request, method, url, headers): 

846 """Performs credential-specific before request logic. 

847 

848 Args: 

849 request (Any): Unused. JWT credentials do not need to make an 

850 HTTP request to refresh. 

851 method (str): The request's HTTP method. 

852 url (str): The request's URI. This is used as the audience claim 

853 when generating the JWT. 

854 headers (Mapping): The request's headers. 

855 """ 

856 # pylint: disable=unused-argument 

857 # (pylint doesn't correctly recognize overridden methods.) 

858 parts = urllib.parse.urlsplit(url) 

859 # Strip query string and fragment 

860 audience = urllib.parse.urlunsplit( 

861 (parts.scheme, parts.netloc, parts.path, "", "") 

862 ) 

863 token = self._get_jwt_for_audience(audience) 

864 self.apply(headers, token=token) 

865 

866 @_helpers.copy_docstring(google.auth.credentials.Signing) 

867 def sign_bytes(self, message): 

868 return self._signer.sign(message) 

869 

870 @property # type: ignore 

871 @_helpers.copy_docstring(google.auth.credentials.Signing) 

872 def signer_email(self): 

873 return self._issuer 

874 

875 @property # type: ignore 

876 @_helpers.copy_docstring(google.auth.credentials.Signing) 

877 def signer(self): 

878 return self._signer