Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/credentials.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

244 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16"""Interfaces for credentials.""" 

17 

18import abc 

19from enum import Enum 

20import logging 

21import os 

22from typing import Dict, List, Optional, TYPE_CHECKING 

23from urllib.parse import urlparse 

24import warnings 

25 

26 

27from google.auth import _helpers, environment_vars 

28from google.auth import _regional_access_boundary_utils 

29from google.auth import exceptions 

30from google.auth import metrics 

31from google.auth._credentials_base import _BaseCredentials 

32from google.auth._refresh_worker import RefreshThreadManager 

33 

34if TYPE_CHECKING: # pragma: NO COVER 

35 import google.auth.transport 

36 

37DEFAULT_UNIVERSE_DOMAIN = _helpers.DEFAULT_UNIVERSE_DOMAIN 

38 

39# These constants are deprecated and no longer used. 

40# They are kept solely for backward compatibility with older implementations. 

41NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = [] 

42NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0" 

43 

44_LOGGER = logging.getLogger("google.auth._default") 

45 

46 

47class Credentials(_BaseCredentials): 

48 """Base class for all credentials. 

49 

50 All credentials have a :attr:`token` that is used for authentication and 

51 may also optionally set an :attr:`expiry` to indicate when the token will 

52 no longer be valid. 

53 

54 Most credentials will be :attr:`invalid` until :meth:`refresh` is called. 

55 Credentials can do this automatically before the first HTTP request in 

56 :meth:`before_request`. 

57 

58 Although the token and expiration will change as the credentials are 

59 :meth:`refreshed <refresh>` and used, credentials should be considered 

60 immutable. Various credentials will accept configuration such as private 

61 keys, scopes, and other options. These options are not changeable after 

62 construction. Some classes will provide mechanisms to copy the credentials 

63 with modifications such as :meth:`ScopedCredentials.with_scopes`. 

64 """ 

65 

66 def __init__(self): 

67 super(Credentials, self).__init__() 

68 

69 self.expiry = None 

70 """Optional[datetime]: When the token expires and is no longer valid. 

71 If this is None, the token is assumed to never expire.""" 

72 self._quota_project_id = None 

73 """Optional[str]: Project to use for quota and billing purposes.""" 

74 self._trust_boundary = None 

75 """Optional[dict]: Cache of a trust boundary response which has a list 

76 of allowed regions and an encoded string representation of credentials 

77 trust boundary.""" 

78 self._universe_domain = DEFAULT_UNIVERSE_DOMAIN 

79 """Optional[str]: The universe domain value, default is googleapis.com 

80 """ 

81 

82 self._use_non_blocking_refresh = False 

83 self._refresh_worker = RefreshThreadManager() 

84 

85 @property 

86 def expired(self): 

87 """Checks if the credentials are expired. 

88 

89 Note that credentials can be invalid but not expired because 

90 Credentials with :attr:`expiry` set to None is considered to never 

91 expire. 

92 

93 .. deprecated:: v2.24.0 

94 Prefer checking :attr:`token_state` instead. 

95 """ 

96 if not self.expiry: 

97 return False 

98 # Remove some threshold from expiry to err on the side of reporting 

99 # expiration early so that we avoid the 401-refresh-retry loop. 

100 skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD 

101 return _helpers.utcnow() >= skewed_expiry 

102 

103 @property 

104 def valid(self): 

105 """Checks the validity of the credentials. 

106 

107 This is True if the credentials have a :attr:`token` and the token 

108 is not :attr:`expired`. 

109 

110 .. deprecated:: v2.24.0 

111 Prefer checking :attr:`token_state` instead. 

112 """ 

113 return self.token is not None and not self.expired 

114 

115 @property 

116 def token_state(self): 

117 """ 

118 See `:obj:`TokenState` 

119 """ 

120 if self.token is None: 

121 return TokenState.INVALID 

122 

123 # Credentials that can't expire are always treated as fresh. 

124 if self.expiry is None: 

125 return TokenState.FRESH 

126 

127 expired = _helpers.utcnow() >= self.expiry 

128 if expired: 

129 return TokenState.INVALID 

130 

131 is_stale = _helpers.utcnow() >= (self.expiry - _helpers.REFRESH_THRESHOLD) 

132 if is_stale: 

133 return TokenState.STALE 

134 

135 return TokenState.FRESH 

136 

137 @property 

138 def quota_project_id(self): 

139 """Project to use for quota and billing purposes.""" 

140 return self._quota_project_id 

141 

142 @property 

143 def universe_domain(self): 

144 """The universe domain value.""" 

145 return self._universe_domain 

146 

147 def get_cred_info(self): 

148 """The credential information JSON. 

149 

150 The credential information will be added to auth related error messages 

151 by client library. 

152 

153 Returns: 

154 Mapping[str, str]: The credential information JSON. 

155 """ 

156 return None 

157 

158 @abc.abstractmethod 

159 def refresh(self, request): 

160 """Refreshes the access token. 

161 

162 Args: 

163 request (google.auth.transport.Request): The object used to make 

164 HTTP requests. 

165 

166 Raises: 

167 google.auth.exceptions.RefreshError: If the credentials could 

168 not be refreshed. 

169 """ 

170 # pylint: disable=missing-raises-doc 

171 # (pylint doesn't recognize that this is abstract) 

172 raise NotImplementedError("Refresh must be implemented") 

173 

174 def _metric_header_for_usage(self): 

175 """The x-goog-api-client header for token usage metric. 

176 

177 This header will be added to the API service requests in before_request 

178 method. For example, "cred-type/sa-jwt" means service account self 

179 signed jwt access token is used in the API service request 

180 authorization header. Children credentials classes need to override 

181 this method to provide the header value, if the token usage metric is 

182 needed. 

183 

184 Returns: 

185 str: The x-goog-api-client header value. 

186 """ 

187 return None 

188 

189 def apply(self, headers, token=None): 

190 """Apply the token to the authentication header. 

191 

192 Args: 

193 headers (Mapping): The HTTP request headers. 

194 token (Optional[str]): If specified, overrides the current access 

195 token. 

196 """ 

197 self._apply(headers, token) 

198 if self.quota_project_id: 

199 headers["x-goog-user-project"] = self.quota_project_id 

200 

201 def _blocking_refresh(self, request): 

202 if not self.valid: 

203 self.refresh(request) 

204 

205 def _non_blocking_refresh(self, request): 

206 use_blocking_refresh_fallback = False 

207 

208 if self.token_state == TokenState.STALE: 

209 use_blocking_refresh_fallback = not self._refresh_worker.start_refresh( 

210 self, request 

211 ) 

212 

213 if self.token_state == TokenState.INVALID or use_blocking_refresh_fallback: 

214 self.refresh(request) 

215 # If the blocking refresh succeeds then we can clear the error info 

216 # on the background refresh worker, and perform refreshes in a 

217 # background thread. 

218 self._refresh_worker.clear_error() 

219 

220 def before_request(self, request, method, url, headers): 

221 """Performs credential-specific before request logic. 

222 

223 Refreshes the credentials if necessary, then calls :meth:`apply` to 

224 apply the token to the authentication header. 

225 

226 Args: 

227 request (google.auth.transport.Request): The object used to make 

228 HTTP requests. 

229 method (str): The request's HTTP method or the RPC method being 

230 invoked. 

231 url (str): The request's URI or the RPC service's URI. 

232 headers (Mapping): The request's headers. 

233 """ 

234 # pylint: disable=unused-argument 

235 # (Subclasses may use these arguments to ascertain information about 

236 # the http request.) 

237 if self._use_non_blocking_refresh: 

238 self._non_blocking_refresh(request) 

239 else: 

240 self._blocking_refresh(request) 

241 

242 self._after_refresh(request, method, url, headers) 

243 

244 metrics.add_metric_header(headers, self._metric_header_for_usage()) 

245 self.apply(headers) 

246 

247 def _after_refresh(self, request, method, url, headers): 

248 """Hook for subclasses to perform actions after refresh but before 

249 applying credentials to headers. 

250 

251 Args: 

252 request (google.auth.transport.Request): The object used to make 

253 HTTP requests. 

254 method (str): The request's HTTP method or the RPC method being 

255 invoked. 

256 url (str): The request's URI or the RPC service's URI. 

257 headers (Mapping): The request's headers. 

258 """ 

259 pass 

260 

261 def with_non_blocking_refresh(self): 

262 self._use_non_blocking_refresh = True 

263 

264 

265class CredentialsWithQuotaProject(Credentials): 

266 """Abstract base for credentials supporting ``with_quota_project`` factory""" 

267 

268 def with_quota_project(self, quota_project_id): 

269 """Returns a copy of these credentials with a modified quota project. 

270 

271 Args: 

272 quota_project_id (str): The project to use for quota and 

273 billing purposes 

274 

275 Returns: 

276 google.auth.credentials.Credentials: A new credentials instance. 

277 """ 

278 raise NotImplementedError("This credential does not support quota project.") 

279 

280 def with_quota_project_from_environment(self): 

281 quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT) 

282 if quota_from_env: 

283 return self.with_quota_project(quota_from_env) 

284 return self 

285 

286 

287class CredentialsWithTokenUri(Credentials): 

288 """Abstract base for credentials supporting ``with_token_uri`` factory""" 

289 

290 def with_token_uri(self, token_uri): 

291 """Returns a copy of these credentials with a modified token uri. 

292 

293 Args: 

294 token_uri (str): The uri to use for fetching/exchanging tokens 

295 

296 Returns: 

297 google.auth.credentials.Credentials: A new credentials instance. 

298 """ 

299 raise NotImplementedError("This credential does not use token uri.") 

300 

301 

302class CredentialsWithUniverseDomain(Credentials): 

303 """Abstract base for credentials supporting ``with_universe_domain`` factory""" 

304 

305 def with_universe_domain(self, universe_domain): 

306 """Returns a copy of these credentials with a modified universe domain. 

307 

308 Args: 

309 universe_domain (str): The universe domain to use 

310 

311 Returns: 

312 google.auth.credentials.Credentials: A new credentials instance. 

313 """ 

314 raise NotImplementedError( 

315 "This credential does not support with_universe_domain." 

316 ) 

317 

318 

319class CredentialsWithRegionalAccessBoundary(Credentials): 

320 """Abstract base for credentials supporting regional access boundary configuration.""" 

321 

322 def __init__(self): 

323 super().__init__() 

324 self._rab_manager = ( 

325 _regional_access_boundary_utils._RegionalAccessBoundaryManager() 

326 ) 

327 

328 def __setstate__(self, state): 

329 """Pickle helper that restores state, safely reconstructing RAB fields if missing.""" 

330 self.__dict__.update(state) 

331 if "_rab_manager" not in self.__dict__: 

332 from google.auth import _regional_access_boundary_utils 

333 

334 self._rab_manager = ( 

335 _regional_access_boundary_utils._RegionalAccessBoundaryManager() 

336 ) 

337 if "_use_non_blocking_refresh" not in self.__dict__: 

338 self._use_non_blocking_refresh = False 

339 if "_refresh_worker" not in self.__dict__: 

340 from google.auth._refresh_worker import RefreshThreadManager 

341 

342 self._refresh_worker = RefreshThreadManager() 

343 

344 @property 

345 def regional_access_boundary(self): 

346 """Optional[str]: The encoded Regional Access Boundary locations.""" 

347 return self._rab_manager._data.encoded_locations 

348 

349 @property 

350 def regional_access_boundary_expiry(self): 

351 """Optional[datetime.datetime]: The expiration time of the Regional Access Boundary.""" 

352 return self._rab_manager._data.expiry 

353 

354 @abc.abstractmethod 

355 def _perform_refresh_token(self, request): 

356 """Refreshes the access token. 

357 

358 Args: 

359 request (google.auth.transport.Request): The object used to make 

360 HTTP requests. 

361 

362 Raises: 

363 google.auth.exceptions.RefreshError: If the credentials could 

364 not be refreshed. 

365 """ 

366 raise NotImplementedError("_perform_refresh_token must be implemented") 

367 

368 def with_trust_boundary(self, trust_boundary): 

369 """Returns a copy of these credentials. 

370 

371 .. deprecated:: 

372 Manual Regional Access Boundary overrides are not supported. 

373 This method is maintained for backwards compatibility and 

374 returns a copy of the credentials without modifying the 

375 Regional Access Boundary state. 

376 

377 Args: 

378 trust_boundary (Mapping[str, str]): Ignored. 

379 

380 Returns: 

381 google.auth.credentials.Credentials: A new credentials instance. 

382 """ 

383 import warnings 

384 

385 warnings.warn( 

386 "with_trust_boundary is deprecated and has no effect.", 

387 DeprecationWarning, 

388 stacklevel=2, 

389 ) 

390 make_copy = getattr(self, "_make_copy", None) 

391 if make_copy: 

392 return make_copy() 

393 else: 

394 raise NotImplementedError( 

395 "This credential does not support trust boundaries." 

396 ) 

397 

398 def _copy_regional_access_boundary_manager(self, target): 

399 """Copies the regional access boundary manager state to another instance.""" 

400 target._rab_manager._data = self._rab_manager._data 

401 target._rab_manager._use_blocking_regional_access_boundary_lookup = ( 

402 self._rab_manager._use_blocking_regional_access_boundary_lookup 

403 ) 

404 

405 def _set_regional_access_boundary(self, initial_boundary): 

406 """Applies the regional_access_boundary provided via the initial_boundary on these 

407 credentials. This is intended for internal use only as an invalid 

408 initial_boundary would produce unexpected results until automatic recovery 

409 is supported. Currently this is used by the gcloud CLI and therefore changes to the 

410 contract MUST be backwards compatible (e.g. the method signature must be 

411 unchanged and the credentials with the RAB set must be returned). 

412 

413 

414 Returns: 

415 google.auth.credentials.Credentials: The credentials instance. 

416 """ 

417 self._rab_manager.set_initial_regional_access_boundary( 

418 encoded_locations=initial_boundary.get("encodedLocations", None), 

419 expiry=initial_boundary.get("expiry", None), 

420 ) 

421 return self 

422 

423 def _set_blocking_regional_access_boundary_lookup(self): 

424 """Enables the blocking lookup mode on these credentials. 

425 This is intended for internal use only as blocking lookup requires additional 

426 care and consideration. Currently this is used by the gcloud CLI and 

427 therefore changes to the contract MUST be backwards compatible (e.g. the 

428 method signature must be unchanged and the credentials with the 

429 blocking lookup flag set to true must be returned). 

430 

431 Returns: 

432 google.auth.credentials.Credentials: The credentials instance. 

433 """ 

434 self._rab_manager.enable_blocking_lookup() 

435 return self 

436 

437 def _is_regional_endpoint(self, url): 

438 """Checks if the request URL is for a regional endpoint. 

439 

440 Args: 

441 url (str): The URL of the request. 

442 

443 Returns: 

444 bool: True if the URL is a regional endpoint, False otherwise. 

445 """ 

446 try: 

447 # Do not perform a lookup if the request is for a regional endpoint. 

448 hostname = urlparse(url).hostname 

449 if hostname and hostname.endswith( 

450 ( 

451 ".rep.googleapis.com", 

452 ".rep.sandbox.googleapis.com", 

453 ".rep.mtls.googleapis.com", 

454 ".rep.mtls.sandbox.googleapis.com", 

455 ) 

456 ): 

457 return True 

458 except (ValueError, TypeError, AttributeError): 

459 # If the URL is malformed, proceed with the default lookup behavior. 

460 pass 

461 

462 return False 

463 

464 def _maybe_start_regional_access_boundary_refresh(self, request, url): 

465 """ 

466 Starts a background thread to refresh the Regional Access Boundary if needed. 

467 

468 This method checks if a refresh is necessary and if one is not already 

469 in progress or in a cooldown period. If so, it starts a background 

470 thread to perform the lookup. 

471 

472 Args: 

473 request (google.auth.transport.Request): The object used to make 

474 HTTP requests. 

475 url (str): The URL of the request. 

476 """ 

477 # Do not perform a lookup if the request is for a regional endpoint. 

478 if self._is_regional_endpoint(url): 

479 return 

480 

481 # A refresh is only needed if the feature is enabled. 

482 if not self._is_regional_access_boundary_lookup_required(): 

483 return 

484 

485 # Trigger background or blocking refresh if needed 

486 self._rab_manager.maybe_start_refresh(self, request) 

487 

488 def _is_regional_access_boundary_lookup_required(self): 

489 """Checks if a Regional Access Boundary lookup is required. 

490 

491 A lookup is required if the universe domain is supported. 

492 

493 Returns: 

494 bool: True if a Regional Access Boundary lookup is required, False otherwise. 

495 """ 

496 # Skip for non-default universe domains. 

497 if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN: 

498 return False 

499 

500 return True 

501 

502 def apply(self, headers, token=None): 

503 """Apply the token to the authentication header.""" 

504 super().apply(headers, token) 

505 self._rab_manager.apply_headers(headers) 

506 

507 def _after_refresh(self, request, method, url, headers): 

508 """Triggers the Regional Access Boundary lookup if necessary.""" 

509 self._maybe_start_regional_access_boundary_refresh(request, url) 

510 

511 def refresh(self, request): 

512 """Refreshes the access token. 

513 

514 This method calls the subclass's token refresh logic. The Regional 

515 Access Boundary is refreshed separately in a non-blocking way. 

516 """ 

517 self._perform_refresh_token(request) 

518 

519 def _lookup_regional_access_boundary( 

520 self, 

521 request: "google.auth.transport.Request", # noqa: F821 

522 fail_fast: bool = False, 

523 ) -> "Optional[Dict[str, str]]": 

524 """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information. 

525 

526 Args: 

527 request (google.auth.transport.Request): The object used to make 

528 HTTP requests. 

529 fail_fast (bool): Whether the lookup should fail fast (short timeout, no retries). 

530 

531 Returns: 

532 Optional[Dict[str, str]]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed. 

533 """ 

534 from google.oauth2 import _client 

535 

536 url = self._build_regional_access_boundary_lookup_url(request=request) 

537 if not url: 

538 _LOGGER.warning("Failed to build Regional Access Boundary lookup URL.") 

539 return None 

540 

541 headers: Dict[str, str] = {} 

542 self._apply(headers) 

543 return _client._lookup_regional_access_boundary( 

544 request, url, headers=headers, fail_fast=fail_fast 

545 ) 

546 

547 @abc.abstractmethod 

548 def _build_regional_access_boundary_lookup_url( 

549 self, request: "Optional[google.auth.transport.Request]" = None # noqa: F821 

550 ): 

551 """ 

552 Builds and returns the URL for the Regional Access Boundary lookup API. 

553 

554 This method should be implemented by subclasses to provide the 

555 specific URL based on the credential type and its properties. 

556 

557 Args: 

558 request (Optional[google.auth.transport.Request]): The object used 

559 to make HTTP requests. In some subclasses, this may be used to 

560 make an initial network call to resolve required metadata for the 

561 URL. 

562 

563 Returns: 

564 str: The URL for the Regional Access Boundary lookup endpoint, or None 

565 if lookup should be skipped (e.g., for non-applicable universe domains). 

566 """ 

567 raise NotImplementedError( 

568 "_build_regional_access_boundary_lookup_url must be implemented" 

569 ) 

570 

571 

572class AnonymousCredentials(Credentials): 

573 """Credentials that do not provide any authentication information. 

574 

575 These are useful in the case of services that support anonymous access or 

576 local service emulators that do not use credentials. 

577 """ 

578 

579 @property 

580 def expired(self): 

581 """Returns `False`, anonymous credentials never expire.""" 

582 return False 

583 

584 @property 

585 def valid(self): 

586 """Returns `True`, anonymous credentials are always valid.""" 

587 return True 

588 

589 def refresh(self, request): 

590 """Raises :class:``InvalidOperation``, anonymous credentials cannot be 

591 refreshed.""" 

592 raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.") 

593 

594 def apply(self, headers, token=None): 

595 """Anonymous credentials do nothing to the request. 

596 

597 The optional ``token`` argument is not supported. 

598 

599 Raises: 

600 google.auth.exceptions.InvalidValue: If a token was specified. 

601 """ 

602 if token is not None: 

603 raise exceptions.InvalidValue("Anonymous credentials don't support tokens.") 

604 

605 def before_request(self, request, method, url, headers): 

606 """Anonymous credentials do nothing to the request.""" 

607 

608 

609class ReadOnlyScoped(metaclass=abc.ABCMeta): 

610 """Interface for credentials whose scopes can be queried. 

611 

612 OAuth 2.0-based credentials allow limiting access using scopes as described 

613 in `RFC6749 Section 3.3`_. 

614 If a credential class implements this interface then the credentials either 

615 use scopes in their implementation. 

616 

617 Some credentials require scopes in order to obtain a token. You can check 

618 if scoping is necessary with :attr:`requires_scopes`:: 

619 

620 if credentials.requires_scopes: 

621 # Scoping is required. 

622 credentials = credentials.with_scopes(scopes=['one', 'two']) 

623 

624 Credentials that require scopes must either be constructed with scopes:: 

625 

626 credentials = SomeScopedCredentials(scopes=['one', 'two']) 

627 

628 Or must copy an existing instance using :meth:`with_scopes`:: 

629 

630 scoped_credentials = credentials.with_scopes(scopes=['one', 'two']) 

631 

632 Some credentials have scopes but do not allow or require scopes to be set, 

633 these credentials can be used as-is. 

634 

635 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3 

636 """ 

637 

638 def __init__(self): 

639 super(ReadOnlyScoped, self).__init__() 

640 self._scopes = None 

641 self._default_scopes = None 

642 

643 @property 

644 def scopes(self): 

645 """Sequence[str]: the credentials' current set of scopes.""" 

646 return self._scopes 

647 

648 @property 

649 def default_scopes(self): 

650 """Sequence[str]: the credentials' current set of default scopes.""" 

651 return self._default_scopes 

652 

653 @abc.abstractproperty 

654 def requires_scopes(self): 

655 """True if these credentials require scopes to obtain an access token.""" 

656 return False 

657 

658 def has_scopes(self, scopes): 

659 """Checks if the credentials have the given scopes. 

660 

661 .. warning: This method is not guaranteed to be accurate if the 

662 credentials are :attr:`~Credentials.invalid`. 

663 

664 Args: 

665 scopes (Sequence[str]): The list of scopes to check. 

666 

667 Returns: 

668 bool: True if the credentials have the given scopes. 

669 """ 

670 credential_scopes = ( 

671 self._scopes if self._scopes is not None else self._default_scopes 

672 ) 

673 return set(scopes).issubset(set(credential_scopes or [])) 

674 

675 

676class Scoped(ReadOnlyScoped): 

677 """Interface for credentials whose scopes can be replaced while copying. 

678 

679 OAuth 2.0-based credentials allow limiting access using scopes as described 

680 in `RFC6749 Section 3.3`_. 

681 If a credential class implements this interface then the credentials either 

682 use scopes in their implementation. 

683 

684 Some credentials require scopes in order to obtain a token. You can check 

685 if scoping is necessary with :attr:`requires_scopes`:: 

686 

687 if credentials.requires_scopes: 

688 # Scoping is required. 

689 credentials = credentials.create_scoped(['one', 'two']) 

690 

691 Credentials that require scopes must either be constructed with scopes:: 

692 

693 credentials = SomeScopedCredentials(scopes=['one', 'two']) 

694 

695 Or must copy an existing instance using :meth:`with_scopes`:: 

696 

697 scoped_credentials = credentials.with_scopes(scopes=['one', 'two']) 

698 

699 Some credentials have scopes but do not allow or require scopes to be set, 

700 these credentials can be used as-is. 

701 

702 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3 

703 """ 

704 

705 @abc.abstractmethod 

706 def with_scopes(self, scopes, default_scopes=None): 

707 """Create a copy of these credentials with the specified scopes. 

708 

709 Args: 

710 scopes (Sequence[str]): The list of scopes to attach to the 

711 current credentials. 

712 

713 Raises: 

714 NotImplementedError: If the credentials' scopes can not be changed. 

715 This can be avoided by checking :attr:`requires_scopes` before 

716 calling this method. 

717 """ 

718 raise NotImplementedError("This class does not require scoping.") 

719 

720 

721def with_scopes_if_required(credentials, scopes, default_scopes=None): 

722 """Creates a copy of the credentials with scopes if scoping is required. 

723 

724 This helper function is useful when you do not know (or care to know) the 

725 specific type of credentials you are using (such as when you use 

726 :func:`google.auth.default`). This function will call 

727 :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if 

728 the credentials require scoping. Otherwise, it will return the credentials 

729 as-is. 

730 

731 Args: 

732 credentials (google.auth.credentials.Credentials): The credentials to 

733 scope if necessary. 

734 scopes (Sequence[str]): The list of scopes to use. 

735 default_scopes (Sequence[str]): Default scopes passed by a 

736 Google client library. Use 'scopes' for user-defined scopes. 

737 

738 Returns: 

739 google.auth.credentials.Credentials: Either a new set of scoped 

740 credentials, or the passed in credentials instance if no scoping 

741 was required. 

742 """ 

743 if isinstance(credentials, Scoped) and credentials.requires_scopes: 

744 return credentials.with_scopes(scopes, default_scopes=default_scopes) 

745 else: 

746 return credentials 

747 

748 

749class Signing(metaclass=abc.ABCMeta): 

750 """Interface for credentials that can cryptographically sign messages.""" 

751 

752 @abc.abstractmethod 

753 def sign_bytes(self, message): 

754 """Signs the given message. 

755 

756 Args: 

757 message (bytes): The message to sign. 

758 

759 Returns: 

760 bytes: The message's cryptographic signature. 

761 """ 

762 # pylint: disable=missing-raises-doc,redundant-returns-doc 

763 # (pylint doesn't recognize that this is abstract) 

764 raise NotImplementedError("Sign bytes must be implemented.") 

765 

766 @abc.abstractproperty 

767 def signer_email(self): 

768 """Optional[str]: An email address that identifies the signer.""" 

769 # pylint: disable=missing-raises-doc 

770 # (pylint doesn't recognize that this is abstract) 

771 raise NotImplementedError("Signer email must be implemented.") 

772 

773 @abc.abstractproperty 

774 def signer(self): 

775 """google.auth.crypt.Signer: The signer used to sign bytes.""" 

776 # pylint: disable=missing-raises-doc 

777 # (pylint doesn't recognize that this is abstract) 

778 raise NotImplementedError("Signer must be implemented.") 

779 

780 

781class TokenState(Enum): 

782 """ 

783 Tracks the state of a token. 

784 FRESH: The token is valid. It is not expired or close to expired, or the token has no expiry. 

785 STALE: The token is close to expired, and should be refreshed. The token can be used normally. 

786 INVALID: The token is expired or invalid. The token cannot be used for a normal operation. 

787 """ 

788 

789 FRESH = 1 

790 STALE = 2 

791 INVALID = 3 

792 

793 

794class CredentialsWithTrustBoundary(CredentialsWithRegionalAccessBoundary): 

795 """Abstract base for credentials supporting legacy trust boundary configuration. 

796 

797 .. deprecated:: 

798 Use :class:`~google.auth.credentials.CredentialsWithRegionalAccessBoundary` instead. 

799 """ 

800 

801 def __init__(self): 

802 super().__init__() 

803 warnings.warn( 

804 "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary.", 

805 DeprecationWarning, 

806 stacklevel=2, 

807 ) 

808 

809 @abc.abstractmethod 

810 def _build_trust_boundary_lookup_url(self): 

811 """Deprecated: Implement _build_regional_access_boundary_lookup_url instead.""" 

812 raise NotImplementedError() 

813 

814 def _build_regional_access_boundary_lookup_url(self, request=None): 

815 warnings.warn( 

816 "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary.", 

817 DeprecationWarning, 

818 stacklevel=2, 

819 ) 

820 return self._build_trust_boundary_lookup_url()