Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/credentials.py: 54%
104 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-16 06:17 +0000
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-16 06:17 +0000
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""Interfaces for credentials."""
18import abc
19import os
21import six
23from google.auth import _helpers, environment_vars
24from google.auth import exceptions
25from google.auth import metrics
28@six.add_metaclass(abc.ABCMeta)
29class Credentials(object):
30 """Base class for all credentials.
32 All credentials have a :attr:`token` that is used for authentication and
33 may also optionally set an :attr:`expiry` to indicate when the token will
34 no longer be valid.
36 Most credentials will be :attr:`invalid` until :meth:`refresh` is called.
37 Credentials can do this automatically before the first HTTP request in
38 :meth:`before_request`.
40 Although the token and expiration will change as the credentials are
41 :meth:`refreshed <refresh>` and used, credentials should be considered
42 immutable. Various credentials will accept configuration such as private
43 keys, scopes, and other options. These options are not changeable after
44 construction. Some classes will provide mechanisms to copy the credentials
45 with modifications such as :meth:`ScopedCredentials.with_scopes`.
46 """
48 def __init__(self):
49 self.token = None
50 """str: The bearer token that can be used in HTTP headers to make
51 authenticated requests."""
52 self.expiry = None
53 """Optional[datetime]: When the token expires and is no longer valid.
54 If this is None, the token is assumed to never expire."""
55 self._quota_project_id = None
56 """Optional[str]: Project to use for quota and billing purposes."""
57 self._trust_boundary = None
58 """Optional[str]: Encoded string representation of credentials trust
59 boundary."""
61 @property
62 def expired(self):
63 """Checks if the credentials are expired.
65 Note that credentials can be invalid but not expired because
66 Credentials with :attr:`expiry` set to None is considered to never
67 expire.
68 """
69 if not self.expiry:
70 return False
72 # Remove some threshold from expiry to err on the side of reporting
73 # expiration early so that we avoid the 401-refresh-retry loop.
74 skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD
75 return _helpers.utcnow() >= skewed_expiry
77 @property
78 def valid(self):
79 """Checks the validity of the credentials.
81 This is True if the credentials have a :attr:`token` and the token
82 is not :attr:`expired`.
83 """
84 return self.token is not None and not self.expired
86 @property
87 def quota_project_id(self):
88 """Project to use for quota and billing purposes."""
89 return self._quota_project_id
91 @abc.abstractmethod
92 def refresh(self, request):
93 """Refreshes the access token.
95 Args:
96 request (google.auth.transport.Request): The object used to make
97 HTTP requests.
99 Raises:
100 google.auth.exceptions.RefreshError: If the credentials could
101 not be refreshed.
102 """
103 # pylint: disable=missing-raises-doc
104 # (pylint doesn't recognize that this is abstract)
105 raise NotImplementedError("Refresh must be implemented")
107 def _metric_header_for_usage(self):
108 """The x-goog-api-client header for token usage metric.
110 This header will be added to the API service requests in before_request
111 method. For example, "cred-type/sa-jwt" means service account self
112 signed jwt access token is used in the API service request
113 authorization header. Children credentials classes need to override
114 this method to provide the header value, if the token usage metric is
115 needed.
117 Returns:
118 str: The x-goog-api-client header value.
119 """
120 return None
122 def apply(self, headers, token=None):
123 """Apply the token to the authentication header.
125 Args:
126 headers (Mapping): The HTTP request headers.
127 token (Optional[str]): If specified, overrides the current access
128 token.
129 """
130 headers["authorization"] = "Bearer {}".format(
131 _helpers.from_bytes(token or self.token)
132 )
133 if self._trust_boundary is not None:
134 headers["x-identity-trust-boundary"] = self._trust_boundary
135 if self.quota_project_id:
136 headers["x-goog-user-project"] = self.quota_project_id
138 def before_request(self, request, method, url, headers):
139 """Performs credential-specific before request logic.
141 Refreshes the credentials if necessary, then calls :meth:`apply` to
142 apply the token to the authentication header.
144 Args:
145 request (google.auth.transport.Request): The object used to make
146 HTTP requests.
147 method (str): The request's HTTP method or the RPC method being
148 invoked.
149 url (str): The request's URI or the RPC service's URI.
150 headers (Mapping): The request's headers.
151 """
152 # pylint: disable=unused-argument
153 # (Subclasses may use these arguments to ascertain information about
154 # the http request.)
155 if not self.valid:
156 self.refresh(request)
157 metrics.add_metric_header(headers, self._metric_header_for_usage())
158 self.apply(headers)
161class CredentialsWithQuotaProject(Credentials):
162 """Abstract base for credentials supporting ``with_quota_project`` factory"""
164 def with_quota_project(self, quota_project_id):
165 """Returns a copy of these credentials with a modified quota project.
167 Args:
168 quota_project_id (str): The project to use for quota and
169 billing purposes
171 Returns:
172 google.oauth2.credentials.Credentials: A new credentials instance.
173 """
174 raise NotImplementedError("This credential does not support quota project.")
176 def with_quota_project_from_environment(self):
177 quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT)
178 if quota_from_env:
179 return self.with_quota_project(quota_from_env)
180 return self
183class CredentialsWithTokenUri(Credentials):
184 """Abstract base for credentials supporting ``with_token_uri`` factory"""
186 def with_token_uri(self, token_uri):
187 """Returns a copy of these credentials with a modified token uri.
189 Args:
190 token_uri (str): The uri to use for fetching/exchanging tokens
192 Returns:
193 google.oauth2.credentials.Credentials: A new credentials instance.
194 """
195 raise NotImplementedError("This credential does not use token uri.")
198class AnonymousCredentials(Credentials):
199 """Credentials that do not provide any authentication information.
201 These are useful in the case of services that support anonymous access or
202 local service emulators that do not use credentials.
203 """
205 @property
206 def expired(self):
207 """Returns `False`, anonymous credentials never expire."""
208 return False
210 @property
211 def valid(self):
212 """Returns `True`, anonymous credentials are always valid."""
213 return True
215 def refresh(self, request):
216 """Raises :class:``InvalidOperation``, anonymous credentials cannot be
217 refreshed."""
218 raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")
220 def apply(self, headers, token=None):
221 """Anonymous credentials do nothing to the request.
223 The optional ``token`` argument is not supported.
225 Raises:
226 google.auth.exceptions.InvalidValue: If a token was specified.
227 """
228 if token is not None:
229 raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")
231 def before_request(self, request, method, url, headers):
232 """Anonymous credentials do nothing to the request."""
235@six.add_metaclass(abc.ABCMeta)
236class ReadOnlyScoped(object):
237 """Interface for credentials whose scopes can be queried.
239 OAuth 2.0-based credentials allow limiting access using scopes as described
240 in `RFC6749 Section 3.3`_.
241 If a credential class implements this interface then the credentials either
242 use scopes in their implementation.
244 Some credentials require scopes in order to obtain a token. You can check
245 if scoping is necessary with :attr:`requires_scopes`::
247 if credentials.requires_scopes:
248 # Scoping is required.
249 credentials = credentials.with_scopes(scopes=['one', 'two'])
251 Credentials that require scopes must either be constructed with scopes::
253 credentials = SomeScopedCredentials(scopes=['one', 'two'])
255 Or must copy an existing instance using :meth:`with_scopes`::
257 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
259 Some credentials have scopes but do not allow or require scopes to be set,
260 these credentials can be used as-is.
262 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
263 """
265 def __init__(self):
266 super(ReadOnlyScoped, self).__init__()
267 self._scopes = None
268 self._default_scopes = None
270 @property
271 def scopes(self):
272 """Sequence[str]: the credentials' current set of scopes."""
273 return self._scopes
275 @property
276 def default_scopes(self):
277 """Sequence[str]: the credentials' current set of default scopes."""
278 return self._default_scopes
280 @abc.abstractproperty
281 def requires_scopes(self):
282 """True if these credentials require scopes to obtain an access token.
283 """
284 return False
286 def has_scopes(self, scopes):
287 """Checks if the credentials have the given scopes.
289 .. warning: This method is not guaranteed to be accurate if the
290 credentials are :attr:`~Credentials.invalid`.
292 Args:
293 scopes (Sequence[str]): The list of scopes to check.
295 Returns:
296 bool: True if the credentials have the given scopes.
297 """
298 credential_scopes = (
299 self._scopes if self._scopes is not None else self._default_scopes
300 )
301 return set(scopes).issubset(set(credential_scopes or []))
304class Scoped(ReadOnlyScoped):
305 """Interface for credentials whose scopes can be replaced while copying.
307 OAuth 2.0-based credentials allow limiting access using scopes as described
308 in `RFC6749 Section 3.3`_.
309 If a credential class implements this interface then the credentials either
310 use scopes in their implementation.
312 Some credentials require scopes in order to obtain a token. You can check
313 if scoping is necessary with :attr:`requires_scopes`::
315 if credentials.requires_scopes:
316 # Scoping is required.
317 credentials = credentials.create_scoped(['one', 'two'])
319 Credentials that require scopes must either be constructed with scopes::
321 credentials = SomeScopedCredentials(scopes=['one', 'two'])
323 Or must copy an existing instance using :meth:`with_scopes`::
325 scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])
327 Some credentials have scopes but do not allow or require scopes to be set,
328 these credentials can be used as-is.
330 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
331 """
333 @abc.abstractmethod
334 def with_scopes(self, scopes, default_scopes=None):
335 """Create a copy of these credentials with the specified scopes.
337 Args:
338 scopes (Sequence[str]): The list of scopes to attach to the
339 current credentials.
341 Raises:
342 NotImplementedError: If the credentials' scopes can not be changed.
343 This can be avoided by checking :attr:`requires_scopes` before
344 calling this method.
345 """
346 raise NotImplementedError("This class does not require scoping.")
349def with_scopes_if_required(credentials, scopes, default_scopes=None):
350 """Creates a copy of the credentials with scopes if scoping is required.
352 This helper function is useful when you do not know (or care to know) the
353 specific type of credentials you are using (such as when you use
354 :func:`google.auth.default`). This function will call
355 :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if
356 the credentials require scoping. Otherwise, it will return the credentials
357 as-is.
359 Args:
360 credentials (google.auth.credentials.Credentials): The credentials to
361 scope if necessary.
362 scopes (Sequence[str]): The list of scopes to use.
363 default_scopes (Sequence[str]): Default scopes passed by a
364 Google client library. Use 'scopes' for user-defined scopes.
366 Returns:
367 google.auth.credentials.Credentials: Either a new set of scoped
368 credentials, or the passed in credentials instance if no scoping
369 was required.
370 """
371 if isinstance(credentials, Scoped) and credentials.requires_scopes:
372 return credentials.with_scopes(scopes, default_scopes=default_scopes)
373 else:
374 return credentials
377@six.add_metaclass(abc.ABCMeta)
378class Signing(object):
379 """Interface for credentials that can cryptographically sign messages."""
381 @abc.abstractmethod
382 def sign_bytes(self, message):
383 """Signs the given message.
385 Args:
386 message (bytes): The message to sign.
388 Returns:
389 bytes: The message's cryptographic signature.
390 """
391 # pylint: disable=missing-raises-doc,redundant-returns-doc
392 # (pylint doesn't recognize that this is abstract)
393 raise NotImplementedError("Sign bytes must be implemented.")
395 @abc.abstractproperty
396 def signer_email(self):
397 """Optional[str]: An email address that identifies the signer."""
398 # pylint: disable=missing-raises-doc
399 # (pylint doesn't recognize that this is abstract)
400 raise NotImplementedError("Signer email must be implemented.")
402 @abc.abstractproperty
403 def signer(self):
404 """google.auth.crypt.Signer: The signer used to sign bytes."""
405 # pylint: disable=missing-raises-doc
406 # (pylint doesn't recognize that this is abstract)
407 raise NotImplementedError("Signer must be implemented.")