Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/credentials.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

150 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16"""Interfaces for credentials.""" 

17 

18import abc 

19from enum import Enum 

20import os 

21 

22from google.auth import _helpers, environment_vars 

23from google.auth import exceptions 

24from google.auth import metrics 

25from google.auth._credentials_base import _BaseCredentials 

26from google.auth._refresh_worker import RefreshThreadManager 

27 

28DEFAULT_UNIVERSE_DOMAIN = "googleapis.com" 

29 

30 

31class Credentials(_BaseCredentials): 

32 """Base class for all credentials. 

33 

34 All credentials have a :attr:`token` that is used for authentication and 

35 may also optionally set an :attr:`expiry` to indicate when the token will 

36 no longer be valid. 

37 

38 Most credentials will be :attr:`invalid` until :meth:`refresh` is called. 

39 Credentials can do this automatically before the first HTTP request in 

40 :meth:`before_request`. 

41 

42 Although the token and expiration will change as the credentials are 

43 :meth:`refreshed <refresh>` and used, credentials should be considered 

44 immutable. Various credentials will accept configuration such as private 

45 keys, scopes, and other options. These options are not changeable after 

46 construction. Some classes will provide mechanisms to copy the credentials 

47 with modifications such as :meth:`ScopedCredentials.with_scopes`. 

48 """ 

49 

50 def __init__(self): 

51 super(Credentials, self).__init__() 

52 

53 self.expiry = None 

54 """Optional[datetime]: When the token expires and is no longer valid. 

55 If this is None, the token is assumed to never expire.""" 

56 self._quota_project_id = None 

57 """Optional[str]: Project to use for quota and billing purposes.""" 

58 self._trust_boundary = None 

59 """Optional[dict]: Cache of a trust boundary response which has a list 

60 of allowed regions and an encoded string representation of credentials 

61 trust boundary.""" 

62 self._universe_domain = DEFAULT_UNIVERSE_DOMAIN 

63 """Optional[str]: The universe domain value, default is googleapis.com 

64 """ 

65 

66 self._use_non_blocking_refresh = False 

67 self._refresh_worker = RefreshThreadManager() 

68 

69 @property 

70 def expired(self): 

71 """Checks if the credentials are expired. 

72 

73 Note that credentials can be invalid but not expired because 

74 Credentials with :attr:`expiry` set to None is considered to never 

75 expire. 

76 

77 .. deprecated:: v2.24.0 

78 Prefer checking :attr:`token_state` instead. 

79 """ 

80 if not self.expiry: 

81 return False 

82 # Remove some threshold from expiry to err on the side of reporting 

83 # expiration early so that we avoid the 401-refresh-retry loop. 

84 skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD 

85 return _helpers.utcnow() >= skewed_expiry 

86 

87 @property 

88 def valid(self): 

89 """Checks the validity of the credentials. 

90 

91 This is True if the credentials have a :attr:`token` and the token 

92 is not :attr:`expired`. 

93 

94 .. deprecated:: v2.24.0 

95 Prefer checking :attr:`token_state` instead. 

96 """ 

97 return self.token is not None and not self.expired 

98 

99 @property 

100 def token_state(self): 

101 """ 

102 See `:obj:`TokenState` 

103 """ 

104 if self.token is None: 

105 return TokenState.INVALID 

106 

107 # Credentials that can't expire are always treated as fresh. 

108 if self.expiry is None: 

109 return TokenState.FRESH 

110 

111 expired = _helpers.utcnow() >= self.expiry 

112 if expired: 

113 return TokenState.INVALID 

114 

115 is_stale = _helpers.utcnow() >= (self.expiry - _helpers.REFRESH_THRESHOLD) 

116 if is_stale: 

117 return TokenState.STALE 

118 

119 return TokenState.FRESH 

120 

121 @property 

122 def quota_project_id(self): 

123 """Project to use for quota and billing purposes.""" 

124 return self._quota_project_id 

125 

126 @property 

127 def universe_domain(self): 

128 """The universe domain value.""" 

129 return self._universe_domain 

130 

131 def get_cred_info(self): 

132 """The credential information JSON. 

133 

134 The credential information will be added to auth related error messages 

135 by client library. 

136 

137 Returns: 

138 Mapping[str, str]: The credential information JSON. 

139 """ 

140 return None 

141 

142 @abc.abstractmethod 

143 def refresh(self, request): 

144 """Refreshes the access token. 

145 

146 Args: 

147 request (google.auth.transport.Request): The object used to make 

148 HTTP requests. 

149 

150 Raises: 

151 google.auth.exceptions.RefreshError: If the credentials could 

152 not be refreshed. 

153 """ 

154 # pylint: disable=missing-raises-doc 

155 # (pylint doesn't recognize that this is abstract) 

156 raise NotImplementedError("Refresh must be implemented") 

157 

158 def _metric_header_for_usage(self): 

159 """The x-goog-api-client header for token usage metric. 

160 

161 This header will be added to the API service requests in before_request 

162 method. For example, "cred-type/sa-jwt" means service account self 

163 signed jwt access token is used in the API service request 

164 authorization header. Children credentials classes need to override 

165 this method to provide the header value, if the token usage metric is 

166 needed. 

167 

168 Returns: 

169 str: The x-goog-api-client header value. 

170 """ 

171 return None 

172 

173 def apply(self, headers, token=None): 

174 """Apply the token to the authentication header. 

175 

176 Args: 

177 headers (Mapping): The HTTP request headers. 

178 token (Optional[str]): If specified, overrides the current access 

179 token. 

180 """ 

181 self._apply(headers, token=token) 

182 """Trust boundary value will be a cached value from global lookup. 

183 

184 The response of trust boundary will be a list of regions and a hex 

185 encoded representation. 

186 

187 An example of global lookup response: 

188 { 

189 "locations": [ 

190 "us-central1", "us-east1", "europe-west1", "asia-east1" 

191 ] 

192 "encoded_locations": "0xA30" 

193 } 

194 """ 

195 if self._trust_boundary is not None: 

196 headers["x-allowed-locations"] = self._trust_boundary["encoded_locations"] 

197 if self.quota_project_id: 

198 headers["x-goog-user-project"] = self.quota_project_id 

199 

200 def _blocking_refresh(self, request): 

201 if not self.valid: 

202 self.refresh(request) 

203 

204 def _non_blocking_refresh(self, request): 

205 use_blocking_refresh_fallback = False 

206 

207 if self.token_state == TokenState.STALE: 

208 use_blocking_refresh_fallback = not self._refresh_worker.start_refresh( 

209 self, request 

210 ) 

211 

212 if self.token_state == TokenState.INVALID or use_blocking_refresh_fallback: 

213 self.refresh(request) 

214 # If the blocking refresh succeeds then we can clear the error info 

215 # on the background refresh worker, and perform refreshes in a 

216 # background thread. 

217 self._refresh_worker.clear_error() 

218 

219 def before_request(self, request, method, url, headers): 

220 """Performs credential-specific before request logic. 

221 

222 Refreshes the credentials if necessary, then calls :meth:`apply` to 

223 apply the token to the authentication header. 

224 

225 Args: 

226 request (google.auth.transport.Request): The object used to make 

227 HTTP requests. 

228 method (str): The request's HTTP method or the RPC method being 

229 invoked. 

230 url (str): The request's URI or the RPC service's URI. 

231 headers (Mapping): The request's headers. 

232 """ 

233 # pylint: disable=unused-argument 

234 # (Subclasses may use these arguments to ascertain information about 

235 # the http request.) 

236 if self._use_non_blocking_refresh: 

237 self._non_blocking_refresh(request) 

238 else: 

239 self._blocking_refresh(request) 

240 

241 metrics.add_metric_header(headers, self._metric_header_for_usage()) 

242 self.apply(headers) 

243 

244 def with_non_blocking_refresh(self): 

245 self._use_non_blocking_refresh = True 

246 

247 

248class CredentialsWithQuotaProject(Credentials): 

249 """Abstract base for credentials supporting ``with_quota_project`` factory""" 

250 

251 def with_quota_project(self, quota_project_id): 

252 """Returns a copy of these credentials with a modified quota project. 

253 

254 Args: 

255 quota_project_id (str): The project to use for quota and 

256 billing purposes 

257 

258 Returns: 

259 google.auth.credentials.Credentials: A new credentials instance. 

260 """ 

261 raise NotImplementedError("This credential does not support quota project.") 

262 

263 def with_quota_project_from_environment(self): 

264 quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT) 

265 if quota_from_env: 

266 return self.with_quota_project(quota_from_env) 

267 return self 

268 

269 

270class CredentialsWithTokenUri(Credentials): 

271 """Abstract base for credentials supporting ``with_token_uri`` factory""" 

272 

273 def with_token_uri(self, token_uri): 

274 """Returns a copy of these credentials with a modified token uri. 

275 

276 Args: 

277 token_uri (str): The uri to use for fetching/exchanging tokens 

278 

279 Returns: 

280 google.auth.credentials.Credentials: A new credentials instance. 

281 """ 

282 raise NotImplementedError("This credential does not use token uri.") 

283 

284 

285class CredentialsWithUniverseDomain(Credentials): 

286 """Abstract base for credentials supporting ``with_universe_domain`` factory""" 

287 

288 def with_universe_domain(self, universe_domain): 

289 """Returns a copy of these credentials with a modified universe domain. 

290 

291 Args: 

292 universe_domain (str): The universe domain to use 

293 

294 Returns: 

295 google.auth.credentials.Credentials: A new credentials instance. 

296 """ 

297 raise NotImplementedError( 

298 "This credential does not support with_universe_domain." 

299 ) 

300 

301 

302class AnonymousCredentials(Credentials): 

303 """Credentials that do not provide any authentication information. 

304 

305 These are useful in the case of services that support anonymous access or 

306 local service emulators that do not use credentials. 

307 """ 

308 

309 @property 

310 def expired(self): 

311 """Returns `False`, anonymous credentials never expire.""" 

312 return False 

313 

314 @property 

315 def valid(self): 

316 """Returns `True`, anonymous credentials are always valid.""" 

317 return True 

318 

319 def refresh(self, request): 

320 """Raises :class:``InvalidOperation``, anonymous credentials cannot be 

321 refreshed.""" 

322 raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.") 

323 

324 def apply(self, headers, token=None): 

325 """Anonymous credentials do nothing to the request. 

326 

327 The optional ``token`` argument is not supported. 

328 

329 Raises: 

330 google.auth.exceptions.InvalidValue: If a token was specified. 

331 """ 

332 if token is not None: 

333 raise exceptions.InvalidValue("Anonymous credentials don't support tokens.") 

334 

335 def before_request(self, request, method, url, headers): 

336 """Anonymous credentials do nothing to the request.""" 

337 

338 

339class ReadOnlyScoped(metaclass=abc.ABCMeta): 

340 """Interface for credentials whose scopes can be queried. 

341 

342 OAuth 2.0-based credentials allow limiting access using scopes as described 

343 in `RFC6749 Section 3.3`_. 

344 If a credential class implements this interface then the credentials either 

345 use scopes in their implementation. 

346 

347 Some credentials require scopes in order to obtain a token. You can check 

348 if scoping is necessary with :attr:`requires_scopes`:: 

349 

350 if credentials.requires_scopes: 

351 # Scoping is required. 

352 credentials = credentials.with_scopes(scopes=['one', 'two']) 

353 

354 Credentials that require scopes must either be constructed with scopes:: 

355 

356 credentials = SomeScopedCredentials(scopes=['one', 'two']) 

357 

358 Or must copy an existing instance using :meth:`with_scopes`:: 

359 

360 scoped_credentials = credentials.with_scopes(scopes=['one', 'two']) 

361 

362 Some credentials have scopes but do not allow or require scopes to be set, 

363 these credentials can be used as-is. 

364 

365 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3 

366 """ 

367 

368 def __init__(self): 

369 super(ReadOnlyScoped, self).__init__() 

370 self._scopes = None 

371 self._default_scopes = None 

372 

373 @property 

374 def scopes(self): 

375 """Sequence[str]: the credentials' current set of scopes.""" 

376 return self._scopes 

377 

378 @property 

379 def default_scopes(self): 

380 """Sequence[str]: the credentials' current set of default scopes.""" 

381 return self._default_scopes 

382 

383 @abc.abstractproperty 

384 def requires_scopes(self): 

385 """True if these credentials require scopes to obtain an access token. 

386 """ 

387 return False 

388 

389 def has_scopes(self, scopes): 

390 """Checks if the credentials have the given scopes. 

391 

392 .. warning: This method is not guaranteed to be accurate if the 

393 credentials are :attr:`~Credentials.invalid`. 

394 

395 Args: 

396 scopes (Sequence[str]): The list of scopes to check. 

397 

398 Returns: 

399 bool: True if the credentials have the given scopes. 

400 """ 

401 credential_scopes = ( 

402 self._scopes if self._scopes is not None else self._default_scopes 

403 ) 

404 return set(scopes).issubset(set(credential_scopes or [])) 

405 

406 

407class Scoped(ReadOnlyScoped): 

408 """Interface for credentials whose scopes can be replaced while copying. 

409 

410 OAuth 2.0-based credentials allow limiting access using scopes as described 

411 in `RFC6749 Section 3.3`_. 

412 If a credential class implements this interface then the credentials either 

413 use scopes in their implementation. 

414 

415 Some credentials require scopes in order to obtain a token. You can check 

416 if scoping is necessary with :attr:`requires_scopes`:: 

417 

418 if credentials.requires_scopes: 

419 # Scoping is required. 

420 credentials = credentials.create_scoped(['one', 'two']) 

421 

422 Credentials that require scopes must either be constructed with scopes:: 

423 

424 credentials = SomeScopedCredentials(scopes=['one', 'two']) 

425 

426 Or must copy an existing instance using :meth:`with_scopes`:: 

427 

428 scoped_credentials = credentials.with_scopes(scopes=['one', 'two']) 

429 

430 Some credentials have scopes but do not allow or require scopes to be set, 

431 these credentials can be used as-is. 

432 

433 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3 

434 """ 

435 

436 @abc.abstractmethod 

437 def with_scopes(self, scopes, default_scopes=None): 

438 """Create a copy of these credentials with the specified scopes. 

439 

440 Args: 

441 scopes (Sequence[str]): The list of scopes to attach to the 

442 current credentials. 

443 

444 Raises: 

445 NotImplementedError: If the credentials' scopes can not be changed. 

446 This can be avoided by checking :attr:`requires_scopes` before 

447 calling this method. 

448 """ 

449 raise NotImplementedError("This class does not require scoping.") 

450 

451 

452def with_scopes_if_required(credentials, scopes, default_scopes=None): 

453 """Creates a copy of the credentials with scopes if scoping is required. 

454 

455 This helper function is useful when you do not know (or care to know) the 

456 specific type of credentials you are using (such as when you use 

457 :func:`google.auth.default`). This function will call 

458 :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if 

459 the credentials require scoping. Otherwise, it will return the credentials 

460 as-is. 

461 

462 Args: 

463 credentials (google.auth.credentials.Credentials): The credentials to 

464 scope if necessary. 

465 scopes (Sequence[str]): The list of scopes to use. 

466 default_scopes (Sequence[str]): Default scopes passed by a 

467 Google client library. Use 'scopes' for user-defined scopes. 

468 

469 Returns: 

470 google.auth.credentials.Credentials: Either a new set of scoped 

471 credentials, or the passed in credentials instance if no scoping 

472 was required. 

473 """ 

474 if isinstance(credentials, Scoped) and credentials.requires_scopes: 

475 return credentials.with_scopes(scopes, default_scopes=default_scopes) 

476 else: 

477 return credentials 

478 

479 

480class Signing(metaclass=abc.ABCMeta): 

481 """Interface for credentials that can cryptographically sign messages.""" 

482 

483 @abc.abstractmethod 

484 def sign_bytes(self, message): 

485 """Signs the given message. 

486 

487 Args: 

488 message (bytes): The message to sign. 

489 

490 Returns: 

491 bytes: The message's cryptographic signature. 

492 """ 

493 # pylint: disable=missing-raises-doc,redundant-returns-doc 

494 # (pylint doesn't recognize that this is abstract) 

495 raise NotImplementedError("Sign bytes must be implemented.") 

496 

497 @abc.abstractproperty 

498 def signer_email(self): 

499 """Optional[str]: An email address that identifies the signer.""" 

500 # pylint: disable=missing-raises-doc 

501 # (pylint doesn't recognize that this is abstract) 

502 raise NotImplementedError("Signer email must be implemented.") 

503 

504 @abc.abstractproperty 

505 def signer(self): 

506 """google.auth.crypt.Signer: The signer used to sign bytes.""" 

507 # pylint: disable=missing-raises-doc 

508 # (pylint doesn't recognize that this is abstract) 

509 raise NotImplementedError("Signer must be implemented.") 

510 

511 

512class TokenState(Enum): 

513 """ 

514 Tracks the state of a token. 

515 FRESH: The token is valid. It is not expired or close to expired, or the token has no expiry. 

516 STALE: The token is close to expired, and should be refreshed. The token can be used normally. 

517 INVALID: The token is expired or invalid. The token cannot be used for a normal operation. 

518 """ 

519 

520 FRESH = 1 

521 STALE = 2 

522 INVALID = 3