Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/credentials.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

201 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16"""Interfaces for credentials.""" 

17 

18import abc 

19from enum import Enum 

20import os 

21 

22from google.auth import _helpers, environment_vars 

23from google.auth import exceptions 

24from google.auth import metrics 

25from google.auth._credentials_base import _BaseCredentials 

26from google.auth._default import _LOGGER 

27from google.auth._refresh_worker import RefreshThreadManager 

28 

# Universe domain that Credentials.__init__ assigns when none is configured.
DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
# Empty "locations" list; presumably the locations value of a no-op trust
# boundary (not referenced by the code visible in this module -- confirm).
NO_OP_TRUST_BOUNDARY_LOCATIONS: list[str] = []
# "encodedLocations" value that identifies a trust boundary as a no-op.
NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"

32 

33 

class Credentials(_BaseCredentials):
    """Base class for all credentials.

    All credentials have a :attr:`token` that is used for authentication and
    may also optionally set an :attr:`expiry` to indicate when the token will
    no longer be valid.

    Most credentials are not :attr:`valid` until :meth:`refresh` is called.
    Credentials can do this automatically before the first HTTP request in
    :meth:`before_request`.

    Although the token and expiration will change as the credentials are
    :meth:`refreshed <refresh>` and used, credentials should be considered
    immutable. Various credentials will accept configuration such as private
    keys, scopes, and other options. These options are not changeable after
    construction. Some classes will provide mechanisms to copy the credentials
    with modifications such as :meth:`Scoped.with_scopes`.
    """

    def __init__(self):
        super().__init__()

        self.expiry = None
        """Optional[datetime]: When the token expires and is no longer valid.
        If this is None, the token is assumed to never expire."""
        self._quota_project_id = None
        """Optional[str]: Project to use for quota and billing purposes."""
        self._trust_boundary = None
        """Optional[dict]: Cache of a trust boundary response which has a list
        of allowed regions and an encoded string representation of credentials
        trust boundary."""
        self._universe_domain = DEFAULT_UNIVERSE_DOMAIN
        """Optional[str]: The universe domain value, default is googleapis.com
        """

        # Blocking refresh is the default; see with_non_blocking_refresh().
        self._use_non_blocking_refresh = False
        self._refresh_worker = RefreshThreadManager()

    @property
    def expired(self):
        """Checks if the credentials are expired.

        Credentials whose :attr:`expiry` is unset are considered to never
        expire, so credentials can be not :attr:`valid` (no token yet) while
        still not being expired.

        .. deprecated:: v2.24.0
          Prefer checking :attr:`token_state` instead.
        """
        if not self.expiry:
            return False
        # Remove some threshold from expiry to err on the side of reporting
        # expiration early so that we avoid the 401-refresh-retry loop.
        skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD
        return _helpers.utcnow() >= skewed_expiry

    @property
    def valid(self):
        """Checks the validity of the credentials.

        This is True if the credentials have a :attr:`token` and the token
        is not :attr:`expired`.

        .. deprecated:: v2.24.0
          Prefer checking :attr:`token_state` instead.
        """
        return self.token is not None and not self.expired

    @property
    def token_state(self):
        """The current :class:`TokenState` of these credentials.

        Returns:
            TokenState: ``INVALID`` when there is no token or the expiry has
                passed, ``STALE`` when the expiry is less than
                ``_helpers.REFRESH_THRESHOLD`` away, and ``FRESH`` otherwise
                (including when no expiry is set).
        """
        if self.token is None:
            return TokenState.INVALID

        # Credentials that can't expire are always treated as fresh.
        if self.expiry is None:
            return TokenState.FRESH

        # Read the clock once so that both comparisons below are made
        # against the same instant.
        now = _helpers.utcnow()
        if now >= self.expiry:
            return TokenState.INVALID
        if now >= self.expiry - _helpers.REFRESH_THRESHOLD:
            return TokenState.STALE
        return TokenState.FRESH

    @property
    def quota_project_id(self):
        """Project to use for quota and billing purposes."""
        return self._quota_project_id

    @property
    def universe_domain(self):
        """The universe domain value."""
        return self._universe_domain

    def get_cred_info(self):
        """The credential information JSON.

        The credential information will be added to auth related error messages
        by client library.

        Returns:
            Mapping[str, str]: The credential information JSON. This base
                implementation returns None.
        """
        return None

    @abc.abstractmethod
    def refresh(self, request):
        """Refreshes the access token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Refresh must be implemented")

    def _metric_header_for_usage(self):
        """The x-goog-api-client header for token usage metric.

        This header will be added to the API service requests in before_request
        method. For example, "cred-type/sa-jwt" means service account self
        signed jwt access token is used in the API service request
        authorization header. Children credentials classes need to override
        this method to provide the header value, if the token usage metric is
        needed.

        Returns:
            str: The x-goog-api-client header value. This base implementation
                returns None.
        """
        return None

    def apply(self, headers, token=None):
        """Apply the token to the authentication header.

        Also sets the ``x-goog-user-project`` header when a quota project is
        configured.

        Args:
            headers (Mapping): The HTTP request headers.
            token (Optional[str]): If specified, overrides the current access
                token.
        """
        self._apply(headers, token)
        if self.quota_project_id:
            headers["x-goog-user-project"] = self.quota_project_id

    def _blocking_refresh(self, request):
        """Refreshes synchronously when the credentials are not valid."""
        if not self.valid:
            self.refresh(request)

    def _non_blocking_refresh(self, request):
        """Refreshes in the background when stale, synchronously when invalid.

        A stale token triggers a request to the refresh worker to start a
        refresh; if the worker reports (by a falsy return value) that it did
        not start one, this method falls back to a blocking refresh.
        """
        use_blocking_refresh_fallback = False

        if self.token_state == TokenState.STALE:
            use_blocking_refresh_fallback = not self._refresh_worker.start_refresh(
                self, request
            )

        if self.token_state == TokenState.INVALID or use_blocking_refresh_fallback:
            self.refresh(request)
            # If the blocking refresh succeeds then we can clear the error info
            # on the background refresh worker, and perform refreshes in a
            # background thread.
            self._refresh_worker.clear_error()

    def before_request(self, request, method, url, headers):
        """Performs credential-specific before request logic.

        Refreshes the credentials if necessary, then calls :meth:`apply` to
        apply the token to the authentication header.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            method (str): The request's HTTP method or the RPC method being
                invoked.
            url (str): The request's URI or the RPC service's URI.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (Subclasses may use these arguments to ascertain information about
        # the http request.)
        if self._use_non_blocking_refresh:
            self._non_blocking_refresh(request)
        else:
            self._blocking_refresh(request)

        metrics.add_metric_header(headers, self._metric_header_for_usage())
        self.apply(headers)

    def with_non_blocking_refresh(self):
        """Switches these credentials to the non-blocking refresh strategy.

        After this call :meth:`before_request` uses the non-blocking refresh
        path instead of the blocking one. The instance is modified in place
        and nothing is returned.
        """
        self._use_non_blocking_refresh = True

234 

235 

class CredentialsWithQuotaProject(Credentials):
    """Abstract base for credentials supporting ``with_quota_project`` factory"""

    def with_quota_project(self, quota_project_id):
        """Returns a copy of these credentials with a modified quota project.

        Args:
            quota_project_id (str): The project to use for quota and
                billing purposes

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("This credential does not support quota project.")

    def with_quota_project_from_environment(self):
        """Applies the quota project named in the environment, if any.

        Reads the environment variable whose name is
        ``environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT``.

        Returns:
            google.auth.credentials.Credentials: The result of
                :meth:`with_quota_project` when the variable holds a non-empty
                value, otherwise these same credentials.
        """
        env_quota_project = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT)
        if not env_quota_project:
            return self
        return self.with_quota_project(env_quota_project)

256 

257 

class CredentialsWithTokenUri(Credentials):
    """Abstract base for credentials supporting ``with_token_uri`` factory"""

    def with_token_uri(self, token_uri):
        """Returns a copy of these credentials with a modified token uri.

        Args:
            token_uri (str): The uri to use for fetching/exchanging tokens

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        unsupported = "This credential does not use token uri."
        raise NotImplementedError(unsupported)

271 

272 

class CredentialsWithUniverseDomain(Credentials):
    """Abstract base for credentials supporting ``with_universe_domain`` factory"""

    def with_universe_domain(self, universe_domain):
        """Returns a copy of these credentials with a modified universe domain.

        Args:
            universe_domain (str): The universe domain to use

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        unsupported = "This credential does not support with_universe_domain."
        raise NotImplementedError(unsupported)

288 

289 

class CredentialsWithTrustBoundary(Credentials):
    """Abstract base for credentials supporting ``with_trust_boundary`` factory"""

    @abc.abstractmethod
    def _refresh_token(self, request):
        """Refreshes the access token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        raise NotImplementedError("_refresh_token must be implemented")

    def with_trust_boundary(self, trust_boundary):
        """Returns a copy of these credentials with a modified trust boundary.

        Args:
            trust_boundary (Mapping[str, str]): The trust boundary to use for
                the credential. This should be a map with a "locations" key
                that maps to a list of GCP regions, and an "encodedLocations"
                key that maps to a hex string.

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("This credential does not support trust boundaries.")

    def _is_trust_boundary_lookup_required(self):
        """Checks if a trust boundary lookup is required.

        A lookup is required if the feature is enabled via an environment
        variable, the universe domain is the default one, and a no-op
        boundary is not already cached.

        Returns:
            bool: True if a trust boundary lookup is required, False otherwise.
        """
        # The feature is opt-in through the environment (off by default).
        feature_enabled = _helpers.get_bool_from_env(
            environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False
        )
        if not feature_enabled:
            return False

        # The trust boundary flow is skipped for non-default universe domains.
        if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
            return False

        # A cached no-op trust boundary never needs to be looked up again.
        if self._has_no_op_trust_boundary():
            return False
        return True

    def _get_trust_boundary_header(self):
        """Builds the ``x-allowed-locations`` header for the cached boundary.

        Returns:
            dict: Empty when no trust boundary is cached; otherwise a single
                ``x-allowed-locations`` entry holding the cached
                "encodedLocations" value, or an empty string for a no-op
                boundary.
        """
        if self._trust_boundary is None:
            return {}
        if self._has_no_op_trust_boundary():
            # STS expects an empty string if the trust boundary value is no-op.
            return {"x-allowed-locations": ""}
        return {"x-allowed-locations": self._trust_boundary["encodedLocations"]}

    def apply(self, headers, token=None):
        """Apply the token to the authentication header.

        Extends the parent implementation by also adding the trust boundary
        header, when one is available.

        Args:
            headers (Mapping): The HTTP request headers.
            token (Optional[str]): If specified, overrides the current access
                token.
        """
        super().apply(headers, token)
        headers.update(self._get_trust_boundary_header())

    def refresh(self, request):
        """Refreshes the access token and the trust boundary.

        The subclass's token refresh logic runs first, followed by the trust
        boundary refresh (which is a no-op when no lookup is required).

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
        """
        self._refresh_token(request)
        self._refresh_trust_boundary(request)

    def _refresh_trust_boundary(self, request):
        """Triggers a refresh of the trust boundary and updates the cache if necessary.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the trust boundary could
                not be refreshed and no cached value is available.
        """
        if not self._is_trust_boundary_lookup_required():
            return

        try:
            self._trust_boundary = self._lookup_trust_boundary(request)
        except exceptions.RefreshError as error:
            # A failed lookup is only fatal when there is nothing cached to
            # fall back on; otherwise the previously cached boundary is kept.
            if self._trust_boundary is None:
                raise error
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.debug(
                    "Using cached trust boundary due to refresh error: %s", error
                )

    def _lookup_trust_boundary(self, request):
        """Calls the trust boundary lookup API to refresh the trust boundary cache.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Returns:
            trust_boundary (dict): The trust boundary object returned by the lookup API.

        Raises:
            google.auth.exceptions.InvalidValue: If no lookup URL could be
                built. Note that :meth:`_refresh_trust_boundary` only catches
                ``RefreshError``, so this error is not subject to its
                cached-value fallback.
            google.auth.exceptions.RefreshError: If the trust boundary could not be
                retrieved.
        """
        from google.oauth2 import _client

        lookup_url = self._build_trust_boundary_lookup_url()
        if not lookup_url:
            raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.")

        # The lookup request carries the same auth and trust boundary headers
        # that a regular request would.
        lookup_headers = {}
        self._apply(lookup_headers)
        lookup_headers.update(self._get_trust_boundary_header())
        return _client._lookup_trust_boundary(
            request, lookup_url, headers=lookup_headers
        )

    @abc.abstractmethod
    def _build_trust_boundary_lookup_url(self):
        """
        Builds and returns the URL for the trust boundary lookup API.

        This method should be implemented by subclasses to provide the
        specific URL based on the credential type and its properties.

        Returns:
            str: The URL for the trust boundary lookup endpoint, or None
                 if lookup should be skipped (e.g., for non-applicable universe domains).
        """
        raise NotImplementedError(
            "_build_trust_boundary_lookup_url must be implemented"
        )

    def _has_no_op_trust_boundary(self):
        """Tells whether the cached trust boundary is a no-op.

        Returns:
            bool: True if a trust boundary is cached and its
                "encodedLocations" equals
                ``NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS``, False otherwise.
        """
        # A no-op trust boundary is indicated by encodedLocations being "0x0".
        # The "locations" list may or may not be present as an empty list.
        cached = self._trust_boundary
        if cached is None:
            return False
        if cached["encodedLocations"] == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS:
            return True
        return False

444 

445 

class AnonymousCredentials(Credentials):
    """Credentials that do not provide any authentication information.

    These are useful in the case of services that support anonymous access or
    local service emulators that do not use credentials.
    """

    @property
    def expired(self):
        """Returns `False`, anonymous credentials never expire."""
        return False

    @property
    def valid(self):
        """Returns `True`, anonymous credentials are always valid."""
        return True

    def refresh(self, request):
        """Raises :class:``InvalidOperation``, anonymous credentials cannot be
        refreshed."""
        raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")

    def apply(self, headers, token=None):
        """Anonymous credentials do nothing to the request.

        The optional ``token`` argument is not supported.

        Raises:
            google.auth.exceptions.InvalidValue: If a token was specified.
        """
        if token is None:
            # Nothing to apply; the headers are left untouched.
            return
        raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")

    def before_request(self, request, method, url, headers):
        """Anonymous credentials do nothing to the request."""

481 

482 

class ReadOnlyScoped(metaclass=abc.ABCMeta):
    """Interface for credentials whose scopes can be queried.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    If a credential class implements this interface then the credentials
    use scopes in their implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(scopes=['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    def __init__(self):
        super().__init__()
        self._scopes = None
        self._default_scopes = None

    @property
    def scopes(self):
        """Sequence[str]: the credentials' current set of scopes."""
        return self._scopes

    @property
    def default_scopes(self):
        """Sequence[str]: the credentials' current set of default scopes."""
        return self._default_scopes

    # ``abc.abstractproperty`` is deprecated since Python 3.3; stacking
    # ``property`` on top of ``abc.abstractmethod`` is the supported spelling.
    @property
    @abc.abstractmethod
    def requires_scopes(self):
        """True if these credentials require scopes to obtain an access token."""
        return False

    def has_scopes(self, scopes):
        """Checks if the credentials have the given scopes.

        The explicitly set scopes are consulted when they are not None
        (even if empty); otherwise the default scopes are used.

        .. warning:: This method is not guaranteed to be accurate if the
            credentials are not yet valid.

        Args:
            scopes (Sequence[str]): The list of scopes to check.

        Returns:
            bool: True if the credentials have the given scopes.
        """
        credential_scopes = (
            self._scopes if self._scopes is not None else self._default_scopes
        )
        return set(scopes).issubset(set(credential_scopes or []))

548 

549 

class Scoped(ReadOnlyScoped):
    """Interface for credentials whose scopes can be replaced while copying.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    If a credential class implements this interface then the credentials
    use scopes in their implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    @abc.abstractmethod
    def with_scopes(self, scopes, default_scopes=None):
        """Create a copy of these credentials with the specified scopes.

        Args:
            scopes (Sequence[str]): The list of scopes to attach to the
                current credentials.
            default_scopes (Optional[Sequence[str]]): Default scopes to attach
                to the copy, if any.

        Returns:
            google.auth.credentials.Credentials: A new credentials instance
                (as provided by the implementing subclass).

        Raises:
            NotImplementedError: If the credentials' scopes can not be changed.
                This can be avoided by checking :attr:`requires_scopes` before
                calling this method.
        """
        unsupported = "This class does not require scoping."
        raise NotImplementedError(unsupported)

593 

594 

def with_scopes_if_required(credentials, scopes, default_scopes=None):
    """Creates a copy of the credentials with scopes if scoping is required.

    This helper function is useful when you do not know (or care to know) the
    specific type of credentials you are using (such as when you use
    :func:`google.auth.default`). This function will call
    :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if
    the credentials require scoping. Otherwise, it will return the credentials
    as-is.

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            scope if necessary.
        scopes (Sequence[str]): The list of scopes to use.
        default_scopes (Sequence[str]): Default scopes passed by a
            Google client library. Use 'scopes' for user-defined scopes.

    Returns:
        google.auth.credentials.Credentials: Either a new set of scoped
            credentials, or the passed in credentials instance if no scoping
            was required.
    """
    # Anything that is not a Scoped credential is handed back untouched.
    if not isinstance(credentials, Scoped):
        return credentials
    # Scoped credentials that already work without scopes are kept as well.
    if not credentials.requires_scopes:
        return credentials
    return credentials.with_scopes(scopes, default_scopes=default_scopes)

621 

622 

class Signing(metaclass=abc.ABCMeta):
    """Interface for credentials that can cryptographically sign messages."""

    @abc.abstractmethod
    def sign_bytes(self, message):
        """Signs the given message.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The message's cryptographic signature.
        """
        # pylint: disable=missing-raises-doc,redundant-returns-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Sign bytes must be implemented.")

    # ``abc.abstractproperty`` is deprecated since Python 3.3; stacking
    # ``property`` on top of ``abc.abstractmethod`` is the supported spelling.
    @property
    @abc.abstractmethod
    def signer_email(self):
        """Optional[str]: An email address that identifies the signer."""
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer email must be implemented.")

    @property
    @abc.abstractmethod
    def signer(self):
        """google.auth.crypt.Signer: The signer used to sign bytes."""
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer must be implemented.")

653 

654 

class TokenState(Enum):
    """Tracks the state of a token.

    Members:
        FRESH: The token is valid. It is not expired or close to expired,
            or the token has no expiry.
        STALE: The token is close to expired, and should be refreshed.
            The token can be used normally.
        INVALID: The token is expired or invalid. The token cannot be used
            for a normal operation.
    """

    FRESH = 1
    STALE = 2
    INVALID = 3