Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/credentials.py: 56%
97 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""Interfaces for credentials."""
18import abc
19import os
21import six
23from google.auth import _helpers, environment_vars
24from google.auth import exceptions
@six.add_metaclass(abc.ABCMeta)
class Credentials(object):
    """Abstract base class shared by every credentials implementation.

    Each instance carries a :attr:`token` used to authenticate requests and,
    optionally, an :attr:`expiry` marking when that token stops being usable.

    Freshly constructed credentials are usually not :attr:`valid` until
    :meth:`refresh` has been called. :meth:`before_request` performs that
    refresh automatically ahead of the first HTTP request.

    The token and its expiration change as the credentials are
    :meth:`refreshed <refresh>`, but the credentials themselves should be
    treated as immutable: configuration such as private keys or scopes is
    fixed at construction time. Some subclasses offer factory methods that
    return a modified copy instead, for example :meth:`Scoped.with_scopes`.
    """

    def __init__(self):
        #: str: The bearer token that can be used in HTTP headers to make
        #: authenticated requests.
        self.token = None
        #: Optional[datetime]: When the token expires and is no longer valid.
        #: If this is None, the token is assumed to never expire.
        self.expiry = None
        #: Optional[str]: Project to use for quota and billing purposes.
        self._quota_project_id = None

    @property
    def expired(self):
        """bool: Whether the token has reached its (skewed) expiration time.

        Credentials whose :attr:`expiry` is unset (falsy) are treated as never
        expiring, so credentials can be invalid without being expired.
        """
        if self.expiry:
            # Report expiration slightly early (by REFRESH_THRESHOLD) so that
            # callers avoid the 401-refresh-retry loop.
            early_cutoff = self.expiry - _helpers.REFRESH_THRESHOLD
            return _helpers.utcnow() >= early_cutoff

        return False

    @property
    def valid(self):
        """bool: Whether the credentials can be used as-is.

        True only when a :attr:`token` is present and it is not
        :attr:`expired`.
        """
        if self.token is None:
            return False
        return not self.expired

    @property
    def quota_project_id(self):
        """Optional[str]: Project to use for quota and billing purposes."""
        return self._quota_project_id

    @abc.abstractmethod
    def refresh(self, request):
        """Obtain a new access token.

        Subclasses must override this method.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Refresh must be implemented")

    def apply(self, headers, token=None):
        """Write the bearer token into the ``authorization`` header.

        When a quota project is configured, the ``x-goog-user-project``
        header is set as well.

        Args:
            headers (Mapping): The HTTP request headers; modified in place.
            token (Optional[str]): If specified, overrides the current access
                token.
        """
        # The token is passed through ``_helpers.from_bytes`` before it is
        # formatted into the header value.
        bearer = _helpers.from_bytes(token or self.token)
        headers["authorization"] = "Bearer {}".format(bearer)

        if self.quota_project_id:
            headers["x-goog-user-project"] = self.quota_project_id

    def before_request(self, request, method, url, headers):
        """Run the credential-specific logic that precedes a request.

        Refreshes the credentials when they are not :attr:`valid`, then calls
        :meth:`apply` to add the token to ``headers``.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            method (str): The request's HTTP method or the RPC method being
                invoked.
            url (str): The request's URI or the RPC service's URI.
            headers (Mapping): The request's headers.
        """
        # pylint: disable=unused-argument
        # (``method`` and ``url`` are unused here; subclasses may use them to
        # learn about the outgoing http request.)
        if not self.valid:
            self.refresh(request)
        self.apply(headers)
class CredentialsWithQuotaProject(Credentials):
    """Abstract base for credentials supporting ``with_quota_project`` factory"""

    def with_quota_project(self, quota_project_id):
        """Return a copy of these credentials bound to another quota project.

        This base implementation always raises; subclasses are expected to
        override it.

        Args:
            quota_project_id (str): The project to use for quota and
                billing purposes.

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: If the subclass does not override this
                method.
        """
        raise NotImplementedError("This credential does not support quota project.")

    def with_quota_project_from_environment(self):
        """Apply the quota project named in the environment, if there is one.

        Reads the environment variable whose name is
        ``environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT``.

        Returns:
            google.auth.credentials.Credentials: The result of
                :meth:`with_quota_project` when the variable holds a
                non-empty value, otherwise this same instance.
        """
        env_project = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT)
        if not env_project:
            # Unset or empty: nothing to override.
            return self
        return self.with_quota_project(env_project)
class CredentialsWithTokenUri(Credentials):
    """Abstract base for credentials supporting ``with_token_uri`` factory"""

    def with_token_uri(self, token_uri):
        """Return a copy of these credentials that uses another token uri.

        This base implementation always raises; subclasses are expected to
        override it.

        Args:
            token_uri (str): The uri to use for fetching/exchanging tokens.

        Returns:
            google.auth.credentials.Credentials: A new credentials instance.

        Raises:
            NotImplementedError: If the subclass does not override this
                method.
        """
        raise NotImplementedError("This credential does not use token uri.")
class AnonymousCredentials(Credentials):
    """Credentials that carry no authentication information at all.

    Useful for services that allow anonymous access and for local service
    emulators that do not use credentials.
    """

    @property
    def expired(self):
        """bool: Always ``False``; anonymous credentials never expire."""
        return False

    @property
    def valid(self):
        """bool: Always ``True``; anonymous credentials are always valid."""
        return True

    def refresh(self, request):
        """Always fails: anonymous credentials cannot be refreshed.

        Args:
            request: Unused.

        Raises:
            google.auth.exceptions.InvalidOperation: Always.
        """
        raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")

    def apply(self, headers, token=None):
        """Leave the request headers untouched.

        The optional ``token`` argument is not supported.

        Args:
            headers (Mapping): The HTTP request headers; never modified.
            token (Optional[str]): Must be left as ``None``.

        Raises:
            google.auth.exceptions.InvalidValue: If a token was specified.
        """
        if token is None:
            return
        raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")

    def before_request(self, request, method, url, headers):
        """Do nothing; anonymous credentials never alter the request."""
@six.add_metaclass(abc.ABCMeta)
class ReadOnlyScoped(object):
    """Interface for credentials whose scopes can be queried.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    A credential class that implements this interface uses scopes in its
    implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(scopes=['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    def __init__(self):
        # Cooperative super() call so that other base classes sharing the
        # MRO still get initialized.
        super(ReadOnlyScoped, self).__init__()
        self._scopes = None
        self._default_scopes = None

    @property
    def scopes(self):
        """Sequence[str]: the credentials' current set of scopes."""
        return self._scopes

    @property
    def default_scopes(self):
        """Sequence[str]: the credentials' current set of default scopes."""
        return self._default_scopes

    @abc.abstractproperty
    def requires_scopes(self):
        """bool: True if these credentials require scopes to obtain an access
        token.
        """
        return False

    def has_scopes(self, scopes):
        """Report whether every requested scope is held by the credentials.

        The explicitly set scopes are consulted when they are not ``None``;
        otherwise the default scopes are used.

        .. warning:: This method is not guaranteed to be accurate if the
            credentials are not valid.

        Args:
            scopes (Sequence[str]): The list of scopes to check.

        Returns:
            bool: True if the credentials have the given scopes.
        """
        granted = self._scopes
        if granted is None:
            granted = self._default_scopes

        # A missing scope list counts as "no scopes granted".
        return set(scopes) <= set(granted or [])
class Scoped(ReadOnlyScoped):
    """Interface for credentials whose scopes can be replaced while copying.

    OAuth 2.0-based credentials allow limiting access using scopes as described
    in `RFC6749 Section 3.3`_.
    A credential class that implements this interface uses scopes in its
    implementation.

    Some credentials require scopes in order to obtain a token. You can check
    if scoping is necessary with :attr:`requires_scopes`::

        if credentials.requires_scopes:
            # Scoping is required.
            credentials = credentials.with_scopes(scopes=['one', 'two'])

    Credentials that require scopes must either be constructed with scopes::

        credentials = SomeScopedCredentials(scopes=['one', 'two'])

    Or must copy an existing instance using :meth:`with_scopes`::

        scoped_credentials = credentials.with_scopes(scopes=['one', 'two'])

    Some credentials have scopes but do not allow or require scopes to be set,
    these credentials can be used as-is.

    .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3
    """

    @abc.abstractmethod
    def with_scopes(self, scopes, default_scopes=None):
        """Create a copy of these credentials with the specified scopes.

        Subclasses must override this method.

        Args:
            scopes (Sequence[str]): The list of scopes to attach to the
                current credentials.
            default_scopes (Optional[Sequence[str]]): Default scopes passed
                by a Google client library. Use ``scopes`` for user-defined
                scopes.

        Returns:
            A new credentials instance carrying the given scopes.

        Raises:
            NotImplementedError: If the credentials' scopes can not be changed.
                This can be avoided by checking :attr:`requires_scopes` before
                calling this method.
        """
        raise NotImplementedError("This class does not require scoping.")
def with_scopes_if_required(credentials, scopes, default_scopes=None):
    """Scope the given credentials, but only when they actually need it.

    Handy when the concrete credentials type is unknown (or irrelevant), such
    as when the credentials come from :func:`google.auth.default`.
    :meth:`Scoped.with_scopes` is invoked only if the credentials implement
    :class:`Scoped` and report that they require scopes; in every other case
    the credentials are handed back unchanged.

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            scope if necessary.
        scopes (Sequence[str]): The list of scopes to use.
        default_scopes (Sequence[str]): Default scopes passed by a
            Google client library. Use 'scopes' for user-defined scopes.

    Returns:
        google.auth.credentials.Credentials: Either a new set of scoped
            credentials, or the passed in credentials instance if no scoping
            was required.
    """
    needs_scoping = isinstance(credentials, Scoped) and credentials.requires_scopes
    if not needs_scoping:
        return credentials

    return credentials.with_scopes(scopes, default_scopes=default_scopes)
@six.add_metaclass(abc.ABCMeta)
class Signing(object):
    """Interface for credentials that can cryptographically sign messages."""

    @abc.abstractmethod
    def sign_bytes(self, message):
        """Produce a cryptographic signature for ``message``.

        Subclasses must override this method.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The message's cryptographic signature.
        """
        # pylint: disable=missing-raises-doc,redundant-returns-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Sign bytes must be implemented.")

    @abc.abstractproperty
    def signer_email(self):
        """Optional[str]: An email address that identifies the signer.

        Subclasses must override this property.
        """
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer email must be implemented.")

    @abc.abstractproperty
    def signer(self):
        """google.auth.crypt.Signer: The signer used to sign bytes.

        Subclasses must override this property.
        """
        # pylint: disable=missing-raises-doc
        # (pylint doesn't recognize that this is abstract)
        raise NotImplementedError("Signer must be implemented.")