1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""External Account Authorized User Credentials.
16This module provides credentials based on OAuth 2.0 access and refresh tokens.
17These credentials usually access resources on behalf of a user (resource
18owner).
19
20Specifically, these are sourced using external identities via Workforce Identity Federation.
21
22Obtaining the initial access and refresh token can be done through the Google Cloud CLI.
23
24Example credential:
25{
26 "type": "external_account_authorized_user",
27 "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
28 "refresh_token": "refreshToken",
29 "token_url": "https://sts.googleapis.com/v1/oauth/token",
30 "token_info_url": "https://sts.googleapis.com/v1/instrospect",
31 "client_id": "clientId",
32 "client_secret": "clientSecret"
33}
34"""
35
36import datetime
37import io
38import json
39
40from google.auth import _helpers
41from google.auth import credentials
42from google.auth import exceptions
43from google.oauth2 import sts
44from google.oauth2 import utils
45
46_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user"
47
48
49class Credentials(
50 credentials.CredentialsWithQuotaProject,
51 credentials.ReadOnlyScoped,
52 credentials.CredentialsWithTokenUri,
53):
54 """Credentials for External Account Authorized Users.
55
56 This is used to instantiate Credentials for exchanging refresh tokens from
57 authorized users for Google access token and authorizing requests to Google
58 APIs.
59
60 The credentials are considered immutable. If you want to modify the
61 quota project, use `with_quota_project` and if you want to modify the token
62 uri, use `with_token_uri`.
63
64 **IMPORTANT**:
65 This class does not validate the credential configuration. A security
66 risk occurs when a credential configuration configured with malicious urls
67 is used.
68 When the credential configuration is accepted from an
69 untrusted source, you should validate it before using.
70 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details."""
71
72 def __init__(
73 self,
74 token=None,
75 expiry=None,
76 refresh_token=None,
77 audience=None,
78 client_id=None,
79 client_secret=None,
80 token_url=None,
81 token_info_url=None,
82 revoke_url=None,
83 scopes=None,
84 quota_project_id=None,
85 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
86 ):
87 """Instantiates a external account authorized user credentials object.
88
89 Args:
90 token (str): The OAuth 2.0 access token. Can be None if refresh information
91 is provided.
92 expiry (datetime.datetime): The optional expiration datetime of the OAuth 2.0 access
93 token.
94 refresh_token (str): The optional OAuth 2.0 refresh token. If specified,
95 credentials can be refreshed.
96 audience (str): The optional STS audience which contains the resource name for the workforce
97 pool and the provider identifier in that pool.
98 client_id (str): The OAuth 2.0 client ID. Must be specified for refresh, can be left as
99 None if the token can not be refreshed.
100 client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be
101 left as None if the token can not be refreshed.
102 token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for
103 refresh, can be left as None if the token can not be refreshed.
104 token_info_url (str): The optional STS endpoint URL for token introspection.
105 revoke_url (str): The optional STS endpoint URL for revoking tokens.
106 quota_project_id (str): The optional project ID used for quota and billing.
107 This project may be different from the project used to
108 create the credentials.
109 universe_domain (Optional[str]): The universe domain. The default value
110 is googleapis.com.
111
112 Returns:
113 google.auth.external_account_authorized_user.Credentials: The
114 constructed credentials.
115 """
116 super(Credentials, self).__init__()
117
118 self.token = token
119 self.expiry = expiry
120 self._audience = audience
121 self._refresh_token = refresh_token
122 self._token_url = token_url
123 self._token_info_url = token_info_url
124 self._client_id = client_id
125 self._client_secret = client_secret
126 self._revoke_url = revoke_url
127 self._quota_project_id = quota_project_id
128 self._scopes = scopes
129 self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
130 self._cred_file_path = None
131
132 if not self.valid and not self.can_refresh:
133 raise exceptions.InvalidOperation(
134 "Token should be created with fields to make it valid (`token` and "
135 "`expiry`), or fields to allow it to refresh (`refresh_token`, "
136 "`token_url`, `client_id`, `client_secret`)."
137 )
138
139 self._client_auth = None
140 if self._client_id:
141 self._client_auth = utils.ClientAuthentication(
142 utils.ClientAuthType.basic, self._client_id, self._client_secret
143 )
144 self._sts_client = sts.Client(self._token_url, self._client_auth)
145
146 @property
147 def info(self):
148 """Generates the serializable dictionary representation of the current
149 credentials.
150
151 Returns:
152 Mapping: The dictionary representation of the credentials. This is the
153 reverse of the "from_info" method defined in this class. It is
154 useful for serializing the current credentials so it can deserialized
155 later.
156 """
157 config_info = self.constructor_args()
158 config_info.update(type=_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE)
159 if config_info["expiry"]:
160 config_info["expiry"] = config_info["expiry"].isoformat() + "Z"
161
162 return {key: value for key, value in config_info.items() if value is not None}
163
164 def constructor_args(self):
165 return {
166 "audience": self._audience,
167 "refresh_token": self._refresh_token,
168 "token_url": self._token_url,
169 "token_info_url": self._token_info_url,
170 "client_id": self._client_id,
171 "client_secret": self._client_secret,
172 "token": self.token,
173 "expiry": self.expiry,
174 "revoke_url": self._revoke_url,
175 "scopes": self._scopes,
176 "quota_project_id": self._quota_project_id,
177 "universe_domain": self._universe_domain,
178 }
179
180 @property
181 def scopes(self):
182 """Optional[str]: The OAuth 2.0 permission scopes."""
183 return self._scopes
184
185 @property
186 def requires_scopes(self):
187 """ False: OAuth 2.0 credentials have their scopes set when
188 the initial token is requested and can not be changed."""
189 return False
190
191 @property
192 def client_id(self):
193 """Optional[str]: The OAuth 2.0 client ID."""
194 return self._client_id
195
196 @property
197 def client_secret(self):
198 """Optional[str]: The OAuth 2.0 client secret."""
199 return self._client_secret
200
201 @property
202 def audience(self):
203 """Optional[str]: The STS audience which contains the resource name for the
204 workforce pool and the provider identifier in that pool."""
205 return self._audience
206
207 @property
208 def refresh_token(self):
209 """Optional[str]: The OAuth 2.0 refresh token."""
210 return self._refresh_token
211
212 @property
213 def token_url(self):
214 """Optional[str]: The STS token exchange endpoint for refresh."""
215 return self._token_url
216
217 @property
218 def token_info_url(self):
219 """Optional[str]: The STS endpoint for token info."""
220 return self._token_info_url
221
222 @property
223 def revoke_url(self):
224 """Optional[str]: The STS endpoint for token revocation."""
225 return self._revoke_url
226
227 @property
228 def is_user(self):
229 """ True: This credential always represents a user."""
230 return True
231
232 @property
233 def can_refresh(self):
234 return all(
235 (self._refresh_token, self._token_url, self._client_id, self._client_secret)
236 )
237
238 def get_project_id(self, request=None):
239 """Retrieves the project ID corresponding to the workload identity or workforce pool.
240 For workforce pool credentials, it returns the project ID corresponding to
241 the workforce_pool_user_project.
242
243 When not determinable, None is returned.
244
245 Args:
246 request (google.auth.transport.requests.Request): Request object.
247 Unused here, but passed from _default.default().
248
249 Return:
250 str: project ID is not determinable for this credential type so it returns None
251 """
252
253 return None
254
255 def to_json(self, strip=None):
256 """Utility function that creates a JSON representation of this
257 credential.
258 Args:
259 strip (Sequence[str]): Optional list of members to exclude from the
260 generated JSON.
261 Returns:
262 str: A JSON representation of this instance. When converted into
263 a dictionary, it can be passed to from_info()
264 to create a new instance.
265 """
266 strip = strip if strip else []
267 return json.dumps({k: v for (k, v) in self.info.items() if k not in strip})
268
269 def refresh(self, request):
270 """Refreshes the access token.
271
272 Args:
273 request (google.auth.transport.Request): The object used to make
274 HTTP requests.
275
276 Raises:
277 google.auth.exceptions.RefreshError: If the credentials could
278 not be refreshed.
279 """
280 if not self.can_refresh:
281 raise exceptions.RefreshError(
282 "The credentials do not contain the necessary fields need to "
283 "refresh the access token. You must specify refresh_token, "
284 "token_url, client_id, and client_secret."
285 )
286
287 now = _helpers.utcnow()
288 response_data = self._make_sts_request(request)
289
290 self.token = response_data.get("access_token")
291
292 lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
293 self.expiry = now + lifetime
294
295 if "refresh_token" in response_data:
296 self._refresh_token = response_data["refresh_token"]
297
298 def _make_sts_request(self, request):
299 return self._sts_client.refresh_token(request, self._refresh_token)
300
301 @_helpers.copy_docstring(credentials.Credentials)
302 def get_cred_info(self):
303 if self._cred_file_path:
304 return {
305 "credential_source": self._cred_file_path,
306 "credential_type": "external account authorized user credentials",
307 }
308 return None
309
310 def _make_copy(self):
311 kwargs = self.constructor_args()
312 cred = self.__class__(**kwargs)
313 cred._cred_file_path = self._cred_file_path
314 return cred
315
316 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
317 def with_quota_project(self, quota_project_id):
318 cred = self._make_copy()
319 cred._quota_project_id = quota_project_id
320 return cred
321
322 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
323 def with_token_uri(self, token_uri):
324 cred = self._make_copy()
325 cred._token_url = token_uri
326 return cred
327
328 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
329 def with_universe_domain(self, universe_domain):
330 cred = self._make_copy()
331 cred._universe_domain = universe_domain
332 return cred
333
334 @classmethod
335 def from_info(cls, info, **kwargs):
336 """Creates a Credentials instance from parsed external account info.
337
338 **IMPORTANT**:
339 This method does not validate the credential configuration. A security
340 risk occurs when a credential configuration configured with malicious urls
341 is used.
342 When the credential configuration is accepted from an
343 untrusted source, you should validate it before using with this method.
344 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
345
346 Args:
347 info (Mapping[str, str]): The external account info in Google
348 format.
349 kwargs: Additional arguments to pass to the constructor.
350
351 Returns:
352 google.auth.external_account_authorized_user.Credentials: The
353 constructed credentials.
354
355 Raises:
356 ValueError: For invalid parameters.
357 """
358 expiry = info.get("expiry")
359 if expiry:
360 expiry = datetime.datetime.strptime(
361 expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S"
362 )
363 return cls(
364 audience=info.get("audience"),
365 refresh_token=info.get("refresh_token"),
366 token_url=info.get("token_url"),
367 token_info_url=info.get("token_info_url"),
368 client_id=info.get("client_id"),
369 client_secret=info.get("client_secret"),
370 token=info.get("token"),
371 expiry=expiry,
372 revoke_url=info.get("revoke_url"),
373 quota_project_id=info.get("quota_project_id"),
374 scopes=info.get("scopes"),
375 universe_domain=info.get(
376 "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
377 ),
378 **kwargs
379 )
380
381 @classmethod
382 def from_file(cls, filename, **kwargs):
383 """Creates a Credentials instance from an external account json file.
384
385 **IMPORTANT**:
386 This method does not validate the credential configuration. A security
387 risk occurs when a credential configuration configured with malicious urls
388 is used.
389 When the credential configuration is accepted from an
390 untrusted source, you should validate it before using with this method.
391 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
392
393 Args:
394 filename (str): The path to the external account json file.
395 kwargs: Additional arguments to pass to the constructor.
396
397 Returns:
398 google.auth.external_account_authorized_user.Credentials: The
399 constructed credentials.
400 """
401 with io.open(filename, "r", encoding="utf-8") as json_file:
402 data = json.load(json_file)
403 return cls.from_info(data, **kwargs)