1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Google Compute Engine credentials.
16
17This module provides authentication for an application running on Google
18Compute Engine using the Compute Engine metadata server.
19
20"""
21
22import datetime
23
24from google.auth import _helpers
25from google.auth import credentials
26from google.auth import exceptions
27from google.auth import iam
28from google.auth import jwt
29from google.auth import metrics
30from google.auth.compute_engine import _metadata
31from google.oauth2 import _client
32
33
34class Credentials(
35 credentials.Scoped,
36 credentials.CredentialsWithQuotaProject,
37 credentials.CredentialsWithUniverseDomain,
38):
39 """Compute Engine Credentials.
40
41 These credentials use the Google Compute Engine metadata server to obtain
42 OAuth 2.0 access tokens associated with the instance's service account,
43 and are also used for Cloud Run, Flex and App Engine (except for the Python
44 2.7 runtime, which is supported only on older versions of this library).
45
46 For more information about Compute Engine authentication, including how
47 to configure scopes, see the `Compute Engine authentication
48 documentation`_.
49
50 .. note:: On Compute Engine the metadata server ignores requested scopes.
51 On Cloud Run, Flex and App Engine the server honours requested scopes.
52
53 .. _Compute Engine authentication documentation:
54 https://cloud.google.com/compute/docs/authentication#using
55 """
56
57 def __init__(
58 self,
59 service_account_email="default",
60 quota_project_id=None,
61 scopes=None,
62 default_scopes=None,
63 universe_domain=None,
64 ):
65 """
66 Args:
67 service_account_email (str): The service account email to use, or
68 'default'. A Compute Engine instance may have multiple service
69 accounts.
70 quota_project_id (Optional[str]): The project ID used for quota and
71 billing.
72 scopes (Optional[Sequence[str]]): The list of scopes for the credentials.
73 default_scopes (Optional[Sequence[str]]): Default scopes passed by a
74 Google client library. Use 'scopes' for user-defined scopes.
75 universe_domain (Optional[str]): The universe domain. If not
76 provided or None, credential will attempt to fetch the value
77 from metadata server. If metadata server doesn't have universe
78 domain endpoint, then the default googleapis.com will be used.
79 """
80 super(Credentials, self).__init__()
81 self._service_account_email = service_account_email
82 self._quota_project_id = quota_project_id
83 self._scopes = scopes
84 self._default_scopes = default_scopes
85 self._universe_domain_cached = False
86 if universe_domain:
87 self._universe_domain = universe_domain
88 self._universe_domain_cached = True
89
90 def _retrieve_info(self, request):
91 """Retrieve information about the service account.
92
93 Updates the scopes and retrieves the full service account email.
94
95 Args:
96 request (google.auth.transport.Request): The object used to make
97 HTTP requests.
98 """
99 info = _metadata.get_service_account_info(
100 request, service_account=self._service_account_email
101 )
102
103 self._service_account_email = info["email"]
104
105 # Don't override scopes requested by the user.
106 if self._scopes is None:
107 self._scopes = info["scopes"]
108
109 def _metric_header_for_usage(self):
110 return metrics.CRED_TYPE_SA_MDS
111
112 def refresh(self, request):
113 """Refresh the access token and scopes.
114
115 Args:
116 request (google.auth.transport.Request): The object used to make
117 HTTP requests.
118
119 Raises:
120 google.auth.exceptions.RefreshError: If the Compute Engine metadata
121 service can't be reached if if the instance has not
122 credentials.
123 """
124 scopes = self._scopes if self._scopes is not None else self._default_scopes
125 try:
126 self._retrieve_info(request)
127 self.token, self.expiry = _metadata.get_service_account_token(
128 request, service_account=self._service_account_email, scopes=scopes
129 )
130 except exceptions.TransportError as caught_exc:
131 new_exc = exceptions.RefreshError(caught_exc)
132 raise new_exc from caught_exc
133
134 @property
135 def service_account_email(self):
136 """The service account email.
137
138 .. note:: This is not guaranteed to be set until :meth:`refresh` has been
139 called.
140 """
141 return self._service_account_email
142
143 @property
144 def requires_scopes(self):
145 return not self._scopes
146
147 @property
148 def universe_domain(self):
149 if self._universe_domain_cached:
150 return self._universe_domain
151
152 from google.auth.transport import requests as google_auth_requests
153
154 self._universe_domain = _metadata.get_universe_domain(
155 google_auth_requests.Request()
156 )
157 self._universe_domain_cached = True
158 return self._universe_domain
159
160 @_helpers.copy_docstring(credentials.Credentials)
161 def get_cred_info(self):
162 return {
163 "credential_source": "metadata server",
164 "credential_type": "VM credentials",
165 "principal": self.service_account_email,
166 }
167
168 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
169 def with_quota_project(self, quota_project_id):
170 creds = self.__class__(
171 service_account_email=self._service_account_email,
172 quota_project_id=quota_project_id,
173 scopes=self._scopes,
174 default_scopes=self._default_scopes,
175 )
176 creds._universe_domain = self._universe_domain
177 creds._universe_domain_cached = self._universe_domain_cached
178 return creds
179
180 @_helpers.copy_docstring(credentials.Scoped)
181 def with_scopes(self, scopes, default_scopes=None):
182 # Compute Engine credentials can not be scoped (the metadata service
183 # ignores the scopes parameter). App Engine, Cloud Run and Flex support
184 # requesting scopes.
185 creds = self.__class__(
186 scopes=scopes,
187 default_scopes=default_scopes,
188 service_account_email=self._service_account_email,
189 quota_project_id=self._quota_project_id,
190 )
191 creds._universe_domain = self._universe_domain
192 creds._universe_domain_cached = self._universe_domain_cached
193 return creds
194
195 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
196 def with_universe_domain(self, universe_domain):
197 return self.__class__(
198 scopes=self._scopes,
199 default_scopes=self._default_scopes,
200 service_account_email=self._service_account_email,
201 quota_project_id=self._quota_project_id,
202 universe_domain=universe_domain,
203 )
204
205
206_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
207_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
208
209
210class IDTokenCredentials(
211 credentials.CredentialsWithQuotaProject,
212 credentials.Signing,
213 credentials.CredentialsWithTokenUri,
214):
215 """Open ID Connect ID Token-based service account credentials.
216
217 These credentials relies on the default service account of a GCE instance.
218
219 ID token can be requested from `GCE metadata server identity endpoint`_, IAM
220 token endpoint or other token endpoints you specify. If metadata server
221 identity endpoint is not used, the GCE instance must have been started with
222 a service account that has access to the IAM Cloud API.
223
224 .. _GCE metadata server identity endpoint:
225 https://cloud.google.com/compute/docs/instances/verifying-instance-identity
226 """
227
228 def __init__(
229 self,
230 request,
231 target_audience,
232 token_uri=None,
233 additional_claims=None,
234 service_account_email=None,
235 signer=None,
236 use_metadata_identity_endpoint=False,
237 quota_project_id=None,
238 ):
239 """
240 Args:
241 request (google.auth.transport.Request): The object used to make
242 HTTP requests.
243 target_audience (str): The intended audience for these credentials,
244 used when requesting the ID Token. The ID Token's ``aud`` claim
245 will be set to this string.
246 token_uri (str): The OAuth 2.0 Token URI.
247 additional_claims (Mapping[str, str]): Any additional claims for
248 the JWT assertion used in the authorization grant.
249 service_account_email (str): Optional explicit service account to
250 use to sign JWT tokens.
251 By default, this is the default GCE service account.
252 signer (google.auth.crypt.Signer): The signer used to sign JWTs.
253 In case the signer is specified, the request argument will be
254 ignored.
255 use_metadata_identity_endpoint (bool): Whether to use GCE metadata
256 identity endpoint. For backward compatibility the default value
257 is False. If set to True, ``token_uri``, ``additional_claims``,
258 ``service_account_email``, ``signer`` argument should not be set;
259 otherwise ValueError will be raised.
260 quota_project_id (Optional[str]): The project ID used for quota and
261 billing.
262
263 Raises:
264 ValueError:
265 If ``use_metadata_identity_endpoint`` is set to True, and one of
266 ``token_uri``, ``additional_claims``, ``service_account_email``,
267 ``signer`` arguments is set.
268 """
269 super(IDTokenCredentials, self).__init__()
270
271 self._quota_project_id = quota_project_id
272 self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
273 self._target_audience = target_audience
274
275 if use_metadata_identity_endpoint:
276 if token_uri or additional_claims or service_account_email or signer:
277 raise exceptions.MalformedError(
278 "If use_metadata_identity_endpoint is set, token_uri, "
279 "additional_claims, service_account_email, signer arguments"
280 " must not be set"
281 )
282 self._token_uri = None
283 self._additional_claims = None
284 self._signer = None
285
286 if service_account_email is None:
287 sa_info = _metadata.get_service_account_info(request)
288 self._service_account_email = sa_info["email"]
289 else:
290 self._service_account_email = service_account_email
291
292 if not use_metadata_identity_endpoint:
293 if signer is None:
294 signer = iam.Signer(
295 request=request,
296 credentials=Credentials(),
297 service_account_email=self._service_account_email,
298 )
299 self._signer = signer
300 self._token_uri = token_uri or _DEFAULT_TOKEN_URI
301
302 if additional_claims is not None:
303 self._additional_claims = additional_claims
304 else:
305 self._additional_claims = {}
306
307 def with_target_audience(self, target_audience):
308 """Create a copy of these credentials with the specified target
309 audience.
310 Args:
311 target_audience (str): The intended audience for these credentials,
312 used when requesting the ID Token.
313 Returns:
314 google.auth.service_account.IDTokenCredentials: A new credentials
315 instance.
316 """
317 # since the signer is already instantiated,
318 # the request is not needed
319 if self._use_metadata_identity_endpoint:
320 return self.__class__(
321 None,
322 target_audience=target_audience,
323 use_metadata_identity_endpoint=True,
324 quota_project_id=self._quota_project_id,
325 )
326 else:
327 return self.__class__(
328 None,
329 service_account_email=self._service_account_email,
330 token_uri=self._token_uri,
331 target_audience=target_audience,
332 additional_claims=self._additional_claims.copy(),
333 signer=self.signer,
334 use_metadata_identity_endpoint=False,
335 quota_project_id=self._quota_project_id,
336 )
337
338 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
339 def with_quota_project(self, quota_project_id):
340
341 # since the signer is already instantiated,
342 # the request is not needed
343 if self._use_metadata_identity_endpoint:
344 return self.__class__(
345 None,
346 target_audience=self._target_audience,
347 use_metadata_identity_endpoint=True,
348 quota_project_id=quota_project_id,
349 )
350 else:
351 return self.__class__(
352 None,
353 service_account_email=self._service_account_email,
354 token_uri=self._token_uri,
355 target_audience=self._target_audience,
356 additional_claims=self._additional_claims.copy(),
357 signer=self.signer,
358 use_metadata_identity_endpoint=False,
359 quota_project_id=quota_project_id,
360 )
361
362 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
363 def with_token_uri(self, token_uri):
364
365 # since the signer is already instantiated,
366 # the request is not needed
367 if self._use_metadata_identity_endpoint:
368 raise exceptions.MalformedError(
369 "If use_metadata_identity_endpoint is set, token_uri" " must not be set"
370 )
371 else:
372 return self.__class__(
373 None,
374 service_account_email=self._service_account_email,
375 token_uri=token_uri,
376 target_audience=self._target_audience,
377 additional_claims=self._additional_claims.copy(),
378 signer=self.signer,
379 use_metadata_identity_endpoint=False,
380 quota_project_id=self.quota_project_id,
381 )
382
383 def _make_authorization_grant_assertion(self):
384 """Create the OAuth 2.0 assertion.
385 This assertion is used during the OAuth 2.0 grant to acquire an
386 ID token.
387 Returns:
388 bytes: The authorization grant assertion.
389 """
390 now = _helpers.utcnow()
391 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
392 expiry = now + lifetime
393
394 payload = {
395 "iat": _helpers.datetime_to_secs(now),
396 "exp": _helpers.datetime_to_secs(expiry),
397 # The issuer must be the service account email.
398 "iss": self.service_account_email,
399 # The audience must be the auth token endpoint's URI
400 "aud": self._token_uri,
401 # The target audience specifies which service the ID token is
402 # intended for.
403 "target_audience": self._target_audience,
404 }
405
406 payload.update(self._additional_claims)
407
408 token = jwt.encode(self._signer, payload)
409
410 return token
411
412 def _call_metadata_identity_endpoint(self, request):
413 """Request ID token from metadata identity endpoint.
414
415 Args:
416 request (google.auth.transport.Request): The object used to make
417 HTTP requests.
418
419 Returns:
420 Tuple[str, datetime.datetime]: The ID token and the expiry of the ID token.
421
422 Raises:
423 google.auth.exceptions.RefreshError: If the Compute Engine metadata
424 service can't be reached or if the instance has no credentials.
425 ValueError: If extracting expiry from the obtained ID token fails.
426 """
427 try:
428 path = "instance/service-accounts/default/identity"
429 params = {"audience": self._target_audience, "format": "full"}
430 metrics_header = {
431 metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds()
432 }
433 id_token = _metadata.get(
434 request, path, params=params, headers=metrics_header
435 )
436 except exceptions.TransportError as caught_exc:
437 new_exc = exceptions.RefreshError(caught_exc)
438 raise new_exc from caught_exc
439
440 _, payload, _, _ = jwt._unverified_decode(id_token)
441 return id_token, datetime.datetime.utcfromtimestamp(payload["exp"])
442
443 def refresh(self, request):
444 """Refreshes the ID token.
445
446 Args:
447 request (google.auth.transport.Request): The object used to make
448 HTTP requests.
449
450 Raises:
451 google.auth.exceptions.RefreshError: If the credentials could
452 not be refreshed.
453 ValueError: If extracting expiry from the obtained ID token fails.
454 """
455 if self._use_metadata_identity_endpoint:
456 self.token, self.expiry = self._call_metadata_identity_endpoint(request)
457 else:
458 assertion = self._make_authorization_grant_assertion()
459 access_token, expiry, _ = _client.id_token_jwt_grant(
460 request, self._token_uri, assertion
461 )
462 self.token = access_token
463 self.expiry = expiry
464
465 @property # type: ignore
466 @_helpers.copy_docstring(credentials.Signing)
467 def signer(self):
468 return self._signer
469
470 def sign_bytes(self, message):
471 """Signs the given message.
472
473 Args:
474 message (bytes): The message to sign.
475
476 Returns:
477 bytes: The message's cryptographic signature.
478
479 Raises:
480 ValueError:
481 Signer is not available if metadata identity endpoint is used.
482 """
483 if self._use_metadata_identity_endpoint:
484 raise exceptions.InvalidOperation(
485 "Signer is not available if metadata identity endpoint is used"
486 )
487 return self._signer.sign(message)
488
489 @property
490 def service_account_email(self):
491 """The service account email."""
492 return self._service_account_email
493
494 @property
495 def signer_email(self):
496 return self._service_account_email