Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/compute_engine/credentials.py: 41%

134 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Google Compute Engine credentials. 

16 

17This module provides authentication for an application running on Google 

18Compute Engine using the Compute Engine metadata server. 

19 

20""" 

21 

22import datetime 

23 

24from google.auth import _helpers 

25from google.auth import credentials 

26from google.auth import exceptions 

27from google.auth import iam 

28from google.auth import jwt 

29from google.auth import metrics 

30from google.auth.compute_engine import _metadata 

31from google.auth.transport import requests as google_auth_requests 

32from google.oauth2 import _client 

33 

34 

35class Credentials(credentials.Scoped, credentials.CredentialsWithQuotaProject): 

36 """Compute Engine Credentials. 

37 

38 These credentials use the Google Compute Engine metadata server to obtain 

39 OAuth 2.0 access tokens associated with the instance's service account, 

40 and are also used for Cloud Run, Flex and App Engine (except for the Python 

41 2.7 runtime, which is supported only on older versions of this library). 

42 

43 For more information about Compute Engine authentication, including how 

44 to configure scopes, see the `Compute Engine authentication 

45 documentation`_. 

46 

47 .. note:: On Compute Engine the metadata server ignores requested scopes. 

48 On Cloud Run, Flex and App Engine the server honours requested scopes. 

49 

50 .. _Compute Engine authentication documentation: 

51 https://cloud.google.com/compute/docs/authentication#using 

52 """ 

53 

54 def __init__( 

55 self, 

56 service_account_email="default", 

57 quota_project_id=None, 

58 scopes=None, 

59 default_scopes=None, 

60 ): 

61 """ 

62 Args: 

63 service_account_email (str): The service account email to use, or 

64 'default'. A Compute Engine instance may have multiple service 

65 accounts. 

66 quota_project_id (Optional[str]): The project ID used for quota and 

67 billing. 

68 scopes (Optional[Sequence[str]]): The list of scopes for the credentials. 

69 default_scopes (Optional[Sequence[str]]): Default scopes passed by a 

70 Google client library. Use 'scopes' for user-defined scopes. 

71 """ 

72 super(Credentials, self).__init__() 

73 self._service_account_email = service_account_email 

74 self._quota_project_id = quota_project_id 

75 self._scopes = scopes 

76 self._default_scopes = default_scopes 

77 self._universe_domain_cached = False 

78 self._universe_domain_request = google_auth_requests.Request() 

79 

80 def _retrieve_info(self, request): 

81 """Retrieve information about the service account. 

82 

83 Updates the scopes and retrieves the full service account email. 

84 

85 Args: 

86 request (google.auth.transport.Request): The object used to make 

87 HTTP requests. 

88 """ 

89 info = _metadata.get_service_account_info( 

90 request, service_account=self._service_account_email 

91 ) 

92 

93 self._service_account_email = info["email"] 

94 

95 # Don't override scopes requested by the user. 

96 if self._scopes is None: 

97 self._scopes = info["scopes"] 

98 

99 def _metric_header_for_usage(self): 

100 return metrics.CRED_TYPE_SA_MDS 

101 

102 def refresh(self, request): 

103 """Refresh the access token and scopes. 

104 

105 Args: 

106 request (google.auth.transport.Request): The object used to make 

107 HTTP requests. 

108 

109 Raises: 

110 google.auth.exceptions.RefreshError: If the Compute Engine metadata 

111 service can't be reached if if the instance has not 

112 credentials. 

113 """ 

114 scopes = self._scopes if self._scopes is not None else self._default_scopes 

115 try: 

116 self._retrieve_info(request) 

117 self.token, self.expiry = _metadata.get_service_account_token( 

118 request, service_account=self._service_account_email, scopes=scopes 

119 ) 

120 except exceptions.TransportError as caught_exc: 

121 new_exc = exceptions.RefreshError(caught_exc) 

122 raise new_exc from caught_exc 

123 

124 @property 

125 def service_account_email(self): 

126 """The service account email. 

127 

128 .. note:: This is not guaranteed to be set until :meth:`refresh` has been 

129 called. 

130 """ 

131 return self._service_account_email 

132 

133 @property 

134 def requires_scopes(self): 

135 return not self._scopes 

136 

137 @property 

138 def universe_domain(self): 

139 if self._universe_domain_cached: 

140 return self._universe_domain 

141 self._universe_domain = _metadata.get_universe_domain( 

142 self._universe_domain_request 

143 ) 

144 self._universe_domain_cached = True 

145 return self._universe_domain 

146 

147 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

148 def with_quota_project(self, quota_project_id): 

149 return self.__class__( 

150 service_account_email=self._service_account_email, 

151 quota_project_id=quota_project_id, 

152 scopes=self._scopes, 

153 ) 

154 

155 @_helpers.copy_docstring(credentials.Scoped) 

156 def with_scopes(self, scopes, default_scopes=None): 

157 # Compute Engine credentials can not be scoped (the metadata service 

158 # ignores the scopes parameter). App Engine, Cloud Run and Flex support 

159 # requesting scopes. 

160 return self.__class__( 

161 scopes=scopes, 

162 default_scopes=default_scopes, 

163 service_account_email=self._service_account_email, 

164 quota_project_id=self._quota_project_id, 

165 ) 

166 

167 

168_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds 

169_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token" 

170 

171 

172class IDTokenCredentials( 

173 credentials.CredentialsWithQuotaProject, 

174 credentials.Signing, 

175 credentials.CredentialsWithTokenUri, 

176): 

177 """Open ID Connect ID Token-based service account credentials. 

178 

179 These credentials relies on the default service account of a GCE instance. 

180 

181 ID token can be requested from `GCE metadata server identity endpoint`_, IAM 

182 token endpoint or other token endpoints you specify. If metadata server 

183 identity endpoint is not used, the GCE instance must have been started with 

184 a service account that has access to the IAM Cloud API. 

185 

186 .. _GCE metadata server identity endpoint: 

187 https://cloud.google.com/compute/docs/instances/verifying-instance-identity 

188 """ 

189 

190 def __init__( 

191 self, 

192 request, 

193 target_audience, 

194 token_uri=None, 

195 additional_claims=None, 

196 service_account_email=None, 

197 signer=None, 

198 use_metadata_identity_endpoint=False, 

199 quota_project_id=None, 

200 ): 

201 """ 

202 Args: 

203 request (google.auth.transport.Request): The object used to make 

204 HTTP requests. 

205 target_audience (str): The intended audience for these credentials, 

206 used when requesting the ID Token. The ID Token's ``aud`` claim 

207 will be set to this string. 

208 token_uri (str): The OAuth 2.0 Token URI. 

209 additional_claims (Mapping[str, str]): Any additional claims for 

210 the JWT assertion used in the authorization grant. 

211 service_account_email (str): Optional explicit service account to 

212 use to sign JWT tokens. 

213 By default, this is the default GCE service account. 

214 signer (google.auth.crypt.Signer): The signer used to sign JWTs. 

215 In case the signer is specified, the request argument will be 

216 ignored. 

217 use_metadata_identity_endpoint (bool): Whether to use GCE metadata 

218 identity endpoint. For backward compatibility the default value 

219 is False. If set to True, ``token_uri``, ``additional_claims``, 

220 ``service_account_email``, ``signer`` argument should not be set; 

221 otherwise ValueError will be raised. 

222 quota_project_id (Optional[str]): The project ID used for quota and 

223 billing. 

224 

225 Raises: 

226 ValueError: 

227 If ``use_metadata_identity_endpoint`` is set to True, and one of 

228 ``token_uri``, ``additional_claims``, ``service_account_email``, 

229 ``signer`` arguments is set. 

230 """ 

231 super(IDTokenCredentials, self).__init__() 

232 

233 self._quota_project_id = quota_project_id 

234 self._use_metadata_identity_endpoint = use_metadata_identity_endpoint 

235 self._target_audience = target_audience 

236 

237 if use_metadata_identity_endpoint: 

238 if token_uri or additional_claims or service_account_email or signer: 

239 raise exceptions.MalformedError( 

240 "If use_metadata_identity_endpoint is set, token_uri, " 

241 "additional_claims, service_account_email, signer arguments" 

242 " must not be set" 

243 ) 

244 self._token_uri = None 

245 self._additional_claims = None 

246 self._signer = None 

247 

248 if service_account_email is None: 

249 sa_info = _metadata.get_service_account_info(request) 

250 self._service_account_email = sa_info["email"] 

251 else: 

252 self._service_account_email = service_account_email 

253 

254 if not use_metadata_identity_endpoint: 

255 if signer is None: 

256 signer = iam.Signer( 

257 request=request, 

258 credentials=Credentials(), 

259 service_account_email=self._service_account_email, 

260 ) 

261 self._signer = signer 

262 self._token_uri = token_uri or _DEFAULT_TOKEN_URI 

263 

264 if additional_claims is not None: 

265 self._additional_claims = additional_claims 

266 else: 

267 self._additional_claims = {} 

268 

269 def with_target_audience(self, target_audience): 

270 """Create a copy of these credentials with the specified target 

271 audience. 

272 Args: 

273 target_audience (str): The intended audience for these credentials, 

274 used when requesting the ID Token. 

275 Returns: 

276 google.auth.service_account.IDTokenCredentials: A new credentials 

277 instance. 

278 """ 

279 # since the signer is already instantiated, 

280 # the request is not needed 

281 if self._use_metadata_identity_endpoint: 

282 return self.__class__( 

283 None, 

284 target_audience=target_audience, 

285 use_metadata_identity_endpoint=True, 

286 quota_project_id=self._quota_project_id, 

287 ) 

288 else: 

289 return self.__class__( 

290 None, 

291 service_account_email=self._service_account_email, 

292 token_uri=self._token_uri, 

293 target_audience=target_audience, 

294 additional_claims=self._additional_claims.copy(), 

295 signer=self.signer, 

296 use_metadata_identity_endpoint=False, 

297 quota_project_id=self._quota_project_id, 

298 ) 

299 

300 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 

301 def with_quota_project(self, quota_project_id): 

302 

303 # since the signer is already instantiated, 

304 # the request is not needed 

305 if self._use_metadata_identity_endpoint: 

306 return self.__class__( 

307 None, 

308 target_audience=self._target_audience, 

309 use_metadata_identity_endpoint=True, 

310 quota_project_id=quota_project_id, 

311 ) 

312 else: 

313 return self.__class__( 

314 None, 

315 service_account_email=self._service_account_email, 

316 token_uri=self._token_uri, 

317 target_audience=self._target_audience, 

318 additional_claims=self._additional_claims.copy(), 

319 signer=self.signer, 

320 use_metadata_identity_endpoint=False, 

321 quota_project_id=quota_project_id, 

322 ) 

323 

324 @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 

325 def with_token_uri(self, token_uri): 

326 

327 # since the signer is already instantiated, 

328 # the request is not needed 

329 if self._use_metadata_identity_endpoint: 

330 raise exceptions.MalformedError( 

331 "If use_metadata_identity_endpoint is set, token_uri" " must not be set" 

332 ) 

333 else: 

334 return self.__class__( 

335 None, 

336 service_account_email=self._service_account_email, 

337 token_uri=token_uri, 

338 target_audience=self._target_audience, 

339 additional_claims=self._additional_claims.copy(), 

340 signer=self.signer, 

341 use_metadata_identity_endpoint=False, 

342 quota_project_id=self.quota_project_id, 

343 ) 

344 

345 def _make_authorization_grant_assertion(self): 

346 """Create the OAuth 2.0 assertion. 

347 This assertion is used during the OAuth 2.0 grant to acquire an 

348 ID token. 

349 Returns: 

350 bytes: The authorization grant assertion. 

351 """ 

352 now = _helpers.utcnow() 

353 lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) 

354 expiry = now + lifetime 

355 

356 payload = { 

357 "iat": _helpers.datetime_to_secs(now), 

358 "exp": _helpers.datetime_to_secs(expiry), 

359 # The issuer must be the service account email. 

360 "iss": self.service_account_email, 

361 # The audience must be the auth token endpoint's URI 

362 "aud": self._token_uri, 

363 # The target audience specifies which service the ID token is 

364 # intended for. 

365 "target_audience": self._target_audience, 

366 } 

367 

368 payload.update(self._additional_claims) 

369 

370 token = jwt.encode(self._signer, payload) 

371 

372 return token 

373 

374 def _call_metadata_identity_endpoint(self, request): 

375 """Request ID token from metadata identity endpoint. 

376 

377 Args: 

378 request (google.auth.transport.Request): The object used to make 

379 HTTP requests. 

380 

381 Returns: 

382 Tuple[str, datetime.datetime]: The ID token and the expiry of the ID token. 

383 

384 Raises: 

385 google.auth.exceptions.RefreshError: If the Compute Engine metadata 

386 service can't be reached or if the instance has no credentials. 

387 ValueError: If extracting expiry from the obtained ID token fails. 

388 """ 

389 try: 

390 path = "instance/service-accounts/default/identity" 

391 params = {"audience": self._target_audience, "format": "full"} 

392 metrics_header = { 

393 metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds() 

394 } 

395 id_token = _metadata.get( 

396 request, path, params=params, headers=metrics_header 

397 ) 

398 except exceptions.TransportError as caught_exc: 

399 new_exc = exceptions.RefreshError(caught_exc) 

400 raise new_exc from caught_exc 

401 

402 _, payload, _, _ = jwt._unverified_decode(id_token) 

403 return id_token, datetime.datetime.utcfromtimestamp(payload["exp"]) 

404 

405 def refresh(self, request): 

406 """Refreshes the ID token. 

407 

408 Args: 

409 request (google.auth.transport.Request): The object used to make 

410 HTTP requests. 

411 

412 Raises: 

413 google.auth.exceptions.RefreshError: If the credentials could 

414 not be refreshed. 

415 ValueError: If extracting expiry from the obtained ID token fails. 

416 """ 

417 if self._use_metadata_identity_endpoint: 

418 self.token, self.expiry = self._call_metadata_identity_endpoint(request) 

419 else: 

420 assertion = self._make_authorization_grant_assertion() 

421 access_token, expiry, _ = _client.id_token_jwt_grant( 

422 request, self._token_uri, assertion 

423 ) 

424 self.token = access_token 

425 self.expiry = expiry 

426 

427 @property # type: ignore 

428 @_helpers.copy_docstring(credentials.Signing) 

429 def signer(self): 

430 return self._signer 

431 

432 def sign_bytes(self, message): 

433 """Signs the given message. 

434 

435 Args: 

436 message (bytes): The message to sign. 

437 

438 Returns: 

439 bytes: The message's cryptographic signature. 

440 

441 Raises: 

442 ValueError: 

443 Signer is not available if metadata identity endpoint is used. 

444 """ 

445 if self._use_metadata_identity_endpoint: 

446 raise exceptions.InvalidOperation( 

447 "Signer is not available if metadata identity endpoint is used" 

448 ) 

449 return self._signer.sign(message) 

450 

451 @property 

452 def service_account_email(self): 

453 """The service account email.""" 

454 return self._service_account_email 

455 

456 @property 

457 def signer_email(self): 

458 return self._service_account_email