Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/jwt.py: 32%

235 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""JSON Web Tokens 

16 

17Provides support for creating (encoding) and verifying (decoding) JWTs, 

18especially JWTs generated and consumed by Google infrastructure. 

19 

20See `rfc7519`_ for more details on JWTs. 

21 

22To encode a JWT use :func:`encode`:: 

23 

24 from google.auth import crypt 

25 from google.auth import jwt 

26 

27 signer = crypt.Signer(private_key) 

28 payload = {'some': 'payload'} 

29 encoded = jwt.encode(signer, payload) 

30 

31To decode a JWT and verify claims use :func:`decode`:: 

32 

33 claims = jwt.decode(encoded, certs=public_certs) 

34 

35You can also skip verification:: 

36 

37 claims = jwt.decode(encoded, verify=False) 

38 

39.. _rfc7519: https://tools.ietf.org/html/rfc7519 

40 

41""" 

42 

43try: 

44 from collections.abc import Mapping 

45# Python 2.7 compatibility 

46except ImportError: # pragma: NO COVER 

47 from collections import Mapping # type: ignore 

48import copy 

49import datetime 

50import json 

51 

52import cachetools 

53import six 

54from six.moves import urllib 

55 

56from google.auth import _helpers 

57from google.auth import _service_account_info 

58from google.auth import crypt 

59from google.auth import exceptions 

60import google.auth.credentials 

61 

62try: 

63 from google.auth.crypt import es256 

64except ImportError: # pragma: NO COVER 

65 es256 = None # type: ignore 

66 

67_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

68_DEFAULT_MAX_CACHE_SIZE = 10 

69_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier} 

70_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(["ES256"]) 

71 

72if es256 is not None: # pragma: NO COVER 

73 _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es256.ES256Verifier # type: ignore 

74 

75 

76def encode(signer, payload, header=None, key_id=None): 

77 """Make a signed JWT. 

78 

79 Args: 

80 signer (google.auth.crypt.Signer): The signer used to sign the JWT. 

81 payload (Mapping[str, str]): The JWT payload. 

82 header (Mapping[str, str]): Additional JWT header payload. 

83 key_id (str): The key id to add to the JWT header. If the 

84 signer has a key id it will be used as the default. If this is 

85 specified it will override the signer's key id. 

86 

87 Returns: 

88 bytes: The encoded JWT. 

89 """ 

90 if header is None: 

91 header = {} 

92 

93 if key_id is None: 

94 key_id = signer.key_id 

95 

96 header.update({"typ": "JWT"}) 

97 

98 if "alg" not in header: 

99 if es256 is not None and isinstance(signer, es256.ES256Signer): 

100 header.update({"alg": "ES256"}) 

101 else: 

102 header.update({"alg": "RS256"}) 

103 

104 if key_id is not None: 

105 header["kid"] = key_id 

106 

107 segments = [ 

108 _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")), 

109 _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")), 

110 ] 

111 

112 signing_input = b".".join(segments) 

113 signature = signer.sign(signing_input) 

114 segments.append(_helpers.unpadded_urlsafe_b64encode(signature)) 

115 

116 return b".".join(segments) 

117 

118 

119def _decode_jwt_segment(encoded_section): 

120 """Decodes a single JWT segment.""" 

121 section_bytes = _helpers.padded_urlsafe_b64decode(encoded_section) 

122 try: 

123 return json.loads(section_bytes.decode("utf-8")) 

124 except ValueError as caught_exc: 

125 new_exc = exceptions.MalformedError( 

126 "Can't parse segment: {0}".format(section_bytes) 

127 ) 

128 six.raise_from(new_exc, caught_exc) 

129 

130 

131def _unverified_decode(token): 

132 """Decodes a token and does no verification. 

133 

134 Args: 

135 token (Union[str, bytes]): The encoded JWT. 

136 

137 Returns: 

138 Tuple[Mapping, Mapping, str, str]: header, payload, signed_section, and 

139 signature. 

140 

141 Raises: 

142 google.auth.exceptions.MalformedError: if there are an incorrect amount of segments in the token or segments of the wrong type. 

143 """ 

144 token = _helpers.to_bytes(token) 

145 

146 if token.count(b".") != 2: 

147 raise exceptions.MalformedError( 

148 "Wrong number of segments in token: {0}".format(token) 

149 ) 

150 

151 encoded_header, encoded_payload, signature = token.split(b".") 

152 signed_section = encoded_header + b"." + encoded_payload 

153 signature = _helpers.padded_urlsafe_b64decode(signature) 

154 

155 # Parse segments 

156 header = _decode_jwt_segment(encoded_header) 

157 payload = _decode_jwt_segment(encoded_payload) 

158 

159 if not isinstance(header, Mapping): 

160 raise exceptions.MalformedError( 

161 "Header segment should be a JSON object: {0}".format(encoded_header) 

162 ) 

163 

164 if not isinstance(payload, Mapping): 

165 raise exceptions.MalformedError( 

166 "Payload segment should be a JSON object: {0}".format(encoded_payload) 

167 ) 

168 

169 return header, payload, signed_section, signature 

170 

171 

172def decode_header(token): 

173 """Return the decoded header of a token. 

174 

175 No verification is done. This is useful to extract the key id from 

176 the header in order to acquire the appropriate certificate to verify 

177 the token. 

178 

179 Args: 

180 token (Union[str, bytes]): the encoded JWT. 

181 

182 Returns: 

183 Mapping: The decoded JWT header. 

184 """ 

185 header, _, _, _ = _unverified_decode(token) 

186 return header 

187 

188 

189def _verify_iat_and_exp(payload, clock_skew_in_seconds=0): 

190 """Verifies the ``iat`` (Issued At) and ``exp`` (Expires) claims in a token 

191 payload. 

192 

193 Args: 

194 payload (Mapping[str, str]): The JWT payload. 

195 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` 

196 validation. 

197 

198 Raises: 

199 google.auth.exceptions.InvalidValue: if value validation failed. 

200 google.auth.exceptions.MalformedError: if schema validation failed. 

201 """ 

202 now = _helpers.datetime_to_secs(_helpers.utcnow()) 

203 

204 # Make sure the iat and exp claims are present. 

205 for key in ("iat", "exp"): 

206 if key not in payload: 

207 raise exceptions.MalformedError( 

208 "Token does not contain required claim {}".format(key) 

209 ) 

210 

211 # Make sure the token wasn't issued in the future. 

212 iat = payload["iat"] 

213 # Err on the side of accepting a token that is slightly early to account 

214 # for clock skew. 

215 earliest = iat - clock_skew_in_seconds 

216 if now < earliest: 

217 raise exceptions.InvalidValue( 

218 "Token used too early, {} < {}. Check that your computer's clock is set correctly.".format( 

219 now, iat 

220 ) 

221 ) 

222 

223 # Make sure the token wasn't issued in the past. 

224 exp = payload["exp"] 

225 # Err on the side of accepting a token that is slightly out of date 

226 # to account for clow skew. 

227 latest = exp + clock_skew_in_seconds 

228 if latest < now: 

229 raise exceptions.InvalidValue("Token expired, {} < {}".format(latest, now)) 

230 

231 

232def decode(token, certs=None, verify=True, audience=None, clock_skew_in_seconds=0): 

233 """Decode and verify a JWT. 

234 

235 Args: 

236 token (str): The encoded JWT. 

237 certs (Union[str, bytes, Mapping[str, Union[str, bytes]]]): The 

238 certificate used to validate the JWT signature. If bytes or string, 

239 it must the the public key certificate in PEM format. If a mapping, 

240 it must be a mapping of key IDs to public key certificates in PEM 

241 format. The mapping must contain the same key ID that's specified 

242 in the token's header. 

243 verify (bool): Whether to perform signature and claim validation. 

244 Verification is done by default. 

245 audience (str or list): The audience claim, 'aud', that this JWT should 

246 contain. Or a list of audience claims. If None then the JWT's 'aud' 

247 parameter is not verified. 

248 clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` 

249 validation. 

250 

251 Returns: 

252 Mapping[str, str]: The deserialized JSON payload in the JWT. 

253 

254 Raises: 

255 google.auth.exceptions.InvalidValue: if value validation failed. 

256 google.auth.exceptions.MalformedError: if schema validation failed. 

257 """ 

258 header, payload, signed_section, signature = _unverified_decode(token) 

259 

260 if not verify: 

261 return payload 

262 

263 # Pluck the key id and algorithm from the header and make sure we have 

264 # a verifier that can support it. 

265 key_alg = header.get("alg") 

266 key_id = header.get("kid") 

267 

268 try: 

269 verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg] 

270 except KeyError as exc: 

271 if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS: 

272 six.raise_from( 

273 exceptions.InvalidValue( 

274 "The key algorithm {} requires the cryptography package " 

275 "to be installed.".format(key_alg) 

276 ), 

277 exc, 

278 ) 

279 else: 

280 six.raise_from( 

281 exceptions.InvalidValue( 

282 "Unsupported signature algorithm {}".format(key_alg) 

283 ), 

284 exc, 

285 ) 

286 

287 # If certs is specified as a dictionary of key IDs to certificates, then 

288 # use the certificate identified by the key ID in the token header. 

289 if isinstance(certs, Mapping): 

290 if key_id: 

291 if key_id not in certs: 

292 raise exceptions.MalformedError( 

293 "Certificate for key id {} not found.".format(key_id) 

294 ) 

295 certs_to_check = [certs[key_id]] 

296 # If there's no key id in the header, check against all of the certs. 

297 else: 

298 certs_to_check = certs.values() 

299 else: 

300 certs_to_check = certs 

301 

302 # Verify that the signature matches the message. 

303 if not crypt.verify_signature( 

304 signed_section, signature, certs_to_check, verifier_cls 

305 ): 

306 raise exceptions.MalformedError("Could not verify token signature.") 

307 

308 # Verify the issued at and created times in the payload. 

309 _verify_iat_and_exp(payload, clock_skew_in_seconds) 

310 

311 # Check audience. 

312 if audience is not None: 

313 claim_audience = payload.get("aud") 

314 if isinstance(audience, str): 

315 audience = [audience] 

316 if claim_audience not in audience: 

317 raise exceptions.InvalidValue( 

318 "Token has wrong audience {}, expected one of {}".format( 

319 claim_audience, audience 

320 ) 

321 ) 

322 

323 return payload 

324 

325 

326class Credentials( 

327 google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject 

328): 

329 """Credentials that use a JWT as the bearer token. 

330 

331 These credentials require an "audience" claim. This claim identifies the 

332 intended recipient of the bearer token. 

333 

334 The constructor arguments determine the claims for the JWT that is 

335 sent with requests. Usually, you'll construct these credentials with 

336 one of the helper constructors as shown in the next section. 

337 

338 To create JWT credentials using a Google service account private key 

339 JSON file:: 

340 

341 audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher' 

342 credentials = jwt.Credentials.from_service_account_file( 

343 'service-account.json', 

344 audience=audience) 

345 

346 If you already have the service account file loaded and parsed:: 

347 

348 service_account_info = json.load(open('service_account.json')) 

349 credentials = jwt.Credentials.from_service_account_info( 

350 service_account_info, 

351 audience=audience) 

352 

353 Both helper methods pass on arguments to the constructor, so you can 

354 specify the JWT claims:: 

355 

356 credentials = jwt.Credentials.from_service_account_file( 

357 'service-account.json', 

358 audience=audience, 

359 additional_claims={'meta': 'data'}) 

360 

361 You can also construct the credentials directly if you have a 

362 :class:`~google.auth.crypt.Signer` instance:: 

363 

364 credentials = jwt.Credentials( 

365 signer, 

366 issuer='your-issuer', 

367 subject='your-subject', 

368 audience=audience) 

369 

370 The claims are considered immutable. If you want to modify the claims, 

371 you can easily create another instance using :meth:`with_claims`:: 

372 

373 new_audience = ( 

374 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber') 

375 new_credentials = credentials.with_claims(audience=new_audience) 

376 """ 

377 

378 def __init__( 

379 self, 

380 signer, 

381 issuer, 

382 subject, 

383 audience, 

384 additional_claims=None, 

385 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

386 quota_project_id=None, 

387 ): 

388 """ 

389 Args: 

390 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

391 issuer (str): The `iss` claim. 

392 subject (str): The `sub` claim. 

393 audience (str): the `aud` claim. The intended audience for the 

394 credentials. 

395 additional_claims (Mapping[str, str]): Any additional claims for 

396 the JWT payload. 

397 token_lifetime (int): The amount of time in seconds for 

398 which the token is valid. Defaults to 1 hour. 

399 quota_project_id (Optional[str]): The project ID used for quota 

400 and billing. 

401 """ 

402 super(Credentials, self).__init__() 

403 self._signer = signer 

404 self._issuer = issuer 

405 self._subject = subject 

406 self._audience = audience 

407 self._token_lifetime = token_lifetime 

408 self._quota_project_id = quota_project_id 

409 

410 if additional_claims is None: 

411 additional_claims = {} 

412 

413 self._additional_claims = additional_claims 

414 

415 @classmethod 

416 def _from_signer_and_info(cls, signer, info, **kwargs): 

417 """Creates a Credentials instance from a signer and service account 

418 info. 

419 

420 Args: 

421 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

422 info (Mapping[str, str]): The service account info. 

423 kwargs: Additional arguments to pass to the constructor. 

424 

425 Returns: 

426 google.auth.jwt.Credentials: The constructed credentials. 

427 

428 Raises: 

429 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

430 """ 

431 kwargs.setdefault("subject", info["client_email"]) 

432 kwargs.setdefault("issuer", info["client_email"]) 

433 return cls(signer, **kwargs) 

434 

435 @classmethod 

436 def from_service_account_info(cls, info, **kwargs): 

437 """Creates an Credentials instance from a dictionary. 

438 

439 Args: 

440 info (Mapping[str, str]): The service account info in Google 

441 format. 

442 kwargs: Additional arguments to pass to the constructor. 

443 

444 Returns: 

445 google.auth.jwt.Credentials: The constructed credentials. 

446 

447 Raises: 

448 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

449 """ 

450 signer = _service_account_info.from_dict(info, require=["client_email"]) 

451 return cls._from_signer_and_info(signer, info, **kwargs) 

452 

453 @classmethod 

454 def from_service_account_file(cls, filename, **kwargs): 

455 """Creates a Credentials instance from a service account .json file 

456 in Google format. 

457 

458 Args: 

459 filename (str): The path to the service account .json file. 

460 kwargs: Additional arguments to pass to the constructor. 

461 

462 Returns: 

463 google.auth.jwt.Credentials: The constructed credentials. 

464 """ 

465 info, signer = _service_account_info.from_filename( 

466 filename, require=["client_email"] 

467 ) 

468 return cls._from_signer_and_info(signer, info, **kwargs) 

469 

470 @classmethod 

471 def from_signing_credentials(cls, credentials, audience, **kwargs): 

472 """Creates a new :class:`google.auth.jwt.Credentials` instance from an 

473 existing :class:`google.auth.credentials.Signing` instance. 

474 

475 The new instance will use the same signer as the existing instance and 

476 will use the existing instance's signer email as the issuer and 

477 subject by default. 

478 

479 Example:: 

480 

481 svc_creds = service_account.Credentials.from_service_account_file( 

482 'service_account.json') 

483 audience = ( 

484 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher') 

485 jwt_creds = jwt.Credentials.from_signing_credentials( 

486 svc_creds, audience=audience) 

487 

488 Args: 

489 credentials (google.auth.credentials.Signing): The credentials to 

490 use to construct the new credentials. 

491 audience (str): the `aud` claim. The intended audience for the 

492 credentials. 

493 kwargs: Additional arguments to pass to the constructor. 

494 

495 Returns: 

496 google.auth.jwt.Credentials: A new Credentials instance. 

497 """ 

498 kwargs.setdefault("issuer", credentials.signer_email) 

499 kwargs.setdefault("subject", credentials.signer_email) 

500 return cls(credentials.signer, audience=audience, **kwargs) 

501 

502 def with_claims( 

503 self, issuer=None, subject=None, audience=None, additional_claims=None 

504 ): 

505 """Returns a copy of these credentials with modified claims. 

506 

507 Args: 

508 issuer (str): The `iss` claim. If unspecified the current issuer 

509 claim will be used. 

510 subject (str): The `sub` claim. If unspecified the current subject 

511 claim will be used. 

512 audience (str): the `aud` claim. If unspecified the current 

513 audience claim will be used. 

514 additional_claims (Mapping[str, str]): Any additional claims for 

515 the JWT payload. This will be merged with the current 

516 additional claims. 

517 

518 Returns: 

519 google.auth.jwt.Credentials: A new credentials instance. 

520 """ 

521 new_additional_claims = copy.deepcopy(self._additional_claims) 

522 new_additional_claims.update(additional_claims or {}) 

523 

524 return self.__class__( 

525 self._signer, 

526 issuer=issuer if issuer is not None else self._issuer, 

527 subject=subject if subject is not None else self._subject, 

528 audience=audience if audience is not None else self._audience, 

529 additional_claims=new_additional_claims, 

530 quota_project_id=self._quota_project_id, 

531 ) 

532 

533 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) 

534 def with_quota_project(self, quota_project_id): 

535 return self.__class__( 

536 self._signer, 

537 issuer=self._issuer, 

538 subject=self._subject, 

539 audience=self._audience, 

540 additional_claims=self._additional_claims, 

541 quota_project_id=quota_project_id, 

542 ) 

543 

544 def _make_jwt(self): 

545 """Make a signed JWT. 

546 

547 Returns: 

548 Tuple[bytes, datetime]: The encoded JWT and the expiration. 

549 """ 

550 now = _helpers.utcnow() 

551 lifetime = datetime.timedelta(seconds=self._token_lifetime) 

552 expiry = now + lifetime 

553 

554 payload = { 

555 "iss": self._issuer, 

556 "sub": self._subject, 

557 "iat": _helpers.datetime_to_secs(now), 

558 "exp": _helpers.datetime_to_secs(expiry), 

559 } 

560 if self._audience: 

561 payload["aud"] = self._audience 

562 

563 payload.update(self._additional_claims) 

564 

565 jwt = encode(self._signer, payload) 

566 

567 return jwt, expiry 

568 

569 def refresh(self, request): 

570 """Refreshes the access token. 

571 

572 Args: 

573 request (Any): Unused. 

574 """ 

575 # pylint: disable=unused-argument 

576 # (pylint doesn't correctly recognize overridden methods.) 

577 self.token, self.expiry = self._make_jwt() 

578 

579 @_helpers.copy_docstring(google.auth.credentials.Signing) 

580 def sign_bytes(self, message): 

581 return self._signer.sign(message) 

582 

583 @property # type: ignore 

584 @_helpers.copy_docstring(google.auth.credentials.Signing) 

585 def signer_email(self): 

586 return self._issuer 

587 

588 @property # type: ignore 

589 @_helpers.copy_docstring(google.auth.credentials.Signing) 

590 def signer(self): 

591 return self._signer 

592 

593 

594class OnDemandCredentials( 

595 google.auth.credentials.Signing, google.auth.credentials.CredentialsWithQuotaProject 

596): 

597 """On-demand JWT credentials. 

598 

599 Like :class:`Credentials`, this class uses a JWT as the bearer token for 

600 authentication. However, this class does not require the audience at 

601 construction time. Instead, it will generate a new token on-demand for 

602 each request using the request URI as the audience. It caches tokens 

603 so that multiple requests to the same URI do not incur the overhead 

604 of generating a new token every time. 

605 

606 This behavior is especially useful for `gRPC`_ clients. A gRPC service may 

607 have multiple audience and gRPC clients may not know all of the audiences 

608 required for accessing a particular service. With these credentials, 

609 no knowledge of the audiences is required ahead of time. 

610 

611 .. _grpc: http://www.grpc.io/ 

612 """ 

613 

614 def __init__( 

615 self, 

616 signer, 

617 issuer, 

618 subject, 

619 additional_claims=None, 

620 token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

621 max_cache_size=_DEFAULT_MAX_CACHE_SIZE, 

622 quota_project_id=None, 

623 ): 

624 """ 

625 Args: 

626 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

627 issuer (str): The `iss` claim. 

628 subject (str): The `sub` claim. 

629 additional_claims (Mapping[str, str]): Any additional claims for 

630 the JWT payload. 

631 token_lifetime (int): The amount of time in seconds for 

632 which the token is valid. Defaults to 1 hour. 

633 max_cache_size (int): The maximum number of JWT tokens to keep in 

634 cache. Tokens are cached using :class:`cachetools.LRUCache`. 

635 quota_project_id (Optional[str]): The project ID used for quota 

636 and billing. 

637 

638 """ 

639 super(OnDemandCredentials, self).__init__() 

640 self._signer = signer 

641 self._issuer = issuer 

642 self._subject = subject 

643 self._token_lifetime = token_lifetime 

644 self._quota_project_id = quota_project_id 

645 

646 if additional_claims is None: 

647 additional_claims = {} 

648 

649 self._additional_claims = additional_claims 

650 self._cache = cachetools.LRUCache(maxsize=max_cache_size) 

651 

652 @classmethod 

653 def _from_signer_and_info(cls, signer, info, **kwargs): 

654 """Creates an OnDemandCredentials instance from a signer and service 

655 account info. 

656 

657 Args: 

658 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

659 info (Mapping[str, str]): The service account info. 

660 kwargs: Additional arguments to pass to the constructor. 

661 

662 Returns: 

663 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

664 

665 Raises: 

666 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

667 """ 

668 kwargs.setdefault("subject", info["client_email"]) 

669 kwargs.setdefault("issuer", info["client_email"]) 

670 return cls(signer, **kwargs) 

671 

672 @classmethod 

673 def from_service_account_info(cls, info, **kwargs): 

674 """Creates an OnDemandCredentials instance from a dictionary. 

675 

676 Args: 

677 info (Mapping[str, str]): The service account info in Google 

678 format. 

679 kwargs: Additional arguments to pass to the constructor. 

680 

681 Returns: 

682 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

683 

684 Raises: 

685 google.auth.exceptions.MalformedError: If the info is not in the expected format. 

686 """ 

687 signer = _service_account_info.from_dict(info, require=["client_email"]) 

688 return cls._from_signer_and_info(signer, info, **kwargs) 

689 

690 @classmethod 

691 def from_service_account_file(cls, filename, **kwargs): 

692 """Creates an OnDemandCredentials instance from a service account .json 

693 file in Google format. 

694 

695 Args: 

696 filename (str): The path to the service account .json file. 

697 kwargs: Additional arguments to pass to the constructor. 

698 

699 Returns: 

700 google.auth.jwt.OnDemandCredentials: The constructed credentials. 

701 """ 

702 info, signer = _service_account_info.from_filename( 

703 filename, require=["client_email"] 

704 ) 

705 return cls._from_signer_and_info(signer, info, **kwargs) 

706 

707 @classmethod 

708 def from_signing_credentials(cls, credentials, **kwargs): 

709 """Creates a new :class:`google.auth.jwt.OnDemandCredentials` instance 

710 from an existing :class:`google.auth.credentials.Signing` instance. 

711 

712 The new instance will use the same signer as the existing instance and 

713 will use the existing instance's signer email as the issuer and 

714 subject by default. 

715 

716 Example:: 

717 

718 svc_creds = service_account.Credentials.from_service_account_file( 

719 'service_account.json') 

720 jwt_creds = jwt.OnDemandCredentials.from_signing_credentials( 

721 svc_creds) 

722 

723 Args: 

724 credentials (google.auth.credentials.Signing): The credentials to 

725 use to construct the new credentials. 

726 kwargs: Additional arguments to pass to the constructor. 

727 

728 Returns: 

729 google.auth.jwt.Credentials: A new Credentials instance. 

730 """ 

731 kwargs.setdefault("issuer", credentials.signer_email) 

732 kwargs.setdefault("subject", credentials.signer_email) 

733 return cls(credentials.signer, **kwargs) 

734 

735 def with_claims(self, issuer=None, subject=None, additional_claims=None): 

736 """Returns a copy of these credentials with modified claims. 

737 

738 Args: 

739 issuer (str): The `iss` claim. If unspecified the current issuer 

740 claim will be used. 

741 subject (str): The `sub` claim. If unspecified the current subject 

742 claim will be used. 

743 additional_claims (Mapping[str, str]): Any additional claims for 

744 the JWT payload. This will be merged with the current 

745 additional claims. 

746 

747 Returns: 

748 google.auth.jwt.OnDemandCredentials: A new credentials instance. 

749 """ 

750 new_additional_claims = copy.deepcopy(self._additional_claims) 

751 new_additional_claims.update(additional_claims or {}) 

752 

753 return self.__class__( 

754 self._signer, 

755 issuer=issuer if issuer is not None else self._issuer, 

756 subject=subject if subject is not None else self._subject, 

757 additional_claims=new_additional_claims, 

758 max_cache_size=self._cache.maxsize, 

759 quota_project_id=self._quota_project_id, 

760 ) 

761 

762 @_helpers.copy_docstring(google.auth.credentials.CredentialsWithQuotaProject) 

763 def with_quota_project(self, quota_project_id): 

764 

765 return self.__class__( 

766 self._signer, 

767 issuer=self._issuer, 

768 subject=self._subject, 

769 additional_claims=self._additional_claims, 

770 max_cache_size=self._cache.maxsize, 

771 quota_project_id=quota_project_id, 

772 ) 

773 

774 @property 

775 def valid(self): 

776 """Checks the validity of the credentials. 

777 

778 These credentials are always valid because it generates tokens on 

779 demand. 

780 """ 

781 return True 

782 

783 def _make_jwt_for_audience(self, audience): 

784 """Make a new JWT for the given audience. 

785 

786 Args: 

787 audience (str): The intended audience. 

788 

789 Returns: 

790 Tuple[bytes, datetime]: The encoded JWT and the expiration. 

791 """ 

792 now = _helpers.utcnow() 

793 lifetime = datetime.timedelta(seconds=self._token_lifetime) 

794 expiry = now + lifetime 

795 

796 payload = { 

797 "iss": self._issuer, 

798 "sub": self._subject, 

799 "iat": _helpers.datetime_to_secs(now), 

800 "exp": _helpers.datetime_to_secs(expiry), 

801 "aud": audience, 

802 } 

803 

804 payload.update(self._additional_claims) 

805 

806 jwt = encode(self._signer, payload) 

807 

808 return jwt, expiry 

809 

810 def _get_jwt_for_audience(self, audience): 

811 """Get a JWT For a given audience. 

812 

813 If there is already an existing, non-expired token in the cache for 

814 the audience, that token is used. Otherwise, a new token will be 

815 created. 

816 

817 Args: 

818 audience (str): The intended audience. 

819 

820 Returns: 

821 bytes: The encoded JWT. 

822 """ 

823 token, expiry = self._cache.get(audience, (None, None)) 

824 

825 if token is None or expiry < _helpers.utcnow(): 

826 token, expiry = self._make_jwt_for_audience(audience) 

827 self._cache[audience] = token, expiry 

828 

829 return token 

830 

831 def refresh(self, request): 

832 """Raises an exception, these credentials can not be directly 

833 refreshed. 

834 

835 Args: 

836 request (Any): Unused. 

837 

838 Raises: 

839 google.auth.RefreshError 

840 """ 

841 # pylint: disable=unused-argument 

842 # (pylint doesn't correctly recognize overridden methods.) 

843 raise exceptions.RefreshError( 

844 "OnDemandCredentials can not be directly refreshed." 

845 ) 

846 

847 def before_request(self, request, method, url, headers): 

848 """Performs credential-specific before request logic. 

849 

850 Args: 

851 request (Any): Unused. JWT credentials do not need to make an 

852 HTTP request to refresh. 

853 method (str): The request's HTTP method. 

854 url (str): The request's URI. This is used as the audience claim 

855 when generating the JWT. 

856 headers (Mapping): The request's headers. 

857 """ 

858 # pylint: disable=unused-argument 

859 # (pylint doesn't correctly recognize overridden methods.) 

860 parts = urllib.parse.urlsplit(url) 

861 # Strip query string and fragment 

862 audience = urllib.parse.urlunsplit( 

863 (parts.scheme, parts.netloc, parts.path, "", "") 

864 ) 

865 token = self._get_jwt_for_audience(audience) 

866 self.apply(headers, token=token) 

867 

868 @_helpers.copy_docstring(google.auth.credentials.Signing) 

869 def sign_bytes(self, message): 

870 return self._signer.sign(message) 

871 

872 @property # type: ignore 

873 @_helpers.copy_docstring(google.auth.credentials.Signing) 

874 def signer_email(self): 

875 return self._issuer 

876 

877 @property # type: ignore 

878 @_helpers.copy_docstring(google.auth.credentials.Signing) 

879 def signer(self): 

880 return self._signer