Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/external_account_authorized_user.py: 45%
110 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:22 +0000
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""External Account Authorized User Credentials.
16This module provides credentials based on OAuth 2.0 access and refresh tokens.
17These credentials usually access resources on behalf of a user (resource
18owner).
20Specifically, these are sourced using external identities via Workforce Identity Federation.
22Obtaining the initial access and refresh token can be done through the Google Cloud CLI.
24Example credential:
25{
26 "type": "external_account_authorized_user",
27 "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
28 "refresh_token": "refreshToken",
29 "token_url": "https://sts.googleapis.com/v1/oauth/token",
30 "token_info_url": "https://sts.googleapis.com/v1/instrospect",
31 "client_id": "clientId",
32 "client_secret": "clientSecret"
33}
34"""
36import datetime
37import io
38import json
40from google.auth import _helpers
41from google.auth import credentials
42from google.auth import exceptions
43from google.oauth2 import sts
44from google.oauth2 import utils
46_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user"
49class Credentials(
50 credentials.CredentialsWithQuotaProject,
51 credentials.ReadOnlyScoped,
52 credentials.CredentialsWithTokenUri,
53):
54 """Credentials for External Account Authorized Users.
56 This is used to instantiate Credentials for exchanging refresh tokens from
57 authorized users for Google access token and authorizing requests to Google
58 APIs.
60 The credentials are considered immutable. If you want to modify the
61 quota project, use `with_quota_project` and if you want to modify the token
62 uri, use `with_token_uri`.
63 """
65 def __init__(
66 self,
67 token=None,
68 expiry=None,
69 refresh_token=None,
70 audience=None,
71 client_id=None,
72 client_secret=None,
73 token_url=None,
74 token_info_url=None,
75 revoke_url=None,
76 scopes=None,
77 quota_project_id=None,
78 ):
79 """Instantiates a external account authorized user credentials object.
81 Args:
82 token (str): The OAuth 2.0 access token. Can be None if refresh information
83 is provided.
84 expiry (datetime.datetime): The optional expiration datetime of the OAuth 2.0 access
85 token.
86 refresh_token (str): The optional OAuth 2.0 refresh token. If specified,
87 credentials can be refreshed.
88 audience (str): The optional STS audience which contains the resource name for the workforce
89 pool and the provider identifier in that pool.
90 client_id (str): The OAuth 2.0 client ID. Must be specified for refresh, can be left as
91 None if the token can not be refreshed.
92 client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be
93 left as None if the token can not be refreshed.
94 token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for
95 refresh, can be left as None if the token can not be refreshed.
96 token_info_url (str): The optional STS endpoint URL for token introspection.
97 revoke_url (str): The optional STS endpoint URL for revoking tokens.
98 quota_project_id (str): The optional project ID used for quota and billing.
99 This project may be different from the project used to
100 create the credentials.
102 Returns:
103 google.auth.external_account_authorized_user.Credentials: The
104 constructed credentials.
105 """
106 super(Credentials, self).__init__()
108 self.token = token
109 self.expiry = expiry
110 self._audience = audience
111 self._refresh_token = refresh_token
112 self._token_url = token_url
113 self._token_info_url = token_info_url
114 self._client_id = client_id
115 self._client_secret = client_secret
116 self._revoke_url = revoke_url
117 self._quota_project_id = quota_project_id
118 self._scopes = scopes
120 if not self.valid and not self.can_refresh:
121 raise exceptions.InvalidOperation(
122 "Token should be created with fields to make it valid (`token` and "
123 "`expiry`), or fields to allow it to refresh (`refresh_token`, "
124 "`token_url`, `client_id`, `client_secret`)."
125 )
127 self._client_auth = None
128 if self._client_id:
129 self._client_auth = utils.ClientAuthentication(
130 utils.ClientAuthType.basic, self._client_id, self._client_secret
131 )
132 self._sts_client = sts.Client(self._token_url, self._client_auth)
134 @property
135 def info(self):
136 """Generates the serializable dictionary representation of the current
137 credentials.
139 Returns:
140 Mapping: The dictionary representation of the credentials. This is the
141 reverse of the "from_info" method defined in this class. It is
142 useful for serializing the current credentials so it can deserialized
143 later.
144 """
145 config_info = self.constructor_args()
146 config_info.update(type=_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE)
147 if config_info["expiry"]:
148 config_info["expiry"] = config_info["expiry"].isoformat() + "Z"
150 return {key: value for key, value in config_info.items() if value is not None}
152 def constructor_args(self):
153 return {
154 "audience": self._audience,
155 "refresh_token": self._refresh_token,
156 "token_url": self._token_url,
157 "token_info_url": self._token_info_url,
158 "client_id": self._client_id,
159 "client_secret": self._client_secret,
160 "token": self.token,
161 "expiry": self.expiry,
162 "revoke_url": self._revoke_url,
163 "scopes": self._scopes,
164 "quota_project_id": self._quota_project_id,
165 }
167 @property
168 def scopes(self):
169 """Optional[str]: The OAuth 2.0 permission scopes."""
170 return self._scopes
172 @property
173 def requires_scopes(self):
174 """ False: OAuth 2.0 credentials have their scopes set when
175 the initial token is requested and can not be changed."""
176 return False
178 @property
179 def client_id(self):
180 """Optional[str]: The OAuth 2.0 client ID."""
181 return self._client_id
183 @property
184 def client_secret(self):
185 """Optional[str]: The OAuth 2.0 client secret."""
186 return self._client_secret
188 @property
189 def audience(self):
190 """Optional[str]: The STS audience which contains the resource name for the
191 workforce pool and the provider identifier in that pool."""
192 return self._audience
194 @property
195 def refresh_token(self):
196 """Optional[str]: The OAuth 2.0 refresh token."""
197 return self._refresh_token
199 @property
200 def token_url(self):
201 """Optional[str]: The STS token exchange endpoint for refresh."""
202 return self._token_url
204 @property
205 def token_info_url(self):
206 """Optional[str]: The STS endpoint for token info."""
207 return self._token_info_url
209 @property
210 def revoke_url(self):
211 """Optional[str]: The STS endpoint for token revocation."""
212 return self._revoke_url
214 @property
215 def is_user(self):
216 """ True: This credential always represents a user."""
217 return True
219 @property
220 def can_refresh(self):
221 return all(
222 (self._refresh_token, self._token_url, self._client_id, self._client_secret)
223 )
225 def get_project_id(self, request=None):
226 """Retrieves the project ID corresponding to the workload identity or workforce pool.
227 For workforce pool credentials, it returns the project ID corresponding to
228 the workforce_pool_user_project.
230 When not determinable, None is returned.
232 Args:
233 request (google.auth.transport.requests.Request): Request object.
234 Unused here, but passed from _default.default().
236 Return:
237 str: project ID is not determinable for this credential type so it returns None
238 """
240 return None
242 def to_json(self, strip=None):
243 """Utility function that creates a JSON representation of this
244 credential.
245 Args:
246 strip (Sequence[str]): Optional list of members to exclude from the
247 generated JSON.
248 Returns:
249 str: A JSON representation of this instance. When converted into
250 a dictionary, it can be passed to from_info()
251 to create a new instance.
252 """
253 strip = strip if strip else []
254 return json.dumps({k: v for (k, v) in self.info.items() if k not in strip})
256 def refresh(self, request):
257 """Refreshes the access token.
259 Args:
260 request (google.auth.transport.Request): The object used to make
261 HTTP requests.
263 Raises:
264 google.auth.exceptions.RefreshError: If the credentials could
265 not be refreshed.
266 """
267 if not self.can_refresh:
268 raise exceptions.RefreshError(
269 "The credentials do not contain the necessary fields need to "
270 "refresh the access token. You must specify refresh_token, "
271 "token_url, client_id, and client_secret."
272 )
274 now = _helpers.utcnow()
275 response_data = self._make_sts_request(request)
277 self.token = response_data.get("access_token")
279 lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
280 self.expiry = now + lifetime
282 if "refresh_token" in response_data:
283 self._refresh_token = response_data["refresh_token"]
285 def _make_sts_request(self, request):
286 return self._sts_client.refresh_token(request, self._refresh_token)
288 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
289 def with_quota_project(self, quota_project_id):
290 kwargs = self.constructor_args()
291 kwargs.update(quota_project_id=quota_project_id)
292 return self.__class__(**kwargs)
294 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
295 def with_token_uri(self, token_uri):
296 kwargs = self.constructor_args()
297 kwargs.update(token_url=token_uri)
298 return self.__class__(**kwargs)
300 @classmethod
301 def from_info(cls, info, **kwargs):
302 """Creates a Credentials instance from parsed external account info.
304 Args:
305 info (Mapping[str, str]): The external account info in Google
306 format.
307 kwargs: Additional arguments to pass to the constructor.
309 Returns:
310 google.auth.external_account_authorized_user.Credentials: The
311 constructed credentials.
313 Raises:
314 ValueError: For invalid parameters.
315 """
316 expiry = info.get("expiry")
317 if expiry:
318 expiry = datetime.datetime.strptime(
319 expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S"
320 )
321 return cls(
322 audience=info.get("audience"),
323 refresh_token=info.get("refresh_token"),
324 token_url=info.get("token_url"),
325 token_info_url=info.get("token_info_url"),
326 client_id=info.get("client_id"),
327 client_secret=info.get("client_secret"),
328 token=info.get("token"),
329 expiry=expiry,
330 revoke_url=info.get("revoke_url"),
331 quota_project_id=info.get("quota_project_id"),
332 scopes=info.get("scopes"),
333 **kwargs
334 )
336 @classmethod
337 def from_file(cls, filename, **kwargs):
338 """Creates a Credentials instance from an external account json file.
340 Args:
341 filename (str): The path to the external account json file.
342 kwargs: Additional arguments to pass to the constructor.
344 Returns:
345 google.auth.external_account_authorized_user.Credentials: The
346 constructed credentials.
347 """
348 with io.open(filename, "r", encoding="utf-8") as json_file:
349 data = json.load(json_file)
350 return cls.from_info(data, **kwargs)