Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/impersonated_credentials.py: 48%

124 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2018 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Cloud Impersonated credentials. 

16 

17This module provides authentication for applications where local credentials 

18impersonates a remote service account using `IAM Credentials API`_. 

19 

20This class can be used to impersonate a service account as long as the original 

21Credential object has the "Service Account Token Creator" role on the target 

22service account. 

23 

24 .. _IAM Credentials API: 

25 https://cloud.google.com/iam/credentials/reference/rest/ 

26""" 

27 

28import base64 

29import copy 

30from datetime import datetime 

31import http.client as http_client 

32import json 

33 

34from google.auth import _helpers 

35from google.auth import credentials 

36from google.auth import exceptions 

37from google.auth import jwt 

38from google.auth import metrics 

39 

40_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"] 

41 

42_IAM_ENDPOINT = ( 

43 "https://iamcredentials.googleapis.com/v1/projects/-" 

44 + "/serviceAccounts/{}:generateAccessToken" 

45) 

46 

47_IAM_SIGN_ENDPOINT = ( 

48 "https://iamcredentials.googleapis.com/v1/projects/-" 

49 + "/serviceAccounts/{}:signBlob" 

50) 

51 

52_IAM_IDTOKEN_ENDPOINT = ( 

53 "https://iamcredentials.googleapis.com/v1/" 

54 + "projects/-/serviceAccounts/{}:generateIdToken" 

55) 

56 

57_REFRESH_ERROR = "Unable to acquire impersonated credentials" 

58 

59_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

60 

61_DEFAULT_TOKEN_URI = "https://oauth2.googleapis.com/token" 

62 

63 

64def _make_iam_token_request( 

65 request, principal, headers, body, iam_endpoint_override=None 

66): 

67 """Makes a request to the Google Cloud IAM service for an access token. 

68 Args: 

69 request (Request): The Request object to use. 

70 principal (str): The principal to request an access token for. 

71 headers (Mapping[str, str]): Map of headers to transmit. 

72 body (Mapping[str, str]): JSON Payload body for the iamcredentials 

73 API call. 

74 iam_endpoint_override (Optiona[str]): The full IAM endpoint override 

75 with the target_principal embedded. This is useful when supporting 

76 impersonation with regional endpoints. 

77 

78 Raises: 

79 google.auth.exceptions.TransportError: Raised if there is an underlying 

80 HTTP connection error 

81 google.auth.exceptions.RefreshError: Raised if the impersonated 

82 credentials are not available. Common reasons are 

83 `iamcredentials.googleapis.com` is not enabled or the 

84 `Service Account Token Creator` is not assigned 

85 """ 

86 iam_endpoint = iam_endpoint_override or _IAM_ENDPOINT.format(principal) 

87 

88 body = json.dumps(body).encode("utf-8") 

89 

90 response = request(url=iam_endpoint, method="POST", headers=headers, body=body) 

91 

92 # support both string and bytes type response.data 

93 response_body = ( 

94 response.data.decode("utf-8") 

95 if hasattr(response.data, "decode") 

96 else response.data 

97 ) 

98 

99 if response.status != http_client.OK: 

100 raise exceptions.RefreshError(_REFRESH_ERROR, response_body) 

101 

102 try: 

103 token_response = json.loads(response_body) 

104 token = token_response["accessToken"] 

105 expiry = datetime.strptime(token_response["expireTime"], "%Y-%m-%dT%H:%M:%SZ") 

106 

107 return token, expiry 

108 

109 except (KeyError, ValueError) as caught_exc: 

110 new_exc = exceptions.RefreshError( 

111 "{}: No access token or invalid expiration in response.".format( 

112 _REFRESH_ERROR 

113 ), 

114 response_body, 

115 ) 

116 raise new_exc from caught_exc 

117 

118 

119class Credentials( 

120 credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing 

121): 

122 """This module defines impersonated credentials which are essentially 

123 impersonated identities. 

124 

125 Impersonated Credentials allows credentials issued to a user or 

126 service account to impersonate another. The target service account must 

127 grant the originating credential principal the 

128 `Service Account Token Creator`_ IAM role: 

129 

130 For more information about Token Creator IAM role and 

131 IAMCredentials API, see 

132 `Creating Short-Lived Service Account Credentials`_. 

133 

134 .. _Service Account Token Creator: 

135 https://cloud.google.com/iam/docs/service-accounts#the_service_account_token_creator_role 

136 

137 .. _Creating Short-Lived Service Account Credentials: 

138 https://cloud.google.com/iam/docs/creating-short-lived-service-account-credentials 

139 

140 Usage: 

141 

142 First grant source_credentials the `Service Account Token Creator` 

143 role on the target account to impersonate. In this example, the 

144 service account represented by svc_account.json has the 

145 token creator role on 

146 `impersonated-account@_project_.iam.gserviceaccount.com`. 

147 

148 Enable the IAMCredentials API on the source project: 

149 `gcloud services enable iamcredentials.googleapis.com`. 

150 

151 Initialize a source credential which does not have access to 

152 list bucket:: 

153 

154 from google.oauth2 import service_account 

155 

156 target_scopes = [ 

157 'https://www.googleapis.com/auth/devstorage.read_only'] 

158 

159 source_credentials = ( 

160 service_account.Credentials.from_service_account_file( 

161 '/path/to/svc_account.json', 

162 scopes=target_scopes)) 

163 

164 Now use the source credentials to acquire credentials to impersonate 

165 another service account:: 

166 

167 from google.auth import impersonated_credentials 

168 

169 target_credentials = impersonated_credentials.Credentials( 

170 source_credentials=source_credentials, 

171 target_principal='impersonated-account@_project_.iam.gserviceaccount.com', 

172 target_scopes = target_scopes, 

173 lifetime=500) 

174 

175 Resource access is granted:: 

176 

177 client = storage.Client(credentials=target_credentials) 

178 buckets = client.list_buckets(project='your_project') 

179 for bucket in buckets: 

180 print(bucket.name) 

181 """ 

182 

183 def __init__( 

184 self, 

185 source_credentials, 

186 target_principal, 

187 target_scopes, 

188 delegates=None, 

189 lifetime=_DEFAULT_TOKEN_LIFETIME_SECS, 

190 quota_project_id=None, 

191 iam_endpoint_override=None, 

192 ): 

193 """ 

194 Args: 

195 source_credentials (google.auth.Credentials): The source credential 

196 used as to acquire the impersonated credentials. 

197 target_principal (str): The service account to impersonate. 

198 target_scopes (Sequence[str]): Scopes to request during the 

199 authorization grant. 

200 delegates (Sequence[str]): The chained list of delegates required 

201 to grant the final access_token. If set, the sequence of 

202 identities must have "Service Account Token Creator" capability 

203 granted to the prceeding identity. For example, if set to 

204 [serviceAccountB, serviceAccountC], the source_credential 

205 must have the Token Creator role on serviceAccountB. 

206 serviceAccountB must have the Token Creator on 

207 serviceAccountC. 

208 Finally, C must have Token Creator on target_principal. 

209 If left unset, source_credential must have that role on 

210 target_principal. 

211 lifetime (int): Number of seconds the delegated credential should 

212 be valid for (upto 3600). 

213 quota_project_id (Optional[str]): The project ID used for quota and billing. 

214 This project may be different from the project used to 

215 create the credentials. 

216 iam_endpoint_override (Optiona[str]): The full IAM endpoint override 

217 with the target_principal embedded. This is useful when supporting 

218 impersonation with regional endpoints. 

219 """ 

220 

221 super(Credentials, self).__init__() 

222 

223 self._source_credentials = copy.copy(source_credentials) 

224 # Service account source credentials must have the _IAM_SCOPE 

225 # added to refresh correctly. User credentials cannot have 

226 # their original scopes modified. 

227 if isinstance(self._source_credentials, credentials.Scoped): 

228 self._source_credentials = self._source_credentials.with_scopes(_IAM_SCOPE) 

229 # If the source credential is service account and self signed jwt 

230 # is needed, we need to create a jwt credential inside it 

231 if ( 

232 hasattr(self._source_credentials, "_create_self_signed_jwt") 

233 and self._source_credentials._always_use_jwt_access 

234 ): 

235 self._source_credentials._create_self_signed_jwt(None) 

236 self._target_principal = target_principal 

237 self._target_scopes = target_scopes 

238 self._delegates = delegates 

239 self._lifetime = lifetime or _DEFAULT_TOKEN_LIFETIME_SECS 

240 self.token = None 

241 self.expiry = _helpers.utcnow() 

242 self._quota_project_id = quota_project_id 

243 self._iam_endpoint_override = iam_endpoint_override 

244 

245 def _metric_header_for_usage(self): 

246 return metrics.CRED_TYPE_SA_IMPERSONATE 

247 

248 @_helpers.copy_docstring(credentials.Credentials) 

249 def refresh(self, request): 

250 self._update_token(request) 

251 

252 def _update_token(self, request): 

253 """Updates credentials with a new access_token representing 

254 the impersonated account. 

255 

256 Args: 

257 request (google.auth.transport.requests.Request): Request object 

258 to use for refreshing credentials. 

259 """ 

260 

261 # Refresh our source credentials if it is not valid. 

262 if not self._source_credentials.valid: 

263 self._source_credentials.refresh(request) 

264 

265 body = { 

266 "delegates": self._delegates, 

267 "scope": self._target_scopes, 

268 "lifetime": str(self._lifetime) + "s", 

269 } 

270 

271 headers = { 

272 "Content-Type": "application/json", 

273 metrics.API_CLIENT_HEADER: metrics.token_request_access_token_impersonate(), 

274 } 

275 

276 # Apply the source credentials authentication info. 

277 self._source_credentials.apply(headers) 

278 

279 self.token, self.expiry = _make_iam_token_request( 

280 request=request, 

281 principal=self._target_principal, 

282 headers=headers, 

283 body=body, 

284 iam_endpoint_override=self._iam_endpoint_override, 

285 ) 

286 

287 def sign_bytes(self, message): 

288 from google.auth.transport.requests import AuthorizedSession 

289 

290 iam_sign_endpoint = _IAM_SIGN_ENDPOINT.format(self._target_principal) 

291 

292 body = { 

293 "payload": base64.b64encode(message).decode("utf-8"), 

294 "delegates": self._delegates, 

295 } 

296 

297 headers = {"Content-Type": "application/json"} 

298 

299 authed_session = AuthorizedSession(self._source_credentials) 

300 

301 try: 

302 response = authed_session.post( 

303 url=iam_sign_endpoint, headers=headers, json=body 

304 ) 

305 finally: 

306 authed_session.close() 

307 

308 if response.status_code != http_client.OK: 

309 raise exceptions.TransportError( 

310 "Error calling sign_bytes: {}".format(response.json()) 

311 ) 

312 

313 return base64.b64decode(response.json()["signedBlob"]) 

314 

315 @property 

316 def signer_email(self): 

317 return self._target_principal 

318 

319 @property 

320 def service_account_email(self): 

321 return self._target_principal 

322 

323 @property 

324 def signer(self): 

325 return self 

326 

327 @property 

328 def requires_scopes(self): 

329 return not self._target_scopes 

330 

331 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

332 def with_quota_project(self, quota_project_id): 

333 return self.__class__( 

334 self._source_credentials, 

335 target_principal=self._target_principal, 

336 target_scopes=self._target_scopes, 

337 delegates=self._delegates, 

338 lifetime=self._lifetime, 

339 quota_project_id=quota_project_id, 

340 iam_endpoint_override=self._iam_endpoint_override, 

341 ) 

342 

343 @_helpers.copy_docstring(credentials.Scoped) 

344 def with_scopes(self, scopes, default_scopes=None): 

345 return self.__class__( 

346 self._source_credentials, 

347 target_principal=self._target_principal, 

348 target_scopes=scopes or default_scopes, 

349 delegates=self._delegates, 

350 lifetime=self._lifetime, 

351 quota_project_id=self._quota_project_id, 

352 iam_endpoint_override=self._iam_endpoint_override, 

353 ) 

354 

355 

356class IDTokenCredentials(credentials.CredentialsWithQuotaProject): 

357 """Open ID Connect ID Token-based service account credentials. 

358 

359 """ 

360 

361 def __init__( 

362 self, 

363 target_credentials, 

364 target_audience=None, 

365 include_email=False, 

366 quota_project_id=None, 

367 ): 

368 """ 

369 Args: 

370 target_credentials (google.auth.Credentials): The target 

371 credential used as to acquire the id tokens for. 

372 target_audience (string): Audience to issue the token for. 

373 include_email (bool): Include email in IdToken 

374 quota_project_id (Optional[str]): The project ID used for 

375 quota and billing. 

376 """ 

377 super(IDTokenCredentials, self).__init__() 

378 

379 if not isinstance(target_credentials, Credentials): 

380 raise exceptions.GoogleAuthError( 

381 "Provided Credential must be " "impersonated_credentials" 

382 ) 

383 self._target_credentials = target_credentials 

384 self._target_audience = target_audience 

385 self._include_email = include_email 

386 self._quota_project_id = quota_project_id 

387 

388 def from_credentials(self, target_credentials, target_audience=None): 

389 return self.__class__( 

390 target_credentials=target_credentials, 

391 target_audience=target_audience, 

392 include_email=self._include_email, 

393 quota_project_id=self._quota_project_id, 

394 ) 

395 

396 def with_target_audience(self, target_audience): 

397 return self.__class__( 

398 target_credentials=self._target_credentials, 

399 target_audience=target_audience, 

400 include_email=self._include_email, 

401 quota_project_id=self._quota_project_id, 

402 ) 

403 

404 def with_include_email(self, include_email): 

405 return self.__class__( 

406 target_credentials=self._target_credentials, 

407 target_audience=self._target_audience, 

408 include_email=include_email, 

409 quota_project_id=self._quota_project_id, 

410 ) 

411 

412 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

413 def with_quota_project(self, quota_project_id): 

414 return self.__class__( 

415 target_credentials=self._target_credentials, 

416 target_audience=self._target_audience, 

417 include_email=self._include_email, 

418 quota_project_id=quota_project_id, 

419 ) 

420 

421 @_helpers.copy_docstring(credentials.Credentials) 

422 def refresh(self, request): 

423 from google.auth.transport.requests import AuthorizedSession 

424 

425 iam_sign_endpoint = _IAM_IDTOKEN_ENDPOINT.format( 

426 self._target_credentials.signer_email 

427 ) 

428 

429 body = { 

430 "audience": self._target_audience, 

431 "delegates": self._target_credentials._delegates, 

432 "includeEmail": self._include_email, 

433 } 

434 

435 headers = { 

436 "Content-Type": "application/json", 

437 metrics.API_CLIENT_HEADER: metrics.token_request_id_token_impersonate(), 

438 } 

439 

440 authed_session = AuthorizedSession( 

441 self._target_credentials._source_credentials, auth_request=request 

442 ) 

443 

444 try: 

445 response = authed_session.post( 

446 url=iam_sign_endpoint, 

447 headers=headers, 

448 data=json.dumps(body).encode("utf-8"), 

449 ) 

450 finally: 

451 authed_session.close() 

452 

453 if response.status_code != http_client.OK: 

454 raise exceptions.RefreshError( 

455 "Error getting ID token: {}".format(response.json()) 

456 ) 

457 

458 id_token = response.json()["token"] 

459 self.token = id_token 

460 self.expiry = datetime.utcfromtimestamp( 

461 jwt.decode(id_token, verify=False)["exp"] 

462 )