1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""External Account Authorized User Credentials.
16This module provides credentials based on OAuth 2.0 access and refresh tokens.
17These credentials usually access resources on behalf of a user (resource
18owner).
19
20Specifically, these are sourced using external identities via Workforce Identity Federation.
21
22Obtaining the initial access and refresh token can be done through the Google Cloud CLI.
23
24Example credential:
25{
26 "type": "external_account_authorized_user",
27 "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
28 "refresh_token": "refreshToken",
29 "token_url": "https://sts.googleapis.com/v1/oauth/token",
30 "token_info_url": "https://sts.googleapis.com/v1/instrospect",
31 "client_id": "clientId",
32 "client_secret": "clientSecret"
33}
34"""
35
36import datetime
37import io
38import json
39import re
40
41from google.auth import _constants
42from google.auth import _helpers
43from google.auth import credentials
44from google.auth import exceptions
45from google.oauth2 import sts
46from google.oauth2 import utils
47
48_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user"
49
50
51class Credentials(
52 credentials.CredentialsWithQuotaProject,
53 credentials.ReadOnlyScoped,
54 credentials.CredentialsWithTokenUri,
55 credentials.CredentialsWithTrustBoundary,
56):
57 """Credentials for External Account Authorized Users.
58
59 This is used to instantiate Credentials for exchanging refresh tokens from
60 authorized users for Google access token and authorizing requests to Google
61 APIs.
62
63 The credentials are considered immutable. If you want to modify the
64 quota project, use `with_quota_project` and if you want to modify the token
65 uri, use `with_token_uri`.
66
67 **IMPORTANT**:
68 This class does not validate the credential configuration. A security
69 risk occurs when a credential configuration configured with malicious urls
70 is used.
71 When the credential configuration is accepted from an
72 untrusted source, you should validate it before using.
73 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
74 """
75
76 def __init__(
77 self,
78 token=None,
79 expiry=None,
80 refresh_token=None,
81 audience=None,
82 client_id=None,
83 client_secret=None,
84 token_url=None,
85 token_info_url=None,
86 revoke_url=None,
87 scopes=None,
88 quota_project_id=None,
89 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
90 trust_boundary=None,
91 ):
92 """Instantiates a external account authorized user credentials object.
93
94 Args:
95 token (str): The OAuth 2.0 access token. Can be None if refresh information
96 is provided.
97 expiry (datetime.datetime): The optional expiration datetime of the OAuth 2.0 access
98 token.
99 refresh_token (str): The optional OAuth 2.0 refresh token. If specified,
100 credentials can be refreshed.
101 audience (str): The optional STS audience which contains the resource name for the workforce
102 pool and the provider identifier in that pool.
103 client_id (str): The OAuth 2.0 client ID. Must be specified for refresh, can be left as
104 None if the token can not be refreshed.
105 client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be
106 left as None if the token can not be refreshed.
107 token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for
108 refresh, can be left as None if the token can not be refreshed.
109 token_info_url (str): The optional STS endpoint URL for token introspection.
110 revoke_url (str): The optional STS endpoint URL for revoking tokens.
111 quota_project_id (str): The optional project ID used for quota and billing.
112 This project may be different from the project used to
113 create the credentials.
114 universe_domain (Optional[str]): The universe domain. The default value
115 is googleapis.com.
116 trust_boundary (Mapping[str,str]): A credential trust boundary.
117
118 Returns:
119 google.auth.external_account_authorized_user.Credentials: The
120 constructed credentials.
121 """
122 super(Credentials, self).__init__()
123
124 self.token = token
125 self.expiry = expiry
126 self._audience = audience
127 self._refresh_token = refresh_token
128 self._token_url = token_url
129 self._token_info_url = token_info_url
130 self._client_id = client_id
131 self._client_secret = client_secret
132 self._revoke_url = revoke_url
133 self._quota_project_id = quota_project_id
134 self._scopes = scopes
135 self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
136 self._cred_file_path = None
137 self._trust_boundary = trust_boundary
138
139 if not self.valid and not self.can_refresh:
140 raise exceptions.InvalidOperation(
141 "Token should be created with fields to make it valid (`token` and "
142 "`expiry`), or fields to allow it to refresh (`refresh_token`, "
143 "`token_url`, `client_id`, `client_secret`)."
144 )
145
146 self._client_auth = None
147 if self._client_id:
148 self._client_auth = utils.ClientAuthentication(
149 utils.ClientAuthType.basic, self._client_id, self._client_secret
150 )
151 self._sts_client = sts.Client(self._token_url, self._client_auth)
152
153 @property
154 def info(self):
155 """Generates the serializable dictionary representation of the current
156 credentials.
157
158 Returns:
159 Mapping: The dictionary representation of the credentials. This is the
160 reverse of the "from_info" method defined in this class. It is
161 useful for serializing the current credentials so it can deserialized
162 later.
163 """
164 config_info = self.constructor_args()
165 config_info.update(type=_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE)
166 if config_info["expiry"]:
167 config_info["expiry"] = config_info["expiry"].isoformat() + "Z"
168
169 return {key: value for key, value in config_info.items() if value is not None}
170
171 def constructor_args(self):
172 return {
173 "audience": self._audience,
174 "refresh_token": self._refresh_token,
175 "token_url": self._token_url,
176 "token_info_url": self._token_info_url,
177 "client_id": self._client_id,
178 "client_secret": self._client_secret,
179 "token": self.token,
180 "expiry": self.expiry,
181 "revoke_url": self._revoke_url,
182 "scopes": self._scopes,
183 "quota_project_id": self._quota_project_id,
184 "universe_domain": self._universe_domain,
185 "trust_boundary": self._trust_boundary,
186 }
187
188 @property
189 def scopes(self):
190 """Optional[str]: The OAuth 2.0 permission scopes."""
191 return self._scopes
192
193 @property
194 def requires_scopes(self):
195 """False: OAuth 2.0 credentials have their scopes set when
196 the initial token is requested and can not be changed."""
197 return False
198
199 @property
200 def client_id(self):
201 """Optional[str]: The OAuth 2.0 client ID."""
202 return self._client_id
203
204 @property
205 def client_secret(self):
206 """Optional[str]: The OAuth 2.0 client secret."""
207 return self._client_secret
208
209 @property
210 def audience(self):
211 """Optional[str]: The STS audience which contains the resource name for the
212 workforce pool and the provider identifier in that pool."""
213 return self._audience
214
215 @property
216 def refresh_token(self):
217 """Optional[str]: The OAuth 2.0 refresh token."""
218 return self._refresh_token
219
220 @property
221 def token_url(self):
222 """Optional[str]: The STS token exchange endpoint for refresh."""
223 return self._token_url
224
225 @property
226 def token_info_url(self):
227 """Optional[str]: The STS endpoint for token info."""
228 return self._token_info_url
229
230 @property
231 def revoke_url(self):
232 """Optional[str]: The STS endpoint for token revocation."""
233 return self._revoke_url
234
235 @property
236 def is_user(self):
237 """True: This credential always represents a user."""
238 return True
239
240 @property
241 def can_refresh(self):
242 return all(
243 (
244 self._refresh_token,
245 self._token_url,
246 self._client_id,
247 self._client_secret,
248 )
249 )
250
251 def get_project_id(self, request=None):
252 """Retrieves the project ID corresponding to the workload identity or workforce pool.
253 For workforce pool credentials, it returns the project ID corresponding to
254 the workforce_pool_user_project.
255
256 When not determinable, None is returned.
257
258 Args:
259 request (google.auth.transport.requests.Request): Request object.
260 Unused here, but passed from _default.default().
261
262 Return:
263 str: project ID is not determinable for this credential type so it returns None
264 """
265
266 return None
267
268 def to_json(self, strip=None):
269 """Utility function that creates a JSON representation of this
270 credential.
271 Args:
272 strip (Sequence[str]): Optional list of members to exclude from the
273 generated JSON.
274 Returns:
275 str: A JSON representation of this instance. When converted into
276 a dictionary, it can be passed to from_info()
277 to create a new instance.
278 """
279 strip = strip if strip else []
280 return json.dumps({k: v for (k, v) in self.info.items() if k not in strip})
281
282 def _perform_refresh_token(self, request):
283 """Refreshes the access token.
284
285 Args:
286 request (google.auth.transport.Request): The object used to make
287 HTTP requests.
288
289 Raises:
290 google.auth.exceptions.RefreshError: If the credentials could
291 not be refreshed.
292 """
293 if not self.can_refresh:
294 raise exceptions.RefreshError(
295 "The credentials do not contain the necessary fields need to "
296 "refresh the access token. You must specify refresh_token, "
297 "token_url, client_id, and client_secret."
298 )
299
300 now = _helpers.utcnow()
301 response_data = self._sts_client.refresh_token(request, self._refresh_token)
302
303 self.token = response_data.get("access_token")
304
305 lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
306 self.expiry = now + lifetime
307
308 if "refresh_token" in response_data:
309 self._refresh_token = response_data["refresh_token"]
310
311 def _build_trust_boundary_lookup_url(self):
312 """Builds and returns the URL for the trust boundary lookup API."""
313 # Audience format: //iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID
314 match = re.search(r"locations/[^/]+/workforcePools/([^/]+)", self._audience)
315
316 if not match:
317 raise exceptions.InvalidValue("Invalid workforce pool audience format.")
318
319 pool_id = match.groups()[0]
320
321 return _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
322 universe_domain=self._universe_domain, pool_id=pool_id
323 )
324
325 def revoke(self, request):
326 """Revokes the refresh token.
327
328 Args:
329 request (google.auth.transport.Request): The object used to make
330 HTTP requests.
331
332 Raises:
333 google.auth.exceptions.OAuthError: If the token could not be
334 revoked.
335 """
336 if not self._revoke_url or not self._refresh_token:
337 raise exceptions.OAuthError(
338 "The credentials do not contain the necessary fields to "
339 "revoke the refresh token. You must specify revoke_url and "
340 "refresh_token."
341 )
342
343 self._sts_client.revoke_token(
344 request, self._refresh_token, "refresh_token", self._revoke_url
345 )
346 self.token = None
347 self._refresh_token = None
348
349 @_helpers.copy_docstring(credentials.Credentials)
350 def get_cred_info(self):
351 if self._cred_file_path:
352 return {
353 "credential_source": self._cred_file_path,
354 "credential_type": "external account authorized user credentials",
355 }
356 return None
357
358 def _make_copy(self):
359 kwargs = self.constructor_args()
360 cred = self.__class__(**kwargs)
361 cred._cred_file_path = self._cred_file_path
362 return cred
363
364 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
365 def with_quota_project(self, quota_project_id):
366 cred = self._make_copy()
367 cred._quota_project_id = quota_project_id
368 return cred
369
370 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
371 def with_token_uri(self, token_uri):
372 cred = self._make_copy()
373 cred._token_url = token_uri
374 return cred
375
376 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
377 def with_universe_domain(self, universe_domain):
378 cred = self._make_copy()
379 cred._universe_domain = universe_domain
380 return cred
381
382 @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
383 def with_trust_boundary(self, trust_boundary):
384 cred = self._make_copy()
385 cred._trust_boundary = trust_boundary
386 return cred
387
388 @classmethod
389 def from_info(cls, info, **kwargs):
390 """Creates a Credentials instance from parsed external account info.
391
392 **IMPORTANT**:
393 This method does not validate the credential configuration. A security
394 risk occurs when a credential configuration configured with malicious urls
395 is used.
396 When the credential configuration is accepted from an
397 untrusted source, you should validate it before using with this method.
398 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
399
400 Args:
401 info (Mapping[str, str]): The external account info in Google
402 format.
403 kwargs: Additional arguments to pass to the constructor.
404
405 Returns:
406 google.auth.external_account_authorized_user.Credentials: The
407 constructed credentials.
408
409 Raises:
410 ValueError: For invalid parameters.
411 """
412 expiry = info.get("expiry")
413 if expiry:
414 expiry = datetime.datetime.strptime(
415 expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S"
416 )
417 return cls(
418 audience=info.get("audience"),
419 refresh_token=info.get("refresh_token"),
420 token_url=info.get("token_url"),
421 token_info_url=info.get("token_info_url"),
422 client_id=info.get("client_id"),
423 client_secret=info.get("client_secret"),
424 token=info.get("token"),
425 expiry=expiry,
426 revoke_url=info.get("revoke_url"),
427 quota_project_id=info.get("quota_project_id"),
428 scopes=info.get("scopes"),
429 universe_domain=info.get(
430 "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
431 ),
432 trust_boundary=info.get("trust_boundary"),
433 **kwargs
434 )
435
436 @classmethod
437 def from_file(cls, filename, **kwargs):
438 """Creates a Credentials instance from an external account json file.
439
440 **IMPORTANT**:
441 This method does not validate the credential configuration. A security
442 risk occurs when a credential configuration configured with malicious urls
443 is used.
444 When the credential configuration is accepted from an
445 untrusted source, you should validate it before using with this method.
446 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
447
448 Args:
449 filename (str): The path to the external account json file.
450 kwargs: Additional arguments to pass to the constructor.
451
452 Returns:
453 google.auth.external_account_authorized_user.Credentials: The
454 constructed credentials.
455 """
456 with io.open(filename, "r", encoding="utf-8") as json_file:
457 data = json.load(json_file)
458 return cls.from_info(data, **kwargs)