1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Interfaces for credentials."""
17
18import abc
19from enum import Enum
20import logging
21import os
22from typing import Dict, List, Optional, TYPE_CHECKING
23from urllib.parse import urlparse
24import warnings
25
26
27from google.auth import _helpers, environment_vars
28from google.auth import _regional_access_boundary_utils
29from google.auth import exceptions
30from google.auth import metrics
31from google.auth._credentials_base import _BaseCredentials
32from google.auth._refresh_worker import RefreshThreadManager
33
34if TYPE_CHECKING: # pragma: NO COVER
35 import google.auth.transport
36
37DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
38
39# These constants are deprecated and no longer used.
40# They are kept solely for backward compatibility with older implementations.
41NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = []
42NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"
43
44_LOGGER = logging.getLogger("google.auth._default")
45
46
47class Credentials(_BaseCredentials):
48 """Base class for all credentials.
49
50 All credentials have a :attr:`token` that is used for authentication and
51 may also optionally set an :attr:`expiry` to indicate when the token will
52 no longer be valid.
53
54 Most credentials will be :attr:`invalid` until :meth:`refresh` is called.
55 Credentials can do this automatically before the first HTTP request in
56 :meth:`before_request`.
57
58 Although the token and expiration will change as the credentials are
59 :meth:`refreshed <refresh>` and used, credentials should be considered
60 immutable. Various credentials will accept configuration such as private
61 keys, scopes, and other options. These options are not changeable after
62 construction. Some classes will provide mechanisms to copy the credentials
63 with modifications such as :meth:`ScopedCredentials.with_scopes`.
64 """
65
66 def __init__(self):
67 super(Credentials, self).__init__()
68
69 self.expiry = None
70 """Optional[datetime]: When the token expires and is no longer valid.
71 If this is None, the token is assumed to never expire."""
72 self._quota_project_id = None
73 """Optional[str]: Project to use for quota and billing purposes."""
74 self._trust_boundary = None
75 """Optional[dict]: Cache of a trust boundary response which has a list
76 of allowed regions and an encoded string representation of credentials
77 trust boundary."""
78 self._universe_domain = DEFAULT_UNIVERSE_DOMAIN
79 """Optional[str]: The universe domain value, default is googleapis.com
80 """
81
82 self._use_non_blocking_refresh = False
83 self._refresh_worker = RefreshThreadManager()
84
85 @property
86 def expired(self):
87 """Checks if the credentials are expired.
88
89 Note that credentials can be invalid but not expired because
90 Credentials with :attr:`expiry` set to None is considered to never
91 expire.
92
93 .. deprecated:: v2.24.0
94 Prefer checking :attr:`token_state` instead.
95 """
96 if not self.expiry:
97 return False
98 # Remove some threshold from expiry to err on the side of reporting
99 # expiration early so that we avoid the 401-refresh-retry loop.
100 skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD
101 return _helpers.utcnow() >= skewed_expiry
102
103 @property
104 def valid(self):
105 """Checks the validity of the credentials.
106
107 This is True if the credentials have a :attr:`token` and the token
108 is not :attr:`expired`.
109
110 .. deprecated:: v2.24.0
111 Prefer checking :attr:`token_state` instead.
112 """
113 return self.token is not None and not self.expired
114
115 @property
116 def token_state(self):
117 """
118 See `:obj:`TokenState`
119 """
120 if self.token is None:
121 return TokenState.INVALID
122
123 # Credentials that can't expire are always treated as fresh.
124 if self.expiry is None:
125 return TokenState.FRESH
126
127 expired = _helpers.utcnow() >= self.expiry
128 if expired:
129 return TokenState.INVALID
130
131 is_stale = _helpers.utcnow() >= (self.expiry - _helpers.REFRESH_THRESHOLD)
132 if is_stale:
133 return TokenState.STALE
134
135 return TokenState.FRESH
136
137 @property
138 def quota_project_id(self):
139 """Project to use for quota and billing purposes."""
140 return self._quota_project_id
141
142 @property
143 def universe_domain(self):
144 """The universe domain value."""
145 return self._universe_domain
146
147 def get_cred_info(self):
148 """The credential information JSON.
149
150 The credential information will be added to auth related error messages
151 by client library.
152
153 Returns:
154 Mapping[str, str]: The credential information JSON.
155 """
156 return None
157
158 @abc.abstractmethod
159 def refresh(self, request):
160 """Refreshes the access token.
161
162 Args:
163 request (google.auth.transport.Request): The object used to make
164 HTTP requests.
165
166 Raises:
167 google.auth.exceptions.RefreshError: If the credentials could
168 not be refreshed.
169 """
170 # pylint: disable=missing-raises-doc
171 # (pylint doesn't recognize that this is abstract)
172 raise NotImplementedError("Refresh must be implemented")
173
174 def _metric_header_for_usage(self):
175 """The x-goog-api-client header for token usage metric.
176
177 This header will be added to the API service requests in before_request
178 method. For example, "cred-type/sa-jwt" means service account self
179 signed jwt access token is used in the API service request
180 authorization header. Children credentials classes need to override
181 this method to provide the header value, if the token usage metric is
182 needed.
183
184 Returns:
185 str: The x-goog-api-client header value.
186 """
187 return None
188
189 def apply(self, headers, token=None):
190 """Apply the token to the authentication header.
191
192 Args:
193 headers (Mapping): The HTTP request headers.
194 token (Optional[str]): If specified, overrides the current access
195 token.
196 """
197 self._apply(headers, token)
198 if self.quota_project_id:
199 headers["x-goog-user-project"] = self.quota_project_id
200
201 def _blocking_refresh(self, request):
202 if not self.valid:
203 self.refresh(request)
204
205 def _non_blocking_refresh(self, request):
206 use_blocking_refresh_fallback = False
207
208 if self.token_state == TokenState.STALE:
209 use_blocking_refresh_fallback = not self._refresh_worker.start_refresh(
210 self, request
211 )
212
213 if self.token_state == TokenState.INVALID or use_blocking_refresh_fallback:
214 self.refresh(request)
215 # If the blocking refresh succeeds then we can clear the error info
216 # on the background refresh worker, and perform refreshes in a
217 # background thread.
218 self._refresh_worker.clear_error()
219
220 def before_request(self, request, method, url, headers):
221 """Performs credential-specific before request logic.
222
223 Refreshes the credentials if necessary, then calls :meth:`apply` to
224 apply the token to the authentication header.
225
226 Args:
227 request (google.auth.transport.Request): The object used to make
228 HTTP requests.
229 method (str): The request's HTTP method or the RPC method being
230 invoked.
231 url (str): The request's URI or the RPC service's URI.
232 headers (Mapping): The request's headers.
233 """
234 # pylint: disable=unused-argument
235 # (Subclasses may use these arguments to ascertain information about
236 # the http request.)
237 if self._use_non_blocking_refresh:
238 self._non_blocking_refresh(request)
239 else:
240 self._blocking_refresh(request)
241
242 metrics.add_metric_header(headers, self._metric_header_for_usage())
243 self.apply(headers)
244
245 def with_non_blocking_refresh(self):
246 self._use_non_blocking_refresh = True
247
248
249class CredentialsWithQuotaProject(Credentials):
250 """Abstract base for credentials supporting ``with_quota_project`` factory"""
251
252 def with_quota_project(self, quota_project_id):
253 """Returns a copy of these credentials with a modified quota project.
254
255 Args:
256 quota_project_id (str): The project to use for quota and
257 billing purposes
258
259 Returns:
260 google.auth.credentials.Credentials: A new credentials instance.
261 """
262 raise NotImplementedError("This credential does not support quota project.")
263
264 def with_quota_project_from_environment(self):
265 quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT)
266 if quota_from_env:
267 return self.with_quota_project(quota_from_env)
268 return self
269
270
271class CredentialsWithTokenUri(Credentials):
272 """Abstract base for credentials supporting ``with_token_uri`` factory"""
273
274 def with_token_uri(self, token_uri):
275 """Returns a copy of these credentials with a modified token uri.
276
277 Args:
278 token_uri (str): The uri to use for fetching/exchanging tokens
279
280 Returns:
281 google.auth.credentials.Credentials: A new credentials instance.
282 """
283 raise NotImplementedError("This credential does not use token uri.")
284
285
286class CredentialsWithUniverseDomain(Credentials):
287 """Abstract base for credentials supporting ``with_universe_domain`` factory"""
288
289 def with_universe_domain(self, universe_domain):
290 """Returns a copy of these credentials with a modified universe domain.
291
292 Args:
293 universe_domain (str): The universe domain to use
294
295 Returns:
296 google.auth.credentials.Credentials: A new credentials instance.
297 """
298 raise NotImplementedError(
299 "This credential does not support with_universe_domain."
300 )
301
302
303class CredentialsWithRegionalAccessBoundary(Credentials):
304 """Abstract base for credentials supporting regional access boundary configuration."""
305
306 def __init__(self):
307 super().__init__()
308 self._rab_manager = (
309 _regional_access_boundary_utils._RegionalAccessBoundaryManager()
310 )
311
312 @property
313 def regional_access_boundary(self):
314 """Optional[str]: The encoded Regional Access Boundary locations."""
315 return self._rab_manager._data.encoded_locations
316
317 @property
318 def regional_access_boundary_expiry(self):
319 """Optional[datetime.datetime]: The expiration time of the Regional Access Boundary."""
320 return self._rab_manager._data.expiry
321
322 @abc.abstractmethod
323 def _perform_refresh_token(self, request):
324 """Refreshes the access token.
325
326 Args:
327 request (google.auth.transport.Request): The object used to make
328 HTTP requests.
329
330 Raises:
331 google.auth.exceptions.RefreshError: If the credentials could
332 not be refreshed.
333 """
334 raise NotImplementedError("_perform_refresh_token must be implemented")
335
336 def with_trust_boundary(self, trust_boundary):
337 """Returns a copy of these credentials.
338
339 .. deprecated::
340 Manual Regional Access Boundary overrides are not supported.
341 This method is maintained for backwards compatibility and
342 returns a copy of the credentials without modifying the
343 Regional Access Boundary state.
344
345 Args:
346 trust_boundary (Mapping[str, str]): Ignored.
347
348 Returns:
349 google.auth.credentials.Credentials: A new credentials instance.
350 """
351 import warnings
352
353 warnings.warn(
354 "with_trust_boundary is deprecated and has no effect.",
355 DeprecationWarning,
356 stacklevel=2,
357 )
358 make_copy = getattr(self, "_make_copy", None)
359 if make_copy:
360 return make_copy()
361 else:
362 raise NotImplementedError(
363 "This credential does not support trust boundaries."
364 )
365
366 def _copy_regional_access_boundary_manager(self, target):
367 """Copies the regional access boundary manager to another instance."""
368 # Create a new manager for the clone to isolate background refresh locks and threads,
369 # but share the immutable data reference to avoid unnecessary initial lookups.
370 new_manager = _regional_access_boundary_utils._RegionalAccessBoundaryManager()
371 new_manager._data = self._rab_manager._data
372 target._rab_manager = new_manager
373
374 def _set_regional_access_boundary(self, seed):
375 """Applies the regional_access_boundary provided via the seed on these
376 credentials. This is intended for internal use only as invalid
377 seeds would produce unexpected results until automatic recovery is supported.
378 Currently this is used by the gcloud CLI and therefore changes to the
379 contract MUST be backwards compatible (e.g. the method signature must be
380 unchanged and the credentials with the RAB set must be returned).
381
382
383 Returns:
384 google.auth.credentials.Credentials: The credentials instance.
385 """
386 self._rab_manager.set_initial_regional_access_boundary(
387 encoded_locations=seed.get("encodedLocations", None),
388 expiry=seed.get("expiry", None),
389 )
390 return self
391
392 def _set_blocking_regional_access_boundary_lookup(self):
393 """Enables the blocking lookup mode on these credentials.
394 This is intended for internal use only as blocking lookup requires additional
395 care and consideration. Currently this is used by the gcloud CLI and
396 therefore changes to the contract MUST be backwards compatible (e.g. the
397 method signature must be unchanged and the credentials with the
398 blocking lookup flag set to true must be returned).
399
400 Returns:
401 google.auth.credentials.Credentials: The credentials instance.
402 """
403 self._rab_manager.enable_blocking_lookup()
404 return self
405
406 def _maybe_start_regional_access_boundary_refresh(self, request, url):
407 """
408 Starts a background thread to refresh the Regional Access Boundary if needed.
409
410 This method checks if a refresh is necessary and if one is not already
411 in progress or in a cooldown period. If so, it starts a background
412 thread to perform the lookup.
413
414 Args:
415 request (google.auth.transport.Request): The object used to make
416 HTTP requests.
417 url (str): The URL of the request.
418 """
419 try:
420 # Do not perform a lookup if the request is for a regional endpoint.
421 hostname = urlparse(url).hostname
422 if hostname and (
423 hostname.endswith(".rep.googleapis.com")
424 or hostname.endswith(".rep.sandbox.googleapis.com")
425 ):
426 return
427 except (ValueError, TypeError):
428 # If the URL is malformed, proceed with the default lookup behavior.
429 pass
430
431 # A refresh is only needed if the feature is enabled.
432 if not self._is_regional_access_boundary_lookup_required():
433 return
434
435 # Start the background refresh if needed.
436 self._rab_manager.maybe_start_refresh(self, request)
437
438 def _is_regional_access_boundary_lookup_required(self):
439 """Checks if a Regional Access Boundary lookup is required.
440
441 A lookup is required if the feature is enabled via an environment
442 variable and the universe domain is supported.
443
444 Returns:
445 bool: True if a Regional Access Boundary lookup is required, False otherwise.
446 """
447 # 1. Check if the feature is enabled.
448 if not _regional_access_boundary_utils.is_regional_access_boundary_enabled():
449 return False
450
451 # 2. Skip for non-default universe domains.
452 if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
453 return False
454
455 return True
456
457 def apply(self, headers, token=None):
458 """Apply the token to the authentication header."""
459 super().apply(headers, token)
460 self._rab_manager.apply_headers(headers)
461
462 def before_request(self, request, method, url, headers):
463 """Refreshes the access token and triggers the Regional Access Boundary
464 lookup if necessary.
465 """
466 if self._use_non_blocking_refresh:
467 self._non_blocking_refresh(request)
468 else:
469 self._blocking_refresh(request)
470
471 self._maybe_start_regional_access_boundary_refresh(request, url)
472
473 metrics.add_metric_header(headers, self._metric_header_for_usage())
474 self.apply(headers)
475
476 def refresh(self, request):
477 """Refreshes the access token.
478
479 This method calls the subclass's token refresh logic. The Regional
480 Access Boundary is refreshed separately in a non-blocking way.
481 """
482 self._perform_refresh_token(request)
483
484 def _lookup_regional_access_boundary(
485 self,
486 request: "google.auth.transport.Request", # noqa: F821
487 fail_fast: bool = False,
488 ) -> "Optional[Dict[str, str]]":
489 """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information.
490
491 Args:
492 request (google.auth.transport.Request): The object used to make
493 HTTP requests.
494 fail_fast (bool): Whether the lookup should fail fast (short timeout, no retries).
495
496 Returns:
497 Optional[Dict[str, str]]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed.
498 """
499 from google.oauth2 import _client
500
501 url = self._build_regional_access_boundary_lookup_url(request=request)
502 if not url:
503 _LOGGER.error("Failed to build Regional Access Boundary lookup URL.")
504 return None
505
506 headers: Dict[str, str] = {}
507 self._apply(headers)
508 self._rab_manager.apply_headers(headers)
509 return _client._lookup_regional_access_boundary(
510 request, url, headers=headers, fail_fast=fail_fast
511 )
512
513 @abc.abstractmethod
514 def _build_regional_access_boundary_lookup_url(
515 self, request: "Optional[google.auth.transport.Request]" = None # noqa: F821
516 ):
517 """
518 Builds and returns the URL for the Regional Access Boundary lookup API.
519
520 This method should be implemented by subclasses to provide the
521 specific URL based on the credential type and its properties.
522
523 Args:
524 request (Optional[google.auth.transport.Request]): The object used
525 to make HTTP requests. In some subclasses, this may be used to
526 make an initial network call to resolve required metadata for the
527 URL.
528
529 Returns:
530 str: The URL for the Regional Access Boundary lookup endpoint, or None
531 if lookup should be skipped (e.g., for non-applicable universe domains).
532 """
533 raise NotImplementedError(
534 "_build_regional_access_boundary_lookup_url must be implemented"
535 )
536
537
538class AnonymousCredentials(Credentials):
539 """Credentials that do not provide any authentication information.
540
541 These are useful in the case of services that support anonymous access or
542 local service emulators that do not use credentials.
543 """
544
545 @property
546 def expired(self):
547 """Returns `False`, anonymous credentials never expire."""
548 return False
549
550 @property
551 def valid(self):
552 """Returns `True`, anonymous credentials are always valid."""
553 return True
554
555 def refresh(self, request):
556 """Raises :class:``InvalidOperation``, anonymous credentials cannot be
557 refreshed."""
558 raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")
559
560 def apply(self, headers, token=None):
561 """Anonymous credentials do nothing to the request.
562
563 The optional ``token`` argument is not supported.
564
565 Raises:
566 google.auth.exceptions.InvalidValue: If a token was specified.
567 """
568 if token is not None:
569 raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")
570
571 def before_request(self, request, method, url, headers):
572 """Anonymous credentials do nothing to the request."""
573
574
575class ReadOnlyScoped(metaclass=abc.ABCMeta):
576 """Interface for credentials whose scopes can be queried.
577
578 OAuth 2.0-based credentials allow limiting access using scopes as described
579 in `RFC6749 Section 3.3`_.
580 If a credential class implements this interface then the credentials either
581 use scopes in their implementation.
582
583 Some credentials require scopes in order to obtain a token. You can check
584 if scoping is necessary with :attr:`requires_scopes`::
585
586 if credentials.requires_scopes:
587 # Scoping is required.
588 credentials = credentials.with_scopes(scopes=['one', 'two'])
589
590 Credentials that require scopes must either be constructed with scopes::
591
592 credentials = SomeScopedCredentials(scopes=['one', 'two'])
593
594 Or must copy an existing instance using :meth:`with_scopes`::
595
596 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
597
598 Some credentials have scopes but do not allow or require scopes to be set,
599 these credentials can be used as-is.
600
601 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
602 """
603
604 def __init__(self):
605 super(ReadOnlyScoped, self).__init__()
606 self._scopes = None
607 self._default_scopes = None
608
609 @property
610 def scopes(self):
611 """Sequence[str]: the credentials' current set of scopes."""
612 return self._scopes
613
614 @property
615 def default_scopes(self):
616 """Sequence[str]: the credentials' current set of default scopes."""
617 return self._default_scopes
618
619 @abc.abstractproperty
620 def requires_scopes(self):
621 """True if these credentials require scopes to obtain an access token."""
622 return False
623
624 def has_scopes(self, scopes):
625 """Checks if the credentials have the given scopes.
626
627 .. warning: This method is not guaranteed to be accurate if the
628 credentials are :attr:`~Credentials.invalid`.
629
630 Args:
631 scopes (Sequence[str]): The list of scopes to check.
632
633 Returns:
634 bool: True if the credentials have the given scopes.
635 """
636 credential_scopes = (
637 self._scopes if self._scopes is not None else self._default_scopes
638 )
639 return set(scopes).issubset(set(credential_scopes or []))
640
641
642class Scoped(ReadOnlyScoped):
643 """Interface for credentials whose scopes can be replaced while copying.
644
645 OAuth 2.0-based credentials allow limiting access using scopes as described
646 in `RFC6749 Section 3.3`_.
647 If a credential class implements this interface then the credentials either
648 use scopes in their implementation.
649
650 Some credentials require scopes in order to obtain a token. You can check
651 if scoping is necessary with :attr:`requires_scopes`::
652
653 if credentials.requires_scopes:
654 # Scoping is required.
655 credentials = credentials.create_scoped(['one', 'two'])
656
657 Credentials that require scopes must either be constructed with scopes::
658
659 credentials = SomeScopedCredentials(scopes=['one', 'two'])
660
661 Or must copy an existing instance using :meth:`with_scopes`::
662
663 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
664
665 Some credentials have scopes but do not allow or require scopes to be set,
666 these credentials can be used as-is.
667
668 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
669 """
670
671 @abc.abstractmethod
672 def with_scopes(self, scopes, default_scopes=None):
673 """Create a copy of these credentials with the specified scopes.
674
675 Args:
676 scopes (Sequence[str]): The list of scopes to attach to the
677 current credentials.
678
679 Raises:
680 NotImplementedError: If the credentials' scopes can not be changed.
681 This can be avoided by checking :attr:`requires_scopes` before
682 calling this method.
683 """
684 raise NotImplementedError("This class does not require scoping.")
685
686
687def with_scopes_if_required(credentials, scopes, default_scopes=None):
688 """Creates a copy of the credentials with scopes if scoping is required.
689
690 This helper function is useful when you do not know (or care to know) the
691 specific type of credentials you are using (such as when you use
692 :func:`google.auth.default`). This function will call
693 :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if
694 the credentials require scoping. Otherwise, it will return the credentials
695 as-is.
696
697 Args:
698 credentials (google.auth.credentials.Credentials): The credentials to
699 scope if necessary.
700 scopes (Sequence[str]): The list of scopes to use.
701 default_scopes (Sequence[str]): Default scopes passed by a
702 Google client library. Use 'scopes' for user-defined scopes.
703
704 Returns:
705 google.auth.credentials.Credentials: Either a new set of scoped
706 credentials, or the passed in credentials instance if no scoping
707 was required.
708 """
709 if isinstance(credentials, Scoped) and credentials.requires_scopes:
710 return credentials.with_scopes(scopes, default_scopes=default_scopes)
711 else:
712 return credentials
713
714
715class Signing(metaclass=abc.ABCMeta):
716 """Interface for credentials that can cryptographically sign messages."""
717
718 @abc.abstractmethod
719 def sign_bytes(self, message):
720 """Signs the given message.
721
722 Args:
723 message (bytes): The message to sign.
724
725 Returns:
726 bytes: The message's cryptographic signature.
727 """
728 # pylint: disable=missing-raises-doc,redundant-returns-doc
729 # (pylint doesn't recognize that this is abstract)
730 raise NotImplementedError("Sign bytes must be implemented.")
731
732 @abc.abstractproperty
733 def signer_email(self):
734 """Optional[str]: An email address that identifies the signer."""
735 # pylint: disable=missing-raises-doc
736 # (pylint doesn't recognize that this is abstract)
737 raise NotImplementedError("Signer email must be implemented.")
738
739 @abc.abstractproperty
740 def signer(self):
741 """google.auth.crypt.Signer: The signer used to sign bytes."""
742 # pylint: disable=missing-raises-doc
743 # (pylint doesn't recognize that this is abstract)
744 raise NotImplementedError("Signer must be implemented.")
745
746
747class TokenState(Enum):
748 """
749 Tracks the state of a token.
750 FRESH: The token is valid. It is not expired or close to expired, or the token has no expiry.
751 STALE: The token is close to expired, and should be refreshed. The token can be used normally.
752 INVALID: The token is expired or invalid. The token cannot be used for a normal operation.
753 """
754
755 FRESH = 1
756 STALE = 2
757 INVALID = 3
758
759
760class CredentialsWithTrustBoundary(CredentialsWithRegionalAccessBoundary):
761 """Abstract base for credentials supporting legacy trust boundary configuration.
762
763 .. deprecated::
764 Use :class:`~google.auth.credentials.CredentialsWithRegionalAccessBoundary` instead.
765 """
766
767 def __init__(self):
768 super().__init__()
769 warnings.warn(
770 "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary.",
771 DeprecationWarning,
772 stacklevel=2,
773 )
774
775 @abc.abstractmethod
776 def _build_trust_boundary_lookup_url(self):
777 """Deprecated: Implement _build_regional_access_boundary_lookup_url instead."""
778 raise NotImplementedError()
779
780 def _build_regional_access_boundary_lookup_url(self, request=None):
781 warnings.warn(
782 "CredentialsWithTrustBoundary is deprecated. Use CredentialsWithRegionalAccessBoundary.",
783 DeprecationWarning,
784 stacklevel=2,
785 )
786 return self._build_trust_boundary_lookup_url()