Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/external_account_authorized_user.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

128 statements  

1# Copyright 2022 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""External Account Authorized User Credentials. 

16This module provides credentials based on OAuth 2.0 access and refresh tokens. 

17These credentials usually access resources on behalf of a user (resource 

18owner). 

19 

20Specifically, these are sourced using external identities via Workforce Identity Federation. 

21 

22Obtaining the initial access and refresh token can be done through the Google Cloud CLI. 

23 

24Example credential: 

25{ 

26 "type": "external_account_authorized_user", 

27 "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID", 

28 "refresh_token": "refreshToken", 

29 "token_url": "https://sts.googleapis.com/v1/oauth/token", 

30 "token_info_url": "https://sts.googleapis.com/v1/introspect", 

31 "client_id": "clientId", 

32 "client_secret": "clientSecret" 

33} 

34""" 

35 

36import datetime 

37import io 

38import json 

39 

40from google.auth import _helpers 

41from google.auth import credentials 

42from google.auth import exceptions 

43from google.oauth2 import sts 

44from google.oauth2 import utils 

45 

# Value of the "type" field identifying an external account authorized user
# credential configuration (see the example in the module docstring). It is
# written into the dictionary produced by `Credentials.info`.
_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user"

47 

48 

class Credentials(
    credentials.CredentialsWithQuotaProject,
    credentials.ReadOnlyScoped,
    credentials.CredentialsWithTokenUri,
):
    """Credentials for External Account Authorized Users.

    This is used to instantiate Credentials for exchanging refresh tokens from
    authorized users for Google access token and authorizing requests to Google
    APIs.

    The credentials are considered immutable. If you want to modify the
    quota project, use `with_quota_project` and if you want to modify the token
    uri, use `with_token_uri`.

    **IMPORTANT**:
    This class does not validate the credential configuration. A security
    risk occurs when a credential configuration configured with malicious urls
    is used.
    When the credential configuration is accepted from an
    untrusted source, you should validate it before using.
    Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
    """

    def __init__(
        self,
        token=None,
        expiry=None,
        refresh_token=None,
        audience=None,
        client_id=None,
        client_secret=None,
        token_url=None,
        token_info_url=None,
        revoke_url=None,
        scopes=None,
        quota_project_id=None,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
    ):
        """Instantiates an external account authorized user credentials object.

        Args:
            token (str): The OAuth 2.0 access token. Can be None if refresh information
                is provided.
            expiry (datetime.datetime): The optional expiration datetime of the OAuth 2.0 access
                token.
            refresh_token (str): The optional OAuth 2.0 refresh token. If specified,
                credentials can be refreshed.
            audience (str): The optional STS audience which contains the resource name for the workforce
                pool and the provider identifier in that pool.
            client_id (str): The OAuth 2.0 client ID. Must be specified for refresh, can be left as
                None if the token can not be refreshed.
            client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be
                left as None if the token can not be refreshed.
            token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for
                refresh, can be left as None if the token can not be refreshed.
            token_info_url (str): The optional STS endpoint URL for token introspection.
            revoke_url (str): The optional STS endpoint URL for revoking tokens.
            scopes: The optional OAuth 2.0 permission scopes. Stored as given
                and exposed through the `scopes` property.
            quota_project_id (str): The optional project ID used for quota and billing.
                This project may be different from the project used to
                create the credentials.
            universe_domain (Optional[str]): The universe domain. The default value
                is googleapis.com.

        Returns:
            google.auth.external_account_authorized_user.Credentials: The
                constructed credentials.

        Raises:
            google.auth.exceptions.InvalidOperation: If the arguments describe
                credentials that are neither currently valid nor refreshable.
        """
        super().__init__()

        self.token = token
        self.expiry = expiry
        self._audience = audience
        self._refresh_token = refresh_token
        self._token_url = token_url
        self._token_info_url = token_info_url
        self._client_id = client_id
        self._client_secret = client_secret
        self._revoke_url = revoke_url
        self._quota_project_id = quota_project_id
        self._scopes = scopes
        # A falsy universe domain (None or "") falls back to the default.
        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
        self._cred_file_path = None

        # Reject configurations that could never yield a usable token.
        if not self.valid and not self.can_refresh:
            raise exceptions.InvalidOperation(
                "Token should be created with fields to make it valid (`token` and "
                "`expiry`), or fields to allow it to refresh (`refresh_token`, "
                "`token_url`, `client_id`, `client_secret`)."
            )

        # Client authentication is only configured when a client ID is known.
        self._client_auth = None
        if self._client_id:
            self._client_auth = utils.ClientAuthentication(
                utils.ClientAuthType.basic, self._client_id, self._client_secret
            )
        self._sts_client = sts.Client(self._token_url, self._client_auth)

    @property
    def info(self):
        """Generates the serializable dictionary representation of the current
        credentials.

        Returns:
            Mapping: The dictionary representation of the credentials. This is the
                reverse of the "from_info" method defined in this class. It is
                useful for serializing the current credentials so they can be
                deserialized later. Entries whose value is None are omitted.
        """
        config_info = self.constructor_args()
        config_info.update(type=_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE)
        if config_info["expiry"]:
            # Serialize as an ISO 8601 string with a trailing "Z";
            # `from_info` strips that suffix again when parsing.
            config_info["expiry"] = config_info["expiry"].isoformat() + "Z"

        return {key: value for key, value in config_info.items() if value is not None}

    def constructor_args(self):
        """Returns the keyword arguments that recreate these credentials.

        Returns:
            dict: Maps every `__init__` parameter name to the value currently
                held by this instance (None values included).
        """
        return {
            "audience": self._audience,
            "refresh_token": self._refresh_token,
            "token_url": self._token_url,
            "token_info_url": self._token_info_url,
            "client_id": self._client_id,
            "client_secret": self._client_secret,
            "token": self.token,
            "expiry": self.expiry,
            "revoke_url": self._revoke_url,
            "scopes": self._scopes,
            "quota_project_id": self._quota_project_id,
            "universe_domain": self._universe_domain,
        }

    @property
    def scopes(self):
        """Optional[str]: The OAuth 2.0 permission scopes."""
        return self._scopes

    @property
    def requires_scopes(self):
        """False: OAuth 2.0 credentials have their scopes set when
        the initial token is requested and can not be changed."""
        return False

    @property
    def client_id(self):
        """Optional[str]: The OAuth 2.0 client ID."""
        return self._client_id

    @property
    def client_secret(self):
        """Optional[str]: The OAuth 2.0 client secret."""
        return self._client_secret

    @property
    def audience(self):
        """Optional[str]: The STS audience which contains the resource name for the
        workforce pool and the provider identifier in that pool."""
        return self._audience

    @property
    def refresh_token(self):
        """Optional[str]: The OAuth 2.0 refresh token."""
        return self._refresh_token

    @property
    def token_url(self):
        """Optional[str]: The STS token exchange endpoint for refresh."""
        return self._token_url

    @property
    def token_info_url(self):
        """Optional[str]: The STS endpoint for token info."""
        return self._token_info_url

    @property
    def revoke_url(self):
        """Optional[str]: The STS endpoint for token revocation."""
        return self._revoke_url

    @property
    def is_user(self):
        """True: This credential always represents a user."""
        return True

    @property
    def can_refresh(self):
        """bool: True only when the refresh token, token URL, client ID and
        client secret are all set (truthy)."""
        return all(
            (self._refresh_token, self._token_url, self._client_id, self._client_secret)
        )

    def get_project_id(self, request=None):
        """Retrieves the project ID corresponding to the workload identity or workforce pool.
        For workforce pool credentials, it returns the project ID corresponding to
        the workforce_pool_user_project.

        When not determinable, None is returned.

        Args:
            request (google.auth.transport.requests.Request): Request object.
                Unused here, but passed from _default.default().

        Returns:
            str: project ID is not determinable for this credential type so it returns None
        """

        return None

    def to_json(self, strip=None):
        """Utility function that creates a JSON representation of this
        credential.

        Args:
            strip (Sequence[str]): Optional list of members to exclude from the
                generated JSON.

        Returns:
            str: A JSON representation of this instance. When converted into
                a dictionary, it can be passed to from_info()
                to create a new instance.
        """
        strip = strip if strip else []
        return json.dumps({k: v for (k, v) in self.info.items() if k not in strip})

    def refresh(self, request):
        """Refreshes the access token.

        On success the access token and its expiry are replaced, and the
        refresh token is replaced too when the response carries a new one.
        Nothing on this instance is modified when the refresh fails.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed, or the token endpoint response lacks a
                usable `access_token` / `expires_in`.
        """
        if not self.can_refresh:
            raise exceptions.RefreshError(
                "The credentials do not contain the necessary fields need to "
                "refresh the access token. You must specify refresh_token, "
                "token_url, client_id, and client_secret."
            )

        # Captured before the request so the computed expiry is not later
        # than the token's real lifetime.
        now = _helpers.utcnow()
        response_data = self._make_sts_request(request)

        # Validate the whole response before touching any state, so a
        # malformed response cannot leave a new token with a stale expiry.
        access_token = response_data.get("access_token")
        if access_token is None:
            raise exceptions.RefreshError(
                "The token endpoint response did not include an `access_token`."
            )

        expires_in = response_data.get("expires_in")
        try:
            # Some services return expires_in as a JSON string.
            if isinstance(expires_in, str):
                expires_in = int(expires_in)
            lifetime = datetime.timedelta(seconds=expires_in)
        except (TypeError, ValueError) as caught:
            raise exceptions.RefreshError(
                "The token endpoint response did not include a usable "
                "`expires_in` value."
            ) from caught

        self.token = access_token
        self.expiry = now + lifetime

        # Keep the rotated refresh token if the server issued one.
        if "refresh_token" in response_data:
            self._refresh_token = response_data["refresh_token"]

    def _make_sts_request(self, request):
        """Exchanges the stored refresh token through the STS client.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Returns:
            The response data returned by the STS client's `refresh_token`
            call; `refresh` reads `access_token`, `expires_in` and
            `refresh_token` from it.
        """
        return self._sts_client.refresh_token(request, self._refresh_token)

    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        if self._cred_file_path:
            return {
                "credential_source": self._cred_file_path,
                "credential_type": "external account authorized user credentials",
            }
        return None

    def _make_copy(self):
        """Creates a new instance of the same class from `constructor_args`,
        carrying over the credential file path."""
        kwargs = self.constructor_args()
        cred = self.__class__(**kwargs)
        cred._cred_file_path = self._cred_file_path
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        cred = self._make_copy()
        cred._token_url = token_uri
        # The copy's STS client was built in __init__ against the previous
        # token URL; rebuild it so refreshes actually use the new endpoint.
        cred._sts_client = sts.Client(token_uri, cred._client_auth)
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        cred = self._make_copy()
        cred._universe_domain = universe_domain
        return cred

    @classmethod
    def from_info(cls, info, **kwargs):
        """Creates a Credentials instance from parsed external account info.

        **IMPORTANT**:
        This method does not validate the credential configuration. A security
        risk occurs when a credential configuration configured with malicious urls
        is used.
        When the credential configuration is accepted from an
        untrusted source, you should validate it before using with this method.
        Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.

        Args:
            info (Mapping[str, str]): The external account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.external_account_authorized_user.Credentials: The
                constructed credentials.

        Raises:
            ValueError: For invalid parameters.
        """
        expiry = info.get("expiry")
        if expiry:
            # Drop the trailing "Z" and any fractional seconds before parsing;
            # the result is a naive datetime with second precision.
            expiry = datetime.datetime.strptime(
                expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S"
            )
        return cls(
            audience=info.get("audience"),
            refresh_token=info.get("refresh_token"),
            token_url=info.get("token_url"),
            token_info_url=info.get("token_info_url"),
            client_id=info.get("client_id"),
            client_secret=info.get("client_secret"),
            token=info.get("token"),
            expiry=expiry,
            revoke_url=info.get("revoke_url"),
            quota_project_id=info.get("quota_project_id"),
            scopes=info.get("scopes"),
            universe_domain=info.get(
                "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
            ),
            **kwargs
        )

    @classmethod
    def from_file(cls, filename, **kwargs):
        """Creates a Credentials instance from an external account json file.

        **IMPORTANT**:
        This method does not validate the credential configuration. A security
        risk occurs when a credential configuration configured with malicious urls
        is used.
        When the credential configuration is accepted from an
        untrusted source, you should validate it before using with this method.
        Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.

        Args:
            filename (str): The path to the external account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.external_account_authorized_user.Credentials: The
                constructed credentials.
        """
        with io.open(filename, "r", encoding="utf-8") as json_file:
            data = json.load(json_file)
            return cls.from_info(data, **kwargs)