Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/credentials.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

246 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16"""Interfaces for credentials.""" 

17 

18import abc 

19from enum import Enum 

20import logging 

21import os 

22from typing import Dict, List, Optional, TYPE_CHECKING 

23from urllib.parse import urlparse 

24import warnings 

25 

26 

27from google.auth import _helpers, environment_vars 

28from google.auth import _regional_access_boundary_utils 

29from google.auth import exceptions 

30from google.auth import metrics 

31from google.auth._credentials_base import _BaseCredentials 

32from google.auth._refresh_worker import RefreshThreadManager 

33 

34if TYPE_CHECKING: # pragma: NO COVER 

35 import google.auth.transport 

36 

# Re-exported so callers can read the default universe domain from this module.
DEFAULT_UNIVERSE_DOMAIN = _helpers.DEFAULT_UNIVERSE_DOMAIN

# Deprecated and no longer used. Both names are retained only so that older
# implementations which still import them keep working.
NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = []
NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"

# NOTE(review): the logger is registered under "google.auth._default" rather
# than under this module's own name -- confirm that this is intentional.
_LOGGER = logging.getLogger("google.auth._default")

45 

46 

class Credentials(_BaseCredentials):
    """Common base class for every credential type.

    A credential carries a :attr:`token` that authenticates requests and may
    record an :attr:`expiry` after which that token is no longer valid.

    Most credentials are not :attr:`valid` until :meth:`refresh` has been
    called. :meth:`before_request` performs that refresh when needed, before
    the token is applied to the outgoing request headers.

    The token and expiry change whenever the credentials are
    :meth:`refreshed <refresh>`, but everything else (private keys, scopes
    and other construction-time options) should be treated as immutable.
    Classes that allow changes do so by returning a modified copy, for
    example :meth:`Scoped.with_scopes`.
    """

    def __init__(self):
        super().__init__()

        self.expiry = None
        """Optional[datetime]: When the token expires and is no longer valid.
        If this is None, the token is assumed to never expire."""
        self._quota_project_id = None
        """Optional[str]: Project to use for quota and billing purposes."""
        self._trust_boundary = None
        """Optional[dict]: Cache of a trust boundary response which has a list
        of allowed regions and an encoded string representation of credentials
        trust boundary."""
        self._universe_domain = DEFAULT_UNIVERSE_DOMAIN
        """Optional[str]: The universe domain value, default is googleapis.com
        """

        # Refreshes are blocking unless with_non_blocking_refresh() is called.
        self._use_non_blocking_refresh = False
        self._refresh_worker = RefreshThreadManager()

    @property
    def expired(self):
        """Whether the token has reached its (skewed) expiry.

        Credentials whose :attr:`expiry` is unset are treated as never
        expiring, so credentials can be invalid without being expired.

        .. deprecated:: v2.24.0
          Prefer checking :attr:`token_state` instead.
        """
        if not self.expiry:
            return False

        # Report expiration slightly early (by REFRESH_THRESHOLD) so that
        # callers avoid the 401-refresh-retry loop.
        refresh_deadline = self.expiry - _helpers.REFRESH_THRESHOLD
        return _helpers.utcnow() >= refresh_deadline

    @property
    def valid(self):
        """Whether a token is present and not :attr:`expired`.

        .. deprecated:: v2.24.0
          Prefer checking :attr:`token_state` instead.
        """
        return not (self.token is None or self.expired)

    @property
    def token_state(self):
        """The current :class:`TokenState` of these credentials.

        Returns:
            TokenState: ``INVALID`` when there is no token or the expiry has
                passed, ``STALE`` when the expiry is within
                ``_helpers.REFRESH_THRESHOLD``, and ``FRESH`` otherwise
                (including when no expiry is set).
        """
        if self.token is None:
            return TokenState.INVALID

        # A token without an expiry can never go stale.
        if self.expiry is None:
            return TokenState.FRESH

        if _helpers.utcnow() >= self.expiry:
            return TokenState.INVALID

        if _helpers.utcnow() >= self.expiry - _helpers.REFRESH_THRESHOLD:
            return TokenState.STALE

        return TokenState.FRESH

    @property
    def quota_project_id(self):
        """Project to use for quota and billing purposes."""
        return self._quota_project_id

    @property
    def universe_domain(self):
        """The universe domain value."""
        return self._universe_domain

    def get_cred_info(self):
        """Return the credential information JSON.

        Client libraries add this information to auth related error
        messages. The base implementation has nothing to report.

        Returns:
            Mapping[str, str]: The credential information JSON.
        """
        return None

    @abc.abstractmethod
    def refresh(self, request):
        """Refreshes the access token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Refresh must be implemented")

    def _metric_header_for_usage(self):
        """Return the x-goog-api-client value for the token usage metric.

        :meth:`before_request` adds this value to API service requests. For
        example, "cred-type/sa-jwt" means a service account self signed jwt
        access token is used in the authorization header. Subclasses that
        want the token usage metric must override this method; the base
        implementation returns None.

        Returns:
            str: The x-goog-api-client header value.
        """
        return None

    def apply(self, headers, token=None):
        """Apply the token to the authentication header.

        Also sets ``x-goog-user-project`` when a quota project is configured.

        Args:
            headers (Mapping): The HTTP request headers.
            token (Optional[str]): If specified, overrides the current access
                token.
        """
        self._apply(headers, token)
        quota_project = self.quota_project_id
        if quota_project:
            headers["x-goog-user-project"] = quota_project

    def _blocking_refresh(self, request):
        """Refresh synchronously if the credentials are not currently valid."""
        if not self.valid:
            self.refresh(request)

    def _non_blocking_refresh(self, request):
        """Refresh through the refresh worker, blocking only when required.

        A stale token is handed to the refresh worker. If the worker does not
        start a refresh, or the token is already invalid, the refresh is
        performed synchronously instead.
        """
        needs_blocking_refresh = False

        if self.token_state == TokenState.STALE:
            started = self._refresh_worker.start_refresh(self, request)
            needs_blocking_refresh = not started

        if self.token_state == TokenState.INVALID or needs_blocking_refresh:
            self.refresh(request)
            # The blocking refresh succeeded, so the error recorded on the
            # background refresh worker can be cleared and later refreshes
            # can run in a background thread again.
            self._refresh_worker.clear_error()

    def before_request(self, request, method, url, headers):
        """Performs credential-specific before request logic.

        Refreshes the credentials if necessary, runs the
        :meth:`_after_refresh` hook, adds the usage metric header and then
        calls :meth:`apply` to put the token on the authentication header.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            method (str): The request's HTTP method or the RPC method being
                invoked.
            url (str): The request's URI or the RPC service's URI.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (Subclasses may use these arguments to ascertain information about
        # the http request.)
        refresh_if_needed = (
            self._non_blocking_refresh
            if self._use_non_blocking_refresh
            else self._blocking_refresh
        )
        refresh_if_needed(request)

        self._after_refresh(request, method, url, headers)

        metrics.add_metric_header(headers, self._metric_header_for_usage())
        self.apply(headers)

    def _after_refresh(self, request, method, url, headers):
        """Hook run after the refresh step and before the token is applied.

        The base implementation does nothing; subclasses may override it.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            method (str): The request's HTTP method or the RPC method being
                invoked.
            url (str): The request's URI or the RPC service's URI.
            headers (Mapping): The request's headers.
        """
        pass

    def with_non_blocking_refresh(self):
        """Switch this instance to non-blocking refresh mode.

        Sets the flag that makes :meth:`before_request` use
        :meth:`_non_blocking_refresh`. The instance is modified in place and
        nothing is returned.
        """
        self._use_non_blocking_refresh = True

263 

264 

class CredentialsWithQuotaProject(Credentials):
    """Abstract base for credentials supporting ``with_quota_project`` factory"""

    def with_quota_project(self, quota_project_id):
        """Return a copy of these credentials that uses another quota project.

        Args:
            quota_project_id (str): The project to use for quota and
                billing purposes

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this
                method.
        """
        raise NotImplementedError("This credential does not support quota project.")

    def with_quota_project_from_environment(self):
        """Apply the quota project named in the environment, if there is one.

        Reads the variable named by
        ``environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT``.

        Returns:
            google.auth.credentials.Credentials: The result of
                :meth:`with_quota_project` when the variable holds a
                non-empty value, otherwise this same instance.
        """
        env_project = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT)
        return self.with_quota_project(env_project) if env_project else self

285 

286 

class CredentialsWithTokenUri(Credentials):
    """Abstract base for credentials supporting ``with_token_uri`` factory"""

    def with_token_uri(self, token_uri):
        """Return a copy of these credentials that uses another token uri.

        Args:
            token_uri (str): The uri to use for fetching/exchanging tokens

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this
                method.
        """
        message = "This credential does not use token uri."
        raise NotImplementedError(message)

300 

301 

class CredentialsWithUniverseDomain(Credentials):
    """Abstract base for credentials supporting ``with_universe_domain`` factory"""

    def with_universe_domain(self, universe_domain):
        """Return a copy of these credentials that uses another universe domain.

        Args:
            universe_domain (str): The universe domain to use

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this
                method.
        """
        message = "This credential does not support with_universe_domain."
        raise NotImplementedError(message)

317 

318 

class CredentialsWithRegionalAccessBoundary(Credentials):
    """Abstract base for credentials supporting regional access boundary configuration."""

    def __init__(self):
        super().__init__()
        self._rab_manager = (
            _regional_access_boundary_utils._RegionalAccessBoundaryManager()
        )

    def __setstate__(self, state):
        """Pickle helper that restores state, safely reconstructing RAB fields if missing.

        Args:
            state (dict): The instance ``__dict__`` captured when pickling.
        """
        self.__dict__.update(state)
        # State that lacks any of the attributes below gets the same defaults
        # a freshly constructed instance would have. The module-level imports
        # are used directly; no function-scope re-import is needed.
        if "_rab_manager" not in self.__dict__:
            self._rab_manager = (
                _regional_access_boundary_utils._RegionalAccessBoundaryManager()
            )
        if "_use_non_blocking_refresh" not in self.__dict__:
            self._use_non_blocking_refresh = False
        if "_refresh_worker" not in self.__dict__:
            self._refresh_worker = RefreshThreadManager()

    @property
    def regional_access_boundary(self):
        """Optional[str]: The encoded Regional Access Boundary locations."""
        return self._rab_manager._data.encoded_locations

    @property
    def regional_access_boundary_expiry(self):
        """Optional[datetime.datetime]: The expiration time of the Regional Access Boundary."""
        return self._rab_manager._data.expiry

    @abc.abstractmethod
    def _perform_refresh_token(self, request):
        """Refreshes the access token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        raise NotImplementedError("_perform_refresh_token must be implemented")

    def with_trust_boundary(self, trust_boundary):
        """Returns a copy of these credentials.

        .. deprecated::
            Manual Regional Access Boundary overrides are not supported.
            This method is maintained for backwards compatibility and
            returns a copy of the credentials without modifying the
            Regional Access Boundary state.

        Args:
            trust_boundary (Mapping[str, str]): Ignored.

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: If this instance has no ``_make_copy``
                attribute (or it is falsy).
        """
        warnings.warn(
            "with_trust_boundary is deprecated and has no effect.",
            DeprecationWarning,
            stacklevel=2,
        )
        # Copying is delegated to an optional ``_make_copy`` attribute looked
        # up on the instance.
        make_copy = getattr(self, "_make_copy", None)
        if not make_copy:
            raise NotImplementedError(
                "This credential does not support trust boundaries."
            )
        return make_copy()

    def _copy_regional_access_boundary_manager(self, target):
        """Copies the regional access boundary manager state to another instance.

        Args:
            target: The instance whose ``_rab_manager`` receives the state.
        """
        # The same ``_data`` object is assigned to the target; no copy of it
        # is made here.
        target._rab_manager._data = self._rab_manager._data
        target._rab_manager._use_blocking_regional_access_boundary_lookup = (
            self._rab_manager._use_blocking_regional_access_boundary_lookup
        )

    def _set_regional_access_boundary(self, initial_boundary):
        """Applies the regional_access_boundary provided via the initial_boundary on these
        credentials. This is intended for internal use only as an invalid
        initial_boundary would produce unexpected results until automatic recovery
        is supported. Currently this is used by the gcloud CLI and therefore changes to the
        contract MUST be backwards compatible (e.g. the method signature must be
        unchanged and the credentials with the RAB set must be returned).

        Args:
            initial_boundary (Mapping): Read for its optional
                ``"encodedLocations"`` and ``"expiry"`` entries.

        Returns:
            google.auth.credentials.Credentials: The credentials instance.
        """
        self._rab_manager.set_initial_regional_access_boundary(
            encoded_locations=initial_boundary.get("encodedLocations"),
            expiry=initial_boundary.get("expiry"),
        )
        return self

    def _set_blocking_regional_access_boundary_lookup(self):
        """Enables the blocking lookup mode on these credentials.
        This is intended for internal use only as blocking lookup requires additional
        care and consideration. Currently this is used by the gcloud CLI and
        therefore changes to the contract MUST be backwards compatible (e.g. the
        method signature must be unchanged and the credentials with the
        blocking lookup flag set to true must be returned).

        Returns:
            google.auth.credentials.Credentials: The credentials instance.
        """
        self._rab_manager.enable_blocking_lookup()
        return self

    def _is_regional_endpoint(self, url):
        """Checks if the request URL is for a regional endpoint.

        Args:
            url (str): The URL of the request.

        Returns:
            bool: True if the URL is a regional endpoint, False otherwise
                (including when the URL cannot be parsed).
        """
        regional_suffixes = (".rep.googleapis.com", ".rep.sandbox.googleapis.com")
        try:
            hostname = urlparse(url).hostname
            return bool(hostname) and hostname.endswith(regional_suffixes)
        except (ValueError, TypeError, AttributeError):
            # If the URL is malformed, proceed with the default lookup behavior.
            return False

    def _maybe_start_regional_access_boundary_refresh(self, request, url):
        """
        Starts a background thread to refresh the Regional Access Boundary if needed.

        This method checks if a refresh is necessary and if one is not already
        in progress or in a cooldown period. If so, it starts a background
        thread to perform the lookup.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            url (str): The URL of the request.
        """
        # Do not perform a lookup if the request is for a regional endpoint.
        if self._is_regional_endpoint(url):
            return

        # A refresh is only needed if the feature is enabled.
        if not self._is_regional_access_boundary_lookup_required():
            return

        # Trigger background or blocking refresh if needed
        self._rab_manager.maybe_start_refresh(self, request)

    def _is_regional_access_boundary_lookup_required(self):
        """Checks if a Regional Access Boundary lookup is required.

        A lookup is required if the feature is enabled via an environment
        variable and the universe domain is supported.

        Returns:
            bool: True if a Regional Access Boundary lookup is required, False otherwise.
        """
        # Check if the feature is enabled.
        if not _regional_access_boundary_utils.is_regional_access_boundary_enabled():
            return False

        # Skip for non-default universe domains.
        return self.universe_domain == DEFAULT_UNIVERSE_DOMAIN

    def apply(self, headers, token=None):
        """Apply the token to the authentication header.

        After the base class has applied the token, the Regional Access
        Boundary manager is given the headers as well.

        Args:
            headers (Mapping): The HTTP request headers.
            token (Optional[str]): If specified, overrides the current access
                token.
        """
        super().apply(headers, token)
        self._rab_manager.apply_headers(headers)

    def _after_refresh(self, request, method, url, headers):
        """Triggers the Regional Access Boundary lookup if necessary."""
        self._maybe_start_regional_access_boundary_refresh(request, url)

    def refresh(self, request):
        """Refreshes the access token.

        This method calls the subclass's token refresh logic. The Regional
        Access Boundary is refreshed separately in a non-blocking way.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
        """
        self._perform_refresh_token(request)

    def _lookup_regional_access_boundary(
        self,
        request: "google.auth.transport.Request",  # noqa: F821
        fail_fast: bool = False,
    ) -> "Optional[Dict[str, str]]":
        """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            fail_fast (bool): Whether the lookup should fail fast (short timeout, no retries).

        Returns:
            Optional[Dict[str, str]]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed.
        """
        # NOTE(review): imported at call time rather than at module scope,
        # presumably to avoid a circular import -- confirm before moving it.
        from google.oauth2 import _client

        url = self._build_regional_access_boundary_lookup_url(request=request)
        if not url:
            _LOGGER.warning("Failed to build Regional Access Boundary lookup URL.")
            return None

        # Only ``_apply`` is used for the lookup request's headers, not the
        # public ``apply``.
        headers: Dict[str, str] = {}
        self._apply(headers)
        return _client._lookup_regional_access_boundary(
            request, url, headers=headers, fail_fast=fail_fast
        )

    @abc.abstractmethod
    def _build_regional_access_boundary_lookup_url(
        self, request: "Optional[google.auth.transport.Request]" = None  # noqa: F821
    ):
        """
        Builds and returns the URL for the Regional Access Boundary lookup API.

        This method should be implemented by subclasses to provide the
        specific URL based on the credential type and its properties.

        Args:
            request (Optional[google.auth.transport.Request]): The object used
                to make HTTP requests. In some subclasses, this may be used to
                make an initial network call to resolve required metadata for the
                URL.

        Returns:
            str: The URL for the Regional Access Boundary lookup endpoint, or None
                 if lookup should be skipped (e.g., for non-applicable universe domains).
        """
        raise NotImplementedError(
            "_build_regional_access_boundary_lookup_url must be implemented"
        )

571 

572 

class AnonymousCredentials(Credentials):
    """Credentials that carry no authentication information at all.

    Use them with services that support anonymous access, or with local
    service emulators that do not use credentials.
    """

    @property
    def expired(self):
        """Returns `False`, anonymous credentials never expire."""
        return False

    @property
    def valid(self):
        """Returns `True`, anonymous credentials are always valid."""
        return True

    def refresh(self, request):
        """Always raises: anonymous credentials cannot be refreshed.

        Raises:
            google.auth.exceptions.InvalidOperation: On every call.
        """
        raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")

    def apply(self, headers, token=None):
        """Leave the request headers untouched.

        The optional ``token`` argument is not supported.

        Raises:
            google.auth.exceptions.InvalidValue: If a token was specified.
        """
        if token is None:
            return
        raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")

    def before_request(self, request, method, url, headers):
        """Anonymous credentials do nothing to the request."""

608 

609 

class ReadOnlyScoped(metaclass=abc.ABCMeta):
    """Interface for credentials whose scopes can be queried.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    If a credential class implements this interface then the credentials
    use scopes in their implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(scopes=['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    def __init__(self):
        super().__init__()
        self._scopes = None
        self._default_scopes = None

    @property
    def scopes(self):
        """Sequence[str]: the credentials' current set of scopes."""
        return self._scopes

    @property
    def default_scopes(self):
        """Sequence[str]: the credentials' current set of default scopes."""
        return self._default_scopes

    # ``abc.abstractproperty`` is deprecated; stacking ``property`` on top of
    # ``abc.abstractmethod`` is the supported spelling and keeps the attribute
    # both a property and abstract.
    @property
    @abc.abstractmethod
    def requires_scopes(self):
        """True if these credentials require scopes to obtain an access token."""
        return False

    def has_scopes(self, scopes):
        """Checks if the credentials have the given scopes.

        The explicitly set scopes are consulted when they are not None;
        otherwise the default scopes are used.

        .. warning:: This method is not guaranteed to be accurate if the
            credentials are not :attr:`~Credentials.valid`.

        Args:
            scopes (Sequence[str]): The list of scopes to check.

        Returns:
            bool: True if the credentials have the given scopes.
        """
        credential_scopes = (
            self._scopes if self._scopes is not None else self._default_scopes
        )
        # ``or ()`` covers the case where neither scope list has been set.
        return set(scopes).issubset(credential_scopes or ())

675 

676 

class Scoped(ReadOnlyScoped):
    """Interface for credentials whose scopes can be replaced while copying.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    If a credential class implements this interface then the credentials
    use scopes in their implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    @abc.abstractmethod
    def with_scopes(self, scopes, default_scopes=None):
        """Create a copy of these credentials with the specified scopes.

        Args:
            scopes (Sequence[str]): The list of scopes to attach to the
                current credentials.
            default_scopes (Optional[Sequence[str]]): Default scopes passed
                by a Google client library. Use ``scopes`` for user-defined
                scopes.

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: If the credentials' scopes can not be changed.
                This can be avoided by checking :attr:`requires_scopes` before
                calling this method.
        """
        raise NotImplementedError("This class does not require scoping.")

720 

721 

def with_scopes_if_required(credentials, scopes, default_scopes=None):
    """Return scoped credentials when, and only when, scoping is required.

    Handy when the concrete credential type is unknown, for example after
    calling :func:`google.auth.default`. :meth:`Scoped.with_scopes` is called
    only if ``credentials`` is a :class:`Scoped` instance whose
    :attr:`~ReadOnlyScoped.requires_scopes` is true; in every other case the
    credentials are handed back unchanged.

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            scope if necessary.
        scopes (Sequence[str]): The list of scopes to use.
        default_scopes (Sequence[str]): Default scopes passed by a
            Google client library. Use 'scopes' for user-defined scopes.

    Returns:
        google.auth.credentials.Credentials: Either a new set of scoped
            credentials, or the passed in credentials instance if no scoping
            was required.
    """
    needs_scoping = isinstance(credentials, Scoped) and credentials.requires_scopes
    if not needs_scoping:
        return credentials
    return credentials.with_scopes(scopes, default_scopes=default_scopes)

748 

749 

class Signing(metaclass=abc.ABCMeta):
    """Interface for credentials that can cryptographically sign messages."""

    @abc.abstractmethod
    def sign_bytes(self, message):
        """Signs the given message.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The message's cryptographic signature.
        """
        # pylint: disable=missing-raises-doc,redundant-returns-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Sign bytes must be implemented.")

    # ``abc.abstractproperty`` is deprecated; ``property`` stacked on
    # ``abc.abstractmethod`` is the supported equivalent.
    @property
    @abc.abstractmethod
    def signer_email(self):
        """Optional[str]: An email address that identifies the signer."""
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer email must be implemented.")

    @property
    @abc.abstractmethod
    def signer(self):
        """google.auth.crypt.Signer: The signer used to sign bytes."""
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer must be implemented.")

780 

781 

class TokenState(Enum):
    """State of a credential's token.

    Members:
        FRESH: The token is valid. It is not expired or close to expired, or
            the token has no expiry.
        STALE: The token is close to expired, and should be refreshed. The
            token can be used normally.
        INVALID: The token is expired or invalid. The token cannot be used
            for a normal operation.
    """

    FRESH = 1
    STALE = 2
    INVALID = 3

793 

794 

class CredentialsWithTrustBoundary(CredentialsWithRegionalAccessBoundary):
    """Abstract base for credentials using the legacy trust boundary hooks.

    .. deprecated::
        Use :class:`~google.auth.credentials.CredentialsWithRegionalAccessBoundary` instead.
    """

    def __init__(self):
        super().__init__()
        # Emitted on every construction; stacklevel=2 attributes the warning
        # to the code that called this initializer.
        message = "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary."
        warnings.warn(message, DeprecationWarning, stacklevel=2)

    @abc.abstractmethod
    def _build_trust_boundary_lookup_url(self):
        """Deprecated: Implement _build_regional_access_boundary_lookup_url instead."""
        raise NotImplementedError()

    def _build_regional_access_boundary_lookup_url(self, request=None):
        """Return the lookup URL produced by the legacy trust boundary hook.

        Emits a ``DeprecationWarning`` and delegates to
        :meth:`_build_trust_boundary_lookup_url`; ``request`` is not used.
        """
        message = "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary."
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return self._build_trust_boundary_lookup_url()