Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/oidc.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

136 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16API for retrieving OIDC tokens. 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22import sys 

23import time 

24import urllib.parse 

25import webbrowser 

26from datetime import datetime, timezone 

27from typing import NoReturn 

28 

29import id 

30import jwt 

31import requests 

32from pydantic import BaseModel, StrictStr 

33 

34from sigstore._internal import USER_AGENT 

35from sigstore.errors import Error, NetworkError 

36 

# Maps each OIDC issuer that is special-cased here to the claim that is used
# as the token's identity for that issuer. Issuers that are not listed fall
# back to the "sub" claim.
# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201
_KNOWN_OIDC_ISSUERS: dict[str, str] = {
    "https://accounts.google.com": "email",
    "https://oauth2.sigstore.dev/auth": "email",
    "https://oauth2.sigstage.dev/auth": "email",
    "https://token.actions.githubusercontent.com": "sub",
}

# Default OAuth client ID; it doubles as the audience that identity tokens
# are checked against when no other client ID is supplied.
_DEFAULT_CLIENT_ID = "sigstore"

46 

47 

class _OpenIDConfiguration(BaseModel):
    """
    Represents a subset of the fields provided by an OpenID Connect provider's
    `.well-known/openid-configuration` response, as defined by OpenID Connect
    Discovery.

    See: <https://openid.net/specs/openid-connect-discovery-1_0.html>
    """

    # URL of the provider's authorization endpoint.
    authorization_endpoint: StrictStr
    # URL of the provider's token endpoint.
    token_endpoint: StrictStr

58 

59 

class ExpiredIdentity(Exception):
    """An error raised when an identity token is expired."""

62 

63 

class IdentityToken:
    """
    An OIDC "identity", corresponding to an underlying OIDC token with
    a sensible subject, issuer, and audience for Sigstore purposes.
    """

    def __init__(self, raw_token: str, client_id: str = _DEFAULT_CLIENT_ID) -> None:
        """
        Create a new `IdentityToken` from the given OIDC token.

        `raw_token` is the encoded JWT. `client_id` is the audience that the
        token's `aud` claim is checked against.

        Raises `IdentityError` if the token cannot be decoded, is missing a
        required claim, carries a claim of an unexpected type, or is not
        currently within its validity period.
        """

        self._raw_token = raw_token

        # NOTE: The lack of verification here is intentional, and is part of
        # Sigstore's verification model: clients like sigstore-python are
        # responsible only for forwarding the OIDC identity to Fulcio for
        # certificate binding and issuance.
        try:
            self._unverified_claims = jwt.decode(
                raw_token,
                options={
                    "verify_signature": False,
                    "verify_aud": True,
                    "verify_iat": True,
                    "verify_exp": True,
                    # These claims are required by OpenID Connect, so
                    # we can strongly enforce their presence.
                    # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
                    "require": ["aud", "sub", "iat", "exp", "iss"],
                },
                audience=client_id,
                # NOTE: This leeway shouldn't be strictly necessary, but is
                # included to preempt any (small) skew between the host
                # and the originating IdP.
                leeway=5,
            )
        except Exception as exc:
            raise IdentityError(
                "Identity token is malformed or missing claims"
            ) from exc

        # The claims below are compared and used as dictionary keys further
        # down, so reject unexpected types here with an `IdentityError`
        # rather than letting a `TypeError` escape later.
        iss = self._unverified_claims["iss"]
        if not isinstance(iss, str):
            raise IdentityError("unexpected claim type: iss is not a string")

        nbf = self._unverified_claims.get("nbf")
        if nbf is not None and not self._is_numeric_date(nbf):
            raise IdentityError("unexpected claim type: nbf is not a number")

        exp = self._unverified_claims["exp"]
        if not self._is_numeric_date(exp):
            raise IdentityError("unexpected claim type: exp is not a number")

        self._iss: str = iss
        self._nbf: int | float | None = nbf
        self._exp: int | float = exp

        # Fail early if this token isn't within its validity period.
        if not self.in_validity_period():
            raise IdentityError("Identity token is not within its validity period")

        # When verifying the private key possession proof, Fulcio uses
        # different claims depending on the token's issuer.
        # We currently special-case a handful of these, and fall back
        # on signing the "sub" claim otherwise.
        identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer, "sub")
        if identity_claim not in self._unverified_claims:
            raise IdentityError(
                f"Identity token is missing the required {identity_claim!r} claim"
            )
        self._identity = str(self._unverified_claims[identity_claim])

        # This identity token might have been retrieved directly from
        # an identity provider, or it might be a "federated" identity token
        # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
        # In the latter case, the claims will also include a `federated_claims`
        # set, which in turn should include a `connector_id` that reflects
        # the "real" token issuer. We retrieve this, despite technically
        # being an implementation detail, because it has value to client
        # users: a client might want to make sure that its user is identifying
        # with a *particular* IdP, which means that they need to pierce the
        # federation layer to check which IdP is actually being used.
        self._federated_issuer: str | None = None
        federated_claims = self._unverified_claims.get("federated_claims")
        if federated_claims is not None:
            if not isinstance(federated_claims, dict):
                raise IdentityError(
                    "unexpected claim type: federated_claims is not a dict"
                )

            federated_issuer = federated_claims.get("connector_id")
            if federated_issuer is not None:
                if not isinstance(federated_issuer, str):
                    raise IdentityError(
                        "unexpected claim type: federated_claims.connector_id is not a string"
                    )

                self._federated_issuer = federated_issuer

    @staticmethod
    def _is_numeric_date(value: object) -> bool:
        """Returns whether `value` is a plain number usable as a timestamp."""
        # `bool` is a subclass of `int`, but is never a meaningful timestamp.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def in_validity_period(self) -> bool:
        """
        Returns whether or not this `IdentityToken` is currently within its
        self-stated validity period.

        NOTE: As noted in `IdentityToken.__init__`, this is not a verifying
        wrapper; the check here only asserts whether the *unverified*
        identity's claims are within their validity period.
        """

        now = datetime.now(timezone.utc).timestamp()

        # `nbf` is optional; when present it is an inclusive lower bound.
        if self._nbf is not None and now < self._nbf:
            return False

        # `exp` is an exclusive upper bound.
        return now < self._exp

    @property
    def identity(self) -> str:
        """
        Returns this `IdentityToken`'s underlying "subject".

        Note that this is **not** always the `sub` claim in the corresponding
        identity token: depending on the token's issuer, it may be a *different*
        claim, such as `email`. This corresponds to the Sigstore ecosystem's
        behavior, e.g. in each issued certificate's SAN.
        """
        return self._identity

    @property
    def issuer(self) -> str:
        """
        Returns a URL identifying this `IdentityToken`'s issuer.
        """
        return self._iss

    @property
    def federated_issuer(self) -> str:
        """
        Returns a URL identifying the **federated** issuer for any Sigstore
        certificate issued against this identity token.

        The behavior of this field is slightly subtle: for non-federated
        identity providers (like a token issued directly by Google's IdP) it
        should be exactly equivalent to `IdentityToken.issuer`. For federated
        issuers (like Sigstore's own federated IdP) it should be equivalent to
        the underlying federated issuer's URL, which is kept in an
        implementation-defined claim.

        This attribute exists so that clients who wish to inspect the expected
        underlying issuer of their certificates can do so without relying on
        implementation-specific behavior.
        """
        if self._federated_issuer is not None:
            return self._federated_issuer

        return self.issuer

    def __str__(self) -> str:
        """
        Returns the underlying OIDC token for this identity.

        Note that this token is secret in nature and **MUST NOT** be disclosed.
        """
        return self._raw_token

224 

225 

class IssuerError(Exception):
    """
    Raised on any communication or format error with an OIDC issuer.
    """

232 

233 

class Issuer:
    """
    Represents an OIDC issuer (IdP).
    """

    def __init__(self, base_url: str) -> None:
        """
        Create a new `Issuer` from the given base URL.

        This URL is used to locate an OpenID Connect configuration file,
        which is then used to bootstrap the issuer's state (such
        as authorization and token endpoints).

        Raises `NetworkError` if the issuer cannot be reached or the request
        times out, and `IssuerError` if the issuer answers with an HTTP error
        status or with an invalid configuration document.
        """
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # OpenID Connect Discovery requires any terminating "/" on the issuer
        # to be removed before the well-known path is appended; without this
        # a trailing slash in `base_url` would produce a "//" in the URL.
        oidc_config_url = urllib.parse.urljoin(
            f"{base_url.rstrip('/')}/", ".well-known/openid-configuration"
        )

        try:
            resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IssuerError from http_error

        try:
            # We don't generally expect this to fail (since the provider should
            # return a non-success HTTP code which we catch above), but we
            # check just in case we have a misbehaving OIDC issuer.
            self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
        except ValueError as exc:
            raise IssuerError(
                f"OIDC issuer returned invalid configuration: {exc}"
            ) from exc

    def identity_token(  # nosec: B107
        self,
        client_id: str = _DEFAULT_CLIENT_ID,
        client_secret: str = "",
        force_oob: bool = False,
        redirect_port: int = 0,
    ) -> IdentityToken:
        """
        Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.

        This function blocks on user interaction.

        The `force_oob` flag controls the kind of flow performed. When `False` (the default),
        this function attempts to open the user's web browser before falling back to
        an out-of-band flow. When `True`, the out-of-band flow is always used.

        The `redirect_port` parameter selects the port for the local OAuth redirect
        server. The default of `0` requests an ephemeral port from the OS. Set this
        when the OIDC provider requires a pre-registered redirect URI with a fixed
        port (some providers do not allow wildcards on `localhost`).

        Raises `NetworkError` if the token endpoint cannot be reached, and
        `IdentityError` if the authorization or token response reports an error
        or is malformed.
        """

        # This function and the components that it relies on are based off of:
        # https://github.com/psteniusubi/python-sample

        from sigstore._internal.oidc.oauth import _OAuthFlow

        code: str
        with _OAuthFlow(
            client_id, client_secret, self, redirect_port=redirect_port
        ) as server:
            # Launch web browser
            if not force_oob and webbrowser.open(server.base_uri):
                print("Waiting for browser interaction...", file=sys.stderr)
            else:
                server.enable_oob()
                print(
                    f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
                    file=sys.stderr,
                )

            if not server.is_oob():
                # Wait until the redirect server populates the response
                while server.auth_response is None:
                    time.sleep(0.1)

                auth_response = server.auth_response

                auth_error = auth_response.get("error")
                if auth_error is not None:
                    raise IdentityError(
                        f"Error response from auth endpoint: {auth_error[0]}"
                    )

                # A redirect that carries no `state` at all is treated the
                # same as one that carries the wrong `state`.
                states = auth_response.get("state")
                if not states or states[0] != server.oauth_session.state:
                    raise IdentityError("OAuth state mismatch")

                codes = auth_response.get("code")
                if not codes:
                    raise IdentityError(
                        "Auth response is missing the authorization code"
                    )
                code = codes[0]
            else:
                # In the out-of-band case, we wait until the user provides the code
                code = input("Enter verification code: ")

        # Provide code to token endpoint
        data = {
            "grant_type": "authorization_code",
            "redirect_uri": server.redirect_uri,
            "code": code,
            "code_verifier": server.oauth_session.code_verifier,
        }
        auth = (
            client_id,
            client_secret,
        )
        # The authorization code and PKCE verifier are credentials, so they
        # are kept out of the debug log.
        loggable = {**data, "code": "<redacted>", "code_verifier": "<redacted>"}
        logging.debug("PAYLOAD: data=%s", loggable)
        try:
            resp = self.session.post(
                self.oidc_config.token_endpoint,
                data=data,
                auth=auth,
                timeout=30,
            )
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IdentityError(
                f"Token request failed with {resp.status_code}"
            ) from http_error

        try:
            token_json = resp.json()
        except ValueError as exc:
            raise IdentityError(
                "Token endpoint returned a response that is not valid JSON"
            ) from exc
        if not isinstance(token_json, dict):
            raise IdentityError(
                "Token endpoint returned a response that is not a JSON object"
            )

        token_error = token_json.get("error")
        if token_error is not None:
            raise IdentityError(f"Error response from token endpoint: {token_error}")

        access_token = token_json.get("access_token")
        if not isinstance(access_token, str):
            raise IdentityError("Token response is missing the 'access_token' field")

        return IdentityToken(access_token, client_id)

367 

368 

class IdentityError(Error):
    """
    Wraps `id`'s IdentityError.
    """

    @classmethod
    def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
        """Raises a wrapped IdentityError from the provided `id.IdentityError`."""
        raise cls(str(exc)) from exc

    def diagnostics(self) -> str:
        """Returns diagnostics for the error."""
        # The more specific text is only used when this error was chained
        # from a GitHub Actions permission failure.
        if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
            return f"""
                Insufficient permissions for GitHub Actions workflow.

                The most common reason for this is incorrect
                configuration of the top-level `permissions` setting of the
                workflow YAML file. It should be configured like so:

                    permissions:
                      id-token: write

                Relevant documentation here:

                    https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings

                Another possible reason is that the workflow run has been
                triggered by a PR from a forked repository. PRs from forked
                repositories typically cannot be granted write access.

                Relevant documentation here:

                    https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token

                Additional context:

                {self.__cause__}
                """
        else:
            return f"""
                An issue occurred with ambient credential detection.

                Additional context:

                {self}
            """

416 

417 

def detect_credential(client_id: str = _DEFAULT_CLIENT_ID) -> str | None:
    """
    Calls `id.detect_credential`, but wraps exceptions with our own exception type.

    `client_id` is forwarded unchanged to `id.detect_credential`, and its
    result is returned as-is.

    Raises `IdentityError` (chained from the original exception) when
    `id.detect_credential` raises `id.IdentityError`.
    """

    try:
        return id.detect_credential(client_id)
    except id.IdentityError as exc:
        # `raise_from_id` never returns; it always raises the wrapped error.
        IdentityError.raise_from_id(exc)