1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Google Compute Engine credentials.
16
17This module provides authentication for an application running on Google
18Compute Engine using the Compute Engine metadata server.
19
20"""
21
22import datetime
23
24from google.auth import _helpers
25from google.auth import credentials
26from google.auth import exceptions
27from google.auth import iam
28from google.auth import jwt
29from google.auth import metrics
30from google.auth.compute_engine import _metadata
31from google.oauth2 import _client
32
# URL template for the trust boundary ("allowedLocations") lookup. The first
# placeholder receives the universe domain and the second the service account
# email.
_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
    "https://iamcredentials.{}"
    "/v1/projects/-/serviceAccounts/{}/allowedLocations"
)
36
37
class Credentials(
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithUniverseDomain,
    credentials.CredentialsWithTrustBoundary,
):
    """Compute Engine Credentials.

    These credentials use the Google Compute Engine metadata server to obtain
    OAuth 2.0 access tokens associated with the instance's service account,
    and are also used for Cloud Run, Flex and App Engine (except for the Python
    2.7 runtime, which is supported only on older versions of this library).

    For more information about Compute Engine authentication, including how
    to configure scopes, see the `Compute Engine authentication
    documentation`_.

    .. note:: On Compute Engine the metadata server ignores requested scopes.
        On Cloud Run, Flex and App Engine the server honours requested scopes.

    .. _Compute Engine authentication documentation:
        https://cloud.google.com/compute/docs/authentication#using
    """

    def __init__(
        self,
        service_account_email="default",
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
        universe_domain=None,
        trust_boundary=None,
    ):
        """
        Args:
            service_account_email (str): The service account email to use, or
                'default'. A Compute Engine instance may have multiple service
                accounts.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            scopes (Optional[Sequence[str]]): The list of scopes for the
                credentials.
            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            universe_domain (Optional[str]): The universe domain. If not
                provided or None, credential will attempt to fetch the value
                from metadata server. If metadata server doesn't have universe
                domain endpoint, then the default googleapis.com will be used.
            trust_boundary (Optional[Mapping[str, str]]): A credential trust
                boundary.
        """
        super().__init__()
        self._service_account_email = service_account_email
        self._quota_project_id = quota_project_id
        self._scopes = scopes
        self._default_scopes = default_scopes
        # An explicitly supplied universe domain counts as already resolved;
        # otherwise the ``universe_domain`` property resolves it lazily from
        # the metadata server on first access.
        self._universe_domain_cached = False
        if universe_domain:
            self._universe_domain = universe_domain
            self._universe_domain_cached = True
        self._trust_boundary = trust_boundary

    @staticmethod
    def _email_from_info(info):
        """Return the 'email' entry of a service account info response.

        Args:
            info (Optional[Mapping]): The value returned by
                ``_metadata.get_service_account_info``.

        Returns:
            The value stored under the ``"email"`` key.

        Raises:
            google.auth.exceptions.RefreshError: If ``info`` is empty or has
                no ``"email"`` key.
        """
        if not info or "email" not in info:
            raise exceptions.RefreshError(
                "Unexpected response from metadata server: "
                "service account info is missing 'email' field."
            )
        return info["email"]

    def _retrieve_info(self, request):
        """Retrieve information about the service account.

        Updates the scopes and retrieves the full service account email.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the metadata server
                response has no 'email' field.
        """
        info = _metadata.get_service_account_info(
            request, service_account=self._service_account_email
        )
        self._service_account_email = self._email_from_info(info)

        # Don't override scopes requested by the user.
        if self._scopes is None:
            self._scopes = info.get("scopes")

    def _metric_header_for_usage(self):
        """Return the credential type value, ``metrics.CRED_TYPE_SA_MDS``."""
        return metrics.CRED_TYPE_SA_MDS

    def _refresh_token(self, request):
        """Refresh the access token and scopes.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the Compute Engine metadata
                service can't be reached or if the instance has no
                credentials.
        """
        # Resolve the scopes to request *before* calling ``_retrieve_info``,
        # because that call may fill ``self._scopes`` from the metadata server.
        if self._scopes is not None:
            scopes = self._scopes
        else:
            scopes = self._default_scopes

        try:
            self._retrieve_info(request)
            # Always fetch token with default service account email.
            self.token, self.expiry = _metadata.get_service_account_token(
                request, service_account="default", scopes=scopes
            )
        except exceptions.TransportError as caught_exc:
            raise exceptions.RefreshError(caught_exc) from caught_exc

    def _build_trust_boundary_lookup_url(self):
        """Builds and returns the URL for the trust boundary lookup API for GCE.

        Returns:
            str: ``_TRUST_BOUNDARY_LOOKUP_ENDPOINT`` formatted with the universe
            domain and the service account email.

        Raises:
            google.auth.exceptions.RefreshError: If the service account email
                has to be looked up and the lookup fails with a transport
                error, or the response has no 'email' field.
        """
        # If the service account email is 'default', we need to get the
        # actual email address from the metadata server.
        if self._service_account_email == "default":
            from google.auth.transport import requests as google_auth_requests

            request = google_auth_requests.Request()
            try:
                info = _metadata.get_service_account_info(request, "default")
                self._service_account_email = self._email_from_info(info)
            except exceptions.TransportError as e:
                # Without the email the lookup URL cannot be built. Wrap the
                # transport failure in a RefreshError so it's caught by
                # _refresh_trust_boundary.
                raise exceptions.RefreshError(
                    "Failed to get service account email for trust boundary lookup: {}".format(
                        e
                    )
                ) from e

        return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
            self.universe_domain, self.service_account_email
        )

    @property
    def service_account_email(self):
        """The service account email.

        .. note:: This is not guaranteed to be set until :meth:`refresh` has been
            called.
        """
        return self._service_account_email

    @property
    def requires_scopes(self):
        """bool: True when no scopes are currently set on these credentials."""
        return not self._scopes

    @property
    def universe_domain(self):
        """The universe domain.

        Returns the cached value when there is one. Otherwise the value is
        fetched from the metadata server with a newly created transport
        request and cached for subsequent accesses.
        """
        if self._universe_domain_cached:
            return self._universe_domain

        from google.auth.transport import requests as google_auth_requests

        self._universe_domain = _metadata.get_universe_domain(
            google_auth_requests.Request()
        )
        self._universe_domain_cached = True
        return self._universe_domain

    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        return {
            "credential_source": "metadata server",
            "credential_type": "VM credentials",
            "principal": self.service_account_email,
        }

    def _copy_with_overrides(self, **overrides):
        """Create a new instance of this class with selected settings replaced.

        Args:
            overrides: Constructor keyword arguments that replace the values
                taken from these credentials.

        Returns:
            A new instance of ``self.__class__``.
        """
        kwargs = {
            "service_account_email": self._service_account_email,
            "quota_project_id": self._quota_project_id,
            "scopes": self._scopes,
            "default_scopes": self._default_scopes,
            "universe_domain": self._universe_domain,
            "trust_boundary": self._trust_boundary,
        }
        kwargs.update(overrides)
        creds = self.__class__(**kwargs)
        # When the universe domain is carried over unchanged, also carry over
        # whether it has been resolved yet. When the caller supplies a new
        # universe domain the constructor decides the cached state itself.
        if "universe_domain" not in overrides:
            creds._universe_domain_cached = self._universe_domain_cached
        return creds

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self._copy_with_overrides(quota_project_id=quota_project_id)

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        # Compute Engine credentials can not be scoped (the metadata service
        # ignores the scopes parameter). App Engine, Cloud Run and Flex support
        # requesting scopes.
        return self._copy_with_overrides(
            scopes=scopes, default_scopes=default_scopes
        )

    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        return self._copy_with_overrides(universe_domain=universe_domain)

    @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
    def with_trust_boundary(self, trust_boundary):
        return self._copy_with_overrides(trust_boundary=trust_boundary)
266
267
# Lifetime of the JWT assertion built for the ID token grant: one hour,
# expressed in seconds.
_DEFAULT_TOKEN_LIFETIME_SECS = 60 * 60
# Token endpoint used when no ``token_uri`` is supplied.
_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
270
271
class IDTokenCredentials(
    credentials.CredentialsWithQuotaProject,
    credentials.Signing,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials rely on the default service account of a GCE instance.

    ID token can be requested from `GCE metadata server identity endpoint`_, IAM
    token endpoint or other token endpoints you specify. If metadata server
    identity endpoint is not used, the GCE instance must have been started with
    a service account that has access to the IAM Cloud API.

    .. _GCE metadata server identity endpoint:
        https://cloud.google.com/compute/docs/instances/verifying-instance-identity
    """

    def __init__(
        self,
        request,
        target_audience,
        token_uri=None,
        additional_claims=None,
        service_account_email=None,
        signer=None,
        use_metadata_identity_endpoint=False,
        quota_project_id=None,
    ):
        """
        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            token_uri (str): The OAuth 2.0 Token URI.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            service_account_email (str): Optional explicit service account to
                use to sign JWT tokens.
                By default, this is the default GCE service account.
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
                In case the signer is specified, the request argument will be
                ignored.
            use_metadata_identity_endpoint (bool): Whether to use GCE metadata
                identity endpoint. For backward compatibility the default value
                is False. If set to True, ``token_uri``, ``additional_claims``,
                ``service_account_email``, ``signer`` argument should not be set;
                otherwise ValueError will be raised.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.

        Raises:
            ValueError:
                If ``use_metadata_identity_endpoint`` is set to True, and one of
                ``token_uri``, ``additional_claims``, ``service_account_email``,
                ``signer`` arguments is set.
        """
        super().__init__()

        self._quota_project_id = quota_project_id
        self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
        self._target_audience = target_audience

        if use_metadata_identity_endpoint:
            if token_uri or additional_claims or service_account_email or signer:
                raise ValueError(
                    "If use_metadata_identity_endpoint is set, token_uri, "
                    "additional_claims, service_account_email, signer arguments"
                    " must not be set"
                )
            self._token_uri = None
            self._additional_claims = None
            self._signer = None

        # Without an explicit email, ask the metadata server for the email of
        # the service account (this uses ``request``).
        if service_account_email is None:
            sa_info = _metadata.get_service_account_info(request)
            self._service_account_email = sa_info["email"]
        else:
            self._service_account_email = service_account_email

        if not use_metadata_identity_endpoint:
            if signer is None:
                signer = iam.Signer(
                    request=request,
                    credentials=Credentials(),
                    service_account_email=self._service_account_email,
                )
            self._signer = signer
            self._token_uri = token_uri or _DEFAULT_TOKEN_URI

            if additional_claims is not None:
                self._additional_claims = additional_claims
            else:
                self._additional_claims = {}

    def _clone_with(self, target_audience, quota_project_id, token_uri):
        """Return a copy of these credentials with the given settings.

        Args:
            target_audience (str): Target audience for the copy.
            quota_project_id (Optional[str]): Quota project for the copy.
            token_uri (Optional[str]): Token URI for the copy. Ignored when the
                metadata identity endpoint is used.

        Returns:
            A new instance of ``self.__class__``.
        """
        if self._use_metadata_identity_endpoint:
            # Do not go through ``__init__`` here. In this mode an explicit
            # service account email is rejected, so ``__init__`` always looks
            # the email up through ``request`` -- and no request object is
            # available when copying. Reuse the email resolved when these
            # credentials were created instead.
            cls = self.__class__
            clone = cls.__new__(cls)
            # Run only the base-class initialisation, as ``__init__`` does
            # before setting its own attributes.
            super(IDTokenCredentials, clone).__init__()
            clone._quota_project_id = quota_project_id
            clone._use_metadata_identity_endpoint = True
            clone._target_audience = target_audience
            clone._token_uri = None
            clone._additional_claims = None
            clone._signer = None
            clone._service_account_email = self._service_account_email
            return clone

        # The signer is already instantiated and the email is known, so the
        # constructor does not need a request here.
        return self.__class__(
            None,
            service_account_email=self._service_account_email,
            token_uri=token_uri,
            target_audience=target_audience,
            additional_claims=self._additional_claims.copy(),
            signer=self.signer,
            use_metadata_identity_endpoint=False,
            quota_project_id=quota_project_id,
        )

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token.

        Returns:
            IDTokenCredentials: A new credentials instance of the same class
            as these credentials.
        """
        return self._clone_with(
            target_audience, self._quota_project_id, self._token_uri
        )

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self._clone_with(
            self._target_audience, quota_project_id, self._token_uri
        )

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        # A token URI has no meaning when ID tokens come from the metadata
        # identity endpoint.
        if self._use_metadata_identity_endpoint:
            raise ValueError(
                "If use_metadata_identity_endpoint is set, token_uri must not be set"
            )
        return self._clone_with(
            self._target_audience, self.quota_project_id, token_uri
        )

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        issued_at = _helpers.utcnow()
        expires_at = issued_at + datetime.timedelta(
            seconds=_DEFAULT_TOKEN_LIFETIME_SECS
        )

        payload = {
            "iat": _helpers.datetime_to_secs(issued_at),
            "exp": _helpers.datetime_to_secs(expires_at),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": self._token_uri,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }
        # Caller-supplied claims are applied last, so they take precedence
        # over the entries above.
        payload.update(self._additional_claims)

        return jwt.encode(self._signer, payload)

    def _call_metadata_identity_endpoint(self, request):
        """Request ID token from metadata identity endpoint.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Returns:
            Tuple[str, datetime.datetime]: The ID token and the expiry of the
            ID token, as a naive datetime in UTC.

        Raises:
            google.auth.exceptions.RefreshError: If the Compute Engine metadata
                service can't be reached or if the instance has no credentials.
            ValueError: If extracting expiry from the obtained ID token fails.
        """
        path = "instance/service-accounts/default/identity"
        params = {"audience": self._target_audience, "format": "full"}
        metrics_header = {
            metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds()
        }
        try:
            id_token = _metadata.get(
                request, path, params=params, headers=metrics_header
            )
        except exceptions.TransportError as caught_exc:
            raise exceptions.RefreshError(caught_exc) from caught_exc

        _, payload, _, _ = jwt._unverified_decode(id_token)
        # ``datetime.utcfromtimestamp`` is deprecated since Python 3.12.
        # Convert with an explicit UTC timezone, then drop the tzinfo to keep
        # returning the same naive UTC datetime as before.
        expiry = datetime.datetime.fromtimestamp(
            payload["exp"], tz=datetime.timezone.utc
        ).replace(tzinfo=None)
        return id_token, expiry

    def refresh(self, request):
        """Refreshes the ID token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
            ValueError: If extracting expiry from the obtained ID token fails.
        """
        if self._use_metadata_identity_endpoint:
            self.token, self.expiry = self._call_metadata_identity_endpoint(request)
            return

        # Otherwise exchange a signed JWT assertion for an ID token at the
        # token endpoint.
        assertion = self._make_authorization_grant_assertion()
        self.token, self.expiry, _ = _client.id_token_jwt_grant(
            request, self._token_uri, assertion
        )

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    def sign_bytes(self, message):
        """Signs the given message.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The message's cryptographic signature.

        Raises:
            google.auth.exceptions.InvalidOperation:
                Signer is not available if metadata identity endpoint is used.
        """
        if self._use_metadata_identity_endpoint:
            raise exceptions.InvalidOperation(
                "Signer is not available if metadata identity endpoint is used"
            )
        return self._signer.sign(message)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def signer_email(self):
        """The signer's email, which is the service account email."""
        return self._service_account_email