Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/compute_engine/credentials.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

168 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Compute Engine credentials. 

16 

17This module provides authentication for an application running on Google 

18Compute Engine using the Compute Engine metadata server. 

19 

20""" 

21 

22import datetime 

23 

24from google.auth import _helpers 

25from google.auth import credentials 

26from google.auth import exceptions 

27from google.auth import iam 

28from google.auth import jwt 

29from google.auth import metrics 

30from google.auth.compute_engine import _metadata 

31from google.oauth2 import _client 

32 

33_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( 

34 "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" 

35) 

36 

37 

38class Credentials( 

39 credentials.Scoped, 

40 credentials.CredentialsWithQuotaProject, 

41 credentials.CredentialsWithUniverseDomain, 

42 credentials.CredentialsWithTrustBoundary, 

43): 

44 """Compute Engine Credentials. 

45 

46 These credentials use the Google Compute Engine metadata server to obtain 

47 OAuth 2.0 access tokens associated with the instance's service account, 

48 and are also used for Cloud Run, Flex and App Engine (except for the Python 

49 2.7 runtime, which is supported only on older versions of this library). 

50 

51 For more information about Compute Engine authentication, including how 

52 to configure scopes, see the `Compute Engine authentication 

53 documentation`_. 

54 

55 .. note:: On Compute Engine the metadata server ignores requested scopes. 

56 On Cloud Run, Flex and App Engine the server honours requested scopes. 

57 

58 .. _Compute Engine authentication documentation: 

59 https://cloud.google.com/compute/docs/authentication#using 

60 """ 

61 

62 def __init__( 

63 self, 

64 service_account_email="default", 

65 quota_project_id=None, 

66 scopes=None, 

67 default_scopes=None, 

68 universe_domain=None, 

69 trust_boundary=None, 

70 ): 

71 """ 

72 Args: 

73 service_account_email (str): The service account email to use, or 

74 'default'. A Compute Engine instance may have multiple service 

75 accounts. 

76 quota_project_id (Optional[str]): The project ID used for quota and 

77 billing. 

78 scopes (Optional[Sequence[str]]): The list of scopes for the credentials. 

79 default_scopes (Optional[Sequence[str]]): Default scopes passed by a 

80 Google client library. Use 'scopes' for user-defined scopes. 

81 universe_domain (Optional[str]): The universe domain. If not 

82 provided or None, credential will attempt to fetch the value 

83 from metadata server. If metadata server doesn't have universe 

84 domain endpoint, then the default googleapis.com will be used. 

85 trust_boundary (Mapping[str,str]): A credential trust boundary. 

86 """ 

87 super(Credentials, self).__init__() 

88 self._service_account_email = service_account_email 

89 self._quota_project_id = quota_project_id 

90 self._scopes = scopes 

91 self._default_scopes = default_scopes 

92 self._universe_domain_cached = False 

93 if universe_domain: 

94 self._universe_domain = universe_domain 

95 self._universe_domain_cached = True 

96 self._trust_boundary = trust_boundary 

97 

98 def _retrieve_info(self, request): 

99 """Retrieve information about the service account. 

100 

101 Updates the scopes and retrieves the full service account email. 

102 

103 Args: 

104 request (google.auth.transport.Request): The object used to make 

105 HTTP requests. 

106 """ 

107 info = _metadata.get_service_account_info( 

108 request, service_account=self._service_account_email 

109 ) 

110 

111 if not info or "email" not in info: 

112 raise exceptions.RefreshError( 

113 "Unexpected response from metadata server: " 

114 "service account info is missing 'email' field." 

115 ) 

116 

117 self._service_account_email = info["email"] 

118 

119 # Don't override scopes requested by the user. 

120 if self._scopes is None: 

121 self._scopes = info.get("scopes") 

122 

123 def _metric_header_for_usage(self): 

124 return metrics.CRED_TYPE_SA_MDS 

125 

126 def _refresh_token(self, request): 

127 """Refresh the access token and scopes. 

128 

129 Args: 

130 request (google.auth.transport.Request): The object used to make 

131 HTTP requests. 

132 

133 Raises: 

134 google.auth.exceptions.RefreshError: If the Compute Engine metadata 

135 service can't be reached if if the instance has not 

136 credentials. 

137 """ 

138 scopes = self._scopes if self._scopes is not None else self._default_scopes 

139 try: 

140 self._retrieve_info(request) 

141 # Always fetch token with default service account email. 

142 self.token, self.expiry = _metadata.get_service_account_token( 

143 request, service_account="default", scopes=scopes 

144 ) 

145 except exceptions.TransportError as caught_exc: 

146 new_exc = exceptions.RefreshError(caught_exc) 

147 raise new_exc from caught_exc 

148 

149 def _build_trust_boundary_lookup_url(self): 

150 """Builds and returns the URL for the trust boundary lookup API for GCE.""" 

151 # If the service account email is 'default', we need to get the 

152 # actual email address from the metadata server. 

153 if self._service_account_email == "default": 

154 from google.auth.transport import requests as google_auth_requests 

155 

156 request = google_auth_requests.Request() 

157 try: 

158 info = _metadata.get_service_account_info(request, "default") 

159 if not info or "email" not in info: 

160 raise exceptions.RefreshError( 

161 "Unexpected response from metadata server: " 

162 "service account info is missing 'email' field." 

163 ) 

164 self._service_account_email = info["email"] 

165 

166 except exceptions.TransportError as e: 

167 # If fetching the service account email fails due to a transport error, 

168 # it means we cannot build the trust boundary lookup URL. 

169 # Wrap this in a RefreshError so it's caught by _refresh_trust_boundary. 

170 raise exceptions.RefreshError( 

171 "Failed to get service account email for trust boundary lookup: {}".format( 

172 e 

173 ) 

174 ) from e 

175 

176 return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( 

177 self.universe_domain, self.service_account_email 

178 ) 

179 

180 @property 

181 def service_account_email(self): 

182 """The service account email. 

183 

184 .. note:: This is not guaranteed to be set until :meth:`refresh` has been 

185 called. 

186 """ 

187 return self._service_account_email 

188 

189 @property 

190 def requires_scopes(self): 

191 return not self._scopes 

192 

193 @property 

194 def universe_domain(self): 

195 if self._universe_domain_cached: 

196 return self._universe_domain 

197 

198 from google.auth.transport import requests as google_auth_requests 

199 

200 self._universe_domain = _metadata.get_universe_domain( 

201 google_auth_requests.Request() 

202 ) 

203 self._universe_domain_cached = True 

204 return self._universe_domain 

205 

206 @_helpers.copy_docstring(credentials.Credentials) 

207 def get_cred_info(self): 

208 return { 

209 "credential_source": "metadata server", 

210 "credential_type": "VM credentials", 

211 "principal": self.service_account_email, 

212 } 

213 

214 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

215 def with_quota_project(self, quota_project_id): 

216 creds = self.__class__( 

217 service_account_email=self._service_account_email, 

218 quota_project_id=quota_project_id, 

219 scopes=self._scopes, 

220 default_scopes=self._default_scopes, 

221 universe_domain=self._universe_domain, 

222 trust_boundary=self._trust_boundary, 

223 ) 

224 creds._universe_domain_cached = self._universe_domain_cached 

225 return creds 

226 

227 @_helpers.copy_docstring(credentials.Scoped) 

228 def with_scopes(self, scopes, default_scopes=None): 

229 # Compute Engine credentials can not be scoped (the metadata service 

230 # ignores the scopes parameter). App Engine, Cloud Run and Flex support 

231 # requesting scopes. 

232 creds = self.__class__( 

233 scopes=scopes, 

234 default_scopes=default_scopes, 

235 service_account_email=self._service_account_email, 

236 quota_project_id=self._quota_project_id, 

237 universe_domain=self._universe_domain, 

238 trust_boundary=self._trust_boundary, 

239 ) 

240 creds._universe_domain_cached = self._universe_domain_cached 

241 return creds 

242 

243 @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) 

244 def with_universe_domain(self, universe_domain): 

245 return self.__class__( 

246 scopes=self._scopes, 

247 default_scopes=self._default_scopes, 

248 service_account_email=self._service_account_email, 

249 quota_project_id=self._quota_project_id, 

250 trust_boundary=self._trust_boundary, 

251 universe_domain=universe_domain, 

252 ) 

253 

254 @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 

255 def with_trust_boundary(self, trust_boundary): 

256 creds = self.__class__( 

257 service_account_email=self._service_account_email, 

258 quota_project_id=self._quota_project_id, 

259 scopes=self._scopes, 

260 default_scopes=self._default_scopes, 

261 universe_domain=self._universe_domain, 

262 trust_boundary=trust_boundary, 

263 ) 

264 creds._universe_domain_cached = self._universe_domain_cached 

265 return creds 

266 

267 

268_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

269_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token" 

270 

271 

272class IDTokenCredentials( 

273 credentials.CredentialsWithQuotaProject, 

274 credentials.Signing, 

275 credentials.CredentialsWithTokenUri, 

276): 

277 """Open ID Connect ID Token-based service account credentials. 

278 

279 These credentials relies on the default service account of a GCE instance. 

280 

281 ID token can be requested from `GCE metadata server identity endpoint`_, IAM 

282 token endpoint or other token endpoints you specify. If metadata server 

283 identity endpoint is not used, the GCE instance must have been started with 

284 a service account that has access to the IAM Cloud API. 

285 

286 .. _GCE metadata server identity endpoint: 

287 https://cloud.google.com/compute/docs/instances/verifying-instance-identity 

288 """ 

289 

290 def __init__( 

291 self, 

292 request, 

293 target_audience, 

294 token_uri=None, 

295 additional_claims=None, 

296 service_account_email=None, 

297 signer=None, 

298 use_metadata_identity_endpoint=False, 

299 quota_project_id=None, 

300 ): 

301 """ 

302 Args: 

303 request (google.auth.transport.Request): The object used to make 

304 HTTP requests. 

305 target_audience (str): The intended audience for these credentials, 

306 used when requesting the ID Token. The ID Token's ``aud`` claim 

307 will be set to this string. 

308 token_uri (str): The OAuth 2.0 Token URI. 

309 additional_claims (Mapping[str, str]): Any additional claims for 

310 the JWT assertion used in the authorization grant. 

311 service_account_email (str): Optional explicit service account to 

312 use to sign JWT tokens. 

313 By default, this is the default GCE service account. 

314 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

315 In case the signer is specified, the request argument will be 

316 ignored. 

317 use_metadata_identity_endpoint (bool): Whether to use GCE metadata 

318 identity endpoint. For backward compatibility the default value 

319 is False. If set to True, ``token_uri``, ``additional_claims``, 

320 ``service_account_email``, ``signer`` argument should not be set; 

321 otherwise ValueError will be raised. 

322 quota_project_id (Optional[str]): The project ID used for quota and 

323 billing. 

324 

325 Raises: 

326 ValueError: 

327 If ``use_metadata_identity_endpoint`` is set to True, and one of 

328 ``token_uri``, ``additional_claims``, ``service_account_email``, 

329 ``signer`` arguments is set. 

330 """ 

331 super(IDTokenCredentials, self).__init__() 

332 

333 self._quota_project_id = quota_project_id 

334 self._use_metadata_identity_endpoint = use_metadata_identity_endpoint 

335 self._target_audience = target_audience 

336 

337 if use_metadata_identity_endpoint: 

338 if token_uri or additional_claims or service_account_email or signer: 

339 raise ValueError( 

340 "If use_metadata_identity_endpoint is set, token_uri, " 

341 "additional_claims, service_account_email, signer arguments" 

342 " must not be set" 

343 ) 

344 self._token_uri = None 

345 self._additional_claims = None 

346 self._signer = None 

347 

348 if service_account_email is None: 

349 sa_info = _metadata.get_service_account_info(request) 

350 self._service_account_email = sa_info["email"] 

351 else: 

352 self._service_account_email = service_account_email 

353 

354 if not use_metadata_identity_endpoint: 

355 if signer is None: 

356 signer = iam.Signer( 

357 request=request, 

358 credentials=Credentials(), 

359 service_account_email=self._service_account_email, 

360 ) 

361 self._signer = signer 

362 self._token_uri = token_uri or _DEFAULT_TOKEN_URI 

363 

364 if additional_claims is not None: 

365 self._additional_claims = additional_claims 

366 else: 

367 self._additional_claims = {} 

368 

369 def with_target_audience(self, target_audience): 

370 """Create a copy of these credentials with the specified target 

371 audience. 

372 Args: 

373 target_audience (str): The intended audience for these credentials, 

374 used when requesting the ID Token. 

375 Returns: 

376 google.auth.service_account.IDTokenCredentials: A new credentials 

377 instance. 

378 """ 

379 # since the signer is already instantiated, 

380 # the request is not needed 

381 if self._use_metadata_identity_endpoint: 

382 return self.__class__( 

383 None, 

384 target_audience=target_audience, 

385 use_metadata_identity_endpoint=True, 

386 quota_project_id=self._quota_project_id, 

387 ) 

388 else: 

389 return self.__class__( 

390 None, 

391 service_account_email=self._service_account_email, 

392 token_uri=self._token_uri, 

393 target_audience=target_audience, 

394 additional_claims=self._additional_claims.copy(), 

395 signer=self.signer, 

396 use_metadata_identity_endpoint=False, 

397 quota_project_id=self._quota_project_id, 

398 ) 

399 

400 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

401 def with_quota_project(self, quota_project_id): 

402 

403 # since the signer is already instantiated, 

404 # the request is not needed 

405 if self._use_metadata_identity_endpoint: 

406 return self.__class__( 

407 None, 

408 target_audience=self._target_audience, 

409 use_metadata_identity_endpoint=True, 

410 quota_project_id=quota_project_id, 

411 ) 

412 else: 

413 return self.__class__( 

414 None, 

415 service_account_email=self._service_account_email, 

416 token_uri=self._token_uri, 

417 target_audience=self._target_audience, 

418 additional_claims=self._additional_claims.copy(), 

419 signer=self.signer, 

420 use_metadata_identity_endpoint=False, 

421 quota_project_id=quota_project_id, 

422 ) 

423 

424 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 

425 def with_token_uri(self, token_uri): 

426 

427 # since the signer is already instantiated, 

428 # the request is not needed 

429 if self._use_metadata_identity_endpoint: 

430 raise ValueError( 

431 "If use_metadata_identity_endpoint is set, token_uri" " must not be set" 

432 ) 

433 else: 

434 return self.__class__( 

435 None, 

436 service_account_email=self._service_account_email, 

437 token_uri=token_uri, 

438 target_audience=self._target_audience, 

439 additional_claims=self._additional_claims.copy(), 

440 signer=self.signer, 

441 use_metadata_identity_endpoint=False, 

442 quota_project_id=self.quota_project_id, 

443 ) 

444 

445 def _make_authorization_grant_assertion(self): 

446 """Create the OAuth 2.0 assertion. 

447 This assertion is used during the OAuth 2.0 grant to acquire an 

448 ID token. 

449 Returns: 

450 bytes: The authorization grant assertion. 

451 """ 

452 now = _helpers.utcnow() 

453 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) 

454 expiry = now + lifetime 

455 

456 payload = { 

457 "iat": _helpers.datetime_to_secs(now), 

458 "exp": _helpers.datetime_to_secs(expiry), 

459 # The issuer must be the service account email. 

460 "iss": self.service_account_email, 

461 # The audience must be the auth token endpoint's URI 

462 "aud": self._token_uri, 

463 # The target audience specifies which service the ID token is 

464 # intended for. 

465 "target_audience": self._target_audience, 

466 } 

467 

468 payload.update(self._additional_claims) 

469 

470 token = jwt.encode(self._signer, payload) 

471 

472 return token 

473 

474 def _call_metadata_identity_endpoint(self, request): 

475 """Request ID token from metadata identity endpoint. 

476 

477 Args: 

478 request (google.auth.transport.Request): The object used to make 

479 HTTP requests. 

480 

481 Returns: 

482 Tuple[str, datetime.datetime]: The ID token and the expiry of the ID token. 

483 

484 Raises: 

485 google.auth.exceptions.RefreshError: If the Compute Engine metadata 

486 service can't be reached or if the instance has no credentials. 

487 ValueError: If extracting expiry from the obtained ID token fails. 

488 """ 

489 try: 

490 path = "instance/service-accounts/default/identity" 

491 params = {"audience": self._target_audience, "format": "full"} 

492 metrics_header = { 

493 metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds() 

494 } 

495 id_token = _metadata.get( 

496 request, path, params=params, headers=metrics_header 

497 ) 

498 except exceptions.TransportError as caught_exc: 

499 new_exc = exceptions.RefreshError(caught_exc) 

500 raise new_exc from caught_exc 

501 

502 _, payload, _, _ = jwt._unverified_decode(id_token) 

503 return id_token, datetime.datetime.utcfromtimestamp(payload["exp"]) 

504 

505 def refresh(self, request): 

506 """Refreshes the ID token. 

507 

508 Args: 

509 request (google.auth.transport.Request): The object used to make 

510 HTTP requests. 

511 

512 Raises: 

513 google.auth.exceptions.RefreshError: If the credentials could 

514 not be refreshed. 

515 ValueError: If extracting expiry from the obtained ID token fails. 

516 """ 

517 if self._use_metadata_identity_endpoint: 

518 self.token, self.expiry = self._call_metadata_identity_endpoint(request) 

519 else: 

520 assertion = self._make_authorization_grant_assertion() 

521 access_token, expiry, _ = _client.id_token_jwt_grant( 

522 request, self._token_uri, assertion 

523 ) 

524 self.token = access_token 

525 self.expiry = expiry 

526 

527 @property # type: ignore 

528 @_helpers.copy_docstring(credentials.Signing) 

529 def signer(self): 

530 return self._signer 

531 

532 def sign_bytes(self, message): 

533 """Signs the given message. 

534 

535 Args: 

536 message (bytes): The message to sign. 

537 

538 Returns: 

539 bytes: The message's cryptographic signature. 

540 

541 Raises: 

542 ValueError: 

543 Signer is not available if metadata identity endpoint is used. 

544 """ 

545 if self._use_metadata_identity_endpoint: 

546 raise exceptions.InvalidOperation( 

547 "Signer is not available if metadata identity endpoint is used" 

548 ) 

549 return self._signer.sign(message) 

550 

551 @property 

552 def service_account_email(self): 

553 """The service account email.""" 

554 return self._service_account_email 

555 

556 @property 

557 def signer_email(self): 

558 return self._service_account_email