1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Interfaces for credentials."""
17
18import abc
19from enum import Enum
20import logging
21import os
22from typing import Dict, List, Optional, TYPE_CHECKING
23from urllib.parse import urlparse
24import warnings
25
26
27from google.auth import _helpers, environment_vars
28from google.auth import _regional_access_boundary_utils
29from google.auth import exceptions
30from google.auth import metrics
31from google.auth._credentials_base import _BaseCredentials
32from google.auth._refresh_worker import RefreshThreadManager
33
34if TYPE_CHECKING: # pragma: NO COVER
35 import google.auth.transport
36
37DEFAULT_UNIVERSE_DOMAIN = _helpers.DEFAULT_UNIVERSE_DOMAIN
38
39# These constants are deprecated and no longer used.
40# They are kept solely for backward compatibility with older implementations.
41NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = []
42NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"
43
44_LOGGER = logging.getLogger("google.auth._default")
45
46
47class Credentials(_BaseCredentials):
48 """Base class for all credentials.
49
50 All credentials have a :attr:`token` that is used for authentication and
51 may also optionally set an :attr:`expiry` to indicate when the token will
52 no longer be valid.
53
54 Most credentials will be :attr:`invalid` until :meth:`refresh` is called.
55 Credentials can do this automatically before the first HTTP request in
56 :meth:`before_request`.
57
58 Although the token and expiration will change as the credentials are
59 :meth:`refreshed <refresh>` and used, credentials should be considered
60 immutable. Various credentials will accept configuration such as private
61 keys, scopes, and other options. These options are not changeable after
62 construction. Some classes will provide mechanisms to copy the credentials
63 with modifications such as :meth:`ScopedCredentials.with_scopes`.
64 """
65
66 def __init__(self):
67 super(Credentials, self).__init__()
68
69 self.expiry = None
70 """Optional[datetime]: When the token expires and is no longer valid.
71 If this is None, the token is assumed to never expire."""
72 self._quota_project_id = None
73 """Optional[str]: Project to use for quota and billing purposes."""
74 self._trust_boundary = None
75 """Optional[dict]: Cache of a trust boundary response which has a list
76 of allowed regions and an encoded string representation of credentials
77 trust boundary."""
78 self._universe_domain = DEFAULT_UNIVERSE_DOMAIN
79 """Optional[str]: The universe domain value, default is googleapis.com
80 """
81
82 self._use_non_blocking_refresh = False
83 self._refresh_worker = RefreshThreadManager()
84
85 @property
86 def expired(self):
87 """Checks if the credentials are expired.
88
89 Note that credentials can be invalid but not expired because
90 Credentials with :attr:`expiry` set to None is considered to never
91 expire.
92
93 .. deprecated:: v2.24.0
94 Prefer checking :attr:`token_state` instead.
95 """
96 if not self.expiry:
97 return False
98 # Remove some threshold from expiry to err on the side of reporting
99 # expiration early so that we avoid the 401-refresh-retry loop.
100 skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD
101 return _helpers.utcnow() >= skewed_expiry
102
103 @property
104 def valid(self):
105 """Checks the validity of the credentials.
106
107 This is True if the credentials have a :attr:`token` and the token
108 is not :attr:`expired`.
109
110 .. deprecated:: v2.24.0
111 Prefer checking :attr:`token_state` instead.
112 """
113 return self.token is not None and not self.expired
114
115 @property
116 def token_state(self):
117 """
118 See `:obj:`TokenState`
119 """
120 if self.token is None:
121 return TokenState.INVALID
122
123 # Credentials that can't expire are always treated as fresh.
124 if self.expiry is None:
125 return TokenState.FRESH
126
127 expired = _helpers.utcnow() >= self.expiry
128 if expired:
129 return TokenState.INVALID
130
131 is_stale = _helpers.utcnow() >= (self.expiry - _helpers.REFRESH_THRESHOLD)
132 if is_stale:
133 return TokenState.STALE
134
135 return TokenState.FRESH
136
137 @property
138 def quota_project_id(self):
139 """Project to use for quota and billing purposes."""
140 return self._quota_project_id
141
142 @property
143 def universe_domain(self):
144 """The universe domain value."""
145 return self._universe_domain
146
147 def get_cred_info(self):
148 """The credential information JSON.
149
150 The credential information will be added to auth related error messages
151 by client library.
152
153 Returns:
154 Mapping[str, str]: The credential information JSON.
155 """
156 return None
157
158 @abc.abstractmethod
159 def refresh(self, request):
160 """Refreshes the access token.
161
162 Args:
163 request (google.auth.transport.Request): The object used to make
164 HTTP requests.
165
166 Raises:
167 google.auth.exceptions.RefreshError: If the credentials could
168 not be refreshed.
169 """
170 # pylint: disable=missing-raises-doc
171 # (pylint doesn't recognize that this is abstract)
172 raise NotImplementedError("Refresh must be implemented")
173
174 def _metric_header_for_usage(self):
175 """The x-goog-api-client header for token usage metric.
176
177 This header will be added to the API service requests in before_request
178 method. For example, "cred-type/sa-jwt" means service account self
179 signed jwt access token is used in the API service request
180 authorization header. Children credentials classes need to override
181 this method to provide the header value, if the token usage metric is
182 needed.
183
184 Returns:
185 str: The x-goog-api-client header value.
186 """
187 return None
188
189 def apply(self, headers, token=None):
190 """Apply the token to the authentication header.
191
192 Args:
193 headers (Mapping): The HTTP request headers.
194 token (Optional[str]): If specified, overrides the current access
195 token.
196 """
197 self._apply(headers, token)
198 if self.quota_project_id:
199 headers["x-goog-user-project"] = self.quota_project_id
200
201 def _blocking_refresh(self, request):
202 if not self.valid:
203 self.refresh(request)
204
205 def _non_blocking_refresh(self, request):
206 use_blocking_refresh_fallback = False
207
208 if self.token_state == TokenState.STALE:
209 use_blocking_refresh_fallback = not self._refresh_worker.start_refresh(
210 self, request
211 )
212
213 if self.token_state == TokenState.INVALID or use_blocking_refresh_fallback:
214 self.refresh(request)
215 # If the blocking refresh succeeds then we can clear the error info
216 # on the background refresh worker, and perform refreshes in a
217 # background thread.
218 self._refresh_worker.clear_error()
219
220 def before_request(self, request, method, url, headers):
221 """Performs credential-specific before request logic.
222
223 Refreshes the credentials if necessary, then calls :meth:`apply` to
224 apply the token to the authentication header.
225
226 Args:
227 request (google.auth.transport.Request): The object used to make
228 HTTP requests.
229 method (str): The request's HTTP method or the RPC method being
230 invoked.
231 url (str): The request's URI or the RPC service's URI.
232 headers (Mapping): The request's headers.
233 """
234 # pylint: disable=unused-argument
235 # (Subclasses may use these arguments to ascertain information about
236 # the http request.)
237 if self._use_non_blocking_refresh:
238 self._non_blocking_refresh(request)
239 else:
240 self._blocking_refresh(request)
241
242 self._after_refresh(request, method, url, headers)
243
244 metrics.add_metric_header(headers, self._metric_header_for_usage())
245 self.apply(headers)
246
247 def _after_refresh(self, request, method, url, headers):
248 """Hook for subclasses to perform actions after refresh but before
249 applying credentials to headers.
250
251 Args:
252 request (google.auth.transport.Request): The object used to make
253 HTTP requests.
254 method (str): The request's HTTP method or the RPC method being
255 invoked.
256 url (str): The request's URI or the RPC service's URI.
257 headers (Mapping): The request's headers.
258 """
259 pass
260
261 def with_non_blocking_refresh(self):
262 self._use_non_blocking_refresh = True
263
264
265class CredentialsWithQuotaProject(Credentials):
266 """Abstract base for credentials supporting ``with_quota_project`` factory"""
267
268 def with_quota_project(self, quota_project_id):
269 """Returns a copy of these credentials with a modified quota project.
270
271 Args:
272 quota_project_id (str): The project to use for quota and
273 billing purposes
274
275 Returns:
276 google.auth.credentials.Credentials: A new credentials instance.
277 """
278 raise NotImplementedError("This credential does not support quota project.")
279
280 def with_quota_project_from_environment(self):
281 quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT)
282 if quota_from_env:
283 return self.with_quota_project(quota_from_env)
284 return self
285
286
287class CredentialsWithTokenUri(Credentials):
288 """Abstract base for credentials supporting ``with_token_uri`` factory"""
289
290 def with_token_uri(self, token_uri):
291 """Returns a copy of these credentials with a modified token uri.
292
293 Args:
294 token_uri (str): The uri to use for fetching/exchanging tokens
295
296 Returns:
297 google.auth.credentials.Credentials: A new credentials instance.
298 """
299 raise NotImplementedError("This credential does not use token uri.")
300
301
302class CredentialsWithUniverseDomain(Credentials):
303 """Abstract base for credentials supporting ``with_universe_domain`` factory"""
304
305 def with_universe_domain(self, universe_domain):
306 """Returns a copy of these credentials with a modified universe domain.
307
308 Args:
309 universe_domain (str): The universe domain to use
310
311 Returns:
312 google.auth.credentials.Credentials: A new credentials instance.
313 """
314 raise NotImplementedError(
315 "This credential does not support with_universe_domain."
316 )
317
318
319class CredentialsWithRegionalAccessBoundary(Credentials):
320 """Abstract base for credentials supporting regional access boundary configuration."""
321
322 def __init__(self):
323 super().__init__()
324 self._rab_manager = (
325 _regional_access_boundary_utils._RegionalAccessBoundaryManager()
326 )
327
328 def __setstate__(self, state):
329 """Pickle helper that restores state, safely reconstructing RAB fields if missing."""
330 self.__dict__.update(state)
331 if "_rab_manager" not in self.__dict__:
332 from google.auth import _regional_access_boundary_utils
333
334 self._rab_manager = (
335 _regional_access_boundary_utils._RegionalAccessBoundaryManager()
336 )
337 if "_use_non_blocking_refresh" not in self.__dict__:
338 self._use_non_blocking_refresh = False
339 if "_refresh_worker" not in self.__dict__:
340 from google.auth._refresh_worker import RefreshThreadManager
341
342 self._refresh_worker = RefreshThreadManager()
343
344 @property
345 def regional_access_boundary(self):
346 """Optional[str]: The encoded Regional Access Boundary locations."""
347 return self._rab_manager._data.encoded_locations
348
349 @property
350 def regional_access_boundary_expiry(self):
351 """Optional[datetime.datetime]: The expiration time of the Regional Access Boundary."""
352 return self._rab_manager._data.expiry
353
354 @abc.abstractmethod
355 def _perform_refresh_token(self, request):
356 """Refreshes the access token.
357
358 Args:
359 request (google.auth.transport.Request): The object used to make
360 HTTP requests.
361
362 Raises:
363 google.auth.exceptions.RefreshError: If the credentials could
364 not be refreshed.
365 """
366 raise NotImplementedError("_perform_refresh_token must be implemented")
367
368 def with_trust_boundary(self, trust_boundary):
369 """Returns a copy of these credentials.
370
371 .. deprecated::
372 Manual Regional Access Boundary overrides are not supported.
373 This method is maintained for backwards compatibility and
374 returns a copy of the credentials without modifying the
375 Regional Access Boundary state.
376
377 Args:
378 trust_boundary (Mapping[str, str]): Ignored.
379
380 Returns:
381 google.auth.credentials.Credentials: A new credentials instance.
382 """
383 import warnings
384
385 warnings.warn(
386 "with_trust_boundary is deprecated and has no effect.",
387 DeprecationWarning,
388 stacklevel=2,
389 )
390 make_copy = getattr(self, "_make_copy", None)
391 if make_copy:
392 return make_copy()
393 else:
394 raise NotImplementedError(
395 "This credential does not support trust boundaries."
396 )
397
398 def _copy_regional_access_boundary_manager(self, target):
399 """Copies the regional access boundary manager state to another instance."""
400 target._rab_manager._data = self._rab_manager._data
401 target._rab_manager._use_blocking_regional_access_boundary_lookup = (
402 self._rab_manager._use_blocking_regional_access_boundary_lookup
403 )
404
405 def _set_regional_access_boundary(self, initial_boundary):
406 """Applies the regional_access_boundary provided via the initial_boundary on these
407 credentials. This is intended for internal use only as an invalid
408 initial_boundary would produce unexpected results until automatic recovery
409 is supported. Currently this is used by the gcloud CLI and therefore changes to the
410 contract MUST be backwards compatible (e.g. the method signature must be
411 unchanged and the credentials with the RAB set must be returned).
412
413
414 Returns:
415 google.auth.credentials.Credentials: The credentials instance.
416 """
417 self._rab_manager.set_initial_regional_access_boundary(
418 encoded_locations=initial_boundary.get("encodedLocations", None),
419 expiry=initial_boundary.get("expiry", None),
420 )
421 return self
422
423 def _set_blocking_regional_access_boundary_lookup(self):
424 """Enables the blocking lookup mode on these credentials.
425 This is intended for internal use only as blocking lookup requires additional
426 care and consideration. Currently this is used by the gcloud CLI and
427 therefore changes to the contract MUST be backwards compatible (e.g. the
428 method signature must be unchanged and the credentials with the
429 blocking lookup flag set to true must be returned).
430
431 Returns:
432 google.auth.credentials.Credentials: The credentials instance.
433 """
434 self._rab_manager.enable_blocking_lookup()
435 return self
436
437 def _is_regional_endpoint(self, url):
438 """Checks if the request URL is for a regional endpoint.
439
440 Args:
441 url (str): The URL of the request.
442
443 Returns:
444 bool: True if the URL is a regional endpoint, False otherwise.
445 """
446 try:
447 # Do not perform a lookup if the request is for a regional endpoint.
448 hostname = urlparse(url).hostname
449 if hostname and hostname.endswith(
450 (
451 ".rep.googleapis.com",
452 ".rep.sandbox.googleapis.com",
453 ".rep.mtls.googleapis.com",
454 ".rep.mtls.sandbox.googleapis.com",
455 )
456 ):
457 return True
458 except (ValueError, TypeError, AttributeError):
459 # If the URL is malformed, proceed with the default lookup behavior.
460 pass
461
462 return False
463
464 def _maybe_start_regional_access_boundary_refresh(self, request, url):
465 """
466 Starts a background thread to refresh the Regional Access Boundary if needed.
467
468 This method checks if a refresh is necessary and if one is not already
469 in progress or in a cooldown period. If so, it starts a background
470 thread to perform the lookup.
471
472 Args:
473 request (google.auth.transport.Request): The object used to make
474 HTTP requests.
475 url (str): The URL of the request.
476 """
477 # Do not perform a lookup if the request is for a regional endpoint.
478 if self._is_regional_endpoint(url):
479 return
480
481 # A refresh is only needed if the feature is enabled.
482 if not self._is_regional_access_boundary_lookup_required():
483 return
484
485 # Trigger background or blocking refresh if needed
486 self._rab_manager.maybe_start_refresh(self, request)
487
488 def _is_regional_access_boundary_lookup_required(self):
489 """Checks if a Regional Access Boundary lookup is required.
490
491 A lookup is required if the universe domain is supported.
492
493 Returns:
494 bool: True if a Regional Access Boundary lookup is required, False otherwise.
495 """
496 # Skip for non-default universe domains.
497 if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
498 return False
499
500 return True
501
502 def apply(self, headers, token=None):
503 """Apply the token to the authentication header."""
504 super().apply(headers, token)
505 self._rab_manager.apply_headers(headers)
506
507 def _after_refresh(self, request, method, url, headers):
508 """Triggers the Regional Access Boundary lookup if necessary."""
509 self._maybe_start_regional_access_boundary_refresh(request, url)
510
511 def refresh(self, request):
512 """Refreshes the access token.
513
514 This method calls the subclass's token refresh logic. The Regional
515 Access Boundary is refreshed separately in a non-blocking way.
516 """
517 self._perform_refresh_token(request)
518
519 def _lookup_regional_access_boundary(
520 self,
521 request: "google.auth.transport.Request", # noqa: F821
522 fail_fast: bool = False,
523 ) -> "Optional[Dict[str, str]]":
524 """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information.
525
526 Args:
527 request (google.auth.transport.Request): The object used to make
528 HTTP requests.
529 fail_fast (bool): Whether the lookup should fail fast (short timeout, no retries).
530
531 Returns:
532 Optional[Dict[str, str]]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed.
533 """
534 from google.oauth2 import _client
535
536 url = self._build_regional_access_boundary_lookup_url(request=request)
537 if not url:
538 _LOGGER.warning("Failed to build Regional Access Boundary lookup URL.")
539 return None
540
541 headers: Dict[str, str] = {}
542 self._apply(headers)
543 return _client._lookup_regional_access_boundary(
544 request, url, headers=headers, fail_fast=fail_fast
545 )
546
547 @abc.abstractmethod
548 def _build_regional_access_boundary_lookup_url(
549 self, request: "Optional[google.auth.transport.Request]" = None # noqa: F821
550 ):
551 """
552 Builds and returns the URL for the Regional Access Boundary lookup API.
553
554 This method should be implemented by subclasses to provide the
555 specific URL based on the credential type and its properties.
556
557 Args:
558 request (Optional[google.auth.transport.Request]): The object used
559 to make HTTP requests. In some subclasses, this may be used to
560 make an initial network call to resolve required metadata for the
561 URL.
562
563 Returns:
564 str: The URL for the Regional Access Boundary lookup endpoint, or None
565 if lookup should be skipped (e.g., for non-applicable universe domains).
566 """
567 raise NotImplementedError(
568 "_build_regional_access_boundary_lookup_url must be implemented"
569 )
570
571
572class AnonymousCredentials(Credentials):
573 """Credentials that do not provide any authentication information.
574
575 These are useful in the case of services that support anonymous access or
576 local service emulators that do not use credentials.
577 """
578
579 @property
580 def expired(self):
581 """Returns `False`, anonymous credentials never expire."""
582 return False
583
584 @property
585 def valid(self):
586 """Returns `True`, anonymous credentials are always valid."""
587 return True
588
589 def refresh(self, request):
590 """Raises :class:``InvalidOperation``, anonymous credentials cannot be
591 refreshed."""
592 raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")
593
594 def apply(self, headers, token=None):
595 """Anonymous credentials do nothing to the request.
596
597 The optional ``token`` argument is not supported.
598
599 Raises:
600 google.auth.exceptions.InvalidValue: If a token was specified.
601 """
602 if token is not None:
603 raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")
604
605 def before_request(self, request, method, url, headers):
606 """Anonymous credentials do nothing to the request."""
607
608
609class ReadOnlyScoped(metaclass=abc.ABCMeta):
610 """Interface for credentials whose scopes can be queried.
611
612 OAuth 2.0-based credentials allow limiting access using scopes as described
613 in `RFC6749 Section 3.3`_.
614 If a credential class implements this interface then the credentials either
615 use scopes in their implementation.
616
617 Some credentials require scopes in order to obtain a token. You can check
618 if scoping is necessary with :attr:`requires_scopes`::
619
620 if credentials.requires_scopes:
621 # Scoping is required.
622 credentials = credentials.with_scopes(scopes=['one', 'two'])
623
624 Credentials that require scopes must either be constructed with scopes::
625
626 credentials = SomeScopedCredentials(scopes=['one', 'two'])
627
628 Or must copy an existing instance using :meth:`with_scopes`::
629
630 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
631
632 Some credentials have scopes but do not allow or require scopes to be set,
633 these credentials can be used as-is.
634
635 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
636 """
637
638 def __init__(self):
639 super(ReadOnlyScoped, self).__init__()
640 self._scopes = None
641 self._default_scopes = None
642
643 @property
644 def scopes(self):
645 """Sequence[str]: the credentials' current set of scopes."""
646 return self._scopes
647
648 @property
649 def default_scopes(self):
650 """Sequence[str]: the credentials' current set of default scopes."""
651 return self._default_scopes
652
653 @abc.abstractproperty
654 def requires_scopes(self):
655 """True if these credentials require scopes to obtain an access token."""
656 return False
657
658 def has_scopes(self, scopes):
659 """Checks if the credentials have the given scopes.
660
661 .. warning: This method is not guaranteed to be accurate if the
662 credentials are :attr:`~Credentials.invalid`.
663
664 Args:
665 scopes (Sequence[str]): The list of scopes to check.
666
667 Returns:
668 bool: True if the credentials have the given scopes.
669 """
670 credential_scopes = (
671 self._scopes if self._scopes is not None else self._default_scopes
672 )
673 return set(scopes).issubset(set(credential_scopes or []))
674
675
676class Scoped(ReadOnlyScoped):
677 """Interface for credentials whose scopes can be replaced while copying.
678
679 OAuth 2.0-based credentials allow limiting access using scopes as described
680 in `RFC6749 Section 3.3`_.
681 If a credential class implements this interface then the credentials either
682 use scopes in their implementation.
683
684 Some credentials require scopes in order to obtain a token. You can check
685 if scoping is necessary with :attr:`requires_scopes`::
686
687 if credentials.requires_scopes:
688 # Scoping is required.
689 credentials = credentials.create_scoped(['one', 'two'])
690
691 Credentials that require scopes must either be constructed with scopes::
692
693 credentials = SomeScopedCredentials(scopes=['one', 'two'])
694
695 Or must copy an existing instance using :meth:`with_scopes`::
696
697 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
698
699 Some credentials have scopes but do not allow or require scopes to be set,
700 these credentials can be used as-is.
701
702 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
703 """
704
705 @abc.abstractmethod
706 def with_scopes(self, scopes, default_scopes=None):
707 """Create a copy of these credentials with the specified scopes.
708
709 Args:
710 scopes (Sequence[str]): The list of scopes to attach to the
711 current credentials.
712
713 Raises:
714 NotImplementedError: If the credentials' scopes can not be changed.
715 This can be avoided by checking :attr:`requires_scopes` before
716 calling this method.
717 """
718 raise NotImplementedError("This class does not require scoping.")
719
720
721def with_scopes_if_required(credentials, scopes, default_scopes=None):
722 """Creates a copy of the credentials with scopes if scoping is required.
723
724 This helper function is useful when you do not know (or care to know) the
725 specific type of credentials you are using (such as when you use
726 :func:`google.auth.default`). This function will call
727 :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if
728 the credentials require scoping. Otherwise, it will return the credentials
729 as-is.
730
731 Args:
732 credentials (google.auth.credentials.Credentials): The credentials to
733 scope if necessary.
734 scopes (Sequence[str]): The list of scopes to use.
735 default_scopes (Sequence[str]): Default scopes passed by a
736 Google client library. Use 'scopes' for user-defined scopes.
737
738 Returns:
739 google.auth.credentials.Credentials: Either a new set of scoped
740 credentials, or the passed in credentials instance if no scoping
741 was required.
742 """
743 if isinstance(credentials, Scoped) and credentials.requires_scopes:
744 return credentials.with_scopes(scopes, default_scopes=default_scopes)
745 else:
746 return credentials
747
748
749class Signing(metaclass=abc.ABCMeta):
750 """Interface for credentials that can cryptographically sign messages."""
751
752 @abc.abstractmethod
753 def sign_bytes(self, message):
754 """Signs the given message.
755
756 Args:
757 message (bytes): The message to sign.
758
759 Returns:
760 bytes: The message's cryptographic signature.
761 """
762 # pylint: disable=missing-raises-doc,redundant-returns-doc
763 # (pylint doesn't recognize that this is abstract)
764 raise NotImplementedError("Sign bytes must be implemented.")
765
766 @abc.abstractproperty
767 def signer_email(self):
768 """Optional[str]: An email address that identifies the signer."""
769 # pylint: disable=missing-raises-doc
770 # (pylint doesn't recognize that this is abstract)
771 raise NotImplementedError("Signer email must be implemented.")
772
773 @abc.abstractproperty
774 def signer(self):
775 """google.auth.crypt.Signer: The signer used to sign bytes."""
776 # pylint: disable=missing-raises-doc
777 # (pylint doesn't recognize that this is abstract)
778 raise NotImplementedError("Signer must be implemented.")
779
780
781class TokenState(Enum):
782 """
783 Tracks the state of a token.
784 FRESH: The token is valid. It is not expired or close to expired, or the token has no expiry.
785 STALE: The token is close to expired, and should be refreshed. The token can be used normally.
786 INVALID: The token is expired or invalid. The token cannot be used for a normal operation.
787 """
788
789 FRESH = 1
790 STALE = 2
791 INVALID = 3
792
793
794class CredentialsWithTrustBoundary(CredentialsWithRegionalAccessBoundary):
795 """Abstract base for credentials supporting legacy trust boundary configuration.
796
797 .. deprecated::
798 Use :class:`~google.auth.credentials.CredentialsWithRegionalAccessBoundary` instead.
799 """
800
801 def __init__(self):
802 super().__init__()
803 warnings.warn(
804 "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary.",
805 DeprecationWarning,
806 stacklevel=2,
807 )
808
809 @abc.abstractmethod
810 def _build_trust_boundary_lookup_url(self):
811 """Deprecated: Implement _build_regional_access_boundary_lookup_url instead."""
812 raise NotImplementedError()
813
814 def _build_regional_access_boundary_lookup_url(self, request=None):
815 warnings.warn(
816 "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary.",
817 DeprecationWarning,
818 stacklevel=2,
819 )
820 return self._build_trust_boundary_lookup_url()