Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/oidc.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16API for retrieving OIDC tokens.
17"""
19from __future__ import annotations
21import logging
22import sys
23import time
24import urllib.parse
25import webbrowser
26from datetime import datetime, timezone
27from typing import NoReturn, Optional, cast
29import id
30import jwt
31import requests
32from pydantic import BaseModel, StrictStr
34from sigstore._internal import USER_AGENT
35from sigstore.errors import Error, NetworkError
37DEFAULT_OAUTH_ISSUER_URL = "https://oauth2.sigstore.dev/auth"
38STAGING_OAUTH_ISSUER_URL = "https://oauth2.sigstage.dev/auth"
40# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201
41_KNOWN_OIDC_ISSUERS = {
42 "https://accounts.google.com": "email",
43 "https://oauth2.sigstore.dev/auth": "email",
44 "https://oauth2.sigstage.dev/auth": "email",
45 "https://token.actions.githubusercontent.com": "sub",
46}
47_DEFAULT_AUDIENCE = "sigstore"
50class _OpenIDConfiguration(BaseModel):
51 """
52 Represents a (subset) of the fields provided by an OpenID Connect provider's
53 `.well-known/openid-configuration` response, as defined by OpenID Connect Discovery.
55 See: <https://openid.net/specs/openid-connect-discovery-1_0.html>
56 """
58 authorization_endpoint: StrictStr
59 token_endpoint: StrictStr
62class ExpiredIdentity(Exception):
63 """An error raised when an identity token is expired."""
66class IdentityToken:
67 """
68 An OIDC "identity", corresponding to an underlying OIDC token with
69 a sensible subject, issuer, and audience for Sigstore purposes.
70 """
72 def __init__(self, raw_token: str) -> None:
73 """
74 Create a new `IdentityToken` from the given OIDC token.
75 """
77 self._raw_token = raw_token
79 # NOTE: The lack of verification here is intentional, and is part of
80 # Sigstore's verification model: clients like sigstore-python are
81 # responsible only for forwarding the OIDC identity to Fulcio for
82 # certificate binding and issuance.
83 try:
84 self._unverified_claims = jwt.decode(
85 raw_token,
86 options={
87 "verify_signature": False,
88 "verify_aud": True,
89 "verify_iat": True,
90 "verify_exp": True,
91 # These claims are required by OpenID Connect, so
92 # we can strongly enforce their presence.
93 # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
94 "require": ["aud", "sub", "iat", "exp", "iss"],
95 },
96 audience=_DEFAULT_AUDIENCE,
97 # NOTE: This leeway shouldn't be strictly necessary, but is
98 # included to preempt any (small) skew between the host
99 # and the originating IdP.
100 leeway=5,
101 )
102 except Exception as exc:
103 raise IdentityError(
104 "Identity token is malformed or missing claims"
105 ) from exc
107 self._iss: str = self._unverified_claims["iss"]
108 self._nbf: int | None = self._unverified_claims.get("nbf")
109 self._exp: int = self._unverified_claims["exp"]
111 # Fail early if this token isn't within its validity period.
112 if not self.in_validity_period():
113 raise IdentityError("Identity token is not within its validity period")
115 # When verifying the private key possession proof, Fulcio uses
116 # different claims depending on the token's issuer.
117 # We currently special-case a handful of these, and fall back
118 # on signing the "sub" claim otherwise.
119 identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer)
120 if identity_claim is not None:
121 if identity_claim not in self._unverified_claims:
122 raise IdentityError(
123 f"Identity token is missing the required {identity_claim!r} claim"
124 )
126 self._identity = str(self._unverified_claims.get(identity_claim))
127 else:
128 try:
129 self._identity = str(self._unverified_claims["sub"])
130 except KeyError:
131 raise IdentityError(
132 "Identity token is missing the required 'sub' claim"
133 )
135 # This identity token might have been retrieved directly from
136 # an identity provider, or it might be a "federated" identity token
137 # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
138 # In the latter case, the claims will also include a `federated_claims`
139 # set, which in turn should include a `connector_id` that reflects
140 # the "real" token issuer. We retrieve this, despite technically
141 # being an implementation detail, because it has value to client
142 # users: a client might want to make sure that its user is identifying
143 # with a *particular* IdP, which means that they need to pierce the
144 # federation layer to check which IdP is actually being used.
145 self._federated_issuer: str | None = None
146 federated_claims = self._unverified_claims.get("federated_claims")
147 if federated_claims is not None:
148 if not isinstance(federated_claims, dict):
149 raise IdentityError(
150 "unexpected claim type: federated_claims is not a dict"
151 )
153 federated_issuer = federated_claims.get("connector_id")
154 if federated_issuer is not None:
155 if not isinstance(federated_issuer, str):
156 raise IdentityError(
157 "unexpected claim type: federated_claims.connector_id is not a string"
158 )
160 self._federated_issuer = federated_issuer
162 def in_validity_period(self) -> bool:
163 """
164 Returns whether or not this `Identity` is currently within its self-stated validity period.
166 NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper;
167 the check here only asserts whether the *unverified* identity's claims
168 are within their validity period.
169 """
171 now = datetime.now(timezone.utc).timestamp()
173 if self._nbf is not None:
174 return self._nbf <= now < self._exp
175 else:
176 return now < self._exp
178 @property
179 def identity(self) -> str:
180 """
181 Returns this `IdentityToken`'s underlying "subject".
183 Note that this is **not** always the `sub` claim in the corresponding
184 identity token: depending onm the token's issuer, it may be a *different*
185 claim, such as `email`. This corresponds to the Sigstore ecosystem's
186 behavior, e.g. in each issued certificate's SAN.
187 """
188 return self._identity
190 @property
191 def issuer(self) -> str:
192 """
193 Returns a URL identifying this `IdentityToken`'s issuer.
194 """
195 return self._iss
197 @property
198 def federated_issuer(self) -> str:
199 """
200 Returns a URL identifying the **federated** issuer for any Sigstore
201 certificate issued against this identity token.
203 The behavior of this field is slightly subtle: for non-federated
204 identity providers (like a token issued directly by Google's IdP) it
205 should be exactly equivalent to `IdentityToken.issuer`. For federated
206 issuers (like Sigstore's own federated IdP) it should be equivalent to
207 the underlying federated issuer's URL, which is kept in an
208 implementation-defined claim.
210 This attribute exists so that clients who wish to inspect the expected
211 underlying issuer of their certificates can do so without relying on
212 implementation-specific behavior.
213 """
214 if self._federated_issuer is not None:
215 return self._federated_issuer
217 return self.issuer
219 def __str__(self) -> str:
220 """
221 Returns the underlying OIDC token for this identity.
223 That this token is secret in nature and **MUST NOT** be disclosed.
224 """
225 return self._raw_token
228class IssuerError(Exception):
229 """
230 Raised on any communication or format error with an OIDC issuer.
231 """
233 pass
236class Issuer:
237 """
238 Represents an OIDC issuer (IdP).
239 """
241 def __init__(self, base_url: str) -> None:
242 """
243 Create a new `Issuer` from the given base URL.
245 This URL is used to locate an OpenID Connect configuration file,
246 which is then used to bootstrap the issuer's state (such
247 as authorization and token endpoints).
248 """
249 self.session = requests.Session()
250 self.session.headers.update({"User-Agent": USER_AGENT})
252 oidc_config_url = urllib.parse.urljoin(
253 f"{base_url}/", ".well-known/openid-configuration"
254 )
256 try:
257 resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
258 except (requests.ConnectionError, requests.Timeout) as exc:
259 raise NetworkError from exc
261 try:
262 resp.raise_for_status()
263 except requests.HTTPError as http_error:
264 raise IssuerError from http_error
266 try:
267 # We don't generally expect this to fail (since the provider should
268 # return a non-success HTTP code which we catch above), but we
269 # check just in case we have a misbehaving OIDC issuer.
270 self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
271 except ValueError as exc:
272 raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}")
274 @classmethod
275 def production(cls) -> Issuer:
276 """
277 Returns an `Issuer` configured against Sigstore's production-level services.
278 """
279 return cls(DEFAULT_OAUTH_ISSUER_URL)
281 @classmethod
282 def staging(cls) -> Issuer:
283 """
284 Returns an `Issuer` configured against Sigstore's staging-level services.
285 """
286 return cls(STAGING_OAUTH_ISSUER_URL)
288 def identity_token( # nosec: B107
289 self,
290 client_id: str = "sigstore",
291 client_secret: str = "",
292 force_oob: bool = False,
293 ) -> IdentityToken:
294 """
295 Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.
297 This function blocks on user interaction.
299 The `force_oob` flag controls the kind of flow performed. When `False` (the default),
300 this function attempts to open the user's web browser before falling back to
301 an out-of-band flow. When `True`, the out-of-band flow is always used.
302 """
304 # This function and the components that it relies on are based off of:
305 # https://github.com/psteniusubi/python-sample
307 from sigstore._internal.oidc.oauth import _OAuthFlow
309 code: str
310 with _OAuthFlow(client_id, client_secret, self) as server:
311 # Launch web browser
312 if not force_oob and webbrowser.open(server.base_uri):
313 print("Waiting for browser interaction...", file=sys.stderr)
314 else:
315 server.enable_oob()
316 print(
317 f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
318 file=sys.stderr,
319 )
321 if not server.is_oob():
322 # Wait until the redirect server populates the response
323 while server.auth_response is None:
324 time.sleep(0.1)
326 auth_error = server.auth_response.get("error")
327 if auth_error is not None:
328 raise IdentityError(
329 f"Error response from auth endpoint: {auth_error[0]}"
330 )
331 code = server.auth_response["code"][0]
332 else:
333 # In the out-of-band case, we wait until the user provides the code
334 code = input("Enter verification code: ")
336 # Provide code to token endpoint
337 data = {
338 "grant_type": "authorization_code",
339 "redirect_uri": server.redirect_uri,
340 "code": code,
341 "code_verifier": server.oauth_session.code_verifier,
342 }
343 auth = (
344 client_id,
345 client_secret,
346 )
347 logging.debug(f"PAYLOAD: data={data}")
348 try:
349 resp = self.session.post(
350 self.oidc_config.token_endpoint,
351 data=data,
352 auth=auth,
353 timeout=30,
354 )
355 except (requests.ConnectionError, requests.Timeout) as exc:
356 raise NetworkError from exc
358 try:
359 resp.raise_for_status()
360 except requests.HTTPError as http_error:
361 raise IdentityError(
362 f"Token request failed with {resp.status_code}"
363 ) from http_error
365 token_json = resp.json()
366 token_error = token_json.get("error")
367 if token_error is not None:
368 raise IdentityError(f"Error response from token endpoint: {token_error}")
370 return IdentityToken(token_json["access_token"])
373class IdentityError(Error):
374 """
375 Wraps `id`'s IdentityError.
376 """
378 @classmethod
379 def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
380 """Raises a wrapped IdentityError from the provided `id.IdentityError`."""
381 raise cls(str(exc)) from exc
383 def diagnostics(self) -> str:
384 """Returns diagnostics for the error."""
385 if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
386 return f"""
387 Insufficient permissions for GitHub Actions workflow.
389 The most common reason for this is incorrect
390 configuration of the top-level `permissions` setting of the
391 workflow YAML file. It should be configured like so:
393 permissions:
394 id-token: write
396 Relevant documentation here:
398 https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings
400 Another possible reason is that the workflow run has been
401 triggered by a PR from a forked repository. PRs from forked
402 repositories typically cannot be granted write access.
404 Relevant documentation here:
406 https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token
408 Additional context:
410 {self.__cause__}
411 """
412 else:
413 return f"""
414 An issue occurred with ambient credential detection.
416 Additional context:
418 {self}
419 """
422def detect_credential() -> Optional[str]:
423 """Calls `id.detect_credential`, but wraps exceptions with our own exception type."""
424 try:
425 return cast(Optional[str], id.detect_credential(_DEFAULT_AUDIENCE))
426 except id.IdentityError as exc:
427 IdentityError.raise_from_id(exc)