1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Interfaces for credentials."""
17
18import abc
19from enum import Enum
20import os
21
22from google.auth import _helpers, environment_vars
23from google.auth import exceptions
24from google.auth import metrics
25from google.auth._credentials_base import _BaseCredentials
26from google.auth._default import _LOGGER
27from google.auth._refresh_worker import RefreshThreadManager
28
29DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
30NO_OP_TRUST_BOUNDARY_LOCATIONS: list[str] = []
31NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"
32
33
34class Credentials(_BaseCredentials):
35 """Base class for all credentials.
36
37 All credentials have a :attr:`token` that is used for authentication and
38 may also optionally set an :attr:`expiry` to indicate when the token will
39 no longer be valid.
40
41 Most credentials will be :attr:`invalid` until :meth:`refresh` is called.
42 Credentials can do this automatically before the first HTTP request in
43 :meth:`before_request`.
44
45 Although the token and expiration will change as the credentials are
46 :meth:`refreshed <refresh>` and used, credentials should be considered
47 immutable. Various credentials will accept configuration such as private
48 keys, scopes, and other options. These options are not changeable after
49 construction. Some classes will provide mechanisms to copy the credentials
50 with modifications such as :meth:`ScopedCredentials.with_scopes`.
51 """
52
53 def __init__(self):
54 super(Credentials, self).__init__()
55
56 self.expiry = None
57 """Optional[datetime]: When the token expires and is no longer valid.
58 If this is None, the token is assumed to never expire."""
59 self._quota_project_id = None
60 """Optional[str]: Project to use for quota and billing purposes."""
61 self._trust_boundary = None
62 """Optional[dict]: Cache of a trust boundary response which has a list
63 of allowed regions and an encoded string representation of credentials
64 trust boundary."""
65 self._universe_domain = DEFAULT_UNIVERSE_DOMAIN
66 """Optional[str]: The universe domain value, default is googleapis.com
67 """
68
69 self._use_non_blocking_refresh = False
70 self._refresh_worker = RefreshThreadManager()
71
72 @property
73 def expired(self):
74 """Checks if the credentials are expired.
75
76 Note that credentials can be invalid but not expired because
77 Credentials with :attr:`expiry` set to None is considered to never
78 expire.
79
80 .. deprecated:: v2.24.0
81 Prefer checking :attr:`token_state` instead.
82 """
83 if not self.expiry:
84 return False
85 # Remove some threshold from expiry to err on the side of reporting
86 # expiration early so that we avoid the 401-refresh-retry loop.
87 skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD
88 return _helpers.utcnow() >= skewed_expiry
89
90 @property
91 def valid(self):
92 """Checks the validity of the credentials.
93
94 This is True if the credentials have a :attr:`token` and the token
95 is not :attr:`expired`.
96
97 .. deprecated:: v2.24.0
98 Prefer checking :attr:`token_state` instead.
99 """
100 return self.token is not None and not self.expired
101
102 @property
103 def token_state(self):
104 """
105 See `:obj:`TokenState`
106 """
107 if self.token is None:
108 return TokenState.INVALID
109
110 # Credentials that can't expire are always treated as fresh.
111 if self.expiry is None:
112 return TokenState.FRESH
113
114 expired = _helpers.utcnow() >= self.expiry
115 if expired:
116 return TokenState.INVALID
117
118 is_stale = _helpers.utcnow() >= (self.expiry - _helpers.REFRESH_THRESHOLD)
119 if is_stale:
120 return TokenState.STALE
121
122 return TokenState.FRESH
123
124 @property
125 def quota_project_id(self):
126 """Project to use for quota and billing purposes."""
127 return self._quota_project_id
128
129 @property
130 def universe_domain(self):
131 """The universe domain value."""
132 return self._universe_domain
133
134 def get_cred_info(self):
135 """The credential information JSON.
136
137 The credential information will be added to auth related error messages
138 by client library.
139
140 Returns:
141 Mapping[str, str]: The credential information JSON.
142 """
143 return None
144
145 @abc.abstractmethod
146 def refresh(self, request):
147 """Refreshes the access token.
148
149 Args:
150 request (google.auth.transport.Request): The object used to make
151 HTTP requests.
152
153 Raises:
154 google.auth.exceptions.RefreshError: If the credentials could
155 not be refreshed.
156 """
157 # pylint: disable=missing-raises-doc
158 # (pylint doesn't recognize that this is abstract)
159 raise NotImplementedError("Refresh must be implemented")
160
161 def _metric_header_for_usage(self):
162 """The x-goog-api-client header for token usage metric.
163
164 This header will be added to the API service requests in before_request
165 method. For example, "cred-type/sa-jwt" means service account self
166 signed jwt access token is used in the API service request
167 authorization header. Children credentials classes need to override
168 this method to provide the header value, if the token usage metric is
169 needed.
170
171 Returns:
172 str: The x-goog-api-client header value.
173 """
174 return None
175
176 def apply(self, headers, token=None):
177 """Apply the token to the authentication header.
178
179 Args:
180 headers (Mapping): The HTTP request headers.
181 token (Optional[str]): If specified, overrides the current access
182 token.
183 """
184 self._apply(headers, token)
185 if self.quota_project_id:
186 headers["x-goog-user-project"] = self.quota_project_id
187
188 def _blocking_refresh(self, request):
189 if not self.valid:
190 self.refresh(request)
191
192 def _non_blocking_refresh(self, request):
193 use_blocking_refresh_fallback = False
194
195 if self.token_state == TokenState.STALE:
196 use_blocking_refresh_fallback = not self._refresh_worker.start_refresh(
197 self, request
198 )
199
200 if self.token_state == TokenState.INVALID or use_blocking_refresh_fallback:
201 self.refresh(request)
202 # If the blocking refresh succeeds then we can clear the error info
203 # on the background refresh worker, and perform refreshes in a
204 # background thread.
205 self._refresh_worker.clear_error()
206
207 def before_request(self, request, method, url, headers):
208 """Performs credential-specific before request logic.
209
210 Refreshes the credentials if necessary, then calls :meth:`apply` to
211 apply the token to the authentication header.
212
213 Args:
214 request (google.auth.transport.Request): The object used to make
215 HTTP requests.
216 method (str): The request's HTTP method or the RPC method being
217 invoked.
218 url (str): The request's URI or the RPC service's URI.
219 headers (Mapping): The request's headers.
220 """
221 # pylint: disable=unused-argument
222 # (Subclasses may use these arguments to ascertain information about
223 # the http request.)
224 if self._use_non_blocking_refresh:
225 self._non_blocking_refresh(request)
226 else:
227 self._blocking_refresh(request)
228
229 metrics.add_metric_header(headers, self._metric_header_for_usage())
230 self.apply(headers)
231
232 def with_non_blocking_refresh(self):
233 self._use_non_blocking_refresh = True
234
235
236class CredentialsWithQuotaProject(Credentials):
237 """Abstract base for credentials supporting ``with_quota_project`` factory"""
238
239 def with_quota_project(self, quota_project_id):
240 """Returns a copy of these credentials with a modified quota project.
241
242 Args:
243 quota_project_id (str): The project to use for quota and
244 billing purposes
245
246 Returns:
247 google.auth.credentials.Credentials: A new credentials instance.
248 """
249 raise NotImplementedError("This credential does not support quota project.")
250
251 def with_quota_project_from_environment(self):
252 quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT)
253 if quota_from_env:
254 return self.with_quota_project(quota_from_env)
255 return self
256
257
258class CredentialsWithTokenUri(Credentials):
259 """Abstract base for credentials supporting ``with_token_uri`` factory"""
260
261 def with_token_uri(self, token_uri):
262 """Returns a copy of these credentials with a modified token uri.
263
264 Args:
265 token_uri (str): The uri to use for fetching/exchanging tokens
266
267 Returns:
268 google.auth.credentials.Credentials: A new credentials instance.
269 """
270 raise NotImplementedError("This credential does not use token uri.")
271
272
273class CredentialsWithUniverseDomain(Credentials):
274 """Abstract base for credentials supporting ``with_universe_domain`` factory"""
275
276 def with_universe_domain(self, universe_domain):
277 """Returns a copy of these credentials with a modified universe domain.
278
279 Args:
280 universe_domain (str): The universe domain to use
281
282 Returns:
283 google.auth.credentials.Credentials: A new credentials instance.
284 """
285 raise NotImplementedError(
286 "This credential does not support with_universe_domain."
287 )
288
289
290class CredentialsWithTrustBoundary(Credentials):
291 """Abstract base for credentials supporting ``with_trust_boundary`` factory"""
292
293 @abc.abstractmethod
294 def _refresh_token(self, request):
295 """Refreshes the access token.
296
297 Args:
298 request (google.auth.transport.Request): The object used to make
299 HTTP requests.
300
301 Raises:
302 google.auth.exceptions.RefreshError: If the credentials could
303 not be refreshed.
304 """
305 raise NotImplementedError("_refresh_token must be implemented")
306
307 def with_trust_boundary(self, trust_boundary):
308 """Returns a copy of these credentials with a modified trust boundary.
309
310 Args:
311 trust_boundary Mapping[str, str]: The trust boundary to use for the
312 credential. This should be a map with a "locations" key that maps to
313 a list of GCP regions, and a "encodedLocations" key that maps to a
314 hex string.
315
316 Returns:
317 google.auth.credentials.Credentials: A new credentials instance.
318 """
319 raise NotImplementedError("This credential does not support trust boundaries.")
320
321 def _is_trust_boundary_lookup_required(self):
322 """Checks if a trust boundary lookup is required.
323
324 A lookup is required if the feature is enabled via an environment
325 variable, the universe domain is supported, and a no-op boundary
326 is not already cached.
327
328 Returns:
329 bool: True if a trust boundary lookup is required, False otherwise.
330 """
331 # 1. Check if the feature is enabled via environment variable.
332 if not _helpers.get_bool_from_env(
333 environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False
334 ):
335 return False
336
337 # 2. Skip trust boundary flow for non-default universe domains.
338 if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
339 return False
340
341 # 3. Do not trigger refresh if credential has a cached no-op trust boundary.
342 return not self._has_no_op_trust_boundary()
343
344 def _get_trust_boundary_header(self):
345 if self._trust_boundary is not None:
346 if self._has_no_op_trust_boundary():
347 # STS expects an empty string if the trust boundary value is no-op.
348 return {"x-allowed-locations": ""}
349 else:
350 return {"x-allowed-locations": self._trust_boundary["encodedLocations"]}
351 return {}
352
353 def apply(self, headers, token=None):
354 """Apply the token to the authentication header."""
355 super().apply(headers, token)
356 headers.update(self._get_trust_boundary_header())
357
358 def refresh(self, request):
359 """Refreshes the access token and the trust boundary.
360
361 This method calls the subclass's token refresh logic and then
362 refreshes the trust boundary if applicable.
363 """
364 self._refresh_token(request)
365 self._refresh_trust_boundary(request)
366
367 def _refresh_trust_boundary(self, request):
368 """Triggers a refresh of the trust boundary and updates the cache if necessary.
369
370 Args:
371 request (google.auth.transport.Request): The object used to make
372 HTTP requests.
373
374 Raises:
375 google.auth.exceptions.RefreshError: If the trust boundary could
376 not be refreshed and no cached value is available.
377 """
378 if not self._is_trust_boundary_lookup_required():
379 return
380 try:
381 self._trust_boundary = self._lookup_trust_boundary(request)
382 except exceptions.RefreshError as error:
383 # If the call to the lookup API failed, check if there is a trust boundary
384 # already cached. If there is, do nothing. If not, then throw the error.
385 if self._trust_boundary is None:
386 raise error
387 if _helpers.is_logging_enabled(_LOGGER):
388 _LOGGER.debug(
389 "Using cached trust boundary due to refresh error: %s", error
390 )
391 return
392
393 def _lookup_trust_boundary(self, request):
394 """Calls the trust boundary lookup API to refresh the trust boundary cache.
395
396 Args:
397 request (google.auth.transport.Request): The object used to make
398 HTTP requests.
399
400 Returns:
401 trust_boundary (dict): The trust boundary object returned by the lookup API.
402
403 Raises:
404 google.auth.exceptions.RefreshError: If the trust boundary could not be
405 retrieved.
406 """
407 from google.oauth2 import _client
408
409 url = self._build_trust_boundary_lookup_url()
410 if not url:
411 raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.")
412
413 headers = {}
414 self._apply(headers)
415 headers.update(self._get_trust_boundary_header())
416 return _client._lookup_trust_boundary(request, url, headers=headers)
417
418 @abc.abstractmethod
419 def _build_trust_boundary_lookup_url(self):
420 """
421 Builds and returns the URL for the trust boundary lookup API.
422
423 This method should be implemented by subclasses to provide the
424 specific URL based on the credential type and its properties.
425
426 Returns:
427 str: The URL for the trust boundary lookup endpoint, or None
428 if lookup should be skipped (e.g., for non-applicable universe domains).
429 """
430 raise NotImplementedError(
431 "_build_trust_boundary_lookup_url must be implemented"
432 )
433
434 def _has_no_op_trust_boundary(self):
435 # A no-op trust boundary is indicated by encodedLocations being "0x0".
436 # The "locations" list may or may not be present as an empty list.
437 if (
438 self._trust_boundary is not None
439 and self._trust_boundary["encodedLocations"]
440 == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS
441 ):
442 return True
443 return False
444
445
446class AnonymousCredentials(Credentials):
447 """Credentials that do not provide any authentication information.
448
449 These are useful in the case of services that support anonymous access or
450 local service emulators that do not use credentials.
451 """
452
453 @property
454 def expired(self):
455 """Returns `False`, anonymous credentials never expire."""
456 return False
457
458 @property
459 def valid(self):
460 """Returns `True`, anonymous credentials are always valid."""
461 return True
462
463 def refresh(self, request):
464 """Raises :class:``InvalidOperation``, anonymous credentials cannot be
465 refreshed."""
466 raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")
467
468 def apply(self, headers, token=None):
469 """Anonymous credentials do nothing to the request.
470
471 The optional ``token`` argument is not supported.
472
473 Raises:
474 google.auth.exceptions.InvalidValue: If a token was specified.
475 """
476 if token is not None:
477 raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")
478
479 def before_request(self, request, method, url, headers):
480 """Anonymous credentials do nothing to the request."""
481
482
483class ReadOnlyScoped(metaclass=abc.ABCMeta):
484 """Interface for credentials whose scopes can be queried.
485
486 OAuth 2.0-based credentials allow limiting access using scopes as described
487 in `RFC6749 Section 3.3`_.
488 If a credential class implements this interface then the credentials either
489 use scopes in their implementation.
490
491 Some credentials require scopes in order to obtain a token. You can check
492 if scoping is necessary with :attr:`requires_scopes`::
493
494 if credentials.requires_scopes:
495 # Scoping is required.
496 credentials = credentials.with_scopes(scopes=['one', 'two'])
497
498 Credentials that require scopes must either be constructed with scopes::
499
500 credentials = SomeScopedCredentials(scopes=['one', 'two'])
501
502 Or must copy an existing instance using :meth:`with_scopes`::
503
504 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
505
506 Some credentials have scopes but do not allow or require scopes to be set,
507 these credentials can be used as-is.
508
509 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
510 """
511
512 def __init__(self):
513 super(ReadOnlyScoped, self).__init__()
514 self._scopes = None
515 self._default_scopes = None
516
517 @property
518 def scopes(self):
519 """Sequence[str]: the credentials' current set of scopes."""
520 return self._scopes
521
522 @property
523 def default_scopes(self):
524 """Sequence[str]: the credentials' current set of default scopes."""
525 return self._default_scopes
526
527 @abc.abstractproperty
528 def requires_scopes(self):
529 """True if these credentials require scopes to obtain an access token."""
530 return False
531
532 def has_scopes(self, scopes):
533 """Checks if the credentials have the given scopes.
534
535 .. warning: This method is not guaranteed to be accurate if the
536 credentials are :attr:`~Credentials.invalid`.
537
538 Args:
539 scopes (Sequence[str]): The list of scopes to check.
540
541 Returns:
542 bool: True if the credentials have the given scopes.
543 """
544 credential_scopes = (
545 self._scopes if self._scopes is not None else self._default_scopes
546 )
547 return set(scopes).issubset(set(credential_scopes or []))
548
549
550class Scoped(ReadOnlyScoped):
551 """Interface for credentials whose scopes can be replaced while copying.
552
553 OAuth 2.0-based credentials allow limiting access using scopes as described
554 in `RFC6749 Section 3.3`_.
555 If a credential class implements this interface then the credentials either
556 use scopes in their implementation.
557
558 Some credentials require scopes in order to obtain a token. You can check
559 if scoping is necessary with :attr:`requires_scopes`::
560
561 if credentials.requires_scopes:
562 # Scoping is required.
563 credentials = credentials.create_scoped(['one', 'two'])
564
565 Credentials that require scopes must either be constructed with scopes::
566
567 credentials = SomeScopedCredentials(scopes=['one', 'two'])
568
569 Or must copy an existing instance using :meth:`with_scopes`::
570
571 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
572
573 Some credentials have scopes but do not allow or require scopes to be set,
574 these credentials can be used as-is.
575
576 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
577 """
578
579 @abc.abstractmethod
580 def with_scopes(self, scopes, default_scopes=None):
581 """Create a copy of these credentials with the specified scopes.
582
583 Args:
584 scopes (Sequence[str]): The list of scopes to attach to the
585 current credentials.
586
587 Raises:
588 NotImplementedError: If the credentials' scopes can not be changed.
589 This can be avoided by checking :attr:`requires_scopes` before
590 calling this method.
591 """
592 raise NotImplementedError("This class does not require scoping.")
593
594
595def with_scopes_if_required(credentials, scopes, default_scopes=None):
596 """Creates a copy of the credentials with scopes if scoping is required.
597
598 This helper function is useful when you do not know (or care to know) the
599 specific type of credentials you are using (such as when you use
600 :func:`google.auth.default`). This function will call
601 :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if
602 the credentials require scoping. Otherwise, it will return the credentials
603 as-is.
604
605 Args:
606 credentials (google.auth.credentials.Credentials): The credentials to
607 scope if necessary.
608 scopes (Sequence[str]): The list of scopes to use.
609 default_scopes (Sequence[str]): Default scopes passed by a
610 Google client library. Use 'scopes' for user-defined scopes.
611
612 Returns:
613 google.auth.credentials.Credentials: Either a new set of scoped
614 credentials, or the passed in credentials instance if no scoping
615 was required.
616 """
617 if isinstance(credentials, Scoped) and credentials.requires_scopes:
618 return credentials.with_scopes(scopes, default_scopes=default_scopes)
619 else:
620 return credentials
621
622
623class Signing(metaclass=abc.ABCMeta):
624 """Interface for credentials that can cryptographically sign messages."""
625
626 @abc.abstractmethod
627 def sign_bytes(self, message):
628 """Signs the given message.
629
630 Args:
631 message (bytes): The message to sign.
632
633 Returns:
634 bytes: The message's cryptographic signature.
635 """
636 # pylint: disable=missing-raises-doc,redundant-returns-doc
637 # (pylint doesn't recognize that this is abstract)
638 raise NotImplementedError("Sign bytes must be implemented.")
639
640 @abc.abstractproperty
641 def signer_email(self):
642 """Optional[str]: An email address that identifies the signer."""
643 # pylint: disable=missing-raises-doc
644 # (pylint doesn't recognize that this is abstract)
645 raise NotImplementedError("Signer email must be implemented.")
646
647 @abc.abstractproperty
648 def signer(self):
649 """google.auth.crypt.Signer: The signer used to sign bytes."""
650 # pylint: disable=missing-raises-doc
651 # (pylint doesn't recognize that this is abstract)
652 raise NotImplementedError("Signer must be implemented.")
653
654
655class TokenState(Enum):
656 """
657 Tracks the state of a token.
658 FRESH: The token is valid. It is not expired or close to expired, or the token has no expiry.
659 STALE: The token is close to expired, and should be refreshed. The token can be used normally.
660 INVALID: The token is expired or invalid. The token cannot be used for a normal operation.
661 """
662
663 FRESH = 1
664 STALE = 2
665 INVALID = 3