Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/compute_engine/credentials.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

168 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Compute Engine credentials. 

16 

17This module provides authentication for an application running on Google 

18Compute Engine using the Compute Engine metadata server. 

19 

20""" 

21 

22import datetime 

23 

24from google.auth import _helpers 

25from google.auth import credentials 

26from google.auth import exceptions 

27from google.auth import iam 

28from google.auth import jwt 

29from google.auth import metrics 

30from google.auth.compute_engine import _metadata 

31from google.oauth2 import _client 

32 

33_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( 

34 "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" 

35) 

36 

37 

38class Credentials( 

39 credentials.Scoped, 

40 credentials.CredentialsWithQuotaProject, 

41 credentials.CredentialsWithUniverseDomain, 

42 credentials.CredentialsWithTrustBoundary, 

43): 

44 """Compute Engine Credentials. 

45 

46 These credentials use the Google Compute Engine metadata server to obtain 

47 OAuth 2.0 access tokens associated with the instance's service account, 

48 and are also used for Cloud Run, Flex and App Engine (except for the Python 

49 2.7 runtime, which is supported only on older versions of this library). 

50 

51 For more information about Compute Engine authentication, including how 

52 to configure scopes, see the `Compute Engine authentication 

53 documentation`_. 

54 

55 .. note:: On Compute Engine the metadata server ignores requested scopes. 

56 On Cloud Run, Flex and App Engine the server honours requested scopes. 

57 

58 .. _Compute Engine authentication documentation: 

59 https://cloud.google.com/compute/docs/authentication#using 

60 """ 

61 

62 def __init__( 

63 self, 

64 service_account_email="default", 

65 quota_project_id=None, 

66 scopes=None, 

67 default_scopes=None, 

68 universe_domain=None, 

69 trust_boundary=None, 

70 ): 

71 """ 

72 Args: 

73 service_account_email (str): The service account email to use, or 

74 'default'. A Compute Engine instance may have multiple service 

75 accounts. 

76 quota_project_id (Optional[str]): The project ID used for quota and 

77 billing. 

78 scopes (Optional[Sequence[str]]): The list of scopes for the credentials. 

79 default_scopes (Optional[Sequence[str]]): Default scopes passed by a 

80 Google client library. Use 'scopes' for user-defined scopes. 

81 universe_domain (Optional[str]): The universe domain. If not 

82 provided or None, credential will attempt to fetch the value 

83 from metadata server. If metadata server doesn't have universe 

84 domain endpoint, then the default googleapis.com will be used. 

85 trust_boundary (Mapping[str,str]): A credential trust boundary. 

86 """ 

87 super(Credentials, self).__init__() 

88 self._service_account_email = service_account_email 

89 self._quota_project_id = quota_project_id 

90 self._scopes = scopes 

91 self._default_scopes = default_scopes 

92 self._universe_domain_cached = False 

93 if universe_domain: 

94 self._universe_domain = universe_domain 

95 self._universe_domain_cached = True 

96 self._trust_boundary = trust_boundary 

97 

98 def _retrieve_info(self, request): 

99 """Retrieve information about the service account. 

100 

101 Updates the scopes and retrieves the full service account email. 

102 

103 Args: 

104 request (google.auth.transport.Request): The object used to make 

105 HTTP requests. 

106 """ 

107 info = _metadata.get_service_account_info( 

108 request, service_account=self._service_account_email 

109 ) 

110 

111 if not info or "email" not in info: 

112 raise exceptions.RefreshError( 

113 "Unexpected response from metadata server: " 

114 "service account info is missing 'email' field." 

115 ) 

116 

117 self._service_account_email = info["email"] 

118 

119 # Don't override scopes requested by the user. 

120 if self._scopes is None: 

121 self._scopes = info.get("scopes") 

122 

123 def _metric_header_for_usage(self): 

124 return metrics.CRED_TYPE_SA_MDS 

125 

126 def _perform_refresh_token(self, request): 

127 """Refresh the access token and scopes. 

128 

129 Args: 

130 request (google.auth.transport.Request): The object used to make 

131 HTTP requests. 

132 

133 Raises: 

134 google.auth.exceptions.RefreshError: If the Compute Engine metadata 

135 service can't be reached if if the instance has not 

136 credentials. 

137 """ 

138 try: 

139 self._retrieve_info(request) 

140 scopes = self._scopes if self._scopes is not None else self._default_scopes 

141 # Always fetch token with default service account email. 

142 self.token, self.expiry = _metadata.get_service_account_token( 

143 request, service_account="default", scopes=scopes 

144 ) 

145 except exceptions.TransportError as caught_exc: 

146 new_exc = exceptions.RefreshError(caught_exc) 

147 raise new_exc from caught_exc 

148 

149 def _build_trust_boundary_lookup_url(self): 

150 """Builds and returns the URL for the trust boundary lookup API for GCE.""" 

151 # If the service account email is 'default', we need to get the 

152 # actual email address from the metadata server. 

153 if self._service_account_email == "default": 

154 from google.auth.transport import requests as google_auth_requests 

155 

156 request = google_auth_requests.Request() 

157 try: 

158 info = _metadata.get_service_account_info(request, "default") 

159 if not info or "email" not in info: 

160 raise exceptions.RefreshError( 

161 "Unexpected response from metadata server: " 

162 "service account info is missing 'email' field." 

163 ) 

164 self._service_account_email = info["email"] 

165 

166 except exceptions.TransportError as e: 

167 # If fetching the service account email fails due to a transport error, 

168 # it means we cannot build the trust boundary lookup URL. 

169 # Wrap this in a RefreshError so it's caught by _refresh_trust_boundary. 

170 raise exceptions.RefreshError( 

171 "Failed to get service account email for trust boundary lookup: {}".format( 

172 e 

173 ) 

174 ) from e 

175 

176 return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( 

177 self.universe_domain, self.service_account_email 

178 ) 

179 

180 @property 

181 def service_account_email(self): 

182 """The service account email. 

183 

184 .. note:: This is not guaranteed to be set until :meth:`refresh` has been 

185 called. 

186 """ 

187 return self._service_account_email 

188 

189 @property 

190 def requires_scopes(self): 

191 return not self._scopes 

192 

193 @property 

194 def universe_domain(self): 

195 if self._universe_domain_cached: 

196 return self._universe_domain 

197 

198 from google.auth.transport import requests as google_auth_requests 

199 

200 self._universe_domain = _metadata.get_universe_domain( 

201 google_auth_requests.Request() 

202 ) 

203 self._universe_domain_cached = True 

204 return self._universe_domain 

205 

206 @_helpers.copy_docstring(credentials.Credentials) 

207 def get_cred_info(self): 

208 return { 

209 "credential_source": "metadata server", 

210 "credential_type": "VM credentials", 

211 "principal": self.service_account_email, 

212 } 

213 

214 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

215 def with_quota_project(self, quota_project_id): 

216 creds = self.__class__( 

217 service_account_email=self._service_account_email, 

218 quota_project_id=quota_project_id, 

219 scopes=self._scopes, 

220 default_scopes=self._default_scopes, 

221 universe_domain=self._universe_domain, 

222 trust_boundary=self._trust_boundary, 

223 ) 

224 creds._universe_domain_cached = self._universe_domain_cached 

225 return creds 

226 

227 @_helpers.copy_docstring(credentials.Scoped) 

228 def with_scopes(self, scopes, default_scopes=None): 

229 # Compute Engine credentials can not be scoped (the metadata service 

230 # ignores the scopes parameter). App Engine, Cloud Run and Flex support 

231 # requesting scopes. 

232 creds = self.__class__( 

233 scopes=scopes, 

234 default_scopes=default_scopes, 

235 service_account_email=self._service_account_email, 

236 quota_project_id=self._quota_project_id, 

237 universe_domain=self._universe_domain, 

238 trust_boundary=self._trust_boundary, 

239 ) 

240 creds._universe_domain_cached = self._universe_domain_cached 

241 return creds 

242 

243 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) 

244 def with_universe_domain(self, universe_domain): 

245 return self.__class__( 

246 scopes=self._scopes, 

247 default_scopes=self._default_scopes, 

248 service_account_email=self._service_account_email, 

249 quota_project_id=self._quota_project_id, 

250 trust_boundary=self._trust_boundary, 

251 universe_domain=universe_domain, 

252 ) 

253 

254 @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 

255 def with_trust_boundary(self, trust_boundary): 

256 creds = self.__class__( 

257 service_account_email=self._service_account_email, 

258 quota_project_id=self._quota_project_id, 

259 scopes=self._scopes, 

260 default_scopes=self._default_scopes, 

261 universe_domain=self._universe_domain, 

262 trust_boundary=trust_boundary, 

263 ) 

264 creds._universe_domain_cached = self._universe_domain_cached 

265 return creds 

266 

267 

268_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

269_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token" 

270 

271 

272class IDTokenCredentials( 

273 credentials.CredentialsWithQuotaProject, 

274 credentials.Signing, 

275 credentials.CredentialsWithTokenUri, 

276): 

277 """Open ID Connect ID Token-based service account credentials. 

278 

279 These credentials relies on the default service account of a GCE instance. 

280 

281 ID token can be requested from `GCE metadata server identity endpoint`_, IAM 

282 token endpoint or other token endpoints you specify. If metadata server 

283 identity endpoint is not used, the GCE instance must have been started with 

284 a service account that has access to the IAM Cloud API. 

285 

286 .. _GCE metadata server identity endpoint: 

287 https://cloud.google.com/compute/docs/instances/verifying-instance-identity 

288 """ 

289 

290 def __init__( 

291 self, 

292 request, 

293 target_audience, 

294 token_uri=None, 

295 additional_claims=None, 

296 service_account_email=None, 

297 signer=None, 

298 use_metadata_identity_endpoint=False, 

299 quota_project_id=None, 

300 ): 

301 """ 

302 Args: 

303 request (google.auth.transport.Request): The object used to make 

304 HTTP requests. 

305 target_audience (str): The intended audience for these credentials, 

306 used when requesting the ID Token. The ID Token's ``aud`` claim 

307 will be set to this string. 

308 token_uri (str): The OAuth 2.0 Token URI. 

309 additional_claims (Mapping[str, str]): Any additional claims for 

310 the JWT assertion used in the authorization grant. 

311 service_account_email (str): Optional explicit service account to 

312 use to sign JWT tokens. 

313 By default, this is the default GCE service account. 

314 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

315 In case the signer is specified, the request argument will be 

316 ignored. 

317 use_metadata_identity_endpoint (bool): Whether to use GCE metadata 

318 identity endpoint. For backward compatibility the default value 

319 is False. If set to True, ``token_uri``, ``additional_claims``, 

320 ``service_account_email``, ``signer`` argument should not be set; 

321 otherwise ValueError will be raised. 

322 quota_project_id (Optional[str]): The project ID used for quota and 

323 billing. 

324 

325 Raises: 

326 ValueError: 

327 If ``use_metadata_identity_endpoint`` is set to True, and one of 

328 ``token_uri``, ``additional_claims``, ``service_account_email``, 

329 ``signer`` arguments is set. 

330 """ 

331 super(IDTokenCredentials, self).__init__() 

332 

333 self._quota_project_id = quota_project_id 

334 self._use_metadata_identity_endpoint = use_metadata_identity_endpoint 

335 self._target_audience = target_audience 

336 

337 if use_metadata_identity_endpoint: 

338 if token_uri or additional_claims or service_account_email or signer: 

339 raise ValueError( 

340 "If use_metadata_identity_endpoint is set, token_uri, " 

341 "additional_claims, service_account_email, signer arguments" 

342 " must not be set" 

343 ) 

344 self._token_uri = None 

345 self._additional_claims = None 

346 self._signer = None 

347 

348 if service_account_email is None: 

349 sa_info = _metadata.get_service_account_info(request) 

350 self._service_account_email = sa_info["email"] 

351 else: 

352 self._service_account_email = service_account_email 

353 

354 if not use_metadata_identity_endpoint: 

355 if signer is None: 

356 signer = iam.Signer( 

357 request=request, 

358 credentials=Credentials(), 

359 service_account_email=self._service_account_email, 

360 ) 

361 self._signer = signer 

362 self._token_uri = token_uri or _DEFAULT_TOKEN_URI 

363 

364 if additional_claims is not None: 

365 self._additional_claims = additional_claims 

366 else: 

367 self._additional_claims = {} 

368 

369 def with_target_audience(self, target_audience): 

370 """Create a copy of these credentials with the specified target 

371 audience. 

372 Args: 

373 target_audience (str): The intended audience for these credentials, 

374 used when requesting the ID Token. 

375 Returns: 

376 google.auth.service_account.IDTokenCredentials: A new credentials 

377 instance. 

378 """ 

379 # since the signer is already instantiated, 

380 # the request is not needed 

381 if self._use_metadata_identity_endpoint: 

382 return self.__class__( 

383 None, 

384 target_audience=target_audience, 

385 use_metadata_identity_endpoint=True, 

386 quota_project_id=self._quota_project_id, 

387 ) 

388 else: 

389 return self.__class__( 

390 None, 

391 service_account_email=self._service_account_email, 

392 token_uri=self._token_uri, 

393 target_audience=target_audience, 

394 additional_claims=self._additional_claims.copy(), 

395 signer=self.signer, 

396 use_metadata_identity_endpoint=False, 

397 quota_project_id=self._quota_project_id, 

398 ) 

399 

400 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

401 def with_quota_project(self, quota_project_id): 

402 # since the signer is already instantiated, 

403 # the request is not needed 

404 if self._use_metadata_identity_endpoint: 

405 return self.__class__( 

406 None, 

407 target_audience=self._target_audience, 

408 use_metadata_identity_endpoint=True, 

409 quota_project_id=quota_project_id, 

410 ) 

411 else: 

412 return self.__class__( 

413 None, 

414 service_account_email=self._service_account_email, 

415 token_uri=self._token_uri, 

416 target_audience=self._target_audience, 

417 additional_claims=self._additional_claims.copy(), 

418 signer=self.signer, 

419 use_metadata_identity_endpoint=False, 

420 quota_project_id=quota_project_id, 

421 ) 

422 

423 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 

424 def with_token_uri(self, token_uri): 

425 # since the signer is already instantiated, 

426 # the request is not needed 

427 if self._use_metadata_identity_endpoint: 

428 raise ValueError( 

429 "If use_metadata_identity_endpoint is set, token_uri" " must not be set" 

430 ) 

431 else: 

432 return self.__class__( 

433 None, 

434 service_account_email=self._service_account_email, 

435 token_uri=token_uri, 

436 target_audience=self._target_audience, 

437 additional_claims=self._additional_claims.copy(), 

438 signer=self.signer, 

439 use_metadata_identity_endpoint=False, 

440 quota_project_id=self.quota_project_id, 

441 ) 

442 

443 def _make_authorization_grant_assertion(self): 

444 """Create the OAuth 2.0 assertion. 

445 This assertion is used during the OAuth 2.0 grant to acquire an 

446 ID token. 

447 Returns: 

448 bytes: The authorization grant assertion. 

449 """ 

450 now = _helpers.utcnow() 

451 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) 

452 expiry = now + lifetime 

453 

454 payload = { 

455 "iat": _helpers.datetime_to_secs(now), 

456 "exp": _helpers.datetime_to_secs(expiry), 

457 # The issuer must be the service account email. 

458 "iss": self.service_account_email, 

459 # The audience must be the auth token endpoint's URI 

460 "aud": self._token_uri, 

461 # The target audience specifies which service the ID token is 

462 # intended for. 

463 "target_audience": self._target_audience, 

464 } 

465 

466 payload.update(self._additional_claims) 

467 

468 token = jwt.encode(self._signer, payload) 

469 

470 return token 

471 

472 def _call_metadata_identity_endpoint(self, request): 

473 """Request ID token from metadata identity endpoint. 

474 

475 Args: 

476 request (google.auth.transport.Request): The object used to make 

477 HTTP requests. 

478 

479 Returns: 

480 Tuple[str, datetime.datetime]: The ID token and the expiry of the ID token. 

481 

482 Raises: 

483 google.auth.exceptions.RefreshError: If the Compute Engine metadata 

484 service can't be reached or if the instance has no credentials. 

485 ValueError: If extracting expiry from the obtained ID token fails. 

486 """ 

487 try: 

488 path = "instance/service-accounts/default/identity" 

489 params = {"audience": self._target_audience, "format": "full"} 

490 metrics_header = { 

491 metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds() 

492 } 

493 id_token = _metadata.get( 

494 request, path, params=params, headers=metrics_header 

495 ) 

496 except exceptions.TransportError as caught_exc: 

497 new_exc = exceptions.RefreshError(caught_exc) 

498 raise new_exc from caught_exc 

499 

500 _, payload, _, _ = jwt._unverified_decode(id_token) 

501 return id_token, _helpers.utcfromtimestamp(payload["exp"]) 

502 

503 def refresh(self, request): 

504 """Refreshes the ID token. 

505 

506 Args: 

507 request (google.auth.transport.Request): The object used to make 

508 HTTP requests. 

509 

510 Raises: 

511 google.auth.exceptions.RefreshError: If the credentials could 

512 not be refreshed. 

513 ValueError: If extracting expiry from the obtained ID token fails. 

514 """ 

515 if self._use_metadata_identity_endpoint: 

516 self.token, self.expiry = self._call_metadata_identity_endpoint(request) 

517 else: 

518 assertion = self._make_authorization_grant_assertion() 

519 access_token, expiry, _ = _client.id_token_jwt_grant( 

520 request, self._token_uri, assertion 

521 ) 

522 self.token = access_token 

523 self.expiry = expiry 

524 

525 @property # type: ignore 

526 @_helpers.copy_docstring(credentials.Signing) 

527 def signer(self): 

528 return self._signer 

529 

530 def sign_bytes(self, message): 

531 """Signs the given message. 

532 

533 Args: 

534 message (bytes): The message to sign. 

535 

536 Returns: 

537 bytes: The message's cryptographic signature. 

538 

539 Raises: 

540 ValueError: 

541 Signer is not available if metadata identity endpoint is used. 

542 """ 

543 if self._use_metadata_identity_endpoint: 

544 raise exceptions.InvalidOperation( 

545 "Signer is not available if metadata identity endpoint is used" 

546 ) 

547 return self._signer.sign(message) 

548 

549 @property 

550 def service_account_email(self): 

551 """The service account email.""" 

552 return self._service_account_email 

553 

554 @property 

555 def signer_email(self): 

556 return self._service_account_email