Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/credentials.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

203 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16"""Interfaces for credentials.""" 

17 

18import abc 

19from enum import Enum 

20import logging 

21import os 

22from typing import List 

23 

24from google.auth import _helpers, environment_vars 

25from google.auth import exceptions 

26from google.auth import metrics 

27from google.auth._credentials_base import _BaseCredentials 

28from google.auth._refresh_worker import RefreshThreadManager 

29 

# Universe domain assigned to every credential at construction time. Trust
# boundary lookups are skipped for credentials whose domain differs from it.
DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
# Empty list of locations. Not referenced by the code in this module;
# presumably the "locations" value of a no-op trust boundary -- TODO confirm.
NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = []
# "encodedLocations" value that marks a cached trust boundary as a no-op.
NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"

# Logger used for the trust boundary debug message. NOTE(review): it is
# registered under the "google.auth._default" name rather than this module's.
_LOGGER = logging.getLogger("google.auth._default")

35 

36 

class Credentials(_BaseCredentials):
    """Root of the credentials class hierarchy.

    Every credential carries a :attr:`token` used for authentication and may
    additionally carry an :attr:`expiry` marking the moment that token stops
    being valid.

    A freshly built credential is usually not :attr:`valid` until
    :meth:`refresh` has been called. :meth:`before_request` performs that
    refresh automatically ahead of the first HTTP request.

    Even though the token and expiration change as the credentials are
    :meth:`refreshed <refresh>` and used, credentials should be treated as
    immutable. Configuration such as private keys, scopes and other options
    is accepted at construction time only. Some classes offer a way to copy
    the credentials with modifications, for example
    :meth:`Scoped.with_scopes`.
    """

    def __init__(self):
        super().__init__()

        #: Optional[datetime]: When the token expires and is no longer valid.
        #: If this is None, the token is assumed to never expire.
        self.expiry = None
        #: Optional[str]: Project to use for quota and billing purposes.
        self._quota_project_id = None
        #: Optional[dict]: Cache of a trust boundary response which has a
        #: list of allowed regions and an encoded string representation of
        #: the credentials' trust boundary.
        self._trust_boundary = None
        #: Optional[str]: The universe domain value, default is googleapis.com
        self._universe_domain = DEFAULT_UNIVERSE_DOMAIN

        # Refreshes block the caller unless with_non_blocking_refresh() is
        # called; the worker is only used on the non-blocking path.
        self._use_non_blocking_refresh = False
        self._refresh_worker = RefreshThreadManager()

    @property
    def expired(self):
        """Whether the credentials have (nearly) reached their expiry.

        Credentials whose :attr:`expiry` is unset are treated as never
        expiring, so credentials can be invalid without being expired.

        .. deprecated:: v2.24.0
          Prefer checking :attr:`token_state` instead.
        """
        if self.expiry:
            # Report expiration a little early (by the refresh threshold) so
            # callers avoid the 401-refresh-retry loop.
            early_deadline = self.expiry - _helpers.REFRESH_THRESHOLD
            return _helpers.utcnow() >= early_deadline
        return False

    @property
    def valid(self):
        """Whether the credentials can be used as they are.

        True only when a :attr:`token` is present and it is not
        :attr:`expired`.

        .. deprecated:: v2.24.0
          Prefer checking :attr:`token_state` instead.
        """
        if self.token is None:
            return False
        return not self.expired

    @property
    def token_state(self):
        """The :obj:`TokenState` describing the current token."""
        if self.token is None:
            return TokenState.INVALID

        # A token without an expiry can never go stale.
        if self.expiry is None:
            return TokenState.FRESH

        if _helpers.utcnow() >= self.expiry:
            return TokenState.INVALID

        # Inside the refresh threshold the token still works but should be
        # renewed soon.
        if _helpers.utcnow() >= (self.expiry - _helpers.REFRESH_THRESHOLD):
            return TokenState.STALE

        return TokenState.FRESH

    @property
    def quota_project_id(self):
        """Project to use for quota and billing purposes."""
        return self._quota_project_id

    @property
    def universe_domain(self):
        """The universe domain value."""
        return self._universe_domain

    def get_cred_info(self):
        """Return the credential information JSON.

        Client libraries add this information to auth related error
        messages.

        Returns:
            Optional[Mapping[str, str]]: The credential information JSON.
                This base implementation returns None.
        """
        return None

    @abc.abstractmethod
    def refresh(self, request):
        """Refresh the access token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Refresh must be implemented")

    def _metric_header_for_usage(self):
        """Return the x-goog-api-client header value for token usage metrics.

        :meth:`before_request` adds this header to API service requests. For
        example, "cred-type/sa-jwt" means a service account self signed jwt
        access token is used in the request's authorization header. Child
        classes override this method when the token usage metric is needed.

        Returns:
            Optional[str]: The x-goog-api-client header value. This base
                implementation returns None.
        """
        return None

    def apply(self, headers, token=None):
        """Write the token into the authentication header.

        Args:
            headers (Mapping): The HTTP request headers.
            token (Optional[str]): If specified, overrides the current access
                token.
        """
        self._apply(headers, token)
        quota_project = self.quota_project_id
        if quota_project:
            headers["x-goog-user-project"] = quota_project

    def _blocking_refresh(self, request):
        """Refresh synchronously unless the credentials are already valid."""
        if self.valid:
            return
        self.refresh(request)

    def _non_blocking_refresh(self, request):
        """Refresh in the background when stale, synchronously when invalid."""
        background_started = True

        if self.token_state == TokenState.STALE:
            background_started = self._refresh_worker.start_refresh(self, request)

        if self.token_state == TokenState.INVALID or not background_started:
            self.refresh(request)
            # A successful blocking refresh lets us drop the error recorded
            # on the background worker so that later refreshes can run in a
            # background thread again.
            self._refresh_worker.clear_error()

    def before_request(self, request, method, url, headers):
        """Run the credential-specific logic that precedes a request.

        The credentials are refreshed if needed and :meth:`apply` then writes
        the token into the authentication header.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            method (str): The request's HTTP method or the RPC method being
                invoked.
            url (str): The request's URI or the RPC service's URI.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (Subclasses may use these arguments to ascertain information about
        # the http request.)
        if self._use_non_blocking_refresh:
            refresh_strategy = self._non_blocking_refresh
        else:
            refresh_strategy = self._blocking_refresh
        refresh_strategy(request)

        metrics.add_metric_header(headers, self._metric_header_for_usage())
        self.apply(headers)

    def with_non_blocking_refresh(self):
        """Switch these credentials to the non-blocking refresh path.

        The flag is set on this instance; nothing is returned.
        """
        self._use_non_blocking_refresh = True

237 

238 

class CredentialsWithQuotaProject(Credentials):
    """Abstract base for credentials supporting ``with_quota_project`` factory"""

    def with_quota_project(self, quota_project_id):
        """Return a copy of these credentials using another quota project.

        Args:
            quota_project_id (str): The project to use for quota and
                billing purposes

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.
        """
        raise NotImplementedError("This credential does not support quota project.")

    def with_quota_project_from_environment(self):
        """Apply the quota project named by the environment, if there is one.

        Returns:
            google.auth.credentials.Credentials: The result of
                :meth:`with_quota_project` when the quota project environment
                variable holds a non-empty value, otherwise these same
                credentials.
        """
        env_quota_project = os.environ.get(
            environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT
        )
        if not env_quota_project:
            return self
        return self.with_quota_project(env_quota_project)

259 

260 

class CredentialsWithTokenUri(Credentials):
    """Abstract base for credentials supporting ``with_token_uri`` factory"""

    def with_token_uri(self, token_uri):
        """Return a copy of these credentials using another token uri.

        This base implementation always raises ``NotImplementedError``.

        Args:
            token_uri (str): The uri to use for fetching/exchanging tokens

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.
        """
        raise NotImplementedError("This credential does not use token uri.")

274 

275 

class CredentialsWithUniverseDomain(Credentials):
    """Abstract base for credentials supporting ``with_universe_domain`` factory"""

    def with_universe_domain(self, universe_domain):
        """Return a copy of these credentials using another universe domain.

        This base implementation always raises ``NotImplementedError``.

        Args:
            universe_domain (str): The universe domain to use

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.
        """
        raise NotImplementedError("This credential does not support with_universe_domain.")

291 

292 

class CredentialsWithTrustBoundary(Credentials):
    """Abstract base for credentials supporting ``with_trust_boundary`` factory"""

    @abc.abstractmethod
    def _perform_refresh_token(self, request):
        """Refresh the access token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        raise NotImplementedError("_perform_refresh_token must be implemented")

    def with_trust_boundary(self, trust_boundary):
        """Return a copy of these credentials using another trust boundary.

        Args:
            trust_boundary Mapping[str, str]: The trust boundary to use for the
                credential. This should be a map with a "locations" key that
                maps to a list of GCP regions, and a "encodedLocations" key
                that maps to a hex string.

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.
        """
        raise NotImplementedError("This credential does not support trust boundaries.")

    def _is_trust_boundary_lookup_required(self):
        """Decide whether the trust boundary has to be looked up.

        A lookup is needed only when the feature is switched on through its
        environment variable, the credentials use the default universe
        domain, and no no-op boundary is cached yet.

        Returns:
            bool: True if a trust boundary lookup is required, False otherwise.
        """
        # 1. The feature has to be enabled via environment variable.
        feature_enabled = _helpers.get_bool_from_env(
            environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False
        )
        if not feature_enabled:
            return False

        # 2. Non-default universe domains skip the trust boundary flow.
        if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
            return False

        # 3. A cached no-op trust boundary must not trigger another refresh.
        return not self._has_no_op_trust_boundary()

    def _get_trust_boundary_header(self):
        """Build the ``x-allowed-locations`` header from the cached boundary.

        Returns:
            dict: Empty when no trust boundary is cached, otherwise a single
                ``x-allowed-locations`` entry.
        """
        if self._trust_boundary is None:
            return {}
        if self._has_no_op_trust_boundary():
            # STS expects an empty string if the trust boundary value is no-op.
            return {"x-allowed-locations": ""}
        return {"x-allowed-locations": self._trust_boundary["encodedLocations"]}

    def apply(self, headers, token=None):
        """Apply the token and the trust boundary header to ``headers``."""
        super().apply(headers, token)
        boundary_header = self._get_trust_boundary_header()
        headers.update(boundary_header)

    def refresh(self, request):
        """Refresh the access token and then the trust boundary.

        The subclass's token refresh logic runs first; afterwards the trust
        boundary is refreshed if applicable.
        """
        self._perform_refresh_token(request)
        self._refresh_trust_boundary(request)

    def _refresh_trust_boundary(self, request):
        """Refresh the trust boundary and update the cache when required.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the trust boundary could
                not be refreshed and no cached value is available.
        """
        if not self._is_trust_boundary_lookup_required():
            return

        try:
            looked_up = self._lookup_trust_boundary(request)
        except exceptions.RefreshError as lookup_error:
            # The lookup API call failed. Without a cached trust boundary
            # there is nothing to fall back on, so the error propagates.
            if self._trust_boundary is None:
                raise lookup_error
            # Otherwise keep serving the cached value.
            if _helpers.is_logging_enabled(_LOGGER):
                _LOGGER.debug(
                    "Using cached trust boundary due to refresh error: %s",
                    lookup_error,
                )
        else:
            self._trust_boundary = looked_up

    def _lookup_trust_boundary(self, request):
        """Call the trust boundary lookup API.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Returns:
            trust_boundary (dict): The trust boundary object returned by the
                lookup API.

        Raises:
            google.auth.exceptions.InvalidValue: If no lookup URL could be
                built.
            google.auth.exceptions.RefreshError: If the trust boundary could
                not be retrieved.
        """
        from google.oauth2 import _client

        lookup_url = self._build_trust_boundary_lookup_url()
        if not lookup_url:
            raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.")

        # The lookup request carries the token and the currently cached
        # trust boundary header.
        lookup_headers = {}
        self._apply(lookup_headers)
        lookup_headers.update(self._get_trust_boundary_header())
        return _client._lookup_trust_boundary(
            request, lookup_url, headers=lookup_headers
        )

    @abc.abstractmethod
    def _build_trust_boundary_lookup_url(self):
        """Build and return the URL for the trust boundary lookup API.

        Subclasses implement this method to provide the specific URL based
        on the credential type and its properties.

        Returns:
            str: The URL for the trust boundary lookup endpoint, or None
                 if lookup should be skipped (e.g., for non-applicable
                 universe domains).
        """
        raise NotImplementedError("_build_trust_boundary_lookup_url must be implemented")

    def _has_no_op_trust_boundary(self):
        """Tell whether the cached trust boundary is the no-op boundary."""
        cached_boundary = self._trust_boundary
        if cached_boundary is None:
            return False
        # Only encodedLocations being "0x0" marks a no-op trust boundary; the
        # "locations" list may or may not be present as an empty list.
        encoded_locations = cached_boundary.get("encodedLocations")
        return encoded_locations == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS

446 

447 

class AnonymousCredentials(Credentials):
    """Credentials that carry no authentication information at all.

    They are meant for services that support anonymous access and for local
    service emulators that do not use credentials.
    """

    @property
    def expired(self):
        """Always `False`: anonymous credentials never expire."""
        return False

    @property
    def valid(self):
        """Always `True`: anonymous credentials are always valid."""
        return True

    def refresh(self, request):
        """Raise :class:``InvalidOperation``; anonymous credentials cannot be
        refreshed."""
        raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")

    def apply(self, headers, token=None):
        """Leave the request untouched.

        The optional ``token`` argument is not supported.

        Raises:
            google.auth.exceptions.InvalidValue: If a token was specified.
        """
        if token is None:
            return
        raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")

    def before_request(self, request, method, url, headers):
        """Leave the request untouched; there is nothing to refresh or apply."""

483 

484 

class ReadOnlyScoped(metaclass=abc.ABCMeta):
    """Interface for credentials whose scopes can be queried.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    If a credential class implements this interface then the credentials use
    scopes in their implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(scopes=['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    def __init__(self):
        super().__init__()
        # Both start unset; has_scopes() falls back to the default scopes
        # whenever no explicit scopes are present.
        self._scopes = None
        self._default_scopes = None

    @property
    def scopes(self):
        """Sequence[str]: the credentials' current set of scopes."""
        return self._scopes

    @property
    def default_scopes(self):
        """Sequence[str]: the credentials' current set of default scopes."""
        return self._default_scopes

    # ``abc.abstractproperty`` is deprecated since Python 3.3; stacking
    # ``property`` on ``abc.abstractmethod`` is the supported equivalent.
    @property
    @abc.abstractmethod
    def requires_scopes(self):
        """True if these credentials require scopes to obtain an access token."""
        return False

    def has_scopes(self, scopes):
        """Checks if the credentials have the given scopes.

        .. warning:: This method is not guaranteed to be accurate if the
            credentials are not :attr:`~Credentials.valid`.

        Args:
            scopes (Sequence[str]): The list of scopes to check.

        Returns:
            bool: True if the credentials have the given scopes.
        """
        # Explicit scopes win; the default scopes are consulted only when no
        # explicit scopes were set at all (``None``, not merely empty).
        credential_scopes = (
            self._scopes if self._scopes is not None else self._default_scopes
        )
        return set(scopes).issubset(set(credential_scopes or []))

550 

551 

class Scoped(ReadOnlyScoped):
    """Interface for credentials whose scopes can be replaced while copying.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    If a credential class implements this interface then the credentials use
    scopes in their implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(scopes=['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    @abc.abstractmethod
    def with_scopes(self, scopes, default_scopes=None):
        """Create a copy of these credentials with the specified scopes.

        Args:
            scopes (Sequence[str]): The list of scopes to attach to the
                current credentials.
            default_scopes (Optional[Sequence[str]]): Default scopes passed
                by a Google client library. Use ``scopes`` for user-defined
                scopes.

        Returns:
            A copy of these credentials carrying the given scopes, as
            produced by the implementing class.

        Raises:
            NotImplementedError: If the credentials' scopes can not be changed.
                This can be avoided by checking :attr:`requires_scopes` before
                calling this method.
        """
        raise NotImplementedError("This class does not require scoping.")

595 

596 

def with_scopes_if_required(credentials, scopes, default_scopes=None):
    """Scope the given credentials, but only when they demand it.

    Handy when the concrete credential type is unknown or irrelevant, as is
    the case with :func:`google.auth.default`. :meth:`Scoped.with_scopes` is
    called when the credentials are scoped credentials that require scoping;
    any other credentials are handed back unchanged.

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            scope if necessary.
        scopes (Sequence[str]): The list of scopes to use.
        default_scopes (Sequence[str]): Default scopes passed by a
            Google client library. Use 'scopes' for user-defined scopes.

    Returns:
        google.auth.credentials.Credentials: Either a new set of scoped
            credentials, or the passed in credentials instance if no scoping
            was required.
    """
    scoping_needed = isinstance(credentials, Scoped) and credentials.requires_scopes
    if not scoping_needed:
        return credentials
    return credentials.with_scopes(scopes, default_scopes=default_scopes)

623 

624 

class Signing(metaclass=abc.ABCMeta):
    """Interface for credentials that can cryptographically sign messages.

    The abstract properties are declared by stacking ``property`` on
    ``abc.abstractmethod``; ``abc.abstractproperty`` has been deprecated
    since Python 3.3.
    """

    @abc.abstractmethod
    def sign_bytes(self, message):
        """Signs the given message.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The message's cryptographic signature.
        """
        # pylint: disable=missing-raises-doc,redundant-returns-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Sign bytes must be implemented.")

    @property
    @abc.abstractmethod
    def signer_email(self):
        """Optional[str]: An email address that identifies the signer."""
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer email must be implemented.")

    @property
    @abc.abstractmethod
    def signer(self):
        """google.auth.crypt.Signer: The signer used to sign bytes."""
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer must be implemented.")

655 

656 

class TokenState(Enum):
    """The states a credential's token can be in.

    Attributes:
        FRESH: The token is valid. It is neither expired nor close to
            expiring, or it has no expiry at all.
        STALE: The token is close to expiring and should be refreshed. It
            can still be used normally.
        INVALID: The token is expired or invalid. It cannot be used for a
            normal operation.
    """

    FRESH = 1
    STALE = 2
    INVALID = 3