Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/auth/credentials.py: 61%

101 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16"""Interfaces for credentials.""" 

17 

18import abc 

19import os 

20 

21import six 

22 

23from google.auth import _helpers, environment_vars 

24from google.auth import exceptions 

25from google.auth import metrics 

26 

27 

28@six.add_metaclass(abc.ABCMeta) 

29class Credentials(object): 

30 """Base class for all credentials. 

31 

32 All credentials have a :attr:`token` that is used for authentication and 

33 may also optionally set an :attr:`expiry` to indicate when the token will 

34 no longer be valid. 

35 

36 Most credentials will be :attr:`invalid` until :meth:`refresh` is called. 

37 Credentials can do this automatically before the first HTTP request in 

38 :meth:`before_request`. 

39 

40 Although the token and expiration will change as the credentials are 

41 :meth:`refreshed <refresh>` and used, credentials should be considered 

42 immutable. Various credentials will accept configuration such as private 

43 keys, scopes, and other options. These options are not changeable after 

44 construction. Some classes will provide mechanisms to copy the credentials 

45 with modifications such as :meth:`ScopedCredentials.with_scopes`. 

46 """ 

47 

48 def __init__(self): 

49 self.token = None 

50 """str: The bearer token that can be used in HTTP headers to make 

51 authenticated requests.""" 

52 self.expiry = None 

53 """Optional[datetime]: When the token expires and is no longer valid. 

54 If this is None, the token is assumed to never expire.""" 

55 self._quota_project_id = None 

56 """Optional[str]: Project to use for quota and billing purposes.""" 

57 

58 @property 

59 def expired(self): 

60 """Checks if the credentials are expired. 

61 

62 Note that credentials can be invalid but not expired because 

63 Credentials with :attr:`expiry` set to None is considered to never 

64 expire. 

65 """ 

66 if not self.expiry: 

67 return False 

68 

69 # Remove some threshold from expiry to err on the side of reporting 

70 # expiration early so that we avoid the 401-refresh-retry loop. 

71 skewed_expiry = self.expiry - _helpers.REFRESH_THRESHOLD 

72 return _helpers.utcnow() >= skewed_expiry 

73 

74 @property 

75 def valid(self): 

76 """Checks the validity of the credentials. 

77 

78 This is True if the credentials have a :attr:`token` and the token 

79 is not :attr:`expired`. 

80 """ 

81 return self.token is not None and not self.expired 

82 

83 @property 

84 def quota_project_id(self): 

85 """Project to use for quota and billing purposes.""" 

86 return self._quota_project_id 

87 

88 @abc.abstractmethod 

89 def refresh(self, request): 

90 """Refreshes the access token. 

91 

92 Args: 

93 request (google.auth.transport.Request): The object used to make 

94 HTTP requests. 

95 

96 Raises: 

97 google.auth.exceptions.RefreshError: If the credentials could 

98 not be refreshed. 

99 """ 

100 # pylint: disable=missing-raises-doc 

101 # (pylint doesn't recognize that this is abstract) 

102 raise NotImplementedError("Refresh must be implemented") 

103 

104 def _metric_header_for_usage(self): 

105 """The x-goog-api-client header for token usage metric. 

106 

107 This header will be added to the API service requests in before_request 

108 method. For example, "cred-type/sa-jwt" means service account self 

109 signed jwt access token is used in the API service request 

110 authorization header. Children credentials classes need to override 

111 this method to provide the header value, if the token usage metric is 

112 needed. 

113 

114 Returns: 

115 str: The x-goog-api-client header value. 

116 """ 

117 return None 

118 

119 def apply(self, headers, token=None): 

120 """Apply the token to the authentication header. 

121 

122 Args: 

123 headers (Mapping): The HTTP request headers. 

124 token (Optional[str]): If specified, overrides the current access 

125 token. 

126 """ 

127 headers["authorization"] = "Bearer {}".format( 

128 _helpers.from_bytes(token or self.token) 

129 ) 

130 if self.quota_project_id: 

131 headers["x-goog-user-project"] = self.quota_project_id 

132 

133 def before_request(self, request, method, url, headers): 

134 """Performs credential-specific before request logic. 

135 

136 Refreshes the credentials if necessary, then calls :meth:`apply` to 

137 apply the token to the authentication header. 

138 

139 Args: 

140 request (google.auth.transport.Request): The object used to make 

141 HTTP requests. 

142 method (str): The request's HTTP method or the RPC method being 

143 invoked. 

144 url (str): The request's URI or the RPC service's URI. 

145 headers (Mapping): The request's headers. 

146 """ 

147 # pylint: disable=unused-argument 

148 # (Subclasses may use these arguments to ascertain information about 

149 # the http request.) 

150 if not self.valid: 

151 self.refresh(request) 

152 metrics.add_metric_header(headers, self._metric_header_for_usage()) 

153 self.apply(headers) 

154 

155 

156class CredentialsWithQuotaProject(Credentials): 

157 """Abstract base for credentials supporting ``with_quota_project`` factory""" 

158 

159 def with_quota_project(self, quota_project_id): 

160 """Returns a copy of these credentials with a modified quota project. 

161 

162 Args: 

163 quota_project_id (str): The project to use for quota and 

164 billing purposes 

165 

166 Returns: 

167 google.oauth2.credentials.Credentials: A new credentials instance. 

168 """ 

169 raise NotImplementedError("This credential does not support quota project.") 

170 

171 def with_quota_project_from_environment(self): 

172 quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT) 

173 if quota_from_env: 

174 return self.with_quota_project(quota_from_env) 

175 return self 

176 

177 

178class CredentialsWithTokenUri(Credentials): 

179 """Abstract base for credentials supporting ``with_token_uri`` factory""" 

180 

181 def with_token_uri(self, token_uri): 

182 """Returns a copy of these credentials with a modified token uri. 

183 

184 Args: 

185 token_uri (str): The uri to use for fetching/exchanging tokens 

186 

187 Returns: 

188 google.oauth2.credentials.Credentials: A new credentials instance. 

189 """ 

190 raise NotImplementedError("This credential does not use token uri.") 

191 

192 

193class AnonymousCredentials(Credentials): 

194 """Credentials that do not provide any authentication information. 

195 

196 These are useful in the case of services that support anonymous access or 

197 local service emulators that do not use credentials. 

198 """ 

199 

200 @property 

201 def expired(self): 

202 """Returns `False`, anonymous credentials never expire.""" 

203 return False 

204 

205 @property 

206 def valid(self): 

207 """Returns `True`, anonymous credentials are always valid.""" 

208 return True 

209 

210 def refresh(self, request): 

211 """Raises :class:``InvalidOperation``, anonymous credentials cannot be 

212 refreshed.""" 

213 raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.") 

214 

215 def apply(self, headers, token=None): 

216 """Anonymous credentials do nothing to the request. 

217 

218 The optional ``token`` argument is not supported. 

219 

220 Raises: 

221 google.auth.exceptions.InvalidValue: If a token was specified. 

222 """ 

223 if token is not None: 

224 raise exceptions.InvalidValue("Anonymous credentials don't support tokens.") 

225 

226 def before_request(self, request, method, url, headers): 

227 """Anonymous credentials do nothing to the request.""" 

228 

229 

230@six.add_metaclass(abc.ABCMeta) 

231class ReadOnlyScoped(object): 

232 """Interface for credentials whose scopes can be queried. 

233 

234 OAuth 2.0-based credentials allow limiting access using scopes as described 

235 in `RFC6749 Section 3.3`_. 

236 If a credential class implements this interface then the credentials either 

237 use scopes in their implementation. 

238 

239 Some credentials require scopes in order to obtain a token. You can check 

240 if scoping is necessary with :attr:`requires_scopes`:: 

241 

242 if credentials.requires_scopes: 

243 # Scoping is required. 

244 credentials = credentials.with_scopes(scopes=['one', 'two']) 

245 

246 Credentials that require scopes must either be constructed with scopes:: 

247 

248 credentials = SomeScopedCredentials(scopes=['one', 'two']) 

249 

250 Or must copy an existing instance using :meth:`with_scopes`:: 

251 

252 scoped_credentials = credentials.with_scopes(scopes=['one', 'two']) 

253 

254 Some credentials have scopes but do not allow or require scopes to be set, 

255 these credentials can be used as-is. 

256 

257 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3 

258 """ 

259 

260 def __init__(self): 

261 super(ReadOnlyScoped, self).__init__() 

262 self._scopes = None 

263 self._default_scopes = None 

264 

265 @property 

266 def scopes(self): 

267 """Sequence[str]: the credentials' current set of scopes.""" 

268 return self._scopes 

269 

270 @property 

271 def default_scopes(self): 

272 """Sequence[str]: the credentials' current set of default scopes.""" 

273 return self._default_scopes 

274 

275 @abc.abstractproperty 

276 def requires_scopes(self): 

277 """True if these credentials require scopes to obtain an access token. 

278 """ 

279 return False 

280 

281 def has_scopes(self, scopes): 

282 """Checks if the credentials have the given scopes. 

283 

284 .. warning: This method is not guaranteed to be accurate if the 

285 credentials are :attr:`~Credentials.invalid`. 

286 

287 Args: 

288 scopes (Sequence[str]): The list of scopes to check. 

289 

290 Returns: 

291 bool: True if the credentials have the given scopes. 

292 """ 

293 credential_scopes = ( 

294 self._scopes if self._scopes is not None else self._default_scopes 

295 ) 

296 return set(scopes).issubset(set(credential_scopes or [])) 

297 

298 

299class Scoped(ReadOnlyScoped): 

300 """Interface for credentials whose scopes can be replaced while copying. 

301 

302 OAuth 2.0-based credentials allow limiting access using scopes as described 

303 in `RFC6749 Section 3.3`_. 

304 If a credential class implements this interface then the credentials either 

305 use scopes in their implementation. 

306 

307 Some credentials require scopes in order to obtain a token. You can check 

308 if scoping is necessary with :attr:`requires_scopes`:: 

309 

310 if credentials.requires_scopes: 

311 # Scoping is required. 

312 credentials = credentials.create_scoped(['one', 'two']) 

313 

314 Credentials that require scopes must either be constructed with scopes:: 

315 

316 credentials = SomeScopedCredentials(scopes=['one', 'two']) 

317 

318 Or must copy an existing instance using :meth:`with_scopes`:: 

319 

320 scoped_credentials = credentials.with_scopes(scopes=['one', 'two']) 

321 

322 Some credentials have scopes but do not allow or require scopes to be set, 

323 these credentials can be used as-is. 

324 

325 .. _RFC6749 Section 3.3: https://tools.ietf.org/html/rfc6749#section-3.3 

326 """ 

327 

328 @abc.abstractmethod 

329 def with_scopes(self, scopes, default_scopes=None): 

330 """Create a copy of these credentials with the specified scopes. 

331 

332 Args: 

333 scopes (Sequence[str]): The list of scopes to attach to the 

334 current credentials. 

335 

336 Raises: 

337 NotImplementedError: If the credentials' scopes can not be changed. 

338 This can be avoided by checking :attr:`requires_scopes` before 

339 calling this method. 

340 """ 

341 raise NotImplementedError("This class does not require scoping.") 

342 

343 

344def with_scopes_if_required(credentials, scopes, default_scopes=None): 

345 """Creates a copy of the credentials with scopes if scoping is required. 

346 

347 This helper function is useful when you do not know (or care to know) the 

348 specific type of credentials you are using (such as when you use 

349 :func:`google.auth.default`). This function will call 

350 :meth:`Scoped.with_scopes` if the credentials are scoped credentials and if 

351 the credentials require scoping. Otherwise, it will return the credentials 

352 as-is. 

353 

354 Args: 

355 credentials (google.auth.credentials.Credentials): The credentials to 

356 scope if necessary. 

357 scopes (Sequence[str]): The list of scopes to use. 

358 default_scopes (Sequence[str]): Default scopes passed by a 

359 Google client library. Use 'scopes' for user-defined scopes. 

360 

361 Returns: 

362 google.auth.credentials.Credentials: Either a new set of scoped 

363 credentials, or the passed in credentials instance if no scoping 

364 was required. 

365 """ 

366 if isinstance(credentials, Scoped) and credentials.requires_scopes: 

367 return credentials.with_scopes(scopes, default_scopes=default_scopes) 

368 else: 

369 return credentials 

370 

371 

372@six.add_metaclass(abc.ABCMeta) 

373class Signing(object): 

374 """Interface for credentials that can cryptographically sign messages.""" 

375 

376 @abc.abstractmethod 

377 def sign_bytes(self, message): 

378 """Signs the given message. 

379 

380 Args: 

381 message (bytes): The message to sign. 

382 

383 Returns: 

384 bytes: The message's cryptographic signature. 

385 """ 

386 # pylint: disable=missing-raises-doc,redundant-returns-doc 

387 # (pylint doesn't recognize that this is abstract) 

388 raise NotImplementedError("Sign bytes must be implemented.") 

389 

390 @abc.abstractproperty 

391 def signer_email(self): 

392 """Optional[str]: An email address that identifies the signer.""" 

393 # pylint: disable=missing-raises-doc 

394 # (pylint doesn't recognize that this is abstract) 

395 raise NotImplementedError("Signer email must be implemented.") 

396 

397 @abc.abstractproperty 

398 def signer(self): 

399 """google.auth.crypt.Signer: The signer used to sign bytes.""" 

400 # pylint: disable=missing-raises-doc 

401 # (pylint doesn't recognize that this is abstract) 

402 raise NotImplementedError("Signer must be implemented.")