Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/credentials.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

236 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16"""Interfaces for credentials.""" 

17 

18import abc 

19from enum import Enum 

20import logging 

21import os 

22from typing import Dict, List, Optional, TYPE_CHECKING 

23from urllib.parse import urlparse 

24import warnings 

25 

26 

27from google.auth import _helpers, environment_vars 

28from google.auth import _regional_access_boundary_utils 

29from google.auth import exceptions 

30from google.auth import metrics 

31from google.auth._credentials_base import _BaseCredentials 

32from google.auth._refresh_worker import RefreshThreadManager 

33 

34if TYPE_CHECKING: # pragma: NO COVER 

35 import google.auth.transport 

36 

37DEFAULT_UNIVERSE_DOMAIN = "googleapis.com" 

38 

39# These constants are deprecated and no longer used. 

40# They are kept solely for backward compatibility with older implementations. 

41NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = [] 

42NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0" 

43 

44_LOGGER = logging.getLogger("google.auth._default") 

45 

46 

47class Credentials(_BaseCredentials): 

48 """Base class for all credentials. 

49 

50 All credentials have a :attr:`token` that is used for authentication and 

51 may also optionally set an :attr:`expiry` to indicate when the token will 

52 no longer be valid. 

53 

54 Most credentials will be :attr:`invalid` until :meth:`refresh` is called. 

55 Credentials can do this automatically before the first HTTP request in 

56 :meth:`before_request`. 

57 

58 Although the token and expiration will change as the credentials are 

59 :meth:`refreshed <refresh>` and used, credentials should be considered 

60 immutable. Various credentials will accept configuration such as private 

61 keys, scopes, and other options. These options are not changeable after 

62 construction. Some classes will provide mechanisms to copy the credentials 

63 with modifications such as :meth:`ScopedCredentials.with_scopes`. 

64 """ 

65 

66 def __init__(self): 

67 super(Credentials, self).__init__() 

68 

69 self.expiry = None 

70 """Optional[datetime]: When the token expires and is no longer valid. 

71 If this is None, the token is assumed to never expire.""" 

72 self._quota_project_id = None 

73 """Optional[str]: Project to use for quota and billing purposes.""" 

74 self._trust_boundary = None 

75 """Optional[dict]: Cache of a trust boundary response which has a list 

76 of allowed regions and an encoded string representation of credentials 

77 trust boundary.""" 

78 self._universe_domain = DEFAULT_UNIVERSE_DOMAIN 

79 """Optional[str]: The universe domain value, default is googleapis.com 

80 """ 

81 

82 self._use_non_blocking_refresh = False 

83 self._refresh_worker = RefreshThreadManager() 

84 

85 @property 

86 def expired(self): 

87 """Checks if the credentials are expired. 

88 

89 Note that credentials can be invalid but not expired because 

90 Credentials with :attr:`expiry` set to None is considered to never 

91 expire. 

92 

93 .. deprecated:: v2.24.0 

94 Prefer checking :attr:`token_state` instead. 

95 """ 

96 if not self.expiry: 

97 return False 

98 # Remove some threshold from expiry to err on the side of reporting 

99 # expiration early so that we avoid the 401-refresh-retry loop. 

100 skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD 

101 return _helpers.utcnow() >= skewed_expiry 

102 

103 @property 

104 def valid(self): 

105 """Checks the validity of the credentials. 

106 

107 This is True if the credentials have a :attr:`token` and the token 

108 is not :attr:`expired`. 

109 

110 .. deprecated:: v2.24.0 

111 Prefer checking :attr:`token_state` instead. 

112 """ 

113 return self.token is not None and not self.expired 

114 

115 @property 

116 def token_state(self): 

117 """ 

118 See `:obj:`TokenState` 

119 """ 

120 if self.token is None: 

121 return TokenState.INVALID 

122 

123 # Credentials that can't expire are always treated as fresh. 

124 if self.expiry is None: 

125 return TokenState.FRESH 

126 

127 expired = _helpers.utcnow() >= self.expiry 

128 if expired: 

129 return TokenState.INVALID 

130 

131 is_stale = _helpers.utcnow() >= (self.expiry - _helpers.REFRESH_THRESHOLD) 

132 if is_stale: 

133 return TokenState.STALE 

134 

135 return TokenState.FRESH 

136 

137 @property 

138 def quota_project_id(self): 

139 """Project to use for quota and billing purposes.""" 

140 return self._quota_project_id 

141 

142 @property 

143 def universe_domain(self): 

144 """The universe domain value.""" 

145 return self._universe_domain 

146 

147 def get_cred_info(self): 

148 """The credential information JSON. 

149 

150 The credential information will be added to auth related error messages 

151 by client library. 

152 

153 Returns: 

154 Mapping[str, str]: The credential information JSON. 

155 """ 

156 return None 

157 

158 @abc.abstractmethod 

159 def refresh(self, request): 

160 """Refreshes the access token. 

161 

162 Args: 

163 request (google.auth.transport.Request): The object used to make 

164 HTTP requests. 

165 

166 Raises: 

167 google.auth.exceptions.RefreshError: If the credentials could 

168 not be refreshed. 

169 """ 

170 # pylint: disable=missing-raises-doc 

171 # (pylint doesn't recognize that this is abstract) 

172 raise NotImplementedError("Refresh must be implemented") 

173 

174 def _metric_header_for_usage(self): 

175 """The x-goog-api-client header for token usage metric. 

176 

177 This header will be added to the API service requests in before_request 

178 method. For example, "cred-type/sa-jwt" means service account self 

179 signed jwt access token is used in the API service request 

180 authorization header. Children credentials classes need to override 

181 this method to provide the header value, if the token usage metric is 

182 needed. 

183 

184 Returns: 

185 str: The x-goog-api-client header value. 

186 """ 

187 return None 

188 

189 def apply(self, headers, token=None): 

190 """Apply the token to the authentication header. 

191 

192 Args: 

193 headers (Mapping): The HTTP request headers. 

194 token (Optional[str]): If specified, overrides the current access 

195 token. 

196 """ 

197 self._apply(headers, token) 

198 if self.quota_project_id: 

199 headers["x-goog-user-project"] = self.quota_project_id 

200 

201 def _blocking_refresh(self, request): 

202 if not self.valid: 

203 self.refresh(request) 

204 

205 def _non_blocking_refresh(self, request): 

206 use_blocking_refresh_fallback = False 

207 

208 if self.token_state == TokenState.STALE: 

209 use_blocking_refresh_fallback = not self._refresh_worker.start_refresh( 

210 self, request 

211 ) 

212 

213 if self.token_state == TokenState.INVALID or use_blocking_refresh_fallback: 

214 self.refresh(request) 

215 # If the blocking refresh succeeds then we can clear the error info 

216 # on the background refresh worker, and perform refreshes in a 

217 # background thread. 

218 self._refresh_worker.clear_error() 

219 

220 def before_request(self, request, method, url, headers): 

221 """Performs credential-specific before request logic. 

222 

223 Refreshes the credentials if necessary, then calls :meth:`apply` to 

224 apply the token to the authentication header. 

225 

226 Args: 

227 request (google.auth.transport.Request): The object used to make 

228 HTTP requests. 

229 method (str): The request's HTTP method or the RPC method being 

230 invoked. 

231 url (str): The request's URI or the RPC service's URI. 

232 headers (Mapping): The request's headers. 

233 """ 

234 # pylint: disable=unused-argument 

235 # (Subclasses may use these arguments to ascertain information about 

236 # the http request.) 

237 if self._use_non_blocking_refresh: 

238 self._non_blocking_refresh(request) 

239 else: 

240 self._blocking_refresh(request) 

241 

242 metrics.add_metric_header(headers, self._metric_header_for_usage()) 

243 self.apply(headers) 

244 

245 def with_non_blocking_refresh(self): 

246 self._use_non_blocking_refresh = True 

247 

248 

249class CredentialsWithQuotaProject(Credentials): 

250 """Abstract base for credentials supporting ``with_quota_project`` factory""" 

251 

252 def with_quota_project(self, quota_project_id): 

253 """Returns a copy of these credentials with a modified quota project. 

254 

255 Args: 

256 quota_project_id (str): The project to use for quota and 

257 billing purposes 

258 

259 Returns: 

260 google.auth.credentials.Credentials: A new credentials instance. 

261 """ 

262 raise NotImplementedError("This credential does not support quota project.") 

263 

264 def with_quota_project_from_environment(self): 

265 quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT) 

266 if quota_from_env: 

267 return self.with_quota_project(quota_from_env) 

268 return self 

269 

270 

271class CredentialsWithTokenUri(Credentials): 

272 """Abstract base for credentials supporting ``with_token_uri`` factory""" 

273 

274 def with_token_uri(self, token_uri): 

275 """Returns a copy of these credentials with a modified token uri. 

276 

277 Args: 

278 token_uri (str): The uri to use for fetching/exchanging tokens 

279 

280 Returns: 

281 google.auth.credentials.Credentials: A new credentials instance. 

282 """ 

283 raise NotImplementedError("This credential does not use token uri.") 

284 

285 

286class CredentialsWithUniverseDomain(Credentials): 

287 """Abstract base for credentials supporting ``with_universe_domain`` factory""" 

288 

289 def with_universe_domain(self, universe_domain): 

290 """Returns a copy of these credentials with a modified universe domain. 

291 

292 Args: 

293 universe_domain (str): The universe domain to use 

294 

295 Returns: 

296 google.auth.credentials.Credentials: A new credentials instance. 

297 """ 

298 raise NotImplementedError( 

299 "This credential does not support with_universe_domain." 

300 ) 

301 

302 

303class CredentialsWithRegionalAccessBoundary(Credentials): 

304 """Abstract base for credentials supporting regional access boundary configuration.""" 

305 

306 def __init__(self): 

307 super().__init__() 

308 self._rab_manager = ( 

309 _regional_access_boundary_utils._RegionalAccessBoundaryManager() 

310 ) 

311 

312 @property 

313 def regional_access_boundary(self): 

314 """Optional[str]: The encoded Regional Access Boundary locations.""" 

315 return self._rab_manager._data.encoded_locations 

316 

317 @property 

318 def regional_access_boundary_expiry(self): 

319 """Optional[datetime.datetime]: The expiration time of the Regional Access Boundary.""" 

320 return self._rab_manager._data.expiry 

321 

322 @abc.abstractmethod 

323 def _perform_refresh_token(self, request): 

324 """Refreshes the access token. 

325 

326 Args: 

327 request (google.auth.transport.Request): The object used to make 

328 HTTP requests. 

329 

330 Raises: 

331 google.auth.exceptions.RefreshError: If the credentials could 

332 not be refreshed. 

333 """ 

334 raise NotImplementedError("_perform_refresh_token must be implemented") 

335 

336 def with_trust_boundary(self, trust_boundary): 

337 """Returns a copy of these credentials. 

338 

339 .. deprecated:: 

340 Manual Regional Access Boundary overrides are not supported. 

341 This method is maintained for backwards compatibility and 

342 returns a copy of the credentials without modifying the 

343 Regional Access Boundary state. 

344 

345 Args: 

346 trust_boundary (Mapping[str, str]): Ignored. 

347 

348 Returns: 

349 google.auth.credentials.Credentials: A new credentials instance. 

350 """ 

351 import warnings 

352 

353 warnings.warn( 

354 "with_trust_boundary is deprecated and has no effect.", 

355 DeprecationWarning, 

356 stacklevel=2, 

357 ) 

358 make_copy = getattr(self, "_make_copy", None) 

359 if make_copy: 

360 return make_copy() 

361 else: 

362 raise NotImplementedError( 

363 "This credential does not support trust boundaries." 

364 ) 

365 

366 def _copy_regional_access_boundary_manager(self, target): 

367 """Copies the regional access boundary manager to another instance.""" 

368 # Create a new manager for the clone to isolate background refresh locks and threads, 

369 # but share the immutable data reference to avoid unnecessary initial lookups. 

370 new_manager = _regional_access_boundary_utils._RegionalAccessBoundaryManager() 

371 new_manager._data = self._rab_manager._data 

372 target._rab_manager = new_manager 

373 

374 def _set_regional_access_boundary(self, seed): 

375 """Applies the regional_access_boundary provided via the seed on these 

376 credentials. This is intended for internal use only as invalid 

377 seeds would produce unexpected results until automatic recovery is supported. 

378 Currently this is used by the gcloud CLI and therefore changes to the 

379 contract MUST be backwards compatible (e.g. the method signature must be 

380 unchanged and the credentials with the RAB set must be returned). 

381 

382 

383 Returns: 

384 google.auth.credentials.Credentials: The credentials instance. 

385 """ 

386 self._rab_manager.set_initial_regional_access_boundary( 

387 encoded_locations=seed.get("encodedLocations", None), 

388 expiry=seed.get("expiry", None), 

389 ) 

390 return self 

391 

392 def _set_blocking_regional_access_boundary_lookup(self): 

393 """Enables the blocking lookup mode on these credentials. 

394 This is intended for internal use only as blocking lookup requires additional 

395 care and consideration. Currently this is used by the gcloud CLI and 

396 therefore changes to the contract MUST be backwards compatible (e.g. the 

397 method signature must be unchanged and the credentials with the 

398 blocking lookup flag set to true must be returned). 

399 

400 Returns: 

401 google.auth.credentials.Credentials: The credentials instance. 

402 """ 

403 self._rab_manager.enable_blocking_lookup() 

404 return self 

405 

406 def _maybe_start_regional_access_boundary_refresh(self, request, url): 

407 """ 

408 Starts a background thread to refresh the Regional Access Boundary if needed. 

409 

410 This method checks if a refresh is necessary and if one is not already 

411 in progress or in a cooldown period. If so, it starts a background 

412 thread to perform the lookup. 

413 

414 Args: 

415 request (google.auth.transport.Request): The object used to make 

416 HTTP requests. 

417 url (str): The URL of the request. 

418 """ 

419 try: 

420 # Do not perform a lookup if the request is for a regional endpoint. 

421 hostname = urlparse(url).hostname 

422 if hostname and ( 

423 hostname.endswith(".rep.googleapis.com") 

424 or hostname.endswith(".rep.sandbox.googleapis.com") 

425 ): 

426 return 

427 except (ValueError, TypeError): 

428 # If the URL is malformed, proceed with the default lookup behavior. 

429 pass 

430 

431 # A refresh is only needed if the feature is enabled. 

432 if not self._is_regional_access_boundary_lookup_required(): 

433 return 

434 

435 # Start the background refresh if needed. 

436 self._rab_manager.maybe_start_refresh(self, request) 

437 

438 def _is_regional_access_boundary_lookup_required(self): 

439 """Checks if a Regional Access Boundary lookup is required. 

440 

441 A lookup is required if the feature is enabled via an environment 

442 variable and the universe domain is supported. 

443 

444 Returns: 

445 bool: True if a Regional Access Boundary lookup is required, False otherwise. 

446 """ 

447 # 1. Check if the feature is enabled. 

448 if not _regional_access_boundary_utils.is_regional_access_boundary_enabled(): 

449 return False 

450 

451 # 2. Skip for non-default universe domains. 

452 if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: 

453 return False 

454 

455 return True 

456 

457 def apply(self, headers, token=None): 

458 """Apply the token to the authentication header.""" 

459 super().apply(headers, token) 

460 self._rab_manager.apply_headers(headers) 

461 

462 def before_request(self, request, method, url, headers): 

463 """Refreshes the access token and triggers the Regional Access Boundary 

464 lookup if necessary. 

465 """ 

466 if self._use_non_blocking_refresh: 

467 self._non_blocking_refresh(request) 

468 else: 

469 self._blocking_refresh(request) 

470 

471 self._maybe_start_regional_access_boundary_refresh(request, url) 

472 

473 metrics.add_metric_header(headers, self._metric_header_for_usage()) 

474 self.apply(headers) 

475 

476 def refresh(self, request): 

477 """Refreshes the access token. 

478 

479 This method calls the subclass's token refresh logic. The Regional 

480 Access Boundary is refreshed separately in a non-blocking way. 

481 """ 

482 self._perform_refresh_token(request) 

483 

484 def _lookup_regional_access_boundary( 

485 self, 

486 request: "google.auth.transport.Request", # noqa: F821 

487 fail_fast: bool = False, 

488 ) -> "Optional[Dict[str, str]]": 

489 """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information. 

490 

491 Args: 

492 request (google.auth.transport.Request): The object used to make 

493 HTTP requests. 

494 fail_fast (bool): Whether the lookup should fail fast (short timeout, no retries). 

495 

496 Returns: 

497 Optional[Dict[str, str]]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed. 

498 """ 

499 from google.oauth2 import _client 

500 

501 url = self._build_regional_access_boundary_lookup_url(request=request) 

502 if not url: 

503 _LOGGER.error("Failed to build Regional Access Boundary lookup URL.") 

504 return None 

505 

506 headers: Dict[str, str] = {} 

507 self._apply(headers) 

508 self._rab_manager.apply_headers(headers) 

509 return _client._lookup_regional_access_boundary( 

510 request, url, headers=headers, fail_fast=fail_fast 

511 ) 

512 

513 @abc.abstractmethod 

514 def _build_regional_access_boundary_lookup_url( 

515 self, request: "Optional[google.auth.transport.Request]" = None # noqa: F821 

516 ): 

517 """ 

518 Builds and returns the URL for the Regional Access Boundary lookup API. 

519 

520 This method should be implemented by subclasses to provide the 

521 specific URL based on the credential type and its properties. 

522 

523 Args: 

524 request (Optional[google.auth.transport.Request]): The object used 

525 to make HTTP requests. In some subclasses, this may be used to 

526 make an initial network call to resolve required metadata for the 

527 URL. 

528 

529 Returns: 

530 str: The URL for the Regional Access Boundary lookup endpoint, or None 

531 if lookup should be skipped (e.g., for non-applicable universe domains). 

532 """ 

533 raise NotImplementedError( 

534 "_build_regional_access_boundary_lookup_url must be implemented" 

535 ) 

536 

537 

538class AnonymousCredentials(Credentials): 

539 """Credentials that do not provide any authentication information. 

540 

541 These are useful in the case of services that support anonymous access or 

542 local service emulators that do not use credentials. 

543 """ 

544 

545 @property 

546 def expired(self): 

547 """Returns `False`, anonymous credentials never expire.""" 

548 return False 

549 

550 @property 

551 def valid(self): 

552 """Returns `True`, anonymous credentials are always valid.""" 

553 return True 

554 

555 def refresh(self, request): 

556 """Raises :class:``InvalidOperation``, anonymous credentials cannot be 

557 refreshed.""" 

558 raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.") 

559 

560 def apply(self, headers, token=None): 

561 """Anonymous credentials do nothing to the request. 

562 

563 The optional ``token`` argument is not supported. 

564 

565 Raises: 

566 google.auth.exceptions.InvalidValue: If a token was specified. 

567 """ 

568 if token is not None: 

569 raise exceptions.InvalidValue("Anonymous credentials don't support tokens.") 

570 

571 def before_request(self, request, method, url, headers): 

572 """Anonymous credentials do nothing to the request.""" 

573 

574 

575class ReadOnlyScoped(metaclass=abc.ABCMeta): 

576 """Interface for credentials whose scopes can be queried. 

577 

578 OAuth 2.0-based credentials allow limiting access using scopes as described 

579 in `RFC6749 Section 3.3`_. 

580 If a credential class implements this interface then the credentials either 

581 use scopes in their implementation. 

582 

583 Some credentials require scopes in order to obtain a token. You can check 

584 if scoping is necessary with :attr:`requires_scopes`:: 

585 

586 if credentials.requires_scopes: 

587 # Scoping is required. 

588 credentials = credentials.with_scopes(scopes=['one', 'two']) 

589 

590 Credentials that require scopes must either be constructed with scopes:: 

591 

592 credentials = SomeScopedCredentials(scopes=['one', 'two']) 

593 

594 Or must copy an existing instance using :meth:`with_scopes`:: 

595 

596 scoped_credentials = credentials.with_scopes(scopes=['one', 'two']) 

597 

598 Some credentials have scopes but do not allow or require scopes to be set, 

599 these credentials can be used as-is. 

600 

601 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3 

602 """ 

603 

604 def __init__(self): 

605 super(ReadOnlyScoped, self).__init__() 

606 self._scopes = None 

607 self._default_scopes = None 

608 

609 @property 

610 def scopes(self): 

611 """Sequence[str]: the credentials' current set of scopes.""" 

612 return self._scopes 

613 

614 @property 

615 def default_scopes(self): 

616 """Sequence[str]: the credentials' current set of default scopes.""" 

617 return self._default_scopes 

618 

619 @abc.abstractproperty 

620 def requires_scopes(self): 

621 """True if these credentials require scopes to obtain an access token.""" 

622 return False 

623 

624 def has_scopes(self, scopes): 

625 """Checks if the credentials have the given scopes. 

626 

627 .. warning: This method is not guaranteed to be accurate if the 

628 credentials are :attr:`~Credentials.invalid`. 

629 

630 Args: 

631 scopes (Sequence[str]): The list of scopes to check. 

632 

633 Returns: 

634 bool: True if the credentials have the given scopes. 

635 """ 

636 credential_scopes = ( 

637 self._scopes if self._scopes is not None else self._default_scopes 

638 ) 

639 return set(scopes).issubset(set(credential_scopes or [])) 

640 

641 

642class Scoped(ReadOnlyScoped): 

643 """Interface for credentials whose scopes can be replaced while copying. 

644 

645 OAuth 2.0-based credentials allow limiting access using scopes as described 

646 in `RFC6749 Section 3.3`_. 

647 If a credential class implements this interface then the credentials either 

648 use scopes in their implementation. 

649 

650 Some credentials require scopes in order to obtain a token. You can check 

651 if scoping is necessary with :attr:`requires_scopes`:: 

652 

653 if credentials.requires_scopes: 

654 # Scoping is required. 

655 credentials = credentials.create_scoped(['one', 'two']) 

656 

657 Credentials that require scopes must either be constructed with scopes:: 

658 

659 credentials = SomeScopedCredentials(scopes=['one', 'two']) 

660 

661 Or must copy an existing instance using :meth:`with_scopes`:: 

662 

663 scoped_credentials = credentials.with_scopes(scopes=['one', 'two']) 

664 

665 Some credentials have scopes but do not allow or require scopes to be set, 

666 these credentials can be used as-is. 

667 

668 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3 

669 """ 

670 

671 @abc.abstractmethod 

672 def with_scopes(self, scopes, default_scopes=None): 

673 """Create a copy of these credentials with the specified scopes. 

674 

675 Args: 

676 scopes (Sequence[str]): The list of scopes to attach to the 

677 current credentials. 

678 

679 Raises: 

680 NotImplementedError: If the credentials' scopes can not be changed. 

681 This can be avoided by checking :attr:`requires_scopes` before 

682 calling this method. 

683 """ 

684 raise NotImplementedError("This class does not require scoping.") 

685 

686 

687def with_scopes_if_required(credentials, scopes, default_scopes=None): 

688 """Creates a copy of the credentials with scopes if scoping is required. 

689 

690 This helper function is useful when you do not know (or care to know) the 

691 specific type of credentials you are using (such as when you use 

692 :func:`google.auth.default`). This function will call 

693 :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if 

694 the credentials require scoping. Otherwise, it will return the credentials 

695 as-is. 

696 

697 Args: 

698 credentials (google.auth.credentials.Credentials): The credentials to 

699 scope if necessary. 

700 scopes (Sequence[str]): The list of scopes to use. 

701 default_scopes (Sequence[str]): Default scopes passed by a 

702 Google client library. Use 'scopes' for user-defined scopes. 

703 

704 Returns: 

705 google.auth.credentials.Credentials: Either a new set of scoped 

706 credentials, or the passed in credentials instance if no scoping 

707 was required. 

708 """ 

709 if isinstance(credentials, Scoped) and credentials.requires_scopes: 

710 return credentials.with_scopes(scopes, default_scopes=default_scopes) 

711 else: 

712 return credentials 

713 

714 

715class Signing(metaclass=abc.ABCMeta): 

716 """Interface for credentials that can cryptographically sign messages.""" 

717 

718 @abc.abstractmethod 

719 def sign_bytes(self, message): 

720 """Signs the given message. 

721 

722 Args: 

723 message (bytes): The message to sign. 

724 

725 Returns: 

726 bytes: The message's cryptographic signature. 

727 """ 

728 # pylint: disable=missing-raises-doc,redundant-returns-doc 

729 # (pylint doesn't recognize that this is abstract) 

730 raise NotImplementedError("Sign bytes must be implemented.") 

731 

732 @abc.abstractproperty 

733 def signer_email(self): 

734 """Optional[str]: An email address that identifies the signer.""" 

735 # pylint: disable=missing-raises-doc 

736 # (pylint doesn't recognize that this is abstract) 

737 raise NotImplementedError("Signer email must be implemented.") 

738 

739 @abc.abstractproperty 

740 def signer(self): 

741 """google.auth.crypt.Signer: The signer used to sign bytes.""" 

742 # pylint: disable=missing-raises-doc 

743 # (pylint doesn't recognize that this is abstract) 

744 raise NotImplementedError("Signer must be implemented.") 

745 

746 

747class TokenState(Enum): 

748 """ 

749 Tracks the state of a token. 

750 FRESH: The token is valid. It is not expired or close to expired, or the token has no expiry. 

751 STALE: The token is close to expired, and should be refreshed. The token can be used normally. 

752 INVALID: The token is expired or invalid. The token cannot be used for a normal operation. 

753 """ 

754 

755 FRESH = 1 

756 STALE = 2 

757 INVALID = 3 

758 

759 

760class CredentialsWithTrustBoundary(CredentialsWithRegionalAccessBoundary): 

761 """Abstract base for credentials supporting legacy trust boundary configuration. 

762 

763 .. deprecated:: 

764 Use :class:`~google.auth.credentials.CredentialsWithRegionalAccessBoundary` instead. 

765 """ 

766 

767 def __init__(self): 

768 super().__init__() 

769 warnings.warn( 

770 "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary.", 

771 DeprecationWarning, 

772 stacklevel=2, 

773 ) 

774 

775 @abc.abstractmethod 

776 def _build_trust_boundary_lookup_url(self): 

777 """Deprecated: Implement _build_regional_access_boundary_lookup_url instead.""" 

778 raise NotImplementedError() 

779 

780 def _build_regional_access_boundary_lookup_url(self, request=None): 

781 warnings.warn( 

782 "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary.", 

783 DeprecationWarning, 

784 stacklevel=2, 

785 ) 

786 return self._build_trust_boundary_lookup_url()