Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/compute_engine/credentials.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

148 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Compute Engine credentials. 

16 

17This module provides authentication for an application running on Google 

18Compute Engine using the Compute Engine metadata server. 

19 

20""" 

21 

22import datetime 

23 

24from google.auth import _helpers 

25from google.auth import credentials 

26from google.auth import exceptions 

27from google.auth import iam 

28from google.auth import jwt 

29from google.auth import metrics 

30from google.auth.compute_engine import _metadata 

31from google.oauth2 import _client 

32 

33 

class Credentials(
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithUniverseDomain,
):
    """Compute Engine Credentials.

    These credentials obtain OAuth 2.0 access tokens for the instance's
    service account from the Google Compute Engine metadata server. They are
    also used for Cloud Run, Flex and App Engine (except for the Python 2.7
    runtime, which is supported only on older versions of this library).

    For more information about Compute Engine authentication, including how
    to configure scopes, see the `Compute Engine authentication
    documentation`_.

    .. note:: On Compute Engine the metadata server ignores requested scopes.
        On Cloud Run, Flex and App Engine the server honours requested scopes.

    .. _Compute Engine authentication documentation:
        https://cloud.google.com/compute/docs/authentication#using
    """

    def __init__(
        self,
        service_account_email="default",
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
        universe_domain=None,
    ):
        """
        Args:
            service_account_email (str): The service account email to use, or
                'default'. A Compute Engine instance may have multiple service
                accounts.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            scopes (Optional[Sequence[str]]): The list of scopes for the
                credentials.
            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            universe_domain (Optional[str]): The universe domain. If not
                provided or None, credential will attempt to fetch the value
                from metadata server. If metadata server doesn't have universe
                domain endpoint, then the default googleapis.com will be used.
        """
        super().__init__()
        self._service_account_email = service_account_email
        self._quota_project_id = quota_project_id
        self._scopes = scopes
        self._default_scopes = default_scopes

        # An explicitly supplied universe domain counts as already cached, so
        # the ``universe_domain`` property will not query the metadata server.
        if universe_domain:
            self._universe_domain = universe_domain
            self._universe_domain_cached = True
        else:
            self._universe_domain_cached = False

    def _retrieve_info(self, request):
        """Fetch service account details from the metadata server.

        Replaces the stored service account email with the one reported by
        the metadata server and, when no scopes were set by the user, adopts
        the scopes it reports.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
        """
        account_info = _metadata.get_service_account_info(
            request, service_account=self._service_account_email
        )

        self._service_account_email = account_info["email"]

        # Scopes requested by the user take precedence over the server's.
        if self._scopes is None:
            self._scopes = account_info["scopes"]

    def _metric_header_for_usage(self):
        """Return the metrics credential-type constant for these credentials."""
        return metrics.CRED_TYPE_SA_MDS

    def refresh(self, request):
        """Refresh the access token and scopes.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the Compute Engine metadata
                service can't be reached or if the instance has no
                credentials.
        """
        # Pick the scopes to request *before* calling _retrieve_info, which
        # may fill in self._scopes from the metadata server response.
        if self._scopes is not None:
            requested_scopes = self._scopes
        else:
            requested_scopes = self._default_scopes

        try:
            self._retrieve_info(request)
            self.token, self.expiry = _metadata.get_service_account_token(
                request,
                service_account=self._service_account_email,
                scopes=requested_scopes,
            )
        except exceptions.TransportError as caught_exc:
            raise exceptions.RefreshError(caught_exc) from caught_exc

    @property
    def service_account_email(self):
        """The service account email.

        .. note:: This is not guaranteed to be set until :meth:`refresh` has
            been called.
        """
        return self._service_account_email

    @property
    def requires_scopes(self):
        """bool: True when no scopes (None or empty) are currently set."""
        return not self._scopes

    @property
    def universe_domain(self):
        """The universe domain, fetched from the metadata server on first use
        unless one was supplied to the constructor; the result is cached."""
        if not self._universe_domain_cached:
            # Imported lazily, only when a metadata lookup is actually needed.
            from google.auth.transport import requests as google_auth_requests

            self._universe_domain = _metadata.get_universe_domain(
                google_auth_requests.Request()
            )
            self._universe_domain_cached = True

        return self._universe_domain

    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        # Docstring is supplied by the copy_docstring decorator.
        cred_info = {
            "credential_source": "metadata server",
            "credential_type": "VM credentials",
            "principal": self.service_account_email,
        }
        return cred_info

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        # Docstring is supplied by the copy_docstring decorator.
        clone = self.__class__(
            service_account_email=self._service_account_email,
            quota_project_id=quota_project_id,
            scopes=self._scopes,
            default_scopes=self._default_scopes,
        )
        # Carry the universe domain state over directly so the copy keeps the
        # same value and cache flag as this instance.
        clone._universe_domain = self._universe_domain
        clone._universe_domain_cached = self._universe_domain_cached
        return clone

    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        # Compute Engine credentials can not be scoped (the metadata service
        # ignores the scopes parameter). App Engine, Cloud Run and Flex support
        # requesting scopes.
        clone = self.__class__(
            scopes=scopes,
            default_scopes=default_scopes,
            service_account_email=self._service_account_email,
            quota_project_id=self._quota_project_id,
        )
        # Carry the universe domain state over directly so the copy keeps the
        # same value and cache flag as this instance.
        clone._universe_domain = self._universe_domain
        clone._universe_domain_cached = self._universe_domain_cached
        return clone

    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        # Docstring is supplied by the copy_docstring decorator. The new
        # universe domain goes through the constructor, which also marks it
        # as cached when it is truthy.
        return self.__class__(
            scopes=self._scopes,
            default_scopes=self._default_scopes,
            service_account_email=self._service_account_email,
            quota_project_id=self._quota_project_id,
            universe_domain=universe_domain,
        )

204 

205 

_DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds
_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"


class IDTokenCredentials(
    credentials.CredentialsWithQuotaProject,
    credentials.Signing,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials rely on the default service account of a GCE instance.

    ID token can be requested from `GCE metadata server identity endpoint`_, IAM
    token endpoint or other token endpoints you specify. If metadata server
    identity endpoint is not used, the GCE instance must have been started with
    a service account that has access to the IAM Cloud API.

    .. _GCE metadata server identity endpoint:
        https://cloud.google.com/compute/docs/instances/verifying-instance-identity
    """

    def __init__(
        self,
        request,
        target_audience,
        token_uri=None,
        additional_claims=None,
        service_account_email=None,
        signer=None,
        use_metadata_identity_endpoint=False,
        quota_project_id=None,
    ):
        """
        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            token_uri (str): The OAuth 2.0 Token URI.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            service_account_email (str): Optional explicit service account to
                use to sign JWT tokens.
                By default, this is the default GCE service account.
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
                In case the signer is specified, the request argument will be
                ignored.
            use_metadata_identity_endpoint (bool): Whether to use GCE metadata
                identity endpoint. For backward compatibility the default value
                is False. If set to True, ``token_uri``, ``additional_claims``,
                ``service_account_email``, ``signer`` argument should not be set;
                otherwise an error will be raised.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.

        Raises:
            google.auth.exceptions.MalformedError:
                If ``use_metadata_identity_endpoint`` is set to True, and one of
                ``token_uri``, ``additional_claims``, ``service_account_email``,
                ``signer`` arguments is set.
        """
        super().__init__()

        self._quota_project_id = quota_project_id
        self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
        self._target_audience = target_audience

        if use_metadata_identity_endpoint:
            # The metadata identity endpoint mints the token itself, so none
            # of the JWT-grant related arguments may be combined with it.
            if token_uri or additional_claims or service_account_email or signer:
                raise exceptions.MalformedError(
                    "If use_metadata_identity_endpoint is set, token_uri, "
                    "additional_claims, service_account_email, signer arguments"
                    " must not be set"
                )
            self._token_uri = None
            self._additional_claims = None
            self._signer = None

        # Without an explicit email, ask the metadata server which service
        # account the instance is using.
        if service_account_email is None:
            sa_info = _metadata.get_service_account_info(request)
            self._service_account_email = sa_info["email"]
        else:
            self._service_account_email = service_account_email

        if not use_metadata_identity_endpoint:
            if signer is None:
                # Default to an IAM-backed signer authenticated with the
                # instance's own Compute Engine credentials.
                signer = iam.Signer(
                    request=request,
                    credentials=Credentials(),
                    service_account_email=self._service_account_email,
                )
            self._signer = signer
            self._token_uri = token_uri or _DEFAULT_TOKEN_URI

            if additional_claims is not None:
                self._additional_claims = additional_claims
            else:
                self._additional_claims = {}

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token.

        Returns:
            google.auth.compute_engine.IDTokenCredentials: A new credentials
                instance.
        """
        if self._use_metadata_identity_endpoint:
            # NOTE(review): request is passed as None here, yet with no
            # explicit service_account_email __init__ forwards that request to
            # _metadata.get_service_account_info. Confirm this path works
            # without a real request object.
            return self.__class__(
                None,
                target_audience=target_audience,
                use_metadata_identity_endpoint=True,
                quota_project_id=self._quota_project_id,
            )

        # The signer and service account email are already known, so no
        # request object is needed to build the copy.
        return self.__class__(
            None,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=target_audience,
            additional_claims=self._additional_claims.copy(),
            signer=self.signer,
            use_metadata_identity_endpoint=False,
            quota_project_id=self._quota_project_id,
        )

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        # Docstring is supplied by the copy_docstring decorator.
        if self._use_metadata_identity_endpoint:
            # NOTE(review): request is passed as None here, yet with no
            # explicit service_account_email __init__ forwards that request to
            # _metadata.get_service_account_info. Confirm this path works
            # without a real request object.
            return self.__class__(
                None,
                target_audience=self._target_audience,
                use_metadata_identity_endpoint=True,
                quota_project_id=quota_project_id,
            )

        # The signer and service account email are already known, so no
        # request object is needed to build the copy.
        return self.__class__(
            None,
            service_account_email=self._service_account_email,
            token_uri=self._token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            signer=self.signer,
            use_metadata_identity_endpoint=False,
            quota_project_id=quota_project_id,
        )

    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        # Docstring is supplied by the copy_docstring decorator.
        # A token URI is meaningless when the metadata identity endpoint mints
        # the token, mirroring the check performed in __init__.
        if self._use_metadata_identity_endpoint:
            raise exceptions.MalformedError(
                "If use_metadata_identity_endpoint is set, token_uri must not be set"
            )

        # The signer and service account email are already known, so no
        # request object is needed to build the copy.
        return self.__class__(
            None,
            service_account_email=self._service_account_email,
            token_uri=token_uri,
            target_audience=self._target_audience,
            additional_claims=self._additional_claims.copy(),
            signer=self.signer,
            use_metadata_identity_endpoint=False,
            quota_project_id=self._quota_project_id,
        )

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token. It is valid for ``_DEFAULT_TOKEN_LIFETIME_SECS`` seconds
        from the moment it is created.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": self._token_uri,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }

        # Caller-supplied claims are applied last and so override the
        # defaults above on key collisions.
        payload.update(self._additional_claims)

        return jwt.encode(self._signer, payload)

    def _call_metadata_identity_endpoint(self, request):
        """Request ID token from metadata identity endpoint.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Returns:
            Tuple[str, datetime.datetime]: The ID token and the expiry of the
                ID token, as a naive datetime expressed in UTC.

        Raises:
            google.auth.exceptions.RefreshError: If the Compute Engine metadata
                service can't be reached or if the instance has no credentials.
            ValueError: If extracting expiry from the obtained ID token fails.
        """
        try:
            path = "instance/service-accounts/default/identity"
            params = {"audience": self._target_audience, "format": "full"}
            metrics_header = {
                metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds()
            }
            id_token = _metadata.get(
                request, path, params=params, headers=metrics_header
            )
        except exceptions.TransportError as caught_exc:
            raise exceptions.RefreshError(caught_exc) from caught_exc

        # Only the "exp" claim is needed; the token is decoded without
        # signature verification.
        _, payload, _, _ = jwt._unverified_decode(id_token)

        # datetime.utcfromtimestamp() is deprecated since Python 3.12. Build
        # the same naive-UTC value from an aware datetime instead.
        expiry = datetime.datetime.fromtimestamp(
            payload["exp"], tz=datetime.timezone.utc
        ).replace(tzinfo=None)
        return id_token, expiry

    def refresh(self, request):
        """Refreshes the ID token.

        Uses the metadata identity endpoint when these credentials were
        configured for it; otherwise exchanges a signed JWT assertion at the
        token URI.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
            ValueError: If extracting expiry from the obtained ID token fails.
        """
        if self._use_metadata_identity_endpoint:
            self.token, self.expiry = self._call_metadata_identity_endpoint(request)
        else:
            assertion = self._make_authorization_grant_assertion()
            id_token, expiry, _ = _client.id_token_jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = id_token
            self.expiry = expiry

    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        # None when the metadata identity endpoint is used (see __init__).
        return self._signer

    def sign_bytes(self, message):
        """Signs the given message.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The message's cryptographic signature.

        Raises:
            google.auth.exceptions.InvalidOperation:
                Signer is not available if metadata identity endpoint is used.
        """
        if self._use_metadata_identity_endpoint:
            raise exceptions.InvalidOperation(
                "Signer is not available if metadata identity endpoint is used"
            )
        return self._signer.sign(message)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def signer_email(self):
        """The email identifying the signer; same as the service account email."""
        return self._service_account_email