Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/impersonated_credentials.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

134 statements  

1# Copyright 2018 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Cloud Impersonated credentials. 

16 

17This module provides authentication for applications where local credentials 

18impersonate a remote service account using `IAM Credentials API`_. 

19 

20This class can be used to impersonate a service account as long as the original 

21Credential object has the "Service Account Token Creator" role on the target 

22service account. 

23 

24 .. _IAM Credentials API: 

25 https://cloud.google.com/iam/credentials/reference/rest/ 

26""" 

27 

28import base64 

29import copy 

30from datetime import datetime 

31import http.client as http_client 

32import json 

33 

34from google.auth import _helpers 

35from google.auth import credentials 

36from google.auth import exceptions 

37from google.auth import iam 

38from google.auth import jwt 

39from google.auth import metrics 

40 

41 

# Message prefix shared by every RefreshError raised while acquiring
# impersonated credentials.
_REFRESH_ERROR = "Unable to acquire impersonated credentials"

# Lifetime requested for impersonated tokens when the caller gives none:
# one hour, expressed in seconds.
_DEFAULT_TOKEN_LIFETIME_SECS = 60 * 60

45 

46 

def _make_iam_token_request(
    request, principal, headers, body, iam_endpoint_override=None
):
    """Makes a request to the Google Cloud IAM service for an access token.

    Args:
        request (Request): The Request object to use.
        principal (str): The principal to request an access token for.
        headers (Mapping[str, str]): Map of headers to transmit.
        body (Mapping[str, str]): JSON Payload body for the iamcredentials
            API call.
        iam_endpoint_override (Optional[str]): The full IAM endpoint override
            with the target_principal embedded. This is useful when supporting
            impersonation with regional endpoints.

    Returns:
        Tuple[str, datetime.datetime]: The ``accessToken`` value from the
            response and its ``expireTime`` parsed into a naive datetime
            using the ``%Y-%m-%dT%H:%M:%SZ`` format.

    Raises:
        google.auth.exceptions.TransportError: Raised if there is an underlying
            HTTP connection error
        google.auth.exceptions.RefreshError: Raised if the impersonated
            credentials are not available. Common reasons are
            `iamcredentials.googleapis.com` is not enabled or the
            `Service Account Token Creator` is not assigned. Also raised when
            a successful response cannot be parsed into a token and expiry.
    """
    # The override, when given, already embeds the principal; otherwise the
    # default endpoint template is filled in with it.
    iam_endpoint = iam_endpoint_override or iam._IAM_ENDPOINT.format(principal)

    body = json.dumps(body).encode("utf-8")

    response = request(url=iam_endpoint, method="POST", headers=headers, body=body)

    # support both string and bytes type response.data
    response_body = (
        response.data.decode("utf-8")
        if hasattr(response.data, "decode")
        else response.data
    )

    if response.status != http_client.OK:
        raise exceptions.RefreshError(_REFRESH_ERROR, response_body)

    try:
        token_response = json.loads(response_body)
        token = token_response["accessToken"]
        expiry = datetime.strptime(token_response["expireTime"], "%Y-%m-%dT%H:%M:%SZ")
    except (KeyError, TypeError, ValueError) as caught_exc:
        # TypeError covers a payload that is valid JSON but not an object
        # (e.g. ``null`` or a list) and a non-string ``expireTime``; those are
        # malformed responses too and must surface as RefreshError.
        new_exc = exceptions.RefreshError(
            "{}: No access token or invalid expiration in response.".format(
                _REFRESH_ERROR
            ),
            response_body,
        )
        raise new_exc from caught_exc

    return token, expiry

100 

101 

class Credentials(
    credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing
):
    """This module defines impersonated credentials which are essentially
    impersonated identities.

    Impersonated Credentials allows credentials issued to a user or
    service account to impersonate another. The target service account must
    grant the originating credential principal the
    `Service Account Token Creator`_ IAM role.

    For more information about Token Creator IAM role and
    IAMCredentials API, see
    `Creating Short-Lived Service Account Credentials`_.

    .. _Service Account Token Creator:
        https://cloud.google.com/iam/docs/service-accounts#the_service_account_token_creator_role

    .. _Creating Short-Lived Service Account Credentials:
        https://cloud.google.com/iam/docs/creating-short-lived-service-account-credentials

    Usage:

    First grant source_credentials the `Service Account Token Creator`
    role on the target account to impersonate. In this example, the
    service account represented by svc_account.json has the
    token creator role on
    `impersonated-account@_project_.iam.gserviceaccount.com`.

    Enable the IAMCredentials API on the source project:
    `gcloud services enable iamcredentials.googleapis.com`.

    Initialize a source credential which does not have access to
    list bucket::

        from google.oauth2 import service_account

        target_scopes = [
            'https://www.googleapis.com/auth/devstorage.read_only']

        source_credentials = (
            service_account.Credentials.from_service_account_file(
                '/path/to/svc_account.json',
                scopes=target_scopes))

    Now use the source credentials to acquire credentials to impersonate
    another service account::

        from google.auth import impersonated_credentials

        target_credentials = impersonated_credentials.Credentials(
          source_credentials=source_credentials,
          target_principal='impersonated-account@_project_.iam.gserviceaccount.com',
          target_scopes = target_scopes,
          lifetime=500)

    Resource access is granted::

        client = storage.Client(credentials=target_credentials)
        buckets = client.list_buckets(project='your_project')
        for bucket in buckets:
          print(bucket.name)
    """

    def __init__(
        self,
        source_credentials,
        target_principal,
        target_scopes,
        delegates=None,
        lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
        quota_project_id=None,
        iam_endpoint_override=None,
    ):
        """
        Args:
            source_credentials (google.auth.Credentials): The source credential
                used to acquire the impersonated credentials.
            target_principal (str): The service account to impersonate.
            target_scopes (Sequence[str]): Scopes to request during the
                authorization grant.
            delegates (Sequence[str]): The chained list of delegates required
                to grant the final access_token.  If set, the sequence of
                identities must have "Service Account Token Creator" capability
                granted to the preceding identity.  For example, if set to
                [serviceAccountB, serviceAccountC], the source_credential
                must have the Token Creator role on serviceAccountB.
                serviceAccountB must have the Token Creator on
                serviceAccountC.
                Finally, C must have Token Creator on target_principal.
                If left unset, source_credential must have that role on
                target_principal.
            lifetime (int): Number of seconds the delegated credential should
                be valid for (up to 3600). A falsy value selects the default
                of ``_DEFAULT_TOKEN_LIFETIME_SECS``.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
                This project may be different from the project used to
                create the credentials.
            iam_endpoint_override (Optional[str]): The full IAM endpoint override
                with the target_principal embedded. This is useful when supporting
                impersonation with regional endpoints.
        """

        super().__init__()

        # Work on a shallow copy so the caller's credential object is not
        # rebound by the scope changes below.
        self._source_credentials = copy.copy(source_credentials)
        # Service account source credentials must have the _IAM_SCOPE
        # added to refresh correctly. User credentials cannot have
        # their original scopes modified.
        if isinstance(self._source_credentials, credentials.Scoped):
            self._source_credentials = self._source_credentials.with_scopes(
                iam._IAM_SCOPE
            )
            # If the source credential is service account and self signed jwt
            # is needed, we need to create a jwt credential inside it
            if (
                hasattr(self._source_credentials, "_create_self_signed_jwt")
                and self._source_credentials._always_use_jwt_access
            ):
                self._source_credentials._create_self_signed_jwt(None)
        self._target_principal = target_principal
        self._target_scopes = target_scopes
        self._delegates = delegates
        self._lifetime = lifetime or _DEFAULT_TOKEN_LIFETIME_SECS
        # No token has been fetched yet; the expiry starts at "now".
        self.token = None
        self.expiry = _helpers.utcnow()
        self._quota_project_id = quota_project_id
        self._iam_endpoint_override = iam_endpoint_override
        self._cred_file_path = None

    def _metric_header_for_usage(self):
        """Returns the metrics credential-type value for impersonation."""
        return metrics.CRED_TYPE_SA_IMPERSONATE

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        # Docstring is supplied by the decorator; refreshing means fetching a
        # new impersonated access token.
        self._update_token(request)

    def _update_token(self, request):
        """Updates credentials with a new access_token representing
        the impersonated account.

        Args:
            request (google.auth.transport.requests.Request): Request object
                to use for refreshing credentials.
        """

        # Refresh our source credentials if it is not valid.
        if (
            self._source_credentials.token_state == credentials.TokenState.STALE
            or self._source_credentials.token_state == credentials.TokenState.INVALID
        ):
            self._source_credentials.refresh(request)

        body = {
            "delegates": self._delegates,
            "scope": self._target_scopes,
            # The lifetime is sent as a string of seconds with an "s" suffix.
            "lifetime": str(self._lifetime) + "s",
        }

        headers = {
            "Content-Type": "application/json",
            metrics.API_CLIENT_HEADER: metrics.token_request_access_token_impersonate(),
        }

        # Apply the source credentials authentication info.
        self._source_credentials.apply(headers)

        self.token, self.expiry = _make_iam_token_request(
            request=request,
            principal=self._target_principal,
            headers=headers,
            body=body,
            iam_endpoint_override=self._iam_endpoint_override,
        )

    def sign_bytes(self, message):
        """Signs ``message`` as the target principal via the IAM sign endpoint.

        The message is base64-encoded and posted, together with the delegate
        chain, using a session authorized by the source credentials.

        Args:
            message (bytes): The payload to sign.

        Returns:
            bytes: The base64-decoded ``signedBlob`` field of the response.

        Raises:
            google.auth.exceptions.TransportError: If the endpoint responds
                with a status other than 200 OK.
        """
        # Imported lazily, inside the method, rather than at module import.
        from google.auth.transport.requests import AuthorizedSession

        iam_sign_endpoint = iam._IAM_SIGN_ENDPOINT.format(self._target_principal)

        body = {
            "payload": base64.b64encode(message).decode("utf-8"),
            "delegates": self._delegates,
        }

        headers = {"Content-Type": "application/json"}

        authed_session = AuthorizedSession(self._source_credentials)

        try:
            response = authed_session.post(
                url=iam_sign_endpoint, headers=headers, json=body
            )
        finally:
            # Always release the session, even when the POST raises.
            authed_session.close()

        if response.status_code != http_client.OK:
            try:
                error_detail = response.json()
            except ValueError:
                # The error body is not JSON (for example an intermediary's
                # error page). Report the raw text so the HTTP failure is
                # not masked by a decode error.
                error_detail = response.text
            raise exceptions.TransportError(
                "Error calling sign_bytes: {}".format(error_detail)
            )

        return base64.b64decode(response.json()["signedBlob"])

    @property
    def signer_email(self):
        """str: The target principal, i.e. the identity that signs."""
        return self._target_principal

    @property
    def service_account_email(self):
        """str: The target principal being impersonated."""
        return self._target_principal

    @property
    def signer(self):
        """Credentials: This object itself, which provides ``sign_bytes``."""
        return self

    @property
    def requires_scopes(self):
        """bool: True when no target scopes have been set."""
        return not self._target_scopes

    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        # Credential info is only reported when a credential file path has
        # been recorded on this instance; otherwise None is returned.
        if self._cred_file_path:
            return {
                "credential_source": self._cred_file_path,
                "credential_type": "impersonated credentials",
                "principal": self._target_principal,
            }
        return None

    def _make_copy(self):
        """Builds a new instance carrying this credential's configuration.

        The copy is created through the constructor, so its ``token`` and
        ``expiry`` start fresh; only the configuration and the recorded
        credential file path are carried over.

        Returns:
            Credentials: A new instance of the same class.
        """
        cred = self.__class__(
            self._source_credentials,
            target_principal=self._target_principal,
            target_scopes=self._target_scopes,
            delegates=self._delegates,
            lifetime=self._lifetime,
            quota_project_id=self._quota_project_id,
            iam_endpoint_override=self._iam_endpoint_override,
        )
        cred._cred_file_path = self._cred_file_path
        return cred

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        # Returns a copy; this instance is left unchanged.
        cred = self._make_copy()
        cred._quota_project_id = quota_project_id
        return cred

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        # Returns a copy; default_scopes are used only when scopes is falsy.
        cred = self._make_copy()
        cred._target_scopes = scopes or default_scopes
        return cred

354 

355 

class IDTokenCredentials(credentials.CredentialsWithQuotaProject):
    """Open ID Connect ID Token-based service account credentials.

    Wraps an impersonated :class:`Credentials` object and, on refresh,
    requests an ID token for that object's target principal from the IAM
    ID-token endpoint.
    """

    def __init__(
        self,
        target_credentials,
        target_audience=None,
        include_email=False,
        quota_project_id=None,
    ):
        """
        Args:
            target_credentials (google.auth.Credentials): The target
                credential used to acquire the id tokens for. Must be an
                instance of the impersonated ``Credentials`` class.
            target_audience (string): Audience to issue the token for.
            include_email (bool): Include email in IdToken
            quota_project_id (Optional[str]):  The project ID used for
                quota and billing.

        Raises:
            google.auth.exceptions.GoogleAuthError: If ``target_credentials``
                is not an impersonated ``Credentials`` instance.
        """
        super().__init__()

        if not isinstance(target_credentials, Credentials):
            raise exceptions.GoogleAuthError(
                "Provided Credential must be impersonated_credentials"
            )
        self._target_credentials = target_credentials
        self._target_audience = target_audience
        self._include_email = include_email
        self._quota_project_id = quota_project_id

    def from_credentials(self, target_credentials, target_audience=None):
        """Returns a new instance built around different target credentials.

        Args:
            target_credentials (google.auth.Credentials): The impersonated
                credentials to issue ID tokens for.
            target_audience (string): Audience to issue the token for.

        Returns:
            IDTokenCredentials: A new instance that keeps this object's
                ``include_email`` and quota project settings.
        """
        return self.__class__(
            target_credentials=target_credentials,
            target_audience=target_audience,
            include_email=self._include_email,
            quota_project_id=self._quota_project_id,
        )

    def with_target_audience(self, target_audience):
        """Returns a copy of these credentials with a different audience.

        Args:
            target_audience (string): Audience to issue the token for.

        Returns:
            IDTokenCredentials: A new instance; this one is left unchanged.
        """
        return self.__class__(
            target_credentials=self._target_credentials,
            target_audience=target_audience,
            include_email=self._include_email,
            quota_project_id=self._quota_project_id,
        )

    def with_include_email(self, include_email):
        """Returns a copy of these credentials with a new include_email flag.

        Args:
            include_email (bool): Include email in IdToken

        Returns:
            IDTokenCredentials: A new instance; this one is left unchanged.
        """
        return self.__class__(
            target_credentials=self._target_credentials,
            target_audience=self._target_audience,
            include_email=include_email,
            quota_project_id=self._quota_project_id,
        )

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        # Returns a new instance; this one is left unchanged.
        return self.__class__(
            target_credentials=self._target_credentials,
            target_audience=self._target_audience,
            include_email=self._include_email,
            quota_project_id=quota_project_id,
        )

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        # Imported lazily, inside the method, rather than at module import.
        from google.auth.transport.requests import AuthorizedSession

        id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.format(
            self._target_credentials.signer_email
        )

        body = {
            "audience": self._target_audience,
            "delegates": self._target_credentials._delegates,
            "includeEmail": self._include_email,
        }

        headers = {
            "Content-Type": "application/json",
            metrics.API_CLIENT_HEADER: metrics.token_request_id_token_impersonate(),
        }

        # The call is authorized with the *source* credentials behind the
        # impersonated target credentials.
        authed_session = AuthorizedSession(
            self._target_credentials._source_credentials, auth_request=request
        )

        try:
            response = authed_session.post(
                url=id_token_endpoint,
                headers=headers,
                data=json.dumps(body).encode("utf-8"),
            )
        finally:
            # Always release the session, even when the POST raises.
            authed_session.close()

        if response.status_code != http_client.OK:
            try:
                error_detail = response.json()
            except ValueError:
                # The error body is not JSON (for example an intermediary's
                # error page). Report the raw text so the HTTP failure is
                # not masked by a decode error.
                error_detail = response.text
            raise exceptions.RefreshError(
                "Error getting ID token: {}".format(error_detail)
            )

        id_token = response.json()["token"]
        self.token = id_token
        # The expiry comes from the token's own "exp" claim; the token is
        # decoded without verification just to read that claim.
        self.expiry = datetime.utcfromtimestamp(
            jwt.decode(id_token, verify=False)["exp"]
        )