1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Interfaces for credentials."""
17
18import abc
19from enum import Enum
20import logging
21import os
22from typing import Dict, List, Optional, TYPE_CHECKING
23from urllib.parse import urlparse
24import warnings
25
26
27from google.auth import _helpers, environment_vars
28from google.auth import _regional_access_boundary_utils
29from google.auth import exceptions
30from google.auth import metrics
31from google.auth._credentials_base import _BaseCredentials
32from google.auth._refresh_worker import RefreshThreadManager
33
34if TYPE_CHECKING: # pragma: NO COVER
35 import google.auth.transport
36
# Universe domain assigned to credentials at construction time; this is an
# alias of the value defined in ``_helpers``.
DEFAULT_UNIVERSE_DOMAIN = _helpers.DEFAULT_UNIVERSE_DOMAIN

# These constants are deprecated and no longer used.
# They are kept solely for backward compatibility with older implementations.
NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = []
NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"

# Module-level logger.
# NOTE(review): the logger is named "google.auth._default" rather than after
# this module -- presumably intentional so records share that logger's
# configuration; confirm before renaming.
_LOGGER = logging.getLogger("google.auth._default")
45
46
class Credentials(_BaseCredentials):
    """Common base for every credentials implementation.

    A credentials object carries a :attr:`token` used for authentication and
    may additionally carry an :attr:`expiry` that says when the token stops
    being valid.

    Newly constructed credentials are usually not :attr:`valid` until
    :meth:`refresh` has been called. :meth:`before_request` performs that
    refresh automatically ahead of the first HTTP request.

    The token and its expiration change as the credentials are
    :meth:`refreshed <refresh>` and used, but credentials should otherwise be
    treated as immutable. Configuration such as private keys, scopes, and
    other options is fixed at construction time. Some classes offer factory
    methods that return a modified copy instead, for example
    :meth:`Scoped.with_scopes`.
    """

    def __init__(self):
        super().__init__()

        self.expiry = None
        """Optional[datetime]: Moment at which the token stops being valid.
        ``None`` means the token is assumed to never expire."""
        self._quota_project_id = None
        """Optional[str]: Project used for quota and billing purposes."""
        self._trust_boundary = None
        """Optional[dict]: Cache of a trust boundary response which has a list
        of allowed regions and an encoded string representation of credentials
        trust boundary."""
        self._universe_domain = DEFAULT_UNIVERSE_DOMAIN
        """Optional[str]: The universe domain value, default is googleapis.com
        """

        # When True, before_request() goes through _non_blocking_refresh().
        self._use_non_blocking_refresh = False
        # Worker asked to start refreshes by _non_blocking_refresh().
        self._refresh_worker = RefreshThreadManager()

    @property
    def expired(self):
        """Tells whether the credentials are expired.

        Credentials may be invalid without being expired: when
        :attr:`expiry` is None (or otherwise falsy) they are treated as never
        expiring and this returns False.

        .. deprecated:: v2.24.0
          Prefer checking :attr:`token_state` instead.
        """
        if self.expiry:
            # Shift the deadline earlier by the refresh threshold so that
            # expiration is reported early, which avoids the
            # 401-refresh-retry loop.
            early_deadline = self.expiry - _helpers.REFRESH_THRESHOLD
            return _helpers.utcnow() >= early_deadline
        return False

    @property
    def valid(self):
        """Tells whether the credentials are usable.

        True only when a :attr:`token` is present and it is not
        :attr:`expired`.

        .. deprecated:: v2.24.0
          Prefer checking :attr:`token_state` instead.
        """
        if self.token is None:
            return False
        return not self.expired

    @property
    def token_state(self):
        """The :obj:`TokenState` describing the current token.

        * ``INVALID`` when there is no token, or the expiry has been reached.
        * ``FRESH`` when the token has no expiry, or the refresh threshold
          before the expiry has not been reached yet.
        * ``STALE`` when the token is within the refresh threshold of its
          expiry but has not expired.
        """
        if self.token is None:
            return TokenState.INVALID

        # A token without an expiry can never go stale.
        if self.expiry is None:
            return TokenState.FRESH

        if _helpers.utcnow() >= self.expiry:
            return TokenState.INVALID

        past_refresh_point = _helpers.utcnow() >= (
            self.expiry - _helpers.REFRESH_THRESHOLD
        )
        return TokenState.STALE if past_refresh_point else TokenState.FRESH

    @property
    def quota_project_id(self):
        """Project to use for quota and billing purposes."""
        return self._quota_project_id

    @property
    def universe_domain(self):
        """The universe domain value."""
        return self._universe_domain

    def get_cred_info(self):
        """The credential information JSON.

        Client libraries add this information to auth related error
        messages.

        Returns:
            Optional[Mapping[str, str]]: The credential information JSON.
                This base implementation always returns None.
        """
        return None

    @abc.abstractmethod
    def refresh(self, request):
        """Refreshes the access token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Refresh must be implemented")

    def _metric_header_for_usage(self):
        """The x-goog-api-client header for token usage metric.

        :meth:`before_request` adds this header to API service requests. As
        an example, "cred-type/sa-jwt" indicates that a service account self
        signed jwt access token is used in the request's authorization
        header. Subclasses that need the token usage metric override this
        method to supply the value.

        Returns:
            Optional[str]: The x-goog-api-client header value. This base
                implementation always returns None.
        """
        return None

    def apply(self, headers, token=None):
        """Apply the token to the authentication header.

        Also sets the ``x-goog-user-project`` header when a quota project is
        configured.

        Args:
            headers (Mapping): The HTTP request headers.
            token (Optional[str]): If specified, overrides the current access
                token.
        """
        self._apply(headers, token)
        quota_project = self.quota_project_id
        if quota_project:
            headers["x-goog-user-project"] = quota_project

    def _blocking_refresh(self, request):
        """Refresh synchronously unless the credentials are already valid."""
        if self.valid:
            return
        self.refresh(request)

    def _non_blocking_refresh(self, request):
        """Refresh through the refresh worker when the token is only stale.

        A stale token triggers ``start_refresh`` on the refresh worker; when
        that call reports failure (a falsy result), or when the token is
        invalid, the refresh is performed synchronously instead.
        """
        needs_blocking_fallback = False

        if self.token_state == TokenState.STALE:
            worker_started = self._refresh_worker.start_refresh(self, request)
            needs_blocking_fallback = not worker_started

        if self.token_state == TokenState.INVALID or needs_blocking_fallback:
            self.refresh(request)
            # The blocking refresh succeeded, so drop any error recorded on
            # the background refresh worker; later refreshes can then be
            # performed in a background thread again.
            self._refresh_worker.clear_error()

    def before_request(self, request, method, url, headers):
        """Performs credential-specific before request logic.

        Refreshes the credentials if necessary, runs the
        :meth:`_after_refresh` hook, adds the usage metric header, and then
        calls :meth:`apply` to apply the token to the authentication header.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            method (str): The request's HTTP method or the RPC method being
                invoked.
            url (str): The request's URI or the RPC service's URI.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (Subclasses may use these arguments to ascertain information about
        # the http request.)
        refresh_strategy = (
            self._non_blocking_refresh
            if self._use_non_blocking_refresh
            else self._blocking_refresh
        )
        refresh_strategy(request)

        self._after_refresh(request, method, url, headers)

        usage_metric = self._metric_header_for_usage()
        metrics.add_metric_header(headers, usage_metric)
        self.apply(headers)

    def _after_refresh(self, request, method, url, headers):
        """Hook for subclasses, run after the refresh step and before the
        credentials are applied to the headers.

        The base implementation does nothing.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            method (str): The request's HTTP method or the RPC method being
                invoked.
            url (str): The request's URI or the RPC service's URI.
            headers (Mapping): The request's headers.
        """

    def with_non_blocking_refresh(self):
        """Switch these credentials to the non-blocking refresh strategy.

        Mutates this instance in place; nothing is returned.
        """
        self._use_non_blocking_refresh = True
264
class CredentialsWithQuotaProject(Credentials):
    """Abstract base for credentials supporting ``with_quota_project`` factory"""

    def with_quota_project(self, quota_project_id):
        """Returns a copy of these credentials with a modified quota project.

        This base implementation always raises; subclasses that support a
        quota project override it.

        Args:
            quota_project_id (str): The project to use for quota and
                billing purposes

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("This credential does not support quota project.")

    def with_quota_project_from_environment(self):
        """Applies the quota project named in the environment, if any.

        Looks up the environment variable whose name is given by
        ``environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT``.

        Returns:
            google.auth.credentials.Credentials: The result of
                :meth:`with_quota_project` when the variable holds a
                non-empty value, otherwise this same instance.
        """
        env_quota_project = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT)
        if not env_quota_project:
            return self
        return self.with_quota_project(env_quota_project)
285
286
class CredentialsWithTokenUri(Credentials):
    """Abstract base for credentials supporting ``with_token_uri`` factory"""

    def with_token_uri(self, token_uri):
        """Returns a copy of these credentials with a modified token uri.

        This base implementation always raises; subclasses that use a token
        uri override it.

        Args:
            token_uri (str): The uri to use for fetching/exchanging tokens

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("This credential does not use token uri.")
300
301
class CredentialsWithUniverseDomain(Credentials):
    """Abstract base for credentials supporting ``with_universe_domain`` factory"""

    def with_universe_domain(self, universe_domain):
        """Returns a copy of these credentials with a modified universe domain.

        This base implementation always raises; subclasses that support
        changing the universe domain override it.

        Args:
            universe_domain (str): The universe domain to use

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(
            "This credential does not support with_universe_domain."
        )
317
318
class CredentialsWithRegionalAccessBoundary(Credentials):
    """Abstract base for credentials supporting regional access boundary configuration."""

    def __init__(self):
        super().__init__()
        manager_cls = _regional_access_boundary_utils._RegionalAccessBoundaryManager
        self._rab_manager = manager_cls()

    def __setstate__(self, state):
        """Pickle helper: restores ``state`` and recreates any of the
        ``_rab_manager``, ``_use_non_blocking_refresh`` and
        ``_refresh_worker`` attributes that the pickled state lacks."""
        self.__dict__.update(state)
        restored = self.__dict__

        if "_rab_manager" not in restored:
            from google.auth import _regional_access_boundary_utils

            self._rab_manager = (
                _regional_access_boundary_utils._RegionalAccessBoundaryManager()
            )

        if "_use_non_blocking_refresh" not in restored:
            self._use_non_blocking_refresh = False

        if "_refresh_worker" not in restored:
            from google.auth._refresh_worker import RefreshThreadManager

            self._refresh_worker = RefreshThreadManager()

    @property
    def regional_access_boundary(self):
        """Optional[str]: The encoded Regional Access Boundary locations."""
        boundary_data = self._rab_manager._data
        return boundary_data.encoded_locations

    @property
    def regional_access_boundary_expiry(self):
        """Optional[datetime.datetime]: The expiration time of the Regional Access Boundary."""
        boundary_data = self._rab_manager._data
        return boundary_data.expiry

    @abc.abstractmethod
    def _perform_refresh_token(self, request):
        """Refreshes the access token.

        :meth:`refresh` delegates to this method; subclasses supply the
        actual token refresh logic here.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        raise NotImplementedError("_perform_refresh_token must be implemented")

    def with_trust_boundary(self, trust_boundary):
        """Returns a copy of these credentials.

        .. deprecated::
            Manual Regional Access Boundary overrides are not supported.
            This method is maintained for backwards compatibility and
            returns a copy of the credentials without modifying the
            Regional Access Boundary state.

        Args:
            trust_boundary (Mapping[str, str]): Ignored.

        Returns:
            google.auth.credentials.Credentials: A new credentials instance,
                produced by the instance's ``_make_copy`` method.

        Raises:
            NotImplementedError: If the instance has no usable
                ``_make_copy`` attribute.
        """
        import warnings

        warnings.warn(
            "with_trust_boundary is deprecated and has no effect.",
            DeprecationWarning,
            stacklevel=2,
        )
        copier = getattr(self, "_make_copy", None)
        if not copier:
            raise NotImplementedError(
                "This credential does not support trust boundaries."
            )
        return copier()

    def _copy_regional_access_boundary_manager(self, target):
        """Copies the regional access boundary manager state to another instance.

        Both the manager's ``_data`` and its blocking-lookup flag are
        assigned onto ``target``'s existing manager.
        """
        source_manager = self._rab_manager
        target_manager = target._rab_manager
        target_manager._data = source_manager._data
        target_manager._use_blocking_regional_access_boundary_lookup = (
            source_manager._use_blocking_regional_access_boundary_lookup
        )

    def _set_regional_access_boundary(self, initial_boundary):
        """Applies the regional_access_boundary provided via the initial_boundary on these
        credentials. This is intended for internal use only as an invalid
        initial_boundary would produce unexpected results until automatic recovery
        is supported. Currently this is used by the gcloud CLI and therefore changes to the
        contract MUST be backwards compatible (e.g. the method signature must be
        unchanged and the credentials with the RAB set must be returned).

        Args:
            initial_boundary (Mapping): Source of the boundary; its
                ``"encodedLocations"`` and ``"expiry"`` entries are read with
                ``.get`` and default to None when absent.

        Returns:
            google.auth.credentials.Credentials: The credentials instance.
        """
        manager = self._rab_manager
        manager.set_initial_regional_access_boundary(
            encoded_locations=initial_boundary.get("encodedLocations", None),
            expiry=initial_boundary.get("expiry", None),
        )
        return self

    def _set_blocking_regional_access_boundary_lookup(self):
        """Enables the blocking lookup mode on these credentials.
        This is intended for internal use only as blocking lookup requires additional
        care and consideration. Currently this is used by the gcloud CLI and
        therefore changes to the contract MUST be backwards compatible (e.g. the
        method signature must be unchanged and the credentials with the
        blocking lookup flag set to true must be returned).

        Returns:
            google.auth.credentials.Credentials: The credentials instance.
        """
        self._rab_manager.enable_blocking_lookup()
        return self

    def _is_regional_endpoint(self, url):
        """Checks if the request URL is for a regional endpoint.

        A URL counts as regional when its hostname ends with
        ``.rep.googleapis.com`` or ``.rep.sandbox.googleapis.com``.

        Args:
            url (str): The URL of the request.

        Returns:
            bool: True if the URL is a regional endpoint, False otherwise
                (including when the URL cannot be parsed).
        """
        regional_suffixes = (".rep.googleapis.com", ".rep.sandbox.googleapis.com")
        try:
            hostname = urlparse(url).hostname
            is_regional = bool(hostname) and hostname.endswith(regional_suffixes)
        except (ValueError, TypeError, AttributeError):
            # A malformed URL is treated as non-regional so that the default
            # lookup behavior still applies.
            is_regional = False

        return is_regional

    def _maybe_start_regional_access_boundary_refresh(self, request, url):
        """
        Starts a background thread to refresh the Regional Access Boundary if needed.

        Nothing happens for regional endpoints or when the lookup is not
        required. Otherwise the decision is handed to the manager, which
        checks whether a refresh is necessary and not already in progress or
        in a cooldown period before starting the lookup.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            url (str): The URL of the request.
        """
        # Requests to regional endpoints never need a lookup, and a refresh
        # is only relevant while the feature is enabled.
        lookup_applicable = (
            not self._is_regional_endpoint(url)
            and self._is_regional_access_boundary_lookup_required()
        )
        if lookup_applicable:
            # Triggers a background or blocking refresh if one is needed.
            self._rab_manager.maybe_start_refresh(self, request)

    def _is_regional_access_boundary_lookup_required(self):
        """Checks if a Regional Access Boundary lookup is required.

        A lookup is required if the feature is enabled via an environment
        variable and the universe domain is supported.

        Returns:
            bool: True if a Regional Access Boundary lookup is required, False otherwise.
        """
        # The feature has to be switched on.
        if not _regional_access_boundary_utils.is_regional_access_boundary_enabled():
            return False

        # Only the default universe domain is supported.
        return self.universe_domain == DEFAULT_UNIVERSE_DOMAIN

    def apply(self, headers, token=None):
        """Apply the token to the authentication header.

        After the base class has applied the token, the regional access
        boundary manager is given the chance to add its own headers.

        Args:
            headers (Mapping): The HTTP request headers.
            token (Optional[str]): If specified, overrides the current access
                token.
        """
        super().apply(headers, token)
        self._rab_manager.apply_headers(headers)

    def _after_refresh(self, request, method, url, headers):
        """Triggers the Regional Access Boundary lookup if necessary."""
        self._maybe_start_regional_access_boundary_refresh(request, url)

    def refresh(self, request):
        """Refreshes the access token.

        Delegates to the subclass's :meth:`_perform_refresh_token`. The
        Regional Access Boundary is refreshed separately in a non-blocking
        way.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
        """
        self._perform_refresh_token(request)

    def _lookup_regional_access_boundary(
        self,
        request: "google.auth.transport.Request",  # noqa: F821
        fail_fast: bool = False,
    ) -> "Optional[Dict[str, str]]":
        """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            fail_fast (bool): Whether the lookup should fail fast (short timeout, no retries).

        Returns:
            Optional[Dict[str, str]]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed.
        """
        from google.oauth2 import _client

        lookup_url = self._build_regional_access_boundary_lookup_url(request=request)
        if not lookup_url:
            _LOGGER.warning("Failed to build Regional Access Boundary lookup URL.")
            return None

        # The lookup call is authenticated with the headers produced by
        # ``_apply`` on a fresh, empty mapping.
        auth_headers: Dict[str, str] = {}
        self._apply(auth_headers)
        return _client._lookup_regional_access_boundary(
            request, lookup_url, headers=auth_headers, fail_fast=fail_fast
        )

    @abc.abstractmethod
    def _build_regional_access_boundary_lookup_url(
        self, request: "Optional[google.auth.transport.Request]" = None  # noqa: F821
    ):
        """
        Builds and returns the URL for the Regional Access Boundary lookup API.

        Subclasses implement this to provide the URL that matches the
        credential type and its properties.

        Args:
            request (Optional[google.auth.transport.Request]): The object used
                to make HTTP requests. In some subclasses, this may be used to
                make an initial network call to resolve required metadata for the
                URL.

        Returns:
            str: The URL for the Regional Access Boundary lookup endpoint, or None
            if lookup should be skipped (e.g., for non-applicable universe domains).
        """
        raise NotImplementedError(
            "_build_regional_access_boundary_lookup_url must be implemented"
        )
571
572
class AnonymousCredentials(Credentials):
    """Credentials that carry no authentication information at all.

    Useful with services that allow anonymous access and with local service
    emulators that do not use credentials.
    """

    @property
    def expired(self):
        """Returns `False`, anonymous credentials never expire."""
        return False

    @property
    def valid(self):
        """Returns `True`, anonymous credentials are always valid."""
        return True

    def refresh(self, request):
        """Raises :class:``InvalidOperation``, anonymous credentials cannot be
        refreshed."""
        raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")

    def apply(self, headers, token=None):
        """Anonymous credentials do nothing to the request.

        The optional ``token`` argument is not supported.

        Raises:
            google.auth.exceptions.InvalidValue: If a token was specified.
        """
        if token is None:
            return
        raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")

    def before_request(self, request, method, url, headers):
        """Anonymous credentials do nothing to the request."""
608
609
class ReadOnlyScoped(metaclass=abc.ABCMeta):
    """Interface for credentials whose scopes can be queried.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    If a credential class implements this interface then the credentials
    use scopes in their implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(scopes=['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    def __init__(self):
        super().__init__()
        self._scopes = None
        self._default_scopes = None

    @property
    def scopes(self):
        """Sequence[str]: the credentials' current set of scopes."""
        return self._scopes

    @property
    def default_scopes(self):
        """Sequence[str]: the credentials' current set of default scopes."""
        return self._default_scopes

    # ``abc.abstractproperty`` is deprecated since Python 3.3; stacking
    # ``property`` on top of ``abc.abstractmethod`` is the supported spelling
    # and keeps the attribute abstract.
    @property
    @abc.abstractmethod
    def requires_scopes(self):
        """True if these credentials require scopes to obtain an access token."""
        return False

    def has_scopes(self, scopes):
        """Checks if the credentials have the given scopes.

        The explicitly set scopes are consulted when they are not None
        (even if empty); otherwise the default scopes are used.

        .. warning:: This method is not guaranteed to be accurate if the
            credentials are not :attr:`~Credentials.valid`.

        Args:
            scopes (Sequence[str]): The list of scopes to check.

        Returns:
            bool: True if the credentials have the given scopes.
        """
        credential_scopes = (
            self._scopes if self._scopes is not None else self._default_scopes
        )
        return set(scopes).issubset(set(credential_scopes or []))
675
676
class Scoped(ReadOnlyScoped):
    """Interface for credentials whose scopes can be replaced while copying.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    If a credential class implements this interface then the credentials
    use scopes in their implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    @abc.abstractmethod
    def with_scopes(self, scopes, default_scopes=None):
        """Create a copy of these credentials with the specified scopes.

        Args:
            scopes (Sequence[str]): The list of scopes to attach to the
                current credentials.
            default_scopes (Optional[Sequence[str]]): Default scopes, as
                passed by a Google client library.

        Returns:
            google.auth.credentials.Credentials: A new credentials instance
                (in implementing subclasses).

        Raises:
            NotImplementedError: If the credentials' scopes can not be changed.
                This can be avoided by checking :attr:`requires_scopes` before
                calling this method.
        """
        raise NotImplementedError("This class does not require scoping.")
720
721
def with_scopes_if_required(credentials, scopes, default_scopes=None):
    """Returns scoped credentials when, and only when, scoping is required.

    Handy when the concrete credentials type is unknown or irrelevant (for
    instance with :func:`google.auth.default`). If the credentials implement
    :class:`Scoped` and report that they require scopes,
    :meth:`Scoped.with_scopes` is called; in every other case the credentials
    are handed back untouched.

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            scope if necessary.
        scopes (Sequence[str]): The list of scopes to use.
        default_scopes (Sequence[str]): Default scopes passed by a
            Google client library. Use 'scopes' for user-defined scopes.

    Returns:
        google.auth.credentials.Credentials: Either a new set of scoped
            credentials, or the passed in credentials instance if no scoping
            was required.
    """
    scoping_needed = isinstance(credentials, Scoped) and credentials.requires_scopes
    if not scoping_needed:
        return credentials
    return credentials.with_scopes(scopes, default_scopes=default_scopes)
748
749
class Signing(metaclass=abc.ABCMeta):
    """Interface for credentials that can cryptographically sign messages.

    Concrete classes must provide :meth:`sign_bytes` together with the
    :attr:`signer_email` and :attr:`signer` properties.
    """

    @abc.abstractmethod
    def sign_bytes(self, message):
        """Signs the given message.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The message's cryptographic signature.
        """
        # pylint: disable=missing-raises-doc,redundant-returns-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Sign bytes must be implemented.")

    # ``abc.abstractproperty`` is deprecated since Python 3.3; ``property``
    # stacked on ``abc.abstractmethod`` is the supported equivalent.
    @property
    @abc.abstractmethod
    def signer_email(self):
        """Optional[str]: An email address that identifies the signer."""
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer email must be implemented.")

    @property
    @abc.abstractmethod
    def signer(self):
        """google.auth.crypt.Signer: The signer used to sign bytes."""
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer must be implemented.")
780
781
class TokenState(Enum):
    """
    Tracks the state of a token, as reported by ``Credentials.token_state``.

    FRESH: The token is valid. It is not expired or close to expired, or the token has no expiry.
    STALE: The token is close to expired, and should be refreshed. The token can be used normally.
    INVALID: The token is expired or invalid. The token cannot be used for a normal operation.
    """

    # Token present and either has no expiry or is not yet within the
    # refresh threshold of its expiry.
    FRESH = 1
    # Token present and within the refresh threshold of its expiry, but not
    # yet expired.
    STALE = 2
    # No token at all, or the expiry has been reached.
    INVALID = 3
793
794
class CredentialsWithTrustBoundary(CredentialsWithRegionalAccessBoundary):
    """Abstract base for credentials supporting legacy trust boundary configuration.

    Constructing an instance and building the lookup URL both emit a
    ``DeprecationWarning``.

    .. deprecated::
        Use :class:`~google.auth.credentials.CredentialsWithRegionalAccessBoundary` instead.
    """

    def __init__(self):
        super().__init__()
        deprecation_message = "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary."
        # Warn inline (not via a helper) so ``stacklevel=2`` keeps pointing
        # at the caller of this constructor.
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)

    @abc.abstractmethod
    def _build_trust_boundary_lookup_url(self):
        """Deprecated: Implement _build_regional_access_boundary_lookup_url instead."""
        raise NotImplementedError()

    def _build_regional_access_boundary_lookup_url(self, request=None):
        """Bridges the new lookup-URL hook to the legacy one.

        Emits a ``DeprecationWarning`` and returns whatever
        :meth:`_build_trust_boundary_lookup_url` produces; ``request`` is
        not used.
        """
        deprecation_message = "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary."
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
        return self._build_trust_boundary_lookup_url()