Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/external_account_authorized_user.py: 45%

110 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:22 +0000

1# Copyright 2022 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""External Account Authorized User Credentials. 

16This module provides credentials based on OAuth 2.0 access and refresh tokens. 

17These credentials usually access resources on behalf of a user (resource 

18owner). 

19 

20Specifically, these are sourced using external identities via Workforce Identity Federation. 

21 

22Obtaining the initial access and refresh token can be done through the Google Cloud CLI. 

23 

24Example credential: 

25{ 

26 "type": "external_account_authorized_user", 

27 "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID", 

28 "refresh_token": "refreshToken", 

29 "token_url": "https://sts.googleapis.com/v1/oauth/token", 

30 "token_info_url": "https://sts.googleapis.com/v1/introspect", 

31 "client_id": "clientId", 

32 "client_secret": "clientSecret" 

33} 

34""" 

35 

36import datetime 

37import io 

38import json 

39 

40from google.auth import _helpers 

41from google.auth import credentials 

42from google.auth import exceptions 

43from google.oauth2 import sts 

44from google.oauth2 import utils 

45 

46_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user" 

47 

48 

class Credentials(
    credentials.CredentialsWithQuotaProject,
    credentials.ReadOnlyScoped,
    credentials.CredentialsWithTokenUri,
):
    """Credentials for External Account Authorized Users.

    This is used to instantiate Credentials for exchanging refresh tokens from
    authorized users for Google access token and authorizing requests to Google
    APIs.

    The credentials are considered immutable. If you want to modify the
    quota project, use `with_quota_project` and if you want to modify the token
    uri, use `with_token_uri`.
    """

    def __init__(
        self,
        token=None,
        expiry=None,
        refresh_token=None,
        audience=None,
        client_id=None,
        client_secret=None,
        token_url=None,
        token_info_url=None,
        revoke_url=None,
        scopes=None,
        quota_project_id=None,
    ):
        """Instantiates a external account authorized user credentials object.

        Args:
            token (str): The OAuth 2.0 access token. Can be None if refresh information
                is provided.
            expiry (datetime.datetime): The optional expiration datetime of the OAuth 2.0 access
                token.
            refresh_token (str): The optional OAuth 2.0 refresh token. If specified,
                credentials can be refreshed.
            audience (str): The optional STS audience which contains the resource name for the workforce
                pool and the provider identifier in that pool.
            client_id (str): The OAuth 2.0 client ID. Must be specified for refresh, can be left as
                None if the token can not be refreshed.
            client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be
                left as None if the token can not be refreshed.
            token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for
                refresh, can be left as None if the token can not be refreshed.
            token_info_url (str): The optional STS endpoint URL for token introspection.
            revoke_url (str): The optional STS endpoint URL for revoking tokens.
            scopes: The optional OAuth 2.0 permission scopes. Stored as given
                and exposed through the ``scopes`` property.
            quota_project_id (str): The optional project ID used for quota and billing.
                This project may be different from the project used to
                create the credentials.

        Returns:
            google.auth.external_account_authorized_user.Credentials: The
                constructed credentials.

        Raises:
            google.auth.exceptions.InvalidOperation: If the arguments describe
                a credential that is neither currently valid nor refreshable.
        """
        super(Credentials, self).__init__()

        self.token = token
        self.expiry = expiry
        self._audience = audience
        self._refresh_token = refresh_token
        self._token_url = token_url
        self._token_info_url = token_info_url
        self._client_id = client_id
        self._client_secret = client_secret
        self._revoke_url = revoke_url
        self._quota_project_id = quota_project_id
        self._scopes = scopes

        # Validate only after every field read by `valid` / `can_refresh`
        # has been assigned above.
        if not self.valid and not self.can_refresh:
            raise exceptions.InvalidOperation(
                "Token should be created with fields to make it valid (`token` and "
                "`expiry`), or fields to allow it to refresh (`refresh_token`, "
                "`token_url`, `client_id`, `client_secret`)."
            )

        # Client authentication is only built when a client ID is present;
        # otherwise the STS client is created without it.
        self._client_auth = None
        if self._client_id:
            self._client_auth = utils.ClientAuthentication(
                utils.ClientAuthType.basic, self._client_id, self._client_secret
            )
        self._sts_client = sts.Client(self._token_url, self._client_auth)

    @property
    def info(self):
        """Generates the serializable dictionary representation of the current
        credentials.

        Returns:
            Mapping: The dictionary representation of the credentials. This is the
                reverse of the "from_info" method defined in this class. It is
                useful for serializing the current credentials so they can be
                deserialized later. Entries whose value is None are omitted,
                and a set expiry is rendered as an ISO 8601 string with a
                trailing "Z".
        """
        config_info = self.constructor_args()
        config_info.update(type=_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE)
        if config_info["expiry"]:
            config_info["expiry"] = config_info["expiry"].isoformat() + "Z"

        return {key: value for key, value in config_info.items() if value is not None}

    def constructor_args(self):
        """Returns the current state as keyword arguments for ``__init__``.

        Returns:
            dict: A new dictionary keyed by constructor parameter name. It is
                used by ``info`` and by the ``with_*`` methods to build
                serialized forms and modified copies.
        """
        return {
            "audience": self._audience,
            "refresh_token": self._refresh_token,
            "token_url": self._token_url,
            "token_info_url": self._token_info_url,
            "client_id": self._client_id,
            "client_secret": self._client_secret,
            "token": self.token,
            "expiry": self.expiry,
            "revoke_url": self._revoke_url,
            "scopes": self._scopes,
            "quota_project_id": self._quota_project_id,
        }

    @property
    def scopes(self):
        """Optional[str]: The OAuth 2.0 permission scopes."""
        return self._scopes

    @property
    def requires_scopes(self):
        """ False: OAuth 2.0 credentials have their scopes set when
        the initial token is requested and can not be changed."""
        return False

    @property
    def client_id(self):
        """Optional[str]: The OAuth 2.0 client ID."""
        return self._client_id

    @property
    def client_secret(self):
        """Optional[str]: The OAuth 2.0 client secret."""
        return self._client_secret

    @property
    def audience(self):
        """Optional[str]: The STS audience which contains the resource name for the
        workforce pool and the provider identifier in that pool."""
        return self._audience

    @property
    def refresh_token(self):
        """Optional[str]: The OAuth 2.0 refresh token."""
        return self._refresh_token

    @property
    def token_url(self):
        """Optional[str]: The STS token exchange endpoint for refresh."""
        return self._token_url

    @property
    def token_info_url(self):
        """Optional[str]: The STS endpoint for token info."""
        return self._token_info_url

    @property
    def revoke_url(self):
        """Optional[str]: The STS endpoint for token revocation."""
        return self._revoke_url

    @property
    def is_user(self):
        """ True: This credential always represents a user."""
        return True

    @property
    def can_refresh(self):
        """bool: Whether every field needed for a refresh is set, i.e. the
        refresh token, token URL, client ID and client secret are all truthy."""
        return all(
            (self._refresh_token, self._token_url, self._client_id, self._client_secret)
        )

    def get_project_id(self, request=None):
        """Retrieves the project ID for this credential.

        The project ID is not determinable for this credential type, so this
        method always returns None.

        Args:
            request (google.auth.transport.requests.Request): Request object.
                Unused here, but passed from _default.default().

        Returns:
            None: Always, because no project ID can be determined.
        """

        return None

    def to_json(self, strip=None):
        """Utility function that creates a JSON representation of this
        credential.

        Args:
            strip (Sequence[str]): Optional list of members to exclude from the
                generated JSON.

        Returns:
            str: A JSON representation of this instance. When converted into
                a dictionary, it can be passed to from_info()
                to create a new instance.
        """
        strip = strip if strip else []
        return json.dumps({k: v for (k, v) in self.info.items() if k not in strip})

    def refresh(self, request):
        """Refreshes the access token.

        The access token is replaced with the one in the STS response. When
        the response carries ``expires_in`` the expiry is set to the time
        taken just before the request plus that lifetime; when it does not,
        the expiry is set to None. A rotated refresh token in the response
        replaces the stored one.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
        """
        if not self.can_refresh:
            raise exceptions.RefreshError(
                "The credentials do not contain the necessary fields need to "
                "refresh the access token. You must specify refresh_token, "
                "token_url, client_id, and client_secret."
            )

        # Sample the clock before the round trip so the computed expiry is
        # never later than the real one.
        now = _helpers.utcnow()
        response_data = self._make_sts_request(request)

        self.token = response_data.get("access_token")

        # `expires_in` is optional in an OAuth 2.0 token response. Passing a
        # missing value (None) to timedelta would raise TypeError after the
        # token has already been replaced, so handle the absence explicitly.
        expires_in = response_data.get("expires_in")
        if expires_in is None:
            self.expiry = None
        else:
            # Tolerate servers that send the lifetime as a JSON string.
            if isinstance(expires_in, str):
                expires_in = int(expires_in)
            self.expiry = now + datetime.timedelta(seconds=expires_in)

        if "refresh_token" in response_data:
            self._refresh_token = response_data["refresh_token"]

    def _make_sts_request(self, request):
        """Exchanges the stored refresh token through the STS client.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Returns:
            The response data returned by the STS client's ``refresh_token``
            call; ``refresh`` reads it as a mapping.
        """
        return self._sts_client.refresh_token(request, self._refresh_token)

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        # Build a fresh instance from the current state with only the quota
        # project swapped; this instance is left untouched.
        kwargs = self.constructor_args()
        kwargs.update(quota_project_id=quota_project_id)
        return self.__class__(**kwargs)

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        # The constructor names this parameter `token_url`, hence the
        # different keyword.
        kwargs = self.constructor_args()
        kwargs.update(token_url=token_uri)
        return self.__class__(**kwargs)

    @classmethod
    def from_info(cls, info, **kwargs):
        """Creates a Credentials instance from parsed external account info.

        An "expiry" entry, when present, is parsed as
        ``%Y-%m-%dT%H:%M:%S`` after dropping a trailing "Z" and any
        fractional seconds.

        Args:
            info (Mapping[str, str]): The external account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.external_account_authorized_user.Credentials: The
                constructed credentials.

        Raises:
            ValueError: For invalid parameters, including an "expiry" string
                that does not match the expected format.
            google.auth.exceptions.InvalidOperation: If the info describes a
                credential that is neither valid nor refreshable.
        """
        expiry = info.get("expiry")
        if expiry:
            expiry = datetime.datetime.strptime(
                expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S"
            )
        return cls(
            audience=info.get("audience"),
            refresh_token=info.get("refresh_token"),
            token_url=info.get("token_url"),
            token_info_url=info.get("token_info_url"),
            client_id=info.get("client_id"),
            client_secret=info.get("client_secret"),
            token=info.get("token"),
            expiry=expiry,
            revoke_url=info.get("revoke_url"),
            quota_project_id=info.get("quota_project_id"),
            scopes=info.get("scopes"),
            **kwargs
        )

    @classmethod
    def from_file(cls, filename, **kwargs):
        """Creates a Credentials instance from an external account json file.

        Args:
            filename (str): The path to the external account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.external_account_authorized_user.Credentials: The
                constructed credentials.
        """
        with io.open(filename, "r", encoding="utf-8") as json_file:
            data = json.load(json_file)
            return cls.from_info(data, **kwargs)