Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/oidc.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

134 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16API for retrieving OIDC tokens. 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22import sys 

23import time 

24import urllib.parse 

25import webbrowser 

26from datetime import datetime, timezone 

27from typing import NoReturn, Optional, cast 

28 

29import id 

30import jwt 

31import requests 

32from pydantic import BaseModel, StrictStr 

33 

34from sigstore._internal import USER_AGENT 

35from sigstore.errors import Error, NetworkError 

36 

37# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201 

38_KNOWN_OIDC_ISSUERS = { 

39 "https://accounts.google.com": "email", 

40 "https://oauth2.sigstore.dev/auth": "email", 

41 "https://oauth2.sigstage.dev/auth": "email", 

42 "https://token.actions.githubusercontent.com": "sub", 

43} 

44 

45_DEFAULT_CLIENT_ID = "sigstore" 

46 

47 

48class _OpenIDConfiguration(BaseModel): 

49 """ 

50 Represents a (subset) of the fields provided by an OpenID Connect provider's 

51 `.well-known/openid-configuration` response, as defined by OpenID Connect Discovery. 

52 

53 See: <https://openid.net/specs/openid-connect-discovery-1_0.html> 

54 """ 

55 

56 authorization_endpoint: StrictStr 

57 token_endpoint: StrictStr 

58 

59 

60class ExpiredIdentity(Exception): 

61 """An error raised when an identity token is expired.""" 

62 

63 

64class IdentityToken: 

65 """ 

66 An OIDC "identity", corresponding to an underlying OIDC token with 

67 a sensible subject, issuer, and audience for Sigstore purposes. 

68 """ 

69 

70 def __init__(self, raw_token: str, client_id: str = _DEFAULT_CLIENT_ID) -> None: 

71 """ 

72 Create a new `IdentityToken` from the given OIDC token. 

73 """ 

74 

75 self._raw_token = raw_token 

76 

77 # NOTE: The lack of verification here is intentional, and is part of 

78 # Sigstore's verification model: clients like sigstore-python are 

79 # responsible only for forwarding the OIDC identity to Fulcio for 

80 # certificate binding and issuance. 

81 try: 

82 self._unverified_claims = jwt.decode( 

83 raw_token, 

84 options={ 

85 "verify_signature": False, 

86 "verify_aud": True, 

87 "verify_iat": True, 

88 "verify_exp": True, 

89 # These claims are required by OpenID Connect, so 

90 # we can strongly enforce their presence. 

91 # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken 

92 "require": ["aud", "sub", "iat", "exp", "iss"], 

93 }, 

94 audience=client_id, 

95 # NOTE: This leeway shouldn't be strictly necessary, but is 

96 # included to preempt any (small) skew between the host 

97 # and the originating IdP. 

98 leeway=5, 

99 ) 

100 except Exception as exc: 

101 raise IdentityError( 

102 "Identity token is malformed or missing claims" 

103 ) from exc 

104 

105 self._iss: str = self._unverified_claims["iss"] 

106 self._nbf: int | None = self._unverified_claims.get("nbf") 

107 self._exp: int = self._unverified_claims["exp"] 

108 

109 # Fail early if this token isn't within its validity period. 

110 if not self.in_validity_period(): 

111 raise IdentityError("Identity token is not within its validity period") 

112 

113 # When verifying the private key possession proof, Fulcio uses 

114 # different claims depending on the token's issuer. 

115 # We currently special-case a handful of these, and fall back 

116 # on signing the "sub" claim otherwise. 

117 identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer) 

118 if identity_claim is not None: 

119 if identity_claim not in self._unverified_claims: 

120 raise IdentityError( 

121 f"Identity token is missing the required {identity_claim!r} claim" 

122 ) 

123 

124 self._identity = str(self._unverified_claims.get(identity_claim)) 

125 else: 

126 try: 

127 self._identity = str(self._unverified_claims["sub"]) 

128 except KeyError: 

129 raise IdentityError( 

130 "Identity token is missing the required 'sub' claim" 

131 ) 

132 

133 # This identity token might have been retrieved directly from 

134 # an identity provider, or it might be a "federated" identity token 

135 # retrieved from a federated IdP (e.g., Sigstore's own Dex instance). 

136 # In the latter case, the claims will also include a `federated_claims` 

137 # set, which in turn should include a `connector_id` that reflects 

138 # the "real" token issuer. We retrieve this, despite technically 

139 # being an implementation detail, because it has value to client 

140 # users: a client might want to make sure that its user is identifying 

141 # with a *particular* IdP, which means that they need to pierce the 

142 # federation layer to check which IdP is actually being used. 

143 self._federated_issuer: str | None = None 

144 federated_claims = self._unverified_claims.get("federated_claims") 

145 if federated_claims is not None: 

146 if not isinstance(federated_claims, dict): 

147 raise IdentityError( 

148 "unexpected claim type: federated_claims is not a dict" 

149 ) 

150 

151 federated_issuer = federated_claims.get("connector_id") 

152 if federated_issuer is not None: 

153 if not isinstance(federated_issuer, str): 

154 raise IdentityError( 

155 "unexpected claim type: federated_claims.connector_id is not a string" 

156 ) 

157 

158 self._federated_issuer = federated_issuer 

159 

160 def in_validity_period(self) -> bool: 

161 """ 

162 Returns whether or not this `Identity` is currently within its self-stated validity period. 

163 

164 NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper; 

165 the check here only asserts whether the *unverified* identity's claims 

166 are within their validity period. 

167 """ 

168 

169 now = datetime.now(timezone.utc).timestamp() 

170 

171 if self._nbf is not None: 

172 return self._nbf <= now < self._exp 

173 else: 

174 return now < self._exp 

175 

176 @property 

177 def identity(self) -> str: 

178 """ 

179 Returns this `IdentityToken`'s underlying "subject". 

180 

181 Note that this is **not** always the `sub` claim in the corresponding 

182 identity token: depending onm the token's issuer, it may be a *different* 

183 claim, such as `email`. This corresponds to the Sigstore ecosystem's 

184 behavior, e.g. in each issued certificate's SAN. 

185 """ 

186 return self._identity 

187 

188 @property 

189 def issuer(self) -> str: 

190 """ 

191 Returns a URL identifying this `IdentityToken`'s issuer. 

192 """ 

193 return self._iss 

194 

195 @property 

196 def federated_issuer(self) -> str: 

197 """ 

198 Returns a URL identifying the **federated** issuer for any Sigstore 

199 certificate issued against this identity token. 

200 

201 The behavior of this field is slightly subtle: for non-federated 

202 identity providers (like a token issued directly by Google's IdP) it 

203 should be exactly equivalent to `IdentityToken.issuer`. For federated 

204 issuers (like Sigstore's own federated IdP) it should be equivalent to 

205 the underlying federated issuer's URL, which is kept in an 

206 implementation-defined claim. 

207 

208 This attribute exists so that clients who wish to inspect the expected 

209 underlying issuer of their certificates can do so without relying on 

210 implementation-specific behavior. 

211 """ 

212 if self._federated_issuer is not None: 

213 return self._federated_issuer 

214 

215 return self.issuer 

216 

217 def __str__(self) -> str: 

218 """ 

219 Returns the underlying OIDC token for this identity. 

220 

221 That this token is secret in nature and **MUST NOT** be disclosed. 

222 """ 

223 return self._raw_token 

224 

225 

226class IssuerError(Exception): 

227 """ 

228 Raised on any communication or format error with an OIDC issuer. 

229 """ 

230 

231 pass 

232 

233 

234class Issuer: 

235 """ 

236 Represents an OIDC issuer (IdP). 

237 """ 

238 

239 def __init__(self, base_url: str) -> None: 

240 """ 

241 Create a new `Issuer` from the given base URL. 

242 

243 This URL is used to locate an OpenID Connect configuration file, 

244 which is then used to bootstrap the issuer's state (such 

245 as authorization and token endpoints). 

246 """ 

247 self.session = requests.Session() 

248 self.session.headers.update({"User-Agent": USER_AGENT}) 

249 

250 oidc_config_url = urllib.parse.urljoin( 

251 f"{base_url}/", ".well-known/openid-configuration" 

252 ) 

253 

254 try: 

255 resp: requests.Response = self.session.get(oidc_config_url, timeout=30) 

256 except (requests.ConnectionError, requests.Timeout) as exc: 

257 raise NetworkError from exc 

258 

259 try: 

260 resp.raise_for_status() 

261 except requests.HTTPError as http_error: 

262 raise IssuerError from http_error 

263 

264 try: 

265 # We don't generally expect this to fail (since the provider should 

266 # return a non-success HTTP code which we catch above), but we 

267 # check just in case we have a misbehaving OIDC issuer. 

268 self.oidc_config = _OpenIDConfiguration.model_validate(resp.json()) 

269 except ValueError as exc: 

270 raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}") 

271 

272 def identity_token( # nosec: B107 

273 self, 

274 client_id: str = _DEFAULT_CLIENT_ID, 

275 client_secret: str = "", 

276 force_oob: bool = False, 

277 ) -> IdentityToken: 

278 """ 

279 Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth. 

280 

281 This function blocks on user interaction. 

282 

283 The `force_oob` flag controls the kind of flow performed. When `False` (the default), 

284 this function attempts to open the user's web browser before falling back to 

285 an out-of-band flow. When `True`, the out-of-band flow is always used. 

286 """ 

287 

288 # This function and the components that it relies on are based off of: 

289 # https://github.com/psteniusubi/python-sample 

290 

291 from sigstore._internal.oidc.oauth import _OAuthFlow 

292 

293 code: str 

294 with _OAuthFlow(client_id, client_secret, self) as server: 

295 # Launch web browser 

296 if not force_oob and webbrowser.open(server.base_uri): 

297 print("Waiting for browser interaction...", file=sys.stderr) 

298 else: 

299 server.enable_oob() 

300 print( 

301 f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}", 

302 file=sys.stderr, 

303 ) 

304 

305 if not server.is_oob(): 

306 # Wait until the redirect server populates the response 

307 while server.auth_response is None: 

308 time.sleep(0.1) 

309 

310 auth_error = server.auth_response.get("error") 

311 if auth_error is not None: 

312 raise IdentityError( 

313 f"Error response from auth endpoint: {auth_error[0]}" 

314 ) 

315 code = server.auth_response["code"][0] 

316 else: 

317 # In the out-of-band case, we wait until the user provides the code 

318 code = input("Enter verification code: ") 

319 

320 # Provide code to token endpoint 

321 data = { 

322 "grant_type": "authorization_code", 

323 "redirect_uri": server.redirect_uri, 

324 "code": code, 

325 "code_verifier": server.oauth_session.code_verifier, 

326 } 

327 auth = ( 

328 client_id, 

329 client_secret, 

330 ) 

331 logging.debug(f"PAYLOAD: data={data}") 

332 try: 

333 resp = self.session.post( 

334 self.oidc_config.token_endpoint, 

335 data=data, 

336 auth=auth, 

337 timeout=30, 

338 ) 

339 except (requests.ConnectionError, requests.Timeout) as exc: 

340 raise NetworkError from exc 

341 

342 try: 

343 resp.raise_for_status() 

344 except requests.HTTPError as http_error: 

345 raise IdentityError( 

346 f"Token request failed with {resp.status_code}" 

347 ) from http_error 

348 

349 token_json = resp.json() 

350 token_error = token_json.get("error") 

351 if token_error is not None: 

352 raise IdentityError(f"Error response from token endpoint: {token_error}") 

353 

354 return IdentityToken(token_json["access_token"], client_id) 

355 

356 

357class IdentityError(Error): 

358 """ 

359 Wraps `id`'s IdentityError. 

360 """ 

361 

362 @classmethod 

363 def raise_from_id(cls, exc: id.IdentityError) -> NoReturn: 

364 """Raises a wrapped IdentityError from the provided `id.IdentityError`.""" 

365 raise cls(str(exc)) from exc 

366 

367 def diagnostics(self) -> str: 

368 """Returns diagnostics for the error.""" 

369 if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError): 

370 return f""" 

371 Insufficient permissions for GitHub Actions workflow. 

372 

373 The most common reason for this is incorrect 

374 configuration of the top-level `permissions` setting of the 

375 workflow YAML file. It should be configured like so: 

376 

377 permissions: 

378 id-token: write 

379 

380 Relevant documentation here: 

381 

382 https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings 

383 

384 Another possible reason is that the workflow run has been 

385 triggered by a PR from a forked repository. PRs from forked 

386 repositories typically cannot be granted write access. 

387 

388 Relevant documentation here: 

389 

390 https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token 

391 

392 Additional context: 

393 

394 {self.__cause__} 

395 """ 

396 else: 

397 return f""" 

398 An issue occurred with ambient credential detection. 

399 

400 Additional context: 

401 

402 {self} 

403 """ 

404 

405 

406def detect_credential(client_id: str = _DEFAULT_CLIENT_ID) -> str | None: 

407 """Calls `id.detect_credential`, but wraps exceptions with our own exception type.""" 

408 

409 try: 

410 return cast(Optional[str], id.detect_credential(client_id)) 

411 except id.IdentityError as exc: 

412 IdentityError.raise_from_id(exc)