1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Interfaces for credentials."""
17
18import abc
19from enum import Enum
20import logging
21import os
22from typing import List
23
24from google.auth import _helpers, environment_vars
25from google.auth import exceptions
26from google.auth import metrics
27from google.auth._credentials_base import _BaseCredentials
28from google.auth._refresh_worker import RefreshThreadManager
29
30DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
31NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = []
32NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"
33
34_LOGGER = logging.getLogger("google.auth._default")
35
36
37class Credentials(_BaseCredentials):
38 """Base class for all credentials.
39
40 All credentials have a :attr:`token` that is used for authentication and
41 may also optionally set an :attr:`expiry` to indicate when the token will
42 no longer be valid.
43
44 Most credentials will be :attr:`invalid` until :meth:`refresh` is called.
45 Credentials can do this automatically before the first HTTP request in
46 :meth:`before_request`.
47
48 Although the token and expiration will change as the credentials are
49 :meth:`refreshed <refresh>` and used, credentials should be considered
50 immutable. Various credentials will accept configuration such as private
51 keys, scopes, and other options. These options are not changeable after
52 construction. Some classes will provide mechanisms to copy the credentials
53 with modifications such as :meth:`ScopedCredentials.with_scopes`.
54 """
55
56 def __init__(self):
57 super(Credentials, self).__init__()
58
59 self.expiry = None
60 """Optional[datetime]: When the token expires and is no longer valid.
61 If this is None, the token is assumed to never expire."""
62 self._quota_project_id = None
63 """Optional[str]: Project to use for quota and billing purposes."""
64 self._trust_boundary = None
65 """Optional[dict]: Cache of a trust boundary response which has a list
66 of allowed regions and an encoded string representation of credentials
67 trust boundary."""
68 self._universe_domain = DEFAULT_UNIVERSE_DOMAIN
69 """Optional[str]: The universe domain value, default is googleapis.com
70 """
71
72 self._use_non_blocking_refresh = False
73 self._refresh_worker = RefreshThreadManager()
74
75 @property
76 def expired(self):
77 """Checks if the credentials are expired.
78
79 Note that credentials can be invalid but not expired because
80 Credentials with :attr:`expiry` set to None is considered to never
81 expire.
82
83 .. deprecated:: v2.24.0
84 Prefer checking :attr:`token_state` instead.
85 """
86 if not self.expiry:
87 return False
88 # Remove some threshold from expiry to err on the side of reporting
89 # expiration early so that we avoid the 401-refresh-retry loop.
90 skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD
91 return _helpers.utcnow() >= skewed_expiry
92
93 @property
94 def valid(self):
95 """Checks the validity of the credentials.
96
97 This is True if the credentials have a :attr:`token` and the token
98 is not :attr:`expired`.
99
100 .. deprecated:: v2.24.0
101 Prefer checking :attr:`token_state` instead.
102 """
103 return self.token is not None and not self.expired
104
105 @property
106 def token_state(self):
107 """
108 See `:obj:`TokenState`
109 """
110 if self.token is None:
111 return TokenState.INVALID
112
113 # Credentials that can't expire are always treated as fresh.
114 if self.expiry is None:
115 return TokenState.FRESH
116
117 expired = _helpers.utcnow() >= self.expiry
118 if expired:
119 return TokenState.INVALID
120
121 is_stale = _helpers.utcnow() >= (self.expiry - _helpers.REFRESH_THRESHOLD)
122 if is_stale:
123 return TokenState.STALE
124
125 return TokenState.FRESH
126
127 @property
128 def quota_project_id(self):
129 """Project to use for quota and billing purposes."""
130 return self._quota_project_id
131
132 @property
133 def universe_domain(self):
134 """The universe domain value."""
135 return self._universe_domain
136
137 def get_cred_info(self):
138 """The credential information JSON.
139
140 The credential information will be added to auth related error messages
141 by client library.
142
143 Returns:
144 Mapping[str, str]: The credential information JSON.
145 """
146 return None
147
148 @abc.abstractmethod
149 def refresh(self, request):
150 """Refreshes the access token.
151
152 Args:
153 request (google.auth.transport.Request): The object used to make
154 HTTP requests.
155
156 Raises:
157 google.auth.exceptions.RefreshError: If the credentials could
158 not be refreshed.
159 """
160 # pylint: disable=missing-raises-doc
161 # (pylint doesn't recognize that this is abstract)
162 raise NotImplementedError("Refresh must be implemented")
163
164 def _metric_header_for_usage(self):
165 """The x-goog-api-client header for token usage metric.
166
167 This header will be added to the API service requests in before_request
168 method. For example, "cred-type/sa-jwt" means service account self
169 signed jwt access token is used in the API service request
170 authorization header. Children credentials classes need to override
171 this method to provide the header value, if the token usage metric is
172 needed.
173
174 Returns:
175 str: The x-goog-api-client header value.
176 """
177 return None
178
179 def apply(self, headers, token=None):
180 """Apply the token to the authentication header.
181
182 Args:
183 headers (Mapping): The HTTP request headers.
184 token (Optional[str]): If specified, overrides the current access
185 token.
186 """
187 self._apply(headers, token)
188 if self.quota_project_id:
189 headers["x-goog-user-project"] = self.quota_project_id
190
191 def _blocking_refresh(self, request):
192 if not self.valid:
193 self.refresh(request)
194
195 def _non_blocking_refresh(self, request):
196 use_blocking_refresh_fallback = False
197
198 if self.token_state == TokenState.STALE:
199 use_blocking_refresh_fallback = not self._refresh_worker.start_refresh(
200 self, request
201 )
202
203 if self.token_state == TokenState.INVALID or use_blocking_refresh_fallback:
204 self.refresh(request)
205 # If the blocking refresh succeeds then we can clear the error info
206 # on the background refresh worker, and perform refreshes in a
207 # background thread.
208 self._refresh_worker.clear_error()
209
210 def before_request(self, request, method, url, headers):
211 """Performs credential-specific before request logic.
212
213 Refreshes the credentials if necessary, then calls :meth:`apply` to
214 apply the token to the authentication header.
215
216 Args:
217 request (google.auth.transport.Request): The object used to make
218 HTTP requests.
219 method (str): The request's HTTP method or the RPC method being
220 invoked.
221 url (str): The request's URI or the RPC service's URI.
222 headers (Mapping): The request's headers.
223 """
224 # pylint: disable=unused-argument
225 # (Subclasses may use these arguments to ascertain information about
226 # the http request.)
227 if self._use_non_blocking_refresh:
228 self._non_blocking_refresh(request)
229 else:
230 self._blocking_refresh(request)
231
232 metrics.add_metric_header(headers, self._metric_header_for_usage())
233 self.apply(headers)
234
235 def with_non_blocking_refresh(self):
236 self._use_non_blocking_refresh = True
237
238
239class CredentialsWithQuotaProject(Credentials):
240 """Abstract base for credentials supporting ``with_quota_project`` factory"""
241
242 def with_quota_project(self, quota_project_id):
243 """Returns a copy of these credentials with a modified quota project.
244
245 Args:
246 quota_project_id (str): The project to use for quota and
247 billing purposes
248
249 Returns:
250 google.auth.credentials.Credentials: A new credentials instance.
251 """
252 raise NotImplementedError("This credential does not support quota project.")
253
254 def with_quota_project_from_environment(self):
255 quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT)
256 if quota_from_env:
257 return self.with_quota_project(quota_from_env)
258 return self
259
260
261class CredentialsWithTokenUri(Credentials):
262 """Abstract base for credentials supporting ``with_token_uri`` factory"""
263
264 def with_token_uri(self, token_uri):
265 """Returns a copy of these credentials with a modified token uri.
266
267 Args:
268 token_uri (str): The uri to use for fetching/exchanging tokens
269
270 Returns:
271 google.auth.credentials.Credentials: A new credentials instance.
272 """
273 raise NotImplementedError("This credential does not use token uri.")
274
275
276class CredentialsWithUniverseDomain(Credentials):
277 """Abstract base for credentials supporting ``with_universe_domain`` factory"""
278
279 def with_universe_domain(self, universe_domain):
280 """Returns a copy of these credentials with a modified universe domain.
281
282 Args:
283 universe_domain (str): The universe domain to use
284
285 Returns:
286 google.auth.credentials.Credentials: A new credentials instance.
287 """
288 raise NotImplementedError(
289 "This credential does not support with_universe_domain."
290 )
291
292
293class CredentialsWithTrustBoundary(Credentials):
294 """Abstract base for credentials supporting ``with_trust_boundary`` factory"""
295
296 @abc.abstractmethod
297 def _perform_refresh_token(self, request):
298 """Refreshes the access token.
299
300 Args:
301 request (google.auth.transport.Request): The object used to make
302 HTTP requests.
303
304 Raises:
305 google.auth.exceptions.RefreshError: If the credentials could
306 not be refreshed.
307 """
308 raise NotImplementedError("_perform_refresh_token must be implemented")
309
310 def with_trust_boundary(self, trust_boundary):
311 """Returns a copy of these credentials with a modified trust boundary.
312
313 Args:
314 trust_boundary Mapping[str, str]: The trust boundary to use for the
315 credential. This should be a map with a "locations" key that maps to
316 a list of GCP regions, and a "encodedLocations" key that maps to a
317 hex string.
318
319 Returns:
320 google.auth.credentials.Credentials: A new credentials instance.
321 """
322 raise NotImplementedError("This credential does not support trust boundaries.")
323
324 def _is_trust_boundary_lookup_required(self):
325 """Checks if a trust boundary lookup is required.
326
327 A lookup is required if the feature is enabled via an environment
328 variable, the universe domain is supported, and a no-op boundary
329 is not already cached.
330
331 Returns:
332 bool: True if a trust boundary lookup is required, False otherwise.
333 """
334 # 1. Check if the feature is enabled via environment variable.
335 if not _helpers.get_bool_from_env(
336 environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False
337 ):
338 return False
339
340 # 2. Skip trust boundary flow for non-default universe domains.
341 if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
342 return False
343
344 # 3. Do not trigger refresh if credential has a cached no-op trust boundary.
345 return not self._has_no_op_trust_boundary()
346
347 def _get_trust_boundary_header(self):
348 if self._trust_boundary is not None:
349 if self._has_no_op_trust_boundary():
350 # STS expects an empty string if the trust boundary value is no-op.
351 return {"x-allowed-locations": ""}
352 else:
353 return {"x-allowed-locations": self._trust_boundary["encodedLocations"]}
354 return {}
355
356 def apply(self, headers, token=None):
357 """Apply the token to the authentication header."""
358 super().apply(headers, token)
359 headers.update(self._get_trust_boundary_header())
360
361 def refresh(self, request):
362 """Refreshes the access token and the trust boundary.
363
364 This method calls the subclass's token refresh logic and then
365 refreshes the trust boundary if applicable.
366 """
367 self._perform_refresh_token(request)
368 self._refresh_trust_boundary(request)
369
370 def _refresh_trust_boundary(self, request):
371 """Triggers a refresh of the trust boundary and updates the cache if necessary.
372
373 Args:
374 request (google.auth.transport.Request): The object used to make
375 HTTP requests.
376
377 Raises:
378 google.auth.exceptions.RefreshError: If the trust boundary could
379 not be refreshed and no cached value is available.
380 """
381 if not self._is_trust_boundary_lookup_required():
382 return
383 try:
384 self._trust_boundary = self._lookup_trust_boundary(request)
385 except exceptions.RefreshError as error:
386 # If the call to the lookup API failed, check if there is a trust boundary
387 # already cached. If there is, do nothing. If not, then throw the error.
388 if self._trust_boundary is None:
389 raise error
390 if _helpers.is_logging_enabled(_LOGGER):
391 _LOGGER.debug(
392 "Using cached trust boundary due to refresh error: %s", error
393 )
394 return
395
396 def _lookup_trust_boundary(self, request):
397 """Calls the trust boundary lookup API to refresh the trust boundary cache.
398
399 Args:
400 request (google.auth.transport.Request): The object used to make
401 HTTP requests.
402
403 Returns:
404 trust_boundary (dict): The trust boundary object returned by the lookup API.
405
406 Raises:
407 google.auth.exceptions.RefreshError: If the trust boundary could not be
408 retrieved.
409 """
410 from google.oauth2 import _client
411
412 url = self._build_trust_boundary_lookup_url()
413 if not url:
414 raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.")
415
416 headers = {}
417 self._apply(headers)
418 headers.update(self._get_trust_boundary_header())
419 return _client._lookup_trust_boundary(request, url, headers=headers)
420
421 @abc.abstractmethod
422 def _build_trust_boundary_lookup_url(self):
423 """
424 Builds and returns the URL for the trust boundary lookup API.
425
426 This method should be implemented by subclasses to provide the
427 specific URL based on the credential type and its properties.
428
429 Returns:
430 str: The URL for the trust boundary lookup endpoint, or None
431 if lookup should be skipped (e.g., for non-applicable universe domains).
432 """
433 raise NotImplementedError(
434 "_build_trust_boundary_lookup_url must be implemented"
435 )
436
437 def _has_no_op_trust_boundary(self):
438 # A no-op trust boundary is indicated by encodedLocations being "0x0".
439 # The "locations" list may or may not be present as an empty list.
440 if self._trust_boundary is None:
441 return False
442 return (
443 self._trust_boundary.get("encodedLocations")
444 == NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS
445 )
446
447
448class AnonymousCredentials(Credentials):
449 """Credentials that do not provide any authentication information.
450
451 These are useful in the case of services that support anonymous access or
452 local service emulators that do not use credentials.
453 """
454
455 @property
456 def expired(self):
457 """Returns `False`, anonymous credentials never expire."""
458 return False
459
460 @property
461 def valid(self):
462 """Returns `True`, anonymous credentials are always valid."""
463 return True
464
465 def refresh(self, request):
466 """Raises :class:``InvalidOperation``, anonymous credentials cannot be
467 refreshed."""
468 raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")
469
470 def apply(self, headers, token=None):
471 """Anonymous credentials do nothing to the request.
472
473 The optional ``token`` argument is not supported.
474
475 Raises:
476 google.auth.exceptions.InvalidValue: If a token was specified.
477 """
478 if token is not None:
479 raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")
480
481 def before_request(self, request, method, url, headers):
482 """Anonymous credentials do nothing to the request."""
483
484
485class ReadOnlyScoped(metaclass=abc.ABCMeta):
486 """Interface for credentials whose scopes can be queried.
487
488 OAuth 2.0-based credentials allow limiting access using scopes as described
489 in `RFC6749 Section 3.3`_.
490 If a credential class implements this interface then the credentials either
491 use scopes in their implementation.
492
493 Some credentials require scopes in order to obtain a token. You can check
494 if scoping is necessary with :attr:`requires_scopes`::
495
496 if credentials.requires_scopes:
497 # Scoping is required.
498 credentials = credentials.with_scopes(scopes=['one', 'two'])
499
500 Credentials that require scopes must either be constructed with scopes::
501
502 credentials = SomeScopedCredentials(scopes=['one', 'two'])
503
504 Or must copy an existing instance using :meth:`with_scopes`::
505
506 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
507
508 Some credentials have scopes but do not allow or require scopes to be set,
509 these credentials can be used as-is.
510
511 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
512 """
513
514 def __init__(self):
515 super(ReadOnlyScoped, self).__init__()
516 self._scopes = None
517 self._default_scopes = None
518
519 @property
520 def scopes(self):
521 """Sequence[str]: the credentials' current set of scopes."""
522 return self._scopes
523
524 @property
525 def default_scopes(self):
526 """Sequence[str]: the credentials' current set of default scopes."""
527 return self._default_scopes
528
529 @abc.abstractproperty
530 def requires_scopes(self):
531 """True if these credentials require scopes to obtain an access token."""
532 return False
533
534 def has_scopes(self, scopes):
535 """Checks if the credentials have the given scopes.
536
537 .. warning: This method is not guaranteed to be accurate if the
538 credentials are :attr:`~Credentials.invalid`.
539
540 Args:
541 scopes (Sequence[str]): The list of scopes to check.
542
543 Returns:
544 bool: True if the credentials have the given scopes.
545 """
546 credential_scopes = (
547 self._scopes if self._scopes is not None else self._default_scopes
548 )
549 return set(scopes).issubset(set(credential_scopes or []))
550
551
552class Scoped(ReadOnlyScoped):
553 """Interface for credentials whose scopes can be replaced while copying.
554
555 OAuth 2.0-based credentials allow limiting access using scopes as described
556 in `RFC6749 Section 3.3`_.
557 If a credential class implements this interface then the credentials either
558 use scopes in their implementation.
559
560 Some credentials require scopes in order to obtain a token. You can check
561 if scoping is necessary with :attr:`requires_scopes`::
562
563 if credentials.requires_scopes:
564 # Scoping is required.
565 credentials = credentials.create_scoped(['one', 'two'])
566
567 Credentials that require scopes must either be constructed with scopes::
568
569 credentials = SomeScopedCredentials(scopes=['one', 'two'])
570
571 Or must copy an existing instance using :meth:`with_scopes`::
572
573 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
574
575 Some credentials have scopes but do not allow or require scopes to be set,
576 these credentials can be used as-is.
577
578 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
579 """
580
581 @abc.abstractmethod
582 def with_scopes(self, scopes, default_scopes=None):
583 """Create a copy of these credentials with the specified scopes.
584
585 Args:
586 scopes (Sequence[str]): The list of scopes to attach to the
587 current credentials.
588
589 Raises:
590 NotImplementedError: If the credentials' scopes can not be changed.
591 This can be avoided by checking :attr:`requires_scopes` before
592 calling this method.
593 """
594 raise NotImplementedError("This class does not require scoping.")
595
596
597def with_scopes_if_required(credentials, scopes, default_scopes=None):
598 """Creates a copy of the credentials with scopes if scoping is required.
599
600 This helper function is useful when you do not know (or care to know) the
601 specific type of credentials you are using (such as when you use
602 :func:`google.auth.default`). This function will call
603 :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if
604 the credentials require scoping. Otherwise, it will return the credentials
605 as-is.
606
607 Args:
608 credentials (google.auth.credentials.Credentials): The credentials to
609 scope if necessary.
610 scopes (Sequence[str]): The list of scopes to use.
611 default_scopes (Sequence[str]): Default scopes passed by a
612 Google client library. Use 'scopes' for user-defined scopes.
613
614 Returns:
615 google.auth.credentials.Credentials: Either a new set of scoped
616 credentials, or the passed in credentials instance if no scoping
617 was required.
618 """
619 if isinstance(credentials, Scoped) and credentials.requires_scopes:
620 return credentials.with_scopes(scopes, default_scopes=default_scopes)
621 else:
622 return credentials
623
624
625class Signing(metaclass=abc.ABCMeta):
626 """Interface for credentials that can cryptographically sign messages."""
627
628 @abc.abstractmethod
629 def sign_bytes(self, message):
630 """Signs the given message.
631
632 Args:
633 message (bytes): The message to sign.
634
635 Returns:
636 bytes: The message's cryptographic signature.
637 """
638 # pylint: disable=missing-raises-doc,redundant-returns-doc
639 # (pylint doesn't recognize that this is abstract)
640 raise NotImplementedError("Sign bytes must be implemented.")
641
642 @abc.abstractproperty
643 def signer_email(self):
644 """Optional[str]: An email address that identifies the signer."""
645 # pylint: disable=missing-raises-doc
646 # (pylint doesn't recognize that this is abstract)
647 raise NotImplementedError("Signer email must be implemented.")
648
649 @abc.abstractproperty
650 def signer(self):
651 """google.auth.crypt.Signer: The signer used to sign bytes."""
652 # pylint: disable=missing-raises-doc
653 # (pylint doesn't recognize that this is abstract)
654 raise NotImplementedError("Signer must be implemented.")
655
656
657class TokenState(Enum):
658 """
659 Tracks the state of a token.
660 FRESH: The token is valid. It is not expired or close to expired, or the token has no expiry.
661 STALE: The token is close to expired, and should be refreshed. The token can be used normally.
662 INVALID: The token is expired or invalid. The token cannot be used for a normal operation.
663 """
664
665 FRESH = 1
666 STALE = 2
667 INVALID = 3