Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/oauth2/service_account.py: 43%
164 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
17This module implements the JWT Profile for OAuth 2.0 Authorization Grants
18as defined by `RFC 7523`_ with particular support for how this RFC is
19implemented in Google's infrastructure. Google refers to these credentials
20as *Service Accounts*.
22Service accounts are used for server-to-server communication, such as
23interactions between a web application server and a Google service. The
24service account belongs to your application instead of to an individual end
25user. In contrast to other OAuth 2.0 profiles, no users are involved and your
26application "acts" as the service account.
28Typically an application uses a service account when the application uses
29Google APIs to work with its own data rather than a user's data. For example,
30an application that uses Google Cloud Datastore for data persistence would use
31a service account to authenticate its calls to the Google Cloud Datastore API.
32However, an application that needs to access a user's Drive documents would
33use the normal OAuth 2.0 profile.
35Additionally, Google Apps domain administrators can grant service accounts
36`domain-wide delegation`_ authority to access user data on behalf of users in
37the domain.
39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
40in place of the usual authorization token returned during the standard
41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
42the acquired access token is used as the bearer token when making requests
43using these credentials.
45This profile differs from normal OAuth 2.0 profile because no user consent
46step is required. The use of the private key allows this profile to assert
47identity directly.
49This profile also differs from the :mod:`google.auth.jwt` authentication
50because the JWT credentials use the JWT directly as the bearer token. This
51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
52obtained OAuth 2.0 access token is used as the bearer token.
54Domain-wide delegation
55----------------------
57Domain-wide delegation allows a service account to access user data on
58behalf of any user in a Google Apps domain without consent from the user.
59For example, an application that uses the Google Calendar API to add events to
60the calendars of all users in a Google Apps domain would use a service account
61to access the Google Calendar API on behalf of users.
63The Google Apps administrator must explicitly authorize the service account to
64do this. This authorization step is referred to as "delegating domain-wide
65authority" to a service account.
67You can use domain-wise delegation by creating a set of credentials with a
68specific subject using :meth:`~Credentials.with_subject`.
70.. _RFC 7523: https://tools.ietf.org/html/rfc7523
71"""
73import copy
74import datetime
76from google.auth import _helpers
77from google.auth import _service_account_info
78from google.auth import credentials
79from google.auth import jwt
80from google.oauth2 import _client
82_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
83_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
86class Credentials(
87 credentials.Signing,
88 credentials.Scoped,
89 credentials.CredentialsWithQuotaProject,
90 credentials.CredentialsWithTokenUri,
91):
92 """Service account credentials
94 Usually, you'll create these credentials with one of the helper
95 constructors. To create credentials using a Google service account
96 private key JSON file::
98 credentials = service_account.Credentials.from_service_account_file(
99 'service-account.json')
101 Or if you already have the service account file loaded::
103 service_account_info = json.load(open('service_account.json'))
104 credentials = service_account.Credentials.from_service_account_info(
105 service_account_info)
107 Both helper methods pass on arguments to the constructor, so you can
108 specify additional scopes and a subject if necessary::
110 credentials = service_account.Credentials.from_service_account_file(
111 'service-account.json',
112 scopes=['email'],
113 subject='user@example.com')
115 The credentials are considered immutable. If you want to modify the scopes
116 or the subject used for delegation, use :meth:`with_scopes` or
117 :meth:`with_subject`::
119 scoped_credentials = credentials.with_scopes(['email'])
120 delegated_credentials = credentials.with_subject(subject)
122 To add a quota project, use :meth:`with_quota_project`::
124 credentials = credentials.with_quota_project('myproject-123')
125 """
127 def __init__(
128 self,
129 signer,
130 service_account_email,
131 token_uri,
132 scopes=None,
133 default_scopes=None,
134 subject=None,
135 project_id=None,
136 quota_project_id=None,
137 additional_claims=None,
138 always_use_jwt_access=False,
139 ):
140 """
141 Args:
142 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
143 service_account_email (str): The service account's email.
144 scopes (Sequence[str]): User-defined scopes to request during the
145 authorization grant.
146 default_scopes (Sequence[str]): Default scopes passed by a
147 Google client library. Use 'scopes' for user-defined scopes.
148 token_uri (str): The OAuth 2.0 Token URI.
149 subject (str): For domain-wide delegation, the email address of the
150 user to for which to request delegated access.
151 project_id (str): Project ID associated with the service account
152 credential.
153 quota_project_id (Optional[str]): The project ID used for quota and
154 billing.
155 additional_claims (Mapping[str, str]): Any additional claims for
156 the JWT assertion used in the authorization grant.
157 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
158 be always used.
160 .. note:: Typically one of the helper constructors
161 :meth:`from_service_account_file` or
162 :meth:`from_service_account_info` are used instead of calling the
163 constructor directly.
164 """
165 super(Credentials, self).__init__()
167 self._scopes = scopes
168 self._default_scopes = default_scopes
169 self._signer = signer
170 self._service_account_email = service_account_email
171 self._subject = subject
172 self._project_id = project_id
173 self._quota_project_id = quota_project_id
174 self._token_uri = token_uri
175 self._always_use_jwt_access = always_use_jwt_access
177 self._jwt_credentials = None
179 if additional_claims is not None:
180 self._additional_claims = additional_claims
181 else:
182 self._additional_claims = {}
184 @classmethod
185 def _from_signer_and_info(cls, signer, info, **kwargs):
186 """Creates a Credentials instance from a signer and service account
187 info.
189 Args:
190 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
191 info (Mapping[str, str]): The service account info.
192 kwargs: Additional arguments to pass to the constructor.
194 Returns:
195 google.auth.jwt.Credentials: The constructed credentials.
197 Raises:
198 ValueError: If the info is not in the expected format.
199 """
200 return cls(
201 signer,
202 service_account_email=info["client_email"],
203 token_uri=info["token_uri"],
204 project_id=info.get("project_id"),
205 **kwargs
206 )
208 @classmethod
209 def from_service_account_info(cls, info, **kwargs):
210 """Creates a Credentials instance from parsed service account info.
212 Args:
213 info (Mapping[str, str]): The service account info in Google
214 format.
215 kwargs: Additional arguments to pass to the constructor.
217 Returns:
218 google.auth.service_account.Credentials: The constructed
219 credentials.
221 Raises:
222 ValueError: If the info is not in the expected format.
223 """
224 signer = _service_account_info.from_dict(
225 info, require=["client_email", "token_uri"]
226 )
227 return cls._from_signer_and_info(signer, info, **kwargs)
229 @classmethod
230 def from_service_account_file(cls, filename, **kwargs):
231 """Creates a Credentials instance from a service account json file.
233 Args:
234 filename (str): The path to the service account json file.
235 kwargs: Additional arguments to pass to the constructor.
237 Returns:
238 google.auth.service_account.Credentials: The constructed
239 credentials.
240 """
241 info, signer = _service_account_info.from_filename(
242 filename, require=["client_email", "token_uri"]
243 )
244 return cls._from_signer_and_info(signer, info, **kwargs)
246 @property
247 def service_account_email(self):
248 """The service account email."""
249 return self._service_account_email
251 @property
252 def project_id(self):
253 """Project ID associated with this credential."""
254 return self._project_id
256 @property
257 def requires_scopes(self):
258 """Checks if the credentials requires scopes.
260 Returns:
261 bool: True if there are no scopes set otherwise False.
262 """
263 return True if not self._scopes else False
265 @_helpers.copy_docstring(credentials.Scoped)
266 def with_scopes(self, scopes, default_scopes=None):
267 return self.__class__(
268 self._signer,
269 service_account_email=self._service_account_email,
270 scopes=scopes,
271 default_scopes=default_scopes,
272 token_uri=self._token_uri,
273 subject=self._subject,
274 project_id=self._project_id,
275 quota_project_id=self._quota_project_id,
276 additional_claims=self._additional_claims.copy(),
277 always_use_jwt_access=self._always_use_jwt_access,
278 )
280 def with_always_use_jwt_access(self, always_use_jwt_access):
281 """Create a copy of these credentials with the specified always_use_jwt_access value.
283 Args:
284 always_use_jwt_access (bool): Whether always use self signed JWT or not.
286 Returns:
287 google.auth.service_account.Credentials: A new credentials
288 instance.
289 """
290 return self.__class__(
291 self._signer,
292 service_account_email=self._service_account_email,
293 scopes=self._scopes,
294 default_scopes=self._default_scopes,
295 token_uri=self._token_uri,
296 subject=self._subject,
297 project_id=self._project_id,
298 quota_project_id=self._quota_project_id,
299 additional_claims=self._additional_claims.copy(),
300 always_use_jwt_access=always_use_jwt_access,
301 )
303 def with_subject(self, subject):
304 """Create a copy of these credentials with the specified subject.
306 Args:
307 subject (str): The subject claim.
309 Returns:
310 google.auth.service_account.Credentials: A new credentials
311 instance.
312 """
313 return self.__class__(
314 self._signer,
315 service_account_email=self._service_account_email,
316 scopes=self._scopes,
317 default_scopes=self._default_scopes,
318 token_uri=self._token_uri,
319 subject=subject,
320 project_id=self._project_id,
321 quota_project_id=self._quota_project_id,
322 additional_claims=self._additional_claims.copy(),
323 always_use_jwt_access=self._always_use_jwt_access,
324 )
326 def with_claims(self, additional_claims):
327 """Returns a copy of these credentials with modified claims.
329 Args:
330 additional_claims (Mapping[str, str]): Any additional claims for
331 the JWT payload. This will be merged with the current
332 additional claims.
334 Returns:
335 google.auth.service_account.Credentials: A new credentials
336 instance.
337 """
338 new_additional_claims = copy.deepcopy(self._additional_claims)
339 new_additional_claims.update(additional_claims or {})
341 return self.__class__(
342 self._signer,
343 service_account_email=self._service_account_email,
344 scopes=self._scopes,
345 default_scopes=self._default_scopes,
346 token_uri=self._token_uri,
347 subject=self._subject,
348 project_id=self._project_id,
349 quota_project_id=self._quota_project_id,
350 additional_claims=new_additional_claims,
351 always_use_jwt_access=self._always_use_jwt_access,
352 )
354 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
355 def with_quota_project(self, quota_project_id):
357 return self.__class__(
358 self._signer,
359 service_account_email=self._service_account_email,
360 default_scopes=self._default_scopes,
361 scopes=self._scopes,
362 token_uri=self._token_uri,
363 subject=self._subject,
364 project_id=self._project_id,
365 quota_project_id=quota_project_id,
366 additional_claims=self._additional_claims.copy(),
367 always_use_jwt_access=self._always_use_jwt_access,
368 )
370 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
371 def with_token_uri(self, token_uri):
373 return self.__class__(
374 self._signer,
375 service_account_email=self._service_account_email,
376 default_scopes=self._default_scopes,
377 scopes=self._scopes,
378 token_uri=token_uri,
379 subject=self._subject,
380 project_id=self._project_id,
381 quota_project_id=self._quota_project_id,
382 additional_claims=self._additional_claims.copy(),
383 always_use_jwt_access=self._always_use_jwt_access,
384 )
386 def _make_authorization_grant_assertion(self):
387 """Create the OAuth 2.0 assertion.
389 This assertion is used during the OAuth 2.0 grant to acquire an
390 access token.
392 Returns:
393 bytes: The authorization grant assertion.
394 """
395 now = _helpers.utcnow()
396 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
397 expiry = now + lifetime
399 payload = {
400 "iat": _helpers.datetime_to_secs(now),
401 "exp": _helpers.datetime_to_secs(expiry),
402 # The issuer must be the service account email.
403 "iss": self._service_account_email,
404 # The audience must be the auth token endpoint's URI
405 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
406 "scope": _helpers.scopes_to_string(self._scopes or ()),
407 }
409 payload.update(self._additional_claims)
411 # The subject can be a user email for domain-wide delegation.
412 if self._subject:
413 payload.setdefault("sub", self._subject)
415 token = jwt.encode(self._signer, payload)
417 return token
419 @_helpers.copy_docstring(credentials.Credentials)
420 def refresh(self, request):
421 # Since domain wide delegation doesn't work with self signed JWT. If
422 # subject exists, then we should not use self signed JWT.
423 if self._subject is None and self._jwt_credentials is not None:
424 self._jwt_credentials.refresh(request)
425 self.token = self._jwt_credentials.token
426 self.expiry = self._jwt_credentials.expiry
427 else:
428 assertion = self._make_authorization_grant_assertion()
429 access_token, expiry, _ = _client.jwt_grant(
430 request, self._token_uri, assertion
431 )
432 self.token = access_token
433 self.expiry = expiry
435 def _create_self_signed_jwt(self, audience):
436 """Create a self-signed JWT from the credentials if requirements are met.
438 Args:
439 audience (str): The service URL. ``https://[API_ENDPOINT]/``
440 """
441 # https://google.aip.dev/auth/4111
442 if self._always_use_jwt_access:
443 if self._scopes:
444 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
445 self, None, additional_claims={"scope": " ".join(self._scopes)}
446 )
447 elif audience:
448 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
449 self, audience
450 )
451 elif self._default_scopes:
452 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
453 self,
454 None,
455 additional_claims={"scope": " ".join(self._default_scopes)},
456 )
457 elif not self._scopes and audience:
458 self._jwt_credentials = jwt.Credentials.from_signing_credentials(
459 self, audience
460 )
462 @_helpers.copy_docstring(credentials.Signing)
463 def sign_bytes(self, message):
464 return self._signer.sign(message)
466 @property # type: ignore
467 @_helpers.copy_docstring(credentials.Signing)
468 def signer(self):
469 return self._signer
471 @property # type: ignore
472 @_helpers.copy_docstring(credentials.Signing)
473 def signer_email(self):
474 return self._service_account_email
477class IDTokenCredentials(
478 credentials.Signing,
479 credentials.CredentialsWithQuotaProject,
480 credentials.CredentialsWithTokenUri,
481):
482 """Open ID Connect ID Token-based service account credentials.
484 These credentials are largely similar to :class:`.Credentials`, but instead
485 of using an OAuth 2.0 Access Token as the bearer token, they use an Open
486 ID Connect ID Token as the bearer token. These credentials are useful when
487 communicating to services that require ID Tokens and can not accept access
488 tokens.
490 Usually, you'll create these credentials with one of the helper
491 constructors. To create credentials using a Google service account
492 private key JSON file::
494 credentials = (
495 service_account.IDTokenCredentials.from_service_account_file(
496 'service-account.json'))
499 Or if you already have the service account file loaded::
501 service_account_info = json.load(open('service_account.json'))
502 credentials = (
503 service_account.IDTokenCredentials.from_service_account_info(
504 service_account_info))
507 Both helper methods pass on arguments to the constructor, so you can
508 specify additional scopes and a subject if necessary::
510 credentials = (
511 service_account.IDTokenCredentials.from_service_account_file(
512 'service-account.json',
513 scopes=['email'],
514 subject='user@example.com'))
517 The credentials are considered immutable. If you want to modify the scopes
518 or the subject used for delegation, use :meth:`with_scopes` or
519 :meth:`with_subject`::
521 scoped_credentials = credentials.with_scopes(['email'])
522 delegated_credentials = credentials.with_subject(subject)
524 """
526 def __init__(
527 self,
528 signer,
529 service_account_email,
530 token_uri,
531 target_audience,
532 additional_claims=None,
533 quota_project_id=None,
534 ):
535 """
536 Args:
537 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
538 service_account_email (str): The service account's email.
539 token_uri (str): The OAuth 2.0 Token URI.
540 target_audience (str): The intended audience for these credentials,
541 used when requesting the ID Token. The ID Token's ``aud`` claim
542 will be set to this string.
543 additional_claims (Mapping[str, str]): Any additional claims for
544 the JWT assertion used in the authorization grant.
545 quota_project_id (Optional[str]): The project ID used for quota and billing.
546 .. note:: Typically one of the helper constructors
547 :meth:`from_service_account_file` or
548 :meth:`from_service_account_info` are used instead of calling the
549 constructor directly.
550 """
551 super(IDTokenCredentials, self).__init__()
552 self._signer = signer
553 self._service_account_email = service_account_email
554 self._token_uri = token_uri
555 self._target_audience = target_audience
556 self._quota_project_id = quota_project_id
558 if additional_claims is not None:
559 self._additional_claims = additional_claims
560 else:
561 self._additional_claims = {}
563 @classmethod
564 def _from_signer_and_info(cls, signer, info, **kwargs):
565 """Creates a credentials instance from a signer and service account
566 info.
568 Args:
569 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
570 info (Mapping[str, str]): The service account info.
571 kwargs: Additional arguments to pass to the constructor.
573 Returns:
574 google.auth.jwt.IDTokenCredentials: The constructed credentials.
576 Raises:
577 ValueError: If the info is not in the expected format.
578 """
579 kwargs.setdefault("service_account_email", info["client_email"])
580 kwargs.setdefault("token_uri", info["token_uri"])
581 return cls(signer, **kwargs)
583 @classmethod
584 def from_service_account_info(cls, info, **kwargs):
585 """Creates a credentials instance from parsed service account info.
587 Args:
588 info (Mapping[str, str]): The service account info in Google
589 format.
590 kwargs: Additional arguments to pass to the constructor.
592 Returns:
593 google.auth.service_account.IDTokenCredentials: The constructed
594 credentials.
596 Raises:
597 ValueError: If the info is not in the expected format.
598 """
599 signer = _service_account_info.from_dict(
600 info, require=["client_email", "token_uri"]
601 )
602 return cls._from_signer_and_info(signer, info, **kwargs)
604 @classmethod
605 def from_service_account_file(cls, filename, **kwargs):
606 """Creates a credentials instance from a service account json file.
608 Args:
609 filename (str): The path to the service account json file.
610 kwargs: Additional arguments to pass to the constructor.
612 Returns:
613 google.auth.service_account.IDTokenCredentials: The constructed
614 credentials.
615 """
616 info, signer = _service_account_info.from_filename(
617 filename, require=["client_email", "token_uri"]
618 )
619 return cls._from_signer_and_info(signer, info, **kwargs)
621 def with_target_audience(self, target_audience):
622 """Create a copy of these credentials with the specified target
623 audience.
625 Args:
626 target_audience (str): The intended audience for these credentials,
627 used when requesting the ID Token.
629 Returns:
630 google.auth.service_account.IDTokenCredentials: A new credentials
631 instance.
632 """
633 return self.__class__(
634 self._signer,
635 service_account_email=self._service_account_email,
636 token_uri=self._token_uri,
637 target_audience=target_audience,
638 additional_claims=self._additional_claims.copy(),
639 quota_project_id=self.quota_project_id,
640 )
642 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
643 def with_quota_project(self, quota_project_id):
644 return self.__class__(
645 self._signer,
646 service_account_email=self._service_account_email,
647 token_uri=self._token_uri,
648 target_audience=self._target_audience,
649 additional_claims=self._additional_claims.copy(),
650 quota_project_id=quota_project_id,
651 )
653 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
654 def with_token_uri(self, token_uri):
655 return self.__class__(
656 self._signer,
657 service_account_email=self._service_account_email,
658 token_uri=token_uri,
659 target_audience=self._target_audience,
660 additional_claims=self._additional_claims.copy(),
661 quota_project_id=self._quota_project_id,
662 )
664 def _make_authorization_grant_assertion(self):
665 """Create the OAuth 2.0 assertion.
667 This assertion is used during the OAuth 2.0 grant to acquire an
668 ID token.
670 Returns:
671 bytes: The authorization grant assertion.
672 """
673 now = _helpers.utcnow()
674 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
675 expiry = now + lifetime
677 payload = {
678 "iat": _helpers.datetime_to_secs(now),
679 "exp": _helpers.datetime_to_secs(expiry),
680 # The issuer must be the service account email.
681 "iss": self.service_account_email,
682 # The audience must be the auth token endpoint's URI
683 "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
684 # The target audience specifies which service the ID token is
685 # intended for.
686 "target_audience": self._target_audience,
687 }
689 payload.update(self._additional_claims)
691 token = jwt.encode(self._signer, payload)
693 return token
695 @_helpers.copy_docstring(credentials.Credentials)
696 def refresh(self, request):
697 assertion = self._make_authorization_grant_assertion()
698 access_token, expiry, _ = _client.id_token_jwt_grant(
699 request, self._token_uri, assertion
700 )
701 self.token = access_token
702 self.expiry = expiry
704 @property
705 def service_account_email(self):
706 """The service account email."""
707 return self._service_account_email
709 @_helpers.copy_docstring(credentials.Signing)
710 def sign_bytes(self, message):
711 return self._signer.sign(message)
713 @property # type: ignore
714 @_helpers.copy_docstring(credentials.Signing)
715 def signer(self):
716 return self._signer
718 @property # type: ignore
719 @_helpers.copy_docstring(credentials.Signing)
720 def signer_email(self):
721 return self._service_account_email