1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Google Compute Engine credentials.
16
17This module provides authentication for an application running on Google
18Compute Engine using the Compute Engine metadata server.
19
20"""
21
22import datetime
23
24from google.auth import _helpers
25from google.auth import credentials
26from google.auth import exceptions
27from google.auth import iam
28from google.auth import jwt
29from google.auth import metrics
30from google.auth.compute_engine import _metadata
31from google.oauth2 import _client
32
33
class Credentials(
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithUniverseDomain,
):
    """Compute Engine Credentials.

    OAuth 2.0 access tokens for the instance's service account are obtained
    from the Google Compute Engine metadata server. The same credentials are
    used on Cloud Run, Flex and App Engine (except for the Python 2.7
    runtime, which is supported only on older versions of this library).

    See the `Compute Engine authentication documentation`_ for details on
    Compute Engine authentication, including how to configure scopes.

    .. note:: On Compute Engine the metadata server ignores requested scopes.
        On Cloud Run, Flex and App Engine the server honours requested scopes.

    .. _Compute Engine authentication documentation:
        https://cloud.google.com/compute/docs/authentication#using
    """

    def __init__(
        self,
        service_account_email="default",
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
        universe_domain=None,
    ):
        """
        Args:
            service_account_email (str): Email of the service account to use,
                or 'default'. A Compute Engine instance may have multiple
                service accounts.
            quota_project_id (Optional[str]): Project ID used for quota and
                billing.
            scopes (Optional[Sequence[str]]): User-defined scopes for the
                credentials.
            default_scopes (Optional[Sequence[str]]): Default scopes passed by
                a Google client library. Use 'scopes' for user-defined scopes.
            universe_domain (Optional[str]): The universe domain. When it is
                not provided or None, the credential attempts to fetch the
                value from the metadata server. If the metadata server has no
                universe domain endpoint, the default googleapis.com is used.
        """
        super().__init__()
        self._service_account_email = service_account_email
        self._quota_project_id = quota_project_id
        self._scopes = scopes
        self._default_scopes = default_scopes
        # Only an explicitly supplied (truthy) universe domain counts as
        # cached; otherwise the ``universe_domain`` property looks it up.
        self._universe_domain_cached = bool(universe_domain)
        if self._universe_domain_cached:
            self._universe_domain = universe_domain

    def _retrieve_info(self, request):
        """Fetch service account details from the metadata server.

        Stores the email reported by the metadata server and, when the user
        did not request scopes, the scopes it reports.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
        """
        account_info = _metadata.get_service_account_info(
            request, service_account=self._service_account_email
        )
        self._service_account_email = account_info["email"]

        # Scopes explicitly requested by the user take precedence.
        if self._scopes is None:
            self._scopes = account_info["scopes"]

    def _metric_header_for_usage(self):
        """Return ``metrics.CRED_TYPE_SA_MDS`` as this credential's usage header value."""
        return metrics.CRED_TYPE_SA_MDS

    def refresh(self, request):
        """Refresh the access token and scopes.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the Compute Engine metadata
                service can't be reached or if the instance has no
                credentials.
        """
        # Decide on the scopes before _retrieve_info() may populate
        # self._scopes with the values reported by the metadata server.
        if self._scopes is not None:
            requested_scopes = self._scopes
        else:
            requested_scopes = self._default_scopes

        try:
            self._retrieve_info(request)
            # Always fetch token with default service account email.
            self.token, self.expiry = _metadata.get_service_account_token(
                request, service_account="default", scopes=requested_scopes
            )
        except exceptions.TransportError as caught_exc:
            raise exceptions.RefreshError(caught_exc) from caught_exc

    @property
    def service_account_email(self):
        """The service account email.

        .. note:: This is not guaranteed to be set until :meth:`refresh` has
            been called.
        """
        return self._service_account_email

    @property
    def requires_scopes(self):
        """bool: True while no scopes (None or empty) are set on the credentials."""
        return not self._scopes

    @property
    def universe_domain(self):
        """The universe domain, fetched from the metadata server on first use.

        A value supplied to the constructor, or fetched earlier, is returned
        without contacting the metadata server again.
        """
        if not self._universe_domain_cached:
            # Imported lazily, only when a lookup is actually required.
            from google.auth.transport import requests as google_auth_requests

            metadata_request = google_auth_requests.Request()
            self._universe_domain = _metadata.get_universe_domain(metadata_request)
            self._universe_domain_cached = True
        return self._universe_domain

    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        cred_info = {
            "credential_source": "metadata server",
            "credential_type": "VM credentials",
            "principal": self.service_account_email,
        }
        return cred_info

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        clone = self.__class__(
            scopes=self._scopes,
            default_scopes=self._default_scopes,
            service_account_email=self._service_account_email,
            quota_project_id=quota_project_id,
        )
        # Carry the universe domain state over so the copy does not have to
        # query the metadata server again.
        clone._universe_domain_cached = self._universe_domain_cached
        clone._universe_domain = self._universe_domain
        return clone

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        # Compute Engine credentials can not be scoped (the metadata service
        # ignores the scopes parameter). App Engine, Cloud Run and Flex support
        # requesting scopes.
        clone = self.__class__(
            service_account_email=self._service_account_email,
            quota_project_id=self._quota_project_id,
            scopes=scopes,
            default_scopes=default_scopes,
        )
        # Carry the universe domain state over so the copy does not have to
        # query the metadata server again.
        clone._universe_domain_cached = self._universe_domain_cached
        clone._universe_domain = self._universe_domain
        return clone

    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        # The new universe domain is handed to the constructor, so the cached
        # state of this instance is intentionally not copied.
        return self.__class__(
            service_account_email=self._service_account_email,
            quota_project_id=self._quota_project_id,
            scopes=self._scopes,
            default_scopes=self._default_scopes,
            universe_domain=universe_domain,
        )
205
206
# Lifetime, in seconds, of the JWT assertion built by
# IDTokenCredentials._make_authorization_grant_assertion (one hour).
_DEFAULT_TOKEN_LIFETIME_SECS = 60 * 60
# Token endpoint used by IDTokenCredentials when no token_uri is supplied.
_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
209
210
class IDTokenCredentials(
    credentials.CredentialsWithQuotaProject,
    credentials.Signing,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials rely on the default service account of a GCE instance.

    ID token can be requested from `GCE metadata server identity endpoint`_, IAM
    token endpoint or other token endpoints you specify. If metadata server
    identity endpoint is not used, the GCE instance must have been started with
    a service account that has access to the IAM Cloud API.

    .. _GCE metadata server identity endpoint:
        https://cloud.google.com/compute/docs/instances/verifying-instance-identity
    """

    def __init__(
        self,
        request,
        target_audience,
        token_uri=None,
        additional_claims=None,
        service_account_email=None,
        signer=None,
        use_metadata_identity_endpoint=False,
        quota_project_id=None,
    ):
        """
        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests. It is used to look up the service account
                email when ``service_account_email`` is not given, and to
                build the default IAM signer when ``signer`` is not given.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            token_uri (str): The OAuth 2.0 Token URI.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            service_account_email (str): Optional explicit service account to
                use to sign JWT tokens.
                By default, this is the default GCE service account.
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
                When omitted (and the metadata identity endpoint is not
                used), an ``iam.Signer`` is created from ``request``.
            use_metadata_identity_endpoint (bool): Whether to use GCE metadata
                identity endpoint. For backward compatibility the default value
                is False. If set to True, ``token_uri``, ``additional_claims``,
                ``service_account_email``, ``signer`` argument should not be set;
                otherwise an error will be raised.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.

        Raises:
            google.auth.exceptions.MalformedError:
                If ``use_metadata_identity_endpoint`` is set to True, and one of
                ``token_uri``, ``additional_claims``, ``service_account_email``,
                ``signer`` arguments is set.
        """
        super(IDTokenCredentials, self).__init__()

        self._quota_project_id = quota_project_id
        self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
        self._target_audience = target_audience

        if use_metadata_identity_endpoint:
            if token_uri or additional_claims or service_account_email or signer:
                raise exceptions.MalformedError(
                    "If use_metadata_identity_endpoint is set, token_uri, "
                    "additional_claims, service_account_email, signer arguments"
                    " must not be set"
                )
            self._token_uri = None
            self._additional_claims = None
            self._signer = None

        # Resolve the email from the metadata server unless one was supplied.
        if service_account_email is None:
            sa_info = _metadata.get_service_account_info(request)
            self._service_account_email = sa_info["email"]
        else:
            self._service_account_email = service_account_email

        if not use_metadata_identity_endpoint:
            if signer is None:
                signer = iam.Signer(
                    request=request,
                    credentials=Credentials(),
                    service_account_email=self._service_account_email,
                )
            self._signer = signer
            self._token_uri = token_uri or _DEFAULT_TOKEN_URI

            if additional_claims is not None:
                self._additional_claims = additional_claims
            else:
                self._additional_claims = {}

    def _copy_for_metadata_endpoint(self, target_audience, quota_project_id):
        """Copy metadata-identity-endpoint credentials without performing I/O.

        ``__init__`` is deliberately not used for this copy: in metadata
        identity endpoint mode it rejects an explicit
        ``service_account_email`` and instead resolves the email through
        ``_metadata.get_service_account_info(request)``, but no request
        object is available when copying. The email already resolved by this
        instance is carried over instead. Note that a subclass ``__init__``
        is therefore not run for the copy.

        Args:
            target_audience (str): Target audience for the new credentials.
            quota_project_id (Optional[str]): Quota project for the new
                credentials.

        Returns:
            IDTokenCredentials: A new credentials instance.
        """
        creds = self.__class__.__new__(self.__class__)
        # Initialise the base-class state exactly as ``__init__`` does.
        super(IDTokenCredentials, creds).__init__()
        creds._quota_project_id = quota_project_id
        creds._use_metadata_identity_endpoint = True
        creds._target_audience = target_audience
        creds._token_uri = None
        creds._additional_claims = None
        creds._signer = None
        creds._service_account_email = self._service_account_email
        return creds

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token.

        Returns:
            IDTokenCredentials: A new credentials instance.
        """
        if self._use_metadata_identity_endpoint:
            return self._copy_for_metadata_endpoint(
                target_audience, self._quota_project_id
            )

        # The signer and the service account email are both passed along, so
        # the constructor needs no request object here.
        return self.__class__(
            None,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=target_audience,
            additional_claims=self._additional_claims.copy(),
            signer=self.signer,
            use_metadata_identity_endpoint=False,
            quota_project_id=self._quota_project_id,
        )

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        if self._use_metadata_identity_endpoint:
            return self._copy_for_metadata_endpoint(
                self._target_audience, quota_project_id
            )

        # The signer and the service account email are both passed along, so
        # the constructor needs no request object here.
        return self.__class__(
            None,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            signer=self.signer,
            use_metadata_identity_endpoint=False,
            quota_project_id=quota_project_id,
        )

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        # A token URI is meaningless when the ID token comes straight from the
        # metadata identity endpoint.
        if self._use_metadata_identity_endpoint:
            raise exceptions.MalformedError(
                "If use_metadata_identity_endpoint is set, token_uri must not be set"
            )

        # The signer and the service account email are both passed along, so
        # the constructor needs no request object here.
        return self.__class__(
            None,
            service_account_email=self._service_account_email,
            token_uri=token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            signer=self.signer,
            use_metadata_identity_endpoint=False,
            quota_project_id=self._quota_project_id,
        )

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": self._token_uri,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }

        # Caller-supplied claims are applied last and may override the above.
        payload.update(self._additional_claims)

        token = jwt.encode(self._signer, payload)

        return token

    def _call_metadata_identity_endpoint(self, request):
        """Request ID token from metadata identity endpoint.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Returns:
            Tuple[str, datetime.datetime]: The ID token and the expiry of the
                ID token, as a naive datetime expressed in UTC.

        Raises:
            google.auth.exceptions.RefreshError: If the Compute Engine metadata
                service can't be reached or if the instance has no credentials.
            ValueError: If extracting expiry from the obtained ID token fails.
        """
        path = "instance/service-accounts/default/identity"
        params = {"audience": self._target_audience, "format": "full"}
        metrics_header = {
            metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds()
        }
        try:
            id_token = _metadata.get(
                request, path, params=params, headers=metrics_header
            )
        except exceptions.TransportError as caught_exc:
            new_exc = exceptions.RefreshError(caught_exc)
            raise new_exc from caught_exc

        _, payload, _, _ = jwt._unverified_decode(id_token)
        # datetime.utcfromtimestamp() is deprecated since Python 3.12. Build an
        # aware UTC datetime and strip the tzinfo to obtain the same naive UTC
        # value it used to return.
        expiry = datetime.datetime.fromtimestamp(
            payload["exp"], tz=datetime.timezone.utc
        ).replace(tzinfo=None)
        return id_token, expiry

    def refresh(self, request):
        """Refreshes the ID token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
            ValueError: If extracting expiry from the obtained ID token fails.
        """
        if self._use_metadata_identity_endpoint:
            self.token, self.expiry = self._call_metadata_identity_endpoint(request)
        else:
            # Exchange a self-signed JWT assertion for an ID token at the
            # configured token endpoint.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.id_token_jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    def sign_bytes(self, message):
        """Signs the given message.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The message's cryptographic signature.

        Raises:
            google.auth.exceptions.InvalidOperation:
                Signer is not available if metadata identity endpoint is used.
        """
        if self._use_metadata_identity_endpoint:
            raise exceptions.InvalidOperation(
                "Signer is not available if metadata identity endpoint is used"
            )
        return self._signer.sign(message)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def signer_email(self):
        """The signer's email; identical to ``service_account_email``."""
        return self._service_account_email