1# Copyright 2018 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Google Cloud Impersonated credentials.
16
This module provides authentication for applications where local credentials
impersonate a remote service account using the `IAM Credentials API`_.
19
20This class can be used to impersonate a service account as long as the original
21Credential object has the "Service Account Token Creator" role on the target
22service account.
23
24 .. _IAM Credentials API:
25 https://cloud.google.com/iam/credentials/reference/rest/
26"""
27
28import base64
29import copy
30from datetime import datetime
31import http.client as http_client
32import json
33
34from google.auth import _exponential_backoff
35from google.auth import _helpers
36from google.auth import credentials
37from google.auth import exceptions
38from google.auth import iam
39from google.auth import jwt
40from google.auth import metrics
41from google.oauth2 import _client
42
43
# Base message for the RefreshError raised when an IAM Credentials call made
# by this module fails or returns an unusable response.
_REFRESH_ERROR = "Unable to acquire impersonated credentials"

# Default lifetime requested for impersonated tokens; also used as the
# validity window ("exp" - "iat") of the JWT signed for domain-wide delegation.
_DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds

# OAuth2 token endpoint: used both as the "aud" claim of the signed JWT and as
# the URL the JWT grant is posted to in the domain-wide delegation flow.
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
# URL template for the trust boundary lookup. The two placeholders are filled
# with the universe domain and the service account email, in that order.
_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
    "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
)

# Values of the source credential "type" field that
# Credentials.from_impersonated_service_account_info knows how to load.
_SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user"
_SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account"
_SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE = (
    "external_account_authorized_user"
)
58
59
def _make_iam_token_request(
    request,
    principal,
    headers,
    body,
    universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
    iam_endpoint_override=None,
):
    """Requests an access token for ``principal`` from the IAM service.

    Args:
        request (Request): The Request object to use.
        principal (str): The principal to request an access token for.
        headers (Mapping[str, str]): Map of headers to transmit.
        body (Mapping[str, str]): JSON Payload body for the iamcredentials
            API call.
        universe_domain (str): The universe domain substituted into the
            default IAM endpoint in place of the default universe domain.
            Ignored when ``iam_endpoint_override`` is given.
        iam_endpoint_override (Optional[str]): The full IAM endpoint override
            with the target_principal embedded. This is useful when supporting
            impersonation with regional endpoints.

    Returns:
        Tuple[str, datetime]: The access token and its expiration, parsed
            from the ``expireTime`` field of the response as a naive
            datetime.

    Raises:
        google.auth.exceptions.TransportError: Raised if there is an underlying
            HTTP connection error
        google.auth.exceptions.RefreshError: Raised if the impersonated
            credentials are not available. Common reasons are
            `iamcredentials.googleapis.com` is not enabled or the
            `Service Account Token Creator` is not assigned
    """
    # An explicit override is used verbatim; otherwise the default endpoint
    # template is retargeted at the requested universe and principal.
    if iam_endpoint_override:
        endpoint = iam_endpoint_override
    else:
        endpoint_template = iam._IAM_ENDPOINT.replace(
            credentials.DEFAULT_UNIVERSE_DOMAIN, universe_domain
        )
        endpoint = endpoint_template.format(principal)

    encoded_body = json.dumps(body).encode("utf-8")

    response = request(
        url=endpoint, method="POST", headers=headers, body=encoded_body
    )

    # response.data may be either bytes or str depending on the transport.
    if hasattr(response.data, "decode"):
        response_body = response.data.decode("utf-8")
    else:
        response_body = response.data

    if response.status != http_client.OK:
        raise exceptions.RefreshError(_REFRESH_ERROR, response_body)

    try:
        parsed = json.loads(response_body)
        access_token = parsed["accessToken"]
        expires_at = datetime.strptime(parsed["expireTime"], "%Y-%m-%dT%H:%M:%SZ")
    except (KeyError, ValueError) as caught_exc:
        # Malformed JSON, a missing field, or an unparsable timestamp.
        message = "{}: No access token or invalid expiration in response.".format(
            _REFRESH_ERROR
        )
        raise exceptions.RefreshError(message, response_body) from caught_exc

    return access_token, expires_at
120
121
class Credentials(
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.Signing,
    credentials.CredentialsWithTrustBoundary,
):
    """This module defines impersonated credentials which are essentially
    impersonated identities.

    Impersonated Credentials allows credentials issued to a user or
    service account to impersonate another. The target service account must
    grant the originating credential principal the
    `Service Account Token Creator`_ IAM role:

    For more information about Token Creator IAM role and
    IAMCredentials API, see
    `Creating Short-Lived Service Account Credentials`_.

    .. _Service Account Token Creator:
        https://cloud.google.com/iam/docs/service-accounts#the_service_account_token_creator_role

    .. _Creating Short-Lived Service Account Credentials:
        https://cloud.google.com/iam/docs/creating-short-lived-service-account-credentials

    Usage:

    First grant source_credentials the `Service Account Token Creator`
    role on the target account to impersonate. In this example, the
    service account represented by svc_account.json has the
    token creator role on
    `impersonated-account@_project_.iam.gserviceaccount.com`.

    Enable the IAMCredentials API on the source project:
    `gcloud services enable iamcredentials.googleapis.com`.

    Initialize a source credential which does not have access to
    list bucket::

        from google.oauth2 import service_account

        target_scopes = [
            'https://www.googleapis.com/auth/devstorage.read_only']

        source_credentials = (
            service_account.Credentials.from_service_account_file(
                '/path/to/svc_account.json',
                scopes=target_scopes))

    Now use the source credentials to acquire credentials to impersonate
    another service account::

        from google.auth import impersonated_credentials

        target_credentials = impersonated_credentials.Credentials(
          source_credentials=source_credentials,
          target_principal='impersonated-account@_project_.iam.gserviceaccount.com',
          target_scopes = target_scopes,
          lifetime=500)

    Resource access is granted::

        client = storage.Client(credentials=target_credentials)
        buckets = client.list_buckets(project='your_project')
        for bucket in buckets:
          print(bucket.name)

    **IMPORTANT**:
    This class does not validate the credential configuration. A security
    risk occurs when a credential configuration configured with malicious urls
    is used.
    When the credential configuration is accepted from an
    untrusted source, you should validate it before using.
    Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
    """

    def __init__(
        self,
        source_credentials,
        target_principal,
        target_scopes,
        delegates=None,
        subject=None,
        lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        quota_project_id=None,
        iam_endpoint_override=None,
        trust_boundary=None,
    ):
        """
        Args:
            source_credentials (google.auth.Credentials): The source credential
                used to acquire the impersonated credentials.
            target_principal (str): The service account to impersonate.
            target_scopes (Sequence[str]): Scopes to request during the
                authorization grant.
            delegates (Sequence[str]): The chained list of delegates required
                to grant the final access_token.  If set, the sequence of
                identities must have "Service Account Token Creator" capability
                granted to the preceding identity.  For example, if set to
                [serviceAccountB, serviceAccountC], the source_credential
                must have the Token Creator role on serviceAccountB.
                serviceAccountB must have the Token Creator on
                serviceAccountC.
                Finally, C must have Token Creator on target_principal.
                If left unset, source_credential must have that role on
                target_principal.
            subject (Optional[str]): sub field of a JWT. This field should only be set
                if you wish to impersonate as a user. This feature is useful when
                using domain wide delegation.
            lifetime (int): Number of seconds the delegated credential should
                be valid for (up to 3600). A falsy value selects the default
                lifetime.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
                This project may be different from the project used to
                create the credentials.
            iam_endpoint_override (Optional[str]): The full IAM endpoint override
                with the target_principal embedded. This is useful when supporting
                impersonation with regional endpoints.
            trust_boundary (Mapping[str,str]): A credential trust boundary.
        """

        super(Credentials, self).__init__()

        # Work on a shallow copy so the caller's credential object is not
        # rebound or re-scoped by the adjustments below.
        self._source_credentials = copy.copy(source_credentials)
        # Service account source credentials must have the _IAM_SCOPE
        # added to refresh correctly. User credentials cannot have
        # their original scopes modified.
        if isinstance(self._source_credentials, credentials.Scoped):
            self._source_credentials = self._source_credentials.with_scopes(
                iam._IAM_SCOPE
            )
            # If the source credential is service account and self signed jwt
            # is needed, we need to create a jwt credential inside it
            if (
                hasattr(self._source_credentials, "_create_self_signed_jwt")
                and self._source_credentials._always_use_jwt_access
            ):
                self._source_credentials._create_self_signed_jwt(None)

        # The impersonated credential lives in the same universe as its source.
        self._universe_domain = source_credentials.universe_domain
        self._target_principal = target_principal
        self._target_scopes = target_scopes
        self._delegates = delegates
        self._subject = subject
        self._lifetime = lifetime or _DEFAULT_TOKEN_LIFETIME_SECS
        self.token = None
        # Start with an expiry of "now"; no token has been fetched yet.
        self.expiry = _helpers.utcnow()
        self._quota_project_id = quota_project_id
        self._iam_endpoint_override = iam_endpoint_override
        self._cred_file_path = None
        self._trust_boundary = trust_boundary

    def _metric_header_for_usage(self):
        """Returns the metrics credential-type value for impersonation."""
        return metrics.CRED_TYPE_SA_IMPERSONATE

    def _refresh_token(self, request):
        """Updates credentials with a new access_token representing
        the impersonated account.

        When a subject was configured, a JWT is signed on behalf of the target
        principal and exchanged at the OAuth2 token endpoint (domain-wide
        delegation); otherwise an access token is requested directly from the
        IAM service.

        Args:
            request (google.auth.transport.requests.Request): Request object
                to use for refreshing credentials.

        Raises:
            google.auth.exceptions.GoogleAuthError: If a subject is set and
                the universe domain is not the default one.
        """

        # Refresh our source credentials if it is not valid.
        if (
            self._source_credentials.token_state == credentials.TokenState.STALE
            or self._source_credentials.token_state == credentials.TokenState.INVALID
        ):
            self._source_credentials.refresh(request)

        body = {
            "delegates": self._delegates,
            "scope": self._target_scopes,
            "lifetime": str(self._lifetime) + "s",
        }

        headers = {
            "Content-Type": "application/json",
            metrics.API_CLIENT_HEADER: metrics.token_request_access_token_impersonate(),
        }

        # Apply the source credentials authentication info.
        self._source_credentials.apply(headers)

        # If a subject is specified a domain-wide delegation auth-flow is initiated
        # to impersonate as the provided subject (user).
        if self._subject:
            if self.universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
                raise exceptions.GoogleAuthError(
                    "Domain-wide delegation is not supported in universes other "
                    + "than googleapis.com"
                )

            now = _helpers.utcnow()
            payload = {
                "iss": self._target_principal,
                "scope": _helpers.scopes_to_string(self._target_scopes or ()),
                "sub": self._subject,
                "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
                "iat": _helpers.datetime_to_secs(now),
                "exp": _helpers.datetime_to_secs(now) + _DEFAULT_TOKEN_LIFETIME_SECS,
            }

            assertion = _sign_jwt_request(
                request=request,
                principal=self._target_principal,
                headers=headers,
                payload=payload,
                delegates=self._delegates,
            )

            self.token, self.expiry, _ = _client.jwt_grant(
                request, _GOOGLE_OAUTH2_TOKEN_ENDPOINT, assertion
            )

            return

        self.token, self.expiry = _make_iam_token_request(
            request=request,
            principal=self._target_principal,
            headers=headers,
            body=body,
            universe_domain=self.universe_domain,
            iam_endpoint_override=self._iam_endpoint_override,
        )

    def _build_trust_boundary_lookup_url(self):
        """Builds and returns the URL for the trust boundary lookup API.

        This method constructs the specific URL for the IAM Credentials API's
        `allowedLocations` endpoint, using the credential's universe domain
        and service account email.

        Raises:
            ValueError: If `self.service_account_email` is None or an empty
                string, as it's required to form the URL.

        Returns:
            str: The URL for the trust boundary lookup endpoint.
        """
        if not self.service_account_email:
            raise ValueError(
                "Service account email is required to build the trust boundary lookup URL."
            )
        return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
            self.universe_domain, self.service_account_email
        )

    def sign_bytes(self, message):
        """Signs ``message`` by calling the IAM sign endpoint as the target
        principal, authenticated with the source credentials.

        Responses whose status is in ``iam.IAM_RETRY_CODES`` are retried with
        exponential backoff.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The decoded ``signedBlob`` from the response.

        Raises:
            google.auth.exceptions.TransportError: If the endpoint returns a
                non-retryable error status, or if all retries are exhausted.
        """
        from google.auth.transport.requests import AuthorizedSession

        iam_sign_endpoint = iam._IAM_SIGN_ENDPOINT.replace(
            credentials.DEFAULT_UNIVERSE_DOMAIN, self.universe_domain
        ).format(self._target_principal)

        body = {
            "payload": base64.b64encode(message).decode("utf-8"),
            "delegates": self._delegates,
        }

        headers = {"Content-Type": "application/json"}

        authed_session = AuthorizedSession(self._source_credentials)

        try:
            retries = _exponential_backoff.ExponentialBackoff()
            for _ in retries:
                response = authed_session.post(
                    url=iam_sign_endpoint, headers=headers, json=body
                )
                if response.status_code in iam.IAM_RETRY_CODES:
                    continue
                if response.status_code != http_client.OK:
                    raise exceptions.TransportError(
                        "Error calling sign_bytes: {}".format(response.json())
                    )

                return base64.b64decode(response.json()["signedBlob"])
        finally:
            # Always release the session, on success, error, or exhaustion.
            authed_session.close()
        # Only reached when every attempt returned a retryable status.
        raise exceptions.TransportError("exhausted signBlob endpoint retries")

    @property
    def signer_email(self):
        """str: The target principal, which is the identity that signs."""
        return self._target_principal

    @property
    def service_account_email(self):
        """str: The target principal being impersonated."""
        return self._target_principal

    @property
    def signer(self):
        """Credentials: This object itself; signing goes through
        :meth:`sign_bytes`."""
        return self

    @property
    def requires_scopes(self):
        """bool: True when no target scopes have been set."""
        return not self._target_scopes

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        if self._cred_file_path:
            return {
                "credential_source": self._cred_file_path,
                "credential_type": "impersonated credentials",
                "principal": self._target_principal,
            }
        return None

    def _make_copy(self):
        """Returns a new instance carrying over this credential's settings.

        Used by the ``with_*`` methods, which then override a single field on
        the copy.
        """
        cred = self.__class__(
            self._source_credentials,
            target_principal=self._target_principal,
            target_scopes=self._target_scopes,
            delegates=self._delegates,
            # The subject must be forwarded; otherwise every with_* copy
            # silently drops domain-wide delegation and falls back to plain
            # service account impersonation.
            subject=self._subject,
            lifetime=self._lifetime,
            quota_project_id=self._quota_project_id,
            iam_endpoint_override=self._iam_endpoint_override,
            trust_boundary=self._trust_boundary,
        )
        cred._cred_file_path = self._cred_file_path
        return cred

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
    def with_trust_boundary(self, trust_boundary):
        cred = self._make_copy()
        cred._trust_boundary = trust_boundary
        return cred

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        cred = self._make_copy()
        cred._target_scopes = scopes or default_scopes
        return cred

    @classmethod
    def from_impersonated_service_account_info(cls, info, scopes=None):
        """Creates a Credentials instance from parsed impersonated service account credentials info.

        **IMPORTANT**:
        This method does not validate the credential configuration. A security
        risk occurs when a credential configuration configured with malicious urls
        is used.
        When the credential configuration is accepted from an
        untrusted source, you should validate it before using with this method.
        Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.

        Args:
            info (Mapping[str, str]): The impersonated service account credentials info in Google
                format.
            scopes (Sequence[str]): Optional list of scopes to include in the
                credentials.

        Returns:
            google.auth.impersonated_credentials.Credentials: The constructed
                credentials.

        Raises:
            InvalidType: If the info["source_credentials"] are not a supported impersonation type
            InvalidValue: If the info["service_account_impersonation_url"] is missing or
                not in the expected format.
            ValueError: If the info is not in the expected format.
        """

        source_credentials_info = info.get("source_credentials")
        if not source_credentials_info:
            # Fail with the documented ValueError rather than an
            # AttributeError from calling .get() on None.
            raise ValueError(
                "Impersonated service account info is missing the "
                "'source_credentials' field."
            )

        source_credentials_type = source_credentials_info.get("type")
        if source_credentials_type == _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE:
            # Aliased so the module-level ``credentials`` is not shadowed.
            from google.oauth2 import credentials as oauth2_credentials

            source_credentials = (
                oauth2_credentials.Credentials.from_authorized_user_info(
                    source_credentials_info
                )
            )
        elif source_credentials_type == _SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE:
            from google.oauth2 import service_account

            source_credentials = service_account.Credentials.from_service_account_info(
                source_credentials_info
            )
        elif (
            source_credentials_type
            == _SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE
        ):
            from google.auth import external_account_authorized_user

            source_credentials = external_account_authorized_user.Credentials.from_info(
                source_credentials_info
            )
        else:
            raise exceptions.InvalidType(
                "source credential of type {} is not supported.".format(
                    source_credentials_type
                )
            )

        impersonation_url = info.get("service_account_impersonation_url")
        if not impersonation_url:
            # A missing URL gets the same error as a malformed one.
            raise exceptions.InvalidValue(
                "Cannot extract target principal from {}".format(impersonation_url)
            )
        # The target principal is the text between the last "/" and the
        # ":generateAccessToken" suffix of the URL.
        start_index = impersonation_url.rfind("/")
        end_index = impersonation_url.find(":generateAccessToken")
        if start_index == -1 or end_index == -1 or start_index > end_index:
            raise exceptions.InvalidValue(
                "Cannot extract target principal from {}".format(impersonation_url)
            )
        target_principal = impersonation_url[start_index + 1 : end_index]
        delegates = info.get("delegates")
        quota_project_id = info.get("quota_project_id")

        return cls(
            source_credentials,
            target_principal,
            scopes,
            delegates,
            quota_project_id=quota_project_id,
        )
537
538
class IDTokenCredentials(credentials.CredentialsWithQuotaProject):
    """Open ID Connect ID Token-based service account credentials."""

    def __init__(
        self,
        target_credentials,
        target_audience=None,
        include_email=False,
        quota_project_id=None,
    ):
        """
        Args:
            target_credentials (google.auth.Credentials): The target
                credential used to acquire the id tokens for. Must be an
                instance of this module's ``Credentials`` class.
            target_audience (string): Audience to issue the token for.
            include_email (bool): Include email in IdToken
            quota_project_id (Optional[str]):  The project ID used for
                quota and billing.

        Raises:
            google.auth.exceptions.GoogleAuthError: If ``target_credentials``
                is not an impersonated ``Credentials`` instance.
        """
        super(IDTokenCredentials, self).__init__()

        if not isinstance(target_credentials, Credentials):
            raise exceptions.GoogleAuthError(
                "Provided Credential must be " "impersonated_credentials"
            )
        self._target_credentials = target_credentials
        self._target_audience = target_audience
        self._include_email = include_email
        self._quota_project_id = quota_project_id

    def from_credentials(self, target_credentials, target_audience=None):
        """Returns a copy bound to different target credentials and audience.

        ``include_email`` and the quota project are carried over from this
        instance.
        """
        return self.__class__(
            target_credentials=target_credentials,
            target_audience=target_audience,
            include_email=self._include_email,
            quota_project_id=self._quota_project_id,
        )

    def with_target_audience(self, target_audience):
        """Returns a copy of these credentials with a new target audience."""
        return self.__class__(
            target_credentials=self._target_credentials,
            target_audience=target_audience,
            include_email=self._include_email,
            quota_project_id=self._quota_project_id,
        )

    def with_include_email(self, include_email):
        """Returns a copy of these credentials with a new ``include_email``
        setting."""
        return self.__class__(
            target_credentials=self._target_credentials,
            target_audience=self._target_audience,
            include_email=include_email,
            quota_project_id=self._quota_project_id,
        )

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self.__class__(
            target_credentials=self._target_credentials,
            target_audience=self._target_audience,
            include_email=self._include_email,
            quota_project_id=quota_project_id,
        )

    # No docstring here: it is supplied by the copy_docstring decorator.
    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        from datetime import timezone

        from google.auth.transport.requests import AuthorizedSession

        iam_sign_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
            credentials.DEFAULT_UNIVERSE_DOMAIN,
            self._target_credentials.universe_domain,
        ).format(self._target_credentials.signer_email)

        body = {
            "audience": self._target_audience,
            "delegates": self._target_credentials._delegates,
            "includeEmail": self._include_email,
        }

        headers = {
            "Content-Type": "application/json",
            metrics.API_CLIENT_HEADER: metrics.token_request_id_token_impersonate(),
        }

        # The ID token request is authenticated with the *source* credentials
        # of the impersonated credential.
        authed_session = AuthorizedSession(
            self._target_credentials._source_credentials, auth_request=request
        )

        try:
            response = authed_session.post(
                url=iam_sign_endpoint,
                headers=headers,
                data=json.dumps(body).encode("utf-8"),
            )
        finally:
            authed_session.close()

        if response.status_code != http_client.OK:
            raise exceptions.RefreshError(
                "Error getting ID token: {}".format(response.json())
            )

        id_token = response.json()["token"]
        self.token = id_token
        # The expiry comes from the token's own "exp" claim, read without
        # signature verification. datetime.utcfromtimestamp is deprecated
        # since Python 3.12; this builds the same naive UTC datetime.
        expiry_timestamp = jwt.decode(id_token, verify=False)["exp"]
        self.expiry = datetime.fromtimestamp(
            expiry_timestamp, tz=timezone.utc
        ).replace(tzinfo=None)
645
646
def _sign_jwt_request(request, principal, headers, payload, delegates=()):
    """Makes a request to the Google Cloud IAM service to sign a JWT using a
    service account's system-managed private key.

    Args:
        request (Request): The Request object to use.
        principal (str): The principal to request an access token for.
        headers (Mapping[str, str]): Map of headers to transmit.
        payload (Mapping[str, str]): The JWT payload to sign. Must be a
            JSON-serializable object that contains a JWT Claims Set; it is
            serialized to a JSON string before being sent.
        delegates (Sequence[str]): The chained list of delegates required
            to grant the final access_token.  If set, the sequence of
            identities must have "Service Account Token Creator" capability
            granted to the preceding identity.  For example, if set to
            [serviceAccountB, serviceAccountC], the source_credential
            must have the Token Creator role on serviceAccountB.
            serviceAccountB must have the Token Creator on
            serviceAccountC.
            Finally, C must have Token Creator on target_principal.
            If left unset, source_credential must have that role on
            target_principal.

    Returns:
        str: The signed JWT taken from the ``signedJwt`` field of the
            response.

    Raises:
        google.auth.exceptions.TransportError: Raised if there is an underlying
            HTTP connection error
        google.auth.exceptions.RefreshError: Raised if the impersonated
            credentials are not available. Common reasons are
            `iamcredentials.googleapis.com` is not enabled or the
            `Service Account Token Creator` is not assigned
    """
    iam_endpoint = iam._IAM_SIGNJWT_ENDPOINT.format(principal)

    # The default is an immutable empty tuple rather than a shared mutable
    # list; it serializes to the same JSON "[]". An explicit None still
    # serializes to null, as before.
    body = {"delegates": delegates, "payload": json.dumps(payload)}
    body = json.dumps(body).encode("utf-8")

    response = request(url=iam_endpoint, method="POST", headers=headers, body=body)

    # support both string and bytes type response.data
    response_body = (
        response.data.decode("utf-8")
        if hasattr(response.data, "decode")
        else response.data
    )

    if response.status != http_client.OK:
        raise exceptions.RefreshError(_REFRESH_ERROR, response_body)

    try:
        jwt_response = json.loads(response_body)
        signed_jwt = jwt_response["signedJwt"]
        return signed_jwt

    except (KeyError, ValueError) as caught_exc:
        # Malformed JSON or a response without the signedJwt field.
        new_exc = exceptions.RefreshError(
            "{}: No signed JWT in response.".format(_REFRESH_ERROR), response_body
        )
        raise new_exc from caught_exc