Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/compute_engine/credentials.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

149 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Compute Engine credentials. 

16 

17This module provides authentication for an application running on Google 

18Compute Engine using the Compute Engine metadata server. 

19 

20""" 

21 

22import datetime 

23 

24from google.auth import _helpers 

25from google.auth import credentials 

26from google.auth import exceptions 

27from google.auth import iam 

28from google.auth import jwt 

29from google.auth import metrics 

30from google.auth.compute_engine import _metadata 

31from google.oauth2 import _client 

32 

33 

class Credentials(
    credentials.Scoped,
    credentials.CredentialsWithQuotaProject,
    credentials.CredentialsWithUniverseDomain,
):
    """Compute Engine Credentials.

    These credentials use the Google Compute Engine metadata server to obtain
    OAuth 2.0 access tokens associated with the instance's service account,
    and are also used for Cloud Run, Flex and App Engine (except for the Python
    2.7 runtime, which is supported only on older versions of this library).

    For more information about Compute Engine authentication, including how
    to configure scopes, see the `Compute Engine authentication
    documentation`_.

    .. note:: On Compute Engine the metadata server ignores requested scopes.
        On Cloud Run, Flex and App Engine the server honours requested scopes.

    .. _Compute Engine authentication documentation:
        https://cloud.google.com/compute/docs/authentication#using
    """

    def __init__(
        self,
        service_account_email="default",
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
        universe_domain=None,
    ):
        """
        Args:
            service_account_email (str): The service account email to use, or
                'default'. A Compute Engine instance may have multiple service
                accounts.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.
            scopes (Optional[Sequence[str]]): The list of scopes for the
                credentials.
            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            universe_domain (Optional[str]): The universe domain. If not
                provided or None, credential will attempt to fetch the value
                from metadata server. If metadata server doesn't have universe
                domain endpoint, then the default googleapis.com will be used.
        """
        super().__init__()
        self._service_account_email = service_account_email
        self._quota_project_id = quota_project_id
        self._scopes = scopes
        self._default_scopes = default_scopes
        # The universe domain is only treated as known when the caller gave a
        # non-empty value; otherwise the ``universe_domain`` property looks it
        # up on first access.
        self._universe_domain_cached = False
        if universe_domain:
            self._universe_domain = universe_domain
            self._universe_domain_cached = True

    def _retrieve_info(self, request):
        """Retrieve information about the service account.

        Updates the scopes and retrieves the full service account email.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
        """
        info = _metadata.get_service_account_info(
            request, service_account=self._service_account_email
        )

        self._service_account_email = info["email"]

        # Don't override scopes requested by the user.
        if self._scopes is None:
            self._scopes = info["scopes"]

    def _metric_header_for_usage(self):
        """Return the credential-type metric value ``metrics.CRED_TYPE_SA_MDS``."""
        return metrics.CRED_TYPE_SA_MDS

    def refresh(self, request):
        """Refresh the access token and scopes.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the Compute Engine metadata
                service can't be reached or if the instance has no
                credentials.
        """
        # Resolve the scopes to request *before* ``_retrieve_info`` runs: that
        # call fills ``self._scopes`` from the metadata response when it is
        # None, and those values must not replace ``default_scopes`` here.
        scopes = self._scopes if self._scopes is not None else self._default_scopes
        try:
            self._retrieve_info(request)
            # Always fetch token with default service account email.
            self.token, self.expiry = _metadata.get_service_account_token(
                request, service_account="default", scopes=scopes
            )
        except exceptions.TransportError as caught_exc:
            new_exc = exceptions.RefreshError(caught_exc)
            raise new_exc from caught_exc

    @property
    def service_account_email(self):
        """The service account email.

        .. note:: This is not guaranteed to be set until :meth:`refresh` has been
            called.
        """
        return self._service_account_email

    @property
    def requires_scopes(self):
        """bool: True when no scopes are currently set on these credentials."""
        return not self._scopes

    @property
    def universe_domain(self):
        """The universe domain, fetched from the metadata server on first use.

        The value is cached after the first lookup (or taken from the
        constructor argument when one was supplied).
        """
        if self._universe_domain_cached:
            return self._universe_domain

        # NOTE(review): imported at call time rather than module level,
        # presumably so the requests transport is only needed when a lookup
        # actually happens -- confirm before moving it.
        from google.auth.transport import requests as google_auth_requests

        self._universe_domain = _metadata.get_universe_domain(
            google_auth_requests.Request()
        )
        self._universe_domain_cached = True
        return self._universe_domain

    # No docstring here on purpose: the decorator supplies it.
    @_helpers.copy_docstring(credentials.Credentials)
    def get_cred_info(self):
        return {
            "credential_source": "metadata server",
            "credential_type": "VM credentials",
            "principal": self.service_account_email,
        }

    # No docstring here on purpose: the decorator supplies it.
    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        creds = self.__class__(
            service_account_email=self._service_account_email,
            quota_project_id=quota_project_id,
            scopes=self._scopes,
            default_scopes=self._default_scopes,
        )
        # Carry over the universe-domain state verbatim (including "not yet
        # looked up") instead of passing it through the constructor, which
        # would mark any non-empty value as cached.
        creds._universe_domain = self._universe_domain
        creds._universe_domain_cached = self._universe_domain_cached
        return creds

    # No docstring here on purpose: the decorator supplies it.
    @_helpers.copy_docstring(credentials.Scoped)
    def with_scopes(self, scopes, default_scopes=None):
        # Compute Engine credentials can not be scoped (the metadata service
        # ignores the scopes parameter). App Engine, Cloud Run and Flex support
        # requesting scopes.
        creds = self.__class__(
            scopes=scopes,
            default_scopes=default_scopes,
            service_account_email=self._service_account_email,
            quota_project_id=self._quota_project_id,
        )
        # Same verbatim copy of the universe-domain state as in
        # ``with_quota_project``.
        creds._universe_domain = self._universe_domain
        creds._universe_domain_cached = self._universe_domain_cached
        return creds

    # No docstring here on purpose: the decorator supplies it.
    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
    def with_universe_domain(self, universe_domain):
        return self.__class__(
            scopes=self._scopes,
            default_scopes=self._default_scopes,
            service_account_email=self._service_account_email,
            quota_project_id=self._quota_project_id,
            universe_domain=universe_domain,
        )

205 

206 

207_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

208_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token" 

209 

210 

class IDTokenCredentials(
    credentials.CredentialsWithQuotaProject,
    credentials.Signing,
    credentials.CredentialsWithTokenUri,
):
    """Open ID Connect ID Token-based service account credentials.

    These credentials rely on the default service account of a GCE instance.

    ID token can be requested from `GCE metadata server identity endpoint`_, IAM
    token endpoint or other token endpoints you specify. If metadata server
    identity endpoint is not used, the GCE instance must have been started with
    a service account that has access to the IAM Cloud API.

    .. _GCE metadata server identity endpoint:
        https://cloud.google.com/compute/docs/instances/verifying-instance-identity
    """

    def __init__(
        self,
        request,
        target_audience,
        token_uri=None,
        additional_claims=None,
        service_account_email=None,
        signer=None,
        use_metadata_identity_endpoint=False,
        quota_project_id=None,
    ):
        """
        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests. It is used to look up the service account
                email when ``service_account_email`` is None, and to build
                the IAM signer when ``signer`` is None and the metadata
                identity endpoint is not used.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            token_uri (str): The OAuth 2.0 Token URI.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            service_account_email (str): Optional explicit service account to
                use to sign JWT tokens.
                By default, this is the default GCE service account.
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
                When given, no IAM signer is created from ``request``.
            use_metadata_identity_endpoint (bool): Whether to use GCE metadata
                identity endpoint. For backward compatibility the default value
                is False. If set to True, ``token_uri``, ``additional_claims``,
                ``service_account_email``, ``signer`` argument should not be set;
                otherwise an error will be raised.
            quota_project_id (Optional[str]): The project ID used for quota and
                billing.

        Raises:
            google.auth.exceptions.MalformedError: (historically documented
                as ``ValueError``) If ``use_metadata_identity_endpoint`` is
                set to True, and one of ``token_uri``, ``additional_claims``,
                ``service_account_email``, ``signer`` arguments is set.
        """
        super().__init__()

        self._quota_project_id = quota_project_id
        self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
        self._target_audience = target_audience

        if use_metadata_identity_endpoint:
            # The metadata identity endpoint mints the token itself, so none
            # of the JWT-grant settings may be combined with it.
            if token_uri or additional_claims or service_account_email or signer:
                raise exceptions.MalformedError(
                    "If use_metadata_identity_endpoint is set, token_uri, "
                    "additional_claims, service_account_email, signer arguments"
                    " must not be set"
                )
            self._token_uri = None
            self._additional_claims = None
            self._signer = None

        if service_account_email is None:
            sa_info = _metadata.get_service_account_info(request)
            self._service_account_email = sa_info["email"]
        else:
            self._service_account_email = service_account_email

        if not use_metadata_identity_endpoint:
            if signer is None:
                signer = iam.Signer(
                    request=request,
                    credentials=Credentials(),
                    service_account_email=self._service_account_email,
                )
            self._signer = signer
            self._token_uri = token_uri or _DEFAULT_TOKEN_URI

            if additional_claims is not None:
                self._additional_claims = additional_claims
            else:
                self._additional_claims = {}

    def _copy_with_overrides(self, target_audience, quota_project_id, token_uri):
        """Build a new instance of this class with the given fields replaced.

        Shared by ``with_target_audience``, ``with_quota_project`` and
        ``with_token_uri`` so the constructor call lives in one place.

        Args:
            target_audience (str): Audience for the new credentials.
            quota_project_id (Optional[str]): Quota project for the new
                credentials.
            token_uri (Optional[str]): Token URI for the new credentials;
                unused when the metadata identity endpoint is in use.

        Returns:
            IDTokenCredentials: A new credentials instance.
        """
        # Since the signer is already instantiated, no request is passed.
        # NOTE(review): the constructor still calls
        # ``_metadata.get_service_account_info(request)`` whenever
        # ``service_account_email`` is None, which is always the case on the
        # metadata-identity-endpoint path below. Passing None mirrors the
        # original behaviour -- confirm that lookup tolerates a None request.
        if self._use_metadata_identity_endpoint:
            return self.__class__(
                None,
                target_audience=target_audience,
                use_metadata_identity_endpoint=True,
                quota_project_id=quota_project_id,
            )
        return self.__class__(
            None,
            service_account_email=self._service_account_email,
            token_uri=token_uri,
            target_audience=target_audience,
            # Copy so the new instance never shares the claims mapping.
            additional_claims=self._additional_claims.copy(),
            signer=self.signer,
            use_metadata_identity_endpoint=False,
            quota_project_id=quota_project_id,
        )

    def with_target_audience(self, target_audience):
        """Create a copy of these credentials with the specified target
        audience.

        Args:
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token.

        Returns:
            google.auth.compute_engine.IDTokenCredentials: A new credentials
                instance.
        """
        return self._copy_with_overrides(
            target_audience=target_audience,
            quota_project_id=self._quota_project_id,
            token_uri=self._token_uri,
        )

    # No docstring here on purpose: the decorator supplies it.
    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        return self._copy_with_overrides(
            target_audience=self._target_audience,
            quota_project_id=quota_project_id,
            token_uri=self._token_uri,
        )

    # No docstring here on purpose: the decorator supplies it.
    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        # A token URI is meaningless when the metadata identity endpoint
        # issues the token, so reject the combination up front.
        if self._use_metadata_identity_endpoint:
            raise exceptions.MalformedError(
                "If use_metadata_identity_endpoint is set, token_uri must not be set"
            )
        return self._copy_with_overrides(
            target_audience=self._target_audience,
            quota_project_id=self._quota_project_id,
            token_uri=token_uri,
        )

    def _make_authorization_grant_assertion(self):
        """Create the OAuth 2.0 assertion.

        This assertion is used during the OAuth 2.0 grant to acquire an
        ID token.

        Returns:
            bytes: The authorization grant assertion.
        """
        now = _helpers.utcnow()
        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
        expiry = now + lifetime

        payload = {
            "iat": _helpers.datetime_to_secs(now),
            "exp": _helpers.datetime_to_secs(expiry),
            # The issuer must be the service account email.
            "iss": self.service_account_email,
            # The audience must be the auth token endpoint's URI
            "aud": self._token_uri,
            # The target audience specifies which service the ID token is
            # intended for.
            "target_audience": self._target_audience,
        }

        # Caller-supplied claims are applied last, so they can override the
        # standard claims above.
        payload.update(self._additional_claims)

        token = jwt.encode(self._signer, payload)

        return token

    def _call_metadata_identity_endpoint(self, request):
        """Request ID token from metadata identity endpoint.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Returns:
            Tuple[str, datetime.datetime]: The ID token and the expiry of the
                ID token, as a naive datetime expressed in UTC.

        Raises:
            google.auth.exceptions.RefreshError: If the Compute Engine metadata
                service can't be reached or if the instance has no credentials.
            ValueError: If extracting expiry from the obtained ID token fails.
        """
        try:
            path = "instance/service-accounts/default/identity"
            params = {"audience": self._target_audience, "format": "full"}
            metrics_header = {
                metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds()
            }
            id_token = _metadata.get(
                request, path, params=params, headers=metrics_header
            )
        except exceptions.TransportError as caught_exc:
            new_exc = exceptions.RefreshError(caught_exc)
            raise new_exc from caught_exc

        _, payload, _, _ = jwt._unverified_decode(id_token)
        # ``datetime.utcfromtimestamp`` is deprecated; convert via an aware
        # UTC datetime and drop the tzinfo to keep returning a naive UTC value.
        expiry = datetime.datetime.fromtimestamp(
            payload["exp"], tz=datetime.timezone.utc
        ).replace(tzinfo=None)
        return id_token, expiry

    def refresh(self, request):
        """Refreshes the ID token.

        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Raises:
            google.auth.exceptions.RefreshError: If the credentials could
                not be refreshed.
            ValueError: If extracting expiry from the obtained ID token fails.
        """
        if self._use_metadata_identity_endpoint:
            self.token, self.expiry = self._call_metadata_identity_endpoint(request)
        else:
            # Exchange a self-signed JWT assertion for an ID token at the
            # configured token endpoint.
            assertion = self._make_authorization_grant_assertion()
            access_token, expiry, _ = _client.id_token_jwt_grant(
                request, self._token_uri, assertion
            )
            self.token = access_token
            self.expiry = expiry

    # No docstring here on purpose: the decorator supplies it.
    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        return self._signer

    def sign_bytes(self, message):
        """Signs the given message.

        Args:
            message (bytes): The message to sign.

        Returns:
            bytes: The message's cryptographic signature.

        Raises:
            google.auth.exceptions.InvalidOperation: (historically documented
                as ``ValueError``) Signer is not available if metadata
                identity endpoint is used.
        """
        if self._use_metadata_identity_endpoint:
            raise exceptions.InvalidOperation(
                "Signer is not available if metadata identity endpoint is used"
            )
        return self._signer.sign(message)

    @property
    def service_account_email(self):
        """The service account email."""
        return self._service_account_email

    @property
    def signer_email(self):
        """The signer's email; the same value as ``service_account_email``."""
        return self._service_account_email