Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/oidc.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

142 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16API for retrieving OIDC tokens. 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22import sys 

23import time 

24import urllib.parse 

25import webbrowser 

26from datetime import datetime, timezone 

27from typing import NoReturn, Optional, cast 

28 

29import id 

30import jwt 

31import requests 

32from pydantic import BaseModel, StrictStr 

33 

34from sigstore._internal import USER_AGENT 

35from sigstore.errors import Error, NetworkError 

36 

# Base URLs of the OAuth issuers used by `Issuer.production()` and
# `Issuer.staging()` respectively.
DEFAULT_OAUTH_ISSUER_URL = "https://oauth2.sigstore.dev/auth"
STAGING_OAUTH_ISSUER_URL = "https://oauth2.sigstage.dev/auth"

# Maps each specially-handled issuer URL to the name of the claim that
# `IdentityToken` reads as the token's identity.
# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201
_KNOWN_OIDC_ISSUERS = {
    "https://accounts.google.com": "email",
    DEFAULT_OAUTH_ISSUER_URL: "email",
    STAGING_OAUTH_ISSUER_URL: "email",
    "https://token.actions.githubusercontent.com": "sub",
}

# Audience that identity tokens are requested for and checked against.
_DEFAULT_AUDIENCE = "sigstore"

48 

49 

class _OpenIDConfiguration(BaseModel):
    """
    Represents a subset of the fields provided by an OpenID Connect provider's
    `.well-known/openid-configuration` response, as defined by OpenID Connect
    Discovery.

    Only the two endpoints declared below are modeled; both must be strings.

    See: <https://openid.net/specs/openid-connect-discovery-1_0.html>
    """

    # Endpoint advertised by the provider for authorization requests.
    authorization_endpoint: StrictStr
    # Endpoint that `Issuer.identity_token` posts the authorization code to.
    token_endpoint: StrictStr

60 

61 

class ExpiredIdentity(Exception):
    """
    Signals that an identity token has expired.
    """

64 

65 

class IdentityToken:
    """
    An OIDC "identity", corresponding to an underlying OIDC token with
    a sensible subject, issuer, and audience for Sigstore purposes.
    """

    def __init__(self, raw_token: str) -> None:
        """
        Create a new `IdentityToken` from the given OIDC token.

        The token's signature is intentionally **not** verified; only its
        structure, required claims, audience, and validity period are checked.

        Raises `IdentityError` if the token is malformed, is missing a
        required claim, carries a claim of an unexpected type, or is not
        currently within its validity period.
        """

        self._raw_token = raw_token

        # NOTE: The lack of verification here is intentional, and is part of
        # Sigstore's verification model: clients like sigstore-python are
        # responsible only for forwarding the OIDC identity to Fulcio for
        # certificate binding and issuance.
        try:
            self._unverified_claims = jwt.decode(
                raw_token,
                options={
                    "verify_signature": False,
                    "verify_aud": True,
                    "verify_iat": True,
                    "verify_exp": True,
                    # These claims are required by OpenID Connect, so
                    # we can strongly enforce their presence.
                    # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
                    "require": ["aud", "sub", "iat", "exp", "iss"],
                },
                audience=_DEFAULT_AUDIENCE,
                # NOTE: This leeway shouldn't be strictly necessary, but is
                # included to preempt any (small) skew between the host
                # and the originating IdP.
                leeway=5,
            )
        except Exception as exc:
            raise IdentityError(
                "Identity token is malformed or missing claims"
            ) from exc

        claims = self._unverified_claims

        # `iss`, `exp` and `nbf` are consumed directly below (as a lookup key
        # and in numeric comparisons), so unexpected types are rejected up
        # front instead of surfacing later as a `TypeError`.
        issuer = claims["iss"]
        if not isinstance(issuer, str):
            raise IdentityError("unexpected claim type: iss is not a string")

        expiry = claims["exp"]
        if not isinstance(expiry, (int, float)):
            raise IdentityError("unexpected claim type: exp is not a number")

        not_before = claims.get("nbf")
        if not_before is not None and not isinstance(not_before, (int, float)):
            raise IdentityError("unexpected claim type: nbf is not a number")

        self._iss: str = issuer
        self._nbf: float | None = not_before
        self._exp: float = expiry

        # Fail early if this token isn't within its validity period.
        if not self.in_validity_period():
            raise IdentityError("Identity token is not within its validity period")

        # When verifying the private key possession proof, Fulcio uses
        # different claims depending on the token's issuer.
        # We currently special-case a handful of these, and fall back
        # on signing the "sub" claim otherwise.
        identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer, "sub")
        if identity_claim not in claims:
            raise IdentityError(
                f"Identity token is missing the required {identity_claim!r} claim"
            )
        self._identity = str(claims[identity_claim])

        # This identity token might have been retrieved directly from
        # an identity provider, or it might be a "federated" identity token
        # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
        # In the latter case, the claims will also include a `federated_claims`
        # set, which in turn should include a `connector_id` that reflects
        # the "real" token issuer. We retrieve this, despite technically
        # being an implementation detail, because it has value to client
        # users: a client might want to make sure that its user is identifying
        # with a *particular* IdP, which means that they need to pierce the
        # federation layer to check which IdP is actually being used.
        self._federated_issuer: str | None = None
        federated_claims = claims.get("federated_claims")
        if federated_claims is not None:
            if not isinstance(federated_claims, dict):
                raise IdentityError(
                    "unexpected claim type: federated_claims is not a dict"
                )

            federated_issuer = federated_claims.get("connector_id")
            if federated_issuer is not None:
                if not isinstance(federated_issuer, str):
                    raise IdentityError(
                        "unexpected claim type: federated_claims.connector_id is not a string"
                    )

                self._federated_issuer = federated_issuer

    def in_validity_period(self) -> bool:
        """
        Returns whether or not this `IdentityToken` is currently within its
        self-stated validity period.

        The token is considered valid when the current UTC time is strictly
        before `exp` and, if an `nbf` claim is present, at or after `nbf`.

        NOTE: As noted in `IdentityToken.__init__`, this is not a verifying
        wrapper; the check here only asserts whether the *unverified*
        identity's claims are within their validity period.
        """

        now = datetime.now(timezone.utc).timestamp()

        if self._nbf is not None:
            return self._nbf <= now < self._exp
        else:
            return now < self._exp

    @property
    def identity(self) -> str:
        """
        Returns this `IdentityToken`'s underlying "subject".

        Note that this is **not** always the `sub` claim in the corresponding
        identity token: depending on the token's issuer, it may be a *different*
        claim, such as `email`. This corresponds to the Sigstore ecosystem's
        behavior, e.g. in each issued certificate's SAN.
        """
        return self._identity

    @property
    def issuer(self) -> str:
        """
        Returns a URL identifying this `IdentityToken`'s issuer.
        """
        return self._iss

    @property
    def federated_issuer(self) -> str:
        """
        Returns a URL identifying the **federated** issuer for any Sigstore
        certificate issued against this identity token.

        The behavior of this field is slightly subtle: for non-federated
        identity providers (like a token issued directly by Google's IdP) it
        should be exactly equivalent to `IdentityToken.issuer`. For federated
        issuers (like Sigstore's own federated IdP) it should be equivalent to
        the underlying federated issuer's URL, which is kept in an
        implementation-defined claim.

        This attribute exists so that clients who wish to inspect the expected
        underlying issuer of their certificates can do so without relying on
        implementation-specific behavior.
        """
        if self._federated_issuer is not None:
            return self._federated_issuer

        return self.issuer

    def __str__(self) -> str:
        """
        Returns the underlying OIDC token for this identity.

        Note that this token is secret in nature and **MUST NOT** be disclosed.
        """
        return self._raw_token

226 

227 

class IssuerError(Exception):
    """
    Signals a communication failure with an OIDC issuer, or a response
    from one that is not in the expected format.
    """

234 

235 

class Issuer:
    """
    Represents an OIDC issuer (IdP).
    """

    def __init__(self, base_url: str) -> None:
        """
        Create a new `Issuer` from the given base URL.

        This URL is used to locate an OpenID Connect configuration file,
        which is then used to bootstrap the issuer's state (such
        as authorization and token endpoints).

        Raises `NetworkError` if the configuration request fails to connect
        or times out, and `IssuerError` if the issuer answers with an HTTP
        error status or an invalid configuration document.
        """
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # The trailing slash makes `urljoin` append to `base_url` rather than
        # replace its last path segment.
        oidc_config_url = urllib.parse.urljoin(
            f"{base_url}/", ".well-known/openid-configuration"
        )

        try:
            resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IssuerError from http_error

        try:
            # We don't generally expect this to fail (since the provider should
            # return a non-success HTTP code which we catch above), but we
            # check just in case we have a misbehaving OIDC issuer.
            self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
        except ValueError as exc:
            # Keep the underlying parse/validation failure attached as the cause.
            raise IssuerError(
                f"OIDC issuer returned invalid configuration: {exc}"
            ) from exc

    @classmethod
    def production(cls) -> Issuer:
        """
        Returns an `Issuer` configured against Sigstore's production-level services.
        """
        return cls(DEFAULT_OAUTH_ISSUER_URL)

    @classmethod
    def staging(cls) -> Issuer:
        """
        Returns an `Issuer` configured against Sigstore's staging-level services.
        """
        return cls(STAGING_OAUTH_ISSUER_URL)

    def identity_token(  # nosec: B107
        self,
        client_id: str = "sigstore",
        client_secret: str = "",
        force_oob: bool = False,
    ) -> IdentityToken:
        """
        Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.

        This function blocks on user interaction.

        The `force_oob` flag controls the kind of flow performed. When `False` (the default),
        this function attempts to open the user's web browser before falling back to
        an out-of-band flow. When `True`, the out-of-band flow is always used.

        Raises `NetworkError` if the token request fails to connect or times
        out, and `IdentityError` if the authorization or token endpoint
        reports an error, the token request returns an HTTP error status, or
        the token response cannot be parsed or lacks an access token.
        """

        # This function and the components that it relies on are based off of:
        # https://github.com/psteniusubi/python-sample

        from sigstore._internal.oidc.oauth import _OAuthFlow

        code: str
        with _OAuthFlow(client_id, client_secret, self) as server:
            # Launch web browser
            if not force_oob and webbrowser.open(server.base_uri):
                print("Waiting for browser interaction...", file=sys.stderr)
            else:
                server.enable_oob()
                print(
                    f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
                    file=sys.stderr,
                )

            if not server.is_oob():
                # Wait until the redirect server populates the response
                while server.auth_response is None:
                    time.sleep(0.1)

                auth_error = server.auth_response.get("error")
                if auth_error is not None:
                    raise IdentityError(
                        f"Error response from auth endpoint: {auth_error[0]}"
                    )
                code = server.auth_response["code"][0]
            else:
                # In the out-of-band case, we wait until the user provides the code
                code = input("Enter verification code: ")

        # Provide code to token endpoint
        data = {
            "grant_type": "authorization_code",
            "redirect_uri": server.redirect_uri,
            "code": code,
            "code_verifier": server.oauth_session.code_verifier,
        }
        auth = (
            client_id,
            client_secret,
        )
        # NOTE: this payload contains the authorization code and the code
        # verifier, so debug logs of this call should be treated as sensitive.
        logging.debug("PAYLOAD: data=%s", data)
        try:
            resp = self.session.post(
                self.oidc_config.token_endpoint,
                data=data,
                auth=auth,
                timeout=30,
            )
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IdentityError(
                f"Token request failed with {resp.status_code}"
            ) from http_error

        # A successful status code doesn't guarantee a well-formed body, so
        # parse failures are reported the same way as other token errors.
        try:
            token_json = resp.json()
        except ValueError as exc:
            raise IdentityError(
                "Token endpoint returned an invalid JSON response"
            ) from exc

        token_error = token_json.get("error")
        if token_error is not None:
            raise IdentityError(f"Error response from token endpoint: {token_error}")

        try:
            raw_token = token_json["access_token"]
        except KeyError as exc:
            raise IdentityError(
                "Token response is missing the 'access_token' field"
            ) from exc

        return IdentityToken(raw_token)

371 

372 

class IdentityError(Error):
    """
    Wraps `id`'s IdentityError.

    This module also raises it directly for malformed identity tokens and
    for failures during the OAuth token flow.
    """

    @classmethod
    def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
        """
        Raises a wrapped IdentityError from the provided `id.IdentityError`.

        The new error uses `str(exc)` as its message and keeps `exc` as its
        `__cause__`. This method never returns normally.
        """
        raise cls(str(exc)) from exc

    def diagnostics(self) -> str:
        """
        Returns diagnostics for the error.

        When this error was caused by an
        `id.GitHubOidcPermissionCredentialError`, the text explains the
        GitHub Actions `id-token: write` permission and includes the cause;
        otherwise a generic ambient-credential message containing this
        error's own text is returned.
        """
        # NOTE(review): the leading whitespace inside the two message
        # literals below is reconstructed and should be confirmed against the
        # original file, since it is part of the returned text.
        if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
            return f"""
                Insufficient permissions for GitHub Actions workflow.

                The most common reason for this is incorrect
                configuration of the top-level `permissions` setting of the
                workflow YAML file. It should be configured like so:

                    permissions:
                      id-token: write

                Relevant documentation here:

                    https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings

                Another possible reason is that the workflow run has been
                triggered by a PR from a forked repository. PRs from forked
                repositories typically cannot be granted write access.

                Relevant documentation here:

                    https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token

                Additional context:

                {self.__cause__}
                """
        else:
            return f"""
                An issue occurred with ambient credential detection.

                Additional context:

                {self}
                """

420 

421 

def detect_credential() -> Optional[str]:
    """
    Look up an ambient credential via `id.detect_credential`, using this
    module's default audience.

    Any `id.IdentityError` raised by the lookup is re-raised as this
    module's own `IdentityError`, with the original error as its cause.
    """
    try:
        ambient = id.detect_credential(_DEFAULT_AUDIENCE)
    except id.IdentityError as id_exc:
        IdentityError.raise_from_id(id_exc)
    return cast(Optional[str], ambient)