Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/jwt.py: 33%

238 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""JSON Web Tokens 

16 

17Provides support for creating (encoding) and verifying (decoding) JWTs, 

18especially JWTs generated and consumed by Google infrastructure. 

19 

20See `rfc7519`_ for more details on JWTs. 

21 

22To encode a JWT use :func:`encode`:: 

23 

24 from google.auth import crypt 

25 from google.auth import jwt 

26 

27 signer = crypt.Signer(private_key) 

28 payload = {'some': 'payload'} 

29 encoded = jwt.encode(signer, payload) 

30 

31To decode a JWT and verify claims use :func:`decode`:: 

32 

33 claims = jwt.decode(encoded, certs=public_certs) 

34 

35You can also skip verification:: 

36 

37 claims = jwt.decode(encoded, verify=False) 

38 

39.. _rfc7519: https://tools.ietf.org/html/rfc7519 

40 

41""" 

42 

43try: 

44 from collections.abc import Mapping 

45# Python 2.7 compatibility 

46except ImportError: # pragma: NO COVER 

47 from collections import Mapping # type: ignore 

48import copy 

49import datetime 

50import json 

51 

52import cachetools 

53import six 

54from six.moves import urllib 

55 

56from google.auth import _helpers 

57from google.auth import _service_account_info 

58from google.auth import crypt 

59from google.auth import exceptions 

60import google.auth.credentials 

61 

62try: 

63 from google.auth.crypt import es256 

64except ImportError: # pragma: NO COVER 

65 es256 = None # type: ignore 

66 

67_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

68_DEFAULT_MAX_CACHE_SIZE = 10 

69_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier} 

70_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(["ES256"]) 

71 

72if es256 is not None: # pragma: NO COVER 

73 _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es256.ES256Verifier # type: ignore 

74 

75 

76def encode(signer, payload, header=None, key_id=None): 

77 """Make a signed JWT. 

78 

79 Args: 

80 signer (google.auth.crypt.Signer): The signer used to sign the JWT. 

81 payload (Mapping[str, str]): The JWT payload. 

82 header (Mapping[str, str]): Additional JWT header payload. 

83 key_id (str): The key id to add to the JWT header. If the 

84 signer has a key id it will be used as the default. If this is 

85 specified it will override the signer's key id. 

86 

87 Returns: 

88 bytes: The encoded JWT. 

89 """ 

90 if header is None: 

91 header = {} 

92 

93 if key_id is None: 

94 key_id = signer.key_id 

95 

96 header.update({"typ": "JWT"}) 

97 

98 if "alg" not in header: 

99 if es256 is not None and isinstance(signer, es256.ES256Signer): 

100 header.update({"alg": "ES256"}) 

101 else: 

102 header.update({"alg": "RS256"}) 

103 

104 if key_id is not None: 

105 header["kid"] = key_id 

106 

107 segments = [ 

108 _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")), 

109 _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")), 

110 ] 

111 

112 signing_input = b".".join(segments) 

113 signature = signer.sign(signing_input) 

114 segments.append(_helpers.unpadded_urlsafe_b64encode(signature)) 

115 

116 return b".".join(segments) 

117 

118 

119def _decode_jwt_segment(encoded_section): 

120 """Decodes a single JWT segment.""" 

121 section_bytes = _helpers.padded_urlsafe_b64decode(encoded_section) 

122 try: 

123 return json.loads(section_bytes.decode("utf-8")) 

124 except ValueError as caught_exc: 

125 new_exc = exceptions.MalformedError( 

126 "Can't parse segment: {0}".format(section_bytes) 

127 ) 

128 six.raise_from(new_exc, caught_exc) 

129 

130 

131def _unverified_decode(token): 

132 """Decodes a token and does no verification. 

133 

134 Args: 

135 token (Union[str, bytes]): The encoded JWT. 

136 

137 Returns: 

138 Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and 

139 signature. 

140 

141 Raises: 

142 google.auth.exceptions.MalformedError: if there are an incorrect amount of segments in the token or segments of the wrong type. 

143 """ 

144 token = _helpers.to_bytes(token) 

145 

146 if token.count(b".") != 2: 

147 raise exceptions.MalformedError( 

148 "Wrong number of segments in token: {0}".format(token) 

149 ) 

150 

151 encoded_header, encoded_payload, signature = token.split(b".") 

152 signed_section = encoded_header + b"." + encoded_payload 

153 signature = _helpers.padded_urlsafe_b64decode(signature) 

154 

155 # Parse segments 

156 header = _decode_jwt_segment(encoded_header) 

157 payload = _decode_jwt_segment(encoded_payload) 

158 

159 if not isinstance(header, Mapping): 

160 raise exceptions.MalformedError( 

161 "Header segment should be a JSON object: {0}".format(encoded_header) 

162 ) 

163 

164 if not isinstance(payload, Mapping): 

165 raise exceptions.MalformedError( 

166 "Payload segment should be a JSON object: {0}".format(encoded_payload) 

167 ) 

168 

169 return header, payload, signed_section, signature 

170 

171 

172def decode_header(token): 

173 """Return the decoded header of a token. 

174 

175 No verification is done. This is useful to extract the key id from 

176 the header in order to acquire the appropriate certificate to verify 

177 the token. 

178 

179 Args: 

180 token (Union[str, bytes]): the encoded JWT. 

181 

182 Returns: 

183 Mapping: The decoded JWT header. 

184 """ 

185 header, _, _, _ = _unverified_decode(token) 

186 return header 

187 

188 

189def _verify_iat_and_exp(payload, clock_skew_in_seconds=0): 

190 """Verifies the ``iat`` (Issued At) and ``exp`` (Expires) claims in a token 

191 payload. 

192 

193 Args: 

194 payload (Mapping[str, str]): The JWT payload. 

195 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` 

196 validation. 

197 

198 Raises: 

199 google.auth.exceptions.InvalidValue: if value validation failed. 

200 google.auth.exceptions.MalformedError: if schema validation failed. 

201 """ 

202 now = _helpers.datetime_to_secs(_helpers.utcnow()) 

203 

204 # Make sure the iat and exp claims are present. 

205 for key in ("iat", "exp"): 

206 if key not in payload: 

207 raise exceptions.MalformedError( 

208 "Token does not contain required claim {}".format(key) 

209 ) 

210 

211 # Make sure the token wasn't issued in the future. 

212 iat = payload["iat"] 

213 # Err on the side of accepting a token that is slightly early to account 

214 # for clock skew. 

215 earliest = iat - clock_skew_in_seconds 

216 if now < earliest: 

217 raise exceptions.InvalidValue( 

218 "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format( 

219 now, iat 

220 ) 

221 ) 

222 

223 # Make sure the token wasn't issued in the past. 

224 exp = payload["exp"] 

225 # Err on the side of accepting a token that is slightly out of date 

226 # to account for clow skew. 

227 latest = exp + clock_skew_in_seconds 

228 if latest < now: 

229 raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now)) 

230 

231 

232def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0): 

233 """Decode and verify a JWT. 

234 

235 Args: 

236 token (str): The encoded JWT. 

237 certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The 

238 certificate used to validate the JWT signature. If bytes or string, 

239 it must the the public key certificate in PEM format. If a mapping, 

240 it must be a mapping of key IDs to public key certificates in PEM 

241 format. The mapping must contain the same key ID that's specified 

242 in the token's header. 

243 verify (bool): Whether to perform signature and claim validation. 

244 Verification is done by default. 

245 audience (str or list): The audience claim, 'aud', that this JWT should 

246 contain. Or a list of audience claims. If None then the JWT's 'aud' 

247 parameter is not verified. 

248 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` 

249 validation. 

250 

251 Returns: 

252 Mapping[str, str]: The deserialized JSON payload in the JWT. 

253 

254 Raises: 

255 google.auth.exceptions.InvalidValue: if value validation failed. 

256 google.auth.exceptions.MalformedError: if schema validation failed. 

257 """ 

258 header, payload, signed_section, signature = _unverified_decode(token) 

259 

260 if not verify: 

261 return payload 

262 

263 # Pluck the key id and algorithm from the header and make sure we have 

264 # a verifier that can support it. 

265 key_alg = header.get("alg") 

266 key_id = header.get("kid") 

267 

268 try: 

269 verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg] 

270 except KeyError as exc: 

271 if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS: 

272 six.raise_from( 

273 exceptions.InvalidValue( 

274 "The key algorithm {} requires the cryptography package " 

275 "to be installed.".format(key_alg) 

276 ), 

277 exc, 

278 ) 

279 else: 

280 six.raise_from( 

281 exceptions.InvalidValue( 

282 "Unsupported signature algorithm {}".format(key_alg) 

283 ), 

284 exc, 

285 ) 

286 

287 # If certs is specified as a dictionary of key IDs to certificates, then 

288 # use the certificate identified by the key ID in the token header. 

289 if isinstance(certs, Mapping): 

290 if key_id: 

291 if key_id not in certs: 

292 raise exceptions.MalformedError( 

293 "Certificate for key id {} not found.".format(key_id) 

294 ) 

295 certs_to_check = [certs[key_id]] 

296 # If there's no key id in the header, check against all of the certs. 

297 else: 

298 certs_to_check = certs.values() 

299 else: 

300 certs_to_check = certs 

301 

302 # Verify that the signature matches the message. 

303 if not crypt.verify_signature( 

304 signed_section, signature, certs_to_check, verifier_cls 

305 ): 

306 raise exceptions.MalformedError("Could not verify token signature.") 

307 

308 # Verify the issued at and created times in the payload. 

309 _verify_iat_and_exp(payload, clock_skew_in_seconds) 

310 

311 # Check audience. 

312 if audience is not None: 

313 claim_audience = payload.get("aud") 

314 if isinstance(audience, str): 

315 audience = [audience] 

316 if claim_audience not in audience: 

317 raise exceptions.InvalidValue( 

318 "Token has wrong audience {}, expected one of {}".format( 

319 claim_audience, audience 

320 ) 

321 ) 

322 

323 return payload 

324 

325 

326class Credentials( 

327 google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject 

328): 

329 """Credentials that use a JWT as the bearer token. 

330 

331 These credentials require an "audience" claim. This claim identifies the 

332 intended recipient of the bearer token. 

333 

334 The constructor arguments determine the claims for the JWT that is 

335 sent with requests. Usually, you'll construct these credentials with 

336 one of the helper constructors as shown in the next section. 

337 

338 To create JWT credentials using a Google service account private key 

339 JSON file:: 

340 

341 audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher' 

342 credentials = jwt.Credentials.from_service_account_file( 

343 'service-account.json', 

344 audience=audience) 

345 

346 If you already have the service account file loaded and parsed:: 

347 

348 service_account_info = json.load(open('service_account.json')) 

349 credentials = jwt.Credentials.from_service_account_info( 

350 service_account_info, 

351 audience=audience) 

352 

353 Both helper methods pass on arguments to the constructor, so you can 

354 specify the JWT claims:: 

355 

356 credentials = jwt.Credentials.from_service_account_file( 

357 'service-account.json', 

358 audience=audience, 

359 additional_claims={'meta': 'data'}) 

360 

361 You can also construct the credentials directly if you have a 

362 :class:`~google.auth.crypt.Signer` instance:: 

363 

364 credentials = jwt.Credentials( 

365 signer, 

366 issuer='your-issuer', 

367 subject='your-subject', 

368 audience=audience) 

369 

370 The claims are considered immutable. If you want to modify the claims, 

371 you can easily create another instance using :meth:`with_claims`:: 

372 

373 new_audience = ( 

374 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber') 

375 new_credentials = credentials.with_claims(audience=new_audience) 

376 """ 

377 

378 def __init__( 

379 self, 

380 signer, 

381 issuer, 

382 subject, 

383 audience, 

384 additional_claims=None, 

385 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

386 quota_project_id=None, 

387 ): 

388 """ 

389 Args: 

390 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

391 issuer (str): The `iss` claim. 

392 subject (str): The `sub` claim. 

393 audience (str): the `aud` claim. The intended audience for the 

394 credentials. 

395 additional_claims (Mapping[str, str]): Any additional claims for 

396 the JWT payload. 

397 token_lifetime (int): The amount of time in seconds for 

398 which the token is valid. Defaults to 1 hour. 

399 quota_project_id (Optional[str]): The project ID used for quota 

400 and billing. 

401 """ 

402 super(Credentials, self).__init__() 

403 self._signer = signer 

404 self._issuer = issuer 

405 self._subject = subject 

406 self._audience = audience 

407 self._token_lifetime = token_lifetime 

408 self._quota_project_id = quota_project_id 

409 

410 if additional_claims is None: 

411 additional_claims = {} 

412 

413 self._additional_claims = additional_claims 

414 

415 @classmethod 

416 def _from_signer_and_info(cls, signer, info, **kwargs): 

417 """Creates a Credentials instance from a signer and service account 

418 info. 

419 

420 Args: 

421 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

422 info (Mapping[str, str]): The service account info. 

423 kwargs: Additional arguments to pass to the constructor. 

424 

425 Returns: 

426 google.auth.jwt.Credentials: The constructed credentials. 

427 

428 Raises: 

429 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

430 """ 

431 kwargs.setdefault("subject", info["client_email"]) 

432 kwargs.setdefault("issuer", info["client_email"]) 

433 return cls(signer, **kwargs) 

434 

435 @classmethod 

436 def from_service_account_info(cls, info, **kwargs): 

437 """Creates an Credentials instance from a dictionary. 

438 

439 Args: 

440 info (Mapping[str, str]): The service account info in Google 

441 format. 

442 kwargs: Additional arguments to pass to the constructor. 

443 

444 Returns: 

445 google.auth.jwt.Credentials: The constructed credentials. 

446 

447 Raises: 

448 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

449 """ 

450 signer = _service_account_info.from_dict(info, require=["client_email"]) 

451 return cls._from_signer_and_info(signer, info, **kwargs) 

452 

453 @classmethod 

454 def from_service_account_file(cls, filename, **kwargs): 

455 """Creates a Credentials instance from a service account .json file 

456 in Google format. 

457 

458 Args: 

459 filename (str): The path to the service account .json file. 

460 kwargs: Additional arguments to pass to the constructor. 

461 

462 Returns: 

463 google.auth.jwt.Credentials: The constructed credentials. 

464 """ 

465 info, signer = _service_account_info.from_filename( 

466 filename, require=["client_email"] 

467 ) 

468 return cls._from_signer_and_info(signer, info, **kwargs) 

469 

470 @classmethod 

471 def from_signing_credentials(cls, credentials, audience, **kwargs): 

472 """Creates a new :class:`google.auth.jwt.Credentials` instance from an 

473 existing :class:`google.auth.credentials.Signing` instance. 

474 

475 The new instance will use the same signer as the existing instance and 

476 will use the existing instance's signer email as the issuer and 

477 subject by default. 

478 

479 Example:: 

480 

481 svc_creds = service_account.Credentials.from_service_account_file( 

482 'service_account.json') 

483 audience = ( 

484 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher') 

485 jwt_creds = jwt.Credentials.from_signing_credentials( 

486 svc_creds, audience=audience) 

487 

488 Args: 

489 credentials (google.auth.credentials.Signing): The credentials to 

490 use to construct the new credentials. 

491 audience (str): the `aud` claim. The intended audience for the 

492 credentials. 

493 kwargs: Additional arguments to pass to the constructor. 

494 

495 Returns: 

496 google.auth.jwt.Credentials: A new Credentials instance. 

497 """ 

498 kwargs.setdefault("issuer", credentials.signer_email) 

499 kwargs.setdefault("subject", credentials.signer_email) 

500 return cls(credentials.signer, audience=audience, **kwargs) 

501 

502 def with_claims( 

503 self, issuer=None, subject=None, audience=None, additional_claims=None 

504 ): 

505 """Returns a copy of these credentials with modified claims. 

506 

507 Args: 

508 issuer (str): The `iss` claim. If unspecified the current issuer 

509 claim will be used. 

510 subject (str): The `sub` claim. If unspecified the current subject 

511 claim will be used. 

512 audience (str): the `aud` claim. If unspecified the current 

513 audience claim will be used. 

514 additional_claims (Mapping[str, str]): Any additional claims for 

515 the JWT payload. This will be merged with the current 

516 additional claims. 

517 

518 Returns: 

519 google.auth.jwt.Credentials: A new credentials instance. 

520 """ 

521 new_additional_claims = copy.deepcopy(self._additional_claims) 

522 new_additional_claims.update(additional_claims or {}) 

523 

524 return self.__class__( 

525 self._signer, 

526 issuer=issuer if issuer is not None else self._issuer, 

527 subject=subject if subject is not None else self._subject, 

528 audience=audience if audience is not None else self._audience, 

529 additional_claims=new_additional_claims, 

530 quota_project_id=self._quota_project_id, 

531 ) 

532 

533 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) 

534 def with_quota_project(self, quota_project_id): 

535 return self.__class__( 

536 self._signer, 

537 issuer=self._issuer, 

538 subject=self._subject, 

539 audience=self._audience, 

540 additional_claims=self._additional_claims, 

541 quota_project_id=quota_project_id, 

542 ) 

543 

544 def _make_jwt(self): 

545 """Make a signed JWT. 

546 

547 Returns: 

548 Tuple[bytes, datetime]: The encoded JWT and the expiration. 

549 """ 

550 now = _helpers.utcnow() 

551 lifetime = datetime.timedelta(seconds=self._token_lifetime) 

552 expiry = now + lifetime 

553 

554 payload = { 

555 "iss": self._issuer, 

556 "sub": self._subject, 

557 "iat": _helpers.datetime_to_secs(now), 

558 "exp": _helpers.datetime_to_secs(expiry), 

559 } 

560 if self._audience: 

561 payload["aud"] = self._audience 

562 

563 payload.update(self._additional_claims) 

564 

565 jwt = encode(self._signer, payload) 

566 

567 return jwt, expiry 

568 

569 def refresh(self, request): 

570 """Refreshes the access token. 

571 

572 Args: 

573 request (Any): Unused. 

574 """ 

575 # pylint: disable=unused-argument 

576 # (pylint doesn't correctly recognize overridden methods.) 

577 self.token, self.expiry = self._make_jwt() 

578 

579 @_helpers.copy_docstring(google.auth.credentials.Signing) 

580 def sign_bytes(self, message): 

581 return self._signer.sign(message) 

582 

583 @property # type: ignore 

584 @_helpers.copy_docstring(google.auth.credentials.Signing) 

585 def signer_email(self): 

586 return self._issuer 

587 

588 @property # type: ignore 

589 @_helpers.copy_docstring(google.auth.credentials.Signing) 

590 def signer(self): 

591 return self._signer 

592 

593 @property # type: ignore 

594 def additional_claims(self): 

595 """ Additional claims the JWT object was created with.""" 

596 return self._additional_claims 

597 

598 

599class OnDemandCredentials( 

600 google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject 

601): 

602 """On-demand JWT credentials. 

603 

604 Like :class:`Credentials`, this class uses a JWT as the bearer token for 

605 authentication. However, this class does not require the audience at 

606 construction time. Instead, it will generate a new token on-demand for 

607 each request using the request URI as the audience. It caches tokens 

608 so that multiple requests to the same URI do not incur the overhead 

609 of generating a new token every time. 

610 

611 This behavior is especially useful for `gRPC`_ clients. A gRPC service may 

612 have multiple audience and gRPC clients may not know all of the audiences 

613 required for accessing a particular service. With these credentials, 

614 no knowledge of the audiences is required ahead of time. 

615 

616 .. _grpc: http://www.grpc.io/ 

617 """ 

618 

619 def __init__( 

620 self, 

621 signer, 

622 issuer, 

623 subject, 

624 additional_claims=None, 

625 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

626 max_cache_size=_DEFAULT_MAX_CACHE_SIZE, 

627 quota_project_id=None, 

628 ): 

629 """ 

630 Args: 

631 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

632 issuer (str): The `iss` claim. 

633 subject (str): The `sub` claim. 

634 additional_claims (Mapping[str, str]): Any additional claims for 

635 the JWT payload. 

636 token_lifetime (int): The amount of time in seconds for 

637 which the token is valid. Defaults to 1 hour. 

638 max_cache_size (int): The maximum number of JWT tokens to keep in 

639 cache. Tokens are cached using :class:`cachetools.LRUCache`. 

640 quota_project_id (Optional[str]): The project ID used for quota 

641 and billing. 

642 

643 """ 

644 super(OnDemandCredentials, self).__init__() 

645 self._signer = signer 

646 self._issuer = issuer 

647 self._subject = subject 

648 self._token_lifetime = token_lifetime 

649 self._quota_project_id = quota_project_id 

650 

651 if additional_claims is None: 

652 additional_claims = {} 

653 

654 self._additional_claims = additional_claims 

655 self._cache = cachetools.LRUCache(maxsize=max_cache_size) 

656 

657 @classmethod 

658 def _from_signer_and_info(cls, signer, info, **kwargs): 

659 """Creates an OnDemandCredentials instance from a signer and service 

660 account info. 

661 

662 Args: 

663 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

664 info (Mapping[str, str]): The service account info. 

665 kwargs: Additional arguments to pass to the constructor. 

666 

667 Returns: 

668 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

669 

670 Raises: 

671 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

672 """ 

673 kwargs.setdefault("subject", info["client_email"]) 

674 kwargs.setdefault("issuer", info["client_email"]) 

675 return cls(signer, **kwargs) 

676 

677 @classmethod 

678 def from_service_account_info(cls, info, **kwargs): 

679 """Creates an OnDemandCredentials instance from a dictionary. 

680 

681 Args: 

682 info (Mapping[str, str]): The service account info in Google 

683 format. 

684 kwargs: Additional arguments to pass to the constructor. 

685 

686 Returns: 

687 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

688 

689 Raises: 

690 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

691 """ 

692 signer = _service_account_info.from_dict(info, require=["client_email"]) 

693 return cls._from_signer_and_info(signer, info, **kwargs) 

694 

695 @classmethod 

696 def from_service_account_file(cls, filename, **kwargs): 

697 """Creates an OnDemandCredentials instance from a service account .json 

698 file in Google format. 

699 

700 Args: 

701 filename (str): The path to the service account .json file. 

702 kwargs: Additional arguments to pass to the constructor. 

703 

704 Returns: 

705 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

706 """ 

707 info, signer = _service_account_info.from_filename( 

708 filename, require=["client_email"] 

709 ) 

710 return cls._from_signer_and_info(signer, info, **kwargs) 

711 

712 @classmethod 

713 def from_signing_credentials(cls, credentials, **kwargs): 

714 """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance 

715 from an existing :class:`google.auth.credentials.Signing` instance. 

716 

717 The new instance will use the same signer as the existing instance and 

718 will use the existing instance's signer email as the issuer and 

719 subject by default. 

720 

721 Example:: 

722 

723 svc_creds = service_account.Credentials.from_service_account_file( 

724 'service_account.json') 

725 jwt_creds = jwt.OnDemandCredentials.from_signing_credentials( 

726 svc_creds) 

727 

728 Args: 

729 credentials (google.auth.credentials.Signing): The credentials to 

730 use to construct the new credentials. 

731 kwargs: Additional arguments to pass to the constructor. 

732 

733 Returns: 

734 google.auth.jwt.Credentials: A new Credentials instance. 

735 """ 

736 kwargs.setdefault("issuer", credentials.signer_email) 

737 kwargs.setdefault("subject", credentials.signer_email) 

738 return cls(credentials.signer, **kwargs) 

739 

740 def with_claims(self, issuer=None, subject=None, additional_claims=None): 

741 """Returns a copy of these credentials with modified claims. 

742 

743 Args: 

744 issuer (str): The `iss` claim. If unspecified the current issuer 

745 claim will be used. 

746 subject (str): The `sub` claim. If unspecified the current subject 

747 claim will be used. 

748 additional_claims (Mapping[str, str]): Any additional claims for 

749 the JWT payload. This will be merged with the current 

750 additional claims. 

751 

752 Returns: 

753 google.auth.jwt.OnDemandCredentials: A new credentials instance. 

754 """ 

755 new_additional_claims = copy.deepcopy(self._additional_claims) 

756 new_additional_claims.update(additional_claims or {}) 

757 

758 return self.__class__( 

759 self._signer, 

760 issuer=issuer if issuer is not None else self._issuer, 

761 subject=subject if subject is not None else self._subject, 

762 additional_claims=new_additional_claims, 

763 max_cache_size=self._cache.maxsize, 

764 quota_project_id=self._quota_project_id, 

765 ) 

766 

767 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) 

768 def with_quota_project(self, quota_project_id): 

769 

770 return self.__class__( 

771 self._signer, 

772 issuer=self._issuer, 

773 subject=self._subject, 

774 additional_claims=self._additional_claims, 

775 max_cache_size=self._cache.maxsize, 

776 quota_project_id=quota_project_id, 

777 ) 

778 

779 @property 

780 def valid(self): 

781 """Checks the validity of the credentials. 

782 

783 These credentials are always valid because it generates tokens on 

784 demand. 

785 """ 

786 return True 

787 

788 def _make_jwt_for_audience(self, audience): 

789 """Make a new JWT for the given audience. 

790 

791 Args: 

792 audience (str): The intended audience. 

793 

794 Returns: 

795 Tuple[bytes, datetime]: The encoded JWT and the expiration. 

796 """ 

797 now = _helpers.utcnow() 

798 lifetime = datetime.timedelta(seconds=self._token_lifetime) 

799 expiry = now + lifetime 

800 

801 payload = { 

802 "iss": self._issuer, 

803 "sub": self._subject, 

804 "iat": _helpers.datetime_to_secs(now), 

805 "exp": _helpers.datetime_to_secs(expiry), 

806 "aud": audience, 

807 } 

808 

809 payload.update(self._additional_claims) 

810 

811 jwt = encode(self._signer, payload) 

812 

813 return jwt, expiry 

814 

815 def _get_jwt_for_audience(self, audience): 

816 """Get a JWT For a given audience. 

817 

818 If there is already an existing, non-expired token in the cache for 

819 the audience, that token is used. Otherwise, a new token will be 

820 created. 

821 

822 Args: 

823 audience (str): The intended audience. 

824 

825 Returns: 

826 bytes: The encoded JWT. 

827 """ 

828 token, expiry = self._cache.get(audience, (None, None)) 

829 

830 if token is None or expiry < _helpers.utcnow(): 

831 token, expiry = self._make_jwt_for_audience(audience) 

832 self._cache[audience] = token, expiry 

833 

834 return token 

835 

836 def refresh(self, request): 

837 """Raises an exception, these credentials can not be directly 

838 refreshed. 

839 

840 Args: 

841 request (Any): Unused. 

842 

843 Raises: 

844 google.auth.RefreshError 

845 """ 

846 # pylint: disable=unused-argument 

847 # (pylint doesn't correctly recognize overridden methods.) 

848 raise exceptions.RefreshError( 

849 "OnDemandCredentials can not be directly refreshed." 

850 ) 

851 

852 def before_request(self, request, method, url, headers): 

853 """Performs credential-specific before request logic. 

854 

855 Args: 

856 request (Any): Unused. JWT credentials do not need to make an 

857 HTTP request to refresh. 

858 method (str): The request's HTTP method. 

859 url (str): The request's URI. This is used as the audience claim 

860 when generating the JWT. 

861 headers (Mapping): The request's headers. 

862 """ 

863 # pylint: disable=unused-argument 

864 # (pylint doesn't correctly recognize overridden methods.) 

865 parts = urllib.parse.urlsplit(url) 

866 # Strip query string and fragment 

867 audience = urllib.parse.urlunsplit( 

868 (parts.scheme, parts.netloc, parts.path, "", "") 

869 ) 

870 token = self._get_jwt_for_audience(audience) 

871 self.apply(headers, token=token) 

872 

873 @_helpers.copy_docstring(google.auth.credentials.Signing) 

874 def sign_bytes(self, message): 

875 return self._signer.sign(message) 

876 

877 @property # type: ignore 

878 @_helpers.copy_docstring(google.auth.credentials.Signing) 

879 def signer_email(self): 

880 return self._issuer 

881 

882 @property # type: ignore 

883 @_helpers.copy_docstring(google.auth.credentials.Signing) 

884 def signer(self): 

885 return self._signer