Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/external_account_authorized_user.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

146 statements  

1# Copyright 2022 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""External Account Authorized User Credentials. 

16This module provides credentials based on OAuth 2.0 access and refresh tokens. 

17These credentials usually access resources on behalf of a user (resource 

18owner). 

19 

20Specifically, these are sourced using external identities via Workforce Identity Federation. 

21 

22Obtaining the initial access and refresh token can be done through the Google Cloud CLI. 

23 

24Example credential: 

25{ 

26 "type": "external_account_authorized_user", 

27 "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID", 

28 "refresh_token": "refreshToken", 

29 "token_url": "https://sts.googleapis.com/v1/oauth/token", 

30 "token_info_url": "https://sts.googleapis.com/v1/instrospect", 

31 "client_id": "clientId", 

32 "client_secret": "clientSecret" 

33} 

34""" 

35 

36import datetime 

37import io 

38import json 

39import re 

40 

41from google.auth import _constants 

42from google.auth import _helpers 

43from google.auth import credentials 

44from google.auth import exceptions 

45from google.oauth2 import sts 

46from google.oauth2 import utils 

47 

48_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user" 

49 

50 

51class Credentials( 

52 credentials.CredentialsWithQuotaProject, 

53 credentials.ReadOnlyScoped, 

54 credentials.CredentialsWithTokenUri, 

55 credentials.CredentialsWithTrustBoundary, 

56): 

57 """Credentials for External Account Authorized Users. 

58 

59 This is used to instantiate Credentials for exchanging refresh tokens from 

60 authorized users for Google access token and authorizing requests to Google 

61 APIs. 

62 

63 The credentials are considered immutable. If you want to modify the 

64 quota project, use `with_quota_project` and if you want to modify the token 

65 uri, use `with_token_uri`. 

66 

67 **IMPORTANT**: 

68 This class does not validate the credential configuration. A security 

69 risk occurs when a credential configuration configured with malicious urls 

70 is used. 

71 When the credential configuration is accepted from an 

72 untrusted source, you should validate it before using. 

73 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. 

74 """ 

75 

76 def __init__( 

77 self, 

78 token=None, 

79 expiry=None, 

80 refresh_token=None, 

81 audience=None, 

82 client_id=None, 

83 client_secret=None, 

84 token_url=None, 

85 token_info_url=None, 

86 revoke_url=None, 

87 scopes=None, 

88 quota_project_id=None, 

89 universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, 

90 trust_boundary=None, 

91 ): 

92 """Instantiates a external account authorized user credentials object. 

93 

94 Args: 

95 token (str): The OAuth 2.0 access token. Can be None if refresh information 

96 is provided. 

97 expiry (datetime.datetime): The optional expiration datetime of the OAuth 2.0 access 

98 token. 

99 refresh_token (str): The optional OAuth 2.0 refresh token. If specified, 

100 credentials can be refreshed. 

101 audience (str): The optional STS audience which contains the resource name for the workforce 

102 pool and the provider identifier in that pool. 

103 client_id (str): The OAuth 2.0 client ID. Must be specified for refresh, can be left as 

104 None if the token can not be refreshed. 

105 client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be 

106 left as None if the token can not be refreshed. 

107 token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for 

108 refresh, can be left as None if the token can not be refreshed. 

109 token_info_url (str): The optional STS endpoint URL for token introspection. 

110 revoke_url (str): The optional STS endpoint URL for revoking tokens. 

111 quota_project_id (str): The optional project ID used for quota and billing. 

112 This project may be different from the project used to 

113 create the credentials. 

114 universe_domain (Optional[str]): The universe domain. The default value 

115 is googleapis.com. 

116 trust_boundary (Mapping[str,str]): A credential trust boundary. 

117 

118 Returns: 

119 google.auth.external_account_authorized_user.Credentials: The 

120 constructed credentials. 

121 """ 

122 super(Credentials, self).__init__() 

123 

124 self.token = token 

125 self.expiry = expiry 

126 self._audience = audience 

127 self._refresh_token = refresh_token 

128 self._token_url = token_url 

129 self._token_info_url = token_info_url 

130 self._client_id = client_id 

131 self._client_secret = client_secret 

132 self._revoke_url = revoke_url 

133 self._quota_project_id = quota_project_id 

134 self._scopes = scopes 

135 self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN 

136 self._cred_file_path = None 

137 self._trust_boundary = trust_boundary 

138 

139 if not self.valid and not self.can_refresh: 

140 raise exceptions.InvalidOperation( 

141 "Token should be created with fields to make it valid (`token` and " 

142 "`expiry`), or fields to allow it to refresh (`refresh_token`, " 

143 "`token_url`, `client_id`, `client_secret`)." 

144 ) 

145 

146 self._client_auth = None 

147 if self._client_id: 

148 self._client_auth = utils.ClientAuthentication( 

149 utils.ClientAuthType.basic, self._client_id, self._client_secret 

150 ) 

151 self._sts_client = sts.Client(self._token_url, self._client_auth) 

152 

153 @property 

154 def info(self): 

155 """Generates the serializable dictionary representation of the current 

156 credentials. 

157 

158 Returns: 

159 Mapping: The dictionary representation of the credentials. This is the 

160 reverse of the "from_info" method defined in this class. It is 

161 useful for serializing the current credentials so it can deserialized 

162 later. 

163 """ 

164 config_info = self.constructor_args() 

165 config_info.update(type=_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE) 

166 if config_info["expiry"]: 

167 config_info["expiry"] = config_info["expiry"].isoformat() + "Z" 

168 

169 return {key: value for key, value in config_info.items() if value is not None} 

170 

171 def constructor_args(self): 

172 return { 

173 "audience": self._audience, 

174 "refresh_token": self._refresh_token, 

175 "token_url": self._token_url, 

176 "token_info_url": self._token_info_url, 

177 "client_id": self._client_id, 

178 "client_secret": self._client_secret, 

179 "token": self.token, 

180 "expiry": self.expiry, 

181 "revoke_url": self._revoke_url, 

182 "scopes": self._scopes, 

183 "quota_project_id": self._quota_project_id, 

184 "universe_domain": self._universe_domain, 

185 "trust_boundary": self._trust_boundary, 

186 } 

187 

188 @property 

189 def scopes(self): 

190 """Optional[str]: The OAuth 2.0 permission scopes.""" 

191 return self._scopes 

192 

193 @property 

194 def requires_scopes(self): 

195 """False: OAuth 2.0 credentials have their scopes set when 

196 the initial token is requested and can not be changed.""" 

197 return False 

198 

199 @property 

200 def client_id(self): 

201 """Optional[str]: The OAuth 2.0 client ID.""" 

202 return self._client_id 

203 

204 @property 

205 def client_secret(self): 

206 """Optional[str]: The OAuth 2.0 client secret.""" 

207 return self._client_secret 

208 

209 @property 

210 def audience(self): 

211 """Optional[str]: The STS audience which contains the resource name for the 

212 workforce pool and the provider identifier in that pool.""" 

213 return self._audience 

214 

215 @property 

216 def refresh_token(self): 

217 """Optional[str]: The OAuth 2.0 refresh token.""" 

218 return self._refresh_token 

219 

220 @property 

221 def token_url(self): 

222 """Optional[str]: The STS token exchange endpoint for refresh.""" 

223 return self._token_url 

224 

225 @property 

226 def token_info_url(self): 

227 """Optional[str]: The STS endpoint for token info.""" 

228 return self._token_info_url 

229 

230 @property 

231 def revoke_url(self): 

232 """Optional[str]: The STS endpoint for token revocation.""" 

233 return self._revoke_url 

234 

235 @property 

236 def is_user(self): 

237 """True: This credential always represents a user.""" 

238 return True 

239 

240 @property 

241 def can_refresh(self): 

242 return all( 

243 ( 

244 self._refresh_token, 

245 self._token_url, 

246 self._client_id, 

247 self._client_secret, 

248 ) 

249 ) 

250 

251 def get_project_id(self, request=None): 

252 """Retrieves the project ID corresponding to the workload identity or workforce pool. 

253 For workforce pool credentials, it returns the project ID corresponding to 

254 the workforce_pool_user_project. 

255 

256 When not determinable, None is returned. 

257 

258 Args: 

259 request (google.auth.transport.requests.Request): Request object. 

260 Unused here, but passed from _default.default(). 

261 

262 Return: 

263 str: project ID is not determinable for this credential type so it returns None 

264 """ 

265 

266 return None 

267 

268 def to_json(self, strip=None): 

269 """Utility function that creates a JSON representation of this 

270 credential. 

271 Args: 

272 strip (Sequence[str]): Optional list of members to exclude from the 

273 generated JSON. 

274 Returns: 

275 str: A JSON representation of this instance. When converted into 

276 a dictionary, it can be passed to from_info() 

277 to create a new instance. 

278 """ 

279 strip = strip if strip else [] 

280 return json.dumps({k: v for (k, v) in self.info.items() if k not in strip}) 

281 

282 def _perform_refresh_token(self, request): 

283 """Refreshes the access token. 

284 

285 Args: 

286 request (google.auth.transport.Request): The object used to make 

287 HTTP requests. 

288 

289 Raises: 

290 google.auth.exceptions.RefreshError: If the credentials could 

291 not be refreshed. 

292 """ 

293 if not self.can_refresh: 

294 raise exceptions.RefreshError( 

295 "The credentials do not contain the necessary fields need to " 

296 "refresh the access token. You must specify refresh_token, " 

297 "token_url, client_id, and client_secret." 

298 ) 

299 

300 now = _helpers.utcnow() 

301 response_data = self._sts_client.refresh_token(request, self._refresh_token) 

302 

303 self.token = response_data.get("access_token") 

304 

305 lifetime = datetime.timedelta(seconds=response_data.get("expires_in")) 

306 self.expiry = now + lifetime 

307 

308 if "refresh_token" in response_data: 

309 self._refresh_token = response_data["refresh_token"] 

310 

311 def _build_trust_boundary_lookup_url(self): 

312 """Builds and returns the URL for the trust boundary lookup API.""" 

313 # Audience format: //iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID 

314 match = re.search(r"locations/[^/]+/workforcePools/([^/]+)", self._audience) 

315 

316 if not match: 

317 raise exceptions.InvalidValue("Invalid workforce pool audience format.") 

318 

319 pool_id = match.groups()[0] 

320 

321 return _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( 

322 universe_domain=self._universe_domain, pool_id=pool_id 

323 ) 

324 

325 def revoke(self, request): 

326 """Revokes the refresh token. 

327 

328 Args: 

329 request (google.auth.transport.Request): The object used to make 

330 HTTP requests. 

331 

332 Raises: 

333 google.auth.exceptions.OAuthError: If the token could not be 

334 revoked. 

335 """ 

336 if not self._revoke_url or not self._refresh_token: 

337 raise exceptions.OAuthError( 

338 "The credentials do not contain the necessary fields to " 

339 "revoke the refresh token. You must specify revoke_url and " 

340 "refresh_token." 

341 ) 

342 

343 self._sts_client.revoke_token( 

344 request, self._refresh_token, "refresh_token", self._revoke_url 

345 ) 

346 self.token = None 

347 self._refresh_token = None 

348 

349 @_helpers.copy_docstring(credentials.Credentials) 

350 def get_cred_info(self): 

351 if self._cred_file_path: 

352 return { 

353 "credential_source": self._cred_file_path, 

354 "credential_type": "external account authorized user credentials", 

355 } 

356 return None 

357 

358 def _make_copy(self): 

359 kwargs = self.constructor_args() 

360 cred = self.__class__(**kwargs) 

361 cred._cred_file_path = self._cred_file_path 

362 return cred 

363 

364 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

365 def with_quota_project(self, quota_project_id): 

366 cred = self._make_copy() 

367 cred._quota_project_id = quota_project_id 

368 return cred 

369 

370 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 

371 def with_token_uri(self, token_uri): 

372 cred = self._make_copy() 

373 cred._token_url = token_uri 

374 return cred 

375 

376 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) 

377 def with_universe_domain(self, universe_domain): 

378 cred = self._make_copy() 

379 cred._universe_domain = universe_domain 

380 return cred 

381 

382 @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 

383 def with_trust_boundary(self, trust_boundary): 

384 cred = self._make_copy() 

385 cred._trust_boundary = trust_boundary 

386 return cred 

387 

388 @classmethod 

389 def from_info(cls, info, **kwargs): 

390 """Creates a Credentials instance from parsed external account info. 

391 

392 **IMPORTANT**: 

393 This method does not validate the credential configuration. A security 

394 risk occurs when a credential configuration configured with malicious urls 

395 is used. 

396 When the credential configuration is accepted from an 

397 untrusted source, you should validate it before using with this method. 

398 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. 

399 

400 Args: 

401 info (Mapping[str, str]): The external account info in Google 

402 format. 

403 kwargs: Additional arguments to pass to the constructor. 

404 

405 Returns: 

406 google.auth.external_account_authorized_user.Credentials: The 

407 constructed credentials. 

408 

409 Raises: 

410 ValueError: For invalid parameters. 

411 """ 

412 expiry = info.get("expiry") 

413 if expiry: 

414 expiry = datetime.datetime.strptime( 

415 expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S" 

416 ) 

417 return cls( 

418 audience=info.get("audience"), 

419 refresh_token=info.get("refresh_token"), 

420 token_url=info.get("token_url"), 

421 token_info_url=info.get("token_info_url"), 

422 client_id=info.get("client_id"), 

423 client_secret=info.get("client_secret"), 

424 token=info.get("token"), 

425 expiry=expiry, 

426 revoke_url=info.get("revoke_url"), 

427 quota_project_id=info.get("quota_project_id"), 

428 scopes=info.get("scopes"), 

429 universe_domain=info.get( 

430 "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN 

431 ), 

432 trust_boundary=info.get("trust_boundary"), 

433 **kwargs 

434 ) 

435 

436 @classmethod 

437 def from_file(cls, filename, **kwargs): 

438 """Creates a Credentials instance from an external account json file. 

439 

440 **IMPORTANT**: 

441 This method does not validate the credential configuration. A security 

442 risk occurs when a credential configuration configured with malicious urls 

443 is used. 

444 When the credential configuration is accepted from an 

445 untrusted source, you should validate it before using with this method. 

446 Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. 

447 

448 Args: 

449 filename (str): The path to the external account json file. 

450 kwargs: Additional arguments to pass to the constructor. 

451 

452 Returns: 

453 google.auth.external_account_authorized_user.Credentials: The 

454 constructed credentials. 

455 """ 

456 with io.open(filename, "r", encoding="utf-8") as json_file: 

457 data = json.load(json_file) 

458 return cls.from_info(data, **kwargs)